原创: 影宸风洛 SpringForAll社区 昨天

1 概述

Spring Cloud Data Flow是一个用于构建实时数据管道和批处理过程的云原生工具包。 Spring Cloud Data Flow已准备好用于一系列数据处理用例,如简单的导入/导出,ETL处理,事件流和预测分析。

在本教程中,我们将学习使用流管道实时提取转换和加载(ETL)的示例,该管道从JDBC数据库中提取数据,将其转换为简单的POJO并将其加载到MongoDB中。

2. ETL and Event-Stream Processing

ETL - 提取,转换和加载 -通常被认为将数据从多个数据库和系统批量加载到公共数据仓库中的过程。在此数据仓库中,可以在不影响系统整体性能的情况下进行大量数据分析处理。

然而,新趋势正在改变如何做到这一点的方式。 ETL仍然可以将数据传输到数据仓库和数据池。

现在,可以借助Spring Cloud Data Flow的事件流体系架构使用流来完成此操作。

3. Spring Cloud Data Flow

借助Spring Cloud Data Flow(SCDF),开发人员可以创建两种风格的数据管道:

  • 使用Spring Cloud Stream的长效实时流应用程序

  • 使用Spring Cloud Task的批处理短期任务应用程序

在本文中,我们将介绍第一个,基于Spring Cloud Stream的长效流媒体应用程序。

3.1. Spring Cloud Stream Applications

SCDF管道流由不同的步骤组成,其中每一步都是使用Spring Cloud Stream微框架以Spring Boot样式构建的应用程序。这些应用程序集成了像Apache Kafka或RabbitMQ等的消息中间件。

这些应用程序分为源,处理器和接收器。与ETL过程相比,我们可以说源是“提取”,处理器是“转换器”,接收器是“加载”部分。

在某些情况下,我们可以在管道的一个或多个步骤中使用应用程序启动器。这意味着我们不需要为每一步实现新的应用程序,而是配置已实现的现有应用程序启动器。

可以在此处找到应用程序启动器列表。

3.2. Spring Cloud Data Flow Server

4. 环境设置(Environment Setup)

在开始之前,我们需要选择这个复杂部署的部分。要定义的第一部分是SCDF服务器。

为了进行测试,我们将使用SCDF Server Local进行本地开发。对于生产部署,我们稍后可以选择云本机运行时,如SCDF Server Kubernetes。我们可以在这里找到服务器运行列表。

现在,检查系统要求是否满足运行此服务器。

4.1. 系统要求(System Requirements)

要运行SCDF服务器,我们必须定义并设置两个依赖项:

  • 消息中间件

  • 关系数据库管理系统(the RDBMS)

我们将使用RabbitMQ作为消息中间件,我们选择PostgreSQL作为RDBMS来存储我们的管道流定义。

为了运行RabbitMQ,可以在此处下载最新版本并使用默认配置启动RabbitMQ实例或运行以下Docker命令:

  1. docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

最后的设置步骤:在默认端口5432上安装并运行PostgreSQL RDBMS。之后,创建一个数据库,SCDF可以使用以下脚本存储其流定义:

  1. CREATE DATABASE dataflow;

4.2. 本地Spring Cloud数据流服务器(Spring Cloud Data Flow Server Local)

我们可以选择使用docker-compose启动服务器,或者将其作为Java应用程序启动来运行SCDF Server Local。

在这里,我们将SCDF Server Local作为Java应用程序运行。为了配置应用程序,我们必须将配置定义为Java应用程序参数。我们在系统路径中需要配置Java 8。

为了托管jar和依赖项,我们需要为SCDF Server创建一个主文件夹,并将SCDF Server Local发行版下载到此文件夹中。您可以在此处下载SCDF Server Local最新分行版。

此外,我们需要创建一个lib文件夹并在其中放置JDBC驱动程序。这里提供了最新版本的PostgreSQL驱动程序。

最后,运行SCDF本地服务器:

  1. $java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \

  2.    --spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \

  3.    --spring.datasource.username=postgres_username \

  4.    --spring.datasource.password=postgres_password \

  5.    --spring.datasource.driver-class-name=org.postgresql.Driver \

  6.    --spring.rabbitmq.host=127.0.0.1 \

  7.    --spring.rabbitmq.port=5672 \

  8.    --spring.rabbitmq.username=guest \

  9.    --spring.rabbitmq.password=guest

我们可以通过查看此URL来检查它是否正在运行:http://localhost:9393/dashboard

4.3. Spring Cloud Data Flow Shell

SCDF Shell是一个命令行工具,可以轻松组合和部署我们的应用程序和管道。这些Shell命令在Spring Cloud Data Flow Server REST API上运行。

