【6.C++基礎】-框架

thrift

在網絡一節中簡單介紹了thrift的協議部分,在工程中會用獲得thrift的線程併發,process,server庫。定義idl後生成代碼和業務編寫代碼的關係以下:react

clipboard.png

運行過程:nginx

1.開啓threaft線程池
    a.主線程建立n個,開啓,數量不夠workerMonitor_.wait。到100個就死了(加鎖,結束釋放)
    b.工做線程開啓後,加鎖,增長數量,workerMonitor_.notify
      任務空monitor_.wait()
      不然取任務,判斷等待隊列長度不到閾值則manager_->maxMonitor_.notify(),
      釋放鎖。執行任務。結束後繼續搶鎖循環
2.開啓nonblockingserver,io線程就緒
    a.非0號其餘線程先start,設置eventbase(iothread),createpipe,註冊notify;
      event_base_loop(eventBase_, 0);【無監聽,每一個io線程本身的event_base】
    b.0號線程註冊事件,設置eventbase(iothread);註冊監聽;createpipe,註冊notify。
      0號io線程run,開始監聽。其餘io線程join
3.0號監聽到handleEvent
    accept
    加鎖create connection(init狀態)分配鏈接給io線程(輪詢)釋放鎖,通知分配的線程notifyhandler
4.分配到鏈接的IO線程notifyhandler(read notifyfd,transition)
    本次transition: init會加讀事件setread,回調爲workSocket,讀取後繼續給transition
    繼續循環到讀取結束,調用addtask=>setidle,不須要監聽cfd
5.addtask
    thrift,加鎖,若是tasks_.size() >= pendingTaskCountMax_,maxMonitor_.wait(timeout);
    加入task隊列,有空閒線程monitor_.notify()。任何一種monitor都公用一個鎖。
    這裏的task就是process而後notifyIOThread(read notifyfd,transition)。
6.處理後通知IO線程
    transition將cfd改成監聽寫事件,workSocket調用connenction的回調發送。
7.connenction的回調發送以後繼續notifyIOThread 本次transition重置緩存區結束。

clipboard.png
總結:多reactor多線程模式,一個accept,多個讀寫,單獨任務處理。正常只須要一個reactor。單reactor多線程形式。redis

http_server

clipboard.png

clipboard.png

關於優雅重啓

nginx這種多進程的比教好作,由於子進程能夠獨立於父進程。
主進程fork,繼承監聽fd,鎖等,exec()執行完整代碼。此時舊的子進程和新的子進程均可以搶鎖監聽fd處理鏈接,關閉舊主進程,給舊的子進程發送關閉信號,子進程在處理後纔會監聽到信號,作到了優雅。
線程沒辦法獨立監聽信號。數組

鏈接池

這裏0是可用的。可是不要真的ping,不然代價太大,能夠用read若是鏈接還在會發送EAGAIN錯誤則是鏈接中
clipboard.png
add的就是任意鏈接對象。實現connect,reconnect.
好比緩存

for (int i = 0; i < connectionCount; ++i) {
        RedisClient* redis = new RedisClient(host, port, conn_timeout_ms, rw_timeout_ms);
        redis->init();//CONNECT
        redisPool_.add(redis);
    }

改造的redis_pool

鏈接池+線程池+hiredis分別負責鏈接管理和併發請求處理。
封裝目的:通常併發到分片獲取數據的代理都有如下缺點:一個失敗所有失敗,要等全部返回才返回,而mget的失敗會被放大。所以本身在業務層控制整個mget的超時時間和返回,到代理層已經拆分爲當個get,用線程池實現。網絡

