debuggers.hg

view tools/xfrd/xfrd.c @ 2628:98bdf2c88015

bitkeeper revision 1.1159.1.201 (41600e1fkVMoQU0dVgk1h6vT502hEg)

Merge
author iap10@labyrinth.cl.cam.ac.uk
date Sun Oct 03 14:35:11 2004 +0000 (2004-10-03)
parents 1bd15a9e5016 05ae99de2d3f
children e442cedb12d8
line source
1 /** @file
2 * XFRD - Domain Transfer Daemon for Xen.
3 *
4 * The xfrd is forked by xend to transfer a vm to a remote system.
5 *
6 * The vm is suspended, then its state and memory are transferred to the remote system.
7 * The remote system attempts to create a vm and copy the transferred state and memory into it,
8 * finally resuming the vm. If all is OK the vm ends up running on the remote
9 * system and is removed from the originating system. If the transfer does not complete
10 * successfully the originating system attempts to resume the vm.
11 * The children exit when the transfer completes.
12 *
13 * @author Mike Wray <mike.wray@hpl.hp.com>
14 */
16 #include <stdlib.h>
17 #include <unistd.h>
18 #include <stdio.h>
19 #include <getopt.h>
20 #include <errno.h>
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <time.h>
25 #include <sys/socket.h>
26 #include <netinet/in.h>
27 #include <arpa/inet.h>
28 #include <string.h>
30 #include <signal.h>
31 #include <sys/wait.h>
32 #include <sys/select.h>
34 #include "allocate.h"
35 #include "file_stream.h"
36 #include "string_stream.h"
37 #include "lzi_stream.h"
38 #include "gzip_stream.h"
39 #include "sys_net.h"
40 #include "sys_string.h"
42 //#include "xdr.h"
43 #include "enum.h"
44 #include "xfrd.h"
46 #include "xen_domain.h"
48 #include "connection.h"
49 #include "select.h"
51 #define MODULE_NAME "XFRD"
52 #define DEBUG 1
53 #undef DEBUG
54 #include "debug.h"
56 /*
57 sender:
58 xend connects to xfrd and writes migrate message
59 xend writes domain config to xfrd
61 xfrd forks
63 xfrd connects to peer
64 xfrd sends hello, reads response
65 xfrd sends domain
66 xfrd reads response
67 reports progress/status to xend
69 xend reads xfrd for progress/status, disconnects
70 If ok, destroys domain.
71 If not ok, unpauses domain.
73 receiver:
74 xfrd accepts connection on inbound port
75 xfrd forks and accepts connection
76 xfrd receives hello, writes response
77 xfrd receives domain
78 xfrd connects to xend, configures new domain
79 xfrd writes status back to peer, child exits
82 (xfr.hello <major> <minor>)
83 (xfr.err <code> <reason>)
85 xend->xfrd (xfr.migrate <domain> <vmconfig> <host> <port> <live>)
86 (xfr.save <domain> <vmconfig> <file>)
87 xfrd->xend (xfr.vm.suspend <domain>)
88 xfrd->xend (xfr.progress <percent> <rate: kb/s>)
89 xfrd->xend (xfr.err <code> <reason>) | (xfr.ok <domain>)
90 xfrd->xfrd (xfr.xfr <domain>)
91 xfrd->xfrd (xfr.err <code>) | (xfr.ok <domain>)
93 xfrd->xend (xfr.configure <domain> <vmconfig>)
94 */
96 Sxpr oxfr_configure; // (xfr.configure <vmid> <vmconfig>)
97 Sxpr oxfr_err; // (xfr.err <code>)
98 Sxpr oxfr_hello; // (xfr.hello <major> <minor>)
99 Sxpr oxfr_migrate; // (xfr.migrate <vmid> <vmconfig> <host> <port> <live>)
100 Sxpr oxfr_migrate_ok;// (xfr.migrate.ok <value>)
101 Sxpr oxfr_progress; // (xfr.progress <percent> <rate: kb/s>)
102 Sxpr oxfr_restore; // (xfr.restore <file>)
103 Sxpr oxfr_restore_ok;// (xfr.restore.ok <vmid>)
104 Sxpr oxfr_save; // (xfr.save <vmid> <vmconfig> <file>)
105 Sxpr oxfr_save_ok; // (xfr.save.ok)
106 Sxpr oxfr_vm_destroy;// (xfr.vm.destroy <vmid>)
107 Sxpr oxfr_vm_suspend;// (xfr.vm.suspend <vmid>)
108 Sxpr oxfr_xfr; // (xfr.xfr <vmid>)
109 Sxpr oxfr_xfr_ok; // (xfr.xfr.ok <vmid>)
111 void xfr_init(void){
112 oxfr_configure = intern("xfr.configure");
113 oxfr_err = intern("xfr.err");
114 oxfr_hello = intern("xfr.hello");
115 oxfr_migrate = intern("xfr.migrate");
116 oxfr_migrate_ok = intern("xfr.migrate.ok");
117 oxfr_progress = intern("xfr.progress");
118 oxfr_restore = intern("xfr.restore");
119 oxfr_restore_ok = intern("xfr.restore.ok");
120 oxfr_save = intern("xfr.save");
121 oxfr_save_ok = intern("xfr.save.ok");
122 oxfr_vm_destroy = intern("xfr.vm.destroy");
123 oxfr_vm_suspend = intern("xfr.vm.suspend");
124 oxfr_xfr = intern("xfr.xfr");
125 oxfr_xfr_ok = intern("xfr.xfr.ok");
126 }
/* Boolean constants, supplied only when no earlier header defined them. */
#if !defined(TRUE)
#define TRUE 1
#endif

