spug.io.proactor
index
/home/mmuller/w/spug/io/proactor.py

My first cut at writing a Proactor framework.
 
The Proactor pattern is a concept introduced to me by Alex Libman (author
of the ACE Proactor, and the single best communication framework programmer
that I have ever had the pleasure of working with).  Proactor is almost an
extension of the standard Reactor pattern - both make callbacks to
handler classes in response to low-level communications events.  The
difference between the two is that while the Reactor notifies the handler
when the communication channel is /ready/ for a read or write, the Proactor
actually /performs/ the IO, notifying the handler only to put received data
to it and get received data from it.
 
This approach is superior because it completely hides the
details of the underlying communication channel from the data handler,
allowing the implementation details to be custom-tailored to whatever
mechanisms are available on the operating system.
 
TODO: provide the proactor and the connection object to the handlers
through the callback methods.

 
Modules
       
errno
fcntl
os
select
socket
time
weakref

 
Classes
       
io.proactor.ConnectHandler
io.proactor.ControlQueue
io.proactor.DataHandler
io.proactor.Proactor
io.proactor.ReactorProactor
io.reactor.ConnectionSocket(io.reactor.Socket)
io.proactor.Connection
io.reactor.ListenerSocket(io.reactor.Socket)
io.proactor.Listener

 
class ConnectHandler
    Abstract interface for classes that provide a connect callback and
create @DataHandler's.
 
  Methods defined here:
handleConnect(self, connection)
Called by the @Proactor when a new connection has been received.  The
handler should use this opportunity to provide the connection with a
@DataHandler.
readyToClose(self)
When this method returns true, it signals the proactor to remove the
ConnectHandler and its associated listener from the list of monitored
objects and shut down the listener.
 
May be overriden by derived classes.  Base class implementation
always returns False.

 
class Connection(io.reactor.ConnectionSocket)
    Provids the glue between reactor's ConnectionSocket and the Proactor's
DataHandler.
 
 
Method resolution order:
Connection
io.reactor.ConnectionSocket
io.reactor.Socket
io.reactor.Reactable

Methods defined here:
__init__(self, proactor, sock=None, addr=None, dataHandler=None)
handleDisconnect(self, dispatcher)
handleError(self, dispatcher)
handleRead(self, dispatcher)
handleWrite(self, dispatcher)
setDataHandler(self, handler)
Sets the data handler for the connection.
wantsToRead(self)
wantsToWrite(self)

Methods inherited from io.reactor.Socket:
fileno(self)

 
class ControlQueue
    A queue that can be used to communicate with a proactor from an external
thread.
 
  Methods defined here:
__init__(self, writeStream, handler)
add(self, elem)
Add a new element to the queue.
close(self)
Close the queue.
processElems(self, count)
Process the specified number of eleements.

 
class DataHandler
    Abstract interface for classes that server and retrieve data.
 
  Methods defined here:
get(self, size)
Called by Proactor when the block of data returned by the last
@peek() has successfully been written to the communication
channel.
 
parms:
   size::
      [int] maximum size of buffer to return.  This will always be
      less than or equal to the size of the data a returned from the
      last peek().
handleConnect(self)
Called by Proactor when the connection is fully initialized.
 
May be overriden, base class version does nothing.
 
TODO: call this!!
handleDisconnect(self)
Called by Proactor when the connection disconnects.
 
The handler should not attempt to deregister the object from the
proactor - this will be done automatically by the proactor.
 
May be implemented by derived classes, base class implementation does
nothing.
handleError(self)
Called by Proactor when an error occurs on the stream.
 
May be implemented by derived classes, base class implementation
deregisters.
peek(self, size)
Called by Proactor when the connection is ready to write data and the
handler is ready to read.  Should return a string buffer no bigger
than /size/.  Note that the data buffer should not be considered to
have been consumed by the underlying communicaton channel until
@get() has been called - multiple calls to peek() without
a call to @get() should return the same data block.
 
parms:
   size::
      [int] maximum size of buffer to return
put(self, data)
Called by Proactor when data has been read from the connection.
 
parms:
   data::
      [string] the data that has been read.
readyToClose(self)
Returns true if the handler is ready to close.  The Proactor will
respond to this by shutting down the connection and discarding the
handler.
 
May be implemented by derived classes, base class implementation
always returns False.
readyToGet(self)
Returns true if the handler is ready to provide data to its
connection via @get().
readyToPut(self)
Returns true if the handler is ready to consume data from its
connection via @put()

 
class Listener(io.reactor.ListenerSocket)
    Provides the glue between ConnectionSocket and ConnectHandler.
 
 
