I have a Spark DataFrame as follows:

Input

+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+
| accountId|accountNumber|acctNumberTypeCode|cisDivision|currencyCode|priceItemCd|priceItemParam|priceItemParamCode|processingDate|txnAmt|  txnDttm|txnVol|udfChar1|  udfChar2|  udfChar3|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+
|2032000000|   2032000000|          C1_F_ANO|         CA|         USD| PRICEITEM2|            UK|           Country|    2018-06-06|   100|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
|2032000000|   2032000000|          C1_F_ANO|         CA|         USD| PRICEITEM2|            UK|           Country|    2018-06-06|   100|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
|1322000000|   1322000000|          C1_F_ANO|         CA|         USD| PRICEITEM1|            US|           Country|    2018-06-06|   100|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
|1322000000|   1322000000|          C1_F_ANO|         CA|         USD| PRICEITEM1|            US|           Country|    2018-06-06|   100|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+


Now I want to perform the following:

1. Sum the "txnAmt" column across records that have the same accountId and accountNumber.
2. Remove the duplicate records.


Output

+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+
| accountId|accountNumber|acctNumberTypeCode|cisDivision|currencyCode|priceItemCd|priceItemParam|priceItemParamCode|processingDate|txnAmt|  txnDttm|txnVol|udfChar1|  udfChar2|  udfChar3|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+
|2032000000|   2032000000|          C1_F_ANO|         CA|         USD| PRICEITEM2|            UK|           Country|    2018-06-06|   200|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
|1322000000|   1322000000|          C1_F_ANO|         CA|         USD| PRICEITEM1|            US|           Country|    2018-06-06|   200|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+


I am not sure how to perform step 1.

I have already written the code for step 2, which removes duplicates based on accountId and accountNumber:

// Drop duplicate rows that share the same accountId and accountNumber, keeping one arbitrary row per pair
String[] colNames = {"accountId", "accountNumber"};
Dataset<RuleOutputParams> finalDs = rulesParamDS.dropDuplicates(colNames);


Can someone help?

Best Answer

Load the data and register it as a SQL temporary view:

val df = spark.read.format("csv").option("header", true).load("data.csv")
df.createOrReplaceTempView("t")
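
Since the question itself is in Java, here is a minimal equivalent of the load step in the Java API; this is a sketch assuming an existing SparkSession named spark, not code from the original answer:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Read the CSV with a header row and register it as the temp view "t"
Dataset<Row> df = spark.read().format("csv").option("header", "true").load("data.csv");
df.createOrReplaceTempView("t");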


What you need next are window aggregation functions, combined with the row_number() trick to remove the duplicates:

// Window sum per (accountId, accountNumber); row_number() keeps exactly one row per group
val df2 = spark.sql("""SELECT * FROM (
  SELECT *,
    sum(txnAmt) OVER (PARTITION BY accountId, accountNumber) s,
    row_number() OVER (PARTITION BY accountId, accountNumber ORDER BY processingDate) r FROM t)
  WHERE r=1""")
  .drop("txnAmt", "r")              // drop the per-row amount and the helper rank
  .withColumnRenamed("s", "txnAmt") // expose the window sum under the original column name
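
The same logic can be expressed directly in the Java Dataset API without SQL. This is a hedged sketch building on the df loaded above, not the original answer's code:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

// Window over each (accountId, accountNumber) pair
WindowSpec byAccount = Window.partitionBy("accountId", "accountNumber");

Dataset<Row> result = df
    // attach the group's total amount to every row
    .withColumn("s", sum("txnAmt").over(byAccount))
    // rank rows within the group so exactly one can be kept
    .withColumn("r", row_number().over(byAccount.orderBy("processingDate")))
    .where(col("r").equalTo(1))
    .drop("txnAmt", "r")
    .withColumnRenamed("s", "txnAmt");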


If you show it, you will see the following (note that txnAmt now appears as the last column, because the original column was dropped and s was renamed):

+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+---------+------+--------+----------+----------+------+
| accountId|accountNumber|acctNumberTypeCode|cisDivision|currencyCode|priceItemCd|priceItemParam|priceItemParamCode|processingDate|  txnDttm|txnVol|udfChar1|  udfChar2|  udfChar3|txnAmt|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+---------+------+--------+----------+----------+------+
|2032000000|   2032000000|          C1_F_ANO|         CA|         USD| PRICEITEM2|            UK|           Country|    2018-06-06|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2| 200.0|
|1322000000|   1322000000|          C1_F_ANO|         CA|         USD| PRICEITEM1|            US|           Country|    2018-06-06|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2| 200.0|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+---------+------+--------+----------+----------+------+




As a side note, you could try adding more columns to the query below, but you would then need to add them to the GROUP BY clause as well:

spark.sql("SELECT accountId, accountNumber, SUM(txnAmt) txnAmt FROM t GROUP BY accountId, accountNumber").show
+----------+-------------+------+
| accountId|accountNumber|txnAmt|
+----------+-------------+------+
|2032000000|   2032000000| 200.0|
|1322000000|   1322000000| 200.0|
+----------+-------------+------+
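
For instance, a minimal sketch that also carries currencyCode through (a column picked here purely for illustration): every non-aggregated column must be listed in the GROUP BY.

// currencyCode appears in both SELECT and GROUP BY
spark.sql("SELECT accountId, accountNumber, currencyCode, SUM(txnAmt) AS txnAmt "
        + "FROM t GROUP BY accountId, accountNumber, currencyCode")
    .show();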

Regarding "java - Spark Java DataFrame sum over a column and remove duplicates", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50713641/
