debuggers.hg

view tools/python/xen/xend/server/console.py @ 4656:40af907d69a9

bitkeeper revision 1.1327.2.3 (4267b4918M714ImdecocSvKqAkVj1A)

Add some locking to console handling.
Remove a dead file.

Signed-off-by: Mike Wray <mike.wray@hp.com>
author mjw@wray-m-3.hpl.hp.com
date Thu Apr 21 14:11:29 2005 +0000 (2005-04-21)
parents a838a908e38e
children 369e382b2f81
line source
1 # Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
3 import socket
4 import threading
6 from xen.web import reactor, protocol
8 from xen.lowlevel import xu
10 from xen.xend.XendError import XendError
11 from xen.xend import EventServer; eserver = EventServer.instance()
12 from xen.xend.XendLogging import log
13 from xen.xend import XendRoot; xroot = XendRoot.instance()
14 from xen.xend import sxp
16 from controller import CtrlMsgRcvr, Dev, DevController
17 from messages import *
18 from params import *
class ConsoleProtocol(protocol.Protocol):
    """Asynchronous handler for a console TCP socket.

    Relays data between one TCP connection (self.transport) and the
    console device it was created for.
    """

    def __init__(self, console, id):
        """Create a handler.

        console  console device this connection talks to
        id       console id, used in log messages and events
        """
        self.console = console
        self.id = id
        # Peer address; only known once connectionMade() has been called.
        self.addr = None

    def _peerInfo(self):
        """Get the peer address as a (host, port) pair.

        returns (None, None) when no address has been recorded, so that
        logging and event injection never fail on a missing address
        """
        if self.addr is None:
            return (None, None)
        return (self.addr[0], self.addr[1])

    def connectionMade(self, addr=None):
        """Called when the TCP connection has been established.
        Tries to attach the connection to the console; if the console
        refuses, tells the peer and drops the connection.

        addr  peer address, indexed as addr[0], addr[1] (may be None)
        """
        self.addr = addr
        # console.connect() returns non-zero on failure.
        if self.console.connect(self.addr, self):
            self.transport.write("Cannot connect to console %d on domain %d\n"
                                 % (self.id, self.console.getDomain()))
            self.loseConnection()
            return
        (host, port) = self._peerInfo()
        log.info("Console connected %s %s %s",
                 self.id, str(host), str(port))
        eserver.inject('xend.console.connect',
                       [self.id, host, port])

    def dataReceived(self, data):
        """Pass input from the TCP connection to the console.
        Drops the connection if the console reports an error (non-zero).

        data  input data
        """
        if self.console.receiveInput(self, data):
            self.loseConnection()

    def write(self, data):
        """Write console output to the TCP connection.

        data  output data

        returns the length of data
        """
        self.transport.write(data)
        return len(data)

    def connectionLost(self, reason=None):
        """Called when the TCP connection has gone away.
        Logs the event, injects 'xend.console.disconnect' and detaches
        this connection from the console.

        reason  why the connection was lost (only logged)
        """
        log.debug('ConsoleProtocol>connectionLost> %s', str(reason))
        # The address may never have been set (e.g. the connection went
        # away before connectionMade()), so do not index self.addr directly:
        # failing here would skip the console.disconnect() below.
        (host, port) = self._peerInfo()
        log.info("Console disconnected %s %s %s",
                 str(self.id), str(host), str(port))
        eserver.inject('xend.console.disconnect',
                       [self.id, host, port])
        self.console.disconnect(conn=self)

    def loseConnection(self):
        """Close the underlying transport."""
        self.transport.loseConnection()
