]> xenbits.xen.org Git - xenclient/linux-2.6.27-pq.git/commitdiff
Add v2v async implementation.
authorRoss Philipson <ross.philipson@citrix.com>
Wed, 21 Oct 2009 18:15:48 +0000 (14:15 -0400)
committerRoss Philipson <ross.philipson@citrix.com>
Wed, 21 Oct 2009 18:15:48 +0000 (14:15 -0400)
This patch applies over the v2v-core patch and adds the asynchronous
functionality to the base code and test driver. It also fixes a
number of other bugs and issues in the core v2v code so they should
be used together.

 Changes to be committed:
modified:   master/series
new file:   master/v2v-async

master/series
master/v2v-async [new file with mode: 0644]

index b9c5a62eccfe7e0cd028dadce6ff0b7b90d11a7f..9d164ddd59dd0b4003a6036b4e0941409b3ca843 100644 (file)
@@ -354,3 +354,4 @@ bridge-carrier-follows-prio0.patch
 bsg-add-global-sgio-mutex.patch
 itpm
 v2v-core
+v2v-async
diff --git a/master/v2v-async b/master/v2v-async
new file mode 100644 (file)
index 0000000..77b32ed
--- /dev/null
@@ -0,0 +1,2350 @@
+diff --git a/drivers/xen/v2v/v2v.c b/drivers/xen/v2v/v2v.c
+index 8f6efd4..57d51be 100644
+--- a/drivers/xen/v2v/v2v.c
++++ b/drivers/xen/v2v/v2v.c
+@@ -3,8 +3,8 @@
+  *
+  * V2V interdomain communication driver.
+  * 
+- * Copyright (c) 2009 Steven Smith
+  * Copyright (c) 2009 Ross Philipson
++ * Copyright (c) 2009 Steven Smith
+  * Copyright (c) 2009 Citrix Systems, Inc.
+  *
+  * This program is free software; you can redistribute it and/or
+@@ -36,6 +36,7 @@
+ #include <linux/init.h>
+ #include <linux/module.h>
+ #include <linux/vmalloc.h>
++#include <linux/interrupt.h>
+ #include <xen/xenbus.h>
+ #include <xen/evtchn.h>
+ #include <xen/gnttab.h>
+@@ -67,6 +68,9 @@ struct v2v_channel {
+     
+     int receive_evtchn_irq;
+     int send_evtchn_irq;
++
++    /* async values */
++    struct v2v_async asv;
+     
+     unsigned nr_prod_ring_pages;
+     unsigned nr_cons_ring_pages;
+@@ -77,15 +81,17 @@ struct v2v_channel {
+     int has_watch;
+     int is_temple;
++    int is_sync;
+     union {
+         struct {
+             int has_grant_alloc;
++            int accepted;
+             grant_ref_t gref_head;
+             grant_ref_t prod_grefs[MAX_RING_PAGES];
+             grant_ref_t cons_grefs[MAX_RING_PAGES];
+-            grant_ref_t control_gref;
++            grant_ref_t control_gref;           
+         } temple;
+         struct {
+             struct vm_struct *prod_area;
+@@ -203,12 +209,23 @@ v2v_remote_state_changed(struct xenbus_watch *watch,
+         return; /* not much we can do */
+     v2v_set_event(channel);
++
++    /* Callback interested parties that regitered a control callback */
++    if (channel->is_temple && !channel->u.temple.accepted)
++        return;
++    if (channel->asv.control_cb)
++        channel->asv.control_cb(channel->asv.control_ctx);
+ }
+ static irqreturn_t
+ send_int(int irq, void *dev_id)
+ {
+     struct v2v_channel *channel = dev_id;
++
++    if (!channel->is_sync) {
++        channel->asv.send_int(channel->asv.send_ctx);
++        return IRQ_HANDLED;
++    }
+     
+     if (v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND))
+         return IRQ_HANDLED; /* not much we can do */
+@@ -223,6 +240,11 @@ receive_int(int irq, void *dev_id)
+ {
+     struct v2v_channel *channel = dev_id;
+     
++    if (!channel->is_sync) {
++        channel->asv.receive_int(channel->asv.receive_ctx);
++        return IRQ_HANDLED;
++    }
++
+     if (v2v_wrq_queue(channel, V2V_WAKE_REASON_RECEIVE))
+         return IRQ_HANDLED; /* not much we can do */
+@@ -231,6 +253,26 @@ receive_int(int irq, void *dev_id)
+     return IRQ_HANDLED;
+ }
++static inline void
++v2v_close_receive_evtchn(struct v2v_channel *channel)
++{
++    DPRINTK("closing receive evtchn irg: %d\n", channel->receive_evtchn_irq);
++    if (channel->receive_evtchn_irq != 0) {
++        unbind_from_irqhandler(channel->receive_evtchn_irq, channel);        
++        channel->receive_evtchn_irq = 0;
++    }
++}
++
++static inline void
++v2v_close_send_evtchn(struct v2v_channel *channel)
++{
++    DPRINTK("closing send evtchn irg: %d\n", channel->send_evtchn_irq);
++    if (channel->send_evtchn_irq != 0) {
++        unbind_from_irqhandler(channel->send_evtchn_irq, channel);        
++        channel->send_evtchn_irq = 0;
++    }
++}
++
+ static void
+ v2v_destroy_channel(const struct v2v_channel *_chan, int free_temple)
+ {    
+@@ -308,10 +350,9 @@ v2v_destroy_channel(const struct v2v_channel *_chan, int free_temple)
+         DPRINTK("freed grant refs and rings for temple.\n");
+     }
+-    if (chan->receive_evtchn_irq != 0)
+-        unbind_from_irqhandler(chan->receive_evtchn_irq, chan);
+-    if (chan->send_evtchn_irq != 0)
+-        unbind_from_irqhandler(chan->send_evtchn_irq, chan);
++    v2v_close_receive_evtchn(chan);
++    v2v_close_send_evtchn(chan);
++
+     DPRINTK("unbound irq handlers.\n");
+     /* cleanup anything in chan->wait_entry_list once the evtchns are shutdown */
+@@ -340,7 +381,7 @@ v2v_read_peer_domid(struct v2v_channel *chan)
+ }
+ static struct v2v_channel *
+-v2v_make_channel(const char *xenbus_prefix)
++v2v_make_channel(const char *xenbus_prefix, struct v2v_async *async_values)
+ {
+     struct v2v_channel *chan;
+@@ -357,9 +398,17 @@ v2v_make_channel(const char *xenbus_prefix)
+     strcpy(chan->local_prefix, xenbus_prefix);
+     
+     init_waitqueue_head(&chan->wait_state.wait_event);
++    atomic_set(&chan->wait_state.wait_condition, 0);
+     spin_lock_init(&chan->wait_list_lock);
+     INIT_LIST_HEAD(&chan->wait_entry_list);
++    if (async_values) {
++        memcpy(&chan->asv, async_values, sizeof(struct v2v_async));        
++        chan->is_sync = 0;
++    }
++    else
++        chan->is_sync = 1;
++
+     DPRINTK("created channel: %p with local prefix - %s\n", chan, chan->local_prefix);
+     
+     return chan;
+@@ -479,7 +528,8 @@ v2v_change_local_state(struct v2v_channel *channel,
+ int
+ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel,
+-           unsigned prod_ring_page_order, unsigned cons_ring_page_order)
++           unsigned prod_ring_page_order, unsigned cons_ring_page_order,
++           struct v2v_async *async_values)
+ {
+     unsigned prod_ring_size = PAGE_SIZE << prod_ring_page_order;
+     unsigned cons_ring_size = PAGE_SIZE << cons_ring_page_order;
+@@ -499,9 +549,15 @@ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel,
+         return -EINVAL;
+     }
++    if (async_values && 
++       (!async_values->receive_int || !async_values->send_int)) {
++        EPRINTK("v2v_listen - invalid async arguments\n");
++        return -EINVAL;
++    }
++
+     *channel = NULL;
+-    chan = v2v_make_channel(xenbus_prefix);
++    chan = v2v_make_channel(xenbus_prefix, async_values);
+     if (!chan) {
+         EPRINTK("v2v_listen - out of memory making channel\n");
+         return -ENOMEM;
+@@ -587,8 +643,7 @@ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel,
+         gnttab_grant_foreign_access_ref(ref, chan->peer_domid, mfn, 0);
+         DPRINTK("created control grant ref\n");
+-        err = 
+-            bind_listening_port_to_irqhandler(chan->peer_domid, receive_int, 0, "v2v", chan);
++        err = bind_listening_port_to_irqhandler(chan->peer_domid, receive_int, 0, "v2v", chan);
+         if (err < 0) {
+             EPRINTK("bind_listening_port_to_irqhandler(receive) failed - err: %d\n", err);
+             goto err_out;
+@@ -596,8 +651,7 @@ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel,
+         chan->receive_evtchn_irq = err;
+         DPRINTK("created listening port for receive: %d\n", chan->receive_evtchn_irq);
+-        err = 
+-            bind_listening_port_to_irqhandler(chan->peer_domid, send_int, 0, "v2v", chan);
++        err = bind_listening_port_to_irqhandler(chan->peer_domid, send_int, 0, "v2v", chan);
+         if (err < 0) {
+             EPRINTK("bind_listening_port_to_irqhandler(send) failed - err: %d\n", err);
+             goto err_out;
+@@ -664,10 +718,8 @@ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel,
+                                        chan->u.temple.control_gref);
+         chan->u.temple.control_gref = GRANT_INVALID_REF;
+-        unbind_from_irqhandler(chan->receive_evtchn_irq, chan);
+-        chan->receive_evtchn_irq = 0;
+-        unbind_from_irqhandler(chan->send_evtchn_irq, chan);
+-        chan->send_evtchn_irq = 0;
++        v2v_close_receive_evtchn(chan);
++        v2v_close_send_evtchn(chan);
+         /* undo what v2v_connect_channel_xenbus did */
+         unregister_xenbus_watch(&chan->remote_state_watch);
+@@ -785,7 +837,7 @@ v2v_accept(struct v2v_channel *channel)
+             }
+             err = xenbus_transaction_end(xbt, 0);
+             if (err == 0)
+-                return 0;
++                goto final; /* success */
+             if (err != -EAGAIN) {
+                 EPRINTK("v2v_accept - error commiting xs transaction - err: %d\n", err);
+                 return err;
+@@ -794,6 +846,9 @@ v2v_accept(struct v2v_channel *channel)
+         }          
+     }
++final:
++    channel->u.temple.accepted = 1;
++
+     /* the initial state of the send event is set to wakeup for send since there
+        is always send room on startup of the rings */
+     err = v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND);
+@@ -808,7 +863,8 @@ v2v_accept(struct v2v_channel *channel)
+ EXPORT_SYMBOL_GPL(v2v_accept);
+ int
+-v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel)
++v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel,
++            struct v2v_async *async_values)
+ {
+     int err = 0;
+     struct xenbus_transaction xbt = {0};
+@@ -825,10 +881,16 @@ v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel)
+         return -EINVAL;
+     }
++    if (async_values && 
++       (!async_values->receive_int || !async_values->send_int)) {
++        EPRINTK("v2v_connect - invalid async arguments\n");
++        return -EINVAL;
++    }
++
+     *channel = NULL;
+     for (;;) {
+-        chan = v2v_make_channel(xenbus_prefix);
++        chan = v2v_make_channel(xenbus_prefix, async_values);
+         if (!chan) {
+             EPRINTK("v2v_connect - out of memory making channel\n");
+             return -ENOMEM;
+@@ -970,9 +1032,9 @@ v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel)
+         chan->control = chan->u.supplicant.control_area->addr;
+         DPRINTK("mapped controle grant ref\n");
+-        err = 
+-            bind_interdomain_evtchn_to_irqhandler(chan->peer_domid, chan->u.supplicant.prod_evtchn_port,
+-                                                  receive_int, 0, "v2v", chan);
++        err = bind_interdomain_evtchn_to_irqhandler(chan->peer_domid,
++                                                    chan->u.supplicant.prod_evtchn_port,
++                                                    receive_int, 0, "v2v", chan);
+         if (err < 0) {
+             EPRINTK("bind_interdomain_evtchn_to_irqhandler(receive_int) failed - err: %d\n", err);
+             goto err_out;
+@@ -980,9 +1042,9 @@ v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel)
+         chan->receive_evtchn_irq = err;
+         DPRINTK("bound interdomain port for receive: %d\n", chan->receive_evtchn_irq);
+-        err = 
+-            bind_interdomain_evtchn_to_irqhandler(chan->peer_domid, chan->u.supplicant.cons_evtchn_port,
+-                                                  send_int, 0, "v2v", chan);
++        err = bind_interdomain_evtchn_to_irqhandler(chan->peer_domid,
++                                                    chan->u.supplicant.cons_evtchn_port,
++                                                    send_int, 0, "v2v", chan);
+         if (err < 0) {
+             EPRINTK("bind_interdomain_evtchn_to_irqhandler(send_int) failed - err: %d\n", err);
+             goto err_out;
+@@ -1052,6 +1114,8 @@ v2v_disconnect_temple(const struct v2v_channel *_channel)
+     err = v2v_change_local_state(channel, XBT_NIL, v2v_state_disconnecting);
+     if (err)
+         return err;
++
++    channel->u.temple.accepted = 0;
+     
+     /* Get the other end to disconnect */
+     for (;;) {
+@@ -1167,14 +1231,8 @@ v2v_disconnect_temple(const struct v2v_channel *_channel)
+     if (!any_failed)
+         gnttab_free_grant_references(channel->u.temple.gref_head);
+-    if (channel->receive_evtchn_irq != 0) {
+-        unbind_from_irqhandler(channel->receive_evtchn_irq, channel);
+-        channel->receive_evtchn_irq = 0;
+-    }
+-    if (channel->send_evtchn_irq != 0) {
+-        unbind_from_irqhandler(channel->send_evtchn_irq, channel);
+-        channel->send_evtchn_irq = 0;
+-    }
++    v2v_close_receive_evtchn(channel);
++    v2v_close_send_evtchn(channel);
+     DPRINTK("finished disconnecting temple, channel: %p\n", channel);
+@@ -1197,14 +1255,8 @@ v2v_disconnect_supplicant(const struct v2v_channel *_channel)
+     channel->cons_sring = NULL;
+     channel->control = NULL;
+-    if (channel->receive_evtchn_irq != 0) {
+-        unbind_from_irqhandler(channel->receive_evtchn_irq, channel);
+-        channel->receive_evtchn_irq = 0;
+-    }
+-    if (channel->send_evtchn_irq != 0) {
+-        unbind_from_irqhandler(channel->send_evtchn_irq, channel);
+-        channel->send_evtchn_irq = 0;
+-    }
++    v2v_close_receive_evtchn(channel);
++    v2v_close_send_evtchn(channel);
+     err = v2v_change_local_state(channel, XBT_NIL, v2v_state_disconnected);
+     if (err) {
+@@ -1239,6 +1291,11 @@ EXPORT_SYMBOL_GPL(v2v_get_wait_state);
+ u8
+ v2v_get_wake_reason(struct v2v_channel *channel, u8 reasons)
+ {
++    if (!channel->is_sync) {
++        if ((reasons & (V2V_WAKE_REASON_SEND|V2V_WAKE_REASON_RECEIVE)) != 0)
++            return -EINVAL;
++    }
++
+     return v2v_wrq_dequeue(channel, reasons);
+ }
+ EXPORT_SYMBOL_GPL(v2v_get_wake_reason);
+@@ -1248,6 +1305,11 @@ v2v_set_wake_reason(struct v2v_channel *channel, u8 reason)
+ {
+     int err;
++    if (!channel->is_sync) {
++        if ((reason == V2V_WAKE_REASON_SEND)||(reason == V2V_WAKE_REASON_RECEIVE))
++            return -EINVAL;
++    }
++
+     err = v2v_wrq_queue(channel, reason);
+     if (err)
+         return err;
+@@ -1299,12 +1361,15 @@ retry:
+             goto retry;
+         }
+         /* ### clear the event (reset) */
+-        v2v_wrq_clear(channel, V2V_WAKE_REASON_RECEIVE);
++        if (channel->is_sync)
++            v2v_wrq_clear(channel, V2V_WAKE_REASON_RECEIVE);
+         if (nc2_final_check_for_messages(&channel->nc2_rings, prod)) {
+             /* ### now set the event */
+-            v2v_wrq_queue(channel, V2V_WAKE_REASON_RECEIVE);
+-            v2v_set_event(channel);
++            if (channel->is_sync) {
++                v2v_wrq_queue(channel, V2V_WAKE_REASON_RECEIVE);
++                v2v_set_event(channel);
++            }
+             goto retry;
+         }
+         return -ENODATA;
+@@ -1378,14 +1443,17 @@ v2v_nc2_prep_message(struct v2v_channel *channel,
+         v2v_nc2_send_messages(channel);
+     if (!nc2_can_send_payload_bytes(&channel->nc2_rings, rounded_size)) {
+         /* ### clear the event (reset) */
+-        v2v_wrq_clear(channel, V2V_WAKE_REASON_SEND);
++        if (channel->is_sync)
++            v2v_wrq_clear(channel, V2V_WAKE_REASON_SEND);
+         if (!nc2_can_send_payload_bytes(&channel->nc2_rings, rounded_size))
+             return -EAGAIN;
+         
+         /* ### now set the event */
+-        v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND);
+-        v2v_set_event(channel);
++        if (channel->is_sync) {
++            v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND);
++            v2v_set_event(channel);
++        }
+     }
+     __nc2_avoid_ring_wrap(&channel->nc2_rings, rounded_size);
+     hdr = __nc2_get_message_ptr(&channel->nc2_rings);
+@@ -1499,4 +1567,3 @@ module_init(v2v_init);
+ module_exit(v2v_cleanup);
+ MODULE_LICENSE("Dual BSD/GPL");
+-
+diff --git a/drivers/xen/v2v/v2v_private.h b/drivers/xen/v2v/v2v_private.h
+index c623a2d..4af02ee 100644
+--- a/drivers/xen/v2v/v2v_private.h
++++ b/drivers/xen/v2v/v2v_private.h
+@@ -70,14 +70,15 @@ void v2v_xenops_grant_unmap(struct vm_struct *vm_area,
+ #define       INVALID_GRANT_HANDLE  ((grant_handle_t)~0U)
+ #define GRANT_INVALID_REF     0
+-#define DPRINTK(fmt, args...)                         \
+-    pr_debug("v2v (%s:%d) " fmt,                  \
++#define DPRINTK(fmt, args...)               \
++    pr_debug("v2v-dbg (%s:%d) " fmt,        \
+          __FUNCTION__, __LINE__, ##args)
+-#define IPRINTK(fmt, args...)                         \
++
++#define IPRINTK(fmt, args...)               \
+     printk(KERN_INFO "v2v: " fmt, ##args)
+-#define WPRINTK(fmt, args...)                         \
++#define WPRINTK(fmt, args...)               \
+     printk(KERN_WARNING "v2v: " fmt, ##args)
+-#define EPRINTK(fmt, args...)                         \
++#define EPRINTK(fmt, args...)               \
+     printk(KERN_ERR "v2v: " fmt, ##args)
+ #endif /* !V2V_PRIVATE_H__ */
+diff --git a/drivers/xen/v2v/v2vdrv.c b/drivers/xen/v2v/v2vdrv.c
+index 0c362bb..dfa8a43 100644
+--- a/drivers/xen/v2v/v2vdrv.c
++++ b/drivers/xen/v2v/v2vdrv.c
+@@ -49,11 +49,13 @@
+ static const char v2vdrv_usage[] = \
+     "Run as either the listener or connector by writing the following information:\n" \
+-    "listener,<local_prefix>\n" \
+-    "connector,<local_prefix>,[count],[size],[timout]\n" \
++    "listener,<local_prefix>,[async],[fastrx]\n" \
++    "connector,<local_prefix>,[async],[fastrx],[count],[size],[timout]\n" \
+     "    local_prefix: XenStore local path for the V2V endpoint\n" \
+-    "    count: (opt) number of messages to send\n" \
+-    "    size: (opt) size of each message\n" \
++    "    async:  (opt) set to 1 to run in async mode, 0 for sync mode\n" \
++    "    fastrx: (opt) set to 1 to use fast rx mode (on with sync mode)\n" \
++    "    count:  (opt) number of messages to send\n" \
++    "    size:   (opt) size of each message\n" \
+     "    timout: (opt) timeout in ms for awaiting response\n" \
+     "Also a xenstore reader test routine:\n" \
+     "reader,<path>[:value]\n";
+@@ -61,13 +63,6 @@ static const char v2vdrv_usage[] = \
+ extern void v2vdrv_run_connector(struct v2vdrv_config *config);
+ extern void v2vdrv_run_listener(struct v2vdrv_config *config);
+-enum v2vdrv_role {
+-    role_unknown = 0,
+-    role_listener,
+-    role_connector,
+-    role_reader
+-};
+-
+ static void v2vdrv_run_reader(struct v2vdrv_config *config)
+ {
+     char *path = NULL;
+@@ -127,37 +122,40 @@ out:
+         kfree(value);
+ }
+-static enum v2vdrv_role v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_config *config)
++static void
++v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_config *config)
+ {
+     size_t len;
+-    enum v2vdrv_role role = role_unknown;
+     int i, err, val, parsed;
++    config->role = role_unknown;
+     len = strlen(cfgstr);
+-
++    
+     do {
+         if ((len > 9)&&(strncmp(cfgstr, "listener,", 9) == 0)) {
+             cfgstr += 9;
+             len -= 9;     
+-            role = role_listener;
++            config->role = role_listener;
+         }
+         else if ((len > 10)&&(strncmp(cfgstr, "connector,", 10) == 0)) {
+             cfgstr += 10;
+             len -= 10;
+-            role = role_connector;
++            config->role = role_connector;
+         }
+         else if ((len > 7)&&(strncmp(cfgstr, "reader,", 7) == 0)) {
+             cfgstr += 7;
+             len -= 7;
+-            role = role_reader;
++            config->role = role_reader;
+         }
+         else
+             break;
+-
++        
+         /* some defaults */
+         config->xfer_count = 0; /* connect only, no data */
+         config->xfer_size = 512;
+-        config->timeout = V2VDRV_RESPONSE_WAIT_TIMEOUT; 
++        config->timeout = msecs_to_jiffies((const unsigned long)V2VDRV_RESPONSE_WAIT_TIMEOUT);
++        config->async = 0; /* default to sync mode */
++        config->fastrx = 0; /* default to no fastrx mode */
+         for (i = 0; i < len; i++)
+             if (cfgstr[i] == ',')
+@@ -166,29 +164,50 @@ static enum v2vdrv_role v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_co
+         /* this is our local prefix */
+         config->local_prefix = kmalloc(i + 1, GFP_KERNEL);
+         if (!config->local_prefix) {
+-            role = role_unknown;
++            config->role = role_unknown;
+             break;
+         }
+         strncpy(config->local_prefix, cfgstr, i);
+         config->local_prefix[i] = '\0';
+-        if (role == role_listener || role == role_reader)
++        if (config->role == role_reader)
+             break;
+         if (i == len) /* no opt args */
+             break;
+-        /* else this is a connector with more opt args */
++        /* else this is a listener/connector with more opt args */
+         cfgstr += i + 1;
+         parsed = 0;
+         err = sscanf(cfgstr, "%d,%n", &val, &parsed);
+         if (err < 1)
+             break;
+-                
++        config->async = val;
++        if (parsed == 0)
++            break;        
++
++        cfgstr += parsed;
++        parsed = 0;
++        err = sscanf(cfgstr, "%d,%n", &val, &parsed);
++        if (err < 1)
++            break;
++        config->fastrx = (uint32_t)val;
++        if (parsed == 0)
++            break;
++
++        /* all others are connector only */
++        if (config->role == role_listener)
++            break;
++
++        cfgstr += parsed;
++        parsed = 0;
++        err = sscanf(cfgstr, "%d,%n", &val, &parsed);
++        if (err < 1)
++            break;
+         config->xfer_count = (uint32_t)val;
+         if (parsed == 0)
+             break;
+-        
++
+         cfgstr += parsed;
+         parsed = 0;
+         err = sscanf(cfgstr, "%d,%n", &val, &parsed);
+@@ -205,8 +224,6 @@ static enum v2vdrv_role v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_co
+             break;
+         config->timeout = msecs_to_jiffies((const unsigned long)val);
+     } while (0);
+-
+-    return role;
+ }
+ static ssize_t v2vdrv_read(struct file *file, char __user *buf,
+@@ -235,7 +252,6 @@ static ssize_t v2vdrv_write(struct file *file, const char __user *buf,
+ {
+     char *cfgstr = NULL;
+     struct v2vdrv_config *config = NULL;
+-    enum v2vdrv_role role;
+     size_t written = -EFAULT;
+     
+     if (count == 0)
+@@ -262,21 +278,31 @@ static ssize_t v2vdrv_write(struct file *file, const char __user *buf,
+     if (cfgstr[count - 1] == '\n')
+         cfgstr[count - 1] = '\0';
+-    role = v2vdrv_parse_config(cfgstr, config);
++    v2vdrv_parse_config(cfgstr, config);
++
++    /* input checking */
++    if (config->async && config->fastrx) {
++        printk("V2V-DRV WARNING cannot use async and fastrx mode together; disabling fastrx.\n");
++        config->fastrx = 0;
++    }
+-    if (role == role_listener) {
++    if (config->role == role_listener) {
+         printk("V2V-DRV loaded listener config...\n");
+         printk("        local prefix: %s\n", config->local_prefix);
++        printk("        async: %d\n", config->async);
++        printk("        fastrx: %d\n", config->fastrx);
+         v2vdrv_run_listener(config);
+     }
+-    else if (role == role_connector) {
++    else if (config->role == role_connector) {
+         printk("V2V-DRV loaded connector config...\n");
+         printk("        local prefix: %s\n", config->local_prefix);
++        printk("        async: %d\n", config->async);
++        printk("        fastrx: %d\n", config->fastrx);
+         printk("        count: %d size: %d timeout: %lu\n",
+                config->xfer_count, config->xfer_size, config->timeout);
+         v2vdrv_run_connector(config);
+     }
+-    else if (role == role_reader) {
++    else if (config->role == role_reader) {
+         printk("V2V-DRV loaded reader config...\n");
+         printk("        local prefix: %s\n", config->local_prefix);
+         v2vdrv_run_reader(config);
+diff --git a/drivers/xen/v2v/v2vdrv.h b/drivers/xen/v2v/v2vdrv.h
+index 8c3cd06..c695176 100644
+--- a/drivers/xen/v2v/v2vdrv.h
++++ b/drivers/xen/v2v/v2vdrv.h
+@@ -48,6 +48,13 @@
+ #define V2V_MESSAGE_STATUS_NODATA    0xFFFFF102
+ #define V2V_MESSAGE_STATUS_WRITE_ERR 0xFFFFF103
++enum v2vdrv_role {
++    role_unknown = 0,
++    role_listener,
++    role_connector,
++    role_reader
++};
++
+ struct v2vdrv_frame_header {
+     uint16_t id;
+     uint8_t  type;
+@@ -74,9 +81,12 @@ struct v2vdrv_listener_resp_item {
+ struct v2vdrv_config {
+     char *local_prefix;
++    enum v2vdrv_role role;
+     uint32_t xfer_count;
+     uint32_t xfer_size;
+     unsigned long timeout;
++    int async;
++    int fastrx;
+ };
+ static inline uint8_t v2vdrv_checksum(const uint8_t *ptr, uint32_t length)
+diff --git a/drivers/xen/v2v/v2vops.c b/drivers/xen/v2v/v2vops.c
+index 0c2798c..83854a2 100644
+--- a/drivers/xen/v2v/v2vops.c
++++ b/drivers/xen/v2v/v2vops.c
+@@ -1,7 +1,7 @@
+ /******************************************************************************
+  * drivers/xen/v2v/v2vops.c
+  *
+- * V2V sample client driver synchronous operations.
++ * V2V sample client driver operations.
+  * 
+  * Copyright (c) 2009 Ross Philipson
+  * Copyright (c) 2009 Citrix Systems, Inc.
+@@ -39,6 +39,7 @@
+ #include <linux/sched.h>
+ #include <linux/wait.h>
+ #include <linux/time.h>
++#include <linux/timer.h>
+ #include <linux/random.h>
+ #include <xen/xenbus.h>
+ #include <xen/v2v.h>
+@@ -46,38 +47,118 @@
+ #include "v2vdrv.h"
+ /************************** GENERAL **************************/
++#define V2V_WAKE_REASON_TERMINATE V2V_WAKE_REASON_USER1
++#define V2V_MAX_FASTRX_SIZE 1024
++#define MIN(x, y) ((x) < (y) ? (x) : (y))
++
++#define V2V_TERM_UNKNOWN        (uint32_t)-1
++#define V2V_TERM_COMPLETE       0x0
++#define V2V_TERM_GENERAL_ERROR  0x1
++#define V2V_TERM_RX_ERROR       0x2
++#define V2V_TERM_TX_ERROR       0x3
++#define V2V_TERM_TIMEOUT        0x4
++
++struct v2vdrv_context {
++    struct v2vdrv_config *config;
++    struct v2v_channel *channel;
++    struct v2v_async *asvp;
++    uint32_t tx_counter;
++    uint32_t rx_counter;
++
++    union {
++        struct {
++            struct v2vdrv_listener_resp_item *resp_list;
++            struct v2vdrv_listener_resp_item *resp_tail;
++        } listener;
++        struct {
++            int reserved;
++        } connector;
++    } r;
++
++    union {
++        struct {
++            struct v2v_async asv;
++            struct work_struct rx_work;
++            struct work_struct tx_work;
++            struct timer_list to_timer;
++            spinlock_t rx_lock;
++            spinlock_t tx_lock;
++            atomic_t running;
++            uint32_t term_status;
++        } async;
++        struct {
++            int reserved;
++        } sync;
++    } s;
++};
++
++static void v2vdrv_async_init(struct v2vdrv_context *ctx);
++
++static int
++v2vdrv_input_sanity_check(struct v2vdrv_context *ctx)
++{
++    if (ctx->config->role == role_connector) {
++        if (ctx->config->xfer_size <= sizeof(struct v2vdrv_post_internal)) {
++            printk("%s (%s) transfer size %d (0x%x) too small; %d (0x%x) required\n", V2VDRV_LOGTAG,
++                   "connector", ctx->config->xfer_size, ctx->config->xfer_size,
++                   sizeof(struct v2vdrv_post_internal) + 1, sizeof(struct v2vdrv_post_internal) + 1);
++            return 0;
++        }
++    }
++    /* listener always sends fixed sized messages and doesn use xfer_size */
++
++    return 1;
++}
++
+ static int
+ v2vdrv_message_header_check(const char *rstr,
+                             struct v2vdrv_frame_header *header,
+                             size_t msg_size,
+-                            size_t min_size,
+-                            uint32_t rx_counter)
++                            size_t min_size)
+ {
+     if ((msg_size < sizeof(struct v2vdrv_frame_header))||(msg_size < min_size)) {
+-        printk("%s (%s) response #%d is too small!!!\n", V2VDRV_LOGTAG, rstr, rx_counter);
++        printk("%s (%s) response is too small!!!\n", V2VDRV_LOGTAG, rstr);
+         return 0;
+     }
+     if (header->length < msg_size) {
+-        printk("%s (%s) response #%d header length incorrect!!!\n", V2VDRV_LOGTAG, rstr, rx_counter);
++        printk("%s (%s) response header length incorrect!!!\n", V2VDRV_LOGTAG, rstr);
+         return 0;
+     }       
+     
+-    printk("%s (%s) received message #%d\n", V2VDRV_LOGTAG, rstr, rx_counter);
++    printk("%s (%s) received message\n", V2VDRV_LOGTAG, rstr);
+     printk("------ id=%d type=%d length=0x%x\n", header->id, header->type, header->length);
+     return 1;
+ }
++static int
++v2vdrv_status_check(struct v2vdrv_context *ctx, const char *rstr)
++{
++    int err;
++    enum v2v_endpoint_state state;
++    
++    err = v2v_get_remote_state(ctx->channel, &state);
++    if (err) {
++        printk("%s %s(%p) failure in v2v_get_remote_state(); aborting - error: %d\n",
++               V2VDRV_LOGTAG, rstr, ctx, err);
++        return 1; /* done with error */
++    }
++    printk("%s %s(%p) state changed for other end - new state: %s\n",
++           V2VDRV_LOGTAG, rstr, ctx, v2v_endpoint_state_name(state));
++    if (v2v_state_requests_disconnect(state)) {
++        printk("%s %s(%p) main processing loop ending for disconnect request...\n",
++               V2VDRV_LOGTAG, rstr, ctx);
++        return 1; /* done */        
++    }
++    return 0;
++}
++
+ /************************* CONNECTOR *************************/
+-struct v2vdrv_connector_context {
+-    struct v2vdrv_config *config;
+-    struct v2v_channel *channel;
+-    uint32_t tx_counter;
+-    uint32_t rx_counter;
+-};
++static void v2vdrv_connector_process_messages_sync(struct v2vdrv_context *ctx);
++static void v2vdrv_connector_process_messages_async(struct v2vdrv_context *ctx);
+ static int
+-v2vdrv_connect(struct v2vdrv_connector_context *vcc)
++v2vdrv_connect(struct v2vdrv_context *ctx)
+ {
+     int err = 0;    
+     enum v2v_endpoint_state state;
+@@ -87,18 +168,18 @@ v2vdrv_connect(struct v2vdrv_connector_context *vcc)
+     struct timespec ts = {0}, tsz = {0}, now, delta;
+     /* Connect to the listener, get back a channel handle */
+-    err = v2v_connect(vcc->config->local_prefix, &vcc->channel);
++    err = v2v_connect(ctx->config->local_prefix, &ctx->channel, ctx->asvp);
+     if (err) {
+-        printk("%s connector(%p) failure in v2v_connect() - error: %d\n", V2VDRV_LOGTAG, vcc, err);
++        printk("%s connector(%p) failure in v2v_connect() - error: %d\n", V2VDRV_LOGTAG, ctx, err);
+         return err;
+     }
+-    BUG_ON(vcc->channel == NULL);
++    BUG_ON(ctx->channel == NULL);
+-    printk("%s connector(%p) connected to listener; wait for listenter to indicate it has accepted the connection...\n", V2VDRV_LOGTAG, vcc);    
++    printk("%s connector(%p) connected to listener; wait for listenter to indicate it has accepted the connection...\n", V2VDRV_LOGTAG, ctx);    
+-    wait_state = v2v_get_wait_state(vcc->channel);
+-    to = vcc->config->timeout << 2; /* in jiffies x4*/
++    wait_state = v2v_get_wait_state(ctx->channel);
++    to = ctx->config->timeout << 2; /* in jiffies x4*/
+     
+     do {
+         if (!timespec_equal(&ts, &tsz)) {
+@@ -120,24 +201,24 @@ v2vdrv_connect(struct v2vdrv_connector_context *vcc)
+                                 to);
+         if (rc == 0) {
+             printk("%s connector(%p) timed out waiting for accept from listener; disconnecting\n",
+-                   V2VDRV_LOGTAG, vcc);
++                   V2VDRV_LOGTAG, ctx);
+             err = -ETIMEDOUT;
+             break;
+         }
+-        reasons = v2v_get_wake_reason(vcc->channel, V2V_WAKE_REASON_CONTROL);
++        reasons = v2v_get_wake_reason(ctx->channel, V2V_WAKE_REASON_CONTROL);
+         if (reasons & V2V_WAKE_REASON_CONTROL) {
+-            err = v2v_get_remote_state(vcc->channel, &state);
++            err = v2v_get_remote_state(ctx->channel, &state);
+             if (err) {
+                 printk("%s connector(%p) failure in v2v_get_remote_state(); aborting - error: %d\n",
+-                       V2VDRV_LOGTAG, vcc, err);
++                       V2VDRV_LOGTAG, ctx, err);
+                 break;
+             }
+             printk("%s connector(%p) state changed for other end - new state: %s\n",
+-                   V2VDRV_LOGTAG, vcc, v2v_endpoint_state_name(state));
++                   V2VDRV_LOGTAG, ctx, v2v_endpoint_state_name(state));
+             if (state == v2v_state_connected) {
+                 printk("%s connector(%p) listener reports connected; begin processing messages.\n",
+-                       V2VDRV_LOGTAG, vcc);
++                       V2VDRV_LOGTAG, ctx);
+                 err = 0;
+                 break;
+             }
+@@ -145,31 +226,89 @@ v2vdrv_connect(struct v2vdrv_connector_context *vcc)
+     } while (1);
+     if (err)
+-        v2v_disconnect(vcc->channel);
++        v2v_disconnect(ctx->channel);
+     return err;
+ }
++static void
++v2vdrv_connector_disconnect(struct v2vdrv_context *ctx)
++{
++    int err;
++
++    printk("%s connector(%p) Disconnecting...\n", V2VDRV_LOGTAG, ctx);
++    err = v2v_disconnect(ctx->channel);
++    printk("%s connector(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, ctx, err);
++
++    printk("%s connector(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, ctx, ctx->tx_counter);
++    printk("%s connector(%p) Received response counter: %d\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter);
++
++    if (ctx->tx_counter != ctx->rx_counter)
++        printk("%s connector(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, ctx);
++}
++
++void v2vdrv_run_connector(struct v2vdrv_config *config)
++{
++    struct v2vdrv_context *ctx;
++    int err;
++
++    ctx = kmalloc(sizeof(struct v2vdrv_context), GFP_KERNEL);
++    if (!ctx) {
++        printk("%s connector out of memory\n", V2VDRV_LOGTAG);
++        return;
++    }
++    memset(ctx, 0, sizeof(struct v2vdrv_context));
++    ctx->config = config;
++
++    if (config->async)
++        v2vdrv_async_init(ctx);
++
++    if (!v2vdrv_input_sanity_check(ctx)) {
++        kfree(ctx);
++        return;
++    }
++
++    err = v2vdrv_connect(ctx);
++    if (err) {
++        kfree(ctx);
++        return;
++    }
++
++    /* This runs the main processing loop, when it is done we disconnect
++       and cleanup regardless of what may have occured */
++    if (config->async)
++        v2vdrv_connector_process_messages_async(ctx);
++    else
++        v2vdrv_connector_process_messages_sync(ctx);
++
++    v2vdrv_connector_disconnect(ctx);
++
++    kfree(ctx);
++}
++
++/********************** CONNECTOR SYNC ***********************/
+ static int
+-v2vdrv_connector_process_internal_rx(struct v2vdrv_connector_context *vcc)
++v2vdrv_connector_process_internal_sync_rx(struct v2vdrv_context *ctx)
+ {
+     int err;
+     volatile void *msg;
+     size_t size;
+-    unsigned type;
+-    unsigned flags;
++    unsigned vtype;
++    unsigned vflags;
+     struct v2vdrv_frame_header *header;
+     struct v2vdrv_resp_internal *vri;
+     uint8_t sum;
+-    while ((err = v2v_nc2_get_message(vcc->channel, (const volatile void **)&msg, &size, &type, &flags))
++    if (ctx->config->fastrx)
++        v2v_nc2_request_fast_receive(ctx->channel);
++
++    while ((err = v2v_nc2_get_message(ctx->channel, (const volatile void **)&msg, &size, &vtype, &vflags))
+             == 0) {
+-        vcc->rx_counter++;   
++        ctx->rx_counter++;   
+         header = (struct v2vdrv_frame_header*)msg;
+         if (!v2vdrv_message_header_check("connector", header, size,
+-                                         sizeof(struct v2vdrv_resp_internal),
+-                                         vcc->rx_counter)) {
+-            v2v_nc2_finish_message(vcc->channel);
++                                         sizeof(struct v2vdrv_resp_internal))) {
++            v2v_nc2_finish_message(ctx->channel);
+             return -EBADMSG;
+         }
+@@ -184,97 +323,107 @@ v2vdrv_connector_process_internal_rx(struct v2vdrv_connector_context *vcc)
+         sum = v2vdrv_checksum((const uint8_t*)msg, header->length);
+         if (sum != 0)         
+-            printk("%s connector(%p) bad checksumm on response #%d!!!\n", V2VDRV_LOGTAG, vcc, vcc->rx_counter);
++            printk("%s connector(%p) bad checksumm on response #%d!!!\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter);
+-        v2v_nc2_finish_message(vcc->channel);
++        v2v_nc2_finish_message(ctx->channel);
+     }
++    if (ctx->config->fastrx)
++        v2v_nc2_cancel_fast_receive(ctx->channel);
++
+     if (err == -ENODATA) {
+         /* No more messages */
+-        printk("%s connector(%p) no more messages, returning\n", V2VDRV_LOGTAG, vcc);
++        printk("%s connector(%p) no more messages, returning\n", V2VDRV_LOGTAG, ctx);
+         return 0;
+     }
+     printk("%s connector(%p) receive internal data failure; abort processing - error: %d\n",
+-           V2VDRV_LOGTAG, vcc, err);
++           V2VDRV_LOGTAG, ctx, err);
+     return err; /* failure */
+ }
+ static int
+-v2vdrv_connector_process_internal_tx(struct v2vdrv_connector_context *vcc)
++v2vdrv_connector_process_internal_sync_tx(struct v2vdrv_context *ctx)
+ {
+     int err;
+     unsigned available;
+     volatile void *msg;
+     uint8_t *msgp;
++    size_t msize;
+     struct v2vdrv_frame_header *header;
+     struct v2vdrv_post_internal *vpi;
+-    printk("%s connector(%p) sending internal message #%d\n", V2VDRV_LOGTAG, vcc, vcc->tx_counter + 1);
+-    available = v2v_nc2_producer_bytes_available(vcc->channel);
+-    printk("%s connector(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, vcc, available);
++    printk("%s connector(%p) sending internal message #%d\n", V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1);
++    available = v2v_nc2_producer_bytes_available(ctx->channel);
++    printk("%s connector(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, ctx, available);
++
++    if (ctx->config->fastrx && v2v_nc2_remote_requested_fast_wakeup(ctx->channel))
++        msize = MIN(ctx->config->xfer_size, V2V_MAX_FASTRX_SIZE);
++    else
++        msize = ctx->config->xfer_size;
+-    err = v2v_nc2_prep_message(vcc->channel, vcc->config->xfer_size, V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
++    err = v2v_nc2_prep_message(ctx->channel, msize, V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
+     if (err) {
+         if (err == -EAGAIN) {
+             /* No room right now, return and try again later */
+             printk("%s connector(%p) not enough buffer space to send message #%d; retry\n",
+-                   V2VDRV_LOGTAG, vcc, vcc->tx_counter + 1);
++                   V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1);
+             return -EAGAIN;
+         }
+         printk("%s connector(%p) transmit internal data failure; abort processing - error: %d\n",
+-               V2VDRV_LOGTAG, vcc, err);
++               V2VDRV_LOGTAG, ctx, err);
+         return err; /* failure */
+     }
+-    vcc->tx_counter++; /* next message */
++    ctx->tx_counter++; /* next message */
+     header = (struct v2vdrv_frame_header*)msg;
+-    header->id = (uint16_t)vcc->tx_counter;
++    header->id = (uint16_t)ctx->tx_counter;
+     header->type = V2V_MESSAGE_TYPE_INTERNAL;
+     header->cs = 0;
+-    header->length = vcc->config->xfer_size;
++    header->length = ctx->config->xfer_size;
+     vpi = (struct v2vdrv_post_internal*)msg;
+     generate_random_uuid(vpi->guid);
+     
+     /* Fill it up with some data and send it */
+     msgp = (uint8_t*)msg + sizeof(struct v2vdrv_post_internal);
+-    memset(msgp, 'X', (vcc->config->xfer_size - sizeof(struct v2vdrv_post_internal)));
+-    header->cs = v2vdrv_checksum((const uint8_t*)msg, vcc->config->xfer_size);
+-    v2v_nc2_send_messages(vcc->channel);
++    memset(msgp, 'X', (ctx->config->xfer_size - sizeof(struct v2vdrv_post_internal)));
++    header->cs = v2vdrv_checksum((const uint8_t*)msg, ctx->config->xfer_size);
++    v2v_nc2_send_messages(ctx->channel);
+     /* Keep the send loop going by setting the event. If there is no more room, the prep message call
+        will return ERROR_RETRY and just land us back in the wait. */
+-    v2v_set_wake_reason(vcc->channel, V2V_WAKE_REASON_SEND);
++    v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_SEND);
+     
+     return 0;
+ }
+ static void
+-v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc)
++v2vdrv_connector_process_messages_sync(struct v2vdrv_context *ctx)
+ {
+     int err = 0, wait_to = 0, done = 0;
+     struct v2v_wait *wait_state;
+     u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE|V2V_WAKE_REASON_SEND;
+-    u8 reasons;
+-    enum v2v_endpoint_state state;
++    u8 reason;
+     unsigned long to, td, rc;
+     struct timespec ts = {0}, tsz = {0}, now, delta;
++    printk("%s connector(%p) started SYNC processing loop for transfer\n", V2VDRV_LOGTAG, ctx);
++
+     /* A transfer count of 0 is used to just test connecting and disconnecting
+        w/o sending any data */
+-    if (vcc->config->xfer_count == 0) {      
+-        printk("%s connector(%p) tranfer count set to 0; disconnecting.\n", V2VDRV_LOGTAG, vcc);
++    if (ctx->config->xfer_count == 0) {      
++        printk("%s connector(%p) tranfer count set to 0; disconnecting.\n", V2VDRV_LOGTAG, ctx);
+         return;
+     }
+-    wait_state = v2v_get_wait_state(vcc->channel);
+-    to = vcc->config->timeout << 2; /* in jiffies x4*/
++    wait_state = v2v_get_wait_state(ctx->channel);
++    to = ctx->config->timeout; /* in jiffies */
+     /* Send our first file chunk to the listener to start things off */
+-    err = v2vdrv_connector_process_internal_tx(vcc);
++    err = v2vdrv_connector_process_internal_sync_tx(ctx);
+     if (err) {        
+         /* we should not get an -EAGAIN on first message, return it as an error */
+         BUG_ON(err == -EAGAIN); /* SNO on first message */
+         printk("%s connector(%p) transmit internal data failure on first message; abort processing - error: %d\n",
+-               V2VDRV_LOGTAG, vcc, err);
++               V2VDRV_LOGTAG, ctx, err);
+         return;
+     }
+@@ -282,10 +431,10 @@ v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc)
+     do {
+         /* When the tx counter reaches the transfer count value, stop sending and wait for 
+            the rest of the responses */
+-        if (vcc->tx_counter == vcc->config->xfer_count) {
++        if (ctx->tx_counter == ctx->config->xfer_count) {
+             /* First see if we are done */
+-            if (vcc->rx_counter == vcc->tx_counter) {
+-                printk("%s connector(%p) received all remaing responses from listener; disconnecting.\n", V2VDRV_LOGTAG, vcc);
++            if (ctx->rx_counter == ctx->tx_counter) {
++                printk("%s connector(%p) received all remaing responses from listener; disconnecting.\n", V2VDRV_LOGTAG, ctx);
+                 err = 0;
+                 break;
+             }
+@@ -318,50 +467,35 @@ v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc)
+         }
+         if (rc == 0) {
+             printk("%s connector(%p) timed out waiting for ack responses from listener; disconnecting\n",
+-                   V2VDRV_LOGTAG, vcc);
++                   V2VDRV_LOGTAG, ctx);
+             break;         
+         }
+         do {
+-            reasons = v2v_get_wake_reason(vcc->channel, reasons_mask);
++            reason = v2v_get_wake_reason(ctx->channel, reasons_mask);
+-            if (reasons & V2V_WAKE_REASON_CONTROL) {
+-                err = v2v_get_remote_state(vcc->channel, &state);
+-                if (err) {
+-                    printk("%s connector(%p) failure in v2v_get_remote_state(); aborting - error: %d\n",
+-                           V2VDRV_LOGTAG, vcc, err);
+-                    done = 1;
+-                    break;
+-                }
+-                printk("%s connector(%p) state changed for other end - new state: %s\n",
+-                       V2VDRV_LOGTAG, vcc, v2v_endpoint_state_name(state));
+-                if (v2v_state_requests_disconnect(state)) {
+-                    printk("%s connector(%p) main processing loop ending for disconnect request...\n",
+-                           V2VDRV_LOGTAG, vcc);
+-                    err = 0;
+-                    done = 1;
+-                    break;
+-                }
++            if (reason & V2V_WAKE_REASON_CONTROL) {
++                done = v2vdrv_status_check(ctx, "connector");
++                if (done)
++                    break;               
+             }
+-
+-            if (reasons & V2V_WAKE_REASON_SEND) {
+-                if (vcc->tx_counter != vcc->config->xfer_count) {
+-                    err = v2vdrv_connector_process_internal_tx(vcc);
++            else if (reason & V2V_WAKE_REASON_SEND) {
++                if (ctx->tx_counter != ctx->config->xfer_count) {
++                    err = v2vdrv_connector_process_internal_sync_tx(ctx);
+                     if ((err != 0)&&(err != -EAGAIN)) {
+                         done = 1;
+                         break;
+                     }
+                 }
+             }
+-
+-            if (reasons & V2V_WAKE_REASON_RECEIVE) {
+-                err = v2vdrv_connector_process_internal_rx(vcc);
++            else if (reason & V2V_WAKE_REASON_RECEIVE) {
++                err = v2vdrv_connector_process_internal_sync_rx(ctx);
+                 if (err) {
+                     done = 1;
+                     break;
+                 }
+             }
+-        } while (reasons != V2V_WAKE_REASON_NONE);
++        } while (reason != V2V_WAKE_REASON_NONE);
+         if (done)
+             break;
+@@ -369,114 +503,364 @@ v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc)
+     } while (1);
+ }
+-static void
+-v2vdrv_connector_disconnect(struct v2vdrv_connector_context *vcc)
++/********************* CONNECTOR ASYNC ***********************/
++static int
++v2vdrv_connector_process_internal_async_rx(struct v2vdrv_context *ctx)
+ {
+-    int err;
++    int err, last = 0;
++    unsigned long flags;
++    uint32_t rxc = 0;
++    volatile void *msg;
++    size_t size;
++    unsigned vtype;
++    unsigned vflags;
++    struct v2vdrv_frame_header *header;
++    struct v2vdrv_resp_internal lvri;
++    uint8_t *pguid;
++    uint8_t sum;
+-    printk("%s connector(%p) Disconnecting...\n", V2VDRV_LOGTAG, vcc);
+-    err = v2v_disconnect(vcc->channel);
+-    printk("%s connector(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, vcc, err);
++    do {
++        /* Critical section where we access the rings - have to lock this area */
++        spin_lock_irqsave(&ctx->s.async.rx_lock, flags);
+-    printk("%s connector(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, vcc, vcc->tx_counter);
+-    printk("%s connector(%p) Received response counter: %d\n", V2VDRV_LOGTAG, vcc, vcc->rx_counter);
++        err = v2v_nc2_get_message(ctx->channel, (const volatile void **)&msg, &size, &vtype, &vflags);
++        if (err != 0) {
++            spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags);
++            break;
++        }
+-    if (vcc->tx_counter != vcc->rx_counter)
+-        printk("%s connector(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, vcc);
++        header = (struct v2vdrv_frame_header*)msg;
++        if (!v2vdrv_message_header_check("connector", header, size,
++                                         sizeof(struct v2vdrv_resp_internal))) {
++            v2v_nc2_finish_message(ctx->channel);
++            spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags);
++            err = -EBADMSG;
++            break;
++        }
++
++        /* Make a local copy of the message header and run a checksum over the message */
++        memcpy(&lvri, (void*)msg, sizeof(struct v2vdrv_resp_internal));
++        sum = v2vdrv_checksum((const uint8_t*)msg, header->length);
++
++        v2v_nc2_finish_message(ctx->channel);
++        spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags);
++
++        /* Critical section to test counter state */
++        spin_lock_irqsave(&ctx->s.async.tx_lock, flags);
++        
++        /* Update counter tracking state, we just sent one */
++        rxc = ++ctx->rx_counter;
++
++        /* See if we are done sending and receiving and set the completion event */
++        if ((ctx->tx_counter == ctx->config->xfer_count)&&(ctx->rx_counter == ctx->tx_counter)) {
++            ctx->s.async.term_status = V2V_TERM_COMPLETE;
++            v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_TERMINATE);
++            last = 0;            
++        }
++        spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++
++        /* Out of critical section and the buffer in the ring has been released,
++           back; can't touch the msg any longer - do some testing and tracing */
++        printk("------ message status=%d\n", lvri.status);
++        pguid = &lvri.guid[0];
++        printk("------ GUID1=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n",
++               pguid[0], pguid[1], pguid[2], pguid[3], pguid[4], pguid[5], pguid[6], pguid[7]);
++        printk("------ GUID2=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n",
++               pguid[8], pguid[9], pguid[10], pguid[11], pguid[12], pguid[13], pguid[14], pguid[15]);
++        if (sum != 0)         
++            printk("%s connector(%p) bad checksumm on response #%d!!!\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter);
++
++        if (last) {
++            err = 0;
++            break;
++        }         
++    } while (1);
++
++    /* If we ended the receive handler with this code, there are no more message right
++       now so we wait for an rx interrupt. If the status is success then it was the last
++       message. */
++    if (err == -ENODATA) {
++        /* No more messages */
++        printk("%s connector(%p) no more messages, returning\n", V2VDRV_LOGTAG, ctx);
++        return 0;
++    }
++    else if (err == 0) {
++        /* Last message received */
++        printk("%s connector(%p) last message sent, returning\n", V2VDRV_LOGTAG, ctx);
++        return 0;
++    }
++
++    printk("%s connector(%p) receive internal data failure; abort processing - error: %d\n",
++           V2VDRV_LOGTAG, ctx, err);
++    return err; /* failure */
+ }
+-void v2vdrv_run_connector(struct v2vdrv_config *config)
++static int
++v2vdrv_connector_process_internal_async_tx(struct v2vdrv_context *ctx)
+ {
+-    struct v2vdrv_connector_context *vcc;
+     int err;
++    unsigned long flags;
++    uint32_t txc = 0;
++    unsigned available;
++    volatile void *msg;
++    uint8_t *msgp;
++    struct v2vdrv_frame_header *header;
++    struct v2vdrv_post_internal *vpi;
++    uint8_t guid[16];
+-    vcc = kmalloc(sizeof(struct v2vdrv_connector_context), GFP_KERNEL);
+-    if (!vcc) {
+-        printk("%s connector out of memory\n", V2VDRV_LOGTAG);
++    /* Loop and send as many messages as possible. The internal message connector does not wait for
++       responses before posting more messages. */
++    do {
++        /* Make a GUID for the next message we send if any */
++        generate_random_uuid(&guid[0]);
++
++        /* Critical section where we access the rings - have to lock this area */
++        spin_lock_irqsave(&ctx->s.async.tx_lock, flags);
++
++        /* See if we are done sending, if so set a timer and exit */
++        if (ctx->tx_counter == ctx->config->xfer_count) {            
++            spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++            /* Set the expiry for waiting for responses now that all messages have been sent */
++            if (!timer_pending(&ctx->s.async.to_timer)) {
++                ctx->s.async.to_timer.expires = jiffies + ctx->config->timeout; /* in jiffies */
++                add_timer(&ctx->s.async.to_timer);
++                printk("%s connector(%p) finished sending message, setting timer and exiting\n", V2VDRV_LOGTAG, ctx);
++            }
++            else
++                printk("%s connector(%p) finished sending message, timer already pending???\n", V2VDRV_LOGTAG, ctx);
++
++            break;
++        }
++
++        available = v2v_nc2_producer_bytes_available(ctx->channel);
++        err = v2v_nc2_prep_message(ctx->channel, ctx->config->xfer_size, V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
++        if (err) {
++            spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++            if (err == -EAGAIN) {
++                /* No room right now, return and try again later */
++                printk("%s connector(%p) not enough buffer space to send message #%d; retry\n",
++                       V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1);
++                err = 0;
++            }
++            else /* failure */
++                printk("%s connector(%p) transmit internal data failure; abort processing - error: %d\n",
++                       V2VDRV_LOGTAG, ctx, err);
++            break;
++        }
++
++        txc = ++ctx->tx_counter; /* next message */
++        header = (struct v2vdrv_frame_header*)msg;
++        header->id = (uint16_t)ctx->tx_counter;
++        header->type = V2V_MESSAGE_TYPE_INTERNAL;
++        header->cs = 0;
++        header->length = ctx->config->xfer_size;
++        vpi = (struct v2vdrv_post_internal*)msg;
++        memcpy(&vpi->guid[0], &guid, sizeof(guid));
++
++        /* Fill it up with some data and send it */
++        msgp = (uint8_t*)msg + sizeof(struct v2vdrv_post_internal);
++        memset(msgp, 'X', (ctx->config->xfer_size - sizeof(struct v2vdrv_post_internal)));
++        header->cs = v2vdrv_checksum((const uint8_t*)msg, ctx->config->xfer_size);
++        v2v_nc2_send_messages(ctx->channel);
++
++        spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++
++        printk("%s connector(%p) sent internal message #%d\n", V2VDRV_LOGTAG, ctx, txc);
++    } while (1);
++    
++    return err;
++}
++
++static void
++v2vdrv_connector_process_messages_async(struct v2vdrv_context *ctx)
++{
++    int done = 0;
++    struct v2v_wait *wait_state;
++    u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_TERMINATE;
++    u8 reason;
++
++    printk("%s connector(%p) started ASYNC processing loop for transfer\n", V2VDRV_LOGTAG, ctx);
++
++    /* A transfer count of 0 is used to just test connecting and disconnecting
++       w/o sending any data */
++    if (ctx->config->xfer_count == 0) {      
++        printk("%s connector(%p) tranfer count set to 0; disconnecting.\n", V2VDRV_LOGTAG, ctx);
+         return;
+     }
+-    memset(vcc, 0, sizeof(struct v2vdrv_connector_context));
+-    vcc->config = config;
+-    err = v2vdrv_connect(vcc);
+-    if (err)
+-        return;
++    atomic_inc(&ctx->s.async.running);
++    wait_state = v2v_get_wait_state(ctx->channel);
+-    /* This runs the main processing loop, when it is done we disconnect
+-       and cleanup regardless of what may have occured */
+-    v2vdrv_connector_process_messages(vcc);
++    /* Send our first message to the listener to start things off */
++    schedule_work(&ctx->s.async.tx_work);
+-    v2vdrv_connector_disconnect(vcc);
++    /* Start out processing loop, wait for status changes */
++    do {
++        wait_event(wait_state->wait_event,
++                   atomic_xchg(&wait_state->wait_condition, 0) == 1);
++
++        do {
++            reason = v2v_get_wake_reason(ctx->channel, reasons_mask);
++
++            if (reason & V2V_WAKE_REASON_CONTROL) {
++                done = v2vdrv_status_check(ctx, "connector");
++                if (done)
++                    break;               
++            }
++            else if (reason & V2V_WAKE_REASON_TERMINATE) {
++                /* The terminate event may indicate an error or normal completion */
++                switch (ctx->s.async.term_status) {
++                case V2V_TERM_COMPLETE:
++                case V2V_TERM_TIMEOUT:
++                    printk("%s connector(%p) async handlers signalled a terminate for reason: %s; exiting.\n",
++                             V2VDRV_LOGTAG, ctx, (ctx->s.async.term_status == V2V_TERM_COMPLETE) ? "complete" : "timeout");
++                    break;
++                default:
++                    printk("%s connector(%p) async handlers signalled a terminate with error status=0x%x; exiting.\n",
++                             V2VDRV_LOGTAG, ctx, ctx->s.async.term_status);
++                }                
++                done = 1;
++                break;
++            }           
++        } while (reason != V2V_WAKE_REASON_NONE);
++
++        if (done)
++            break;
++
++    } while (1);
+-    kfree(vcc);
++    atomic_dec(&ctx->s.async.running);
++    del_timer_sync(&ctx->s.async.to_timer);
+ }
+-/************************* LISTENER *************************/
+-struct v2vdrv_listener_context {
+-    struct v2vdrv_config *config;
+-    struct v2v_channel *channel;
+-    uint32_t tx_counter;
+-    uint32_t rx_counter;
+-    struct v2vdrv_listener_resp_item *resp_list;
+-    struct v2vdrv_listener_resp_item *resp_tail;
+-};
++/************************* LISTENER **************************/
++static void v2vdrv_listener_process_messages_sync(struct v2vdrv_context *ctx);
++static void v2vdrv_listener_process_messages_async(struct v2vdrv_context *ctx);
+ static int
+-v2vdrv_listen_accept(struct v2vdrv_listener_context *vlc)
++v2vdrv_listen_accept(struct v2vdrv_context *ctx)
+ {
+     int err, err2;
+     /* Start the listener, get back a channel handle */
+-    err = v2v_listen(vlc->config->local_prefix, &vlc->channel, 0, 0);
++    err = v2v_listen(ctx->config->local_prefix, &ctx->channel, 0, 0, ctx->asvp);
+     if (err) {
+-        printk("%s listener(%p) failure in v2v_listen() - error: %d\n", V2VDRV_LOGTAG, vlc, err);
++        printk("%s listener(%p) failure in v2v_listen() - error: %d\n", V2VDRV_LOGTAG, ctx, err);
+         return err;
+     }
+-    BUG_ON(vlc->channel == NULL);
+-    printk("%s listener(%p) listener started, wait to accept...\n", V2VDRV_LOGTAG, vlc);
++    BUG_ON(ctx->channel == NULL);
++    printk("%s listener(%p) listener started, wait to accept...\n", V2VDRV_LOGTAG, ctx);
+     
+     /* Wait to accept the connection from the connector end */
+-    err = v2v_accept(vlc->channel);
++    err = v2v_accept(ctx->channel);
+     if (err) {
+         if (err != -ENOLINK)
+-            printk("%s listener(%p) failure in v2v_accept() - error: %d\n", V2VDRV_LOGTAG, vlc, err);
++            printk("%s listener(%p) failure in v2v_accept() - error: %d\n", V2VDRV_LOGTAG, ctx, err);
+         else
+-            printk("%s listener(%p) remote end disconnected while waiting to accept\n", V2VDRV_LOGTAG, vlc);
++            printk("%s listener(%p) remote end disconnected while waiting to accept\n", V2VDRV_LOGTAG, ctx);
+-        err2 = v2v_disconnect(vlc->channel);
++        err2 = v2v_disconnect(ctx->channel);
+         if (err2) {
+             printk("%s listener(%p) secondary failure in v2v_disconnect() after accept failed - error: %d\n",
+-                     V2VDRV_LOGTAG, vlc, err2);
++                     V2VDRV_LOGTAG, ctx, err2);
+         }
+         return err;
+     }
+    
+-    printk("%s listener(%p) accepted connection, ready to process incoming data.\n", V2VDRV_LOGTAG, vlc);
++    printk("%s listener(%p) accepted connection, ready to process incoming data.\n", V2VDRV_LOGTAG, ctx);
+     return 0;
+ }
++static void
++v2vdrv_listener_disconnect(struct v2vdrv_context *ctx)
++{
++    int err;
++    uint32_t i = 0;
++    struct v2vdrv_listener_resp_item *resp;
++
++    printk("%s listener(%p) Disconnecting...\n", V2VDRV_LOGTAG, ctx);
++    err = v2v_disconnect(ctx->channel);
++    printk("%s listener(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, ctx, err);
++
++    printk("%s listener(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, ctx, ctx->tx_counter);
++    printk("%s listener(%p) Received response counter: %d\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter);
++    if (ctx->tx_counter != ctx->rx_counter)
++        printk("%s listener(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, ctx);
++
++    while (ctx->r.listener.resp_list) {
++        resp = ctx->r.listener.resp_list;
++        ctx->r.listener.resp_list = resp->next;
++        kfree(resp);
++        i++;
++    }
++    if (i > 0)
++        printk("%s listener(%p) WARNING Found %d unsent responses\n", V2VDRV_LOGTAG, ctx, i);
++}
++
++void v2vdrv_run_listener(struct v2vdrv_config *config)
++{
++    struct v2vdrv_context *ctx;
++    int err;
++
++    ctx = kmalloc(sizeof(struct v2vdrv_context), GFP_KERNEL);
++    if (!ctx) {
++        printk("%s listener out of memory\n", V2VDRV_LOGTAG);
++        return;
++    }
++    memset(ctx, 0, sizeof(struct v2vdrv_context));
++    ctx->config = config;
++
++    if (config->async)
++        v2vdrv_async_init(ctx);
++
++    if (!v2vdrv_input_sanity_check(ctx)) {
++        kfree(ctx);
++        return;
++    }
++
++    err = v2vdrv_listen_accept(ctx);
++    if (err) {
++        kfree(ctx);
++        return;
++    }     
++
++    /* This runs the main processing loop, when it is done we disconnect
++       and cleanup regardless of what may have occured */
++    if (config->async)
++        v2vdrv_listener_process_messages_async(ctx);
++    else
++        v2vdrv_listener_process_messages_sync(ctx);
++
++    v2vdrv_listener_disconnect(ctx);
++
++    kfree(ctx);
++}
++
++/********************** LISTENER SYNC ************************/
+ static int
+-v2vdrv_listener_process_internal_rx(struct v2vdrv_listener_context *vlc)
++v2vdrv_listener_process_internal_sync_rx(struct v2vdrv_context *ctx)
+ {
+     int err;
+     volatile void *msg;
+     size_t size;
+-    unsigned type;
+-    unsigned flags;
++    unsigned vtype;
++    unsigned vflags;
+     struct v2vdrv_frame_header *header;
+     struct v2vdrv_post_internal *vpi;
+     struct v2vdrv_listener_resp_item *vlri;
+     uint8_t sum;
+-    while ((err = v2v_nc2_get_message(vlc->channel, (const volatile void**)&msg, &size, &type, &flags))
++    if (ctx->config->fastrx)
++        v2v_nc2_request_fast_receive(ctx->channel);
++
++    while ((err = v2v_nc2_get_message(ctx->channel, (const volatile void**)&msg, &size, &vtype, &vflags))
+             == 0) {
+-        vlc->rx_counter++;   
++        ctx->rx_counter++;   
+         header = (struct v2vdrv_frame_header*)msg;
+         if (!v2vdrv_message_header_check("listener", header, size,
+-                                         sizeof(struct v2vdrv_post_internal),
+-                                         vlc->rx_counter)) {
+-            v2v_nc2_finish_message(vlc->channel);
++                                         sizeof(struct v2vdrv_post_internal))) {
++            v2v_nc2_finish_message(ctx->channel);
+             return -EBADMSG;
+         }        
+@@ -490,7 +874,7 @@ v2vdrv_listener_process_internal_rx(struct v2vdrv_listener_context *vlc)
+         sum = v2vdrv_checksum((const uint8_t*)msg, header->length);
+         if (sum != 0)
+-            printk("%s listener(%p) bad checksumm on message #%d!!!\n", V2VDRV_LOGTAG, vlc, vlc->rx_counter);
++            printk("%s listener(%p) bad checksumm on message #%d!!!\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter);
+         /* Queue a response */
+         vlri = kmalloc(sizeof(struct v2vdrv_listener_resp_item), GFP_KERNEL);
+@@ -501,35 +885,38 @@ v2vdrv_listener_process_internal_rx(struct v2vdrv_listener_context *vlc)
+             vlri->resp.header.cs = 0;
+             vlri->resp.header.length = sizeof(struct v2vdrv_resp_internal); /* header + resp data */
+             vlri->resp.status = (sum == 0 ? V2V_MESSAGE_STATUS_OK : V2V_MESSAGE_STATUS_BADCS);
+-            memcpy(&vlri->resp.guid, vpi->guid, 16);
++            memcpy(&vlri->resp.guid[0], vpi->guid, sizeof(vlri->resp.guid));
+             vlri->resp.header.cs = v2vdrv_checksum((const uint8_t*)vlri, sizeof(struct v2vdrv_resp_internal));
+-            if (vlc->resp_list) {
+-                vlc->resp_tail->next = vlri;
+-                vlc->resp_tail = vlri;
++            if (ctx->r.listener.resp_list) {
++                ctx->r.listener.resp_tail->next = vlri;
++                ctx->r.listener.resp_tail = vlri;
+             }
+             else {
+-                vlc->resp_list = vlri;
+-                vlc->resp_tail = vlri;
++                ctx->r.listener.resp_list = vlri;
++                ctx->r.listener.resp_tail = vlri;
+             }            
+         }
+         else
+-            printk("%s listener(%p) cannot queue response; out of memory\n", V2VDRV_LOGTAG, vlc);
++            printk("%s listener(%p) cannot queue response; out of memory\n", V2VDRV_LOGTAG, ctx);
+-        v2v_nc2_finish_message(vlc->channel);
++        v2v_nc2_finish_message(ctx->channel);
+     }
++    if (ctx->config->fastrx)
++        v2v_nc2_cancel_fast_receive(ctx->channel);
++
+     if (err == -ENODATA) {
+         /* No more messages */
+-        printk("%s listener(%p) no more messages, returning\n", V2VDRV_LOGTAG, vlc);
++        printk("%s listener(%p) no more messages, returning\n", V2VDRV_LOGTAG, ctx);
+         return 0;
+     }
+     printk("%s listener(%p) receive internal data failure; abort processing - error: %d\n",
+-           V2VDRV_LOGTAG, vlc, err);
++           V2VDRV_LOGTAG, ctx, err);
+     return err; /* failure */
+ }
+ static int
+-v2vdrv_listener_process_internal_tx(struct v2vdrv_listener_context *vlc)
++v2vdrv_listener_process_internal_sync_tx(struct v2vdrv_context *ctx)
+ {
+     int err;
+     unsigned available;
+@@ -537,59 +924,60 @@ v2vdrv_listener_process_internal_tx(struct v2vdrv_listener_context *vlc)
+     uint8_t *msgp;
+     struct v2vdrv_listener_resp_item *vlri; 
+-    printk("%s listener(%p) sending internal response #%d\n", V2VDRV_LOGTAG, vlc, vlc->tx_counter + 1);
+-    available = v2v_nc2_producer_bytes_available(vlc->channel);
+-    printk("%s listener(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, vlc, available);
+-    BUG_ON(vlc->resp_list == NULL);
++    printk("%s listener(%p) sending internal response #%d\n", V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1);
++    available = v2v_nc2_producer_bytes_available(ctx->channel);
++    printk("%s listener(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, ctx, available);
++    BUG_ON(ctx->r.listener.resp_list == NULL);
++
++    /* No resizing fixed responses for fastrx */
+-    err = v2v_nc2_prep_message(vlc->channel, sizeof(struct v2vdrv_resp_internal), V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
++    err = v2v_nc2_prep_message(ctx->channel, sizeof(struct v2vdrv_resp_internal), V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
+     if (err) {
+         if (err == -EAGAIN) {
+             /* No room right now, return and try again later */
+             printk("%s listener(%p) not enough buffer space to send response #%d; retry\n",
+-                   V2VDRV_LOGTAG, vlc, vlc->tx_counter + 1);
++                   V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1);
+             return -EAGAIN;
+         }
+         printk("%s listener(%p) transmit internal response failure; abort processing - error: %d\n",
+-               V2VDRV_LOGTAG, vlc, err);
++               V2VDRV_LOGTAG, ctx, err);
+         return err; /* failure */
+     }
+-    vlc->tx_counter++; /* next message */
+-    vlri = vlc->resp_list;
+-    vlc->resp_list = vlri->next;
+-    if (!vlc->resp_list)
+-         vlc->resp_tail = NULL;
+-
+-    /* Response already formed, just copy it in */
+-    mb();
++    ctx->tx_counter++; /* next message */
++    vlri = ctx->r.listener.resp_list;
++    ctx->r.listener.resp_list = vlri->next;
++    if (!ctx->r.listener.resp_list)
++         ctx->r.listener.resp_tail = NULL;
++
++    /* Response already formed, just copy it in */   
+     msgp = (uint8_t*)msg;
+     memcpy(msgp, vlri, sizeof(struct v2vdrv_resp_internal));
+-    mb();
+     kfree(vlri);
+-    v2v_nc2_send_messages(vlc->channel);
++    v2v_nc2_send_messages(ctx->channel);
+     /* Keep the send loop going by setting the event. If there is no more room, the prep message call
+        will return ERROR_RETRY and just land us back in the wait. */
+-    v2v_set_wake_reason(vlc->channel, V2V_WAKE_REASON_SEND);
++    v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_SEND);
+     
+     return 0;
+ }
+ static void
+-v2vdrv_listener_process_messages(struct v2vdrv_listener_context *vlc)
++v2vdrv_listener_process_messages_sync(struct v2vdrv_context *ctx)
+ {
+     int err = 0, done = 0;
+-    enum v2v_endpoint_state state;
+     struct v2v_wait *wait_state;
+     u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE;
+-    u8 reasons;
++    u8 reason;
+-    wait_state = v2v_get_wait_state(vlc->channel);
++    printk("%s listener(%p) started SYNC processing loop for transfer\n", V2VDRV_LOGTAG, ctx);
++
++    wait_state = v2v_get_wait_state(ctx->channel);
+     /* Start out processing loop, wait for message */
+     do {
+-        if (vlc->resp_list)
++        if (ctx->r.listener.resp_list)
+             reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE|V2V_WAKE_REASON_SEND;
+         else
+             reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE;
+@@ -597,102 +985,355 @@ v2vdrv_listener_process_messages(struct v2vdrv_listener_context *vlc)
+         wait_event(wait_state->wait_event, atomic_xchg(&wait_state->wait_condition, 0) == 1);
+         do {
+-            reasons = v2v_get_wake_reason(vlc->channel, reasons_mask);
++            reason = v2v_get_wake_reason(ctx->channel, reasons_mask);
+             
+-            if (reasons & V2V_WAKE_REASON_CONTROL) {
+-                err = v2v_get_remote_state(vlc->channel, &state);
+-                if (err) {
+-                    printk("%s listener(%p) failure in v2v_get_remote_state(); aborting - error: %d\n",
+-                           V2VDRV_LOGTAG, vlc, err);
+-                    done = 1;
++            if (reason & V2V_WAKE_REASON_CONTROL) {
++                done = v2vdrv_status_check(ctx, "listener");
++                if (done)
+                     break;
+-                }
+-                printk("%s listener(%p) state changed for other end - new state: %s\n",
+-                       V2VDRV_LOGTAG, vlc, v2v_endpoint_state_name(state));
+-                if (v2v_state_requests_disconnect(state)) {
+-                    printk("%s listener(%p) main processing loop ending for disconnect request...\n",
+-                           V2VDRV_LOGTAG, vlc);
+-                    err = 0;
+-                    done = 1;
+-                    break;
+-                }
+             }
+-
+-            if (reasons & V2V_WAKE_REASON_SEND) {
+-                if (vlc->resp_list) {
+-                    err = v2vdrv_listener_process_internal_tx(vlc);
++            else if (reason & V2V_WAKE_REASON_SEND) {
++                if (ctx->r.listener.resp_list) {
++                    err = v2vdrv_listener_process_internal_sync_tx(ctx);
+                     if ((err != 0)&&(err != -EAGAIN)) {
+                         done = 1;
+                         break;
+                     }
+                 }
+             }
+-
+-            if (reasons & V2V_WAKE_REASON_RECEIVE) {
+-                err = v2vdrv_listener_process_internal_rx(vlc);
++            else if (reason & V2V_WAKE_REASON_RECEIVE) {
++                err = v2vdrv_listener_process_internal_sync_rx(ctx);
+                 if (err) {
+                     done = 1;
+                     break;
+                 }
+                 /* we now have data, set the event again to process sends */
+-                v2v_set_wake_reason(vlc->channel, V2V_WAKE_REASON_SEND);
++                v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_SEND);
+             }
+-        } while (reasons != V2V_WAKE_REASON_NONE);
++        } while (reason != V2V_WAKE_REASON_NONE);
+         if (done)
+             break;
+     } while (1);
+ }
+-static void
+-v2vdrv_listener_disconnect(struct v2vdrv_listener_context *vlc)
++/********************* LISTENER ASYNC ************************/
++static int v2vdrv_listener_process_internal_async_tx(struct v2vdrv_context *ctx);
++
++static int
++v2vdrv_listener_process_internal_async_rx(struct v2vdrv_context *ctx)
+ {
+     int err;
+-    uint32_t i = 0;
+-    struct v2vdrv_listener_resp_item *resp;
++    unsigned long flags;
++    uint32_t rxc = 0;
++    volatile void *msg;
++    size_t size;
++    unsigned vtype;
++    unsigned vflags;
++    struct v2vdrv_frame_header *header;
++    struct v2vdrv_post_internal *vpi;
++    struct v2vdrv_listener_resp_item *vlri;
++    uint8_t *pguid;
++    uint8_t sum;
+-    printk("%s listener(%p) Disconnecting...\n", V2VDRV_LOGTAG, vlc);
+-    err = v2v_disconnect(vlc->channel);
+-    printk("%s listener(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, vlc, err);
++    do {
++        /* Create one up front, drop out if we can't */
++        vlri = kmalloc(sizeof(struct v2vdrv_listener_resp_item), GFP_KERNEL);
++        if (vlri == NULL) {
++            printk("%s listener(%p) cannot queue response; out of memory\n", V2VDRV_LOGTAG, ctx);
++            err = -ENOMEM;
++            break;
++        }
+-    printk("%s listener(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, vlc, vlc->tx_counter);
+-    printk("%s listener(%p) Received response counter: %d\n", V2VDRV_LOGTAG, vlc, vlc->rx_counter);
+-    if (vlc->tx_counter != vlc->rx_counter)
+-        printk("%s listener(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, vlc);
++        /* Critical section where we access the rings - have to lock this area */
++        spin_lock_irqsave(&ctx->s.async.rx_lock, flags);
+-    while (vlc->resp_list) {
+-        resp = vlc->resp_list;
+-        vlc->resp_list = resp->next;
+-        kfree(resp);
+-        i++;
++        err = v2v_nc2_get_message(ctx->channel, (const volatile void**)&msg, &size, &vtype, &vflags);
++        if (err != 0) {
++            spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags);
++            kfree(vlri);
++            break;
++        }
++
++        rxc = ++ctx->rx_counter;
++        header = (struct v2vdrv_frame_header*)msg;
++        if (!v2vdrv_message_header_check("listener", header, size,
++                                         sizeof(struct v2vdrv_post_internal))) {
++            v2v_nc2_finish_message(ctx->channel);            
++            spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags);
++            kfree(vlri);
++            err = -EBADMSG;
++            break;
++        }
++
++        vpi = (struct v2vdrv_post_internal*)msg;
++        pguid = &vpi->guid[0];
++
++        /* Save the GUID and checksum and exit the critical section */
++        vlri->resp.header.id = header->id;        
++        memcpy(&vlri->resp.guid[0], pguid, sizeof(vlri->resp.guid));
++        sum = v2vdrv_checksum((const uint8_t*)msg, header->length);       
++
++        v2v_nc2_finish_message(ctx->channel); 
++        spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags);
++
++        /* Out of critical section and the buffer in the ring has been released,
++         * back; can't touch the msg any longer - do some testing and tracing */
++        pguid = &vlri->resp.guid[0];
++        printk("------ GUID1=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n",
++               pguid[0], pguid[1], pguid[2], pguid[3],
++               pguid[4], pguid[5], pguid[6], pguid[7]);
++        printk("------ GUID2=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n",
++               pguid[8], pguid[9], pguid[10], pguid[11], 
++               pguid[12], pguid[13], pguid[14], pguid[15]);
++        if (sum != 0)
++            printk("%s listener(%p) bad checksumm on message #%d!!!\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter);
++
++        /* Finish setting up the response and queue it for sending */
++        vlri->next = NULL;
++        vlri->resp.header.type = V2V_MESSAGE_TYPE_INTERNAL;
++        vlri->resp.header.cs = 0;
++        vlri->resp.header.length = sizeof(struct v2vdrv_resp_internal); /* header + resp data */
++        vlri->resp.status = (sum == 0 ? V2V_MESSAGE_STATUS_OK : V2V_MESSAGE_STATUS_BADCS);
++        vlri->resp.header.cs = v2vdrv_checksum((const uint8_t*)vlri, sizeof(struct v2vdrv_resp_internal));
++
++        spin_lock_irqsave(&ctx->s.async.tx_lock, flags);
++        if (ctx->r.listener.resp_list) {
++            ctx->r.listener.resp_tail->next = vlri;
++            ctx->r.listener.resp_tail = vlri;
++        }
++        else {
++            ctx->r.listener.resp_list = vlri;
++            ctx->r.listener.resp_tail = vlri;
++        }
++        spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++
++    } while (1);
++
++    /* If we ended the receive handler with this code, there are no more message right
++       now. At this point we can send out any queued responses we might have queued */
++    if (err == -ENODATA) {
++        /* No more messages */
++        printk("%s listener(%p) no more messages, calling TX processor\n", V2VDRV_LOGTAG, ctx);
++        return v2vdrv_listener_process_internal_async_tx(ctx);
+     }
+-    if (i > 0)
+-        printk("%s listener(%p) WARNING Found %d unsent responses\n", V2VDRV_LOGTAG, vlc, i);
++
++    printk("%s listener(%p) receive internal data failure; abort processing - error: %d\n",
++           V2VDRV_LOGTAG, ctx, err);
++    return err; /* failure */
+ }
+-void v2vdrv_run_listener(struct v2vdrv_config *config)
++static int
++v2vdrv_listener_process_internal_async_tx(struct v2vdrv_context *ctx)
+ {
+-    struct v2vdrv_listener_context *vlc;
+     int err;
++    unsigned long flags;
++    uint32_t txc = 0;
++    unsigned available;
++    volatile void *msg;
++    uint8_t *msgp;
++    struct v2vdrv_listener_resp_item *vlri; 
+-    vlc = kmalloc(sizeof(struct v2vdrv_listener_context), GFP_KERNEL);
+-    if (!vlc) {
+-        printk("%s listener out of memory\n", V2VDRV_LOGTAG);
+-        return;
++    /* Loop and send any queued response */
++    do {
++        /* Critical section where we access the rings - have to lock this area */
++        spin_lock_irqsave(&ctx->s.async.tx_lock, flags);
++
++        if (ctx->r.listener.resp_list == NULL) {
++            spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++            printk("%s listener(%p) no responses to send in internal TX handler; exiting\n", V2VDRV_LOGTAG, ctx);            
++            err = 0;
++            break;
++        }
++
++        available = v2v_nc2_producer_bytes_available(ctx->channel);
++        err = v2v_nc2_prep_message(ctx->channel, sizeof(struct v2vdrv_resp_internal), V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
++        if (err) {
++            spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++            if (err == -EAGAIN) {
++                /* No room right now, return and try again later */
++                printk("%s listener(%p) not enough buffer space to send message #%d; retry\n",
++                       V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1);
++                err = 0;
++            }
++            else /* failure */
++                printk("%s listener(%p) transmit internal data failure; abort processing - error: %d\n",
++                       V2VDRV_LOGTAG, ctx, err);
++            break;
++        }
++
++        txc = ++ctx->tx_counter; /* next message */
++        vlri = ctx->r.listener.resp_list;
++        ctx->r.listener.resp_list = vlri->next;
++        if (!ctx->r.listener.resp_list)
++            ctx->r.listener.resp_tail = NULL;
++        /* Response already formed, just copy it in */        
++        msgp = (uint8_t*)msg;
++        memcpy(msgp, vlri, sizeof(struct v2vdrv_resp_internal));
++        kfree(vlri);
++
++        v2v_nc2_send_messages(ctx->channel);
++        spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags);
++
++        printk("%s listener(%p) sent internal response #%d\n", V2VDRV_LOGTAG, ctx, txc);
++    } while (1);
++
++    return err;
++}
++
++static void
++v2vdrv_listener_process_messages_async(struct v2vdrv_context *ctx)
++{
++    int done = 0;
++    struct v2v_wait *wait_state;
++    u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_TERMINATE;
++    u8 reason;
++
++    printk("%s listener(%p) started ASYNC processing loop for transfer\n", V2VDRV_LOGTAG, ctx);
++
++    atomic_inc(&ctx->s.async.running);
++    wait_state = v2v_get_wait_state(ctx->channel);
++
++    /* Start out processing loop, wait for status changes */
++    do {
++        wait_event(wait_state->wait_event,
++                   atomic_xchg(&wait_state->wait_condition, 0) == 1);
++
++        do {
++            reason = v2v_get_wake_reason(ctx->channel, reasons_mask);
++
++            if (reason & V2V_WAKE_REASON_CONTROL) {
++                done = v2vdrv_status_check(ctx, "listener");
++                if (done)
++                    break;               
++            }
++            else if (reason & V2V_WAKE_REASON_TERMINATE) {
++                /* The terminate event indicates an error since the listener side does not
++                   complete or timeout through this means */
++                printk("%s listener(%p) async handlers signalled a terminate with error status=0x%x; exiting.\n",
++                        V2VDRV_LOGTAG, ctx, ctx->s.async.term_status);
++                done = 1;
++                break;
++            }           
++        } while (reason != V2V_WAKE_REASON_NONE);
++
++        if (done)
++            break;
++
++    } while (1);
++
++    atomic_dec(&ctx->s.async.running);
++}
++
++/********************** ASYNC COMMON *************************/
++static void
++v2vdrv_rx_work(struct work_struct *arg)
++{
++      struct v2vdrv_context *ctx = container_of(arg, struct v2vdrv_context, s.async.rx_work);
++    int err;
++      
++    if (ctx->config->role == role_listener)
++        err = v2vdrv_listener_process_internal_async_rx(ctx);
++    else
++        err = v2vdrv_connector_process_internal_async_rx(ctx);
++
++    if (err) {
++        ctx->s.async.term_status = V2V_TERM_TX_ERROR;
++        v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_TERMINATE);
+     }
+-    memset(vlc, 0, sizeof(struct v2vdrv_listener_context));
+-    vlc->config = config;
++}
+-    err = v2vdrv_listen_accept(vlc);
+-    if (err)
++static void
++v2vdrv_tx_work(struct work_struct *arg)
++{
++      struct v2vdrv_context *ctx = container_of(arg, struct v2vdrv_context, s.async.tx_work);
++    int err;
++
++      if (ctx->config->role == role_listener)
++        err = v2vdrv_listener_process_internal_async_tx(ctx);
++    else
++        err = v2vdrv_connector_process_internal_async_tx(ctx);
++    
++    if (err) {
++        ctx->s.async.term_status = V2V_TERM_TX_ERROR;
++        v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_TERMINATE);
++    }
++}
++
++static void
++v2vdrv_timeout_cb(unsigned long ptr)
++{
++    struct v2vdrv_context *ctx = (struct v2vdrv_context *)ptr;
++
++    ctx->s.async.term_status = V2V_TERM_TIMEOUT;
++    v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_TERMINATE);
++}
++
++static void
++v2vdrv_receive_int(void *receive_ctx)
++{
++    struct v2vdrv_context *ctx = receive_ctx;
++
++    /* Avoid spurious interrupts before initialization is complete */
++    if (atomic_read(&ctx->s.async.running) == 0)
+         return;
+-    /* This runs the main processing loop, when it is done we disconnect
+-       and cleanup regardless of what may have occured */
+-    v2vdrv_listener_process_messages(vlc);
++    /* The V2V async APIs allow raw interrupt service routines to be 
++       registered to allow maximum flexibility to the client. In almost all
++       cases though, the best practice is to dispatch the work to a bottom
++       half and complete the irq asap. */
++    schedule_work(&ctx->s.async.rx_work);
++}
++
++static void
++v2vdrv_send_int(void *send_ctx)
++{
++    struct v2vdrv_context *ctx = send_ctx;
++
++    /* Avoid spurious interrupts before initialization is complete */
++    if (atomic_read(&ctx->s.async.running) == 0)
++        return;
++
++    /* See comments for v2vdrv_receive_int() */
++    schedule_work(&ctx->s.async.tx_work);
++}
+-    v2vdrv_listener_disconnect(vlc);
++static void
++v2vdrv_control_cb(void *control_ctx)
++{
++    struct v2vdrv_context *ctx = control_ctx;
+-    kfree(vlc);
++    /* Avoid xenstore events before initialization is complete */
++    if (atomic_read(&ctx->s.async.running) == 0)
++        return;
++
++    /* The async V2V functionality provides a way to register an
++     * async callback for control message processing as an alternative
++     * to waiting on the control event. For purposes of the sample,
++     * we will use the control event to keep the processing thread
++     * busy waiting for events.
++     */    
++    v2vdrv_status_check(ctx, "ctrlcb");
+ }
++static void
++v2vdrv_async_init(struct v2vdrv_context *ctx)
++{
++    ctx->s.async.asv.receive_int = v2vdrv_receive_int;
++    ctx->s.async.asv.receive_ctx = ctx;
++    ctx->s.async.asv.send_int = v2vdrv_send_int;
++    ctx->s.async.asv.send_ctx = ctx;
++    ctx->s.async.asv.control_cb = v2vdrv_control_cb;
++    ctx->s.async.asv.control_ctx = ctx;
++    atomic_set(&ctx->s.async.running, 0);
++    INIT_WORK(&ctx->s.async.rx_work, v2vdrv_rx_work);
++    INIT_WORK(&ctx->s.async.tx_work, v2vdrv_tx_work);
++    if (ctx->config->role == role_connector) {
++        init_timer(&ctx->s.async.to_timer);
++        ctx->s.async.to_timer.data = (unsigned long)ctx;
++        ctx->s.async.to_timer.function = v2vdrv_timeout_cb;
++    }
++    spin_lock_init(&ctx->s.async.rx_lock);
++    spin_lock_init(&ctx->s.async.tx_lock);
++    ctx->asvp = &ctx->s.async.asv;
++}
+diff --git a/drivers/xen/v2v/v2vutl.c b/drivers/xen/v2v/v2vutl.c
+index 3dc36c7..454157c 100644
+--- a/drivers/xen/v2v/v2vutl.c
++++ b/drivers/xen/v2v/v2vutl.c
+@@ -223,4 +223,3 @@ v2v_xenops_grant_unmap(struct vm_struct *vm_area, grant_handle_t *ghandles, unsi
+     free_vm_area(vm_area);
+ }
+-
+diff --git a/include/xen/v2v.h b/include/xen/v2v.h
+index abef7be..5233f80 100644
+--- a/include/xen/v2v.h
++++ b/include/xen/v2v.h
+@@ -49,6 +49,43 @@
+  */
+ struct v2v_channel;
++/* Input structure used for initilializing asynchronous v2v 
++ * comminucations.  The @receive_int and @send_int values must be provided.
++ * These routines will be called back for receive and send event 
++ * notification.  The @control_cb may or may not be set depending on whether
++ * the caller wants to get control callbacks or use the control event through
++ * returned by v2v_get_control_event to process changes in control state.  
++ * The @control_ctx will be passed in to the control callback routine when 
++ * called.
++ *
++ * Note that the @receive_int and @send_int callbacks are actually in the 
++ * context of the interrupt service routines for the event channels. This is
++ * done to allow the v2v client the most flexibility in processing events
++ * including doing work in the interrupt context (though this should be
++ * avoided). What this mainly allows is for the client to determine what
++ * bottom half processing it wants to emply. In most cases, the client's
++ * event handler callbacks would simply do something like the following:
++ *
++ *  schedule_work(&my_context->my_work);
++ *
++ * And immediately returning; doing the actual event processing in the
++ * work handler.
++ *
++ * Listening, connecting and accepting the v2v channel for asynchronous
++ * operations is done the same way as for synchronous operations (except
++ * for providing this structure to the APIs). Once the channel is established,
++ * the client using v2v in asynchronous mode will be called back to indicate
++ * receipt of data or space availability for further sending.
++ */
++struct v2v_async {
++    void (*receive_int)(void *receive_ctx);
++    void *receive_ctx;
++    void (*send_int)(void *send_ctx);
++    void *send_ctx;
++    void (*control_cb)(void *control_ctx);
++    void *control_ctx;
++};
++
+ /* Wait state structure returned by v2v_get_wait_state. See this
+  * function for more details.
+  */
+@@ -67,12 +104,17 @@ struct v2v_wait {
+  * It is generally a mistake to have several processes listening on
+  * the same xenbus prefix.
+  *
++ * Passing a valid @async_values structure to this routine will
++ * initialize the endpoint for ascynchronous communications operation.
++ * To use synchronous operations, NULL should be passed.
++ *
+  * Returns 0 on success and an appropriate errno code on failure.
+  */
+ int v2v_listen(const char *xenbus_prefix,
+                struct v2v_channel **channel,
+                unsigned prod_ring_page_order,
+-               unsigned cons_ring_page_order);
++               unsigned cons_ring_page_order,
++               struct v2v_async *async_values);
+ /* Wait for a remote domain to connect to the channel @channel, which
+  * should have been allocated with v2v_listen().
+@@ -90,10 +132,15 @@ int v2v_accept(struct v2v_channel *channel);
+  * This will fail if the remote endpoint is not currently listening on
+  * the specified channel.
+  *
++ * Passing a valid @async_values structure to this routine will
++ * initialize the endpoint for ascynchronous communications operation.
++ * To use synchronous operations, NULL should be passed.
++ *
+  * Returns 0 on success and an appropriate errno code on failure.
+  */
+ int v2v_connect(const char *xenbus_prefix,
+-                struct v2v_channel **channel);
++                struct v2v_channel **channel,
++                struct v2v_async *async_values);
+ /* Disconnect from a VM-to-VM channel @channel which was previously
+  * established using either v2v_connect() or v2v_listen().  The channel
+@@ -202,11 +249,16 @@ const char *v2v_endpoint_state_name(enum v2v_endpoint_state state);
+ /* Wake reasons. The values can be OR'd together in the functions below
+  * to specify more than one wait reason. 
+  */
+-#define V2V_WAKE_REASON_NONE    0x00
+-#define V2V_WAKE_REASON_CONTROL 0x01
+-#define V2V_WAKE_REASON_SEND    0x02
+-#define V2V_WAKE_REASON_RECEIVE 0x04
+-#define V2V_WAKE_REASON_ANY     0xFF
++#define V2V_WAKE_REASON_NONE     0x00
++#define V2V_WAKE_REASON_CONTROL  0x01
++#define V2V_WAKE_REASON_SEND     0x02
++#define V2V_WAKE_REASON_RECEIVE  0x04
++#define V2V_WAKE_REASON_RESERVED 0x08
++#define V2V_WAKE_REASON_USER1    0x10
++#define V2V_WAKE_REASON_USER2    0x20
++#define V2V_WAKE_REASON_USER3    0x40
++#define V2V_WAKE_REASON_USER4    0x80
++#define V2V_WAKE_REASON_ANY      0xFF
+ /* Retrieve the wait state object for the @channel.  This object allows
+  * a thread to wait until one of several conditions is true.  Normally
+@@ -235,19 +287,29 @@ const char *v2v_endpoint_state_name(enum v2v_endpoint_state state);
+  * V2V_WAKE_REASON_RECEIVE: This reason is set when data is recieved.  Note 
+  * that it may sometimes be set when there are no messages incoming, and
+  * will remain so until v2v_nc2_get_message() is called.
++ *
++ * V2V_WAKE_REASON_USERn: These 4 wait reasons can be used by clients
++ * to add user defined events to the queue. These wake reasons will
++ * never be set internally by the V2V library.
++ *
++ * For asynchronous operation, the wait state object will be used only
++ * for V2V_WAKE_REASON_CONTROL events.
+  */
+ struct v2v_wait *v2v_get_wait_state(struct v2v_channel *channel);
+ /* Retrive the @reasons that a wait was satisfied.  The @reasons value
+- * is an OR'ed list list of the V2V_WAKE_REASONE_* values above.  Each
++ * is an OR'ed list list of the V2V_WAKE_REASON_* values above.  Each
+  * call to v2v_get_wake_reason will return that next wake reason in the 
+  * internal queue that matches the @reasons mask.  This routine can be
+  * called multiple times before and after a wait depending on the specific
+  * use case involved.
+  *
+- * Returns one of V2V_WAKE_REASONE_CONTROL, V2V_WAKE_REASONE_SEND or
+- * V2V_WAKE_REASONE_RECEIVE from the internal queue. When no more wake
++ * Returns one of V2V_WAKE_REASON_CONTROL, V2V_WAKE_REASON_SEND or
++ * V2V_WAKE_REASON_RECEIVE from the internal queue. When no more wake
+  * reasons are present, V2V_WAKE_REASON_NONE is returned.
++ *
++ * For asynchronous operation, only V2V_WAKE_REASON_CONTROL or
++ * V2V_WAKE_REASON_NONE would be returned.
+  */
+ u8 v2v_get_wake_reason(struct v2v_channel *channel, u8 reasons);
+@@ -258,7 +320,7 @@ u8 v2v_get_wake_reason(struct v2v_channel *channel, u8 reasons);
+  */
+ int v2v_set_wake_reason(struct v2v_channel *channel, u8 reason);
+-/* This routine can be used to explicitly set a wake @reason in the 
++/* This routine can be used to explicitly clear wake @reasons in the 
+  * internal queue.
+  */
+ void v2v_clear_wake_reason(struct v2v_channel *channel, u8 reasons);
+@@ -271,6 +333,11 @@ void v2v_clear_wake_reason(struct v2v_channel *channel, u8 reasons);
+  * written to, and a malicious remote could cause its contents to
+  * change at any time.
+  *
++ * When using asynchronous operations mode, the caller must provide 
++ * locking (mutual exclusion) to this routine and the finalizing call 
++ * to v2v_nc2_finish_message() within the same locked section (using the 
++ * same synchronization object).
++ *
+  * Returns 0 on success. If no more messages are available, -ENODATA
+  * is returned. On failure an errno error code is returned.
+  *
+@@ -288,6 +355,11 @@ int v2v_nc2_get_message(struct v2v_channel *channel,
+  * released, so that the remote can use it to send another message,
+  * and the channel advances to the next incoming message.
+  *
++ * When using asynchronous operations mode, the caller must provide 
++ * locking (mutual exclusion) to this routine and the initializing call 
++ * to v2v_nc2_get_message() within the same locked section (using the 
++ * same synchronization object).
++ *
+  * The payload returned by v2v_nc2_get_message() must not be touched
+  * once this returns.
+  */
+@@ -302,6 +374,11 @@ void v2v_nc2_finish_message(struct v2v_channel *channel);
+  * The message is not actually sent until v2v_nc2_send_messages() is
+  * called.
+  *
++ * When using asynchronous operations mode, the caller must provide 
++ * locking (mutual exclusion) to this routine and the finalizing call 
++ * to v2v_nc2_send_messages() within the same locked section (using the 
++ * same synchronization object).
++ *
+  * If there is insufficient space in the ring, this routine requests
+  * that the remote endpoint set the local ring's send event when more
+  * space becomes available, the call returns -EAGAIN.
+@@ -319,6 +396,11 @@ int v2v_nc2_prep_message(struct v2v_channel *channel,
+  * (incorrectly behaving remote endpoints can always look ahead) and
+  * the remote is woken up if appropriate.
+  *
++ * When using asynchronous operations mode, the caller must provide 
++ * locking (mutual exclusion) to this routine and the initializing call 
++ * to v2v_nc2_prep_message() within the same locked section (using the 
++ * same synchronization object).
++ *
+  * The client must not touch the payload pointers returned by
+  * v2v_nc2_prep_message() after calling this function.
+  */