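I'm trying to use robinhood/faust, but without success!

I have created a producer that successfully writes to the source topic on my confluent-kafka localhost instance!

But Faust is unable to connect to localhost.

My app.py: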

import faust
import base64
import random
from datetime import datetime


SOURCE_TOPIC="input_msgs"
TARGET_TOPIC="output_msgs"

app = faust.App(
    "messages-stream",
    broker="kafka://localhost:9092",
    topic_partitions=1,
    store="memory://",
)

class OriginalMessage(faust.Record):
    msg: str


class TransformedMessage(faust.Record):
    msg_id: int
    msg_data: str
    msg_base64: str
    created_at: float
    source_topic: str
    target_topic: str
    deleted: bool

topic = app.topic(SOURCE_TOPIC, value_type=OriginalMessage)
out_topic = app.topic(TARGET_TOPIC, partitions=1)

table = app.Table(
    "output_msgs",
    default=TransformedMessage,
    partitions=1,
    changelog_topic=out_topic,
)

print('Initializing Thread Processor...')


@app.agent(topic)
async def transformedmessage(messageevents):
    async for transfmessage in messageevents:
        try:
            # OriginalMessage only carries `msg`; build the other fields here.
            msg_id = random.randint(1, 999999)
            table[msg_id] = TransformedMessage(
                msg_id=msg_id,
                msg_data=transfmessage.msg,
                # b64encode needs bytes; decode so the field stays a str.
                msg_base64=base64.b64encode(transfmessage.msg.encode()).decode(),
                created_at=datetime.now().timestamp(),  # float, as declared
                source_topic=SOURCE_TOPIC,
                target_topic=TARGET_TOPIC,
                deleted=False,
            )
        except Exception as e:
            print(f"Error: {e}")


if __name__ == "__main__":
    app.main()


Error:

[2020-01-24 18:05:36,910] [55712] [ERROR] Unable connect to node with id 1: [Errno 8] nodename nor servname provided, or not known
[2020-01-24 18:05:36,910] [55712] [ERROR] [^Worker]: Error: ConnectionError('No connection to node with id 1')

    "No connection to node with id {}".format(node_id))
kafka.errors.ConnectionError: ConnectionError: No connection to node with id 1


I'm running it with: faust -A app worker -l debug

Best answer

I switched to the Confluent Kafka all-in-one setup, and that solved it.

confluent-kafka all in one
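
For anyone hitting the same error: "[Errno 8] nodename nor servname provided, or not known" is a host name lookup failure, and it is reported for "node with id 1" rather than for the bootstrap address. One likely reading (an assumption, not something confirmed in the question) is that the worker reached localhost:9092 but could not resolve the host name the broker advertises for itself. Below is a minimal sketch for checking which names resolve from the machine running the worker. The helper name can_resolve and the host name "broker" are hypothetical; substitute whatever your broker advertises.

```python
import socket


def can_resolve(host: str, port: int) -> bool:
    """Return True if host:port resolves to at least one address."""
    try:
        return bool(socket.getaddrinfo(host, port, type=socket.SOCK_STREAM))
    except socket.gaierror:
        # Name lookup failed: the same class of failure as the
        # "nodename nor servname provided, or not known" message.
        return False


if __name__ == "__main__":
    # Hypothetical list: replace with the host names your broker advertises.
    for host in ("localhost", "broker"):
        status = "resolves" if can_resolve(host, 9092) else "does NOT resolve"
        print(f"{host}: {status}")
```

If the advertised name does not resolve, making it resolvable (or changing what the broker advertises) is probably the thing to fix before looking at the Faust code itself.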

Source: a similar question on Stack Overflow, "python - Unable connect to node with id 1: [Worker]: Error: ConnectionError('No connection to node with id 1')": https://stackoverflow.com/questions/59903780/
