--- /dev/null
+diff --git a/drivers/xen/v2v/v2v.c b/drivers/xen/v2v/v2v.c
+index 8f6efd4..57d51be 100644
+--- a/drivers/xen/v2v/v2v.c
++++ b/drivers/xen/v2v/v2v.c
+@@ -3,8 +3,8 @@
+ *
+ * V2V interdomain communication driver.
+ *
+- * Copyright (c) 2009 Steven Smith
+ * Copyright (c) 2009 Ross Philipson
++ * Copyright (c) 2009 Steven Smith
+ * Copyright (c) 2009 Citrix Systems, Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+@@ -36,6 +36,7 @@
+ #include <linux/init.h>
+ #include <linux/module.h>
+ #include <linux/vmalloc.h>
++#include <linux/interrupt.h>
+ #include <xen/xenbus.h>
+ #include <xen/evtchn.h>
+ #include <xen/gnttab.h>
+@@ -67,6 +68,9 @@ struct v2v_channel {
+
+ int receive_evtchn_irq;
+ int send_evtchn_irq;
++
++ /* async values */
++ struct v2v_async asv;
+
+ unsigned nr_prod_ring_pages;
+ unsigned nr_cons_ring_pages;
+@@ -77,15 +81,17 @@ struct v2v_channel {
+
+ int has_watch;
+ int is_temple;
++ int is_sync;
+
+ union {
+ struct {
+ int has_grant_alloc;
++ int accepted;
+ grant_ref_t gref_head;
+
+ grant_ref_t prod_grefs[MAX_RING_PAGES];
+ grant_ref_t cons_grefs[MAX_RING_PAGES];
+- grant_ref_t control_gref;
++ grant_ref_t control_gref;
+ } temple;
+ struct {
+ struct vm_struct *prod_area;
+@@ -203,12 +209,23 @@ v2v_remote_state_changed(struct xenbus_watch *watch,
+ return; /* not much we can do */
+
+ v2v_set_event(channel);
++
++ /* Callback interested parties that regitered a control callback */
++ if (channel->is_temple && !channel->u.temple.accepted)
++ return;
++ if (channel->asv.control_cb)
++ channel->asv.control_cb(channel->asv.control_ctx);
+ }
+
+ static irqreturn_t
+ send_int(int irq, void *dev_id)
+ {
+ struct v2v_channel *channel = dev_id;
++
++ if (!channel->is_sync) {
++ channel->asv.send_int(channel->asv.send_ctx);
++ return IRQ_HANDLED;
++ }
+
+ if (v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND))
+ return IRQ_HANDLED; /* not much we can do */
+@@ -223,6 +240,11 @@ receive_int(int irq, void *dev_id)
+ {
+ struct v2v_channel *channel = dev_id;
+
++ if (!channel->is_sync) {
++ channel->asv.receive_int(channel->asv.receive_ctx);
++ return IRQ_HANDLED;
++ }
++
+ if (v2v_wrq_queue(channel, V2V_WAKE_REASON_RECEIVE))
+ return IRQ_HANDLED; /* not much we can do */
+
+@@ -231,6 +253,26 @@ receive_int(int irq, void *dev_id)
+ return IRQ_HANDLED;
+ }
+
++static inline void
++v2v_close_receive_evtchn(struct v2v_channel *channel)
++{
++ DPRINTK("closing receive evtchn irg: %d\n", channel->receive_evtchn_irq);
++ if (channel->receive_evtchn_irq != 0) {
++ unbind_from_irqhandler(channel->receive_evtchn_irq, channel);
++ channel->receive_evtchn_irq = 0;
++ }
++}
++
++static inline void
++v2v_close_send_evtchn(struct v2v_channel *channel)
++{
++ DPRINTK("closing send evtchn irg: %d\n", channel->send_evtchn_irq);
++ if (channel->send_evtchn_irq != 0) {
++ unbind_from_irqhandler(channel->send_evtchn_irq, channel);
++ channel->send_evtchn_irq = 0;
++ }
++}
++
+ static void
+ v2v_destroy_channel(const struct v2v_channel *_chan, int free_temple)
+ {
+@@ -308,10 +350,9 @@ v2v_destroy_channel(const struct v2v_channel *_chan, int free_temple)
+ DPRINTK("freed grant refs and rings for temple.\n");
+ }
+
+- if (chan->receive_evtchn_irq != 0)
+- unbind_from_irqhandler(chan->receive_evtchn_irq, chan);
+- if (chan->send_evtchn_irq != 0)
+- unbind_from_irqhandler(chan->send_evtchn_irq, chan);
++ v2v_close_receive_evtchn(chan);
++ v2v_close_send_evtchn(chan);
++
+ DPRINTK("unbound irq handlers.\n");
+
+ /* cleanup anything in chan->wait_entry_list once the evtchns are shutdown */
+@@ -340,7 +381,7 @@ v2v_read_peer_domid(struct v2v_channel *chan)
+ }
+
+ static struct v2v_channel *
+-v2v_make_channel(const char *xenbus_prefix)
++v2v_make_channel(const char *xenbus_prefix, struct v2v_async *async_values)
+ {
+ struct v2v_channel *chan;
+
+@@ -357,9 +398,17 @@ v2v_make_channel(const char *xenbus_prefix)
+ strcpy(chan->local_prefix, xenbus_prefix);
+
+ init_waitqueue_head(&chan->wait_state.wait_event);
++ atomic_set(&chan->wait_state.wait_condition, 0);
+ spin_lock_init(&chan->wait_list_lock);
+ INIT_LIST_HEAD(&chan->wait_entry_list);
+
++ if (async_values) {
++ memcpy(&chan->asv, async_values, sizeof(struct v2v_async));
++ chan->is_sync = 0;
++ }
++ else
++ chan->is_sync = 1;
++
+ DPRINTK("created channel: %p with local prefix - %s\n", chan, chan->local_prefix);
+
+ return chan;
+@@ -479,7 +528,8 @@ v2v_change_local_state(struct v2v_channel *channel,
+
+ int
+ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel,
+- unsigned prod_ring_page_order, unsigned cons_ring_page_order)
++ unsigned prod_ring_page_order, unsigned cons_ring_page_order,
++ struct v2v_async *async_values)
+ {
+ unsigned prod_ring_size = PAGE_SIZE << prod_ring_page_order;
+ unsigned cons_ring_size = PAGE_SIZE << cons_ring_page_order;
+@@ -499,9 +549,15 @@ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel,
+ return -EINVAL;
+ }
+
++ if (async_values &&
++ (!async_values->receive_int || !async_values->send_int)) {
++ EPRINTK("v2v_listen - invalid async arguments\n");
++ return -EINVAL;
++ }
++
+ *channel = NULL;
+
+- chan = v2v_make_channel(xenbus_prefix);
++ chan = v2v_make_channel(xenbus_prefix, async_values);
+ if (!chan) {
+ EPRINTK("v2v_listen - out of memory making channel\n");
+ return -ENOMEM;
+@@ -587,8 +643,7 @@ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel,
+ gnttab_grant_foreign_access_ref(ref, chan->peer_domid, mfn, 0);
+ DPRINTK("created control grant ref\n");
+
+- err =
+- bind_listening_port_to_irqhandler(chan->peer_domid, receive_int, 0, "v2v", chan);
++ err = bind_listening_port_to_irqhandler(chan->peer_domid, receive_int, 0, "v2v", chan);
+ if (err < 0) {
+ EPRINTK("bind_listening_port_to_irqhandler(receive) failed - err: %d\n", err);
+ goto err_out;
+@@ -596,8 +651,7 @@ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel,
+ chan->receive_evtchn_irq = err;
+ DPRINTK("created listening port for receive: %d\n", chan->receive_evtchn_irq);
+
+- err =
+- bind_listening_port_to_irqhandler(chan->peer_domid, send_int, 0, "v2v", chan);
++ err = bind_listening_port_to_irqhandler(chan->peer_domid, send_int, 0, "v2v", chan);
+ if (err < 0) {
+ EPRINTK("bind_listening_port_to_irqhandler(send) failed - err: %d\n", err);
+ goto err_out;
+@@ -664,10 +718,8 @@ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel,
+ chan->u.temple.control_gref);
+ chan->u.temple.control_gref = GRANT_INVALID_REF;
+
+- unbind_from_irqhandler(chan->receive_evtchn_irq, chan);
+- chan->receive_evtchn_irq = 0;
+- unbind_from_irqhandler(chan->send_evtchn_irq, chan);
+- chan->send_evtchn_irq = 0;
++ v2v_close_receive_evtchn(chan);
++ v2v_close_send_evtchn(chan);
+
+ /* undo what v2v_connect_channel_xenbus did */
+ unregister_xenbus_watch(&chan->remote_state_watch);
+@@ -785,7 +837,7 @@ v2v_accept(struct v2v_channel *channel)
+ }
+ err = xenbus_transaction_end(xbt, 0);
+ if (err == 0)
+- return 0;
++ goto final; /* success */
+ if (err != -EAGAIN) {
+ EPRINTK("v2v_accept - error commiting xs transaction - err: %d\n", err);
+ return err;
+@@ -794,6 +846,9 @@ v2v_accept(struct v2v_channel *channel)
+ }
+ }
+
++final:
++ channel->u.temple.accepted = 1;
++
+ /* the initial state of the send event is set to wakeup for send since there
+ is always send room on startup of the rings */
+ err = v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND);
+@@ -808,7 +863,8 @@ v2v_accept(struct v2v_channel *channel)
+ EXPORT_SYMBOL_GPL(v2v_accept);
+
+ int
+-v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel)
++v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel,
++ struct v2v_async *async_values)
+ {
+ int err = 0;
+ struct xenbus_transaction xbt = {0};
+@@ -825,10 +881,16 @@ v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel)
+ return -EINVAL;
+ }
+
++ if (async_values &&
++ (!async_values->receive_int || !async_values->send_int)) {
++ EPRINTK("v2v_connect - invalid async arguments\n");
++ return -EINVAL;
++ }
++
+ *channel = NULL;
+
+ for (;;) {
+- chan = v2v_make_channel(xenbus_prefix);
++ chan = v2v_make_channel(xenbus_prefix, async_values);
+ if (!chan) {
+ EPRINTK("v2v_connect - out of memory making channel\n");
+ return -ENOMEM;
+@@ -970,9 +1032,9 @@ v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel)
+ chan->control = chan->u.supplicant.control_area->addr;
+ DPRINTK("mapped controle grant ref\n");
+
+- err =
+- bind_interdomain_evtchn_to_irqhandler(chan->peer_domid, chan->u.supplicant.prod_evtchn_port,
+- receive_int, 0, "v2v", chan);
++ err = bind_interdomain_evtchn_to_irqhandler(chan->peer_domid,
++ chan->u.supplicant.prod_evtchn_port,
++ receive_int, 0, "v2v", chan);
+ if (err < 0) {
+ EPRINTK("bind_interdomain_evtchn_to_irqhandler(receive_int) failed - err: %d\n", err);
+ goto err_out;
+@@ -980,9 +1042,9 @@ v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel)
+ chan->receive_evtchn_irq = err;
+ DPRINTK("bound interdomain port for receive: %d\n", chan->receive_evtchn_irq);
+
+- err =
+- bind_interdomain_evtchn_to_irqhandler(chan->peer_domid, chan->u.supplicant.cons_evtchn_port,
+- send_int, 0, "v2v", chan);
++ err = bind_interdomain_evtchn_to_irqhandler(chan->peer_domid,
++ chan->u.supplicant.cons_evtchn_port,
++ send_int, 0, "v2v", chan);
+ if (err < 0) {
+ EPRINTK("bind_interdomain_evtchn_to_irqhandler(send_int) failed - err: %d\n", err);
+ goto err_out;
+@@ -1052,6 +1114,8 @@ v2v_disconnect_temple(const struct v2v_channel *_channel)
+ err = v2v_change_local_state(channel, XBT_NIL, v2v_state_disconnecting);
+ if (err)
+ return err;
++
++ channel->u.temple.accepted = 0;
+
+ /* Get the other end to disconnect */
+ for (;;) {
+@@ -1167,14 +1231,8 @@ v2v_disconnect_temple(const struct v2v_channel *_channel)
+ if (!any_failed)
+ gnttab_free_grant_references(channel->u.temple.gref_head);
+
+- if (channel->receive_evtchn_irq != 0) {
+- unbind_from_irqhandler(channel->receive_evtchn_irq, channel);
+- channel->receive_evtchn_irq = 0;
+- }
+- if (channel->send_evtchn_irq != 0) {
+- unbind_from_irqhandler(channel->send_evtchn_irq, channel);
+- channel->send_evtchn_irq = 0;
+- }
++ v2v_close_receive_evtchn(channel);
++ v2v_close_send_evtchn(channel);
+
+ DPRINTK("finished disconnecting temple, channel: %p\n", channel);
+
+@@ -1197,14 +1255,8 @@ v2v_disconnect_supplicant(const struct v2v_channel *_channel)
+ channel->cons_sring = NULL;
+ channel->control = NULL;
+
+- if (channel->receive_evtchn_irq != 0) {
+- unbind_from_irqhandler(channel->receive_evtchn_irq, channel);
+- channel->receive_evtchn_irq = 0;
+- }
+- if (channel->send_evtchn_irq != 0) {
+- unbind_from_irqhandler(channel->send_evtchn_irq, channel);
+- channel->send_evtchn_irq = 0;
+- }
++ v2v_close_receive_evtchn(channel);
++ v2v_close_send_evtchn(channel);
+
+ err = v2v_change_local_state(channel, XBT_NIL, v2v_state_disconnected);
+ if (err) {
+@@ -1239,6 +1291,11 @@ EXPORT_SYMBOL_GPL(v2v_get_wait_state);
+ u8
+ v2v_get_wake_reason(struct v2v_channel *channel, u8 reasons)
+ {
++ if (!channel->is_sync) {
++ if ((reasons & (V2V_WAKE_REASON_SEND|V2V_WAKE_REASON_RECEIVE)) != 0)
++ return -EINVAL;
++ }
++
+ return v2v_wrq_dequeue(channel, reasons);
+ }
+ EXPORT_SYMBOL_GPL(v2v_get_wake_reason);
+@@ -1248,6 +1305,11 @@ v2v_set_wake_reason(struct v2v_channel *channel, u8 reason)
+ {
+ int err;
+
++ if (!channel->is_sync) {
++ if ((reason == V2V_WAKE_REASON_SEND)||(reason == V2V_WAKE_REASON_RECEIVE))
++ return -EINVAL;
++ }
++
+ err = v2v_wrq_queue(channel, reason);
+ if (err)
+ return err;
+@@ -1299,12 +1361,15 @@ retry:
+ goto retry;
+ }
+ /* ### clear the event (reset) */
+- v2v_wrq_clear(channel, V2V_WAKE_REASON_RECEIVE);
++ if (channel->is_sync)
++ v2v_wrq_clear(channel, V2V_WAKE_REASON_RECEIVE);
+
+ if (nc2_final_check_for_messages(&channel->nc2_rings, prod)) {
+ /* ### now set the event */
+- v2v_wrq_queue(channel, V2V_WAKE_REASON_RECEIVE);
+- v2v_set_event(channel);
++ if (channel->is_sync) {
++ v2v_wrq_queue(channel, V2V_WAKE_REASON_RECEIVE);
++ v2v_set_event(channel);
++ }
+ goto retry;
+ }
+ return -ENODATA;
+@@ -1378,14 +1443,17 @@ v2v_nc2_prep_message(struct v2v_channel *channel,
+ v2v_nc2_send_messages(channel);
+ if (!nc2_can_send_payload_bytes(&channel->nc2_rings, rounded_size)) {
+ /* ### clear the event (reset) */
+- v2v_wrq_clear(channel, V2V_WAKE_REASON_SEND);
++ if (channel->is_sync)
++ v2v_wrq_clear(channel, V2V_WAKE_REASON_SEND);
+
+ if (!nc2_can_send_payload_bytes(&channel->nc2_rings, rounded_size))
+ return -EAGAIN;
+
+ /* ### now set the event */
+- v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND);
+- v2v_set_event(channel);
++ if (channel->is_sync) {
++ v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND);
++ v2v_set_event(channel);
++ }
+ }
+ __nc2_avoid_ring_wrap(&channel->nc2_rings, rounded_size);
+ hdr = __nc2_get_message_ptr(&channel->nc2_rings);
+@@ -1499,4 +1567,3 @@ module_init(v2v_init);
+ module_exit(v2v_cleanup);
+
+ MODULE_LICENSE("Dual BSD/GPL");
+-
+diff --git a/drivers/xen/v2v/v2v_private.h b/drivers/xen/v2v/v2v_private.h
+index c623a2d..4af02ee 100644
+--- a/drivers/xen/v2v/v2v_private.h
++++ b/drivers/xen/v2v/v2v_private.h
+@@ -70,14 +70,15 @@ void v2v_xenops_grant_unmap(struct vm_struct *vm_area,
+ #define INVALID_GRANT_HANDLE ((grant_handle_t)~0U)
+ #define GRANT_INVALID_REF 0
+
+-#define DPRINTK(fmt, args...) \
+- pr_debug("v2v (%s:%d) " fmt, \
++#define DPRINTK(fmt, args...) \
++ pr_debug("v2v-dbg (%s:%d) " fmt, \
+ __FUNCTION__, __LINE__, ##args)
+-#define IPRINTK(fmt, args...) \
++
++#define IPRINTK(fmt, args...) \
+ printk(KERN_INFO "v2v: " fmt, ##args)
+-#define WPRINTK(fmt, args...) \
++#define WPRINTK(fmt, args...) \
+ printk(KERN_WARNING "v2v: " fmt, ##args)
+-#define EPRINTK(fmt, args...) \
++#define EPRINTK(fmt, args...) \
+ printk(KERN_ERR "v2v: " fmt, ##args)
+
+ #endif /* !V2V_PRIVATE_H__ */
+diff --git a/drivers/xen/v2v/v2vdrv.c b/drivers/xen/v2v/v2vdrv.c
+index 0c362bb..dfa8a43 100644
+--- a/drivers/xen/v2v/v2vdrv.c
++++ b/drivers/xen/v2v/v2vdrv.c
+@@ -49,11 +49,13 @@
+
+ static const char v2vdrv_usage[] = \
+ "Run as either the listener or connector by writing the following information:\n" \
+- "listener,<local_prefix>\n" \
+- "connector,<local_prefix>,[count],[size],[timout]\n" \
++ "listener,<local_prefix>,[async],[fastrx]\n" \
++ "connector,<local_prefix>,[async],[fastrx],[count],[size],[timout]\n" \
+ " local_prefix: XenStore local path for the V2V endpoint\n" \
+- " count: (opt) number of messages to send\n" \
+- " size: (opt) size of each message\n" \
++ " async: (opt) set to 1 to run in async mode, 0 for sync mode\n" \
++ " fastrx: (opt) set to 1 to use fast rx mode (on with sync mode)\n" \
++ " count: (opt) number of messages to send\n" \
++ " size: (opt) size of each message\n" \
+ " timout: (opt) timeout in ms for awaiting response\n" \
+ "Also a xenstore reader test routine:\n" \
+ "reader,<path>[:value]\n";
+@@ -61,13 +63,6 @@ static const char v2vdrv_usage[] = \
+ extern void v2vdrv_run_connector(struct v2vdrv_config *config);
+ extern void v2vdrv_run_listener(struct v2vdrv_config *config);
+
+-enum v2vdrv_role {
+- role_unknown = 0,
+- role_listener,
+- role_connector,
+- role_reader
+-};
+-
+ static void v2vdrv_run_reader(struct v2vdrv_config *config)
+ {
+ char *path = NULL;
+@@ -127,37 +122,40 @@ out:
+ kfree(value);
+ }
+
+-static enum v2vdrv_role v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_config *config)
++static void
++v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_config *config)
+ {
+ size_t len;
+- enum v2vdrv_role role = role_unknown;
+ int i, err, val, parsed;
+
++ config->role = role_unknown;
+ len = strlen(cfgstr);
+-
++
+ do {
+ if ((len > 9)&&(strncmp(cfgstr, "listener,", 9) == 0)) {
+ cfgstr += 9;
+ len -= 9;
+- role = role_listener;
++ config->role = role_listener;
+ }
+ else if ((len > 10)&&(strncmp(cfgstr, "connector,", 10) == 0)) {
+ cfgstr += 10;
+ len -= 10;
+- role = role_connector;
++ config->role = role_connector;
+ }
+ else if ((len > 7)&&(strncmp(cfgstr, "reader,", 7) == 0)) {
+ cfgstr += 7;
+ len -= 7;
+- role = role_reader;
++ config->role = role_reader;
+ }
+ else
+ break;
+-
++
+ /* some defaults */
+ config->xfer_count = 0; /* connect only, no data */
+ config->xfer_size = 512;
+- config->timeout = V2VDRV_RESPONSE_WAIT_TIMEOUT;
++ config->timeout = msecs_to_jiffies((const unsigned long)V2VDRV_RESPONSE_WAIT_TIMEOUT);
++ config->async = 0; /* default to sync mode */
++ config->fastrx = 0; /* default to no fastrx mode */
+
+ for (i = 0; i < len; i++)
+ if (cfgstr[i] == ',')
+@@ -166,29 +164,50 @@ static enum v2vdrv_role v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_co
+ /* this is our local prefix */
+ config->local_prefix = kmalloc(i + 1, GFP_KERNEL);
+ if (!config->local_prefix) {
+- role = role_unknown;
++ config->role = role_unknown;
+ break;
+ }
+ strncpy(config->local_prefix, cfgstr, i);
+ config->local_prefix[i] = '\0';
+
+- if (role == role_listener || role == role_reader)
++ if (config->role == role_reader)
+ break;
+
+ if (i == len) /* no opt args */
+ break;
+
+- /* else this is a connector with more opt args */
++ /* else this is a listener/connector with more opt args */
+ cfgstr += i + 1;
+ parsed = 0;
+ err = sscanf(cfgstr, "%d,%n", &val, &parsed);
+ if (err < 1)
+ break;
+-
++ config->async = val;
++ if (parsed == 0)
++ break;
++
++ cfgstr += parsed;
++ parsed = 0;
++ err = sscanf(cfgstr, "%d,%n", &val, &parsed);
++ if (err < 1)
++ break;
++ config->fastrx = (uint32_t)val;
++ if (parsed == 0)
++ break;
++
++ /* all others are connector only */
++ if (config->role == role_listener)
++ break;
++
++ cfgstr += parsed;
++ parsed = 0;
++ err = sscanf(cfgstr, "%d,%n", &val, &parsed);
++ if (err < 1)
++ break;
+ config->xfer_count = (uint32_t)val;
+ if (parsed == 0)
+ break;
+-
++
+ cfgstr += parsed;
+ parsed = 0;
+ err = sscanf(cfgstr, "%d,%n", &val, &parsed);
+@@ -205,8 +224,6 @@ static enum v2vdrv_role v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_co
+ break;
+ config->timeout = msecs_to_jiffies((const unsigned long)val);
+ } while (0);
+-
+- return role;
+ }
+
+ static ssize_t v2vdrv_read(struct file *file, char __user *buf,
+@@ -235,7 +252,6 @@ static ssize_t v2vdrv_write(struct file *file, const char __user *buf,
+ {
+ char *cfgstr = NULL;
+ struct v2vdrv_config *config = NULL;
+- enum v2vdrv_role role;
+ size_t written = -EFAULT;
+
+ if (count == 0)
+@@ -262,21 +278,31 @@ static ssize_t v2vdrv_write(struct file *file, const char __user *buf,
+ if (cfgstr[count - 1] == '\n')
+ cfgstr[count - 1] = '\0';
+
+- role = v2vdrv_parse_config(cfgstr, config);
++ v2vdrv_parse_config(cfgstr, config);
++
++ /* input checking */
++ if (config->async && config->fastrx) {
++ printk("V2V-DRV WARNING cannot use async and fastrx mode together; disabling fastrx.\n");
++ config->fastrx = 0;
++ }
+
+- if (role == role_listener) {
++ if (config->role == role_listener) {
+ printk("V2V-DRV loaded listener config...\n");
+ printk(" local prefix: %s\n", config->local_prefix);
++ printk(" async: %d\n", config->async);
++ printk(" fastrx: %d\n", config->fastrx);
+ v2vdrv_run_listener(config);
+ }
+- else if (role == role_connector) {
++ else if (config->role == role_connector) {
+ printk("V2V-DRV loaded connector config...\n");
+ printk(" local prefix: %s\n", config->local_prefix);
++ printk(" async: %d\n", config->async);
++ printk(" fastrx: %d\n", config->fastrx);
+ printk(" count: %d size: %d timeout: %lu\n",
+ config->xfer_count, config->xfer_size, config->timeout);
+ v2vdrv_run_connector(config);
+ }
+- else if (role == role_reader) {
++ else if (config->role == role_reader) {
+ printk("V2V-DRV loaded reader config...\n");
+ printk(" local prefix: %s\n", config->local_prefix);
+ v2vdrv_run_reader(config);
+diff --git a/drivers/xen/v2v/v2vdrv.h b/drivers/xen/v2v/v2vdrv.h
+index 8c3cd06..c695176 100644
+--- a/drivers/xen/v2v/v2vdrv.h
++++ b/drivers/xen/v2v/v2vdrv.h
+@@ -48,6 +48,13 @@
+ #define V2V_MESSAGE_STATUS_NODATA 0xFFFFF102
+ #define V2V_MESSAGE_STATUS_WRITE_ERR 0xFFFFF103
+
++enum v2vdrv_role {
++ role_unknown = 0,
++ role_listener,
++ role_connector,
++ role_reader
++};
++
+ struct v2vdrv_frame_header {
+ uint16_t id;
+ uint8_t type;
+@@ -74,9 +81,12 @@ struct v2vdrv_listener_resp_item {
+
+ struct v2vdrv_config {
+ char *local_prefix;
++ enum v2vdrv_role role;
+ uint32_t xfer_count;
+ uint32_t xfer_size;
+ unsigned long timeout;
++ int async;
++ int fastrx;
+ };
+
+ static inline uint8_t v2vdrv_checksum(const uint8_t *ptr, uint32_t length)
+diff --git a/drivers/xen/v2v/v2vops.c b/drivers/xen/v2v/v2vops.c
+index 0c2798c..83854a2 100644
+--- a/drivers/xen/v2v/v2vops.c
++++ b/drivers/xen/v2v/v2vops.c
+@@ -1,7 +1,7 @@
+ /******************************************************************************
+ * drivers/xen/v2v/v2vops.c
+ *
+- * V2V sample client driver synchronous operations.
++ * V2V sample client driver operations.
+ *
+ * Copyright (c) 2009 Ross Philipson
+ * Copyright (c) 2009 Citrix Systems, Inc.
+@@ -39,6 +39,7 @@
+ #include <linux/sched.h>
+ #include <linux/wait.h>
+ #include <linux/time.h>
++#include <linux/timer.h>
+ #include <linux/random.h>
+ #include <xen/xenbus.h>
+ #include <xen/v2v.h>
+@@ -46,38 +47,118 @@
+ #include "v2vdrv.h"
+
+ /************************** GENERAL **************************/
++#define V2V_WAKE_REASON_TERMINATE V2V_WAKE_REASON_USER1
++#define V2V_MAX_FASTRX_SIZE 1024
++#define MIN(x, y) ((x) < (y) ? (x) : (y))
++
++#define V2V_TERM_UNKNOWN (uint32_t)-1
++#define V2V_TERM_COMPLETE 0x0
++#define V2V_TERM_GENERAL_ERROR 0x1
++#define V2V_TERM_RX_ERROR 0x2
++#define V2V_TERM_TX_ERROR 0x3
++#define V2V_TERM_TIMEOUT 0x4
++
++struct v2vdrv_context {
++ struct v2vdrv_config *config;
++ struct v2v_channel *channel;
++ struct v2v_async *asvp;
++ uint32_t tx_counter;
++ uint32_t rx_counter;
++
++ union {
++ struct {
++ struct v2vdrv_listener_resp_item *resp_list;
++ struct v2vdrv_listener_resp_item *resp_tail;
++ } listener;
++ struct {
++ int reserved;
++ } connector;
++ } r;
++
++ union {
++ struct {
++ struct v2v_async asv;
++ struct work_struct rx_work;
++ struct work_struct tx_work;
++ struct timer_list to_timer;
++ spinlock_t rx_lock;
++ spinlock_t tx_lock;
++ atomic_t running;
++ uint32_t term_status;
++ } async;
++ struct {
++ int reserved;
++ } sync;
++ } s;
++};
++
++static void v2vdrv_async_init(struct v2vdrv_context *ctx);
++
++static int
++v2vdrv_input_sanity_check(struct v2vdrv_context *ctx)
++{
++ if (ctx->config->role == role_connector) {
++ if (ctx->config->xfer_size <= sizeof(struct v2vdrv_post_internal)) {
++ printk("%s (%s) transfer size %d (0x%x) too small; %d (0x%x) required\n", V2VDRV_LOGTAG,
++ "connector", ctx->config->xfer_size, ctx->config->xfer_size,
++ sizeof(struct v2vdrv_post_internal) + 1, sizeof(struct v2vdrv_post_internal) + 1);
++ return 0;
++ }
++ }
++ /* listener always sends fixed sized messages and doesn use xfer_size */
++
++ return 1;
++}
++
+ static int
+ v2vdrv_message_header_check(const char *rstr,
+ struct v2vdrv_frame_header *header,
+ size_t msg_size,
+- size_t min_size,
+- uint32_t rx_counter)
++ size_t min_size)
+ {
+ if ((msg_size < sizeof(struct v2vdrv_frame_header))||(msg_size < min_size)) {
+- printk("%s (%s) response #%d is too small!!!\n", V2VDRV_LOGTAG, rstr, rx_counter);
++ printk("%s (%s) response is too small!!!\n", V2VDRV_LOGTAG, rstr);
+ return 0;
+ }
+ if (header->length < msg_size) {
+- printk("%s (%s) response #%d header length incorrect!!!\n", V2VDRV_LOGTAG, rstr, rx_counter);
++ printk("%s (%s) response header length incorrect!!!\n", V2VDRV_LOGTAG, rstr);
+ return 0;
+ }
+
+- printk("%s (%s) received message #%d\n", V2VDRV_LOGTAG, rstr, rx_counter);
++ printk("%s (%s) received message\n", V2VDRV_LOGTAG, rstr);
+ printk("------ id=%d type=%d length=0x%x\n", header->id, header->type, header->length);
+
+ return 1;
+ }
+
++static int
++v2vdrv_status_check(struct v2vdrv_context *ctx, const char *rstr)
++{
++ int err;
++ enum v2v_endpoint_state state;
++
++ err = v2v_get_remote_state(ctx->channel, &state);
++ if (err) {
++ printk("%s %s(%p) failure in v2v_get_remote_state(); aborting - error: %d\n",
++ V2VDRV_LOGTAG, rstr, ctx, err);
++ return 1; /* done with error */
++ }
++ printk("%s %s(%p) state changed for other end - new state: %s\n",
++ V2VDRV_LOGTAG, rstr, ctx, v2v_endpoint_state_name(state));
++ if (v2v_state_requests_disconnect(state)) {
++ printk("%s %s(%p) main processing loop ending for disconnect request...\n",
++ V2VDRV_LOGTAG, rstr, ctx);
++ return 1; /* done */
++ }
++ return 0;
++}
++
+ /************************* CONNECTOR *************************/
+-struct v2vdrv_connector_context {
+- struct v2vdrv_config *config;
+- struct v2v_channel *channel;
+- uint32_t tx_counter;
+- uint32_t rx_counter;
+-};
++static void v2vdrv_connector_process_messages_sync(struct v2vdrv_context *ctx);
++static void v2vdrv_connector_process_messages_async(struct v2vdrv_context *ctx);
+
+ static int
+-v2vdrv_connect(struct v2vdrv_connector_context *vcc)
++v2vdrv_connect(struct v2vdrv_context *ctx)
+ {
+ int err = 0;
+ enum v2v_endpoint_state state;
+@@ -87,18 +168,18 @@ v2vdrv_connect(struct v2vdrv_connector_context *vcc)
+ struct timespec ts = {0}, tsz = {0}, now, delta;
+
+ /* Connect to the listener, get back a channel handle */
+- err = v2v_connect(vcc->config->local_prefix, &vcc->channel);
++ err = v2v_connect(ctx->config->local_prefix, &ctx->channel, ctx->asvp);
+ if (err) {
+- printk("%s connector(%p) failure in v2v_connect() - error: %d\n", V2VDRV_LOGTAG, vcc, err);
++ printk("%s connector(%p) failure in v2v_connect() - error: %d\n", V2VDRV_LOGTAG, ctx, err);
+ return err;
+ }
+
+- BUG_ON(vcc->channel == NULL);
++ BUG_ON(ctx->channel == NULL);
+
+- printk("%s connector(%p) connected to listener; wait for listenter to indicate it has accepted the connection...\n", V2VDRV_LOGTAG, vcc);
++ printk("%s connector(%p) connected to listener; wait for listenter to indicate it has accepted the connection...\n", V2VDRV_LOGTAG, ctx);
+
+- wait_state = v2v_get_wait_state(vcc->channel);
+- to = vcc->config->timeout << 2; /* in jiffies x4*/
++ wait_state = v2v_get_wait_state(ctx->channel);
++ to = ctx->config->timeout << 2; /* in jiffies x4*/
+
+ do {
+ if (!timespec_equal(&ts, &tsz)) {
+@@ -120,24 +201,24 @@ v2vdrv_connect(struct v2vdrv_connector_context *vcc)
+ to);
+ if (rc == 0) {
+ printk("%s connector(%p) timed out waiting for accept from listener; disconnecting\n",
+- V2VDRV_LOGTAG, vcc);
++ V2VDRV_LOGTAG, ctx);
+ err = -ETIMEDOUT;
+ break;
+ }
+
+- reasons = v2v_get_wake_reason(vcc->channel, V2V_WAKE_REASON_CONTROL);
++ reasons = v2v_get_wake_reason(ctx->channel, V2V_WAKE_REASON_CONTROL);
+ if (reasons & V2V_WAKE_REASON_CONTROL) {
+- err = v2v_get_remote_state(vcc->channel, &state);
++ err = v2v_get_remote_state(ctx->channel, &state);
+ if (err) {
+ printk("%s connector(%p) failure in v2v_get_remote_state(); aborting - error: %d\n",
+- V2VDRV_LOGTAG, vcc, err);
++ V2VDRV_LOGTAG, ctx, err);
+ break;
+ }
+ printk("%s connector(%p) state changed for other end - new state: %s\n",
+- V2VDRV_LOGTAG, vcc, v2v_endpoint_state_name(state));
++ V2VDRV_LOGTAG, ctx, v2v_endpoint_state_name(state));
+ if (state == v2v_state_connected) {
+ printk("%s connector(%p) listener reports connected; begin processing messages.\n",
+- V2VDRV_LOGTAG, vcc);
++ V2VDRV_LOGTAG, ctx);
+ err = 0;
+ break;
+ }
+@@ -145,31 +226,89 @@ v2vdrv_connect(struct v2vdrv_connector_context *vcc)
+ } while (1);
+
+ if (err)
+- v2v_disconnect(vcc->channel);
++ v2v_disconnect(ctx->channel);
+
+ return err;
+ }
+
++static void
++v2vdrv_connector_disconnect(struct v2vdrv_context *ctx)
++{
++ int err;
++
++ printk("%s connector(%p) Disconnecting...\n", V2VDRV_LOGTAG, ctx);
++ err = v2v_disconnect(ctx->channel);
++ printk("%s connector(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, ctx, err);
++
++ printk("%s connector(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, ctx, ctx->tx_counter);
++ printk("%s connector(%p) Received response counter: %d\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter);
++
++ if (ctx->tx_counter != ctx->rx_counter)
++ printk("%s connector(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, ctx);
++}
++
++void v2vdrv_run_connector(struct v2vdrv_config *config)
++{
++ struct v2vdrv_context *ctx;
++ int err;
++
++ ctx = kmalloc(sizeof(struct v2vdrv_context), GFP_KERNEL);
++ if (!ctx) {
++ printk("%s connector out of memory\n", V2VDRV_LOGTAG);
++ return;
++ }
++ memset(ctx, 0, sizeof(struct v2vdrv_context));
++ ctx->config = config;
++
++ if (config->async)
++ v2vdrv_async_init(ctx);
++
++ if (!v2vdrv_input_sanity_check(ctx)) {
++ kfree(ctx);
++ return;
++ }
++
++ err = v2vdrv_connect(ctx);
++ if (err) {
++ kfree(ctx);
++ return;
++ }
++
++ /* This runs the main processing loop, when it is done we disconnect
++ and cleanup regardless of what may have occured */
++ if (config->async)
++ v2vdrv_connector_process_messages_async(ctx);
++ else
++ v2vdrv_connector_process_messages_sync(ctx);
++
++ v2vdrv_connector_disconnect(ctx);
++
++ kfree(ctx);
++}
++
++/********************** CONNECTOR SYNC ***********************/
+ static int
+-v2vdrv_connector_process_internal_rx(struct v2vdrv_connector_context *vcc)
++v2vdrv_connector_process_internal_sync_rx(struct v2vdrv_context *ctx)
+ {
+ int err;
+ volatile void *msg;
+ size_t size;
+- unsigned type;
+- unsigned flags;
++ unsigned vtype;
++ unsigned vflags;
+ struct v2vdrv_frame_header *header;
+ struct v2vdrv_resp_internal *vri;
+ uint8_t sum;
+
+- while ((err = v2v_nc2_get_message(vcc->channel, (const volatile void **)&msg, &size, &type, &flags))
++ if (ctx->config->fastrx)
++ v2v_nc2_request_fast_receive(ctx->channel);
++
++ while ((err = v2v_nc2_get_message(ctx->channel, (const volatile void **)&msg, &size, &vtype, &vflags))
+ == 0) {
+- vcc->rx_counter++;
++ ctx->rx_counter++;
+ header = (struct v2vdrv_frame_header*)msg;
+ if (!v2vdrv_message_header_check("connector", header, size,
+- sizeof(struct v2vdrv_resp_internal),
+- vcc->rx_counter)) {
+- v2v_nc2_finish_message(vcc->channel);
++ sizeof(struct v2vdrv_resp_internal))) {
++ v2v_nc2_finish_message(ctx->channel);
+ return -EBADMSG;
+ }
+
+@@ -184,97 +323,107 @@ v2vdrv_connector_process_internal_rx(struct v2vdrv_connector_context *vcc)
+
+ sum = v2vdrv_checksum((const uint8_t*)msg, header->length);
+ if (sum != 0)
+- printk("%s connector(%p) bad checksumm on response #%d!!!\n", V2VDRV_LOGTAG, vcc, vcc->rx_counter);
++ printk("%s connector(%p) bad checksumm on response #%d!!!\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter);
+
+- v2v_nc2_finish_message(vcc->channel);
++ v2v_nc2_finish_message(ctx->channel);
+ }
++ if (ctx->config->fastrx)
++ v2v_nc2_cancel_fast_receive(ctx->channel);
++
+ if (err == -ENODATA) {
+ /* No more messages */
+- printk("%s connector(%p) no more messages, returning\n", V2VDRV_LOGTAG, vcc);
++ printk("%s connector(%p) no more messages, returning\n", V2VDRV_LOGTAG, ctx);
+ return 0;
+ }
+
+ printk("%s connector(%p) receive internal data failure; abort processing - error: %d\n",
+- V2VDRV_LOGTAG, vcc, err);
++ V2VDRV_LOGTAG, ctx, err);
+ return err; /* failure */
+ }
+
+ static int
+-v2vdrv_connector_process_internal_tx(struct v2vdrv_connector_context *vcc)
++v2vdrv_connector_process_internal_sync_tx(struct v2vdrv_context *ctx)
+ {
+ int err;
+ unsigned available;
+ volatile void *msg;
+ uint8_t *msgp;
++ size_t msize;
+ struct v2vdrv_frame_header *header;
+ struct v2vdrv_post_internal *vpi;
+
+- printk("%s connector(%p) sending internal message #%d\n", V2VDRV_LOGTAG, vcc, vcc->tx_counter + 1);
+- available = v2v_nc2_producer_bytes_available(vcc->channel);
+- printk("%s connector(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, vcc, available);
++ printk("%s connector(%p) sending internal message #%d\n", V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1);
++ available = v2v_nc2_producer_bytes_available(ctx->channel);
++ printk("%s connector(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, ctx, available);
++
++ if (ctx->config->fastrx && v2v_nc2_remote_requested_fast_wakeup(ctx->channel))
++ msize = MIN(ctx->config->xfer_size, V2V_MAX_FASTRX_SIZE);
++ else
++ msize = ctx->config->xfer_size;
+
+- err = v2v_nc2_prep_message(vcc->channel, vcc->config->xfer_size, V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
++ err = v2v_nc2_prep_message(ctx->channel, msize, V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
+ if (err) {
+ if (err == -EAGAIN) {
+ /* No room right now, return and try again later */
+ printk("%s connector(%p) not enough buffer space to send message #%d; retry\n",
+- V2VDRV_LOGTAG, vcc, vcc->tx_counter + 1);
++ V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1);
+ return -EAGAIN;
+ }
+ printk("%s connector(%p) transmit internal data failure; abort processing - error: %d\n",
+- V2VDRV_LOGTAG, vcc, err);
++ V2VDRV_LOGTAG, ctx, err);
+ return err; /* failure */
+ }
+- vcc->tx_counter++; /* next message */
++ ctx->tx_counter++; /* next message */
+ header = (struct v2vdrv_frame_header*)msg;
+- header->id = (uint16_t)vcc->tx_counter;
++ header->id = (uint16_t)ctx->tx_counter;
+ header->type = V2V_MESSAGE_TYPE_INTERNAL;
+ header->cs = 0;
+- header->length = vcc->config->xfer_size;
++ header->length = ctx->config->xfer_size;
+ vpi = (struct v2vdrv_post_internal*)msg;
+ generate_random_uuid(vpi->guid);
+
+ /* Fill it up with some data and send it */
+ msgp = (uint8_t*)msg + sizeof(struct v2vdrv_post_internal);
+- memset(msgp, 'X', (vcc->config->xfer_size - sizeof(struct v2vdrv_post_internal)));
+- header->cs = v2vdrv_checksum((const uint8_t*)msg, vcc->config->xfer_size);
+- v2v_nc2_send_messages(vcc->channel);
++ memset(msgp, 'X', (ctx->config->xfer_size - sizeof(struct v2vdrv_post_internal)));
++ header->cs = v2vdrv_checksum((const uint8_t*)msg, ctx->config->xfer_size);
++ v2v_nc2_send_messages(ctx->channel);
+
+ /* Keep the send loop going by setting the event. If there is no more room, the prep message call
+ will return ERROR_RETRY and just land us back in the wait. */
+- v2v_set_wake_reason(vcc->channel, V2V_WAKE_REASON_SEND);
++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_SEND);
+
+ return 0;
+ }
+
+ static void
+-v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc)
++v2vdrv_connector_process_messages_sync(struct v2vdrv_context *ctx)
+ {
+ int err = 0, wait_to = 0, done = 0;
+ struct v2v_wait *wait_state;
+ u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE|V2V_WAKE_REASON_SEND;
+- u8 reasons;
+- enum v2v_endpoint_state state;
++ u8 reason;
+ unsigned long to, td, rc;
+ struct timespec ts = {0}, tsz = {0}, now, delta;
+
++ printk("%s connector(%p) started SYNC processing loop for transfer\n", V2VDRV_LOGTAG, ctx);
++
+ /* A transfer count of 0 is used to just test connecting and disconnecting
+ w/o sending any data */
+- if (vcc->config->xfer_count == 0) {
+- printk("%s connector(%p) tranfer count set to 0; disconnecting.\n", V2VDRV_LOGTAG, vcc);
++ if (ctx->config->xfer_count == 0) {
++ printk("%s connector(%p) tranfer count set to 0; disconnecting.\n", V2VDRV_LOGTAG, ctx);
+ return;
+ }
+
+- wait_state = v2v_get_wait_state(vcc->channel);
+- to = vcc->config->timeout << 2; /* in jiffies x4*/
++ wait_state = v2v_get_wait_state(ctx->channel);
++ to = ctx->config->timeout; /* in jiffies */
+
+ /* Send our first file chunk to the listener to start things off */
+- err = v2vdrv_connector_process_internal_tx(vcc);
++ err = v2vdrv_connector_process_internal_sync_tx(ctx);
+ if (err) {
+ /* we should not get an -EAGAIN on first message, return it as an error */
+ BUG_ON(err == -EAGAIN); /* SNO on first message */
+ printk("%s connector(%p) transmit internal data failure on first message; abort processing - error: %d\n",
+- V2VDRV_LOGTAG, vcc, err);
++ V2VDRV_LOGTAG, ctx, err);
+ return;
+ }
+
+@@ -282,10 +431,10 @@ v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc)
+ do {
+ /* When the tx counter reaches the transfer count value, stop sending and wait for
+ the rest of the responses */
+- if (vcc->tx_counter == vcc->config->xfer_count) {
++ if (ctx->tx_counter == ctx->config->xfer_count) {
+ /* First see if we are done */
+- if (vcc->rx_counter == vcc->tx_counter) {
+- printk("%s connector(%p) received all remaing responses from listener; disconnecting.\n", V2VDRV_LOGTAG, vcc);
++ if (ctx->rx_counter == ctx->tx_counter) {
++ printk("%s connector(%p) received all remaing responses from listener; disconnecting.\n", V2VDRV_LOGTAG, ctx);
+ err = 0;
+ break;
+ }
+@@ -318,50 +467,35 @@ v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc)
+ }
+ if (rc == 0) {
+ printk("%s connector(%p) timed out waiting for ack responses from listener; disconnecting\n",
+- V2VDRV_LOGTAG, vcc);
++ V2VDRV_LOGTAG, ctx);
+ break;
+ }
+
+ do {
+- reasons = v2v_get_wake_reason(vcc->channel, reasons_mask);
++ reason = v2v_get_wake_reason(ctx->channel, reasons_mask);
+
+- if (reasons & V2V_WAKE_REASON_CONTROL) {
+- err = v2v_get_remote_state(vcc->channel, &state);
+- if (err) {
+- printk("%s connector(%p) failure in v2v_get_remote_state(); aborting - error: %d\n",
+- V2VDRV_LOGTAG, vcc, err);
+- done = 1;
+- break;
+- }
+- printk("%s connector(%p) state changed for other end - new state: %s\n",
+- V2VDRV_LOGTAG, vcc, v2v_endpoint_state_name(state));
+- if (v2v_state_requests_disconnect(state)) {
+- printk("%s connector(%p) main processing loop ending for disconnect request...\n",
+- V2VDRV_LOGTAG, vcc);
+- err = 0;
+- done = 1;
+- break;
+- }
++ if (reason & V2V_WAKE_REASON_CONTROL) {
++ done = v2vdrv_status_check(ctx, "connector");
++ if (done)
++ break;
+ }
+-
+- if (reasons & V2V_WAKE_REASON_SEND) {
+- if (vcc->tx_counter != vcc->config->xfer_count) {
+- err = v2vdrv_connector_process_internal_tx(vcc);
++ else if (reason & V2V_WAKE_REASON_SEND) {
++ if (ctx->tx_counter != ctx->config->xfer_count) {
++ err = v2vdrv_connector_process_internal_sync_tx(ctx);
+ if ((err != 0)&&(err != -EAGAIN)) {
+ done = 1;
+ break;
+ }
+ }
+ }
+-
+- if (reasons & V2V_WAKE_REASON_RECEIVE) {
+- err = v2vdrv_connector_process_internal_rx(vcc);
++ else if (reason & V2V_WAKE_REASON_RECEIVE) {
++ err = v2vdrv_connector_process_internal_sync_rx(ctx);
+ if (err) {
+ done = 1;
+ break;
+ }
+ }
+- } while (reasons != V2V_WAKE_REASON_NONE);
++ } while (reason != V2V_WAKE_REASON_NONE);
+
+ if (done)
+ break;
+@@ -369,114 +503,364 @@ v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc)
+ } while (1);
+ }
+
+-static void
+-v2vdrv_connector_disconnect(struct v2vdrv_connector_context *vcc)
++/********************* CONNECTOR ASYNC ***********************/
++static int
++v2vdrv_connector_process_internal_async_rx(struct v2vdrv_context *ctx)
+ {
+- int err;
++ int err, last = 0;
++ unsigned long flags;
++ uint32_t rxc = 0;
++ volatile void *msg;
++ size_t size;
++ unsigned vtype;
++ unsigned vflags;
++ struct v2vdrv_frame_header *header;
++ struct v2vdrv_resp_internal lvri;
++ uint8_t *pguid;
++ uint8_t sum;
+
+- printk("%s connector(%p) Disconnecting...\n", V2VDRV_LOGTAG, vcc);
+- err = v2v_disconnect(vcc->channel);
+- printk("%s connector(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, vcc, err);
++ do {
++ /* Critical section where we access the rings - have to lock this area */
++ spin_lock_irqsave(&ctx->s.async.rx_lock, flags);
+
+- printk("%s connector(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, vcc, vcc->tx_counter);
+- printk("%s connector(%p) Received response counter: %d\n", V2VDRV_LOGTAG, vcc, vcc->rx_counter);
++ err = v2v_nc2_get_message(ctx->channel, (const volatile void **)&msg, &size, &vtype, &vflags);
++ if (err != 0) {
++ spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags);
++ break;
++ }
+
+- if (vcc->tx_counter != vcc->rx_counter)
+- printk("%s connector(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, vcc);
++ header = (struct v2vdrv_frame_header*)msg;
++ if (!v2vdrv_message_header_check("connector", header, size,
++ sizeof(struct v2vdrv_resp_internal))) {
++ v2v_nc2_finish_message(ctx->channel);
++ spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags);
++ err = -EBADMSG;
++ break;
++ }
++
++ /* Make a local copy of the message header and run a checksum over the message */
++ memcpy(&lvri, (void*)msg, sizeof(struct v2vdrv_resp_internal));
++ sum = v2vdrv_checksum((const uint8_t*)msg, header->length);
++
++ v2v_nc2_finish_message(ctx->channel);
++ spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags);
++
++ /* Critical section to test counter state */
++ spin_lock_irqsave(&ctx->s.async.tx_lock, flags);
++
++ /* Update counter tracking state, we just sent one */
++ rxc = ++ctx->rx_counter;
++
++ /* See if we are done sending and receiving and set the completion event */
++ if ((ctx->tx_counter == ctx->config->xfer_count)&&(ctx->rx_counter == ctx->tx_counter)) {
++ ctx->s.async.term_status = V2V_TERM_COMPLETE;
++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_TERMINATE);
++ last = 0;
++ }
++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++
++ /* Out of critical section and the buffer in the ring has been released,
++ back; can't touch the msg any longer - do some testing and tracing */
++ printk("------ message status=%d\n", lvri.status);
++ pguid = &lvri.guid[0];
++ printk("------ GUID1=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n",
++ pguid[0], pguid[1], pguid[2], pguid[3], pguid[4], pguid[5], pguid[6], pguid[7]);
++ printk("------ GUID2=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n",
++ pguid[8], pguid[9], pguid[10], pguid[11], pguid[12], pguid[13], pguid[14], pguid[15]);
++ if (sum != 0)
++ printk("%s connector(%p) bad checksumm on response #%d!!!\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter);
++
++ if (last) {
++ err = 0;
++ break;
++ }
++ } while (1);
++
++ /* If we ended the receive handler with this code, there are no more message right
++ now so we wait for an rx interrupt. If the status is success then it was the last
++ message. */
++ if (err == -ENODATA) {
++ /* No more messages */
++ printk("%s connector(%p) no more messages, returning\n", V2VDRV_LOGTAG, ctx);
++ return 0;
++ }
++ else if (err == 0) {
++ /* Last message received */
++ printk("%s connector(%p) last message sent, returning\n", V2VDRV_LOGTAG, ctx);
++ return 0;
++ }
++
++ printk("%s connector(%p) receive internal data failure; abort processing - error: %d\n",
++ V2VDRV_LOGTAG, ctx, err);
++ return err; /* failure */
+ }
+
+-void v2vdrv_run_connector(struct v2vdrv_config *config)
++static int
++v2vdrv_connector_process_internal_async_tx(struct v2vdrv_context *ctx)
+ {
+- struct v2vdrv_connector_context *vcc;
+ int err;
++ unsigned long flags;
++ uint32_t txc = 0;
++ unsigned available;
++ volatile void *msg;
++ uint8_t *msgp;
++ struct v2vdrv_frame_header *header;
++ struct v2vdrv_post_internal *vpi;
++ uint8_t guid[16];
+
+- vcc = kmalloc(sizeof(struct v2vdrv_connector_context), GFP_KERNEL);
+- if (!vcc) {
+- printk("%s connector out of memory\n", V2VDRV_LOGTAG);
++ /* Loop and send as many messages as possible. The internal message connector does not wait for
++ responses before posting more messages. */
++ do {
++ /* Make a GUID for the next message we send if any */
++ generate_random_uuid(&guid[0]);
++
++ /* Critical section where we access the rings - have to lock this area */
++ spin_lock_irqsave(&ctx->s.async.tx_lock, flags);
++
++ /* See if we are done sending, if so set a timer and exit */
++ if (ctx->tx_counter == ctx->config->xfer_count) {
++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++ /* Set the expiry for waiting for responses now that all messages have been sent */
++ if (!timer_pending(&ctx->s.async.to_timer)) {
++ ctx->s.async.to_timer.expires = jiffies + ctx->config->timeout; /* in jiffies */
++ add_timer(&ctx->s.async.to_timer);
++ printk("%s connector(%p) finished sending message, setting timer and exiting\n", V2VDRV_LOGTAG, ctx);
++ }
++ else
++ printk("%s connector(%p) finished sending message, timer already pending???\n", V2VDRV_LOGTAG, ctx);
++
++ break;
++ }
++
++ available = v2v_nc2_producer_bytes_available(ctx->channel);
++ err = v2v_nc2_prep_message(ctx->channel, ctx->config->xfer_size, V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
++ if (err) {
++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++ if (err == -EAGAIN) {
++ /* No room right now, return and try again later */
++ printk("%s connector(%p) not enough buffer space to send message #%d; retry\n",
++ V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1);
++ err = 0;
++ }
++ else /* failure */
++ printk("%s connector(%p) transmit internal data failure; abort processing - error: %d\n",
++ V2VDRV_LOGTAG, ctx, err);
++ break;
++ }
++
++ txc = ++ctx->tx_counter; /* next message */
++ header = (struct v2vdrv_frame_header*)msg;
++ header->id = (uint16_t)ctx->tx_counter;
++ header->type = V2V_MESSAGE_TYPE_INTERNAL;
++ header->cs = 0;
++ header->length = ctx->config->xfer_size;
++ vpi = (struct v2vdrv_post_internal*)msg;
++ memcpy(&vpi->guid[0], &guid, sizeof(guid));
++
++ /* Fill it up with some data and send it */
++ msgp = (uint8_t*)msg + sizeof(struct v2vdrv_post_internal);
++ memset(msgp, 'X', (ctx->config->xfer_size - sizeof(struct v2vdrv_post_internal)));
++ header->cs = v2vdrv_checksum((const uint8_t*)msg, ctx->config->xfer_size);
++ v2v_nc2_send_messages(ctx->channel);
++
++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++
++ printk("%s connector(%p) sent internal message #%d\n", V2VDRV_LOGTAG, ctx, txc);
++ } while (1);
++
++ return err;
++}
++
++static void
++v2vdrv_connector_process_messages_async(struct v2vdrv_context *ctx)
++{
++ int done = 0;
++ struct v2v_wait *wait_state;
++ u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_TERMINATE;
++ u8 reason;
++
++ printk("%s connector(%p) started ASYNC processing loop for transfer\n", V2VDRV_LOGTAG, ctx);
++
++ /* A transfer count of 0 is used to just test connecting and disconnecting
++ w/o sending any data */
++ if (ctx->config->xfer_count == 0) {
++ printk("%s connector(%p) tranfer count set to 0; disconnecting.\n", V2VDRV_LOGTAG, ctx);
+ return;
+ }
+- memset(vcc, 0, sizeof(struct v2vdrv_connector_context));
+- vcc->config = config;
+
+- err = v2vdrv_connect(vcc);
+- if (err)
+- return;
++ atomic_inc(&ctx->s.async.running);
++ wait_state = v2v_get_wait_state(ctx->channel);
+
+- /* This runs the main processing loop, when it is done we disconnect
+- and cleanup regardless of what may have occured */
+- v2vdrv_connector_process_messages(vcc);
++ /* Send our first message to the listener to start things off */
++ schedule_work(&ctx->s.async.tx_work);
+
+- v2vdrv_connector_disconnect(vcc);
++ /* Start out processing loop, wait for status changes */
++ do {
++ wait_event(wait_state->wait_event,
++ atomic_xchg(&wait_state->wait_condition, 0) == 1);
++
++ do {
++ reason = v2v_get_wake_reason(ctx->channel, reasons_mask);
++
++ if (reason & V2V_WAKE_REASON_CONTROL) {
++ done = v2vdrv_status_check(ctx, "connector");
++ if (done)
++ break;
++ }
++ else if (reason & V2V_WAKE_REASON_TERMINATE) {
++ /* The terminate event may indicate an error or normal completion */
++ switch (ctx->s.async.term_status) {
++ case V2V_TERM_COMPLETE:
++ case V2V_TERM_TIMEOUT:
++ printk("%s connector(%p) async handlers signalled a terminate for reason: %s; exiting.\n",
++ V2VDRV_LOGTAG, ctx, (ctx->s.async.term_status == V2V_TERM_COMPLETE) ? "complete" : "timeout");
++ break;
++ default:
++ printk("%s connector(%p) async handlers signalled a terminate with error status=0x%x; exiting.\n",
++ V2VDRV_LOGTAG, ctx, ctx->s.async.term_status);
++ }
++ done = 1;
++ break;
++ }
++ } while (reason != V2V_WAKE_REASON_NONE);
++
++ if (done)
++ break;
++
++ } while (1);
+
+- kfree(vcc);
++ atomic_dec(&ctx->s.async.running);
++ del_timer_sync(&ctx->s.async.to_timer);
+ }
+
+-/************************* LISTENER *************************/
+-struct v2vdrv_listener_context {
+- struct v2vdrv_config *config;
+- struct v2v_channel *channel;
+- uint32_t tx_counter;
+- uint32_t rx_counter;
+- struct v2vdrv_listener_resp_item *resp_list;
+- struct v2vdrv_listener_resp_item *resp_tail;
+-};
++/************************* LISTENER **************************/
++static void v2vdrv_listener_process_messages_sync(struct v2vdrv_context *ctx);
++static void v2vdrv_listener_process_messages_async(struct v2vdrv_context *ctx);
+
+ static int
+-v2vdrv_listen_accept(struct v2vdrv_listener_context *vlc)
++v2vdrv_listen_accept(struct v2vdrv_context *ctx)
+ {
+ int err, err2;
+
+ /* Start the listener, get back a channel handle */
+- err = v2v_listen(vlc->config->local_prefix, &vlc->channel, 0, 0);
++ err = v2v_listen(ctx->config->local_prefix, &ctx->channel, 0, 0, ctx->asvp);
+ if (err) {
+- printk("%s listener(%p) failure in v2v_listen() - error: %d\n", V2VDRV_LOGTAG, vlc, err);
++ printk("%s listener(%p) failure in v2v_listen() - error: %d\n", V2VDRV_LOGTAG, ctx, err);
+ return err;
+ }
+- BUG_ON(vlc->channel == NULL);
+- printk("%s listener(%p) listener started, wait to accept...\n", V2VDRV_LOGTAG, vlc);
++ BUG_ON(ctx->channel == NULL);
++ printk("%s listener(%p) listener started, wait to accept...\n", V2VDRV_LOGTAG, ctx);
+
+ /* Wait to accept the connection from the connector end */
+- err = v2v_accept(vlc->channel);
++ err = v2v_accept(ctx->channel);
+ if (err) {
+ if (err != -ENOLINK)
+- printk("%s listener(%p) failure in v2v_accept() - error: %d\n", V2VDRV_LOGTAG, vlc, err);
++ printk("%s listener(%p) failure in v2v_accept() - error: %d\n", V2VDRV_LOGTAG, ctx, err);
+ else
+- printk("%s listener(%p) remote end disconnected while waiting to accept\n", V2VDRV_LOGTAG, vlc);
++ printk("%s listener(%p) remote end disconnected while waiting to accept\n", V2VDRV_LOGTAG, ctx);
+
+- err2 = v2v_disconnect(vlc->channel);
++ err2 = v2v_disconnect(ctx->channel);
+ if (err2) {
+ printk("%s listener(%p) secondary failure in v2v_disconnect() after accept failed - error: %d\n",
+- V2VDRV_LOGTAG, vlc, err2);
++ V2VDRV_LOGTAG, ctx, err2);
+ }
+ return err;
+ }
+
+- printk("%s listener(%p) accepted connection, ready to process incoming data.\n", V2VDRV_LOGTAG, vlc);
++ printk("%s listener(%p) accepted connection, ready to process incoming data.\n", V2VDRV_LOGTAG, ctx);
+
+ return 0;
+ }
+
++static void
++v2vdrv_listener_disconnect(struct v2vdrv_context *ctx)
++{
++ int err;
++ uint32_t i = 0;
++ struct v2vdrv_listener_resp_item *resp;
++
++ printk("%s listener(%p) Disconnecting...\n", V2VDRV_LOGTAG, ctx);
++ err = v2v_disconnect(ctx->channel);
++ printk("%s listener(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, ctx, err);
++
++ printk("%s listener(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, ctx, ctx->tx_counter);
++ printk("%s listener(%p) Received response counter: %d\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter);
++ if (ctx->tx_counter != ctx->rx_counter)
++ printk("%s listener(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, ctx);
++
++ while (ctx->r.listener.resp_list) {
++ resp = ctx->r.listener.resp_list;
++ ctx->r.listener.resp_list = resp->next;
++ kfree(resp);
++ i++;
++ }
++ if (i > 0)
++ printk("%s listener(%p) WARNING Found %d unsent responses\n", V2VDRV_LOGTAG, ctx, i);
++}
++
++void v2vdrv_run_listener(struct v2vdrv_config *config)
++{
++ struct v2vdrv_context *ctx;
++ int err;
++
++ ctx = kmalloc(sizeof(struct v2vdrv_context), GFP_KERNEL);
++ if (!ctx) {
++ printk("%s listener out of memory\n", V2VDRV_LOGTAG);
++ return;
++ }
++ memset(ctx, 0, sizeof(struct v2vdrv_context));
++ ctx->config = config;
++
++ if (config->async)
++ v2vdrv_async_init(ctx);
++
++ if (!v2vdrv_input_sanity_check(ctx)) {
++ kfree(ctx);
++ return;
++ }
++
++ err = v2vdrv_listen_accept(ctx);
++ if (err) {
++ kfree(ctx);
++ return;
++ }
++
++ /* This runs the main processing loop, when it is done we disconnect
++ and cleanup regardless of what may have occured */
++ if (config->async)
++ v2vdrv_listener_process_messages_async(ctx);
++ else
++ v2vdrv_listener_process_messages_sync(ctx);
++
++ v2vdrv_listener_disconnect(ctx);
++
++ kfree(ctx);
++}
++
++/********************** LISTENER SYNC ************************/
+ static int
+-v2vdrv_listener_process_internal_rx(struct v2vdrv_listener_context *vlc)
++v2vdrv_listener_process_internal_sync_rx(struct v2vdrv_context *ctx)
+ {
+ int err;
+ volatile void *msg;
+ size_t size;
+- unsigned type;
+- unsigned flags;
++ unsigned vtype;
++ unsigned vflags;
+ struct v2vdrv_frame_header *header;
+ struct v2vdrv_post_internal *vpi;
+ struct v2vdrv_listener_resp_item *vlri;
+ uint8_t sum;
+
+- while ((err = v2v_nc2_get_message(vlc->channel, (const volatile void**)&msg, &size, &type, &flags))
++ if (ctx->config->fastrx)
++ v2v_nc2_request_fast_receive(ctx->channel);
++
++ while ((err = v2v_nc2_get_message(ctx->channel, (const volatile void**)&msg, &size, &vtype, &vflags))
+ == 0) {
+- vlc->rx_counter++;
++ ctx->rx_counter++;
+ header = (struct v2vdrv_frame_header*)msg;
+ if (!v2vdrv_message_header_check("listener", header, size,
+- sizeof(struct v2vdrv_post_internal),
+- vlc->rx_counter)) {
+- v2v_nc2_finish_message(vlc->channel);
++ sizeof(struct v2vdrv_post_internal))) {
++ v2v_nc2_finish_message(ctx->channel);
+ return -EBADMSG;
+ }
+
+@@ -490,7 +874,7 @@ v2vdrv_listener_process_internal_rx(struct v2vdrv_listener_context *vlc)
+
+ sum = v2vdrv_checksum((const uint8_t*)msg, header->length);
+ if (sum != 0)
+- printk("%s listener(%p) bad checksumm on message #%d!!!\n", V2VDRV_LOGTAG, vlc, vlc->rx_counter);
++ printk("%s listener(%p) bad checksumm on message #%d!!!\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter);
+
+ /* Queue a response */
+ vlri = kmalloc(sizeof(struct v2vdrv_listener_resp_item), GFP_KERNEL);
+@@ -501,35 +885,38 @@ v2vdrv_listener_process_internal_rx(struct v2vdrv_listener_context *vlc)
+ vlri->resp.header.cs = 0;
+ vlri->resp.header.length = sizeof(struct v2vdrv_resp_internal); /* header + resp data */
+ vlri->resp.status = (sum == 0 ? V2V_MESSAGE_STATUS_OK : V2V_MESSAGE_STATUS_BADCS);
+- memcpy(&vlri->resp.guid, vpi->guid, 16);
++ memcpy(&vlri->resp.guid[0], vpi->guid, sizeof(vlri->resp.guid));
+ vlri->resp.header.cs = v2vdrv_checksum((const uint8_t*)vlri, sizeof(struct v2vdrv_resp_internal));
+- if (vlc->resp_list) {
+- vlc->resp_tail->next = vlri;
+- vlc->resp_tail = vlri;
++ if (ctx->r.listener.resp_list) {
++ ctx->r.listener.resp_tail->next = vlri;
++ ctx->r.listener.resp_tail = vlri;
+ }
+ else {
+- vlc->resp_list = vlri;
+- vlc->resp_tail = vlri;
++ ctx->r.listener.resp_list = vlri;
++ ctx->r.listener.resp_tail = vlri;
+ }
+ }
+ else
+- printk("%s listener(%p) cannot queue response; out of memory\n", V2VDRV_LOGTAG, vlc);
++ printk("%s listener(%p) cannot queue response; out of memory\n", V2VDRV_LOGTAG, ctx);
+
+- v2v_nc2_finish_message(vlc->channel);
++ v2v_nc2_finish_message(ctx->channel);
+ }
++ if (ctx->config->fastrx)
++ v2v_nc2_cancel_fast_receive(ctx->channel);
++
+ if (err == -ENODATA) {
+ /* No more messages */
+- printk("%s listener(%p) no more messages, returning\n", V2VDRV_LOGTAG, vlc);
++ printk("%s listener(%p) no more messages, returning\n", V2VDRV_LOGTAG, ctx);
+ return 0;
+ }
+
+ printk("%s listener(%p) receive internal data failure; abort processing - error: %d\n",
+- V2VDRV_LOGTAG, vlc, err);
++ V2VDRV_LOGTAG, ctx, err);
+ return err; /* failure */
+ }
+
+ static int
+-v2vdrv_listener_process_internal_tx(struct v2vdrv_listener_context *vlc)
++v2vdrv_listener_process_internal_sync_tx(struct v2vdrv_context *ctx)
+ {
+ int err;
+ unsigned available;
+@@ -537,59 +924,60 @@ v2vdrv_listener_process_internal_tx(struct v2vdrv_listener_context *vlc)
+ uint8_t *msgp;
+ struct v2vdrv_listener_resp_item *vlri;
+
+- printk("%s listener(%p) sending internal response #%d\n", V2VDRV_LOGTAG, vlc, vlc->tx_counter + 1);
+- available = v2v_nc2_producer_bytes_available(vlc->channel);
+- printk("%s listener(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, vlc, available);
+- BUG_ON(vlc->resp_list == NULL);
++ printk("%s listener(%p) sending internal response #%d\n", V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1);
++ available = v2v_nc2_producer_bytes_available(ctx->channel);
++ printk("%s listener(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, ctx, available);
++ BUG_ON(ctx->r.listener.resp_list == NULL);
++
++ /* No resizing fixed responses for fastrx */
+
+- err = v2v_nc2_prep_message(vlc->channel, sizeof(struct v2vdrv_resp_internal), V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
++ err = v2v_nc2_prep_message(ctx->channel, sizeof(struct v2vdrv_resp_internal), V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
+ if (err) {
+ if (err == -EAGAIN) {
+ /* No room right now, return and try again later */
+ printk("%s listener(%p) not enough buffer space to send response #%d; retry\n",
+- V2VDRV_LOGTAG, vlc, vlc->tx_counter + 1);
++ V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1);
+ return -EAGAIN;
+ }
+ printk("%s listener(%p) transmit internal response failure; abort processing - error: %d\n",
+- V2VDRV_LOGTAG, vlc, err);
++ V2VDRV_LOGTAG, ctx, err);
+ return err; /* failure */
+ }
+- vlc->tx_counter++; /* next message */
+- vlri = vlc->resp_list;
+- vlc->resp_list = vlri->next;
+- if (!vlc->resp_list)
+- vlc->resp_tail = NULL;
+-
+- /* Response already formed, just copy it in */
+- mb();
++ ctx->tx_counter++; /* next message */
++ vlri = ctx->r.listener.resp_list;
++ ctx->r.listener.resp_list = vlri->next;
++ if (!ctx->r.listener.resp_list)
++ ctx->r.listener.resp_tail = NULL;
++
++ /* Response already formed, just copy it in */
+ msgp = (uint8_t*)msg;
+ memcpy(msgp, vlri, sizeof(struct v2vdrv_resp_internal));
+- mb();
+ kfree(vlri);
+
+- v2v_nc2_send_messages(vlc->channel);
++ v2v_nc2_send_messages(ctx->channel);
+
+ /* Keep the send loop going by setting the event. If there is no more room, the prep message call
+ will return ERROR_RETRY and just land us back in the wait. */
+- v2v_set_wake_reason(vlc->channel, V2V_WAKE_REASON_SEND);
++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_SEND);
+
+ return 0;
+ }
+
+ static void
+-v2vdrv_listener_process_messages(struct v2vdrv_listener_context *vlc)
++v2vdrv_listener_process_messages_sync(struct v2vdrv_context *ctx)
+ {
+ int err = 0, done = 0;
+- enum v2v_endpoint_state state;
+ struct v2v_wait *wait_state;
+ u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE;
+- u8 reasons;
++ u8 reason;
+
+- wait_state = v2v_get_wait_state(vlc->channel);
++ printk("%s listener(%p) started SYNC processing loop for transfer\n", V2VDRV_LOGTAG, ctx);
++
++ wait_state = v2v_get_wait_state(ctx->channel);
+
+ /* Start out processing loop, wait for message */
+ do {
+- if (vlc->resp_list)
++ if (ctx->r.listener.resp_list)
+ reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE|V2V_WAKE_REASON_SEND;
+ else
+ reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE;
+@@ -597,102 +985,355 @@ v2vdrv_listener_process_messages(struct v2vdrv_listener_context *vlc)
+ wait_event(wait_state->wait_event, atomic_xchg(&wait_state->wait_condition, 0) == 1);
+
+ do {
+- reasons = v2v_get_wake_reason(vlc->channel, reasons_mask);
++ reason = v2v_get_wake_reason(ctx->channel, reasons_mask);
+
+- if (reasons & V2V_WAKE_REASON_CONTROL) {
+- err = v2v_get_remote_state(vlc->channel, &state);
+- if (err) {
+- printk("%s listener(%p) failure in v2v_get_remote_state(); aborting - error: %d\n",
+- V2VDRV_LOGTAG, vlc, err);
+- done = 1;
++ if (reason & V2V_WAKE_REASON_CONTROL) {
++ done = v2vdrv_status_check(ctx, "listener");
++ if (done)
+ break;
+- }
+- printk("%s listener(%p) state changed for other end - new state: %s\n",
+- V2VDRV_LOGTAG, vlc, v2v_endpoint_state_name(state));
+- if (v2v_state_requests_disconnect(state)) {
+- printk("%s listener(%p) main processing loop ending for disconnect request...\n",
+- V2VDRV_LOGTAG, vlc);
+- err = 0;
+- done = 1;
+- break;
+- }
+ }
+-
+- if (reasons & V2V_WAKE_REASON_SEND) {
+- if (vlc->resp_list) {
+- err = v2vdrv_listener_process_internal_tx(vlc);
++ else if (reason & V2V_WAKE_REASON_SEND) {
++ if (ctx->r.listener.resp_list) {
++ err = v2vdrv_listener_process_internal_sync_tx(ctx);
+ if ((err != 0)&&(err != -EAGAIN)) {
+ done = 1;
+ break;
+ }
+ }
+ }
+-
+- if (reasons & V2V_WAKE_REASON_RECEIVE) {
+- err = v2vdrv_listener_process_internal_rx(vlc);
++ else if (reason & V2V_WAKE_REASON_RECEIVE) {
++ err = v2vdrv_listener_process_internal_sync_rx(ctx);
+ if (err) {
+ done = 1;
+ break;
+ }
+ /* we now have data, set the event again to process sends */
+- v2v_set_wake_reason(vlc->channel, V2V_WAKE_REASON_SEND);
++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_SEND);
+ }
+- } while (reasons != V2V_WAKE_REASON_NONE);
++ } while (reason != V2V_WAKE_REASON_NONE);
+
+ if (done)
+ break;
+ } while (1);
+ }
+
+-static void
+-v2vdrv_listener_disconnect(struct v2vdrv_listener_context *vlc)
++/********************* LISTENER ASYNC ************************/
++static int v2vdrv_listener_process_internal_async_tx(struct v2vdrv_context *ctx);
++
++static int
++v2vdrv_listener_process_internal_async_rx(struct v2vdrv_context *ctx)
+ {
+ int err;
+- uint32_t i = 0;
+- struct v2vdrv_listener_resp_item *resp;
++ unsigned long flags;
++ uint32_t rxc = 0;
++ volatile void *msg;
++ size_t size;
++ unsigned vtype;
++ unsigned vflags;
++ struct v2vdrv_frame_header *header;
++ struct v2vdrv_post_internal *vpi;
++ struct v2vdrv_listener_resp_item *vlri;
++ uint8_t *pguid;
++ uint8_t sum;
+
+- printk("%s listener(%p) Disconnecting...\n", V2VDRV_LOGTAG, vlc);
+- err = v2v_disconnect(vlc->channel);
+- printk("%s listener(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, vlc, err);
++ do {
++ /* Create one up front, drop out if we can't */
++ vlri = kmalloc(sizeof(struct v2vdrv_listener_resp_item), GFP_KERNEL);
++ if (vlri == NULL) {
++ printk("%s listener(%p) cannot queue response; out of memory\n", V2VDRV_LOGTAG, ctx);
++ err = -ENOMEM;
++ break;
++ }
+
+- printk("%s listener(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, vlc, vlc->tx_counter);
+- printk("%s listener(%p) Received response counter: %d\n", V2VDRV_LOGTAG, vlc, vlc->rx_counter);
+- if (vlc->tx_counter != vlc->rx_counter)
+- printk("%s listener(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, vlc);
++ /* Critical section where we access the rings - have to lock this area */
++ spin_lock_irqsave(&ctx->s.async.rx_lock, flags);
+
+- while (vlc->resp_list) {
+- resp = vlc->resp_list;
+- vlc->resp_list = resp->next;
+- kfree(resp);
+- i++;
++ err = v2v_nc2_get_message(ctx->channel, (const volatile void**)&msg, &size, &vtype, &vflags);
++ if (err != 0) {
++ spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags);
++ kfree(vlri);
++ break;
++ }
++
++ rxc = ++ctx->rx_counter;
++ header = (struct v2vdrv_frame_header*)msg;
++ if (!v2vdrv_message_header_check("listener", header, size,
++ sizeof(struct v2vdrv_post_internal))) {
++ v2v_nc2_finish_message(ctx->channel);
++ spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags);
++ kfree(vlri);
++ err = -EBADMSG;
++ break;
++ }
++
++ vpi = (struct v2vdrv_post_internal*)msg;
++ pguid = &vpi->guid[0];
++
++ /* Save the GUID and checksum and exit the critical section */
++ vlri->resp.header.id = header->id;
++ memcpy(&vlri->resp.guid[0], pguid, sizeof(vlri->resp.guid));
++ sum = v2vdrv_checksum((const uint8_t*)msg, header->length);
++
++ v2v_nc2_finish_message(ctx->channel);
++ spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags);
++
++ /* Out of critical section and the buffer in the ring has been released,
++ * back; can't touch the msg any longer - do some testing and tracing */
++ pguid = &vlri->resp.guid[0];
++ printk("------ GUID1=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n",
++ pguid[0], pguid[1], pguid[2], pguid[3],
++ pguid[4], pguid[5], pguid[6], pguid[7]);
++ printk("------ GUID2=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n",
++ pguid[8], pguid[9], pguid[10], pguid[11],
++ pguid[12], pguid[13], pguid[14], pguid[15]);
++ if (sum != 0)
++ printk("%s listener(%p) bad checksumm on message #%d!!!\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter);
++
++ /* Finish setting up the response and queue it for sending */
++ vlri->next = NULL;
++ vlri->resp.header.type = V2V_MESSAGE_TYPE_INTERNAL;
++ vlri->resp.header.cs = 0;
++ vlri->resp.header.length = sizeof(struct v2vdrv_resp_internal); /* header + resp data */
++ vlri->resp.status = (sum == 0 ? V2V_MESSAGE_STATUS_OK : V2V_MESSAGE_STATUS_BADCS);
++ vlri->resp.header.cs = v2vdrv_checksum((const uint8_t*)vlri, sizeof(struct v2vdrv_resp_internal));
++
++ spin_lock_irqsave(&ctx->s.async.tx_lock, flags);
++ if (ctx->r.listener.resp_list) {
++ ctx->r.listener.resp_tail->next = vlri;
++ ctx->r.listener.resp_tail = vlri;
++ }
++ else {
++ ctx->r.listener.resp_list = vlri;
++ ctx->r.listener.resp_tail = vlri;
++ }
++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++
++ } while (1);
++
++ /* If we ended the receive handler with this code, there are no more message right
++ now. At this point we can send out any queued responses we might have queued */
++ if (err == -ENODATA) {
++ /* No more messages */
++ printk("%s listener(%p) no more messages, calling TX processor\n", V2VDRV_LOGTAG, ctx);
++ return v2vdrv_listener_process_internal_async_tx(ctx);
+ }
+- if (i > 0)
+- printk("%s listener(%p) WARNING Found %d unsent responses\n", V2VDRV_LOGTAG, vlc, i);
++
++ printk("%s listener(%p) receive internal data failure; abort processing - error: %d\n",
++ V2VDRV_LOGTAG, ctx, err);
++ return err; /* failure */
+ }
+
+-void v2vdrv_run_listener(struct v2vdrv_config *config)
++static int
++v2vdrv_listener_process_internal_async_tx(struct v2vdrv_context *ctx)
+ {
+- struct v2vdrv_listener_context *vlc;
+ int err;
++ unsigned long flags;
++ uint32_t txc = 0;
++ unsigned available;
++ volatile void *msg;
++ uint8_t *msgp;
++ struct v2vdrv_listener_resp_item *vlri;
+
+- vlc = kmalloc(sizeof(struct v2vdrv_listener_context), GFP_KERNEL);
+- if (!vlc) {
+- printk("%s listener out of memory\n", V2VDRV_LOGTAG);
+- return;
++ /* Loop and send any queued response */
++ do {
++ /* Critical section where we access the rings - have to lock this area */
++ spin_lock_irqsave(&ctx->s.async.tx_lock, flags);
++
++ if (ctx->r.listener.resp_list == NULL) {
++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++ printk("%s listener(%p) no responses to send in internal TX handler; exiting\n", V2VDRV_LOGTAG, ctx);
++ err = 0;
++ break;
++ }
++
++ available = v2v_nc2_producer_bytes_available(ctx->channel);
++ err = v2v_nc2_prep_message(ctx->channel, sizeof(struct v2vdrv_resp_internal), V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
++ if (err) {
++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++ if (err == -EAGAIN) {
++ /* No room right now, return and try again later */
++ printk("%s listener(%p) not enough buffer space to send message #%d; retry\n",
++ V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1);
++ err = 0;
++ }
++ else /* failure */
++ printk("%s listener(%p) transmit internal data failure; abort processing - error: %d\n",
++ V2VDRV_LOGTAG, ctx, err);
++ break;
++ }
++
++ txc = ++ctx->tx_counter; /* next message */
++ vlri = ctx->r.listener.resp_list;
++ ctx->r.listener.resp_list = vlri->next;
++ if (!ctx->r.listener.resp_list)
++ ctx->r.listener.resp_tail = NULL;
++ /* Response already formed, just copy it in */
++ msgp = (uint8_t*)msg;
++ memcpy(msgp, vlri, sizeof(struct v2vdrv_resp_internal));
++ kfree(vlri);
++
++ v2v_nc2_send_messages(ctx->channel);
++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++
++ printk("%s listener(%p) sent internal response #%d\n", V2VDRV_LOGTAG, ctx, txc);
++ } while (1);
++
++ return err;
++}
++
++static void
++v2vdrv_listener_process_messages_async(struct v2vdrv_context *ctx)
++{
++ int done = 0;
++ struct v2v_wait *wait_state;
++ u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_TERMINATE;
++ u8 reason;
++
++ printk("%s listener(%p) started ASYNC processing loop for transfer\n", V2VDRV_LOGTAG, ctx);
++
++ atomic_inc(&ctx->s.async.running);
++ wait_state = v2v_get_wait_state(ctx->channel);
++
++ /* Start out processing loop, wait for status changes */
++ do {
++ wait_event(wait_state->wait_event,
++ atomic_xchg(&wait_state->wait_condition, 0) == 1);
++
++ do {
++ reason = v2v_get_wake_reason(ctx->channel, reasons_mask);
++
++ if (reason & V2V_WAKE_REASON_CONTROL) {
++ done = v2vdrv_status_check(ctx, "listener");
++ if (done)
++ break;
++ }
++ else if (reason & V2V_WAKE_REASON_TERMINATE) {
++ /* The terminate event indicates an error since the listener side does not
++ complete or timeout through this means */
++ printk("%s listener(%p) async handlers signalled a terminate with error status=0x%x; exiting.\n",
++ V2VDRV_LOGTAG, ctx, ctx->s.async.term_status);
++ done = 1;
++ break;
++ }
++ } while (reason != V2V_WAKE_REASON_NONE);
++
++ if (done)
++ break;
++
++ } while (1);
++
++ atomic_dec(&ctx->s.async.running);
++}
++
++/********************** ASYNC COMMON *************************/
++static void
++v2vdrv_rx_work(struct work_struct *arg)
++{
++ struct v2vdrv_context *ctx = container_of(arg, struct v2vdrv_context, s.async.rx_work);
++ int err;
++
++ if (ctx->config->role == role_listener)
++ err = v2vdrv_listener_process_internal_async_rx(ctx);
++ else
++ err = v2vdrv_connector_process_internal_async_rx(ctx);
++
++ if (err) {
++ ctx->s.async.term_status = V2V_TERM_TX_ERROR;
++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_TERMINATE);
+ }
+- memset(vlc, 0, sizeof(struct v2vdrv_listener_context));
+- vlc->config = config;
++}
+
+- err = v2vdrv_listen_accept(vlc);
+- if (err)
++static void
++v2vdrv_tx_work(struct work_struct *arg)
++{
++ struct v2vdrv_context *ctx = container_of(arg, struct v2vdrv_context, s.async.tx_work);
++ int err;
++
++ if (ctx->config->role == role_listener)
++ err = v2vdrv_listener_process_internal_async_tx(ctx);
++ else
++ err = v2vdrv_connector_process_internal_async_tx(ctx);
++
++ if (err) {
++ ctx->s.async.term_status = V2V_TERM_TX_ERROR;
++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_TERMINATE);
++ }
++}
++
++static void
++v2vdrv_timeout_cb(unsigned long ptr)
++{
++ struct v2vdrv_context *ctx = (struct v2vdrv_context *)ptr;
++
++ ctx->s.async.term_status = V2V_TERM_TIMEOUT;
++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_TERMINATE);
++}
++
++static void
++v2vdrv_receive_int(void *receive_ctx)
++{
++ struct v2vdrv_context *ctx = receive_ctx;
++
++ /* Avoid spurious interrupts before initialization is complete */
++ if (atomic_read(&ctx->s.async.running) == 0)
+ return;
+
+- /* This runs the main processing loop, when it is done we disconnect
+- and cleanup regardless of what may have occured */
+- v2vdrv_listener_process_messages(vlc);
++ /* The V2V async APIs allow raw interrupt service routines to be
++ registered to allow maximum flexibility to the client. In almost all
++ cases though, the best practice is to dispatch the work to a bottom
++ half and complete the irq asap. */
++ schedule_work(&ctx->s.async.rx_work);
++}
++
++static void
++v2vdrv_send_int(void *send_ctx)
++{
++ struct v2vdrv_context *ctx = send_ctx;
++
++ /* Avoid spurious interrupts before initialization is complete */
++ if (atomic_read(&ctx->s.async.running) == 0)
++ return;
++
++ /* See comments for v2vdrv_receive_int() */
++ schedule_work(&ctx->s.async.tx_work);
++}
+
+- v2vdrv_listener_disconnect(vlc);
++static void
++v2vdrv_control_cb(void *control_ctx)
++{
++ struct v2vdrv_context *ctx = control_ctx;
+
+- kfree(vlc);
++ /* Avoid xenstore events before initialization is complete */
++ if (atomic_read(&ctx->s.async.running) == 0)
++ return;
++
++ /* The async V2V functionality provides a way to register an
++ * async callback for control message processing as an alternative
++ * to waiting on the control event. For purposes of the sample,
++ * we will use the control event to keep the processing thread
++ * busy waiting for events.
++ */
++ v2vdrv_status_check(ctx, "ctrlcb");
+ }
+
++static void
++v2vdrv_async_init(struct v2vdrv_context *ctx)
++{
++ ctx->s.async.asv.receive_int = v2vdrv_receive_int;
++ ctx->s.async.asv.receive_ctx = ctx;
++ ctx->s.async.asv.send_int = v2vdrv_send_int;
++ ctx->s.async.asv.send_ctx = ctx;
++ ctx->s.async.asv.control_cb = v2vdrv_control_cb;
++ ctx->s.async.asv.control_ctx = ctx;
++ atomic_set(&ctx->s.async.running, 0);
++ INIT_WORK(&ctx->s.async.rx_work, v2vdrv_rx_work);
++ INIT_WORK(&ctx->s.async.tx_work, v2vdrv_tx_work);
++ if (ctx->config->role == role_connector) {
++ init_timer(&ctx->s.async.to_timer);
++ ctx->s.async.to_timer.data = (unsigned long)ctx;
++ ctx->s.async.to_timer.function = v2vdrv_timeout_cb;
++ }
++ spin_lock_init(&ctx->s.async.rx_lock);
++ spin_lock_init(&ctx->s.async.tx_lock);
++ ctx->asvp = &ctx->s.async.asv;
++}
+diff --git a/drivers/xen/v2v/v2vutl.c b/drivers/xen/v2v/v2vutl.c
+index 3dc36c7..454157c 100644
+--- a/drivers/xen/v2v/v2vutl.c
++++ b/drivers/xen/v2v/v2vutl.c
+@@ -223,4 +223,3 @@ v2v_xenops_grant_unmap(struct vm_struct *vm_area, grant_handle_t *ghandles, unsi
+
+ free_vm_area(vm_area);
+ }
+-
+diff --git a/include/xen/v2v.h b/include/xen/v2v.h
+index abef7be..5233f80 100644
+--- a/include/xen/v2v.h
++++ b/include/xen/v2v.h
+@@ -49,6 +49,43 @@
+ */
+ struct v2v_channel;
+
++/* Input structure used for initilializing asynchronous v2v
++ * comminucations. The @receive_int and @send_int values must be provided.
++ * These routines will be called back for receive and send event
++ * notification. The @control_cb may or may not be set depending on whether
++ * the caller wants to get control callbacks or use the control event through
++ * returned by v2v_get_control_event to process changes in control state.
++ * The @control_ctx will be passed in to the control callback routine when
++ * called.
++ *
++ * Note that the @receive_int and @send_int callbacks are actually in the
++ * context of the interrupt service routines for the event channels. This is
++ * done to allow the v2v client the most flexibility in processing events
++ * including doing work in the interrupt context (though this should be
++ * avoided). What this mainly allows is for the client to determine what
++ * bottom half processing it wants to emply. In most cases, the client's
++ * event handler callbacks would simply do something like the following:
++ *
++ * schedule_work(&my_context->my_work);
++ *
++ * And immediately returning; doing the actual event processing in the
++ * work handler.
++ *
++ * Listening, connecting and accepting the v2v channel for asynchronous
++ * operations is done the same way as for synchronous operations (except
++ * for providing this structure to the APIs). Once the channel is established,
++ * the client using v2v in asynchronous mode will be called back to indicate
++ * receipt of data or space availability for further sending.
++ */
++struct v2v_async {
++ void (*receive_int)(void *receive_ctx);
++ void *receive_ctx;
++ void (*send_int)(void *send_ctx);
++ void *send_ctx;
++ void (*control_cb)(void *control_ctx);
++ void *control_ctx;
++};
++
+ /* Wait state structure returned by v2v_get_wait_state. See this
+ * function for more details.
+ */
+@@ -67,12 +104,17 @@ struct v2v_wait {
+ * It is generally a mistake to have several processes listening on
+ * the same xenbus prefix.
+ *
++ * Passing a valid @async_values structure to this routine will
++ * initialize the endpoint for ascynchronous communications operation.
++ * To use synchronous operations, NULL should be passed.
++ *
+ * Returns 0 on success and an appropriate errno code on failure.
+ */
+ int v2v_listen(const char *xenbus_prefix,
+ struct v2v_channel **channel,
+ unsigned prod_ring_page_order,
+- unsigned cons_ring_page_order);
++ unsigned cons_ring_page_order,
++ struct v2v_async *async_values);
+
+ /* Wait for a remote domain to connect to the channel @channel, which
+ * should have been allocated with v2v_listen().
+@@ -90,10 +132,15 @@ int v2v_accept(struct v2v_channel *channel);
+ * This will fail if the remote endpoint is not currently listening on
+ * the specified channel.
+ *
++ * Passing a valid @async_values structure to this routine will
++ * initialize the endpoint for ascynchronous communications operation.
++ * To use synchronous operations, NULL should be passed.
++ *
+ * Returns 0 on success and an appropriate errno code on failure.
+ */
+ int v2v_connect(const char *xenbus_prefix,
+- struct v2v_channel **channel);
++ struct v2v_channel **channel,
++ struct v2v_async *async_values);
+
+ /* Disconnect from a VM-to-VM channel @channel which was previously
+ * established using either v2v_connect() or v2v_listen(). The channel
+@@ -202,11 +249,16 @@ const char *v2v_endpoint_state_name(enum v2v_endpoint_state state);
+ /* Wake reasons. The values can be OR'd together in the functions below
+ * to specify more than one wait reason.
+ */
+-#define V2V_WAKE_REASON_NONE 0x00
+-#define V2V_WAKE_REASON_CONTROL 0x01
+-#define V2V_WAKE_REASON_SEND 0x02
+-#define V2V_WAKE_REASON_RECEIVE 0x04
+-#define V2V_WAKE_REASON_ANY 0xFF
++#define V2V_WAKE_REASON_NONE 0x00
++#define V2V_WAKE_REASON_CONTROL 0x01
++#define V2V_WAKE_REASON_SEND 0x02
++#define V2V_WAKE_REASON_RECEIVE 0x04
++#define V2V_WAKE_REASON_RESERVED 0x08
++#define V2V_WAKE_REASON_USER1 0x10
++#define V2V_WAKE_REASON_USER2 0x20
++#define V2V_WAKE_REASON_USER3 0x40
++#define V2V_WAKE_REASON_USER4 0x80
++#define V2V_WAKE_REASON_ANY 0xFF
+
+ /* Retrieve the wait state object for the @channel. This object allows
+ * a thread to wait until one of several conditions is true. Normally
+@@ -235,19 +287,29 @@ const char *v2v_endpoint_state_name(enum v2v_endpoint_state state);
+ * V2V_WAKE_REASON_RECEIVE: This reason is set when data is recieved. Note
+ * that it may sometimes be set when there are no messages incoming, and
+ * will remain so until v2v_nc2_get_message() is called.
++ *
++ * V2V_WAKE_REASON_USERn: These 4 wait reasons can be used by clients
++ * to add user defined events to the queue. These wake reasons will
++ * never be set internally by the V2V library.
++ *
++ * For asynchronous operation, the wait state object will be used only
++ * for V2V_WAKE_REASON_CONTROL events.
+ */
+ struct v2v_wait *v2v_get_wait_state(struct v2v_channel *channel);
+
+ /* Retrive the @reasons that a wait was satisfied. The @reasons value
+- * is an OR'ed list list of the V2V_WAKE_REASONE_* values above. Each
++ * is an OR'ed list list of the V2V_WAKE_REASON_* values above. Each
+ * call to v2v_get_wake_reason will return that next wake reason in the
+ * internal queue that matches the @reasons mask. This routine can be
+ * called multiple times before and after a wait depending on the specific
+ * use case involved.
+ *
+- * Returns one of V2V_WAKE_REASONE_CONTROL, V2V_WAKE_REASONE_SEND or
+- * V2V_WAKE_REASONE_RECEIVE from the internal queue. When no more wake
++ * Returns one of V2V_WAKE_REASON_CONTROL, V2V_WAKE_REASON_SEND or
++ * V2V_WAKE_REASON_RECEIVE from the internal queue. When no more wake
+ * reasons are present, V2V_WAKE_REASON_NONE is returned.
++ *
++ * For asynchronous operation, only V2V_WAKE_REASON_CONTROL or
++ * V2V_WAKE_REASON_NONE would be returned.
+ */
+ u8 v2v_get_wake_reason(struct v2v_channel *channel, u8 reasons);
+
+@@ -258,7 +320,7 @@ u8 v2v_get_wake_reason(struct v2v_channel *channel, u8 reasons);
+ */
+ int v2v_set_wake_reason(struct v2v_channel *channel, u8 reason);
+
+-/* This routine can be used to explicitly set a wake @reason in the
++/* This routine can be used to explicitly clear wake @reasons in the
+ * internal queue.
+ */
+ void v2v_clear_wake_reason(struct v2v_channel *channel, u8 reasons);
+@@ -271,6 +333,11 @@ void v2v_clear_wake_reason(struct v2v_channel *channel, u8 reasons);
+ * written to, and a malicious remote could cause its contents to
+ * change at any time.
+ *
++ * When using asynchronous operations mode, the caller must provide
++ * locking (mutual exclusion) to this routine and the finalizing call
++ * to v2v_nc2_finish_message() within the same locked section (using the
++ * same synchronization object).
++ *
+ * Returns 0 on success. If no more messages are available, -ENODATA
+ * is returned. On failure an errno error code is returned.
+ *
+@@ -288,6 +355,11 @@ int v2v_nc2_get_message(struct v2v_channel *channel,
+ * released, so that the remote can use it to send another message,
+ * and the channel advances to the next incoming message.
+ *
++ * When using asynchronous operations mode, the caller must provide
++ * locking (mutual exclusion) to this routine and the initializing call
++ * to v2v_nc2_get_message() within the same locked section (using the
++ * same synchronization object).
++ *
+ * The payload returned by v2v_nc2_get_message() must not be touched
+ * once this returns.
+ */
+@@ -302,6 +374,11 @@ void v2v_nc2_finish_message(struct v2v_channel *channel);
+ * The message is not actually sent until v2v_nc2_send_messages() is
+ * called.
+ *
++ * When using asynchronous operations mode, the caller must provide
++ * locking (mutual exclusion) to this routine and the finalizing call
++ * to v2v_nc2_send_messages() within the same locked section (using the
++ * same synchronization object).
++ *
+ * If there is insufficient space in the ring, this routine requests
+ * that the remote endpoint set the local ring's send event when more
+ * space becomes available, the call returns -EAGAIN.
+@@ -319,6 +396,11 @@ int v2v_nc2_prep_message(struct v2v_channel *channel,
+ * (incorrectly behaving remote endpoints can always look ahead) and
+ * the remote is woken up if appropriate.
+ *
++ * When using asynchronous operations mode, the caller must provide
++ * locking (mutual exclusion) to this routine and the initializing call
++ * to v2v_nc2_prep_message() within the same locked section (using the
++ * same synchronization object).
++ *
+ * The client must not touch the payload pointers returned by
+ * v2v_nc2_prep_message() after calling this function.
+ */