history job的寫入
1. org.apache.flink.runtime.jobmanager,Object JobManager
runJobManager中指定使用MemoryArchivist進行做業保存
startJobManagerActors中建立了進行做業保存的actor
此archive的actor會被傳入jobmanager的actorweb
2. org.apache.flink.runtime.jobmanager,Class JobManager
handleMessage中接收到JobStatusChanged的msg以後會根據邏輯判斷調用removeJob
接收到RemoveJob消息後,會調用removeJob
接收到RemoveCachedJob的時候,會調用removeJob
在SubmitJob的時候若是發現沒有leader,會調用removeJob
3.MemoryArchivist
handleMessage中的 調用進行持久化的函數
archiveJsonFiles中的 傳入路徑path和執行圖graph調用FsJobArchivist進行持久化apache
4.FsJobArchivist
archiveJob(Path rootPath, AccessExecutionGraph graph)
rootPath是配置的路徑
graph是做業的執行圖
archiveJob中首先調用WebMonitorUtils.getJsonArchivists()獲取持久化的json類型,實際調用的是WebRuntimeMonitor.getJsonArchivists
目前的類型包括
new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),//joboverviewjson
new JobPlanHandler.JobPlanJsonArchivist(),//jobs/:jobid/plan
new JobConfigHandler.JobConfigJsonArchivist(),//jobs/:jobid/config
new JobExceptionsHandler.JobExceptionsJsonArchivist(),//jobs/:jobid/exceptions
new JobDetailsHandler.JobDetailsJsonArchivist(),//jobs/:jobid,//jobs/:jobid/vertices
new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(),//jobs/:jobid/accumulatorsrestful
new CheckpointStatsHandler.CheckpointStatsJsonArchivist(),//jobs/:jobid/checkpoints
new CheckpointConfigHandler.CheckpointConfigJsonArchivist(),//jobs/:jobid/checkpoints/config
new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(),//jobs/:jobid/checkpoints/details/:checkpointid
new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist(),//jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid函數
new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist(),//jobs/:jobid/vertices/:vertexid
new SubtasksTimesHandler.SubtasksTimesJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasktimes
new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist(),//jobs/:jobid/vertices/:vertexid/taskmanagers
new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/accumulators
new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasks/accumulatorsthis
new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum,//jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt,
new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators線程
上面全部的archivist都繼承於JsonArchivist
其中只有一個接口 Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException
其從graph中獲取相應的信息 組裝成ArchivedJson,ArchivedJson的定義以下
public ArchivedJson(String path, String json) {
this.path = Preconditions.checkNotNull(path);
this.json = Preconditions.checkNotNull(json);
}
其中path指定存儲的位置,json指定存儲的內容rest
若是要新定義restful接口,則能夠在上面增長JsonArchivist類型
若是隻是要在已有的restful接口中增長字段,則能夠修改上述的類型server
5.上述流程走完以後,每一個job會在hdfs上生成一個json文件,包含各類路徑、指明對應的維度對象
History Job的讀取
org.apache.flink.runtime.webmonitor.history
1.HistoryServer,負責歷史做業的存儲和展現,包含一個HistoryServerArchiveFetcher對象,此對象使用「刷新間隔,拉取路徑,本地臨時地址,」
2.HistoryServerArchiveFetcher根據指定的時間間隔,在單獨的線程中調用JobArchiveFetcherTask獲取的任務
3.JobArchiveFetcherTask是一個線程類,從指定的目錄中不斷的拉取數據,存入本地指定的路徑;若是設置了每次拉取以後更新joboverview,則在拉取完畢以後進行joboverview的更新
4.org.apache.flink.runtime.history
調用FsJobArchivist中的Collection<ArchivedJson> getArchivedJsons(Path file)來獲取數據,path指定存儲的位置,返回該位置的全部Json數據
5.上述流程完畢以後,會在本地臨時目錄每一個job建立一個目錄,目錄中有不少子目錄,分門別類的保存了各類的json文件
文件保存
從上述的過程當中,在jobmanager寫入文件的時候,是不考慮頻繁讀取的,因此寫成了一個大文件,也符合hdfs的要求,可是在history server的保存中,如上的在hdfs中的一個文件被安裝路徑和維度被拆成了不少個json文件,也是爲了在UI上便於展現。