网安需求:
1:IFTTT:随着物联网的兴起,if this then that 。如果出现这种情况,那么及时反映做出对应的操作。
判断手机号黑白名单,mac地址黑白名单。如果是碰到手机号或者mac地址黑名单,需要立即报警,通知当地网安部门。实现实时的黑白名单过滤。
2:路由器数据上报的方式:每三分钟上报一次,将最近三分钟所有人上网的情况以txt格式的形势进行上报。因为上传比较频繁,所以上传的数据全部都是以小文件的形势进行上传的,基本上不会有超过1M的文件。
且要保证做到所有的数据不丢不漏,不重不错。所有上报的数据需要能够做到立即查询。
3:所有的历史数据保存两年,且可供两年查询
商用路由器商户业务需求(新增)
1.实时查看附近的人流量有多少(实时处理任务。)
2.每天每小时入店上网人数有多少,统计出哪几个小时属于人流量高峰期(批量处理任务)
3.实时查看路由器周边一百米,三百米,五百米,一千米,三千米,五千米的人流量 (路由器周边人流量,实时处理任务。)
数据查询部分:使用的是mongoDB+ES的形式做数据查询。
数据平台架构:
需要考虑的地方:1.每天的数据量有多少
2.服务器的配置,包括网卡,CPU,内存,硬盘。
3.数据实时处理,选用哪种技术(考虑低成本,可维护)
4.数据批量处理,选用哪种技术(考虑低成本,可维护)
数据量情况:每天数据量大概在100G左右,全部都是以FTP小文件的形式进行上传。线上大概有8000台路由器(包括商户上网路由器+mac嗅探设备)
嗅探功能:以低于时速120KM/h + 开启移动端无线网 ==》可以获取到你的手机mac
大数据平台的架构设计:
一共七台机器:
两台主节点:64G内存+12T硬盘+内网千兆网卡。
五台从节点配置: 32G内存+12T硬盘+内网千兆网卡。
指定一台机器开通FTP,路由器以FTP方式上报小文件到服务器上面。启动一个Java线程实时读取目录下面上传的文件。
问题一:如何确认文件已经上传完成
问题二:文件的合并过程。
问题三:错误数据过滤。硬件上报的数据,很多都可能会出现错误数据的情况。
问题四: 每天晚上十二点的时候,不管合并数据量多大,都要上传
=======================================================
Storm第三天路由器数据监控
1、项目背景介绍:
随着路由器上网的普及,越来越多的人在各个场合选择使用路由器上网,特别是在一些公共场所,例如网吧,酒店,饭店,旅馆,宾馆,洗浴中心等。这些公共场所的网络安全也日益受到各地网安的关注,各种问题也日益凸显。
如何鉴别上网人员的身份问题?
如何通过路由器来解决人群聚集的问题?
如何对上网人员的身份进行追踪?
如何通过公共路由器来获取指定人员的行动轨迹?
如何获取上网人员的网络内容?
等等这些问题都困扰着各地的网安部门。为此我司特为各地网安推出定制化的路由器,在网安指定的公共地点安装我司的路由器,可以追踪每个人的上网情况,通过路由器或者嗅探设备的mac地址以及经纬度的追踪,可以定位每个人员的上网大致方位,了解每个人的上网内容,做到实时的网页内容监控,地理位置的监控,上网设备的mac地址追踪,通过嗅探设备,实现上网设备的实时路线追踪,为各地网安解决各种定制化的任务。
时速小于90KM/小时,并且开着无线网,嗅探设备就能抓到你的mac地址
2、项目数据处理流程
3、数据类型梳理
1、终端mac记录:
YT1013 MAC地址抓取记录表 "audit_mac_detail_" 表 length 25
获取到的所有的mac地址都会存入这种类型的文件中
YT1013=iumac,idmac,area_code,policeid,sumac,sdmac,datasource,netsite_type, capture_time, netbar_wacode, brandid, cache_ssid,terminal_filed_strength,ssid_position,access_ap_mac,access_ap_channel,access_ap_encryption_type,collection_equipment_id,collection_equipment_longitude,collection_equipment_latitude,wxid,province_code,city_code,typename,security_software_orgcode
2、虚拟身份记录
YT1020 虚拟身份抓取记录表 virtual_detail表 length 22
获取到所有的虚拟身份都会存入这种类型的文件中
YT1020=mobile,iumac,idmac,area_code,policeid,netsite_type,sumac,account_type,soft_type,sdmac,netbar_wacode,sessionid,ip_address,account,capture_time,collection_equipment_id,wxid,province_code,city_code,datasource,typename,security_software_orgcode
3、终端上下线记录
YT1023 终端上下线记录表 wifi_online表 length 51
获取到所有的终端上下线的记录都会存入到这种类型的文件中
YT1023=iumac,idmac,area_code,policeid,netsite_type,capture_time,sumac, sdmac, netbar_wacode,auth_type,auth_account, collection_equipment_id,datafrom, start_time,end_time,ip_address,src_ip,src_port_start,src_port_end,src_port_start_v6,src_port_end_v6,dst_ip,dst_ip_ipv6,dst_port,dst_port_v6,certificate_type,certificate_code,app_company_name,app_software_name,app_version,appid,src_ipv6,longitude,latitude,sessionid,terminal_fieldstregth,x_coordinate,y_coordinate,name,imsi,imei_esn_meid,os_name,brand,model,network_app,port,wxid,province_code,city_code,typename,security_software_orgcode
4、搜索关键字记录
YT1033 搜索关键字记录表 searchkey_detail表 length 21
获取到所有的搜索记录关键字都会存入到这种类型的文件中
YT1033=iumac,idmac,area_code,policeid,netsite_type,capture_time,sumac,sdmac,netbar_wacode,src_ip,src_port,dst_ip,dst_port,search_groupid,http_domain,search_keyword,wxid,province_code,city_code,typename,security_software_orgcode
5、网页访问记录
YT1034 网页访问记录表 webpage_detail 表 length 24
获取到所有的网页访问记录都会存入到这种类型的文件中
YT1034=iumac,idmac,area_code,policeid,netsite_type,capture_time,sumac,sdmac,netbar_wacode,src_ip,src_port,dst_ip,dst_port,http_method,http_domain,http_action_match,web_url,http_categoryid,web_title,wxid,province_code,city_code,typename,security_software_orgcode
4、模拟数据产生
创建两个文件夹
mkdir -p /export/datas/destFile # 拷贝后的文件
mkdir -p /export/datas/sourceFile #源文件
详见资料当中的filegenerate.jar
java -jar filegenerate.jar /export/datas/sourceFile/ /export/datas/destFile 1000
5、创建kafka的topic
bin/kafka-topics.sh --create --replication-factor 2 --topic wifidata --zookeeper node01:2181,node02:2181,node03:2181 --partitions 6
6、定义flume配置文件并启动flume
配置kafka配置文件wifi.conf
#为我们的source channel sink起名
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#指定我们的source收集到的数据发送到哪个管道
a1.sources.r1.channels = c1
#指定我们的source数据收集策略
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/datas/destFile
a1.sources.r1.deletePolicy = immediate
a1.sources.r1.ignorePattern = ^(.)*\\.tmp$
#指定我们的channel为memory,即表示所有的数据都装进memory当中
a1.channels.c1.type = memory
#指定我们的sink为kafka sink,并指定我们的sink从哪个channel当中读取数据
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = wifidata
a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
启动flume程序
bin/flume-ng agent -n a1 -c conf -f /export/servers/apache-flume-1.8.0-bin/conf/wifi.conf -Dflume.root.logger=INFO,console
7、创建maven工程并导入相应的jar包
8、开发我们的WifiTypeBolt
过滤掉字段不同的一些脏数据
9、开发我们的WifiWarningBolt
实时告警我们的mac黑名单,手机号码黑名单等
10、开发我们的WriteFileBolt
文件合并到本地一定大小之后,就将数据上传到hdfs上面去
11、程序main函数入口
详见代码。