网安需求:

1:IFTTT:随着物联网的兴起,if this then that 。如果出现这种情况,那么及时反映做出对应的操作。
判断手机号黑白名单,mac地址黑白名单。如果是碰到手机号或者mac地址黑名单,需要立即报警,通知当地网安部门。实现实时的黑白名单过滤。
2:路由器数据上报的方式:每三分钟上报一次,将最近三分钟所有人上网的情况以txt格式的形势进行上报。因为上传比较频繁,所以上传的数据全部都是以小文件的形势进行上传的,基本上不会有超过1M的文件。
且要保证做到所有的数据不丢不漏,不重不错。所有上报的数据需要能够做到立即查询。
3:所有的历史数据保存两年,且可供两年查询

商用路由器商户业务需求(新增)
1.实时查看附近的人流量有多少(实时处理任务。)
2.每天每小时入店上网人数有多少,统计出哪几个小时属于人流量高峰期(批量处理任务)
3.实时查看路由器周边一百米,三百米,五百米,一千米,三千米,五千米的人流量 (路由器周边人流量,实时处理任务。)

数据查询部分:使用的是mongoDB+ES的形式做数据查询。

数据平台架构:
需要考虑的地方:1.每天的数据量有多少
2.服务器的配置,包括网卡,CPU,内存,硬盘。
3.数据实时处理,选用哪种技术(考虑低成本,可维护)
4.数据批量处理,选用哪种技术(考虑低成本,可维护)

数据量情况:每天数据量大概在100G左右,全部都是以FTP小文件的形式进行上传。线上大概有8000台路由器(包括商户上网路由器+mac嗅探设备)
嗅探功能:以低于时速120KM/h + 开启移动端无线网 ==》可以获取到你的手机mac

大数据平台的架构设计:
一共七台机器:
两台主节点:64G内存+12T硬盘+内网千兆网卡。
五台从节点配置: 32G内存+12T硬盘+内网千兆网卡。

指定一台机器开通FTP,路由器以FTP方式上报小文件到服务器上面。启动一个Java线程实时读取目录下面上传的文件。
问题一:如何确认文件已经上传完成
问题二:文件的合并过程。

问题三:错误数据过滤。硬件上报的数据,很多都可能会出现错误数据的情况。

问题四: 每天晚上十二点的时候,不管合并数据量多大,都要上传

=======================================================

Storm第三天路由器数据监控

1、项目背景介绍:

随着路由器上网的普及,越来越多的人在各个场合选择使用路由器上网,特别是在一些公共场所,例如网吧,酒店,饭店,旅馆,宾馆,洗浴中心等。这些公共场所的网络安全也日益受到各地网安的关注,各种问题也日益凸显。

如何鉴别上网人员的身份问题?

如何通过路由器来解决人群聚集的问题?

如何对上网人员的身份进行追踪?

如何通过公共路由器来获取指定人员的行动轨迹?

如何获取上网人员的网络内容?

等等这些问题都困扰着各地的网安部门。为此我司特为各地网安推出定制化的路由器,在网安指定的公共地点安装我司的路由器,可以追踪每个人的上网情况,通过路由器或者嗅探设备的mac地址以及经纬度的追踪,可以定位每个人员的上网大致方位,了解每个人的上网内容,做到实时的网页内容监控,地理位置的监控,上网设备的mac地址追踪,通过嗅探设备,实现上网设备的实时路线追踪,为各地网安解决各种定制化的任务。

时速小于90KM/小时,并且开着无线网,嗅探设备就能抓到你的mac地址

2、项目数据处理流程

第2节 storm路由器项目开发:1 - 7、网络路由器项目-LMLPHP

第2节 storm路由器项目开发:1 - 7、网络路由器项目-LMLPHP

3、数据类型梳理

1、终端mac记录:

YT1013 MAC地址抓取记录表  "audit_mac_detail_" 表  length  25

获取到的所有的mac地址都会存入这种类型的文件中

YT1013=iumac,idmac,area_code,policeid,sumac,sdmac,datasource,netsite_type, capture_time,        netbar_wacode,          brandid, cache_ssid,terminal_filed_strength,ssid_position,access_ap_mac,access_ap_channel,access_ap_encryption_type,collection_equipment_id,collection_equipment_longitude,collection_equipment_latitude,wxid,province_code,city_code,typename,security_software_orgcode

