I am sending data from an MQTT publisher like this into my MQTT broker, which is bridged into Kafka:
mosquitto_pub -h 0.0.0.0 -p 1883 -t temperature -m '{"who":"ben", "timeepoc":1558212482, "lat":-33.87052833, "lon":151.21292, "alt":31.0, "batt":0, "speed":12.86}'
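For context, the bridge from the MQTT broker into Kafka can be done with the Confluent MQTT source connector. This is only a sketch of how such a bridge might be declared from KSQL; the connector name and broker URI are assumptions, not copied from this setup:

CREATE SOURCE CONNECTOR SOURCE_MQTT_TEMP WITH (
  'connector.class' = 'io.confluent.connect.mqtt.MqttSourceConnector',
  'mqtt.server.uri' = 'tcp://localhost:1883',  -- assumed local broker address
  'mqtt.topics'     = 'temperature',           -- MQTT topic to subscribe to
  'kafka.topic'     = 'temperature'            -- Kafka topic to write into
);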
Then I create a topic in Kafka:
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic temperature
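To confirm that the messages actually reach the topic, the standard console consumer is a quick check:

kafka-console-consumer --bootstrap-server localhost:9092 --topic temperature --from-beginning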
Next I create a stream and a table in KSQL:
CREATE STREAM carsensor (who VARCHAR, batt INTEGER, lon DOUBLE, lat DOUBLE, timeepoc BIGINT, alt INTEGER, speed DOUBLE)
  WITH (kafka_topic = 'temperature', value_format = 'JSON');
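A quick sanity check of the stream from the KSQL CLI, reading from the earliest offset so existing records show up (on newer ksqlDB versions the query also needs an EMIT CHANGES clause):

SET 'auto.offset.reset' = 'earliest';
SELECT * FROM carsensor LIMIT 5;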
CREATE TABLE runner_status WITH (value_format = 'JSON') AS
  SELECT who
       , MIN(speed) AS min_speed
       , MAX(speed) AS max_speed
       , MIN(GEO_DISTANCE(lat, lon, -33.87014, 151.211945, 'km')) AS dist_to_finish
       , COUNT(*) AS num_events
  FROM carsensor WINDOW TUMBLING (SIZE 5 MINUTES)
  GROUP BY who;
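Note that this persistent query writes its results to its own backing Kafka topic (RUNNER_STATUS by default, since no kafka_topic is given). DESCRIBE EXTENDED shows which topic that is, which matters when pointing a sink connector at it:

DESCRIBE EXTENDED runner_status;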
This is the data in my runner_status table:
{
"ROWTIME": 1597418628366,
"ROWKEY": "ben",
"WINDOWSTART": 1597418400000,
"WINDOWEND": 1597418700000,
"WHO": "ben",
"MIN_SPEED": 12.91,
"MAX_SPEED": 12.91,
"DIST_TO_FINISH": 0.07441178137496719,
"NUM_EVENTS": 2
}
And this is the data in my carsensor stream:
{
"ROWTIME": 1597418628366,
"ROWKEY": "temperature",
"WHO": "ben",
"BATT": 0,
"LON": 151.21273,
"LAT": -33.87029167,
"TIMEEPOC": 1558212492,
"ALT": 31,
"SPEED": 12.91
}
Then I create a sink connector to Elasticsearch:
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_M WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://localhost:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081',
'type.name' = '_doc',
'topics' = 'carsensor',
'key.ignore' = 'false',
'schema.ignore' = 'false',
'transforms' = 'ExtractTimestamp',
'transforms.ExtractTimestamp.type' = 'org.apache.kafka.connect.transforms.InsertField$Value',
'transforms.ExtractTimestamp.timestamp.field' = 'EVENT_TS'
);
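When no index appears, the first thing worth checking is whether the connector task failed. Both of these are standard checks (8083 is the default Kafka Connect REST port, assumed here):

DESCRIBE CONNECTOR SINK_ELASTIC_TEST_02_M;
curl -s localhost:8083/connectors/SINK_ELASTIC_TEST_02_M/status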
But I cannot see any index in Kibana from which to plot the positions on a map.

Best answer
I found the solution, haha. Since my data is plain JSON, I needed key.ignore and schema.ignore set to true, and to create the sink from the temperature topic only, not from both:
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_x WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://localhost:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'type.name' = '_doc',
'topics' = 'temperature',
'key.ignore' = 'true',
'schema.ignore' = 'true'
);
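With that, the index can be verified directly against Elasticsearch (curl -s localhost:9200/_cat/indices). One more step is usually needed for the map itself: with schema.ignore = 'true', Elasticsearch infers LAT/LON as plain numbers, while Kibana Maps needs a geo_point field. A common pattern, sketched here with assumed names (the derived stream, the template, and the LOCATION field are illustrative; on Elasticsearch 6.x the mapping must be nested under the _doc type):

CREATE STREAM carsensor_geo WITH (kafka_topic = 'carsensor_geo', value_format = 'JSON') AS
  SELECT who, speed, CAST(lat AS VARCHAR) + ',' + CAST(lon AS VARCHAR) AS location
  FROM carsensor;

curl -s -XPUT localhost:9200/_template/carsensor_geo -H 'Content-Type: application/json' -d '{
  "index_patterns": ["carsensor_geo*"],
  "mappings": { "properties": { "LOCATION": { "type": "geo_point" } } }
}'

A second sink connector pointed at the carsensor_geo topic then produces an index whose LOCATION field Kibana can plot.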