在前面的篇章中,咱們把重點放在livy的REPL
功能的展現和源碼分析。這篇咱們探索一下livy
Using the Programmatic API功能。html
與REPL
不一樣的是,Programmatic API
提供了在一個「已經存在」的SparkContext上執行處理程序的機制。用戶須要實現Job
接口:apache
public interface Job<T> extends Serializable { T call(JobContext jc) throws Exception; }
JobContext
對象能夠訪問到SparkContext
和SQLContext
,因此,用戶不用關心SparkContext
的建立。api
當用戶編寫了實現Job
接口的類後,打包成jar包,就能夠使用LivyClient
上傳jar包、提交Job了。session
這裏作個補充livyUrl
是指http://host:port/sessions/xxxx
,即爲對應session的RESTful地址。livy的例子文檔中,沒有寫清楚的是livyUrl
是什麼。
livy服務端,會在對應session的SparkContext上運行Job。框架
LivyClient
和LivyClientFactory
在livy源碼中是接口,而實現是HttpClientFactory
:dom
/** * Factory for HTTP Livy clients. */ public final class HttpClientFactory implements LivyClientFactory { @Override public LivyClient createClient(URI uri, Properties config) { if (!"http".equals(uri.getScheme()) && !"https".equals(uri.getScheme())) { return null; } return new HttpClient(uri, new HttpConf(config)); } }
而LivyClientBuilder
使用ServiceLoader加載LivyClientFactory
的實現類。ide
HttpClient會向以下幾個接口地址請求:源碼分析
例如,當調用sumbit時,會請求/sessions/%d/submit-job
。咱們重點看submit-job
,最終會調用sendJob
:ui
// command就是submit-job或run-job private <T> JobHandleImpl<T> sendJob(final String command, Job<T> job) { final ByteBuffer serializedJob = serializer.serialize(job); JobHandleImpl<T> handle = new JobHandleImpl<T>(config, conn, sessionId, executor, serializer); handle.start(command, serializedJob); return handle; }
這裏看到,對job類的實例進行了序列化
。這裏用的序列化方式就是前面篇章中提到的kryo
。序列化最終會把類轉化成byte[]
。稍後在服務端,能夠看到會使用反序列化恢復Job類。spa
客戶端的http請求首先會到達livyServer,入口主要在InteractiveSessionServlet
:
上面的代碼接收到客戶端請求,選擇到對應的session,並調用session的submitJob
。
在第三篇中,咱們知道livyServer中的一個session本質上對應了一個正在運行的driver程序。而且session會鏈接到driver,建立出一條鏈路,進而與driver通訊。
上述submitJob最終會經過這條鏈路,向driver發送消息,那麼發送的具體是什麼消息呢?經過下面代碼能夠看到,其實發送的是BypassJobRequest
,而且這裏尚未對Job進行反序列化
:
String jobId = UUID.randomUUID().toString(); Object msg = new BypassJobRequest(jobId, jobType, BufferUtils.toByteArray(serializedJob), sync); ... if (driverRpc.isSuccess()) { try { return driverRpc.get().call(msg, retType); } catch (Exception ie) { throw Utils.propagate(ie); } }
根據這條線索,以及第六篇中關於RPC框架的結論。咱們在RSCDriver
裏面找到了對應的handle
:
進一步在BypassJob
中看到,對序列化的job對象進行反序列化,並執行call
的地方:
本篇介紹了livy
的Programmatic API,這部分官方文檔目前比較少。所以,更多從源碼角度作個分析,以便後續採坑。
下圖總結了Job提交運行的基本流程:
BypassJobRequest
,發送給對應的drivercall