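Previously, to make it easier for users to import data (GeoTiff and other remote-sensing imagery) into a backend (Accumulo, File, Hadoop, and so on), GeoTrellis provided an ETL class. Its input is a JSON file configured by the user, containing the information needed during processing: data source, data type, projection, tile type, processing method, and so on.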
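Starting with version 2.0, GeoTrellis added a pipeline feature. Users can describe how their data should be processed as a pipeline, written in JSON or as an AST. They then only need to execute the pipeline, and the system automatically turns the input data into the final result.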
This article briefly analyzes how the pipeline is implemented in GeoTrellis and discusses how the technique could be applied elsewhere.
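In fact, the pipeline brings no improvement in functionality or performance. It simply turns the code of the old ETL class into individual nodes of a pipeline, and the information in those nodes is still the information from the old JSON configuration file. So what is the point of the change? Let's first compare code for the same processing task written both ways: first the ETL class, then the pipeline version.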
```scala
object Etl {
  val defaultModules = Array(s3.S3Module, hadoop.HadoopModule, file.FileModule, accumulo.AccumuloModule, cassandra.CassandraModule, hbase.HBaseModule)

  type SaveAction[K, V, M] = (AttributeStore, Writer[LayerId, RDD[(K, V)] with Metadata[M]], LayerId, RDD[(K, V)] with Metadata[M]) => Unit

  object SaveAction {
    def DEFAULT[K, V, M] = {
      (_: AttributeStore, writer: Writer[LayerId, RDD[(K, V)] with Metadata[M]], layerId: LayerId, rdd: RDD[(K, V)] with Metadata[M]) =>
        writer.write(layerId, rdd)
    }
  }

  def ingest[
    I: Component[?, ProjectedExtent]: TypeTag: ? => TilerKeyMethods[I, K],
    K: SpatialComponent: Boundable: TypeTag,
    V <: CellGrid: TypeTag: RasterRegionReproject: Stitcher: (? => TileReprojectMethods[V]): (? => CropMethods[V]): (? => TileMergeMethods[V]): (? => TilePrototypeMethods[V])
  ](
    args: Seq[String], modules: Seq[TypedModule] = Etl.defaultModules
  )(implicit sc: SparkContext) = {
    implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]]
    implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]]

    EtlConf(args) foreach { conf =>
      /* parse command line arguments */
      val etl = Etl(conf, modules)
      /* load source tiles using input module specified */
      val sourceTiles = etl.load[I, V]
      /* perform the reprojection and mosaicing step to fit tiles to LayoutScheme specified */
      val (zoom, tiled) = etl.tile(sourceTiles)
      /* save and optionally pyramid the mosaiced layer */
      etl.save[K, V](LayerId(etl.input.name, zoom), tiled)
    }
  }
}

case class Etl(conf: EtlConf, @transient modules: Seq[TypedModule] = Etl.defaultModules) extends LazyLogging {
  import Etl.SaveAction

  val input  = conf.input
  val output = conf.output

  def scheme: Either[LayoutScheme, LayoutDefinition] = {
    if (output.layoutScheme.nonEmpty) {
      val scheme = output.getLayoutScheme
      logger.info(scheme.toString)
      Left(scheme)
    } else if (output.layoutExtent.nonEmpty) {
      val layout = output.getLayoutDefinition
      logger.info(layout.toString)
      Right(layout)
    } else
      sys.error("Either layoutScheme or layoutExtent with cellSize/tileLayout must be provided")
  }

  @transient val combinedModule = modules reduce (_ union _)

  /**
    * Loads RDD of rasters using the input module specified in the arguments.
    * This RDD will contain rasters as they are stored, possibly overlapping and not conforming to any tile layout.
    *
    * @tparam I Input key type
    * @tparam V Input raster value type
    */
  def load[I: Component[?, ProjectedExtent]: TypeTag, V <: CellGrid: TypeTag]()(implicit sc: SparkContext): RDD[(I, V)] = {
    val plugin = {
      val plugins = combinedModule.findSubclassOf[InputPlugin[I, V]]
      if(plugins.isEmpty) sys.error(s"Unable to find input module for input key type '${typeTag[I].tpe.toString}' and tile type '${typeTag[V].tpe.toString}'")
      plugins.find(_.suitableFor(input.backend.`type`.name, input.format)).getOrElse(sys.error(s"Unable to find input module of type '${input.backend.`type`.name}' for format `${input.format} " + s"for input key type '${typeTag[I].tpe.toString}' and tile type '${typeTag[V].tpe.toString}'"))
    }

    // clip in dest crs
    input.clip.fold(plugin(conf))(extent => plugin(conf).filter { case (k, _) =>
      val pe = k.getComponent[ProjectedExtent]
      output.getCrs.fold(extent.contains(pe.extent))(crs => extent.contains(pe.extent.reproject(pe.crs, crs)))
    })
  }

  /**
    * Tiles RDD of arbitrary rasters to conform to a layout scheme or definition provided in the arguments.
    * First metadata will be collected over input rasters to determine the overall extent, common crs, and resolution.
    * This information will be used to select a LayoutDefinition if LayoutScheme is provided in the arguments.
    *
    * The tiling step will use this LayoutDefinition to cut input rasters into chunks that conform to the layout.
    * If multiple rasters contribute to single target tile their values will be merged cell by cell.
    *
    * The timing of the reproject steps depends on the method chosen.
    * BufferedReproject must be performed after the tiling step because it leans on SpatialComponent to identify neighboring
    * tiles and sample their edge pixels. This method is the default and produces the best results.
    *
    * PerTileReproject method will be performed before the tiling step, on source tiles. When using this method the
    * reproject logic does not have access to pixels past the tile boundary and will see them as NODATA.
    * However, this approach does not require all source tiles to share a projection.
    *
    * @param rdd    RDD of source rasters
    * @param method Resampling method to be used when merging raster chunks in tiling step
    */
  def tile[
    I: Component[?, ProjectedExtent]: (? => TilerKeyMethods[I, K]),
    V <: CellGrid: RasterRegionReproject: Stitcher: ClassTag: (? => TileMergeMethods[V]): (? => TilePrototypeMethods[V]):
      (? => TileReprojectMethods[V]): (? => CropMethods[V]),
    K: SpatialComponent: Boundable: ClassTag
  ](rdd: RDD[(I, V)], method: ResampleMethod = output.resampleMethod): (Int, RDD[(K, V)] with Metadata[TileLayerMetadata[K]]) = {
    val targetCellType = output.cellType
    val destCrs = output.getCrs.get

    /** Tile layers from some resolution and adjust partition count based on resolution difference */
    def resizingTileRDD(
      rdd: RDD[(I, V)],
      floatMD: TileLayerMetadata[K],
      targetLayout: LayoutDefinition
    ): RDD[(K, V)] with Metadata[TileLayerMetadata[K]] = {
      // rekey metadata to targetLayout
      val newSpatialBounds = KeyBounds(targetLayout.mapTransform(floatMD.extent))
      val tiledMD = floatMD.copy(
        bounds = floatMD.bounds.setSpatialBounds(newSpatialBounds),
        layout = targetLayout
      )

      // > 1 means we're upsampling during tiling process
      val resolutionRatio = floatMD.layout.cellSize.resolution / targetLayout.cellSize.resolution
      val tilerOptions = Tiler.Options(
        resampleMethod = method,
        partitioner = new HashPartitioner(
          partitions = (math.pow(2, (resolutionRatio - 1) * 2) * rdd.partitions.length).toInt))

      rdd.tileToLayout[K](tiledMD, tilerOptions)
    }

    output.reprojectMethod match {
      case PerTileReproject =>
        def reprojected(targetCellSize: Option[CellSize] = None) = {
          val reprojectedRdd = rdd.reproject(destCrs, RasterReprojectOptions(method = method, targetCellSize = targetCellSize))

          val floatMD = { // collecting floating metadata allows detecting upsampling
            val (_, md) = reprojectedRdd.collectMetadata(FloatingLayoutScheme(output.tileSize))
            md.copy(cellType = targetCellType.getOrElse(md.cellType))
          }

          reprojectedRdd -> floatMD
        }

        scheme match {
          case Left(scheme: ZoomedLayoutScheme) if output.maxZoom.isDefined =>
            val LayoutLevel(zoom, layoutDefinition) = scheme.levelForZoom(output.maxZoom.get)
            val (reprojectedRdd, floatMD) = reprojected(Some(layoutDefinition.cellSize))
            zoom -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)

          case Left(scheme) => // True for both FloatinglayoutScheme and ZoomedlayoutScheme
            val (reprojectedRdd, floatMD) = reprojected()
            val LayoutLevel(zoom, layoutDefinition) = scheme.levelFor(floatMD.extent, floatMD.cellSize)
            zoom -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)

          case Right(layoutDefinition) =>
            val (reprojectedRdd, floatMD) = reprojected()
            0 -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)
        }

      case BufferedReproject =>
        // Buffered reproject requires that tiles are already in some layout so we can find the neighbors
        val md = { // collecting floating metadata allows detecting upsampling
          val (_, md) = rdd.collectMetadata(FloatingLayoutScheme(output.tileSize))
          md.copy(cellType = targetCellType.getOrElse(md.cellType))
        }
        val tiled = ContextRDD(rdd.tileToLayout[K](md, method), md)

        scheme match {
          case Left(layoutScheme: ZoomedLayoutScheme) if output.maxZoom.isDefined =>
            val LayoutLevel(zoom, layoutDefinition) = layoutScheme.levelForZoom(output.maxZoom.get)
            (zoom, output.bufferSize match {
              case Some(bs) => tiled.reproject(destCrs, layoutDefinition, bs, RasterReprojectOptions(method = method, targetCellSize = Some(layoutDefinition.cellSize)))._2
              case _ => tiled.reproject(destCrs, layoutDefinition, RasterReprojectOptions(method = method, targetCellSize = Some(layoutDefinition.cellSize)))._2
            })

          case Left(layoutScheme) =>
            output.bufferSize match {
              case Some(bs) => tiled.reproject(destCrs, layoutScheme, bs, method)
              case _ => tiled.reproject(destCrs, layoutScheme, method)
            }

          case Right(layoutDefinition) =>
            output.bufferSize match {
              case Some(bs) => tiled.reproject(destCrs, layoutDefinition, bs, method)
              case _ => tiled.reproject(destCrs, layoutDefinition, method)
            }
        }
    }
  }

  /**
    * Saves provided RDD to an output module specified by the ETL arguments.
    * This step may perform two to one pyramiding until zoom level 1 is reached.
    *
    * @param id         Layout ID to b
    * @param rdd        Tiled raster RDD with TileLayerMetadata
    * @param saveAction Function to be called for saving. Defaults to writing the layer.
    *                   This gives the caller an opportunity to modify the layer before writing,
    *                   or to save additional attributes in the attributes store.
    *
    * @tparam K Key type with SpatialComponent corresponding LayoutDefinition
    * @tparam V Tile raster with cells from single tile in LayoutDefinition
    */
  def save[
    K: SpatialComponent: TypeTag,
    V <: CellGrid: TypeTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V]
  ](
    id: LayerId,
    rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
    saveAction: SaveAction[K, V, TileLayerMetadata[K]] = SaveAction.DEFAULT[K, V, TileLayerMetadata[K]]
  ): Unit = {
    implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]]
    implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]]

    val outputPlugin =
      combinedModule
        .findSubclassOf[OutputPlugin[K, V, TileLayerMetadata[K]]]
        .find { _.suitableFor(output.backend.`type`.name) }
        .getOrElse(sys.error(s"Unable to find output module of type '${output.backend.`type`.name}'"))

    def savePyramid(zoom: Int, rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]]): Unit = {
      val currentId = id.copy(zoom = zoom)
      outputPlugin(currentId, rdd, conf, saveAction)

      scheme match {
        case Left(s) =>
          if (output.pyramid && zoom >= 1) {
            val (nextLevel, nextRdd) = Pyramid.up(rdd, s, zoom, output.getPyramidOptions)
            savePyramid(nextLevel, nextRdd)
          }
        case Right(_) =>
          if (output.pyramid)
            logger.error("Pyramiding only supported with layoutScheme, skipping pyramid step")
      }
    }

    savePyramid(id.zoom, rdd)
    logger.info("Done")
  }
}
```
```scala
import org.apache.spark.SparkContext
import scala.util.{Failure, Success, Try}

implicit val sc: SparkContext = ???

val scheme = Left[LayoutScheme, LayoutDefinition](FloatingLayoutScheme(512))

val jsonRead = JsonRead("s3://geotrellis-test/", `type` = ReadTypes.SpatialS3Type)
val jsonTileToLayout = TileToLayout(`type` = TransformTypes.SpatialTileToLayoutType)
val jsonReproject = Reproject("EPSG:3857", scheme, `type` = TransformTypes.SpatialBufferedReprojectType)
val jsonPyramid = Pyramid(`type` = TransformTypes.SpatialPyramidType)
val jsonWrite = JsonWrite("mask", "s3://geotrellis-test/pipeline/", PipelineKeyIndexMethod("zorder"), scheme, `type` = WriteTypes.SpatialType)

val list: List[PipelineExpr] = jsonRead ~ jsonTileToLayout ~ jsonReproject ~ jsonPyramid ~ jsonWrite

// typed way, as in the JSON example above
val typedAst: Node[Stream[(Int, TileLayerRDD[SpatialKey])]] = list
  .node[Stream[(Int, TileLayerRDD[SpatialKey])]]
val result: Stream[(Int, TileLayerRDD[SpatialKey])] = typedAst.eval

// in some cases you may want just to evaluate the pipeline
// to add some flexibility we can do parsing and evaluation steps manually
// erasedNode function would parse JSON into an ErasedNode type that can be evaluated
val untypedAst: ErasedNode = list.erasedNode

// it would be an untyped result, just some evaluation
// but you still have a chance to catch and handle some types of exceptions
val untypedResult: Any =
  Try {
    untypedAst.unsafeEval
  } match {
    case Success(_) =>
    case Failure(e) =>
  }

// typed result
val typedResult: Option[Stream[(Int, TileLayerRDD[SpatialKey])]] =
  Try {
    untypedAst.eval
  } match {
    case Success(stream) => Some(stream)
    case Failure(e)      => None
  }
```
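The amount of code alone shows that the new pipeline approach is much shorter. As mentioned earlier, a pipeline simply wraps the former operations into individual operation nodes. The code for each kind of node is already written; users only need to chain together the nodes they need and then execute the whole pipeline.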
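To make the idea of prebuilt nodes concrete, here is a minimal, self-contained Scala sketch. It does not use GeoTrellis: `Layer`, `Op`, `TileToLayout`, `Reproject`, `Pyramid` and `run` are hypothetical toy names that only model what each node does to a layer's metadata. The library side implements every node once; the user side only lists nodes in order.

```scala
object PrebuiltNodes {
  // Toy stand-in for a layer: a real GeoTrellis layer is an RDD plus metadata.
  final case class Layer(crs: String, tiled: Boolean, zooms: List[Int])

  // Library side: every node type is implemented once, in advance.
  sealed trait Op { def apply(layer: Layer): Layer }

  // Hypothetical: marks the layer as cut into a tile layout at the given zoom.
  final case class TileToLayout(zoom: Int) extends Op {
    def apply(layer: Layer): Layer = layer.copy(tiled = true, zooms = List(zoom))
  }

  // Hypothetical: only records the new CRS; no pixels are touched in this toy model.
  final case class Reproject(crs: String) extends Op {
    def apply(layer: Layer): Layer = layer.copy(crs = crs)
  }

  // Hypothetical: adds every coarser zoom level from the current one down to 0.
  case object Pyramid extends Op {
    def apply(layer: Layer): Layer =
      layer.copy(zooms = layer.zooms.headOption.map(z => (z to 0 by -1).toList).getOrElse(Nil))
  }

  // User side: pick the nodes, put them in order, and run them one after another.
  def run(ops: List[Op], input: Layer): Layer =
    ops.foldLeft(input)((acc, op) => op(acc))
}
```

For example, `run(List(TileToLayout(3), Reproject("EPSG:3857"), Pyramid), Layer("EPSG:4326", tiled = false, zooms = Nil))` yields a tiled EPSG:3857 layer with zoom levels 3, 2, 1 and 0. Adding a new processing step means adding one more `Op`, without touching user code.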
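Careful readers, and anyone familiar with GeoTrellis ETL, know that ETL basically works like this. The input is raster data in one of the combinations of two band layouts (single-band and multiband) and two temporal layouts (no time dimension, or time series), stored in some location (S3, Hadoop, File, etc.). ETL reads this data, applies transformations such as reprojection, merging, and pyramid building, and finally writes the result to the specified backend.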
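The input space described above is small enough to enumerate. The sketch below, with hypothetical names throughout, lists every time × band × storage combination a read step has to handle. The `typeName` scheme is modeled on `SpatialS3Type` from the pipeline code; the names it produces for the other combinations are only a guess.

```scala
object InputKinds {
  sealed trait Time
  case object Spatial extends Time  // no time dimension
  case object Temporal extends Time // time series

  sealed trait Bands
  case object Singleband extends Bands
  case object Multiband extends Bands

  sealed trait Backend
  case object S3 extends Backend
  case object Hadoop extends Backend
  case object File extends Backend

  final case class InputKind(time: Time, bands: Bands, backend: Backend) {
    // Modeled on `SpatialS3Type`; single-band is the unmarked default in this guess.
    def typeName: String = {
      val bandPart = if (bands == Multiband) "Multiband" else ""
      s"$time$bandPart${backend}Type"
    }
  }

  val times: List[Time] = List(Spatial, Temporal)
  val bandKinds: List[Bands] = List(Singleband, Multiband)
  val backends: List[Backend] = List(S3, Hadoop, File)

  // Every combination the read step has to support.
  val all: List[InputKind] =
    for { t <- times; b <- bandKinds; be <- backends } yield InputKind(t, b, be)
}
```

With two temporal layouts, two band layouts and the three storage locations listed here, there are 2 × 2 × 3 = 12 read variants to cover.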
So the pipeline implementation simply defines the corresponding ReadType, TransformType, and WriteType. Let's go back to the example above:
```scala
val jsonRead = JsonRead("s3://geotrellis-test/", `type` = ReadTypes.SpatialS3Type)
```
This specifies the Read step: the storage path, the storage location (S3), the data type (single-band), and the temporal layout (Spatial, i.e. no time stamp).
```scala
val jsonTileToLayout = TileToLayout(`type` = TransformTypes.SpatialTileToLayoutType)
val jsonReproject = Reproject("EPSG:3857", scheme, `type` = TransformTypes.SpatialBufferedReprojectType)
val jsonPyramid = Pyramid(`type` = TransformTypes.SpatialPyramidType)
```
TileToLayout turns the data into an RDD carrying metadata such as the cell type and spatial layout, which the later tiling steps build on.
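The core of TileToLayout is deciding which tile of the target layout each part of a raster falls into. The sketch below shows that arithmetic for a regular grid of equal-size tiles. It is not GeoTrellis's `LayoutDefinition`/`mapTransform` code: `Extent`, `Key`, `Grid` and its methods are hypothetical, and putting row 0 at the top of the extent is an assumption of this sketch.

```scala
object LayoutGrid {
  final case class Extent(xmin: Double, ymin: Double, xmax: Double, ymax: Double)
  final case class Key(col: Int, row: Int)

  // A regular grid of `cols` x `rows` equal-size tiles covering `extent`.
  // Assumption: col 0 is the left edge, row 0 is the top edge.
  final case class Grid(extent: Extent, cols: Int, rows: Int) {
    val tileWidth: Double = (extent.xmax - extent.xmin) / cols
    val tileHeight: Double = (extent.ymax - extent.ymin) / rows

    // Which tile a map coordinate falls into (points exactly on xmax/ymin land just outside).
    def keyFor(x: Double, y: Double): Key =
      Key(math.floor((x - extent.xmin) / tileWidth).toInt,
          math.floor((extent.ymax - y) / tileHeight).toInt)

    // The map extent covered by one tile.
    def tileExtent(k: Key): Extent = {
      val left = extent.xmin + k.col * tileWidth
      val top = extent.ymax - k.row * tileHeight
      Extent(left, top - tileHeight, left + tileWidth, top)
    }

    // All tiles a raster with extent `e` touches, i.e. the chunks it would be cut into.
    // A raster edge lying exactly on a tile boundary also pulls in the neighboring tile.
    def keysCovering(e: Extent): Seq[Key] = {
      val topLeft = keyFor(e.xmin, e.ymax)
      val bottomRight = keyFor(e.xmax, e.ymin)
      for {
        row <- math.max(topLeft.row, 0) to math.min(bottomRight.row, rows - 1)
        col <- math.max(topLeft.col, 0) to math.min(bottomRight.col, cols - 1)
      } yield Key(col, row)
    }
  }
}
```

On a 4 × 4 grid over (0, 0, 100, 100), a raster spanning x 10 to 60 and y 60 to 95 touches three columns and two rows, so it is cut into six tile-sized chunks.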
Reproject reprojects the data, here to EPSG:3857 using buffered reprojection (SpatialBufferedReprojectType).
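The example reprojects to EPSG:3857 (Web Mercator). Assuming the source coordinates are geographic WGS84 longitude/latitude (the example does not say), the per-point math is the spherical Mercator formula sketched below. The real Reproject node resamples whole tiles and relies on GeoTrellis's own projection support, which this hypothetical `WebMercator` object does not reproduce.

```scala
object WebMercator {
  // Sphere radius used by EPSG:3857: the WGS84 semi-major axis, in meters.
  val R = 6378137.0

  // Longitude/latitude in degrees -> Web Mercator x/y in meters.
  def forward(lonDeg: Double, latDeg: Double): (Double, Double) = {
    val x = R * math.toRadians(lonDeg)
    val y = R * math.log(math.tan(math.Pi / 4 + math.toRadians(latDeg) / 2))
    (x, y)
  }

  // Web Mercator x/y in meters -> longitude/latitude in degrees.
  def inverse(x: Double, y: Double): (Double, Double) = {
    val lon = math.toDegrees(x / R)
    val lat = math.toDegrees(math.atan(math.sinh(y / R)))
    (lon, lat)
  }
}
```

Longitude 180° maps to x = πR ≈ 20037508.34 m, the edge of the Web Mercator world; latitudes near the poles grow without bound, which is why web maps cut off at about ±85.05°.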
Pyramid builds a tile pyramid from the data, i.e. the successively coarser zoom levels.
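Building a pyramid means repeatedly going one zoom level up, where each 2 × 2 block of tiles becomes one parent tile at half the resolution. The sketch below shows only that key arithmetic, plus the usual Web Mercator ground resolution per zoom level, assuming 256-pixel tiles and a pyramid that goes down to zoom 0. The names are hypothetical, no pixels are merged, and it does not claim to match GeoTrellis's `Pyramid.up` internals.

```scala
object PyramidMath {
  final case class Key(col: Int, row: Int)

  // One level up: every 2x2 block of child tiles collapses into one parent tile.
  def parentKeys(keys: Set[Key]): Set[Key] =
    keys.map(k => Key(k.col / 2, k.row / 2))

  // Every level from `zoom` down to 0, as (zoom, keys) pairs.
  def levels(zoom: Int, keys: Set[Key]): List[(Int, Set[Key])] =
    if (zoom <= 0) List(zoom -> keys)
    else (zoom -> keys) :: levels(zoom - 1, parentKeys(keys))

  // Ground resolution in meters per pixel at the equator for a Web Mercator zoom level.
  def resolution(zoom: Int, tileSize: Int = 256): Double =
    2 * math.Pi * 6378137.0 / (tileSize * math.pow(2, zoom))
}
```

Each step up halves the tile count per axis and doubles the meters per pixel; at zoom 0 the whole world is a single 256-pixel tile at about 156543 m per pixel.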
```scala
val jsonWrite = JsonWrite("mask", "s3://geotrellis-test/pipeline/", PipelineKeyIndexMethod("zorder"), scheme, `type` = WriteTypes.SpatialType)
```
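JsonWrite specifies how the data is written, including the key index method (here zorder) and the output type; the system determines the output storage location automatically from the given URI.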
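Two details of JsonWrite can be sketched in plain Scala: choosing a backend from the URI's scheme, and the z-order key index requested with `PipelineKeyIndexMethod("zorder")`. The scheme-to-backend table and all names below are assumptions for illustration, not GeoTrellis's actual rules. The z-order function is the standard bit-interleaving (Morton) technique, with an arbitrary choice of which coordinate takes the even bits.

```scala
import java.net.URI

object WriteTarget {
  sealed trait Backend
  case object S3 extends Backend
  case object Hadoop extends Backend
  case object LocalFile extends Backend

  // Hypothetical scheme-to-backend table; the schemes GeoTrellis really accepts may differ.
  def backendFor(uri: String): Option[Backend] =
    Option(new URI(uri).getScheme).map(_.toLowerCase) match {
      case Some("s3")          => Some(S3)
      case Some("hdfs")        => Some(Hadoop)
      case Some("file") | None => Some(LocalFile) // no scheme: treat as a local path
      case _                   => None
    }

  // Z-order (Morton) index: interleave the bits of col and row so that tiles close
  // together in 2-D tend to get nearby 1-D index values. Column bits go to the even
  // positions and row bits to the odd ones (a convention chosen for this sketch).
  def zIndex(col: Int, row: Int): Long = {
    require(col >= 0 && row >= 0, "keys must be non-negative")
    var z = 0L
    var bit = 0
    while (bit < 31) {
      z |= ((col.toLong >> bit) & 1L) << (2 * bit)
      z |= ((row.toLong >> bit) & 1L) << (2 * bit + 1)
      bit += 1
    }
    z
  }
}
```

With this convention, keys (0,0), (1,0), (0,1), (1,1) get indexes 0 to 3, so each 2 × 2 block of neighboring tiles occupies a contiguous index range, which keeps spatially close tiles close together in storage.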
At this point, all three kinds of nodes described above (read, transform, and write) have been defined.
```scala
val list: List[PipelineExpr] = jsonRead ~ jsonTileToLayout ~ jsonReproject ~ jsonPyramid ~ jsonWrite
```
This line chains the operation nodes above together into a List.
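The `~` operator lets several expressions be written as one chain that evaluates to a `List`. One way to get that behavior is sketched below; `Expr`, `Read`, `Transform`, `Write` and `ExprListOps` are hypothetical, and GeoTrellis's own definition of `~` on `PipelineExpr` may differ.

```scala
object TildeChain {
  // Hypothetical stand-in for PipelineExpr: a plain description of one step, not the step itself.
  sealed trait Expr {
    // `a ~ b` starts a list of two expressions.
    def ~(next: Expr): List[Expr] = List(this, next)
  }
  final case class Read(uri: String) extends Expr
  final case class Transform(kind: String) extends Expr
  final case class Write(layer: String, uri: String) extends Expr

  // `list ~ c` appends, so `a ~ b ~ c` (parsed as `(a ~ b) ~ c`) yields List(a, b, c).
  implicit class ExprListOps(exprs: List[Expr]) {
    def ~(next: Expr): List[Expr] = exprs :+ next
  }
}
```

With `import TildeChain._` in scope, `Read("s3://geotrellis-test/") ~ Transform("tileToLayout") ~ Transform("reproject") ~ Transform("pyramid") ~ Write("mask", "s3://geotrellis-test/pipeline/")` is simply a five-element `List[Expr]`. Nothing runs yet; the list is only a description that a later step interprets.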
```scala
val typedAst: Node[Stream[(Int, TileLayerRDD[SpatialKey])]] = list
  .node[Stream[(Int, TileLayerRDD[SpatialKey])]]
val result: Stream[(Int, TileLayerRDD[SpatialKey])] = typedAst.eval
```
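These two statements build the corresponding typed node and then call its eval function, which runs the whole pipeline and returns the final result.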
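The pipeline code earlier also shows an untyped path (`erasedNode`, `unsafeEval`) whose errors can only be caught at runtime with `Try`. The sketch below imitates that behavior with hypothetical toy types: each erased step accepts `Any`, a wrong input shows up as an exception during evaluation, and `eval` turns any failure, including a result of the wrong type, into `None`. It is not GeoTrellis's `ErasedNode` implementation.

```scala
import scala.reflect.ClassTag
import scala.util.Try

object ErasedPipeline {
  // Toy intermediate results standing in for raw rasters, tiled layers and written layers.
  final case class Raw(uri: String)
  final case class Tiled(uri: String, tiles: Int)
  final case class Written(layer: String, tiles: Int)

  // Hypothetical erased step: the compiler does not track its input/output types,
  // so a wrong input is only discovered when the step runs (as a MatchError).
  final case class ErasedStep(name: String, run: Any => Any)

  val read  = ErasedStep("read",  { case uri: String => Raw(uri) })
  val tile  = ErasedStep("tile",  { case Raw(uri) => Tiled(uri, 16) })
  val write = ErasedStep("write", { case Tiled(_, n) => Written("mask", n) })

  // Runs the steps in order and throws if any step receives an input it cannot handle.
  def unsafeEval(steps: List[ErasedStep], input: Any): Any =
    steps.foldLeft(input)((acc, step) => step.run(acc))

  // Wraps evaluation in Try and checks the final type at runtime; None on any failure.
  def eval[T](steps: List[ErasedStep], input: Any)(implicit ct: ClassTag[T]): Option[T] =
    Try(unsafeEval(steps, input)).toOption.flatMap(result => ct.unapply(result))
}
```

`eval[Written](List(read, tile, write), "s3://geotrellis-test/")` returns `Some(Written("mask", 16))`, while leaving out `tile` makes `write` receive a `Raw` and the whole evaluation returns `None`.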
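That's all it takes: these few lines complete the entire processing workflow. Note that when chaining the pipeline, each node's output type must be the input type of the next node; otherwise the pipeline cannot run.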
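When the node types are tracked by the compiler, the rule that each output must match the next input is checked before anything runs. A minimal sketch with a hypothetical `Step[A, B]` type: `~>` only accepts a next step whose input type is the current output type, so a mismatched chain fails to compile instead of failing at runtime.

```scala
object TypedPipeline {
  // Hypothetical typed step from A to B.
  final case class Step[A, B](name: String, run: A => B) {
    // Only compiles when `next` consumes exactly what this step produces.
    def ~>[C](next: Step[B, C]): Step[A, C] =
      Step(s"$name ~> ${next.name}", run andThen next.run)
  }

  // Toy intermediate results.
  final case class Raw(uri: String)
  final case class Tiled(uri: String, tiles: Int)
  final case class Written(layer: String, tiles: Int)

  val read: Step[String, Raw] = Step[String, Raw]("read", uri => Raw(uri))
  val tile: Step[Raw, Tiled] = Step[Raw, Tiled]("tile", r => Tiled(r.uri, 16))
  val write: Step[Tiled, Written] = Step[Tiled, Written]("write", t => Written("mask", t.tiles))

  // The whole chain is itself a Step from the input URI to the written layer.
  val pipeline: Step[String, Written] = read ~> tile ~> write

  // val broken = read ~> write  // rejected by the compiler: write expects Tiled, not Raw
}
```

The design choice mirrors the two paths in the pipeline code: the typed form catches wiring mistakes at compile time, while an erased form trades that safety for the flexibility of building pipelines from data such as JSON.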
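The whole idea is quite similar to the neural-network frameworks that are popular right now, such as TensorFlow and Keras, where you first define a model made of processing nodes. A neural-network model is really a data-processing model too, so the two are essentially the same. The difference is that neural networks care more about the state of the data, such as its dimensions and sizes (number of nodes), whereas GeoTrellis cares about how the data is processed.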
That concludes this look at how GeoTrellis implements pipelines; interested readers can dig into the source code for further analysis.
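After studying the GeoTrellis Pipeline carefully, I realized that many things could be built this way, such as the neural networks just mentioned. As another example, other kinds of remote-sensing processing could also be wrapped into pipelines: different model computations, dodging and color balancing, orthorectification, and so on.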
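Any process whose steps depend on one another, or that must be carried out one step at a time, can be wrapped into a pipeline. This makes later processing more standardized, keeps the code leaner, and makes it easier to use. It is the same idea as the assembly line that Ford brought to the automobile industry, with revolutionary results.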
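I have recently been reading books on computer organization, which put particular emphasis on how CPUs pipeline instructions. The same technique can be applied to data processing: run the processing flow the way a CPU runs instructions, so that, for example, memory-intensive and CPU-intensive operations are staggered instead of waiting on each other. That keeps the server fully loaded, which should speed up processing.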
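The stage overlap described above can be sketched with two threads and a bounded queue: one stage loads items (standing in for memory- or I/O-heavy work), the other computes on them (standing in for CPU-heavy work), and each can run while the other is busy. This is a hypothetical illustration using only `java.util.concurrent`; whether it keeps a real server fully loaded depends on the actual workload.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

object TwoStagePipeline {
  /** Runs `load` and `compute` as two pipeline stages on separate threads.
    * While the compute stage works on item n, the load stage can already fetch item n + 1,
    * so the two kinds of work overlap instead of alternating.
    * No error handling: if a stage throws, the other stage may block forever. */
  def run[A, B, C](inputs: List[A])(load: A => B)(compute: B => C): List[C] = {
    // Bounded buffer between the stages; None marks the end of the stream.
    val queue = new LinkedBlockingQueue[Option[B]](4)
    val results = ListBuffer.empty[C] // written only by the compute thread

    val loader = new Thread(() => {
      inputs.foreach(a => queue.put(Some(load(a))))
      queue.put(None)
    })
    val worker = new Thread(() => {
      var next = queue.take()
      while (next.isDefined) {
        results += compute(next.get)
        next = queue.take()
      }
    })

    loader.start()
    worker.start()
    loader.join()
    worker.join() // join() makes the worker's writes to `results` visible here
    results.toList
  }
}
```

`TwoStagePipeline.run(List(1, 2, 3, 4, 5))(_ * 10)(_ + 1)` returns `List(11, 21, 31, 41, 51)`; results keep their input order because there is a single compute thread, and the queue bound stops a fast loader from running arbitrarily far ahead.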
This article introduced how the Pipeline is implemented in GeoTrellis and briefly discussed what the technique suggests for other kinds of processing.
Links to the GeoTrellis article series: http://www.cnblogs.com/shoufengwei/p/5619419.html