1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
import select
import types
import collections
# Object that represents a running task
class Task(object):
def __init__(self,target):
self.target = target # A coroutine
self.sendval = None # Value to send when resuming
self.stack = [] # Call stack
def run(self):
try:
result = self.target.send(self.sendval)
if isinstance(result,SystemCall):
return result
if isinstance(result,types.GeneratorType):
self.stack.append(self.target)
self.sendval = None
self.target = result
else:
if not self.task: return
self.sendval = result
self.target = self.stack.pop()
except StopIteration:
if not self.stack: raise
self.sendval = None
self.target = self.stack.pop()
# Object that represents a "system call"
class SystemCall(object):
def handle(self,sched,task): pass
# Implementation of different system calls
class ReadWait(SystemCall):
def __init__(self,f):
self.f = f
def handle(self,sched,task):
fileno = self.f.fileno()
sched.readwait(task,fileno)
class WriteWait(SystemCall):
def __init__(self,f):
self.f = f
def handle(self,sched,task):
fileno = self.f.fileno()
sched.writewait(task,fileno)
class NewTask(SystemCall):
def __init__(self,target):
self.target = target
def handle(self,sched,task):
sched.new(self.target)
sched.schedule(task)
# Scheduler object
class Scheduler(object):
def __init__(self):
self.task_queue = collections.deque()
self.read_waiting = {}
self.write_waiting = {}
self.numtasks = 0
# Create a new task out of a coroutine
def new(self,target):
newtask = Task(target)
self.schedule(newtask)
self.numtasks += 1
# Put a task on the task queue
def schedule(self,task):
self.task_queue.append(task)
# Have a task wait for data on a file descriptor
def readwait(self,task,fd):
self.read_waiting[fd] = task
# Have a task wait for writing on a file descriptor
def writewait(self,task,fd):
self.write_waiting[fd] = task
# Main scheduler loop
def mainloop(self,count=-1,timeout=None):
while self.numtasks:
# Check for I/O events to handle
if self.read_waiting or self.write_waiting:
wait = 0 if self.task_queue else timeout
#r,w,e = select.select(self.read_waiting,self.write_waiting,[],wait)
r = self.read_waiting.keys()
w = self.write_waiting.keys()
for fileno in r:
self.schedule(self.read_waiting.pop(fileno))
for fileno in w:
self.schedule(self.write_waiting.pop(fileno))
# Run all the tasks on the queue that are ready to run
while self.task_queue:
task = self.task_queue.popleft()
try:
result = task.run()
if isinstance(result,SystemCall):
result.handle(self,task)
else:
self.schedule(task)
except StopIteration:
self.numtasks -= 1
# If no tasks can run, we decide if we wait or return
else:
if count > 0: count -= 1
if count == 0: return
if __name__ == '__main__':
from socket import socket, AF_INET, SOCK_STREAM
def time_server(address):
import time
s = socket(AF_INET,SOCK_STREAM)
s.bind(address)
s.listen(5)
while True:
yield ReadWait(s)
conn, addr = s.accept()
print("Connection from %s " % str(addr))
yield WriteWait(conn)
resp = time.ctime() + "\r\n"
conn.send(resp.encode('latin-1'))
conn.close()
sched = Scheduler()
sched.new(time_server(('',10000))) # Server on port 10000
sched.new(time_server(('',11000))) # Server on port 11000
sched.mainloop() |
Partager