class ConsoleFactory(protocol.ServerFactory):
    """Factory producing the per-connection handlers for a console
    server socket.
    """

    # Handler class instantiated by buildProtocol().
    protocol = ConsoleProtocol

    def __init__(self, console, id):
        """Remember which console the handlers are for.

        console  console device
        id       console id
        """
        # NOTE: the ServerFactory base initialiser is not invoked here.
        self.id = id
        self.console = console

    def buildProtocol(self, addr):
        """Create a handler bound to this factory's console.

        addr  peer address (not used here)

        returns the new handler, with its factory attribute set
        """
        handler = self.protocol(self.console, self.id)
        handler.factory = self
        return handler
class ConsoleDev(Dev):
    """Console device for a domain.
    Does not poll for i/o itself, but relies on the domain to post console
    output and the connected TCP sockets to post console input.

    State is protected by a re-entrant lock (self.lock): methods take the
    lock and may call each other while holding it.
    """

    STATUS_NEW       = 'new'
    STATUS_CLOSED    = 'closed'
    STATUS_CONNECTED = 'connected'
    STATUS_LISTENING = 'listening'

    def __init__(self, controller, id, config, recreate=False):
        """Create a console device and reserve its TCP port.

        controller  device controller owning this console
        id          console id
        config      device configuration (sxp); may have a 'console_port'
        recreate    accepted for interface compatibility, not used here

        raises XendError if the port is in use by another console
        """
        Dev.__init__(self, controller, id, config)
        self.lock = threading.RLock()
        self.status = self.STATUS_NEW
        # Peer address and connection of the attached TCP client, if any.
        self.addr = None
        self.conn = None
        self.console_port = None
        # obuf holds console output waiting for a TCP client,
        # ibuf holds client input waiting for the console channel.
        self.obuf = xu.buffer()
        self.ibuf = xu.buffer()
        self.channel = None
        self.listener = None

        console_port = sxp.child_value(self.config, "console_port")
        if console_port is None:
            # Default: the configured port base offset by the domain id.
            console_port = xroot.get_console_port_base() + self.getDomain()
        self.checkConsolePort(console_port)
        self.console_port = console_port

        log.info("Created console id=%d domain=%d port=%d",
                 self.id, self.getDomain(), self.console_port)
        eserver.inject('xend.console.create',
                       [self.id, self.getDomain(), self.console_port])

    def init(self, recreate=False, reboot=False):
        """Attach to the channel and start listening for TCP connections.

        recreate, reboot  accepted for interface compatibility, not used here
        """
        # Acquire outside the try so the finally never releases a lock
        # that was not obtained.
        self.lock.acquire()
        try:
            self.destroyed = False
            self.channel = self.getChannel()
            self.listen()
        finally:
            self.lock.release()

    def checkConsolePort(self, console_port):
        """Check that a console port is not in use by another console.

        console_port  port to check

        Delegates to each domain's controller for this device type, which
        raises if the port is taken.
        """
        xd = XendRoot.get_component('xen.xend.XendDomain')
        for vm in xd.domains():
            ctrl = vm.getDeviceController(self.getType(), error=False)
            if not ctrl:
                continue
            ctrl.checkConsolePort(console_port)

    def sxpr(self):
        """Get the console state as an s-expression (nested lists).

        returns ['console', ['status', ..], ['id', ..], ...], with a
        ['connected', host, port] entry when a client is attached
        """
        self.lock.acquire()
        try:
            val = ['console',
                   ['status', self.status],
                   ['id',     self.id],
                   ['domain', self.getDomain()]]
            val.append(['local_port',   self.getLocalPort()])
            val.append(['remote_port',  self.getRemotePort()])
            val.append(['console_port', self.console_port])
            val.append(['index',        self.getIndex()])
            if self.addr:
                val.append(['connected', self.addr[0], self.addr[1]])
        finally:
            self.lock.release()
        return val

    def getLocalPort(self):
        """Get the channel's local port.

        returns the port, or 0 if there is no channel
        """
        self.lock.acquire()
        try:
            if self.channel:
                return self.channel.getLocalPort()
            else:
                return 0
        finally:
            self.lock.release()

    def getRemotePort(self):
        """Get the channel's remote port.

        returns the port, or 0 if there is no channel
        """
        self.lock.acquire()
        try:
            if self.channel:
                return self.channel.getRemotePort()
            else:
                return 0
        finally:
            self.lock.release()

    def uri(self):
        """Get the uri to use to connect to the console.
        This will be a telnet: uri.

        return uri
        """
        host = socket.gethostname()
        return "telnet://%s:%d" % (host, self.console_port)

    def closed(self):
        """Test whether the console has been closed."""
        return self.status == self.STATUS_CLOSED

    def connected(self):
        """Test whether a TCP connection is attached to the console."""
        return self.status == self.STATUS_CONNECTED

    def destroy(self, change=False, reboot=False):
        """Close the console.
        Drops any attached connection and stops listening.

        change  accepted for interface compatibility, not used here
        reboot  if true the console is left untouched
        """
        log.debug('ConsoleDev>destroy> %s %s', self, reboot)
        if reboot:
            return
        self.lock.acquire()
        try:
            self.status = self.STATUS_CLOSED
            if self.conn:
                self.conn.loseConnection()
            # The listener only exists once listen() has succeeded.
            if self.listener:
                self.listener.stopListening()
        finally:
            self.lock.release()

    def listen(self):
        """Listen for TCP connections to the console port.
        Does nothing if the console is closed or already has a listener.
        """
        self.lock.acquire()
        try:
            if self.closed():
                return
            if self.listener:
                return
            self.status = self.STATUS_LISTENING
            cf = ConsoleFactory(self, self.id)
            interface = xroot.get_console_address()
            self.listener = reactor.listenTCP(self.console_port, cf,
                                              interface=interface)
        finally:
            self.lock.release()

    def connect(self, addr, conn):
        """Connect a TCP connection to the console.
        Fails if closed or already connected.
        On success any buffered console output is sent to the connection.

        addr  peer address
        conn  connection

        returns 0 if ok, negative otherwise
        """
        self.lock.acquire()
        try:
            if self.closed():
                return -1
            if self.connected():
                return -1
            self.addr = addr
            self.conn = conn
            self.status = self.STATUS_CONNECTED
            self.writeOutput()
        finally:
            self.lock.release()
        return 0

    def disconnect(self, conn=None):
        """Disconnect the TCP connection to the console.

        conn  connection asking to be disconnected; if given and it is
              not the attached connection the call is ignored
        """
        log.debug('ConsoleDev>disconnect> %s', str(conn))
        self.lock.acquire()
        try:
            if conn and conn != self.conn:
                return
            # Detach before closing: if closing calls back into
            # disconnect() the nested call then sees a different
            # connection and returns instead of closing it again.
            old_conn = self.conn
            self.addr = None
            self.conn = None
            if old_conn:
                old_conn.loseConnection()
            # A closed console must stay closed: destroy() drops the
            # connection, which ends up here, and must not be undone by
            # flipping the status back to listening.
            if self.closed():
                return
            self.status = self.STATUS_LISTENING
            self.listen()
        finally:
            self.lock.release()

    def receiveOutput(self, msg):
        """Receive output console data from the console channel.

        msg  console message; its payload is the output data
        """
        # Treat the obuf as a ring buffer: make room by dropping the
        # oldest data, and keep only the tail of an oversized payload.
        self.lock.acquire()
        try:
            data = msg.get_payload()
            data_n = len(data)
            if self.obuf.space() < data_n:
                self.obuf.discard(data_n)
            space = self.obuf.space()
            if space < data_n:
                # Keep the last 'space' bytes.  Slicing from the front
                # offset (rather than data[-space:]) stays correct when
                # space is 0, where data[-0:] would keep everything.
                data = data[data_n - space:]
            self.obuf.write(data)
            self.writeOutput()
        finally:
            self.lock.release()

    def writeOutput(self):
        """Handle buffered output from the console device.
        Sends it to the connected TCP connection (if any).
        Output that cannot be sent stays buffered for the next call.

        returns -1 if the console is closed, 0 otherwise
        """
        self.lock.acquire()
        try:
            if self.closed():
                return -1
            conn = self.conn
            if not conn:
                return 0
            while not self.obuf.empty():
                try:
                    written = conn.write(self.obuf.peek())
                except socket.error:
                    # Retrying immediately would spin forever while
                    # holding the lock; give up until the next call.
                    break
                if not written or written < 0:
                    # No progress: stop rather than loop on the same data.
                    break
                self.obuf.discard(written)
        finally:
            self.lock.release()
        return 0

    def receiveInput(self, conn, data):
        """Receive console input from a TCP connection.  Ignores the
        input if the calling connection (conn) is not the one
        connected to the console (self.conn).

        conn  connection
        data  input data

        returns -1 if the console is closed, 0 otherwise
        """
        self.lock.acquire()
        try:
            if self.closed():
                return -1
            if conn != self.conn:
                return 0
            self.ibuf.write(data)
            self.writeInput()
        finally:
            self.lock.release()
        return 0

    def writeInput(self):
        """Write pending console input to the console channel.
        Writes as much to the channel as it can, split into messages
        of at most MAX_PAYLOAD bytes each.
        """
        self.lock.acquire()
        try:
            while self.channel and not self.ibuf.empty():
                msg = xu.message(CMSG_CONSOLE, 0, 0)
                msg.append_payload(self.ibuf.read(msg.MAX_PAYLOAD))
                self.channel.writeRequest(msg)
        finally:
            self.lock.release()
