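A while ago I shared how to set up a distributed Apache NiFi cluster. In practice, finding enough machines for a distributed cluster is often a problem, while a single modern machine is usually quite well equipped. This post therefore shows how to set up a pseudo-distributed cluster on Windows, and at the same time uses a different approach to authentication and authorization in Apache NiFi.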
System environment and software versions
Windows8.1
JDK1.8.0_131
Nifi-1.4.0
Node directory | HTTPS port |
xxx\nifi-ncm | 9443 |
xxx\nifi-cluster01 | 9444 |
xxx\nifi-cluster02 | 9445 |
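NiFi service certificates
Generate the local NiFi service certificates
After extracting nifi-toolkit-1.4.0-bin.tar.gz, open CMD, change to the bin directory, and run the following command: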
D:\DevelopTools\nifi-toolkit-1.4.0\bin>tls-toolkit.bat standalone -n "localhost(3)" -C "CN=Admin, OU=ApacheNIFI" -o "..\target"
2017/10/26 18:21:32 INFO [main] org.apache.nifi.toolkit.tls.standalone.TlsToolkitStandaloneCommandLine: No nifiPropertiesFile specified, using embedded one.
2017/10/26 18:21:32 INFO [main] org.apache.nifi.toolkit.tls.standalone.TlsToolkitStandalone: Running standalone certificate generation with output directory ..\target
******************************************************************************
2017/10/26 18:21:34 INFO [main] org.apache.nifi.toolkit.tls.standalone.TlsToolkitStandalone: Successfully generated client certificate ..\target\CN=Admin_OU=ApacheNIFI.p12
2017/10/26 18:21:34 INFO [main] org.apache.nifi.toolkit.tls.standalone.TlsToolkitStandalone: tls-toolkit standalone completed successfully
The generated directory structure is as follows:
Folder PATH listing for volume senhui.li
Volume serial number is 000000F0 FA46:A0EB
D:.
│ CN=Admin_OU=ApacheNIFI.p12
│ CN=Admin_OU=ApacheNIFI.password
│ nifi-cert.pem
│ nifi-key.key
│
├─localhost
│ keystore.jks
│ nifi.properties
│ truststore.jks
│
├─localhost_2
│ keystore.jks
│ nifi.properties
│ truststore.jks
│
└─localhost_3
keystore.jks
nifi.properties
truststore.jks
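Copy the NiFi service certificates
- Copy the files in the localhost directory into the nifi-ncm directory, replacing all existing files
- Copy the files in the localhost_2 directory into the nifi-cluster01 directory, replacing all existing files
- Copy the files in the localhost_3 directory into the nifi-cluster02 directory, replacing all existing files
- Copy CN=Admin_OU=ApacheNIFI.p12 and CN=Admin_OU=ApacheNIFI.password to the desktop; they are needed later for login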
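The copy steps can also be scripted, which helps when the certificates are regenerated. The following is only a minimal sketch: the function name `copy_node_files` and the `NODE_DIRS` mapping are hypothetical helpers, and the folder-to-node mapping simply mirrors the list above.

```python
import shutil
from pathlib import Path

# Mapping taken from the copy steps: toolkit output folder -> node folder.
NODE_DIRS = {
    "localhost": "nifi-ncm",
    "localhost_2": "nifi-cluster01",
    "localhost_3": "nifi-cluster02",
}


def copy_node_files(target_dir, install_root, mapping=NODE_DIRS):
    """Copy every generated file into its node directory, overwriting same-named files."""
    copied = []
    for src_name, node_name in mapping.items():
        dest = Path(install_root) / node_name
        dest.mkdir(parents=True, exist_ok=True)
        for src in sorted((Path(target_dir) / src_name).iterdir()):
            if src.is_file():
                shutil.copy2(src, dest / src.name)
                copied.append(dest / src.name)
    return copied
```

Assuming the layout used in this post, a call might look like `copy_node_files(r"D:\DevelopTools\nifi-toolkit-1.4.0\target", r"D:\DevelopTools")`; adjust both paths to your own installation.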
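Configure single-node ZooKeeper
Create the directory and ID
Go to the nifi-ncm directory, create the ZooKeeper working directory, and write the server ID to a file with the following commands: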
D:\DevelopTools\nifi-ncm>mkdir state\zookeeper
D:\DevelopTools\nifi-ncm>(echo 1)>state\zookeeper\myid
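Shell syntax for creating directories and redirecting output differs between cmd.exe and Unix shells, so the myid file can also be written from a small script that behaves the same on both. This is a sketch only; `write_myid` is a hypothetical helper name, and the directory layout follows the dataDir value used in this post.

```python
from pathlib import Path


def write_myid(node_dir, server_id=1):
    """Create <node_dir>/state/zookeeper and write the server id into the myid file."""
    zk_dir = Path(node_dir) / "state" / "zookeeper"
    zk_dir.mkdir(parents=True, exist_ok=True)
    myid = zk_dir / "myid"
    # Write only the digits: no quotes, no flags, no trailing whitespace.
    myid.write_text(str(server_id), encoding="ascii")
    return myid
```

For the node in this post the call would be `write_myid(r"D:\DevelopTools\nifi-ncm")`.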
Update the ZooKeeper configuration
Go to the conf directory of nifi-ncm, open zookeeper.properties, and update its contents along these lines:
clientPort=2181
initLimit=10
autopurge.purgeInterval=24
syncLimit=5
tickTime=2000
dataDir=./state/zookeeper
autopurge.snapRetainCount=30
# Only the port of the service needs to be configured
server.1=localhost:2181
Update the NiFi configuration
Go to the conf directory of nifi-ncm, open nifi.properties, and update the following properties:
nifi.state.management.embedded.zookeeper.start=true
# zookeeper properties, used for cluster management #
# On the other two nodes, only this field needs to be edited
nifi.zookeeper.connect.string=localhost:2181
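Since the same properties have to be edited on three nodes, a small helper can apply the changes consistently. The sketch below is an illustration, not part of NiFi: `set_properties` is a hypothetical function that rewrites matching key=value lines and appends any key that is missing, leaving comments and other lines untouched.

```python
from pathlib import Path


def set_properties(path, updates):
    """Rewrite key=value lines in a .properties file; append keys that are missing."""
    path = Path(path)
    remaining = dict(updates)
    out = []
    for line in path.read_text(encoding="utf-8").splitlines():
        key = line.split("=", 1)[0].strip()
        is_setting = "=" in line and not line.lstrip().startswith("#")
        if is_setting and key in remaining:
            out.append(f"{key}={remaining.pop(key)}")
        else:
            out.append(line)
    # Keys that were not present in the file are appended at the end.
    out.extend(f"{k}={v}" for k, v in remaining.items())
    path.write_text("\n".join(out) + "\n", encoding="utf-8")
```

For nifi-ncm, the two properties shown above could be applied with `set_properties(r"D:\DevelopTools\nifi-ncm\conf\nifi.properties", {"nifi.state.management.embedded.zookeeper.start": "true", "nifi.zookeeper.connect.string": "localhost:2181"})`.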
Update the state configuration
Go to the conf directory of nifi-ncm, open state-management.xml, and update the ZooKeeper settings as follows:
<cluster-provider>
<id>zk-provider</id>
<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
<property name="Connect String">localhost:2181</property>
<property name="Root Node">/nifi</property>
<property name="Session Timeout">10 seconds</property>
<property name="Access Control">Open</property>
</cluster-provider>
Configure the NiFi admin
Add the admin user
Go to the conf directory of nifi-ncm, open authorizers.xml, find the file-provider entry, and configure it as follows:
<authorizer>
<identifier>file-provider</identifier>
<class>org.apache.nifi.authorization.FileAuthorizer</class>
<property name="Authorizations File">./conf/authorizations.xml</property>
<property name="Users File">./conf/users.xml</property>
<property name="Initial Admin Identity">CN=Admin, OU=ApacheNIFI</property>
<property name="Legacy Authorized Users File"></property>
<property name="Node Identity 1">CN=localhost, OU=NIFI</property>
<property name="Node Identity 2">CN=localhost_2, OU=NIFI</property>
<property name="Node Identity 3">CN=localhost_3, OU=NIFI</property>
</authorizer>
Then copy this file to the directories of the other two nodes as well.
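A mistyped identity is an easy way to get locked out, so it can be worth comparing the identities in authorizers.xml with the DN passed to tls-toolkit via -C. The sketch below assumes that NiFi compares identities as exact, case-sensitive strings; both function names are hypothetical, and only the property names come from the configuration above.

```python
import xml.etree.ElementTree as ET


def list_identities(authorizers_xml):
    """Return the admin and node identities found in an authorizers.xml string."""
    root = ET.fromstring(authorizers_xml)
    found = {}
    for prop in root.iter("property"):
        name = prop.get("name", "")
        if name == "Initial Admin Identity" or name.startswith("Node Identity"):
            found[name] = (prop.text or "").strip()
    return found


def admin_matches(authorizers_xml, client_dn):
    """True only if the Initial Admin Identity equals the client DN exactly (assumed rule)."""
    return list_identities(authorizers_xml).get("Initial Admin Identity") == client_dn
```

With the certificate generated earlier, the value to compare against would be `"CN=Admin, OU=ApacheNIFI"`.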
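Install the certificate
Open Google Chrome, go to the security section of the settings, and find the certificate management option. Click Import to import the certificate generated above, CN=Admin_OU=ApacheNIFI.p12. The password is in the file with the .password extension, as shown in the figure below: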
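Start the NiFi service
Go to each NiFi installation directory, find run-nifi.bat in the bin directory, and double-click it to start the node. Mind the startup order: nifi-ncm first, then nifi-cluster01 and nifi-cluster02. After a short wait (it may take a while, because the cluster has to complete an election), open https://localhost:9443/nifi in the browser and select the certificate you just imported. If you see the screen shown in the figure below, startup succeeded: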
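Because startup can take a while, it may be convenient to wait for the HTTPS port before opening the browser. The sketch below only checks that something is listening on the port; it does not verify that the cluster election has finished, and `wait_for_port` is a hypothetical helper name.

```python
import socket
import time


def wait_for_port(host, port, timeout=300.0, interval=2.0):
    """Poll until a TCP connection to host:port succeeds or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet (or refused); wait and retry.
            time.sleep(interval)
    return False
```

For the first node in this post the call would be `wait_for_port("localhost", 9443)`.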
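User policies
When you first log in to the NiFi page, the icons are all greyed out; you need to grant the corresponding permissions before you can start editing. Click the key icon in the panel on the left side of the page to open the access policies window, as shown in the figure below:
Here the user list is empty, so users have to be added for each action. Click the Create link to start adding them, as shown in the figure below:
Once all permissions have been added, the buttons on the NiFi page light up and you can start creating flows.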
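Demo
Upload the template
Download the WordCountDemo.zip archive and extract it to get the WordCountDemo.xml file. Then open the NiFi address https://localhost:9443/nifi/ in the browser and click the upload button in the left panel to upload the template, as shown in the figure below: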
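Template code
Any NiFi flow can be exported and saved as a template. The exported template is an XML document; the key parts are annotated with comments, as shown below: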
<?xml version="1.0" ?>
<template encoding-version="1.1">
<description>Local word count demo</description>
<groupId>326f6452-015f-1000-99be-1d670a0ae923</groupId>
<!-- Name of the flow group -->
<name>WordCountDemo</name>
<snippet>
<processGroups>
<id>cf1fdc67-56e0-3629-0000-000000000000</id>
<parentGroupId>d6c1b1d9-24fa-3e40-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments>Local Word Count Demo</comments>
<contents>
<connections>
<id>adf2a3c8-b97d-38b4-0000-000000000000</id>
<parentGroupId>cf1fdc67-56e0-3629-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>cf1fdc67-56e0-3629-0000-000000000000</groupId>
<id>be4176ae-781f-3eef-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>cf1fdc67-56e0-3629-0000-000000000000</groupId>
<id>5d756769-97bb-3dde-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>2870ac2c-9995-371f-0000-000000000000</id>
<parentGroupId>cf1fdc67-56e0-3629-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>cf1fdc67-56e0-3629-0000-000000000000</groupId>
<id>f257102e-8389-3fc4-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>cf1fdc67-56e0-3629-0000-000000000000</groupId>
<id>be4176ae-781f-3eef-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<labels>
<id>de134a82-8649-373d-0000-000000000000</id>
<parentGroupId>cf1fdc67-56e0-3629-0000-000000000000</parentGroupId>
<position>
<x>808.7726989746093</x>
<y>39.81819076538085</y>
</position>
<height>426.0</height>
<label>WordCountDemo</label>
<style>
<entry>
<key>background-color</key>
<value>#36a377</value>
</entry>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>938.0</width>
</labels>
<processors>
<id>be4176ae-781f-3eef-0000-000000000000</id>
<parentGroupId>cf1fdc67-56e0-3629-0000-000000000000</parentGroupId>
<position>
<x>1165.7726989746093</x>
<y>294.8181945800781</y>
</position>
<bundle>
<artifact>nifi-scripting-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0</version>
</bundle>
<config>
<bulletinLevel>INFO</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Script Engine</key>
<value>
<name>Script Engine</name>
</value>
</entry>
<entry>
<key>Script File</key>
<value>
<name>Script File</name>
</value>
</entry>
<entry>
<key>Script Body</key>
<value>
<name>Script Body</name>
</value>
</entry>
<entry>
<key>Module Directory</key>
<value>
<name>Module Directory</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Script Engine</key>
<value>Groovy</value>
</entry>
<entry>
<key>Script File</key>
</entry>
<entry>
<key>Script Body</key>
<!-- Word-count code -->
<value>import org.apache.commons.io.IOUtils
import java.nio.charset.*
def flowFile = session.get()
if(!flowFile) return
flowFile = session.write(flowFile, {inputStream, outputStream ->
def wordCount = [:]
def tellTaleHeart = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def words = tellTaleHeart.split(/(!|\?|-|\.|\"|:|;|,|\s)+/)*.toLowerCase()
words.each { word ->
def currentWordCount = wordCount.get(word)
if(!currentWordCount) {
wordCount.put(word, 1)
}
else {
wordCount.put(word, currentWordCount + 1)
}
}
def outputMapString = wordCount.inject("", {k,v -> k += "${v.key}: ${v.value}\n"})
outputStream.write(outputMapString.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
flowFile = session.putAttribute(flowFile, 'filename', 'telltale_heart_wordcount')
session.transfer(flowFile, REL_SUCCESS)</value>
</entry>
<entry>
<key>Module Directory</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ExecuteScript</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<!-- Processor type --> <type>org.apache.nifi.processors.script.ExecuteScript</type>
</processors>
<processors>
<id>f257102e-8389-3fc4-0000-000000000000</id>
<parentGroupId>cf1fdc67-56e0-3629-0000-000000000000</parentGroupId>
<position>
<x>1354.7726989746093</x>
<y>75.81820983886718</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Directory</key>
<value>
<name>Directory</name>
</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>
<name>Conflict Resolution Strategy</name>
</value>
</entry>
<entry>
<key>Create Missing Directories</key>
<value>
<name>Create Missing Directories</name>
</value>
</entry>
<entry>
<key>Maximum File Count</key>
<value>
<name>Maximum File Count</name>
</value>
</entry>
<entry>
<key>Last Modified Time</key>
<value>
<name>Last Modified Time</name>
</value>
</entry>
<entry>
<key>Permissions</key>
<value>
<name>Permissions</name>
</value>
</entry>
<entry>
<key>Owner</key>
<value>
<name>Owner</name>
</value>
</entry>
<entry>
<key>Group</key>
<value>
<name>Group</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<!-- Directory where the results are saved -->
<key>Directory</key>
<value>/data/tmp/</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>fail</value>
</entry>
<entry>
<key>Create Missing Directories</key>
<value>true</value>
</entry>
<entry>
<key>Maximum File Count</key>
</entry>
<entry>
<key>Last Modified Time</key>
</entry>
<entry>
<key>Permissions</key>
</entry>
<entry>
<key>Owner</key>
</entry>
<entry>
<key>Group</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutFile</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.PutFile</type>
</processors>
<processors>
<id>5d756769-97bb-3dde-0000-000000000000</id>
<parentGroupId>cf1fdc67-56e0-3629-0000-000000000000</parentGroupId>
<position>
<x>834.7726989746093</x>
<y>81.31820983886718</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments>Generate File Source</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<!-- Text content to be counted -->
<key>generate-ff-custom-text</key>
<value>Put simply NiFi was built to automate the flow of data between systems. While the term dataflow is used in a variety of contexts, we use it here to mean the automated and managed flow of information between systems. This problem space has been around ever since enterprises had more than one system, where some of the systems created data and some of the systems consumed data. The problems and solution patterns that emerged have been discussed and articulated extensively. A comprehensive and readily consumed form is found in the Enterprise Integration Patterns [eip].
Some of the high-level challenges of dataflow include:
Systems fail
Networks fail, disks fail, software crashes, people make mistakes.
Data access exceeds capacity to consume
Sometimes a given data source can outpace some part of the processing or delivery chain - it only takes one weak-link to have an issue.
Boundary conditions are mere suggestions
You will invariably get data that is too big, too small, too fast, too slow, corrupt, wrong, or in the wrong format.
What is noise one day becomes signal the next
Priorities of an organization change - rapidly. Enabling new flows and changing existing ones must be fast.</value>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>2000</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style>
<entry>
<key>background-color</key>
<value>#0945eb</value>
</entry>
</style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
</contents>
<name>WordCountDemo</name>
</processGroups>
</snippet>
<timestamp>10/24/2017 10:46:13 CST</timestamp>
</template>
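To get a quick overview of a template without reading the whole XML, the processors can be listed with a few lines of script. This is a minimal sketch that relies only on the element names visible in the template above (`processors`, `name`, `type`, `state`); `list_processors` is a hypothetical helper.

```python
import xml.etree.ElementTree as ET


def list_processors(root):
    """Return (name, type, state) for every <processors> element under root."""
    result = []
    for proc in root.iter("processors"):
        # findtext looks at direct children only, so nested <name> elements
        # inside <descriptors> are not picked up by mistake.
        result.append((proc.findtext("name"), proc.findtext("type"), proc.findtext("state")))
    return result
```

For the downloaded file this could be called as `list_processors(ET.parse("WordCountDemo.xml").getroot())`, which for the template above should report the ExecuteScript, PutFile and GenerateFlowFile processors.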
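Create the flow
Drag the template button at the top of the NiFi page onto an empty area of the canvas and click the ADD button. Then double-click the WordCountDemo group to open it, find the PutFile processor, and change its directory to a path that is actually accessible on your machine, as shown in the figure below: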
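Start the flow
Click the NiFi Flow link in the lower-left corner of the NiFi page to return to the main canvas, select the WordCountDemo group, and click the start button in the left panel to start the flow, as shown in the figure below: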
If nothing goes wrong, you will find a file named telltale_heart_wordcount in that directory; open it to see the word counts, as shown in the figure below:
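To cross-check the output file, the counting logic of the Groovy script in the template can be approximated outside NiFi. The sketch below is a Python re-implementation under stated assumptions, not the script NiFi runs: it splits on the same separator characters and lowercases the words, but it drops empty strings, whereas the Groovy split may keep a leading empty entry when the text starts with a separator.

```python
import re
from collections import Counter

# Same separators as the Groovy regex: ! ? - . " : ; , and whitespace.
SEPARATORS = re.compile(r'[!?\-.":;,\s]+')


def word_count(text):
    """Return 'word: count' lines in first-seen order, one per distinct word."""
    words = [w.lower() for w in SEPARATORS.split(text) if w]
    counts = Counter(words)  # insertion order is preserved on Python 3.7+
    return "".join(f"{word}: {n}\n" for word, n in counts.items())
```

For example, `word_count("Systems fail. Networks fail, disks fail")` returns `"systems: 1\nfail: 3\nnetworks: 1\ndisks: 1\n"`.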
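That completes the local NiFi pseudo-cluster setup. If you have questions, feel free to leave a comment.
Setting up an Apache NiFi pseudo-cluster on Windows with certificate login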