[Python] 多线程 ICMP 测试工具

作者 huhamhire,暂无评论,2014年2月2日 14:31 程序实践

接着上一次的多线程 NS 查询工具,获取域名 IP 地址只是后面 hosts 自动化生成工具功能上最为基础的第一步,后面更为重要的环节就是对这些地址的有效性,以及访问效果进行测试评估。所以,这一部分就来说下我目前实现的多线程 ICMP 测试模块原型。

既然连 NS 查询都已经用到了多线程,加上 NS 服务器对每个域名返回的 IP 结果可能会有多个,根服务器对于某些域名比如 Google 这样的,甚至会返回数十个结果。所以在针对这些服务器做 ICMP 测试也就是我们常见的 ping 测试的时候,使用多线程也就在所难免。

ICMP 可以用来检测相关主机当前的差错情况以及通信延迟,对于 hosts 评估来说,目前只需要获取当前位置到服务器的延迟即可。相对来说,ICMP 是比较底层的协议,实现起来其实也就是报文的发送与解析,下面就是我用于在独立线程中实现 ICMP 测试的类。


  1 class PingHost(threading.Thread):
  2     ERROR_DESCR = {
  3         1: 'ERROR: ICMP messages can only be sent from processes running as '
  4            'root.',
  5         10013: 'ERROR: ICMP messages can only be sent by users or processes '
  6                'with administrator rights.',
  7         10049: 'ERROR: "%s" is not available from the local computer',
  8     }
  9 
 10     def __init__(self, ip, ip_id, results, semaphore,
 11                  ping_count=4, timeout=5, v6_flag=False):
 12         threading.Thread.__init__(self)
 13         self.__data = struct.pack('d', time.time())
 14         self._pid = 0
 15         self._pack = None
 16 
 17         self._ip = ip
 18         self._ip_id = ip_id
 19         self._ping_count = ping_count
 20         self.results = results
 21         self.sem = semaphore
 22         self._timeout = timeout
 23         self._v6_flag = v6_flag
 24 
 25         self.time_log = []
 26         self.delay_stat = {}
 27 
 28         self._sock = self.__sock
 29 
 30     @property
 31     def __sock(self):
 32         try:
 33             if not self._v6_flag:
 34                 sock = socket.socket(socket.AF_INET, socket.SOCK_RAW,
 35                                      socket.getprotobyname("icmp"))
 36             else:
 37                 sock = socket.socket(socket.AF_INET6, socket.SOCK_RAW,
 38                                      socket.getprotobyname("ipv6-icmp"))
 39                 sock.settimeout(self._timeout)
 40             return sock
 41         except socket.error, (error_no, msg):
 42             if error_no in self.ERROR_DESCR:
 43                 raise socket.error(msg + self.ERROR_DESCR[error_no])
 44             raise
 45 
 46     @property
 47     def __pack(self):
 48         if not self._v6_flag:
 49             header = struct.pack('bbHHh', 8, 0, 0, self._pid, 0)
 50         else:
 51             header = struct.pack('BbHHh', 128, 0, 0, self._pid, 0)
 52 
 53         pack = header + self.__data
 54         checksum = self.__checksum(pack)
 55 
 56         if not self._v6_flag:
 57             header = struct.pack('bbHHh', 8, 0, checksum, self._pid, 0)
 58         else:
 59             header = struct.pack('BbHHh', 128, 0, checksum, self._pid, 0)
 60         return header + self.__data
 61 
 62     def __checksum(self, pack):
 63         if len(pack) & 1:
 64             pack += '\0'
 65         words = array.array('h', pack)
 66         sum = 0
 67         for word in words:
 68             sum += (word & 0xffff)
 69         sum = (sum >> 16) + (sum & 0xffff)
 70         sum += (sum >> 16)
 71         return (~sum) & 0xffff
 72 
 73     def send(self):
 74         pack = self._pack
 75         while pack:
 76             sent = self._sock.sendto(pack, (self._ip, 0))
 77             pack = pack[sent:]
 78         self.time_sent = time.time()
 79 
 80     def response(self):
 81         while True:
 82             timeout = self._timeout
 83             ready = select.select([self._sock], [], [], timeout)
 84             if not ready[0]:
 85                 return None
 86             time_received = time.time()
 87             rec_packet, address = self._sock.recvfrom(1024)
 88             header = rec_packet[20:28]
 89             rtype, code, checksum, rid, seq = struct.unpack('bbHHh', header)
 90             if rid == self._pid:
 91                 return time_received - self.time_sent
 92             timeout -= (time_received - self.time_sent)
 93             if timeout <= 0:
 94                 return None
 95 
 96     def session(self):
 97         try:
 98             time_log = []
 99             for i in range(self._ping_count):
