最近在做企业安全建设,企业安全建设中最常见的一项就是做监控,监控的种类多种多样,但是底层的技术栈却基本是一致的————大数据技术,下面我记录一下我最近学习到的一些大数据技术,下文只是描述个脉络而已。

大数据的技术栈,以及对应的上下依赖图如下:
大数据技术栈浅述-LMLPHP

看完这个图,是不是觉得和之前学习过的网络协议、框架都非常相识,无非就是把里面的名词替换了一下而已。我感觉软件产品的设计思路都是要分模块化、解耦合,你看TCP/IP协议层,每层都各司其职,每层里面的每个功能也是按照这个总体思路继续向下设计。解耦合的好处很多,建议自行百度。

我个人觉得,里面比较有难度的就是Flink那块,因为对数据的分析、计算处理都是在这一块中完成的,Flink也可以用storm替换,不过性能没有flink好。
当将计算结果存储到ES之后,就可以做很多事了,比如做自动告警功能了。


数据源

数据源可以是任何数据,不过现在采集最多的应该就是日志类数据


Filebeat

采集器是最容易理解的,主要是用来汇总日志然后转发的,采集器的技术方案也有很多,这里举例filebeat。

Filebeat主要由两个组件构成:prospector(探测器)harvester(收集器),这两类组件一起协作完成Filebeat的工作。

Filebeat的工作流程如下:
当开启Filebeat程序的时候,它会启动一个或多个探测器去检测指定的日志目录或文件,对于探测器找出的每一个日志文件,Filebeat会启动收集进程,每一个收集进程读取一个日志文件的内容,然后将这些日志数据发送到后台处理程序,后台处理程序会集合这些事件,最后发送集合的数据到output指定的目的地。

Filebeat在有数据源的机器安装好之后,要做的就是写一下配置,
主要配置读取文件的路径,以及输出流的位置以及相应的性能参数等,以Kafka消息中间件作为缓冲,所有的日志收集器都向Kafka输送日志流。

定义日志信息输出格式:

<Properties>
        //存放日志的文件夹名称
        <Property name="LOG_HOME">logs</Property>
        //日志文件名称
        <property name="FILE_NAME">collector</property>
        //日志格式
        //[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] 日志输入时间,东八区
        //[%level{length=5}]    日志级别,debug、info、warn、error
        //[%thread-%tid]    当前线程信息
        //[%logger] 当前日志信息所属类全路径
        //[%X{hostName}]    当前节点主机名。需要通过MDC来自定义。
        //[%X{ip}]  当前节点ip。需要通过MDC来自定义。
        //[%X{applicationName}] 当前应用程序名。需要通过MDC来自定义。
        //[%F,%L,%C,%M] %F:当前日志信息所属的文件(类)名,%L:日志信息在所属文件中的行号,%C:当前日志所属文件的全类名,%M:当前日志所属的方法名
        //[%m]  日志详情
        //%ex   异常信息
        //%n    换行
        <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger]
            [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
</property>

Filebeat配置参考信息:

  paths:
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    # pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'
    # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
    pattern: '^\['                                # 指定匹配的表达式(匹配以 "{ 开头的字符串)
    negate: true                                # 是否匹配到
    match: after                                # 合并到上一行的末尾
    max_lines: 2000                         # 最大的行数
    timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志
  fields:
    logbiz: collector
    logtopic: error-log-collector   ## 按服务划分用作kafka topic
    evn: dev

output.kafka:
  enabled: true
  hosts: ["192.168.204.139:9092"]
  topic: '%{[fields.logtopic]}'
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true

Kafka

Apache kafka是消息中间件的一种,功能是高吞吐量的分布式发布订阅消息系统

Kafka特点:
kafka中的消息不是kafka主动去拉去的,而必须有生产者往kafka写消息。
kafka是不会主动往消费者发布消息的,而必须有消费者主动从kafka拉取消息。

kafka名词解释:
kafka的几个名词需要知道一下,比如topic、producer、consumer、broker,下面用最俗的方式解释

  • producer:生产者,就是它来生产“鸡蛋”的。
  • consumer:消费者,生出的“鸡蛋”它来消费。
  • topic:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。
  • broker:相当于菜市场的小贩,小贩从生产者手里收购了鸡蛋,然后一直存储在商店中,等待消费者来购买。他在中间作鸡蛋的存储、转发、接受顾客问价(请求)和回答(响应)等功能。
    一个单独的Kafka Server就是一个Broker。在一般的生产环境中,一个Broker独占一台物理服务器。Broker的主要工作就是接收生产者发过来的消息,分配offset,之后保存到磁盘中。同时,接收消费者、其他Broker的请求,根据请求类型进行相应处理并返回响应。

kafka的单节点基本操作:
生产者

# 创建一个主题(标签),Hello-Kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
# 生产者将等待来自stdin的输入并发布到Kafka集群。 默认情况下,每个新行都作为新消息发布,然后在 config / producer.properties 文件中指定默认生产者属性。

# 在终端中键入几行消息
egg1
egg2

消费者

# 与生产者类似,在 config / consumer.proper-ties 文件中指定了缺省使用者属性。 打开一个新终端并键入以下消息消息语法。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka
--from-beginning

# 自动出现
egg1
egg2

Flink

Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功。
简单的说就是,Flink可以对数据流进行转换、计算、聚合等功能。如果你采集的数据需要做告警功能,那么就需要用Flink或者storm,如果只是将采集的数据进行存储,然后展示,那么就不需要用到Flink这种技术。

比如在企业安全建设中,做监控平台就需要有告警功能,采集到的监控数据会直接往 kafka 里塞,然后告警这边需要从 kafka topic 里面实时读取到监控数据,并将读取到的监控数据做一些 转换、计算、聚合等操作,然后将计算后的结果与告警规则的阈值进行比较,然后做出相应的告警措施(钉钉群、邮件、短信、电话等)。画了个简单的图如下:
大数据技术栈浅述-LMLPHP

flink处理静态sql的代理流程:
大数据技术栈浅述-LMLPHP

这个sql只能是写死在代码里面,如果是想要动态的修改sql,那么就要重启flink服务才能生效。

但是有个需求,就像下图这样,sql语句来之外部,因为需要让安全人员来描述规则,他们跟进安全态势来修改,并且需要常常更新规则来挖掘出最新安全事件,
大数据技术栈浅述-LMLPHP
那么就出现一个问题了,像上面的flink只能处理静态sql,想动态处理怎么办?

使用 flink-siddhi 来处理动态sql:
SIDDHI 是一款功能强大的open source CEP(Complex Event Processing)引擎引擎,具有自己的DSL,丰富的模式匹配功能和可扩展性,

使用Siddhi 引擎的好处就是,里面的sql语句可以任意修改,修改sql后,也不需要重启flink服务。
siddhi引擎我最近也是刚开始学习,这里就不过多笔墨了,后面会出siddhi的专项文章。


ES

ES太常见了,以后有空在补充吧。


Kibana

Kibana也很常见,以后有空在补充吧。希望读者给个评论或者推荐,让我有动力更新完。


参考

https://www.cnblogs.com/monument/p/12944718.html
https://www.jianshu.com/p/a8b66f586fd4
http://kafka.apachecn.org/
https://www.w3cschool.cn/apache_kafka/apache_kafka_introduction.html
https://blog.csdn.net/leanaoo/article/details/84310604
https://ci.apache.org/projects/flink/flink-docs-release-1.4/
https://www.cnblogs.com/fxjwind/p/5048583.html
https://baijiahao.baidu.com/s?id=1623279487849430246&wfr=spider&for=pc

07-30 23:04