This article covers how to handle the error "[B cannot be cast to java.nio.ByteBuffer" when trying to serialize an Avro record.

Problem description

I have written a small Java program that is supposed to monitor a directory for new files and send them in binary Avro format to a Kafka topic. I am new to Avro, and I wrote this using the Avro documentation and online examples. The monitoring part works well, but the program fails at runtime when it gets to the Avro serialization. I get this error stack:

Exception in thread "main" java.lang.ClassCastException: [B cannot be cast to java.nio.ByteBuffer
    at org.apache.avro.generic.GenericDatumWriter.writeBytes(GenericDatumWriter.java:260)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:116)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:153)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:105)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:60)
    at producers.AvroBinaryProducer.buildAvroData(AvroBinaryProducer.java:90)
    at producers.AvroBinaryProducer.start(AvroBinaryProducer.java:120)
    at producers.AvroBinaryProducer.main(AvroBinaryProducer.java:140)
C:\Users\guys\AppData\Local\NetBeans\Cache\8.1\executor-snippets\run.xml:53: Java returned: 1
BUILD FAILED (total time: 7 seconds)

This line is failing: writer.write(datum,encoder);

It seems to be expecting a ByteBuffer, while the documentation and examples say I should pass the GenericRecord. What am I doing wrong?

Here is my code (there is another utility class called Config that reads the configuration parameters from a file, but I did not include it here):

package producers;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.WatchService;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import static java.nio.file.StandardWatchEventKinds.*;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;


/**
 *
 * @author guys
 */
public class AvroBinaryProducer {
    String mySchema;
    Schema avroSchema;
    Config myConf;  
    Producer<String, byte[]> producer;
    String topic, bootstrapServers, watchDir; 
    Path path;
    ByteArrayOutputStream out;
    BinaryEncoder encoder;


    public AvroBinaryProducer(String configPath) throws IOException
    {
        // Read initial configuration
        myConf=new Config(configPath);

        // first setting the kafka producer stuff
        Properties props = new Properties();   
        props.put("bootstrap.servers",myConf.get("bootstrap.servers"));        
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(props);
        topic=myConf.get("topic"); 
        watchDir=myConf.get("watchdir");
        path=FileSystems.getDefault().getPath(watchDir);

        // Now define the Avro schema
        mySchema="{\n" +
        " \"type\": \"record\",\n" +
        " \"name\": \"photo\",\n" +
        " \"fields\": [\n" +
        "     {\"name\": \"name\", \"type\": \"string\"},\n" +
        "     {\"name\": \"data\",  \"type\": \"bytes\"}\n" +
        " ]\n" +
        "}";

        Schema.Parser parser = new Schema.Parser();
        avroSchema=parser.parse(mySchema);   

        out = new ByteArrayOutputStream();
        encoder = EncoderFactory.get().binaryEncoder( out, null );


    }

    private byte[] buildAvroData(String name, byte[] data) throws IOException
    {       
        out.reset();                       
        GenericRecord datum=new GenericData.Record(avroSchema);        
        datum.put("name", name);
        datum.put("data",data);
        DatumWriter<GenericRecord> writer=new GenericDatumWriter<>(avroSchema);        
        writer.write(datum,encoder);
        encoder.flush();
        return out.toByteArray();        
    }

    private void start() throws IOException, InterruptedException
    {
        String fileName;
        byte[] fileData;       

        WatchService watcher = FileSystems.getDefault().newWatchService();
        WatchKey key=path.register(watcher, ENTRY_CREATE);

        while (true)
        {
            key = watcher.take();
            // The code gets beyond this point only when a filesystem event occurs

            for (WatchEvent<?> event: key.pollEvents()) 
            {
                WatchEvent.Kind<?> kind = event.kind();
                if (kind==ENTRY_CREATE)
                {
                    WatchEvent<Path> ev = (WatchEvent<Path>)event;
                    Path filename = ev.context();
                    fileName=filename.toString();
                    System.out.println("New file "+fileName+" found !");
                    // We need this little delay to make sure the file is closed before we read it
                    Thread.sleep(500);
                    fileData=Files.readAllBytes(FileSystems.getDefault().getPath(watchDir+File.separator+fileName));
                    publishMessage(buildAvroData(fileName,fileData));
                }
            }
            key.reset();
        }
    }

    private void publishMessage(byte[] bytes) 
    {        
        ProducerRecord <String, byte[]> data =new ProducerRecord<>(topic, bytes);
        producer.send(data);

    }

    public static void main (String args[])
    {
        AvroBinaryProducer abp;
        try {
            abp=new AvroBinaryProducer(args[0]);
            try {
                abp.start();
            } catch (InterruptedException ex) {
                Logger.getLogger(AvroBinaryProducer.class.getName()).log(Level.SEVERE, null, ex);
            }
        } catch (IOException ex) {
            Logger.getLogger(AvroBinaryProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

Thanks!

Recommended answer

This is how I solved it. If it expects a ByteBuffer, let's give it a ByteBuffer. I changed the function to:

private byte[] buildAvroData(String name, byte[] data) throws IOException
{
    out.reset();
    GenericRecord datum=new GenericData.Record(avroSchema);
    datum.put("name", name);
    // The "bytes" field is passed as a ByteBuffer instead of a raw byte[]
    datum.put("data",ByteBuffer.wrap(data));
    DatumWriter<GenericRecord> writer=new GenericDatumWriter<>(avroSchema);
    writer.write(datum,encoder);
    encoder.flush();
    return out.toByteArray();
}
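
For context on the error message itself: "[B" is the JVM's internal class name for byte[], and the stack trace shows the failure inside GenericDatumWriter.writeBytes, which suggests the field value is cast to ByteBuffer there. The sketch below reproduces the same kind of failure using only the standard library. CastDemo and its writeBytes method are hypothetical stand-ins written for illustration; they are not Avro's actual code.

```java
import java.nio.ByteBuffer;

// Stand-alone illustration; CastDemo and writeBytes are hypothetical names,
// not part of Avro.
class CastDemo {
    // Mimics the kind of cast the stack trace points to: the value arrives
    // as a plain Object and is cast to ByteBuffer.
    static int writeBytes(Object datum) {
        ByteBuffer buffer = (ByteBuffer) datum; // ClassCastException if datum is a byte[]
        return buffer.remaining();
    }

    public static void main(String[] args) {
        byte[] raw = {1, 2, 3};

        // Prints the JVM's internal class name for byte[], which is "[B".
        System.out.println(byte[].class.getName());

        try {
            writeBytes(raw);
        } catch (ClassCastException e) {
            System.out.println("raw byte[] rejected: " + e.getMessage());
        }

        // Wrapping shares the existing array; no bytes are copied.
        System.out.println(writeBytes(ByteBuffer.wrap(raw)));
    }
}
```

Because ByteBuffer.wrap shares the array rather than copying it, the wrapped buffer reflects any later change to the original byte[].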

I just wrapped the data in a ByteBuffer and this worked. You have to remember to extract the byte array from the ByteBuffer at the consumer end.
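
The extraction step on the consumer side can be sketched with the standard library alone, assuming the consumer gets the data field back as a ByteBuffer, as the answer implies. ByteBufferBytes and toByteArray are hypothetical helper names, not part of Avro or Kafka.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration; not part of Avro or Kafka.
class ByteBufferBytes {
    // Copies the readable bytes (from position up to limit) into a new array.
    // duplicate() creates an independent view, so the caller's position is
    // left untouched.
    static byte[] toByteArray(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        byte[] original = {10, 20, 30, 40};
        // Buffer over a slice of the array; readable bytes are 20 and 30.
        ByteBuffer wrapped = ByteBuffer.wrap(original, 1, 2);
        System.out.println(Arrays.toString(toByteArray(wrapped)));
    }
}
```

Calling buffer.array() directly is shorter, but it returns the whole backing array, including bytes outside the readable range, and it throws for direct or read-only buffers. Copying the remaining bytes avoids both problems.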

09-18 05:35