100                 self._pid = int((id(self._timeout) * random.random()) % 65535)
101                 self._pack = self.__pack
102                 self.send()
103                 delay = self.response()
104                 time_log.append(delay)
105                 time.sleep(0.1)
106             self._sock.close()
107             self.time_log = time_log
108         except socket.error, (error_no, msg):
109             self.time_log = [None for i in range(self._ping_count)]
110             if error_no in self.ERROR_DESCR:
111                 msg += self.ERROR_DESCR[error_no]
112                 if "%s" in msg:
113                     print(msg % self._ip)
114                 else:
115                     print(msg)
116             else:
117                 raise
118         finally:
119             self.sem.release()
120 
121     def stat(self):
122         log = self.time_log
123         log = [delay * 1000 for delay in log if delay is not None]
124         if log:
125             min_delay = round(min(log), 3)
126             max_delay = round(max(log), 3)
127             avg_delay = round(sum(log) / len(log), 3)
128             loss = round(1.0 * len(log) / self._ping_count, 3)
129             self.delay_stat = {"min": min_delay, "max": max_delay,
130                                "avg": avg_delay, "ratio": loss}
131         else:
132             self.delay_stat = {"min": None, "max": None,
133                                "avg": None, "ratio": 0}
134         self.delay_stat["ping_count"] = self._ping_count
135 
136     def set_results(self):
137         self.results[self._ip_id] = self.delay_stat
138 
139     def show_state(self):
140         msg = "PING: %s" % self._ip
141         if self.delay_stat["ratio"] == 1:
142             Progress.show_status(msg, "OK")
143         else:
144             if self.delay_stat["ratio"] > 0:
145                 status = "Lose Pack"
146             else:
147                 status = "Failed"
148             Progress.show_status(msg, status, 1)
149         Progress.progress_bar()
150 
151     def run(self):
152         self.session()
153         self.stat()
154         self.set_results()
155         self.show_state()

从代码来看,整个实现过程非常简单,直接使用 Raw Socket 来发送自己打包的报文,然后再接收解析就可以完成了。

由于是在同一台机器上多线程实现,所以在发送ICMP包的时候,需要采用类似 Unix 系统下的做法,对每一个请求都打上单独的标签 ID 加以区分,否则可能出现对返回的结果产生误判的情况。加上标签以后,每个线程就只会独立的处理自己请求的结果,而不会将其他线程返回的结果错误的当作自己的结果来处理。

测试的结果,我采用类似 Windows 下的统计结果进行储存,主要统计最大延迟、最小延迟、平均延迟以及连通率。

ICMP 测试的实现大概就是这样,下面再来看用于多线程调度的主程序。


 1 class MultiPing(object):
 2     # Limit the number of concurrent sessions
 3     sem = threading.Semaphore(0x40)
 4 
 5     def __init__(self, combinations):
 6         self.combs = combinations
 7         self._responses = {}
 8 
 9     def ping_test(self):
10         Progress.set_total(len(self.combs))
11         Progress.set_counter(self._responses)
12         threads = []
13         for comb in self.combs:
14             self.sem.acquire()
15             ping_host = PingHost(comb["ip"], comb["ip_id"],
16                                  self._responses, self.sem)
17             ping_host.start()
18             threads.append(ping_host)
19 
20         for ping_host in threads:
21             ping_host.join()
22 
23         Progress.progress_bar()
24         return self._responses

与上一次 NS 查询调度相同,这个程序依旧使用信号量来实现线程调度,具体可以参考这个系类的前一篇文章。唯一的区别就是 ICMP 不存在短时间内向同一主机发出大量类似请求的情况,几乎不存在被服务器方面防护栏街的机会,所以线程池可以相对放宽。

最后是这个工具的主程序,同样是类似于上次的存储过程,在所有测试完成后,汇总测试结果并写入数据库,这里就不详细说明了。

另外需要额外说明的是,因为 ICMP 协议和 IP 协议同属于 OSI 模型的第三层,这里实现时使用了 Raw Socket,因此在使用这个工具的时候需要获得当前系统下访问原始套接字的权限,一般可以用 Linux/Unix 下的 root 或者 Windows 下的 Administrator 权限。


1 if __name__ == '__main__':
2     SourceData.connect_db()
3     combs = SourceData.get_ping_test_comb()
4 
5     ping_tests = MultiPing(combs)
6     results = ping_tests.ping_test()
7 
8     SourceData.set_multi_ping_test_dict(results)

当然,下面少不了实际使用过程的截图 :-)

icmp
登录后进行评论