sonic orchagent與syncd之間的請求與應答

​ syncd進程是介於orchagent與driver之間的進程。syncd從asic-db中讀取的數據經轉換後調用驅動提供的sai接口進行下硬件,同時須要將驅動的應答進行必定的處理,還須要處理驅動的事件通知(好比端口up/down,mac老化等信息)。處理的消息以下圖所示:c++

clipboard.png

orchagent與syncd之間的操做

orchagent與syncd之間會進行以下幾種操做:redis

  • create:建立一個對象
  • remove:刪除一個對象
  • set:設置對象屬性
  • get:獲取對象屬性
  • notify:driver事件通知

對於create,remove,set請求,orchagent會在sairedis層構建一個虛擬的sai層:sairedis。orchagent執行sai接口只是對asic-db進行操做,生成或者刪除虛擬對象(vid)。默認全部操做都是成功的,直接返回,不等待syncd的應答。執行上圖的1和6。syncd從asic-db中讀出請求執行上圖的2,3,4。若是4步驟返回成功,則整個請求運行結束,不然syncd將會發送shutdown通知給orchagent。orchagent會退出,如上圖的5,6.數據庫

對於get操做,orchagent執行1後會使用select阻塞等待syncd的應答,若是syncd在60分鐘內沒有應答,那麼orchagent會產生segment退出。get操做執行順序爲1->2->3->4->5->6。api

對於driver的notify,orchagent會在主進程的select中監聽asic-db。驅動檢測到硬件事件後,調用syncd註冊的回調函數通知syncd。syncd中有一個專門處理driver-notify的線程ntf-thread。ntf-thread解析driver的notify,而後經過asic-db通知orchagent。執行順序7->8->9。app

注:orchagent與syncd關於sai這一層很是類似。它們會調用大量的同名函數。這些函數只是名字相同,orchagent調用的是sai-redis庫中的函數,而syncd調用的是driver提供的sai庫異步

get操做阻塞等待

orchagent執行sai的get操做時會調用到redis_generic_get函數。函數

std::shared_ptr<swss::ConsumerTable>        g_redisGetConsumer;

sai_status_t redis_generic_get(
        _In_ sai_object_type_t object_type,
        _In_ sai_object_id_t object_id,
        _In_ uint32_t attr_count,
        _Out_ sai_attribute_t *attr_list)
{
    SWSS_LOG_ENTER();

    std::string str_object_id = sai_serialize_object_id(object_id);

    return internal_redis_generic_get(
            object_type,
            str_object_id,
            attr_count,
            attr_list);
}

sai_status_t internal_redis_generic_get(
        _In_ sai_object_type_t object_type,
        _In_ const std::string &serialized_object_id,
        _In_ uint32_t attr_count,
        _Out_ sai_attribute_t *attr_list)
{
    SWSS_LOG_ENTER();

    /*
     * Since user may reuse buffers, then oid list buffers maybe not cleared
     * and contain som garbage, let's clean them so we send all oids as null to
     * syncd.
     */

    clear_oid_values(object_type, attr_count, attr_list);

    std::vector<swss::FieldValueTuple> entry = SaiAttributeList::serialize_attr_list(
            object_type,
            attr_count,
            attr_list,
            false);

    std::string str_object_type = sai_serialize_object_type(object_type);

    std::string key = str_object_type + ":" + serialized_object_id;

    SWSS_LOG_DEBUG("generic get key: %s, fields: %lu", key.c_str(), entry.size());

    if (g_record)
    {
        recordLine("g|" + key + "|" + joinFieldValues(entry));
    }

    // get is special, it will not put data
    // into asic view, only to message queue
    // 寫入本次get事件
    g_asicState->set(key, entry, "get");

    // wait for response
    // 建立臨時 select
    swss::Select s;
    // 添加事件
    s.addSelectable(g_redisGetConsumer.get());
    //循環等待syncd的應答
    while (true)
    {
        SWSS_LOG_DEBUG("wait for response");

        swss::Selectable *sel;
        //阻塞等待,時間爲GET_RESPONSE_TIMEOUT  
        int result = s.select(&sel, GET_RESPONSE_TIMEOUT);
        //只處理應答狀況OBJECT
        if (result == swss::Select::OBJECT)
        {
            swss::KeyOpFieldsValuesTuple kco;

            g_redisGetConsumer->pop(kco);

            const std::string &op = kfvOp(kco);
            const std::string &opkey = kfvKey(kco);

            SWSS_LOG_DEBUG("response: op = %s, key = %s", opkey.c_str(), op.c_str());

            if (op != "getresponse") // ignore non response messages
            {
                continue;
            }

            sai_status_t status = internal_redis_get_process(
                    object_type,
                    attr_count,
                    attr_list,
                    kco);

            if (g_record)
            {
                const std::string &str_status = kfvKey(kco);
                const std::vector<swss::FieldValueTuple> &values = kfvFieldsValues(kco);

                // first serialized is status
                recordLine("G|" + str_status + "|" + joinFieldValues(values));
            }

            SWSS_LOG_DEBUG("generic get status: %d", status);

            return status;
        }

        SWSS_LOG_ERROR("generic get failed due to SELECT operation result: %s", getSelectResultAsString(result).c_str());
        break;
    }
    //超時和異常都返回SAI_STATUS_FAILURE
    if (g_record)
    {
        recordLine("G|SAI_STATUS_FAILURE");
    }

    SWSS_LOG_ERROR("generic get failed to get response");

    return SAI_STATUS_FAILURE;
}

