本文实例讲述了使用 Python 检测远程 UDP 端口是否打开的方法,分享给大家供大家参考。具体实现方法如下:
代码如下:
import socket
import threading
import time
import struct
import queue
# Shared result channel between icmp_receiver (producer) and checker_udp
# (consumer). NOTE: this rebinds the name ``queue`` from the imported module
# to a Queue instance; the other functions in this file rely on that name.
queue = queue.Queue()
def udp_sender(ip, port):
    """Send a single small UDP probe datagram to ``(ip, port)``.

    This is a best-effort operation: network, address-resolution and
    out-of-range-port errors are swallowed so that a failed probe never
    raises into the calling thread.

    Args:
        ip: Destination host address.
        port: Destination UDP port.

    Returns:
        None.
    """
    try:
        # The context manager guarantees the socket is closed even when
        # sendto() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock_udp:
            # Payload content is irrelevant; it must be bytes on Python 3.
            sock_udp.sendto(b"abcd...", (ip, port))
    except (OSError, ValueError, OverflowError):
        # OSError: network/resolution failure; ValueError (incl. UnicodeError):
        # un-encodable host name; OverflowError: port outside 0-65535.
        pass
def _is_port_unreachable(packet, sender_ip, ip, port):
    """Return True if *packet* is an ICMP port-unreachable reply to our probe.

    *packet* is expected to be a raw IPv4 datagram with its IP header still
    attached, followed by the ICMP header and the quoted original IP + UDP
    headers. Malformed or truncated packets are reported as non-matching.
    """
    if sender_ip != ip:
        return False
    try:
        # Header lengths come from the IHL field instead of assuming 20 bytes.
        outer_ihl = (packet[0] & 0x0F) * 4
        icmp_type, icmp_code = struct.unpack_from("!BB", packet, outer_ihl)
        inner_start = outer_ihl + 8  # skip the 8-byte ICMP header
        inner_ihl = (packet[inner_start] & 0x0F) * 4
        inner_proto = packet[inner_start + 9]
        # UDP header layout: source port (2 bytes), destination port (2 bytes).
        (dest_port,) = struct.unpack_from(
            "!H", packet, inner_start + inner_ihl + 2
        )
    except (IndexError, struct.error):
        return False
    # Type 3 / code 3 is "destination unreachable: port unreachable";
    # protocol 17 is UDP.
    return (
        icmp_type == 3
        and icmp_code == 3
        and inner_proto == 17
        and dest_port == port
    )


def icmp_receiver(ip, port, timeout=3):
    """Listen for an ICMP port-unreachable reply for a UDP probe.

    Puts exactly one result on the module-level ``queue``:

    * ``False`` - a port-unreachable message for ``port`` arrived from ``ip``
      (the port is closed).
    * ``True``  - no such message arrived within ``timeout`` seconds, or
      receiving failed (the port is open or filtered).

    Unrelated ICMP packets are ignored and listening continues until the
    deadline, so the caller always finds a result on the queue.

    Args:
        ip: Address the reply must come from. It is compared textually with
            the sender address reported by ``recvfrom``.
        port: UDP port that was probed.
        timeout: Total number of seconds to wait for a reply.

    Raises:
        socket.error: If the raw ICMP socket cannot be created (typically
            because the process lacks the required privileges).
    """
    icmp = socket.getprotobyname("icmp")
    try:
        sock_icmp = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as err:
        if err.errno == 1:
            # Operation not permitted
            message = (err.strerror or "") + (
                " – note that icmp messages can only be sent from processes"
                " running as root."
            )
            raise socket.error(err.errno, message) from err
        raise  # raise the original error

    deadline = time.monotonic() + timeout
    with sock_icmp:  # closed on every exit path, including timeouts
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            sock_icmp.settimeout(remaining)
            try:
                recpacket, addr = sock_icmp.recvfrom(1024)
            except OSError:
                # Timeout or receive failure: treat as "no rejection seen".
                break
            if _is_port_unreachable(recpacket, addr[0], ip, port):
                queue.put(False)
                return
    queue.put(True)
def checker_udp(ip, port):
    """Check whether a remote UDP port appears to be open.

    Starts ``icmp_receiver`` in one thread, then sends a probe with
    ``udp_sender`` in another, and returns the value the receiver placed on
    the module-level ``queue``.

    Args:
        ip: Target host address.
        port: Target UDP port.

    Returns:
        The receiver's verdict: ``False`` when an ICMP port-unreachable reply
        was seen (port closed), ``True`` when none arrived before the
        receiver's timeout (port open or filtered).

    Raises:
        The queue's ``Empty`` exception if the receiver thread ended without
        producing a result (for example, it could not open its raw socket).
    """
    thread_udp = threading.Thread(target=udp_sender, args=(ip, port))
    thread_icmp = threading.Thread(target=icmp_receiver, args=(ip, port))
    thread_udp.daemon = True
    thread_icmp.daemon = True

    thread_icmp.start()
    # Give the receiver a moment to open its socket before the probe is sent,
    # otherwise the ICMP reply could arrive before anyone is listening.
    time.sleep(0.1)
    thread_udp.start()

    thread_icmp.join()
    thread_udp.join()
    return queue.get_nowait()
# Command-line entry point: python <script> <ip> <port>
if __name__ == '__main__':
    import sys

    # Fail with a usage hint rather than an IndexError on missing arguments.
    if len(sys.argv) != 3:
        sys.exit("usage: %s <ip> <port>" % sys.argv[0])
    print(checker_udp(sys.argv[1], int(sys.argv[2])))
希望本文所述对大家的 Python 程序设计有所帮助。