class ConsoleController(DevController):
    """Device controller for all the consoles for a domain.
    """

    def __init__(self, dctype, vm, recreate=False):
        """Create the controller.

        dctype    device controller type
        vm        domain the controller belongs to
        recreate  passed through to DevController
        """
        DevController.__init__(self, dctype, vm, recreate=recreate)
        # Message receiver; created in initController().
        self.rcvr = None

    def initController(self, recreate=False, reboot=False):
        """Register for CMSG_CONSOLE messages on the domain's channel.

        recreate  accepted for interface compatibility, not used here
        reboot    if true the existing devices are rebooted
        """
        self.destroyed = False
        self.rcvr = CtrlMsgRcvr(self.getChannel())
        self.rcvr.addHandler(CMSG_CONSOLE,
                             0,
                             self.receiveOutput)
        self.rcvr.registerChannel()
        if reboot:
            self.rebootDevices()

    def destroyController(self, reboot=False):
        """Destroy the devices and stop receiving console messages.

        reboot  passed through to the devices' destroy
        """
        log.debug('ConsoleController>destroyController> %s %s', self, reboot)
        self.destroyed = True
        self.destroyDevices(reboot=reboot)
        # The receiver only exists once initController() has run.
        if self.rcvr:
            self.rcvr.deregisterChannel()

    def newDevice(self, id, config, recreate=False):
        """Create a console device.

        id        console id
        config    device configuration
        recreate  passed through to the device

        returns the new ConsoleDev
        """
        return ConsoleDev(self, id, config, recreate=recreate)

    def checkConsolePort(self, console_port):
        """Check that a console port is not in use by a console.

        console_port  port to check

        raises XendError if one of this controller's consoles uses it
        """
        for c in self.getDevices():
            if c.console_port == console_port:
                raise XendError('console port in use: ' + str(console_port))

    def receiveOutput(self, msg):
        """Handle a control request.
        The CMSG_CONSOLE messages just contain data, and no console id,
        so just send to console 0 (if there is one).

        msg  console message

        todo: extend CMSG_CONSOLE to support more than one console?
        """
        console = self.getDevice(0)
        if console:
            console.receiveOutput(msg)
        else:
            log.warning('no console: domain %d', self.getDomain())