I have a dataset with columns userId (string), itemId (integer), and rating (integer).

+----------+----------+---------+
| userId   |  itemId  |  rating |
+----------+----------+---------+
|  abc13   |    23    |    1    |
+----------+----------+---------+
|  qwe34   |    56    |    3    |
+----------+----------+---------+
|  qwe34   |    35    |    4    |
+----------+----------+---------+

I want to map the string userIds to unique long values. I tried mapping the userIds with zipWithUniqueId(), which gives a pair RDD:
+------------+----------------+
|   userId   |  userIdMapped  |
+------------+----------------+
|    abc13   |        0       |
+------------+----------------+
|    qwe34   |        1       |
+------------+----------------+

I want to add the long values as another column and create a dataset like this:
+----------+----------+---------+----------------+
| userId   |  itemId  |  rating |  userIdMapped  |
+----------+----------+---------+----------------+
|  abc13   |    23    |    1    |       0        |
+----------+----------+---------+----------------+
|  qwe34   |    56    |    3    |       1        |
+----------+----------+---------+----------------+
|  qwe34   |    35    |    4    |       1        |
+----------+----------+---------+----------------+
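(For context on what the mapping should produce: outside Spark, this transformation is just "collect the distinct userIds, give each one a long, and attach that long to every row". A minimal plain-Java sketch of that logic, with illustrative names not taken from the real code:)

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UserIdMapping {
    // Assign a unique long to each distinct userId, in order of first appearance.
    static Map<String, Long> buildIdMap(String[][] rows) {
        Map<String, Long> idMap = new LinkedHashMap<>();
        for (String[] row : rows) {
            idMap.putIfAbsent(row[0], (long) idMap.size());
        }
        return idMap;
    }

    public static void main(String[] args) {
        // rows of (userId, itemId, rating), as in the tables above
        String[][] rows = {{"abc13", "23", "1"}, {"qwe34", "56", "3"}, {"qwe34", "35", "4"}};
        Map<String, Long> idMap = buildIdMap(rows);
        for (String[] row : rows) {
            // attach the mapped id as a fourth column
            System.out.println(String.join(",", row) + "," + idMap.get(row[0]));
        }
    }
}
```

zipWithUniqueId() does the same in a distributed way, except the assigned longs are unique but not contiguous (zipWithIndex() is the contiguous variant).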

I tried the following:
JavaRDD<Feedback> feedbackRDD = spark.read().jdbc(MYSQL_CONNECTION_URL, feedbackQuery, connectionProperties)
        .javaRDD().map(Feedback.mapFunc);
JavaPairRDD<String, Long> zipped = feedbackRDD.map(new Function<Feedback, String>() {
    public String call(Feedback p) throws Exception {
        return p.getUserId();
    }
}).distinct().zipWithUniqueId();
Dataset<Row> feedbackDS = spark.createDataFrame(feedbackRDD, Feedback.class);
Dataset<String> stringIds = spark.createDataset(zipped.keys().collect(), Encoders.STRING());
Dataset<Long> valueIds = spark.createDataset(zipped.values().collect(), Encoders.LONG());
Dataset<Row> longIds = valueIds.withColumnRenamed("value", "userIdMapped");
//Here the 'value' column contains the string user ids
Dataset<Row> userIdMap = longIds.join(stringIds);
Dataset<Row> feedbackDSUserMapped = feedbackDS.join(userIdMap, feedbackDS.col("userId").equalTo(userIdMap.col("value")),
        "inner");
The userIdMap dataset is not joined correctly, as shown below:
+-----------------+----------------+
|   userIdMapped  |     value      |
+-----------------+----------------+
|         0       |     abc13      |
+-----------------+----------------+
|         0       |     qwe34      |
+-----------------+----------------+
|         1       |     abc13      |
+-----------------+----------------+
|         1       |     qwe34      |
+-----------------+----------------+

So the resulting feedbackDSUserMapped is wrong.
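(What goes wrong above: stringIds and valueIds are built from two independently collected lists, so by the time they are joined there is no column tying a key to its value, and a join with no condition is a Cartesian product. The same effect reproduced in plain Java, as an illustrative sketch:)

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CrossJoinDemo {
    // A join with no join condition pairs every value with every key.
    static List<String> crossJoin(List<String> keys, List<Long> values) {
        List<String> joined = new ArrayList<>();
        for (Long v : values) {
            for (String k : keys) {
                joined.add(v + "," + k);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("abc13", "qwe34"); // stringIds
        List<Long> values = Arrays.asList(0L, 1L);           // valueIds
        // 2 keys x 2 values = 4 rows, exactly the wrong table above
        System.out.println(crossJoin(keys, values));
    }
}
```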

I'm new to Spark, and I'm sure there must be a better way to do this.

What is the best way to take the long values from the pair RDD and set them on the corresponding userId rows in the initial dataset (RDD)?

Any help would be much appreciated.

The data will be used for an ALS model.

Best answer

You can try the following approach: assign unique IDs with the built-in monotonically_increasing_id() function, then join back to the original dataset.

/**
 * Created by RGOVIND on 11/16/2016.
 */

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;

import java.util.ArrayList;
import java.util.List;

public class SparkUserObjectMain {
    static public void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Stack Overflow App");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        List<UserObject> users = new ArrayList<UserObject>();

        //seed the data
        UserObject user1 = new UserObject("abc13", "23", "1");
        UserObject user2 = new UserObject("qwe34", "56", "3");
        UserObject user3 = new UserObject("qwe34", "35", "4");
        users.add(user1);
        users.add(user2);
        users.add(user3);

        //encoder for the UserObject bean
        Encoder<UserObject> userObjectEncoder = Encoders.bean(UserObject.class);
        //Create the user dataset
        Dataset<UserObject> usersDataSet = sqlContext.createDataset(users, userObjectEncoder);
        //assign unique id's
        Dataset<Row> uniqueUsersWithId = usersDataSet.dropDuplicates("userId").select("userId").withColumn("id", functions.monotonically_increasing_id());
        //join with original
        Dataset<Row> joinedDataSet = usersDataSet.join(uniqueUsersWithId, "userId");
        joinedDataSet.show();

    }
}

The bean:
/**
 * Created by RGOVIND on 11/16/2016.
 */
public class UserObject {

    private String userId;
    private String itemId;
    private String rating;

    public UserObject() {
    }

    public UserObject(String userId, String itemId, String rating) {
        this.userId = userId;
        this.itemId = itemId;
        this.rating = rating;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    public String getRating() {
        return rating;
    }

    public void setRating(String rating) {
        this.rating = rating;
    }

}

This prints:
+------+------+------+------------+
|userId|itemId|rating|          id|
+------+------+------+------------+
| abc13|    23|     1|403726925824|
| qwe34|    56|     3|901943132160|
| qwe34|    35|     4|901943132160|
+------+------+------+------------+
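One caveat before feeding this to ALS: monotonically_increasing_id() encodes the partition ID in the upper bits, which is why the generated IDs above are large and non-contiguous. The RDD-based ALS in spark.mllib stores user/product IDs as int, so IDs like 403726925824 would not fit; if that matters, contiguous IDs from zipWithIndex() on the distinct userIds (or a spark.ml StringIndexer) are common alternatives. A quick sanity check of the range issue:

```java
public class AlsIdRangeCheck {
    public static void main(String[] args) {
        // one of the ids produced by monotonically_increasing_id() in the output above
        long generatedId = 403726925824L;
        // spark.mllib's Rating(user, product, rating) holds user ids as int
        System.out.println(generatedId <= Integer.MAX_VALUE); // prints false: would overflow
        // contiguous indices from zipWithIndex() stay within int range for realistic user counts
        long contiguousId = 1L;
        System.out.println(contiguousId <= Integer.MAX_VALUE); // prints true
    }
}
```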
