Question:
I am trying to create a Dataset<Row> object from a JavaRDD<Tuple2<Row, Row>> object. I am following the steps below:
- Convert JavaRDD<Tuple2<Row, Row>> to JavaRDD<Row>
- Use the createDataset() function of sqlContext with a schema to convert it into a Dataset
But in the first step I am not able to use the Row.fromTuple() function from Java the way it is used in Scala, and in the second step I am not able to convert using rowTag. It shows me the error below:

Error: java: cannot find symbol
  symbol:   method fromTuple(scala.Tuple2<org.apache.spark.sql.Row,org.apache.spark.sql.Row>)
  location: interface org.apache.spark.sql.Row
I have tried to convert like below:

ClassTag<Row> rowTag = scala.reflect.ClassTag$.MODULE$.apply(Row.class);

private Dataset<Row> joinResults(SparkSession session, RDD<Tuple2<Row, Row>> resultRDD) {
    JavaRDD<Tuple2<Row, Row>> results = resultRDD.toJavaRDD();
    JavaRDD<Row> ds = results.map(new Function<Tuple2<Row, Row>, Row>() {
        @Override
        public Row call(Tuple2<Row, Row> rowRowTuple2) throws Exception {
            return Row.fromTuple(rowRowTuple2); // error: no such method callable from Java
        }
    });
    return session.sqlContext().createDataset(ds, rowTag); // gives error
}
Any help will be appreciated. I am using LuceneRDD record linkage, which gives me an RDD back, so I don't have the option to operate on a Dataset directly. I don't want to create a schema/encoder every time because that would limit the use of the linkage function. I am using Scala 2.11 and Spark 2.4.3 libraries.
Answer 1:
- .createDataset() accepts an RDD<T>, not a JavaRDD<T>. To get the underlying RDD from a JavaRDD, use ds.rdd().
- You need to create and pass an org.apache.spark.sql.catalyst.encoders.RowEncoder, not a ClassTag.
- Do not create a row of rows with Row.fromTuple(rowRowTuple2) (i.e. a row in which every element is a row). A single row should contain primitive types or nested structs.
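Putting the three points together, a corrected joinResults could look like the sketch below. The two-field schema is an assumption for illustration; in practice the schema would come from the linkage output (left columns followed by right columns), so it can be built once and passed in rather than recreated per call.

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;
import scala.collection.JavaConversions;

public class JoinResultsExample {

    // Flatten each Tuple2<Row, Row> into a single Row (left fields then right
    // fields) and encode with a RowEncoder built from the merged schema.
    static Dataset<Row> joinResults(SparkSession session,
                                    RDD<Tuple2<Row, Row>> resultRDD,
                                    StructType schema) {
        JavaRDD<Row> rows = resultRDD.toJavaRDD()
                .map(t -> Row$.MODULE$.merge(
                        JavaConversions.asScalaBuffer(Arrays.asList(t._1, t._2))));
        // createDataset() wants an RDD<T> and an Encoder<T>, hence rows.rdd()
        // and RowEncoder.apply(schema) instead of a ClassTag.
        return session.createDataset(rows.rdd(), RowEncoder.apply(schema));
    }
}
```

Alternatively, since the element type is Row, session.createDataFrame(rows, schema) gives the same Dataset<Row> without an explicit encoder, as the second answer below shows.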
Answer 2:
Perhaps this is useful. Tuple2<Row, Row> -> Dataset<Row>:
StructType schema = new StructType()
        .add(new StructField("id", DataTypes.IntegerType, true, Metadata.empty()))
        .add(new StructField("name", DataTypes.StringType, true, Metadata.empty()));

JavaRDD<Tuple2<Row, Row>> tuple2JavaRDD = new JavaSparkContext(spark.sparkContext())
        .parallelize(Arrays.asList(
                Tuple2.apply(RowFactory.create(1), RowFactory.create("a")),
                Tuple2.apply(RowFactory.create(2), RowFactory.create("b"))));

JavaRDD<Row> rowJavaRDD1 = tuple2JavaRDD.map(t -> Row$.MODULE$.merge(
        toScalaSeq(Arrays.asList(t._1, t._2))));

Dataset<Row> df1 = spark.createDataFrame(rowJavaRDD1, schema);
df1.show(false);
df1.printSchema();
/**
* +---+----+
* |id |name|
* +---+----+
* |1 |a |
* |2 |b |
* +---+----+
*
* root
* |-- id: integer (nullable = true)
* |-- name: string (nullable = true)
*/
Tuple2<Integer, String> -> Dataset<Row>:
JavaRDD<Tuple2<Integer, String>> resultRDD = new JavaSparkContext(spark.sparkContext())
        .parallelize(Arrays.asList(Tuple2.apply(1, "a"), Tuple2.apply(2, "b")));
JavaRDD<Row> rowJavaRDD = resultRDD.map(Row$.MODULE$::fromTuple);
Dataset<Row> dataFrame = spark.createDataFrame(rowJavaRDD, schema);
dataFrame.show(false);
dataFrame.printSchema();
/**
* +---+----+
* |id |name|
* +---+----+
* |1 |a |
* |2 |b |
* +---+----+
*
* root
* |-- id: integer (nullable = true)
* |-- name: string (nullable = true)
*/
Most of the Spark APIs work on Scala Seq, so it's handy to keep the utility below around to convert a Java List into a Scala sequence:

<T> Buffer<T> toScalaSeq(List<T> list) {
    return JavaConversions.asScalaBuffer(list);
}
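As a side note, JavaConversions is deprecated as of Scala 2.12 and removed in 2.13. An equivalent utility via scala.collection.JavaConverters (a sketch; it should behave identically on Scala 2.11 as well) would be:

```java
import java.util.List;

import scala.collection.JavaConverters;
import scala.collection.Seq;

class SeqUtil {
    // asScalaBufferConverter wraps the Java list; asScala() yields a mutable
    // Buffer, which is a Scala Seq, without copying the elements.
    static <T> Seq<T> toScalaSeq(List<T> list) {
        return JavaConverters.asScalaBufferConverter(list).asScala();
    }
}
```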