Problem Description
I have built https://github.com/mongodb/mongo-kafka, but how do I run it so that it connects to my running Kafka instance?
However stupid this question may sound, there seems to be no documentation available for getting this to work with a locally running MongoDB replica set.
All blogs point to using MongoDB Atlas instead.
If you have a good resource, please point me to it.
UPDATE 1 -
Used the Maven artifact - https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect
Placed the JAR in the Kafka plugins directory and restarted Kafka.
UPDATE 2 - How to enable MongoDB as a source for Kafka?
Used https://github.com/mongodb/mongo-kafka/blob/master/config/MongoSourceConnector.properties as the connector configuration for Kafka:
bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties
UPDATE 3 - The above method did not work; going back to the blog, it does not mention what port 8083 is.
Installed Confluent and confluent-hub; still unsure whether the Mongo connector is working with Kafka.
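Port 8083 is the default port of the Kafka Connect REST API, which is a quick way to check whether a Connect worker is up and has picked up the Mongo connector plugin. A rough sanity check, assuming a worker running locally with default settings:
# List the connector plugins the worker has loaded, and the connectors currently running
curl http://localhost:8083/connector-plugins
curl http://localhost:8083/connectors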
UPDATE 4 -
Zookeeper, the Kafka server, and Kafka Connect are running.
The Mongo Kafka library file and the Kafka Connect Avro connector library file are in place.
Using the commands below, my source got working -
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties
And using the Logstash configuration below, I was able to push data into Elasticsearch -
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["users","organisations","skills"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
}
stdout { codec => rubydebug }
}
So now, since one MongoSourceConnector.properties holds a single collection name to read from, I need to run Kafka Connect with a different properties file for each collection.
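As a side note, the standalone worker accepts several connector property files in one invocation, so the per-collection configs can still run in a single process. A minimal sketch (the per-collection filenames are made up here):
bin/connect-standalone.sh config/connect-standalone.properties \
  config/users.properties \
  config/organisations.properties \
  config/skills.properties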
My Logstash is pushing new data into Elasticsearch instead of updating existing documents, and it is not creating indexes named after the collection. The idea is that this should stay perfectly in sync with my MongoDB database.
FINAL UPDATE - Everything is now working smoothly:
- Created multiple properties files for Kafka Connect
- The latest Logstash config actually creates indexes based on the topic name and updates them accordingly
input {
kafka {
bootstrap_servers => "localhost:9092"
decorate_events => true
topics => ["users","organisations","skills"]
}
}
filter {
json {
source => "message"
target => "json_payload"
}
json {
source => "[json_payload][payload]"
target => "payload"
}
mutate {
add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
rename => { "[payload][fullDocument]" => "document"}
remove_field => ["message","json_payload","payload"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{es_index}"
action => "update"
doc_as_upsert => true
document_id => "%{mongo_id}"
}
stdout {
codec =>
rubydebug {
metadata => true
}
}
}
Recommended Answer
Steps to successfully get MongoDB syncing with Elasticsearch -
- First, deploy a MongoDB replica set -
# Make sure no mongod daemon instance is running
# To check all the ports which are listening or open
sudo lsof -i -P -n | grep LISTEN
# Kill the process id of the mongo instance (replace 775 with the PID from the lsof output)
sudo kill 775
# Deploy the replica set
mongod --replSet "rs0" --bind_ip localhost --dbpath=/data/db
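The source connector reads from MongoDB change streams, which only work against an initiated replica set, so a fresh single-node deployment still has to be initiated once. A minimal sketch, assuming the single-node setup above (use the legacy mongo shell if mongosh is not installed):
# Initiate the replica set once, then confirm the node becomes PRIMARY
mongosh --eval 'rs.initiate()'
mongosh --eval 'rs.status()'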
- Create the connector configuration properties for Kafka -
# dummycollection.properties <- Filename
name=dummycollection-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=dummydatabase
collection=dummycollection
copy.existing=true
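# With an empty topic.prefix the connector publishes to the topic <database>.<collection>
# (dummydatabase.dummycollection here), which must match the Logstash "topics" list below.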
topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
publish.full.document.only=true
pipeline=[]
batch.size=0
collation=
- Make sure the required JAR files (the Mongo Kafka connector library and the Kafka Connect Avro converter library) are available to your Kafka plugins -
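If the worker does not pick them up, the usual place to point it at the JARs is the plugin.path setting of the standalone worker config; a sketch with a hypothetical directory:
# config/connect-standalone.properties
plugin.path=/usr/local/share/kafka/plugins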
- Deploy Kafka -
# Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# Kafka Server
bin/kafka-server-start.sh config/server.properties
# Kafka Connect (standalone)
bin/connect-standalone.sh config/connect-standalone.properties config/dummycollection.properties
- Configure Logstash -
# /etc/logstash/conf.d/apache.conf <- File
input {
kafka {
bootstrap_servers => "localhost:9092"
decorate_events => true
topics => ["dummydatabase.dummycollection"]
}
}
filter {
# Parse the connector's JSON envelope out of the Kafka record value
json {
source => "message"
target => "json_payload"
}
# The change event itself sits under "payload"
json {
source => "[json_payload][payload]"
target => "payload"
}
mutate {
# Index per Kafka topic (requires decorate_events) and key documents by the Mongo ObjectId
add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
rename => { "[payload][fullDocument]" => "document"}
remove_field => ["message","json_payload","payload"]
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{es_index}"
action => "update"
doc_as_upsert => true
document_id => "%{mongo_id}"
}
stdout {
codec =>
rubydebug {
metadata => true
}
}
}
- Start Elasticsearch, Kibana, and Logstash -
sudo systemctl start elasticsearch
sudo systemctl start kibana
sudo systemctl start logstash
- Test -
- Create a collection, mention it in the Logstash topics, and create a properties file for Kafka
- Add data to it
- Update the data
Open MongoDB Compass to add and update the data (a shell-based alternative is sketched below), then review the resulting indexes in Elasticsearch.
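If you prefer the shell over Compass for the add/update test, a minimal sketch using the hypothetical names from the config above:
# Insert a document - Logstash should index it into dummydatabase.dummycollection
mongosh dummydatabase --eval 'db.dummycollection.insertOne({ name: "alice", role: "tester" })'
# Update the same document - the existing Elasticsearch document should be upserted, not duplicated
mongosh dummydatabase --eval 'db.dummycollection.updateOne({ name: "alice" }, { $set: { role: "admin" } })'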