"""Factory for creating channels.
Maintains a table of channels.
"""
-
+
+ """ Channels indexed by index. """
channels = {}
def __init__(self):
+ """Constructor - do not use. Use the channelFactory function."""
self.notifier = xend.utils.notifier()
def addChannel(self, channel):
+ """Add a channel.
+ """
idx = channel.idx
self.channels[idx] = channel
self.notifier.bind(idx)
#channel.notify()
def getChannel(self, idx):
+ """Get the channel with the given index (if any).
+ """
return self.channels.get(idx)
def delChannel(self, idx):
+ """Remove the channel with the given index (if any).
+ """
if idx in self.channels:
del self.channels[idx]
self.notifier.unbind(idx)
def domChannel(self, dom):
+ """Get the channel for the given domain.
+ """
for chan in self.channels.values():
if chan.dom == dom:
return chan
return chan
def channelClosed(self, channel):
+ """The given channel has been closed - remove it.
+ """
self.delChannel(channel.idx)
def createPort(self, dom):
+ """Create a port for a channel to the given domain.
+ """
return xend.utils.port(dom)
def channelFactory():
+ """Singleton constructor for the channel factory.
+ Use this instead of the class constructor.
+ """
global inst
try:
inst
"""
def __init__(self, factory, dom):
+ """Create a channel to the given domain using the given factory.
+ """
self.factory = factory
self.dom = dom
self.port = self.factory.createPort(dom)
self.queue = []
def getIndex(self):
+ """Get the channel index.
+ """
return self.idx
def getLocalPort(self):
+ """Get the local port.
+ """
return self.port.local_port
def getRemotePort(self):
+ """Get the remote port.
+ """
return self.port.remote_port
def close(self):
+ """Close the channel. Calls lostChannel() on all its devices and
+ channelClosed() on the factory.
+ """
for d in self.devs:
d.lostChannel()
self.factory.channelClosed(self)
return self.devs_by_type.get(type)
def getMessageType(self, msg):
+ """Get a 2-tuple of the message type and subtype.
+ """
hdr = msg.get_header()
return (hdr['type'], hdr.get('subtype'))
#print 'notificationReceived<', work
def notify(self):
- #print 'notify>', self
self.port.notify()
def handleRequests(self):
- #print 'handleRequests>'
work = 0
while 1:
- #print 'handleRequests>', work
msg = self.readRequest()
- #print 'handleRequests> msg=', msg
if not msg: break
self.requestReceived(msg)
work += 1
- #print 'handleRequests<', work
return work
def requestReceived(self, msg):
(ty, subty) = self.getMessageType(msg)
- #print 'requestReceived>', ty, subty, self
#todo: Must respond before writing any more messages.
#todo: Should automate this (respond on write)
self.port.write_response(msg)
% (msgTypeName(ty, subty), ty, subty)), self
def handleResponses(self):
- #print 'handleResponses>', self
work = 0
while 1:
- #print 'handleResponses>', work
msg = self.readResponse()
- #print 'handleResponses> msg=', msg
if not msg: break
self.responseReceived(msg)
work += 1
- #print 'handleResponses<', work
return work
def responseReceived(self, msg):
(ty, subty) = self.getMessageType(msg)
- #print 'responseReceived>', ty, subty
dev = self.getDevice(ty)
if dev:
dev.responseReceived(msg, ty, subty)
% (msgTypeName(ty, subty), ty, subty)), self
def handleWrites(self):
- #print 'handleWrites>', self
work = 0
# Pull data from producers.
- #print 'handleWrites> pull...'
for dev in self.devs:
work += dev.produceRequests()
# Flush the queue.
- #print 'handleWrites> flush...'
while self.queue and self.port.space_to_write_request():
msg = self.queue.pop(0)
self.port.write_request(msg)
work += 1
- #print 'handleWrites<', work
return work
def writeRequest(self, msg, notify=1):
- #print 'writeRequest>', self
if self.closed:
val = -1
elif self.writeReady():
else:
self.queue.append(msg)
val = 0
- #print 'writeRequest<', val
return val
def writeResponse(self, msg):
- #print 'writeResponse>', self
if self.closed: return -1
self.port.write_response(msg)
return 1
return self.port.space_to_write_request()
def readRequest(self):
- #print 'readRequest>', self
if self.closed:
- #print 'readRequest> closed'
return None
if self.port.request_to_read():
val = self.port.read_request()
else:
val = None
- #print 'readRequest< ', val
return val
def readResponse(self):
- #print 'readResponse>', self
if self.closed:
- #print 'readResponse> closed'
return None
if self.port.response_to_read():
val = self.port.read_response()
else:
val = None
- #print 'readResponse<', val
return val
import xend.utils
+DEBUG = 0
+
""" All message formats.
Added to incrementally for the various message types.
See below.
pass
def packMsg(ty, params):
- print '>packMsg', ty, params
+ if DEBUG: print '>packMsg', ty, params
(major, minor, packing) = msg_formats[ty]
args = {}
for (k, v) in params.items():
args['mac[%d]' % i] = v[i]
else:
args[k] = v
- for (k, v) in args.items():
- print 'packMsg>', k, v, type(v)
+ if DEBUG:
+ for (k, v) in args.items():
+ print 'packMsg>', k, v, type(v)
msgid = 0
msg = xend.utils.message(major, minor, msgid, args)
return msg
args['mac'] = mac
for k in macs:
del args[k]
- print '<unpackMsg', ty, args
+ if DEBUG: print '<unpackMsg', ty, args
return args
def msgTypeName(ty, subty):