我用 JavaScript 创建了一个 producer,并开始向 Kafka 主题推送消息,但不知道哪里出了问题。以下是我的 Kafka 生产者代码:
v
以下是 Node.js 服务器日志,表明消息已发送到 Kafka 主题:
data arrived
get into postdata
kafka producer is connected and ready.
Sent payload to Kafka: [ { topic: 'test1',
message:
{ values:
'{"AlternateUniqueKey":"","DownloadedDeviceUniqueKey”:”1235”,”NetworkInfo":{"SIM1":false,"IMEI1":"","NetworkType1":"","OperatorName1":"","PhoneNumber1":"","PhoneType1":"","SignalStrength1":0,"SimCountryCode1":"","SimSerialNumber1":"","StateOfService1":"","SIM2":false,"IMEI2":"","NetworkType2":"","OperatorName2":"","PhoneNumber2":"","PhoneType2":"","SignalStrength2":0,"SimCountryCode2":"","SimSerialNumber2":"","StateOfService2":"","Error":"android.permission.READ_PHONE_STATEpermission not granted","Status":-1},"FirstBootDate":"2018-12-06T16:21:35.744+0530","DeviceID":"39a2afecbe00dae1","PhoneInfo":{"SDKVersion":27,"AndroidVersion":"8.1.0","Brand":"LAVA","Device":"Z50","Hardware":"mt6735","IMEI1":"","IMEI2":"","IsRooted":false,"IsRootedString":"No","Manufacturer":"LAVA","Model":"Z50","Product":"Z50","Serial":"","SoftwareVersion":"1528860449","Status":1},"Battery":{"BatteryState":"Discharging","Capacity":0,"Health":"Good","Level":83,"Status":1,"Temperature":24,"Voltage":0},"CreatedDate":"2018-12-06T16:21:35.770+0530","ConsumerID":0,"WiFi":{"BSSID":"02:00:00:00:00:00","Frequency":2462,"Is5GHz":false,"MACAddress":"02:00:00:00:00:00","MaxWifiSpeed":65,"SSID":"<unknown ssid>","SignalStrength":-68,"Status":1},"device":{"Brand":"LAVA","AlternateUniqueKey":"","ProductName":"Z50","DownloadedDeviceUniqueKey":"","Device":"Z50","Manufacturer":"LAVA","ProductUniqueID":""},"ProductUniqueID":"","status":1,"app":"Servify","timezone":"+0530","version":"53","languagecode":1,"LanguageID":1,"LanguageCode":"en","CountryCode":"IN","CountryID":105,"PhoneCode":91,"sourcedevice":"Android","skipMapping":true}' },
partition: 0,
attributes: 0 } ]
result: { test1: { '0': 41 } }
Kafka主题日志文件中的数据格式不可读:
8��Z������������������������������
8��Z������������������������������
以下是消费者的输出:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
null
null
null
null
null
非常感谢您的帮助!
最佳答案
您在 payloads 中使用了错误的键:应该是 messages: messages,而不是 message: messages。
完整示例
// Minimal kafka-node producer example: once the producer reports 'ready',
// it publishes a single string message to the `test1` topic and logs the
// broker's response.
var kafka = require('kafka-node');

var topicName = 'test1';
var client = new kafka.Client('localhost:2181');
var producer = new kafka.HighLevelProducer(client);
var messages = 'hello world';

console.log('get into postdata');

// The payload key must be `messages` (plural). With the misspelled key
// `message` the send still succeeds, but the consumer reads null records
// (see the consumer output earlier in this document).
// Declared with `var` so it is not an implicit global, which would throw
// a ReferenceError in strict mode.
var payloads = [{topic: topicName, messages: messages, partition: 0}];

producer.on('ready', function() {
  producer.send(payloads, function(error, result) {
    if (error) {
      // Do not claim the payload was sent when the send failed.
      console.error(error);
      return;
    }
    console.info('Sent payload to Kafka: ', payloads);
    console.log('result: ', result);
  });
});

// An EventEmitter with no 'error' listener throws when 'error' is emitted,
// so connection problems would otherwise crash the process.
producer.on('error', function(error) {
  console.error(error);
});