I have the code below for connecting to Kafka using the Python Beam SDK. I know that the ReadFromKafka transform is run in a Java SDK harness (Docker container), but I have not been able to figure out how to make ssl.truststore.location and ssl.keystore.location accessible inside the SDK harness' Docker environment. The job_endpoint argument points to `java -jar beam-runners-flink-1.10-job-server-2.27.0.jar --flink-master localhost:8081`.

```python
pipeline_args.extend([
    '--job_name=paul_test',
    '--runner=PortableRunner',
    '--sdk_location=container',
    '--job_endpoint=localhost:8099',
    '--streaming',
    "--environment_type=DOCKER",
    f"--sdk_harness_container_image_overrides=.*java.*,{my_beam_sdk_docker_image}:{my_beam_docker_tag}",
])

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline:
    kafka = pipeline | ReadFromKafka(
        consumer_config={
            "bootstrap.servers": "bootstrap-server:17032",
            "security.protocol": "SSL",
            "ssl.truststore.location": "/opt/keys/client.truststore.jks",  # how do I make this available to the Java SDK harness?
            "ssl.truststore.password": "password",
            "ssl.keystore.type": "PKCS12",
            "ssl.keystore.location": "/opt/keys/client.keystore.p12",  # how do I make this available to the Java SDK harness?
            "ssl.keystore.password": "password",
            "group.id": "group",
            "basic.auth.credentials.source": "USER_INFO",
            "schema.registry.basic.auth.user.info": "user:password"
        },
        topics=["topic"],
        max_num_records=2,
        # expansion_service="localhost:56938"
    )
    kafka | beam.Map(lambda x: print(x))
```

I tried specifying the image override option as `--sdk_harness_container_image_overrides='.*java.*,beam_java_sdk:latest'`, where beam_java_sdk:latest is a Docker image I based on apache/beam_java11_sdk:2.27.0 and that pulls the credentials in its entrypoint.sh.
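For reference, the custom harness image I describe above is roughly like the following sketch. The file names and paths here are assumptions for illustration (chosen to match the paths in consumer_config); only the base image apache/beam_java11_sdk:2.27.0 comes from my actual setup:

```dockerfile
# Hypothetical sketch of the custom Java SDK harness image.
# Base image is the stock Beam Java 11 harness for the matching Beam version.
FROM apache/beam_java11_sdk:2.27.0

# Bake the keystores into the image at the exact paths that
# consumer_config references, so the Kafka client can load them.
COPY client.truststore.jks /opt/keys/client.truststore.jks
COPY client.keystore.p12 /opt/keys/client.keystore.p12

# The base image's entrypoint (the Java SDK harness boot binary) is kept as-is.
```

Baking the files in at build time avoids needing any fetch logic in entrypoint.sh, at the cost of putting secrets into the image itself.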
But Beam does not appear to use it; I see

```
INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment apache/beam_java11_sdk:2.27.0 for worker id 1-1
```

in the logs, which is soon inevitably followed by

```
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /opt/keys/client.keystore.p12 of type PKCS12
```

In summary, my question is: in Apache Beam, is it possible to make files available inside the Java SDK harness Docker container from the Python Beam SDK? If so, how? Many thanks.

Recommended answer

Currently, there is no straightforward way to achieve this. There is ongoing discussion and tracking issues to provide support for this kind of expansion service customization (see here, here, BEAM-12538 and BEAM-12539). That is the short answer.

The long answer is yes, you can do that. You would have to copy & paste ExpansionService.java into your codebase and build your custom expansion service, where you specify the default environment (DOCKER) and the default environment config (your image) here. You then have to manually run this expansion service and specify its address using the expansion_service parameter of ReadFromKafka.
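On the Python side, the only pipeline change the long answer requires is passing expansion_service to ReadFromKafka. A minimal sketch, assuming a custom expansion service has been started manually at localhost:8097 (that address is an assumption for illustration; the config values mirror the question):

```python
# Sketch: build ReadFromKafka keyword arguments that point at a manually
# started custom expansion service instead of Beam's default one.

def build_read_from_kafka_kwargs(expansion_address):
    """Return kwargs for ReadFromKafka with an explicit expansion service."""
    return {
        "consumer_config": {
            "bootstrap.servers": "bootstrap-server:17032",
            "security.protocol": "SSL",
            # These paths must exist inside the custom harness image that the
            # custom expansion service's default environment config names.
            "ssl.truststore.location": "/opt/keys/client.truststore.jks",
            "ssl.keystore.location": "/opt/keys/client.keystore.p12",
        },
        "topics": ["topic"],
        # Address of the manually run custom expansion service:
        "expansion_service": expansion_address,
    }

kwargs = build_read_from_kafka_kwargs("localhost:8097")
print(kwargs["expansion_service"])
```

With this in place, the read becomes `kafka = pipeline | ReadFromKafka(**kwargs)`, and the default expansion service (which would otherwise launch the stock apache/beam_java11_sdk image) is bypassed.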