Spark SQL Java: unable to convert from Tuple to Row and DataFrame

Question:

I am trying to create a Dataset<Row> object from a JavaRDD<Tuple2<Row, Row>> object.

I am following the steps below:

  1. Convert the JavaRDD<Tuple2<Row, Row>> to a JavaRDD<Row>.
  2. Use the createDataset() function of sqlContext with a schema to convert it into a Dataset.

But in the first step I am not able to use the Row.fromTuple() function the way Scala code can, and in the second step I am not able to convert using the rowTag.

It shows me the compile error below:

Error: java: cannot find symbol
  symbol:   method fromTuple(scala.Tuple2<org.apache.spark.sql.Row,org.apache.spark.sql.Row>)
  location: interface org.apache.spark.sql.Row

I have tried to convert it as shown below:

ClassTag<Row> rowTag = scala.reflect.ClassTag$.MODULE$.apply(Row.class);

private Dataset<Row> joinResults(SparkSession session, RDD<Tuple2<Row, Row>> resultRDD) {
    JavaRDD<Tuple2<Row, Row>> results = resultRDD.toJavaRDD();

    JavaRDD<Row> ds = results.map(new Function<Tuple2<Row, Row>, Row>() {
        @Override
        public Row call(Tuple2<Row, Row> rowRowTuple2) throws Exception {
            return Row.fromTuple(rowRowTuple2); // compile error: fromTuple is on the Scala companion object, not the Row interface
        }
    });

    return session.sqlContext().createDataset(ds, rowTag); // error: expects an RDD<Row> and an Encoder<Row>, not a JavaRDD and a ClassTag
}

Any help will be appreciated. I am using LuceneRDD record linkage, which gives me an RDD back, so I don't have the option to operate directly on a Dataset. I don't want to create a schema/encoder every time, because that would limit the reusability of the linkage function. I am using the Scala 2.11 and Spark 2.4.3 libraries.


Answer 1:

  1. .createDataset() accepts RDD<T>, not JavaRDD<T>. To get an RDD from a JavaRDD you need to call ds.rdd().

  2. You need to create and pass an org.apache.spark.sql.catalyst.encoders.RowEncoder instead of a ClassTag.

  3. Do not create a row of rows with Row.fromTuple(rowRowTuple2) (i.e. a row in which every element is itself a row). A single row should contain primitive types or nested structs, as in the sketch after this list.
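
Putting the three points together, a minimal sketch (the joinedSchema parameter is an assumption here: it has to describe the flattened fields of both input rows; the merge call mirrors the helper used in answer 2 below):

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.rdd.RDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.Row$;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder;
    import org.apache.spark.sql.types.StructType;
    import scala.Tuple2;
    import scala.collection.JavaConversions;

    private Dataset<Row> joinResults(SparkSession session,
                                     RDD<Tuple2<Row, Row>> resultRDD,
                                     StructType joinedSchema) {
        // (3) flatten each Tuple2<Row, Row> into a single Row instead of a row of rows
        JavaRDD<Row> rows = resultRDD.toJavaRDD().map(
                t -> Row$.MODULE$.merge(
                        JavaConversions.asScalaBuffer(Arrays.asList(t._1(), t._2()))));
        // (1) pass an RDD<Row> via .rdd(), and (2) a RowEncoder built from the schema
        return session.createDataset(rows.rdd(), RowEncoder.apply(joinedSchema));
    }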


Answer 2:

Perhaps this is useful:

Tuple2<Row, Row> -> Dataset<Row>

    StructType schema = new StructType()
            .add(new StructField("id", DataTypes.IntegerType, true, Metadata.empty()))
            .add(new StructField("name", DataTypes.StringType, true, Metadata.empty()));

    JavaRDD<Tuple2<Row, Row>> tuple2JavaRDD = new JavaSparkContext(spark.sparkContext())
            .parallelize(
                    Arrays.asList(Tuple2.apply(RowFactory.create(1), RowFactory.create("a")),
                            Tuple2.apply(RowFactory.create(2), RowFactory.create("b"))));

    // merge the two rows of each tuple into one flat Row (id, name)
    JavaRDD<Row> rowJavaRDD1 = tuple2JavaRDD.map(t -> Row$.MODULE$.merge(
            toScalaSeq(Arrays.asList(t._1, t._2))));

    Dataset<Row> df1 = spark.createDataFrame(rowJavaRDD1, schema);
    df1.show(false);
    df1.printSchema();

    /**
     * +---+----+
     * |id |name|
     * +---+----+
     * |1  |a   |
     * |2  |b   |
     * +---+----+
     *
     * root
     *  |-- id: integer (nullable = true)
     *  |-- name: string (nullable = true)
     */

Tuple2<Integer, String> -> Dataset<Row>


    JavaRDD<Tuple2<Integer, String>> resultRDD = new JavaSparkContext(spark.sparkContext())
            .parallelize(Arrays.asList(Tuple2.apply(1, "a"), Tuple2.apply(2, "b")));

    // fromTuple works here because the tuple elements are primitives, not Rows
    JavaRDD<Row> rowJavaRDD = resultRDD.map(Row$.MODULE$::fromTuple);
    Dataset<Row> dataFrame = spark.createDataFrame(rowJavaRDD, schema);
    dataFrame.show(false);
    dataFrame.printSchema();

    /**
     * +---+----+
     * |id |name|
     * +---+----+
     * |1  |a   |
     * |2  |b   |
     * +---+----+
     *
     * root
     *  |-- id: integer (nullable = true)
     *  |-- name: string (nullable = true)
     */

Most of the Spark APIs work on Scala Seq, so it's handy to keep the utility below around to convert a Java List into a Scala Seq:

    // requires: java.util.List, scala.collection.JavaConversions,
    // and scala.collection.mutable.Buffer
    <T> Buffer<T> toScalaSeq(List<T> list) {
        return JavaConversions.asScalaBuffer(list);
    }
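
For example, toScalaSeq(Arrays.asList(t._1, t._2)) in the first snippet builds the Scala Seq that Row.merge expects, since merge is declared with Scala varargs (Row*).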