ThreadX——IPC應用之消息隊列

  • 做者:zzssdd2html

  • E-mail:zzssdd2@foxmail.comapi

1、應用簡介

消息隊列是RTOS中經常使用的一種數據通訊方式,經常使用於任務與任務之間或是中斷與任務之間的數據傳遞。在裸機系統中咱們一般會使用全局變量的方式進行數據傳遞,好比在事件發生後在中斷中改變數據和設置標誌,而後在主循環中輪詢不一樣的標誌是否生效來對全局數據執行不一樣的操做,執行完畢後清除相關標誌。可是這種方式須要不斷地輪詢標誌狀態,使得CPU的利用率並不高。而使用RTOS的消息隊列則具備任務阻塞機制,當沒有須要處理的消息時任務掛起等待消息,此時其餘任務佔用CPU執行其餘操做,當有消息放入隊列時任務恢復運行進行消息接收和處理。這種消息處理機制相比裸機而言大大地提升了CPU利用率。安全

  • ThreadX的消息隊列支持「消息置頂通知」功能,也就是能夠將消息放在隊列的最前面,使得任務能夠及時處理某些緊急消息(RT-Thread的消息隊列也有該功能)
  • ThreadX的消息隊列能夠傳遞任意長度的數據,由於它是採用傳遞數據指針的方式(uCOS也是採用這種引用傳遞的方式,而FreeRTOS和RT-Thread則支持傳遞總體數據內容。這兩種方式各有優劣吧,指針傳遞方式優勢是執行效率高,缺點是存數據的內存區域若是數據還未及時處理就被覆寫了那麼就會引起問題;總體數據傳遞方式優勢是安全不需擔憂數據覆寫致錯,缺點是數據量大的話傳遞數據過程執行時間長致使效率低)

2、API簡介

下面介紹使用ThreadX的消息隊列時經常使用的幾個API函數。app

一、建立消息隊列

  • 描述
    • 該服務用於建立消息隊列。 消息總數是根據指定的消息大小和隊列中的字節總數來計算的
    • 若是在隊列的內存區域中指定的字節總數不能被指定的消息大小均分,則不會使用該內存區域中的其他字節
  • 參數
    • queue_ptr 指向消息隊列控制塊的指針
    • name_ptr 指向消息隊列名稱的指針
    • message_size 指定隊列中每條消息的大小。 消息大小選項爲1個32位字到16個32位字之間(包含)
    • queue_start 消息隊列的起始地址。 起始地址必須與ULONG數據類型的大小對齊
    • queue_size 消息隊列可用的字節總數
  • 返回值
    • TX_SUCCESS (0x00) 建立成功
    • TX_QUEUE_ERROR (0x09) 無效的消息隊列指針,指針爲NULL或隊列已建立
    • TX_PTR_ERROR (0x03) 消息隊列的起始地址無效
    • TX_SIZE_ERROR (0x05) 消息隊列大小無效
    • TX_CALLER_ERROR (0x13) 該服務的調用者無效
UINT tx_queue_create(
    TX_QUEUE *queue_ptr, 
    CHAR *name_ptr,
    UINT message_size,
    VOID *queue_start, 
    ULONG queue_size);

二、刪除消息隊列

  • 描述
    • 此服務刪除指定的消息隊列。全部掛起等待此隊列消息的線程都將恢復,並給出TX_DELETED返回狀態
    • 在刪除隊列以前,應用程序必須確保已完成(或禁用)此隊列的全部send_notify回調。 此外,應用程序必須防止未來使用已刪除的隊列
    • 應用程序還負責管理與隊列相關聯的內存區域,該內存區域在此服務完成後可用
  • 參數
    • queue_ptr 指向先前建立的消息隊列的指針
  • 返回值
    • TX_SUCCESS (0x00) 刪除成功
    • TX_QUEUE_ERROR (0x09) 消息隊列指針無效
    • TX_CALLER_ERROR (0x13) 該服務的調用者無效
UINT tx_queue_delete(TX_QUEUE *queue_ptr);

三、清空消息隊列

  • 描述
    • 此服務刪除存儲在指定消息隊列中的全部消息
    • 若是隊列已滿,將丟棄全部掛起線程的消息,而後恢復每一個掛起的線程,並返回一個指示消息發送成功的返回狀態。若是隊列爲空,則此服務不執行任何操做。
  • 參數
    • queue_ptr 指向先前建立的消息隊列的指針
  • 返回值
    • TX_SUCCESS (0x00) 操做成功
    • TX_QUEUE_ERROR (0x09) 消息隊列指針無效
UINT tx_queue_flush(TX_QUEUE *queue_ptr);

