前言

上一篇中简单介绍了 COG 的概念和 Geotrellis 中引入 COG 的原因及简单的原理,本文为大家介绍如何在 Geotrellis 中使用 COG 来写入和读取 GeoTIFF数据。

一、写入数据——ETL

1.1 实现方案

其实这与之前的普通 ETL 操作在概念上是相似的,都是将原始数据转换成系统能用的数据的过程,这是宽泛的 ETL 的定义。在 Geotrellis 中实现很简单,与之前代码基本一致,只要切换一下 writer 类型以及最后建立金字塔额时候略有不同。实现方案如下:

val inputPath = "file://" + new File("in.tif").getAbsolutePath
val outputPath = "/your/catalog/path" def main(args: Array[String]): Unit = {
// Setup Spark to use Kryo serializer.
val conf =
new SparkConf()
.setMaster("local[*]")
.setAppName("Spark Tiler")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "geotrellis.spark.io.kryo.KryoRegistrator") val sc = new SparkContext(conf)
try {
run(sc)
} finally {
sc.stop()
}
} def run(implicit sc: SparkContext) = {
val inputRdd: RDD[(ProjectedExtent, MultibandTile)] =
sc.hadoopMultibandGeoTiffRDD(inputPath) val (_, rasterMetaData) =
TileLayerMetadata.fromRdd(inputRdd, FloatingLayoutScheme(256)) val tiled: RDD[(SpatialKey, MultibandTile)] =
inputRdd
.tileToLayout(rasterMetaData.cellType, rasterMetaData.layout, Bilinear)
.repartition(100) val layoutScheme = ZoomedLayoutScheme(WebMercator, tileSize = 256) val (zoom, reprojected): (Int, RDD[(SpatialKey, MultibandTile)] with Metadata[TileLayerMetadata[SpatialKey]]) =
MultibandTileLayerRDD(tiled, rasterMetaData)
.reproject(WebMercator, layoutScheme, Bilinear) val attributeStore = FileAttributeStore(outputPath) val writer = FileCOGLayerWriter(attributeStore) val layerName = "layername" val cogLayer =
COGLayer.fromLayerRDD(
reprojected,
zoom,
compression = NoCompression,
maxTileSize = 4096
) val keyIndexes =
cogLayer.metadata.zoomRangeInfos.
map { case (zr, bounds) => zr -> ZCurveKeyIndexMethod.createIndex(bounds) }.
toMap writer.writeCOGLayer(layerName, cogLayer, keyIndexes)
}

执行 main 函数即可实现 COG 方式的 ETL 操作,其他部分与之前介绍过的 ingest 相同,主要区别在于 writer,此处为 FileCOGLayerWriter 实例,从名字可以看出这是一个文件系统的 COG writer,目前 Geotrellis 实现了三种,分别为 S3、Hadoop、File,这三种后端理论上都是对大量小文件支持不好的。

1.2 背后逻辑

下面来详细分析一下 Geotrellis 中 COG 实现原理。

首先看上面的 cogLayer 对象:

val cogLayer =
COGLayer.fromLayerRDD(
reprojected,
zoom,
compression = NoCompression,
maxTileSize = 4096
)

cogLayer 对象是 COGLayer 实例,fromLayerRDD 源码如下:

