How to union multiple dynamic inputs in Palantir Foundry?

Question

I want to union multiple datasets in Palantir Foundry. The names of the datasets are dynamic, so I can't give the dataset names in transform_df(..) statically. Is there a way to dynamically take multiple inputs in transform_df and union all of those DataFrames?

I tried looping over the datasets like:

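from transforms.api import transform_df, Input, Output

li = ['dataset1_path', 'dataset2_path']

union_df = None
for p in li:
    # transform_df takes the Output first, then the named Inputs
    @transform_df(
        Output(p + "_output"),
        my_input=Input(p),
    )
    def my_compute_function(my_input):
        return my_input

    # my_compute_function is now a Transform object, not a DataFrame,
    # so this "union" never produces a unioned dataset
    if union_df is None:
        union_df = my_compute_function
    else:
        union_df = union_df.union(my_compute_function)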

But this doesn't generate the unioned output.

Thanks for any help.

Answer

With a few changes this should work for you. This is an example of dynamic datasets built from JSON files; your situation may be only slightly different. It is a generalized way to create dynamic JSON input datasets, and it should be adaptable to any dynamic input file type or to any Foundry-internal dataset you can specify. This generic example works on a set of JSON files uploaded to a dataset node in the platform and should be fully dynamic. Doing a union afterwards should then be straightforward.

There's some bonus logging going on here as well.

Hope this helps.

from transforms.api import Input, Output, transform
from pyspark.sql import functions as F
import json
import logging


def transform_generator():
    transforms = []
    # Enter your dynamic mappings here; each key fills {val} in the paths below
    transf_dict = {}

    for value in transf_dict:
        @transform(
            out=Output("<path to your output here>/{val}".format(val=value)),
            inpt=Input("<path to your input here>/{val}".format(val=value)),
        )
        def update_set(ctx, inpt, out):
            spark = ctx.spark_session
            sc = spark.sparkContext

            # Read every JSON file in the input dataset into a Python list
            file_dates = []
            for file_status in inpt.filesystem().ls():
                with inpt.filesystem().open(file_status.path) as fi:
                    file_dates.append(json.load(fi))

            logging.info('info logs:')
            logging.info(file_dates)

            # Turn the collected records back into a single Spark DataFrame
            json_object = json.dumps(file_dates)
            df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
            df_2 = df_2.withColumn('upload_date', F.current_date())

            # drop_duplicates returns a new DataFrame, so reassign the result
            df_2 = df_2.drop_duplicates()
            out.write_dataframe(df_2)

        transforms.append(update_set)
    return transforms


# Register these in your pipeline, e.g. my_pipeline.add_transforms(*TRANSFORMS)
TRANSFORMS = transform_generator()
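The answer leaves the union step itself open. A minimal sketch of it, assuming the list of dataset paths is fixed in the repository code (Foundry registers a transform's inputs when the pipeline is defined, so they are not discovered at build time): give each path a keyword name, pass the resulting `Input`s to `transform_df`, and fold the DataFrames together with `functools.reduce` and PySpark's `DataFrame.unionByName`. The helpers `make_input_names` and `union_all`, the `input_N` naming scheme, `PATHS` and the output path are all hypothetical. Whether `transform_df` accepts a compute function taking `**kwargs` is an assumption to verify in your environment, so that wiring appears only as comments.

```python
from functools import reduce


def make_input_names(paths):
    """Give each dataset path a valid Python keyword name (input_0, input_1, ...).

    The names become keyword arguments of the compute function, so they must be
    identifiers; raw dataset paths (which contain '/') cannot be used directly.
    """
    return {f"input_{i}": path for i, path in enumerate(paths)}


def union_all(**dataframes):
    """Union every DataFrame passed as a keyword argument, matching columns by name."""
    if not dataframes:
        raise ValueError("union_all needs at least one input DataFrame")
    # reduce folds the inputs pairwise: union(union(df0, df1), df2), ...
    return reduce(lambda left, right: left.unionByName(right), dataframes.values())


# Assumed Foundry wiring (not executed here: transforms.api only exists inside a
# Foundry code repository). PATHS and the output path are hypothetical placeholders.
#
# from transforms.api import transform_df, Input, Output
#
# PATHS = ["dataset1_path", "dataset2_path"]
# inputs = {name: Input(path) for name, path in make_input_names(PATHS).items()}
#
# @transform_df(Output("<path to union output>"), **inputs)
# def compute(**dataframes):
#     return union_all(**dataframes)
```

Using `unionByName` rather than `union` matches columns by name instead of by position, which is safer when the datasets list their columns in different orders; by default the inputs must still share the same set of columns.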
