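A while ago I wrote about setting up a distributed Apache NiFi cluster. In practice, finding enough machines for a real cluster is often a problem, while a single modern workstation is quite capable. This post therefore walks through building a pseudo-distributed NiFi cluster on one Windows machine, and uses a different approach to NiFi authentication and authorization: logging in with a client certificate.

System environment and software versions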

  • Windows 8.1

  • JDK 1.8.0_131

  • NiFi 1.4.0

Node directories and ports:

xxx\nifi-ncm          9443
xxx\nifi-cluster01    9444
xxx\nifi-cluster02    9445

NiFi service certificates

Generating local NiFi service certificates

Extract nifi-toolkit-1.4.0-bin.tar.gz, open CMD, change to the bin directory, and run the following command:

D:\DevelopTools\nifi-toolkit-1.4.0\bin>tls-toolkit.bat standalone -n "localhost(3)" -C "CN=Admin, OU=ApacheNIFI" -o "..\target"
2017/10/26 18:21:32 INFO [main] org.apache.nifi.toolkit.tls.standalone.TlsToolkitStandaloneCommandLine: No nifiPropertiesFile specified, using embedded one.
2017/10/26 18:21:32 INFO [main] org.apache.nifi.toolkit.tls.standalone.TlsToolkitStandalone: Running standalone certificate generation with output directory ..\target
******************************************************************************
2017/10/26 18:21:34 INFO [main] org.apache.nifi.toolkit.tls.standalone.TlsToolkitStandalone: Successfully generated client certificate ..\target\CN=Admin_OU=ApacheNIFI.p12
2017/10/26 18:21:34 INFO [main] org.apache.nifi.toolkit.tls.standalone.TlsToolkitStandalone: tls-toolkit standalone completed successfully

The generated directory structure looks like this:

Folder PATH listing for volume senhui.li
Volume serial number is 000000F0 FA46:A0EB
D:.
│  CN=Admin_OU=ApacheNIFI.p12
│  CN=Admin_OU=ApacheNIFI.password
│  nifi-cert.pem
│  nifi-key.key
│
├─localhost
│      keystore.jks
│      nifi.properties
│      truststore.jks
│
├─localhost_2
│      keystore.jks
│      nifi.properties
│      truststore.jks
│
└─localhost_3
        keystore.jks
        nifi.properties
        truststore.jks
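Before copying anything, it can help to confirm that the toolkit really produced every per-node file shown in the listing. The following is a minimal Python sketch of such a check; the function names are made up for illustration, and the expected file names are simply the ones from the listing above.

```python
from __future__ import annotations

from pathlib import Path

# File names taken from the directory listing above.
NODE_FILES = ("keystore.jks", "nifi.properties", "truststore.jks")


def instance_dirs(hostname: str, count: int) -> list[str]:
    """Directory names for `hostname(count)`: localhost, localhost_2, ..."""
    return [hostname if i == 1 else f"{hostname}_{i}" for i in range(1, count + 1)]


def missing_files(target: Path, hostname: str = "localhost", count: int = 3) -> list[Path]:
    """Return every expected per-node file that is absent under `target`."""
    expected = [target / d / f for d in instance_dirs(hostname, count) for f in NODE_FILES]
    return [p for p in expected if not p.is_file()]
```

Calling `missing_files(Path(r"..\target"))` from the toolkit's bin directory should return an empty list when the run succeeded.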

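Copying the NiFi service certificates

  • Copy the files under localhost into the conf directory of nifi-ncm, overwriting the existing files
  • Copy the files under localhost_2 into the conf directory of nifi-cluster01, overwriting the existing files
  • Copy the files under localhost_3 into the conf directory of nifi-cluster02, overwriting the existing files
  • Copy CN=Admin_OU=ApacheNIFI.p12 and CN=Admin_OU=ApacheNIFI.password to the desktop; they are needed later to log in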
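The copy steps above can also be scripted. This is only a sketch: the mapping mirrors the list above, while the function name and the assumption that the files belong in each node's conf subdirectory (where the stock nifi.properties lives) are mine.

```python
import shutil
from pathlib import Path

# Toolkit output directory -> node directory, as listed above.
NODE_FOR_INSTANCE = {
    "localhost": "nifi-ncm",
    "localhost_2": "nifi-cluster01",
    "localhost_3": "nifi-cluster02",
}


def copy_node_files(target: Path, install_root: Path) -> list:
    """Copy each instance's files into <install_root>/<node>/conf, overwriting."""
    copied = []
    for instance, node in NODE_FOR_INSTANCE.items():
        conf = install_root / node / "conf"  # assumed destination
        conf.mkdir(parents=True, exist_ok=True)
        for src in sorted((target / instance).iterdir()):
            if src.is_file():
                copied.append(Path(shutil.copy2(src, conf / src.name)))
    return copied
```

Here `install_root` stands for the parent directory shown as xxx earlier.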

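Configuring single-node ZooKeeper

Creating the directory and server id

Change to the nifi-ncm directory, create the ZooKeeper working directory, and write the server id to the myid file with the following commands: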

D:\DevelopTools\nifi-ncm>mkdir state\zookeeper
D:\DevelopTools\nifi-ncm>(echo 1)> state\zookeeper\myid
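If shell quoting on Windows gets in the way, the same myid file can be written from Python. This is a small sketch, not part of the original setup; `write_myid` is a hypothetical helper, and the path state/zookeeper matches the dataDir used in this post.

```python
from pathlib import Path


def write_myid(node_dir: Path, server_id: int = 1) -> Path:
    """Create <node_dir>/state/zookeeper/myid containing only the server id."""
    state_dir = node_dir / "state" / "zookeeper"
    state_dir.mkdir(parents=True, exist_ok=True)
    myid = state_dir / "myid"
    # Write just the digits: no quotes, spaces, or other stray characters.
    myid.write_text(str(server_id), encoding="ascii")
    return myid
```

For example, `write_myid(Path(r"D:\DevelopTools\nifi-ncm"))` would create the file for server id 1.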

Updating the ZooKeeper configuration

In the conf directory of nifi-ncm, open zookeeper.properties and update it along these lines:

clientPort=2181
initLimit=10
autopurge.purgeInterval=24
syncLimit=5
tickTime=2000
dataDir=./state/zookeeper
autopurge.snapRetainCount=30
# Only the service port needs to be configured
server.1=localhost:2181
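One detail of the Java properties format is worth keeping in mind when editing this file: a `#` starts a comment only at the beginning of a line, so text after a value is read as part of the value. The sketch below is a deliberately simplified parser (it handles only `key=value` lines, not the full format) that shows this and extracts the `server.N` entries; both function names are illustrative.

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines; only lines starting with # or ! are comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def zookeeper_servers(props: dict) -> dict:
    """Collect server.N entries as {N: address}."""
    return {
        int(key.split(".", 1)[1]): value
        for key, value in props.items()
        if key.startswith("server.")
    }
```

With the configuration above, `zookeeper_servers(parse_properties(text))` yields a single entry for server 1.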

Updating the NiFi configuration

In the conf directory of nifi-ncm, open nifi.properties and update the following properties:

nifi.state.management.embedded.zookeeper.start=true

# zookeeper properties, used for cluster management #
# The other two nodes only need this property edited
nifi.zookeeper.connect.string=localhost:2181
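Since the same property has to be changed in three copies of nifi.properties, a small helper can reduce copy-and-paste mistakes. The following is a sketch under the assumption that the file uses plain `key=value` lines; `set_properties` is a hypothetical name, and keys that are not found are appended at the end.

```python
def set_properties(text: str, updates: dict) -> str:
    """Return `text` with each key in `updates` set, appending keys that are missing."""
    remaining = dict(updates)
    out = []
    for line in text.splitlines():
        key = line.split("=", 1)[0].strip()
        is_setting = "=" in line and not line.lstrip().startswith("#")
        if is_setting and key in remaining:
            out.append(f"{key}={remaining.pop(key)}")
        else:
            out.append(line)  # comments and untouched settings pass through
    out.extend(f"{k}={v}" for k, v in remaining.items())
    return "\n".join(out) + "\n"
```

For the two worker nodes, `updates` would contain only `nifi.zookeeper.connect.string`; for nifi-ncm it would also set `nifi.state.management.embedded.zookeeper.start` to `true`.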

Updating the state configuration

In the conf directory of nifi-ncm, open state-management.xml and update the ZooKeeper settings as follows:


<cluster-provider>
<id>zk-provider</id>
<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
<property name="Connect String">localhost:2181</property>
<property name="Root Node">/nifi</property>
<property name="Session Timeout">10 seconds</property>
<property name="Access Control">Open</property>
</cluster-provider>
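The Connect String can also be set programmatically. Below is a minimal sketch using Python's standard xml.etree.ElementTree; `set_connect_string` is a hypothetical helper. Note that ElementTree drops XML comments and the XML declaration when it re-serializes, so for a heavily commented file editing by hand may be preferable.

```python
import xml.etree.ElementTree as ET


def set_connect_string(xml_text: str, connect: str) -> str:
    """Set the 'Connect String' property of every cluster-provider element."""
    root = ET.fromstring(xml_text)
    for provider in root.iter("cluster-provider"):
        for prop in provider.findall("property"):
            if prop.get("name") == "Connect String":
                prop.text = connect
    return ET.tostring(root, encoding="unicode")
```

The returned string can be written back to conf/state-management.xml on each node.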

Configuring the NiFi admin

Adding the admin user

In the conf directory of nifi-ncm, open authorizers.xml, find the file-provider authorizer, and configure it as follows:

<authorizer>
<identifier>file-provider</identifier>
<class>org.apache.nifi.authorization.FileAuthorizer</class>
<property name="Authorizations File">./conf/authorizations.xml</property>
<property name="Users File">./conf/users.xml</property>
<property name="Initial Admin Identity">CN=Admin, OU=ApacheNIFI</property>
<property name="Legacy Authorized Users File"></property>
<property name="Node Identity 1">CN=localhost, OU=NIFI</property>
<property name="Node Identity 2">CN=localhost_2, OU=NIFI</property>
<property name="Node Identity 3">CN=localhost_3, OU=NIFI</property>
</authorizer>

Then copy this file to the other two node directories as well.
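The Initial Admin Identity has to match the DN passed to tls-toolkit with -C; to my knowledge NiFi compares identity strings exactly, including letter case and spacing, so a quick check before starting the nodes can save a failed login. The sketch below reads the identities out of authorizers.xml with the standard library; both function names are illustrative.

```python
import xml.etree.ElementTree as ET


def authorizer_identities(xml_text: str) -> dict:
    """Map property name -> value for the admin and node identity properties."""
    root = ET.fromstring(xml_text)
    found = {}
    for prop in root.iter("property"):
        name = prop.get("name", "")
        if name == "Initial Admin Identity" or name.startswith("Node Identity"):
            found[name] = (prop.text or "").strip()
    return found


def admin_matches(xml_text: str, cert_dn: str) -> bool:
    """Exact, case-sensitive comparison of the admin identity with the certificate DN."""
    return authorizer_identities(xml_text).get("Initial Admin Identity") == cert_dn
```

For the certificate generated in this post, `cert_dn` would be `CN=Admin, OU=ApacheNIFI`.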

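Installing the certificate

Open Google Chrome, go to Settings, find Manage certificates under the security options, and click Import to import the certificate generated above, CN=Admin_OU=ApacheNIFI.p12. The password is in the file with the .password extension.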


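Starting the NiFi services

In each NiFi installation directory, find run-nifi.bat in the bin directory and double-click it. Mind the startup order: nifi-ncm first, then nifi-cluster01 and nifi-cluster02. After a while (it can take some time, because the nodes have to complete an election), open https://localhost:9443/nifi in the browser and select the certificate you just imported. If the NiFi page loads, the cluster has started successfully.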
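Instead of refreshing the browser while waiting, you can poll the HTTPS port until something is listening. This is a rough sketch with a hypothetical `wait_for_port` helper; an open port only shows that the web server is up, not that the cluster election has finished, so the page may still report that the cluster is initializing for a while.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 300.0, interval: float = 2.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)  # not listening yet, try again shortly
    return False
```

For this setup, `wait_for_port("localhost", 9443)` would wait for the nifi-ncm node.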


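User policies

Right after logging in to the NiFi page, the icons are all greyed out; you have to grant the corresponding permissions before you can edit anything. Click the key icon in the panel on the left side of the page to open the access policies dialog.

The user list there is empty, so users must be added for each action. Click the Create link to start adding them.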


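Once all permissions have been added, the buttons on the NiFi page light up and you can start building flows.

Example walkthrough

Uploading the template

Download WordCountDemo.zip and extract it to get WordCountDemo.xml. Then open the NiFi address https://localhost:9443/nifi/ in the browser and click the upload button in the left panel to upload the template.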


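Template code

Any NiFi flow can be exported and saved as a template. The exported template is an XML document; the key parts are annotated with comments below: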

<?xml version="1.0" ?>
<template encoding-version="1.1">
<description>Local word count demo</description>
<groupId>326f6452-015f-1000-99be-1d670a0ae923</groupId>
<!-- Name of the flow group -->
<name>WordCountDemo</name>
<snippet>
<processGroups>
<id>cf1fdc67-56e0-3629-0000-000000000000</id>
<parentGroupId>d6c1b1d9-24fa-3e40-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments>Local Word Count Demo</comments>
<contents>
<connections>
<id>adf2a3c8-b97d-38b4-0000-000000000000</id>
<parentGroupId>cf1fdc67-56e0-3629-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>cf1fdc67-56e0-3629-0000-000000000000</groupId>
<id>be4176ae-781f-3eef-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>cf1fdc67-56e0-3629-0000-000000000000</groupId>
<id>5d756769-97bb-3dde-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>2870ac2c-9995-371f-0000-000000000000</id>
<parentGroupId>cf1fdc67-56e0-3629-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>cf1fdc67-56e0-3629-0000-000000000000</groupId>
<id>f257102e-8389-3fc4-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>cf1fdc67-56e0-3629-0000-000000000000</groupId>
<id>be4176ae-781f-3eef-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<labels>
<id>de134a82-8649-373d-0000-000000000000</id>
<parentGroupId>cf1fdc67-56e0-3629-0000-000000000000</parentGroupId>
<position>
<x>808.7726989746093</x>
<y>39.81819076538085</y>
</position>
<height>426.0</height>
<label>WordCountDemo</label>
<style>
<entry>
<key>background-color</key>
<value>#36a377</value>
</entry>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>938.0</width>
</labels>
<processors>
<id>be4176ae-781f-3eef-0000-000000000000</id>
<parentGroupId>cf1fdc67-56e0-3629-0000-000000000000</parentGroupId>
<position>
<x>1165.7726989746093</x>
<y>294.8181945800781</y>
</position>
<bundle>
<artifact>nifi-scripting-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0</version>
</bundle>
<config>
<bulletinLevel>INFO</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Script Engine</key>
<value>
<name>Script Engine</name>
</value>
</entry>
<entry>
<key>Script File</key>
<value>
<name>Script File</name>
</value>
</entry>
<entry>
<key>Script Body</key>
<value>
<name>Script Body</name>
</value>
</entry>
<entry>
<key>Module Directory</key>
<value>
<name>Module Directory</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Script Engine</key>
<value>Groovy</value>
</entry>
<entry>
<key>Script File</key>
</entry>
<entry>
<key>Script Body</key>
<!-- Code that counts the words -->
<value>import org.apache.commons.io.IOUtils
import java.nio.charset.*
def flowFile = session.get()
if(!flowFile) return
flowFile = session.write(flowFile, {inputStream, outputStream -&gt;
def wordCount = [:]
def tellTaleHeart = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def words = tellTaleHeart.split(/(!|\?|-|\.|\"|:|;|,|\s)+/)*.toLowerCase()
words.each { word -&gt;
def currentWordCount = wordCount.get(word)
if(!currentWordCount) {
wordCount.put(word, 1)
}
else {
wordCount.put(word, currentWordCount + 1)
}
}
def outputMapString = wordCount.inject("", {k,v -&gt; k += "${v.key}: ${v.value}\n"})
outputStream.write(outputMapString.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
flowFile = session.putAttribute(flowFile, 'filename', 'telltale_heart_wordcount')
session.transfer(flowFile, REL_SUCCESS)</value>
</entry>
<entry>
<key>Module Directory</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ExecuteScript</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<!-- Processor type -->
<type>org.apache.nifi.processors.script.ExecuteScript</type>
</processors>
<processors>
<id>f257102e-8389-3fc4-0000-000000000000</id>
<parentGroupId>cf1fdc67-56e0-3629-0000-000000000000</parentGroupId>
<position>
<x>1354.7726989746093</x>
<y>75.81820983886718</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Directory</key>
<value>
<name>Directory</name>
</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>
<name>Conflict Resolution Strategy</name>
</value>
</entry>
<entry>
<key>Create Missing Directories</key>
<value>
<name>Create Missing Directories</name>
</value>
</entry>
<entry>
<key>Maximum File Count</key>
<value>
<name>Maximum File Count</name>
</value>
</entry>
<entry>
<key>Last Modified Time</key>
<value>
<name>Last Modified Time</name>
</value>
</entry>
<entry>
<key>Permissions</key>
<value>
<name>Permissions</name>
</value>
</entry>
<entry>
<key>Owner</key>
<value>
<name>Owner</name>
</value>
</entry>
<entry>
<key>Group</key>
<value>
<name>Group</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<!-- Directory where the result is saved -->
<key>Directory</key>
<value>/data/tmp/</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>fail</value>
</entry>
<entry>
<key>Create Missing Directories</key>
<value>true</value>
</entry>
<entry>
<key>Maximum File Count</key>
</entry>
<entry>
<key>Last Modified Time</key>
</entry>
<entry>
<key>Permissions</key>
</entry>
<entry>
<key>Owner</key>
</entry>
<entry>
<key>Group</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutFile</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.PutFile</type>
</processors>
<processors>
<id>5d756769-97bb-3dde-0000-000000000000</id>
<parentGroupId>cf1fdc67-56e0-3629-0000-000000000000</parentGroupId>
<position>
<x>834.7726989746093</x>
<y>81.31820983886718</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments>Generate File Source</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<!-- Text whose words are counted -->
<key>generate-ff-custom-text</key>
<value>Put simply NiFi was built to automate the flow of data between systems. While the term dataflow is used in a variety of contexts, we use it here to mean the automated and managed flow of information between systems. This problem space has been around ever since enterprises had more than one system, where some of the systems created data and some of the systems consumed data. The problems and solution patterns that emerged have been discussed and articulated extensively. A comprehensive and readily consumed form is found in the Enterprise Integration Patterns [eip]. Some of the high-level challenges of dataflow include: Systems fail
Networks fail, disks fail, software crashes, people make mistakes. Data access exceeds capacity to consume
Sometimes a given data source can outpace some part of the processing or delivery chain - it only takes one weak-link to have an issue. Boundary conditions are mere suggestions
You will invariably get data that is too big, too small, too fast, too slow, corrupt, wrong, or in the wrong format. What is noise one day becomes signal the next
Priorities of an organization change - rapidly. Enabling new flows and changing existing ones must be fast.</value>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>2000</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style>
<entry>
<key>background-color</key>
<value>#0945eb</value>
</entry>
</style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
</contents>
<name>WordCountDemo</name>
</processGroups>
</snippet>
<timestamp>10/24/2017 10:46:13 CST</timestamp>
</template>
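To make the counting logic in the ExecuteScript processor easier to follow, here is a rough Python equivalent of the Groovy script body: split the text on the same delimiters, lower-case each word, count occurrences, and format one "word: count" line per word. It is an illustration only, not what NiFi runs, and it drops empty tokens, which may differ slightly from the Groovy version at the edges of the text.

```python
import re
from collections import Counter

# Same delimiter set as the Groovy script: ! ? - . " : ; , and whitespace.
DELIMITERS = re.compile(r'[!?\-.":;,\s]+')


def word_count(text: str) -> Counter:
    """Count lower-cased words, splitting on punctuation and whitespace."""
    words = (w.lower() for w in DELIMITERS.split(text))
    return Counter(w for w in words if w)


def format_counts(counts: Counter) -> str:
    """One 'word: count' line per word, in first-seen order."""
    return "".join(f"{word}: {n}\n" for word, n in counts.items())
```

Feeding the GenerateFlowFile custom text through `format_counts(word_count(text))` gives output in the same shape as the file written by PutFile.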

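Creating the flow

Drag the template button from the top of the NiFi page onto an empty area of the canvas and click ADD. Then double-click the WordCountDemo group to open it, find the PutFile processor, and change its Directory to a path that is actually accessible on your machine.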


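Starting the flow

Click the NiFi Flow link in the lower-left corner of the page to return to the main canvas, select the WordCountDemo group, and click the start button in the left panel to start the flow.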


If nothing goes wrong, a file named telltale_heart_wordcount now appears in that directory; open it to see the word counts.


That completes the local NiFi pseudo-cluster setup. If you have questions, feel free to leave a comment.

05-03 23:34