探测网络设备ACL规则
背景:在互联网企业的生产网络中,往往在网络入口处的网络设备上会有成千上万条ACL策略,这么多的ACL导致了网络管理员很难彻底梳理清楚其中的逻辑关系,从而不知道到底对外开放了哪些IP和哪些端口。
解决手段:编写ACL规则探测程序,从公网扫描该网络设备的ACL规则
工作原理:不管是交换机还是路由器或防火墙,在处理数据包时ACL规则总是优先于ICMP规则。即:当网络设备收到一个TTL为0的报文时会先匹配ACL规则之后再向发送者发送 ICMP time exceeded消息,基于此原理就可以在公网发送以IDC内地址为目的IP且TTL到被探测设备时刚好减为0的数据包,如果被探测设备返回了ICMP time exceeded消息则说明它的ACL策略针对此IP及port开放,如果没有返回包则说明数据包被它的ACL阻拦
图示:
程序实现语言:python3
源码:
1 # coding:utf-8
2
3 from itertools import groupby
4 from scapy.all import *
5 import re
6 import sys
7 import IPy
8
9
10 class RangeException(Exception):
11 pass
12
13
14 class InputType(Exception):
15 pass
16
17
18 class TargetNotSupport(Exception):
19 pass
20
21
22 class OptionError(Exception):
23 pass
24
25
26 class PortScan(object):
27 def __init__(self, speed=3):
28 self.open_port = []
29 self.speed = speed
30
31 def __str__(self):
32 speed_statement = '使用PortScan(*)创建对象时可以在*处指定扫描速率,默认为3,数值越小扫描速度越快\n' \
33 '注意:随着扫描速度的增加准确率会相应降低!'
34 return speed_statement
35
36 # 从本地文件读取IP资源
37 def __target(self):
38 try:
39 open_file = input('请输入要导入资源的文件名字:')
40 address_file = open(open_file, 'r')
41 address_list = []
42 for i in address_file.readlines():
43 i = i.replace('\n', '')
44 address_list.append(i)
45 except FileNotFoundError:
46 print('\n')
47 print('请先在本地创建对应名字的IP列表文本文件!!!')
48 print('\n')
49 self.scan()
50
51 except KeyboardInterrupt:
52 print('')
53 sys.exit()
54
55 except Exception as error:
56 print('打开本地文件有误!!!')
57 print(error)
58 self.scan()
59 else:
60 return address_list
61
62 # 获取IP资源
63 # 输入1从一个文件读取IP,输入2从屏幕输入获取IP
64 # 获取的IP信息可以是单个IP地址(例:220.12.12.12),也可以是一个地址段(例:192.168.1.0/24)
65 # 最终返回一个IP地址列表,此列表包含了输入的所有单个IP地址以及地址段中的可用IP
66 def get_ip(self, option, string):
67
68 address_store = [] # IP资源存储
69
70 # 如果选1则从文件读取IP资源
71 if option == 1:
72 # 得到打开IP表文件名字及其IP表
73 address_list = self.__target()
74
75 # 如果选2则手动输入IP资源
76 if option == 2:
77 # 接收IP数据
78 address_list = input(string) # 1.1.1.1,2.2.2.0/24
79 address_list = address_list.split(',')
80
81 # 1.1.1.1/24 的正则
82 ip_range_re = r'( *(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \
83 r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \
84 r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \
85 r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d))/' \
86 r'(3[012]|[12][0-9]|[1-9]) *'
87 # 1.1.1.1,2.2.2.2,3.3.3.3 的正则
88 ip_address_re = r'( *(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \
89 r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \
90 r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \
91 r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d))'
92
93 # 对输入的值进行便利,提取其中的IP地址
94 for i in address_list:
95 range_re_result = re.match(ip_range_re, i) # 1.1.1.1/24的正则匹配结果
96 ip_re_result = re.match(ip_address_re, i) # 1.1.1.1,2.2.2.2,3.3.3.3 的正则匹配结果
97 if range_re_result:
98 subnet_mask = range_re_result.group(6)
99 network_number = re.sub(r'^0+', '', range_re_result.group(1))
100 address_string = network_number + '/' + subnet_mask # 如果输入1.1.1.1/24类型则address_store为字符串
101 try:
102 # 提取网段内所有可用IP地址并加表
103 address_subset = IPy.IP(address_string)
104 for i in address_subset:
105 if i == address_subset[len(address_subset)-1] or i == address_subset[0]:
106 continue
107 else:
108 address_store.append(str(i))
109 except ValueError:
110 print('输入有误,请按"网络号/掩码"或"IP地址"格式输入')
111 self.scan()
112
113 elif ip_re_result:
114 ii = re.sub(r'^0+', '', ip_re_result.group())
115 address_store.append(ii) #对单个IP地址形式的输入直接加表
116 else:
117 print('输入有误,请输入正确的IP地址(e.g:1.1.1.1,192.168.1.0/24)!!!')
118 self.get_ip(option)
119 return address_store
120
121 # 通过从屏幕输入获取端口资源
122 # 输入形式可以为单个端口号(例:3389),也可以是一个端口范围(例:22-25)
123 # 返回数据为一个列表,其中每个元素都以元组形式存在. 每个元组包含两个整数元素,第一个为端口范围的最小值,第二个为端口范围的最大值
124 # 注意: 单个端口号形式的输入最后也将以范围形式输出,其最大值与最小值都为他本身
125 # 返回数据举例: [(22-25),(3389,3389)]
126 def get_port(self):
127 port_range = input('请输入要扫描端口范围(e.g: 3389,20-25):')
128 target_port = [] # 端口资源存储
129 try:
130 port_range = port_range.split(',') # 例:['1', '2', '3-10', '11-20']
131 for i in port_range:
132 if re.match(' *(\d+)-(\d+).*', i):
133 # for ii in range(len(open_port_list)):
134 low_port = int(re.match(' *(\d+)-(\d+).*', i).group(1))
135 high_port = int(re.match(' *(\d+)-(\d+).*', i).group(2))
136 if low_port >= high_port or low_port <= 0 or low_port > 65535 or high_port <= 0 or high_port > 65535:
137 raise RangeException
138 else:
139 target_port.append((low_port, high_port)) # 如果是范围则把最小值和最大值以元组形式加表
140 elif re.match(' *\d+ *', i):
141 singular = int(re.match(' *(\d+) *', i).group(1))
142 if 0 < singular <= 65535:
143 target_port.append((singular, singular)) # 如果是单整数则把它当作范围一样处理,最大值和最小值均为它自己
144 else:
145 raise RangeException
146 else:
147 raise InputType
148 except RangeException:
149 print('端口应为1-65535之间的整数,且输入范围格式应当为从小到大')
150 self.get_port()
151 except InputType:
152 print('端口类型应为整数')
153 self.get_port()
154 except KeyboardInterrupt:
155 print('')
156 sys.exit()
157 except Exception as unusual:
158 print('输入有误!')
159 print(unusual)
160 self.get_port()
161 return target_port # 返回经过处理的目标端口列表
162
163 # 对纯数字的列表进行排序且范围切块
164 # 例:导入[11,22,33,1,2,3,4,5]----->导出[1-5,11,22,33]
165 @staticmethod
166 def int_single_to_range(original):
167 original.sort() # 先排序
168 open_port_range = []
169 fun = lambda x: x[1] - x[0]
170 for k, g in groupby(enumerate(original), fun):
171 l1 = [j for i, j in g] # 连续数字的列表
172 if len(l1) > 1:
173 scop = str(min(l1)) + '-' + str(max(l1)) # 将连续数字范围用"-"连接
174 else:
175 scop = l1[0]
176 open_port_range.append("{}".format(scop))
177 return open_port_range
178
179 # TTL自动检测
180 # 导入一个被探测设备IP列表,返回一个被探测设备IP与相应TTL的字典,例:{'220.2.2.2':15}
181 def ttl_check(self, address_list):
182 print('准备中...')
183 probe_device_ttl = {}
184 # switch = 0 # 检测返回数据包的源IP是否为被探测设备
185 try:
186 for i in address_list:
187 for ii in range(1, 129):
188 print(i, ii)
189 scan_packet = IP(dst=i, ttl=ii) / TCP(dport=8080, flags='S')
190 ttl_source = sr1(scan_packet, timeout=3, verbose=False)
191 #while 1:
192 # time.sleep(0.001)
193 if ttl_source:
194 try:
195 if ttl_source['IP'].fields['src'] == i:
196 probe_device_ttl[i] = ii
197 # switch = 1
198 break
199 else:
200 continue
201 except Exception as receive_error:
202 print(receive_error)
203 raise
204 # if switch == 1:
205 # break
206 else:
207 print('TTL超时!!!')
208
209 except KeyboardInterrupt:
210 print('')
211 sys.exit()
212
213 except Exception as error:
214 print('程序出现错误!!!')
215 print(error)
216 self.scan()
217 else:
218 print('准备完毕')
219 return probe_device_ttl
220
221 @staticmethod
222 def option():
223 print('请选择导入被扫描信息方式:\n'
224 '1 从文件导入\n'
225 '2 在程序中手动输入\n')
226
227 def scan(self):
228 # 功能选择
229 self.option()
230 try:
231 option = int(input('我选择: '))
232 print(option)
233 if option != 2 and option != 1:
234 raise OptionError
235 except OptionError:
236 print('请输入功能标号!')
237 self.scan()
238
239 # 获取要扫描IP列表
240 address_store = self.get_ip(option, '请输入被探测IP资源:')
241
242 # 获取要扫描的端口列表
243 port_range = self.get_port()
244
245 probe_device = self.get_ip(2, '请输入被探测的安全设备IP地址:')
246
247 # 自动检测到探测设备的TTL值,该值为一个字典,key为被探测安全设备IP,value为到该设备的TTL值
248 ttl = self.ttl_check(probe_device)
249
250 count = 0 # 用作进度百分比的分子. 以每个IP的每个端口为单位进行计数,总数为IP个数*端口个数
251
252 if ttl:
253 # 挨个儿朝被探测设备发送端口探测包
254 for probe_device_ip, ttl in ttl.items():
255
256 print(probe_device_ip + '端口开放情况:')
257
258 # 创建一个新文件,准备导入结果
259 write_file = open(probe_device_ip + '-result.txt', 'w')
260
261 try:
262 # 为每个被探测设备计算IP资源池中所有的IP资源
263 for i in address_store:
264
265 # 为每个IP计算各个输入IP端口范围开放情况
266 for port in port_range:
267 (low_port, high_port) = port
268 scan_packets = IP(dst=i, ttl=ttl) / TCP(dport=(low_port, high_port), flags='S') # 构造检测包
269 replay_packets_total = sr(scan_packets, timeout=self.speed, verbose=False) # 发送检测包及接收返回包
270 open_port_list = replay_packets_total[0].res # 开放端口原始对象列表(一个IP不同端口范围回包的集合)
271
272 # 一个IP有几个端口开放就有几个回包(如果端口被ACL干掉则不会回包),以下遍历回包来读取开放的端口
273 for ii in range(len(open_port_list)):
274 try:
275 if open_port_list[ii][1]['ICMP'].fields['type'] == 11: # ICMP类型为11时为TTL超时包
276 self.open_port.append(open_port_list[ii][0]['TCP'].fields['dport']) # TTL超时则为开放端口,将开放端口进行加表
277 continue
278 else:
279 if open_port_list[ii][1]['ICMP'].fields['type'] == 3: # 不知为啥有时候会返回类型为3的ICMP包(即:端口不可达包)
280 continue
281 else:
282 # 除11和3外其他类型的ICMP回包,需进行人工排查
283 print('ICMP返回类型不对')
284 print(open_port_list[ii][1]['ICMP'].fields)
285 print(open_port_list[ii][0]['TCP'].fields)
286 except IndexError:
287
288 # 如果探测设备IP刚好为要扫描的IP时,开放端口会返回SYN,ACK包
289 if open_port_list[ii][1]['TCP'].fields['flags'] == 'SA':
290 self.open_port.append(open_port_list[ii][0]['TCP'].fields['dport'])
291 continue
292
293 # 不知为啥有时候交换机会返回RST ACK的包
294 if open_port_list[ii][1]['TCP'].fields['flags'] == 'RA':
295 continue
296 else:
297 print('返回未知TCP包,需人工分析')
298 print(open_port_list[ii][1]['TCP'].fields,
299 open_port_list[ii][1]['TCP'].fields['flags'])
300 print(open_port_list[ii])
301
302 count += 1 # 执行进度+1(每计算完一个IP进度+1)
303 print(count)
304
305 # 进度统计
306 speed_to_progress = count / len(address_list) * len(port_range) * len(ttl) * 100
307 print('\r已完成:%.2f%% ' % speed_to_progress, end='')
308
309 self.open_port = self.int_single_to_range(self.open_port) # 对开放端口列表进行排序和范围化
310 print('针对' + i + '开放端口: ', self.open_port)
311 write_file.write(str(i) + ':' + str(self.open_port) + '\n') # 每扫描完一个IP就把该IP结果写入文件
312 self.open_port = [] # 扫尾工作,为下个IP扫描准备一个干净的开放端口列表
313
314 write_file.close()
315
316 except KeyboardInterrupt:
317 print('')
318 write_file.close()
319 sys.exit()
320 except Exception as error:
321 write_file.close()
322 print('程序异常退出!')
323 print(error)
324 else:
325 write_file.close()
326 print('')
327 if option == 1:
328 print('被探测设备%s已完成,结果已导入当前路径''\'%s\'''文件中' % (probe_device_ip, probe_device_ip + '-result.txt'))
329 if option == 2:
330 print('扫描已完成!')
331
332
333 if __name__ == '__main__':
334
335 def banner():
336 print('\n')
337 print('============================================')
338 print('\n')
339 print('\n')
340 print(' ACL有效性探测系统v1.0 ')
341 print('\n')
342 print('\n')
343 print('============================================')
344 print('\n')
345
346 def main():
347 banner()
348 a = PortScan()
349 a.scan()
350
351 main()