在此处获得最新版本的jar,并且下载到SCDF主文件夹中。完成后,运行以下命令(根据需要更新版本):

  1. $ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar

  2.  ____                              ____ _                __

  3. / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |

  4. \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |

  5.  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |

  6. |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|

  7.  ____ |_|    _          __|___/                 __________

  8. |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \

  9. | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \

  10. | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /

  11. |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/

  12. Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".

  13. dataflow:>

如果最后一行中获得“server-unknown:>”而不是“dataflow:>”,则表示您没有在localhost上运行SCDF服务器。在这种情况下,请运行以下命令以连接到另一台主机:

  1. server-unknown:>dataflow config server http://{host}

现在,Shell连接到SCDF服务器,我们可以运行我们的命令。

我们在Shell中需要做的第一件事就是导入应用程序启动器。在Spring Boot 2.0.x中找到RabbitMQ + Maven的最新版本,并运行以下命令(再次声明,根据需要更新版本,此处为“Darwin-SR1”):

  1. $ dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven

检查应用程序是否安装完成,请运行以下Shell命令:

  1. $ dataflow:> app list

因此,我们应该看到一个包含所有已安装应用程序的表。

此外,SCDF提供了一个名为Flo的图形界面,我们可以通过以下地址访问:http://localhost:9393/dashboard。但是,它的使用不在本文的范围内。

5 编写ETL管道(Composing an ETL Pipeline)

我们现在创建我们的流管道。为此,我们将使用JDBC Source应用启动程序从关系数据库中提取信息。

此外,我们将创建一个自定义处理器,用于转换信息结构和一个自定义接收器,将数据加载到MongoDB中。

5.1 提取-准备关系数据库以进行提取

创建一个名为crm的数据库和一个名为customer的表:

  1. CREATE DATABASE crm;

  2. CREATE TABLE customer (

  3.    id bigint NOT NULL,

  4.    imported boolean DEFAULT false,

  5.    customer_name character varying(50),

  6.    PRIMARY KEY(id)

  7. )

请注意,我们正在使用导入的标志,该标志将存储已导入的记录。如有必要,我们还可以将此信息存储在另一个表中。

现在,插入一些数据:

  1. INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);

5.2 转换-JDBC字段映射到MongoDB字段结构

对于转换步骤,我们将把源表customer_name字段简单转换为新字段名称。其他转换可以在这里完成,但尽量保持简短例子。

为此,我们将创建一个名为customer-transform的新项目。最简单的方法是使用Spring Initializr站点创建项目。看到网站后,选择一个Group和一个Artifact名称。我们将分别使用com.customer和customer-transform。

完成后,单击“生成项目”按钮下载项目。然后,解压缩项目并将其导入喜欢的IDE,并将以下依赖项添加到pom.xml:

  1. <dependency>

  2.    <groupId>org.springframework.cloud</groupId>

  3.    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>

  4. </dependency>

现在我们开始为字段名称转换进行编码。为此,我们将创建Customer类作为适配器。此类将通过setName()方法接收customer_name,并将通过getName方法输出其值。

@JsonProperty注释在JSON反序列化到Java时执行转换:

  1. public class Customer {

  2.    private Long id;

  3.    private String name;

  4.    @JsonProperty("customer_name")

  5.    public void setName(String name) {

  6.        this.name = name;

  7.    }

  8.    @JsonProperty("name")

  9.    public String getName() {

  10.        return name;

  11.    }

  12.    // Getters and Setters

  13. }

处理器需要从输入接收数据,进行转换并将结果绑定到输出通道。创建一个类来执行此操作:

  1. import org.springframework.cloud.stream.annotation.EnableBinding;

  2. import org.springframework.cloud.stream.messaging.Processor;

  3. import org.springframework.integration.annotation.Transformer;

  4. @EnableBinding(Processor.class)

  5. public class CustomerProcessorConfiguration {

  6.    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)

  7.    public Customer convertToPojo(Customer payload) {

  8.        return payload;

  9.    }

  10. }

在上面的代码中,我们可以观察到转换是自动发生的。输入接收JSON数据,Jackson使用set方法将其反序列化为Customer对象。

与输入相反,输出使用get方法将数据序列化为JSON。

5.3 加载-MongoDB接收器

与转换步骤类似,我们将创建另一个maven项目,名为customer-mongodb-sink。再次,访问Spring Initializr,Group名为com.customer,Artifact名为customer-mongodb-sink。然后,在依赖项搜索框中键入“MongoDB”并下载项目。

接下来,项目解压缩并导入IDE.

