我在 JavaScript 中创建了一个 producer,并开始将消息推送到 Kafka 主题中,但不知道为什么消费者读到的都是 null。以下是我的 Kafka producer 代码:

        v

以下是 Node.js 服务器日志,表明消息已发送到 Kafka 主题:
data arrived
get into postdata
kafka producer is connected and ready.
Sent payload to Kafka:  [ { topic: 'test1',
message:
 { values:
    '{"AlternateUniqueKey":"","DownloadedDeviceUniqueKey”:”1235”,”NetworkInfo":{"SIM1":false,"IMEI1":"","NetworkType1":"","OperatorName1":"","PhoneNumber1":"","PhoneType1":"","SignalStrength1":0,"SimCountryCode1":"","SimSerialNumber1":"","StateOfService1":"","SIM2":false,"IMEI2":"","NetworkType2":"","OperatorName2":"","PhoneNumber2":"","PhoneType2":"","SignalStrength2":0,"SimCountryCode2":"","SimSerialNumber2":"","StateOfService2":"","Error":"android.permission.READ_PHONE_STATEpermission not granted","Status":-1},"FirstBootDate":"2018-12-06T16:21:35.744+0530","DeviceID":"39a2afecbe00dae1","PhoneInfo":{"SDKVersion":27,"AndroidVersion":"8.1.0","Brand":"LAVA","Device":"Z50","Hardware":"mt6735","IMEI1":"","IMEI2":"","IsRooted":false,"IsRootedString":"No","Manufacturer":"LAVA","Model":"Z50","Product":"Z50","Serial":"","SoftwareVersion":"1528860449","Status":1},"Battery":{"BatteryState":"Discharging","Capacity":0,"Health":"Good","Level":83,"Status":1,"Temperature":24,"Voltage":0},"CreatedDate":"2018-12-06T16:21:35.770+0530","ConsumerID":0,"WiFi":{"BSSID":"02:00:00:00:00:00","Frequency":2462,"Is5GHz":false,"MACAddress":"02:00:00:00:00:00","MaxWifiSpeed":65,"SSID":"<unknown ssid>","SignalStrength":-68,"Status":1},"device":{"Brand":"LAVA","AlternateUniqueKey":"","ProductName":"Z50","DownloadedDeviceUniqueKey":"","Device":"Z50","Manufacturer":"LAVA","ProductUniqueID":""},"ProductUniqueID":"","status":1,"app":"Servify","timezone":"+0530","version":"53","languagecode":1,"LanguageID":1,"LanguageCode":"en","CountryCode":"IN","CountryID":105,"PhoneCode":91,"sourcedevice":"Android","skipMapping":true}' },
partition: 0,
attributes: 0 } ]
result:  { test1: { '0': 41 } }

Kafka主题日志文件中的数据格式不可读:
 8��Z������������������������������
                              8��Z������������������������������

以下是消费者的输出:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
null
null
null
null
null

非常感谢您的帮助!

最佳答案

您在payloads中使用了错误的键,它应该是messages:messages,而不是message:messages
完整示例

// Minimal kafka-node producer example: once the producer reports that it is
// ready, send a single message to the `test1` topic and log the outcome.
var kafka = require('kafka-node');

var topicName = 'test1';
var client = new kafka.Client('localhost:2181');
var producer = new kafka.HighLevelProducer(client);
var messages = 'hello world';

console.log('get into postdata');

// The payload key must be `messages` (plural); using `message` instead makes
// the consumer read null values.
// Declared with `var` so that it does not leak as an implicit global (which
// would also throw a ReferenceError under strict mode).
var payloads = [{topic: topicName, messages: messages, partition: 0}];

// Only send after the producer has signalled that it is ready.
producer.on('ready', function() {
  producer.send(payloads, function(error, result) {
    console.info('Sent payload to Kafka: ', payloads);
    if (error) {
      console.error(error);
    } else {
      console.log('result: ', result);
    }
  });
});

// An EventEmitter 'error' event with no listener is thrown as an uncaught
// exception, so report producer errors instead of crashing the process.
producer.on('error', function(error) {
  console.error(error);
});

node.js - 从Node.js推送到Kafka主题时,消费者获取到空值?-LMLPHP

10-04 14:58