源碼解讀·RT-Thread操做系統IPC之消息隊列

消息隊列是任務間通訊最靈活和強大的功能。相比郵箱只能傳遞4個字節,消息隊列則支持自定義長度。另外消息隊列則在消息層面體現的是一種字節流的概念,沒有具體數據類型綁定,這麼作就隨便用戶定義消息類型,對於消息隊列來講都是透明的很是靈活。不過因爲消息隊列實現中須要進行內存拷貝,因此效率倒是最低的,尤爲對於長度較長的消息進行通訊時要謹慎。php


本篇文章牽涉到的內容以下:
  1. 消息隊列類型
  2. 消息隊列建立和初始化
  3. 消息隊列發送
  4. 消息隊列接收

消息隊列類型html

向👉滑動查看所有node

/** * message queue structure */struct rt_messagequeue{ struct rt_ipc_object parent; /**< inheritfrom ipc_object */  void *msg_pool; /**< startaddress of message queue */  rt_uint16_t msg_size; /**< messagesize of each message */ rt_uint16_t max_msgs; /**< maxnumber of messages */  rt_uint16_t entry; /**< index ofmessages in the queue */  void *msg_queue_head; /**< list head*/ void *msg_queue_tail; /**< list tail*/ void *msg_queue_free; /**< pointerindicated the free node of queue */};typedef struct rt_messagequeue* rt_mq_t;


從類型字段上來看,首先依然能夠看出消息隊列與其它IPC同樣都繼承自struct rt_ipc_objectgit


下面來分析餘下的擴展字段:github

msg_pool:消息隊列容納消息體的內存塊地址web

msg_size:消息隊列裏消息的長度瀏覽器

max_msgs:消息隊列可容納的消息個數微信

entry:消息隊列當前已存放的消息個數app

msg_queue_head:消息隊列裏最前面的消息函數

msg_queue_tail:消息隊列裏最後面的消息

msg_queue_free:消息隊列裏第一個空閒位置


關於這些字段如今不須要過多解釋,在後續接口實現裏能夠看到具體的用途。


不過還須要先介紹一下另外一個消息隊列的內部類型:

struct rt_mq_message{ struct rt_mq_message *next;};


這個類型時消息隊列裏面採用鏈表實現的鏈表原型,其實意義不是很大,僅僅做爲隊列的一個索引用。

消息隊列建立和初始化

消息隊列建立依然使用動態建立爲例來分析:
/** *This function will create a message queue object from system resource * *@param name the name of message queue *@param msg_size the size of message *@param max_msgs the maximum number of message in queue *@param flag the flag of message queue * *@return the created message queue, RT_NULL on error happen */rt_mq_t rt_mq_create(const char *name, rt_size_t msg_size, rt_size_t max_msgs, rt_uint8_t flag){ struct rt_messagequeue *mq; struct rt_mq_message *head; register rt_base_t temp;    RT_DEBUG_NOT_IN_INTERRUPT;  /* allocate object */ mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name); if (mq == RT_NULL) return mq;  /* set parent */ mq->parent.parent.flag= flag;  /* init ipcobject */ rt_ipc_object_init(&(mq->parent));  /* init messagequeue */  /* get correct message size */ mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE); mq->max_msgs = max_msgs;  /* allocate message pool */ mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) *mq->max_msgs); if (mq->msg_pool == RT_NULL) { rt_mq_delete(mq);  return RT_NULL; }  /* init messagelist */ mq->msg_queue_head = RT_NULL; mq->msg_queue_tail = RT_NULL;  /* init message empty list */ mq->msg_queue_free = RT_NULL;   for (temp = 0; temp < mq->max_msgs; temp++) { head = (struct rt_mq_message *)((rt_uint8_t*)mq->msg_pool + temp * (mq->msg_size + sizeof(struct rt_mq_message))); head->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = head; }  /* the initial entry is zero */ mq->entry = 0;  return mq;}