對於get操做,當syncd比較忙的時候,極端狀況下會致使orchagent異常退出。ui

notify處理

syncd向驅動註冊回調函數

syncd定義了幾個notify全局函數指針:this

sai_switch_state_change_notification_fn     on_switch_state_change_ntf = on_switch_state_change;
sai_switch_shutdown_request_notification_fn on_switch_shutdown_request_ntf = on_switch_shutdown_request;
sai_fdb_event_notification_fn               on_fdb_event_ntf = on_fdb_event;
sai_port_state_change_notification_fn       on_port_state_change_ntf = on_port_state_change;
sai_packet_event_notification_fn            on_packet_event_ntf = on_packet_event;
sai_queue_pfc_deadlock_notification_fn      on_queue_deadlock_ntf = on_queue_deadlock;

syncd和sai共享命名空間,因此驅動直接使用這些函數指針便可調用對應的函數,在初始化的時候將這些全局函數指針經過sai_set_switch_attribute函數設置到sai層。spa

syncd設置sai層

void check_notifications_pointers(
        _In_ uint32_t attr_count,
        _In_ sai_attribute_t *attr_list)
{
    SWSS_LOG_ENTER();

    /*
     * This function should only be called on CREATE/SET api when object is
     * SWITCH.
     *
     * Notifications pointers needs to be corrected since those we receive from
     * sairedis are in sairedis memory space and here we are using those ones
     * we declared in syncd memory space.
     *
     * Also notice that we are using the same pointers for ALL switches.
     */

    for (uint32_t index = 0; index < attr_count; ++index)
    {
        sai_attribute_t &attr = attr_list[index];

        auto meta = sai_metadata_get_attr_metadata(SAI_OBJECT_TYPE_SWITCH, attr.id);

        if (meta->attrvaluetype != SAI_ATTR_VALUE_TYPE_POINTER)
        {
            continue;
        }

        /*
         * Does not matter if pointer is valid or not, we just want the
         * previous value.
         */

        sai_pointer_t prev = attr.value.ptr;

        if (prev == NULL)
        {
            /*
             * If pointer is NULL, then fine, let it be.
             */

            continue;
        }

        switch (attr.id)
        {
            case SAI_SWITCH_ATTR_SWITCH_STATE_CHANGE_NOTIFY:
                attr.value.ptr = (void*)on_switch_state_change_ntf;
                break;

            case SAI_SWITCH_ATTR_SHUTDOWN_REQUEST_NOTIFY:
                attr.value.ptr = (void*)on_switch_shutdown_request_ntf;
                break;

            case SAI_SWITCH_ATTR_FDB_EVENT_NOTIFY:
                attr.value.ptr = (void*)on_fdb_event_ntf;
                break;

            case SAI_SWITCH_ATTR_PORT_STATE_CHANGE_NOTIFY:
                attr.value.ptr = (void*)on_port_state_change_ntf;
                break;

            case SAI_SWITCH_ATTR_PACKET_EVENT_NOTIFY:
                attr.value.ptr = (void*)on_packet_event_ntf;
                break;

            case SAI_SWITCH_ATTR_QUEUE_PFC_DEADLOCK_NOTIFY:
                attr.value.ptr = (void*)on_queue_deadlock_ntf;
                break;

            default:

                SWSS_LOG_ERROR("pointer for %s is not handled, FIXME!", meta->attridname);
                continue;
        }

        /*
         * Here we translated pointer, just log it.
         */

        SWSS_LOG_INFO("%s: %lp (orch) => %lp (syncd)", meta->attridname, prev, attr.value.ptr);
    }
}
/*
* Routine Description:
*    Set switch attribute value
*
* Arguments:
 *   [in] switch_id Switch id
*    [in] attr - switch attribute
*
* Return Values:
*    SAI_STATUS_SUCCESS on success
*    Failure status code on error
*/
sai_status_t sai_set_switch_attribute(_In_ sai_object_id_t switch_id,
                                      _In_ const sai_attribute_t *attr) {
  SAI_LOG_ENTER();

  sai_status_t status = SAI_STATUS_SUCCESS;
  switch_status_t switch_status = SWITCH_STATUS_SUCCESS;
  switch_uint64_t flags = 0;
  switch_api_device_info_t api_device_info;
  sai_packet_action_t sai_packet_action;
  switch_acl_action_t switch_packet_action;
  switch_packet_type_t switch_packet_type = SWITCH_PACKET_TYPE_UNICAST;
  bool cut_through = false;

  if (!attr) {
    status = SAI_STATUS_INVALID_PARAMETER;
    SAI_LOG_ERROR("null attribute: %s", sai_status_to_string(status));
    return status;
  }

  memset(&api_device_info, 0x0, sizeof(api_device_info));
  if (status != SAI_STATUS_SUCCESS) {
    return status;
  }
  if (attr->id <= SAI_SWITCH_ATTR_ACL_STAGE_EGRESS) {  // Unsupported
    SAI_LOG_DEBUG("Switch attribute set: %s", switch_attr_name[attr->id]);
  }

  switch (attr->id) {
    ......
        
    case SAI_SWITCH_ATTR_FDB_EVENT_NOTIFY:
      sai_switch_notifications.on_fdb_event = attr->value.ptr;
      break;
    case SAI_SWITCH_ATTR_PORT_STATE_CHANGE_NOTIFY:
      sai_switch_notifications.on_port_state_change = attr->value.ptr;
      break;
    case SAI_SWITCH_ATTR_PACKET_EVENT_NOTIFY:
      sai_switch_notifications.on_packet_event = attr->value.ptr;
      break;
    case SAI_SWITCH_ATTR_SWITCH_STATE_CHANGE_NOTIFY:
      sai_switch_notifications.on_switch_state_change = attr->value.ptr;
      break;
    case SAI_SWITCH_ATTR_SHUTDOWN_REQUEST_NOTIFY:
      sai_switch_notifications.on_switch_shutdown_request = attr->value.ptr;
      break;
    ......

    default:
      SAI_LOG_ERROR("Unsupported Switch attribute: %d", attr->id);
      // Unsupported: Temporary hack till all attrs are supported
      switch_status = SWITCH_STATUS_SUCCESS;
  }
  ......
}

