在網絡一節中簡單介紹了thrift的協議部分,在工程中會用獲得thrift的線程併發,process,server庫。定義idl後生成代碼和業務編寫代碼的關係以下:react
運行過程:nginx
1.開啓threaft線程池 a.主線程建立n個,開啓,數量不夠workerMonitor_.wait。到100個就死了(加鎖,結束釋放) b.工做線程開啓後,加鎖,增長數量,workerMonitor_.notify 任務空monitor_.wait() 不然取任務,判斷等待隊列長度不到閾值則manager_->maxMonitor_.notify(), 釋放鎖。執行任務。結束後繼續搶鎖循環 2.開啓nonblockingserver,io線程就緒 a.非0號其餘線程先start,設置eventbase(iothread),createpipe,註冊notify; event_base_loop(eventBase_, 0);【無監聽,每一個io線程本身的event_base】 b.0號線程註冊事件,設置eventbase(iothread);註冊監聽;createpipe,註冊notify。 0號io線程run,開始監聽。其餘io線程join 3.0號監聽到handleEvent accept 加鎖create connection(init狀態)分配鏈接給io線程(輪詢)釋放鎖,通知分配的線程notifyhandler 4.分配到鏈接的IO線程notifyhandler(read notifyfd,transition) 本次transition: init會加讀事件setread,回調爲workSocket,讀取後繼續給transition 繼續循環到讀取結束,調用addtask=>setidle,不須要監聽cfd 5.addtask thrift,加鎖,若是tasks_.size() >= pendingTaskCountMax_,maxMonitor_.wait(timeout); 加入task隊列,有空閒線程monitor_.notify()。任何一種monitor都公用一個鎖。 這裏的task就是process而後notifyIOThread(read notifyfd,transition)。 6.處理後通知IO線程 transition將cfd改成監聽寫事件,workSocket調用connenction的回調發送。 7.connenction的回調發送以後繼續notifyIOThread 本次transition重置緩存區結束。
總結:多reactor多線程模式,一個accept,多個讀寫,單獨任務處理。正常只須要一個reactor。單reactor多線程形式。redis
nginx這種多進程的比教好作,由於子進程能夠獨立於父進程。
主進程fork,繼承監聽fd,鎖等,exec()執行完整代碼。此時舊的子進程和新的子進程均可以搶鎖監聽fd處理鏈接,關閉舊主進程,給舊的子進程發送關閉信號,子進程在處理後纔會監聽到信號,作到了優雅。
線程沒辦法獨立監聽信號。數組
這裏0是可用的。可是不要真的ping,不然代價太大,能夠用read若是鏈接還在會發送EAGAIN錯誤則是鏈接中
add的就是任意鏈接對象。實現connect,reconnect.
好比緩存
for (int i = 0; i < connectionCount; ++i) { RedisClient* redis = new RedisClient(host, port, conn_timeout_ms, rw_timeout_ms); redis->init();//CONNECT redisPool_.add(redis); }
鏈接池+線程池+hiredis分別負責鏈接管理和併發請求處理。
封裝目的:通常併發到分片獲取數據的代理都有如下缺點:一個失敗所有失敗,要等全部返回才返回,而mget的失敗會被放大。所以本身在業務層控制整個mget的超時時間和返回,到代理層已經拆分爲當個get,用線程池實現。網絡
業務調用多線程
spdlog::set_async_mode(8192*4, spdlog::async_overflow_policy::block_retry,nullptr, std::chrono::seconds(3)); std::string info_file = FLAGS_log_path + "/" + FLAGS_info_file auto debug_logger = spdlog::basic_logger_mt("debug_logger", info_file.c_str()); debug_logger->set_pattern("INFO: %Y-%m-%d %H:%M:%S %v"); inline std::shared_ptr<spdlog::logger> spdlog::create(const std::string& logger_name, Args... args) { sink_ptr sink = std::make_shared<Sink>(args...); return details::registry::instance().create(logger_name, { sink }); /*鎖控制 new_logger = std::make_shared<async_logger>(logger_name, sinks_begin, sinks_end, _async_q_size, _overflow_policy, _worker_warmup_cb, _flush_interval_ms, _worker_teardown_cb); //這裏啓線程 _loggers[logger_name] = new_logger;*/ } auto logger = spdlog::get("warn_logger");\ if (logger != NULL) { \ logger->info("{}:{} {}", cplusutils::servbase_basename(__FILE__), __LINE__, log_info.str()); \ } info()=>log()->push_msg()
spdlog的push_msg就是enqueue併發
inline void spdlog::details::async_log_helper::push_msg(details::async_log_helper::async_msg&& new_msg) { if (!_q.enqueue(std::move(new_msg)) && _overflow_policy != async_overflow_policy::discard_log_msg) { auto last_op_time = details::os::now(); auto now = last_op_time; do { now = details::os::now(); sleep_or_yield(now, last_op_time); } while (!_q.enqueue(std::move(new_msg))); } }
spdlog每一個日誌都一個線程,啓動後會循環等dequeue到落盤async
_worker_thread(&async_log_helper::worker_loop, this) while (active) { try { active = process_next_msg(last_pop, last_flush); } } inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush) { async_msg incoming_async_msg; if (_q.dequeue(incoming_async_msg)) { for (auto &s : _sinks) { if (s->should_log(incoming_log_msg.level)) { s->log(incoming_log_msg); //調用正常的文件讀寫。 } } } else { auto now = details::os::now(); handle_flush_interval(now, last_flush); sleep_or_yield(now, last_pop); return !_terminate_requested; } }
無鎖隊列oop
bool enqueue(T&& data) { cell_t* cell; size_t pos = enqueue_pos_.load(std::memory_order_relaxed); for (;;) { cell = &buffer_[pos & buffer_mask_]; size_t seq = cell->sequence_.load(std::memory_order_acquire); intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos); if (dif == 0) { if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) break; } else if (dif < 0) { return false; } else { pos = enqueue_pos_.load(std::memory_order_relaxed); } } cell->data_ = std::move(data); cell->sequence_.store(pos + 1, std::memory_order_release); return true; } bool dequeue(T& data) { cell_t* cell; size_t pos = dequeue_pos_.load(std::memory_order_relaxed); for (;;) { cell = &buffer_[pos & buffer_mask_]; size_t seq = cell->sequence_.load(std::memory_order_acquire); intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1); if (dif == 0) { if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) break; } else if (dif < 0) return false; else pos = dequeue_pos_.load(std::memory_order_relaxed); } data = std::move(cell->data_); cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release); return true; }
buffer 數組。每一個有seq,data
enqueue seq前移pos+1
dequeue seq前移pos+1+mask 循環複用
memory_order_relaxed:不對執行順序作保證
memory_order_acquire:本線程中,全部後續的讀操做必須在本條原子操做完成後執行 memory_order_release:本線程中,全部以前的寫操做完成後才能執行本條原子操做 a.compare_exchange_weak(n,w):比較a和n,若是相等,a賦值爲w。不相等,n賦值爲a,返回false
buffer {sequence,data} enqueue_pos 兩個和cell中的值一直加1 dequeue_pos 同上
爲什麼一個acquire一個relaxed呢? pos的CAS能夠保證寫的原子性。最低relaxed。能保證單獨操做原子,保證不了順序, 這種對順序的限制性能必定比鎖好嗎? 這個只對單指令作限制,性能比鎖好