自定义一个Receiver
/**
 * Network receiver that connects to a TCP endpoint and forwards every text
 * line it reads to a block generator.
 *
 * @param host host name of the socket server to connect to
 * @param port port of the socket server to connect to
 */
class SocketTextStreamReceiver(host: String, port: Int) extends NetworkReceiver[String] {

  /** Generator that collects the received lines; created with `StorageLevel.MEMORY_ONLY_SER_2`. */
  protected lazy val blocksGenerator: BlockGenerator =
    new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)

  /**
   * Starts the block generator, opens the socket and pushes each UTF-8 line
   * into the generator until the stream ends (`readLine()` returns `null`).
   */
  protected def onStart(): Unit = {
    blocksGenerator.start()
    val socket = new Socket(host, port)
    try {
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
      Iterator
        .continually(reader.readLine())
        .takeWhile(_ != null)
        .foreach(line => blocksGenerator += line)
    } finally {
      // Release the connection whether the stream ended normally or reading failed.
      socket.close()
    }
  }

  /** Stops the block generator. */
  protected def onStop(): Unit = {
    blocksGenerator.stop()
  }
}
An Actor as Receiver
/**
 * Actor-based receiver: asks the IO manager to connect to `host:port` when the
 * actor starts, and pushes every chunk read from the connection, converted to
 * a string, as a block.
 *
 * @param host          host name to connect to
 * @param port          port to connect to
 * @param bytesToString conversion applied to each received chunk before it is pushed
 */
class SocketTextStreamReceiver(
    host: String,
    port: Int,
    bytesToString: ByteString => String)
  extends Actor with Receiver {

  // Request the connection as soon as the actor is started.
  override def preStart(): Unit = {
    IOManager(context.system).connect(host, port)
  }

  def receive = {
    // The first field of the read event is not needed here; only the payload is forwarded.
    case IO.Read(_, payload) =>
      pushBlock(bytesToString(payload))
  }
}
A Sample Spark Application
// Streaming context for the word-count job; the batch interval is given in seconds.
val ssc = new StreamingContext(master, "WordCountCustomStreamSource", Seconds(batchDuration))

// Use the custom receiver.
val lines = ssc.networkStream[String](new SocketTextStreamReceiver("localhost", 8445))

// Or use the custom actor-based receiver instead:
/*
val lines2 = ssc.actorStream[String](
  Props(new SocketTextStreamReceiver("localhost", 8445, z => z.utf8String)),
  "SocketReceiver")
*/

// Split each received line on single spaces and count the occurrences of every word.
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

// Nothing is received or processed until the context is started.
ssc.start()
提交成功之后,启动Netcat测试一下
$ nc -l localhost 8445
hello world
hello hello
下面是合并多个输入流的方法:
// First socket stream receiver, connecting to port 8445.
val lines = ssc.actorStream[String](
  Props(new SocketTextStreamReceiver("localhost", 8445, z => z.utf8String)),
  "SocketReceiver")

// Another socket stream receiver, connecting to port 8446.
val lines2 = ssc.actorStream[String](
  Props(new SocketTextStreamReceiver("localhost", 8446, z => z.utf8String)),
  "SocketReceiver")

// Merge both input streams into a single stream.
val union = lines.union(lines2)