[hipl-dev] [Merge] lp:~ptman/hipl/methods-to-functions into lp:hipl

  • From: Paul Tötterman <ptman@xxxxxx>
  • To: mp+102584@xxxxxxxxxxxxxxxxxx
  • Date: Wed, 18 Apr 2012 20:33:20 -0000

Paul Tötterman has proposed merging lp:~ptman/hipl/methods-to-functions into 
lp:hipl.

Requested reviews:
  HIPL core team (hipl-core)

For more details, see:
https://code.launchpad.net/~ptman/hipl/methods-to-functions/+merge/102584

Fix pylint warnings about methods not depending on anything from the object by 
refactoring them into freestanding functions. Added some tests.

Test report:
- Built debian packages
- Ran hipdnsproxy
- dig @127.0.0.53 google.com a
- dig @127.0.0.53 crossroads.infrahip.net a
- dig @127.0.0.53 crossroads.infrahip.net aaaa
- Anything else that should be tested? These tests could probably be automated 
somehow.
-- 
https://code.launchpad.net/~ptman/hipl/methods-to-functions/+merge/102584
Your team HIPL core team is requested to review the proposed merge of 
lp:~ptman/hipl/methods-to-functions into lp:hipl.
=== modified file 'Makefile.am'
--- Makefile.am 2012-03-13 15:27:06 +0000
+++ Makefile.am 2012-04-18 20:32:26 +0000
@@ -272,7 +272,6 @@
 tools_hipdnskeyparse_PYTHON = tools/hipdnskeyparse/myasn.py
 
 tools_hipdnsproxy_PYTHON = tools/hipdnsproxy/hosts.py                   \
-                           tools/hipdnsproxy/pyip6.py                   \
                            tools/hipdnsproxy/util.py
 
 tools_hipdnskeyparsedir = $(pythondir)/hipdnskeyparse

=== modified file 'tools/hipdnsproxy/hipdnsproxy.in'
--- tools/hipdnsproxy/hipdnsproxy.in    2012-03-25 19:10:40 +0000
+++ tools/hipdnsproxy/hipdnsproxy.in    2012-04-18 20:32:26 +0000
@@ -87,6 +87,10 @@
 from DNS import Serialize, DeSerialize
 
 
+LSI_RE = re.compile(r'(?P<lsi>1\.\d+\.\d+\.\d+)')
+HIT_RE = re.compile(r'[^0-9a-f:](?P<hit>2001:1[0-9a-f]:[0-9a-f:]*)[^0-9a-f:]')
+
+
 def usage(unused_utyp, *msg):
     """Print usage instructions and exit."""
     sys.stderr.write('Usage: %s\n' % os.path.split(sys.argv[0])[1])
@@ -99,6 +103,45 @@
 MYID = '%d-%d' % (time.time(), os.getpid())
 
 
