问题描述
我正在使用arcpy
对栅格进行一些数字运算,希望使用multiprocessing
包来加快速度。基本上,我需要遍历元组列表,使用每个元组进行一些栅格计算,并将一些输出写入文件。我的输入由一个数据栅格(水深测量)、一个定义区域的栅格和一个由两个浮点(水面高程、深度)组成的元组组成。我的过程由一个函数computeplane
组成,该函数获取一个元组并运行一系列栅格计算以生成五个栅格(总栅格、滨海栅格、表层栅格、次表层栅格和深栅格栅格),然后为每个栅格调用processtable
函数以使用arcpy.sa.ZonalStatisticsAsTable
将值写入DBF,使用arcpy.AddField_management
添加一些字段,使用arcpy.TableToTable_conversion
将DBF转换为CSV,最后使用arcpy.Delete_management
删除DBF文件。
基于一些other SO posts,我将我的代码包装在main()
中,以便multiprocessing.Pool
可以很好地发挥作用。我使用main()
创建元组集,使用pool.map
进行多处理。我使用tempfile
包为DBF文件选择名称以避免名称冲突;CSV文件名保证不会与其他线程冲突。
我已经使用for循环测试了我的代码,它工作得很好,但当我尝试使用pool.map
时,我收到
这里发生了什么?这个错误没有出现在代码的非多进程版本中,我也不会写出任何地方的栅格-话又说回来,我不知道arcpy
如何处理中间栅格(我肯定不认为它会让它们留在内存中-它们太大了)。我需要告诉arcpy
它处理栅格计算的方式才能使多处理工作吗?我已在下面包含了我的python文件。
import arcpy
arcpy.CheckOutExtension("Spatial")
import arcpy.sa
import numpy
import multiprocessing
import tempfile
bathymetry_path = r'C:/GIS workspace/RRE/habitat.gdb/bathymetry_NGVD_meters'
zones_path = r'C:/GIS workspace/RRE/habitat.gdb/markerzones_meters'
table_folder = r'C:/GIS workspace/RRE/zonetables'
bathymetry = arcpy.sa.Raster(bathymetry_path)
zones = arcpy.sa.Raster(zones_path)
def processtable(raster_obj, zone_file, w, z, out_folder, out_file):
temp_name = "/" + next(tempfile._get_candidate_names()) + ".dbf"
arcpy.sa.ZonalStatisticsAsTable(zone_file, 'VALUE', raster_obj, out_folder + temp_name, "DATA", "SUM")
arcpy.AddField_management(out_folder + temp_name, "wse", 'TEXT')
arcpy.AddField_management(out_folder + temp_name, "z", 'TEXT')
arcpy.CalculateField_management(out_folder + temp_name, "wse", "'" + str(w) + "'", "PYTHON")
arcpy.CalculateField_management(out_folder + temp_name, "z", "'" + str(z) + "'", "PYTHON")
arcpy.TableToTable_conversion(out_folder + temp_name, out_folder, out_file)
arcpy.Delete_management(out_folder + temp_name)
def computeplane(wsedepth):
wse = wsedepth[0]
depth = wsedepth[1]
total = bathymetry < depth
littoral = total & ((wse - bathymetry) < 2)
surface = total & ~(littoral) & ((total + wse - depth) < (total + 2))
profundal = total & ((total + wse - depth) > (total + 5))
subsurface = total & ~(profundal | surface | littoral)
# zonal statistics table names
total_name = 'total_w' + str(wse) + '_z' + str(depth) + '.csv'
littoral_name = 'littoral_w' + str(wse) + '_z' + str(depth) + '.csv'
surface_name = 'surface_w' + str(wse) + '_z' + str(depth) + '.csv'
subsurface_name = 'subsurface_w' + str(wse) + '_z' + str(depth) + '.csv'
profundal_name = 'profundal_w' + str(wse) + '_z' + str(depth) + '.csv'
# compute zonal statistics
processtable(total, zones, wse, depth, table_folder, total_name)
processtable(littoral, zones, wse, depth, table_folder, littoral_name)
processtable(surface, zones, wse, depth, table_folder, surface_name)
processtable(profundal, zones, wse, depth, table_folder, profundal_name)
processtable(subsurface, zones, wse, depth, table_folder, subsurface_name)
def main():
watersurface = numpy.arange(-15.8, 2.7, 0.1)
# take small subset of the tuples: watersurface[33:34]
wsedepths = [(watersurface[x], watersurface[y]) for x in range(watersurface.size)[33:34] for y in range(watersurface[0:x+1].size)]
pool = multiprocessing.Pool()
pool.map(computeplane, wsedepths)
pool.close()
pool.join()
if __name__ == '__main__':
main()
更新
更多的调查表明,这与其说是multiprocessing
的问题,不如说是ArcGIS处理栅格的方式的问题。栅格代数结果被写入到默认工作空间中的文件中;在我的例子中,我没有指定文件夹,因此arcpy
将栅格写入到某个AppData文件夹。ArcGIS根据您的代数表达式使用诸如Lessth、Lessth_1等基本名称。因为我没有指定工作区,所以所有multiprocessing
线程都写入此文件夹。虽然单个arcpy
进程可以跟踪名称,但多个进程都试图写入相同的栅格名称并撞上其他进程的锁。我尝试在computeplane
开头创建一个随机工作区(文件gdb),然后在结尾将其删除,但arcpy
通常不会及时释放其锁定,并且进程在DELETE语句上崩溃。所以我不确定如何继续。
推荐答案
我也花了很大力气来制作一个可以工作的脚本。
由于您的数据与我习惯的数据略有不同,我无法测试答案,也不知道它是否有效,但可以说一些东西来打开您的思维,使您能够进行多进程处理,并提出一个新的脚本供您测试。
我看到有些东西可以并行化,有些东西不能并行化,或者从多进程中几乎得不到任何好处。
使用多进程和arcpy的好做法是不使用地理数据库,尝试写入实际的‘.tif’文件或‘in_Memory/Tempfile’,然后在计算后删除,在要并行化的函数内部。不要一开始就为空间而烦恼,减少要测试的数据,当它奏效时,通过删除临时文件来改善它,在中间放一些有用的打印。回到问题上,脚本中有一些东西是不能使用的,比如传递一个arcpy.Raster对象,而不是传递给栅格的路径,并在函数中读取它。栅格的路径是一个字符串,因此它是可拾取的,这意味着它们可以轻松地传递到一个新进程,必须特别注意这一点。另一件事是声明一个类似于‘TABLE_FOLDER’的变量,并期望您的所有7核内核都返回到第一个内核中的原始进程,以询问它的路径。多进程之间不容易共享变量,对于原始代码,您必须创建一个独立运行的函数。主模块向空闲的CPU发送的内容是创建一个新的Python进程,其中几乎只有函数声明、传递给它的参数和必要的导入模块,所有这些都准备好执行,它不能回头询问。
对我起作用的还是在一个非常切分的函数(不是我创建的)中使用apply_async
,该函数使用进程作为键,apply_async
的结果作为值来创建字典,然后将其放入try/except
中,以便如果一个进程发生错误,它不会停止主进程的执行。大概是这样的:
def computeplane_multi(processList):
pool = Pool(processes=4, maxtasksperchild=10)
jobs= {}
for item in processList:
jobs[item[0]] = pool.apply_async(computeplane, [x for x in item])
for item,result in jobs.items():
try:
result = result.get()
except Exception as e:
print(e)
pool.close()
pool.join()
回到您的代码,我对您的尝试(并告诉我)做了一些修改:
import arcpy
import numpy
import multiprocessing
import tempfile
bathymetry_path = r'C:/GIS workspace/RRE/habitat.gdb/bathymetry_NGVD_meters'
zones_path = r'C:/GIS workspace/RRE/habitat.gdb/markerzones_meters'
table_folder = r'C:/GIS workspace/RRE/zonetables'
def computeplane(bathymetry_path,zones_path,wsedepth):
def processtable(raster_obj, zones_path, w, z, out_folder, out_file):
zone_file = arcpy.sa.Raster(zones_path)
temp_name = "/" + next(tempfile._get_candidate_names()) + ".dbf"
arcpy.sa.ZonalStatisticsAsTable(zone_file, 'VALUE', raster_obj, out_folder + temp_name, "DATA", "SUM")
arcpy.AddField_management(out_folder + temp_name, "wse", 'TEXT')
arcpy.AddField_management(out_folder + temp_name, "z", 'TEXT')
arcpy.CalculateField_management(out_folder + temp_name, "wse", "'" + str(w) + "'", "PYTHON")
arcpy.CalculateField_management(out_folder + temp_name, "z", "'" + str(z) + "'", "PYTHON")
arcpy.TableToTable_conversion(out_folder + temp_name, out_folder, out_file)
arcpy.Delete_management(out_folder + temp_name)
bathymetry = arcpy.sa.Raster(bathymetry_path)
wse = wsedepth[0]
depth = wsedepth[1]
total = bathymetry < depth
littoral = total & ((wse - bathymetry) < 2)
surface = total & ~(littoral) & ((total + wse - depth) < (total + 2))
profundal = total & ((total + wse - depth) > (total + 5))
subsurface = total & ~(profundal | surface | littoral)
# zonal statistics table names
total_name = 'total_w' + str(wse) + '_z' + str(depth) + '.csv'
littoral_name = 'littoral_w' + str(wse) + '_z' + str(depth) + '.csv'
surface_name = 'surface_w' + str(wse) + '_z' + str(depth) + '.csv'
subsurface_name = 'subsurface_w' + str(wse) + '_z' + str(depth) + '.csv'
profundal_name = 'profundal_w' + str(wse) + '_z' + str(depth) + '.csv'
# compute zonal statistics
processtable(total, zones_path, wse, depth, table_folder, total_name)
processtable(littoral, zones_path, wse, depth, table_folder, littoral_name)
processtable(surface, zones_path, wse, depth, table_folder, surface_name)
processtable(profundal, zones_path, wse, depth, table_folder, profundal_name)
processtable(subsurface, zones_path, wse, depth, table_folder, subsurface_name)
print('point processed : {},{}'.format(wsedepth[0],wsedepth[1]))
def computeplane_multi(processList):
pool = Pool(processes=4, maxtasksperchild=10)
jobs= {}
for item in processList:
jobs[item[0]] = pool.apply_async(computeplane, [x for x in item])
for item,result in jobs.items():
try:
result = result.get()
except Exception as e:
print(e)
pool.close()
pool.join()
def main():
watersurface = numpy.arange(-15.8, 2.7, 0.1)
# take small subset of the tuples: watersurface[33:34]
wsedepths = [(watersurface[x], watersurface[y]) for x in range(watersurface.size)[33:34] for y in range(watersurface[0:x+1].size)]
processList = []
for i in wsedepths:
processList.append((bathymetry_path,zones_path,i))
computeplane_multi(processList)
if __name__ == '__main__':
main()
这篇关于ArcPy+多处理:错误:无法保存栅格数据集的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!