0x01 Preface
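In on-site forensics it is fairly rare to have to analyze packet captures by hand. Traffic-analysis appliances work by capturing all the data and parsing it, so to a large extent they already automate what a person would otherwise do.
Colasoft (科來) and Wireshark are both good tools for analyzing PCAP files. While looking for code that parses pcap files programmatically, I found that many people have already written Python scripts to assist with pcap parsing, and there are also frameworks that extract information from a pcap and present it in a UI.
This article is a study-oriented summary of using Python's Scapy library to extract protocol five-tuple information. I have not used it in real casework, because in practice reading, unpacking, and searching PCAP files this way turned out to be too slow.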
0x02 Reference libraries
Common Python libraries for parsing pcap files include Scapy, dpkt, and Pyshark. Source code and tools worth consulting:
https://github.com/thepacketgeek/cloud-pcap
https://github.com/madpowah/ForensicPCAP
https://github.com/le4f/pcap-analyzer
https://github.com/HatBoy/Pcap-Analyzer
https://github.com/caesar0301/awesome-pcaptools
https://asecuritysite.com/forensics/pcap
https://github.com/DanMcInerney/net-creds
Websites that offer pcap analysis as a service:
https://packettotal.com/
https://www.capanalysis.net/ca/
https://asecuritysite.com/forensics/pcap?infile=smtp.pcap&infile=smtp.pcap
https://app.any.run
0x03 Mail protocols
To extract a sender's mailbox address, you first need to recognize the ports used by SMTP and the other mail protocols.
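- Port 25 is open for SMTP (Simple Mail Transfer Protocol) and is used to send mail.
- Port 110 is open for POP3 (Post Office Protocol version 3). POP2 and POP3 are both used mainly to receive mail. POP3 is the version in use today; the older POP2 used port 109 and is obsolete.
- Port 143 is mainly used by IMAP (Internet Message Access Protocol; IMAP4 is the current version). Like POP3, it is a protocol for receiving mail.
- Port 465 (SMTPS) is open for SMTP over SSL/TLS, a variant of SMTP layered on top of SSL. The content is protected from the very start of the connection, so the plaintext is not visible in a capture, which helps prevent mail from leaking.
- Port 587 uses STARTTLS. It is also TLS-protected, but the traffic is only protected after the STARTTLS command has been executed; everything before that is in plaintext.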
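As a quick triage aid, the port list above can be turned into a lookup table. This is only a minimal sketch: the names `MAIL_PORTS` and `classify_mail_port` are made up for illustration, and a port number alone is a hint rather than proof of the protocol actually spoken on the connection.

```python
# Well-known mail ports, as listed above. Port numbers are only a hint:
# any service can be bound to any port.
MAIL_PORTS = {
    25: "SMTP",
    110: "POP3",
    143: "IMAP",
    465: "SMTPS (TLS from the start)",
    587: "SMTP submission (STARTTLS)",
}


def classify_mail_port(sport, dport):
    """Return a mail protocol label if either port is a known mail port, else None."""
    # Check the destination port first, since the server side is usually the target.
    for port in (dport, sport):
        if port in MAIL_PORTS:
            return MAIL_PORTS[port]
    return None


if __name__ == "__main__":
    print(classify_mail_port(51514, 25))
    print(classify_mail_port(143, 40022))
    print(classify_mail_port(50000, 443))
```

Traffic on 465, and on 587 after STARTTLS, will be encrypted, so only the port and IP metadata are useful there.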
Common SMTP command syntax
In Wireshark, SMTP protocol data appears after the TCP three-way handshake has completed. The common commands are listed below.
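SMTP commands are not case-sensitive, but their arguments can be. For details see RFC 821 (since superseded by RFC 5321).
- HELO <domain> <CRLF>: identifies the client to the server. The sender can lie here, but servers can usually detect it.
- MAIL FROM: <reverse-path> <CRLF>: <reverse-path> is the sender's address. This command starts a mail transaction and resets all state and buffers.
- RCPT TO:<forward-path> <CRLF>: <forward-path> identifies a recipient's address. It follows MAIL FROM and may be repeated for multiple recipients.
- DATA <CRLF>: everything that follows is sent as message data, terminated by <CRLF>.<CRLF>.
- RSET <CRLF>: resets the session and cancels the current transaction.
- NOOP <CRLF>: asks the server to reply OK; generally used for testing.
- QUIT <CRLF>: ends the session.
- VRFY <string> <CRLF>: verifies whether the given mailbox exists. For security reasons most servers disable this command.
- EXPN <string> <CRLF>: verifies whether the given mailing list exists and, if so, expands it. For security reasons most servers disable this command.
- HELP <CRLF>: asks which commands the server supports.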
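Given the command syntax above, the sender and recipient addresses can be pulled out of a plaintext SMTP payload with two regular expressions. The sketch below is an illustration only: `extract_envelope` and the sample payload are hypothetical, and it assumes the payload bytes have already been reassembled from the TCP stream.

```python
import re

# Commands are matched case-insensitively; the address inside <...> is kept as-is.
MAIL_FROM_RE = re.compile(rb"^MAIL FROM:\s*<([^>]*)>", re.IGNORECASE | re.MULTILINE)
RCPT_TO_RE = re.compile(rb"^RCPT TO:\s*<([^>]*)>", re.IGNORECASE | re.MULTILINE)


def extract_envelope(payload):
    """Return (senders, recipients) found in a plaintext SMTP client payload."""
    senders = [m.decode("ascii", "replace") for m in MAIL_FROM_RE.findall(payload)]
    recipients = [m.decode("ascii", "replace") for m in RCPT_TO_RE.findall(payload)]
    return senders, recipients


if __name__ == "__main__":
    sample = (
        b"HELO client.example\r\n"
        b"mail from:<alice@example.com>\r\n"
        b"RCPT TO:<bob@example.org>\r\n"
        b"RCPT TO:<carol@example.org>\r\n"
        b"DATA\r\n"
    )
    print(extract_envelope(sample))
```

These are the envelope addresses; the From: and To: headers inside the DATA section are separate and may differ.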
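Code
forensicPCAP parses a PCAP file and then uses the CMD module to run an interactive loop in which typed commands invoke the corresponding functions. Its core principle is:
Using Scapy, find the records whose source or destination port is 110 or 143 and save them as mail data. These are the POP3 and IMAP ports, so what is captured is mail retrieval rather than SMTP sending.
The key code is as follows:
self.pcap is read in advance in the constructor with self.pcap = rdpcap(namefile) and is then traversed with enumerate(). Packets whose destination or source port is 110 or 143 are selected.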
def do_mail(self, arg, opts=None):
    """Print the number of mail's requests and store its
    Usage :
    - mail"""
    sys.stdout.write(bcolors.TXT + "## Searching mail's request ... ")
    sys.stdout.flush()
    con = []
    mailpkts = []
    for i, packet in enumerate(self.pcap):
        # Only look at TCP packets
        if TCP in packet:
            # Keep records whose source or destination port is 110 or 143
            if packet.getlayer('TCP').dport == 110 or packet.getlayer('TCP').sport == 110 or packet.getlayer('TCP').dport == 143 or packet.getlayer('TCP').sport == 143:
                # flags == 2 is a bare SYN, i.e. a new connection request
                if packet.getlayer('TCP').flags == 2:
                    con.append(i)
                mailpkts.append(packet)
    sys.stdout.write("OK.\n")
    print("## Result : Mail's request : " + str(len(con)))
    sys.stdout.write(bcolors.TXT + "## Saving mails ... ")
    sys.stdout.flush()
    res = ""
    for packet in mailpkts:
        # flags == 24 is PSH+ACK, i.e. a segment that carries payload
        if packet.getlayer('TCP').flags == 24:
            res = res + packet.getlayer('Raw').load
            sys.stdout.write(".")
            sys.stdout.flush()
    sys.stdout.write("OK\n")
    sys.stdout.flush()
    self.cmd = "mail"
    self.last = res
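The two magic numbers compared against the TCP flags in do_mail() are bitmasks. The sketch below decodes them with the standard TCP flag bit values; `flag_names` is a hypothetical helper, not part of forensicPCAP or Scapy.

```python
# Standard TCP flag bits, lowest bit first.
TCP_FLAGS = [
    (0x01, "FIN"),
    (0x02, "SYN"),
    (0x04, "RST"),
    (0x08, "PSH"),
    (0x10, "ACK"),
    (0x20, "URG"),
]


def flag_names(value):
    """Return the names of the TCP flags set in an integer flags value."""
    return [name for bit, name in TCP_FLAGS if value & bit]


if __name__ == "__main__":
    print(2, flag_names(2))    # connection requests counted in `con`
    print(24, flag_names(24))  # payload-carrying segments appended to `res`
```

Comparing with `==` matches only segments with exactly those flags set, so a data segment that also carries FIN, for example, would be skipped by the original filter.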
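0x04 DNS resolution
The DNS part of forensicPCAP is shown below. bcolors.TXT sets the display color of the text, and res = packet.getlayer('DNS').qd.qname retrieves the queried domain name from the DNS record.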
############# do_dns() ###########
def do_dns(self, arg, opts=None):
    """Print all DNS requests in the PCAP file
    Usage :
    - dns"""
    sys.stdout.write(bcolors.TXT + "## Listing all DNS requests ...")
    sys.stdout.flush()
    dns = []
    dns.append([])
    # Enumerate the packets in the PCAP
    for i, packet in enumerate(self.pcap):
        if DNS in packet:
            # Get the queried domain name
            res = packet.getlayer('DNS').qd.qname
            # Strip the trailing dot of a fully qualified name
            if res[len(res) - 1] == '.':
                res = res[:-1]
            # Save the packet index and the domain name
            dns.append([i, res])
    sys.stdout.write("OK.\n")
    # Report the number of DNS requests
    print(bcolors.TXT + "## Result : " + str(len(dns) - 1) + " DNS request(s)" + bcolors.ENDC)
    self.last = dns
    self.cmd = "dns"
The core code of pcap-analyzer borrows from forensicPCAP. When it displays the DNS request records, it reports the ten most frequently queried domain names.
Code
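def get_dns(file):
    dns = []
    # Open the PCAP file
    pcap = rdpcap(UPLOAD_FOLDER+file)
    for packet in pcap:
        if DNS in packet:
            # Core logic: take the queried name and strip the trailing dot
            res = packet.getlayer('DNS').qd.qname
            if res[len(res) - 1] == '.':
                res = res[:-1]
            dns.append(res)
    # Keep the ten most frequently queried domain names
    dns = Counter(dns).most_common(10)
    # The excerpt ends here; returning the result is assumed
    return dns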
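The counting step can be tried on its own without a capture file. This is a minimal sketch with made-up query names; `top_domains` is a hypothetical helper, and it assumes the names have already been decoded to str (under Python 3, Scapy returns qname as bytes, so a decode step would be needed first).

```python
from collections import Counter


def top_domains(qnames, n=10):
    """Strip the trailing dot from each query name and return the n most common."""
    cleaned = [q[:-1] if q.endswith(".") else q for q in qnames]
    return Counter(cleaned).most_common(n)


if __name__ == "__main__":
    names = ["example.com.", "example.com.", "test.example.", "example.com"]
    print(top_domains(names, 2))
```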
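0x05 Extracting password information
net-creds is a Scapy-based script that finds password information in a PCAP.
Code
The main work is done by the other_parser() function. It splits the HTTP and other content in each packet and then searches for authentication-related keywords to pick out account names and passwords.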
def other_parser(src_ip_port, dst_ip_port, full_load, ack, seq, pkt, verbose):
    '''
    Pull out pertinent info from the parsed HTTP packet data
    '''
    user_passwd = None
    http_url_req = None
    method = None
    http_methods = ['GET ', 'POST ', 'CONNECT ', 'TRACE ', 'TRACK ', 'PUT ', 'DELETE ', 'HEAD ']
    http_line, header_lines, body = parse_http_load(full_load, http_methods)
    headers = headers_to_dict(header_lines)
    if 'host' in headers:
        host = headers['host']
    else:
        host = ''

    if http_line != None:
        method, path = parse_http_line(http_line, http_methods)
        http_url_req = get_http_url(method, host, path, headers)
        if http_url_req != None:
            if verbose == False:
                if len(http_url_req) > 98:
                    http_url_req = http_url_req[:99] + '...'
            printer(src_ip_port, None, http_url_req)

        # Print search terms
        searched = get_http_searches(http_url_req, body, host)
        if searched:
            printer(src_ip_port, dst_ip_port, searched)

        # Print user/pwds
        if body != '':
            user_passwd = get_login_pass(body)
            if user_passwd != None:
                try:
                    http_user = user_passwd[0].decode('utf8')
                    http_pass = user_passwd[1].decode('utf8')
                    # Set a limit on how long they can be prevent false+
                    if len(http_user) > 75 or len(http_pass) > 75:
                        return
                    user_msg = 'HTTP username: %s' % http_user
                    printer(src_ip_port, dst_ip_port, user_msg)
                    pass_msg = 'HTTP password: %s' % http_pass
                    printer(src_ip_port, dst_ip_port, pass_msg)
                except UnicodeDecodeError:
                    pass

        # Print POST loads
        # ocsp is a common SSL post load that's never interesting
        if method == 'POST' and 'ocsp.' not in host:
            try:
                if verbose == False and len(body) > 99:
                    # If it can't decode to utf8 we're probably not interested in it
                    msg = 'POST load: %s...' % body[:99].encode('utf8')
                else:
                    msg = 'POST load: %s' % body.encode('utf8')
                printer(src_ip_port, None, msg)
            except UnicodeDecodeError:
                pass

    # Kerberos over TCP
    decoded = Decode_Ip_Packet(str(pkt)[14:])
    kerb_hash = ParseMSKerbv5TCP(decoded['data'][20:])
    if kerb_hash:
        printer(src_ip_port, dst_ip_port, kerb_hash)

    # Non-NETNTLM NTLM hashes (MSSQL, DCE-RPC,SMBv1/2,LDAP, MSSQL)
    NTLMSSP2 = re.search(NTLMSSP2_re, full_load, re.DOTALL)
    NTLMSSP3 = re.search(NTLMSSP3_re, full_load, re.DOTALL)
    if NTLMSSP2:
        parse_ntlm_chal(NTLMSSP2.group(), ack)
    if NTLMSSP3:
        ntlm_resp_found = parse_ntlm_resp(NTLMSSP3.group(), seq)
        if ntlm_resp_found != None:
            printer(src_ip_port, dst_ip_port, ntlm_resp_found)

    # Look for authentication headers
    if len(headers) == 0:
        authenticate_header = None
        authorization_header = None
    for header in headers:
        authenticate_header = re.match(authenticate_re, header)
        authorization_header = re.match(authorization_re, header)
        if authenticate_header or authorization_header:
            break

    if authorization_header or authenticate_header:
        # NETNTLM
        netntlm_found = parse_netntlm(authenticate_header, authorization_header, headers, ack, seq)
        if netntlm_found != None:
            printer(src_ip_port, dst_ip_port, netntlm_found)

        # Basic Auth
        parse_basic_auth(src_ip_port, dst_ip_port, headers, authorization_header)
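The last step of other_parser() hands the Authorization header to parse_basic_auth(). What decoding such a header involves can be sketched with the standard library alone. The function below is an illustration under that assumption, not the net-creds implementation; `decode_basic_auth` is a hypothetical name.

```python
import base64


def decode_basic_auth(header_value):
    """Decode an HTTP 'Authorization: Basic ...' value into (user, password).

    Returns None if the scheme is not Basic or the token is malformed.
    """
    scheme, _, token = header_value.strip().partition(" ")
    if scheme.lower() != "basic" or not token:
        return None
    try:
        decoded = base64.b64decode(token.strip(), validate=True).decode("utf-8")
    except (ValueError, UnicodeDecodeError):
        # binascii.Error (bad base64) is a subclass of ValueError
        return None
    user, sep, password = decoded.partition(":")
    if not sep:
        return None
    return user, password


if __name__ == "__main__":
    print(decode_basic_auth("Basic dXNlcjpwYXNz"))
    print(decode_basic_auth("Bearer abc"))
```

Basic authentication is only base64-encoded, not encrypted, which is why it shows up in cleartext HTTP captures.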
Keyword list:
# Regexs
authenticate_re = '(www-|proxy-)?authenticate'
authorization_re = '(www-|proxy-)?authorization'
ftp_user_re = r'USER (.+)\r\n'
ftp_pw_re = r'PASS (.+)\r\n'
irc_user_re = r'NICK (.+?)((\r)?\n|\s)'
irc_pw_re = r'NS IDENTIFY (.+)'
irc_pw_re2 = 'nickserv :identify (.+)'
mail_auth_re = '(\d+ )?(auth|authenticate) (login|plain)'
mail_auth_re1 = '(\d+ )?login '
NTLMSSP2_re = 'NTLMSSP\x00\x02\x00\x00\x00.+'
NTLMSSP3_re = 'NTLMSSP\x00\x03\x00\x00\x00.+'
# Prone to false+ but prefer that to false-
http_search_re = '((search|query|&q|\?q|search\?p|searchterm|keywords|keyword|command|terms|keys|question|kwd|searchPhrase)=([^&][^&]*))'
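To see how these patterns are used, the two FTP expressions can be applied to a sample payload. This is a small sketch: the regexes are copied from the list above, while `find_ftp_credentials` and the sample string are made up for illustration.

```python
import re

# Copied from the keyword list above.
ftp_user_re = r'USER (.+)\r\n'
ftp_pw_re = r'PASS (.+)\r\n'


def find_ftp_credentials(load):
    """Return (user, password) found in a cleartext FTP payload; None where absent."""
    user = re.search(ftp_user_re, load)
    password = re.search(ftp_pw_re, load)
    return (
        user.group(1) if user else None,
        password.group(1) if password else None,
    )


if __name__ == "__main__":
    sample = "USER anonymous\r\nPASS guest@example.com\r\n"
    print(find_ftp_credentials(sample))
```

In a real capture the USER and PASS commands usually arrive in separate packets, so the two halves have to be correlated per connection.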
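0x06 Elements to focus on when extracting data
- Source IP, destination IP, source port, destination port, protocol, packet size
  - Use these to correlate victims with the jump hosts controlled by the attacker
- Protocols: HTTP, FTP, mail protocols
  - Look at HTTP and HTTPS traffic first, then check the packet lengths.
  - Extract the related IP information.
  - Decide whether the traffic involves a remote-access trojan, account passwords, or suspicious IPs.
- Account and password fields
  - In penetration testing, analyze sniffed packets for account and password information
  - IPs that are running brute-force attacks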
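The first item in the list, the five-tuple, can be read straight out of the raw bytes of a frame. The sketch below uses only the standard library and assumes an Ethernet II frame carrying IPv4 with TCP or UDP; `five_tuple` and `build_sample_frame` are hypothetical helpers, and fragments, VLAN tags, and IPv6 are deliberately ignored.

```python
import socket
import struct

PROTO_NAMES = {6: "TCP", 17: "UDP"}


def five_tuple(frame):
    """Return (src_ip, dst_ip, src_port, dst_port, proto) or None if not IPv4 TCP/UDP."""
    if len(frame) < 34:                      # 14-byte Ethernet + 20-byte minimal IPv4
        return None
    if struct.unpack("!H", frame[12:14])[0] != 0x0800:   # EtherType IPv4
        return None
    ihl = (frame[14] & 0x0F) * 4             # IPv4 header length in bytes
    proto = frame[23]
    if ihl < 20 or proto not in PROTO_NAMES:
        return None
    l4 = 14 + ihl                            # start of the TCP/UDP header
    if len(frame) < l4 + 4:
        return None
    sport, dport = struct.unpack("!HH", frame[l4:l4 + 4])
    return (socket.inet_ntoa(frame[26:30]), socket.inet_ntoa(frame[30:34]),
            sport, dport, PROTO_NAMES[proto])


def build_sample_frame():
    """Build a synthetic Ethernet/IPv4/TCP SYN frame for the demo."""
    eth = b"\xec\x88\x8f\x86\x14\x5c" + b"\x9c\x5c\x8e\x76\xbf\x24" + struct.pack("!H", 0x0800)
    ip = struct.pack("!BBHHHBBH4s4s", 0x45, 0, 40, 0, 0x4000, 64, 6, 0,
                     socket.inet_aton("192.168.1.103"), socket.inet_aton("211.90.25.31"))
    tcp = struct.pack("!HHIIBBHHH", 51514, 80, 0, 0, 0x50, 0x02, 8192, 0, 0)
    return eth + ip + tcp


if __name__ == "__main__":
    print(five_tuple(build_sample_frame()))
```

Grouping packets by this tuple, with the two directions normalized, is the usual first step toward reconstructing sessions between a victim and a jump host.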
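0x07 Testing dpkt for pcap parsing
I had hoped to use dpkt to parse mail, DNS, and HTTP traffic as an aid to pcap analysis, but after reading up on it I found it less convenient than Scapy.
dpkt is a Python module for simple packet creation and parsing, with parsers for the basic TCP/IP protocols. It is fast.
dpkt documentation
According to the official documentation, dpkt reads each record in the pcap file, uses isinstance to check whether it contains an IP packet, and then determines which protocol it belongs to. Each supported protocol is wrapped in its own API, so when a packet matches one of them the relevant values can be printed.
Extending this code requires learning what the fields of each protocol mean.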
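To make "reads each record in the pcap file" concrete, here is roughly what a reader has to do at the file level. This is a standard-library sketch of the classic pcap format with microsecond timestamps, not dpkt's implementation; `read_pcap` is a hypothetical name and pcapng files are not handled.

```python
import io
import struct


def read_pcap(fileobj):
    """Yield (timestamp, raw_bytes) for each record of a classic pcap stream."""
    header = fileobj.read(24)                 # global header is 24 bytes
    if len(header) < 24:
        raise ValueError("truncated pcap global header")
    magic = header[:4]
    if magic == b"\xd4\xc3\xb2\xa1":          # 0xa1b2c3d4 written little-endian
        endian = "<"
    elif magic == b"\xa1\xb2\xc3\xd4":        # same magic written big-endian
        endian = ">"
    else:
        raise ValueError("not a classic microsecond pcap file")
    while True:
        rec = fileobj.read(16)                # per-record header is 16 bytes
        if len(rec) < 16:
            break
        ts_sec, ts_usec, incl_len, _orig_len = struct.unpack(endian + "IIII", rec)
        data = fileobj.read(incl_len)
        if len(data) < incl_len:
            break                             # truncated final record
        yield ts_sec + ts_usec / 1e6, data


if __name__ == "__main__":
    # Build a one-record pcap in memory: version 2.4, snaplen 65535, linktype 1 (Ethernet).
    sample = struct.pack("<IHHiIII", 0xA1B2C3D4, 2, 4, 0, 0, 65535, 1)
    sample += struct.pack("<IIII", 1501562988, 750000, 4, 4) + b"abcd"
    for ts, buf in read_pcap(io.BytesIO(sample)):
        print(ts, buf)
```

Each yielded buffer is what then gets handed to a link-layer parser such as dpkt.ethernet.Ethernet in the examples that follow.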
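API reference:
https://dpkt.readthedocs.io/en/latest/api/api_auto.html#module-dpkt.qq
The documentation also points to example code on GitHub for some of the APIs, which is useful as a reference.
Examples from the documentation
The following code is taken from the examples in the documentation. I found that inet_pton could not be used directly in my environment, so I modified the code following a workaround found online (it wraps the Windows socket API through ctypes).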
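If Python 3 is an option, a shorter route to the same result may be the standard-library ipaddress module, which accepts packed bytes directly on every platform. This is an alternative sketch, not the workaround used in the examples below; `packed_to_str` is a hypothetical name.

```python
import ipaddress


def packed_to_str(packed):
    """Convert a 4-byte (IPv4) or 16-byte (IPv6) packed address to text."""
    return str(ipaddress.ip_address(packed))


if __name__ == "__main__":
    print(packed_to_str(b"\xc0\xa8\x01\x67"))        # an IPv4 address
    print(packed_to_str(b"\x00" * 15 + b"\x01"))     # the IPv6 loopback address
```

For IPv4 only, socket.inet_ntoa() is another portable choice.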
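Printing packets
Use dpkt to read a pcap file and print the contents of its packets, namely the fields of the Ethernet frame and the IP packet.
Python 2 test code: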
#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os


def mac_addr(address):
    """Convert a MAC address to a readable/printable string

       Args:
           address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
       Returns:
           str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]


if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]


# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop


def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)


def print_packets(pcap):
    """Print out information about each packet in a pcap

       Args:
           pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num = r_num + 1
        print('packet num count :', r_num)
        # Print out the timestamp in UTC
        print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))

        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)
        print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)

        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue

        # Now unpack the data within the Ethernet frame (the IP packet)
        # Pulling out src, dst, length, fragment info, TTL, and Protocol
        ip = eth.data

        # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
        do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
        more_fragments = bool(ip.off & dpkt.ip.IP_MF)
        fragment_offset = ip.off & dpkt.ip.IP_OFFMASK

        # Print out the info
        print('IP: %s -> %s   (len=%d ttl=%d DF=%d MF=%d offset=%d)\n' % \
              (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))


def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_packets(pcap)


if __name__ == '__main__':
    test()
Output:
('packet num count :', 4474)
('Timestamp: ', '2017-08-01 03:55:03.314832')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 211.90.25.31 (len=52 ttl=64 DF=1 MF=0 offset=0)

('packet num count :', 4475)
('Timestamp: ', '2017-08-01 03:55:03.485679')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 180.97.33.12 (len=114 ttl=64 DF=0 MF=0 offset=0)

('packet num count :', 4476)
('Timestamp: ', '2017-08-01 03:55:03.486141')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 119.75.222.122 (len=52 ttl=64 DF=1 MF=0 offset=0)
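The DF, MF, and offset values in this output all come from a single 16-bit field of the IPv4 header, which is why the example uses bitmasks. The sketch below repeats that split with the standard IPv4 mask values written out; `split_frag_field` is a hypothetical helper.

```python
# IPv4 flags/fragment-offset field: 3 flag bits followed by a 13-bit offset.
IP_DF = 0x4000        # don't fragment
IP_MF = 0x2000        # more fragments follow
IP_OFFMASK = 0x1FFF   # fragment offset, in units of 8 bytes


def split_frag_field(off):
    """Split the raw 16-bit field into DF, MF, and the offset (units and bytes)."""
    units = off & IP_OFFMASK
    return {
        "DF": bool(off & IP_DF),
        "MF": bool(off & IP_MF),
        "offset": units,
        "offset_bytes": units * 8,
    }


if __name__ == "__main__":
    print(split_frag_field(16384))          # 0x4000: only DF set
    print(split_frag_field(0x2000 | 185))   # a middle fragment
```

A raw value of 16384 therefore corresponds to DF=1 MF=0 offset=0, as in the first and third packets above.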
Printing ICMP
Check for ICMP packets and display their contents.
#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os


def mac_addr(address):
    """Convert a MAC address to a readable/printable string

       Args:
           address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
       Returns:
           str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]


if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]


# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop


def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)


def print_icmp(pcap):
    """Print out information about each packet in a pcap

       Args:
           pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num = r_num + 1
        print('packet num count :', r_num)

        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)

        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue

        # Now grab the data within the Ethernet frame (the IP packet)
        ip = eth.data

        # Now check if this is an ICMP packet
        if isinstance(ip.data, dpkt.icmp.ICMP):
            icmp = ip.data

            # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
            do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
            more_fragments = bool(ip.off & dpkt.ip.IP_MF)
            fragment_offset = ip.off & dpkt.ip.IP_OFFMASK

            # Print out the info
            print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
            print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
            print('IP: %s -> %s   (len=%d ttl=%d DF=%d MF=%d offset=%d)' % \
                  (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))
            print('ICMP: type:%d code:%d checksum:%d data: %s\n' % (icmp.type, icmp.code, icmp.sum, repr(icmp.data)))


def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_icmp(pcap)


if __name__ == '__main__':
    test()
Output:
('packet num count :', 377)
('Timestamp: ', '2017-08-01 03:45:56.403640')
('Ethernet Frame: ', 'ec:88:8f:86:14:5c', '9c:5c:8e:76:bf:24', 2048)
IP: 202.118.168.73 -> 192.168.1.103 (len=56 ttl=253 DF=0 MF=0 offset=0)
ICMP: type:3 code:13 checksum:52074 data: Unreach(data=IP(len=28, id=2556, off=16384, ttl=61, p=6, sum=36831, src='\xc0\xa8\x01g', dst='\xcal\x17q', opts='', data='n\xb1\x00P\x85)=]'))
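The ICMP line above is dense, so it helps to decode it by hand. The sketch below interprets the type/code pair and unpacks the embedded addresses and ports with the standard library; `describe_icmp` and the small code table are illustrative and cover only a few common codes.

```python
import socket
import struct

# A few ICMP type 3 (destination unreachable) codes; not a complete table.
ICMP_UNREACH_CODES = {
    0: "network unreachable",
    1: "host unreachable",
    2: "protocol unreachable",
    3: "port unreachable",
    13: "communication administratively prohibited",
}


def describe_icmp(icmp_type, code):
    """Return a short human-readable description for an ICMP type/code pair."""
    if icmp_type == 3:
        return "destination unreachable: " + ICMP_UNREACH_CODES.get(code, "code %d" % code)
    return "type %d code %d" % (icmp_type, code)


if __name__ == "__main__":
    print(describe_icmp(3, 13))
    # Addresses of the original packet quoted inside the ICMP message
    print(socket.inet_ntoa(b"\xc0\xa8\x01g"), "->", socket.inet_ntoa(b"\xcal\x17q"))
    # First four bytes of the quoted TCP header: source and destination port
    print(struct.unpack("!HH", b"n\xb1\x00P"))
```

Read this way, the message says that a device at 202.118.168.73 administratively blocked a TCP packet (p=6) that 192.168.1.103 had sent to port 80 of another host.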
Printing HTTP requests
#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os


def mac_addr(address):
    """Convert a MAC address to a readable/printable string

       Args:
           address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
       Returns:
           str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]


if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]


# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop


def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)


def print_http_requests(pcap):
    """Print out information about each packet in a pcap

       Args:
           pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num = r_num + 1
        print('packet num count :', r_num)

        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)

        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue

        # Now grab the data within the Ethernet frame (the IP packet)
        ip = eth.data

        # Check for TCP in the transport layer
        if isinstance(ip.data, dpkt.tcp.TCP):

            # Set the TCP data
            tcp = ip.data

            # Now see if we can parse the contents as a HTTP request
            try:
                request = dpkt.http.Request(tcp.data)
            except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
                continue

            # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
            do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
            more_fragments = bool(ip.off & dpkt.ip.IP_MF)
            fragment_offset = ip.off & dpkt.ip.IP_OFFMASK

            # Print out the info
            print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
            print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
            print('IP: %s -> %s   (len=%d ttl=%d DF=%d MF=%d offset=%d)' %
                  (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))
            print('HTTP request: %s\n' % repr(request))

            # Check for Header spanning acrossed TCP segments
            if not tcp.data.endswith(b'\r\n'):
                print('\nHEADER TRUNCATED! Reassemble TCP segments!\n')


def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_http_requests(pcap)


if __name__ == '__main__':
    test()
Output:
Timestamp: 2004-05-13 10:17:08.222534
Ethernet Frame: 00:00:01:00:00:00 fe:ff:20:00:01:00 2048
IP: 145.254.160.237 -> 65.208.228.223 (len=519 ttl=128 DF=1 MF=0 offset=0)
HTTP request: Request(body='', uri='/download.html', headers={'accept-language': 'en-us,en;q=0.5', 'accept-encoding': 'gzip,deflate', 'connection': 'keep-alive', 'keep-alive': '300', 'accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,image/jpeg,image/gif;q=0.2,*/*;q=0.1', 'user-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113', 'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'host': 'www.ethereal.com', 'referer': 'http://www.ethereal.com/development.html'}, version='1.1', data='', method='GET')

Timestamp: 2004-05-13 10:17:10.295515
Ethernet Frame: 00:00:01:00:00:00 fe:ff:20:00:01:00 2048
IP: 145.254.160.237 -> 216.239.59.99 (len=761 ttl=128 DF=1 MF=0 offset=0)
HTTP request: Request(body='', uri='/pagead/ads?client=ca-pub-2309191948673629&random=1084443430285&lmt=1082467020&format=468x60_as&output=html&url=http%3A%2F%2Fwww.ethereal.com%2Fdownload.html&color_bg=FFFFFF&color_text=333333&color_link=000000&color_url=666633&color_border=666633', headers={'accept-language': 'en-us,en;q=0.5', 'accept-encoding': 'gzip,deflate', 'connection': 'keep-alive', 'keep-alive': '300', 'accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,image/jpeg,image/gif;q=0.2,*/*;q=0.1', 'user-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113', 'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'host': 'pagead2.googlesyndication.com', 'referer': 'http://www.ethereal.com/download.html'}, version='1.1', data='', method='GET')

...
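The Request(...) objects printed above carry a method, a URI, a version, and a dictionary of lowercased header names. How such a structure can be derived from raw TCP payload bytes is sketched below with the standard library only. This is not dpkt's parser; `parse_http_request` is a hypothetical helper that simply returns None when the header block is incomplete, for example when the headers span more than one TCP segment.

```python
def parse_http_request(data):
    """Parse the request line and headers from raw bytes.

    Returns a dict, or None if the header block is incomplete or malformed.
    """
    head, sep, body = data.partition(b"\r\n\r\n")
    if not sep:
        return None                      # no blank line yet: headers are truncated
    lines = head.decode("iso-8859-1").split("\r\n")
    parts = lines[0].split(" ")
    if len(parts) != 3 or not parts[2].startswith("HTTP/"):
        return None
    method, uri, version = parts
    headers = {}
    for line in lines[1:]:
        name, colon, value = line.partition(":")
        if colon:
            headers[name.strip().lower()] = value.strip()
    return {"method": method, "uri": uri, "version": version[5:],
            "headers": headers, "body": body}


if __name__ == "__main__":
    raw = (b"GET /download.html HTTP/1.1\r\n"
           b"Host: www.ethereal.com\r\n"
           b"Connection: keep-alive\r\n\r\n")
    print(parse_http_request(raw))
    print(parse_http_request(b"GET / HTTP/1.1\r\nHost: a"))
```

For requests that do span several segments, the TCP stream has to be reassembled in sequence-number order before parsing.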
Printing IP addresses from Ethernet frames
Parsing a 594 MB pcap file took 127 seconds.
# coding=utf-8
import dpkt
import socket
import time
import ctypes
import os
import datetime

# Measure how long dpkt takes to extract the IP addresses
# Use dpkt to get the timestamp, source IP, and destination IP


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]


if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]


# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop


def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)


def getip(pcap):
    Num = 0
    for timestamp, buf in pcap:
        eth = dpkt.ethernet.Ethernet(buf)
        # Skip frames that do not carry an IP packet
        if eth.type != dpkt.ethernet.ETH_TYPE_IP:
            continue
        ip = eth.data
        ip_src = inet_to_str(ip.src)
        ip_dst = inet_to_str(ip.dst)
        # Print the timestamp and source -> destination
        #print(ts + " " + ip_src + "-->" + ip_dst)
        Num = Num + 1
        print('{0}\ttime:{1}\tsrc:{2}-->dst:{3} '.format(Num, timestamp, ip_src, ip_dst))
        if eth.data.__class__.__name__ == 'IP':
            # Python 2 only: iterating a str yields one-character strings for ord()
            ip = '%d.%d.%d.%d' % tuple(map(ord, list(eth.data.dst)))
            if eth.data.data.__class__.__name__ == 'TCP':
                if eth.data.data.dport == 80:
                    print(eth.data.data.data)  # HTTP request data


if __name__ == '__main__':
    starttime = datetime.datetime.now()
    f = open('pcap222.pcap', 'rb')  # must be opened in 'rb' mode; 'r' raises an error
    pcap = dpkt.pcap.Reader(f)
    getip(pcap)
    endtime = datetime.datetime.now()
    print('time : {0} seconds '.format((endtime - starttime).seconds))
Output:
1290064 time:1501562988.75 src:113.142.85.151-->dst:192.168.1.103
1290065 time:1501562988.75 src:192.168.1.103-->dst:113.142.85.151
1290066 time:1501562988.75 src:192.168.1.103-->dst:113.142.85.151
1290067 time:1501562988.75 src:113.142.85.151-->dst:192.168.1.103
1290068 time:1501562988.75 src:192.168.1.103-->dst:113.142.85.151
1290069 time:1501562988.76 src:192.168.1.103-->dst:113.142.85.151
1290070 time:1501562988.76 src:122.228.91.14-->dst:192.168.1.103
1290071 time:1501562988.76 src:192.168.1.103-->dst:113.142.85.151
1290072 time:1501562988.76 src:113.142.85.151-->dst:192.168.1.103
1290073 time:1501562988.76 src:192.168.1.103-->dst:113.142.85.151
1290074 time:1501562988.76 src:192.168.1.103-->dst:113.142.85.151
GET / HTTP/1.1
Accept: application/x-ms-application, image/jpeg, application/xaml+xml, image/gif, image/pjpeg, application/x-ms-xbap, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, */*
Accept-Language: zh-cn
User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Mac_PowerPC; en) Opera 9.24
Referer: -
Connection: Keep-Alive
Host: win7.shangshai-qibao.cn
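Two small details of this run are worth a sketch: the raw epoch timestamps in the output are easier to read once converted to UTC, and the elapsed time is better measured with a monotonic clock, since timedelta.seconds drops both the fractional part and any whole days. The helpers `format_ts` and `timed` below are hypothetical and use only the standard library (Python 3).

```python
import time
from datetime import datetime, timezone


def format_ts(ts):
    """Render an epoch timestamp (as stored in a pcap record) as a UTC string."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")


def timed(func, *args):
    """Call func(*args) and return (result, elapsed_seconds) using a monotonic clock."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    print(format_ts(1501562988.75))
    total, elapsed = timed(sum, range(1000000))
    print("sum=%d took %.4f seconds" % (total, elapsed))
```

Much of a run like the one above is likely spent printing over a million lines to the console, so timing the parsing loop with output disabled would give a fairer picture of dpkt's own speed.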
0x08 References
SMTP protocol analysis https://yq.aliyun.com/wenji/262429
Summary of parsing pcap files with scapy https://www.cnblogs.com/14061216chen/p/8093441.html
Python string formatting (format) https://www.cnblogs.com/benric/p/4965224.html