+def hit_to_lsi(hit):
+    """Return LSI for HIT if found."""
+    output = subprocess.Popen(['hipconf', 'daemon', 'hit-to-lsi', hit],
+                              stdout=subprocess.PIPE,
+                              stderr=subprocess.STDOUT).stdout
+
+    for line in output:
+        match = LSI_RE.search(line)
+        if match:
+            return match.group('lsi')
+
+
+def lsi_to_hit(lsi):
+    """Return HIT for LSI if found."""
+    output = subprocess.Popen(['hipconf', 'daemon', 'lsi-to-hit', lsi],
+                              stdout=subprocess.PIPE,
+                              stderr=subprocess.STDOUT).stdout
+
+    for line in output:
+        match = HIT_RE.search(line)
+        if match:
+            return match.group('hit')
+
+
+def is_reverse_hit_query(name):
+    """Check if the query is a reverse query to a HIT.
+
+    >>> is_reverse_hit_query('::1')
+    False
+    >>> is_reverse_hit_query('8.e.b.8.b.3.c.9.1.a.0.c.e.e.2.c.c.e.d.0.9.c.'
+    ...                      '9.a.e.1.0.0.1.0.0.2.hit-to-ip.infrahip.net')
+    True
+    """
+    if (name.endswith('.1.0.0.1.0.0.2.hit-to-ip.infrahip.net') and
+        len(name) == 86):
+        return True
+    return False
+
+
 class Logger:
     """Logging to stdout or syslog.
 
@@ -164,8 +207,7 @@
         else:
             self.resolvconf_towrite = '/etc/resolv.conf'
 
-        self.dnsmasq_restart = (self.dnsmasq_initd_script +
-                                ' restart > /dev/null')
+        self.dnsmasq_restart = [self.dnsmasq_initd_script, 'restart']
         if filetowatch is None:
             self.filetowatch = self.guess_resolvconf()
         self.resolvconf_orig = self.filetowatch
@@ -245,7 +287,9 @@
                     if line.find(self.rh_before) == 0:
                         print self.rh_inject
                     print line,
-            os.system(self.dnsmasq_restart)
+            subprocess.check_call(self.dnsmasq_restart,
+                                  stdout=open(os.devnull, 'wb'),
+                                  stderr=subprocess.STDOUT)
             self.fout.write('Hooked with dnsmasq\n')
             # Restarting of dnsproxy changes also resolv conf. Reset timer
             # to make sure that we don't load dnsproxy's IP address. Otherwise
@@ -269,7 +313,9 @@
                                             inplace=1):
                     if line.find(self.rh_inject) == -1:
                         print line,
-            os.system(self.dnsmasq_restart)
+            subprocess.check_call(self.dnsmasq_restart,
+                                  stdout=open(os.devnull, 'wb'),
+                                  stderr=subprocess.STDOUT)
         if (not (self.use_dnsmasq_hook and self.use_resolvconf) and
             self.overwrite_resolv_conf):
             os.rename(self.resolvconf_bkname, self.resolvconf_towrite)
@@ -325,13 +371,15 @@
     def stop(self):
         """Perform shutdown routines."""
         self.restore_resolvconf_dnsmasq()
-        os.system("ifconfig lo:53 down")
+        subprocess.check_call(['ifconfig', 'lo:53', 'down'])
         # Sometimes hipconf processes get stuck, particularly when
         # hipd is busy or unresponsive. This is a workaround.
-        os.system('killall --quiet hipconf 2> /dev/null')
+        subprocess.check_call(['killall', '--quiet', 'hipconf'],
+                              stderr=open(os.devnull, 'wb'))
 
 
 class DNSProxy:
+    """HIP DNS proxy main class."""
     default_hiphosts = "@sysconfdir@/hosts"
     default_hosts = "/etc/hosts"
     re_nameserver = re.compile(r'nameserver\s+(\S+)$')
@@ -472,16 +520,6 @@
             if result:
                 return result
 
-    def str_is_lsi(self, lsi_str):
-        """Is the string a valid Local Scope Identifier?"""
-        for hostsdb in self.hosts:
-            return hostsdb.str_is_lsi(lsi_str)
-
-    def str_is_hit(self, hit_str):
-        """Is the string a valid Host Identity Tag?"""
-        for hostsdb in self.hosts:
-            return hostsdb.str_is_hit(hit_str)
-
     def cache_name(self, name, addr, ttl):
         """Cache the name-address mapping with ttl in all hosts files."""
         for hostsdb in self.hosts:
@@ -494,16 +532,6 @@
             if result:
                 return result
 
-    def ptr_str_to_addr_str(self, ptr_str):
-        """Convert PTR hostname to IP address."""
-        for hostsdb in self.hosts:
-            return hostsdb.ptr_str_to_addr_str(ptr_str)
-
-    def addr6_str_to_ptr_str(self, addr_str):
-        """Convert IPv6 address to PTR hostname."""
-        for hostsdb in self.hosts:
-            return hostsdb.addr6_str_to_ptr_str(addr_str)
-
     def forkme(self):
         """Daemonize current process."""
         pid = os.fork()
@@ -580,12 +608,13 @@
         adding them to the cache?
         """
         localhit = []
-        cmd = "ifconfig dummy0 2>&1"
-        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout
+        proc = subprocess.Popen(['ifconfig', 'dummy0'],
+                                stdout=subprocess.PIPE,
+                                stderr=subprocess.STDOUT).stdout
         result = proc.readline()
         while result:
-            start = result.find("2001:1")
-            end = result.find("/28")
+            start = result.find('2001:1')
+            end = result.find('/28')
             if start != -1 and end != -1:
                 hit = result[start:end]
                 if not self.getaddr(hit):
@@ -594,48 +623,15 @@
         proc.close()
         ofile = open(self.default_hiphosts, 'a')
         for i in range(len(localhit)):