spdlog

  • 業務調用多線程

    spdlog::set_async_mode(8192*4, spdlog::async_overflow_policy::block_retry,nullptr, std::chrono::seconds(3));
    std::string info_file =  FLAGS_log_path + "/" + FLAGS_info_file
    auto debug_logger = spdlog::basic_logger_mt("debug_logger", info_file.c_str());
    debug_logger->set_pattern("INFO: %Y-%m-%d %H:%M:%S %v");
    
    
    inline std::shared_ptr<spdlog::logger> spdlog::create(const std::string& logger_name, Args... args)
    {
        sink_ptr sink = std::make_shared<Sink>(args...);
        return details::registry::instance().create(logger_name, { sink });    
        /*鎖控制  new_logger = std::make_shared<async_logger>(logger_name, sinks_begin, sinks_end, _async_q_size, _overflow_policy, _worker_warmup_cb, _flush_interval_ms, _worker_teardown_cb);    //這裏啓線程
        _loggers[logger_name] = new_logger;*/
    }
    
     auto logger = spdlog::get("warn_logger");\
               if (logger != NULL) { \
                   logger->info("{}:{} {}", cplusutils::servbase_basename(__FILE__), __LINE__, log_info.str()); \
               }
    
    info()=>log()->push_msg()
  • spdlog的push_msg就是enqueue併發

    inline void spdlog::details::async_log_helper::push_msg(details::async_log_helper::async_msg&& new_msg)
    {
        if (!_q.enqueue(std::move(new_msg)) && _overflow_policy != async_overflow_policy::discard_log_msg)
        {
            auto last_op_time = details::os::now();
            auto now = last_op_time;
            do
            {
                now = details::os::now();
                sleep_or_yield(now, last_op_time);
            }
            while (!_q.enqueue(std::move(new_msg)));
        }
    }
  • spdlog每一個日誌都一個線程,啓動後會循環等dequeue到落盤async

    _worker_thread(&async_log_helper::worker_loop, this)
    while (active)
        {
            try
            {
                active = process_next_msg(last_pop, last_flush);
            }
        }
    
    
        inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush)
    {
        async_msg incoming_async_msg;
    
        if (_q.dequeue(incoming_async_msg))
        {
                for (auto &s : _sinks)
                {
                    if (s->should_log(incoming_log_msg.level))
                    {
                        s->log(incoming_log_msg);   //調用正常的文件讀寫。
                    }
                }
           
        }
        else
        {
            auto now = details::os::now();
            handle_flush_interval(now, last_flush);
            sleep_or_yield(now, last_pop);
            return !_terminate_requested;
        }
    }
  • 無鎖隊列oop

    bool enqueue(T&& data)
        {
            cell_t* cell;
            size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
            for (;;)
            {
                cell = &buffer_[pos & buffer_mask_];
                size_t seq = cell->sequence_.load(std::memory_order_acquire);
                intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
                if (dif == 0)
                {
                    if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                        break;
                }
                else if (dif < 0)
                {
                    return false;
                }
                else
                {
                    pos = enqueue_pos_.load(std::memory_order_relaxed);
                }
            }
            cell->data_ = std::move(data);
            cell->sequence_.store(pos + 1, std::memory_order_release);
            return true;
        }
    
        bool dequeue(T& data)
        {
            cell_t* cell;
            size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
            for (;;)
            {
                cell = &buffer_[pos & buffer_mask_];
                size_t seq =
                    cell->sequence_.load(std::memory_order_acquire);
                intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
                if (dif == 0)
                {
                    if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                        break;
                }
                else if (dif < 0)
                    return false;
                else
                    pos = dequeue_pos_.load(std::memory_order_relaxed);
            }
            data = std::move(cell->data_);
            cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
            return true;
        }

    buffer 數組。每一個有seq,data
    enqueue seq前移pos+1
    dequeue seq前移pos+1+mask 循環複用
    memory_order_relaxed:不對執行順序作保證
    memory_order_acquire:本線程中,全部後續的讀操做必須在本條原子操做完成後執行 memory_order_release:本線程中,全部以前的寫操做完成後才能執行本條原子操做 a.compare_exchange_weak(n,w):比較a和n,若是相等,a賦值爲w。不相等,n賦值爲a,返回false

    buffer {sequence,data} enqueue_pos 兩個和cell中的值一直加1 dequeue_pos 同上

    爲什麼一個acquire一個relaxed呢? pos的CAS能夠保證寫的原子性。最低relaxed。能保證單獨操做原子,保證不了順序, 這種對順序的限制性能必定比鎖好嗎? 這個只對單指令作限制,性能比鎖好

相關文章
相關標籤/搜索