debuggers.hg

view tools/xenstore/xs.c @ 0:7d21f7218375

Exact replica of unstable on 051908 + README-this
author Mukesh Rathor
date Mon May 19 15:34:57 2008 -0700 (2008-05-19)
parents
children f875aaa791f0
line source
1 /*
2 Xen Store Daemon interface providing simple tree-like database.
3 Copyright (C) 2005 Rusty Russell IBM Corporation
5 This library is free software; you can redistribute it and/or
6 modify it under the terms of the GNU Lesser General Public
7 License as published by the Free Software Foundation; either
8 version 2.1 of the License, or (at your option) any later version.
10 This library is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public
16 License along with this library; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23 #include <sys/uio.h>
24 #include <sys/socket.h>
25 #include <sys/un.h>
26 #include <string.h>
27 #include <unistd.h>
28 #include <stdbool.h>
29 #include <stdlib.h>
30 #include <assert.h>
31 #include <stdio.h>
32 #include <signal.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include "xs.h"
36 #include "list.h"
37 #include "utils.h"
39 struct xs_stored_msg {
40 struct list_head list;
41 struct xsd_sockmsg hdr;
42 char *body;
43 };
45 #ifdef USE_PTHREAD
47 #include <pthread.h>
49 struct xs_handle {
50 /* Communications channel to xenstore daemon. */
51 int fd;
53 /*
54 * A read thread which pulls messages off the comms channel and
55 * signals waiters.
56 */
57 pthread_t read_thr;
58 int read_thr_exists;
60 /*
61 * A list of fired watch messages, protected by a mutex. Users can
62 * wait on the conditional variable until a watch is pending.
63 */
64 struct list_head watch_list;
65 pthread_mutex_t watch_mutex;
66 pthread_cond_t watch_condvar;
68 /* Clients can select() on this pipe to wait for a watch to fire. */
69 int watch_pipe[2];
71 /*
72 * A list of replies. Currently only one will ever be outstanding
73 * because we serialise requests. The requester can wait on the
74 * conditional variable for its response.
75 */
76 struct list_head reply_list;
77 pthread_mutex_t reply_mutex;
78 pthread_cond_t reply_condvar;
80 /* One request at a time. */
81 pthread_mutex_t request_mutex;
82 };
84 #define mutex_lock(m) pthread_mutex_lock(m)
85 #define mutex_unlock(m) pthread_mutex_unlock(m)
86 #define condvar_signal(c) pthread_cond_signal(c)
87 #define condvar_wait(c,m,hnd) pthread_cond_wait(c,m)
89 static void *read_thread(void *arg);
91 #else /* !defined(USE_PTHREAD) */
93 struct xs_handle {
94 int fd;
95 struct list_head reply_list;
96 struct list_head watch_list;
97 /* Clients can select() on this pipe to wait for a watch to fire. */
98 int watch_pipe[2];
99 };
101 #define mutex_lock(m) ((void)0)
102 #define mutex_unlock(m) ((void)0)
103 #define condvar_signal(c) ((void)0)
104 #define condvar_wait(c,m,hnd) read_message(hnd)
106 #endif
108 static int read_message(struct xs_handle *h);
110 int xs_fileno(struct xs_handle *h)
111 {
112 char c = 0;
114 mutex_lock(&h->watch_mutex);
116 if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) {
117 /* Kick things off if the watch list is already non-empty. */
118 if (!list_empty(&h->watch_list))
119 while (write(h->watch_pipe[1], &c, 1) != 1)
120 continue;
121 }
123 mutex_unlock(&h->watch_mutex);
125 return h->watch_pipe[0];
126 }
128 static int get_socket(const char *connect_to)
129 {
130 struct sockaddr_un addr;
131 int sock, saved_errno, flags;
133 sock = socket(PF_UNIX, SOCK_STREAM, 0);
134 if (sock < 0)
135 return -1;
137 if ((flags = fcntl(sock, F_GETFD)) < 0)
138 goto error;
139 flags |= FD_CLOEXEC;
140 if (fcntl(sock, F_SETFD, flags) < 0)
141 goto error;
143 addr.sun_family = AF_UNIX;
144 strcpy(addr.sun_path, connect_to);
146 if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0)
147 goto error;
149 return sock;
151 error:
152 saved_errno = errno;
153 close(sock);
154 errno = saved_errno;
155 return -1;
156 }
158 static int get_dev(const char *connect_to)
159 {
160 return open(connect_to, O_RDWR);
161 }
163 static struct xs_handle *get_handle(const char *connect_to)
164 {
165 struct stat buf;
166 struct xs_handle *h = NULL;
167 int fd = -1, saved_errno;
169 if (stat(connect_to, &buf) != 0)
170 return NULL;
172 if (S_ISSOCK(buf.st_mode))
173 fd = get_socket(connect_to);
174 else
175 fd = get_dev(connect_to);
177 if (fd == -1)
178 return NULL;
180 h = malloc(sizeof(*h));
181 if (h == NULL) {
182 saved_errno = errno;
183 close(fd);
184 errno = saved_errno;
185 return NULL;
186 }
188 memset(h, 0, sizeof(*h));
190 h->fd = fd;
192 INIT_LIST_HEAD(&h->reply_list);
193 INIT_LIST_HEAD(&h->watch_list);
195 /* Watch pipe is allocated on demand in xs_fileno(). */
196 h->watch_pipe[0] = h->watch_pipe[1] = -1;
198 #ifdef USE_PTHREAD
199 pthread_mutex_init(&h->watch_mutex, NULL);
200 pthread_cond_init(&h->watch_condvar, NULL);
202 pthread_mutex_init(&h->reply_mutex, NULL);
203 pthread_cond_init(&h->reply_condvar, NULL);
205 pthread_mutex_init(&h->request_mutex, NULL);
206 #endif
208 return h;
209 }
211 struct xs_handle *xs_daemon_open(void)
212 {
213 return get_handle(xs_daemon_socket());
214 }
216 struct xs_handle *xs_daemon_open_readonly(void)
217 {
218 return get_handle(xs_daemon_socket_ro());
219 }
221 struct xs_handle *xs_domain_open(void)
222 {
223 return get_handle(xs_domain_dev());
224 }
226 void xs_daemon_close(struct xs_handle *h)
227 {
228 struct xs_stored_msg *msg, *tmsg;
230 mutex_lock(&h->request_mutex);
231 mutex_lock(&h->reply_mutex);
232 mutex_lock(&h->watch_mutex);
234 #ifdef USE_PTHREAD
235 if (h->read_thr_exists) {
236 /* XXX FIXME: May leak an unpublished message buffer. */
237 pthread_cancel(h->read_thr);
238 pthread_join(h->read_thr, NULL);
239 }
240 #endif
242 list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
243 free(msg->body);
244 free(msg);
245 }
247 list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) {
248 free(msg->body);
249 free(msg);
250 }
252 mutex_unlock(&h->request_mutex);
253 mutex_unlock(&h->reply_mutex);
254 mutex_unlock(&h->watch_mutex);
256 if (h->watch_pipe[0] != -1) {
257 close(h->watch_pipe[0]);
258 close(h->watch_pipe[1]);
259 }
261 close(h->fd);
263 free(h);
264 }
266 static bool read_all(int fd, void *data, unsigned int len)
267 {
268 while (len) {
269 int done;
271 done = read(fd, data, len);
272 if (done < 0) {
273 if (errno == EINTR)
274 continue;
275 return false;
276 }
277 if (done == 0) {
278 /* It closed fd on us? EBADF is appropriate. */
279 errno = EBADF;
280 return false;
281 }
282 data += done;
283 len -= done;
284 }
286 return true;
287 }
289 #ifdef XSTEST
290 #define read_all read_all_choice
291 #define xs_write_all write_all_choice
292 #endif
294 static int get_error(const char *errorstring)
295 {
296 unsigned int i;
298 for (i = 0; !streq(errorstring, xsd_errors[i].errstring); i++)
299 if (i == ARRAY_SIZE(xsd_errors) - 1)
300 return EINVAL;
301 return xsd_errors[i].errnum;
302 }
304 /* Adds extra nul terminator, because we generally (always?) hold strings. */
305 static void *read_reply(
306 struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len)
307 {
308 struct xs_stored_msg *msg;
309 char *body;
311 #ifdef USE_PTHREAD
312 /* Read from comms channel ourselves if there is no reader thread. */
313 if (!h->read_thr_exists && (read_message(h) == -1))
314 return NULL;
315 #endif
317 mutex_lock(&h->reply_mutex);
318 while (list_empty(&h->reply_list))
319 condvar_wait(&h->reply_condvar, &h->reply_mutex, h);
320 msg = list_top(&h->reply_list, struct xs_stored_msg, list);
321 list_del(&msg->list);
322 assert(list_empty(&h->reply_list));
323 mutex_unlock(&h->reply_mutex);
325 *type = msg->hdr.type;
326 if (len)
327 *len = msg->hdr.len;
328 body = msg->body;
330 free(msg);
332 return body;
333 }
335 /* Send message to xs, get malloc'ed reply. NULL and set errno on error. */
336 static void *xs_talkv(struct xs_handle *h, xs_transaction_t t,
337 enum xsd_sockmsg_type type,
338 const struct iovec *iovec,
339 unsigned int num_vecs,
340 unsigned int *len)
341 {
342 struct xsd_sockmsg msg;
343 void *ret = NULL;
344 int saved_errno;
345 unsigned int i;
346 struct sigaction ignorepipe, oldact;
348 msg.tx_id = t;
349 msg.req_id = 0;
350 msg.type = type;
351 msg.len = 0;
352 for (i = 0; i < num_vecs; i++)
353 msg.len += iovec[i].iov_len;
355 if (msg.len > XENSTORE_PAYLOAD_MAX) {
356 errno = E2BIG;
357 return 0;
358 }
360 ignorepipe.sa_handler = SIG_IGN;
361 sigemptyset(&ignorepipe.sa_mask);
362 ignorepipe.sa_flags = 0;
363 sigaction(SIGPIPE, &ignorepipe, &oldact);
365 mutex_lock(&h->request_mutex);
367 if (!xs_write_all(h->fd, &msg, sizeof(msg)))
368 goto fail;
370 for (i = 0; i < num_vecs; i++)
371 if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len))
372 goto fail;
374 ret = read_reply(h, &msg.type, len);
375 if (!ret)
376 goto fail;
378 mutex_unlock(&h->request_mutex);
380 sigaction(SIGPIPE, &oldact, NULL);
381 if (msg.type == XS_ERROR) {
382 saved_errno = get_error(ret);
383 free(ret);
384 errno = saved_errno;
385 return NULL;
386 }
388 if (msg.type != type) {
389 free(ret);
390 saved_errno = EBADF;
391 goto close_fd;
392 }
393 return ret;
395 fail:
396 /* We're in a bad state, so close fd. */
397 saved_errno = errno;
398 mutex_unlock(&h->request_mutex);
399 sigaction(SIGPIPE, &oldact, NULL);
400 close_fd:
401 close(h->fd);
402 h->fd = -1;
403 errno = saved_errno;
404 return NULL;
405 }
407 /* free(), but don't change errno. */
408 static void free_no_errno(void *p)
409 {
410 int saved_errno = errno;
411 free(p);
412 errno = saved_errno;
413 }
415 /* Simplified version of xs_talkv: single message. */
416 static void *xs_single(struct xs_handle *h, xs_transaction_t t,
417 enum xsd_sockmsg_type type,
418 const char *string,
419 unsigned int *len)
420 {
421 struct iovec iovec;
423 iovec.iov_base = (void *)string;
424 iovec.iov_len = strlen(string) + 1;
425 return xs_talkv(h, t, type, &iovec, 1, len);
426 }
428 static bool xs_bool(char *reply)
429 {
430 if (!reply)
431 return false;
432 free(reply);
433 return true;
434 }
436 char **xs_directory(struct xs_handle *h, xs_transaction_t t,
437 const char *path, unsigned int *num)
438 {
439 char *strings, *p, **ret;
440 unsigned int len;
442 strings = xs_single(h, t, XS_DIRECTORY, path, &len);
443 if (!strings)
444 return NULL;
446 /* Count the strings. */
447 *num = xs_count_strings(strings, len);
449 /* Transfer to one big alloc for easy freeing. */
450 ret = malloc(*num * sizeof(char *) + len);
451 if (!ret) {
452 free_no_errno(strings);
453 return NULL;
454 }
455 memcpy(&ret[*num], strings, len);
456 free_no_errno(strings);
458 strings = (char *)&ret[*num];
459 for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
460 ret[(*num)++] = p;
461 return ret;
462 }
464 /* Get the value of a single file, nul terminated.
465 * Returns a malloced value: call free() on it after use.
466 * len indicates length in bytes, not including the nul.
467 */
468 void *xs_read(struct xs_handle *h, xs_transaction_t t,
469 const char *path, unsigned int *len)
470 {
471 return xs_single(h, t, XS_READ, path, len);
472 }
474 /* Write the value of a single file.
475 * Returns false on failure.
476 */
477 bool xs_write(struct xs_handle *h, xs_transaction_t t,
478 const char *path, const void *data, unsigned int len)
479 {
480 struct iovec iovec[2];
482 iovec[0].iov_base = (void *)path;
483 iovec[0].iov_len = strlen(path) + 1;
484 iovec[1].iov_base = (void *)data;
485 iovec[1].iov_len = len;
487 return xs_bool(xs_talkv(h, t, XS_WRITE, iovec,
488 ARRAY_SIZE(iovec), NULL));
489 }
491 /* Create a new directory.
492 * Returns false on failure, or success if it already exists.
493 */
494 bool xs_mkdir(struct xs_handle *h, xs_transaction_t t,
495 const char *path)
496 {
497 return xs_bool(xs_single(h, t, XS_MKDIR, path, NULL));
498 }
500 /* Destroy a file or directory (directories must be empty).
501 * Returns false on failure, or success if it doesn't exist.
502 */
503 bool xs_rm(struct xs_handle *h, xs_transaction_t t,
504 const char *path)
505 {
506 return xs_bool(xs_single(h, t, XS_RM, path, NULL));
507 }
509 /* Get permissions of node (first element is owner).
510 * Returns malloced array, or NULL: call free() after use.
511 */
512 struct xs_permissions *xs_get_permissions(struct xs_handle *h,
513 xs_transaction_t t,
514 const char *path, unsigned int *num)
515 {
516 char *strings;
517 unsigned int len;
518 struct xs_permissions *ret;
520 strings = xs_single(h, t, XS_GET_PERMS, path, &len);
521 if (!strings)
522 return NULL;
524 /* Count the strings: each one perms then domid. */
525 *num = xs_count_strings(strings, len);
527 /* Transfer to one big alloc for easy freeing. */
528 ret = malloc(*num * sizeof(struct xs_permissions));
529 if (!ret) {
530 free_no_errno(strings);
531 return NULL;
532 }
534 if (!xs_strings_to_perms(ret, *num, strings)) {
535 free_no_errno(ret);
536 ret = NULL;
537 }
539 free(strings);
540 return ret;
541 }
543 /* Set permissions of node (must be owner).
544 * Returns false on failure.
545 */
546 bool xs_set_permissions(struct xs_handle *h,
547 xs_transaction_t t,
548 const char *path,
549 struct xs_permissions *perms,
550 unsigned int num_perms)
551 {
552 unsigned int i;
553 struct iovec iov[1+num_perms];
555 iov[0].iov_base = (void *)path;
556 iov[0].iov_len = strlen(path) + 1;
558 for (i = 0; i < num_perms; i++) {
559 char buffer[MAX_STRLEN(unsigned int)+1];
561 if (!xs_perm_to_string(&perms[i], buffer, sizeof(buffer)))
562 goto unwind;
564 iov[i+1].iov_base = strdup(buffer);
565 iov[i+1].iov_len = strlen(buffer) + 1;
566 if (!iov[i+1].iov_base)
567 goto unwind;
568 }
570 if (!xs_bool(xs_talkv(h, t, XS_SET_PERMS, iov, 1+num_perms, NULL)))
571 goto unwind;
572 for (i = 0; i < num_perms; i++)
573 free(iov[i+1].iov_base);
574 return true;
576 unwind:
577 num_perms = i;
578 for (i = 0; i < num_perms; i++)
579 free_no_errno(iov[i+1].iov_base);
580 return false;
581 }
583 /* Watch a node for changes (poll on fd to detect, or call read_watch()).
584 * When the node (or any child) changes, fd will become readable.
585 * Token is returned when watch is read, to allow matching.
586 * Returns false on failure.
587 */
588 bool xs_watch(struct xs_handle *h, const char *path, const char *token)
589 {
590 struct iovec iov[2];
592 #ifdef USE_PTHREAD
593 /* We dynamically create a reader thread on demand. */
594 mutex_lock(&h->request_mutex);
595 if (!h->read_thr_exists) {
596 if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) {
597 mutex_unlock(&h->request_mutex);
598 return false;
599 }
600 h->read_thr_exists = 1;
601 }
602 mutex_unlock(&h->request_mutex);
603 #endif
605 iov[0].iov_base = (void *)path;
606 iov[0].iov_len = strlen(path) + 1;
607 iov[1].iov_base = (void *)token;
608 iov[1].iov_len = strlen(token) + 1;
610 return xs_bool(xs_talkv(h, XBT_NULL, XS_WATCH, iov,
611 ARRAY_SIZE(iov), NULL));
612 }
614 /* Find out what node change was on (will block if nothing pending).
615 * Returns array of two pointers: path and token, or NULL.
616 * Call free() after use.
617 */
618 char **xs_read_watch(struct xs_handle *h, unsigned int *num)
619 {
620 struct xs_stored_msg *msg;
621 char **ret, *strings, c = 0;
622 unsigned int num_strings, i;
624 mutex_lock(&h->watch_mutex);
626 /* Wait on the condition variable for a watch to fire. */
627 while (list_empty(&h->watch_list))
628 condvar_wait(&h->watch_condvar, &h->watch_mutex, h);
629 msg = list_top(&h->watch_list, struct xs_stored_msg, list);
630 list_del(&msg->list);
632 /* Clear the pipe token if there are no more pending watches. */
633 if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1))
634 while (read(h->watch_pipe[0], &c, 1) != 1)
635 continue;
637 mutex_unlock(&h->watch_mutex);
639 assert(msg->hdr.type == XS_WATCH_EVENT);
641 strings = msg->body;
642 num_strings = xs_count_strings(strings, msg->hdr.len);
644 ret = malloc(sizeof(char*) * num_strings + msg->hdr.len);
645 if (!ret) {
646 free_no_errno(strings);
647 free_no_errno(msg);
648 return NULL;
649 }
651 ret[0] = (char *)(ret + num_strings);
652 memcpy(ret[0], strings, msg->hdr.len);
654 free(strings);
655 free(msg);
657 for (i = 1; i < num_strings; i++)
658 ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1;
660 *num = num_strings;
662 return ret;
663 }
665 /* Remove a watch on a node.
666 * Returns false on failure (no watch on that node).
667 */
668 bool xs_unwatch(struct xs_handle *h, const char *path, const char *token)
669 {
670 struct iovec iov[2];
672 iov[0].iov_base = (char *)path;
673 iov[0].iov_len = strlen(path) + 1;
674 iov[1].iov_base = (char *)token;
675 iov[1].iov_len = strlen(token) + 1;
677 return xs_bool(xs_talkv(h, XBT_NULL, XS_UNWATCH, iov,
678 ARRAY_SIZE(iov), NULL));
679 }
681 /* Start a transaction: changes by others will not be seen during this
682 * transaction, and changes will not be visible to others until end.
683 * Returns XBT_NULL on failure.
684 */
685 xs_transaction_t xs_transaction_start(struct xs_handle *h)
686 {
687 char *id_str;
688 xs_transaction_t id;
690 id_str = xs_single(h, XBT_NULL, XS_TRANSACTION_START, "", NULL);
691 if (id_str == NULL)
692 return XBT_NULL;
694 id = strtoul(id_str, NULL, 0);
695 free(id_str);
697 return id;
698 }
700 /* End a transaction.
701 * If abandon is true, transaction is discarded instead of committed.
702 * Returns false on failure, which indicates an error: transactions will
703 * not fail spuriously.
704 */
705 bool xs_transaction_end(struct xs_handle *h, xs_transaction_t t,
706 bool abort)
707 {
708 char abortstr[2];
710 if (abort)
711 strcpy(abortstr, "F");
712 else
713 strcpy(abortstr, "T");
715 return xs_bool(xs_single(h, t, XS_TRANSACTION_END, abortstr, NULL));
716 }
718 /* Introduce a new domain.
719 * This tells the store daemon about a shared memory page and event channel
720 * associated with a domain: the domain uses these to communicate.
721 */
722 bool xs_introduce_domain(struct xs_handle *h,
723 unsigned int domid, unsigned long mfn,
724 unsigned int eventchn)
725 {
726 char domid_str[MAX_STRLEN(domid)];
727 char mfn_str[MAX_STRLEN(mfn)];
728 char eventchn_str[MAX_STRLEN(eventchn)];
729 struct iovec iov[3];
731 snprintf(domid_str, sizeof(domid_str), "%u", domid);
732 snprintf(mfn_str, sizeof(mfn_str), "%lu", mfn);
733 snprintf(eventchn_str, sizeof(eventchn_str), "%u", eventchn);
735 iov[0].iov_base = domid_str;
736 iov[0].iov_len = strlen(domid_str) + 1;
737 iov[1].iov_base = mfn_str;
738 iov[1].iov_len = strlen(mfn_str) + 1;
739 iov[2].iov_base = eventchn_str;
740 iov[2].iov_len = strlen(eventchn_str) + 1;
742 return xs_bool(xs_talkv(h, XBT_NULL, XS_INTRODUCE, iov,
743 ARRAY_SIZE(iov), NULL));
744 }
746 bool xs_set_target(struct xs_handle *h,
747 unsigned int domid, unsigned int target)
748 {
749 char domid_str[MAX_STRLEN(domid)];
750 char target_str[MAX_STRLEN(target)];
751 struct iovec iov[2];
753 snprintf(domid_str, sizeof(domid_str), "%u", domid);
754 snprintf(target_str, sizeof(target_str), "%u", target);
756 iov[0].iov_base = domid_str;
757 iov[0].iov_len = strlen(domid_str) + 1;
758 iov[1].iov_base = target_str;
759 iov[1].iov_len = strlen(target_str) + 1;
761 return xs_bool(xs_talkv(h, XBT_NULL, XS_SET_TARGET, iov,
762 ARRAY_SIZE(iov), NULL));
763 }
765 static void * single_with_domid(struct xs_handle *h,
766 enum xsd_sockmsg_type type,
767 unsigned int domid)
768 {
769 char domid_str[MAX_STRLEN(domid)];
771 snprintf(domid_str, sizeof(domid_str), "%u", domid);
773 return xs_single(h, XBT_NULL, type, domid_str, NULL);
774 }
776 bool xs_release_domain(struct xs_handle *h, unsigned int domid)
777 {
778 return xs_bool(single_with_domid(h, XS_RELEASE, domid));
779 }
781 /* clear the shutdown bit for the given domain */
782 bool xs_resume_domain(struct xs_handle *h, unsigned int domid)
783 {
784 return xs_bool(single_with_domid(h, XS_RESUME, domid));
785 }
787 char *xs_get_domain_path(struct xs_handle *h, unsigned int domid)
788 {
789 char domid_str[MAX_STRLEN(domid)];
791 snprintf(domid_str, sizeof(domid_str), "%u", domid);
793 return xs_single(h, XBT_NULL, XS_GET_DOMAIN_PATH, domid_str, NULL);
794 }
796 bool xs_is_domain_introduced(struct xs_handle *h, unsigned int domid)
797 {
798 return strcmp("F",
799 single_with_domid(h, XS_IS_DOMAIN_INTRODUCED, domid));
800 }
802 /* Only useful for DEBUG versions */
803 char *xs_debug_command(struct xs_handle *h, const char *cmd,
804 void *data, unsigned int len)
805 {
806 struct iovec iov[2];
808 iov[0].iov_base = (void *)cmd;
809 iov[0].iov_len = strlen(cmd) + 1;
810 iov[1].iov_base = data;
811 iov[1].iov_len = len;
813 return xs_talkv(h, XBT_NULL, XS_DEBUG, iov,
814 ARRAY_SIZE(iov), NULL);
815 }
817 static int read_message(struct xs_handle *h)
818 {
819 struct xs_stored_msg *msg = NULL;
820 char *body = NULL;
821 int saved_errno;
823 /* Allocate message structure and read the message header. */
824 msg = malloc(sizeof(*msg));
825 if (msg == NULL)
826 goto error;
827 if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
828 goto error;
830 /* Allocate and read the message body. */
831 body = msg->body = malloc(msg->hdr.len + 1);
832 if (body == NULL)
833 goto error;
834 if (!read_all(h->fd, body, msg->hdr.len))
835 goto error;
836 body[msg->hdr.len] = '\0';
838 if (msg->hdr.type == XS_WATCH_EVENT) {
839 mutex_lock(&h->watch_mutex);
841 /* Kick users out of their select() loop. */
842 if (list_empty(&h->watch_list) &&
843 (h->watch_pipe[1] != -1))
844 while (write(h->watch_pipe[1], body, 1) != 1)
845 continue;
847 list_add_tail(&msg->list, &h->watch_list);
849 condvar_signal(&h->watch_condvar);
851 mutex_unlock(&h->watch_mutex);
852 } else {
853 mutex_lock(&h->reply_mutex);
855 /* There should only ever be one response pending! */
856 if (!list_empty(&h->reply_list)) {
857 mutex_unlock(&h->reply_mutex);
858 goto error;
859 }
861 list_add_tail(&msg->list, &h->reply_list);
862 condvar_signal(&h->reply_condvar);
864 mutex_unlock(&h->reply_mutex);
865 }
867 return 0;
869 error:
870 saved_errno = errno;
871 free(msg);
872 free(body);
873 errno = saved_errno;
874 return -1;
875 }
877 #ifdef USE_PTHREAD
878 static void *read_thread(void *arg)
879 {
880 struct xs_handle *h = arg;
882 while (read_message(h) != -1)
883 continue;
885 return NULL;
886 }
887 #endif
889 /*
890 * Local variables:
891 * c-file-style: "linux"
892 * indent-tabs-mode: t
893 * c-indent-level: 8
894 * c-basic-offset: 8
895 * tab-width: 8
896 * End:
897 */