-            ofile.write(localhit[i] + "\tlocalhit" + str(i + 1) + '\n')
+            ofile.write('%s\tlocalhit%s\n' % (localhit[i], i + 1))
         ofile.close()
 
-    def map_hit_to_lsi(self, hit):
-        """Query LSI for HIT."""
-        cmd = "hipconf daemon hit-to-lsi " + hit + " 2>&1"
-        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout
-        result = proc.readline()
-        while result:
-            start = result.find("1.")
-            end = result.find("\n")
-            if start != -1 and end != -1:
-                return result[start:end]
-            result = proc.readline()
-
-    def lsi_to_hit(self, lsi):
-        """Query HIT corresponding to LSI."""
-        cmd = "hipconf daemon lsi-to-hit " + lsi + " 2>&1"
-        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout
-        result = proc.readline()
-        while result:
-            start = result.find("2001:")
-            end = result.find("\n")
-            if start != -1 and end != -1:
-                return result[start:end]
-            result = proc.readline()
-
     def add_hit_ip_map(self, hit, addr):
         """Add IP for HIT."""
-        cmd = "hipconf daemon add map " + hit + " " + addr + \
-            " > /dev/null 2>&1"
         self.fout.write('Associating HIT %s with IP %s\n' % (hit, addr))
-        os.system(cmd)
-
-    def hip_is_reverse_hit_query(self, name):
-        """Check if the query is a reverse query to a HIT."""
-        # 8.e.b.8.b.3.c.9.1.a.0.c.e.e.2.c.c.e.d.0.9.c.9.a.e.1.0.0.1.0.0.2.\
-        # hit-to-ip.infrahip.net
-        if (len(name) > 64 and name.find('.1.0.0.1.0.0.2.') == 49):
-            return True
-        else:
-            return False
+        subprocess.check_call(['hipconf', 'daemon', 'add', 'map', hit, addr],
+                              stdout=open(os.devnull, 'wb'),
+                              stderr=subprocess.STDOUT)
 
     def hip_cache_lookup(self, packet):
         """Make a cache lookup."""
@@ -650,10 +646,10 @@
         # map host name to address from cache
         if qtype == 12:
             lr_ptr = None
-            addr_str = self.ptr_str_to_addr_str(qname)
+            addr_str = hosts.ptr_to_addr(qname)
             if (not self.disable_lsi and addr_str is not None and
-                self.str_is_lsi(addr_str)):
-                addr_str = self.lsi_to_hit(addr_str)
+                hosts.valid_lsi(addr_str)):
+                addr_str = lsi_to_hit(addr_str)
             lr_ptr = self.getaddr(addr_str)
             lr_aaaa_hit = None
         else:
@@ -671,7 +667,7 @@
             if qtype == 28:               # 28: AAAA
                 result = lr_aaaa_hit
             elif qtype == 1 and not self.disable_lsi:  # 1: A
-                lsi = self.map_hit_to_lsi(lr_aaaa_hit[0])
+                lsi = hit_to_lsi(lr_aaaa_hit[0])
                 if lsi is not None:
                     result = (lsi, lr_aaaa_hit[1])
         elif self.prefix and packet['questions'][0][0].startswith(self.prefix):
@@ -717,7 +713,7 @@
                 hit_ans.append([qname, 28, 1, answer[3], hit])
 
                 if qtype == 1 and not self.disable_lsi:
-                    lsi = self.map_hit_to_lsi(hit)
+                    lsi = hit_to_lsi(hit)
                     if lsi is not None:
                         lsi_ans.append([qname, 1, 1, self.hosts_ttl, lsi])
 
@@ -744,7 +740,6 @@
         # Default virtual interface and address for dnsproxy to
         # avoid problems with other dns forwarders (e.g. dnsmasq)
         os.system("ifconfig lo:53 %s" % (self.bind_ip,))
-        #os.system("ifconfig lo:53 inet6 add ::53/128")
 
         servsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
         try:
@@ -884,9 +879,8 @@
                         pckt = copy.copy(packet)
                         pckt['id'] = query_id
                         if ((qtype == 28 or
-                             (qtype == 1 and not self.disable_lsi)) and
-                            not self.hip_is_reverse_hit_query(
-                                packet['questions'][0][0])):
+                             (qtype == 1 and not self.disable_lsi)) and not
+                            is_reverse_hit_query(packet['questions'][0][0])):
 
                             if not self.prefix:
                                 pckt['questions'][0][1] = 55
