------------------------------------------------------------ revno: 6371 committer: Paul Tötterman <paul.totterman@xxxxxx> branch nick: methods-to-functions timestamp: Wed 2012-04-04 20:52:18 +0300 message: Refactored hosts.Hosts.getrecord() and .getaddr_from_list() to functions with tests. modified: tools/hipdnsproxy/hosts.py -- lp:hipl https://code.launchpad.net/~hipl-core/hipl/trunk Your team HIPL core team is subscribed to branch lp:hipl. To unsubscribe from this branch go to https://code.launchpad.net/~hipl-core/hipl/trunk/+edit-subscription
=== modified file 'tools/hipdnsproxy/hosts.py' --- tools/hipdnsproxy/hosts.py 2012-04-03 19:55:22 +0000 +++ tools/hipdnsproxy/hosts.py 2012-04-04 17:52:18 +0000 @@ -154,6 +154,53 @@ return ':'.join(['%s' * 4] * 8) % tuple(reversed(ptr.split('.')[:32])) +def _getrecord(name, src): + """Return address + ttl for hostname from dictionary. + + >>> _getrecord('foo', {'foo': ('127.0.0.1', 0)}) + ('127.0.0.1', 122) + >>> _getrecord('bar', {}) is None + True + >>> _getrecord('baz', {'baz': ('::1', int(time.time())+5)}) + ('::1', 5) + >>> _getrecord('quux', {'quux': ('::', 5)}) is None + True + """ + addr = src.get(normalize(name)) + if addr is None: + return None + if addr[1] == 0: + ttl = 122 + else: + ttl = addr[1] - int(time.time()) + if ttl < 1: + del src[normalize(name)] + return + return (addr[0], ttl) + + +def _find_name_for_addr_from_src(addr, src): + """Find hostname matching address from source. + + >>> _find_name_for_addr_from_src('b::1', {'testhost': ('b::1', 0)}) + 'testhost' + >>> _find_name_for_addr_from_src('127.0.0.1', + ... {'localhost': ('127.0.0.1', 0)}) + 'localhost' + >>> _find_name_for_addr_from_src('foo', {}) is None + True + """ + for name, record in src.iteritems(): + naddr = record[0] + if valid_ipv6(naddr): + caddr = canonicalize_ipv6(addr) + else: + caddr = naddr + # XXX(ptman): normalize? but that's what's been done until now + if normalize(addr) == caddr: + return name + + class Hosts: """Class for handling a hosts file.""" @@ -238,55 +285,30 @@ self.name_aaaa = name_aaaa self.name_hit = name_hit - def getaddr_from_list(self, addr, names): - """Find hostname matching address from names.""" - for name in names: - naddr = names[name][0] - if valid_ipv6(naddr): - caddr = canonicalize_ipv6(naddr) - else: - caddr = naddr - if normalize(addr) == caddr: - return name - def getaddr(self, addr): """Find hostname matching address.""" if addr is None: - return None + return if valid_ipv6(addr): caddr = canonicalize_ipv6(addr) if valid_hit(addr): - return self.getaddr_from_list(caddr, self.name_hit) + return _find_name_for_addr_from_src(caddr, self.name_hit) else: - return self.getaddr_from_list(caddr, self.name_aaaa) + return _find_name_for_addr_from_src(caddr, self.name_aaaa) else: - return self.getaddr_from_list(addr, self.name_a) + return _find_name_for_addr_from_src(addr, self.name_a) def geta(self, name): """Return LSI record for name.""" - return self.getrecord(name, self.name_a) + return _getrecord(name, self.name_a) def getaaaa(self, name): """Return IPv6 record for name.""" - return self.getrecord(name, self.name_aaaa) + return _getrecord(name, self.name_aaaa) def getaaaa_hit(self, name): """Return HIT record for name.""" - return self.getrecord(name, self.name_hit) - - def getrecord(self, name, src): - """Return address + ttl for hostname from dictionary.""" - addr = src.get(normalize(name)) - if addr is None: - return None - if addr[1] == 0: - ttl = 122 - else: - ttl = addr[1] - int(time.time()) - if ttl < 1: - del src[normalize(name)] - return None - return (addr[0], ttl) + return _getrecord(name, self.name_hit) def cache_name(self, hostname, addr, ttl): """Store hostname-address -mapping in cache for ttl duration."""