本文介绍了如何使用 spark-submit 和 pyspark 运行 luigi 任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 luigi python 任务,其中包括一些 pyspark 库.现在我想用 spark-submit 在 mesos 上提交这个任务.我应该怎么做才能运行它?下面是我的代码框架:

I have a luigi python task which includes some pyspark libs. Now I would like to submit this task on mesos with spark-submit. What should I do to run it? Below is my code skeleton:

from pyspark.sql import functions as F
from pyspark import SparkContext

class myClass(SparkSubmitTask):
# date = luigi.DateParameter()

  def __init__(self, date):
    self.date = date # date is datetime.date.today().isoformat()

  def output(self):

  def input(self):

  def run(self):
    # Some functions are using pyspark libs

if __name__ == "__main__":
  luigi.run()

没有 luigi,我将此任务作为以下命令行提交:

Without luigi, I'm submmitting this task as the following command-line:

/opt/spark/bin/spark-submit --master mesos://host:port --deploy-mode cluster --total-executor-cores 1 --driver-cores 1 --executor-memory 1G --driver-memory 1G my_module.py

现在的问题是如何触发提交包含 luigi 命令行的 luigi 任务,例如:

Now the problem is how I can spark-submit the luigi task that includes luigi command-line such as:

luigi --module my_module myClass --local-scheduler --date 2016-01

还有一个问题是如果 my_module.py 有一个需要先完成的任务,我需要为它做更多的事情还是只是设置与当前命令行相同?

One more question is if my_module.py has a required task to finish first, do I need to do something more for it or just set the same as the current command-line?

我非常感谢您对此的任何提示或建议.非常感谢.

I really appreciate for any hints or suggestions for this. Thanks very much.

推荐答案

Luigi 有一些模板任务.其中之一称为 PySparkTask.您可以从此类继承并覆盖属性:

Luigi has some template Tasks. One of them called PySparkTask.You can inherit from this class and override the properties:

https://github.com/spotify/luigi/blob/master/luigi/contrib/spark.py.

我还没有测试过,但根据我对 luigi 的经验,我会尝试这个:

I haven't tested it but based on my experience with luigi I would have try this:

import my_module


class MyPySparkTask(PySparkTask):
    date = luigi.DateParameter()

    @property
    def name(self):
        return self.__class__.__name__

    @property
    def master(self):
        return 'mesos://host:port'

    @property
    def deploy_mode(self):
        return 'cluster'

    @property
    def total_executor_cores(self):
        return 1

    @property
    def driver_cores(self):
        return 1

    @property
    def executor-memory(self):
        return 1G

    @property
    def driver-memory(self):
        return 1G

    def main(self, sc, *args):
        my_module.run(sc)

    def self.app_options():
        return [date]

然后你可以运行它:luigi --module task_module MyPySparkTask --local-scheduler --date 2016-01

Then you can run it with: luigi --module task_module MyPySparkTask --local-scheduler --date 2016-01

还有一个选项可以在 client.cfg 文件中设置属性,以使它们成为其他 PySparkTasks 的默认值:

There is also an option to set the properties in a client.cfg file in order to make them the default values for other PySparkTasks:

[spark]
master: mesos://host:port
deploy_mode: cluster
total_executor_cores: 1
driver_cores: 1
executor-memory: 1G
driver-memory: 1G

这篇关于如何使用 spark-submit 和 pyspark 运行 luigi 任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

05-26 15:47