@@ -899,14 +893,14 @@
 
                         if qtype == 12 and not self.disable_lsi:
                             qname = packet['questions'][0][0]
-                            addr_str = self.ptr_str_to_addr_str(qname)
+                            addr_str = hosts.ptr_to_addr(qname)
                             if (addr_str is not None and
-                                self.str_is_lsi(addr_str)):
+                                hosts.valid_lsi(addr_str)):
                                 query = (packet, from_a[0], from_a[1], qname)
-                                hit_str = self.lsi_to_hit(addr_str)
+                                hit_str = lsi_to_hit(addr_str)
                                 if hit_str is not None:
                                     pckt['questions'][0][0] = \
-                                            self.addr6_str_to_ptr_str(hit_str)
+                                            hosts.addr_to_ptr(hit_str)
 
                         outbuf = Serialize(pckt).get_packet()
                         clisock.sendto(outbuf, (self.server_ip,
@@ -964,7 +958,7 @@
                                 for answer in packet['answers']:
                                     if (answer[1] == 1 or
                                         (answer[1] == 28 and not
-                                         self.str_is_hit(answer[4]))):
+                                         hosts.valid_hit(answer[4]))):
                                         self.add_hit_ip_map(hit[0], answer[4])
                                 # Reply with HIT/LSI once it's been mapped to
                                 # an IP

=== modified file 'tools/hipdnsproxy/hosts.py' (properties changed: -x to +x)
--- tools/hipdnsproxy/hosts.py  2012-03-25 19:10:40 +0000
+++ tools/hipdnsproxy/hosts.py  2012-04-18 20:32:26 +0000
@@ -25,12 +25,188 @@
 
 """Hosts file handling for hipdnsproxy."""
 
-import binascii
 import os
-import pyip6
+import re
+import socket
 import time
 
 
