我已经构建了一个软件,该软件使用GCP Pub / Sub作为消息队列,使用Apache Beam来构建管道,使用Flask来构建网络服务器。它在生产中运行平稳,但是我很难使所有组件都与docker-compose连接在一起,尤其是Apache Beam管道。

我按照Dataflow pipeline and pubsub emulator进行操作,通过将SO回答中的localhost替换为我的docker-compose.yaml中定义的服务名称,使管道监听GCP发布/订阅模拟器:

  pubsub_emulator:
    build: docker_images/message_queue
    ports:
      - 8085:8085

  webserver:
    build: docker_images/webserver
    environment:
      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085
      PUBSUB_PROJECT_ID: my-dev
    restart: unless-stopped
    ports:
      - 8899:8080
    depends_on:
      - pubsub_emulator

   pipeline:
    build: docker_images/pipeline
    environment:
      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085
      PUBSUB_PROJECT_ID: my-dev
    restart: unless-stopped
    depends_on:
      - pubsub_emulator


该网络服务器能够访问发布/订阅模拟器并生成主题。

但是,使用MalformedURLException启动时管道失败:

Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: no protocol: pubsub_emulator:8085/v1/projects/my-dev/subscriptions/sync_beam_1702190853678138166


管道的选项似乎很好,我用以下方法定义了它们:

final String pubSubEmulatorHost = System.getenv("PUBSUB_EMULATOR_HOST");

BasePipeline.PipeOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                                .as(BasePipeline.PipeOptions.class);

options.as(DataflowPipelineOptions.class).setStreaming(true);

options.as(PubsubOptions.class).setPubsubRootUrl(pubSubEmulatorHost);

Pipeline pipeline = Pipeline.create(options);


是否有人暗示发生了什么以及如何解决?唯一的解决方案是否意味着将仿真器和管道设置在同一个docker中?

最佳答案

您可以尝试将值更改为以下内容:

http://pubsub_emulator:8085


由于错误提示您缺少protocol(在您的情况下可能是http

根据Apache Beam SDK,该值应为完全限定的URL:

// getPubsubRootUrl
@Default.String(value="https://pubsub.googleapis.com")
 @Hidden
java.lang.String getPubsubRootUrl()
// Root URL for use with the Google Cloud Pub/Sub API.


但是,如果您来自python背景,则会注意到Python SDK,它使用gRPC Python,如here所示,期望仅由地址和端口组成的服务器地址

# A snippet from google-cloud-python library.
if os.environ.get("PUBSUB_EMULATOR_HOST"):
    kwargs["channel"] = grpc.insecure_channel(
        target=os.environ.get("PUBSUB_EMULATOR_HOST")
    )


grpc.insecure_channel(target, options=None)
Creates an insecure Channel to a server.

The returned Channel is thread-safe.

Parameters:
target – The server address

关于java - Apache Beam:无法通过docker-compose访问发布/订阅模拟器,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/55064146/

10-09 19:26