先看函數的參數:

name:消息隊列的object名稱

msg_size:消息的大小(注意不是消息隊列的大小)

max_msgs:消息隊列的大小(最多容納多少條消息)

flag:IPC等待隊列的類型(FIFO/PRIO)


而後看函數內部實現,最開始分配RT_Object_Class_MessageQueue類型的object對象,並初始化,這和其它IPC沒什麼兩樣。接着計算msg_size並對大小進行調整到對齊。接着分配消息隊列內存塊時要仔細看分配時的大小: (mq->msg_size+sizeof(struct rt_mq_message)) *mq->max_msgs)爲何不是msg_size*max_msgs呢?這就跟消息隊列的實現有關了,消息隊列內部會將每一個消息經過rt_mq_message類型的鏈表給連接起來,因此會額外在每一個消息體前面放一個rt_mq_message鏈表節點。


而後接着往下看一個for循環塊,這塊代碼初看起來總感受哪裏有點怪怪的。仔細一看原來是將消息隊列裏每一個存儲消息的位置經過一個倒序的鏈表給鏈接起來了。首先咱們前面分配到一塊連續的內存空間,而後每一個消息佔用的大小是rt_mq_message+msg_size,因此使用(rt_mq_message+msg_size)*temp來得到消息在這片連續內存區裏的索引。而後在每條消息的起始位置放一個rt_mq_message指針,並一次鏈接起來。同時初始化head/tail/free等指針。初始化完後,此時整個消息隊列的狀態以下:  

   

消息隊列發送

消息隊列發送有兩個API,一個是基於FIFO模型的rt_mq_send,一個是基於LIFO模型的rt_mq_urgent。先來看rt_mq_send:
/** *This function will send a message to message queue object, if there are *threads suspended on message queue object, it will be waked up. * *@param mq the message queue object *@param buffer the message *@param size the size of buffer * *@return the error code */rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer,rt_size_t size){ register rt_ubase_t temp; struct rt_mq_message *msg;  /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size !=0);  /* greater than one message size */ if (size > mq->msg_size) return-RT_ERROR;  RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));  /* disable interrupt */ temp = rt_hw_interrupt_disable();  /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp);  return-RT_EFULL; } /* move free list pointer */ mq->msg_queue_free=msg->next;  /* enable interrupt */ rt_hw_interrupt_enable(temp);  /* the msg isthe new tailer of list, the next shall be NULL */ msg->next = RT_NULL; /* copy buffer*/ rt_memcpy(msg + 1, buffer, size);  /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* link msg to message queue */   if (mq->msg_queue_tail != RT_NULL) { /* if the tail exists, */ ((struct rt_mq_message *)mq->msg_queue_tail)->next = msg; }  /* set new tail*/ mq->msg_queue_tail = msg; /* if the headis empty, set head */ if (mq->msg_queue_head == RT_NULL) mq->msg_queue_head = msg;  /* increase message entry */ mq->entry++;  /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread));  /* enable interrupt */ rt_hw_interrupt_enable(temp);  rt_schedule();  return RT_EOK; }  /* enable interrupt */ rt_hw_interrupt_enable(temp);  return RT_EOK;}


先看函數的參數,mq爲建立或初始化過的消息隊列對象句柄。buffer爲所須要發送的消息指針,size爲所要發送消息的大小。固然size不要超過buffer的內存大小也不要超過初始化時指定的消息大小。超過buffer大小那麼就是內存越界,超過初始化時指定的大小則會發送失敗。可是卻能夠小於初始化時指定的大小。