四、消息置頂

  • 描述
    • 該服務將消息發送到指定消息隊列的最前面。 消息從源指針指定的存儲區域複製到隊列的最前面
  • 參數
    • queue_ptr 指向消息隊列控制塊的指針
    • source_ptr 指向存放消息的指針
    • wait_option 定義消息隊列已滿時服務的行爲
      • TX_NO_WAIT (0x00000000) - 不管是否成功都當即返回(用於非線程調用,例如中斷裏面)
      • TX_WAIT_FOREVER (0xFFFFFFFF) - 一直等待直到消息隊列有空閒爲止
  • 返回值
    • TX_SUCCESS (0x00) 操做成功
    • TX_DELETED (0x01) 線程掛起時,消息隊列被刪除
    • TX_QUEUE_FULL (0x0B) 服務沒法發送消息,由於在指定的等待時間內隊列已滿
    • TX_WAIT_ABORTED (0x1A) 被另外一個線程、計時器或ISR中斷給停止
    • TX_QUEUE_ERROR (0x09) 無效的消息隊列指針
    • TX_PTR_ERROR (0x03) 消息的源指針無效
    • TX_WAIT_ERROR (0x04) 在非線程調用中指定了TX_NO_WAIT之外的等待選項
UINT tx_queue_front_send(
    TX_QUEUE *queue_ptr,
    VOID *source_ptr, 
    ULONG wait_option);

五、獲取消息隊列信息

  • 描述
    • 該服務檢索有關指定消息隊列的信息
  • 參數(TX_NULL表示不須要獲取該參數表明的信息)
    • queue_ptr 指向先前建立的消息隊列的指針
    • name 指向目標的指針,用於指向隊列名稱
    • enqueued 指向目標的指針,表示當前隊列中的消息數
    • available_storage 指向目標的指針,表示隊列當前有空間容納的消息數
    • first_suspended 指向目標的指針,該指針指向該隊列的掛起列表中第一個線程
    • suspended_count 指向目標的指針,用於指示當前在此隊列上掛起的線程數
    • next_queue 指向下一個建立隊列的指針的目標的指針
  • 返回值
    • TX_SUCCESS (0x00) 操做成功
    • TX_QUEUE_ERROR (0x09) 無效的消息隊列指針
UINT tx_queue_info_get(
    TX_QUEUE *queue_ptr, 
    CHAR **name,
    ULONG *enqueued, 
    ULONG *available_storage
    TX_THREAD **first_suspended, 
    ULONG *suspended_count,
    TX_QUEUE **next_queue);

六、從隊列獲取消息

  • 描述
    • 該服務從指定的消息隊列中檢索消息。 檢索到的消息從隊列複製到目標指針指定的存儲區域。 而後將該消息從隊列中刪除
    • 指定的目標存儲區必須足夠大以容納消息。 也就是說,由destination_ptr 指向的消息目標必須至少與此隊列的消息大小同樣大。 不然,若是目標不夠大,則會在存儲區域中發生內存地址非法錯誤
  • 參數
    • queue_ptr 指向先前建立的消息隊列的指針
    • destination_ptr 指向儲存消息的地址
    • wait_option 定義消息隊列爲空時服務的行爲
      • TX_NO_WAIT (0x00000000) - 不管是否成功都當即返回(用於非線程調用,例如中斷裏面)
      • TX_WAIT_FOREVER (0xFFFFFFFF) - 一直等待直到有消息能夠獲取
      • 0x00000001 ~ 0xFFFFFFFE- 指定具體等待心跳節拍數(若是心跳頻率1KHZ,那麼單位就是ms )
  • 返回值
    • TX_SUCCESS (0x00) 操做成功
    • TX_DELETED (0x01) 線程掛起時刪除了消息隊列
    • TX_QUEUE_EMPTY (0x0A) 服務沒法檢索消息,由於隊列在指定的等待時間段內爲空
    • TX_WAIT_ABORTED (0x1A) 被另外一個線程、計時器或ISR中斷給停止
    • TX_QUEUE_ERROR (0x09) 無效的消息隊列指針
    • TX_PTR_ERROR (0x03) 消息的目標指針無效
    • TX_WAIT_ERROR (0x04) 在非線程調用中指定了TX_NO_WAIT之外的等待選項
UINT tx_queue_receive(
    TX_QUEUE *queue_ptr,
    VOID *destination_ptr, 
    ULONG wait_option);