def fromLayerRDD[
K: SpatialComponent: Ordering: JsonFormat: ClassTag,
V <: CellGrid: ClassTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V]: ? => TileCropMethods[V]: GeoTiffBuilder
](
rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
baseZoom: Int,
compression: Compression = Deflate,
maxTileSize: Int = 4096,
minZoom: Option[Int] = None
): COGLayer[K, V] = { if(minZoom.getOrElse(Double.NaN) != baseZoom.toDouble) {
if(rdd.metadata.layout.tileCols != rdd.metadata.layout.tileRows) {
sys.error("Cannot create Pyramided COG layer for non-square tiles.")
} if(!isPowerOfTwo(rdd.metadata.layout.tileCols)) {
sys.error("Cannot create Pyramided COG layer for tile sizes that are not power-of-two.")
}
} val layoutScheme =
ZoomedLayoutScheme(rdd.metadata.crs, rdd.metadata.layout.tileCols) if(rdd.metadata.layout != layoutScheme.levelForZoom(baseZoom).layout) {
sys.error(s"Tile Layout of layer and ZoomedLayoutScheme do not match. ${rdd.metadata.layout} != ${layoutScheme.levelForZoom(baseZoom).layout}")
} val keyBounds =
rdd.metadata.bounds match {
case kb: KeyBounds[K] => kb
case _ => sys.error(s"Cannot create COGLayer with empty Bounds")
} val cogLayerMetadata: COGLayerMetadata[K] =
COGLayerMetadata(
rdd.metadata.cellType,
rdd.metadata.extent,
rdd.metadata.crs,
keyBounds,
layoutScheme,
baseZoom,
minZoom.getOrElse(0),
maxTileSize
) val layers: Map[ZoomRange, RDD[(K, GeoTiff[V])]] =
cogLayerMetadata.zoomRanges.
sorted(Ordering[ZoomRange].reverse).
foldLeft(List[(ZoomRange, RDD[(K, GeoTiff[V])])]()) { case (acc, range) =>
if(acc.isEmpty) {
List(range -> generateGeoTiffRDD(rdd, range, layoutScheme, cogLayerMetadata.cellType, compression))
} else {
val previousLayer: RDD[(K, V)] = acc.head._2.mapValues { tiff =>
if(tiff.overviews.nonEmpty) tiff.overviews.last.tile
else tiff.tile
} val tmd: TileLayerMetadata[K] = cogLayerMetadata.tileLayerMetadata(range.maxZoom + 1)
val upsampledPreviousLayer =
Pyramid.up(ContextRDD(previousLayer, tmd), layoutScheme, range.maxZoom + 1)._2 val rzz = generateGeoTiffRDD(upsampledPreviousLayer, range, layoutScheme, cogLayerMetadata.cellType, compression) (range -> rzz) :: acc
}
}.
toMap COGLayer(layers, cogLayerMetadata)
}

此函数返回类型正是 COGLayer,其两个属性分别为 layers 和 cogLayerMetadata。

cogLayerMetadata 是 COGLayerMetadata 对象,表示 COG 层的元数据信息,包含每层对应的瓦片范围等,这个与传统的元数据很接近,唯一不同的在于此处使用了 ZommRange 的概念,即“ 1 层”可能对应多个 Zoom,而不再是 1 对 1 的关系,这样能够更进一步的节省存储空间,在处理速度和存储空间上做了综合考虑。

layers 是 Map[ZoomRange, RDD[(K, GeoTiff[V])]] 对象,ZoomRange 即为上述元数据中的每层的 zoom 最大和最小值,RDD[(K, GeoTiff[V])] 是 spark rdd 对象,即每一个层级范围对应一个 Tiff 对象,从此可以看出,COG 方式 ETL 后每层存储的不再是 Tile,而是 Tiff 文件,这个 Tiff 文件是 COG 类型的,当用户请求某个瓦片的时候直接从对应的 Tiff 文件中读出需要的部分即可。

最后调用 writer.writeCOGLayer(layerName, cogLayer, keyIndexes) 即可将元数据信息和 Tiff 数据写入相应的位置,完成 ETL 过程。

此处还需要注意的是为了防止单个 Tiff 文件过大, Geotrellis 对每一层进行了分割处理,这样每一层可能会得到多个 Tiff 文件,而为了达到 COG 的真实效果,又引入了 GDAL 中 VRT 的概念(参见http://www.gdal.org/gdal_vrttut.html),其中很详细的讲述了 VRT 的格式和意义,简单来说 VRT 就是将多个 Tiff 文件合并成一个虚拟的 Tiff 文件。

二、读取数据

数据做了 ETL 后,就可以读取出来并进行相应的处理。

2.1 实现方案

其实现方式也基本与传统方式相同,代码如下:

val catalogPath = new java.io.File("/your/catalog/path").getAbsolutePath
val fileValueReader = FileCOGValueReader(catalogPath)
val key = SpatialKey(x, y)
val tile = fileValueReader.reader(LayerId("layername", z)).read(key)

这样就能根据瓦片的 x、y 编号和 z(zoom)取到对应的瓦片。

2.2 原理

主要代码在 COGValueReader 类中的 baseReader 方法中 read 方法,如下:

GeoTiffReader[V].read(uri, decompress = false, streaming = true)
.getOverview(overviewIndex)
.crop(gridBounds)
.tile

传统方式存储的是切割好的瓦片,可以直接定位到确定的瓦片,这里是完全符合 COG 方式的读取方式。getOverview 获取到对应层(z)的 Tiff 文件,crop 对 Tiff 根据需要的范围(x、y)进行切割,tile 函数将其转为瓦片。

三、总结

本文介绍了如何在 Geotrellis 中如何进行 COG 方式的 ETL 操作,实现了全新的数据写入和读取方式。

04-27 06:20