Livy探究(七) -- 編程接口分析

在前面的篇章中,咱們把重點放在livy的REPL功能的展現和源碼分析。這篇咱們探索一下livyUsing the Programmatic API功能。html

REPL不一樣的是,Programmatic API提供了在一個「已經存在」的SparkContext上執行處理程序的機制。用戶須要實現Job接口:apache

public interface Job<T> extends Serializable {
  T call(JobContext jc) throws Exception;
}

JobContext對象能夠訪問到SparkContextSQLContext,因此,用戶不用關心SparkContext的建立。api

當用戶編寫了實現Job接口的類後,打包成jar包,就能夠使用LivyClient上傳jar包、提交Job了。session

這裏作個補充 livyUrl是指 http://host:port/sessions/xxxx,即爲對應session的RESTful地址。livy的例子文檔中,沒有寫清楚的是 livyUrl是什麼。

livy服務端,會在對應session的SparkContext上運行Job。框架

源碼分析

客戶端代碼

LivyClientLivyClientFactory在livy源碼中是接口,而實現是HttpClientFactorydom

/**
 * Factory for HTTP Livy clients. */
public final class HttpClientFactory implements LivyClientFactory {
    @Override
    public LivyClient createClient(URI uri, Properties config) {
        if (!"http".equals(uri.getScheme()) && !"https".equals(uri.getScheme())) {
            return null;
        }
        return new HttpClient(uri, new HttpConf(config));
    }
}

LivyClientBuilder使用ServiceLoader加載LivyClientFactory的實現類。ide

HttpClient會向以下幾個接口地址請求:源碼分析

  • /sessions/%d/upload-jar
  • /sessions/%d/add-jar
  • /sessions/%d/upload-file
  • /sessions/%d/add-file
  • /sessions/%d/submit-job
  • /sessions/%d/run-job

例如,當調用sumbit時,會請求/sessions/%d/submit-job。咱們重點看submit-job,最終會調用sendJobui

// command就是submit-job或run-job
private <T> JobHandleImpl<T> sendJob(final String command, Job<T> job) {
    final ByteBuffer serializedJob = serializer.serialize(job);
    JobHandleImpl<T> handle = new JobHandleImpl<T>(config, conn, sessionId, executor, serializer);
    handle.start(command, serializedJob);
    return handle;
}

這裏看到,對job類的實例進行了序列化。這裏用的序列化方式就是前面篇章中提到的kryo。序列化最終會把類轉化成byte[]。稍後在服務端,能夠看到會使用反序列化恢復Job類。spa

服務端代碼

客戶端的http請求首先會到達livyServer,入口主要在InteractiveSessionServlet

image.png

上面的代碼接收到客戶端請求,選擇到對應的session,並調用session的submitJob

在第三篇中,咱們知道livyServer中的一個session本質上對應了一個正在運行的driver程序。而且session會鏈接到driver,建立出一條鏈路,進而與driver通訊。

上述submitJob最終會經過這條鏈路,向driver發送消息,那麼發送的具體是什麼消息呢?經過下面代碼能夠看到,其實發送的是BypassJobRequest,而且這裏尚未對Job進行反序列化

String jobId = UUID.randomUUID().toString();
Object msg = new BypassJobRequest(jobId, jobType, BufferUtils.toByteArray(serializedJob), sync);
...
if (driverRpc.isSuccess()) {
  try {
    return driverRpc.get().call(msg, retType);
 } catch (Exception ie) {
    throw Utils.propagate(ie);
 }
}

根據這條線索,以及第六篇中關於RPC框架的結論。咱們在RSCDriver裏面找到了對應的handle

image.png

進一步在BypassJob中看到,對序列化的job對象進行反序列化,並執行call的地方:

image.png

總結

本篇介紹了livyProgrammatic API,這部分官方文檔目前比較少。所以,更多從源碼角度作個分析,以便後續採坑。

下圖總結了Job提交運行的基本流程:

image.png

  • Client端首先對Job對象採用kryo進行序列化,經過http接口調用到livyServer
  • livyServer封裝請求爲RPC消息BypassJobRequest,發送給對應的driver
  • driver側對Job執行反序列化,並調用其call
相關文章
相關標籤/搜索