本文介绍了将Eclipse Ditto连接到Apache Kafka的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在按照本文档中介绍的说明将Apache Kafka连接到Eclipse Ditto.

I am following the instructions explained in this documentation to connect Apache Kafka to Eclipse Ditto.

https://www.eclipse.org/ditto/connectivity-protocol-bindings-kafka2.html

我不确定以下内容.

1)在授权上下文中[[ditto:outbound-auth-subject," ...].

1) ["ditto:outbound-auth-subject", "..."] under the Authorization context.

2)地址":主题/关键字"

2) "address": "topic/key"

请让我知道它们!预先谢谢你!!

Please let me know about them! Thank you in advance.!

请找到我用来连接Ditto和Kafka的命令

Please find the command that I used to connect Ditto and Kafka

curl -X POST -i -u devops:foobar -H 'Content-Type: application/json' -d '{
"targetActorSelection": "/system/sharding/connection",
"headers": {
    "aggregate": false
},
"piggybackCommand": {
    "type": "connectivity.commands:createConnection",
    "connection": {
        "id": "MyKafkaConnection1",
        "connectionType": "kafka",
        "connectionStatus": "open",
        "uri": "tcp://radsah:password@localhost:9092",
        "specificConfig": {
        "bootstrapServers": "10.196.2.218:9092",
        "saslMechanism": "plain"
        },
        "failoverEnabled": true,
        "targets": [
              {
             "address": "digital-twins",
              "topics": [
               "_/_/things/twin/events",
               "_/_/things/live/messages"
                            ],
            "authorizationContext": ["ditto:outbound-auth-subject"]
        }],
        "mappingContext": {
            "mappingEngine": "JavaScript",
            "options": {
                "incomingScript": "function mapToDittoProtocolMsg(\n    headers,\n    textPayload,\n    bytePayload,\n    contentType\n) {\n\n    if (contentType !== \"application/json\") {\n        return null;\n    }\n\n    var jsonData = JSON.parse(textPayload);\n    var temperature = jsonData.temp;\n    var humidity = jsonData.hum;\n    \n    var path;\n    var value;\n    if (temperature != null && humidity != null) {\n        path = \"/features\";\n        value = {\n                temperature: {\n                    properties: {\n                        value: temperature\n                    }\n                },\n                humidity: {\n                    properties: {\n                        value: humidity\n                    }\n                }\n            };\n    } else if (temperature != null) {\n        path = \"/features/temperature/properties/value\";\n        value = temperature;\n    } else if (humidity != null) {\n        path = \"/features/humidity/properties/value\";\n        value = humidity;\n    }\n    \n    if (!path || !value) {\n        return null;\n    }\n\n    return Ditto.buildDittoProtocolMsg(\n        \"org.eclipse.ditto\",\n        headers[\"device_id\"],\n        \"things\",\n        \"twin\",\n        \"commands\",\n        \"modify\",\n        path,\n        headers,\n        value\n    );\n}"
            }
        }
    }
}
}' http://localhost:8080/devops/piggyback/connectivity?timeout=8000

我已经使用Hono注册了设备,并且正在将数据发送到Ditto.同上可以成功接收数据.但是我想将接收到的数据发送给Kafka.

I have registered a device using Hono and I am sending the data to Ditto. Ditto successfully receives the data. But I want send this received data to Kafka.

Kafka和Ditto之间的连接已成功建立.但是我没有收到卡夫卡消费者的数字双胞胎".我想念什么吗?

Connection is successfully established between Kafka and Ditto. But I am not receiving at the kafka-consumer "digital-twins". Am I missing something?

使用策略"命令

curl -X PUT 'http://localhost:8080/api/2/policies/org.eclipse.ditto:5100' -u 'ditto:ditto' -H 'Content-Type: application/json' -d '{
"entries": {
    "owner": {
        "subjects": {
            "nginx:ditto": {
                "type": "nginx basic auth user"
            }
        },
        "resources": {
            "thing:/": {
                "grant": [
                    "READ","WRITE"
                ],
                "revoke": []
            },
            "policy:/": {
                "grant": [
                    "READ","WRITE"
                ],
                "revoke": []
            },
            "message:/": {
                "grant": [
                    "READ","WRITE"
                ],
                "revoke": []
            }
        }
    }
}
}

推荐答案

关于授权上下文,您可以查看我们的连接文档中的授权部分.它必须包含您的事物的策略或ACL中定义的主题.

regarding the authorization context you can have a look at the authorization section in our connections documentation. It has to hold a subject that is defined in the policy or ACL of your things.

例如:

事物"foo:bar"的策略已为主题"somePrefix:someValue"定义了对整个事物的读取访问权限.

The policy of Thing "foo:bar" has read access of the whole thing for subject "somePrefix:someValue" defined.

{
  "policyId": "foo:bar",
  "entries": {
    ... //Maybe more entries
    "MyKafkaConnection": {
      "subjects": {
        "somePrefix:someValue": {
          "type": "my description for this subject"
        }
      },
      "resources": {
        "thing:/": {
          "grant": [
            "READ"
          ],
          "revoke": []
        },
        "message:/": {
          "grant": [
            "READ"
          ],
          "revoke": []
        }
      }
    }
  }
}

在您引用的示例中,与"foo:bar"相关的事件将通过kafka连接在您在地址字段中指定的主题上发布.

In the sample you referring to, the events related to "foo:bar" would then be published via the kafka connection on the topic you specified in the address field.

这篇关于将Eclipse Ditto连接到Apache Kafka的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-01 21:13