Flume + Kafka + Spark Streaming: Big Data Analysis and Processing

1. Start Zookeeper and Kafka

## Download Zookeeper first
wget https://www.apache.org/dist/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz
## Extract the archive
tar xf apache-zookeeper-3.5.6-bin.tar.gz
## Create a symlink
ln -s apache-zookeeper-3.5.6-bin zookeeper
## Start the server
cd zookeeper && ./bin/zkServer.sh start
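
A quick optional check that Zookeeper came up (run from the same directory):

## Should report "Mode: standalone" for a single-node setup
./bin/zkServer.sh status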


## Download Kafka
wget https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
## Extract the archive
tar xf kafka_2.11-2.2.0.tgz
## Create a symlink (to the extracted directory, not the tarball)
ln -s kafka_2.11-2.2.0 kafka
## Start the broker
cd kafka && ./bin/kafka-server-start.sh -daemon config/server.properties
## Create the topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-test
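
To confirm the topic was created, you can list the topics registered in Zookeeper (optional check, run from the kafka directory):

## spark-test should appear in the output
./bin/kafka-topics.sh --list --zookeeper localhost:2181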

2. Configure and Start Flume

vim conf/tail-memory-kafka.conf

## Every key below is prefixed with the agent name (tail-memory-kafka), which must match --name when starting Flume

tail-memory-kafka.sources = tail-source
tail-memory-kafka.sinks = kafka-sink
tail-memory-kafka.channels = memory-channel

## Tail the nginx access log with an exec source
tail-memory-kafka.sources.tail-source.type = exec
tail-memory-kafka.sources.tail-source.command = tail -F /home/houzhenglan/app/nginx/logs/access.log
tail-memory-kafka.sources.tail-source.shell = /bin/sh -c

tail-memory-kafka.channels.memory-channel.type = memory

tail-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
tail-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = localhost:9092
## Deliver events to the topic created above
tail-memory-kafka.sinks.kafka-sink.kafka.topic = spark-test
tail-memory-kafka.sinks.kafka-sink.flumeBatchSize = 5
tail-memory-kafka.sinks.kafka-sink.kafka.producer.acks = 1


tail-memory-kafka.sources.tail-source.channels = memory-channel
tail-memory-kafka.sinks.kafka-sink.channel = memory-channel


## Start Flume
./bin/flume-ng agent --conf conf --conf-file conf/tail-memory-kafka.conf --name tail-memory-kafka -Dflume.root.logger=INFO,console
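
With the agent running, a simple end-to-end check is to generate some traffic against nginx and watch the log lines arrive in Kafka. The sketch below assumes nginx is listening on localhost and uses the console consumer shipped with Kafka:

## Terminal 1: consume the topic (run from the kafka directory)
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic spark-test --from-beginning
## Terminal 2: generate a few requests so new lines are appended to access.log
curl -s http://localhost/ > /dev/null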

3. Write the Spark Streaming Code

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Use two local worker threads and name the application KafkaWordCount
sc = SparkContext(master="local[2]", appName="KafkaWordCount")

# Batch interval of 5 seconds
ssc = StreamingContext(sc, 5)

# Kafka broker list (point this at the broker started in step 1)
brokers = "node02:9092"



# Topic(s) to subscribe to
topic = ['spark-test', ]

# See group.id=test-consumer-group in /user/kafka/config/consumer.properties
groupids = "test-consumer-group"


## Build a direct Kafka stream that consumes the topic
lines = KafkaUtils.createDirectStream(ssc, topic, kafkaParams={"metadata.broker.list": brokers, "group.id": groupids})

# Count the number of requests per client IP
## 101.206.170.38 - - [11/Nov/2019:10:38:54 +0800] "GET / HTTP/1.1" 200 637 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 12_1_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0 Mobile/15E148 Safari/604.1"
counts = lines.map(lambda x: x[1]).map(lambda line: (line.split(' ')[0], 1)).reduceByKey(lambda a, b: a + b)
counts.pprint()

## Prints pairs such as ('10.10.10.10', 5): the number of requests from that IP within the current 5-second batch


# Start the streaming computation
ssc.start()

# Wait for the computation to terminate
ssc.awaitTermination()
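
Because pyspark's KafkaUtils relies on the external spark-streaming-kafka-0-8 connector, the script has to be submitted with that package on the classpath. A sketch of the submit command, assuming Spark 2.4.x built for Scala 2.11 and that the code above is saved as kafka_wordcount.py (the file name is just an example):

## Adjust the package version to match your Spark version
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 kafka_wordcount.py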