sai接口初始化的時候會向驅動註冊回調函數,回調函數中會調用咱們註冊的全局函數指針,咱們以fdb爲例進行說明:

sai_status_t sai_fdb_initialize(sai_api_service_t *sai_api_service) {
  SAI_LOG_DEBUG("initializing fdb");
  sai_api_service->fdb_api = fdb_api;
  switch_uint16_t mac_event_flags = 0;
  mac_event_flags |= SWITCH_MAC_EVENT_LEARN | SWITCH_MAC_EVENT_AGE |
                     SWITCH_MAC_EVENT_MOVE | SWITCH_MAC_EVENT_DELETE;
  switch_api_mac_notification_register(
      device, SWITCH_SAI_APP_ID, mac_event_flags, &sai_mac_notify_cb);
  switch_api_mac_table_set_learning_timeout(device, SAI_L2_LEARN_TIMEOUT);
  return SAI_STATUS_SUCCESS;
}
//初始化fdb的sai接口的時候,向驅動註冊了sai_mac_notify_cb回調函數。
static void sai_mac_notify_cb(const switch_device_t device,
                              const uint16_t num_entries,
                              const switch_api_mac_entry_t *mac_entry,
                              const switch_mac_event_t mac_event,
                              void *app_data) {
  SAI_LOG_ENTER();
  sai_fdb_event_notification_data_t fdb_event[num_entries];
  sai_attribute_t attr_lists[num_entries][2];
  uint16_t entry = 0;
  //判斷回調函數是否爲空
  if (!sai_switch_notifications.on_fdb_event) {
    return;
  }

  if (!mac_entry) {
    SAI_LOG_ERROR("invalid argument");
    return;
  }

  if (!num_entries) {
    SAI_LOG_DEBUG("sai mac notify callback with null entries");
    return;
  }

  for (entry = 0; entry < num_entries; entry++) {
    memset(&fdb_event[entry], 0, sizeof(fdb_event[entry]));
    fdb_event[entry].event_type = switch_mac_event_to_sai_fdb_event(mac_event);
    memcpy(fdb_event[entry].fdb_entry.mac_address,
           mac_entry[entry].mac.mac_addr,
           ETH_ALEN);
    fdb_event[entry].fdb_entry.switch_id =
        (((unsigned long)SWITCH_HANDLE_TYPE_DEVICE)
         << SWITCH_HANDLE_TYPE_SHIFT) |
        0x1;
    fdb_event[entry].fdb_entry.bv_id = mac_entry[entry].network_handle;

    memset(attr_lists[entry], 0, sizeof(attr_lists[entry]));
    attr_lists[entry][0].id = SAI_FDB_ENTRY_ATTR_TYPE;
    attr_lists[entry][0].value.s32 = SAI_FDB_ENTRY_TYPE_DYNAMIC;
    attr_lists[entry][1].id = SAI_FDB_ENTRY_ATTR_BRIDGE_PORT_ID;
    attr_lists[entry][1].value.oid = mac_entry->handle;
    fdb_event[entry].attr_count = 2;
    if (fdb_event[entry].event_type == SAI_FDB_EVENT_FLUSHED) {
      // Overwriting now for SONiC to be able to process it correctly
      fdb_event[entry].event_type = SAI_FDB_EVENT_AGED;
    }
    fdb_event[entry].attr = attr_lists[entry];
  }
  //調用syncd的回調函數
  sai_switch_notifications.on_fdb_event(num_entries, fdb_event);

  SAI_LOG_EXIT();

  return;
}

