Skip to content.

logo Stackless Python


Personal tools
Views

ChannelExamples

last edited 7 years ago by mprovost

Deriving from stackless.channel

Keep in mind that you are free to derive from stackless channels to implement additional logic.

Example: Nonblocking Channel:

import stackless

class NonblockingChannel(stackless.channel):
    def send(self, value, wait=False):
        if wait or self.balance < 0: 
            # there are tasklets waiting to receive
            return stackless.channel.send(self, value)
    def send_exception(self, exc, value, wait=False):
        if wait or self.balance < 0: 
            # there are tasklets waiting to receive
            return stackless.channel.send_exception(self, exc, value)
    def receive(self, default=None, wait=False):
        if wait or self.balance > 0:
            # there are tasklets waiting to send
            return stackless.channel.receive(self)
        else:
            return default

def testNonblockingChannel():
    print
    print "testNonblockingChannel"
    print "----------------------"

    def recv(ch, name):
        print "Started recv<%s>" % (name,)
        print "recv<%s>: got a message from \\"%s\\"" % (name, ch.receive(wait=True))
        ch.send(name)

    ch = NonblockingChannel()
    ch.receive() # nonblocking receive
    ch.send("nonblocking!") # nonblocking send

    for name in "ABCDE":
        task = stackless.tasklet(recv)(ch, name)
        task.run()

    ch.send('host')
    print

testNonblockingChannel() should print:

testNonblockingChannel
----------------------
Started recv<A>
Started recv<B>
Started recv<C>
Started recv<D>
Started recv<E>
recv<A>: got a message from "host"
recv<B>: got a message from "A"
recv<C>: got a message from "B"
recv<D>: got a message from "C"
recv<E>: got a message from "D"

Example: Broadcast Channel:

import stackless

class BroadcastChannel(stackless.channel):
    def send(self, value, wait=False):
        result = None
        for idx in range(0, self.balance, -1):
            # there are tasklets waiting to receive
            result = stackless.channel.send(self, value)
        return result
    def send_exception(self, exc, value, wait=False):
        result = None
        for idx in range(0, self.balance, -1):
            # there are tasklets waiting to receive
            result = stackless.channel.send_exception(self, exc, value)
        return result

def testBroadcastChannel():
    print
    print "testBroadcastChannel"
    print "--------------------"
    def recv(ch, name):
        print "Started recv<%s>" % (name,)
        print "recv<%s>: %r" % (name, ch.receive())

    ch = BroadcastChannel()

    # Essentially nonblocking on sends when there are no receivers
    ch.send("Test when empty")

    for name in "ABCDE":
        task = stackless.tasklet(recv)(ch, name)
        task.run()

    ch.send("broadcast from host")
    print

testBroadcastChannel() should print:

testBroadcastChannel
--------------------
Started recv<A>
Started recv<B>
Started recv<C>
Started recv<D>
Started recv<E>
recv<A>: 'broadcast from host'
recv<B>: 'broadcast from host'
recv<C>: 'broadcast from host'
recv<D>: 'broadcast from host'
recv<E>: 'broadcast from host'

Example: Iterator Channel:

import stackless
from exceptions import StopIteration?

# subclass a channel so when you send an iterator it also sends a StopIteration?
class iterchannel (stackless.channel):
    def send_sequence(self, sequence):
        stackless.channel.send_sequence(self, sequence)
        return stackless.channel.send_exception(self, StopIteration?)

def test_send(ch):
    channel.send_sequence(xrange(10))

def test_receive(channel):
    for y in channel:
            print y
    print "done!"

def testIterChannel():
    ch = iterchannel()
    sender = stackless.tasklet(test_send)(ch)
    stackless.tasklet(test_receive)(ch)
    stackless.run()

testIterChannel() should print:

0
1
2
3
4
5
6
7
8
9
done!

 

Powered by Plone