+HIT_RE = re.compile(r'^2001:1[0-9a-f]:[0-9a-f:]*$')
+
+
+def valid_ipv6(addr):
+    """Is the string a valid IPv6 address?
+
+    >>> valid_ipv6('::1')
+    True
+    >>> valid_ipv6('127.0.0.1')
+    False
+    """
+    try:
+        socket.inet_pton(socket.AF_INET6, addr)
+        return True
+    except socket.error:
+        return False
+
+
+def valid_hit(addr):
+    """Is the string a valid Host Identity Tag?
+
+    >>> valid_hit('2001:010::1')
+    True
+    >>> valid_hit('::1')
+    False
+    >>> valid_hit('2001:1f::1')
+    True
+    >>> valid_hit('127.0.0.1')
+    False
+    """
+    if not valid_ipv6(addr):
+        return False
+
+    caddr = canonicalize_ipv6(addr)
+    if not HIT_RE.match(caddr):
+        return False
+
+    return True
+
+
+def valid_lsi(addr):
+    """Is the string a valid Local Scope Identifier?
+
+    >>> valid_lsi('1.0.0.1')
+    True
+    >>> valid_lsi('127.0.0.1')
+    False
+    >>> valid_lsi('1.0.1')
+    False
+    >>> valid_lsi('1.0.0.365')
+    False
+    >>> valid_lsi('1.foobar')
+    False
+    """
+    parts = addr.split('.')
+
+    if not len(parts) == 4:
+        return False
+
+    if not int(parts[0]) == 1:
+        return False
+
+    in_range = all([0 <= int(x) < 256 for x in parts])
+    if not in_range:
+        return False
+
+    return True
+
+
+def normalize(name):
+    """Normalize FQDN.
+
+    >>> normalize('this.is.a.test....')
+    'this.is.a.test'
+    >>> normalize('this...is..a.test..')
+    'this...is..a.test'
+    >>> normalize('this..is.a.test')
+    'this..is.a.test'
+    >>> normalize('this.is.a.test')
+    'this.is.a.test'
+    """
+    name = name.lower()
+    parts = name.split('.')
+    while parts and parts[-1] == '':
+        parts.pop()
+    return '.'.join(parts)
+
+
+def canonicalize_ipv6(addr):
+    """Return canonical form of IPv6 address.
+
+    >>> canonicalize_ipv6('ff00:1:02:0:0::3:4')
+    'ff00:1:2::3:4'
+    >>> canonicalize_ipv6('::1')
+    '::1'
+    """
+    return socket.inet_ntop(socket.AF_INET6, socket.inet_pton(socket.AF_INET6,
+                                                              addr))
+
+
+def addr_to_ptr(addr):
+    """Return PTR hostname string for IP address string.
+
+    >>> addr_to_ptr('192.168.1.2')
+    '2.1.168.192.in-addr.arpa'
+    >>> addr_to_ptr('2001:001e:361f:8a55:6730:6f82:ef36:2fff')
+    'f.f.f.2.6.3.f.e.2.8.f.6.0.3.7.6.5.5.a.8.f.1.6.3.e.1.0.0.1.0.0.2.ip6.arpa'
+    """
+    if valid_ipv6(addr):
+        return '%s.ip6.arpa' % '.'.join(reversed(addr.replace(':', '')))
+
+    return '%s.in-addr.arpa' % '.'.join(reversed(addr.split('.')))
+
+
+def ptr_to_addr(ptr):
+    """Return IP address string from PTR hostname string.
+
+    >>> ptr_to_addr('2.1.168.192.in-addr.arpa')
+    '192.168.1.2'
+    >>> ptr_to_addr('f.f.f.2.6.3.f.e.2.8.f.6.0.3.7.6.5.5.a.8.f.1.6.3.e.1.0.0'
+    ...             '.1.0.0.2.ip6.arpa')
+    '2001:001e:361f:8a55:6730:6f82:ef36:2fff'
+    """
+    if '.in-addr.arpa' in ptr:
+        return '.'.join(reversed(ptr.split('.')[:4]))
+    if '.ip6.arpa' in ptr:
+        return ':'.join(['%s' * 4] * 8) % tuple(reversed(ptr.split('.')[:32]))
+
+
+def _getrecord(name, src):
+    """Return address + ttl for hostname from dictionary.
+
+    >>> _getrecord('foo', {'foo': ('127.0.0.1', 0)})
+    ('127.0.0.1', 122)
+    >>> _getrecord('bar', {}) is None
+    True
+    >>> _getrecord('baz', {'baz': ('::1', int(time.time())+5)})
+    ('::1', 5)
+    >>> _getrecord('quux', {'quux': ('::', 5)}) is None
+    True
+    """
+    addr = src.get(normalize(name))
+    if addr is None:
+        return None
+    if addr[1] == 0:
+        ttl = 122
+    else:
+        ttl = addr[1] - int(time.time())
+        if ttl < 1:
+            del src[normalize(name)]
+            return
+    return (addr[0], ttl)
+
+
+def _find_name_for_addr_from_src(addr, src):
+    """Find hostname matching address from source.
+
+    >>> _find_name_for_addr_from_src('b::1', {'testhost': ('b::1', 0)})
+    'testhost'
+    >>> _find_name_for_addr_from_src('127.0.0.1',
+    ...                              {'localhost': ('127.0.0.1', 0)})
+    'localhost'
+    >>> _find_name_for_addr_from_src('foo', {}) is None
+    True
+    """
+    for name, record in src.iteritems():
+        naddr = record[0]
+        if valid_ipv6(naddr):
+            caddr = canonicalize_ipv6(addr)
+        else:
+            caddr = naddr
+        # XXX(ptman): normalize? but that's what's been done until now
+        if normalize(addr) == caddr:
+            return name
+
+
 class Hosts:
     """Class for handling a hosts file."""
 
@@ -65,22 +241,6 @@
             self.reread()
             self.modified[self.hostsfile] = hosts_mtime
 
-    def sani(self, name):
-        """Sanitize fqdn."""
-        name = name.lower()
-        parts = name.split('.')
-        while parts and parts[-1] == '':
-            parts.pop()
-        return '.'.join(parts)
-
-    def sani_aaaa(self, addr):
-        """Return PTR hostname of IPv6 address."""
-        binaddr = pyip6.inet_pton(addr)
-        hexaddr = list(binascii.b2a_hex(binaddr))
-        hexaddr.reverse()
-        hexaddr.extend(['ip6', 'arpa'])
-        return '.'.join(hexaddr)
-
     def rcreread(self):
         """Re-read resolv.conf."""
         self.suffixes = ()
