做者: 嘩啦啦 mesh團隊,熱衷於kubernetes、devops、apollo、istio、linkerd、openstack、calico 等領域技術。git
Linkerd由控制平面
和數據平面
組成:github
控制平面
是在所屬的Kubernetes命名空間
(linkerd默認狀況下)中運行的一組服務,這些服務能夠完成匯聚遙測數據
,提供面向用戶的API,並向數據平面
代理提供控制數據
等,它們共同驅動
數據平面。 數據平面
用Rust編寫的輕量級代理,該代理安裝在服務的每一個pod
中,併成爲數據平面的一部分,它接收Pod的全部接入
流量,並經過initContainer
配置iptables
正確轉發流量的攔截全部傳出流量,由於它是附加工具,而且攔截服務的全部傳入和傳出
流量,因此不須要更改代碼,甚至能夠將其添加到正在運行
的服務中。 借用官方的圖:api
proxy由rust開發完成,其內部的異步運行時採用了Tokio框架,服務組件用到了tower。app
本文主要關注proxy與destination組件交互相關的總體邏輯,分析proxy內部的運行邏輯。負載均衡
proxy啓動後:框架
app::init
初始化配置 app::Main::new
建立主邏輯main
, main.run_until
內新加一任務 ProxyParts::build_proxy_task
。 在ProxyParts::build_proxy_task
中會進行一系列的初始化工做,此處只關注dst_svc
,其建立代碼爲:less
dst_svc = svc::stack(connect::svc(keepalive))
.push(tls::client::layer(local_identity.clone()))
.push_timeout(config.control_connect_timeout)
.push(control::client::layer())
.push(control::resolve::layer(dns_resolver.clone()))
.push(reconnect::layer({
let backoff = config.control_backoff.clone();
move |_| Ok(backoff.stream())
}))
.push(http_metrics::layer::<_, classify::Response>(
ctl_http_metrics.clone(),
))
.push(proxy::grpc::req_body_as_payload::layer().per_make())
.push(control::add_origin::layer())
.push_buffer_pending(
config.destination_buffer_capacity,
config.control_dispatch_timeout,
)
.into_inner()
.make(config.destination_addr.clone())複製代碼
dst_svc
一共有2處引用,一是crate::resolve::Resolver
的建立會涉及;另外一個就是ProfilesClient
的建立。異步
Resolver
api_resolve::Resolve::new(dst_svc.clone())
建立resolver
對象 outbound::resolve
建立 map_endpoint::Resolve
類型對象,並當作參數resolve
傳入outbound::spawn
函數開啓出口線程 在outbound::spawn
中,resolve
被用於建立負載均衡控制層,並用於後續路由控制:ide
let balancer_layer = svc::layers()
.push_spawn_ready()
.push(discover::Layer::new(
DISCOVER_UPDATE_BUFFER_CAPACITY,
resolve,
))
.push(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY));複製代碼
在discover::Layer::layer
中:函數
let from_resolve = FromResolve::new(self.resolve.clone());
let make_discover = MakeEndpoint::new(make_endpoint, from_resolve);
Buffer::new(self.capacity, make_discover)複製代碼
Profiles
ProfilesClient::new
中調用api::client::Destination::new(dst_svc)
建立grpc的client端並存於成員變量service
profiles_client
對象會被用於inbound
和outbound
的建立(省略無關代碼): let dst_stack = svc::stack(...)...
.push(profiles::router::layer(
profile_suffixes,
profiles_client,
dst_route_stack,
))
...複製代碼
其中profiles::router::layer
會建立一個Layer
對象,並將profiles_client
賦予get_routes
成員。而後在service
方法中,會調到Layer::layer
方法,裏面會建立一個MakeSvc
對象,其get_routes
成員的值即爲profiles_client
。
新的鏈接過來時,從listen
拿到鏈接對象後,會交給linkerd_proxy::transport::tls::accept::AcceptTls
的call
,而後是linkerd2_proxy::proxy::server::Server
的call
,並最終分別調用linkerd2_proxy_http::balance::MakeSvc::call
和linkerd2_proxy_http::profiles::router::MakeSvc::call
方法。
balance
在linkerd2_proxy_http::balance::MakeSvc::call
中:
inner.call(target)
,此處的inner
便是前面Buffer::new
的結果。 linkerd2_proxy_http::balance::MakeSvc
對象,當作Future
返回 先看inner.call
。它內部通過層層調用,依次觸發Buffer
、MakeEndpoint
、FromResolve
等結構的call
方法,最終會觸發最開始建立的resolve.resolve(target)
,其內部調用api_resolve::Resolve::call
。
在api_resolve::Resolve::call
中:
fn call(&mut self, target: T) -> Self::Future {
let path = target.to_string();
trace!("resolve {:?}", path);
self.service
// GRPC請求,獲取k8s的endpoint
.get(grpc::Request::new(api::GetDestination {
path,
scheme: self.scheme.clone(),
context_token: self.context_token.clone(),
}))
.map(|rsp| {
debug!(metadata = ?rsp.metadata());
// 拿到結果stream
Resolution {
inner: rsp.into_inner(),
}
})
}複製代碼
將返回的Resolution
再次放入MakeSvc
中,而後看其poll:
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// 這個poll會依次調用:
// linkerd2_proxy_api_resolve::resolve::Resolution::poll
// linkerd2_proxy_discover::from_resolve::DiscoverFuture::poll
// linkerd2_proxy_discover::make_endpoint::DiscoverFuture::poll
// 最終得到Poll<Change<SocketAddr, Endpoint>>
let discover = try_ready!(self.inner.poll());
let instrument = PendingUntilFirstData::default();
let loaded = PeakEwmaDiscover::new(discover, self.default_rtt, self.decay, instrument);
let balance = Balance::new(loaded, self.rng.clone());
Ok(Async::Ready(balance))
}複製代碼
最終返回service Balance
。
當具體請求過來後,先會判斷Balance::poll_ready
:
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// 獲取Update<Endpoint>
// 將Remove的從self.ready_services中刪掉
// 將Insert的構造UnreadyService結構加到self.unready_services
self.poll_discover()?;
// 對UnreadyService,調用其poll,內部會調用到svc的poll_ready判斷endpoint是否可用
// 可用時,將其加入self.ready_services
self.poll_unready();
loop {
if let Some(index) = self.next_ready_index {
// 找到對應的endpoint,可用則返回
if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) {
return Ok(Async::Ready(()));
}
}
// 選擇負載比較低的endpoint
self.next_ready_index = self.p2c_next_ready_index();
if self.next_ready_index.is_none() {
//
return Ok(Async::NotReady);
}
}
}複製代碼
就緒後,對請求req
調用call
:
fn call(&mut self, request: Req) -> Self::Future {
// 找到下一個可用的svc,並將其從ready_services中刪除
let index = self.next_ready_index.take().expect("not ready");
let (key, mut svc) = self
.ready_services
.swap_remove_index(index)
.expect("invalid ready index");
// 將請求轉過去
let fut = svc.call(request);
// 加到unready
self.push_unready(key, svc);
fut.map_err(Into::into)
}複製代碼
profiles
在linkerd2_proxy_http::profiles::router::MakeSvc::call
中:
// Initiate a stream to get route and dst_override updates for this
// destination.
let route_stream = match target.get_destination() {
Some(ref dst) => {
if self.suffixes.iter().any(|s| s.contains(dst.name())) {
debug!("fetching routes for {:?}", dst);
self.get_routes.get_routes(&dst)
} else {
debug!("skipping route discovery for dst={:?}", dst);
None
}
}
None => {
debug!("no destination for routes");
None
}
};複製代碼
通過若干判斷後,會調用ProfilesClient::get_routes
並將結果存於route_stream
。
進入get_routes
:
fn get_routes(&self, dst: &NameAddr) -> Option<Self::Stream> {
// 建立通道
let (tx, rx) = mpsc::channel(1);
// This oneshot allows the daemon to be notified when the Self::Stream
// is dropped.
let (hangup_tx, hangup_rx) = oneshot::channel();
// 建立Daemon對象(Future任務)
let daemon = Daemon {
tx,
hangup: hangup_rx,
dst: format!("{}", dst),
state: State::Disconnected,
service: self.service.clone(),
backoff: self.backoff,
context_token: self.context_token.clone(),
};
// 調用Daemon::poll
let spawn = DefaultExecutor::current().spawn(Box::new(daemon.map_err(|_| ())));
// 將通道接收端傳出
spawn.ok().map(|_| Rx {
rx,
_hangup: hangup_tx,
})
}複製代碼
接着看Daemon::poll
:
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
// 遍歷state成員狀態
self.state = match self.state {
// 未鏈接時
State::Disconnected => {
match self.service.poll_ready() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(())) => {}
Err(err) => {
error!(
"profile service unexpected error (dst = {}): {:?}",
self.dst, err,
);
return Ok(Async::Ready(()));
}
};
// 構造grpc請求
let req = api::GetDestination {
scheme: "k8s".to_owned(),
path: self.dst.clone(),
context_token: self.context_token.clone(),
};
debug!("getting profile: {:?}", req);
// 獲取請求任務
let rspf = self.service.get_profile(grpc::Request::new(req));
State::Waiting(rspf)
}
// 正在請求時,從請求中獲取回覆
State::Waiting(ref mut f) => match f.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
// 正常回復
Ok(Async::Ready(rsp)) => {
trace!("response received");
// 流式回覆
State::Streaming(rsp.into_inner())
}
Err(e) => {
warn!("error fetching profile for {}: {:?}", self.dst, e);
State::Backoff(Delay::new(clock::now() + self.backoff))
}
},
// 接收回復
State::Streaming(ref mut s) => {
// 處理回覆流
// 注意此處,參數1是get_profile請求的回覆流,
// 參數2是以前建立的通道發送端
match Self::proxy_stream(s, &mut self.tx, &mut self.hangup) {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(StreamState::SendLost) => return Ok(().into()),
Async::Ready(StreamState::RecvDone) => {
State::Backoff(Delay::new(clock::now() + self.backoff))
}
}
}
// 異常,結束請求
State::Backoff(ref mut f) => match f.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) | Ok(Async::Ready(())) => State::Disconnected,
},
};
}
}複製代碼
接着 proxy_stream
:
fn proxy_stream(
rx: &mut grpc::Streaming<api::DestinationProfile, T::ResponseBody>,
tx: &mut mpsc::Sender<profiles::Routes>,
hangup: &mut oneshot::Receiver<Never>,
) -> Async<StreamState> {
loop {
// 發送端是否就緒
match tx.poll_ready() {
Ok(Async::NotReady) => return Async::NotReady,
Ok(Async::Ready(())) => {}
Err(_) => return StreamState::SendLost.into(),
}
// 從grpc stream中取得一條數據
match rx.poll() {
Ok(Async::NotReady) => match hangup.poll() {
Ok(Async::Ready(never)) => match never {}, // unreachable!
Ok(Async::NotReady) => {
// We are now scheduled to be notified if the hangup tx
// is dropped.
return Async::NotReady;
}
Err(_) => {
// Hangup tx has been dropped.
debug!("profile stream cancelled");
return StreamState::SendLost.into();
}
},
Ok(Async::Ready(None)) => return StreamState::RecvDone.into(),
// 正確取得profile結構
Ok(Async::Ready(Some(profile))) => {
debug!("profile received: {:?}", profile);
// 解析數據
let retry_budget = profile.retry_budget.and_then(convert_retry_budget);
let routes = profile
.routes
.into_iter()
.filter_map(move |orig| convert_route(orig, retry_budget.as_ref()))
.collect();
let dst_overrides = profile
.dst_overrides
.into_iter()
.filter_map(convert_dst_override)
.collect();
// 構造profiles::Routes結構並推到發送端
match tx.start_send(profiles::Routes {
routes,
dst_overrides,
}) {
Ok(AsyncSink::Ready) => {} // continue
Ok(AsyncSink::NotReady(_)) => {
info!("dropping profile update due to a full buffer");
// This must have been because another task stole
// our tx slot? It seems pretty unlikely, but possible?
return Async::NotReady;
}
Err(_) => {
return StreamState::SendLost.into();
}
}
}
Err(e) => {
warn!("profile stream failed: {:?}", e);
return StreamState::RecvDone.into();
}
}
}
}複製代碼
回到MakeSvc::call
方法,前面建立的route_stream
會被用於建立一個linkerd2_proxy::proxy::http::profiles::router::Service
任務對象,並在其poll_ready
方法中經過poll_route_stream
從route_steam
獲取profiles::Routes
並調用update_routes
建立具體可用的路由規則linkerd2_router::Router
,至此,路由規則已建好,就等具體的請求過來而後在call
中調用linkerd2_router::call
進行對請求的路由判斷。
proxy採用的tower框架,每一個處理邏輯都是其中的一個layer,開發時只需層層堆疊便可。不過,也正因如此,各層之間的接口都極其類似,須得當心不可調錯。 對於destination這部分邏輯,linkerd2的destination組件收到來自proxy的grpc請求後,每當endpoint或service profile有任何變更,都會當即經過stream發送過去,proxy收到後根據endpoint調整負載均衡策略,根據service profile調整路由,而後經過它們來處理用戶服務的實際請求。
ServiceMesher 社區是由一羣擁有相同價值觀和理念的志願者們共同發起,於 2018 年 4 月正式成立。
社區關注領域有:容器、微服務、Service Mesh、Serverless,擁抱開源和雲原生,致力於推進 Service Mesh 在中國的蓬勃發展。
社區官網:https://www.servicemesher.com