# -*- coding: utf-8 -*-
import socket
import struct


class IPAddresss:
    """Reader for a QQWry-style (``qqwry.dat``) IP location database.

    Layout, as read by this class (all integers little-endian):

    * 8-byte header: offset of the first and of the last index entry.
    * Index: 7-byte entries, each a 4-byte start IP followed by a 3-byte
      offset of the matching record.
    * Record: 4-byte end IP, then the location data.  The location is
      either two consecutive NUL-terminated GBK strings (country, area)
      or starts with a redirect flag (0x01 / 0x02) followed by a 3-byte
      offset to the real data.

    The instance keeps the file open; call :meth:`close` or use the
    object as a context manager to release it.  The code runs on both
    Python 2 and Python 3.
    """

    def __init__(self, ipdbFile):
        """Open *ipdbFile* and read the index boundaries from its header.

        Raises ``struct.error`` if the file is shorter than the header.
        """
        self.ipdb = open(ipdbFile, "rb")
        try:
            header = self.ipdb.read(8)
            (self.firstIndex, self.lastIndex) = struct.unpack('<II', header)
        except Exception:
            # Do not leak the handle when the header cannot be parsed.
            self.ipdb.close()
            raise
        self.indexCount = (self.lastIndex - self.firstIndex) // 7 + 1

    def close(self):
        """Close the underlying database file (safe to call repeatedly)."""
        self.ipdb.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def _readByte(self):
        """Read one byte at the current position and return it as an int."""
        (value,) = struct.unpack('B', self.ipdb.read(1))
        return value

    def _readIndex(self, index):
        """Return ``(start_ip, record_offset)`` of index entry *index*."""
        self.ipdb.seek(self.firstIndex + index * 7)
        (startIp, low, high) = struct.unpack('<IHB', self.ipdb.read(7))
        return startIp, low + (high << 16)

    def getVersion(self):
        """Return the text of the record covering 255.255.255.0.

        The lookup goes through :meth:`getIpAddr`, so the "not found"
        message is returned when no record covers that address.
        """
        return self.getIpAddr(0xffffff00)

    def getAreaAddr(self, offset=0):
        """Read the area string at *offset* (or at the current position).

        A leading 0x01 / 0x02 byte marks a redirect: the following three
        bytes are the offset of the real string, and a zero offset means
        "no area" (an empty string is returned).
        """
        if offset:
            self.ipdb.seek(offset)
        flag = self._readByte()
        if flag in (0x01, 0x02):
            pointer = self.getLong3()
            return self.getString(pointer) if pointer else self.text_("")
        # Not a redirect: the byte just read is the first character.
        self.ipdb.seek(-1, 1)
        return self.getString(offset)

    def getAddr(self, offset, ip=0):
        """Return ``"<country> <area>"`` for the record at *offset*.

        *offset* points at the record's 4-byte end IP; the location data
        starts four bytes later.  *ip* is accepted for backward
        compatibility and is not used.
        """
        self.ipdb.seek(offset + 4)
        mode = self._readByte()
        if mode == 0x01:
            # Country and area both live at the redirect target.
            countryOffset = self.getLong3()
            self.ipdb.seek(countryOffset)
            if self._readByte() == 0x02:
                # The target redirects its country once more; the area
                # follows the 4-byte redirect (flag + 3-byte offset).
                countryAddr = self.getString(self.getLong3())
                self.ipdb.seek(countryOffset + 4)
            else:
                countryAddr = self.getString(countryOffset)
            areaAddr = self.getAreaAddr()
        elif mode == 0x02:
            # Only the country is redirected; the area follows in place,
            # after end IP (4) + flag (1) + offset (3) bytes.
            countryAddr = self.getString(self.getLong3())
            areaAddr = self.getAreaAddr(offset + 8)
        else:
            # Plain record: country string immediately followed by area.
            countryAddr = self.getString(offset + 4)
            areaAddr = self.getAreaAddr()
        return countryAddr + self.text_(" ") + areaAddr

    def dump(self, first, last):
        """Decode the records of index entries ``first`` .. ``last - 1``.

        *last* is clamped to the number of index entries.  Nothing is
        returned or printed (the original logging call is disabled), so
        this currently only verifies that the records can be decoded.
        """
        if last > self.indexCount:
            last = self.indexCount
        for index in range(first, last):
            _startIp, recordOffset = self._readIndex(index)
            self.getAddr(recordOffset)

    def setIpRange(self, index):
        """Load index entry *index* into the ``cur*`` attributes.

        Sets ``curStartIp``, ``curEndIpOffset`` (offset of the record,
        which begins with the end IP) and ``curEndIp``.
        """
        (self.curStartIp, self.curEndIpOffset) = self._readIndex(index)
        self.ipdb.seek(self.curEndIpOffset)
        (self.curEndIp,) = struct.unpack('<I', self.ipdb.read(4))

    def getIpAddr(self, ip):
        """Return the location text for the integer address *ip*.

        Binary-searches the index for the last entry whose start IP is
        not greater than *ip*, then checks that *ip* is within that
        entry's range.  Returns a fixed "not found" message otherwise.
        """
        low = 0
        high = self.indexCount - 1
        while low < high:
            # Round up so that ``low = mid`` always makes progress.
            mid = (low + high + 1) // 2
            self.setIpRange(mid)
            if self.curStartIp <= ip:
                low = mid
            else:
                high = mid - 1
        self.setIpRange(low)

        if self.curStartIp <= ip <= self.curEndIp:
            address = self.text_(self.getAddr(self.curEndIpOffset))
        else:
            address = self.text_("未找到該IP的地址")
        return address

    def getIpRange(self, ip):
        """Return ``"<start> - <end>"`` for the entry selected for *ip*.

        The entry is the one :meth:`getIpAddr` settles on, even when
        *ip* turns out to lie outside of its range.
        """
        self.getIpAddr(ip)
        return self.ip2str(self.curStartIp) + ' - ' + self.ip2str(self.curEndIp)

    def getString(self, offset=0):
        """Read a NUL-terminated GBK string and return it as text.

        Reading starts at *offset*, or at the current position when
        *offset* is 0.  The file is left just past the terminator, which
        callers rely on to read the string that follows.

        Raises ``struct.error`` if the file ends before the terminator.
        """
        if offset:
            self.ipdb.seek(offset)
        raw = bytearray()
        while True:
            ch = self.ipdb.read(1)
            if not ch:
                raise struct.error("unterminated string at end of database")
            if ch == b'\x00':
                break
            raw += ch
        return raw.decode('gbk')

    def ip2str(self, ip):
        """Format the integer address *ip* as dotted-quad text."""
        return '%d.%d.%d.%d' % (
            ip >> 24, (ip >> 16) & 0xff, (ip >> 8) & 0xff, ip & 0xff)

    def str2ip(self, s):
        """Convert dotted-quad text *s* to an integer (first octet highest)."""
        # inet_aton yields network (big-endian) order; decode it as such
        # instead of byte-swapping a native-order value by hand.
        (ip,) = struct.unpack('!I', socket.inet_aton(s))
        return ip

    def getLong3(self, offset=0):
        """Read a 3-byte little-endian integer.

        Reading starts at *offset*, or at the current position when
        *offset* is 0.
        """
        if offset:
            self.ipdb.seek(offset)
        (low, high) = struct.unpack('<HB', self.ipdb.read(3))
        return (high << 16) + low

    def text_(self, s, encoding='utf-8', errors='strict'):
        """Return *s* as text, decoding it first if it is a byte string.

        ``bytes`` is ``str`` on Python 2, so byte-string literals are
        decoded there exactly as before; on Python 3 text passes through.
        """
        if isinstance(s, bytes):
            return s.decode(encoding, errors)
        return s


if __name__ == '__main__':
    with IPAddresss('qqwry.dat') as ips:
        print(type(ips.getIpAddr(ips.str2ip('36.157.14.139'))))
# qqwry.dat 需自行下載。
# 百度雲盤鏈接: https://pan.baidu.com/s/1Evw4CR6bciZREq_uDfcxeA 密碼: guqtspa