我的用例是我想将Avro数据从Kafka推送到HDFS。加缪似乎是正确的工具,但是我无法使其工作。
我是camus的新手,正在尝试使用camus-example,
https://github.com/linkedin/camus

现在,我正在尝试使camus示例工作。但是我仍然面临问题。

DummyLogKafkaProducerClient的代码段

package com.linkedin.camus.example.schemaregistry;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageEncoder;
import com.linkedin.camus.example.records.DummyLog;

public class DummyLogKafkaProducerClient {


    public static void main(String[] args) {

        Properties props = new Properties();

        props.put("metadata.broker.list", "localhost:6667");
        // props.put("serializer.class", "kafka.serializer.StringEncoder");
        // props.put("partitioner.class", "example.producer.SimplePartitioner");
        //props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, byte[]> producer = new Producer<String, byte[]>(config);

        KafkaAvroMessageEncoder encoder = get_DUMMY_LOG_Encoder();

        for (int i = 0; i < 500; i++) {
            KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>("DUMMY_LOG", encoder.toBytes(getDummyLog()));
            producer.send(data);

        }
    }

    public static DummyLog getDummyLog() {
        Random random = new Random();
        DummyLog dummyLog = DummyLog.newBuilder().build();
        dummyLog.setId(random.nextLong());
        dummyLog.setLogTime(new Date().getTime());
        Map<CharSequence, CharSequence> machoStuff = new HashMap<CharSequence, CharSequence>();
        machoStuff.put("macho1", "abcd");
        machoStuff.put("macho2", "xyz");
        dummyLog.setMuchoStuff(machoStuff);
        return dummyLog;
    }

    public static KafkaAvroMessageEncoder get_DUMMY_LOG_Encoder() {
        KafkaAvroMessageEncoder encoder = new KafkaAvroMessageEncoder("DUMMY_LOG", null);
        Properties props = new Properties();
        props.put(KafkaAvroMessageEncoder.KAFKA_MESSAGE_CODER_SCHEMA_REGISTRY_CLASS, "com.linkedin.camus.example.schemaregistry.DummySchemaRegistry");
        encoder.init(props, "DUMMY_LOG");
        return encoder;

    }
}

我还添加了默认的无参数构造函数ot DummySchemaRegistry,因为它给出了实例化异常
package com.linkedin.camus.example.schemaregistry;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;

import com.linkedin.camus.example.records.DummyLog;
import com.linkedin.camus.example.records.DummyLog2;
import com.linkedin.camus.schemaregistry.MemorySchemaRegistry;

/**
 * This is a little dummy registry that just uses a memory-backed schema registry to store two dummy Avro schemas. You
 * can use this with camus.properties
 */
public class DummySchemaRegistry extends MemorySchemaRegistry<Schema> {
    public DummySchemaRegistry(Configuration conf) {
        super();
        super.register("DUMMY_LOG", DummyLog.newBuilder().build().getSchema());
        super.register("DUMMY_LOG_2", DummyLog2.newBuilder().build()
                .getSchema());
    }
    public DummySchemaRegistry() {
        super();
        super.register("DUMMY_LOG", DummyLog.newBuilder().build().getSchema());
        super.register("DUMMY_LOG_2", DummyLog2.newBuilder().build().getSchema());
    }
}

运行程序后我得到的异常跟踪下面

最佳答案

我想camus希望Avro模式具有默认值。我将我的dummyLog.avsc更改为跟随并重新编译-

{
“namespace”:“com.linkedin.camus.example.records”,
“type”:“记录”,
“name”:“DummyLog”,
“doc”:“记录不太重要的内容。”,
“字段”:[
{
“name”:“id”,
“type”:“int”,
“默认”:0
},
{
“name”:“logTime”,
“type”:“int”,
“默认”:0
}
]
}

请让我知道这对你有没有用。

谢谢,
安巴

08-28 15:17