Method resolution order:
Listener
io.reactor.ListenerSocket
io.reactor.Socket
io.reactor.Reactable

Methods defined here:
__init__(self, proactor, sock=None, inetAddress=None, connectHandler=None)
handleNewConnection(self, dispatcher, sock, addr)
wantsToRead(self)

Methods inherited from io.reactor.ListenerSocket:
handleRead(self, reactor)
wantsToWrite(self)

Methods inherited from io.reactor.Socket:
fileno(self)

Methods inherited from io.reactor.Reactable:
handleDisconnect(self, dispatcher)
Called when a disconnect occurs.
handleError(self, dispatcher)
Called when there is an error.
handleWrite(self, dispatcher)
Called when the connection is ready to write.

 
class Proactor
    An abstract base class defining the Proactor interrface.  Proactor
attempts to abstract all details of communication multiplexing away from
the user.
 
  Methods defined here:
add(self, object)
Adds the object from the set of managed objects.  /object/ must be an
object returned from one of the "make" methods.
 
XXX I have some doubts about addding this method - if one proactor
tries to add objects created by another proactor using a very
different implementation, things will probably break.  Should at
least verify that the object is of the proper type for the proactor.
 
Must be implemented by derived class.
 
parms:
   object::
      [any] A proactor object created using one of the "make"
      methods.
hasConnections(self)
Returns true if the proactor has any connections.
makeConnection(self, address, dataHandler)
Creates and returns a new connection.
 
parms:
   address::
      [@Address]
makeControlQueue(self, queueHandler)
Creates a "control queue" which can be used to send messages to the
proactor from another thread.
 
parms:
   queueHandler::
      [callable<any>] a function that gets called (and passed the
      element) every time an element gets removed from the queue.
makeListener(self, address, connectHandler)
Creates and returns a new listener.
 
parms:
   address::
      [@Address]
makePipe(self, readHandler, writeHandler)
Creates and returns a tuple of two new connections constituting a
pipe.  The first is a read pipe, the second is a write pipe.
 
parms:
   readHandler::
      [@DataHandler] data handler for the read side of the
      connection.
   writeHandler::
      [@DataHandler] data handler for the write side of the
      conneciton.
makeSubprocess(self, cmd, stdinHandler=None, stdoutHandler=None, stderrHandler=None)
Creates a subprocess and connections for standard input, output and 
error.  Returns a tuple<spug.util.process.ChildProcess, Connection
...> containing as many connections as were created - same as the 
number of handlers passed in.
 
The function allows passing handlers for standard input, standard 
output and standard error.  When these are not provided (passed in as 
None) the channels are unmanaged, and fall through to the 
standard input, output and error channels for the process.
 
parms:
   cmd: [list<str>] the command to run.
   stdinHandler: [@DataHandler or None] Standard input handler.
   stdoutHandler: [@DataHandler or None] Standard output handler.
   stderrHandler: [@DataHandler or None] Standard error handler.
makeWrapper(self, file, dataHandler)
Creates and returns a wrapper around an existing file object.
 
The use of this method should be avoided, as it may not be portable.
remove(self, object)
Removes the object from the set of managed objects.  /object/ must be
an object returned from one of the "make" methods.
 
parms:
   object::
      [any] A proactor object created using one of the "make"
      methods.
run(self)
Waits for events and dispatches them to handlers until all
connections terminate.

 
class ReactorProactor(Proactor)
    Proactor implementation based on a reactor.
 
  Methods defined here:
__init__(self, reactor)
add(self, object)
hasConnections(self)
makeConnection(self, address, dataHandler)
makeControlQueue(self, queueHandler)
makeListener(self, address, connectHandler)
makePipe(self, readHandler, writeHandler)
makeSubprocess(self, cmd, stdinHandler=None, stdoutHandler=None, stderrHandler=None)
makeWrapper(self, file, dataHandler)
processOneEvent(self)
remove(self, object)
run(self)
schedule(self, time, action)
Schedules an action to be performed at a particular time.
 
parms:
   when: [float] Time measured in seconds from now.  If this is 
      negative or zero, the action will be performed the next time we 
      get an event.
   action: [callable<>] Action to be performed.

 
Functions
       
createProactor()
Returns a new instance of the optimal proactor for the current platform.

 
Data
        MTU = 4096