七、向隊列發送消息

  • 描述
    • 此服務將消息發送到指定的消息隊列。發送的消息將從源指針指定的內存區域複製到隊列中。
  • 參數
    • queue_ptr 指向先前建立的消息隊列的指針
    • source_ptr 指向消息的指針
    • wait_option 定義消息隊列已滿時服務的行爲
      • TX_NO_WAIT (0x00000000) - 不管是否成功都當即返回(用於非線程調用,例如中斷裏面)
      • TX_WAIT_FOREVER (0xFFFFFFFF) - 一直等待直到隊列有空位能夠放置消息
      • 0x00000001 ~ 0xFFFFFFFE - 指定具體等待心跳節拍數(若是心跳頻率1KHZ,那麼單位就是ms )
  • 返回值
    • TX_SUCCESS (0x00) 操做成功
    • TX_DELETED (0x01) 線程掛起時刪除了消息隊列
    • TX_QUEUE_FULL (0x0B) 服務沒法發送消息,由於隊列在指定的等待時間內已滿
    • TX_WAIT_ABORTED (0x1A) 被另外一個線程、計時器或ISR中斷給停止
    • TX_QUEUE_ERROR (0x09) 無效的消息隊列指針
    • TX_PTR_ERROR (0x03) 消息的目標指針無效
    • TX_WAIT_ERROR (0x04) 在非線程調用中指定了TX_NO_WAIT之外的等待選項
UINT tx_queue_send(
    TX_QUEUE *queue_ptr,
    VOID *source_ptr, 
    ULONG wait_option);

八、註冊發送通知回調函數

  • 描述
    • 此服務註冊一個通知回調函數,每當一條消息發送到指定的隊列時就會調用該函數。 通知回調的處理由應用程序定義
    • 不容許在應用程序的隊列發送通知回調函數中調用具備暫停選項的ThreadX API
  • 參數
    • queue_ptr 指向先前建立的隊列的指針
    • queue_send_notify 指向應用程序隊列發送通知功能的指針。 若是此值爲TX_NULL,則禁用通知
  • 返回值
    • TX_SUCCESS (0x00) 操做成功
    • TX_QUEUE_ERROR (0x09) 無效的隊列指針
    • TX_FEATURE_NOT_ENABLED (0xFF) 禁用了通知功能
UINT tx_queue_send_notify(
    TX_QUEUE *queue_ptr,
    VOID (*queue_send_notify)(TX_QUEUE *));

3、實例演示

  • 該應用實例建立三個任務和一個隊列消息發送通知回調
  • 任務1:按鍵1按一次向消息隊列1發送一條消息(單個變量消息)
  • 任務2:按鍵2按一次向消息隊列2發送一條消息(結構體指針消息)
  • 任務3:向消息隊列3發送消息;接收任務1和任務2的消息並打印輸出消息內容
  • 回調功能:輸出消息隊列3的相關信息

建立消息隊列函數

#define DEMO_STACK_SIZE         (2 * 1024)
#define DEMO_BYTE_POOL_SIZE     (32 * 1024)

TX_THREAD       thread_0;
TX_THREAD       thread_1;
TX_THREAD       thread_2;

TX_BYTE_POOL    byte_pool_0;
UCHAR           memory_area[DEMO_BYTE_POOL_SIZE];

/* 消息隊列 */
TX_QUEUE        tx_queue1;
TX_QUEUE        tx_queue2;
TX_QUEUE        tx_queue3;

ULONG           msg_queue1[32];
ULONG           msg_queue2[16];
ULONG           msg_queue3[8];

struct S_DATA{
    uint32_t id;
    uint16_t flag;
    uint8_t msg[2];
};
struct S_DATA data_package;

void thread_0_entry(ULONG thread_input);
void thread_1_entry(ULONG thread_input);
void thread_2_entry(ULONG thread_input);
void queue3_send_notify(TX_QUEUE *input);
void tx_application_define(void *first_unused_memory)
{
    CHAR    *pointer = TX_NULL;

    /* Create a byte memory pool from which to allocate the thread stacks. */
    tx_byte_pool_create(&byte_pool_0, "byte pool 0", memory_area, DEMO_BYTE_POOL_SIZE);

    /* Allocate the stack for thread 0. */
    tx_byte_allocate(&byte_pool_0, (VOID **) &pointer, DEMO_STACK_SIZE, TX_NO_WAIT);
    /* Create the main thread. */
    tx_thread_create(&thread_0, "thread 0", thread_0_entry, 0,  
                    pointer, DEMO_STACK_SIZE, 
                    1, 1, TX_NO_TIME_SLICE, TX_AUTO_START);

    /* Allocate the stack for thread 1. */
    tx_byte_allocate(&byte_pool_0, (VOID **) &pointer, DEMO_STACK_SIZE, TX_NO_WAIT);
    /* Create threads 1 */
    tx_thread_create(&thread_1, "thread 1", thread_1_entry, 0,  
                    pointer, DEMO_STACK_SIZE, 
                    2, 2, TX_NO_TIME_SLICE, TX_AUTO_START);

    /* Allocate the stack for thread 2. */
    tx_byte_allocate(&byte_pool_0, (VOID **) &pointer, DEMO_STACK_SIZE, TX_NO_WAIT);
    /* Create threads 1 */
    tx_thread_create(&thread_2, "thread 2", thread_2_entry, 0,  
                    pointer, DEMO_STACK_SIZE, 
                    3, 3, TX_NO_TIME_SLICE, TX_AUTO_START);

    /* 建立消息隊列 */
    tx_queue_create(&tx_queue1, "tx_queue1", 1, msg_queue1, sizeof(msg_queue1));
    tx_queue_create(&tx_queue2, "tx_queue2", 1, msg_queue2, sizeof(msg_queue2));
    tx_queue_create(&tx_queue3, "tx_queue2", 1, msg_queue3, sizeof(msg_queue3));
    
    /* 註冊發送消息回調 */
    tx_queue_send_notify(&tx_queue3, queue3_send_notify);
}