syncd啓動notify線程

std::shared_ptr<std::thread> ntf_process_thread;

void startNotificationsProcessingThread()
{
    SWSS_LOG_ENTER();

    runThread = true;

    ntf_process_thread = std::make_shared<std::thread>(ntf_process_function);
}

void ntf_process_function()
{
    SWSS_LOG_ENTER();

    while (runThread)
    {
        cv.wait(ulock);

        // this is notifications processing thread context, which is different
        // from SAI notifications context, we can safe use g_mutex here,
        // processing each notification is under same mutex as processing main
        // events, counters and reinit

        swss::KeyOpFieldsValuesTuple item;

        while (tryDequeue(item))//從隊列中取出notify
        {
            processNotification(item);//處理notify
        }
    }
}
bool tryDequeue(
        _Out_ swss::KeyOpFieldsValuesTuple &item)
{
    std::lock_guard<std::mutex> lock(queue_mutex);

    SWSS_LOG_ENTER();

    if (ntf_queue.empty())
    {
        return false;
    }

    item = ntf_queue.front();

    ntf_queue.pop();

    return true;
}

void processNotification(
        _In_ const swss::KeyOpFieldsValuesTuple &item)
{
    std::lock_guard<std::mutex> lock(g_mutex);

    SWSS_LOG_ENTER();

    std::string notification = kfvKey(item);
    std::string data = kfvOp(item);

    if (notification == "switch_state_change")
    {
        handle_switch_state_change(data);
    }
    else if (notification == "fdb_event")
    {
        handle_fdb_event(data);
    }
    else if (notification == "port_state_change")
    {
        handle_port_state_change(data);
    }
    else if (notification == "switch_shutdown_request")
    {
        handle_switch_shutdown_request(data);
    }
    else if (notification == "queue_deadlock")
    {
        handle_queue_deadlock(data);
    }
    else
    {
        SWSS_LOG_ERROR("unknow notification: %s", notification.c_str());
    }
}

void handle_fdb_event(
        _In_ const std::string &data)
{
    SWSS_LOG_ENTER();

    uint32_t count;
    sai_fdb_event_notification_data_t *fdbevent = NULL;

    sai_deserialize_fdb_event_ntf(data, count, &fdbevent);

    process_on_fdb_event(count, fdbevent);

    sai_deserialize_free_fdb_event_ntf(count, fdbevent);
}

