Paul Tötterman has proposed merging lp:~ptman/hipl/methods-to-functions into lp:hipl.

Requested reviews:
  HIPL core team (hipl-core)

For more details, see:
https://code.launchpad.net/~ptman/hipl/methods-to-functions/+merge/102584

Fix pylint warnings about methods that do not depend on anything from the object by refactoring them into freestanding functions. Added some tests.

Test report:
- Built Debian packages
- Ran hipdnsproxy
- dig @127.0.0.53 google.com a
- dig @127.0.0.53 crossroads.infrahip.net a
- dig @127.0.0.53 crossroads.infrahip.net aaaa
- Anything else that should be tested?

These tests could probably be automated somehow.
--
https://code.launchpad.net/~ptman/hipl/methods-to-functions/+merge/102584
Your team HIPL core team is requested to review the proposed merge of lp:~ptman/hipl/methods-to-functions into lp:hipl.
=== modified file 'Makefile.am' --- Makefile.am 2012-03-13 15:27:06 +0000 +++ Makefile.am 2012-04-18 20:32:26 +0000 @@ -272,7 +272,6 @@ tools_hipdnskeyparse_PYTHON = tools/hipdnskeyparse/myasn.py tools_hipdnsproxy_PYTHON = tools/hipdnsproxy/hosts.py \ - tools/hipdnsproxy/pyip6.py \ tools/hipdnsproxy/util.py tools_hipdnskeyparsedir = $(pythondir)/hipdnskeyparse === modified file 'tools/hipdnsproxy/hipdnsproxy.in' --- tools/hipdnsproxy/hipdnsproxy.in 2012-03-25 19:10:40 +0000 +++ tools/hipdnsproxy/hipdnsproxy.in 2012-04-18 20:32:26 +0000 @@ -87,6 +87,10 @@ from DNS import Serialize, DeSerialize +LSI_RE = re.compile(r'(?P<lsi>1\.\d+\.\d+\.\d+)') +HIT_RE = re.compile(r'[^0-9a-f:](?P<hit>2001:1[0-9a-f]:[0-9a-f:]*)[^0-9a-f:]') + + def usage(unused_utyp, *msg): """Print usage instructions and exit.""" sys.stderr.write('Usage: %s\n' % os.path.split(sys.argv[0])[1]) @@ -99,6 +103,45 @@ MYID = '%d-%d' % (time.time(), os.getpid()) +def hit_to_lsi(hit): + """Return LSI for HIT if found.""" + output = subprocess.Popen(['hipconf', 'daemon', 'hit-to-lsi', hit], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT).stdout + + for line in output: + match = LSI_RE.search(line) + if match: + return match.group('lsi') + + +def lsi_to_hit(lsi): + """Return HIT for LSI if found.""" + output = subprocess.Popen(['hipconf', 'daemon', 'lsi-to-hit', lsi], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT).stdout + + for line in output: + match = HIT_RE.search(line) + if match: + return match.group('hit') + + +def is_reverse_hit_query(name): + """Check if the query is a reverse query to a HIT. + + >>> is_reverse_hit_query('::1') + False + >>> is_reverse_hit_query('8.e.b.8.b.3.c.9.1.a.0.c.e.e.2.c.c.e.d.0.9.c.' + ... '9.a.e.1.0.0.1.0.0.2.hit-to-ip.infrahip.net') + True + """ + if (name.endswith('.1.0.0.1.0.0.2.hit-to-ip.infrahip.net') and + len(name) == 86): + return True + return False + + class Logger: """Logging to stdout or syslog. 
@@ -164,8 +207,7 @@ else: self.resolvconf_towrite = '/etc/resolv.conf' - self.dnsmasq_restart = (self.dnsmasq_initd_script + - ' restart > /dev/null') + self.dnsmasq_restart = [self.dnsmasq_initd_script, 'restart'] if filetowatch is None: self.filetowatch = self.guess_resolvconf() self.resolvconf_orig = self.filetowatch @@ -245,7 +287,9 @@ if line.find(self.rh_before) == 0: print self.rh_inject print line, - os.system(self.dnsmasq_restart) + subprocess.check_call(self.dnsmasq_restart, + stdout=open(os.devnull, 'wb'), + stderr=subprocess.STDOUT) self.fout.write('Hooked with dnsmasq\n') # Restarting of dnsproxy changes also resolv conf. Reset timer # to make sure that we don't load dnsproxy's IP address. Otherwise @@ -269,7 +313,9 @@ inplace=1): if line.find(self.rh_inject) == -1: print line, - os.system(self.dnsmasq_restart) + subprocess.check_call(self.dnsmasq_restart, + stdout=open(os.devnull, 'wb'), + stderr=subprocess.STDOUT) if (not (self.use_dnsmasq_hook and self.use_resolvconf) and self.overwrite_resolv_conf): os.rename(self.resolvconf_bkname, self.resolvconf_towrite) @@ -325,13 +371,15 @@ def stop(self): """Perform shutdown routines.""" self.restore_resolvconf_dnsmasq() - os.system("ifconfig lo:53 down") + subprocess.check_call(['ifconfig', 'lo:53', 'down']) # Sometimes hipconf processes get stuck, particularly when # hipd is busy or unresponsive. This is a workaround. 
- os.system('killall --quiet hipconf 2> /dev/null') + subprocess.check_call(['killall', '--quiet', 'hipconf'], + stderr=open(os.devnull, 'wb')) class DNSProxy: + """HIP DNS proxy main class.""" default_hiphosts = "@sysconfdir@/hosts" default_hosts = "/etc/hosts" re_nameserver = re.compile(r'nameserver\s+(\S+)$') @@ -472,16 +520,6 @@ if result: return result - def str_is_lsi(self, lsi_str): - """Is the string a valid Local Scope Identifier?""" - for hostsdb in self.hosts: - return hostsdb.str_is_lsi(lsi_str) - - def str_is_hit(self, hit_str): - """Is the string a valid Host Identity Tag?""" - for hostsdb in self.hosts: - return hostsdb.str_is_hit(hit_str) - def cache_name(self, name, addr, ttl): """Cache the name-address mapping with ttl in all hosts files.""" for hostsdb in self.hosts: @@ -494,16 +532,6 @@ if result: return result - def ptr_str_to_addr_str(self, ptr_str): - """Convert PTR hostname to IP address.""" - for hostsdb in self.hosts: - return hostsdb.ptr_str_to_addr_str(ptr_str) - - def addr6_str_to_ptr_str(self, addr_str): - """Convert IPv6 address to PTR hostname.""" - for hostsdb in self.hosts: - return hostsdb.addr6_str_to_ptr_str(addr_str) - def forkme(self): """Daemonize current process.""" pid = os.fork() @@ -580,12 +608,13 @@ adding them to the cache? 
""" localhit = [] - cmd = "ifconfig dummy0 2>&1" - proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout + proc = subprocess.Popen(['ifconfig', 'dummy0'], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT).stdout result = proc.readline() while result: - start = result.find("2001:1") - end = result.find("/28") + start = result.find('2001:1') + end = result.find('/28') if start != -1 and end != -1: hit = result[start:end] if not self.getaddr(hit): @@ -594,48 +623,15 @@ proc.close() ofile = open(self.default_hiphosts, 'a') for i in range(len(localhit)): - ofile.write(localhit[i] + "\tlocalhit" + str(i + 1) + '\n') + ofile.write('%s\tlocalhit%s\n' % (localhit[i], i + 1)) ofile.close() - def map_hit_to_lsi(self, hit): - """Query LSI for HIT.""" - cmd = "hipconf daemon hit-to-lsi " + hit + " 2>&1" - proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout - result = proc.readline() - while result: - start = result.find("1.") - end = result.find("\n") - if start != -1 and end != -1: - return result[start:end] - result = proc.readline() - - def lsi_to_hit(self, lsi): - """Query HIT corresponding to LSI.""" - cmd = "hipconf daemon lsi-to-hit " + lsi + " 2>&1" - proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout - result = proc.readline() - while result: - start = result.find("2001:") - end = result.find("\n") - if start != -1 and end != -1: - return result[start:end] - result = proc.readline() - def add_hit_ip_map(self, hit, addr): """Add IP for HIT.""" - cmd = "hipconf daemon add map " + hit + " " + addr + \ - " > /dev/null 2>&1" self.fout.write('Associating HIT %s with IP %s\n' % (hit, addr)) - os.system(cmd) - - def hip_is_reverse_hit_query(self, name): - """Check if the query is a reverse query to a HIT.""" - # 8.e.b.8.b.3.c.9.1.a.0.c.e.e.2.c.c.e.d.0.9.c.9.a.e.1.0.0.1.0.0.2.\ - # hit-to-ip.infrahip.net - if (len(name) > 64 and name.find('.1.0.0.1.0.0.2.') == 49): - return True - else: - return False + 
subprocess.check_call(['hipconf', 'daemon', 'add', 'map', hit, addr], + stdout=open(os.devnull, 'wb'), + stderr=subprocess.STDOUT) def hip_cache_lookup(self, packet): """Make a cache lookup.""" @@ -650,10 +646,10 @@ # map host name to address from cache if qtype == 12: lr_ptr = None - addr_str = self.ptr_str_to_addr_str(qname) + addr_str = hosts.ptr_to_addr(qname) if (not self.disable_lsi and addr_str is not None and - self.str_is_lsi(addr_str)): - addr_str = self.lsi_to_hit(addr_str) + hosts.valid_lsi(addr_str)): + addr_str = lsi_to_hit(addr_str) lr_ptr = self.getaddr(addr_str) lr_aaaa_hit = None else: @@ -671,7 +667,7 @@ if qtype == 28: # 28: AAAA result = lr_aaaa_hit elif qtype == 1 and not self.disable_lsi: # 1: A - lsi = self.map_hit_to_lsi(lr_aaaa_hit[0]) + lsi = hit_to_lsi(lr_aaaa_hit[0]) if lsi is not None: result = (lsi, lr_aaaa_hit[1]) elif self.prefix and packet['questions'][0][0].startswith(self.prefix): @@ -717,7 +713,7 @@ hit_ans.append([qname, 28, 1, answer[3], hit]) if qtype == 1 and not self.disable_lsi: - lsi = self.map_hit_to_lsi(hit) + lsi = hit_to_lsi(hit) if lsi is not None: lsi_ans.append([qname, 1, 1, self.hosts_ttl, lsi]) @@ -744,7 +740,6 @@ # Default virtual interface and address for dnsproxy to # avoid problems with other dns forwarders (e.g. 
dnsmasq) os.system("ifconfig lo:53 %s" % (self.bind_ip,)) - #os.system("ifconfig lo:53 inet6 add ::53/128") servsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: @@ -884,9 +879,8 @@ pckt = copy.copy(packet) pckt['id'] = query_id if ((qtype == 28 or - (qtype == 1 and not self.disable_lsi)) and - not self.hip_is_reverse_hit_query( - packet['questions'][0][0])): + (qtype == 1 and not self.disable_lsi)) and not + is_reverse_hit_query(packet['questions'][0][0])): if not self.prefix: pckt['questions'][0][1] = 55 @@ -899,14 +893,14 @@ if qtype == 12 and not self.disable_lsi: qname = packet['questions'][0][0] - addr_str = self.ptr_str_to_addr_str(qname) + addr_str = hosts.ptr_to_addr(qname) if (addr_str is not None and - self.str_is_lsi(addr_str)): + hosts.valid_lsi(addr_str)): query = (packet, from_a[0], from_a[1], qname) - hit_str = self.lsi_to_hit(addr_str) + hit_str = lsi_to_hit(addr_str) if hit_str is not None: pckt['questions'][0][0] = \ - self.addr6_str_to_ptr_str(hit_str) + hosts.addr_to_ptr(hit_str) outbuf = Serialize(pckt).get_packet() clisock.sendto(outbuf, (self.server_ip, @@ -964,7 +958,7 @@ for answer in packet['answers']: if (answer[1] == 1 or (answer[1] == 28 and not - self.str_is_hit(answer[4]))): + hosts.valid_hit(answer[4]))): self.add_hit_ip_map(hit[0], answer[4]) # Reply with HIT/LSI once it's been mapped to # an IP === modified file 'tools/hipdnsproxy/hosts.py' (properties changed: -x to +x) --- tools/hipdnsproxy/hosts.py 2012-03-25 19:10:40 +0000 +++ tools/hipdnsproxy/hosts.py 2012-04-18 20:32:26 +0000 @@ -25,12 +25,188 @@ """Hosts file handling for hipdnsproxy.""" -import binascii import os -import pyip6 +import re +import socket import time +HIT_RE = re.compile(r'^2001:1[0-9a-f]:[0-9a-f:]*$') + + +def valid_ipv6(addr): + """Is the string a valid IPv6 address? 
+ + >>> valid_ipv6('::1') + True + >>> valid_ipv6('127.0.0.1') + False + """ + try: + socket.inet_pton(socket.AF_INET6, addr) + return True + except socket.error: + return False + + +def valid_hit(addr): + """Is the string a valid Host Identity Tag? + + >>> valid_hit('2001:010::1') + True + >>> valid_hit('::1') + False + >>> valid_hit('2001:1f::1') + True + >>> valid_hit('127.0.0.1') + False + """ + if not valid_ipv6(addr): + return False + + caddr = canonicalize_ipv6(addr) + if not HIT_RE.match(caddr): + return False + + return True + + +def valid_lsi(addr): + """Is the string a valid Local Scope Identifier? + + >>> valid_lsi('1.0.0.1') + True + >>> valid_lsi('127.0.0.1') + False + >>> valid_lsi('1.0.1') + False + >>> valid_lsi('1.0.0.365') + False + >>> valid_lsi('1.foobar') + False + """ + parts = addr.split('.') + + if not len(parts) == 4: + return False + + if not int(parts[0]) == 1: + return False + + in_range = all([0 <= int(x) < 256 for x in parts]) + if not in_range: + return False + + return True + + +def normalize(name): + """Normalize FQDN. + + >>> normalize('this.is.a.test....') + 'this.is.a.test' + >>> normalize('this...is..a.test..') + 'this...is..a.test' + >>> normalize('this..is.a.test') + 'this..is.a.test' + >>> normalize('this.is.a.test') + 'this.is.a.test' + """ + name = name.lower() + parts = name.split('.') + while parts and parts[-1] == '': + parts.pop() + return '.'.join(parts) + + +def canonicalize_ipv6(addr): + """Return canonical form of IPv6 address. + + >>> canonicalize_ipv6('ff00:1:02:0:0::3:4') + 'ff00:1:2::3:4' + >>> canonicalize_ipv6('::1') + '::1' + """ + return socket.inet_ntop(socket.AF_INET6, socket.inet_pton(socket.AF_INET6, + addr)) + + +def addr_to_ptr(addr): + """Return PTR hostname string for IP address string. 
+ + >>> addr_to_ptr('192.168.1.2') + '2.1.168.192.in-addr.arpa' + >>> addr_to_ptr('2001:001e:361f:8a55:6730:6f82:ef36:2fff') + 'f.f.f.2.6.3.f.e.2.8.f.6.0.3.7.6.5.5.a.8.f.1.6.3.e.1.0.0.1.0.0.2.ip6.arpa' + """ + if valid_ipv6(addr): + return '%s.ip6.arpa' % '.'.join(reversed(addr.replace(':', ''))) + + return '%s.in-addr.arpa' % '.'.join(reversed(addr.split('.'))) + + +def ptr_to_addr(ptr): + """Return IP address string from PTR hostname string. + + >>> ptr_to_addr('2.1.168.192.in-addr.arpa') + '192.168.1.2' + >>> ptr_to_addr('f.f.f.2.6.3.f.e.2.8.f.6.0.3.7.6.5.5.a.8.f.1.6.3.e.1.0.0' + ... '.1.0.0.2.ip6.arpa') + '2001:001e:361f:8a55:6730:6f82:ef36:2fff' + """ + if '.in-addr.arpa' in ptr: + return '.'.join(reversed(ptr.split('.')[:4])) + if '.ip6.arpa' in ptr: + return ':'.join(['%s' * 4] * 8) % tuple(reversed(ptr.split('.')[:32])) + + +def _getrecord(name, src): + """Return address + ttl for hostname from dictionary. + + >>> _getrecord('foo', {'foo': ('127.0.0.1', 0)}) + ('127.0.0.1', 122) + >>> _getrecord('bar', {}) is None + True + >>> _getrecord('baz', {'baz': ('::1', int(time.time())+5)}) + ('::1', 5) + >>> _getrecord('quux', {'quux': ('::', 5)}) is None + True + """ + addr = src.get(normalize(name)) + if addr is None: + return None + if addr[1] == 0: + ttl = 122 + else: + ttl = addr[1] - int(time.time()) + if ttl < 1: + del src[normalize(name)] + return + return (addr[0], ttl) + + +def _find_name_for_addr_from_src(addr, src): + """Find hostname matching address from source. + + >>> _find_name_for_addr_from_src('b::1', {'testhost': ('b::1', 0)}) + 'testhost' + >>> _find_name_for_addr_from_src('127.0.0.1', + ... {'localhost': ('127.0.0.1', 0)}) + 'localhost' + >>> _find_name_for_addr_from_src('foo', {}) is None + True + """ + for name, record in src.iteritems(): + naddr = record[0] + if valid_ipv6(naddr): + caddr = canonicalize_ipv6(addr) + else: + caddr = naddr + # XXX(ptman): normalize? 
but that's what's been done until now + if normalize(addr) == caddr: + return name + + class Hosts: """Class for handling a hosts file.""" @@ -65,22 +241,6 @@ self.reread() self.modified[self.hostsfile] = hosts_mtime - def sani(self, name): - """Sanitize fqdn.""" - name = name.lower() - parts = name.split('.') - while parts and parts[-1] == '': - parts.pop() - return '.'.join(parts) - - def sani_aaaa(self, addr): - """Return PTR hostname of IPv6 address.""" - binaddr = pyip6.inet_pton(addr) - hexaddr = list(binascii.b2a_hex(binaddr)) - hexaddr.reverse() - hexaddr.extend(['ip6', 'arpa']) - return '.'.join(hexaddr) - def rcreread(self): """Re-read resolv.conf.""" self.suffixes = () @@ -98,80 +258,6 @@ if keyword == 'search': self.suffixes = tuple([part.lower() for part in parts]) - def str_is_ipv6(self, addr_str): - """Is the string a valid IPv6 address? - - TODO(ptman): Improve the test - """ - if addr_str.find(':') == -1: - return False - return True - - def str_is_hit(self, addr_str): - """Is the string a valid Host Identity Tag?""" - if addr_str.startswith('2001:001') or (addr_str.startswith('2001:1') - and addr_str[7] == ':'): - return True - return False - - def str_is_lsi(self, addr_str): - """Is the string a valid Local Scope Identifier?""" - if addr_str.startswith('1.'): - return True - return False - - def ptr4_str_to_addr_str(self, ptr_str): - """Convert IPv4 PTR to IPv4 address.""" - in4 = '' - octet = '' - for i in range(len(ptr_str)): - if ptr_str[i] == '.': - in4 = octet + '.' + in4 - octet = '' - else: - octet += ptr_str[i] - in4 = octet + '.' 
+ in4[0:len(in4) - 1] - return in4 - - def ptr6_str_to_addr_str(self, ptr_str): - """Convert IPv6 PTR to IPv6 address.""" - in6 = '' - for i in range(len(ptr_str)): - if (((i + 1) % 8) == 0): - in6 += ':' - if ptr_str[i] != '.': - in6 += ptr_str[i] - return in6 - - def addr6_str_to_ptr_str(self, addr): - """Convert IPv6 address to IPv6 PTR hostname.""" - # Note: address string must be in the full notation - ptr = '' - addr = addr[::-1] - for char in addr: - if char != ':': - ptr += char + '.' - ptr += 'ip6.arpa' - return ptr - - def ptr_str_to_addr_str(self, ptr_str): - """Convert PTR hostname to IP address.""" - # IPv4: - # - 102.2.168.192.in-addr.arpa - # - 4.3.2.1.in-addr.arpa - # IPv6: - # - 9.0...f.3.ip6.arpa - if not ptr_str: - return None - end = ptr_str.find('.i') - if end == -1: - return None - addr_part = ptr_str[0:end] - if ptr_str.find('.ip6') == -1: - return self.ptr4_str_to_addr_str(addr_part) - else: - return self.ptr6_str_to_addr_str(addr_part[::-1]) - def reread(self): """Re-read hosts file.""" name_a = {} @@ -192,78 +278,54 @@ addr = fields.pop(0) for name in fields: - name = self.sani(name) + name = normalize(name) - if self.str_is_hit(addr): + if valid_hit(addr): name_hit[name] = (addr, 0) - elif self.str_is_ipv6(addr): + elif valid_ipv6(addr): name_aaaa[name] = (addr, 0) - elif not self.str_is_lsi(addr): + elif valid_lsi(addr): name_a[name] = (addr, 0) self.name_a = name_a self.name_aaaa = name_aaaa self.name_hit = name_hit - def getaddr_from_list(self, addr_str, names): - """Find hostname matching address from names.""" - for name in names: - if self.str_is_ipv6(names[name][0]): - # canonicalize IPv6 address - binaddr = pyip6.inet_pton(names[name][0]) - caddr = pyip6.inet_ntop(binaddr) - else: - caddr = names[name][0] - if self.sani(addr_str) == caddr: - return name - def getaddr(self, addr): """Find hostname matching address.""" if addr is None: - return None - if self.str_is_ipv6(addr): - # canonicalize IPv6 address - binaddr = 
pyip6.inet_pton(addr) - addr_str = pyip6.inet_ntop(binaddr) - if self.str_is_hit(addr): - return self.getaddr_from_list(addr_str, self.name_hit) + return + if valid_ipv6(addr): + caddr = canonicalize_ipv6(addr) + if valid_hit(addr): + return _find_name_for_addr_from_src(caddr, self.name_hit) else: - return self.getaddr_from_list(addr_str, self.name_aaaa) + return _find_name_for_addr_from_src(caddr, self.name_aaaa) else: - return self.getaddr_from_list(addr, self.name_a) + return _find_name_for_addr_from_src(addr, self.name_a) def geta(self, name): """Return LSI record for name.""" - return self.getrecord(name, self.name_a) + return _getrecord(name, self.name_a) def getaaaa(self, name): """Return IPv6 record for name.""" - return self.getrecord(name, self.name_aaaa) + return _getrecord(name, self.name_aaaa) def getaaaa_hit(self, name): """Return HIT record for name.""" - return self.getrecord(name, self.name_hit) - - def getrecord(self, name, src): - """Return address + ttl for hostname from dictionary.""" - addr = src.get(self.sani(name)) - if addr is None: - return None - if addr[1] == 0: - ttl = 122 - else: - ttl = addr[1] - int(time.time()) - if ttl < 1: - del src[self.sani(name)] - return None - return (addr[0], ttl) + return _getrecord(name, self.name_hit) def cache_name(self, hostname, addr, ttl): """Store hostname-address -mapping in cache for ttl duration.""" valid_to = int(time.time()) + ttl - if self.str_is_hit(addr): + if valid_hit(addr): self.name_hit[hostname] = (addr, valid_to) - elif self.str_is_ipv6(addr): + elif valid_ipv6(addr): self.name_aaaa[hostname] = (addr, valid_to) else: self.name_a[hostname] = (addr, valid_to) + +if __name__ == '__main__': + import doctest + doctest.testmod() === removed file 'tools/hipdnsproxy/pyip6.py' --- tools/hipdnsproxy/pyip6.py 2012-03-24 14:56:51 +0000 +++ tools/hipdnsproxy/pyip6.py 1970-01-01 00:00:00 +0000 @@ -1,142 +0,0 @@ -#!/usr/bin/env python - -"""Pure Python IP6 parsing and formatting - -Copyright (c) 2006 
Stuart Gathman <stuart@xxxxxxxx> - -This module is free software, and you may redistribute it and/or modify -it under the same terms as Python itself, so long as this copyright message -and disclaimer are retained in their original form. -""" - - -import struct -import re - - -PAT_IP4 = r'\.'.join([r'(?:\d|[1-9]\d|1\d\d|2[0-4]\d|25[0-5])'] * 4) -RE_IP4 = re.compile(PAT_IP4 + '$') - - -def inet_ntop(buf): - """ - Convert ip6 address to standard hex notation. - - Examples: - - >>> inet_ntop(struct.pack("!HHHHHHHH", 0, 0, 0, 0, 0, 0xFFFF, 0x0102, - ... 0x0304)) - '::FFFF:1.2.3.4' - - >>> inet_ntop(struct.pack("!HHHHHHHH", 0x1234, 0x5678, 0, 0, 0, 0, 0x0102, - ... 0x0304)) - '1234:5678::102:304' - - >>> inet_ntop(struct.pack("!HHHHHHHH", 0, 0, 0, 0x1234, 0x5678, 0, 0x0102, - ... 0x0304)) - '::1234:5678:0:102:304' - - >>> inet_ntop(struct.pack("!HHHHHHHH", 0x1234, 0x5678, 0, 0x0102, 0x0304, - ... 0, 0, 0)) - '1234:5678:0:102:304::' - - >>> inet_ntop(struct.pack("!HHHHHHHH", 0, 0, 0, 0, 0, 0, 0, 0)) - '::' - """ - # convert to 8 words - words = struct.unpack('!HHHHHHHH', buf) - null = (0, 0, 0, 0, 0, 0, 0, 0) - if words == null: - return '::' - # check for ip4 mapped - if words[:5] == (0, 0, 0, 0, 0) and words[5] in (0, 0xFFFF): - ip4 = '.'.join([str(i) for i in struct.unpack('!BBBB', buf[12:])]) - if words[5]: - return '::FFFF:' + ip4 - return '::' + ip4 - # find index of longest sequence of 0 - for longest in (7, 6, 5, 4, 3, 2, 1): - zeroes = null[:longest] - for i in range(9 - longest): - if words[i:i + longest] == zeroes: - if i == 0: - return ':' + ':%x' * (8 - longest) % words[longest:] - if i == 8 - longest: - return '%x:' * (8 - longest) % words[:-longest] + ':' - return ('%x:' * i % words[:i] + - ':%x' * (8 - longest - i) % words[i + longest:]) - return '%x:%x:%x:%x:%x:%x:%x:%x' % words - - -def inet_pton(addr): - """ - Convert ip6 standard hex notation to ip6 address. 
- - Examples: - - >>> struct.unpack('!HHHHHHHH', inet_pton('::')) - (0, 0, 0, 0, 0, 0, 0, 0) - - >>> struct.unpack('!HHHHHHHH', inet_pton('::1234')) - (0, 0, 0, 0, 0, 0, 0, 4660) - - >>> struct.unpack('!HHHHHHHH', inet_pton('1234::')) - (4660, 0, 0, 0, 0, 0, 0, 0) - - >>> struct.unpack('!HHHHHHHH', inet_pton('1234::5678')) - (4660, 0, 0, 0, 0, 0, 0, 22136) - - >>> struct.unpack('!HHHHHHHH', inet_pton('::FFFF:1.2.3.4')) - (0, 0, 0, 0, 0, 65535, 258, 772) - - >>> struct.unpack('!HHHHHHHH', inet_pton('1.2.3.4')) - (0, 0, 0, 0, 0, 65535, 258, 772) - - >>> try: - ... inet_pton('::1.2.3.4.5') - ... except ValueError, x: - ... print x - ::1.2.3.4.5 - """ - if addr == '::': - return '\0' * 16 - buf = addr - match = RE_IP4.search(buf) - try: - if match: - pos = match.start() - ip4 = [int(i) for i in buf[pos:].split('.')] - if not pos: - # pylint: disable=W0142 - return struct.pack('!QLBBBB', 0, 65535, *ip4) - buf = buf[:pos] + '%x%02x:%x%02x' % tuple(ip4) - zero_split = buf.split('::') - if len(zero_split) == 2: - head, tail = zero_split - if not head: - tail = tail.split(':') - return struct.pack('!HHHHHHHH', - *[0] * (8 - len(tail)) + - [int(x, 16) for x in tail]) - if not tail: - head = head.split(':') - return struct.pack('!HHHHHHHH', - *[int(x, 16) for x in head] + - [0] * (8 - len(head))) - head = head.split(':') - tail = tail.split(':') - return struct.pack('!HHHHHHHH', - *[int(x, 16) for x in head] + - [0] * (8 - len(head) - len(tail)) + - [int(x, 16) for x in tail]) - if len(zero_split) == 1: - return struct.pack('!HHHHHHHH', - *[int(x, 16) for x in zero_split[0].split(':')]) - except ValueError: - pass - raise ValueError(addr) - - -if __name__ == '__main__': - import doctest - doctest.testmod() === modified file 'tools/hipdnsproxy/util.py' --- tools/hipdnsproxy/util.py 2012-03-24 14:56:51 +0000 +++ tools/hipdnsproxy/util.py 2012-04-18 20:32:26 +0000 @@ -23,11 +23,7 @@ # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR # OTHER DEALINGS IN THE SOFTWARE. 
-"""Utility functions for hipdnsproxy. - -There are some doctests in this file. To run tests, use -`python -m doctest util.py`. -""" +"""Utility functions for hipdnsproxy.""" import re import signal