#if !defined(FALSE)
#define FALSE 0
#endif
/** Program name printed in the usage message. */
#define PROGRAM "xfrd"

/*
 * Command-line options. For each option:
 *   OPT_x is the short option character,
 *   KEY_x is the long option name,
 *   DOC_x is the help text printed by usage().
 */

/* Listen port. */
#define OPT_PORT 'P'
#define KEY_PORT "port"
#define DOC_PORT "<port>\n\txfr port (as a number or service name)"

/* Compression of the migration stream. */
#define OPT_COMPRESS 'Z'
#define KEY_COMPRESS "compress"
#define DOC_COMPRESS "\n\tuse compression for migration"

/* Help. */
#define OPT_HELP 'h'
#define KEY_HELP "help"
#define DOC_HELP "\n\tprint help"

/* Version. */
#define OPT_VERSION 'v'
#define KEY_VERSION "version"
#define DOC_VERSION "\n\tprint version"

/* Verbose flag. */
#define OPT_VERBOSE 'V'
#define KEY_VERBOSE "verbose"
#define DOC_VERBOSE "\n\tverbose flag"
158 /** Print a usage message.
159 * Prints to stdout if err is zero, and exits with 0.
160 * Prints to stderr if err is non-zero, and exits with 1.
161 */
162 void usage(int err){
163 FILE *out = (err ? stderr : stdout);
165 fprintf(out, "Usage: %s [options]\n", PROGRAM);
166 fprintf(out, "-%c, --%s %s\n", OPT_PORT, KEY_PORT, DOC_PORT);
167 fprintf(out, "-%c, --%s %s\n", OPT_COMPRESS, KEY_COMPRESS, DOC_COMPRESS);
168 fprintf(out, "-%c, --%s %s\n", OPT_VERBOSE, KEY_VERBOSE, DOC_VERBOSE);
169 fprintf(out, "-%c, --%s %s\n", OPT_VERSION, KEY_VERSION, DOC_VERSION);
170 fprintf(out, "-%c, --%s %s\n", OPT_HELP, KEY_HELP, DOC_HELP);
171 exit(err ? 1 : 0);
172 }
174 /** Short options. Options followed by ':' take an argument. */
175 static char *short_opts = (char[]){
176 OPT_PORT, ':',
177 OPT_COMPRESS,
178 OPT_HELP,
179 OPT_VERSION,
180 OPT_VERBOSE,
181 0 };
183 /** Long options. */
184 static struct option const long_opts[] = {
185 { KEY_PORT, required_argument, NULL, OPT_PORT },
186 { KEY_COMPRESS, no_argument, NULL, OPT_COMPRESS },
187 { KEY_HELP, no_argument, NULL, OPT_HELP },
188 { KEY_VERSION, no_argument, NULL, OPT_VERSION },
189 { KEY_VERBOSE, no_argument, NULL, OPT_VERBOSE },
190 { NULL, 0, NULL, 0 }
191 };
/** Program arguments. */
typedef struct Args {
    /** Buffer size; set_defaults() uses 128 * 1024. */
    int bufsize;
    /** Port to listen on, held in network byte order. */
    unsigned long port;
    /** Non-zero when the verbose option was given. */
    int verbose;
    /** Non-zero to compress the outgoing migration stream. */
    int compress;
} Args;
/** Transfer states.
 * XFR_INIT is zero, so a zero-initialised XfrState starts in it.
 * XFR_MAX is not a state; it is the number of states and sizes the
 * per-state error array.
 */
