
view tools/blktap2/drivers/block-remus.c @ 22848:6341fe0f4e5a

Added tag 4.1.0-rc2 for changeset 9dca60d88c63
author Keir Fraser <keir@xen.org>
date Tue Jan 25 14:06:55 2011 +0000 (2011-01-25)
1 /* block-remus.c
2 *
3 * This disk sends all writes to a backup via a network interface before
4 * passing them to an underlying device.
5 * The backup is a bit more complicated:
6 * 1. It applies all incoming writes to a ramdisk.
7 * 2. When a checkpoint request arrives, it moves the ramdisk to
8 * a committing state and uses a new ramdisk for subsequent writes.
9 * It also acknowledges the request, to let the sender know it can
10 * release output.
11 * 3. The ramdisk flushes its contents to the underlying driver.
12 * 4. At failover, the backup waits for the in-flight ramdisk (if any) to
13 * drain before letting the domain be activated.
14 *
15 * The driver determines whether it is the client or server by attempting
16 * to bind to the replication address. If the address is not local,
17 * the driver acts as client.
18 *
19 * The following messages are defined for the replication stream:
20 * 1. write request
21 * "wreq" 4
22 * num_sectors 4
23 * sector 8
24 * buffer (num_sectors * sector_size)
25 * 2. submit request (may be used as a barrier)
26 * "sreq" 4
27 * 3. commit request
28 * "creq" 4
29 * After a commit request, the client must wait for a completion message:
30 * 4. completion
31 * "done" 4
32 */
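/* Illustrative sketch (editor's addition, compiled out): one way a sender
 * could emit a single write request in the wire format described above.
 * The helper name and its caller are hypothetical; the real driver does the
 * equivalent inline in primary_queue_write() using mwrite(). As in the
 * driver, the count and sector fields are sent in host byte order. */
#if 0
static int send_wreq_sketch(int fd, uint64_t sector, uint32_t num_sectors,
                            const char *data, size_t sector_size)
{
	char header[sizeof(uint32_t) + sizeof(uint64_t)];

	/* 4-byte tag, then num_sectors (4 bytes) and sector (8 bytes) */
	memcpy(header, &num_sectors, sizeof(num_sectors));
	memcpy(header + sizeof(num_sectors), &sector, sizeof(sector));

	if (write(fd, "wreq", 4) != 4)
		return -1;
	if (write(fd, header, sizeof(header)) != (ssize_t)sizeof(header))
		return -1;
	if (write(fd, data, num_sectors * sector_size) !=
	    (ssize_t)(num_sectors * sector_size))
		return -1;

	return 0;
}
#endif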
34 /* due to architectural choices in tapdisk, block-buffer is forced to
35 * reimplement some code which is meant to be private */
36 #include "tapdisk.h"
37 #include "tapdisk-server.h"
38 #include "tapdisk-driver.h"
39 #include "tapdisk-interface.h"
40 #include "hashtable.h"
41 #include "hashtable_itr.h"
42 #include "hashtable_utility.h"
44 #include <errno.h>
45 #include <inttypes.h>
46 #include <fcntl.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <string.h>
50 #include <sys/time.h>
51 #include <sys/types.h>
52 #include <sys/socket.h>
53 #include <netdb.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <sys/param.h>
57 #include <sys/sysctl.h>
58 #include <unistd.h>
59 #include <sys/stat.h>
61 /* timeout for reads and writes in ms */
62 #define HEARTBEAT_MS 1000
63 #define RAMDISK_HASHSIZE 128
65 /* connect retry timeout (seconds) */
66 #define REMUS_CONNRETRY_TIMEOUT 10
68 #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
70 enum tdremus_mode {
71 mode_invalid = 0,
72 mode_unprotected,
73 mode_primary,
74 mode_backup
75 };
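/* Mode selection (see tdremus_open() and the header comment above): the
 * driver first tries to bind the replication address in remus_bind(). If the
 * bind succeeds the address is local and the driver starts as the backup
 * (mode_backup). If the bind fails because the address is not local
 * (remus_bind() returns -2), the driver starts as the primary (mode_primary).
 * Unrecoverable replication errors at runtime drop the driver back to
 * mode_unprotected; see the fail path in primary_queue_write() and
 * backup_queue_write(). */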
77 struct tdremus_req {
78 uint64_t sector;
79 int nb_sectors;
80 char buf[4096];
81 };
83 struct req_ring {
84 /* waste one slot to distinguish between empty and full */
85 struct tdremus_req requests[MAX_REQUESTS * 2 + 1];
86 unsigned int head;
87 unsigned int tail;
88 };
90 /* TODO: This isn't very pretty, but to properly generate our own treqs (needed
91 * by the backup) we need to know our td_vbd_t and td_image_t (blktap2
92 * internals). As a proper fix, we should consider extending the tapdisk
93 * interface with a td_create_request() function, or something similar.
94 *
95 * For now, we just grab the vbd in the td_open() command, and the td_image_t
96 * from the first read request.
97 */
98 td_vbd_t *device_vbd = NULL;
99 td_image_t *remus_image = NULL;
100 struct tap_disk tapdisk_remus;
102 struct ramdisk {
103 size_t sector_size;
104 struct hashtable* h;
105 /* when a ramdisk is flushed, h is given a new empty hash for writes
106 * while the old ramdisk (prev) is drained asynchronously. To avoid
107 * a race where a read request points to a sector in prev which has
108 * not yet been flushed, check prev on a miss in h */
109 struct hashtable* prev;
110 /* count of outstanding requests to the base driver */
111 size_t inflight;
112 };
114 /* the ramdisk intercepts the original callback for reads and writes.
115 * This holds the original data. */
116 /* Might be worth making this a static array in struct ramdisk to avoid
117 * a malloc per request */
119 struct tdremus_state;
121 struct ramdisk_cbdata {
122 td_callback_t cb;
123 void* private;
124 char* buf;
125 struct tdremus_state* state;
126 };
128 struct ramdisk_write_cbdata {
129 struct tdremus_state* state;
130 char* buf;
131 };
133 typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
135 /* poll_fd type for blktap2 fd system. taken from block_log.c */
136 typedef struct poll_fd {
137 int fd;
138 event_id_t id;
139 } poll_fd_t;
141 struct tdremus_state {
142 // struct tap_disk* driver;
143 void* driver_data;
145 /* XXX: this is needed so that the server can perform operations on
146 * the driver from the stream_fd event handler. fix this. */
147 td_driver_t *tdremus_driver;
149 /* TODO: we may wish to replace these two FIFOs with a unix socket */
150 char* ctl_path; /* receive flush instruction here */
151 poll_fd_t ctl_fd; /* io_fd slot for control FIFO */
152 char* msg_path; /* output completion message here */
153 poll_fd_t msg_fd;
155 /* replication host */
156 struct sockaddr_in sa;
157 poll_fd_t server_fd; /* server listen port */
158 poll_fd_t stream_fd; /* replication channel */
160 /* queue write requests, batch-replicate at submit */
161 struct req_ring write_ring;
163 /* ramdisk data*/
164 struct ramdisk ramdisk;
166 /* mode methods */
167 enum tdremus_mode mode;
168 int (*queue_flush)(td_driver_t *driver);
169 };
171 typedef struct tdremus_wire {
172 uint32_t op;
173 uint64_t id;
174 uint64_t sec;
175 uint32_t secs;
176 } tdremus_wire_t;
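/* Note: tdremus_wire_t lists the logical fields of a replicated request, but
 * the stream code below does not send this struct directly. primary_queue_write()
 * and server_do_wreq() build a packed header of sizeof(uint32_t) +
 * sizeof(uint64_t) bytes (sector count, then starting sector) immediately
 * after the 4-byte message tag. */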
178 #define TDREMUS_READ "rreq"
179 #define TDREMUS_WRITE "wreq"
180 #define TDREMUS_SUBMIT "sreq"
181 #define TDREMUS_COMMIT "creq"
182 #define TDREMUS_DONE "done"
183 #define TDREMUS_FAIL "fail"
185 /* primary read/write functions */
186 static void primary_queue_read(td_driver_t *driver, td_request_t treq);
187 static void primary_queue_write(td_driver_t *driver, td_request_t treq);
189 /* backup read/write functions */
190 static void backup_queue_read(td_driver_t *driver, td_request_t treq);
191 static void backup_queue_write(td_driver_t *driver, td_request_t treq);
193 /* unprotected read/write functions */
194 static void unprotected_queue_read(td_driver_t *driver, td_request_t treq);
195 static void unprotected_queue_write(td_driver_t *driver, td_request_t treq);
197 static int tdremus_close(td_driver_t *driver);
199 static int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
200 static int ctl_respond(struct tdremus_state *s, const char *response);
202 /* ring functions */
203 static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
204 {
205 if (++pos >= MAX_REQUESTS * 2 + 1)
206 return 0;
208 return pos;
209 }
211 static inline int ring_isempty(struct req_ring* ring)
212 {
213 return ring->head == ring->tail;
214 }
216 static inline int ring_isfull(struct req_ring* ring)
217 {
218 return ring_next(ring, ring->tail) == ring->head;
219 }
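/* Illustrative sketch (editor's addition, compiled out): how a request could
 * be appended to a req_ring with the helpers above. ring_push_sketch() is a
 * hypothetical helper, not used by the driver; it only demonstrates why the
 * ring wastes one slot, so that ring_isempty() and ring_isfull() can be told
 * apart without a separate element count. */
#if 0
static int ring_push_sketch(struct req_ring *ring, uint64_t sector,
                            int nb_sectors, const char *buf, size_t len)
{
	struct tdremus_req *req;

	if (ring_isfull(ring))
		return -EBUSY;

	req = &ring->requests[ring->tail];
	req->sector = sector;
	req->nb_sectors = nb_sectors;
	memcpy(req->buf, buf, len < sizeof(req->buf) ? len : sizeof(req->buf));

	/* the slot at the new tail stays unused until the next push */
	ring->tail = ring_next(ring, ring->tail);

	return 0;
}
#endif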
221 /* functions to create and submit treqs */
223 static void
224 replicated_write_callback(td_request_t treq, int err)
225 {
226 struct tdremus_state *s = (struct tdremus_state *) treq.cb_data;
227 td_vbd_request_t *vreq;
229 vreq = (td_vbd_request_t *) treq.private;
231 /* the write failed for now, lets panic. this is very bad */
232 if (err) {
233 RPRINTF("ramdisk write failed, disk image is not consistent\n");
234 exit(-1);
235 }
237 /* The write succeeded. let's pull the vreq off whatever request list
238 * it is on and free() it */
239 list_del(&vreq->next);
240 free(vreq);
242 s->ramdisk.inflight--;
243 if (!s->ramdisk.inflight && !s->ramdisk.prev) {
244 /* TODO: the ramdisk has been flushed */
245 }
246 }
248 static inline int
249 create_write_request(struct tdremus_state *state, td_sector_t sec, int secs, char *buf)
250 {
251 td_request_t treq;
252 td_vbd_request_t *vreq;
254 treq.op = TD_OP_WRITE;
255 treq.buf = buf;
256 treq.sec = sec;
257 treq.secs = secs;
258 treq.image = remus_image;
259 treq.cb = replicated_write_callback;
260 treq.cb_data = state;
261 treq.id = 0;
262 treq.sidx = 0;
264 vreq = calloc(1, sizeof(td_vbd_request_t));
265 treq.private = vreq;
267 if(!vreq)
268 return -1;
270 vreq->submitting = 1;
271 INIT_LIST_HEAD(&vreq->next);
272 tapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests);
274 /* TODO:
275 * we should probably leave it up to the caller to forward the request */
276 td_forward_request(treq);
278 vreq->submitting--;
280 return 0;
281 }
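/* Note: create_write_request() allocates the td_vbd_request_t, parks it on
 * device_vbd->pending_requests via tapdisk_vbd_move_request(), and forwards
 * the treq immediately; replicated_write_callback() unlinks and frees the
 * vreq once the underlying driver completes the write, and decrements
 * ramdisk.inflight. */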
284 /* ramdisk methods */
285 static int ramdisk_flush(td_driver_t *driver, struct tdremus_state *s);
287 /* http://www.concentric.net/~Ttwang/tech/inthash.htm */
288 static unsigned int uint64_hash(void* k)
289 {
290 uint64_t key = *(uint64_t*)k;
292 key = (~key) + (key << 18);
293 key = key ^ (key >> 31);
294 key = key * 21;
295 key = key ^ (key >> 11);
296 key = key + (key << 6);
297 key = key ^ (key >> 22);
299 return (unsigned int)key;
300 }
302 static int rd_hash_equal(void* k1, void* k2)
303 {
304 uint64_t key1, key2;
306 key1 = *(uint64_t*)k1;
307 key2 = *(uint64_t*)k2;
309 return key1 == key2;
310 }
312 static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector,
313 int nb_sectors, char* buf)
314 {
315 int i;
316 char* v;
317 uint64_t key;
319 for (i = 0; i < nb_sectors; i++) {
320 key = sector + i;
321 if (!(v = hashtable_search(ramdisk->h, &key))) {
322 /* check whether it is queued in a previous flush request */
323 if (!(ramdisk->prev && (v = hashtable_search(ramdisk->prev, &key))))
324 return -1;
325 }
326 memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
327 }
329 return 0;
330 }
332 static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf,
333 size_t len)
334 {
335 char* v;
336 uint64_t* key;
338 if ((v = hashtable_search(h, &sector))) {
339 memcpy(v, buf, len);
340 return 0;
341 }
343 if (!(v = malloc(len))) {
344 DPRINTF("ramdisk_write_hash: malloc failed\n");
345 return -1;
346 }
347 memcpy(v, buf, len);
348 if (!(key = malloc(sizeof(*key)))) {
349 DPRINTF("ramdisk_write_hash: error allocating key\n");
350 free(v);
351 return -1;
352 }
353 *key = sector;
354 if (!hashtable_insert(h, key, v)) {
355 DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);
356 free(key);
357 free(v);
358 return -1;
359 }
361 return 0;
362 }
364 static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector,
365 int nb_sectors, char* buf)
366 {
367 int i, rc;
369 for (i = 0; i < nb_sectors; i++) {
370 rc = ramdisk_write_hash(ramdisk->h, sector + i,
371 buf + i * ramdisk->sector_size,
372 ramdisk->sector_size);
373 if (rc)
374 return rc;
375 }
377 return 0;
378 }
380 static int ramdisk_write_cb(td_driver_t *driver, int res, uint64_t sector,
381 int nb_sectors, int id, void* private)
382 {
383 struct ramdisk_write_cbdata *cbdata = (struct ramdisk_write_cbdata*)private;
384 struct tdremus_state *s = cbdata->state;
385 int rc;
387 /*
388 RPRINTF("ramdisk write callback: rc %d, %d sectors @ %" PRIu64 "\n", res, nb_sectors,
389 sector);
390 */
392 free(cbdata->buf);
393 free(cbdata);
395 s->ramdisk.inflight--;
396 if (!s->ramdisk.inflight && !s->ramdisk.prev) {
397 /* when this reaches 0 and prev is empty, the disk is flushed. */
398 /*
399 RPRINTF("ramdisk flush complete\n");
400 */
401 }
403 if (s->ramdisk.prev) {
404 /* resubmit as much as possible in the remaining disk */
405 /*
406 RPRINTF("calling ramdisk_flush from write callback\n");
407 */
408 return ramdisk_flush(driver, s);
409 }
411 return 0;
412 }
414 static int uint64_compare(const void* k1, const void* k2)
415 {
416 uint64_t u1 = *(uint64_t*)k1;
417 uint64_t u2 = *(uint64_t*)k2;
419 /* u1 - u2 is unsigned */
420 return u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
421 }
423 /* set psectors to an array of the sector numbers in the hash, returning
424 * the number of entries (or -1 on error) */
425 static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors)
426 {
427 struct hashtable_itr* itr;
428 uint64_t* sectors;
429 int count;
431 if (!(count = hashtable_count(h)))
432 return 0;
434 if (!(*psectors = malloc(count * sizeof(uint64_t)))) {
435 DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
436 return -1;
437 }
438 sectors = *psectors;
440 itr = hashtable_iterator(h);
441 count = 0;
442 do {
443 sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
444 } while (hashtable_iterator_advance(itr));
445 free(itr);
447 return count;
448 }
450 static char* merge_requests(struct ramdisk* ramdisk, uint64_t start,
451 size_t count)
452 {
453 char* buf;
454 char* sector;
455 int i;
457 if (!(buf = valloc(count * ramdisk->sector_size))) {
458 DPRINTF("merge_request: allocation failed\n");
459 return NULL;
460 }
462 for (i = 0; i < count; i++) {
463 if (!(sector = hashtable_search(ramdisk->prev, &start))) {
464 DPRINTF("merge_request: lookup failed on %"PRIu64"\n", start);
465 return NULL;
466 }
468 memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size);
469 free(sector);
471 start++;
472 }
474 return buf;
475 }
477 /* The underlying driver may not handle having the whole ramdisk queued at
478 * once. We queue what we can and let the callbacks attempt to queue more. */
479 /* NOTE: may be called from callback, while dd->private still belongs to
480 * the underlying driver */
481 static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s)
482 {
483 uint64_t* sectors;
484 char* buf;
485 uint64_t base, batchlen;
486 int i, j, count = 0;
488 // RPRINTF("ramdisk flush\n");
490 if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0)
491 return count;
493 /*
494 RPRINTF("ramdisk: flushing %d sectors\n", count);
495 */
497 /* sort and merge sectors to improve disk performance */
498 qsort(sectors, count, sizeof(*sectors), uint64_compare);
500 for (i = 0; i < count;) {
501 base = sectors[i++];
502 while (i < count && sectors[i] == sectors[i-1] + 1)
503 i++;
504 batchlen = sectors[i-1] - base + 1;
506 if (!(buf = merge_requests(&s->ramdisk, base, batchlen))) {
507 RPRINTF("ramdisk_flush: merge_requests failed\n");
508 free(sectors);
509 return -1;
510 }
512 /* NOTE: create_write_request() creates a treq AND forwards it down
513 * the driver chain */
514 // RPRINTF("forwarding write request at %" PRIu64 ", length: %" PRIu64 "\n", base, batchlen);
515 create_write_request(s, base, batchlen, buf);
516 //RPRINTF("write request at %" PRIu64 ", length: %" PRIu64 " forwarded\n", base, batchlen);
518 s->ramdisk.inflight++;
520 for (j = 0; j < batchlen; j++) {
521 hashtable_remove(s->ramdisk.prev, &base);
522 base++;
523 }
524 }
526 if (!hashtable_count(s->ramdisk.prev)) {
527 /* everything is in flight */
528 hashtable_destroy(s->ramdisk.prev, 0);
529 s->ramdisk.prev = NULL;
530 }
532 free(sectors);
534 // RPRINTF("ramdisk flush done\n");
535 return 0;
536 }
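/* Example of the batching above: if prev holds sectors {7, 8, 9, 12, 13},
 * the sorted list contains two contiguous runs, so two write requests are
 * forwarded: base 7 with batchlen 3, and base 12 with batchlen 2. */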
538 /* flush ramdisk contents to disk */
539 static int ramdisk_start_flush(td_driver_t *driver)
540 {
541 struct tdremus_state *s = (struct tdremus_state *)driver->data;
542 uint64_t* key;
543 char* buf;
544 int rc = 0;
545 int i, j, count, batchlen;
546 uint64_t* sectors;
548 if (!hashtable_count(s->ramdisk.h)) {
549 /*
550 RPRINTF("Nothing to flush\n");
551 */
552 return 0;
553 }
555 if (s->ramdisk.prev) {
556 /* a flush request issued while a previous flush is still in progress
557 * will merge with the previous request. If you want the previous
558 * request to be consistent, wait for it to complete. */
559 if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0)
560 return count;
562 for (i = 0; i < count; i++) {
563 buf = hashtable_search(s->ramdisk.h, sectors + i);
564 ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf,
565 s->ramdisk.sector_size);
566 }
567 free(sectors);
569 hashtable_destroy (s->ramdisk.h, 0);
570 } else
571 s->ramdisk.prev = s->ramdisk.h;
573 /* We create a new hashtable so that new writes can be performed before
574 * the old hashtable is completely drained. */
575 s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
576 rd_hash_equal);
578 return ramdisk_flush(driver, s);
579 }
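/* Checkpoint lifecycle of the two hashtables: writes always land in h. On a
 * commit, ramdisk_start_flush() rotates h into prev (or merges h into a prev
 * that is still draining) and installs a fresh h; ramdisk_flush() then drains
 * prev into the underlying driver via create_write_request(). Once prev is
 * empty and inflight drops to zero, the checkpoint is fully on disk. */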
582 static int ramdisk_start(td_driver_t *driver)
583 {
584 struct tdremus_state *s = (struct tdremus_state *)driver->data;
586 if (s->ramdisk.h) {
587 RPRINTF("ramdisk already allocated\n");
588 return 0;
589 }
591 s->ramdisk.sector_size = driver->info.sector_size;
592 s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
593 rd_hash_equal);
595 DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size);
597 return 0;
598 }
600 /* common client/server functions */
601 /* mayberead: Time out after a certain interval. */
602 static int mread(int fd, void* buf, size_t len)
603 {
604 fd_set rfds;
605 int rc;
606 size_t cur = 0;
607 struct timeval tv = {
608 .tv_sec = HEARTBEAT_MS / 1000,
609 .tv_usec = (HEARTBEAT_MS % 1000) * 1000
610 };
612 if (!len)
613 return 0;
615 /* read first. Only select if read is incomplete. */
616 rc = read(fd, buf, len);
617 while (rc < 0 || cur + rc < len) {
618 if (!rc) {
619 RPRINTF("end-of-file");
620 return -1;
621 }
622 if (rc < 0 && errno != EAGAIN) {
623 RPRINTF("error during read: %s\n", strerror(errno));
624 return -1;
625 }
626 if (rc > 0)
627 cur += rc;
629 FD_ZERO(&rfds);
630 FD_SET(fd, &rfds);
631 if (!(rc = select(fd + 1, &rfds, NULL, NULL, &tv))) {
632 RPRINTF("time out during read\n");
633 return -1;
634 } else if (rc < 0) {
635 RPRINTF("error during select: %d\n", errno);
636 return -1;
637 }
638 rc = read(fd, buf + cur, len - cur);
639 }
640 /*
641 RPRINTF("read %d bytes\n", cur + rc);
642 */
644 return 0;
645 }
647 static int mwrite(int fd, void* buf, size_t len)
648 {
649 fd_set wfds;
650 size_t cur = 0;
651 int rc;
652 struct timeval tv = {
653 .tv_sec = HEARTBEAT_MS / 1000,
654 .tv_usec = (HEARTBEAT_MS % 1000) * 1000
655 };
657 if (!len)
658 return 0;
660 /* write first. Only select if the write is incomplete. */
661 rc = write(fd, buf, len);
662 while (rc < 0 || cur + rc < len) {
663 if (!rc) {
664 RPRINTF("end-of-file");
665 return -1;
666 }
667 if (rc < 0 && errno != EAGAIN) {
668 RPRINTF("error during write: %s\n", strerror(errno));
669 return -1;
670 }
671 if (rc > 0)
672 cur += rc;
674 FD_ZERO(&wfds);
675 FD_SET(fd, &wfds);
676 if (!(rc = select(fd + 1, NULL, &wfds, NULL, &tv))) {
677 RPRINTF("time out during write\n");
678 return -1;
679 } else if (rc < 0) {
680 RPRINTF("error during select: %d\n", errno);
681 return -1;
682 }
683 rc = write(fd, buf + cur, len - cur);
684 }
685 /*
686 RPRINTF("wrote %d bytes\n", cur + rc);
687 */
689 return 0;
693 }
696 static void inline close_stream_fd(struct tdremus_state *s)
697 {
698 /* XXX: -2 is magic. replace with macro perhaps? */
699 tapdisk_server_unregister_event(s->stream_fd.id);
700 close(s->stream_fd.fd);
701 s->stream_fd.fd = -2;
702 }
704 /* primary functions */
705 static void remus_client_event(event_id_t, char mode, void *private);
706 static void remus_connect_event(event_id_t id, char mode, void *private);
707 static void remus_retry_connect_event(event_id_t id, char mode, void *private);
709 static int primary_do_connect(struct tdremus_state *state)
710 {
711 event_id_t id;
712 int fd;
713 int rc;
714 int flags;
716 RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
718 if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
719 RPRINTF("could not create client socket: %d\n", errno);
720 return -1;
721 }
723 /* make socket nonblocking */
724 if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
725 flags = 0;
726 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
727 return -1;
729 /* once we have created the socket and populated the address, we can now start
730 * our non-blocking connect. rather than duplicating code we trigger a timeout
731 * on the socket fd, which calls our non-blocking connect code
732 */
733 if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, remus_retry_connect_event, state)) < 0) {
734 RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
735 /* TODO: we leak a fd here */
736 return -1;
737 }
738 state->stream_fd.fd = fd;
739 state->stream_fd.id = id;
740 return 0;
741 }
743 static int primary_blocking_connect(struct tdremus_state *state)
744 {
745 int fd;
746 int id;
747 int rc;
748 int flags;
750 RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
752 if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
753 RPRINTF("could not create client socket: %d\n", errno);
754 return -1;
755 }
757 do {
758 if ((rc = connect(fd, (struct sockaddr *)&state->sa,
759 sizeof(state->sa))) < 0)
760 {
761 if (errno == ECONNREFUSED) {
762 RPRINTF("connection refused -- retrying in 1 second\n");
763 sleep(1);
764 } else {
765 RPRINTF("connection failed: %d\n", errno);
766 close(fd);
767 return -1;
768 }
769 }
770 } while (rc < 0);
772 RPRINTF("client connected\n");
774 /* make socket nonblocking */
775 if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
776 flags = 0;
777 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
778 {
779 RPRINTF("error making socket nonblocking\n");
780 close(fd);
781 return -1;
782 }
784 if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0, remus_client_event, state)) < 0) {
785 RPRINTF("error registering client event handler: %s\n", strerror(id));
786 close(fd);
787 return -1;
788 }
790 state->stream_fd.fd = fd;
791 state->stream_fd.id = id;
792 return 0;
793 }
795 /* on read, just pass request through */
796 static void primary_queue_read(td_driver_t *driver, td_request_t treq)
797 {
798 /* just pass read through */
799 td_forward_request(treq);
800 }
802 /* TODO:
803 * The primary uses mwrite() to write the contents of a write request to the
804 * backup. This effectively blocks until all data has been copied into a system
805 * buffer or a timeout has occurred. We may wish to instead use tapdisk's
806 * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts
807 * and write data in an asynchronous fashion.
808 */
809 static void primary_queue_write(td_driver_t *driver, td_request_t treq)
810 {
811 struct tdremus_state *s = (struct tdremus_state *)driver->data;
813 char header[sizeof(uint32_t) + sizeof(uint64_t)];
814 uint32_t *sectors = (uint32_t *)header;
815 uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
817 // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
819 /* -1 means we haven't connected yet, -2 means the connection was lost */
820 if(s->stream_fd.fd == -1) {
821 RPRINTF("connecting to backup...\n");
822 primary_blocking_connect(s);
823 }
825 *sectors = treq.secs;
826 *sector = treq.sec;
828 if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
829 goto fail;
830 if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
831 goto fail;
833 if (mwrite(s->stream_fd.fd, treq.buf, treq.secs * driver->info.sector_size) < 0)
834 goto fail;
836 td_forward_request(treq);
838 return;
840 fail:
841 /* switch to unprotected mode and tell tapdisk to retry */
842 RPRINTF("write request replication failed, switching to unprotected mode");
843 switch_mode(s->tdremus_driver, mode_unprotected);
844 td_complete_request(treq, -EBUSY);
845 }
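/* Per-checkpoint exchange as seen from the primary: every guest write is
 * mirrored to the backup as a "wreq" message before being forwarded to the
 * local driver. At a checkpoint, the control FIFO triggers client_flush(),
 * which sends "creq"; the backup answers with "done" after it has started
 * flushing the buffered writes (server_do_creq(), which notes that it should
 * really wait for the flush to finish), and remus_client_event() relays that
 * "done" to the message FIFO so the sender knows it may release buffered
 * output. */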
848 static int client_flush(td_driver_t *driver)
849 {
850 struct tdremus_state *s = (struct tdremus_state *)driver->data;
852 // RPRINTF("committing output\n");
854 if (s->stream_fd.fd == -1)
855 /* connection not yet established, nothing to flush */
856 return 0;
858 if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT, strlen(TDREMUS_COMMIT)) < 0) {
859 RPRINTF("error flushing output");
860 close_stream_fd(s);
861 return -1;
862 }
864 return 0;
865 }
867 static int primary_start(td_driver_t *driver)
868 {
869 struct tdremus_state *s = (struct tdremus_state *)driver->data;
871 RPRINTF("activating client mode\n");
873 tapdisk_remus.td_queue_read = primary_queue_read;
874 tapdisk_remus.td_queue_write = primary_queue_write;
875 s->queue_flush = client_flush;
877 s->stream_fd.fd = -1;
878 s->stream_fd.id = -1;
880 return 0;
881 }
883 /* timeout callback */
884 static void remus_retry_connect_event(event_id_t id, char mode, void *private)
885 {
886 struct tdremus_state *s = (struct tdremus_state *)private;
888 /* do a non-blocking connect */
889 if (connect(s->stream_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa))
890 && errno != EINPROGRESS)
891 {
892 if(errno == ECONNREFUSED || errno == ENETUNREACH || errno == EAGAIN || errno == ECONNABORTED)
893 {
894 /* try again in a second */
895 tapdisk_server_unregister_event(s->stream_fd.id);
896 if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
897 RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
898 return;
899 }
900 s->stream_fd.id = id;
901 }
902 else
903 {
904 /* not recoverable */
905 RPRINTF("error connecting to server: %s\n", strerror(errno));
906 return;
907 }
908 }
909 else
910 {
911 /* the connect returned EINPROGRESS (nonblocking connect) we must wait for the fd to be writeable to determine if the connect worked */
913 tapdisk_server_unregister_event(s->stream_fd.id);
914 if((id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, s->stream_fd.fd, 0, remus_connect_event, s)) < 0) {
915 RPRINTF("error registering client connection event handler: %s\n", strerror(id));
916 return;
917 }
918 s->stream_fd.id = id;
919 }
920 }
922 /* callback when nonblocking connect() is finished */
923 /* called only by primary in unprotected state */
924 static void remus_connect_event(event_id_t id, char mode, void *private)
925 {
926 int socket_errno;
927 socklen_t socket_errno_size;
928 struct tdremus_state *s = (struct tdremus_state *)private;
930 /* check to see if the connect succeeded */
931 socket_errno_size = sizeof(socket_errno);
932 if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, &socket_errno, &socket_errno_size)) {
933 RPRINTF("error getting socket errno\n");
934 return;
935 }
937 RPRINTF("socket connect returned %d\n", socket_errno);
939 if(socket_errno)
940 {
941 /* the connect did not succeed */
943 if(socket_errno == ECONNREFUSED || socket_errno == ENETUNREACH || socket_errno == ETIMEDOUT
944 || socket_errno == ECONNABORTED || socket_errno == EAGAIN)
945 {
946 /* we can probably assume that the backup is down. just try again later */
947 tapdisk_server_unregister_event(s->stream_fd.id);
948 if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
949 RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
950 return;
951 }
952 s->stream_fd.id = id;
953 }
954 else
955 {
956 RPRINTF("socket connect returned %d, giving up\n", socket_errno);
957 }
958 }
959 else
960 {
961 /* the connect succeeded */
963 /* unregister this function and register a new event handler */
964 tapdisk_server_unregister_event(s->stream_fd.id);
965 if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd, 0, remus_client_event, s)) < 0) {
966 RPRINTF("error registering client event handler: %s\n", strerror(id));
967 return;
968 }
969 s->stream_fd.id = id;
971 /* switch from unprotected to protected client */
972 switch_mode(s->tdremus_driver, mode_primary);
973 }
974 }
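/* Non-blocking connect sequence on the primary: primary_do_connect() creates
 * the socket and registers remus_retry_connect_event() as a timeout event.
 * That handler calls connect(), re-arming itself on transient errors such as
 * ECONNREFUSED. When connect() is in progress it waits for the fd to become
 * writable, and remus_connect_event() then checks SO_ERROR to decide between
 * retrying, giving up, or switching to mode_primary and installing
 * remus_client_event() on the stream. */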
977 /* we install this event handler on the primary once we have connected to the backup */
978 /* wait for "done" message to commit checkpoint */
979 static void remus_client_event(event_id_t id, char mode, void *private)
980 {
981 struct tdremus_state *s = (struct tdremus_state *)private;
982 char req[5];
983 int rc;
985 if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
986 /* replication stream closed or otherwise broken (timeout, reset, &c) */
987 RPRINTF("error reading from backup\n");
988 close_stream_fd(s);
989 return;
990 }
992 req[4] = '\0';
994 if (!strcmp(req, TDREMUS_DONE))
995 /* checkpoint committed, inform msg_fd */
996 ctl_respond(s, TDREMUS_DONE);
997 else {
998 RPRINTF("received unknown message: %s\n", req);
999 close_stream_fd(s);
1000 }
1002 return;
1003 }
1005 /* backup functions */
1006 static void remus_server_event(event_id_t id, char mode, void *private);
1008 /* returns the socket that receives write requests */
1009 static void remus_server_accept(event_id_t id, char mode, void* private)
1010 {
1011 struct tdremus_state* s = (struct tdremus_state *) private;
1013 int stream_fd;
1014 event_id_t cid;
1016 /* XXX: add address-based black/white list */
1017 if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
1018 RPRINTF("error accepting connection: %d\n", errno);
1019 return;
1020 }
1022 /* TODO: check to see if we are already replicating. if so just close the
1023 * connection (or do something smarter) */
1024 RPRINTF("server accepted connection\n");
1026 /* add tapdisk event for replication stream */
1027 cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
1028 remus_server_event, s);
1030 if(cid < 0) {
1031 RPRINTF("error registering connection event handler: %s\n", strerror(errno));
1032 close(stream_fd);
1033 return;
1034 }
1036 /* store replication file descriptor */
1037 s->stream_fd.fd = stream_fd;
1038 s->stream_fd.id = cid;
1039 }
1041 /* returns -2 if EADDRNOTAVAIL */
1042 static int remus_bind(struct tdremus_state* s)
1043 {
1044 // struct sockaddr_in sa;
1045 int opt;
1046 int rc = -1;
1048 if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1049 RPRINTF("could not create server socket: %d\n", errno);
1050 return rc;
1051 }
1052 opt = 1;
1053 if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
1054 RPRINTF("Error setting REUSEADDR on %d: %d\n", s->server_fd.fd, errno);
1056 if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) < 0) {
1057 RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", s->server_fd.fd,
1058 inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port), errno, strerror(errno));
1059 if (errno != EADDRINUSE)
1060 rc = -2;
1061 goto err_sfd;
1062 }
1063 if (listen(s->server_fd.fd, 10)) {
1064 RPRINTF("could not listen on socket: %d\n", errno);
1065 goto err_sfd;
1066 }
1068 /* The socket is now bound to the address and listening, so we may now register
1069 * the fd with tapdisk */
1071 if((s->server_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
1072 s->server_fd.fd, 0,
1073 remus_server_accept, s)) < 0) {
1074 RPRINTF("error registering server connection event handler: %s",
1075 strerror(s->server_fd.id));
1076 goto err_sfd;
1077 }
1079 return 0;
1081 err_sfd:
1082 close(s->server_fd.fd);
1083 s->server_fd.fd = -1;
1085 return rc;
1086 }
1088 /* wait for latest checkpoint to be applied */
1089 static inline int server_writes_inflight(td_driver_t *driver)
1090 {
1091 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1093 if (!s->ramdisk.inflight && !s->ramdisk.prev)
1094 return 0;
1096 return 1;
1097 }
1099 /* Due to block device prefetching this code may be called on the server side
1100 * during normal replication. In this case we must return EBUSY, otherwise the
1101 * domain may be started with stale data.
1102 */
1103 void backup_queue_read(td_driver_t *driver, td_request_t treq)
1104 {
1105 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1107 if(!remus_image)
1108 remus_image = treq.image;
1110 #if 0
1111 /* due to prefetching, we must return EBUSY on server reads. This
1112 * maintains a consistent disk image */
1113 td_complete_request(treq, -EBUSY);
1114 #else
1115 /* what exactly is the race that requires the response above? */
1116 td_forward_request(treq);
1117 #endif
1118 }
1120 /* see above */
1121 void backup_queue_write(td_driver_t *driver, td_request_t treq)
1122 {
1123 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1125 /* on a server write, we know the domain has failed over. we must change our
1126 * state to unprotected and then have the unprotected queue_write function
1127 * handle the write
1128 */
1130 switch_mode(driver, mode_unprotected);
1131 /* TODO: call the appropriate write function rather than return EBUSY */
1132 td_complete_request(treq, -EBUSY);
1133 }
1135 static int backup_start(td_driver_t *driver)
1136 {
1137 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1138 int fd;
1140 if (ramdisk_start(driver) < 0)
1141 return -1;
1143 tapdisk_remus.td_queue_read = backup_queue_read;
1144 tapdisk_remus.td_queue_write = backup_queue_write;
1145 /* TODO set flush function */
1146 return 0;
1147 }
1149 static int server_do_wreq(td_driver_t *driver)
1150 {
1151 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1152 static tdremus_wire_t twreq;
1153 char buf[4096];
1154 int len, rc;
1156 char header[sizeof(uint32_t) + sizeof(uint64_t)];
1157 uint32_t *sectors = (uint32_t *) header;
1158 uint64_t *sector = (uint64_t *) &header[sizeof(uint32_t)];
1160 // RPRINTF("received write request\n");
1162 if (mread(s->stream_fd.fd, header, sizeof(header)) < 0)
1163 goto err;
1165 len = *sectors * driver->info.sector_size;
1167 //RPRINTF("writing %d sectors (%d bytes) starting at %" PRIu64 "\n", *sectors, len,
1168 // *sector);
1170 if (len > sizeof(buf)) {
1171 /* freak out! */
1172 RPRINTF("write request too large: %d/%u\n", len, (unsigned)sizeof(buf));
1173 return -1;
1174 }
1176 if (mread(s->stream_fd.fd, buf, len) < 0)
1177 goto err;
1179 if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0)
1180 goto err;
1182 return 0;
1184 err:
1185 /* should start failover */
1186 RPRINTF("backup write request error\n");
1187 close_stream_fd(s);
1189 return -1;
1190 }
1192 static int server_do_sreq(td_driver_t *driver)
1193 {
1194 /*
1195 RPRINTF("submit request received\n");
1196 */
1198 return 0;
1199 }
1201 /* at this point, the server can start applying the most recent
1202 * ramdisk. */
1203 static int server_do_creq(td_driver_t *driver)
1204 {
1205 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1207 // RPRINTF("committing buffer\n");
1209 ramdisk_start_flush(driver);
1211 /* XXX this message should not be sent until flush completes! */
1212 if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
1213 return -1;
1215 return 0;
1216 }
1219 /* called when data is pending in s->rfd */
1220 static void remus_server_event(event_id_t id, char mode, void *private)
1221 {
1222 struct tdremus_state *s = (struct tdremus_state *)private;
1223 td_driver_t *driver = s->tdremus_driver;
1224 char req[5];
1226 // RPRINTF("replication data waiting\n");
1228 /* TODO: add a get_connection_by_event_id() function.
1229 * for now we can assume that the fd is s->stream_fd */
1231 if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
1232 RPRINTF("error reading server event, activating backup\n");
1233 switch_mode(driver, mode_unprotected);
1234 return;
1235 }
1237 req[4] = '\0';
1239 if (!strcmp(req, TDREMUS_WRITE))
1240 server_do_wreq(driver);
1241 else if (!strcmp(req, TDREMUS_SUBMIT))
1242 server_do_sreq(driver);
1243 else if (!strcmp(req, TDREMUS_COMMIT))
1244 server_do_creq(driver);
1245 else
1246 RPRINTF("unknown request received: %s\n", req);
1248 return;
1249 }
1252 /* unprotected */
1254 void unprotected_queue_read(td_driver_t *driver, td_request_t treq)
1255 {
1256 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1258 /* wait for previous ramdisk to flush before servicing reads */
1259 if (server_writes_inflight(driver)) {
1260 /* for now lets just return EBUSY. if this becomes an issue we can
1261 * do something smarter */
1262 td_complete_request(treq, -EBUSY);
1263 }
1264 else {
1265 /* here we just pass reads through */
1266 td_forward_request(treq);
1267 }
1268 }
1270 /* For a recoverable remus solution we need to log unprotected writes here */
1271 void unprotected_queue_write(td_driver_t *driver, td_request_t treq)
1272 {
1273 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1275 /* wait for previous ramdisk to flush */
1276 if (server_writes_inflight(driver)) {
1277 RPRINTF("queue_write: waiting for queue to drain");
1278 td_complete_request(treq, -EBUSY);
1279 }
1280 else {
1281 // RPRINTF("servicing write request on backup\n");
1282 td_forward_request(treq);
1283 }
1284 }
1286 static int unprotected_start(td_driver_t *driver)
1287 {
1288 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1290 RPRINTF("failure detected, activating passthrough\n");
1292 /* close the server socket */
1293 close_stream_fd(s);
1295 /* unregister the replication stream */
1296 tapdisk_server_unregister_event(s->server_fd.id);
1298 /* close the replication stream */
1299 close(s->server_fd.fd);
1300 s->server_fd.fd = -1;
1302 /* install the unprotected read/write handlers */
1303 tapdisk_remus.td_queue_read = unprotected_queue_read;
1304 tapdisk_remus.td_queue_write = unprotected_queue_write;
1306 return 0;
1307 }
1310 /* control */
1312 static inline int resolve_address(const char* addr, struct in_addr* ia)
1313 {
1314 struct hostent* he;
1315 uint32_t ip;
1317 if (!(he = gethostbyname(addr))) {
1318 RPRINTF("error resolving %s: %d\n", addr, h_errno);
1319 return -1;
1320 }
1322 if (!he->h_addr_list[0]) {
1323 RPRINTF("no address found for %s\n", addr);
1324 return -1;
1325 }
1327 /* network byte order */
1328 ip = *((uint32_t**)he->h_addr_list)[0];
1329 ia->s_addr = ip;
1331 return 0;
1332 }
1334 static int get_args(td_driver_t *driver, const char* name)
1335 {
1336 struct tdremus_state *state = (struct tdremus_state *)driver->data;
1337 char* host;
1338 char* port;
1339 // char* driver_str;
1340 // char* parent;
1341 // int type;
1342 // char* path;
1343 // unsigned long ulport;
1344 // int i;
1345 // struct sockaddr_in server_addr_in;
1347 int gai_status;
1348 int valid_addr;
1349 struct addrinfo gai_hints;
1350 struct addrinfo *servinfo, *servinfo_itr;
1352 memset(&gai_hints, 0, sizeof gai_hints);
1353 gai_hints.ai_family = AF_UNSPEC;
1354 gai_hints.ai_socktype = SOCK_STREAM;
1356 port = strchr(name, ':');
1357 if (!port) {
1358 RPRINTF("missing host in %s\n", name);
1359 return -ENOENT;
1360 }
1361 if (!(host = strndup(name, port - name))) {
1362 RPRINTF("unable to allocate host\n");
1363 return -ENOMEM;
1364 }
1365 port++;
1367 if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) {
1368 RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
1369 return -ENOENT;
1370 }
1372 /* TODO: do something smarter here */
1373 valid_addr = 0;
1374 for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) {
1375 void *addr;
1376 char *ipver;
1378 if (servinfo_itr->ai_family == AF_INET) {
1379 valid_addr = 1;
1380 memset(&state->sa, 0, sizeof(state->sa));
1381 state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
1382 break;
1383 }
1384 }
1385 freeaddrinfo(servinfo);
1387 if (!valid_addr)
1388 return -ENOENT;
1390 RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
1392 return 0;
1393 }
1395 static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
1396 {
1397 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1398 int rc;
1400 if (mode == s->mode)
1401 return 0;
1403 if (s->queue_flush)
1404 if ((rc = s->queue_flush(driver)) < 0) {
1405 // fall back to unprotected mode on error
1406 RPRINTF("switch_mode: error flushing queue (old: %d, new: %d)", s->mode, mode);
1407 mode = mode_unprotected;
1408 }
1410 if (mode == mode_unprotected)
1411 rc = unprotected_start(driver);
1412 else if (mode == mode_primary)
1413 rc = primary_start(driver);
1414 else if (mode == mode_backup)
1415 rc = backup_start(driver);
1416 else {
1417 RPRINTF("unknown mode requested: %d\n", mode);
1418 rc = -1;
1419 }
1421 if (!rc)
1422 s->mode = mode;
1424 return rc;
1425 }
1427 static void ctl_request(event_id_t id, char mode, void *private)
1428 {
1429 struct tdremus_state *s = (struct tdremus_state *)private;
1430 td_driver_t *driver = s->tdremus_driver;
1431 char msg[80];
1432 int rc;
1434 // RPRINTF("data waiting on control fifo\n");
1436 if (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */))) {
1437 RPRINTF("0-byte read received, reopening FIFO\n");
1438 /*TODO: we may have to unregister/re-register with tapdisk_server */
1439 close(s->ctl_fd.fd);
1440 RPRINTF("FIFO closed\n");
1441 if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
1442 RPRINTF("error reopening FIFO: %d\n", errno);
1443 }
1444 return;
1445 }
1447 if (rc < 0) {
1448 RPRINTF("error reading from FIFO: %d\n", errno);
1449 return;
1450 }
1452 /* TODO: need to get driver somehow */
1453 msg[rc] = '\0';
1454 if (!strncmp(msg, "flush", 5)) {
1455 if (s->queue_flush)
1456 if ((rc = s->queue_flush(driver))) {
1457 RPRINTF("error passing flush request to backup");
1458 ctl_respond(s, TDREMUS_FAIL);
1459 }
1460 } else {
1461 RPRINTF("unknown command: %s\n", msg);
1462 }
1463 }
1465 static int ctl_respond(struct tdremus_state *s, const char *response)
1466 {
1467 int rc;
1469 if ((rc = write(s->msg_fd.fd, response, strlen(response))) < 0) {
1470 RPRINTF("error writing notification: %d\n", errno);
1471 close(s->msg_fd.fd);
1472 if ((s->msg_fd.fd = open(s->msg_path, O_RDWR)) < 0)
1473 RPRINTF("error reopening FIFO: %d\n", errno);
1474 }
1476 return rc;
1477 }
1479 /* must be called after the underlying driver has been initialized */
1480 static int ctl_open(td_driver_t *driver, const char* name)
1481 {
1482 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1483 int i, l;
1485 /* first we must ensure that BLKTAP_CTRL_DIR exists */
1486 if (mkdir(BLKTAP_CTRL_DIR, 0755) && errno != EEXIST)
1487 {
1488 DPRINTF("error creating directory %s: %d\n", BLKTAP_CTRL_DIR, errno);
1489 return -1;
1490 }
1492 /* use the device name to create the control fifo path */
1493 if (asprintf(&s->ctl_path, BLKTAP_CTRL_DIR "/remus_%s", name) < 0)
1494 return -1;
1495 /* scrub fifo pathname */
1496 for (i = strlen(BLKTAP_CTRL_DIR) + 1, l = strlen(s->ctl_path); i < l; i++) {
1497 if (strchr(":/", s->ctl_path[i]))
1498 s->ctl_path[i] = '_';
1499 }
1500 if (asprintf(&s->msg_path, "%s.msg", s->ctl_path) < 0)
1501 goto err_ctlfifo;
1503 if (mkfifo(s->ctl_path, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
1504 RPRINTF("error creating control FIFO %s: %d\n", s->ctl_path, errno);
1505 goto err_msgfifo;
1506 }
1508 if (mkfifo(s->msg_path, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
1509 RPRINTF("error creating message FIFO %s: %d\n", s->msg_path, errno);
1510 goto err_msgfifo;
1511 }
1513 /* RDWR so that fd doesn't block select when no writer is present */
1514 if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
1515 RPRINTF("error opening control FIFO %s: %d\n", s->ctl_path, errno);
1516 goto err_msgfifo;
1517 }
1519 if ((s->msg_fd.fd = open(s->msg_path, O_RDWR)) < 0) {
1520 RPRINTF("error opening message FIFO %s: %d\n", s->msg_path, errno);
1521 goto err_openctlfifo;
1522 }
1524 RPRINTF("control FIFO %s\n", s->ctl_path);
1525 RPRINTF("message FIFO %s\n", s->msg_path);
1527 return 0;
1529 err_openctlfifo:
1530 close(s->ctl_fd.fd);
1531 err_msgfifo:
1532 free(s->msg_path);
1533 s->msg_path = NULL;
1534 err_ctlfifo:
1535 free(s->ctl_path);
1536 s->ctl_path = NULL;
1537 return -1;
1538 }
1540 static void ctl_close(td_driver_t *driver)
1541 {
1542 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1544 /* TODO: close *all* connections */
1546 if(s->ctl_fd.fd)
1547 close(s->ctl_fd.fd);
1549 if (s->ctl_path) {
1550 unlink(s->ctl_path);
1551 free(s->ctl_path);
1552 s->ctl_path = NULL;
1553 }
1554 if (s->msg_path) {
1555 unlink(s->msg_path);
1556 free(s->msg_path);
1557 s->msg_path = NULL;
1558 }
1559 }
1561 static int ctl_register(struct tdremus_state *s)
1562 {
1563 RPRINTF("registering ctl fifo\n");
1565 /* register ctl fd */
1566 s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
1568 if (s->ctl_fd.id < 0) {
1569 RPRINTF("error registering ctrl FIFO %s: %d\n", s->ctl_path, s->ctl_fd.id);
1570 return -1;
1571 }
1573 return 0;
1574 }
1576 /* interface */
1578 static int tdremus_open(td_driver_t *driver, const char *name,
1579 td_flag_t flags)
1580 {
1581 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1582 int rc;
1584 RPRINTF("opening %s\n", name);
1586 /* first we need to get the underlying vbd for this driver stack. To do so we
1587 * need to know the vbd's id. Fortunately, for tapdisk2 this is hard-coded as
1588 * 0 (see tapdisk2.c)
1589 */
1590 device_vbd = tapdisk_server_get_vbd(0);
1592 memset(s, 0, sizeof(*s));
1593 s->server_fd.fd = -1;
1594 s->stream_fd.fd = -1;
1595 s->ctl_fd.fd = -1;
1596 s->msg_fd.fd = -1;
1598 /* TODO: this is only needed so that the server can send writes down
1599 * the driver stack from the stream_fd event handler */
1600 s->tdremus_driver = driver;
1602 /* parse name to get info etc */
1603 if ((rc = get_args(driver, name)))
1604 return rc;
1606 if ((rc = ctl_open(driver, name))) {
1607 RPRINTF("error setting up control channel\n");
1608 free(s->driver_data);
1609 return rc;
1610 }
1612 if ((rc = ctl_register(s))) {
1613 RPRINTF("error registering control channel\n");
1614 free(s->driver_data);
1615 return rc;
1616 }
1618 if (!(rc = remus_bind(s)))
1619 rc = switch_mode(driver, mode_backup);
1620 else if (rc == -2)
1621 rc = switch_mode(driver, mode_primary);
1623 if (!rc)
1624 return 0;
1626 tdremus_close(driver);
1627 return -EIO;
1628 }
1630 static int tdremus_close(td_driver_t *driver)
1631 {
1632 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1634 RPRINTF("closing\n");
1636 if (s->driver_data) {
1637 free(s->driver_data);
1638 s->driver_data = NULL;
1639 }
1640 if (s->server_fd.fd >= 0) {
1641 close(s->server_fd.fd);
1642 s->server_fd.fd = -1;
1643 }
1644 if (s->stream_fd.fd >= 0)
1645 close_stream_fd(s);
1647 ctl_close(driver);
1649 return 0;
1650 }
1652 static int tdremus_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
1653 {
1654 /* we shouldn't have a parent... for now */
1655 return -EINVAL;
1656 }
1658 static int tdremus_validate_parent(td_driver_t *driver,
1659 td_driver_t *pdriver, td_flag_t flags)
1660 {
1661 return 0;
1662 }
1664 struct tap_disk tapdisk_remus = {
1665 .disk_type = "tapdisk_remus",
1666 .private_data_size = sizeof(struct tdremus_state),
1667 .td_open = tdremus_open,
1668 .td_queue_read = unprotected_queue_read,
1669 .td_queue_write = unprotected_queue_write,
1670 .td_close = tdremus_close,
1671 .td_get_parent_id = tdremus_get_parent_id,
1672 .td_validate_parent = tdremus_validate_parent,
1673 .td_debug = NULL,
1674 };
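/* Illustrative sketch (editor's addition, compiled out): how an external
 * checkpointing tool could drive the control FIFOs created by ctl_open().
 * The function name and the way the paths are obtained are hypothetical; the
 * FIFO naming follows the BLKTAP_CTRL_DIR "/remus_<name>" and "<ctl_path>.msg"
 * convention used above. The tool writes "flush" to the control FIFO and
 * waits for "done" or "fail" on the message FIFO, mirroring ctl_request() and
 * ctl_respond(). */
#if 0
static int request_checkpoint_sketch(const char *ctl_path, const char *msg_path)
{
	char resp[5];
	int ctl_fd, msg_fd, rc = -1;

	if ((ctl_fd = open(ctl_path, O_WRONLY)) < 0)
		return -1;
	if ((msg_fd = open(msg_path, O_RDONLY)) < 0)
		goto out_ctl;

	/* ask the primary to commit the current epoch */
	if (write(ctl_fd, "flush", strlen("flush")) < 0)
		goto out;

	/* ctl_respond() writes a 4-byte status: "done" or "fail" */
	if (read(msg_fd, resp, 4) != 4)
		goto out;
	resp[4] = '\0';

	rc = strcmp(resp, TDREMUS_DONE) ? -1 : 0;
out:
	close(msg_fd);
out_ctl:
	close(ctl_fd);
	return rc;
}
#endif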