2、虚拟身份记录

YT1020 虚拟身份抓取记录表  virtual_detail表  length 22

获取到所有的虚拟身份都会存入这种类型的文件中

YT1020=mobile,iumac,idmac,area_code,policeid,netsite_type,sumac,account_type,soft_type,sdmac,netbar_wacode,sessionid,ip_address,account,capture_time,collection_equipment_id,wxid,province_code,city_code,datasource,typename,security_software_orgcode

3、终端上下线记录

YT1023 终端上下线记录表  wifi_online表   length 51

获取到所有的终端上下线的记录都会存入到这种类型的文件中

YT1023=iumac,idmac,area_code,policeid,netsite_type,capture_time,sumac, sdmac, netbar_wacode,auth_type,auth_account, collection_equipment_id,datafrom,             start_time,end_time,ip_address,src_ip,src_port_start,src_port_end,src_port_start_v6,src_port_end_v6,dst_ip,dst_ip_ipv6,dst_port,dst_port_v6,certificate_type,certificate_code,app_company_name,app_software_name,app_version,appid,src_ipv6,longitude,latitude,sessionid,terminal_fieldstregth,x_coordinate,y_coordinate,name,imsi,imei_esn_meid,os_name,brand,model,network_app,port,wxid,province_code,city_code,typename,security_software_orgcode

4、搜索关键字记录

YT1033 搜索关键字记录表  searchkey_detail表   length  21

获取到所有的搜索记录关键字都会存入到这种类型的文件中

YT1033=iumac,idmac,area_code,policeid,netsite_type,capture_time,sumac,sdmac,netbar_wacode,src_ip,src_port,dst_ip,dst_port,search_groupid,http_domain,search_keyword,wxid,province_code,city_code,typename,security_software_orgcode

5、网页访问记录

YT1034 网页访问记录表 webpage_detail 表  length 24

获取到所有的网页访问记录都会存入到这种类型的文件中

YT1034=iumac,idmac,area_code,policeid,netsite_type,capture_time,sumac,sdmac,netbar_wacode,src_ip,src_port,dst_ip,dst_port,http_method,http_domain,http_action_match,web_url,http_categoryid,web_title,wxid,province_code,city_code,typename,security_software_orgcode

4、模拟数据产生

创建两个文件夹

mkdir  -p  /export/datas/destFile   # 拷贝后的文件

mkdir  -p /export/datas/sourceFile    #源文件

详见资料当中的filegenerate.jar

java -jar filegenerate.jar /export/datas/sourceFile/  /export/datas/destFile 1000

5、创建kafka的topic

bin/kafka-topics.sh --create  --replication-factor 2 --topic wifidata --zookeeper node01:2181,node02:2181,node03:2181 --partitions 6

6、定义flume配置文件并启动flume

配置kafka配置文件wifi.conf

#为我们的source channel  sink起名

a1.sources = r1

a1.channels = c1

a1.sinks = k1

#指定我们的source收集到的数据发送到哪个管道

a1.sources.r1.channels = c1

#指定我们的source数据收集策略

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /export/datas/destFile

a1.sources.r1.deletePolicy = immediate

a1.sources.r1.ignorePattern = ^(.)*\\.tmp$

#指定我们的channel为memory,即表示所有的数据都装进memory当中

a1.channels.c1.type = memory

#指定我们的sink为kafka  sink,并指定我们的sink从哪个channel当中读取数据

a1.sinks.k1.channel = c1

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = wifidata

a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

启动flume程序

bin/flume-ng agent -n a1 -c conf -f /export/servers/apache-flume-1.8.0-bin/conf/wifi.conf -Dflume.root.logger=INFO,console

7、创建maven工程并导入相应的jar包

8、开发我们的WifiTypeBolt

过滤掉字段不同的一些脏数据

9、开发我们的WifiWarningBolt

实时告警我们的mac黑名单,手机号码黑名单等

10、开发我们的WriteFileBolt

文件合并到本地一定大小之后,就将数据上传到hdfs上面去

11、程序main函数入口

详见代码。

05-22 04:20