import re
import socket
import time
+import fcntl # For get_interface_addr
+import struct # For get_interface_addr
import connection
self.hosts_allow = hosts_allow
connection.SocketListener.__init__(self, protocol_class)
+ def isValidHex(self, word):
+ # If we have empty word we treat it as valid
+ if len(word) == 0:
+ return True
+ try:
+ int(word, 16)
+ return True
+ except ValueError:
+ return False
+
+ def isValidIP(self, ipaddr):
+ # Check for IPv4 address
+ numValid = 0
+ tmp = ipaddr.split('.')
+ for byte in tmp:
+ if byte.isdigit():
+ numValid += 1
+
+ if numValid == len(tmp):
+ return True
+
+ # Check for IPv6 address
+ numValid = 0
+ tmp = ipaddr.split(':')
+ for word in tmp:
+ if self.isValidHex(word):
+ numValid += 1
+
+ return numValid == len(tmp)
+
+ def getIfAddr(self, ifname):
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ try:
+ x = socket.inet_ntoa(fcntl.ioctl(
+ s.fileno(),
+ 0x8915, # SIOCGIFADDR
+ struct.pack('256s', ifname[:15])
+ )[20:24])
+ s.close()
+ except Exception, e:
+ x = ifname
+
+ return x
def createSocket(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
timeout = time.time() + 30
while True:
try:
+ if not self.isValidIP(self.interface):
+ self.interface = self.getIfAddr(self.interface)
+ log.debug("Listening on %s:%s" % (self.interface, self.port))
sock.bind((self.interface, self.port))
return sock
except socket.error, (_errno, strerrno):
TCPListener.__init__(self, protocol_class, port, interface, hosts_allow)
+ def isValidHex(self, word):
+ # If we have empty word we treat it as valid
+ if len(word) == 0:
+ return True
+ try:
+ int(word, 16)
+ return True
+ except ValueError:
+ return False
+
+ def isValidIP(self, ipaddr):
+ # Check for IPv4 address
+ numValid = 0
+ tmp = ipaddr.split('.')
+ for byte in tmp:
+ if byte.isdigit():
+ numValid += 1
+
+ if numValid == len(tmp):
+ return True
+
+ # Check for IPv6 address
+ numValid = 0
+ tmp = ipaddr.split(':')
+ for word in tmp:
+ if self.isValidHex(word):
+ numValid += 1
+
+ return numValid == len(tmp)
+
+ def getIfAddr(self, ifname):
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ try:
+ x = socket.inet_ntoa(fcntl.ioctl(
+ s.fileno(),
+ 0x8915, # SIOCGIFADDR
+ struct.pack('256s', ifname[:15])
+ )[20:24])
+ s.close()
+ except Exception, e:
+ x = ifname
+
+ return x
def createSocket(self):
from OpenSSL import SSL
timeout = time.time() + 30
while True:
try:
+ if not self.isValidIP(self.interface):
+ self.interface = self.getIfAddr(self.interface)
+ log.debug("Listening on %s:%s" % (self.interface, self.port))
sock.bind((self.interface, self.port))
return sock
except socket.error, (_errno, strerrno):