@@ -98,80 +258,6 @@
             if keyword == 'search':
                 self.suffixes = tuple([part.lower() for part in parts])
 
-    def str_is_ipv6(self, addr_str):
-        """Is the string a valid IPv6 address?
-
-        TODO(ptman): Improve the test
-        """
-        if addr_str.find(':') == -1:
-            return False
-        return True
-
-    def str_is_hit(self, addr_str):
-        """Is the string a valid Host Identity Tag?"""
-        if addr_str.startswith('2001:001') or (addr_str.startswith('2001:1')
-                                               and addr_str[7] == ':'):
-            return True
-        return False
-
-    def str_is_lsi(self, addr_str):
-        """Is the string a valid Local Scope Identifier?"""
-        if addr_str.startswith('1.'):
-            return True
-        return False
-
-    def ptr4_str_to_addr_str(self, ptr_str):
-        """Convert IPv4 PTR to IPv4 address."""
-        in4 = ''
-        octet = ''
-        for i in range(len(ptr_str)):
-            if ptr_str[i] == '.':
-                in4 = octet + '.' + in4
-                octet = ''
-            else:
-                octet += ptr_str[i]
-        in4 = octet + '.' + in4[0:len(in4) - 1]
-        return in4
-
-    def ptr6_str_to_addr_str(self, ptr_str):
-        """Convert IPv6 PTR to IPv6 address."""
-        in6 = ''
-        for i in range(len(ptr_str)):
-            if (((i + 1) % 8) == 0):
-                in6 += ':'
-            if ptr_str[i] != '.':
-                in6 += ptr_str[i]
-        return in6
-
-    def addr6_str_to_ptr_str(self, addr):
-        """Convert IPv6 address to IPv6 PTR hostname."""
-        # Note: address string must be in the full notation
-        ptr = ''
-        addr = addr[::-1]
-        for char in addr:
-            if char != ':':
-                ptr += char + '.'
-        ptr += 'ip6.arpa'
-        return ptr
-
-    def ptr_str_to_addr_str(self, ptr_str):
-        """Convert PTR hostname to IP address."""
-        # IPv4:
-        # - 102.2.168.192.in-addr.arpa
-        # - 4.3.2.1.in-addr.arpa
-        # IPv6:
-        # - 9.0...f.3.ip6.arpa
-        if not ptr_str:
-            return None
-        end = ptr_str.find('.i')
-        if end == -1:
-            return None
-        addr_part = ptr_str[0:end]
-        if ptr_str.find('.ip6') == -1:
-            return self.ptr4_str_to_addr_str(addr_part)
-        else:
-            return self.ptr6_str_to_addr_str(addr_part[::-1])
-
     def reread(self):
         """Re-read hosts file."""
         name_a = {}
@@ -192,78 +278,54 @@
             addr = fields.pop(0)
 
             for name in fields:
-                name = self.sani(name)
+                name = normalize(name)
 
-                if self.str_is_hit(addr):
+                if valid_hit(addr):
                     name_hit[name] = (addr, 0)
-                elif self.str_is_ipv6(addr):
+                elif valid_ipv6(addr):
                     name_aaaa[name] = (addr, 0)
-                elif not self.str_is_lsi(addr):
+                elif valid_lsi(addr):
                     name_a[name] = (addr, 0)
 
         self.name_a = name_a
         self.name_aaaa = name_aaaa
         self.name_hit = name_hit
 
-    def getaddr_from_list(self, addr_str, names):
-        """Find hostname matching address from names."""
-        for name in names:
-            if self.str_is_ipv6(names[name][0]):
-                # canonicalize IPv6 address
-                binaddr = pyip6.inet_pton(names[name][0])
-                caddr = pyip6.inet_ntop(binaddr)
-            else:
-                caddr = names[name][0]
-            if self.sani(addr_str) == caddr:
-                return name
-
     def getaddr(self, addr):
         """Find hostname matching address."""
         if addr is None:
-            return None
-        if self.str_is_ipv6(addr):
-            # canonicalize IPv6 address
-            binaddr = pyip6.inet_pton(addr)
-            addr_str = pyip6.inet_ntop(binaddr)
-            if self.str_is_hit(addr):
-                return self.getaddr_from_list(addr_str, self.name_hit)
+            return
+        if valid_ipv6(addr):
+            caddr = canonicalize_ipv6(addr)
+            if valid_hit(addr):
+                return _find_name_for_addr_from_src(caddr, self.name_hit)
             else:
-                return self.getaddr_from_list(addr_str, self.name_aaaa)
+                return _find_name_for_addr_from_src(caddr, self.name_aaaa)
         else:
-            return self.getaddr_from_list(addr, self.name_a)
+            return _find_name_for_addr_from_src(addr, self.name_a)
 
     def geta(self, name):
         """Return LSI record for name."""
-        return self.getrecord(name, self.name_a)
+        return _getrecord(name, self.name_a)
 
     def getaaaa(self, name):
         """Return IPv6 record for name."""
-        return self.getrecord(name, self.name_aaaa)
+        return _getrecord(name, self.name_aaaa)
 
     def getaaaa_hit(self, name):
         """Return HIT record for name."""
-        return self.getrecord(name, self.name_hit)
-
-    def getrecord(self, name, src):
-        """Return address + ttl for hostname from dictionary."""
-        addr = src.get(self.sani(name))
-        if addr is None:
-            return None
-        if addr[1] == 0:
-            ttl = 122
-        else:
-            ttl = addr[1] - int(time.time())
-            if ttl < 1:
-                del src[self.sani(name)]
-                return None
-        return (addr[0], ttl)
+        return _getrecord(name, self.name_hit)
 
     def cache_name(self, hostname, addr, ttl):
         """Store hostname-address -mapping in cache for ttl duration."""
         valid_to = int(time.time()) + ttl
-        if self.str_is_hit(addr):
+        if valid_hit(addr):
             self.name_hit[hostname] = (addr, valid_to)
-        elif self.str_is_ipv6(addr):
+        elif valid_ipv6(addr):
             self.name_aaaa[hostname] = (addr, valid_to)
         else:
             self.name_a[hostname] = (addr, valid_to)
+
+if __name__ == '__main__':
+    import doctest
+    doctest.testmod()

