Studying Source Code for Parsing Pcap Files in Python

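0x01 Preface

In on-site forensics, having to analyze a packet capture by hand is relatively rare. Traffic-analysis appliances work by capturing all the data and parsing it, so much of what a person could do has already been handed over to machines.

Colasoft (科來) and Wireshark are both good tools for analyzing PCAP files. While looking for code that parses pcap files programmatically, I found that many people have already written Python scripts to help with this, and that there are frameworks that extract pcap information and present it in a UI.

This article is a study summary of extracting protocol five-tuple information with Python's Scapy library. It has not been used in real casework, because in practice reading, unpacking, and searching packets in a PCAP turned out to be too slow.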

0x02 Reference libraries

Common Python libraries for parsing pcap files include Scapy, dpkt, and Pyshark. Source code and tools worth consulting are listed below:

https://github.com/thepacketgeek/cloud-pcap
https://github.com/madpowah/ForensicPCAP
https://github.com/le4f/pcap-analyzer
https://github.com/HatBoy/Pcap-Analyzer
https://github.com/caesar0301/awesome-pcaptools
https://asecuritysite.com/forensics/pcap
https://github.com/DanMcInerney/net-creds

Websites that offer pcap parsing services: https://packettotal.com/, https://www.capanalysis.net/ca/, https://asecuritysite.com/forensics/pcap?infile=smtp.pcap&infile=smtp.pcap, https://app.any.run

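0x03 Mail protocols

To extract the sender's mailbox, you need to know the ports used by the mail protocols so that the traffic can be identified.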

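Port 25 is used by SMTP (Simple Mail Transfer Protocol) for sending mail.
Port 110 is used by POP3 (Post Office Protocol version 3), which is used for receiving mail. The older POP2 used port 109 and is rarely seen today.
Port 143 is used by IMAP (Internet Message Access Protocol). Like POP3, it is a protocol for receiving mail.
Port 465 is used by SMTPS (SMTP over SSL), a variant of SMTP that runs on top of SSL/TLS. The session is protected from the very first byte, so the plaintext is not visible in a capture, which guards against mail disclosure.
Port 587 is the mail submission port and normally uses STARTTLS. The session starts in plaintext, and only the content sent after the STARTTLS command is protected by TLS.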
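
The port list above can be turned into a small lookup used to tag TCP packets. This is only a sketch: the table MAIL_PORTS and the helper classify_mail_port are hypothetical names, and a port number alone is a hint, not proof of the protocol actually spoken.

```python
# Well-known mail ports from the list above; the labels are illustrative.
MAIL_PORTS = {
    25: "SMTP",
    110: "POP3",
    143: "IMAP",
    465: "SMTPS (implicit TLS)",
    587: "SMTP submission (STARTTLS)",
}


def classify_mail_port(sport, dport):
    """Return a mail protocol label for a TCP port pair, or None."""
    # Check the destination port first: it is usually the server side.
    for port in (dport, sport):
        if port in MAIL_PORTS:
            return MAIL_PORTS[port]
    return None


print(classify_mail_port(51234, 25))
print(classify_mail_port(110, 51234))
print(classify_mail_port(51234, 80))
```

Traffic on 465 (and on 587 after STARTTLS) is encrypted, so only the tagging step applies there; addresses cannot be read from the payload.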

Common SMTP command syntax

In Wireshark, SMTP data appears once the TCP three-way handshake has completed. The commonly used commands are listed below.

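SMTP command names are case-insensitive, but their arguments can be case-sensitive. See RFC 821 for the details.
HELO <domain> <CRLF>. Identifies the client to the server. The sender can lie about its identity, but in most cases the server can detect this.
MAIL FROM:<reverse-path> <CRLF>. <reverse-path> is the sender's address. This command starts a mail transaction and resets all state and buffers.
RCPT TO:<forward-path> <CRLF>. <forward-path> identifies the recipient's address. It follows MAIL FROM, and there can be several RCPT TO commands.
DATA <CRLF>. Everything that follows is sent as message data, and <CRLF>.<CRLF> marks the end of the data.
RSET <CRLF>. Resets the session. The current transaction is cancelled.
NOOP <CRLF>. Asks the server to return an OK reply. Usually used for testing.
QUIT <CRLF>. Ends the session.
VRFY <string> <CRLF>. Verifies whether the given mailbox exists. For security reasons most servers disable this command.
EXPN <string> <CRLF>. Verifies whether the given mailing list exists and expands it. For security reasons most servers disable this command.
HELP <CRLF>. Asks the server which commands it supports.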
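
As a minimal sketch of how the MAIL FROM and RCPT TO commands above could be used to pull the sender and recipients out of a plaintext SMTP client payload, the following uses only the standard library. ENVELOPE_RE and extract_envelope are hypothetical names, and the sample payload is made up. It assumes the client-side bytes of one session have already been concatenated.

```python
import re

# Matches "MAIL FROM:<addr>" and "RCPT TO:<addr>" at the start of a line.
# Command names are case-insensitive; optional spaces after ':' are tolerated.
ENVELOPE_RE = re.compile(
    rb"^(MAIL FROM|RCPT TO):\s*<([^>]*)>", re.IGNORECASE | re.MULTILINE)


def extract_envelope(client_payload):
    """Return (sender, [recipients]) found in a plaintext SMTP payload."""
    sender = None
    recipients = []
    for verb, addr in ENVELOPE_RE.findall(client_payload):
        text = addr.decode("ascii", "replace")
        if verb.upper() == b"MAIL FROM":
            sender = text
        else:
            recipients.append(text)
    return sender, recipients


sample = (
    b"HELO client.example\r\n"
    b"MAIL FROM:<alice@example.com>\r\n"
    b"RCPT TO:<bob@example.org>\r\n"
    b"rcpt to: <carol@example.org>\r\n"
    b"DATA\r\n"
)
print(extract_envelope(sample))
```

These are envelope addresses. The From: and To: headers inside the DATA section may differ and would need separate parsing.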

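Code

ForensicPCAP parses a PCAP file and then uses a cmd-style module to run an interactive command loop, where typing a command calls the corresponding function. Its core idea is:

Using Scapy, find the packets whose source or destination port is 110 or 143 and store them as mail data.

The key code is as follows:

self.pcap is read in advance by the constructor with self.pcap = rdpcap(namefile) and is then iterated with enumerate(). Packets whose destination or source port is 110 or 143 are selected.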

def do_mail(self, arg, opts=None):
		"""Print the number of mail's requests and store its
Usage :
- mail"""
		sys.stdout.write(bcolors.TXT + "## Searching mail's request ... ")
		sys.stdout.flush()
		con = []
		mailpkts = []
		for i,packet in enumerate(self.pcap):
			# TCP packets only
			if TCP in packet:
				# Keep packets whose source or destination port is 110 or 143
				if packet.getlayer('TCP').dport == 110 or packet.getlayer('TCP').sport == 110 or packet.getlayer('TCP').dport == 143 or packet.getlayer('TCP').sport == 143 :
					if packet.getlayer('TCP').flags == 2:
						con.append(i)
					mailpkts.append(packet)
		sys.stdout.write("OK.\n")
		print "## Result : Mail's request : " + str(len(con))  
		sys.stdout.write(bcolors.TXT + "## Saving mails ... ")
		sys.stdout.flush()
		res = ""
		for packet in mailpkts:
				if packet.getlayer('TCP').flags == 24:
					res = res + packet.getlayer('Raw').load
					sys.stdout.write(".")
					sys.stdout.flush()
		sys.stdout.write("OK\n")
		sys.stdout.flush()			
		self.cmd = "mail"			
		self.last = res
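
The comparisons flags == 2 and flags == 24 in the code above test the raw TCP flag byte: 2 is SYN alone (a new connection request) and 24 is PSH+ACK (a data segment). A small sketch of that decoding, with hypothetical names TCP_FLAG_BITS and decode_tcp_flags:

```python
# Bit values of the eight flags in the TCP header flag byte.
TCP_FLAG_BITS = [
    (0x01, "FIN"), (0x02, "SYN"), (0x04, "RST"), (0x08, "PSH"),
    (0x10, "ACK"), (0x20, "URG"), (0x40, "ECE"), (0x80, "CWR"),
]


def decode_tcp_flags(value):
    """Return the names of the flags set in a TCP flag byte."""
    return [name for bit, name in TCP_FLAG_BITS if value & bit]


print(decode_tcp_flags(2))
print(decode_tcp_flags(24))
print(decode_tcp_flags(25))
```

Because the original code uses strict equality, a data segment that carries any other flag combination (for example ACK only, or FIN+PSH+ACK = 25) is not saved. Testing for the presence of a Raw payload instead would be one way to avoid that, at the cost of changing what the tool counts.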

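0x04 DNS parsing

This is the DNS parsing part of ForensicPCAP. bcolors.TXT sets the color used to display the text, and res = packet.getlayer('DNS').qd.qname reads the queried domain name from the DNS record.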

############# do_dns() ###########
    def do_dns(self, arg, opts=None):
        """Print all DNS requests in the PCAP file
Usage :
- dns"""
        sys.stdout.write(bcolors.TXT + "## Listing all DNS requests ...")
        sys.stdout.flush()
        dns = []
        dns.append([])
        
        # Enumerate the packets in the PCAP
        for i,packet in enumerate(self.pcap):
            if DNS in packet:
                # Get the queried domain name from the DNS record
                res = packet.getlayer('DNS').qd.qname
                if res[len(res) - 1] == '.':
                    res = res[:-1]
                # Save the domain name
                dns.append([i, res])

        sys.stdout.write("OK.\n")
        # Count the DNS requests
        print bcolors.TXT + "## Result : " + str(len(dns) - 1) + " DNS request(s)" + bcolors.ENDC
        self.last = dns
        self.cmd = "dns"

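The core code of pcap-analyzer borrows from ForensicPCAP. When it displays the DNS request records, it also reports the ten most frequently queried names (the values counted are the qname fields, that is, domain names rather than IPs).

Code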

def get_dns(file):
    dns = []
    # Open the PCAP file
    pcap = rdpcap(UPLOAD_FOLDER+file)
    for packet in pcap:
        if DNS in packet:
                # Core logic: read the query name from the DNS question section
                res = packet.getlayer('DNS').qd.qname
                if res[len(res) - 1] == '.':
                    res = res[:-1]
                dns.append(res)
    # Count the DNS query names and keep the ten most frequent
    dns = Counter(dns).most_common(10)
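
The counting step above can be tried without a capture file. The sketch below is not pcap-analyzer's code; top_domains is a hypothetical helper and the sample names are made up. It also handles one Python 3 detail: as far as I know Scapy returns qname as bytes there, so indexing the last character yields an integer and the comparison with '.' in the code above never matches.

```python
from collections import Counter


def top_domains(qnames, n=10):
    """Normalise DNS query names and return the n most common ones."""
    cleaned = []
    for name in qnames:
        if isinstance(name, bytes):
            name = name.decode("ascii", "replace")
        # Drop the trailing root dot and fold case (DNS names are
        # case-insensitive), so "Example.com." and "example.com" are merged.
        cleaned.append(name.rstrip(".").lower())
    return Counter(cleaned).most_common(n)


sample = [b"example.com.", b"Example.com.", b"mail.example.org.", "example.com"]
print(top_domains(sample, 2))
```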

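0x05 Extracting password information

net-creds is a Scapy-based script that parses a PCAP for password information.

Code

The main logic is implemented in other_parser(). It splits out the HTTP and other content of each packet, then searches for authentication-related keywords to pick out accounts and passwords.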

def other_parser(src_ip_port, dst_ip_port, full_load, ack, seq, pkt, verbose):
    '''
    Pull out pertinent info from the parsed HTTP packet data
    '''
    user_passwd = None
    http_url_req = None
    method = None
    http_methods = ['GET ', 'POST ', 'CONNECT ', 'TRACE ', 'TRACK ', 'PUT ', 'DELETE ', 'HEAD ']
    http_line, header_lines, body = parse_http_load(full_load, http_methods)
    headers = headers_to_dict(header_lines)
    if 'host' in headers:
        host = headers['host']
    else:
        host = ''

    if http_line != None:
        method, path = parse_http_line(http_line, http_methods)
        http_url_req = get_http_url(method, host, path, headers)
        if http_url_req != None:
            if verbose == False:
                if len(http_url_req) > 98:
                    http_url_req = http_url_req[:99] + '...'
            printer(src_ip_port, None, http_url_req)

    # Print search terms
    searched = get_http_searches(http_url_req, body, host)
    if searched:
        printer(src_ip_port, dst_ip_port, searched)

    # Print user/pwds
    if body != '':
        user_passwd = get_login_pass(body)
        if user_passwd != None:
            try:
                http_user = user_passwd[0].decode('utf8')
                http_pass = user_passwd[1].decode('utf8')
                # Set a limit on how long they can be prevent false+
                if len(http_user) > 75 or len(http_pass) > 75:
                    return
                user_msg = 'HTTP username: %s' % http_user
                printer(src_ip_port, dst_ip_port, user_msg)
                pass_msg = 'HTTP password: %s' % http_pass
                printer(src_ip_port, dst_ip_port, pass_msg)
            except UnicodeDecodeError:
                pass

    # Print POST loads
    # ocsp is a common SSL post load that's never interesting
    if method == 'POST' and 'ocsp.' not in host:
        try:
            if verbose == False and len(body) > 99:
                # If it can't decode to utf8 we're probably not interested in it
                msg = 'POST load: %s...' % body[:99].encode('utf8')
            else:
                msg = 'POST load: %s' % body.encode('utf8')
            printer(src_ip_port, None, msg)
        except UnicodeDecodeError:
            pass

    # Kerberos over TCP
    decoded = Decode_Ip_Packet(str(pkt)[14:])
    kerb_hash = ParseMSKerbv5TCP(decoded['data'][20:])
    if kerb_hash:
        printer(src_ip_port, dst_ip_port, kerb_hash)

    # Non-NETNTLM NTLM hashes (MSSQL, DCE-RPC,SMBv1/2,LDAP, MSSQL)
    NTLMSSP2 = re.search(NTLMSSP2_re, full_load, re.DOTALL)
    NTLMSSP3 = re.search(NTLMSSP3_re, full_load, re.DOTALL)
    if NTLMSSP2:
        parse_ntlm_chal(NTLMSSP2.group(), ack)
    if NTLMSSP3:
        ntlm_resp_found = parse_ntlm_resp(NTLMSSP3.group(), seq)
        if ntlm_resp_found != None:
            printer(src_ip_port, dst_ip_port, ntlm_resp_found)

    # Look for authentication headers
    if len(headers) == 0:
        authenticate_header = None
        authorization_header = None
    for header in headers:
        authenticate_header = re.match(authenticate_re, header)
        authorization_header = re.match(authorization_re, header)
        if authenticate_header or authorization_header:
            break

    if authorization_header or authenticate_header:
        # NETNTLM
        netntlm_found = parse_netntlm(authenticate_header, authorization_header, headers, ack, seq)
        if netntlm_found != None:
            printer(src_ip_port, dst_ip_port, netntlm_found)

        # Basic Auth
        parse_basic_auth(src_ip_port, dst_ip_port, headers, authorization_header)
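
The last step above hands the Authorization header to parse_basic_auth(). The source of that function is not shown here, so the following is only a sketch of what decoding an HTTP Basic credential involves, with a hypothetical helper decode_basic_auth and a made-up credential.

```python
import base64
import binascii


def decode_basic_auth(header_value):
    """Return (user, password) from a 'Basic <base64>' value, or None."""
    scheme, _, token = header_value.strip().partition(" ")
    if scheme.lower() != "basic" or not token:
        return None
    try:
        decoded = base64.b64decode(token.strip(), validate=True).decode("utf-8")
    except (binascii.Error, UnicodeDecodeError):
        return None
    # The user name ends at the first ':'; the password may contain more.
    user, sep, password = decoded.partition(":")
    if not sep:
        return None
    return user, password


token = base64.b64encode(b"alice:s3cret").decode("ascii")
print(decode_basic_auth("Basic " + token))
```

Basic credentials are only encoded, not encrypted, which is why they can be recovered from any unencrypted HTTP capture.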

Keyword list:

# Regexs
authenticate_re = '(www-|proxy-)?authenticate'
authorization_re = '(www-|proxy-)?authorization'
ftp_user_re = r'USER (.+)\r\n'
ftp_pw_re = r'PASS (.+)\r\n'
irc_user_re = r'NICK (.+?)((\r)?\n|\s)'
irc_pw_re = r'NS IDENTIFY (.+)'
irc_pw_re2 = 'nickserv :identify (.+)'
mail_auth_re = '(\d+ )?(auth|authenticate) (login|plain)'
mail_auth_re1 =  '(\d+ )?login '
NTLMSSP2_re = 'NTLMSSP\x00\x02\x00\x00\x00.+'
NTLMSSP3_re = 'NTLMSSP\x00\x03\x00\x00\x00.+'
# Prone to false+ but prefer that to false-
http_search_re = '((search|query|&q|\?q|search\?p|searchterm|keywords|keyword|command|terms|keys|question|kwd|searchPhrase)=([^&][^&]*))'
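
To see how such patterns are applied, the two FTP expressions from the list can be run against a made-up control-channel payload. find_ftp_creds is a hypothetical helper for illustration, not a function from net-creds.

```python
import re

# Copied from the keyword list above.
ftp_user_re = r'USER (.+)\r\n'
ftp_pw_re = r'PASS (.+)\r\n'


def find_ftp_creds(payload):
    """Return (user, password) found in an FTP control payload."""
    user = re.search(ftp_user_re, payload)
    password = re.search(ftp_pw_re, payload)
    return (user.group(1) if user else None,
            password.group(1) if password else None)


print(find_ftp_creds("USER anonymous\r\nPASS guest@example.com\r\n"))
```

In a real capture USER and PASS usually arrive in separate packets, so a tool has to remember the user name per connection until the matching PASS shows up.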

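0x06 Elements to focus on when extracting data

  • Source IP, destination IP, source port, destination port, protocol, packet size
Use these to link the victim to the pivot hosts controlled by the attacker.
  • Protocols: HTTP, FTP, mail protocols
- Look at HTTP and HTTPS traffic first, then at packet lengths.
- Extract the related IP information.
- Decide whether the traffic involves a remote-access trojan, account credentials, or suspicious IPs.
  • Account and password fields
- In penetration testing, analyze sniffed packets that contain account and password information.
- IPs carrying out brute-force attacks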
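
The five-tuple plus packet size from the first bullet can be aggregated per flow once it has been extracted from each packet. A minimal sketch, assuming the extraction has already happened; FiveTuple and summarize_flows are hypothetical names and the records are made up.

```python
from collections import Counter, namedtuple

FiveTuple = namedtuple("FiveTuple", "src_ip dst_ip src_port dst_port proto")


def summarize_flows(records):
    """records: iterable of (FiveTuple, packet_length) pairs.

    Returns two Counters keyed by five-tuple: packet count and byte volume.
    """
    packets = Counter()
    volume = Counter()
    for key, length in records:
        packets[key] += 1
        volume[key] += length
    return packets, volume


flow_a = FiveTuple("192.168.1.103", "211.90.25.31", 51234, 80, "TCP")
flow_b = FiveTuple("192.168.1.103", "180.97.33.12", 51235, 443, "TCP")
packets, volume = summarize_flows([(flow_a, 52), (flow_b, 114), (flow_a, 519)])
print(packets.most_common(1))
print(volume[flow_a])
```

Sorting flows by packet count or volume is a quick way to surface the talkative host pairs worth inspecting first.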

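0x07 Testing dpkt for parsing pcap

I had hoped to use dpkt to parse mail, DNS, and HTTP as an aid to pcap analysis, but after reading up on it I found it less convenient than Scapy.

dpkt is a Python module for simple packet creation and parsing, with support for the basic TCP/IP protocols. It is fast.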

dpkt manual

https://dpkt.readthedocs.io/en/latest/

dpkt download

https://pypi.org/project/dpkt/

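From the official manual, dpkt reads the contents of each packet in the pcap, uses isinstance to check whether the packet carries an IP layer, and then works out which protocol it belongs to. The protocols are already wrapped in APIs, so when a packet matches one of them the related values can be printed.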
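
To make that layered check concrete without depending on dpkt, here is a standard-library sketch of the same idea: look at the EtherType, and only then unpack the IPv4 header. parse_ipv4_frame is a hypothetical helper, the frame is synthetic, and this is not how dpkt is implemented internally.

```python
import socket
import struct

ETH_TYPE_IP = 0x0800  # EtherType value for IPv4
IP_DF = 0x4000        # "don't fragment" bit in the flags/offset field
PROTO_NAMES = {1: "ICMP", 6: "TCP", 17: "UDP"}


def parse_ipv4_frame(buf):
    """Return a dict of IPv4 header fields, or None for non-IPv4 frames."""
    if len(buf) < 34:  # 14-byte Ethernet header + 20-byte minimal IPv4 header
        return None
    (eth_type,) = struct.unpack("!H", buf[12:14])
    if eth_type != ETH_TYPE_IP:
        return None
    ver_ihl, _tos, total_len, _ident, frag, ttl, proto = struct.unpack(
        "!BBHHHBB", buf[14:24])
    if ver_ihl >> 4 != 4:
        return None
    return {
        "src": socket.inet_ntoa(buf[26:30]),
        "dst": socket.inet_ntoa(buf[30:34]),
        "len": total_len,
        "ttl": ttl,
        "proto": PROTO_NAMES.get(proto, str(proto)),
        "df": bool(frag & IP_DF),
    }


# Build a synthetic frame (zeroed MAC addresses) to exercise the parser.
ip_header = struct.pack(
    "!BBHHHBBH4s4s", 0x45, 0, 52, 1, IP_DF, 64, 6, 0,
    socket.inet_aton("192.168.1.103"), socket.inet_aton("211.90.25.31"))
frame = bytes(6) + bytes(6) + struct.pack("!H", ETH_TYPE_IP) + ip_header
print(parse_ipv4_frame(frame))
```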

Extending this code also requires learning what the fields of the relevant protocols mean.

API reference:

https://dpkt.readthedocs.io/en/latest/api/api_auto.html#module-dpkt.qq

The manual also points to sample code on GitHub for some of the APIs, which is useful as a reference.

https://github.com/jeffsilverm/dpkt_doc

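Examples from the manual

The code below is the example from the manual. I found that inet_pton could not be used directly (the socket module on Windows under Python 2 does not provide inet_pton/inet_ntop), so I modified it following a workaround found online.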
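
On Python 3 the ctypes workaround should not be needed. One portable alternative, shown here as a sketch rather than as the manual's method, is the standard ipaddress module, which accepts packed 4-byte and 16-byte addresses on every platform. The name inet_to_str mirrors the helper in the scripts below.

```python
import ipaddress


def inet_to_str(packed):
    """Convert a packed 4-byte (IPv4) or 16-byte (IPv6) address to text."""
    return str(ipaddress.ip_address(packed))


print(inet_to_str(b"\xc0\xa8\x01g"))
print(inet_to_str(bytes(15) + b"\x01"))
```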

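Printing packets

Use dpkt to read a pcap file and print the contents of the packets, namely the fields of the Ethernet frame and the IP packet.

Python 2 test code: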

#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os


def mac_addr(address):
    """Convert a MAC address to a readable/printable string

       Args:
           address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
       Returns:
           str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)

def print_packets(pcap):
    """Print out information about each packet in a pcap

       Args:
           pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num=r_num+1
        print ('packet num count :' , r_num )
        # Print out the timestamp in UTC
        print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))

        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)
        print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)

        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue

        # Now unpack the data within the Ethernet frame (the IP packet)
        # Pulling out src, dst, length, fragment info, TTL, and Protocol
        ip = eth.data

        # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
        do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
        more_fragments = bool(ip.off & dpkt.ip.IP_MF)
        fragment_offset = ip.off & dpkt.ip.IP_OFFMASK

        # Print out the info
        print('IP: %s -> %s   (len=%d ttl=%d DF=%d MF=%d offset=%d)\n' % \
              (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))



def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_packets(pcap)

if __name__ == '__main__':
    test()

Output:

('packet num count :', 4474)
('Timestamp: ', '2017-08-01 03:55:03.314832')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 211.90.25.31   (len=52 ttl=64 DF=1 MF=0 offset=0)

('packet num count :', 4475)
('Timestamp: ', '2017-08-01 03:55:03.485679')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 180.97.33.12   (len=114 ttl=64 DF=0 MF=0 offset=0)

('packet num count :', 4476)
('Timestamp: ', '2017-08-01 03:55:03.486141')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 119.75.222.122   (len=52 ttl=64 DF=1 MF=0 offset=0)
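
Two details of this output are worth decoding. The lines appear as tuples because print(a, b) under Python 2 prints a tuple, and the value 2048 is the EtherType 0x0800, meaning IPv4. The sketch below shows a Python 3 form of mac_addr (iterating over bytes already yields integers, so compat_ord is not needed) and a small EtherType lookup; ETHERTYPES and ethertype_name are hypothetical names.

```python
# A few common EtherType values; anything else is shown in hex.
ETHERTYPES = {0x0800: "IPv4", 0x0806: "ARP", 0x86DD: "IPv6"}


def mac_addr(address):
    """Format a 6-byte MAC address as colon-separated hex (Python 3)."""
    return ":".join("%02x" % b for b in address)


def ethertype_name(value):
    """Return a readable name for an EtherType number."""
    return ETHERTYPES.get(value, "0x%04x" % value)


print(mac_addr(bytes.fromhex("9c5c8e76bf24")), ethertype_name(2048))
```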

Printing ICMP

Check for ICMP packets and display their contents.

#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os


def mac_addr(address):
    """Convert a MAC address to a readable/printable string

       Args:
           address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
       Returns:
           str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)

def print_icmp(pcap):
    """Print out information about each packet in a pcap

       Args:
           pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num=r_num+1
        print ('packet num count :' , r_num )
        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)

        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue

        # Now grab the data within the Ethernet frame (the IP packet)
        ip = eth.data

        # Now check if this is an ICMP packet
        if isinstance(ip.data, dpkt.icmp.ICMP):
            icmp = ip.data

            # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
            do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
            more_fragments = bool(ip.off & dpkt.ip.IP_MF)
            fragment_offset = ip.off & dpkt.ip.IP_OFFMASK

            # Print out the info
            print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
            print( 'Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
            print( 'IP: %s -> %s   (len=%d ttl=%d DF=%d MF=%d offset=%d)' % \
                  (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))
            print('ICMP: type:%d code:%d checksum:%d data: %s\n' % (icmp.type, icmp.code, icmp.sum, repr(icmp.data)))



def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_icmp(pcap)



if __name__ == '__main__':
    test()

Output:

('packet num count :', 377)
('Timestamp: ', '2017-08-01 03:45:56.403640')
('Ethernet Frame: ', 'ec:88:8f:86:14:5c', '9c:5c:8e:76:bf:24', 2048)
IP: 202.118.168.73 -> 192.168.1.103   (len=56 ttl=253 DF=0 MF=0 offset=0)
ICMP: type:3 code:13 checksum:52074 data: Unreach(data=IP(len=28, id=2556, off=16384, ttl=61, p=6, sum=36831, src='\xc0\xa8\x01g', dst='\xcal\x17q', opts='', data='n\xb1\x00P\x85)=]'))
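
The numeric type and code in this output can be mapped to names. Type 3 is Destination Unreachable, and code 13 means the communication was administratively prohibited, typically by a filtering router or firewall. A sketch with deliberately partial lookup tables and a hypothetical helper describe_icmp:

```python
import struct

# Partial tables; only a few common values are listed.
ICMP_TYPES = {
    0: "Echo Reply",
    3: "Destination Unreachable",
    8: "Echo Request",
    11: "Time Exceeded",
}
UNREACH_CODES = {
    0: "Network unreachable",
    1: "Host unreachable",
    2: "Protocol unreachable",
    3: "Port unreachable",
    13: "Communication administratively prohibited",
}


def describe_icmp(icmp_bytes):
    """Describe the first 4 bytes (type, code, checksum) of an ICMP message."""
    icmp_type, code, checksum = struct.unpack("!BBH", icmp_bytes[:4])
    text = ICMP_TYPES.get(icmp_type, "type %d" % icmp_type)
    if icmp_type == 3:
        text += " / " + UNREACH_CODES.get(code, "code %d" % code)
    return "%s (checksum=%d)" % (text, checksum)


print(describe_icmp(struct.pack("!BBH", 3, 13, 52074)))
```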

Printing HTTP requests

#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os


def mac_addr(address):
    """Convert a MAC address to a readable/printable string

       Args:
           address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
       Returns:
           str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)

def print_http_requests(pcap):
    """Print out information about each packet in a pcap

       Args:
           pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num=r_num+1
        print ('packet num count :' , r_num )
        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)

        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue

        # Now grab the data within the Ethernet frame (the IP packet)
        ip = eth.data

        # Check for TCP in the transport layer
        if isinstance(ip.data, dpkt.tcp.TCP):

            # Set the TCP data
            tcp = ip.data

            # Now see if we can parse the contents as a HTTP request
            try:
                request = dpkt.http.Request(tcp.data)
            except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
                continue

            # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
            do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
            more_fragments = bool(ip.off & dpkt.ip.IP_MF)
            fragment_offset = ip.off & dpkt.ip.IP_OFFMASK

            # Print out the info
            print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
            print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
            print('IP: %s -> %s   (len=%d ttl=%d DF=%d MF=%d offset=%d)' %
                  (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))
            print('HTTP request: %s\n' % repr(request))

            # Check for Header spanning acrossed TCP segments
            if not tcp.data.endswith(b'\r\n'):
                print('\nHEADER TRUNCATED! Reassemble TCP segments!\n')




def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_http_requests(pcap)

if __name__ == '__main__':
    test()

Output:

Timestamp:  2004-05-13 10:17:08.222534
Ethernet Frame:  00:00:01:00:00:00 fe:ff:20:00:01:00 2048
IP: 145.254.160.237 -> 65.208.228.223   (len=519 ttl=128 DF=1 MF=0 offset=0)
HTTP request: Request(body='', uri='/download.html', headers={'accept-language': 'en-us,en;q=0.5', 'accept-encoding': 'gzip,deflate', 'connection': 'keep-alive', 'keep-alive': '300', 'accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,image/jpeg,image/gif;q=0.2,*/*;q=0.1', 'user-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113', 'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'host': 'www.ethereal.com', 'referer': 'http://www.ethereal.com/development.html'}, version='1.1', data='', method='GET')

Timestamp:  2004-05-13 10:17:10.295515
Ethernet Frame:  00:00:01:00:00:00 fe:ff:20:00:01:00 2048
IP: 145.254.160.237 -> 216.239.59.99   (len=761 ttl=128 DF=1 MF=0 offset=0)
HTTP request: Request(body='', uri='/pagead/ads?client=ca-pub-2309191948673629&random=1084443430285&lmt=1082467020&format=468x60_as&output=html&url=http%3A%2F%2Fwww.ethereal.com%2Fdownload.html&color_bg=FFFFFF&color_text=333333&color_link=000000&color_url=666633&color_border=666633', headers={'accept-language': 'en-us,en;q=0.5', 'accept-encoding': 'gzip,deflate', 'connection': 'keep-alive', 'keep-alive': '300', 'accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,image/jpeg,image/gif;q=0.2,*/*;q=0.1', 'user-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113', 'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'host': 'pagead2.googlesyndication.com', 'referer': 'http://www.ethereal.com/download.html'}, version='1.1', data='', method='GET')

...
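
The script prints "HEADER TRUNCATED! Reassemble TCP segments!" when a request spans several segments, but does not do the reassembly. A deliberately simplified sketch of that step follows, for one direction of one connection. reassemble is a hypothetical helper; it ignores sequence-number wraparound and stops at the first gap, so it is an illustration rather than a full TCP stream reassembler.

```python
def reassemble(segments):
    """Join (seq, payload) pairs of one flow direction into a byte stream."""
    stream = bytearray()
    next_seq = None
    for seq, data in sorted(segments, key=lambda item: item[0]):
        if not data:
            continue  # pure ACKs carry no payload
        if next_seq is None:
            next_seq = seq
        if seq + len(data) <= next_seq:
            continue  # retransmission of bytes already seen
        if seq > next_seq:
            break     # missing segment: stop rather than guess
        stream += data[next_seq - seq:]  # skip any overlapping prefix
        next_seq = seq + len(data)
    return bytes(stream)


first = b"GET / HTTP/1.1\r\n"
second = b"Host: example.com\r\n\r\n"
# Out-of-order delivery plus one duplicate of the first segment.
segments = [(1000 + len(first), second), (1000, first), (1000, first)]
print(reassemble(segments))
```

The reassembled bytes can then be handed to an HTTP parser in one piece instead of packet by packet.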

Printing the IP addresses in Ethernet frames

Parsing a 594 MB pcap took 127 seconds.

# coding=utf-8
import dpkt
import socket
import time
import ctypes
import os
import datetime

# Measure how long dpkt takes to extract the IPs
# Use dpkt to get the timestamp, source IP, and destination IP

class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)


def getip(pcap):

    Num = 0
    for timestamp, buf in pcap:
        eth = dpkt.ethernet.Ethernet(buf)

        # Skip packets that have no IP layer
        if eth.type != dpkt.ethernet.ETH_TYPE_IP:
            continue

        ip = eth.data
        ip_src = inet_to_str(ip.src)
        ip_dst = inet_to_str(ip.dst)
        # Print the timestamp and source -> destination

        #print(ts + " " + ip_src + "-->" + ip_dst)
        Num= Num+1
        print ('{0}\ttime:{1}\tsrc:{2}-->dst:{3} '.format(Num,timestamp,ip_src ,ip_dst))
        if eth.data.__class__.__name__ == 'IP':

            ip = '%d.%d.%d.%d' % tuple(map(ord, list(eth.data.dst)))

            if eth.data.data.__class__.__name__ == 'TCP':

                if eth.data.data.dport == 80:
                    print eth.data.data.data  # http 請求的數據

if __name__ == '__main__':
    starttime = datetime.datetime.now()
    f = open('pcap222.pcap', 'rb')  # must be opened in 'rb' mode; opening with 'r' raises an error
    pcap = dpkt.pcap.Reader(f)
    getip(pcap)
    endtime = datetime.datetime.now()
    print ('time : {0} seconds '.format((endtime - starttime).seconds))

Output:

1290064	time:1501562988.75	src:113.142.85.151-->dst:192.168.1.103 
1290065	time:1501562988.75	src:192.168.1.103-->dst:113.142.85.151 

1290066	time:1501562988.75	src:192.168.1.103-->dst:113.142.85.151 

1290067	time:1501562988.75	src:113.142.85.151-->dst:192.168.1.103 
1290068	time:1501562988.75	src:192.168.1.103-->dst:113.142.85.151 

1290069	time:1501562988.76	src:192.168.1.103-->dst:113.142.85.151 

1290070	time:1501562988.76	src:122.228.91.14-->dst:192.168.1.103 
1290071	time:1501562988.76	src:192.168.1.103-->dst:113.142.85.151 

1290072	time:1501562988.76	src:113.142.85.151-->dst:192.168.1.103 
1290073	time:1501562988.76	src:192.168.1.103-->dst:113.142.85.151 

1290074	time:1501562988.76	src:192.168.1.103-->dst:113.142.85.151 
GET / HTTP/1.1
Accept: application/x-ms-application, image/jpeg, application/xaml+xml, image/gif, image/pjpeg, application/x-ms-xbap, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, */*
Accept-Language: zh-cn
User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Mac_PowerPC; en) Opera 9.24
Referer: -
Connection: Keep-Alive
Host: win7.shangshai-qibao.cn
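
A note on the timing itself: (endtime - starttime).seconds drops the fractional part and ignores whole days, so time.perf_counter() is a safer way to measure a run. The sketch below uses hypothetical helpers timed and throughput_mb_per_s; with the figures quoted above, 594 MB in 127 seconds works out to roughly 4.7 MB/s, and much of that time is likely spent printing a line per packet.

```python
import time


def timed(func, *args, **kwargs):
    """Call func and return (result, elapsed_seconds) with sub-second detail."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start


def throughput_mb_per_s(size_bytes, elapsed_seconds):
    """Average parse rate in MB/s (1 MB = 1024 * 1024 bytes)."""
    return size_bytes / (1024 * 1024) / elapsed_seconds


result, elapsed = timed(sum, range(1000))
print(result, elapsed >= 0)
print(round(throughput_mb_per_s(594 * 1024 * 1024, 127), 2))
```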

0x08 References

SMTP protocol analysis https://yq.aliyun.com/wenji/262429

Summary of parsing pcap files with Scapy https://www.cnblogs.com/14061216chen/p/8093441.html

Python string formatting (format) https://www.cnblogs.com/benric/p/4965224.html