然后,添加与customer-transform项目中相同的额外依赖项。

现在我们将创建另一个Customer类,用于在此步骤中接收输入:

  1. import org.springframework.data.mongodb.core.mapping.Document;

  2. @Document(collection="customer")

  3. public class Customer {

  4.    private Long id;

  5.    private String name;

  6.    // Getters and Setters

  7. }

为了接收Customer,我们将创建一个Listener类,它将使用CustomerRepository保存客户实体:

  1. @EnableBinding(Sink.class)

  2. public class CustomerListener {

  3.    @Autowired

  4.    private CustomerRepository repository;

  5.    @StreamListener(Sink.INPUT)

  6.    public void save(Customer customer) {

  7.        repository.save(customer);

  8.    }

  9. }

在这种情况下,CustomerRepository是Spring Data的MongoRepository:

  1. import org.springframework.data.mongodb.repository.MongoRepository;

  2. import org.springframework.stereotype.Repository;

  3. @Repository

  4. public interface CustomerRepository extends MongoRepository<Customer, Long> {

  5. }

5.4 流定义

现在,两个自定义应用程序都可以在SCDF服务器上注册。为了实现这个目标,先使用Maven命令mvn install编译这两个项目。

之后,使用Spring Cloud Data Flow Shell注册它们:

  1. app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT

  2. app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT

最后,检查应用程序是否存储在SCDF中,在shell中运行application list命令:

  1. app list

因此,我们应该在结果表中看到这两个应用程序。

5.4.1 流管道领域特定语言-DSL

DSL定义应用程序之间的配置和数据流。 SCDF DSL很简单。在第一个单词中,我们定义应用程序的名称,然后是配置。

此外,语法是受Unix启发的Pipeline语法,它使用垂直条(也称为“管道”)来连接多个应用程序:

  1. http --port=8181 | log

创建端口是8181HTTP应用程序,该应用程序将收到的任何正文有效负载发送到日志。

现在,让我们看看如何创建JDBC Source的DSL流定义。

5.4.2. JDBC Source流定义

JDBC Source的关键配置是查询和更新。查询将选择未读记录,而更新将更改标志以防止重新读取当前记录。

此外,我们将定义JDBC Source以30秒的固定延迟轮询并轮询最多1000行。最后,我们将定义连接的配置,如驱动程序,用户名,密码和连接URL:

  1. jdbc

  2.    --query='SELECT id, customer_name FROM public.customer WHERE imported = false'

  3.    --update='UPDATE public.customer SET imported = true WHERE id in (:id)'

  4.    --max-rows-per-poll=1000

  5.    --fixed-delay=30 --time-unit=SECONDS

  6.    --driver-class-name=org.postgresql.Driver

  7.    --url=jdbc:postgresql://localhost:5432/crm

  8.    --username=postgres

  9.    --password=postgres

可以在此处找到更多JDBC Source配置属性。

5.4.3 Customer MongoDB Sink流定义

由于我们没有在customer-mongodb-sink的application.properties中定义连接配置,我们将通过DSL参数进行配置。

我们的应用程序完全基于MongoDataAutoConfiguration。您可以在此处查看其他可能的配置。基本上,我们将定义spring.data.mongodb.uri:

  1. customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main

5.4.4 创建和部署定义

首先,要创建最终的流定义,请返回到Shell并执行以下命令(没有换行符,刚刚插入它们以便于阅读):

  1. stream create --name jdbc-to-mongodb

  2.  --definition "jdbc

  3.  --query='SELECT id, customer_name FROM public.customer WHERE imported=false'

  4.  --fixed-delay=30

  5.  --max-rows-per-poll=1000

  6.  --update='UPDATE customer SET imported=true WHERE id in (:id)'

  7.  --time-unit=SECONDS

  8.  --password=postgres

  9.  --driver-class-name=org.postgresql.Driver

  10.  --username=postgres

  11.  --url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink

  12.  --spring.data.mongodb.uri=mongodb://localhost/main"

此DSL流被定义名为jdbc-to-mongodb。接下来,我们将按名称部署流:

  1. stream deploy --name jdbc-to-mongodb

最后,我们应该在日志输出中看到所有可用日志的位置:

  1. Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink

  2. Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform

  3. Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6 结束语

在本文中,我们已经看到了使用Spring Cloud Data Flow的ETL数据管道的完整示例。

最值得注意的是,我们看到了应用启动程序的配置,使用Spring Cloud Data Flow Shell创建了一个ETL流管道,并为我们的读取,转换和写数据实现了自定义应用程序。

与往常一样,示例代码可以在GitHub中找到。

05-04 00:02