Geotrellis 是針對大數據量柵格數據進行分佈式空間計算的框架,這一點毋庸置疑,而且不管採起何種操做,其實都是先將大塊的數據切割成必定大小的小數據(專業術語爲瓦片),這是分治的思想,也是分佈式計算的精髓,因此使用 Geotrellis 的第一步工做就是要將數據切片(不管是存儲在內存中仍是進行持久化),然而即便其能力再「大」在實際工做中也難以處理如下幾種需求:html
這幾種狀況下咱們都很難或者沒有辦法同時對這些數據進行處理,可行的方案就是執行更新操做或者分批處理。在 Geotrellis 框架中提供了數據的 ETL 接口,可是隻能進行 write 操做,並不能進行 update 操做,write 操做會覆蓋掉此圖層中已有數據,而且相鄰數據之間沒法進行拼接,致使接邊處數據缺失,因此分批處理只能寫到不一樣的圖層,這又給數據的調用計算等處理形成很大的麻煩。本文在原有 ETL 的基礎上簡單介紹如何實現同層瓦片的 update 操做。數據庫
ETL 完成的工做是將數據切割成瓦片並進行持久化,在 Geotrellis 中你能夠將數據直接放在內存中(雖然也未提供現成的解決方案,我前面的文章簡單介紹瞭如何實現),也能夠將數據放在 Accumulo、HBASE 等分佈式數據庫或者是 HDFS 和 普通文件系統中。實現代碼在 geotrellis.spark.etl
包下的 Etl 類中,調用 ingest 方法的時候傳入不一樣的參數便可實現數據入庫的操做,此部分前面也已經介紹過,這裏再也不贅述。ingest 方法主要代碼以下:app
val etl = Etl(conf, modules) val sourceTiles = etl.load[I, V] val (zoom, tiled) = etl.tile(sourceTiles) etl.save[K, V](LayerId(etl.input.name, zoom), tiled)
整個流程爲首先使用 load 函數讀取原始數據,再調用 tile 函數對數據進行切割,然後調用 save 函數將切割後的瓦片進行持久化。因此只要在 save 方法中判斷要存放數據的圖層是否存在,若是不存在執行已有操做,若是存在則執行 update 操做。框架
原生 save 方法以下:分佈式
def save[ K: SpatialComponent: TypeTag, V <: CellGrid: TypeTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V] ]( id: LayerId, rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]], saveAction: SaveAction[K, V, TileLayerMetadata[K]] = SaveAction.DEFAULT[K, V, TileLayerMetadata[K]] ): Unit = { implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]] implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]] val outputPlugin = combinedModule .findSubclassOf[OutputPlugin[K, V, TileLayerMetadata[K]]] .find { _.suitableFor(output.backend.`type`.name) } .getOrElse(sys.error(s"Unable to find output module of type '${output.backend.`type`.name}'")) def savePyramid(zoom: Int, rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]]): Unit = { val currentId = id.copy(zoom = zoom) outputPlugin(currentId, rdd, conf, saveAction) scheme match { case Left(s) => if (output.pyramid && zoom >= 1) { val (nextLevel, nextRdd) = Pyramid.up(rdd, s, zoom, output.getPyramidOptions) savePyramid(nextLevel, nextRdd) } case Right(_) => if (output.pyramid) logger.error("Pyramiding only supported with layoutScheme, skipping pyramid step") } } savePyramid(id.zoom, rdd) logger.info("Done") }
主要邏輯在 savePyramid 函數中(scala 支持內部函數),其中 outputPlugin(currentId, rdd, conf, saveAction)
是將瓦片持久化的關鍵操做,val outputPlugin = ...
是取到持久化的種類,這裏無需過多考慮,只要考慮成是 Accumulo 或者其餘種類便可,因此 outputPlugin(currentId, rdd, conf, saveAction)
調用了 OutputPlugin
類型的 apply 方法,以下:函數
def apply( id: LayerId, rdd: RDD[(K, V)] with Metadata[M], conf: EtlConf, saveAction: SaveAction[K, V, M] = SaveAction.DEFAULT[K, V, M] ): Unit = { implicit val sc = rdd.sparkContext saveAction(attributes(conf), writer(conf), id, rdd) }
saveAction 默認取了 SaveAction.DEFAULT[K, V, M]
,這是定義在 ETL 類中的一個方法,是的,此處傳入了一個方法, saveAction(attributes(conf), writer(conf), id, rdd)
實際執行了下述方法:大數據
def DEFAULT[K, V, M] = { (_: AttributeStore, writer: Writer[LayerId, RDD[(K, V)] with Metadata[M]], layerId: LayerId, rdd: RDD[(K, V)] with Metadata[M]) => writer.write(layerId, rdd) }
能夠看到最後調用的是 writer.write(layerId, rdd)
,此處 writer 根據持久化對象不一樣而不一樣,在 Accumulo 中爲 AccumuloLayerWriter。ui
到此咱們便清楚了 save 方法的工做流程以及整個 ETL 操做的工做流程,下面開始對其進行改造。spa
本文僅針對瓦片數據持久化放到 Accumulo 數據庫中進行介紹,並未如原代碼同樣對全部狀況進行自動適配,其餘持久化方式只需判斷和修改對應的 LayerWriter 實例便可。scala
首先判斷持久化對象中是否已存在此圖層,代碼以下:
val currentId: LayerId = ... val instance = conf.outputProfile.get.asInstanceOf[AccumuloProfile].getInstance.get val attributeStore = AccumuloAttributeStore(instance) val exist = attributeStore.layerExists(currentId)
首先取到持久化的實例,本文直接指定爲 Accumulo 類型,然後獲取 AccumuloAttributeStore 對象,此對象至關因而元數據,其中存儲圖層的範圍層級等信息,最後經過 layerExists 方法便可獲得圖層是否存在。
若是圖層不存在則直接調用原生的 outputPlugin(currentId, rdd, conf)
便可,若是圖層已經存在則執行下述操做:
AccumuloLayerWriter(instance = instance, conf.output.backend.path.toString, AccumuloLayerWriterOptions(SocketWriteStrategy())) .update(currentId, rdd, (v1: V, v2: V) => v1.merge(v2))
此處須要特別指出的是 AccumuloLayerWriterOptions(SocketWriteStrategy())
,此句指明瞭 Accumulo 的操做策略,按照官方說法,使用 SocketWriteStrategy 會致使操做變慢,切不能針對大量數據的導入操做,使用 HdfsWriteStrategy 支持 Accumulo 大批量導入操做(我的猜想是 Accumulo 數據存放在 HDFS 中,首先把數據寫入 HDFS 而後再並行持久化到 Accumulo,因此能夠進行大量數據操做)。雖然看上去 HdfsWriteStrategy 很是完美,可是問題在於使用此策略沒法執行 update 操做,會報錯。魚和熊掌不能兼得,須要根據實際狀況進行選擇和設計。
這樣就可實現圖層中瓦片的更新操做。
固然寫到這並無完成工做,若是僅在 save 函數中完成上述改造,再真正的 update 的時候會報錯,提示 key index 超出定義的範圍,須要從新定義。還記得上面說的 attributeStore 吧,經過此方法能夠取到元數據信息,此處的 key index 也寫在元數據中,key index 說白了就是瓦片編號的範圍,咱們都知道瓦片是根據編號進行請求的,那麼一塊數據就會有一個編號範圍,因此圖層不存在的時候執行的是 write 方法,寫入的是當時數據瓦片編號範圍,可是真正執行 update 的時候通常確定是跟第一次數據範圍不一樣的,因而提示你須要更新編號的範圍。這個問題很容易解決,咱們只須要在第一次寫入的時候將數據範圍設置成全球便可。
在 tile 方法的 resizingTileRDD 方法定義以下:
def resizingTileRDD( rdd: RDD[(I, V)], floatMD: TileLayerMetadata[K], targetLayout: LayoutDefinition ): RDD[(K, V)] with Metadata[TileLayerMetadata[K]] = { // rekey metadata to targetLayout val newSpatialBounds = KeyBounds(targetLayout.mapTransform(floatMD.extent)) val tiledMD = floatMD.copy( bounds = floatMD.bounds.setSpatialBounds(newSpatialBounds), layout = targetLayout ) // > 1 means we're upsampling during tiling process val resolutionRatio = floatMD.layout.cellSize.resolution / targetLayout.cellSize.resolution val tilerOptions = Tiler.Options( resampleMethod = method, partitioner = new HashPartitioner( partitions = (math.pow(2, (resolutionRatio - 1) * 2) * rdd.partitions.length).toInt)) val tiledRDD = rdd.tileToLayout[K](tiledMD, tilerOptions) ContextRDD(tiledRDD, tiledMD) }
val newSpatialBounds = KeyBounds(targetLayout.mapTransform(floatMD.extent))
是獲取到當前數據在此 zoom 下的瓦片編號範圍,那麼咱們只須要將此處改爲整個範圍便可,以下:
val newSpatialBounds = KeyBounds( SpatialKey(0, 0), SpatialKey( col = targetLayout.layoutCols, row = targetLayout.layoutRows ))
這樣便可實現正常的 update 操做。
閱讀此文須要對 Geotrellis 框架有總體瞭解並熟悉其基本使用,能夠參考本系列博客,使用 geotrellis 也須要對 scala 有所掌握,scala 語法在我接觸過的全部語言中應當是比較靈活的,靈活就致使麻煩。。。。
本文簡單介紹瞭如何實現 ETL 過程的 update 操做。這是我失業後寫的第一篇博客,失業後整我的對全部事情的理解更上了一步,不管是對技術仍是生活都有更多的感悟,生活和技術都須要慢慢品味。
Geotrellis系列文章連接地址http://www.cnblogs.com/shoufengwei/p/5619419.html