=== removed file 'tools/hipdnsproxy/pyip6.py'
--- tools/hipdnsproxy/pyip6.py  2012-03-24 14:56:51 +0000
+++ tools/hipdnsproxy/pyip6.py  1970-01-01 00:00:00 +0000
@@ -1,142 +0,0 @@
-#!/usr/bin/env python
-
-"""Pure Python IP6 parsing and formatting
-
-Copyright (c) 2006 Stuart Gathman <stuart@xxxxxxxx>
-
-This module is free software, and you may redistribute it and/or modify
-it under the same terms as Python itself, so long as this copyright message
-and disclaimer are retained in their original form.
-"""
-
-
-import struct
-import re
-
-
-PAT_IP4 = r'\.'.join([r'(?:\d|[1-9]\d|1\d\d|2[0-4]\d|25[0-5])'] * 4)
-RE_IP4 = re.compile(PAT_IP4 + '$')
-
-
-def inet_ntop(buf):
-    """
-    Convert ip6 address to standard hex notation.
-
-    Examples:
-
-    >>> inet_ntop(struct.pack("!HHHHHHHH", 0, 0, 0, 0, 0, 0xFFFF, 0x0102,
-    ...                       0x0304))
-    '::FFFF:1.2.3.4'
-
-    >>> inet_ntop(struct.pack("!HHHHHHHH", 0x1234, 0x5678, 0, 0, 0, 0, 0x0102,
-    ...                       0x0304))
-    '1234:5678::102:304'
-
-    >>> inet_ntop(struct.pack("!HHHHHHHH", 0, 0, 0, 0x1234, 0x5678, 0, 0x0102,
-    ...                       0x0304))
-    '::1234:5678:0:102:304'
-
-    >>> inet_ntop(struct.pack("!HHHHHHHH", 0x1234, 0x5678, 0, 0x0102, 0x0304,
-    ...                       0, 0, 0))
-    '1234:5678:0:102:304::'
-
-    >>> inet_ntop(struct.pack("!HHHHHHHH", 0, 0, 0, 0, 0, 0, 0, 0))
-    '::'
-    """
-    # convert to 8 words
-    words = struct.unpack('!HHHHHHHH', buf)
-    null = (0, 0, 0, 0, 0, 0, 0, 0)
-    if words == null:
-        return '::'
-    # check for ip4 mapped
-    if words[:5] == (0, 0, 0, 0, 0) and words[5] in (0, 0xFFFF):
-        ip4 = '.'.join([str(i) for i in struct.unpack('!BBBB', buf[12:])])
-        if words[5]:
-            return '::FFFF:' + ip4
-        return '::' + ip4
-    # find index of longest sequence of 0
-    for longest in (7, 6, 5, 4, 3, 2, 1):
-        zeroes = null[:longest]
-        for i in range(9 - longest):
-            if words[i:i + longest] == zeroes:
-                if i == 0:
-                    return ':' + ':%x' * (8 - longest) % words[longest:]
-                if i == 8 - longest:
-                    return '%x:' * (8 - longest) % words[:-longest] + ':'
-                return ('%x:' * i % words[:i] +
-                        ':%x' * (8 - longest - i) % words[i + longest:])
-    return '%x:%x:%x:%x:%x:%x:%x:%x' % words
-
-
-def inet_pton(addr):
-    """
-    Convert ip6 standard hex notation to ip6 address.
-
-    Examples:
-
-    >>> struct.unpack('!HHHHHHHH', inet_pton('::'))
-    (0, 0, 0, 0, 0, 0, 0, 0)
-
-    >>> struct.unpack('!HHHHHHHH', inet_pton('::1234'))
-    (0, 0, 0, 0, 0, 0, 0, 4660)
-
-    >>> struct.unpack('!HHHHHHHH', inet_pton('1234::'))
-    (4660, 0, 0, 0, 0, 0, 0, 0)
-
-    >>> struct.unpack('!HHHHHHHH', inet_pton('1234::5678'))
-    (4660, 0, 0, 0, 0, 0, 0, 22136)
-
-    >>> struct.unpack('!HHHHHHHH', inet_pton('::FFFF:1.2.3.4'))
-    (0, 0, 0, 0, 0, 65535, 258, 772)
-
-    >>> struct.unpack('!HHHHHHHH', inet_pton('1.2.3.4'))
-    (0, 0, 0, 0, 0, 65535, 258, 772)
-
-    >>> try:
-    ...     inet_pton('::1.2.3.4.5')
-    ... except ValueError, x:
-    ...     print x
-    ::1.2.3.4.5
-    """
-    if addr == '::':
-        return '\0' * 16
-    buf = addr
-    match = RE_IP4.search(buf)
-    try:
-        if match:
-            pos = match.start()
-            ip4 = [int(i) for i in buf[pos:].split('.')]
-            if not pos:
-                # pylint: disable=W0142
-                return struct.pack('!QLBBBB', 0, 65535, *ip4)
-            buf = buf[:pos] + '%x%02x:%x%02x' % tuple(ip4)
-        zero_split = buf.split('::')
-        if len(zero_split) == 2:
-            head, tail = zero_split
-            if not head:
-                tail = tail.split(':')
-                return struct.pack('!HHHHHHHH',
-                                   *[0] * (8 - len(tail)) +
-                                   [int(x, 16) for x in tail])
-            if not tail:
-                head = head.split(':')
-                return struct.pack('!HHHHHHHH',
-                                   *[int(x, 16) for x in head] +
-                                   [0] * (8 - len(head)))
-            head = head.split(':')
-            tail = tail.split(':')
-            return struct.pack('!HHHHHHHH',
-                               *[int(x, 16) for x in head] +
-                               [0] * (8 - len(head) - len(tail)) +
-                               [int(x, 16) for x in tail])
-        if len(zero_split) == 1:
-            return struct.pack('!HHHHHHHH',
-                               *[int(x, 16) for x in zero_split[0].split(':')])
-    except ValueError:
-        pass
-    raise ValueError(addr)
-
-
-if __name__ == '__main__':
-    import doctest
-    doctest.testmod()

=== modified file 'tools/hipdnsproxy/util.py'
--- tools/hipdnsproxy/util.py   2012-03-24 14:56:51 +0000
+++ tools/hipdnsproxy/util.py   2012-04-18 20:32:26 +0000
@@ -23,11 +23,7 @@
 # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
 # OTHER DEALINGS IN THE SOFTWARE.
 
-"""Utility functions for hipdnsproxy.
-
-There are some doctests in this file. To run tests, use
-`python -m doctest util.py`.
-"""
+"""Utility functions for hipdnsproxy."""
 
 import re
 import signal

Other related posts: