
view tools/blktap2/drivers/tapdisk-queue.c @ 20918:b0ffb4912c46

blktap2: Prefer AIO eventfd support on kernels >= 2.6.22

Mainline kernel support for eventfd(2) in Linux AIO was added between
2.6.21 and 2.6.22. Libaio releases after 0.3.107 ship the necessary
header, but few installed systems carry them yet, and we cannot rely
on an up-to-date libc6 either.

Instead, this patch adds a header which defines a custom iocb_common
struct and works around a potentially missing sys/eventfd.h.

Signed-off-by: Daniel Stodden <daniel.stodden@citrix.com>
author Keir Fraser <keir.fraser@citrix.com>
date Fri Jan 29 08:55:27 2010 +0000 (2010-01-29)
parents 218026df8d5f
children b60379dad533
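
The libaio-compat.h shim itself is not shown in this view. As a rough
sketch of the mechanism the description implies (a hypothetical
reconstruction, not the patch's exact contents; the flag value follows
the mainline IOCB_FLAG_RESFD ABI, and the name __io_set_eventfd matches
the call site in tapdisk_lio_set_eventfd() below):

/* Sketch: mark an iocb so the kernel signals its completion on the
 * given eventfd. Requires an iocb_common layout with trailing flags
 * and resfd fields, which the compat header supplies for pre-0.3.107
 * libaio; newer libaio ships an equivalent io_set_eventfd(). */
#define __IOCB_FLAG_RESFD (1 << 0)

static inline void __io_set_eventfd(struct iocb *iocb, int fd)
{
	iocb->u.c.flags |= __IOCB_FLAG_RESFD;
	iocb->u.c.resfd  = fd;
}

Likewise, tapdisk_sys_eventfd() presumably wraps the raw eventfd
syscall, so a missing sys/eventfd.h (the libc6 concern above) does not
break the build.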
line source
/*
 * Copyright (c) 2008, XenSource Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *     * Neither the name of XenSource Inc. nor the names of its contributors
 *       may be used to endorse or promote products derived from this software
 *       without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <libaio.h>
#ifdef __linux__
#include <linux/version.h>
#endif

#include "tapdisk.h"
#include "tapdisk-log.h"
#include "tapdisk-queue.h"
#include "tapdisk-filter.h"
#include "tapdisk-server.h"
#include "tapdisk-utils.h"

#include "libaio-compat.h"
#include "atomicio.h"
#define WARN(_f, _a...)      tlog_write(TLOG_WARN, _f, ##_a)
#define DBG(_f, _a...)       tlog_write(TLOG_DBG, _f, ##_a)
#define ERR(_err, _f, _a...) tlog_error(_err, _f, ##_a)
/*
 * We used a kernel patch to return an fd associated with the AIO context
 * so that we can concurrently poll on synchronous and async descriptors.
 * This is signalled by passing 1 as the io context to io_setup.
 */
#define REQUEST_ASYNC_FD ((io_context_t)1)
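
/*
 * Append a tiocb to the submission array. Entries are also chained
 * through tiocb->next so that cancel_tiocbs() can still walk a batch
 * after queue->iocbs has been reused.
 */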
static inline void
queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
	struct iocb *iocb = &tiocb->iocb;

	if (queue->queued) {
		struct tiocb *prev = (struct tiocb *)
			queue->iocbs[queue->queued - 1]->data;
		prev->next = tiocb;
	}

	queue->iocbs[queue->queued++] = iocb;
}
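
/*
 * Deferred list: tiocbs that arrive while the submission array is full
 * wait here in FIFO order until queue_deferred_tiocbs() finds room.
 */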
static inline int
deferred_tiocbs(struct tqueue *queue)
{
	return (queue->deferred.head != NULL);
}

static inline void
defer_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
	struct tlist *list = &queue->deferred;

	if (!list->head)
		list->head = list->tail = tiocb;
	else
		list->tail = list->tail->next = tiocb;

	queue->tiocbs_deferred++;
	queue->deferrals++;
}

static inline void
queue_deferred_tiocb(struct tqueue *queue)
{
	struct tlist *list = &queue->deferred;

	if (list->head) {
		struct tiocb *tiocb = list->head;

		list->head = tiocb->next;
		if (!list->head)
			list->tail = NULL;

		queue_tiocb(queue, tiocb);
		queue->tiocbs_deferred--;
	}
}

static inline void
queue_deferred_tiocbs(struct tqueue *queue)
{
	while (!tapdisk_queue_full(queue) && deferred_tiocbs(queue))
		queue_deferred_tiocb(queue);
}
/*
 * td_complete may queue more tiocbs
 */
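/*
 * res follows io_getevents() semantics: equal to nbytes on success,
 * negative errno on failure; short transfers are mapped to -EIO
 * before the callback runs.
 */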
static void
complete_tiocb(struct tqueue *queue, struct tiocb *tiocb, unsigned long res)
{
	int err;
	struct iocb *iocb = &tiocb->iocb;

	if (res == iocb->u.c.nbytes)
		err = 0;
	else if ((int)res < 0)
		err = (int)res;
	else
		err = -EIO;

	tiocb->cb(tiocb->arg, tiocb, err);
}
static int
cancel_tiocbs(struct tqueue *queue, int err)
{
	int queued;
	struct tiocb *tiocb;

	if (!queue->queued)
		return 0;

	/*
	 * td_complete may queue more tiocbs, which
	 * will overwrite the contents of queue->iocbs.
	 * use a private linked list to keep track
	 * of the tiocbs we're cancelling.
	 */
	tiocb  = queue->iocbs[0]->data;
	queued = queue->queued;
	queue->queued = 0;

	for (; tiocb != NULL; tiocb = tiocb->next)
		complete_tiocb(queue, tiocb, err);

	return queued;
}

static int
fail_tiocbs(struct tqueue *queue, int succeeded, int total, int err)
{
	ERR(err, "io_submit error: %d of %d failed",
	    total - succeeded, total);

	/* take any non-submitted, merged iocbs
	 * off of the queue, split them, and fail them */
	queue->queued = io_expand_iocbs(&queue->opioctx,
					queue->iocbs, succeeded, total);

	return cancel_tiocbs(queue, err);
}
/*
 * rwio
 */
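
/*
 * Synchronous fallback driver: "submission" performs the I/O inline
 * with lseek(2) plus read(2)/write(2), then completes the fabricated
 * events immediately.
 */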
struct rwio {
	struct io_event *aio_events;
};

static void
tapdisk_rwio_destroy(struct tqueue *queue)
{
	struct rwio *rwio = queue->tio_data;

	if (rwio->aio_events) {
		free(rwio->aio_events);
		rwio->aio_events = NULL;
	}
}

static int
tapdisk_rwio_setup(struct tqueue *queue, int size)
{
	struct rwio *rwio = queue->tio_data;

	rwio->aio_events = calloc(size, sizeof(struct io_event));
	if (!rwio->aio_events)
		return -errno;

	return 0;
}

static inline ssize_t
tapdisk_rwio_rw(const struct iocb *iocb)
{
	int fd        = iocb->aio_fildes;
	char *buf     = iocb->u.c.buf;
	long long off = iocb->u.c.offset;
	size_t size   = iocb->u.c.nbytes;
	ssize_t (*func)(int, void *, size_t) =
		(iocb->aio_lio_opcode == IO_CMD_PWRITE ? vwrite : read);

	if (lseek(fd, off, SEEK_SET) == (off_t)-1)
		return -errno;

	if (atomicio(func, fd, buf, size) != size)
		return -errno;

	return size;
}
static int
tapdisk_rwio_submit(struct tqueue *queue)
{
	struct rwio *rwio = queue->tio_data;
	int i, merged, split;
	struct iocb *iocb;
	struct tiocb *tiocb;
	struct io_event *ep;

	if (!queue->queued)
		return 0;

	tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
	merged = io_merge(&queue->opioctx, queue->iocbs, queue->queued);

	queue->queued = 0;

	for (i = 0; i < merged; i++) {
		ep      = rwio->aio_events + i;
		iocb    = queue->iocbs[i];
		ep->obj = iocb;
		ep->res = tapdisk_rwio_rw(iocb);
	}

	split = io_split(&queue->opioctx, rwio->aio_events, merged);
	tapdisk_filter_events(queue->filter, rwio->aio_events, split);

	for (i = split, ep = rwio->aio_events; i-- > 0; ep++) {
		iocb  = ep->obj;
		tiocb = iocb->data;
		complete_tiocb(queue, tiocb, ep->res);
	}

	queue_deferred_tiocbs(queue);

	return split;
}
static const struct tio td_tio_rwio = {
	.name        = "rwio",
	.data_size   = sizeof(struct rwio),
	.tio_setup   = tapdisk_rwio_setup,
	.tio_destroy = tapdisk_rwio_destroy,
	.tio_submit  = tapdisk_rwio_submit
};
/*
 * libaio
 */
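
/*
 * Native kernel AIO driver. Completion notification arrives either via
 * eventfd(2) (mainline kernels >= 2.6.22, IOCB_FLAG_RESFD) or via the
 * out-of-tree aio-poll patch's context fd; see tapdisk_lio_setup_aio().
 */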
struct lio {
	io_context_t     aio_ctx;
	struct io_event *aio_events;

	int              event_fd;
	int              event_id;

	int              flags;
};

#define LIO_FLAG_EVENTFD (1<<0)
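
/*
 * eventfd notification for AIO completions was merged in mainline
 * between 2.6.21 and 2.6.22; anything older falls back to the aio-poll
 * kernel patch probed for in __lio_setup_aio_poll().
 */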
static int
tapdisk_lio_check_resfd(void)
{
	return tapdisk_linux_version() >= KERNEL_VERSION(2, 6, 22);
}
static void
tapdisk_lio_destroy_aio(struct tqueue *queue)
{
	struct lio *lio = queue->tio_data;

	if (lio->event_fd >= 0) {
		close(lio->event_fd);
		lio->event_fd = -1;
	}

	if (lio->aio_ctx) {
		io_destroy(lio->aio_ctx);
		lio->aio_ctx = 0;
	}
}
static int
__lio_setup_aio_poll(struct tqueue *queue, int qlen)
{
	struct lio *lio = queue->tio_data;
	int err, fd;

	lio->aio_ctx = REQUEST_ASYNC_FD;

	fd = io_setup(qlen, &lio->aio_ctx);
	if (fd < 0) {
		lio->aio_ctx = 0;
		err = fd; /* io_setup returns -errno directly */

		if (err == -EINVAL)
			goto fail_fd;

		goto fail;
	}

	lio->event_fd = fd;

	return 0;

fail_fd:
	DPRINTF("Couldn't get fd for AIO poll support. This is probably "
		"because your kernel does not have the aio-poll patch "
		"applied.\n");
fail:
	return err;
}
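
/*
 * Mainline path: create a plain io context, then attach an eventfd so
 * completions can be polled alongside the other tapdisk descriptors.
 */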
static int
__lio_setup_aio_eventfd(struct tqueue *queue, int qlen)
{
	struct lio *lio = queue->tio_data;
	int err;

	err = io_setup(qlen, &lio->aio_ctx);
	if (err < 0) {
		lio->aio_ctx = 0;
		return err;
	}

	lio->event_fd = tapdisk_sys_eventfd(0);
	if (lio->event_fd < 0)
		return -errno;

	lio->flags |= LIO_FLAG_EVENTFD;

	return 0;
}
static int
tapdisk_lio_setup_aio(struct tqueue *queue, int qlen)
{
	struct lio *lio = queue->tio_data;
	int err;

	lio->aio_ctx  = 0;
	lio->event_fd = -1;

	/*
	 * prefer the mainline eventfd(2) api, if available.
	 * if not, fall back to the poll fd patch.
	 */
	err = !tapdisk_lio_check_resfd();
	if (!err)
		err = __lio_setup_aio_eventfd(queue, qlen);
	if (err)
		err = __lio_setup_aio_poll(queue, qlen);

	if (err == -EAGAIN)
		goto fail_rsv;
fail:
	return err;

fail_rsv:
	DPRINTF("Couldn't setup AIO context. If you are trying to "
		"concurrently use a large number of blktap-based disks, you may "
		"need to increase the system-wide aio request limit. "
		"(e.g. 'echo 1048576 > /proc/sys/fs/aio-max-nr')\n");
	goto fail;
}
static void
tapdisk_lio_destroy(struct tqueue *queue)
{
	struct lio *lio = queue->tio_data;

	if (!lio)
		return;

	if (lio->event_id >= 0) {
		tapdisk_server_unregister_event(lio->event_id);
		lio->event_id = -1;
	}

	tapdisk_lio_destroy_aio(queue);

	if (lio->aio_events) {
		free(lio->aio_events);
		lio->aio_events = NULL;
	}
}
static void
tapdisk_lio_set_eventfd(struct tqueue *queue, int n, struct iocb **iocbs)
{
	struct lio *lio = queue->tio_data;
	int i;

	if (lio->flags & LIO_FLAG_EVENTFD)
		for (i = 0; i < n; ++i)
			__io_set_eventfd(iocbs[i], lio->event_fd);
}
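
/*
 * A read(2) on an eventfd returns the 8-byte counter and resets it;
 * here that drains the completion count accumulated since the last
 * poll, re-arming the descriptor for the next batch.
 */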
static void
tapdisk_lio_ack_event(struct tqueue *queue)
{
	struct lio *lio = queue->tio_data;
	uint64_t val;

	if (lio->flags & LIO_FLAG_EVENTFD)
		read(lio->event_fd, &val, sizeof(val));
}
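
/*
 * Scheduler callback: ack the eventfd, reap whatever completions are
 * ready without blocking (min_nr is 0), split merged iocbs back apart,
 * and fire the per-tiocb callbacks.
 */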
static void
tapdisk_lio_event(event_id_t id, char mode, void *private)
{
	struct tqueue *queue = private;
	struct lio *lio;
	int i, ret, split;
	struct iocb *iocb;
	struct tiocb *tiocb;
	struct io_event *ep;

	tapdisk_lio_ack_event(queue);

	lio   = queue->tio_data;
	ret   = io_getevents(lio->aio_ctx, 0,
			     queue->size, lio->aio_events, NULL);
	split = io_split(&queue->opioctx, lio->aio_events, ret);
	tapdisk_filter_events(queue->filter, lio->aio_events, split);

	DBG("events: %d, tiocbs: %d\n", ret, split);

	queue->iocbs_pending  -= ret;
	queue->tiocbs_pending -= split;

	for (i = split, ep = lio->aio_events; i-- > 0; ep++) {
		iocb  = ep->obj;
		tiocb = iocb->data;
		complete_tiocb(queue, tiocb, ep->res);
	}

	queue_deferred_tiocbs(queue);
}
static int
tapdisk_lio_setup(struct tqueue *queue, int qlen)
{
	struct lio *lio = queue->tio_data;
	int err;

	lio->event_id = -1;

	err = tapdisk_lio_setup_aio(queue, qlen);
	if (err)
		goto fail;

	lio->event_id =
		tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
					      lio->event_fd, 0,
					      tapdisk_lio_event,
					      queue);
	err = lio->event_id;
	if (err < 0)
		goto fail;

	lio->aio_events = calloc(qlen, sizeof(struct io_event));
	if (!lio->aio_events) {
		err = -errno;
		goto fail;
	}

	return 0;

fail:
	tapdisk_lio_destroy(queue);
	return err;
}
static int
tapdisk_lio_submit(struct tqueue *queue)
{
	struct lio *lio = queue->tio_data;
	int merged, submitted, err = 0;

	if (!queue->queued)
		return 0;

	tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
	merged    = io_merge(&queue->opioctx, queue->iocbs, queue->queued);
	tapdisk_lio_set_eventfd(queue, merged, queue->iocbs);
	submitted = io_submit(lio->aio_ctx, merged, queue->iocbs);

	DBG("queued: %d, merged: %d, submitted: %d\n",
	    queue->queued, merged, submitted);

	if (submitted < 0) {
		err = submitted;
		submitted = 0;
	} else if (submitted < merged)
		err = -EIO;

	queue->iocbs_pending  += submitted;
	queue->tiocbs_pending += queue->queued;
	queue->queued          = 0;

	if (err)
		queue->tiocbs_pending -=
			fail_tiocbs(queue, submitted, merged, err);

	return submitted;
}
static const struct tio td_tio_lio = {
	.name        = "lio",
	.data_size   = sizeof(struct lio),
	.tio_setup   = tapdisk_lio_setup,
	.tio_destroy = tapdisk_lio_destroy,
	.tio_submit  = tapdisk_lio_submit,
};
static void
tapdisk_queue_free_io(struct tqueue *queue)
{
	if (queue->tio) {
		if (queue->tio->tio_destroy)
			queue->tio->tio_destroy(queue);
		queue->tio = NULL;
	}

	if (queue->tio_data) {
		free(queue->tio_data);
		queue->tio_data = NULL;
	}
}
static int
tapdisk_queue_init_io(struct tqueue *queue, int drv)
{
	const struct tio *tio;
	int err;

	switch (drv) {
	case TIO_DRV_LIO:
		tio = &td_tio_lio;
		break;
	case TIO_DRV_RWIO:
		tio = &td_tio_rwio;
		break;
	default:
		err = -EINVAL;
		goto fail;
	}

	queue->tio_data = calloc(1, tio->data_size);
	if (!queue->tio_data) {
		PERROR("calloc(%zu)", tio->data_size);
		err = -errno;
		goto fail;
	}

	queue->tio = tio;

	if (tio->tio_setup) {
		err = tio->tio_setup(queue, queue->size);
		if (err)
			goto fail;
	}

	DPRINTF("I/O queue driver: %s\n", tio->name);

	return 0;

fail:
	tapdisk_queue_free_io(queue);
	return err;
}
int
tapdisk_init_queue(struct tqueue *queue, int size,
		   int drv, struct tfilter *filter)
{
	int err;

	memset(queue, 0, sizeof(struct tqueue));

	queue->size   = size;
	queue->filter = filter;

	if (!size)
		return 0;

	err = tapdisk_queue_init_io(queue, drv);
	if (err)
		goto fail;

	queue->iocbs = calloc(size, sizeof(struct iocb *));
	if (!queue->iocbs) {
		err = -errno;
		goto fail;
	}

	err = opio_init(&queue->opioctx, size);
	if (err)
		goto fail;

	return 0;

fail:
	tapdisk_free_queue(queue);
	return err;
}
void
tapdisk_free_queue(struct tqueue *queue)
{
	tapdisk_queue_free_io(queue);

	free(queue->iocbs);
	queue->iocbs = NULL;

	opio_free(&queue->opioctx);
}
void
tapdisk_debug_queue(struct tqueue *queue)
{
	struct tiocb *tiocb = queue->deferred.head;

	WARN("TAPDISK QUEUE:\n");
	WARN("size: %d, tio: %s, queued: %d, iocbs_pending: %d, "
	     "tiocbs_pending: %d, tiocbs_deferred: %d, deferrals: %"PRIu64"\n",
	     queue->size, queue->tio->name, queue->queued, queue->iocbs_pending,
	     queue->tiocbs_pending, queue->tiocbs_deferred, queue->deferrals);

	if (tiocb) {
		WARN("deferred:\n");
		for (; tiocb != NULL; tiocb = tiocb->next) {
			struct iocb *io = &tiocb->iocb;
			WARN("%s of %lu bytes at %lld\n",
			     (io->aio_lio_opcode == IO_CMD_PWRITE ?
			      "write" : "read"),
			     io->u.c.nbytes, io->u.c.offset);
		}
	}
}
void
tapdisk_prep_tiocb(struct tiocb *tiocb, int fd, int rw, char *buf, size_t size,
		   long long offset, td_queue_callback_t cb, void *arg)
{
	struct iocb *iocb = &tiocb->iocb;

	if (rw)
		io_prep_pwrite(iocb, fd, buf, size, offset);
	else
		io_prep_pread(iocb, fd, buf, size, offset);

	iocb->data  = tiocb;
	tiocb->cb   = cb;
	tiocb->arg  = arg;
	tiocb->next = NULL;
}
void
tapdisk_queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
	if (!tapdisk_queue_full(queue))
		queue_tiocb(queue, tiocb);
	else
		defer_tiocb(queue, tiocb);
}
/*
 * fail_tiocbs may queue more tiocbs
 */
int
tapdisk_submit_tiocbs(struct tqueue *queue)
{
	return queue->tio->tio_submit(queue);
}
int
tapdisk_submit_all_tiocbs(struct tqueue *queue)
{
	int submitted = 0;

	do {
		submitted += tapdisk_submit_tiocbs(queue);
	} while (!tapdisk_queue_empty(queue));

	return submitted;
}
/*
 * cancel_tiocbs may queue more tiocbs
 */
int
tapdisk_cancel_tiocbs(struct tqueue *queue)
{
	return cancel_tiocbs(queue, -EIO);
}

int
tapdisk_cancel_all_tiocbs(struct tqueue *queue)
{
	int cancelled = 0;

	do {
		cancelled += tapdisk_cancel_tiocbs(queue);
	} while (!tapdisk_queue_empty(queue));

	return cancelled;
}