
view tools/blktap2/drivers/block-remus.c @ 20983:a948403c8f99

Remus: increase failover timeout from 500ms to 1s

500ms is aggressive enough to trigger split-brain under fairly
ordinary workloads, particularly for HVM. The long-term fix is to
integrate with a real HA monitor like Linux HA.

Signed-off-by: Brendan Cully <brendan@cs.ubc.ca>
author Keir Fraser <keir.fraser@citrix.com>
date Fri Feb 12 09:23:10 2010 +0000 (2010-02-12)
parents aac490021bb8
children c1f272c3a441
line source
1 /* block-remus.c
2 *
3 * This disk sends all writes to a backup via a network interface before
4 * passing them to an underlying device.
5 * The backup is a bit more complicated:
6 * 1. It applies all incoming writes to a ramdisk.
7 * 2. When a checkpoint request arrives, it moves the ramdisk to
8 * a committing state and uses a new ramdisk for subsequent writes.
9 * It also acknowledges the request, to let the sender know it can
10 * release output.
11 * 3. The ramdisk flushes its contents to the underlying driver.
12 * 4. At failover, the backup waits for the in-flight ramdisk (if any) to
13 * drain before letting the domain be activated.
14 *
15 * The driver determines whether it is the client or server by attempting
16 * to bind to the replication address. If the address is not local,
17 * the driver acts as client.
18 *
19 * The following messages are defined for the replication stream:
20 * 1. write request
21 * "wreq" 4
22 * num_sectors 4
23 * sector 8
24 * buffer (num_sectors * sector_size)
25 * 2. submit request (may be used as a barrier)
26 * "sreq" 4
27 * 3. commit request
28 * "creq" 4
29 * After a commit request, the client must wait for a completion message:
30 * 4. completion
31 * "done" 4
32 */
34 /* due to architectural choices in tapdisk, block-buffer is forced to
35 * reimplement some code which is meant to be private */
36 #define TAPDISK
37 #include "tapdisk.h"
38 #include "tapdisk-server.h"
39 #include "tapdisk-driver.h"
40 #include "tapdisk-interface.h"
41 #include "hashtable.h"
42 #include "hashtable_itr.h"
43 #include "hashtable_utility.h"
45 #include <errno.h>
46 #include <inttypes.h>
47 #include <fcntl.h>
48 #include <stdio.h>
49 #include <stdlib.h>
50 #include <string.h>
51 #include <sys/time.h>
52 #include <sys/types.h>
53 #include <sys/socket.h>
54 #include <netdb.h>
55 #include <netinet/in.h>
56 #include <arpa/inet.h>
57 #include <sys/param.h>
58 #include <sys/sysctl.h>
59 #include <unistd.h>
61 /* timeout for reads and writes in ms */
62 #define HEARTBEAT_MS 1000
63 #define RAMDISK_HASHSIZE 128
65 /* connect retry timeout (seconds) */
66 #define REMUS_CONNRETRY_TIMEOUT 10
68 #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
70 enum tdremus_mode {
71 mode_invalid = 0,
72 mode_unprotected,
73 mode_primary,
74 mode_backup
75 };
77 struct tdremus_req {
78 uint64_t sector;
79 int nb_sectors;
80 char buf[4096];
81 };
83 struct req_ring {
84 /* waste one slot to distinguish between empty and full */
85 struct tdremus_req requests[MAX_REQUESTS * 2 + 1];
86 unsigned int head;
87 unsigned int tail;
88 };
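/* Added note (not in the original source): with MAX_REQUESTS * 2 + 1 slots
 * and one slot always left unused, the ring can hold up to MAX_REQUESTS * 2
 * queued writes. It is empty when head == tail and full when the slot after
 * tail wraps around to head; see ring_next()/ring_isfull() below. */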
90 /* TODO: This isn't very pretty, but to properly generate our own treqs (needed
91 * by the backup) we need to know our td_vbd_t and td_image_t (blktap2
92 * internals). As a proper fix, we should consider extending the tapdisk
93 * interface with a td_create_request() function, or something similar.
94 *
95 * For now, we just grab the vbd in the td_open() command, and the td_image_t
96 * from the first read request.
97 */
98 td_vbd_t *device_vbd = NULL;
99 td_image_t *remus_image = NULL;
101 struct ramdisk {
102 size_t sector_size;
103 struct hashtable* h;
104 /* when a ramdisk is flushed, h is given a new empty hash for writes
105 * while the old ramdisk (prev) is drained asynchronously. To avoid
106 * a race where a read request points to a sector in prev which has
107 * not yet been flushed, check prev on a miss in h */
108 struct hashtable* prev;
109 /* count of outstanding requests to the base driver */
110 size_t inflight;
111 };
113 /* the ramdisk intercepts the original callback for reads and writes.
114 * This holds the original data. */
115 /* Might be worth making this a static array in struct ramdisk to avoid
116 * a malloc per request */
118 struct tdremus_state;
120 struct ramdisk_cbdata {
121 td_callback_t cb;
122 void* private;
123 char* buf;
124 struct tdremus_state* state;
125 };
127 struct ramdisk_write_cbdata {
128 struct tdremus_state* state;
129 char* buf;
130 };
132 typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
134 /* poll_fd type for blktap2 fd system. taken from block_log.c */
135 typedef struct poll_fd {
136 int fd;
137 event_id_t id;
138 } poll_fd_t;
140 struct tdremus_state {
141 // struct tap_disk* driver;
142 void* driver_data;
144 /* XXX: this is needed so that the server can perform operations on
145 * the driver from the stream_fd event handler. fix this. */
146 td_driver_t *tdremus_driver;
148 /* TODO: we may wish to replace these two FIFOs with a unix socket */
149 char* ctl_path; /* receive flush instruction here */
150 poll_fd_t ctl_fd; /* io_fd slot for control FIFO */
151 char* msg_path; /* output completion message here */
152 poll_fd_t msg_fd;
154 /* replication host */
155 struct sockaddr_in sa;
156 poll_fd_t server_fd; /* server listen port */
157 poll_fd_t stream_fd; /* replication channel */
159 /* queue write requests, batch-replicate at submit */
160 struct req_ring write_ring;
162 /* ramdisk data */
163 struct ramdisk ramdisk;
165 /* mode methods */
166 enum tdremus_mode mode;
167 int (*queue_flush)(td_driver_t *driver);
168 };
170 typedef struct tdremus_wire {
171 uint32_t op;
172 uint64_t id;
173 uint64_t sec;
174 uint32_t secs;
175 } tdremus_wire_t;
177 #define TDREMUS_READ "rreq"
178 #define TDREMUS_WRITE "wreq"
179 #define TDREMUS_SUBMIT "sreq"
180 #define TDREMUS_COMMIT "creq"
181 #define TDREMUS_DONE "done"
182 #define TDREMUS_FAIL "fail"
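/* Illustrative sketch (not part of the original driver): how the write
 * request described in the file header is framed for the replication
 * stream. The 4-byte TDREMUS_WRITE tag is sent first, then this 12-byte
 * header (a 4-byte sector count followed by an 8-byte start sector), then
 * the raw sector data. It mirrors the header layout built by
 * primary_queue_write() and parsed by server_do_wreq() below; the actual
 * send would go over the replication socket, e.g. via mwrite(). The helper
 * name is an assumption for illustration only. */
static inline void sketch_frame_wreq_header(char *header, uint32_t num_sectors,
					    uint64_t start_sector)
{
	/* caller provides sizeof(uint32_t) + sizeof(uint64_t) bytes of space */
	memcpy(header, &num_sectors, sizeof(num_sectors));
	memcpy(header + sizeof(num_sectors), &start_sector, sizeof(start_sector));
}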
184 /* primary read/write functions */
185 static void primary_queue_read(td_driver_t *driver, td_request_t treq);
186 static void primary_queue_write(td_driver_t *driver, td_request_t treq);
188 /* backup read/write functions */
189 static void backup_queue_read(td_driver_t *driver, td_request_t treq);
190 static void backup_queue_write(td_driver_t *driver, td_request_t treq);
192 /* unprotected read/write functions */
193 static void unprotected_queue_read(td_driver_t *driver, td_request_t treq);
194 static void unprotected_queue_write(td_driver_t *driver, td_request_t treq);
196 static int tdremus_close(td_driver_t *driver);
198 static int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
199 static int ctl_respond(struct tdremus_state *s, const char *response);
201 /* ring functions */
202 static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
203 {
204 if (++pos >= MAX_REQUESTS * 2 + 1)
205 return 0;
207 return pos;
208 }
210 static inline int ring_isempty(struct req_ring* ring)
211 {
212 return ring->head == ring->tail;
213 }
215 static inline int ring_isfull(struct req_ring* ring)
216 {
217 return ring_next(ring, ring->tail) == ring->head;
218 }
220 /* functions to create and submit treqs */
222 static void
223 replicated_write_callback(td_request_t treq, int err)
224 {
225 struct tdremus_state *s = (struct tdremus_state *) treq.cb_data;
226 td_vbd_request_t *vreq;
228 vreq = (td_vbd_request_t *) treq.private;
230 /* the write failed; for now, let's panic. this is very bad */
231 if (err) {
232 RPRINTF("ramdisk write failed, disk image is not consistent\n");
233 exit(-1);
234 }
236 /* The write succeeded. let's pull the vreq off whatever request list
237 * it is on and free() it */
238 list_del(&vreq->next);
239 free(vreq);
241 s->ramdisk.inflight--;
242 if (!s->ramdisk.inflight && !s->ramdisk.prev) {
243 /* TODO: the ramdisk has been flushed */
244 }
245 }
247 static inline int
248 create_write_request(struct tdremus_state *state, td_sector_t sec, int secs, char *buf)
249 {
250 td_request_t treq;
251 td_vbd_request_t *vreq;
253 treq.op = TD_OP_WRITE;
254 treq.buf = buf;
255 treq.sec = sec;
256 treq.secs = secs;
257 treq.image = remus_image;
258 treq.cb = replicated_write_callback;
259 treq.cb_data = state;
260 treq.id = 0;
261 treq.sidx = 0;
263 vreq = calloc(1, sizeof(td_vbd_request_t));
264 treq.private = vreq;
266 if(!vreq)
267 return -1;
269 vreq->submitting = 1;
270 INIT_LIST_HEAD(&vreq->next);
271 tapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests);
273 /* TODO:
274 * we should probably leave it up to the caller to forward the request */
275 td_forward_request(treq);
277 vreq->submitting--;
279 return 0;
280 }
283 /* ramdisk methods */
284 static int ramdisk_flush(td_driver_t *driver, struct tdremus_state *s);
286 /* http://www.concentric.net/~Ttwang/tech/inthash.htm */
287 static unsigned int uint64_hash(void* k)
288 {
289 uint64_t key = *(uint64_t*)k;
291 key = (~key) + (key << 18);
292 key = key ^ (key >> 31);
293 key = key * 21;
294 key = key ^ (key >> 11);
295 key = key + (key << 6);
296 key = key ^ (key >> 22);
298 return (unsigned int)key;
299 }
301 static int rd_hash_equal(void* k1, void* k2)
302 {
303 uint64_t key1, key2;
305 key1 = *(uint64_t*)k1;
306 key2 = *(uint64_t*)k2;
308 return key1 == key2;
309 }
311 static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector,
312 int nb_sectors, char* buf)
313 {
314 int i;
315 char* v;
316 uint64_t key;
318 for (i = 0; i < nb_sectors; i++) {
319 key = sector + i;
320 if (!(v = hashtable_search(ramdisk->h, &key))) {
321 /* check whether it is queued in a previous flush request */
322 if (!(ramdisk->prev && (v = hashtable_search(ramdisk->prev, &key))))
323 return -1;
324 }
325 memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
326 }
328 return 0;
329 }
331 static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf,
332 size_t len)
333 {
334 char* v;
335 uint64_t* key;
337 if ((v = hashtable_search(h, &sector))) {
338 memcpy(v, buf, len);
339 return 0;
340 }
342 if (!(v = malloc(len))) {
343 DPRINTF("ramdisk_write_hash: malloc failed\n");
344 return -1;
345 }
346 memcpy(v, buf, len);
347 if (!(key = malloc(sizeof(*key)))) {
348 DPRINTF("ramdisk_write_hash: error allocating key\n");
349 free(v);
350 return -1;
351 }
352 *key = sector;
353 if (!hashtable_insert(h, key, v)) {
354 DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);
355 free(key);
356 free(v);
357 return -1;
358 }
360 return 0;
361 }
363 static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector,
364 int nb_sectors, char* buf)
365 {
366 int i, rc;
368 for (i = 0; i < nb_sectors; i++) {
369 rc = ramdisk_write_hash(ramdisk->h, sector + i,
370 buf + i * ramdisk->sector_size,
371 ramdisk->sector_size);
372 if (rc)
373 return rc;
374 }
376 return 0;
377 }
379 static int ramdisk_write_cb(td_driver_t *driver, int res, uint64_t sector,
380 int nb_sectors, int id, void* private)
381 {
382 struct ramdisk_write_cbdata *cbdata = (struct ramdisk_write_cbdata*)private;
383 struct tdremus_state *s = cbdata->state;
384 int rc;
386 /*
387 RPRINTF("ramdisk write callback: rc %d, %d sectors @ %" PRIu64 "\n", res, nb_sectors,
388 sector);
389 */
391 free(cbdata->buf);
392 free(cbdata);
394 s->ramdisk.inflight--;
395 if (!s->ramdisk.inflight && !s->ramdisk.prev) {
396 /* when this reaches 0 and prev is empty, the disk is flushed. */
397 /*
398 RPRINTF("ramdisk flush complete\n");
399 */
400 }
402 if (s->ramdisk.prev) {
403 /* resubmit as much as possible in the remaining disk */
404 /*
405 RPRINTF("calling ramdisk_flush from write callback\n");
406 */
407 return ramdisk_flush(driver, s);
408 }
410 return 0;
411 }
413 static int uint64_compare(const void* k1, const void* k2)
414 {
415 uint64_t u1 = *(uint64_t*)k1;
416 uint64_t u2 = *(uint64_t*)k2;
418 /* u1 - u2 is unsigned */
419 return u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
420 }
422 /* set psectors to an array of the sector numbers in the hash, returning
423 * the number of entries (or -1 on error) */
424 static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors)
425 {
426 struct hashtable_itr* itr;
427 uint64_t* sectors;
428 int count;
430 if (!(count = hashtable_count(h)))
431 return 0;
433 if (!(*psectors = malloc(count * sizeof(uint64_t)))) {
434 DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
435 return -1;
436 }
437 sectors = *psectors;
439 itr = hashtable_iterator(h);
440 count = 0;
441 do {
442 sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
443 } while (hashtable_iterator_advance(itr));
444 free(itr);
446 return count;
447 }
449 static char* merge_requests(struct ramdisk* ramdisk, uint64_t start,
450 size_t count)
451 {
452 char* buf;
453 char* sector;
454 int i;
456 if (!(buf = valloc(count * ramdisk->sector_size))) {
457 DPRINTF("merge_request: allocation failed\n");
458 return NULL;
459 }
461 for (i = 0; i < count; i++) {
462 if (!(sector = hashtable_search(ramdisk->prev, &start))) {
463 DPRINTF("merge_request: lookup failed on %"PRIu64"\n", start);
464 return NULL;
465 }
467 memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size);
468 free(sector);
470 start++;
471 }
473 return buf;
474 }
476 /* The underlying driver may not handle having the whole ramdisk queued at
477 * once. We queue what we can and let the callbacks attempt to queue more. */
478 /* NOTE: may be called from callback, while dd->private still belongs to
479 * the underlying driver */
480 static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s)
481 {
482 uint64_t* sectors;
483 char* buf;
484 uint64_t base, batchlen;
485 int i, j, count = 0;
487 // RPRINTF("ramdisk flush\n");
489 if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0)
490 return count;
492 /*
493 RPRINTF("ramdisk: flushing %d sectors\n", count);
494 */
496 /* sort and merge sectors to improve disk performance */
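/* Added example: dirty sectors {3, 4, 5, 9, 10} are sorted and coalesced
 * into two batched writes, one at sector 3 of length 3 and one at
 * sector 9 of length 2. */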
497 qsort(sectors, count, sizeof(*sectors), uint64_compare);
499 for (i = 0; i < count;) {
500 base = sectors[i++];
501 while (i < count && sectors[i] == sectors[i-1] + 1)
502 i++;
503 batchlen = sectors[i-1] - base + 1;
505 if (!(buf = merge_requests(&s->ramdisk, base, batchlen))) {
506 RPRINTF("ramdisk_flush: merge_requests failed\n");
507 free(sectors);
508 return -1;
509 }
511 /* NOTE: create_write_request() creates a treq AND forwards it down
512 * the driver chain */
513 // RPRINTF("forwarding write request at %" PRIu64 ", length: %" PRIu64 "\n", base, batchlen);
514 create_write_request(s, base, batchlen, buf);
515 //RPRINTF("write request at %" PRIu64 ", length: %" PRIu64 " forwarded\n", base, batchlen);
517 s->ramdisk.inflight++;
519 for (j = 0; j < batchlen; j++) {
520 hashtable_remove(s->ramdisk.prev, &base);
521 base++;
522 }
523 }
525 if (!hashtable_count(s->ramdisk.prev)) {
526 /* everything is in flight */
527 hashtable_destroy(s->ramdisk.prev, 0);
528 s->ramdisk.prev = NULL;
529 }
531 free(sectors);
533 // RPRINTF("ramdisk flush done\n");
534 return 0;
535 }
537 /* flush ramdisk contents to disk */
538 static int ramdisk_start_flush(td_driver_t *driver)
539 {
540 struct tdremus_state *s = (struct tdremus_state *)driver->data;
541 uint64_t* key;
542 char* buf;
543 int rc = 0;
544 int i, j, count, batchlen;
545 uint64_t* sectors;
547 if (!hashtable_count(s->ramdisk.h)) {
548 /*
549 RPRINTF("Nothing to flush\n");
550 */
551 return 0;
552 }
554 if (s->ramdisk.prev) {
555 /* a flush request issued while a previous flush is still in progress
556 * will merge with the previous request. If you want the previous
557 * request to be consistent, wait for it to complete. */
558 if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0)
559 return count;
561 for (i = 0; i < count; i++) {
562 buf = hashtable_search(s->ramdisk.h, sectors + i);
563 ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf,
564 s->ramdisk.sector_size);
565 }
566 free(sectors);
568 hashtable_destroy (s->ramdisk.h, 0);
569 } else
570 s->ramdisk.prev = s->ramdisk.h;
572 /* We create a new hashtable so that new writes can be performed before
573 * the old hashtable is completely drained. */
574 s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
575 rd_hash_equal);
577 return ramdisk_flush(driver, s);
578 }
581 static int ramdisk_start(td_driver_t *driver)
582 {
583 struct tdremus_state *s = (struct tdremus_state *)driver->data;
585 if (s->ramdisk.h) {
586 RPRINTF("ramdisk already allocated\n");
587 return 0;
588 }
590 s->ramdisk.sector_size = driver->info.sector_size;
591 s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
592 rd_hash_equal);
594 DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size);
596 return 0;
597 }
599 /* common client/server functions */
600 /* mread ("maybe read"): time out after a certain interval. */
601 static int mread(int fd, void* buf, size_t len)
602 {
603 fd_set rfds;
604 int rc;
605 size_t cur = 0;
606 struct timeval tv = {
607 .tv_sec = HEARTBEAT_MS / 1000,
608 .tv_usec = (HEARTBEAT_MS % 1000) * 1000
609 };
611 if (!len)
612 return 0;
614 /* read first. Only select if read is incomplete. */
615 rc = read(fd, buf, len);
616 while (rc < 0 || cur + rc < len) {
617 if (!rc) {
618 RPRINTF("end-of-file");
619 return -1;
620 }
621 if (rc < 0 && errno != EAGAIN) {
622 RPRINTF("error during read: %s\n", strerror(errno));
623 return -1;
624 }
625 if (rc > 0)
626 cur += rc;
628 FD_ZERO(&rfds);
629 FD_SET(fd, &rfds);
630 if (!(rc = select(fd + 1, &rfds, NULL, NULL, &tv))) {
631 RPRINTF("time out during read\n");
632 return -1;
633 } else if (rc < 0) {
634 RPRINTF("error during select: %d\n", errno);
635 return -1;
636 }
637 rc = read(fd, buf + cur, len - cur);
638 }
639 /*
640 RPRINTF("read %d bytes\n", cur + rc);
641 */
643 return 0;
644 }
646 static int mwrite(int fd, void* buf, size_t len)
647 {
648 fd_set wfds;
649 size_t cur = 0;
650 int rc;
651 struct timeval tv = {
652 .tv_sec = HEARTBEAT_MS / 1000,
653 .tv_usec = (HEARTBEAT_MS % 1000) * 1000
654 };
656 if (!len)
657 return 0;
659 /* write first. Only select if the write is incomplete. */
660 rc = write(fd, buf, len);
661 while (rc < 0 || cur + rc < len) {
662 if (!rc) {
663 RPRINTF("end-of-file");
664 return -1;
665 }
666 if (rc < 0 && errno != EAGAIN) {
667 RPRINTF("error during write: %s\n", strerror(errno));
668 return -1;
669 }
670 if (rc > 0)
671 cur += rc;
673 FD_ZERO(&wfds);
674 FD_SET(fd, &wfds);
675 if (!(rc = select(fd + 1, NULL, &wfds, NULL, &tv))) {
676 RPRINTF("time out during write\n");
677 return -1;
678 } else if (rc < 0) {
679 RPRINTF("error during select: %d\n", errno);
680 return -1;
681 }
682 rc = write(fd, buf + cur, len - cur);
683 }
684 /*
685 RPRINTF("wrote %d bytes\n", cur + rc);
686 */
688 return 0;
692 }
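/* Hypothetical sketch (not in the original driver): a synchronous commit
 * handshake built from mread()/mwrite() above -- send a commit request and
 * wait for the backup's 4-byte "done" reply. The real driver splits this
 * across client_flush() and the remus_client_event() handler below so the
 * reply is handled asynchronously by tapdisk's event loop; the helper name
 * here is an assumption for illustration only. */
static int sketch_commit_handshake(int fd)
{
	char reply[5];

	if (mwrite(fd, TDREMUS_COMMIT, strlen(TDREMUS_COMMIT)) < 0)
		return -1;
	if (mread(fd, reply, sizeof(reply) - 1) < 0)
		return -1;
	reply[4] = '\0';

	return strcmp(reply, TDREMUS_DONE) ? -1 : 0;
}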
695 static void inline close_stream_fd(struct tdremus_state *s)
696 {
697 /* XXX: -2 is magic. replace with macro perhaps? */
698 tapdisk_server_unregister_event(s->stream_fd.id);
699 close(s->stream_fd.fd);
700 s->stream_fd.fd = -2;
701 }
703 /* primary functions */
704 static void remus_client_event(event_id_t, char mode, void *private);
705 static void remus_connect_event(event_id_t id, char mode, void *private);
706 static void remus_retry_connect_event(event_id_t id, char mode, void *private);
708 static int primary_do_connect(struct tdremus_state *state)
709 {
710 event_id_t id;
711 int fd;
712 int rc;
713 int flags;
715 RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
717 if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
718 RPRINTF("could not create client socket: %d\n", errno);
719 return -1;
720 }
722 /* make socket nonblocking */
723 if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
724 flags = 0;
725 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
726 return -1;
728 /* once we have created the socket and populated the address, we can now start
729 * our non-blocking connect. rather than duplicating code we trigger a timeout
730 * on the socket fd, which calls our nonblocking connect code
731 */
732 if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, remus_retry_connect_event, state)) < 0) {
733 RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
734 /* TODO: we leak a fd here */
735 return -1;
736 }
737 state->stream_fd.fd = fd;
738 state->stream_fd.id = id;
739 return 0;
740 }
742 static int primary_blocking_connect(struct tdremus_state *state)
743 {
744 int fd;
745 int id;
746 int rc;
747 int flags;
749 RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
751 if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
752 RPRINTF("could not create client socket: %d\n", errno);
753 return -1;
754 }
756 do {
757 if ((rc = connect(fd, (struct sockaddr *)&state->sa,
758 sizeof(state->sa))) < 0)
759 {
760 if (errno == ECONNREFUSED) {
761 RPRINTF("connection refused -- retrying in 1 second\n");
762 sleep(1);
763 } else {
764 RPRINTF("connection failed: %d\n", errno);
765 close(fd);
766 return -1;
767 }
768 }
769 } while (rc < 0);
771 RPRINTF("client connected\n");
773 /* make socket nonblocking */
774 if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
775 flags = 0;
776 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
777 {
778 RPRINTF("error making socket nonblocking\n");
779 close(fd);
780 return -1;
781 }
783 if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0, remus_client_event, state)) < 0) {
784 RPRINTF("error registering client event handler: %s\n", strerror(id));
785 close(fd);
786 return -1;
787 }
789 state->stream_fd.fd = fd;
790 state->stream_fd.id = id;
791 return 0;
792 }
794 /* on read, just pass request through */
795 static void primary_queue_read(td_driver_t *driver, td_request_t treq)
796 {
797 /* just pass read through */
798 td_forward_request(treq);
799 }
801 /* TODO:
802 * The primary uses mwrite() to write the contents of a write request to the
803 * backup. This effectively blocks until all data has been copied into a system
804 * buffer or a timeout has occurred. We may wish to instead use tapdisk's
805 * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts
806 * and write data in an asynchronous fashion.
807 */
808 static void primary_queue_write(td_driver_t *driver, td_request_t treq)
809 {
810 struct tdremus_state *s = (struct tdremus_state *)driver->data;
812 char header[sizeof(uint32_t) + sizeof(uint64_t)];
813 uint32_t *sectors = (uint32_t *)header;
814 uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
816 // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
818 /* -1 means we haven't connected yet, -2 means the connection was lost */
819 if(s->stream_fd.fd == -1) {
820 RPRINTF("connecting to backup...\n");
821 primary_blocking_connect(s);
822 }
824 *sectors = treq.secs;
825 *sector = treq.sec;
827 if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
828 goto fail;
829 if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
830 goto fail;
832 if (mwrite(s->stream_fd.fd, treq.buf, treq.secs * driver->info.sector_size) < 0)
833 goto fail;
835 td_forward_request(treq);
837 return;
839 fail:
840 /* switch to unprotected mode and tell tapdisk to retry */
841 RPRINTF("write request replication failed, switching to unprotected mode");
842 switch_mode(s->tdremus_driver, mode_unprotected);
843 td_complete_request(treq, -EBUSY);
844 }
847 static int client_flush(td_driver_t *driver)
848 {
849 struct tdremus_state *s = (struct tdremus_state *)driver->data;
851 // RPRINTF("committing output\n");
853 if (s->stream_fd.fd == -1)
854 /* connection not yet established, nothing to flush */
855 return 0;
857 if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT, strlen(TDREMUS_COMMIT)) < 0) {
858 RPRINTF("error flushing output");
859 close_stream_fd(s);
860 return -1;
861 }
863 return 0;
864 }
866 static int primary_start(td_driver_t *driver)
867 {
868 struct tdremus_state *s = (struct tdremus_state *)driver->data;
870 RPRINTF("activating client mode\n");
872 tapdisk_remus.td_queue_read = primary_queue_read;
873 tapdisk_remus.td_queue_write = primary_queue_write;
874 s->queue_flush = client_flush;
876 s->stream_fd.fd = -1;
877 s->stream_fd.id = -1;
879 return 0;
880 }
882 /* timeout callback */
883 static void remus_retry_connect_event(event_id_t id, char mode, void *private)
884 {
885 struct tdremus_state *s = (struct tdremus_state *)private;
887 /* do a non-blocking connect */
888 if (connect(s->stream_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa))
889 && errno != EINPROGRESS)
890 {
891 if(errno == ECONNREFUSED || errno == ENETUNREACH || errno == EAGAIN || errno == ECONNABORTED)
892 {
893 /* try again in a second */
894 tapdisk_server_unregister_event(s->stream_fd.id);
895 if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
896 RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
897 return;
898 }
899 s->stream_fd.id = id;
900 }
901 else
902 {
903 /* not recoverable */
904 RPRINTF("error connecting to server %s\n", strerror(errno));
905 return;
906 }
907 }
908 else
909 {
910 /* the connect returned EINPROGRESS (nonblocking connect); we must wait for the fd to become writable to determine whether the connect succeeded */
912 tapdisk_server_unregister_event(s->stream_fd.id);
913 if((id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, s->stream_fd.fd, 0, remus_connect_event, s)) < 0) {
914 RPRINTF("error registering client connection event handler: %s\n", strerror(id));
915 return;
916 }
917 s->stream_fd.id = id;
918 }
919 }
921 /* callback when nonblocking connect() is finished */
922 /* called only by primary in unprotected state */
923 static void remus_connect_event(event_id_t id, char mode, void *private)
924 {
925 int socket_errno;
926 socklen_t socket_errno_size;
927 struct tdremus_state *s = (struct tdremus_state *)private;
929 /* check to see if the connect succeeded */
930 socket_errno_size = sizeof(socket_errno);
931 if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, &socket_errno, &socket_errno_size)) {
932 RPRINTF("error getting socket errno\n");
933 return;
934 }
936 RPRINTF("socket connect returned %d\n", socket_errno);
938 if(socket_errno)
939 {
940 /* the connect did not succeed */
942 if(socket_errno == ECONNREFUSED || socket_errno == ENETUNREACH || socket_errno == ETIMEDOUT
943 || socket_errno == ECONNABORTED || socket_errno == EAGAIN)
944 {
945 /* we can probably assume that the backup is down. just try again later */
946 tapdisk_server_unregister_event(s->stream_fd.id);
947 if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
948 RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
949 return;
950 }
951 s->stream_fd.id = id;
952 }
953 else
954 {
955 RPRINTF("socket connect returned %d, giving up\n", socket_errno);
956 }
957 }
958 else
959 {
960 /* the connect succeeded */
962 /* unregister this function and register a new event handler */
963 tapdisk_server_unregister_event(s->stream_fd.id);
964 if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd, 0, remus_client_event, s)) < 0) {
965 RPRINTF("error registering client event handler: %s\n", strerror(id));
966 return;
967 }
968 s->stream_fd.id = id;
970 /* switch from unprotected to protected client */
971 switch_mode(s->tdremus_driver, mode_primary);
972 }
973 }
976 /* we install this event handler on the primary once we have connected to the backup */
977 /* wait for "done" message to commit checkpoint */
978 static void remus_client_event(event_id_t id, char mode, void *private)
979 {
980 struct tdremus_state *s = (struct tdremus_state *)private;
981 char req[5];
982 int rc;
984 if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
985 /* replication stream closed or otherwise broken (timeout, reset, &c) */
986 RPRINTF("error reading from backup\n");
987 close_stream_fd(s);
988 return;
989 }
991 req[4] = '\0';
993 if (!strcmp(req, TDREMUS_DONE))
994 /* checkpoint committed, inform msg_fd */
995 ctl_respond(s, TDREMUS_DONE);
996 else {
997 RPRINTF("received unknown message: %s\n", req);
998 close_stream_fd(s);
999 }
1001 return;
1002 }
1004 /* backup functions */
1005 static void remus_server_event(event_id_t id, char mode, void *private);
1007 /* accept a connection and store the socket that receives write requests */
1008 static void remus_server_accept(event_id_t id, char mode, void* private)
1009 {
1010 struct tdremus_state* s = (struct tdremus_state *) private;
1012 int stream_fd;
1013 event_id_t cid;
1015 /* XXX: add address-based black/white list */
1016 if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
1017 RPRINTF("error accepting connection: %d\n", errno);
1018 return;
1019 }
1021 /* TODO: check to see if we are already replicating. if so just close the
1022 * connection (or do something smarter) */
1023 RPRINTF("server accepted connection\n");
1025 /* add tapdisk event for replication stream */
1026 cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
1027 remus_server_event, s);
1029 if(cid < 0) {
1030 RPRINTF("error registering connection event handler: %s\n", strerror(errno));
1031 close(stream_fd);
1032 return;
1033 }
1035 /* store replication file descriptor */
1036 s->stream_fd.fd = stream_fd;
1037 s->stream_fd.id = cid;
1038 }
1040 /* returns -2 if the address is not local (bind fails with an error other than EADDRINUSE) */
1041 static int remus_bind(struct tdremus_state* s)
1042 {
1043 // struct sockaddr_in sa;
1044 int opt;
1045 int rc = -1;
1047 if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1048 RPRINTF("could not create server socket: %d\n", errno);
1049 return rc;
1050 }
1051 opt = 1;
1052 if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
1053 RPRINTF("Error setting REUSEADDR on %d: %d\n", s->server_fd.fd, errno);
1055 if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) < 0) {
1056 RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", s->server_fd.fd,
1057 inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port), errno, strerror(errno));
1058 if (errno != EADDRINUSE)
1059 rc = -2;
1060 goto err_sfd;
1061 }
1062 if (listen(s->server_fd.fd, 10)) {
1063 RPRINTF("could not listen on socket: %d\n", errno);
1064 goto err_sfd;
1065 }
1067 /* The socket is now bound to the address and listening, so we may now register
1068 * the fd with tapdisk */
1070 if((s->server_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
1071 s->server_fd.fd, 0,
1072 remus_server_accept, s)) < 0) {
1073 RPRINTF("error registering server connection event handler: %s",
1074 strerror(s->server_fd.id));
1075 goto err_sfd;
1076 }
1078 return 0;
1080 err_sfd:
1081 close(s->server_fd.fd);
1082 s->server_fd.fd = -1;
1084 return rc;
1085 }
1087 /* wait for latest checkpoint to be applied */
1088 static inline int server_writes_inflight(td_driver_t *driver)
1089 {
1090 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1092 if (!s->ramdisk.inflight && !s->ramdisk.prev)
1093 return 0;
1095 return 1;
1096 }
1098 /* Due to block device prefetching this code may be called on the server side
1099 * during normal replication. In this case we must return EBUSY, otherwise the
1100 * domain may be started with stale data.
1101 */
1102 void backup_queue_read(td_driver_t *driver, td_request_t treq)
1103 {
1104 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1106 if(!remus_image)
1107 remus_image = treq.image;
1109 #if 0
1110 /* due to prefetching, we must return EBUSY on server reads. This
1111 * maintains a consistent disk image */
1112 td_complete_request(treq, -EBUSY);
1113 #else
1114 /* what exactly is the race that requires the response above? */
1115 td_forward_request(treq);
1116 #endif
1117 }
1119 /* see above */
1120 void backup_queue_write(td_driver_t *driver, td_request_t treq)
1121 {
1122 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1124 /* on a server write, we know the domain has failed over. we must change our
1125 * state to unprotected and then have the unprotected queue_write function
1126 * handle the write
1127 */
1129 switch_mode(driver, mode_unprotected);
1130 /* TODO: call the appropriate write function rather than return EBUSY */
1131 td_complete_request(treq, -EBUSY);
1132 }
1134 static int backup_start(td_driver_t *driver)
1135 {
1136 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1137 int fd;
1139 if (ramdisk_start(driver) < 0)
1140 return -1;
1142 tapdisk_remus.td_queue_read = backup_queue_read;
1143 tapdisk_remus.td_queue_write = backup_queue_write;
1144 /* TODO set flush function */
1145 return 0;
1146 }
1148 static int server_do_wreq(td_driver_t *driver)
1149 {
1150 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1151 static tdremus_wire_t twreq;
1152 char buf[4096];
1153 int len, rc;
1155 char header[sizeof(uint32_t) + sizeof(uint64_t)];
1156 uint32_t *sectors = (uint32_t *) header;
1157 uint64_t *sector = (uint64_t *) &header[sizeof(uint32_t)];
1159 // RPRINTF("received write request\n");
1161 if (mread(s->stream_fd.fd, header, sizeof(header)) < 0)
1162 goto err;
1164 len = *sectors * driver->info.sector_size;
1166 //RPRINTF("writing %d sectors (%d bytes) starting at %" PRIu64 "\n", *sectors, len,
1167 // *sector);
1169 if (len > sizeof(buf)) {
1170 /* freak out! */
1171 RPRINTF("write request too large: %d/%u\n", len, (unsigned)sizeof(buf));
1172 return -1;
1173 }
1175 if (mread(s->stream_fd.fd, buf, len) < 0)
1176 goto err;
1178 if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0)
1179 goto err;
1181 return 0;
1183 err:
1184 /* should start failover */
1185 RPRINTF("backup write request error\n");
1186 close_stream_fd(s);
1188 return -1;
1189 }
1191 static int server_do_sreq(td_driver_t *driver)
1192 {
1193 /*
1194 RPRINTF("submit request received\n");
1195 */
1197 return 0;
1198 }
1200 /* at this point, the server can start applying the most recent
1201 * ramdisk. */
1202 static int server_do_creq(td_driver_t *driver)
1203 {
1204 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1206 // RPRINTF("committing buffer\n");
1208 ramdisk_start_flush(driver);
1210 /* XXX this message should not be sent until flush completes! */
1211 if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
1212 return -1;
1214 return 0;
1215 }
1218 /* called when data is pending in s->rfd */
1219 static void remus_server_event(event_id_t id, char mode, void *private)
1220 {
1221 struct tdremus_state *s = (struct tdremus_state *)private;
1222 td_driver_t *driver = s->tdremus_driver;
1223 char req[5];
1225 // RPRINTF("replication data waiting\n");
1227 /* TODO: add a get_connection_by_event_id() function.
1228 * for now we can assume that the fd is s->stream_fd */
1230 if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
1231 RPRINTF("error reading server event, activating backup\n");
1232 switch_mode(driver, mode_unprotected);
1233 return;
1234 }
1236 req[4] = '\0';
1238 if (!strcmp(req, TDREMUS_WRITE))
1239 server_do_wreq(driver);
1240 else if (!strcmp(req, TDREMUS_SUBMIT))
1241 server_do_sreq(driver);
1242 else if (!strcmp(req, TDREMUS_COMMIT))
1243 server_do_creq(driver);
1244 else
1245 RPRINTF("unknown request received: %s\n", req);
1247 return;
1248 }
1251 /* unprotected */
1253 void unprotected_queue_read(td_driver_t *driver, td_request_t treq)
1254 {
1255 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1257 /* wait for previous ramdisk to flush before servicing reads */
1258 if (server_writes_inflight(driver)) {
1259 /* for now lets just return EBUSY. if this becomes an issue we can
1260 * do something smarter */
1261 td_complete_request(treq, -EBUSY);
1262 }
1263 else {
1264 /* here we just pass reads through */
1265 td_forward_request(treq);
1266 }
1267 }
1269 /* For a recoverable remus solution we need to log unprotected writes here */
1270 void unprotected_queue_write(td_driver_t *driver, td_request_t treq)
1271 {
1272 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1274 /* wait for previous ramdisk to flush */
1275 if (server_writes_inflight(driver)) {
1276 RPRINTF("queue_write: waiting for queue to drain");
1277 td_complete_request(treq, -EBUSY);
1278 }
1279 else {
1280 // RPRINTF("servicing write request on backup\n");
1281 td_forward_request(treq);
1282 }
1283 }
1285 static int unprotected_start(td_driver_t *driver)
1286 {
1287 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1289 RPRINTF("failure detected, activating passthrough\n");
1291 /* close the server socket */
1292 close_stream_fd(s);
1294 /* unregister the replication stream */
1295 tapdisk_server_unregister_event(s->server_fd.id);
1297 /* close the replication stream */
1298 close(s->server_fd.fd);
1299 s->server_fd.fd = -1;
1301 /* install the unprotected read/write handlers */
1302 tapdisk_remus.td_queue_read = unprotected_queue_read;
1303 tapdisk_remus.td_queue_write = unprotected_queue_write;
1305 return 0;
1306 }
1309 /* control */
1311 static inline int resolve_address(const char* addr, struct in_addr* ia)
1312 {
1313 struct hostent* he;
1314 uint32_t ip;
1316 if (!(he = gethostbyname(addr))) {
1317 RPRINTF("error resolving %s: %d\n", addr, h_errno);
1318 return -1;
1319 }
1321 if (!he->h_addr_list[0]) {
1322 RPRINTF("no address found for %s\n", addr);
1323 return -1;
1324 }
1326 /* network byte order */
1327 ip = *((uint32_t**)he->h_addr_list)[0];
1328 ia->s_addr = ip;
1330 return 0;
1331 }
1333 static int get_args(td_driver_t *driver, const char* name)
1334 {
1335 struct tdremus_state *state = (struct tdremus_state *)driver->data;
1336 char* host;
1337 char* port;
1338 // char* driver_str;
1339 // char* parent;
1340 // int type;
1341 // char* path;
1342 // unsigned long ulport;
1343 // int i;
1344 // struct sockaddr_in server_addr_in;
1346 int gai_status;
1347 int valid_addr;
1348 struct addrinfo gai_hints;
1349 struct addrinfo *servinfo, *servinfo_itr;
1351 memset(&gai_hints, 0, sizeof gai_hints);
1352 gai_hints.ai_family = AF_UNSPEC;
1353 gai_hints.ai_socktype = SOCK_STREAM;
1355 port = strchr(name, ':');
1356 if (!port) {
1357 RPRINTF("missing host in %s\n", name);
1358 return -ENOENT;
1359 }
1360 if (!(host = strndup(name, port - name))) {
1361 RPRINTF("unable to allocate host\n");
1362 return -ENOMEM;
1363 }
1364 port++;
1366 if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) {
1367 RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
1368 return -ENOENT;
1369 }
1371 /* TODO: do something smarter here */
1372 valid_addr = 0;
1373 for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) {
1374 void *addr;
1375 char *ipver;
1377 if (servinfo_itr->ai_family == AF_INET) {
1378 valid_addr = 1;
1379 memset(&state->sa, 0, sizeof(state->sa));
1380 state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
1381 break;
1382 }
1383 }
1384 freeaddrinfo(servinfo);
1386 if (!valid_addr)
1387 return -ENOENT;
1389 RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
1391 return 0;
1392 }
1394 static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
1395 {
1396 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1397 int rc;
1399 if (mode == s->mode)
1400 return 0;
1402 if (s->queue_flush)
1403 if ((rc = s->queue_flush(driver)) < 0) {
1404 // fall back to unprotected mode on error
1405 RPRINTF("switch_mode: error flushing queue (old: %d, new: %d)", s->mode, mode);
1406 mode = mode_unprotected;
1407 }
1409 if (mode == mode_unprotected)
1410 rc = unprotected_start(driver);
1411 else if (mode == mode_primary)
1412 rc = primary_start(driver);
1413 else if (mode == mode_backup)
1414 rc = backup_start(driver);
1415 else {
1416 RPRINTF("unknown mode requested: %d\n", mode);
1417 rc = -1;
1418 }
1420 if (!rc)
1421 s->mode = mode;
1423 return rc;
1424 }
1426 static void ctl_request(event_id_t id, char mode, void *private)
1427 {
1428 struct tdremus_state *s = (struct tdremus_state *)private;
1429 td_driver_t *driver = s->tdremus_driver;
1430 char msg[80];
1431 int rc;
1433 // RPRINTF("data waiting on control fifo\n");
1435 if (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */))) {
1436 RPRINTF("0-byte read received, reopening FIFO\n");
1437 /*TODO: we may have to unregister/re-register with tapdisk_server */
1438 close(s->ctl_fd.fd);
1439 RPRINTF("FIFO closed\n");
1440 if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
1441 RPRINTF("error reopening FIFO: %d\n", errno);
1442 }
1443 return;
1444 }
1446 if (rc < 0) {
1447 RPRINTF("error reading from FIFO: %d\n", errno);
1448 return;
1449 }
1451 /* TODO: need to get driver somehow */
1452 msg[rc] = '\0';
1453 if (!strncmp(msg, "flush", 5)) {
1454 if (s->queue_flush)
1455 if ((rc = s->queue_flush(driver))) {
1456 RPRINTF("error passing flush request to backup");
1457 ctl_respond(s, TDREMUS_FAIL);
1458 }
1459 } else {
1460 RPRINTF("unknown command: %s\n", msg);
1461 }
1462 }
1464 static int ctl_respond(struct tdremus_state *s, const char *response)
1465 {
1466 int rc;
1468 if ((rc = write(s->msg_fd.fd, response, strlen(response))) < 0) {
1469 RPRINTF("error writing notification: %d\n", errno);
1470 close(s->msg_fd.fd);
1471 if ((s->msg_fd.fd = open(s->msg_path, O_RDWR)) < 0)
1472 RPRINTF("error reopening FIFO: %d\n", errno);
1473 }
1475 return rc;
1476 }
1478 /* must be called after the underlying driver has been initialized */
1479 static int ctl_open(td_driver_t *driver, const char* name)
1480 {
1481 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1482 int i, l;
1484 /* first we must ensure that BLKTAP_CTRL_DIR exists */
1485 if (mkdir(BLKTAP_CTRL_DIR, 0755) && errno != EEXIST)
1486 {
1487 DPRINTF("error creating directory %s: %d\n", BLKTAP_CTRL_DIR, errno);
1488 return -1;
1489 }
1491 /* use the device name to create the control fifo path */
1492 if (asprintf(&s->ctl_path, BLKTAP_CTRL_DIR "/remus_%s", name) < 0)
1493 return -1;
1494 /* scrub fifo pathname */
1495 for (i = strlen(BLKTAP_CTRL_DIR) + 1, l = strlen(s->ctl_path); i < l; i++) {
1496 if (strchr(":/", s->ctl_path[i]))
1497 s->ctl_path[i] = '_';
1498 }
1499 if (asprintf(&s->msg_path, "%s.msg", s->ctl_path) < 0)
1500 goto err_ctlfifo;
1502 if (mkfifo(s->ctl_path, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
1503 RPRINTF("error creating control FIFO %s: %d\n", s->ctl_path, errno);
1504 goto err_msgfifo;
1505 }
1507 if (mkfifo(s->msg_path, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
1508 RPRINTF("error creating message FIFO %s: %d\n", s->msg_path, errno);
1509 goto err_msgfifo;
1510 }
1512 /* RDWR so that fd doesn't block select when no writer is present */
1513 if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
1514 RPRINTF("error opening control FIFO %s: %d\n", s->ctl_path, errno);
1515 goto err_msgfifo;
1516 }
1518 if ((s->msg_fd.fd = open(s->msg_path, O_RDWR)) < 0) {
1519 RPRINTF("error opening message FIFO %s: %d\n", s->msg_path, errno);
1520 goto err_openctlfifo;
1521 }
1523 RPRINTF("control FIFO %s\n", s->ctl_path);
1524 RPRINTF("message FIFO %s\n", s->msg_path);
1526 return 0;
1528 err_openctlfifo:
1529 close(s->ctl_fd.fd);
1530 err_msgfifo:
1531 free(s->msg_path);
1532 s->msg_path = NULL;
1533 err_ctlfifo:
1534 free(s->ctl_path);
1535 s->ctl_path = NULL;
1536 return -1;
1537 }
1539 static void ctl_close(td_driver_t *driver)
1540 {
1541 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1543 /* TODO: close *all* connections */
1545 if(s->ctl_fd.fd)
1546 close(s->ctl_fd.fd);
1548 if (s->ctl_path) {
1549 unlink(s->ctl_path);
1550 free(s->ctl_path);
1551 s->ctl_path = NULL;
1552 }
1553 if (s->msg_path) {
1554 unlink(s->msg_path);
1555 free(s->msg_path);
1556 s->msg_path = NULL;
1557 }
1558 }
1560 static int ctl_register(struct tdremus_state *s)
1561 {
1562 RPRINTF("registering ctl fifo\n");
1564 /* register ctl fd */
1565 s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
1567 if (s->ctl_fd.id < 0) {
1568 RPRINTF("error registering ctrl FIFO %s: %d\n", s->ctl_path, s->ctl_fd.id);
1569 return -1;
1570 }
1572 return 0;
1573 }
1575 /* interface */
1577 static int tdremus_open(td_driver_t *driver, const char *name,
1578 td_flag_t flags)
1579 {
1580 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1581 int rc;
1583 RPRINTF("opening %s\n", name);
1585 /* first we need to get the underlying vbd for this driver stack. To do so we
1586 * need to know the vbd's id. Fortunately, for tapdisk2 this is hard-coded as
1587 * 0 (see tapdisk2.c)
1588 */
1589 device_vbd = tapdisk_server_get_vbd(0);
1591 memset(s, 0, sizeof(*s));
1592 s->server_fd.fd = -1;
1593 s->stream_fd.fd = -1;
1594 s->ctl_fd.fd = -1;
1595 s->msg_fd.fd = -1;
1597 /* TODO: this is only needed so that the server can send writes down
1598 * the driver stack from the stream_fd event handler */
1599 s->tdremus_driver = driver;
1601 /* parse name to get info etc */
1602 if ((rc = get_args(driver, name)))
1603 return rc;
1605 if ((rc = ctl_open(driver, name))) {
1606 RPRINTF("error setting up control channel\n");
1607 free(s->driver_data);
1608 return rc;
1609 }
1611 if ((rc = ctl_register(s))) {
1612 RPRINTF("error registering control channel\n");
1613 free(s->driver_data);
1614 return rc;
1615 }
1617 if (!(rc = remus_bind(s)))
1618 rc = switch_mode(driver, mode_backup);
1619 else if (rc == -2)
1620 rc = switch_mode(driver, mode_primary);
1622 if (!rc)
1623 return 0;
1625 tdremus_close(driver);
1626 return -EIO;
1627 }
1629 static int tdremus_close(td_driver_t *driver)
1630 {
1631 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1633 RPRINTF("closing\n");
1635 if (s->driver_data) {
1636 free(s->driver_data);
1637 s->driver_data = NULL;
1638 }
1639 if (s->server_fd.fd >= 0) {
1640 close(s->server_fd.fd);
1641 s->server_fd.fd = -1;
1642 }
1643 if (s->stream_fd.fd >= 0)
1644 close_stream_fd(s);
1646 ctl_close(driver);
1648 return 0;
1649 }
1651 static int tdremus_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
1652 {
1653 /* we shouldn't have a parent... for now */
1654 return -EINVAL;
1655 }
1657 static int tdremus_validate_parent(td_driver_t *driver,
1658 td_driver_t *pdriver, td_flag_t flags)
1659 {
1660 return 0;
1661 }
1663 struct tap_disk tapdisk_remus = {
1664 .disk_type = "tapdisk_remus",
1665 .private_data_size = sizeof(struct tdremus_state),
1666 .td_open = tdremus_open,
1667 .td_queue_read = unprotected_queue_read,
1668 .td_queue_write = unprotected_queue_write,
1669 .td_close = tdremus_close,
1670 .td_get_parent_id = tdremus_get_parent_id,
1671 .td_validate_parent = tdremus_validate_parent,
1672 .td_debug = NULL,
1673 };