看函數內部實現,直接跳過前面的部分,從關中斷的臨界區開始看。首先從msg_queue_free取出一個空閒節點(msg = (struct rt_mq_message *)mq->msg_queue_free;),若是不爲NULL,也就是消息隊列不滿的時候,則表明還能夠往消息隊列裏存入消息。那麼將msg_queue_free = msg->next;也就是將msg_queue_free指向下一個空閒區域,等價msg_queue_free = msg_queue_free->next。而後往消息頭部的鏈表節點後面開始拷貝發送的消息:rt_memcpy(msg +1, buffer, size);並接着初始化或者調整msg_queue_tail/head兩個指針。若是msg_queue_tail/head指向NULL,則說明消息隊列尚未消息,直接讓tail和head指向這個消息便可。不然tail則須要調整,指向這個新消息,表示最後一個發送來的消息,同時也代表新消息是放入隊列末尾(先入先出FIFO模型)。接着消息發送處理完成,對entry進行計數,而後看IPC等待隊列是否須要調度。


/** *This function will send an urgent message to message queue object, which *means the message will be inserted to the head of message queue. If there *are threads suspended on message queue object, it will be waked up. * *@param mq the message queue object *@param buffer the message *@param size the size of buffer * *@return the error code */rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size){ register rt_ubase_t temp; struct rt_mq_message *msg;  /* parametercheck */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0);  /* greater than one message size */ if (size > mq->msg_size) return -RT_ERROR;  RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));  /* disable interrupt */ temp = rt_hw_interrupt_disable();  /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp);  return -RT_EFULL; } /* move free list pointer */   mq->msg_queue_free = msg->next;  /* enable interrupt */ rt_hw_interrupt_enable(temp);  /* copy buffer*/ rt_memcpy(msg + 1, buffer, size);  /* disable interrupt */ temp = rt_hw_interrupt_disable();  /* link msg to the beginning of message queue */ msg->next = (struct rt_mq_message *)mq->msg_queue_head; mq->msg_queue_head = msg;  /* if there is no tail */ if (mq->msg_queue_tail == RT_NULL) mq->msg_queue_tail = msg;  /* increase message entry */ mq->entry++;  /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread));  /* enableinterrupt */ rt_hw_interrupt_enable(temp);  rt_schedule();  return RT_EOK; }  /* enable interrupt */ rt_hw_interrupt_enable(temp);  return RT_EOK;}


rt_mq_urgent與rt_mq_send區別就在於消息存放的位置。urgent從語義上表示的是發送緊急消息,從代碼上看起始就是將消息放入消息隊列頭:

msg->next= (struct rt_mq_message *)mq->msg_queue_head;

mq->msg_queue_head= msg;

第一句讓當前消息的鏈表next指針指向已有的消息隊列頭節點。第二局調整消息隊列頭指針msg_queue_head指向新的消息。因此新的消息就猶如放在消息隊列頭同樣,在接收消息的時候從msg_queue_head開始取出消息便可(後入先出LIFO模型)。

消息隊列接收

