目前,这些是options来安排数据流作业的执行,我知道:
使用App Engine Cron服务或云功能。
example是用Java编写的,有没有Python的官方示例这么简单?
example与Python一起使用,但我不确定当前是否仍然是一个好的选项,或者是否“已弃用”
从计算引擎中的cron作业
有这方面的教程吗?
在流管道中使用窗口
我认为这是最简单的,但是,总成本是最好的想法吗?
Scheduler
这是有效的方法吗?

最佳答案

我使用App Engine Flex作为数据流启动程序。此微服务具有按需启动数据流作业的终结点,cron也可以命中这些终结点。
这是我的项目结构:

df_tasks/
- __init__.py
- datastore_to_csv.py
- ...other_piplines
__init__.py
dflaunch.yaml
main.py
setup.py <-- used by pipelines

我的诀窍是正确设置管道依赖项。即,对管道依赖项使用setup.py。设置成这样的例子最有帮助:
https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/juliaset
设置.py:
import setuptools

setuptools.setup(
    name='dataflow_python_pipeline',
    version='1.0.0',
    description='DataFlow Python Pipeline',
    packages=setuptools.find_packages(),
)

我的管道在df_tasks中配置如下:
pipeline_options = PipelineOptions.from_dictionary({
        'project': project,
        'runner': 'DataflowRunner',
        'staging_location': bucket_path+'/staging',
        'temp_location': bucket_path+'/temp',
        'setup_file': './setup.py'
    })

然后在main.py中:
from df_tasks import datastore_to_csv

project_id = os.environ['GCLOUD_PROJECT']

@app.route('/datastore-to-csv', methods=['POST'])
def df_day_summary():
    # Extract Payload
        payload = request.get_json()
        model = payload['model']
        for_date = datetime.datetime.strptime(payload['for_date'], '%Y/%m/%d')
    except Exception as e:
        print traceback.format_exc()
        return traceback.format_exc()
    # launch the job
    try:
        job_id, job_name = datastore_to_csv.run(
            project=project_id,
            model=model,
            for_date=for_date,
        )
        # return the job id
        return jsonify({'jobId': job_id, 'jobName': job_name})
    except Exception as e:
        print traceback.format_exc()
        return traceback.format_exc()

10-02 07:22
查看更多