# HG changeset patch # User mjw@wray-m-3.hpl.hp.com # Date 1114183843 0 # Node ID d781b9d08e804798acbb9c2a5d8228c0e3314314 # Parent 40af907d69a9768cd8e6e7192b10766461cb22ee bitkeeper revision 1.1327.2.4 (426918a34Af7gihN8mTkq-P3KrAZXg) Remove twisted from save/migrate handling. This needs to use threads, so add thread support for http server requests. Signed-off-by: Mike Wray diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/web/SrvBase.py --- a/tools/python/xen/web/SrvBase.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/web/SrvBase.py Fri Apr 22 15:30:43 2005 +0000 @@ -2,6 +2,7 @@ import types + from xen.xend import sxp from xen.xend import PrettyPrint from xen.xend.Args import ArgError @@ -10,6 +11,7 @@ from xen.xend.XendLogging import log import resource import http +import httpserver import defer def uri_pathlist(p): @@ -29,19 +31,8 @@ class SrvBase(resource.Resource): def use_sxp(self, req): - """Determine whether to send an SXP response to a request. - Uses SXP if there is no User-Agent, no Accept, or application/sxp is in Accept. - - req request - returns 1 for SXP, 0 otherwise - """ - ok = 0 - user_agent = req.getHeader('User-Agent') - accept = req.getHeader('Accept') - if (not user_agent) or (not accept) or (accept.find(sxp.mime_type) >= 0): - ok = 1 - return ok - + return req.useSxp() + def get_op_method(self, op): """Get the method for an operation. For operation 'foo' looks for 'op_foo'. @@ -60,7 +51,7 @@ class SrvBase(resource.Resource): The method must return a list when req.use_sxp is true and an HTML string otherwise (or list). - Methods may also return a Deferred (for incomplete processing). + Methods may also return a ThreadRequest (for incomplete processing). req request """ @@ -76,85 +67,10 @@ class SrvBase(resource.Resource): req.write("Operation not implemented: " + op) return '' else: - return self._perform(op, op_method, req) - - def _perform(self, op, op_method, req): - try: - val = op_method(op, req) - except Exception, err: - self._perform_err(err, op, req) - return '' - - if isinstance(val, defer.Deferred): - val.addCallback(self._perform_cb, op, req, dfr=1) - val.addErrback(self._perform_err, op, req, dfr=1) - return server.NOT_DONE_YET - else: - self._perform_cb(val, op, req, dfr=0) - return '' - - def _perform_cb(self, val, op, req, dfr=0): - """Callback to complete the request. - May be called from a Deferred. - - @param err: the error - @param req: request causing the error - @param dfr: deferred flag - """ - if isinstance(val, resource.ErrorPage): - req.write(val.render(req)) - elif self.use_sxp(req): - req.setHeader("Content-Type", sxp.mime_type) - sxp.show(val, out=req) - else: - req.write('') - self.print_path(req) - if isinstance(val, types.ListType): - req.write('
')
-                PrettyPrint.prettyprint(val, out=req)
-                req.write('
') - else: - req.write(str(val)) - req.write('') - if dfr: - req.finish() - - def _perform_err(self, err, op, req, dfr=0): - """Error callback to complete a request. - May be called from a Deferred. - - @param err: the error - @param req: request causing the error - @param dfr: deferred flag - """ - if not (isinstance(err, ArgError) or - isinstance(err, sxp.ParseError) or - isinstance(err, XendError)): - if dfr: - return err - else: - raise - #log.exception("op=%s: %s", op, str(err)) - if self.use_sxp(req): - req.setHeader("Content-Type", sxp.mime_type) - sxp.show(['xend.err', str(err)], out=req) - else: - req.setHeader("Content-Type", "text/plain") - req.write('Error ') - req.write(': ') - req.write(str(err)) - if dfr: - req.finish() - + return op_method(op, req) def print_path(self, req): """Print the path with hyperlinks. """ - pathlist = [x for x in req.prepath if x != '' ] - s = "/" - req.write('

/') - for x in pathlist: - s += x + "/" - req.write(' %s/' % (s, x)) - req.write("

") + req.printPath() diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/web/SrvDir.py --- a/tools/python/xen/web/SrvDir.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/web/SrvDir.py Fri Apr 22 15:30:43 2005 +0000 @@ -47,9 +47,6 @@ class SrvDir(SrvBase): self.table = {} self.order = [] - def __repr__(self): - return "" %(id(self), self.table.keys()) - def noChild(self, msg): return resource.ErrorPage(http.NOT_FOUND, msg=msg) diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/web/connection.py --- a/tools/python/xen/web/connection.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/web/connection.py Fri Apr 22 15:30:43 2005 +0000 @@ -72,6 +72,8 @@ class SocketServerConnection: return True def dataReceived(self, data): + if not self.connected: + return True if not self.protocol: return True try: @@ -79,7 +81,7 @@ class SocketServerConnection: except SystemExit: raise except Exception, ex: - self.disconnect(ex) + self.loseConnection(ex) return True return False @@ -261,7 +263,7 @@ class SocketClientConnection: except SystemExit: raise except Exception, ex: - self.disconnect(ex) + self.loseConnection(ex) def mainLoop(self): # Something a protocol could call. @@ -282,7 +284,7 @@ class SocketClientConnection: if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): return False else: - self.disconnect(ex) + self.loseConnection(ex) return True def read(self): @@ -293,7 +295,7 @@ class SocketClientConnection: if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): return None else: - self.disconnect(ex) + self.loseConnection(ex) return True def dataReceived(self, data): @@ -304,11 +306,11 @@ class SocketClientConnection: except SystemExit: raise except Exception, ex: - self.disconnect(ex) + self.loseConnection(ex) return True return False - def disconnect(self, reason=None): + def loseConnection(self, reason=None): self.thread = None self.closeSocket(reason) self.closeProtocol(reason) @@ -350,6 +352,8 @@ class SocketConnector: def __init__(self, factory): self.factoryStarted = False + self.clientLost = False + self.clientFailed = False self.factory = factory self.state = "disconnected" self.transport = None @@ -364,11 +368,14 @@ class SocketConnector: if self.state != "disconnected": raise socket.error(EINVAL, "cannot connect in state " + self.state) self.state = "connecting" + self.clientLost = False + self.clientFailed = False if not self.factoryStarted: self.factoryStarted = True self.factory.doStart() - self.factory.startedConnecting() + self.factory.startedConnecting(self) self.connectTransport() + self.state = "connected" def stopConnecting(self): if self.state != "connecting": @@ -380,8 +387,12 @@ class SocketConnector: return self.factory.buildProtocol(addr) def connectionLost(self, reason=None): - self.factory.doStop() + if not self.clientLost: + self.clientLost = True + self.factory.clientConnectionLost(self, reason) def connectionFailed(self, reason=None): - self.factory.doStop() + if not self.clientFailed: + self.clientFailed = True + self.factory.clientConnectionFailed(self, reason) diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/web/http.py --- a/tools/python/xen/web/http.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/web/http.py Fri Apr 22 15:30:43 2005 +0000 @@ -282,7 +282,6 @@ class HttpRequest: header_count += 1 if line == '\r\n' or line == '\n' or line == '': break - #print 'parseRequestHeaders>', header_bytes header_input = StringIO(header_bytes) self.request_headers = Message(header_input) @@ -329,7 +328,6 @@ class HttpRequest: self.content.seek(0,0) def parseRequest(self): - #print 'parseRequest>' self.request_line = self.rin.readline() self.parseRequestLine() self.parseRequestHeaders() @@ -338,7 +336,6 @@ class HttpRequest: self.setCloseConnection(connection_mode) self.readContent() self.parseRequestArgs() - #print 'parseRequest<' def setCloseConnection(self, mode): if not mode: return @@ -347,8 +344,10 @@ class HttpRequest: self.close_connection = True elif (mode == 'keep-alive') and (self.http_version >= (1, 1)): self.close_connection = False - #print 'setCloseConnection>', mode, self.close_connection + def getCloseConnection(self): + return self.close_connection + def getHeader(self, k, v=None): return self.request_headers.get(k, v) @@ -365,7 +364,6 @@ class HttpRequest: self.response_status = status def setResponseHeader(self, k, v): - #print 'setResponseHeader>', k, v k = k.lower() self.response_headers[k] = v if k == 'connection': @@ -432,7 +430,6 @@ class HttpRequest: self.send("\r\n") def sendResponse(self): - #print 'sendResponse>' if self.response_sent: return self.response_sent = True @@ -443,7 +440,6 @@ class HttpRequest: self.output.seek(0, 0) body = self.output.getvalue() body_length = len(body) - #print 'sendResponse> body=', body_length, body self.setResponseHeader("Content-Length", body_length) if self.http_version > (0, 9): self.send("%s %d %s\r\n" % (self.http_version_string, @@ -451,17 +447,19 @@ class HttpRequest: self.response_status)) self.sendResponseHeaders() if send_body: - #print 'sendResponse> writing body' self.send(body) + self.flush() def write(self, data): - #print 'write>', data self.output.write(data) def send(self, data): - #print 'send>', len(data), '|%s|' % data + #print 'send>', data self.out.write(data) + def flush(self): + self.out.flush() + def hasNoBody(self): return ((self.request_method == "HEAD") or (self.response_code in NO_BODY_CODES) or diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/web/httpserver.py --- a/tools/python/xen/web/httpserver.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/web/httpserver.py Fri Apr 22 15:30:43 2005 +0000 @@ -1,27 +1,137 @@ +import threading + import string import socket +import types from urllib import quote, unquote +from xen.xend import sxp +from xen.xend.Args import ArgError +from xen.xend.XendError import XendError + import http +from resource import Resource, ErrorPage from SrvDir import SrvDir -class HttpServerRequest(http.HttpRequest): +class ThreadRequest: + """A request to complete processing using a thread. + """ + + def __init__(self, processor, req, fn, args, kwds): + self.processor = processor + self.req = req + self.fn = fn + self.args = args + self.kwds = kwds + + def run(self): + self.processor.setInThread() + thread = threading.Thread(target=self.main) + thread.setDaemon(True) + thread.start() + + def call(self): + try: + self.fn(*self.args, **self.kwds) + except SystemExit: + raise + except Exception, ex: + self.req.resultErr(ex) + self.req.finish() + + def main(self): + self.call() + self.processor.process() + + +class RequestProcessor: + """Processor for requests on a connection to an http server. + Requests are executed synchonously unless they ask for a thread by returning + a ThreadRequest. + """ + + done = False + + inThread = False - def __init__(self, server, addr, srd, srw): - #print 'HttpServerRequest>', addr + def __init__(self, server, sock, addr): self.server = server + self.sock = sock + self.srd = sock.makefile('rb') + self.srw = sock.makefile('wb') + self.srvaddr = server.getServerAddr() + + def isInThread(self): + return self.inThread + + def setInThread(self): + self.inThread = True + + def getServer(self): + return self.server + + def getRequest(self): + return HttpServerRequest(self, self.srvaddr, self.srd, self.srw) + + def close(self): + try: + self.sock.close() + except: + pass + + def finish(self): + self.done = True + self.close() + + def process(self): + while not self.done: + req = self.getRequest() + res = req.process() + if isinstance(res, ThreadRequest): + if self.isInThread(): + res.call() + else: + res.run() + break + else: + req.finish() + +class HttpServerRequest(http.HttpRequest): + """A single request to an http server. + """ + + def __init__(self, processor, addr, srd, srw): + self.processor = processor self.prepath = '' http.HttpRequest.__init__(self, addr, srd, srw) + def getServer(self): + return self.processor.getServer() + def process(self): - #print 'HttpServerRequest>process', 'path=', self.request_path - self.prepath = [] - self.postpath = map(unquote, string.split(self.request_path[1:], '/')) - res = self.getResource() - self.render(res) + """Process the request. If the return value is a ThreadRequest + it is evaluated in a thread. + """ + try: + self.prepath = [] + self.postpath = map(unquote, string.split(self.request_path[1:], '/')) + resource = self.getResource() + return self.render(resource) + except SystemExit: + raise + except Exception, ex: + self.processError(ex) + + def processError(self, ex): + import traceback; traceback.print_exc() + self.sendError(http.INTERNAL_SERVER_ERROR, msg=str(ex)) + self.setCloseConnection('close') + + def finish(self): self.sendResponse() - return self.close_connection - + if self.close_connection: + self.processor.finish() + def prePathURL(self): url_host = self.getRequestHostname() port = self.getPort() @@ -37,25 +147,111 @@ class HttpServerRequest(http.HttpRequest return ('%s://%s/%s' % (url_proto, url_host, url_path)) def getResource(self): - return self.server.getResource(self) + return self.getServer().getResource(self) - def render(self, res): - #print 'HttpServerRequest>render', res - if res is None: + def render(self, resource): + val = None + if resource is None: self.sendError(http.NOT_FOUND) else: - res.render(self) + try: + while True: + val = resource.render(self) + if not isinstance(val, Resource): + break + val = self.result(val) + except SystemExit: + raise + except Exception, ex: + self.resultErr(ex) + return val + + def threadRequest(self, _fn, *_args, **_kwds): + """Create a request to finish request processing in a thread. + Use this to create a ThreadRequest to return from rendering a + resource if you need a thread to complete processing. + """ + return ThreadRequest(self.processor, self, _fn, _args, _kwds) + + def result(self, val): + if isinstance(val, Exception): + return self.resultErr(val) + else: + return self.resultVal(val) + + def resultVal(self, val): + """Callback to complete the request. + @param val: the value + """ + if isinstance(val, ThreadRequest): + return val + elif self.useSxp(): + self.setHeader("Content-Type", sxp.mime_type) + sxp.show(val, out=self) + else: + self.write('') + self.printPath() + if isinstance(val, types.ListType): + self.write('
')
+                PrettyPrint.prettyprint(val, out=self)
+                self.write('
') + else: + self.write(str(val)) + self.write('') + return None + + def resultErr(self, err): + """Error callback to complete a request. + + @param err: the error + """ + if not isinstance(err, (ArgError, sxp.ParseError, XendError)): + raise + #log.exception("op=%s: %s", op, str(err)) + if self.useSxp(): + self.setHeader("Content-Type", sxp.mime_type) + sxp.show(['xend.err', str(err)], out=self) + else: + self.setHeader("Content-Type", "text/plain") + self.write('Error ') + self.write(': ') + self.write(str(err)) + return None + + def useSxp(self): + """Determine whether to send an SXP response to a request. + Uses SXP if there is no User-Agent, no Accept, or application/sxp is in Accept. + + returns 1 for SXP, 0 otherwise + """ + ok = 0 + user_agent = self.getHeader('User-Agent') + accept = self.getHeader('Accept') + if (not user_agent) or (not accept) or (accept.find(sxp.mime_type) >= 0): + ok = 1 + return ok + + def printPath(self): + pathlist = [x for x in self.prepath if x != '' ] + s = "/" + self.write('

/') + for x in pathlist: + s += x + "/" + self.write(' %s/' % (s, x)) + self.write("

") + class HttpServer: - request_queue_size = 5 + backlog = 5 + + closed = False def __init__(self, interface='', port=8080, root=None): if root is None: root = SrvDir() self.interface = interface self.port = port - self.closed = False self.root = root def getRoot(self): @@ -73,13 +269,12 @@ class HttpServer: self.close() def bind(self): - #print 'bind>', self.interface, self.port self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind((self.interface, self.port)) def listen(self): - self.socket.listen(self.request_queue_size) + self.socket.listen(self.backlog) def accept(self): return self.socket.accept() @@ -96,33 +291,27 @@ class HttpServer: pass def acceptRequest(self): - #print 'acceptRequest>' try: (sock, addr) = self.accept() - #print 'acceptRequest>', sock, addr self.processRequest(sock, addr) except socket.error: return def processRequest(self, sock, addr): - #print 'processRequest>', sock, addr - srd = sock.makefile('rb') - srw = sock.makefile('wb') - srvaddr = (socket.gethostname(), self.port) - while True: - #print 'HttpServerRequest...' - req = HttpServerRequest(self, srvaddr, srd, srw) - close = req.process() - srw.flush() - #print 'HttpServerRequest close=', close - if close: - break try: - #print 'close...' - sock.close() - except: - pass - #print 'processRequest<', sock, addr + rp = RequestProcessor(self, sock, addr) + rp.process() + except SystemExit: + raise + except Exception, ex: + print 'HttpServer>processRequest> exception: ', ex + try: + sock.close() + except: + pass + + def getServerAddr(self): + return (socket.gethostname(), self.port) def getResource(self, req): return self.root.getRequestResource(req) diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/web/protocol.py --- a/tools/python/xen/web/protocol.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/web/protocol.py Fri Apr 22 15:30:43 2005 +0000 @@ -1,29 +1,52 @@ class Factory: + """Generic protocol factory. + """ + + starts = 0 def __init__(self): pass - def startedConnecting(self): - print 'ServerProtocolFactory>startedConnecting>' - pass - def doStart(self): - print 'ServerProtocolFactory>doStart>' - pass + if self.starts == 0: + self.startFactory() + self.starts += 1 def doStop(self): - print 'ServerProtocolFactory>doStop>' - pass + if self.starts > 0: + self.starts -= 1 + else: + return + if self.starts == 0: + self.stopFactory() def buildProtocol(self, addr): - print 'ServerProtocolFactory>buildProtocol>', addr return Protocol(self) + def startFactory(self): + pass + + def stopFactory(self): + pass + class ServerFactory(Factory): + """Factory for server protocols. + """ pass class ClientFactory(Factory): - pass + """Factory for client protocols. + """ + + def startedConnecting(self, connector): + pass + + def clientConnectionLost(self, connector, reason): + pass + + def clientConnectionFailed(self, connector, reason): + pass + class Protocol: @@ -65,23 +88,32 @@ class Protocol: else: return None -class TestClientFactory(Factory): +class TestClientFactory(ClientFactory): def buildProtocol(self, addr): - print 'TestClientProtocolFactory>buildProtocol>', addr + print 'TestClientFactory>buildProtocol>', addr return TestClientProtocol(self) + def startedConnecting(self, connector): + print 'TestClientFactory>startedConnecting>', connector + + def clientConnectionLost(self, connector, reason): + print 'TestClientFactory>clientConnectionLost>', connector, reason + + def clientConnectionFailed(self, connector, reason): + print 'TestClientFactory>clientConnectionFailed>', connector, reason + class TestClientProtocol(Protocol): def connectionMade(self, addr): - print 'TestProtocol>connectionMade>', addr + print 'TestClientProtocol>connectionMade>', addr self.write("hello") self.write("there") class TestServerFactory(Factory): def buildProtocol(self, addr): - print 'TestServerProtocolFactory>buildProtocol>', addr + print 'TestServerFactory>buildProtocol>', addr return TestServerProtocol(self) class TestServerProtocol(Protocol): diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/xend/XendDomain.py --- a/tools/python/xen/xend/XendDomain.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/xend/XendDomain.py Fri Apr 22 15:30:43 2005 +0000 @@ -369,7 +369,7 @@ class XendDomain: @param id: domain id """ - dominfo = xen_domain(id) + dominfo = self.xen_domain(id) if dominfo: d = self.domain_by_id.get(id) if d: @@ -454,7 +454,6 @@ class XendDomain: @param src: source file @param progress: output progress if true - @return: deferred """ xmigrate = XendMigrate.instance() return xmigrate.restore_begin(src) @@ -667,7 +666,6 @@ class XendDomain: """Start domain migration. @param id: domain id - @return: deferred """ # Need a cancel too? # Don't forget to cancel restart for it. @@ -681,7 +679,6 @@ class XendDomain: @param id: domain id @param dst: destination file @param progress: output progress if true - @return: deferred """ dominfo = self.domain_lookup(id) xmigrate = XendMigrate.instance() diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/xend/XendMigrate.py --- a/tools/python/xen/xend/XendMigrate.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/xend/XendMigrate.py Fri Apr 22 15:30:43 2005 +0000 @@ -1,6 +1,7 @@ # Copyright (C) 2004 Mike Wray import traceback +import threading import errno import sys @@ -8,12 +9,8 @@ import socket import time import types -from twisted.internet import reactor -from twisted.internet import defer -#defer.Deferred.debug = 1 -from twisted.internet.protocol import Protocol -from twisted.internet.protocol import ClientFactory -from twisted.python.failure import Failure +from xen.web import reactor +from xen.web.protocol import Protocol, ClientFactory import sxp import XendDB @@ -37,11 +34,13 @@ class Xfrd(Protocol): self.parser = sxp.Parser() self.xinfo = xinfo - def connectionMade(self): + def connectionMade(self, addr=None): # Send hello. self.request(['xfr.hello', XFR_PROTO_MAJOR, XFR_PROTO_MINOR]) # Send request. self.xinfo.request(self) + # Run the transport mainLoop which reads from the peer. + self.transport.mainLoop() def request(self, req): sxp.show(req, out=self.transport) @@ -60,7 +59,6 @@ class Xfrd(Protocol): if self.parser.at_eof(): self.loseConnection() - class XfrdClientFactory(ClientFactory): """Factory for clients of the migration/save daemon xfrd. """ @@ -68,7 +66,33 @@ class XfrdClientFactory(ClientFactory): def __init__(self, xinfo): #ClientFactory.__init__(self) self.xinfo = xinfo + self.readyCond = threading.Condition() + self.ready = False + self.err = None + def start(self): + print 'XfrdClientFactory>start>' + reactor.connectTCP('localhost', XFRD_PORT, self) + try: + self.readyCond.acquire() + while not self.ready: + self.readyCond.wait() + finally: + self.readyCond.release() + print 'XfrdClientFactory>start>', 'err=', self.err + if self.err: + raise self.err + return 0 + + def notifyReady(self): + try: + self.readyCond.acquire() + self.ready = True + self.err = self.xinfo.error_summary() + self.readyCond.notify() + finally: + self.readyCond.release() + def startedConnecting(self, connector): pass @@ -76,10 +100,72 @@ class XfrdClientFactory(ClientFactory): return Xfrd(self.xinfo) def clientConnectionLost(self, connector, reason): - pass + print "XfrdClientFactory>clientConnectionLost>", reason + self.notifyReady() def clientConnectionFailed(self, connector, reason): + print "XfrdClientFactory>clientConnectionFailed>", reason self.xinfo.error(reason) + self.notifyReady() + +class SuspendHandler: + + def __init__(self, xinfo, vmid, timeout): + self.xinfo = xinfo + self.vmid = vmid + self.timeout = timeout + self.readyCond = threading.Condition() + self.ready = False + self.err = None + + def start(self): + self.subscribe(on=True) + timer = reactor.callLater(self.timeout, self.onTimeout) + try: + self.readyCond.acquire() + while not self.ready: + self.readyCond.wait() + finally: + self.readyCond.release() + self.subscribe(on=False) + timer.cancel() + if self.err: + raise XendError(self.err) + + def notifyReady(self, err=None): + try: + self.readyCond.acquire() + if not self.ready: + self.ready = True + self.err = err + self.readyCond.notify() + finally: + self.readyCond.release() + + def subscribe(self, on=True): + # Subscribe to 'suspended' events so we can tell when the + # suspend completes. Subscribe to 'died' events so we can tell if + # the domain died. + if on: + action = eserver.subscribe + else: + action = eserver.unsubscribe + action('xend.domain.suspended', self.onSuspended) + action('xend.domain.died', self.onDied) + + def onSuspended(self, e, v): + if v[1] != self.vmid: return + print 'SuspendHandler>onSuspended>', e, v + self.notifyReady() + + def onDied(self, e, v): + if v[1] != self.vmid: return + print 'SuspendHandler>onDied>', e, v + self.notifyReady('Domain %s died while suspending' % self.vmid) + + def onTimeout(self): + print 'SuspendHandler>onTimeout>' + self.notifyReady('Domain %s suspend timed out' % self.vmid) class XfrdInfo: """Abstract class for info about a session with xfrd. @@ -88,18 +174,17 @@ class XfrdInfo: """Suspend timeout (seconds). We set a timeout because suspending a domain can hang.""" - timeout = 10 + timeout = 30 def __init__(self): from xen.xend import XendDomain self.xd = XendDomain.instance() - self.deferred = defer.Deferred() self.suspended = {} self.paused = {} self.state = 'init' # List of errors encountered. self.errors = [] - + def vmconfig(self): dominfo = self.xd.domain_get(self.src_dom) if dominfo: @@ -110,11 +195,10 @@ class XfrdInfo: def add_error(self, err): """Add an error to the error list. - Returns the error added (which may have been unwrapped if it - was a Twisted Failure). + Returns the error added. """ - while isinstance(err, Failure): - err = err.value + #while isinstance(err, Failure): + # err = err.value if err not in self.errors: self.errors.append(err) return err @@ -122,6 +206,8 @@ class XfrdInfo: def error_summary(self, msg=None): """Get a XendError summarising the errors (if any). """ + if not self.errors: + return None if msg is None: msg = "errors" if self.errors: @@ -136,34 +222,27 @@ class XfrdInfo: return self.errors def error(self, err): + print 'XfrdInfo>error>', err self.state = 'error' self.add_error(err) - if not self.deferred.called: - self.deferred.errback(self.error_summary()) def dispatch(self, xfrd, val): - - def cbok(v): - if v is None: return - sxp.show(v, out=xfrd.transport) - - def cberr(err): - v = ['xfr.err', errno.EINVAL] - sxp.show(v, out=xfrd.transport) - self.error(err) - + print 'XfrdInfo>dispatch>', val op = sxp.name(val) op = op.replace('.', '_') if op.startswith('xfr_'): fn = getattr(self, op, self.unknown) else: fn = self.unknown - val = fn(xfrd, val) - if isinstance(val, defer.Deferred): - val.addCallback(cbok) - val.addErrback(cberr) - else: - cbok(val) + try: + val = fn(xfrd, val) + if val: + sxp.show(val, out=xfrd.transport) + except Exception, err: + print 'XfrdInfo>dispatch> error:', err + val = ['xfr.err', errno.EINVAL] + sxp.show(val, out=xfrd.transport) + self.error(err) def unknown(self, xfrd, val): xfrd.loseConnection() @@ -172,6 +251,7 @@ class XfrdInfo: def xfr_err(self, xfrd, val): # If we get an error with non-zero code the operation failed. # An error with code zero indicates hello success. + print 'XfrdInfo>xfr_err>', val v = sxp.child0(val) err = int(sxp.child0(val)) if not err: return @@ -220,50 +300,19 @@ class XfrdInfo: return ['xfr.err', val] def xfr_vm_suspend(self, xfrd, val): - """Suspend a domain. Suspending takes time, so we return - a Deferred that is called when the suspend completes. + """Suspend a domain. Suspending can hang, so we set a timeout and fail if it takes too long. """ try: vmid = sxp.child0(val) - d = defer.Deferred() - # Subscribe to 'suspended' events so we can tell when the - # suspend completes. Subscribe to 'died' events so we can tell if - # the domain died. Set a timeout and error handler so the subscriptions - # will be cleaned up if suspending hangs or there is an error. - def onSuspended(e, v): - if v[1] != vmid: return - subscribe(on=0) - if not d.called: - d.callback(v) - - def onDied(e, v): - if v[1] != vmid: return - if not d.called: - d.errback(XendError('Domain %s died while suspending' % vmid)) - - def subscribe(on=1): - if on: - action = eserver.subscribe - else: - action = eserver.unsubscribe - action('xend.domain.suspended', onSuspended) - action('xend.domain.died', onDied) - - def cberr(err): - subscribe(on=0) - self.add_error("suspend failed") - self.add_error(err) - return err - - d.addErrback(cberr) - d.setTimeout(self.timeout) - subscribe() + h = SuspendHandler(self, vmid, self.timeout) val = self.xd.domain_shutdown(vmid, reason='suspend') self.suspended[vmid] = 1 - return d + h.start() + print 'xfr_vm_suspend> suspended', vmid except Exception, err: + print 'xfr_vm_suspend> err', err self.add_error("suspend failed") self.add_error(err) traceback.print_exc() @@ -271,6 +320,7 @@ class XfrdInfo: return ['xfr.err', val] def connectionLost(self, reason=None): + print 'XfrdInfo>connectionLost>', reason for vmid in self.suspended: try: self.xd.domain_destroy(vmid) @@ -336,10 +386,11 @@ class XendMigrateInfo(XfrdInfo): self.state = 'ok' self.dst_dom = dom self.xd.domain_destroy(self.src_dom) - if not self.deferred.called: - self.deferred.callback(self) + #if not self.deferred.called: + # self.deferred.callback(self) def connectionLost(self, reason=None): + print 'XendMigrateInfo>connectionLost>', reason XfrdInfo.connectionLost(self, reason) if self.state =='ok': log.info('Migrate OK: ' + str(self.sxpr())) @@ -386,10 +437,11 @@ class XendSaveInfo(XfrdInfo): def xfr_save_ok(self, xfrd, val): self.state = 'ok' self.xd.domain_destroy(self.src_dom) - if not self.deferred.called: - self.deferred.callback(self) + #if not self.deferred.called: + # self.deferred.callback(self) def connectionLost(self, reason=None): + print 'XendSaveInfo>connectionLost>', reason XfrdInfo.connectionLost(self, reason) if self.state =='ok': log.info('Save OK: ' + str(self.sxpr())) @@ -493,29 +545,17 @@ class XendMigrate: def session_begin(self, info): """Add the session to the table and start it. - Set up callbacks to remove the session from the table - when it finishes. + Remove the session from the table when it finishes. @param info: session @return: deferred """ - dfr = defer.Deferred() - def cbok(val): - self._delete_session(info.xid) - if not dfr.called: - dfr.callback(val) - return val - def cberr(err): + self._add_session(info) + try: + xcf = XfrdClientFactory(info) + return xcf.start() + finally: self._delete_session(info.xid) - if not dfr.called: - dfr.errback(err) - return err - self._add_session(info) - info.deferred.addCallback(cbok) - info.deferred.addErrback(cberr) - xcf = XfrdClientFactory(info) - reactor.connectTCP('localhost', XFRD_PORT, xcf) - return dfr def migrate_begin(self, dominfo, host, port=XFRD_PORT, live=0, resource=0): """Begin to migrate a domain to another host. diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/xend/server/SrvConsole.py --- a/tools/python/xen/xend/server/SrvConsole.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/xend/server/SrvConsole.py Fri Apr 22 15:30:43 2005 +0000 @@ -21,22 +21,18 @@ class SrvConsole(SrvDir): return self.perform(req) def render_GET(self, req): - try: - if self.use_sxp(req): - req.setHeader("Content-Type", sxp.mime_type) - sxp.show(self.info.sxpr(), out=req) - else: - req.write('') - self.print_path(req) - #self.ls() - req.write('

%s

' % self.info) - req.write('

Connect to domain %d

' - % (self.info.uri(), self.info.dom)) - self.form(req) - req.write('') - return '' - except Exception, ex: - self._perform_err(ex, req) + if self.use_sxp(req): + req.setHeader("Content-Type", sxp.mime_type) + sxp.show(self.info.sxpr(), out=req) + else: + req.write('') + self.print_path(req) + #self.ls() + req.write('

%s

' % self.info) + req.write('

Connect to domain %d

' + % (self.info.uri(), self.info.dom)) + self.form(req) + req.write('') def form(self, req): req.write('
' % req.prePathURL()) diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/xend/server/SrvConsoleDir.py --- a/tools/python/xen/xend/server/SrvConsoleDir.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/xend/server/SrvConsoleDir.py Fri Apr 22 15:30:43 2005 +0000 @@ -31,20 +31,16 @@ class SrvConsoleDir(SrvDir): return v def render_GET(self, req): - try: - if self.use_sxp(req): - req.setHeader("Content-Type", sxp.mime_type) - self.ls_console(req, 1) - else: - req.write("") - self.print_path(req) - self.ls(req) - self.ls_console(req) - #self.form(req.wfile) - req.write("") - return '' - except Exception, ex: - self._perform_err(ex, req) + if self.use_sxp(req): + req.setHeader("Content-Type", sxp.mime_type) + self.ls_console(req, 1) + else: + req.write("") + self.print_path(req) + self.ls(req) + self.ls_console(req) + #self.form(req.wfile) + req.write("") def ls_console(self, req, use_sxp=0): url = req.prePathURL() diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/xend/server/SrvDaemon.py --- a/tools/python/xen/xend/server/SrvDaemon.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/xend/server/SrvDaemon.py Fri Apr 22 15:30:43 2005 +0000 @@ -17,9 +17,6 @@ import StringIO import traceback import time -#from twisted.internet import pollreactor; pollreactor.install() -from twisted.internet import reactor - from xen.lowlevel import xu from xen.xend import sxp @@ -330,8 +327,6 @@ class Daemon: self.daemonize() print 'running serverthread...' serverthread.start() - print 'running reactor...' - reactor.run() except Exception, ex: print >>sys.stderr, 'Exception starting xend:', ex if DEBUG: @@ -356,7 +351,7 @@ class Daemon: self.channelF.start() def exit(self, rc=0): - reactor.disconnectAll() + #reactor.disconnectAll() self.channelF.stop() # Calling sys.exit() raises a SystemExit exception, which only # kills the current thread. Calling os._exit() makes the whole diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/xend/server/SrvDmesg.py --- a/tools/python/xen/xend/server/SrvDmesg.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/xend/server/SrvDmesg.py Fri Apr 22 15:30:43 2005 +0000 @@ -19,19 +19,15 @@ class SrvDmesg(SrvDir): self.perform(req) def render_GET(self, req): - try: - if self.use_sxp(req): - req.setHeader("Content-Type", "text/plain") - req.write(self.info()) - else: - req.write('') - self.print_path(req) - req.write('
')
-                req.write(self.info())
-                req.write('
') - return '' - except Exception, ex: - self._perform_err(ex, req) + if self.use_sxp(req): + req.setHeader("Content-Type", "text/plain") + req.write(self.info()) + else: + req.write('') + self.print_path(req) + req.write('
')
+            req.write(self.info())
+            req.write('
') def info(self): return self.xd.info() diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/xend/server/SrvDomain.py --- a/tools/python/xen/xend/server/SrvDomain.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/xend/server/SrvDomain.py Fri Apr 22 15:30:43 2005 +0000 @@ -26,24 +26,27 @@ class SrvDomain(SrvDir): not a domain name. """ fn = FormFn(self.xd.domain_configure, - [['dom', 'int'], + [['dom', 'int'], ['config', 'sxpr']]) - deferred = fn(req.args, {'dom': self.dom.dom}) - return deferred + return fn(req.args, {'dom': self.dom.dom}) def op_unpause(self, op, req): val = self.xd.domain_unpause(self.dom.name) return val def op_pause(self, op, req): + # Pause doesn't need a thread, but request one for testing. + return req.threadRequest(self.do_pause, op, req) + + def do_pause(self, op, req): val = self.xd.domain_pause(self.dom.name) return val def op_shutdown(self, op, req): fn = FormFn(self.xd.domain_shutdown, - [['dom', 'str'], + [['dom', 'str'], ['reason', 'str'], - ['key', 'int']]) + ['key', 'int']]) val = fn(req.args, {'dom': self.dom.id}) req.setResponseCode(http.ACCEPTED) req.setHeader("Location", "%s/.." % req.prePathURL()) @@ -51,42 +54,39 @@ class SrvDomain(SrvDir): def op_destroy(self, op, req): fn = FormFn(self.xd.domain_destroy, - [['dom', 'str'], + [['dom', 'str'], ['reason', 'str']]) val = fn(req.args, {'dom': self.dom.id}) req.setHeader("Location", "%s/.." % req.prePathURL()) return val def op_save(self, op, req): + return req.threadRequest(self.do_save, op, req) + + def do_save(self, op, req): fn = FormFn(self.xd.domain_save, - [['dom', 'str'], + [['dom', 'str'], ['file', 'str']]) - deferred = fn(req.args, {'dom': self.dom.id}) - deferred.addCallback(self._op_save_cb, req) - return deferred - - def _op_save_cb(self, val, req): + val = fn(req.args, {'dom': self.dom.id}) return 0 def op_migrate(self, op, req): + return req.threadRequest(self.do_migrate, op, req) + + def do_migrate(self, op, req): fn = FormFn(self.xd.domain_migrate, - [['dom', 'str'], + [['dom', 'str'], ['destination', 'str'], - ['live', 'int'], - ['resource', 'int']]) - deferred = fn(req.args, {'dom': self.dom.id}) - deferred.addCallback(self._op_migrate_cb, req) - return deferred - - def _op_migrate_cb(self, info, req): - print '_op_migrate_cb>', info, req + ['live', 'int'], + ['resource', 'int']]) + info = fn(req.args, {'dom': self.dom.id}) #req.setResponseCode(http.ACCEPTED) host = info.dst_host port = info.dst_port dom = info.dst_dom url = "http://%s:%d/xend/domain/%d" % (host, port, dom) req.setHeader("Location", url) - print '_op_migrate_cb> url=', url + print 'do_migrate> url=', url return url def op_pincpu(self, op, req): @@ -98,57 +98,57 @@ class SrvDomain(SrvDir): def op_cpu_bvt_set(self, op, req): fn = FormFn(self.xd.domain_cpu_bvt_set, - [['dom', 'str'], - ['mcuadv', 'int'], - ['warpback', 'int'], + [['dom', 'str'], + ['mcuadv', 'int'], + ['warpback', 'int'], ['warpvalue', 'int'], - ['warpl', 'long'], - ['warpu', 'long']]) + ['warpl', 'long'], + ['warpu', 'long']]) val = fn(req.args, {'dom': self.dom.id}) return val def op_maxmem_set(self, op, req): fn = FormFn(self.xd.domain_maxmem_set, - [['dom', 'str'], + [['dom', 'str'], ['memory', 'int']]) val = fn(req.args, {'dom': self.dom.id}) return val def op_device_create(self, op, req): fn = FormFn(self.xd.domain_device_create, - [['dom', 'str'], + [['dom', 'str'], ['config', 'sxpr']]) - d = fn(req.args, {'dom': self.dom.id}) - return d + val = fn(req.args, {'dom': self.dom.id}) + return val def op_device_refresh(self, op, req): fn = FormFn(self.xd.domain_device_refresh, - [['dom', 'str'], + [['dom', 'str'], ['type', 'str'], - ['idx', 'str']]) + ['idx', 'str']]) val = fn(req.args, {'dom': self.dom.id}) return val def op_device_destroy(self, op, req): fn = FormFn(self.xd.domain_device_destroy, - [['dom', 'str'], + [['dom', 'str'], ['type', 'str'], - ['idx', 'str']]) + ['idx', 'str']]) val = fn(req.args, {'dom': self.dom.id}) return val def op_device_configure(self, op, req): fn = FormFn(self.xd.domain_device_configure, - [['dom', 'str'], + [['dom', 'str'], ['config', 'sxpr'], - ['idx', 'str']]) - d = fn(req.args, {'dom': self.dom.id}) - return d + ['idx', 'str']]) + val = fn(req.args, {'dom': self.dom.id}) + return val def op_vif_credit_limit(self, op, req): fn = FormFn(self.xd.domain_vif_credit_limit, - [['dom', 'str'], - ['vif', 'int'], + [['dom', 'str'], + ['vif', 'int'], ['credit', 'int'], ['period', 'int']]) val = fn(req.args, {'dom': self.dom.id}) @@ -178,7 +178,7 @@ class SrvDomain(SrvDir): def op_mem_target_set(self, op, req): fn = FormFn(self.xd.domain_mem_target_set, - [['dom', 'str'], + [['dom', 'str'], ['target', 'int']]) val = fn(req.args, {'dom': self.dom.id}) return val diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/xend/server/SrvDomainDir.py --- a/tools/python/xen/xend/server/SrvDomainDir.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/xend/server/SrvDomainDir.py Fri Apr 22 15:30:43 2005 +0000 @@ -68,7 +68,7 @@ class SrvDomainDir(SrvDir): raise XendError("Error creating domain: " + str(ex)) def _op_create_cb(self, dominfo, configstring, req): - """Callback to handle deferred domain creation. + """Callback to handle domain creation. """ dom = dominfo.name domurl = "%s/%s" % (req.prePathURL(), dom) @@ -90,14 +90,13 @@ class SrvDomainDir(SrvDir): def op_restore(self, op, req): """Restore a domain from file. - @return: deferred """ + return req.threadRequest(self.do_restore, op, req) + + def do_restore(self, op, req): fn = FormFn(self.xd.domain_restore, [['file', 'str']]) dominfo = fn(req.args) - return self._op_restore_cb(dominfo, req) - - def _op_restore_cb(self, dominfo, req): dom = dominfo.name domurl = "%s/%s" % (req.prePathURL(), dom) req.setResponseCode(http.CREATED) @@ -116,20 +115,16 @@ class SrvDomainDir(SrvDir): return self.perform(req) def render_GET(self, req): - try: - if self.use_sxp(req): - req.setHeader("Content-Type", sxp.mime_type) - self.ls_domain(req, 1) - else: - req.write("") - self.print_path(req) - self.ls(req) - self.ls_domain(req) - self.form(req) - req.write("") - return '' - except Exception, ex: - self._perform_err(ex, req) + if self.use_sxp(req): + req.setHeader("Content-Type", sxp.mime_type) + self.ls_domain(req, 1) + else: + req.write("") + self.print_path(req) + self.ls(req) + self.ls_domain(req) + self.form(req) + req.write("") def ls_domain(self, req, use_sxp=0): url = req.prePathURL() diff -r 40af907d69a9 -r d781b9d08e80 tools/python/xen/xend/server/SrvNode.py --- a/tools/python/xen/xend/server/SrvNode.py Thu Apr 21 14:11:29 2005 +0000 +++ b/tools/python/xen/xend/server/SrvNode.py Fri Apr 22 15:30:43 2005 +0000 @@ -35,26 +35,22 @@ class SrvNode(SrvDir): return self.perform(req) def render_GET(self, req): - try: - if self.use_sxp(req): - req.setHeader("Content-Type", sxp.mime_type) - sxp.show(['node'] + self.info(), out=req) - else: - url = req.prePathURL() - if not url.endswith('/'): - url += '/' - req.write('') - self.print_path(req) - req.write('