
view tools/blktap2/drivers/tapdisk-queue.c @ 20916:f2ef85551a09

blktap2: Sort out tapdisk AIO init.

Move event callback registration into tapdisk-queue. This should also
obsolete the dummy pollfd pipe in the synchronous I/O case.

Signed-off-by: Daniel Stodden <daniel.stodden@citrix.com>
author Keir Fraser <keir.fraser@citrix.com>
date Fri Jan 29 08:54:22 2010 +0000 (2010-01-29)
parents b7f73a7f3078
children 218026df8d5f
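
For orientation, a minimal caller sketch against the API this file defines. This is an illustration, not code from the tree: everything except the tapdisk_* calls is hypothetical, error handling is elided, and td_queue_callback_t is assumed to match the cb(arg, tiocb, err) invocation visible in complete_tiocb() below. The point of the change is that tapdisk_init_queue() now registers the AIO poll fd with tapdisk-server itself, so a caller only initializes, queues, and submits:

    #include "tapdisk-queue.h"

    static void
    on_complete(void *arg, struct tiocb *tiocb, int err)
    {
            /* err == 0 on a full transfer, a negative errno otherwise */
    }

    static int
    example_read(struct tqueue *queue, int fd, char *buf)
    {
            /* static: the tiocb must stay alive until on_complete() fires */
            static struct tiocb tiocb;
            int err;

            /* 128 slots, asynchronous (sync == 0), no filter */
            err = tapdisk_init_queue(queue, 128, 0, NULL);
            if (err)
                    return err;

            /* rw == 0: read 4096 bytes at offset 0 */
            tapdisk_prep_tiocb(&tiocb, fd, 0, buf, 4096, 0, on_complete, NULL);
            tapdisk_queue_tiocb(queue, &tiocb);

            /* returns the number of iocbs submitted */
            return tapdisk_submit_all_tiocbs(queue);
    }
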
/*
 * Copyright (c) 2008, XenSource Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *     * Neither the name of XenSource Inc. nor the names of its contributors
 *       may be used to endorse or promote products derived from this software
 *       without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <errno.h>
#include <stdlib.h>
#include <string.h>     /* memset() */
#include <unistd.h>
#include <inttypes.h>   /* PRIx64 */
#include <libaio.h>

#include "tapdisk.h"
#include "tapdisk-log.h"
#include "tapdisk-queue.h"
#include "tapdisk-filter.h"
#include "tapdisk-server.h"
#include "atomicio.h"

#define WARN(_f, _a...) tlog_write(TLOG_WARN, _f, ##_a)
#define DBG(_f, _a...) tlog_write(TLOG_DBG, _f, ##_a)
#define ERR(_err, _f, _a...) tlog_error(_err, _f, ##_a)

/*
 * We used a kernel patch to return an fd associated with the AIO context
 * so that we can concurrently poll on synchronous and async descriptors.
 * This is signalled by passing 1 as the io context to io_setup.
 */
#define REQUEST_ASYNC_FD ((io_context_t)1)
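
/*
 * Illustration (hypothetical variables; mirrors tapdisk_init_queue()
 * below): with the patch applied, priming the context with
 * REQUEST_ASYNC_FD makes io_setup() return a pollable fd instead of 0:
 *
 *      io_context_t ctx = REQUEST_ASYNC_FD;
 *      int poll_fd = io_setup(size, &ctx);  // >= 0: fd to select/poll on
 */
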
static inline void
queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
        struct iocb *iocb = &tiocb->iocb;

        if (queue->queued) {
                struct tiocb *prev = (struct tiocb *)
                        queue->iocbs[queue->queued - 1]->data;
                prev->next = tiocb;
        }

        queue->iocbs[queue->queued++] = iocb;
}

static inline int
deferred_tiocbs(struct tqueue *queue)
{
        return (queue->deferred.head != NULL);
}

static inline void
defer_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
        struct tlist *list = &queue->deferred;

        if (!list->head)
                list->head = list->tail = tiocb;
        else
                list->tail = list->tail->next = tiocb;

        queue->tiocbs_deferred++;
        queue->deferrals++;
}

static inline void
queue_deferred_tiocb(struct tqueue *queue)
{
        struct tlist *list = &queue->deferred;

        if (list->head) {
                struct tiocb *tiocb = list->head;

                list->head = tiocb->next;
                if (!list->head)
                        list->tail = NULL;

                queue_tiocb(queue, tiocb);
                queue->tiocbs_deferred--;
        }
}

static inline void
queue_deferred_tiocbs(struct tqueue *queue)
{
        while (!tapdisk_queue_full(queue) && deferred_tiocbs(queue))
                queue_deferred_tiocb(queue);
}
/*
 * td_complete may queue more tiocbs
 */
static void
complete_tiocb(struct tqueue *queue, struct tiocb *tiocb, unsigned long res)
{
        int err;
        struct iocb *iocb = &tiocb->iocb;

        if (res == iocb->u.c.nbytes)
                err = 0;
        else if ((int)res < 0)
                err = (int)res;
        else
                err = -EIO;

        tiocb->cb(tiocb->arg, tiocb, err);
}

static int
cancel_tiocbs(struct tqueue *queue, int err)
{
        int queued;
        struct tiocb *tiocb;

        if (!queue->queued)
                return 0;

        /*
         * td_complete may queue more tiocbs, which
         * will overwrite the contents of queue->iocbs.
         * use a private linked list to keep track
         * of the tiocbs we're cancelling.
         */
        tiocb  = (struct tiocb *)queue->iocbs[0]->data;
        queued = queue->queued;
        queue->queued = 0;

        for (; tiocb != NULL; tiocb = tiocb->next)
                complete_tiocb(queue, tiocb, err);

        return queued;
}
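
/*
 * Note on the walk above: the tiocb->next chain was threaded by
 * queue_tiocb() as entries were queued, which is why queue->iocbs[0]->data
 * still leads to every queued tiocb even after queue->queued is reset.
 */
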
static int
fail_tiocbs(struct tqueue *queue, int succeeded, int total, int err)
{
        ERR(err, "io_submit error: %d of %d failed",
            total - succeeded, total);

        /* take any non-submitted, merged iocbs
         * off of the queue, split them, and fail them */
        queue->queued = io_expand_iocbs(&queue->opioctx,
                                        queue->iocbs, succeeded, total);

        return cancel_tiocbs(queue, err);
}

static inline ssize_t
iocb_rw(struct iocb *iocb)
{
        int fd        = iocb->aio_fildes;
        char *buf     = iocb->u.c.buf;
        long long off = iocb->u.c.offset;
        size_t size   = iocb->u.c.nbytes;
        ssize_t (*func)(int, void *, size_t) =
                (iocb->aio_lio_opcode == IO_CMD_PWRITE ? vwrite : read);

        if (lseek(fd, off, SEEK_SET) == (off_t)-1)
                return -errno;

        if (atomicio(func, fd, buf, size) != size)
                return -errno;

        return size;
}
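
/*
 * atomicio() lives in atomicio.c (by all appearances the OpenSSH helper
 * of the same name): it re-issues func until all `size' bytes have been
 * transferred or a hard error occurs, so a short return above really is
 * a failure. vwrite appears to be write() cast to read()'s signature,
 * letting one function pointer cover both directions.
 */
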
static int
io_synchronous_rw(struct tqueue *queue)
{
        int i, merged, split;
        struct iocb *iocb;
        struct tiocb *tiocb;
        struct io_event *ep;

        if (!queue->queued)
                return 0;

        tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
        merged = io_merge(&queue->opioctx, queue->iocbs, queue->queued);

        queue->queued = 0;

        for (i = 0; i < merged; i++) {
                ep      = queue->aio_events + i;
                iocb    = queue->iocbs[i];
                ep->obj = iocb;
                ep->res = iocb_rw(iocb);
        }

        split = io_split(&queue->opioctx, queue->aio_events, merged);
        tapdisk_filter_events(queue->filter, queue->aio_events, split);

        for (i = split, ep = queue->aio_events; i-- > 0; ep++) {
                iocb  = ep->obj;
                tiocb = (struct tiocb *)iocb->data;
                complete_tiocb(queue, tiocb, ep->res);
        }

        queue_deferred_tiocbs(queue);

        return split;
}
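
/*
 * io_merge()/io_split() (and io_expand_iocbs() above) belong to the
 * opioctx request optimizer: merge coalesces adjacent iocbs into fewer,
 * larger ones before issue, and split fans the completion events back
 * out so that every original tiocb receives its own result.
 */
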
static void tapdisk_tiocb_event(event_id_t id, char mode, void *private);

int
tapdisk_init_queue(struct tqueue *queue, int size,
                   int sync, struct tfilter *filter)
{
        int i, err;

        memset(queue, 0, sizeof(struct tqueue));

        queue->size   = size;
        queue->sync   = sync;
        queue->filter = filter;

        queue->event   = -1;
        queue->aio_ctx = NULL;

        if (!size)
                return 0;

        if (!sync) {
                queue->aio_ctx = REQUEST_ASYNC_FD;
                queue->poll_fd = io_setup(size, &queue->aio_ctx);
                err = queue->poll_fd;
                if (err < 0) {
                        if (err == -EAGAIN)
                                DPRINTF("Couldn't setup AIO context. If you "
                                        "are trying to concurrently use a "
                                        "large number of blktap-based disks, "
                                        "you may need to increase the "
                                        "system-wide aio request limit. "
                                        "(e.g. 'echo 1048576 > /proc/sys/fs/"
                                        "aio-max-nr')\n");
                        else
                                DPRINTF("Couldn't get fd for AIO poll "
                                        "support. This is probably because "
                                        "your kernel does not have the "
                                        "aio-poll patch applied.\n");
                        queue->aio_ctx = NULL;
                        goto fail;
                }

                queue->event =
                        tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
                                                      queue->poll_fd, 0,
                                                      tapdisk_tiocb_event,
                                                      queue);
                err = queue->event;
                if (err < 0)
                        goto fail;
        }

        err = -ENOMEM;
        queue->iocbs      = calloc(size, sizeof(struct iocb *));
        queue->aio_events = calloc(size, sizeof(struct io_event));
        if (!queue->iocbs || !queue->aio_events)
                goto fail;

        err = opio_init(&queue->opioctx, size);
        if (err)
                goto fail;

        return 0;

fail:
        tapdisk_free_queue(queue);
        return err;
}
void
tapdisk_free_queue(struct tqueue *queue)
{
        if (queue->event >= 0) {
                tapdisk_server_unregister_event(queue->event);
                queue->event = -1;
        }

        if (queue->aio_ctx) {
                io_destroy(queue->aio_ctx);
                queue->aio_ctx = NULL;
        }

        free(queue->iocbs);
        queue->iocbs = NULL;

        free(queue->aio_events);
        queue->aio_events = NULL;

        opio_free(&queue->opioctx);
}

void
tapdisk_debug_queue(struct tqueue *queue)
{
        struct tiocb *tiocb = queue->deferred.head;

        WARN("TAPDISK QUEUE:\n");
        WARN("size: %d, sync: %d, queued: %d, iocbs_pending: %d, "
             "tiocbs_pending: %d, tiocbs_deferred: %d, deferrals: %"PRIx64"\n",
             queue->size, queue->sync, queue->queued, queue->iocbs_pending,
             queue->tiocbs_pending, queue->tiocbs_deferred, queue->deferrals);

        if (tiocb) {
                WARN("deferred:\n");
                for (; tiocb != NULL; tiocb = tiocb->next) {
                        struct iocb *io = &tiocb->iocb;
                        WARN("%s of %lu bytes at %lld\n",
                             (io->aio_lio_opcode == IO_CMD_PWRITE ?
                              "write" : "read"),
                             io->u.c.nbytes, io->u.c.offset);
                }
        }
}

void
tapdisk_prep_tiocb(struct tiocb *tiocb, int fd, int rw, char *buf, size_t size,
                   long long offset, td_queue_callback_t cb, void *arg)
{
        struct iocb *iocb = &tiocb->iocb;

        if (rw)
                io_prep_pwrite(iocb, fd, buf, size, offset);
        else
                io_prep_pread(iocb, fd, buf, size, offset);

        iocb->data  = tiocb;
        tiocb->cb   = cb;
        tiocb->arg  = arg;
        tiocb->next = NULL;
}

void
tapdisk_queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
        if (!tapdisk_queue_full(queue))
                queue_tiocb(queue, tiocb);
        else
                defer_tiocb(queue, tiocb);
}
/*
 * fail_tiocbs may queue more tiocbs
 */
int
tapdisk_submit_tiocbs(struct tqueue *queue)
{
        int merged, submitted, err = 0;

        if (!queue->queued)
                return 0;

        if (queue->sync)
                return io_synchronous_rw(queue);

        tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
        merged    = io_merge(&queue->opioctx, queue->iocbs, queue->queued);
        submitted = io_submit(queue->aio_ctx, merged, queue->iocbs);

        DBG("queued: %d, merged: %d, submitted: %d\n",
            queue->queued, merged, submitted);

        if (submitted < 0) {
                err = submitted;
                submitted = 0;
        } else if (submitted < merged)
                err = -EIO;

        queue->iocbs_pending  += submitted;
        queue->tiocbs_pending += queue->queued;
        queue->queued          = 0;

        if (err)
                queue->tiocbs_pending -=
                        fail_tiocbs(queue, submitted, merged, err);

        return submitted;
}

int
tapdisk_submit_all_tiocbs(struct tqueue *queue)
{
        int submitted = 0;

        do {
                submitted += tapdisk_submit_tiocbs(queue);
        } while (!tapdisk_queue_empty(queue));

        return submitted;
}

static void
tapdisk_complete_tiocbs(struct tqueue *queue)
{
        int i, ret, split;
        struct iocb *iocb;
        struct tiocb *tiocb;
        struct io_event *ep;

        ret   = io_getevents(queue->aio_ctx, 0,
                             queue->size, queue->aio_events, NULL);
        split = io_split(&queue->opioctx, queue->aio_events, ret);
        tapdisk_filter_events(queue->filter, queue->aio_events, split);

        DBG("events: %d, tiocbs: %d\n", ret, split);

        queue->iocbs_pending  -= ret;
        queue->tiocbs_pending -= split;

        for (i = split, ep = queue->aio_events; i-- > 0; ep++) {
                iocb  = ep->obj;
                tiocb = (struct tiocb *)iocb->data;
                complete_tiocb(queue, tiocb, ep->res);
        }

        queue_deferred_tiocbs(queue);
}
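
/*
 * Note: io_getevents() is called with min_nr == 0, so the handler never
 * blocks; it reaps whatever completions are ready when the poll fd fires
 * and immediately re-queues any deferred tiocbs the drained slots allow.
 */
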
static void
tapdisk_tiocb_event(event_id_t id, char mode, void *private)
{
        struct tqueue *queue = private;
        tapdisk_complete_tiocbs(queue);
}

/*
 * cancel_tiocbs may queue more tiocbs
 */
int
tapdisk_cancel_tiocbs(struct tqueue *queue)
{
        return cancel_tiocbs(queue, -EIO);
}

int
tapdisk_cancel_all_tiocbs(struct tqueue *queue)
{
        int cancelled = 0;

        do {
                cancelled += tapdisk_cancel_tiocbs(queue);
        } while (!tapdisk_queue_empty(queue));

        return cancelled;
}