debuggers.hg

view tools/python/xen/xend/EventServer.py @ 4655:a838a908e38e

bitkeeper revision 1.1327.2.2 (4267a9b3MhPpljnjQ5IbfLdzcW2K3w)

Remove twisted from the HTTP server and replace with a
threaded server. Add classes to provide tcp and unix servers
using threads instead of twisted. Remove use of twisted from
the consoles, event server and HTTP resources

Signed-off-by: Mike Wray <mike.wray@hp.com>
author mjw@wray-m-3.hpl.hp.com
date Thu Apr 21 13:25:07 2005 +0000 (2005-04-21)
parents 236a9f2698a3
children e81b04e1c86a
line source
1 # Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
2 """Simple publish/subscribe event server.
4 """
5 import string
6 from threading import Lock
8 #from twisted.internet import reactor
9 from xen.web import reactor
11 # subscribe a.b.c h: map a.b.c -> h
12 # subscribe a.b.* h: map a.b.* -> h
13 # subscribe a.b.? h: map a.b.? -> h
14 #
15 # for event a.b.c.d:
16 #
17 # lookup a.b.c.d, call handlers
18 #
19 # lookup a.b.c.?, call handlers
20 #
21 # lookup a.b.c.d.*, call handlers
22 # lookup a.b.c.*, call handlers
23 # lookup a.b.*, call handlers
24 # lookup a.*, call handlers
25 # lookup *, call handlers
27 # a.b.c.d = (a b c d)
28 # a.b.c.? = (a b c _)
29 # a.b.c.* = (a b c . _)
31 class EventServer:
33 DOT = '.'
34 QUERY = '?'
35 DOT_QUERY = DOT + QUERY
36 STAR = '*'
37 DOT_STAR = DOT + STAR
39 def __init__(self, run=0):
40 self.handlers = {}
41 self.run = run
42 self.queue = []
43 self.lock = Lock()
45 def start(self):
46 """Enable event handling. Sends any queued events.
47 """
48 try:
49 self.lock.acquire()
50 self.run = 1
51 queue = self.queue
52 self.queue = []
53 finally:
54 self.lock.release()
55 for (e,v) in queue:
56 self.inject(e, v)
58 def stop(self):
59 """Suspend event handling. Events injected while suspended
60 are queued until we are started again.
61 """
62 try:
63 self.lock.acquire()
64 self.run = 0
65 finally:
66 self.lock.release()
68 def subscribe(self, event, handler):
69 """Subscribe to an event. For example 'a.b.c.d'.
70 A subcription like 'a.b.c.?' ending in '?' matches any value
71 for the '?'. A subscription like 'a.b.c.*' ending in '*' matches
72 any event type with the same prefix, 'a.b.c' in this case.
74 event event name
75 handler event handler fn(event, val)
76 """
77 try:
78 self.lock.acquire()
79 hl = self.handlers.get(event)
80 if hl is None:
81 self.handlers[event] = [handler]
82 else:
83 hl.append(handler)
84 finally:
85 self.lock.release()
87 def unsubscribe_all(self, event=None):
88 """Unsubscribe all handlers for a given event, or all handlers.
90 event event (optional)
91 """
92 try:
93 self.lock.acquire()
94 if event == None:
95 self.handlers.clear()
96 elif event in self.handlers:
97 del self.handlers[event]
98 finally:
99 self.lock.release()
101 def unsubscribe(self, event, handler):
102 """Unsubscribe a given event and handler.
104 event event
105 handler handler
106 """
107 try:
108 self.lock.acquire()
109 hl = self.handlers.get(event)
110 if hl is None:
111 return
112 if handler in hl:
113 hl.remove(handler)
114 finally:
115 self.lock.release()
117 def inject(self, event, val, async=1):
118 """Inject an event. Handlers for it are called if running, otherwise
119 it is queued.
121 event event type
122 val event value
123 """
124 try:
125 self.lock.acquire()
126 if not self.run:
127 self.queue.append( (event, val) )
128 return
129 finally:
130 self.lock.release()
132 if async:
133 reactor.callLater(0, self.call_handlers, event, val)
134 else:
135 self.notify_handlers(event, val)
137 def call_handlers(self, event, val):
138 """Internal method to call event handlers.
139 """
140 #print ">event", event, val
141 self.call_event_handlers(event, event, val)
142 self.call_query_handlers(event, val)
143 self.call_star_handlers(event, val)
145 def call_event_handlers(self, key, event, val):
146 """Call the handlers for an event.
147 It is safe for handlers to subscribe or unsubscribe.
149 key key for handler list
150 event event type
151 val event value
152 """
153 try:
154 self.lock.acquire()
155 hl = self.handlers.get(key)
156 if hl is None:
157 return
158 # Copy the handler list so that handlers can call
159 # subscribe/unsubscribe safely - python list iteration
160 # is not safe against list modification.
161 hl = hl[:]
162 finally:
163 self.lock.release()
164 # Must not hold the lock while calling the handlers.
165 for h in hl:
166 try:
167 h(event, val)
168 except:
169 pass
171 def call_query_handlers(self, event, val):
172 """Call regex handlers for events matching 'event' that end in '?'.
174 event event type
175 val event value
176 """
177 dot_idx = event.rfind(self.DOT)
178 if dot_idx == -1:
179 self.call_event_handlers(self.QUERY, event, val)
180 else:
181 event_query = event[0:dot_idx] + self.DOT_QUERY
182 self.call_event_handlers(event_query, event, val)
184 def call_star_handlers(self, event, val):
185 """Call regex handlers for events matching 'event' that end in '*'.
187 event event type
188 val event value
189 """
190 etype = string.split(event, self.DOT)
191 for i in range(len(etype), 0, -1):
192 event_star = self.DOT.join(etype[0:i]) + self.DOT_STAR
193 self.call_event_handlers(event_star, event, val)
194 self.call_event_handlers(self.STAR, event, val)
196 def instance():
197 global inst
198 try:
199 inst
200 except:
201 inst = EventServer()
202 inst.start()
203 return inst
205 def main():
206 def sys_star(event, val):
207 print 'sys_star', event, val
209 def sys_foo(event, val):
210 print 'sys_foo', event, val
211 s.unsubscribe('sys.foo', sys_foo)
213 def sys_foo2(event, val):
214 print 'sys_foo2', event, val
216 def sys_bar(event, val):
217 print 'sys_bar', event, val
219 def sys_foo_bar(event, val):
220 print 'sys_foo_bar', event, val
222 def foo_bar(event, val):
223 print 'foo_bar', event, val
225 s = EventServer()
226 s.start()
227 s.subscribe('sys.*', sys_star)
228 s.subscribe('sys.foo', sys_foo)
229 s.subscribe('sys.foo', sys_foo2)
230 s.subscribe('sys.bar', sys_bar)
231 s.subscribe('sys.foo.bar', sys_foo_bar)
232 s.subscribe('foo.bar', foo_bar)
233 s.inject('sys.foo', 'hello')
234 print
235 s.inject('sys.bar', 'hello again')
236 print
237 s.inject('sys.foo.bar', 'hello again')
238 print
239 s.inject('foo.bar', 'hello again')
240 print
241 s.inject('foo', 'hello again')
242 print
243 s.start()
244 s.unsubscribe('sys.*', sys_star)
245 s.unsubscribe_all('sys.*')
246 s.inject('sys.foo', 'hello')
248 if __name__ == "__main__":
249 main()