问题描述
当我在 map 中使用 RichFlatMapFunction 从 HBase 读取数据时,出现了序列化错误.我想要做的是:如果数据流中的元素等于某个特定字符串,就从 HBase 读取,否则忽略该元素.下面是示例程序和我得到的错误.
When I read from HBase using a RichFlatMapFunction inside a map, I get a serialization error. What I am trying to do is: if an element of the data stream equals a particular string, read from HBase; otherwise ignore it. Below are the sample program and the error I am getting.
package com.abb.Flinktest
import java.text.SimpleDateFormat
import java.util.Properties
import scala.collection.concurrent.TrieMap
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.BinaryComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Level
import org.apache.flink.api.common.functions.RichMapFunction
object Flinktesthbaseread {
// Entry point of the sample program: builds a local streaming job over a single
// "hello" element and executes it.
def main(args:Array[String])
{
val env = StreamExecutionEnvironment.createLocalEnvironment()
// One-element in-memory stream used as the job's input.
val kafkaStream = env.fromElements("hello")
// The lambda passed to map refers to kafkaStream itself, so the DataStream
// object is captured by the closure. This is the call that fails with
// "Task not serializable" (NotSerializableException on DataStream, captured
// as field kafkaStream$1 of the anonymous function).
val c=kafkaStream.map(x => if(x.equals("hello"))kafkaStream.flatMap(new ReadHbase()) )
env.execute()
}
/**
 * Rich flat-map function that acquires its HBase handles in `open` and
 * releases them in `close`.
 *
 * The HBase configuration, connection and table are created per task in
 * `open`; they are marked `@transient` so that they are never shipped along
 * with the function object when it is serialized.
 */
class ReadHbase extends RichFlatMapFunction[String,Tuple11[String,String,String,String,String,String,String,String,String,String,String]] with Serializable
{
  // Runtime-only handles: populated in open(), released in close().
  @transient var conf: org.apache.hadoop.conf.Configuration = null
  @transient var table: org.apache.hadoop.hbase.client.HTable = null
  @transient var hbaseconnection: org.apache.hadoop.hbase.client.Connection = null
  var taskNumber: String = null
  var rowNumber = 0
  // NOTE(review): as a plain instance val this is presumably not picked up by
  // Java serialization, which looks for a static field; the @SerialVersionUID
  // annotation is the usual Scala mechanism — confirm before relying on it.
  val serialVersionUID = 1L

  /**
   * Builds the HBase configuration, opens a connection and opens the
   * "testtable" table.
   *
   * @param parameters Flink configuration handed to the function; not read here.
   */
  override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
    println("getting table")
    conf = HBaseConfiguration.create()
    // getResourceAsStream returns null when the resource is missing from the
    // classpath; only register the stream when it actually exists.
    Option(getClass().getResourceAsStream("/hbase-site.xml"))
      .foreach(stream => conf.addResource(stream))
    hbaseconnection = ConnectionFactory.createConnection(conf)
    table = new HTable(conf, "testtable")
    // this.taskNumber = String.valueOf(taskNumber);
  }

  /**
   * Per-element hook; the body is left as a placeholder in this sample.
   *
   * @param msg incoming stream element
   * @param out collector for the resulting tuples
   */
  override def flatMap(msg:String,out:Collector[Tuple11[String,String,String,String,String,String,String,String,String,String,String]]): Unit = {
    //flatmap operation here
  }

  /**
   * Flushes and closes the table, then closes the connection.
   *
   * Each handle is null-checked because open() may have failed part-way, and
   * the nested try/finally ensures a failure in one step does not prevent the
   * remaining resources from being released.
   */
  override def close(): Unit = {
    try {
      Option(table).foreach { t =>
        try t.flushCommits()
        finally t.close()
      }
    } finally {
      Option(hbaseconnection).foreach(_.close())
    }
  }
}
}
错误:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:617)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:959)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:484)
at com.abb.Flinktest.Flinktesthbaseread$.main(Flinktesthbaseread.scala:45)
at com.abb.Flinktest.Flinktesthbaseread.main(Flinktesthbaseread.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.scala.DataStream
- field (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", name: "kafkaStream$1", type: "class org.apache.flink.streaming.api.scala.DataStream")
- root object (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", <function1>)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
... 6 more
我尝试通过使类可序列化来将字段包装在方法和类中,但没有成功.有人可以对此有所了解或为此提出一些解决方法.
I tried wrapping the field inside a method and a class, making the class serializable as well, but no luck. Could someone shed some light on this or suggest a workaround?
推荐答案
问题是您试图访问 map 函数中的 kafka 流变量,该变量根本无法序列化.它只是数据的抽象表示.它不包含任何内容,这首先会使您的函数无效.
The problem is that you're trying to access the kafka stream variable in the map function which is simply not serializable. It is just an abstract representation of the data. It doesn't contain anything, which invalidates your function in the first place.
相反,请执行以下操作:
instead, do something like this:
// Keep only the elements equal to "hello", then pass them to the flatMap function.
kafkaStream.filter(x => x.equals("hello")).flatMap(new ReadHbase())
过滤器函数将只保留条件为真的元素,并将这些元素传递给您的 flatMap 函数.
The filter function will only retain the elements for which the condition is true, and those will be passed to your flatMap function.
我强烈建议您阅读基础 API 概念文档,因为对于指定转换时实际发生的情况似乎存在一些误解.
I would highly recommend that you read the basic API concepts documentation, as there appears to be some misunderstanding as to what actually happens when you specify a transformation.
这篇关于从hbase读取时Flink抛出序列化错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!