This article looks at how to handle Spark Streaming failing to work with Spark SQL. It should be a useful reference for anyone facing the same problem; follow along below to work through it.

Problem Description

I am facing an issue with Spark Streaming: after the data is streamed and passed to the "parse" method, I get empty records.

My code:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._  // the `spark` session is provided by the spark-shell

val conf = new SparkConf().setAppName("streamHive").setMaster("local[*]").set("spark.driver.allowMultipleContexts", "true")

val ssc = new StreamingContext(conf, Seconds(5))

val sc = ssc.sparkContext

val lines = ssc.textFileStream("file:///home/sadr/testHive")

case class Prices(name: String, age: String,sex: String, location: String)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

def parse(rdd: org.apache.spark.rdd.RDD[String]) = {
  // Split each CSV line and map the fields onto the Prices case class
  val l = rdd.map(_.split(","))
  val prices = l.map(p => Prices(p(0), p(1), p(2), p(3)))
  val pricesDf = sqlContext.createDataFrame(prices)
  pricesDf.registerTempTable("prices")
  pricesDf.show()
  val x = sqlContext.sql("select count(*) from prices")
  x.show()
}
lines.foreachRDD { rdd => parse(rdd)}
lines.print()
ssc.start()

My input file:

cat test1.csv

Riaz,32,M,uk
tony,23,M,india
manu,33,M,china
imart,34,F,AUS

I get the following output:

lines.foreachRDD { rdd => parse(rdd)}

lines.print()

ssc.start()

scala> +----+---+---+--------+
|name|age|sex|location|
+----+---+---+--------+
+----+---+---+--------+

I am using Spark version 2.3.... I am getting the following error after adding x.show().

Recommended Answer

Not sure if you are actually able to read the stream.

textFileStream only picks up files added to the directory after the program starts; it does not read files that already exist there. Was the file already in the directory? If so, remove it, start the program, and then copy the file in again.
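One way to test this (a minimal sketch; the staging path /tmp/test1.csv is an assumption, the watched directory comes from the question): start the streaming job first, then atomically move the file into the watched directory so it appears as a new, fully written file:

import java.nio.file.{Files, Paths, StandardCopyOption}

// Stage the file OUTSIDE the watched directory (assumed staging path).
val src = Paths.get("/tmp/test1.csv")
val dst = Paths.get("/home/sadr/testHive/test1.csv")

// Do this only AFTER ssc.start(): textFileStream picks the file up
// because its modification time is newer than the stream's start time.
// Note: ATOMIC_MOVE requires source and target on the same filesystem.
Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE)

Equivalently, from a shell, mv (rather than cp) the file into /home/sadr/testHive while the job is running; a plain copy can expose a partially written file to the stream.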

This concludes the article on Spark Streaming not working with Spark SQL. We hope the suggested answer is helpful, and thank you for your continued support!
