Problem description
I have written a small Java program that is supposed to monitor a directory for new files and send them in binary Avro format to a Kafka topic. I am new to Avro and I wrote this using the Avro documentation and online examples. The monitoring part works well, but the program fails at runtime when it gets to the Avro serialization. I get this error stack:
Exception in thread "main" java.lang.ClassCastException: [B cannot be cast to java.nio.ByteBuffer
at org.apache.avro.generic.GenericDatumWriter.writeBytes(GenericDatumWriter.java:260)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:116)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:153)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:105)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:60)
at producers.AvroBinaryProducer.buildAvroData(AvroBinaryProducer.java:90)
at producers.AvroBinaryProducer.start(AvroBinaryProducer.java:120)
at producers.AvroBinaryProducer.main(AvroBinaryProducer.java:140)
C:\Users\guys\AppData\Local\NetBeans\Cache\8.1\executor-snippets\run.xml:53: Java returned: 1
BUILD FAILED (total time: 7 seconds)
This line is failing: writer.write(datum, encoder);
It seems to be expecting a ByteBuffer, while the documentation and examples say I should pass a GenericRecord. What am I doing wrong?
Here is my code (there is another utility class called Config that reads the configuration params from a file, but I did not include it here):
package producers;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.WatchService;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import static java.nio.file.StandardWatchEventKinds.*;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

/**
 *
 * @author guys
 */
public class AvroBinaryProducer {

    String mySchema;
    Schema avroSchema;
    Config myConf;
    Producer<String, byte[]> producer;
    String topic, bootstrapServers, watchDir;
    Path path;
    ByteArrayOutputStream out;
    BinaryEncoder encoder;

    public AvroBinaryProducer(String configPath) throws IOException
    {
        // Read initial configuration
        myConf = new Config(configPath);
        // First set up the Kafka producer
        Properties props = new Properties();
        props.put("bootstrap.servers", myConf.get("bootstrap.servers"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(props);
        topic = myConf.get("topic");
        watchDir = myConf.get("watchdir");
        path = FileSystems.getDefault().getPath(watchDir);
        // Now define the Avro schema
        mySchema = "{\n" +
            "    \"type\": \"record\",\n" +
            "    \"name\": \"photo\",\n" +
            "    \"fields\": [\n" +
            "        {\"name\": \"name\", \"type\": \"string\"},\n" +
            "        {\"name\": \"data\", \"type\": \"bytes\"}\n" +
            "    ]\n" +
            "}";
        Schema.Parser parser = new Schema.Parser();
        avroSchema = parser.parse(mySchema);
        out = new ByteArrayOutputStream();
        encoder = EncoderFactory.get().binaryEncoder(out, null);
    }

    private byte[] buildAvroData(String name, byte[] data) throws IOException
    {
        out.reset();
        GenericRecord datum = new GenericData.Record(avroSchema);
        datum.put("name", name);
        datum.put("data", data);
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(avroSchema);
        writer.write(datum, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    private void start() throws IOException, InterruptedException
    {
        String fileName;
        byte[] fileData;
        WatchService watcher = FileSystems.getDefault().newWatchService();
        WatchKey key = path.register(watcher, ENTRY_CREATE);
        while (true)
        {
            key = watcher.take();
            // The code gets beyond this point only when a filesystem event occurs
            for (WatchEvent<?> event : key.pollEvents())
            {
                WatchEvent.Kind<?> kind = event.kind();
                if (kind == ENTRY_CREATE)
                {
                    WatchEvent<Path> ev = (WatchEvent<Path>) event;
                    Path filename = ev.context();
                    fileName = filename.toString();
                    System.out.println("New file " + fileName + " found !");
                    // We need this little delay to make sure the file is closed before we read it
                    Thread.sleep(500);
                    fileData = Files.readAllBytes(FileSystems.getDefault().getPath(watchDir + File.separator + fileName));
                    publishMessage(buildAvroData(fileName, fileData));
                }
            }
            key.reset();
        }
    }

    private void publishMessage(byte[] bytes)
    {
        ProducerRecord<String, byte[]> data = new ProducerRecord<>(topic, bytes);
        producer.send(data);
    }

    public static void main(String args[])
    {
        AvroBinaryProducer abp;
        try {
            abp = new AvroBinaryProducer(args[0]);
            try {
                abp.start();
            } catch (InterruptedException ex) {
                Logger.getLogger(AvroBinaryProducer.class.getName()).log(Level.SEVERE, null, ex);
            }
        } catch (IOException ex) {
            Logger.getLogger(AvroBinaryProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
Thanks!
Recommended answer
This is how I solved it: if it expects a ByteBuffer, let's give it a ByteBuffer. I changed the function to:
private byte[] buildAvroData(String name, byte[] data) throws IOException
{
    out.reset();
    GenericRecord datum = new GenericData.Record(avroSchema);
    datum.put("name", name);
    datum.put("data", ByteBuffer.wrap(data));
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(avroSchema);
    writer.write(datum, encoder);
    encoder.flush();
    return out.toByteArray();
}
I just wrapped the data in a ByteBuffer and this worked. Remember that you have to extract the byte array from the ByteBuffer on the consumer end.
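The consumer-side extraction can be sketched with plain java.nio, independent of Avro and Kafka. This is a minimal illustration, not part of the original code: the class and method names are hypothetical, and the key point is to read only the buffer's remaining bytes rather than grabbing the backing array, which may be larger than the payload or not exist at all for read-only buffers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ByteBufferRoundTrip {

    // Producer side: Avro's "bytes" type maps to ByteBuffer, so wrap the raw array.
    static ByteBuffer toAvroBytes(byte[] raw) {
        return ByteBuffer.wrap(raw);
    }

    // Consumer side: copy out exactly the remaining bytes. duplicate() is used
    // so that reading does not disturb the caller's buffer position.
    static byte[] fromAvroBytes(ByteBuffer buf) {
        byte[] out = new byte[buf.remaining()];
        buf.duplicate().get(out);
        return out;
    }

    public static void main(String[] args) {
        byte[] original = "file contents".getBytes(StandardCharsets.UTF_8);
        ByteBuffer wrapped = toAvroBytes(original);
        byte[] extracted = fromAvroBytes(wrapped);
        System.out.println(Arrays.equals(original, extracted)); // prints "true"
    }
}
```

With the real Avro consumer, the same pattern applies to the value returned by record.get("data"), which arrives as a ByteBuffer.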