消息隊列是任務間通訊最靈活和強大的功能。相比郵箱只能傳遞4個字節,消息隊列則支持自定義長度。另外消息隊列則在消息層面體現的是一種字節流的概念,沒有具體數據類型綁定,這麼作就隨便用戶定義消息類型,對於消息隊列來講都是透明的很是靈活。不過因爲消息隊列實現中須要進行內存拷貝,因此效率倒是最低的,尤爲對於長度較長的消息進行通訊時要謹慎。php
-
消息隊列類型 -
消息隊列建立和初始化 -
消息隊列發送 -
消息隊列接收
消息隊列類型html
向👉滑動查看所有node
/** * message queue structure */struct rt_messagequeue{ struct rt_ipc_object parent; /**< inheritfrom ipc_object */ void *msg_pool; /**< startaddress of message queue */ rt_uint16_t msg_size; /**< messagesize of each message */ rt_uint16_t max_msgs; /**< maxnumber of messages */ rt_uint16_t entry; /**< index ofmessages in the queue */ void *msg_queue_head; /**< list head*/ void *msg_queue_tail; /**< list tail*/ void *msg_queue_free; /**< pointerindicated the free node of queue */};typedef struct rt_messagequeue* rt_mq_t;
從類型字段上來看,首先依然能夠看出消息隊列與其它IPC同樣都繼承自struct rt_ipc_objectgit
下面來分析餘下的擴展字段:github
msg_pool:消息隊列容納消息體的內存塊地址web
msg_size:消息隊列裏消息的長度瀏覽器
max_msgs:消息隊列可容納的消息個數微信
entry:消息隊列當前已存放的消息個數app
msg_queue_head:消息隊列裏最前面的消息函數
msg_queue_tail:消息隊列裏最後面的消息
msg_queue_free:消息隊列裏第一個空閒位置
不過還須要先介紹一下另外一個消息隊列的內部類型:
struct rt_mq_message{ struct rt_mq_message *next;};
這個類型時消息隊列裏面採用鏈表實現的鏈表原型,其實意義不是很大,僅僅做爲隊列的一個索引用。
消息隊列建立和初始化
/** *This function will create a message queue object from system resource * *@param name the name of message queue *@param msg_size the size of message *@param max_msgs the maximum number of message in queue *@param flag the flag of message queue * *@return the created message queue, RT_NULL on error happen */rt_mq_t rt_mq_create(const char *name, rt_size_t msg_size, rt_size_t max_msgs, rt_uint8_t flag){ struct rt_messagequeue *mq; struct rt_mq_message *head; register rt_base_t temp; RT_DEBUG_NOT_IN_INTERRUPT; /* allocate object */ mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name); if (mq == RT_NULL) return mq; /* set parent */ mq->parent.parent.flag= flag; /* init ipcobject */ rt_ipc_object_init(&(mq->parent)); /* init messagequeue */ /* get correct message size */ mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE); mq->max_msgs = max_msgs; /* allocate message pool */ mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) *mq->max_msgs); if (mq->msg_pool == RT_NULL) { rt_mq_delete(mq); return RT_NULL; } /* init messagelist */ mq->msg_queue_head = RT_NULL; mq->msg_queue_tail = RT_NULL; /* init message empty list */ mq->msg_queue_free = RT_NULL; for (temp = 0; temp < mq->max_msgs; temp++) { head = (struct rt_mq_message *)((rt_uint8_t*)mq->msg_pool + temp * (mq->msg_size + sizeof(struct rt_mq_message))); head->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = head; } /* the initial entry is zero */ mq->entry = 0; return mq;}
先看函數的參數:
name:消息隊列的object名稱
msg_size:消息的大小(注意不是消息隊列的大小)
max_msgs:消息隊列的大小(最多容納多少條消息)
flag:IPC等待隊列的類型(FIFO/PRIO)
/** *This function will send a message to message queue object, if there are *threads suspended on message queue object, it will be waked up. * *@param mq the message queue object *@param buffer the message *@param size the size of buffer * *@return the error code */rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer,rt_size_t size){ register rt_ubase_t temp; struct rt_mq_message *msg; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size !=0); /* greater than one message size */ if (size > mq->msg_size) return-RT_ERROR; RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return-RT_EFULL; } /* move free list pointer */ mq->msg_queue_free=msg->next; /* enable interrupt */ rt_hw_interrupt_enable(temp); /* the msg isthe new tailer of list, the next shall be NULL */ msg->next = RT_NULL; /* copy buffer*/ rt_memcpy(msg + 1, buffer, size); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* link msg to message queue */ if (mq->msg_queue_tail != RT_NULL) { /* if the tail exists, */ ((struct rt_mq_message *)mq->msg_queue_tail)->next = msg; } /* set new tail*/ mq->msg_queue_tail = msg; /* if the headis empty, set head */ if (mq->msg_queue_head == RT_NULL) mq->msg_queue_head = msg; /* increase message entry */ mq->entry++; /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); return RT_EOK;}
/** *This function will send an urgent message to message queue object, which *means the message will be inserted to the head of message queue. If there *are threads suspended on message queue object, it will be waked up. * *@param mq the message queue object *@param buffer the message *@param size the size of buffer * *@return the error code */rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size){ register rt_ubase_t temp; struct rt_mq_message *msg; /* parametercheck */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* greater than one message size */ if (size > mq->msg_size) return -RT_ERROR; RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_EFULL; } /* move free list pointer */ mq->msg_queue_free = msg->next; /* enable interrupt */ rt_hw_interrupt_enable(temp); /* copy buffer*/ rt_memcpy(msg + 1, buffer, size); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* link msg to the beginning of message queue */ msg->next = (struct rt_mq_message *)mq->msg_queue_head; mq->msg_queue_head = msg; /* if there is no tail */ if (mq->msg_queue_tail == RT_NULL) mq->msg_queue_tail = msg; /* increase message entry */ mq->entry++; /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread)); /* enableinterrupt */ rt_hw_interrupt_enable(temp); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); return RT_EOK;}
msg->next= (struct rt_mq_message *)mq->msg_queue_head;
mq->msg_queue_head= msg;
消息隊列接收
/** *This function will receive a message from message queue object, if there is *no message in message queue object, the thread shall wait for a specified *time. * *@param mq the message queue object *@param buffer the received message will be saved in *@param size the size of buffer *@param timeout the waiting time * *@return the error code */rt_err_t rt_mq_recv(rt_mq_t mq, void *buffer, rt_size_t size, rt_int32_t timeout){ struct rt_thread *thread; register rt_ubase_t temp; struct rt_mq_message *msg; rt_uint32_t tick_delta; /* parametercheck */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* initialize delta tick */ tick_delta = 0; /* get current thread */ thread = rt_thread_self(); RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mq->parent.parent))); /* disable interrupt */ temp =rt_hw_interrupt_disable(); /* for non-blocking call */ if (mq->entry == 0 && timeout == 0) { rt_hw_interrupt_enable(temp); return -RT_ETIMEOUT; } /* message queue is empty */ while (mq->entry == 0) { RT_DEBUG_IN_THREAD_CONTEXT; /* reset error number in thread */ thread->error= RT_EOK; /* no waiting, return timeout */ if (timeout == 0) { /* enable interrupt */ rt_hw_interrupt_enable(temp); thread->error = -RT_ETIMEOUT; return -RT_ETIMEOUT; } /* suspend current thread */ rt_ipc_list_suspend(&(mq->parent.suspend_thread), thread, mq->parent.parent.flag); /* has waiting time, start thread timer */ if (timeout > 0) { /* get the start tick of timer */ tick_delta = rt_tick_get(); RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list", thread->name)); /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); } /* enable interrupt */ rt_hw_interrupt_enable(temp); /* re-schedule*/ rt_schedule(); /* recv message*/ if (thread->error != RT_EOK) { /* return error */ return thread->error; } /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* if it's not waiting forever and then re-calculate timeout tick */ if (timeout > 0) { tick_delta = rt_tick_get() - tick_delta; timeout -= tick_delta; if (timeout < 0) timeout = 0; } } /* get message from queue */ msg = (struct rt_mq_message *)mq->msg_queue_head; /* move message queue head */ mq->msg_queue_head = msg->next; /* reach queue tail, set to NULL */ if (mq->msg_queue_tail == msg) mq->msg_queue_tail = RT_NULL; /* decrease message entry */ mq->entry--; /* enable interrupt */ rt_hw_interrupt_enable(temp); /* copy message*/ rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size); /* disable interrupt*/ temp = rt_hw_interrupt_disable(); /* put message to free list */ msg->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = msg; /* enable interrupt */ rt_hw_interrupt_enable(temp); RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent))); return RT_EOK;}
buffer:接收消息的內存地址
size:接收消息的長度
timeout:當消息隊列爲空時將根據timeout來決定是否等待
RT-Thread線上活動
一、【RT-Thread能力認證考試12月——RCEA】通過第一次考試的驗證,RT-Thread能力認證獲得了更多社區開發者和產業界的大力支持!(點此查看)若是您有晉升、求職、尋找更好機會的須要,有深刻學習和掌握RT-Thread的需求,歡迎垂詢/報考!
能力認證官網連接:https://www.rt-thread.org/page/rac.html(在外部瀏覽器打開)
當即報名
#題外話# 喜歡RT-Thread不要忘了在GitHub上留下你的STAR哦,你的star對咱們來講很是重要!連接地址:https://github.com/RT-Thread/rt-thread
你能夠添加微信18917005679爲好友,註明:公司+姓名,拉進 RT-Thread 官方微信交流羣
RT-Thread
讓物聯網終端的開發變得簡單、快速,芯片的價值獲得最大化發揮。Apache2.0協議,可免費在商業產品中使用,不須要公佈源碼,無潛在商業風險。
長按二維碼,關注咱們
點擊閱讀原文進入Github
本文分享自微信公衆號 - RTThread物聯網操做系統(RTThread)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。