enum {
    XFR_INIT = 0,
    XFR_HELLO,
    XFR_STATE,
    XFR_RUN,
    XFR_FAIL,
    XFR_DONE,
    XFR_MAX
};
211 /** Initialize an array element for a constant to its string name. */
212 #define VALDEF(val) { val, #val }
214 /** Names for the transfer states. */
215 static EnumDef xfr_states[] = {
216 VALDEF(XFR_INIT),
217 VALDEF(XFR_HELLO),
218 VALDEF(XFR_STATE),
219 VALDEF(XFR_RUN),
220 VALDEF(XFR_FAIL),
221 VALDEF(XFR_DONE),
222 { 0, NULL }
223 };
226 /** State machine for transfer. */
227 typedef struct XfrState {
228 /** Current state. */
229 int state;
230 /** Error codes for the states. */
231 int state_err[XFR_MAX];
232 /** First error. */
233 int err;
234 /** State when first error happened. */
235 int err_state;
237 uint32_t vmid;
238 char* vmconfig;
239 int vmconfig_n;
240 unsigned long xfr_port;
241 char *xfr_host;
242 uint32_t vmid_new;
243 int live;
244 } XfrState;
246 /** Get the name of a transfer state.
247 *
248 * @param s state
249 * @return name
250 */
251 char * xfr_state_name(int s){
252 return enum_val_to_name(s, xfr_states);
253 }
255 /** Set the state of a transfer.
256 *
257 * @param s transfer
258 * @param state state
259 * @return state
260 */
261 int XfrState_set_state(XfrState *s, int state){
262 s->state = state;
263 return s->state;
264 }
266 /** Get the state of a transfer.
267 *
268 * @param s transfer
269 * @return state
270 */
271 int XfrState_get_state(XfrState *s){
272 return s->state;
273 }
275 /** Set an error in the current state.
276 * Does nothing if an error is already set.
277 *
278 * @param s transfer
279 * @param err error
280 * @return error
281 */
282 int XfrState_set_err(XfrState *s, int err){
283 if(!s->state_err[s->state]){
284 s->state_err[s->state] = err;
285 }
286 if(!s->err){
287 s->err = err;
288 s->err_state = s->state;
289 }
290 return err;
291 }
293 /** Get the error in the current state.
294 *
295 * @param s transfer
296 * @return error
297 */
298 int XfrState_get_err(XfrState *s){
299 return s->state_err[s->state];
300 }
302 /** Get the first error of a transfer.
303 *
304 * @param s transfer
305 * @return error
306 */
307 int XfrState_first_err(XfrState *s){
308 return s->err;
309 }
311 /** Get the state a transfer was in when it had its first error.
312 *
313 * @param s transfer
314 * @return error state
315 */
316 int XfrState_first_err_state(XfrState *s){
317 return s->err_state;
318 }
320 /** Xfrd arguments. */
321 static Args _args = {};
323 /** Xfrd arguments. */
324 static Args *args = &_args;
326 /** Set xfrd default arguments.
327 *
328 * @param args arguments to set
329 */
330 void set_defaults(Args *args){
331 args->compress = FALSE;
332 args->bufsize = 128 * 1024;
333 args->port = htons(XFRD_PORT);
334 }
336 int stringof(Sxpr exp, char **s){
337 int err = 0;
338 //dprintf(">\n"); objprint(iostdout, exp, PRINT_TYPE); IOStream_print(iostdout, "\n");
339 if(ATOMP(exp)){
340 *s = atom_name(exp);
341 } else if(STRINGP(exp)){
342 *s = string_string(exp);
343 } else {
344 err = -EINVAL;
345 *s = NULL;
346 }
347 //dprintf("< err=%d s=%s\n", err, *s);
348 return err;
349 }
351 int intof(Sxpr exp, int *v){
352 int err = 0;
353 char *s;
354 unsigned long l;
355 //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
356 if(INTP(exp)){
357 *v = OBJ_INT(exp);
358 } else {
359 err = stringof(exp, &s);
360 if(err) goto exit;
361 err = convert_atoul(s, &l);
362 *v = (int)l;
363 }
364 exit:
365 //dprintf("< err=%d v=%d\n", err, *v);
366 return err;
367 }
369 int addrof(Sxpr exp, uint32_t *v){
370 char *h;
371 unsigned long a;
372 int err = 0;
373 //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
374 err = stringof(exp, &h);
375 if(err) goto exit;
376 if(get_host_address(h, &a)){
377 err = -EINVAL;
378 goto exit;
379 }
380 *v = a;
381 exit:
382 //dprintf("< err=%d v=%x\n", err, *v);
383 return err;
384 }
386 int portof(Sxpr exp, uint16_t *v){
387 char *s;
388 int err = 0;
389 //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
390 if(INTP(exp)){
391 *v = get_ul(exp);
392 *v = htons(*v);
393 } else {
394 unsigned long p;
395 err = stringof(exp, &s);
396 if(err) goto exit;
397 err = convert_service_to_port(s, &p);
398 if(err){
399 err = -EINVAL;
400 goto exit;
401 }
402 *v = p;
403 }
404 exit:
405 //dprintf("< err=%d v=%u\n", err, *v);
406 return err;
407 }
/** Wrap a raw 32-bit address in a struct in_addr.
 *
 * @param addr address value, stored unchanged in s_addr
 * @return the address as a struct in_addr
 */
static inline struct in_addr inaddr(uint32_t addr){
    struct in_addr in = { 0 };

    in.s_addr = addr;
    return in;
}
/** Compute transfer progress statistics.
 *
 * @param t0 time the transfer started
 * @param offset amount transferred so far
 * @param memory total amount to transfer
 * @param percent set to offset as a percentage of memory (0 if memory is 0)
 * @param rate set to offset / 1024 per elapsed second (0 if no time elapsed)
 * @return seconds elapsed since t0
 */
