示例环境

java.version: 1.8.x
flink.version: 1.11.1
rabbitMq:3.5.7

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

示例模块 (pom.xml)

Flink 系例 之 DataStream Connectors 与 示例模块

数据流输入

DataStreamSource.java

package com.flink.examples.rabbitmq;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

/**
 * @Description 从MQ中获取数据并输出到DataStream流中
 */
public class DataStreamSource {

    /**
     * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html
     */

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("127.0.0.1")
                .setPort(5672)
                .setUserName("admin")
                .setPassword("admin")
                .setVirtualHost("datastream")
                .build();

        final DataStream<String> stream = env
                .addSource(new RMQSource<String>( connectionConfig, "test", true, new SimpleStringSchema()))
                .setParallelism(1);

        stream.print();
        env.execute("flink rabbitMq source");
    }
}

数据流输出

DataStreamSink.java

package com.flink.examples.rabbitmq;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

/**
 * @Description 将DataStream流中的数据输出到rabbitMq队列中
 */
public class DataStreamSink {

    /**
     * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html
     */

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("127.0.0.1")
                .setPort(5672)
                .setUserName("admin")
                .setPassword("admin")
                .setVirtualHost("datastream")
                .build();

        String [] words = new String[]{"props","student","build","name","execute"};
        final DataStream<String> stream = env.fromElements(words);
        stream.addSink(new RMQSink<String>(connectionConfig,"test",new SimpleStringSchema()));
        env.execute("flink rabbitMq sink");
    }
}

数据展示

Flink 系例 之 Connectors 连接 RabbitMq-LMLPHP

05-05 11:00