任務1ui

void    thread_0_entry(ULONG thread_input)
{
    uint8_t i =0, key_flag = 0;
    uint8_t data_single = 0;

    while(1)
    {
        if (0 == key_flag)
        {
            if (GPIO_PIN_SET == HAL_GPIO_ReadPin(KEY1_GPIO_Port, KEY1_Pin))
            {
                key_flag = 1;
            }
        }
        else
        {
            if (GPIO_PIN_RESET == HAL_GPIO_ReadPin(KEY1_GPIO_Port,KEY1_Pin))
            {
                key_flag = 0;
                /*按鍵1觸發,向隊列1發送消息*/
                data_single++;
                tx_queue_send(&tx_queue1, &data_single, TX_NO_WAIT);
            }
        }
        tx_thread_sleep(20);
    }
}

任務2線程

void    thread_1_entry(ULONG thread_input)
{
    uint8_t key_flag = 0;
    struct S_DATA *pData;

    pData = &data_package;
    pData->id       = 1;
    pData->flag     = 2;
    pData->msg[0]   = 3;
    pData->msg[1]   = 4;

    while(1)
    {
        if (0 == key_flag)
        {
            if (GPIO_PIN_SET == HAL_GPIO_ReadPin(KEY2_GPIO_Port, KEY2_Pin))
            {
                key_flag = 1;
            }
        }
        else
        {
            if (GPIO_PIN_RESET == HAL_GPIO_ReadPin(KEY2_GPIO_Port,KEY2_Pin))
            {
                key_flag = 0;
                /*按鍵2觸發,向隊列2發送消息*/
                pData->id       += 8;
                pData->flag     += 4;
                pData->msg[0]   += 2;
                pData->msg[1]   += 1;
                tx_queue_send(&tx_queue2, &pData, TX_NO_WAIT);
            }
        }
        tx_thread_sleep(20);
    }
}

任務3指針

void    thread_2_entry(ULONG thread_input)
{
    UINT status;
    uint8_t char_data;
    ULONG long_data = 0;
    struct S_DATA *buf_data;

    while(1)
    {
        /* 向隊列3發送消息 */
        long_data++;
        tx_queue_send(&tx_queue3, &long_data, TX_NO_WAIT);
        if (0 == (long_data & 7))
        {
            tx_queue_flush(&tx_queue3);
        }

        /* 接收隊列1消息 */
        status = tx_queue_receive(&tx_queue1, &char_data, 1000);
        if (TX_SUCCESS == status)
        {
            SEGGER_RTT_SetTerminal(0);
            SEGGER_RTT_printf(0, RTT_CTRL_TEXT_BRIGHT_GREEN"message queue1 receive data is %d\r\n", char_data);
        }

        /* 接收隊列2消息 */
        status = tx_queue_receive(&tx_queue2, &buf_data, 1000);
        if (TX_SUCCESS == status)
        {
            SEGGER_RTT_SetTerminal(1);
            SEGGER_RTT_printf(0, RTT_CTRL_TEXT_BRIGHT_YELLOW"message queue2 receive data is %d\t%d\t%d\t%d \r\n", \
                                buf_data->id, \
                                buf_data->flag, \
                                buf_data->msg[0], \
                                buf_data->msg[1]);
        }
    } 	
}

發送隊列消息回調功能code

void    queue3_send_notify(TX_QUEUE *input)
{
    ULONG enqueued;             // 隊列中的消息數
    ULONG available_storage;    // 隊列剩餘空間
    
    tx_queue_info_get(&tx_queue3, TX_NULL, &enqueued, &available_storage, TX_NULL, TX_NULL, TX_NULL);
    
    SEGGER_RTT_SetTerminal(2);
    SEGGER_RTT_printf(0, "the number of messages in the queue3 %d\r\n", enqueued);
    SEGGER_RTT_printf(0, "the queue3 remaining size %d\r\n", available_storage);
}

任務1演示結果htm

任務2演示結果

任務3演示結果

注:關於使用SEGGER_RTT打印功能能夠參考這篇筆記:https://www.cnblogs.com/zzssdd2/p/14162382.html

相關文章
相關標籤/搜索