SDP(0):Streaming-Data-Processor - Data Processing with Akka-Stream

   再有兩天就進入2018了,想一想仍是要準備一下明年的工做方向。回想當初開始學習函數式編程時的主要目的是想設計一套標準API給那些習慣了OOP方式開發商業應用軟件的程序員們,使他們能用一種接近傳統數據庫軟件編程的方式來實現多線程,並行運算,分佈式的數據處理應用程序,前提是這種編程方式不須要對函數式編程語言、多線程軟件編程以及集羣環境下的分佈式軟件編程方式有很高的經驗要求。前面試着發佈了一個基於scalaz-stream-fs2的數據處理工具開源項目。該項目基本實現了多線程的數據庫數據並行處理,能充分利用域內服務器的多核CPU環境以streaming,non-blocking方式提升數據處理效率。最近剛完成了對整個akka套裝(suite)的瞭解,感受akka是一套理想的分佈式編程工具:一是actor模式提供了多種多線程編程方式,再就是akka-cluster能輕鬆地實現集羣式的分佈式編程,而集羣環境變化只須要調整配置文件,無需改變代碼。akka-stream是一套功能更加完整和強大的streaming工具庫,那麼若是以akka-stream爲基礎,設計一套能在集羣環境裏進行分佈式多線程並行數據處理的開源編程工具應該能夠是2018的首要任務。一樣,用戶仍是可以按照他們熟悉的數據庫應用編程方式輕鬆實現分佈式多線程並行數據處理程序的開發。react

   我把通常中小企業的IT系統分紅兩大部分:一是實時的數據採集(輸入)部分,二是批量數據抽取、分析、處理部分。爲了讓傳統中小型企業IT軟件編程人員能開發服務器集羣環境上數據平臺(如雲端數據平臺)運行的軟件系統,我打算經過這個DSP(Streaming-Data-Processor)項目來實現上面提到的第二部分。第一部分能夠用CQRS(Command-Query-Responsibility-Separation)即讀寫分離架構和事件記錄(event-sourcing)模式來實現一種高效快速響應、安全穩定運行的數據採集體系。這部分我會在完成SDP項目後以akka-persistence爲核心,經過akka-http,AMQP如RabitMQ等技術來實現。程序員

  按通常的scala和akka的編程方式編寫多線程分佈式數據庫管理軟件時一是要按照akka代碼模式,使用scala編程語言的一些較深的語法;二是須要涉及異步Async調用,集羣Cluster節點任務部署及Streaming對外集成actor運算模式的細節,用戶須要具有必定的scala,akka使用經驗。再接下來就須要按業務流程把各業務環節分解成不依賴順序的功能模塊,而後把這些分拆出來的功能分派給集羣中不一樣的節點上去運算處理。而對於SDP用戶來講,具有最基本的scala知識,無需瞭解akka、actor、threads、cluster,只要按照SDP自定義的業務處理流模式就能夠編制多線程分佈式數據處理程序了。下面我就用一些文字及僞代碼來描述一下SDP的結構和功能:面試

整體來講SDP是由一或多個Stream組成的;每一個Stream就表明一段程序。一段完整的程序Stream是由流元素源Source、處理節點Process-Node(Flow)及數據輸出終點Sink三個環節組成,下面是一個典型的程序框架:sql

  def load(qry: Query): PRG[R,M] = ??? def process1: PRG[R,M] = ??? def process2: PRG[R,M] = ??? def recursiveProcess(prg: PRG[R,M]): PRG[R,M] = ??? def results: PRG = ??? load(qryOrders).process1.process2.recursiveProcess(subprogram).results.run

從上面的示範中咱們能夠看到全部定義的函數都產生PRG[R,M]類型結果。其中R類型就是stream的元素,它流動貫穿了程序的全部環節。就像下水道網絡運做原理同樣:污水由源頭Source流入終點Sink,在途中可能通過多個污水處理節點Node。每個節點表明對管道中流淌污水處理的方式,包括分叉引流、並叉合流、添加化學物質、最後經過終點把處理過的水向外輸出。在PRG中流動的R類型多是數據如數據庫表的一行,又或者是一條Sring類型的query如plain-sql,能夠用JDBC來運行。cassandra的CQL也是String類型的。Slick,Quill,ScalikeJDBC和一些其它ORM的Query均可以產生plain-sql。數據庫

Source是一段程序的開始部分。通常來講Source是經過運算Query產生一串數據行或者人工構建而成。Source也能夠並行運算Query產生,而後合併成一條無序的數據源,以下僞代碼的類型:編程

  def load_par(qrys: Query*): PRG[R,M] = ???

Process-Node是SDP最重要的一個組成部分,由於大部分用戶定義的各類業務功能是在這裏運算的。用戶能夠選擇對業務功能進行拆分而後分派給不一樣的線程或不一樣的集羣節點進行多線程並行或分佈式的運算。SDP應該爲用戶程序提供多線程,並行式、分佈式的運算函數。首先,運算用戶程序後應產生R類型結果並且,做爲一種reactive軟件,必須保證徹底消耗上一階段產生的全部R類型元素。下面是一個用戶函數的款式:安全

  type UserFunc = R => R 

除了fire-and-run類型的運算函數,SDP還應該提供針對多線程或分佈式程序的map-reduce式運算函數。初步想法是:不管返回結果與否,分派任務都是由persistence-actor來執行的,這樣能保證不會漏掉任何任務。若是總體任務須要在全部分派任務返回運算結果後再統一進行深度運算時akka的actor消息驅動模式是最適合不過的了。具體狀況能夠參考我前面關於cluster-sharding的博文。服務器

Sink的主要做用其實是保證徹底消耗程序中產生的全部元素,這是reactive類型程序的必需要求。網絡

好了,不知不覺還有幾個鐘頭就進入2017倒計時了。趕快湊合着在跨入2018以前把這篇發佈出去,恰好是今年的最後一篇博文。祝各位在新的一年中工做生活趁心如意!多線程

相關文章
相關標籤/搜索