time_t stats(time_t t0, uint64_t offset, uint64_t memory, float *percent, float *rate){
    time_t elapsed = time(NULL) - t0;

    // Guard the division: a zero total would otherwise yield inf or NaN.
    *percent = (memory ? (offset * 100.0f) / memory : 0.0f);
    *rate = (elapsed ? offset / (elapsed * 1024.0f) : 0.0f);
    return elapsed;
}
421 /** Notify success or error.
422 *
423 * @param conn connection
424 * @param errcode error code
425 * @return 0 on success, error code otherwise
426 */
427 int xfr_error(Conn *conn, int errcode){
428 int err = 0;
430 if(!conn->out) return -ENOTCONN;
431 if(errcode <0) errcode = -errcode;
432 err = IOStream_print(conn->out, "(%s %d)",
433 atom_name(oxfr_err), errcode);
434 return (err < 0 ? err : 0);
435 }
437 /** Read a response message - error or ok.
438 *
439 * @param conn connection
440 * @return 0 on success, error code otherwise
441 */
442 int xfr_response(Conn *conn){
443 int err;
444 Sxpr sxpr;
446 dprintf(">\n");
447 if(!conn->out) return -ENOTCONN;
448 err = Conn_sxpr(conn, &sxpr);
449 if(err) goto exit;
450 if(sxpr_elementp(sxpr, oxfr_err)){
451 int errcode;
452 err = intof(sxpr_childN(sxpr, 0, ONONE), &errcode);
453 if(err) goto exit;
454 err = errcode;
455 }
456 exit:
457 dprintf("< err=%d\n", err);
458 return err;
459 }
461 /** Get the initial hello message and check the protocol version.
462 * It is an error to receive anything other than a hello message
463 * with the correct protocol version.
464 *
465 * @param conn connection
466 * @return 0 on success, error code otherwise
467 */
468 int xfr_hello(Conn *conn){
469 int err;
470 uint32_t major = XFR_PROTO_MAJOR, minor = XFR_PROTO_MINOR;
471 uint32_t hello_major, hello_minor;
472 Sxpr sxpr;
473 if(!conn->in) return -ENOTCONN;
474 dprintf(">\n");
475 err = Conn_sxpr(conn, &sxpr);
476 if(err) goto exit;
477 if(!sxpr_elementp(sxpr, oxfr_hello)){
478 wprintf("> sxpr_elementp test failed\n");
479 err = -EINVAL;
480 goto exit;
481 }
482 err = intof(sxpr_childN(sxpr, 0, ONONE), &hello_major);
483 if(err) goto exit;
484 err = intof(sxpr_childN(sxpr, 1, ONONE), &hello_minor);
485 if(err) goto exit;
486 if(hello_major != major || hello_minor != minor){
487 eprintf("> Wanted protocol version %d.%d, got %d.%d",
488 major, minor, hello_major, hello_minor);
489 err = -EINVAL;
490 goto exit;
491 }
492 exit:
493 xfr_error(conn, err);
494 if(err){
495 eprintf("> Hello failed: %d\n", err);
496 }
497 dprintf("< err=%d\n", err);
498 return err;
499 }
501 /** Send the initial hello message.
502 *
503 * @param conn connection
504 * @param msg message
505 * @return 0 on success, error code otherwise
506 */
507 int xfr_send_hello(Conn *conn){
508 int err = 0;
509 dprintf(">\n");
511 err = IOStream_print(conn->out, "(%s %d %d)",
512 atom_name(oxfr_hello),
513 XFR_PROTO_MAJOR,
514 XFR_PROTO_MINOR);
515 if(err < 0) goto exit;
516 IOStream_flush(conn->out);
517 err = xfr_response(conn);
518 exit:
519 dprintf("< err=%d\n", err);
520 return err;
521 }
523 int xfr_send_xfr(Conn *conn, uint32_t vmid){
524 int err;
526 err = IOStream_print(conn->out, "(%s %d)",
527 atom_name(oxfr_xfr), vmid);
528 return (err < 0 ? err : 0);
529 }
531 int xfr_send_xfr_ok(Conn *conn, uint32_t vmid){
532 int err = 0;
534 err = IOStream_print(conn->out, "(%s %d)",
535 atom_name(oxfr_xfr_ok), vmid);
536 return (err < 0 ? err : 0);
537 }
539 int xfr_send_migrate_ok(Conn *conn, uint32_t vmid){
540 int err = 0;
542 err = IOStream_print(conn->out, "(%s %d)",
543 atom_name(oxfr_migrate_ok), vmid);
544 return (err < 0 ? err : 0);
545 }
547 int xfr_send_restore_ok(Conn *conn, uint32_t vmid){
548 int err = 0;
550 err = IOStream_print(conn->out, "(%s %d)",
551 atom_name(oxfr_restore_ok), vmid);
552 return (err < 0 ? err : 0);
553 }
555 int xfr_send_save_ok(Conn *conn){
556 int err = 0;
558 err = IOStream_print(conn->out, "(%s)",
559 atom_name(oxfr_save_ok));
560 return (err < 0 ? err : 0);
561 }
563 int xfr_send_suspend(Conn *conn, uint32_t vmid){
564 int err = 0;
566 err = IOStream_print(conn->out, "(%s %d)",
567 atom_name(oxfr_vm_suspend), vmid);
568 return (err < 0 ? err : 0);
569 }
571 /** Suspend a vm on behalf of save/migrate.
572 */
573 int xfr_vm_suspend(Conn *xend, uint32_t vmid){
574 int err = 0;
575 dprintf("> vmid=%u\n", vmid);
576 err = xfr_send_suspend(xend, vmid);
577 if(err) goto exit;
578 IOStream_flush(xend->out);
579 err = xfr_response(xend);
580 exit:
581 dprintf("< err=%d\n", err);
582 return err;
583 }
585 int xfr_send_destroy(Conn *conn, uint32_t vmid){
586 int err = 0;
588 err = IOStream_print(conn->out, "(%s %d)",
589 atom_name(oxfr_vm_destroy), vmid);
590 return (err < 0 ? err : 0);
591 }
593 /** Destroy a vm on behalf of save/migrate.
594 */
595 int xfr_vm_destroy(Conn *xend, uint32_t vmid){
596 int err = 0;
597 dprintf("> vmid=%u\n", vmid);
598 err = xfr_send_destroy(xend, vmid);
599 if(err) goto exit;
600 IOStream_flush(xend->out);
601 err = xfr_response(xend);
602 exit:
603 dprintf("< err=%d\n", err);
604 return err;
605 }
607 /** Get vm state. Send transfer message.
608 *
609 * @param peer connection
610 * @param msg message
611 * @return 0 on success, error code otherwise
612 */
613 int xfr_send_state(XfrState *state, Conn *xend, Conn *peer){
614 int err = 0;
615 Sxpr sxpr;
617 dprintf(">\n");
618 XfrState_set_state(state, XFR_STATE);
619 // Send xfr message and the domain state.
620 err = xfr_send_xfr(peer, state->vmid);
621 if(err) goto exit;
622 dprintf(">*** Sending domain %u\n", state->vmid);
623 err = xen_domain_snd(xend, peer->out,
624 state->vmid,
625 state->vmconfig, state->vmconfig_n,
626 state->live);
627 dprintf(">*** Sent domain %u\n", state->vmid);
628 if(err) goto exit;
629 // Sending the domain suspends it, and there's no way back.
630 // So destroy it now. If anything goes wrong now it's too late.
631 dprintf(">*** Destroying domain %u\n", state->vmid);
632 err = xfr_vm_destroy(xend, state->vmid);
633 if(err) goto exit;
634 err = xfr_error(peer, err);
635 if(err) goto exit;
636 IOStream_flush(peer->out);
637 // Read the response from the peer.
638 err = Conn_sxpr(peer, &sxpr);
639 if(err) goto exit;
640 if(sxpr_elementp(sxpr, oxfr_err)){
641 // Error.
642 int errcode;
643 err = intof(sxpr_childN(sxpr, 0, ONONE), &errcode);
644 if(!err) err = errcode;
645 } else if(sxpr_elementp(sxpr, oxfr_xfr_ok)){
646 // Ok - get the new domain id.
647 err = intof(sxpr_childN(sxpr, 0, ONONE), &state->vmid_new);
648 xfr_error(peer, err);
649 } else {
650 // Anything else is invalid. But it may be too late.
651 err = -EINVAL;
652 xfr_error(peer, err);
653 }
654 exit:
655 XfrState_set_err(state, err);
656 dprintf("< err=%d\n", err);
657 return err;
658 }
660 /** Finish the transfer.
661 */
662 int xfr_send_done(XfrState *state, Conn *xend){
663 int err = 0;
664 int first_err = 0;
666 first_err = XfrState_first_err(state);
667 if(first_err){
668 XfrState_set_state(state, XFR_FAIL);
669 } else {
670 XfrState_set_state(state, XFR_DONE);
671 }
672 if(first_err){
673 err = xfr_error(xend, first_err);
674 } else {
675 // Report new domain id to xend.
676 err = xfr_send_migrate_ok(xend, state->vmid_new);
677 }
679 XfrState_set_err(state, err);
680 if(XfrState_first_err(state)){
681 int s, serr;
683 wprintf("> Transfer errors:\n");
684 for(s = 0; s < XFR_MAX; s++){
685 serr = state->state_err[s];
686 if(!serr) continue;
687 wprintf("> state=%-12s err=%d\n", xfr_state_name(s), serr);
688 }
689 } else {
690 wprintf("> Transfer OK\n");
691 }
692 dprintf("< err=%d\n", err);
693 return err;
694 }
696 /** Migrate a vm to another node.
697 *
698 * @param xend connection
699 * @return 0 on success, error code otherwise
700 */
701 int xfr_send(Args *args, XfrState *state, Conn *xend, uint32_t addr, uint32_t port){
702 int err = 0;
703 Conn _peer = {}, *peer = &_peer;
704 int flags = 0;
705 struct in_addr xfr_addr;
706 uint16_t xfr_port;
707 time_t t0 = time(NULL), t1;
709 dprintf(">\n");
710 flags |= CONN_NOBUFFER;
711 if(args->compress){
712 flags |= CONN_WRITE_COMPRESS;
713 }
714 xfr_addr.s_addr = addr;
715 xfr_port = port;
716 if(!xfr_port) xfr_port = htons(XFRD_PORT);
717 dprintf("> Xfr vmid=%u\n", state->vmid);
718 dprintf("> Xfr xfr_addr=%s:%d\n", inet_ntoa(xfr_addr), ntohs(xfr_port));
719 err = Conn_connect(peer, flags, xfr_addr, xfr_port);
720 if(err) goto exit;
721 XfrState_set_state(state, XFR_HELLO);
722 // Send hello message.
723 err = xfr_send_hello(peer);
724 if(err) goto exit;
725 printf("\n");
726 // Send vm state.
727 err = xfr_send_state(state, xend, peer);
728 if(err) goto exit;
729 if(args->compress){
730 IOStream *zio = peer->out;
731 int plain_bytes = lzi_stream_plain_bytes(zio);
732 int comp_bytes = lzi_stream_comp_bytes(zio);
733 float ratio = lzi_stream_ratio(zio);
734 iprintf("> Compression: plain %d bytes, compressed %d bytes, ratio %3.2f\n",
735 plain_bytes, comp_bytes, ratio);
736 }
737 exit:
738 dprintf("> err=%d\n", err);
739 if(err && !XfrState_get_err(state)){
740 XfrState_set_err(state, err);
741 }
742 Conn_close(peer);
743 if(!err){
744 t1 = time(NULL) - t0;
745 iprintf("> Transfer complete in %lu seconds\n", t1);
746 }
747 dprintf("> done err=%d, notifying xend...\n", err);
748 xfr_send_done(state, xend);
749 dprintf("< err=%d\n", err);
750 return err;
751 }
753 /** Save a vm to file.
754 */
755 int xfr_save(Args *args, XfrState *state, Conn *xend, char *file){
756 int err = 0;
757 int compress = 0;
758 IOStream *io = NULL;
760 dprintf("> file=%s\n", file);
761 if(compress){
762 io = gzip_stream_fopen(file, "wb1");
763 } else {
764 io = file_stream_fopen(file, "wb");
765 }
766 if(!io){
767 eprintf("> Failed to open %s\n", file);
768 err = -EINVAL;
769 goto exit;
770 }
771 err = xen_domain_snd(xend, io,
772 state->vmid,
773 state->vmconfig, state->vmconfig_n,
774 0);
775 if(err){
776 err = xfr_error(xend, err);
777 } else {
778 err = xfr_send_save_ok(xend);
779 }
780 exit:
781 if(io){
782 IOStream_close(io);
783 IOStream_free(io);
784 }
785 if(err){
786 unlink(file);
787 }
788 dprintf("< err=%d\n", err);
789 return err;
790 }
792 /** Restore a vm from file.
793 *
794 * @return 0 on success, error code otherwise
795 */
796 int xfr_restore(Args *args, XfrState *state, Conn *xend, char *file){
797 int err = 0;
798 IOStream *io = NULL;
799 int configured=0;
801 dprintf("> file=%s\n", file);
802 io = gzip_stream_fopen(file, "rb");
803 if(!io){
804 eprintf("> Failed to open %s\n", file);
805 err = -EINVAL;
806 goto exit;
807 }
808 err = xen_domain_rcv(io,
809 &state->vmid_new,
810 &state->vmconfig, &state->vmconfig_n,
811 &configured);
812 if(err) goto exit;
813 if(!configured){
814 err = xen_domain_configure(state->vmid_new, state->vmconfig, state->vmconfig_n);
815 if(err) goto exit;
816 }
817 err = xen_domain_unpause(state->vmid_new);
818 exit:
819 if(io){
820 IOStream_close(io);
821 IOStream_free(io);
822 }
823 if(err){
824 xfr_error(xend, err);
825 } else {
826 xfr_send_restore_ok(xend, state->vmid_new);
827 }
828 dprintf("< err=%d\n", err);
829 return err;
830 }
832 /** Accept the transfer of a vm from another node.
833 *
834 * @param peer connection
835 * @param msg message
836 * @return 0 on success, error code otherwise
837 */
838 int xfr_recv(Args *args, XfrState *state, Conn *peer){
839 int err = 0;
840 time_t t0 = time(NULL), t1;
841 Sxpr sxpr;
842 int configured=0;
844 dprintf("> peer=%s\n", inet_ntoa(peer->addr.sin_addr));
845 // If receiving from localhost set configured so that that xen_domain_rcv()
846 // does not attempt to configure the new domain. This is because the old
847 // domain still exists and will make it fail.
848 if(peer->addr.sin_addr.s_addr == htonl(INADDR_LOOPBACK)){
849 dprintf("> Peer is localhost\n");
850 configured = 1;
851 }
852 err = xen_domain_rcv(peer->in,
853 &state->vmid_new,
854 &state->vmconfig, &state->vmconfig_n,
855 &configured);
856 if(err) goto exit;
857 // Read from the peer. This is just so we wait before configuring.
858 // When migrating to the same host the peer must destroy the domain
859 // before we configure the new one.
860 err = Conn_sxpr(peer, &sxpr);
861 if(err) goto exit;
862 if(!configured){
863 dprintf("> Configuring...\n");
864 err = xen_domain_configure(state->vmid_new, state->vmconfig, state->vmconfig_n);
865 if(err) goto exit;
866 }
867 err = xen_domain_unpause(state->vmid_new);
868 if(err) goto exit;
869 // Report new domain id to peer.
870 err = xfr_send_xfr_ok(peer, state->vmid_new);
871 if(err) goto exit;
872 // Get the final ok.
873 err = xfr_response(peer);
874 exit:
875 if(!err){
876 t1 = time(NULL) - t0;
877 iprintf("> Transfer complete in %lu seconds\n", t1);
878 }
879 if(err){
880 xfr_error(peer, err);
881 }
882 dprintf("< err=%d\n", err);
883 return err;
884 }
886 /** Listen for a hello followed by a service request.
887 * The request can be from the local xend or from xfrd on another node.
888 *
889 * @param peersock socket
890 * @param peer_in peer address
891 * @return 0 on success, error code otherwise
892 */
893 int xfrd_service(Args *args, int peersock, struct sockaddr_in peer_in){
894 int err = 0;
895 Sxpr sxpr;
896 Conn _conn = {}, *conn = &_conn;
897 int flags = CONN_NOBUFFER;
899 dprintf(">\n");
900 err = Conn_init(conn, flags, peersock, peer_in);
901 if(err) goto exit;
902 //dprintf(">xfr_hello... \n");
903 err = xfr_hello(conn);
904 if(err) goto exit;
905 //dprintf("> sxpr...\n");
906 err = Conn_sxpr(conn, &sxpr);
907 if(err) goto exit;
908 //dprintf("> sxpr=\n");
909 //objprint(iostdout, sxpr, PRINT_TYPE); IOStream_print(iostdout, "\n");
910 if(sxpr_elementp(sxpr, oxfr_migrate)){
911 // Migrate message from xend.
912 uint32_t addr;
913 uint16_t port;
914 XfrState _state = {}, *state = &_state;
915 int n = 0;
917 dprintf("> xfr.migrate\n");
918 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->vmid);
919 if(err) goto exit;
920 err = stringof(sxpr_childN(sxpr, n++, ONONE), &state->vmconfig);
921 if(err) goto exit;
922 state->vmconfig_n = strlen(state->vmconfig);
923 err = addrof(sxpr_childN(sxpr, n++, ONONE), &addr);
924 if(err) goto exit;
925 err = portof(sxpr_childN(sxpr, n++, ONONE), &port);
926 if(err) goto exit;
927 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->live);
928 if(err) goto exit;
929 err = xfr_send(args, state, conn, addr, port);
931 } else if(sxpr_elementp(sxpr, oxfr_save)){
932 // Save message from xend.
933 char *file;
934 XfrState _state = {}, *state = &_state;
935 int n = 0;
937 dprintf("> xfr.save\n");
938 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->vmid);
939 if(err) goto exit;
940 err = stringof(sxpr_childN(sxpr, n++, ONONE), &state->vmconfig);
941 if(err) goto exit;
942 state->vmconfig_n = strlen(state->vmconfig);
943 err = stringof(sxpr_childN(sxpr, n++, ONONE), &file);
944 if(err) goto exit;
945 err = xfr_save(args, state, conn, file);
947 } else if(sxpr_elementp(sxpr, oxfr_restore)){
948 // Restore message from xend.
949 char *file;
950 XfrState _state = {}, *state = &_state;
951 int n = 0;
953 dprintf("> xfr.restore\n");
954 err = stringof(sxpr_childN(sxpr, n++, ONONE), &file);
955 if(err) goto exit;
956 err = xfr_restore(args, state, conn, file);
958 } else if(sxpr_elementp(sxpr, oxfr_xfr)){
959 // Xfr message from peer xfrd.
960 XfrState _state = {}, *state = &_state;
961 int n = 0;
963 dprintf("> xfr.xfr\n");
964 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->vmid);
965 if(err) goto exit;
966 err = xfr_recv(args, state, conn);
968 } else{
969 // Anything else is invalid.
970 err = -EINVAL;
971 eprintf("> Invalid message: ");
972 objprint(iostderr, sxpr, 0);
973 IOStream_print(iostderr, "\n");
974 xfr_error(conn, err);
975 }
976 exit:
977 Conn_close(conn);
978 dprintf("< err=%d\n", err);
979 return err;
980 }
982 /** Accept an incoming connection.
983 *
984 * @param sock tcp socket
985 * @return 0 on success, error code otherwise
986 */
987 int xfrd_accept(Args *args, int sock){
988 struct sockaddr_in peer_in;
989 struct sockaddr *peer = (struct sockaddr *)&peer_in;
990 socklen_t peer_n = sizeof(peer_in);
991 int peersock;
992 pid_t pid;
993 int err = 0;
995 dprintf("> sock=%d\n", sock);
996 dprintf("> accept...\n");
997 peersock = accept(sock, peer, &peer_n);
998 dprintf("> accept=%d\n", peersock);
999 if(peersock < 0){
1000 perror("accept");
1001 err = -errno;
1002 goto exit;
1004 iprintf("> Accepted connection from %s:%d on %d\n",
1005 inet_ntoa(peer_in.sin_addr), htons(peer_in.sin_port), sock);
1006 pid = fork();
1007 if(pid > 0){
1008 // Parent, fork succeeded.
1009 iprintf("> Forked child pid=%d\n", pid);
1010 close(peersock);
1011 } else if (pid < 0){
1012 // Parent, fork failed.
1013 perror("fork");
1014 close(peersock);
1015 } else {
1016 // Child.
1017 iprintf("> Xfr service for %s:%d\n",
1018 inet_ntoa(peer_in.sin_addr), htons(peer_in.sin_port));
1019 err = xfrd_service(args, peersock, peer_in);
1020 iprintf("> Xfr service err=%d\n", err);
1021 shutdown(peersock, 2);
1022 exit(err ? 1 : 0);
1024 exit:
1025 dprintf("< err=%d\n", err);
1026 return err;
1029 /** Socket select loop.
1030 * Accepts connections on the tcp socket.
1032 * @param listen_sock tcp listen socket
1033 * @return 0 on success, error code otherwise
1034 */
1035 int xfrd_select(Args *args, int listen_sock){
1036 int err = 0;
1037 SelectSet set = {};
1038 dprintf("> socks: %d\n", listen_sock);
1039 while(1){
1040 SelectSet_zero(&set);
1041 SelectSet_add_read(&set, listen_sock);
1042 err = SelectSet_select(&set, NULL);
1043 if(err < 0){
1044 if(errno == EINTR) continue;
1045 perror("select");
1046 goto exit;
1048 if(FD_ISSET(listen_sock, &set.rd)){
1049 xfrd_accept(args, listen_sock);
1052 exit:
1053 dprintf("< err=%d\n", err);
1054 return err;
1057 /** Create a socket.
1059 * @param args program arguments
1060 * @param socktype socket type
1061 * @param reuse whether to set SO_REUSEADDR
1062 * @param val return value for the socket
1063 * @return 0 on success, error code otherwise
1064 */
1065 int create_socket(Args *args, int socktype, int reuse, int *val){
1066 int err = 0;
1067 int sock = 0;
1068 struct sockaddr_in addr_in;
1069 struct sockaddr *addr = (struct sockaddr *)&addr_in;
1070 socklen_t addr_n = sizeof(addr_in);
1072 dprintf(">\n");
1073 // Create socket and bind it.
1074 sock = socket(AF_INET, socktype, 0);
1075 if(sock < 0){
1076 err = -errno;
1077 goto exit;
1079 addr_in.sin_family = AF_INET;
1080 addr_in.sin_addr.s_addr = INADDR_ANY;
1081 addr_in.sin_port = args->port;
1082 dprintf("> port=%d\n", ntohs(addr_in.sin_port));
1083 if(reuse){
1084 // Set socket option to reuse address.
1085 int val = 1;
1086 err = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
1087 if(err < 0){
1088 err = -errno;
1089 perror("setsockopt");
1090 goto exit;
1093 err = bind(sock, addr, addr_n);
1094 if(err < 0){
1095 err = -errno;
1096 perror("bind");
1097 goto exit;
1099 exit:
1100 *val = (err ? -1 : sock);
1101 dprintf("< err=%d\n", err);
1102 return err;
1105 /** Create the tcp listen socket.
1107 * @param args program arguments
1108 * @param val return value for the socket
1109 * @return 0 on success, error code otherwise
1110 */
1111 int xfrd_listen_socket(Args *args, int *val){
1112 int err = 0;
1113 int sock;
1114 dprintf(">\n");
1115 err = create_socket(args, SOCK_STREAM, 1, &sock);
1116 if(err) goto exit;
1117 dprintf("> listen...\n");
1118 err = listen(sock, 5);
1119 if(err < 0){
1120 err = -errno;
1121 perror("listen");
1122 goto exit;
1124 exit:
1125 *val = (err ? -1 : sock);
1126 if(err) close(sock);
1127 dprintf("< err=%d\n", err);
1128 return err;
1131 /** Type for signal handling functions. */
1132 typedef void SignalAction(int code, siginfo_t *info, void *data);
1134 /** Handle SIGCHLD by getting child exit status.
1135 * This prevents child processes being defunct.
1137 * @param code signal code
1138 * @param info signal info
1139 * @param data
1140 */
1141 void sigaction_SIGCHLD(int code, siginfo_t *info, void *data){
1142 int status;
1143 pid_t pid;
1144 //dprintf("> child_exit=%d waiting...\n", child_exit);
1145 pid = wait(&status);
1146 dprintf("> child pid=%d status=%d\n", pid, status);
1149 /** Handle SIGPIPE.
1151 * @param code signal code
1152 * @param info signal info
1153 * @param data
1154 */
1155 void sigaction_SIGPIPE(int code, siginfo_t *info, void *data){
1156 dprintf("> SIGPIPE\n");
1157 //fflush(stdout);
1158 //fflush(stderr);
1159 //exit(1);
1162 /** Handle SIGALRM.
1164 * @param code signal code
1165 * @param info signal info
1166 * @param data
1167 */
1168 void sigaction_SIGALRM(int code, siginfo_t *info, void *data){
1169 dprintf("> SIGALRM\n");
1172 /** Install a handler for a signal.
1174 * @param signum signal
1175 * @param action handler
1176 * @return 0 on success, error code otherwise
1177 */
1178 int catch_signal(int signum, SignalAction *action){
1179 int err = 0;
1180 struct sigaction sig = {};
1181 sig.sa_sigaction = action;
1182 sig.sa_flags = SA_SIGINFO;
1183 err = sigaction(signum, &sig, NULL);
1184 if(err){
1185 perror("sigaction");
1187 return err;
1190 /** Transfer daemon main program.
1192 * @param args program arguments
1193 * @return 0 on success, error code otherwise
1194 */
1195 int xfrd_main(Args *args){
1196 int err = 0;
1197 int listen_sock;
1199 dprintf(">\n");
1200 catch_signal(SIGCHLD,sigaction_SIGCHLD);
1201 catch_signal(SIGPIPE,sigaction_SIGPIPE);
1202 catch_signal(SIGALRM,sigaction_SIGALRM);
1203 err = xfrd_listen_socket(args, &listen_sock);
1204 if(err) goto exit;
1205 err = xfrd_select(args, listen_sock);
1206 exit:
1207 close(listen_sock);
1208 dprintf("< err=%d\n", err);
1209 return err;
1212 /** Parse command-line arguments and call the xfrd main program.
1214 * @param arg argument count
1215 * @param argv arguments
1216 * @return 0 on success, 1 otherwise
1217 */
1218 int main(int argc, char *argv[]){
1219 int err = 0;
1220 int key = 0;
1221 int long_index = 0;
1222 static const char * LOGFILE = "/var/log/xfrd.log";
1224 #ifndef DEBUG
1225 freopen(LOGFILE, "w+", stdout);
1226 fclose(stderr);
1227 stderr = stdout;
1228 #endif
1229 dprintf(">\n");
1230 set_defaults(args);
1231 while(1){
1232 key = getopt_long(argc, argv, short_opts, long_opts, &long_index);
1233 if(key == -1) break;
1234 switch(key){
1235 case OPT_PORT:
1236 err = !convert_service_to_port(optarg, &args->port);
1237 if(err) goto exit;
1238 break;
1239 case OPT_COMPRESS:
1240 args->compress = TRUE;
1241 break;
1242 case OPT_HELP:
1243 usage(0);
1244 break;
1245 case OPT_VERBOSE:
1246 args->verbose = TRUE;
1247 break;
1248 case OPT_VERSION:
1249 printf("> Version %d.%d\n", XFR_PROTO_MAJOR, XFR_PROTO_MINOR);
1250 exit(0);
1251 break;
1252 default:
1253 usage(EINVAL);
1254 break;
1257 xfr_init();
1258 err = xfrd_main(args);
1259 exit:
1260 if(err && key > 0){
1261 fprintf(stderr, "Error in arg %c\n", key);
1263 return (err ? 1 : 0);