我像下面这样,把来自 MQTT 发布端的数据经由 MQTT 代理(broker)接入 Kafka:

# Publish one sample JSON sensor reading to the MQTT topic 'temperature'
# on the broker at 0.0.0.0:1883.
mosquitto_pub \
  --host 0.0.0.0 \
  --port 1883 \
  --topic temperature \
  --message '{"who":"ben", "timeepoc":1558212482, "lat":-33.87052833, "lon":151.21292, "alt":31.0, "batt":0, "speed":12.86}'
然后我在kafka中创建一个主题
# Create the Kafka topic 'temperature' (1 partition, replication factor 1)
# through the broker listening on localhost:9092.
kafka-topics \
  --bootstrap-server localhost:9092 \
  --create \
  --topic temperature \
  --partitions 1 \
  --replication-factor 1
之后我在 ksql 中创建流和表:
-- Declare the carsensor stream over the 'temperature' topic; record values are read as JSON.
CREATE STREAM carsensor (
    who      VARCHAR,
    batt     INTEGER,
    lon      DOUBLE,
    lat      DOUBLE,
    timeepoc BIGINT,
    alt      INTEGER,
    speed    DOUBLE
) WITH (
    KAFKA_TOPIC  = 'temperature',
    VALUE_FORMAT = 'JSON'
);
-- Per-'who' statistics over 5-minute tumbling windows of carsensor events:
-- min/max speed, the smallest GEO_DISTANCE (in 'km') to the fixed point
-- (-33.87014, 151.211945), and the number of events in the window.
CREATE TABLE runner_status
    WITH (VALUE_FORMAT = 'JSON') AS
SELECT
    who,
    MIN(speed) AS min_speed,
    MAX(speed) AS max_speed,
    MIN(GEO_DISTANCE(lat, lon, -33.87014, 151.211945, 'km')) AS dist_to_finish,
    COUNT(*) AS num_events
FROM carsensor
    WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY who;
这是我的表 runner_status 中的数据:
{
  "ROWTIME": 1597418628366,
  "ROWKEY": "ben",
  "WINDOWSTART": 1597418400000,
  "WINDOWEND": 1597418700000,
  "WHO": "ben",
  "MIN_SPEED": 12.91,
  "MAX_SPEED": 12.91,
  "DIST_TO_FINISH": 0.07441178137496719,
  "NUM_EVENTS": 2
}
这是我的 carsensor 流中的数据:
{
  "ROWTIME": 1597418628366,
  "ROWKEY": "temperature",
  "WHO": "ben",
  "BATT": 0,
  "LON": 151.21273,
  "LAT": -33.87029167,
  "TIMEEPOC": 1558212492,
  "ALT": 31,
  "SPEED": 12.91
}
然后我创建一个连接到 Elasticsearch 的 sink 连接器:
 -- Elasticsearch sink connector writing to http://localhost:9200 (document type '_doc').
 -- Keys are read as plain strings; values are read with the Avro converter backed by
 -- the schema registry at http://schema-registry:8081.
 -- NOTE(review): the carsensor stream and runner_status table are declared with
 --   value_format='JSON', so the Avro value converter looks mismatched -- confirm.
 -- NOTE(review): 'topics' is 'carsensor', but the only Kafka topic created in this
 --   walkthrough is 'temperature' (carsensor is the ksql stream name) -- confirm
 --   that a topic called 'carsensor' actually exists.
 -- NOTE(review): the 'ExtractTimestamp' transform is an InsertField$Value SMT;
 --   presumably it adds the record timestamp to the value as field EVENT_TS -- verify
 --   against the Kafka Connect documentation.
 CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_M WITH (
  'connector.class'         = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'          = 'http://localhost:9200',
  'key.converter'           = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'= 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url'= 'http://schema-registry:8081',
  'type.name'               = '_doc',
  'topics'                  = 'carsensor',
  'key.ignore'              = 'false',
  'schema.ignore'           = 'false',
  'transforms'                                  = 'ExtractTimestamp',
  'transforms.ExtractTimestamp.type'            = 'org.apache.kafka.connect.transforms.InsertField$Value',
  'transforms.ExtractTimestamp.timestamp.field' = 'EVENT_TS'
);
但是我在 Kibana 上看不到任何可用于在地图上绘制位置的索引。

最佳答案

我找到了解决方案 jajaj:因为我的数据是 JSON 格式,所以需要使用 key.ignore 和 schema.ignore(键忽略和模式忽略),并且我只从 temperature 创建表,而不是两个。

-- Elasticsearch sink for the 'temperature' topic, targeting http://localhost:9200.
-- Values are read with the JSON converter with schemas disabled, keys as plain strings;
-- key.ignore and schema.ignore are both switched on.
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_x WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'topics' = 'temperature',
  'connection.url' = 'http://localhost:9200',
  'type.name' = '_doc',
  'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
  'key.ignore' = 'true',
  'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'false',
  'schema.ignore' = 'true'
);

10-06 12:48