本來地址:基於Scala的產品開發實踐
博客地址:zhangyi.farbox.com/javascript
咱們的產品代號爲Mort(這個代號來自電影《馬達加斯加》那隻萌萌的大眼猴),是基於大數據平臺的商業智能(BI)產品。產品架構以下所示:前端
咱們選擇了Spark做爲咱們的大數據分析平臺。基於目前的應用場景,主要使用了Spark SQL,目前使用的版本爲Spark 1.5.0。咱們有計劃去同步升級Spark最新版本。java
在研發期間,咱們從Spark 1.4升級到1.5,通過性能測評的Benchmark,性能確有顯著提升。Spark 1.6版本在內存管理方面有明顯的改善,Execution Memory與Store Memory的比例能夠動態分配,但通過測試,產品的主要性能瓶頸實際上是CPU,由於產品的數據分析功能屬於計算密集型。這是咱們暫時沒有考慮升級1.6的主因。react
從第一次升級Spark的性能測評,以及咱們對這一年來Spark版本演進的觀察,咱們對Spark的將來充滿信心,尤爲是Tungsten項目計劃,會在內存管理、代碼生成以及緩存管理等多方面都會有所提升,對於咱們產品而言,算是「不勞而獲」了。git
因爲咱們要分析的維度和指標是由客戶指定的,這就須要數據分析的聚合操做是靈活可定製的。所以,咱們的產品寫了一個簡單的語法Parser,用以組裝Spark SQL的SQL語句,用以執行分析,最後將DataFrame轉換爲咱們期待的數據結構返回給前端。github
可是,這種設計方案其實牽涉到兩層解析的性能損耗,一個是咱們本身的語法Parser,另外一個是Spark SQL提供的Parser(經過它將其解析爲DataFrame的API調用)。咱們考慮在未來會調整方案,直接將客戶定製的聚合操做解析爲對DataFrame的API調用(可能會使用新版本Spark的DataSet)。sql
咱們的產品須要支持多種數據源,對數據源的訪問是由另一個standalone的服務CData完成的,經過它能夠隔離這種數據源的多樣性。這至關於一個簡單的微服務架構,目前僅提供兩個服務,一個服務用於數據分析,一個服務用於對客戶數據源的處理:數據庫
將來,咱們的產品不止限於現有的兩個服務,例如我正在考慮將按期的郵件導出服務獨立出來,保證該服務的獨立性,避免受到其餘功能執行的影響。由於這個功能一旦失敗,可能會對客戶的業務產生重要影響。npm
然而,咱們仍是在理智地控制服務的粒度。咱們不但願由於盲目地追求微服務架構,而帶來運維上的成本。編程
咱們的產品須要存儲元數據(Metadata),用以支持Report、Dashboard以及數據分析,主要的數據模型結果如圖所示:
針對元數據的處理邏輯,咱們將之分爲職責清晰的三層架構。自上而下分別爲REST路由層、應用服務層和元數據資源庫層。
REST路由層:將元數據視爲資源,響應客戶端的HTTP請求,並利用Spray Route將請求路由到對應的動詞上。路由層爲核心資源提供Router的trait。這些Router只負責處理客戶端請求,以及服務端的響應,不該包含具體的業務邏輯。傳遞的消息格式爲Json格式,由Spray實現消息到Json數據的序列化與反序列化。
應用服務層:每一個應用服務對應元數據資源的操做用例。因爲Mort對元數據的操做並無很是複雜的業務邏輯,所以這些服務實際上成爲了Router到Repository的中轉站,目的是爲了隔離REST路由層對元數據資源庫的依賴。每一個服務都被細分爲Creator、Editor、Fetcher與Destroyer這樣四個細粒度的trait,並放在對應服務的同一個scala文件中。同時,應用服務要負責保障元數據操做的數據完整性和一致性,於是引入了橫切關注點(Cross Concern Points)中的事務管理。同時,對操做的驗證以及權限和受權操做也會放到應用服務中。
元數據資源庫層:每一個資源庫對象都是一個Scala Object,並對應着數據庫中的元數據表。這些對象中的CRUD操做都是原子操做。事實上咱們能夠認爲每一個資源庫對象就是元數據的訪問入口。在其實現中,實際上封裝了scalikejdbc的訪問邏輯。
REST路由層和應用服務層須要接收和返回的消息很是類似,甚至在某些場景中,消息結構徹底相同,但咱們仍然定義了兩套消息體系(皆被定義爲Case Class)。邏輯層與消息之間的關係以下圖所示:
在REST路由層,全部的消息皆以Request或Response做爲類的後綴名,並被定義爲Scala的Case Class。在應用服務層以及元數據資源庫層使用的消息對象則被單獨定義在Messages模塊中。此外,元數據資源庫層還會訪問由ScalikeJDBC生成的Model對象。
咱們選擇的語言是Scala。選擇它的一個主因是由於Spark;另外一個緣由呢?或許是由於我確實不想再寫Java代碼了。
其實有時候我以爲語言的選型是沒有什麼道理的。除了特殊的應用場景,幾乎全部的程序設計語言都能知足現在的軟件開發需求。因此我悲哀地看到,語言的紛爭成了宗教的紛爭。
在咱們團隊,有熟悉Java的、有熟悉JavaScript包括NodeJS的,有熟悉Clojure的,固然也有熟悉Scala的。除了NodeJS,後端開發幾乎都在JVM平臺下。
我對語言選型的判斷標準是:實用、高效、簡潔、可維護。我對Java沒有成見,但我始終認爲:即便引入了Lambda以及Method Reference,Java 8在語法方面仍是太冗長了。
Scala彷佛從誕生開始,一直爭議很大。早在2014年1月ThoughtWorks的Tech Radar中,就講Scala列入了Adopt圈中,但卻在其中特別標註了「the good parts」:
在2016年Stack Overflow發佈的開發人員調查結果中,咱們也收穫了一些信心。在最愛語言的調查中,Scala排在了第四名:
在引領技術趨勢的調查中,咱們選用的React與Spark分列冠亞軍:
在Top Paying Tech調查中,在美國學習Spark和Scala所值不菲,竟然並列冠軍:
其實有了微服務,在不影響代碼維護性的狀況下,使用多語言進行開發也成爲了可能。或許在未來,咱們產品的可能會用clojure或者Ruby來寫DSL,用NodeJS負責元數據(以免Spray + JSON4S不太好的Json對象序列化)。
說明:將元數據管理單獨獨立爲一個NodeJS服務,已經列到了後續架構演進的計劃中。針對元數據管理,咱們會統一成JavaScript技術棧,從前端到後端再到數據庫,統一爲React+ES六、NodeJS和MongoDB。
坦白說,我沒有強烈的語言傾向性。
咱們還有一個最初的技術選型,後來被認爲是失敗的選擇。
CData服務須要將客戶的數據源通過簡單的ETL導入到系統中,咱們稱之爲數據集(DataSet)。最初在進行技術選型時,我前後考慮過MySQL、Cassandra、HBase。後面兩種都屬於列式存儲的NoSQL數據庫。團隊中沒有一我的有Cassandra的經驗,至於HBase,雖然支持高效的數據查詢,但對聚合運算的支持明顯不足,不適合咱們的場景。再加上團隊中有一位成員比較熟悉MySQL,我最終決定使用MySQL。
然而,咱們的產品須要支持大數據,當數據量上升到必定級別時,就須要系統很好地支持水平擴展,經過增長更多機器來知足性能上的需求。評估咱們的架構,後端平臺能夠簡單劃分爲三個層次:Web應用服務層(Spray + Nginix)、數據分析層(MESOS + Spark)以及存儲層(主要用於存儲分析數據DataSet,MySQL)。顯然,MySQL會成爲水平伸縮的最大障礙。
還好咱們醒悟得早,在項目初期就否認了這個方案,而改成採用HDFS+Parquet。
Parquet文件是一種列式數據存儲結構,對於主要爲分析型查詢方式的BI數據操做,可以提供更好的查詢性能。同時,Parquet文件存儲的內容以二進制形式存放,相較於文本形式容量更小,能夠節省更多的存儲空間。
Spark SQL提供了對訪問Parquet文件很好的集成。將Parquet文件存放到HDFS中,而後再經過Spark SQL訪問,能夠保證在存儲層與數據分析層都能很好地支持分佈式處理,從而保證系統的水平伸縮。當對大規模數據集進行分析處理時,能夠經過水平增長更多的節點來知足高性能的實時查詢要求。
咱們曾經比較了Parquet方案與MySQL方案,在同等配置下前者的性能要遠遠優於後者,且Spark對Parquet的支持也要好於MySQL。
爲了更好地提高性能,咱們還計劃在HDFS層之上引入Tachyon,充分發揮內存的優點,減小磁盤IO帶來的性能損耗。
前端的技術選型則爲React + Redux。選擇React的緣由很簡單,一方面咱們認爲這種component方式的前端開發,能夠極大地提升UI控件的重用,另外一方面,咱們認爲React這種虛擬DOM的方式在性能上存在必定優點。此外,React的學習曲線也不高,很容易上手。咱們招了3個大學還未畢業的實習生,JS基礎很是薄弱,在咱們的培養下,一週後就能夠慢慢開始完成React Component開發的小Story了。
在最初的團隊,咱們僅有一位前端開發。他選擇了使用CoffeeScript來開發React,可是在項目早期,咱們仍是忍痛去掉了這些代碼,改成使用ES 6。畢竟隨着ES 6乃至ES 7的普及,JS的標準已經變得愈來癒合理,CoffeeScript的生存空間彷佛被壓縮了。
在前端技術選型方面,咱們經歷了好幾回演變。從CoffeeScript到ES 6,從Reflux到Redux,每次變化都在必定程度上增長了工做量。我在文章《技術選型的理想與現實》中講述的就是這個故事。
在《技術選型的理想與現實》這篇文章中,我講到咱們選擇了Reflux。然而到如今,最終仍是遷移到了Redux。咱們一開始並無用好Redux,最近的一次重構才讓代碼更符合Redux的最佳實踐。
技術負責人一個很是重要的能力要求就是——善於作出好的技術決策。選擇技術時,並不能一味追求新技術,也不能以自我爲中心,選擇「我」認爲好的技術。而應該根據產品的需求場景、可能的技術風險、團隊成員能力,並經過分析將來的技術發展趨勢綜合地判斷。
技術決策不可能一成不變,須要與時俱進。若是發現決策錯誤,應該及時糾正,不要遲疑,更不要擔憂會影響本身的技術聲譽。
與大多數團隊相比,由於咱們使用了小衆的Scala,能夠算得上是「撈偏門」了,因此總結的技術實踐未必具備普適性,但對於同爲Scala的友朋,或許值得借鑑一二。Scala社區發出的聲音仍是過小,有點孤獨——「鸚其鳴也,求其友聲」。
這些實踐不是書本上的創做,而是在產品研發中逐漸演化而來,甚至一些實踐會很是細節。不過,那個優秀的產品不是靠這些細節堆砌出來的呢?
兩年前我還在ThoughtWorks的時候,與同事楊雲(大魔頭)在一個Scala的大數據項目,利用工做之餘,我結合了一些文檔整理了一份Scala編碼規範,放在了github上:Scala編碼規範與最佳實踐。
咱們的產品後端所有由Scala進行開發。對於編寫Scala代碼,個人要求很低,只有兩點:
對於Scala編程,咱們還總結了幾條小原則:
咱們產品用的AKKA並不夠深刻,僅僅使用了AKKA的基本功能。主要用於處理前端發來的數據分析消息,至關於一個dispatcher,也承擔了部分消息處理的職責,例如對消息包含的元數據進行解析,生成SQL語句,用以發送給Spark的SqlContext。分析的結果則以Future的方式返回給Spray。
幾條AKKA實踐的小原則:
如下是爲AKKA的ActorRefFactory定義的工廠方法:
trait ActorSupport {
implicit val requestTimeout: Timeout = ActorConfig.requestTimeout
def actorOf(className: String)(implicit refFactory: ActorRefFactory, trackID: TrackID = random): ActorRef = refFactory.actorOf(new Props(Props.defaultDeploy, Class.forName(className).asInstanceOf[Class[Actor]], List.empty), id(className))
def actorOf[T <: Actor : ClassTag](implicit refFactory: ActorRefFactory, trackID: TrackID = random): ActorRef = refFactory.actorOf(Props[T], id(classTag[T].toString))
def actorOf[T <: Actor : ClassTag](initial: ActorRefFactory)(implicit trackID: TrackID = random): ActorRef = initial.actorOf(Props[T], id(classTag[T].toString))
}複製代碼
經過向自定義的工廠方法actorOf()傳入Actor的名稱來建立Actor:
def importDataSetData(dataSetId: ID) {
val importDataSetDataActor = actorOf(actorByPersistence("import"))(actorRefFactory)
importDataSetDataActor ! ImportDataSet(dataSetId)
}
def createDataSetPersistence: Future[Any] = {
val createDataSetPersistenceActor = actorOf(actorByPersistence("create"))(actorRefFactory)
createDataSetPersistenceActor ? dataSet
}複製代碼
trait ActorExceptionHandler extends MortActor {
self: Actor =>override
def receive: Receive = {
case any: Any =>
try {
super.receive(any)
} catch {
case notFound: ActorNotFound =>
val errorMsg: String = s"invalid parameters: ${notFound.toString}"
log.error(errorMsg)
exceptionSender ! ExecutionFailed(BadRequestException(s"invalid parameters ${notFound.getMessage}"), errorMsg)
case e: Throwable =>
exceptionSender ! ExecutionFailed(withTrackID(e, context.self.path.toString), e.getMessage)
}
}
def exceptionSender = sender
}複製代碼
或者以相似Decorator模式擴展Actor
trait DelegationActor extends MortActor {
this: Actor =>private
val executionResultHandler: Receive = {
case _: ExecutionResult =>
}
override def receive: Receive = {
case any: Any =>
try {
(mortReceive orElse executionResultHandler) (any)
} catch {
case e: Throwable =>
log.error(e, "")
self ! ExecutionFailed(e)
throw e
} finally {
any match {
case _: ExecutionResult => self ! PoisonPillcase _ =>
}
}
}
}複製代碼
目前的產品特性還未用到更高級的Spark功能。針對一些特殊的客戶,咱們計劃採用Spark Streaming來進行流處理,除此以外,核心的數據分析功能都是使用Spark SQL。
如下是咱們的一些總結:
spark.sql.inMemoryColumnarStorage.batchSize
和spark.sql.shuffle.partitions
;第一次執行的SQL語句:
SELECT UniqueCarrier,Origin,count(distinct(Year)) AS Year FROM airline GROUP BY UniqueCarrier,Origin複製代碼
第二次執行的SQL語句:
SELECT UniqueCarrier,Dest,count(distinct(Year)) AS Year FROM airline GROUP BY UniqueCarrier,Dest複製代碼
第三次執行的SQL語句:
SELECT Dest , Origin , count(distinct(Year)) AS Year FROM airline GROUP BY Dest , Origin複製代碼
觀察執行的結果以下所示:
觀察執行count操做的job,顯然第一次執行SQL時的耗時最長,達到2s,而另外兩個job執行的時間則不到一秒。
針對複雜的數據分析,要學會充分利用Spark提供的函數擴展機制:UDF((User Defined Function)與UDAF(User Defined Aggregation Function);詳細內容,請閱讀文章《Spark強大的函數擴展功能》。
咱們一開始並無用好React+Redux。隨着對它們的逐漸熟悉,結合社區的一些實踐,咱們慢慢體會到了其中的一些好處,也摸索出一些好的實踐。
組件設計的原則
- 一個純組件利用props接受全部它須要的數據,相似一個函數的入參,除此以外它不會被任何其它因素影響;
- 一個純組件一般沒有內部狀態。它用來渲染的數據徹底來自於輸入props,使用相同的props來渲染相同的純組件屢次,
- 將獲得相同的UI。不存在隱藏的內部狀態致使渲染不一樣。
Redux的三大基本原則
- 單一數據源
- State 是隻讀的
- 使用純函數來執行修改
在咱們的項目中,將全部向後臺發送異步請求的操做都封裝到service中,action會調用這些服務。咱們使用了redux-actions的createAction建立dispatch須要的消息:
export const loadDataSource = (id) => {
return dispatch => {
return DataSourceServices.getDataSource(id)
.then(dataSource => { dispatch(createAction(DataSourceActionTypes.DATA_SOURCE_RECEIVED)(dataSource)) })
}
}複製代碼
在Reducer中,經過redux-actions的handleAction來處理action,避免使用醜陋的switch語句:
export const dataSources = handleActions({
[DataSourceActionTypes.DATA_SOURCES_RECEIVED]: (state, {payload}) => {
const newState = reduce(payload, (result, dataSource) => {
set(result, dataSource.id, dataSource)
return result
}, state)
return assign({}, newState)
},
[DataSourceActionTypes.DATA_SOURCE_RECEIVED]: (state, {payload}) => {
set(state, payload.id, payload)
return assign({}, state)
},
[DataSourceActionTypes.DATA_SOURCE_DELETED]: (state, {payload}) => {
return omit(state, payload) }
}, {})複製代碼
在Container組件中,若是Store裏面的模型對象須要根據id進行filter或merge之類的操做,則交給selector對其進行封裝。因而Container組件中就能夠這樣來調用:
@connect(state => {
return {
dataSourcesOfDirectory: DataSourcesSelectors.getDataSourcesOfDirectory(state),
dataSetsOfDataSource: DataSetsSelectors.getDataSetsOfDataSource(state),
selectedDataSource: DataSourcesSelectors.getSelectedDataSource(state),
currentDirectory: DataSourcesSelectors.getCurrentDirectory(state), memories: state.next.commons.memories
}
}, {
loadDataSourcesOfDirectory: DataSourcesActions.loadDataSourcesOfDirectory,
selectDataSource: selectedDataSourceAction.selectDataSource,
cleanSelectedDataSource: selectedDataSourceAction.cleanSelectedDataSource,
loadDataSetsOfDataSource: DataSetsActions.loadDataSetsOfDataSource,
updateDataSource: DataSourcesActions.updateDataSource,
deleteDataSource: DataSourcesActions.deleteDataSource,
navigate: commonActions.navigate,
memory: memoryActions.memory,
cleanMemory: memoryActions.cleanMemory,
goToNewDataSource: NavigationActions.goToNewDataSource
})複製代碼
echo "npm run lint" > .git/hooks/pre-pushchmod +x .git/hooks/pre-push複製代碼
咱們的一些總結:
def reportRoute(implicit userId: ID) = pathPrefix("reports") {
getReport ~ getViewsOfReport ~ createReport ~ updateReport ~ deleteReport ~ getVirtualField ~ getVirtualFields ~ fuzzyMatch ~ createVirtualField
}複製代碼