/** *This function will receive a message from message queue object, if there is *no message in message queue object, the thread shall wait for a specified *time. * *@param mq the message queue object *@param buffer the received message will be saved in *@param size the size of buffer *@param timeout the waiting time * *@return the error code */rt_err_t rt_mq_recv(rt_mq_t mq, void *buffer, rt_size_t size, rt_int32_t timeout){ struct rt_thread *thread; register rt_ubase_t temp; struct rt_mq_message *msg; rt_uint32_t tick_delta;  /* parametercheck */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0);  /* initialize delta tick */ tick_delta = 0; /* get current thread */ thread = rt_thread_self(); RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mq->parent.parent)));  /* disable interrupt */ temp =rt_hw_interrupt_disable();  /* for non-blocking call */ if (mq->entry == 0 && timeout == 0) { rt_hw_interrupt_enable(temp);  return -RT_ETIMEOUT; }  /* message queue is empty */ while (mq->entry == 0) { RT_DEBUG_IN_THREAD_CONTEXT;  /* reset error number in thread */ thread->error= RT_EOK;  /* no waiting, return timeout */ if (timeout == 0) { /* enable interrupt */ rt_hw_interrupt_enable(temp);  thread->error = -RT_ETIMEOUT;  return -RT_ETIMEOUT; }  /* suspend current thread */ rt_ipc_list_suspend(&(mq->parent.suspend_thread), thread, mq->parent.parent.flag);  /* has waiting time, start thread timer */ if (timeout > 0) { /* get the start tick of timer */ tick_delta = rt_tick_get();  RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list", thread->name));  /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); }  /* enable interrupt */ rt_hw_interrupt_enable(temp);  /* re-schedule*/ rt_schedule();  /* recv message*/ if (thread->error != RT_EOK) { /* return error */ return thread->error; }  /* disable interrupt */ temp = rt_hw_interrupt_disable();  /* if it's not waiting forever and then re-calculate timeout tick */ if (timeout > 0) { tick_delta = rt_tick_get() - tick_delta; timeout -= tick_delta; if (timeout < 0) timeout = 0; } }  /* get message from queue */ msg = (struct rt_mq_message *)mq->msg_queue_head;  /* move message queue head */ mq->msg_queue_head = msg->next; /* reach queue tail, set to NULL */ if (mq->msg_queue_tail == msg) mq->msg_queue_tail = RT_NULL;  /* decrease message entry */ mq->entry--;  /* enable interrupt */ rt_hw_interrupt_enable(temp);  /* copy message*/ rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size);  /* disable interrupt*/ temp = rt_hw_interrupt_disable(); /* put message to free list */ msg->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = msg; /* enable interrupt */ rt_hw_interrupt_enable(temp);  RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent)));  return RT_EOK;}


消息隊列接收與其它IPC接收過程並沒有太大區別,並都支持timeout超時等待。先來看函數的參數:

buffer:接收消息的內存地址

size:接收消息的長度

timeout:當消息隊列爲空時將根據timeout來決定是否等待


函數內部實現前部分與其它IPC接收狀況相似,根據消息隊列是否爲空以及是否須要timeout等待來處理,當等待時則啓動任務timer,並將任務掛起在IPC的suspend鏈表中,而後觸發調度器調度到其它任務。


等到等待超時則直接返回,不然就表明被髮送端喚醒。只要是被髮送端喚醒則表明可能有可用的消息能夠接收,這種狀況和上篇文章mbox狀況相似。喚醒後首先判斷當前剩餘的等待時間,而後檢查是否有可用的消息能夠接收。接收消息的時候從msg_queue_head處取出消息便可,同時將msg_queue_head指向next,接着將entry計數減一,而後拷貝消息到buffer中,注意拷貝的尺寸,當超過消息隊列的消息尺寸時則最多隻拷貝消息的長度,不然消息將被截短。最後將當前的隊列節點進行回收,回收時將其放入msg_queue_free鏈表中。

END


RT-Thread線上活動


一、【RT-Thread能力認證考試12月——RCEA】通過第一次考試的驗證,RT-Thread能力認證獲得了更多社區開發者和產業界的大力支持(點此查看)若是您有晉升、求職、尋找更好機會的須要,有深刻學習和掌握RT-Thread的需求,歡迎垂詢/報考!

能力認證官網連接:https://www.rt-thread.org/page/rac.html(在外部瀏覽器打開)


當即報名


#題外話# 喜歡RT-Thread不要忘了在GitHub上留下你的STAR哦,你的star對咱們來講很是重要!連接地址:https://github.com/RT-Thread/rt-thread


你能夠添加微信18917005679爲好友,註明:公司+姓名,拉進 RT-Thread 官方微信交流羣

RT-Thread


讓物聯網終端的開發變得簡單、快速,芯片的價值獲得最大化發揮。Apache2.0協議,可免費在商業產品中使用,不須要公佈源碼,無潛在商業風險。

長按二維碼,關注咱們


看這裏,求贊!求轉發!

點擊閱讀原文進入Github

本文分享自微信公衆號 - RTThread物聯網操做系統(RTThread)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索