GNU Radio: Overview of the GNU Radio Scheduler

Section 1: The Flowgraph

The flowgraph moves data from sources into sinks.

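A flowgraph is built from multiple blocks, usually including source blocks and sink blocks. The blocks are connected together to form a top-level block (the top_block class), and calling the top block's start() member function starts the software running on the GNU Radio platform. While the flowgraph runs, each block acts as its own thread, and GNU Radio's scheduling mechanism carries the data produced by the source blocks through a series of processing blocks and finally into the sink blocks. This process involves hardware drivers, data buffer management, and thread scheduling. The data buffers are managed as zero-copy circular buffers (covered in Section 5), which lets the data stream from the source move efficiently between blocks.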

Example of data moving with rate changes.

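GNU Radio blocks are classified by the relationship between their input and output items: arbitrary ratio, 1:1, N:1, and 1:N (covered in Section 2). A sync block is 1:1, so the number of output items equals the number of input items. A decim block is N:1, that is, input : output = N : 1, so a decimator (decim) lowers the sample rate of the stream.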

The flowgraph must check the bounds to satisfy input/output requirements.

All input streams and output streams must satisfy the constraints.

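While it runs, the flowgraph checks whether every block satisfies its input and output requirements. Each output port of a block is bound to a circular buffer, and the block writes its output through a write pointer, w_ptr. The downstream block reads the data through a read pointer, r_ptr. A block can run only when its output buffer has enough free space and its input buffer holds enough readable data. For the sync block shown in the original figure, n_out >= 2048 && n_in >= 2048 must hold before the block can run.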

The boundary conditions can change with rate changing blocks.

Decimators need enough input to calculate the decimated output.

The conditions are independently established with each block.

This block is asking for less than it can on the input. 

Section 2: The general_work and work functions

The input and output buffers

general_work and work each have two vectors passed to them.

int block::general_work(int noutput_items, gr_vector_int &ninput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items)
int block::work(int noutput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items)
  • input_items is a vector of pointers to input buffers.
  • output_items is a vector of pointers to output buffers.

As mentioned earlier, every block runs in its own thread, and general_work() / work() is the function that thread calls to process data. The two vectors, input_items and output_items, give access to the input and output buffers for reading and writing. For the sync block shown in the original figure, which has two input ports and one output port, input_items[0] and input_items[1] are the read pointers of the two input ports, and output_items[0] is the write pointer of the output port.
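As a rough illustration of how the two pointer vectors are used, here is a minimal sketch of a work-style function for a block with two float inputs and one float output. The type aliases are stand-ins declared locally so the sketch compiles without GNU Radio, and add_work is a hypothetical name, not a GNU Radio function.

```cpp
#include <vector>

// Stand-ins for GNU Radio's typedefs, declared here so the sketch compiles alone.
using gr_vector_const_void_star = std::vector<const void*>;
using gr_vector_void_star = std::vector<void*>;

// Hypothetical work() of a sync block with two float inputs and one float output.
int add_work(int noutput_items,
             gr_vector_const_void_star& input_items,
             gr_vector_void_star& output_items)
{
    const float* in0 = static_cast<const float*>(input_items[0]); // read pointer, input port 0
    const float* in1 = static_cast<const float*>(input_items[1]); // read pointer, input port 1
    float* out = static_cast<float*>(output_items[0]);            // write pointer, output port 0

    // 1:1 relationship: one item from each input per output item.
    for (int i = 0; i < noutput_items; i++)
        out[i] = in0[i] + in1[i];

    return noutput_items; // number of items produced on the output
}
```

The pointers arrive as void pointers, so the block casts them to the item type it declared for each port.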

general_work has no fixed input/output relationship

It's told the number of output and input items:

int block::general_work(int noutput_items, gr_vector_int &ninput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items)
  • noutput_items: minimum number of output items available across all output buffers.
  • ninput_items: vector of items available on all input buffers.

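A block that implements general_work can have an arbitrary input/output relationship, which it manages through noutput_items and ninput_items. Note that noutput_items is a single variable while ninput_items is a vector. This is because GNU Radio requires every output port of a block to produce the same number of items, while no such requirement applies to the inputs. For the sync block shown in the original figure, ninput_items[0] and ninput_items[1] are the numbers of items available on the two input ports, and noutput_items is the number of items for all output ports.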
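To make the "no fixed relationship" point concrete, the following sketch shows a general_work-style function that keeps only positive samples, so the number of outputs depends on the data. The aliases are local stand-ins, the function name is hypothetical, and the consumed out-parameter is an assumption of this sketch: a real gr::block reports consumption by calling consume_each() or consume().

```cpp
#include <vector>

// Stand-ins for GNU Radio's typedefs, declared here so the sketch compiles alone.
using gr_vector_int = std::vector<int>;
using gr_vector_const_void_star = std::vector<const void*>;
using gr_vector_void_star = std::vector<void*>;

// Hypothetical general_work(): copies only the positive input samples.
// `consumed` is an illustrative out-parameter, not part of the real signature.
int keep_positive_general_work(int noutput_items,
                               gr_vector_int& ninput_items,
                               gr_vector_const_void_star& input_items,
                               gr_vector_void_star& output_items,
                               int& consumed)
{
    const float* in = static_cast<const float*>(input_items[0]);
    float* out = static_cast<float*>(output_items[0]);

    int produced = 0;
    consumed = 0;
    // Stop when the input runs out or the output space is used up.
    while (consumed < ninput_items[0] && produced < noutput_items) {
        if (in[consumed] > 0.0f)
            out[produced++] = in[consumed];
        consumed++;
    }
    return produced; // may be anything from 0 to noutput_items
}
```

Both limits have to be checked inside the loop, which is exactly the book-keeping that the fixed-rate work() variants avoid.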

Number of input and output items?

noutput_items: how many output items work can produce

  • general_work: no guaranteed relationship between inputs and outputs.
  • work: knowing noutput_items tells us ninput_items based on the established relationship
    • gr::sync_block: ninput_items[i] = noutput_items
    • gr::sync_decimator: ninput_items[i] = noutput_items*decimation()
    • gr::sync_interpolator: ninput_items[i] = noutput_items/interpolation()
  • Because of the input/output relationship of a sync block, only need to know one side

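A block that implements work has a fixed input/output relationship, which comes in three kinds: gr::sync_block, gr::sync_decimator, and gr::sync_interpolator, with input : output ratios of 1:1, N:1, and 1:N respectively. The work() function has no ninput_items parameter, because ninput_items can be computed from noutput_items using these relationships.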
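The three fixed relationships can be written down as a small helper. This is only a sketch: sync_kind and ninput_for are hypothetical names, and history is ignored here.

```cpp
// Hypothetical tag for the three fixed-rate block types.
enum class sync_kind { sync, decimator, interpolator };

// Input items needed per port to produce `noutput_items` outputs (history ignored).
// `factor` is the decimation or interpolation factor; it is unused for a sync block.
int ninput_for(sync_kind kind, int noutput_items, int factor)
{
    switch (kind) {
    case sync_kind::decimator:
        return noutput_items * factor; // N inputs per output
    case sync_kind::interpolator:
        return noutput_items / factor; // one input per N outputs
    case sync_kind::sync:
        break;
    }
    return noutput_items; // 1:1
}
```

For example, a decimate-by-4 block asked for 512 outputs needs 2048 inputs, and an interpolate-by-4 block asked for 2048 outputs needs 512 inputs.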

work operates off just noutput_items

From this number, we infer how many input items we have:

int block::work(int noutput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items)
  • noutput_items: minimum number of output items available across all output buffers.
  • ninput_items: calculated from noutput_items and type of sync block.

Section 3: Scheduler's job

Overview

The scheduler handles the buffer states, block requirements, messages, and stream tags. 

"A stream of samples is much more interesting when there is parsable metadata connected to that stream, such as the time of reception, center frequency, subframe index or even protocol-specific information. So there must be a way to identify PDU boundaries, pass control data between blocks. GNU Radio supports two ways to do this: Message passing and stream tag."

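GNU Radio uses two mechanisms, message passing and stream tags, to carry information between blocks, such as the time of reception, center frequency, subframe index, or protocol-specific information. Stream tags are synchronous and travel in one direction only, downstream with the data. Messages are asynchronous and can be sent in any direction. In GNU Radio, stream connections are drawn as solid lines and message connections as dashed lines. Note: stream tags travel in parallel with the data stream. They are not inserted into the original data stream and do not change it; each tag is bound to one particular sample of the stream. Tags only pass information between blocks and cannot be transmitted over the antenna!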

Message Passing Layer
Send commands, metadata, and packets between blocks.
Asynchronous messages from and to any block:

tb.msg_connect(Blk1, "out port", Blk0, "in port")
tb.msg_connect(Blk2, "out port", Sink, "in port")

Scheduler Handles the Asynchronous Message Passing

Asynchronous Message Passing:

  • When a message is posted, it is placed in each subscriber's queue.
  • Messages are handled before general_work is called.
  • The scheduler dispatches the messages:
    • Checks if there is a handler for the message type.
      • If there is no handler, a queue of max_nmsgs is held.
      • Oldest message is dropped if more than max_nmsgs in queue.
      • max_nmsgs is set in preferences file in [DEFAULT]:max_messages.
    • Pops the message off the queue.
    • Dispatches the message by calling the block's handler.
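The queue behaviour described above (hold at most max_nmsgs, drop the oldest when full) can be sketched as follows. The class name is hypothetical and std::string stands in for a PMT message; this is a model of the policy, not the scheduler's actual data structure.

```cpp
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical per-port message queue; std::string stands in for a PMT message.
class bounded_msg_queue
{
public:
    explicit bounded_msg_queue(std::size_t max_nmsgs) : d_max(max_nmsgs) {}

    // Post a message; once the queue exceeds max_nmsgs the oldest one is dropped.
    void post(const std::string& msg)
    {
        d_queue.push_back(msg);
        if (d_queue.size() > d_max)
            d_queue.pop_front();
    }

    // Pop the next message, oldest first. Returns false if the queue is empty.
    bool pop(std::string& msg)
    {
        if (d_queue.empty())
            return false;
        msg = d_queue.front();
        d_queue.pop_front();
        return true;
    }

    std::size_t size() const { return d_queue.size(); }

private:
    std::size_t d_max;
    std::deque<std::string> d_queue;
};
```

With a limit of two, posting three messages leaves only the two newest in the queue.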

Stream tag layer

Adds a Control, Logic, and Metadata layer to data flow

Tags carry key/value data associated with a specific sample.

Tags are propagated downstream through each block.

Tags are updated by data rate changes.

 

tags.h:

#ifndef INCLUDED_GR_TAGS_H
#define INCLUDED_GR_TAGS_H

#include <gnuradio/api.h>
#include <pmt/pmt.h>

namespace gr {

  struct GR_RUNTIME_API tag_t
  {
    //! the item \p tag occurred at (as a uint64_t)
    uint64_t offset;

    //! the key of \p tag (as a PMT symbol)
    pmt::pmt_t key;

    //! the value of \p tag (as a PMT)
    pmt::pmt_t value;

    //! the source ID of \p tag (as a PMT)
    pmt::pmt_t srcid;

    //! Used by gr_buffer to mark a tagged as deleted by a specific block. You can usually ignore this.
    std::vector<long> marked_deleted;

    /*!
     * Comparison function to test which tag, \p x or \p y, came
     * first in time
     */
    static inline bool offset_compare(const tag_t &x,
                                      const tag_t &y)
    {
      return x.offset < y.offset;
    }

    inline bool operator == (const tag_t &t) const
    {
      return (t.key == key) && (t.value == value) && \
      (t.srcid == srcid) && (t.offset == offset);
    }

    tag_t()
      : offset(0),
        key(pmt::PMT_NIL),
        value(pmt::PMT_NIL),
        srcid(pmt::PMT_F)    // consistent with default srcid value in block::add_item_tag
    {
    }

    ~tag_t()
    {
    }
  };

} /* namespace gr */

#endif /*INCLUDED_GR_TAGS_H*/

Note: passing through a rate-changing block changes the position (offset) of a tag.

Propagate tags downstream based on the tag_propagation_policy. 

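block.h defines three tag propagation policies:

enum tag_propagation_policy_t {
    TPP_DONT = 0,
    TPP_ALL_TO_ALL = 1,
    TPP_ONE_TO_ONE = 2
};

The default is TPP_ALL_TO_ALL, in which a tag received on any input port is propagated to every output port. In TPP_ONE_TO_ONE mode, a tag received on input port i is propagated only to output port i. TPP_DONT means the block does not pass the tags it receives on to downstream blocks.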
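One way to picture the three policies is as a mapping from the input port a tag arrived on to the output ports it is copied to. The sketch below repeats the enum so that it compiles on its own; tag_destinations is a hypothetical helper, not part of GNU Radio.

```cpp
#include <vector>

// Repeated from block.h so the sketch is self-contained.
enum tag_propagation_policy_t { TPP_DONT = 0, TPP_ALL_TO_ALL = 1, TPP_ONE_TO_ONE = 2 };

// Output ports that receive a tag arriving on input port `in_port`.
std::vector<int> tag_destinations(tag_propagation_policy_t policy, int in_port, int noutputs)
{
    std::vector<int> ports;
    switch (policy) {
    case TPP_DONT:
        break; // nothing is propagated
    case TPP_ALL_TO_ALL:
        for (int o = 0; o < noutputs; o++)
            ports.push_back(o); // every output gets a copy
        break;
    case TPP_ONE_TO_ONE:
        if (in_port < noutputs)
            ports.push_back(in_port); // only the output with the same index
        break;
    }
    return ports;
}
```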

Tag propagation:

  • tag_propagation_policy typically set in block's constructor.
    • Defaults to block::TPP_ALL_TO_ALL.
  • Called after general_work.
  • If propagating:
    • Gets tags in window of last work function.
    • If relative_ratio is 1, copies all tags as is.
    • Otherwise, adjusts offset of tag based on relative_ratio.
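The offset adjustment can be sketched as a scaling by the relative rate (output items per input item). Rounding to the nearest item is an assumption of this sketch, and propagate_offset is a hypothetical name; the exact arithmetic in the GNU Radio source may differ between versions.

```cpp
#include <cstdint>

// New absolute offset of a tag after a block with the given relative rate.
// Rounding to nearest is an assumption of this sketch.
std::uint64_t propagate_offset(std::uint64_t offset, double relative_rate)
{
    if (relative_rate == 1.0)
        return offset; // tags are copied as is
    return static_cast<std::uint64_t>(offset * relative_rate + 0.5);
}
```

A tag on sample 1000 lands on sample 250 after a decimate-by-4 block (relative rate 0.25) and on sample 4000 after an interpolate-by-4 block (relative rate 4).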

Review of propagation policies
block::TPP_ALL_TO_ALL

block::TPP_ONE_TO_ONE

block::TPP_DONT

  • Tags are not propagated and are removed from the stream.
  • Can allow block to handle propagation on its own.

Alignment

set_alignment(int multiple)

Set alignment in number of items.

  • Restricts number of items available to multiple of alignment.
  • Not guaranteed, but recovers quickly if unalignment unavoidable.

Output Multiple

set_output_multiple(int multiple)

  • Restricts number of items available to set multiple.
  • Similar to alignment, but this is guaranteed.
  • If not enough for alignment, will wait until there is.
  • Cannot be set dynamically.

Forecast

Overloaded function of the class

Tells scheduler how many input items are required for each output item.

  • Given noutput_items, calculates ninput_items[i] for each input stream.
    • Default: ninput_items[i]=noutput_items+history()-1;
    • Decim: ninput_items[i]=noutput_items*decimation()+history()-1;
    • Interp: ninput_items[i]=noutput_items/interpolation()+history()-1;
  • Use this to reduce the book-keeping checks in a block.
    • Can guarantee that ninput_items[i] > noutput_items
    • Don't have to check both conditions.

History

set_history(nitems+1)

History sets read pointer history() items back in time.

  • Makes sure we have valid data history() items beyond noutput_items.
  • Used to allow causal signals between calls to work.
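A moving average is a simple case where history matters. The sketch below assumes the block called set_history(ntaps), so the input pointer starts ntaps-1 items before the first new sample and noutput_items + ntaps - 1 items are valid. The function name and its flattened signature are hypothetical.

```cpp
// Hypothetical work() body of a moving-average block with `ntaps` taps.
// Assumes set_history(ntaps): `in` points ntaps-1 items back in time and
// holds noutput_items + ntaps - 1 valid items.
int moving_average_work(int noutput_items, int ntaps, const float* in, float* out)
{
    for (int i = 0; i < noutput_items; i++) {
        float sum = 0.0f;
        for (int k = 0; k < ntaps; k++)
            sum += in[i + k]; // in[i + ntaps - 1] is the newest sample for out[i]
        out[i] = sum / ntaps;
    }
    return noutput_items;
}
```

Because the old samples stay readable in the buffer, the block does not need to keep its own copy of the previous call's tail.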

Buffer Size and Controlling Flow and Latency 

Set of features that affect the buffers 

  • set_max_noutput_items(int)
    • Caps the maximum noutput_items.
    • Will round down to nearest output multiple, if set.
    • Does not change the size of any buffers.
  • set_max_output_buffer(long)
    • Sets the maximum buffer size for all output buffers.
    • Buffer calculations are based on a number of factors, this limits overall size.
    • On most systems, will round to nearest page size.
  • set_min_output_buffer(long)
    • Sets the minimum buffer size for all output buffers.
    • On most systems, will round to nearest page size.

Scheduler Manages the Data Stream Conditions

General tasks:

  1. Calculate how many items are available on the input.
  2. Calculate how much space is available on the output.
  3. Determine restrictions: alignment, output_multiple, forecast requirements, etc.
  4. Adjust as necessary or abort and try again.
  5. Call the general_work function and pass appropriate pointers and number of items.
  6. Take returned info from general_work to update the pointers in the gr::buffer and gr::buffer_reader objects.

Section 4: Scheduler Flow Chart

Scheduler Flow Chart: top_block.start()

Start in scheduler_tpb.cc

Initialize thread for each block:

Each block's thread runs the loop until done

Handles messages, state, and calls run_one_iteration: 

run_one_iteration in block_executor.cc

Start of the iteration:

run_one_iteration::try_again

If block has inputs (sinks/blocks), handle input/output reqs.:

run_one_iteration::try_again: Fixed Rate

Fixed rate blocks have special restrictions:  

run_one_iteration::try_again: Alignment

Works to keep buffers aligned if possible:

run_one_iteration::try_again: Failure

If something goes wrong, try again, fail, or block and wait: 

run_one_iteration::setup_call_to_work

Call work and do book-keeping:

run_one_iteration::were_done

When the flowgraph can't continue, end it:

"Get items_available for all inputs"

Gets difference between write pointers and read pointers for all inputs:

"Calc space on output buffer"

Space available is the distance from the write pointer to the first read pointer. noutput_items is the minimum over all output buffers:
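Both calculations reduce to modular arithmetic on ring indices. The following is a simplified model with hypothetical names; it keeps one slot free so that a full buffer is not mistaken for an empty one, which is a common convention assumed here.

```cpp
#include <algorithm>
#include <vector>

// Simplified model of one output buffer holding `bufsize` items.
// All indices are positions inside the ring, in the range [0, bufsize).
struct ring_model {
    int bufsize;
    int write_index;
    std::vector<int> read_indices; // one per downstream reader
};

// Items a reader can consume: distance from its read index up to the write index.
int items_available(const ring_model& b, int reader)
{
    return (b.write_index - b.read_indices[reader] + b.bufsize) % b.bufsize;
}

// Space the writer can fill: limited by the reader that is furthest behind.
// One slot stays empty so that "full" and "empty" remain distinguishable.
int space_available(const ring_model& b)
{
    int most_unread = 0;
    for (int r = 0; r < static_cast<int>(b.read_indices.size()); r++)
        most_unread = std::max(most_unread, items_available(b, r));
    return b.bufsize - most_unread - 1;
}
```

The slowest reader therefore throttles the writer, which is how back-pressure spreads upstream through the flowgraph.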

"call forecast, sets ninput_items_required"

Given noutput_items, forecast calculates the required number of items available for each input.

void sync_decimator::forecast(int noutput_items, gr_vector_int &ninput_items_required)
{
  unsigned ninputs = ninput_items_required.size();
  for(unsigned i = 0; i < ninputs; i++)
    ninput_items_required[i] = fixed_rate_noutput_to_ninput(noutput_items);
}

int sync_decimator::fixed_rate_noutput_to_ninput(int noutput_items)
{
  return noutput_items * decimation() + history() - 1;
}

"Do all inputs have nitems req.?"

Tests that items_available[i] >= ninput_items_required[i] for all i.

  • If yes, run the setup_call_to_work section.
  • Otherwise, we're in a fail mode:
    • If we still have enough output space, goto try_again.
    • If the input is marked done, goto were_done.
    • If block requires more than is possible, goto were_done.
    • Otherwise, we're blocked so we exit and will start over on next iteration.
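The decision list above can be condensed into one function. This is a sketch of the order of checks exactly as the bullets state them; the struct, its field names, and decide are hypothetical, and the real block_executor.cc tracks considerably more state.

```cpp
// Possible outcomes of the input check, named after the flow-chart labels.
enum class next_step { setup_call_to_work, try_again, were_done, blocked };

// Hypothetical condensed view of what the scheduler inspects at this point.
struct iteration_state {
    bool inputs_satisfied;    // items_available[i] >= ninput_items_required[i] for all i
    bool enough_output_space; // a retry is still possible
    bool input_done;          // an upstream block has finished
    bool request_impossible;  // the block asks for more than can ever be provided
};

// Applies the checks in the order the list gives them.
next_step decide(const iteration_state& s)
{
    if (s.inputs_satisfied)
        return next_step::setup_call_to_work;
    if (s.enough_output_space)
        return next_step::try_again;
    if (s.input_done)
        return next_step::were_done;
    if (s.request_impossible)
        return next_step::were_done;
    return next_step::blocked; // exit and start over on the next iteration
}
```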

Section 5: Buffer Creation

Buffers are handled almost completely behind the scenes

Standard Creation

  • GNU Radio selects the best option for how to create buffers.
  • Allocated at the start of a page.
  • Length is a multiple of the page size.
  • Memory mapped second half for easy circular buffer.
  • Guard pages on either side.

User controls

  • Minimum buffer size.
  • Maximum buffer size.

Circular buffers in memory

Shows guard pages and memory-mapped half

Buffer creation techniques

Controlled by the vmcircbuf classes

  • Selects from:
    • vmcircbuf_createfilemapping.h
    • vmcircbuf_sysv_shm.h
    • vmcircbuf_mmap_shm_open.h
    • vmcircbuf_mmap_tmpfile.h
  • Reads from a preference file, if set.
  • Tests all factories, saves preferred to preference file.

Buffer creation: Create File Mapping

Generally used for MS Windows

  • size required to be a multiple of the page size.
    • Uses CreateFileMapping to get a handle to paging file.
  • Allocates virtual memory of 2*size.
    • Uses VirtualAlloc to get first_tmp.
  • Map the paging file to the first half of the virtual memory.
    • Uses MapViewOfFileEx with first_tmp as pointer base.
  • Map the paging file to the second half of the virtual memory.
    • Uses MapViewOfFileEx with first_tmp+size as pointer base.
  • Both first and second half are mapped to the same paging file.

Buffer creation: Memory-mapped Temp File

Generally used for OSX

  • size required to be a multiple of the page size.
  • Creates a temp file with permissions 0600.
    • Uses unlink to hide file and remove it when program closes.
  • Sets length of temp file to 2*size.
    • Uses ftruncate.
  • Map the first half of the file to a pointer first_copy.
    • Uses mmap to point to start of temp file.
  • Map the second half of the file to a pointer second_copy.
    • Uses mmap to point to first_copy+size.
  • Resets temp file to size with ftruncate.
  • Uses first_copy as the buffer's base address.
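The effect of the double mapping can be demonstrated with a short POSIX sketch. Note that this is a swapped-in simplification, not the sequence listed above: it sizes the temp file to size once and reserves the 2*size address range with an anonymous mapping, then maps the file over each half. The function name and the file name template are hypothetical, and error handling is minimal.

```cpp
#include <cstddef>
#include <cstdlib>
#include <sys/mman.h>
#include <unistd.h>

// Maps the same `size` bytes twice, back to back, and returns the base address
// (or nullptr on failure). `size` must be a multiple of the page size.
char* create_double_mapped(std::size_t size)
{
    char path[] = "/tmp/circbuf-sketch-XXXXXX"; // hypothetical file name template
    int fd = mkstemp(path);
    if (fd < 0)
        return nullptr;
    unlink(path); // hide the file; it disappears once the last reference is gone

    if (ftruncate(fd, static_cast<off_t>(size)) != 0) {
        close(fd);
        return nullptr;
    }

    // Reserve 2*size of contiguous address space to place both views in.
    void* base = mmap(nullptr, 2 * size, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (base == MAP_FAILED) {
        close(fd);
        return nullptr;
    }
    char* first = static_cast<char*>(base);

    // Map the file over the first half, then the same file over the second half.
    void* a = mmap(first, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, fd, 0);
    void* b = mmap(first + size, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, fd, 0);
    close(fd); // the mappings keep the file contents alive
    if (a == MAP_FAILED || b == MAP_FAILED) {
        munmap(base, 2 * size);
        return nullptr;
    }
    return first;
}
```

After this call, a byte written at buf[i] is also visible at buf[i + size], so a reader can always access a contiguous run of items without handling wrap-around. The caller releases the region with munmap(buf, 2 * size).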

Buffer creation: System V Shared Memory

Generally used for Linux/POSIX

  • size required to be a multiple of the page size.
    • Uses shmget to get 2*size (plus guard pages) as shmid2.
    • Uses shmget to get size as shmid1.
  • Attach shmid1 to first half of shmid2 with shmat.
  • Attach shmid1 to second half of shmid2 with shmat.
  • Memory in both halves of shmid2 is mapped to the same virtual space.
  • Keep guard pages as read-only.
  • Return memory in shmid2+pagesize as buffer base location.
  • Keeps 2*size allocated.

Buffer creation: Memory-mapped Shared Memory

Alternative implementation for Linux/POSIX

  • size required to be a multiple of the page size.
    • Creates a shared memory segment with shm_open.
  • Sets length of memory segment to 2*size.
    • Uses ftruncate.
  • Map the first half of the file to a pointer first_copy.
    • Uses mmap to point to start of memory segment.
  • Map the second half of the file to a pointer second_copy.
    • Uses mmap to point to first_copy+size.
  • We should reset memory segment to size with ftruncate.
    • On OSX this isn't allowed, though; not actually compiled.
  • Uses first_copy as the buffer's base address.

VM circular buffer preference setting

Working VM Circular Buffer technique is stored in a prefs file

  • Handled by vmcircbuf_prefs class.
  • Path: $HOME/.gnuradio/prefs/vmcircbuf_default_factory
  • Single line that specifies the default factory function:
    • e.g., gr::vmcircbuf_sysv_shm_factory
  • If no file, we find the best version and store it here.
  • Should only be created once on a machine when GNU Radio is first run.

Building a gr::buffer

Buffers are built and attached at runtime

  • When start is called, flowgraph is flattened and connections created.
  • gr::block_details are created and a gr::buffer for each output.
    • Buffer size is calculated as the number of items to hold.
      • min/max restrictions applied, if set.
  • Connects inputs by attaching a gr::buffer_reader.

Calculating gr::buffer size

gr::flat_flowgraph::allocate_buffer

  • Takes in item_size.
  • Calculates number of items: nitems = s_fixed_buffer_size*2/item_size.
    • s_fixed_buffer_size = 32768 bytes.
    • doubling the size to allow double buffering.
  • Checks that nitems is at least 2x output_multiple.
  • Checks max_output_buffer & min_output_buffer settings.
    • Both default to -1, which means no limit.
  • Checks that nitems is greater than decimation*output_multiple+history.
    • Must have enough to read in all of this at one time.
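The steps in this list can be followed in a short sketch. The struct, the function name, and the way the min/max limits are applied are assumptions made for illustration; the actual gr::flat_flowgraph::allocate_buffer inspects the downstream blocks itself and may apply additional factors.

```cpp
#include <algorithm>

// Hypothetical summary of the block settings the calculation reads.
struct block_settings {
    int output_multiple;
    long max_output_buffer; // -1 means no limit
    long min_output_buffer; // -1 means no limit
};

const int s_fixed_buffer_size = 32768; // bytes, as stated above

// Number of items for one output buffer, following the list above.
int calc_nitems(int item_size, const block_settings& b,
                int downstream_decimation, int downstream_multiple, int downstream_history)
{
    int nitems = s_fixed_buffer_size * 2 / item_size; // doubled for double buffering
    nitems = std::max(nitems, 2 * b.output_multiple);

    // Apply the user limits, if set (how they combine is an assumption here).
    if (b.max_output_buffer > 0)
        nitems = static_cast<int>(std::min<long>(nitems, b.max_output_buffer));
    if (b.min_output_buffer > 0)
        nitems = static_cast<int>(std::max<long>(nitems, b.min_output_buffer));

    // The downstream block must be able to read one full request at a time.
    int needed = downstream_decimation * downstream_multiple + downstream_history;
    return std::max(nitems, needed);
}
```

For 8-byte items (for example complex floats) with no limits set, this gives 65536 / 8 = 8192 items.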

Calculating gr::buffer size: granularity

gr::buffer::allocate_buffer handles the actual creation

  • Checks if we have the minimum number of items:
    • Based on system granularity (i.e., page size) and item size.
    • Rounds up to a multiple of this minimum number of items.
  • Rounding up based on item size may result in unusual buffer sizes.
    • We handle this; just sends a warning to the user.
  • Calls VM circular buffer default factory function.
  • Sets d_base, the address in memory for the buffer, from the circular buffer.
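The granularity rule can be expressed with a greatest common divisor: the smallest item count whose total byte size is a whole number of pages is granularity / gcd(item_size, granularity). The helper names below are hypothetical; the sketch only illustrates the rounding described in the list.

```cpp
// Greatest common divisor (Euclid's algorithm).
long gcd_long(long a, long b)
{
    while (b != 0) {
        long t = a % b;
        a = b;
        b = t;
    }
    return a;
}

// Smallest item count whose byte size is a whole number of granules (pages).
long minimum_buffer_items(long item_size, long granularity)
{
    return granularity / gcd_long(item_size, granularity);
}

// Round nitems up to a multiple of that minimum.
long round_up_nitems(long nitems, long item_size, long granularity)
{
    long min_items = minimum_buffer_items(item_size, granularity);
    return ((nitems + min_items - 1) / min_items) * min_items;
}
```

With 4096-byte pages, 8-byte items need a multiple of 512 items, while 12-byte items need a multiple of 1024 items. This is why item sizes that are not powers of two can lead to unusually large buffers.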

Controlling the size of buffers: min/max

User interface allows us to set min/max buffer for all blocks

  • Methods of gr::block:
    • gr::block::set_min_output_buffer(port, length)
    • gr::block::set_max_output_buffer(port, length)
  • Methods to set all ports the same:
    • gr::block::set_min_output_buffer(length)
    • gr::block::set_max_output_buffer(length)
  • Will still round up to the nearest granularity of a buffer.
  • Can only be set before runtime to have an effect.

Section 6: Wrap-up

Review:

This presentation covered:

  • The responsibility of the scheduler.
  • An understanding of the user interaction with the scheduler.
  • The roles the scheduler plays in the three data streams:
    • Overview of the data stream, message passing, and tag streams.
    • Alignment, output multiple, forecast, and history.
  • Flow chart of the threaded loops each block runs.
    • Launching the thread body.
    • Handling messages.
    • Calling run_one_iteration and its tasks.
  • In-depth look into how the scheduler makes its calculations.
  • Buffer structure, calculations.

Purpose:

From the information in this presentation, you should be able to:

  • Better interact with the three data stream models
  • Use the features of the data flow model (forecast, history, etc.) to improve logic, performance
  • Understand how the buffer system works and how to extend or alter it for different architectures

Reference: Overview of the GNU Radio Scheduler

Reference: Explaining the GNU Radio Scheduler