void process_on_fdb_event(
        _In_ uint32_t count,
        _In_ sai_fdb_event_notification_data_t *data)
{
    SWSS_LOG_ENTER();

    SWSS_LOG_DEBUG("fdb event count: %d", count);

    for (uint32_t i = 0; i < count; i++)
    {
        sai_fdb_event_notification_data_t *fdb = &data[i];

        SWSS_LOG_DEBUG("fdb %u: type: %d", i, fdb->event_type);

        fdb->fdb_entry.switch_id = translate_rid_to_vid(fdb->fdb_entry.switch_id, SAI_NULL_OBJECT_ID);

        fdb->fdb_entry.bv_id = translate_rid_to_vid(fdb->fdb_entry.bv_id, fdb->fdb_entry.switch_id);

        translate_rid_to_vid_list(SAI_OBJECT_TYPE_FDB_ENTRY, fdb->fdb_entry.switch_id, fdb->attr_count, fdb->attr);

        /*
         * Currently because of bcrm bug, we need to install fdb entries in
         * asic view and currently this event don't have fdb type which is
         * required on creation.
         */

        redisPutFdbEntryToAsicView(fdb);
    }

    std::string s = sai_serialize_fdb_event_ntf(count, data);

    send_notification("fdb_event", s);
}
void send_notification(
        _In_ std::string op,
        _In_ std::string data,
        _In_ std::vector<swss::FieldValueTuple> &entry)
{
    SWSS_LOG_ENTER();

    SWSS_LOG_INFO("%s %s", op.c_str(), data.c_str());
    //寫入數據庫
    notifications->send(op, data, entry);

    SWSS_LOG_DEBUG("notification send successfull");
}

void send_notification(
        _In_ std::string op,
        _In_ std::string data)
{
    SWSS_LOG_ENTER();

    std::vector<swss::FieldValueTuple> entry;

    send_notification(op, data, entry);
}

orchagent啓動notify線程

//啓動線程
sai_status_t sai_api_initialize(
        _In_ uint64_t flags,
        _In_ const sai_service_method_table_t* services)
{
    ......
    notification_thread = std::make_shared<std::thread>(std::thread(ntf_thread));
    ......
}
//線程主函數
void ntf_thread()
{
    SWSS_LOG_ENTER();

    swss::Select s;

    s.addSelectable(g_redisNotifications.get());
    s.addSelectable(&g_redisNotificationTrheadEvent);

    while (g_run)
    {
        swss::Selectable *sel;

        int result = s.select(&sel);

        if (sel == &g_redisNotificationTrheadEvent)
        {
            // user requested shutdown_switch
            break;
        }

        if (result == swss::Select::OBJECT)
        {
            swss::KeyOpFieldsValuesTuple kco;

            std::string op;
            std::string data;
            std::vector<swss::FieldValueTuple> values;

            g_redisNotifications->pop(op, data, values);

            SWSS_LOG_DEBUG("notification: op = %s, data = %s", op.c_str(), data.c_str());

            handle_notification(op, data, values);
        }
    }
}

void handle_fdb_event(
        _In_ const std::string &data)
{
    SWSS_LOG_ENTER();

    SWSS_LOG_DEBUG("data: %s", data.c_str());

    uint32_t count;
    sai_fdb_event_notification_data_t *fdbevent = NULL;

    sai_deserialize_fdb_event_ntf(data, count, &fdbevent);

    {
        std::lock_guard<std::mutex> lock(g_apimutex);

        // NOTE: this meta api must be under mutex since
        // it will access meta DB and notification comes
        // from different thread

        meta_sai_on_fdb_event(count, fdbevent);
    }

    if (on_fdb_event != NULL)
    {
        on_fdb_event(count, fdbevent);
    }

    sai_deserialize_free_fdb_event_ntf(count, fdbevent);
}

syncd的回調函數

std::mutex queue_mutex;
std::queue<swss::KeyOpFieldsValuesTuple> ntf_queue;
void on_fdb_event(
        _In_ uint32_t count,
        _In_ const sai_fdb_event_notification_data_t *data)
{
    SWSS_LOG_ENTER();

    std::string s = sai_serialize_fdb_event_ntf(count, data);

    enqueue_notification("fdb_event", s);
}

void enqueue_notification(
        _In_ std::string op,
        _In_ std::string data,
        _In_ std::vector<swss::FieldValueTuple> &entry)
{
    SWSS_LOG_ENTER();

    SWSS_LOG_INFO("%s %s", op.c_str(), data.c_str());

    swss::KeyOpFieldsValuesTuple item(op, data, entry);

    // this is notification context, so we need to protect queue

    std::lock_guard<std::mutex> lock(queue_mutex);
    //壓入隊列
    ntf_queue.push(item);

    cv.notify_all();
}

void enqueue_notification(
        _In_ std::string op,
        _In_ std::string data)
{
    SWSS_LOG_ENTER();

    std::vector<swss::FieldValueTuple> entry;

    enqueue_notification(op, data, entry);
}

上面三部分就是硬件觸發的異步事件從硬件層同步到syncd層,再到orchagent層的處理過程。涉及一個回調函數,兩個notify處理線程。

相關文章
相關標籤/搜索