]> xenbits.xen.org Git - xenclient/linux-2.6.27-pq.git/commitdiff
V2V core driver patch (and sample driver).
authorRoss Philipson <ross.philipson@citrix.com>
Mon, 14 Sep 2009 19:50:06 +0000 (15:50 -0400)
committerRoss Philipson <ross.philipson@citrix.com>
Mon, 14 Sep 2009 19:50:06 +0000 (15:50 -0400)
 Changes to be committed:
modified:   master/series
new file:   master/v2v-core

master/series
master/v2v-core [new file with mode: 0644]

index d8ce2cfc34bd9b85d22625bc27435b811daae3a3..ba314553b22679542f725cfc28e85f7f0dcc6779 100644 (file)
@@ -313,3 +313,4 @@ bridge-carrier
 blktap2-pause-unpause
 blktap2-smp-map-unmap
 bridge-carrier-follows-prio0.patch
+v2v-core
diff --git a/master/v2v-core b/master/v2v-core
new file mode 100644 (file)
index 0000000..cd90dba
--- /dev/null
@@ -0,0 +1,3847 @@
+diff --git a/drivers/xen/Kconfig b/drivers/xen/Kconfig
+index ed4b89b..44c4052 100644
+--- a/drivers/xen/Kconfig
++++ b/drivers/xen/Kconfig
+@@ -262,6 +262,8 @@ config XEN_ACPI_WMI_WRAPPER
+           Facilitates OEM specific hotkey implementation within
+           guest space.
++source "drivers/xen/v2v/Kconfig"
++          
+ choice
+       prompt "Xen version compatibility"
+       default XEN_COMPAT_030002_AND_LATER
+diff --git a/drivers/xen/Makefile b/drivers/xen/Makefile
+index 873e5a3..408e7eb 100644
+--- a/drivers/xen/Makefile
++++ b/drivers/xen/Makefile
+@@ -31,3 +31,4 @@ obj-$(CONFIG_XEN_NETDEV_ACCEL_SFC_UTIL)              += sfc_netutil/
+ obj-$(CONFIG_XEN_NETDEV_ACCEL_SFC_FRONTEND)   += sfc_netfront/
+ obj-$(CONFIG_XEN_NETDEV_ACCEL_SFC_BACKEND)    += sfc_netback/
+ obj-$(CONFIG_XEN_ACPI_WMI_WRAPPER)              += acpi-wmi/
++obj-y                         += v2v/
+diff --git a/drivers/xen/v2v/Kconfig b/drivers/xen/v2v/Kconfig
+new file mode 100644
+index 0000000..afcbba2
+--- /dev/null
++++ b/drivers/xen/v2v/Kconfig
+@@ -0,0 +1,17 @@
++#
++# This Kconfig describe xen v2v options
++#
++
++config XEN_V2V
++        tristate "Xen V2V communications driver"
++        depends on XEN
++      default y
++        help
++          Xen interdomain communication services.
++
++config XEN_V2V_DRV
++        tristate "Xen V2V sample communications client driver"
++        depends on XEN_V2V
++      default n
++        help
++          Sample for Xen V2V interdomain communication services.
+diff --git a/drivers/xen/v2v/Makefile b/drivers/xen/v2v/Makefile
+new file mode 100644
+index 0000000..7d6e45e
+--- /dev/null
++++ b/drivers/xen/v2v/Makefile
+@@ -0,0 +1,3 @@
++
++obj-$(CONFIG_XEN_V2V) += v2v.o v2vutl.o
++obj-$(CONFIG_XEN_V2V_DRV) += v2vdrv.o v2vops.o
+diff --git a/drivers/xen/v2v/v2v.c b/drivers/xen/v2v/v2v.c
+new file mode 100644
+index 0000000..104c31d
+--- /dev/null
++++ b/drivers/xen/v2v/v2v.c
+@@ -0,0 +1,1501 @@
++/******************************************************************************
++ * drivers/xen/v2v/v2vutl.c
++ *
++ * V2V interdomain communication driver.
++ * 
++ * Copyright (c) 2009 Ross Philipson
++ * Copyright (c) 2009 Citrix Systems, Inc.
++ *
++ * This program is free software; you can redistribute it and/or
++ * modify it under the terms of the GNU General Public License version 2
++ * as published by the Free Software Foundation; or, when distributed
++ * separately from the Linux kernel or incorporated into other
++ * software packages, subject to the following license:
++ *
++ * Permission is hereby granted, free of charge, to any person obtaining a copy
++ * of this source file (the "Software"), to deal in the Software without
++ * restriction, including without limitation the rights to use, copy, modify,
++ * merge, publish, distribute, sublicense, and/or sell copies of the Software,
++ * and to permit persons to whom the Software is furnished to do so, subject to
++ * the following conditions:
++ *
++ * The above copyright notice and this permission notice shall be included in
++ * all copies or substantial portions of the Software.
++ *
++ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
++ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
++ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
++ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
++ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
++ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
++ * IN THE SOFTWARE.
++ */
++
++#include <linux/mm.h>
++#include <linux/init.h>
++#include <linux/module.h>
++#include <linux/vmalloc.h>
++#include <xen/xenbus.h>
++#include <xen/evtchn.h>
++#include <xen/gnttab.h>
++#include <xen/v2v.h>
++#include <xen/interface/io/uring.h>
++#include "v2v_private.h"
++
++#define CONSUMER_SPIN_LIMIT 2048
++#define GREF_STRING_LEN     32
++
++struct v2v_wait_entry {
++    struct list_head list;
++    u8 reason;
++};
++
++struct v2v_channel {
++    struct xenbus_watch remote_state_watch;
++    char *local_prefix;
++    char *remote_prefix;
++    domid_t peer_domid;
++
++    struct v2v_wait wait_state;
++    spinlock_t wait_list_lock;
++    struct list_head wait_entry_list;
++
++    void *prod_sring;
++    void *cons_sring;
++    void *control;
++    
++    int receive_evtchn_irq;
++    int send_evtchn_irq;
++    
++    unsigned nr_prod_ring_pages;
++    unsigned nr_cons_ring_pages;
++
++    unsigned current_message_size;
++
++    struct nc2_ring_pair nc2_rings;
++
++    int has_watch;
++    int is_temple;
++
++    union {
++        struct {
++            int has_grant_alloc;
++            grant_ref_t gref_head;
++
++            grant_ref_t prod_grefs[MAX_RING_PAGES];
++            grant_ref_t cons_grefs[MAX_RING_PAGES];
++            grant_ref_t control_gref;
++        } temple;
++        struct {
++            struct vm_struct *prod_area;
++            struct vm_struct *cons_area;
++            struct vm_struct *control_area;
++
++            grant_ref_t prod_grefs[MAX_RING_PAGES];
++            grant_handle_t prod_shmem_handles[MAX_RING_PAGES];
++
++            grant_ref_t cons_grefs[MAX_RING_PAGES];
++            grant_handle_t cons_shmem_handles[MAX_RING_PAGES];
++
++            grant_ref_t control_gref;
++            grant_handle_t control_shmem_handle;
++
++            unsigned int prod_evtchn_port;
++            unsigned int cons_evtchn_port;
++        } supplicant;
++    } u;
++};
++
++static inline void
++v2v_set_event(struct v2v_channel *channel)
++{
++    atomic_set(&channel->wait_state.wait_condition, 1);
++    wake_up(&channel->wait_state.wait_event);
++}
++
++/* Wait Reason Queue functions */
++static int
++v2v_wrq_queue(struct v2v_channel *channel, u8 reason)
++{
++    struct v2v_wait_entry *entry = NULL;
++    unsigned long flags;
++
++    /* Wake reason queue function - like the set event function. Add an instances
++       of 'reason' to the queue. Caller should set the event condition */
++    spin_lock_irqsave(&channel->wait_list_lock, flags);
++    entry = (struct v2v_wait_entry*)kmalloc(sizeof(*entry), GFP_KERNEL);
++    if (entry) {
++        entry->reason = reason;
++        list_add_tail(&entry->list, &channel->wait_entry_list);
++    }
++    spin_unlock_irqrestore(&channel->wait_list_lock, flags);
++
++    return (entry) ? 0 : -ENOMEM;
++}
++
++static u8
++v2v_wrq_dequeue(struct v2v_channel *channel, u8 reasons)
++{
++    struct v2v_wait_entry *entry = NULL, *t;
++    u8 found_reason = V2V_WAKE_REASON_NONE;
++    unsigned long flags;
++
++    /* Wake reason dequeue function - like the wait event function. Return a wake
++       reason from the from of the queue that matches the 'reasons' criteria. */
++    spin_lock_irqsave(&channel->wait_list_lock, flags);
++    list_for_each_entry_safe(entry, t, &channel->wait_entry_list, list) {
++        if (entry->reason & reasons) {
++            found_reason = entry->reason;
++            list_del(&entry->list);
++            kfree(entry);
++            break;
++        }
++    }
++    spin_unlock_irqrestore(&channel->wait_list_lock, flags);
++
++    return found_reason;
++}
++
++static void
++v2v_wrq_clear(struct v2v_channel *channel, u8 reasons)
++{
++    struct v2v_wait_entry *entry, *t;
++    unsigned long flags;
++
++    /* Wake reason clear function - like the reset event function. Remove all instances
++       of 'reasons' from the queue. */
++    spin_lock_irqsave(&channel->wait_list_lock, flags);
++    list_for_each_entry_safe(entry, t, &channel->wait_entry_list, list) {
++        if (entry->reason & reasons) {
++            list_del(&entry->list);
++            kfree(entry);
++        }
++    }
++    spin_unlock_irqrestore(&channel->wait_list_lock, flags);
++}
++
++static void
++v2v_wrq_cleanup(struct v2v_channel *channel)
++{
++    struct v2v_wait_entry *entry, *t;
++    unsigned long flags;
++
++    /* Wake reason clear function - delete all elements and reset list head */
++    spin_lock_irqsave(&channel->wait_list_lock, flags);
++    list_for_each_entry_safe(entry, t, &channel->wait_entry_list, list) {
++        list_del(&entry->list); /* not really needed */
++        kfree(entry);
++    }
++    INIT_LIST_HEAD(&channel->wait_entry_list);
++    spin_unlock_irqrestore(&channel->wait_list_lock, flags);
++}
++
++static void
++v2v_remote_state_changed(struct xenbus_watch *watch,
++                         const char **vec,
++                         unsigned int len)
++{
++    struct v2v_channel *channel = 
++        container_of(watch, struct v2v_channel, remote_state_watch);
++
++    if (v2v_wrq_queue(channel, V2V_WAKE_REASON_CONTROL))
++        return; /* not much we can do */
++
++    v2v_set_event(channel);
++}
++
++static irqreturn_t
++send_int(int irq, void *dev_id)
++{
++    struct v2v_channel *channel = dev_id;
++    
++    if (v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND))
++        return IRQ_HANDLED; /* not much we can do */
++
++    v2v_set_event(channel);
++
++    return IRQ_HANDLED;
++}
++
++static irqreturn_t
++receive_int(int irq, void *dev_id)
++{
++    struct v2v_channel *channel = dev_id;
++    
++    if (v2v_wrq_queue(channel, V2V_WAKE_REASON_RECEIVE))
++        return IRQ_HANDLED; /* not much we can do */
++
++    v2v_set_event(channel);
++
++    return IRQ_HANDLED;
++}
++
++static void
++v2v_destroy_channel(const struct v2v_channel *_chan, int free_temple)
++{    
++    struct v2v_channel *chan = (struct v2v_channel *)_chan;
++    unsigned x;
++
++    DPRINTK("destroying channel: %p\n", chan);
++
++    if (chan->has_watch) {
++        unregister_xenbus_watch(&chan->remote_state_watch);
++        DPRINTK("unregistered watch on: %s\n", chan->remote_state_watch.node);
++        kfree(chan->remote_state_watch.node);
++    }
++
++    if (chan->local_prefix)
++        kfree(chan->local_prefix);
++    if (chan->remote_prefix)
++        kfree(chan->remote_prefix);
++    DPRINTK("cleaned up prefix strings\n");
++    
++    if (!chan->is_temple) {
++        /* Note that the cons ring page count is based on the prod ring order and vv */
++        if (chan->u.supplicant.prod_area) {
++            v2v_xenops_grant_unmap(chan->u.supplicant.prod_area,
++                                   chan->u.supplicant.prod_shmem_handles,
++                                   chan->nr_cons_ring_pages,
++                                   1);
++        }
++        if (chan->u.supplicant.cons_area) {
++            v2v_xenops_grant_unmap(chan->u.supplicant.cons_area,
++                                   chan->u.supplicant.cons_shmem_handles,
++                                   chan->nr_prod_ring_pages,
++                                   0);
++        }
++        if (chan->u.supplicant.control_area) {
++            v2v_xenops_grant_unmap(chan->u.supplicant.control_area,
++                                   &chan->u.supplicant.control_shmem_handle,
++                                   1,
++                                   0);
++        }
++        DPRINTK("unmapped grant refs for supplicant.\n");
++    }
++    else if (free_temple) { /* and is temple */
++        if (chan->u.temple.has_grant_alloc) {            
++            for (x = 0; x < chan->nr_prod_ring_pages; x++) {
++                if (chan->u.temple.prod_grefs[x] != GRANT_INVALID_REF) {
++                    gnttab_end_foreign_access_ref(chan->u.temple.prod_grefs[x]);
++                    gnttab_release_grant_reference(&chan->u.temple.gref_head,
++                                                   chan->u.temple.prod_grefs[x]);
++                }
++            }
++            
++            for (x = 0; x < chan->nr_cons_ring_pages; x++) {
++                if (chan->u.temple.cons_grefs[x] != GRANT_INVALID_REF) {
++                    gnttab_end_foreign_access_ref(chan->u.temple.cons_grefs[x]);
++                    gnttab_release_grant_reference(&chan->u.temple.gref_head,
++                                                   chan->u.temple.cons_grefs[x]);
++                }
++            }
++            
++            if (chan->u.temple.control_gref != GRANT_INVALID_REF) {
++                gnttab_end_foreign_access_ref(chan->u.temple.control_gref);
++                gnttab_release_grant_reference(&chan->u.temple.gref_head,
++                                               chan->u.temple.control_gref);
++            }
++            gnttab_free_grant_references(chan->u.temple.gref_head);            
++        }
++
++        if (chan->prod_sring)
++            kfree(chan->prod_sring);
++        if (chan->cons_sring)
++            kfree(chan->cons_sring);
++        if (chan->control)
++            kfree(chan->control);
++        DPRINTK("freed grant refs and rings for temple.\n");
++    }
++
++    if (chan->receive_evtchn_irq != 0)
++        unbind_from_irqhandler(chan->receive_evtchn_irq, chan);
++    if (chan->send_evtchn_irq != 0)
++        unbind_from_irqhandler(chan->send_evtchn_irq, chan);
++    DPRINTK("unbound irq handlers.\n");
++
++    /* cleanup anything in chan->wait_entry_list once the evtchns are shutdown */
++    v2v_wrq_cleanup(chan);
++
++    kfree(chan);
++    DPRINTK("channel freed, exiting.\n");
++}
++
++static int
++v2v_read_peer_domid(struct v2v_channel *chan)
++{
++    char *val;
++    unsigned int n;
++
++    val = xenbus_read(XBT_NIL, chan->local_prefix, "peer-domid", NULL);
++    if (IS_ERR(val))
++        return PTR_ERR(val);
++    sscanf(val, "%u", &n);
++    chan->peer_domid = (domid_t)n;
++    kfree(val);
++
++    DPRINTK("read peer domain id: %d\n", chan->peer_domid);
++
++    return 0;
++}
++
++static struct v2v_channel *
++v2v_make_channel(const char *xenbus_prefix)
++{
++    struct v2v_channel *chan;
++
++    chan = (struct v2v_channel*)kmalloc(sizeof(*chan), GFP_KERNEL);
++    if (!chan)
++        return NULL;
++    memset(chan, 0, sizeof(*chan));
++
++    chan->local_prefix = (char*)kmalloc(strlen(xenbus_prefix) + 1, GFP_KERNEL);
++    if (!chan->local_prefix) {
++        kfree(chan);
++        return NULL;
++    }
++    strcpy(chan->local_prefix, xenbus_prefix);
++    
++    init_waitqueue_head(&chan->wait_state.wait_event);
++    spin_lock_init(&chan->wait_list_lock);
++    INIT_LIST_HEAD(&chan->wait_entry_list);
++
++    DPRINTK("created channel: %p with local prefix - %s\n", chan, chan->local_prefix);
++    
++    return chan;
++}
++
++static int
++v2v_connect_channel_xenbus(struct v2v_channel *chan, struct xenbus_transaction xbt)
++{
++    char *val = NULL;
++    int err;
++
++    val = xenbus_read(XBT_NIL, chan->local_prefix, "backend", NULL);
++    if (IS_ERR(val)) {
++        EPRINTK("xenbus read backend failed - local prefix: %s err: %li\n",
++                chan->local_prefix, PTR_ERR(val));
++        return PTR_ERR(val);
++    }
++    chan->remote_prefix = val;
++    DPRINTK("connect channel: %p - local prefix: %s remote prefix: %s\n",
++            chan, chan->local_prefix, chan->remote_prefix);
++
++    err = v2v_read_peer_domid(chan);
++    if (err) {
++        EPRINTK("failed to read peer domain id - err: %d\n", err);
++        return err;
++    }
++
++    val = kasprintf(GFP_NOIO | __GFP_HIGH, "%s/%s", chan->remote_prefix, "state");
++      if (IS_ERR(val)) {
++        EPRINTK("kasprintf remote_prefix/state failed - err: %li\n", PTR_ERR(val));
++        return PTR_ERR(val);
++    }
++
++    chan->remote_state_watch.node = val;
++    chan->remote_state_watch.callback = v2v_remote_state_changed;
++
++    err = register_xenbus_watch(&chan->remote_state_watch);
++    if (err) {
++        EPRINTK("failed to regsister xenbus watch for peer - err: %d\n", err);
++        kfree(chan->remote_state_watch.node);
++        return err;
++    }
++    chan->has_watch = 1;
++    DPRINTK("registered watch on: %s\n", chan->remote_state_watch.node);
++
++    return 0;
++}
++
++static void
++v2v_nc2_attach_rings_temple(struct nc2_ring_pair *ncrp,
++                            const volatile void *cons_sring,
++                            unsigned cons_ring_size,
++                            void *prod_sring,
++                            unsigned prod_ring_size,
++                            struct netchannel2_frontend_shared *control)
++{
++    memset(ncrp, 0, sizeof(*ncrp));
++    ncrp->local_endpoint = &control->a;
++    ncrp->remote_endpoint = &control->b;
++    ncrp->producer_payload_bytes = prod_ring_size;
++    ncrp->producer_payload = prod_sring;
++    ncrp->consumer_payload_bytes = cons_ring_size;
++    ncrp->consumer_payload = cons_sring;
++
++    ncrp->local_endpoint->prod_event = ncrp->remote_endpoint->prod + 1;
++}
++
++static void
++v2v_nc2_attach_rings_supplicant(struct nc2_ring_pair *ncrp,
++                                const volatile void *cons_sring,
++                                unsigned cons_ring_size,
++                                void *prod_sring,
++                                unsigned prod_ring_size,
++                                struct netchannel2_backend_shared *control)
++{
++    memset(ncrp, 0, sizeof(*ncrp));
++    ncrp->local_endpoint = &control->a;
++    ncrp->remote_endpoint = &control->b;
++    ncrp->producer_payload_bytes = prod_ring_size;
++    ncrp->producer_payload = prod_sring;
++    ncrp->consumer_payload_bytes = cons_ring_size;
++    ncrp->consumer_payload = cons_sring;
++
++    ncrp->local_endpoint->prod_event = ncrp->remote_endpoint->prod + 1;
++}
++
++const char *
++v2v_endpoint_state_name(enum v2v_endpoint_state state)
++{
++    switch (state) {
++    case v2v_state_unknown:
++        return "unknown";
++    case v2v_state_unready:
++        return "unready";
++    case v2v_state_listening:
++        return "listening";
++    case v2v_state_connected:
++        return "connected";
++    case v2v_state_disconnecting:
++        return "disconnecting";
++    case v2v_state_disconnected:
++        return "disconnected";
++    case v2v_state_crashed:
++        return "crashed";
++    }
++    return "v2v_state_invalid";
++}
++EXPORT_SYMBOL_GPL(v2v_endpoint_state_name);
++
++static inline int
++v2v_change_local_state(struct v2v_channel *channel,
++                       struct xenbus_transaction xbt,
++                       enum v2v_endpoint_state state)
++{
++    return xenbus_printf(xbt, channel->local_prefix, "state", v2v_endpoint_state_name(state));
++}
++
++int
++v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel,
++           unsigned prod_ring_page_order, unsigned cons_ring_page_order)
++{
++    unsigned prod_ring_size = PAGE_SIZE << prod_ring_page_order;
++    unsigned cons_ring_size = PAGE_SIZE << cons_ring_page_order;
++    struct v2v_channel *chan;
++    struct xenbus_transaction xbt = {0};
++    unsigned long mfn;
++    grant_ref_t ref;
++    unsigned x;
++    int xbt_pending = 0;
++    int err = 0;
++    char buf[GREF_STRING_LEN];
++
++    if (prod_ring_page_order > MAX_RING_PAGE_ORDER ||
++        cons_ring_page_order > MAX_RING_PAGE_ORDER ||
++        !channel || !xenbus_prefix) {
++        EPRINTK("v2v_listen - invalid arguments\n");
++        return -EINVAL;
++    }
++
++    *channel = NULL;
++
++    chan = v2v_make_channel(xenbus_prefix);
++    if (!chan) {
++        EPRINTK("v2v_listen - out of memory making channel\n");
++        return -ENOMEM;
++    }
++
++    chan->is_temple = 1;
++
++    chan->prod_sring = kmalloc(prod_ring_size, GFP_KERNEL);
++    chan->cons_sring = kmalloc(cons_ring_size, GFP_KERNEL);
++    chan->control = kmalloc(PAGE_SIZE, GFP_KERNEL);
++    if (!chan->prod_sring || !chan->cons_sring || !chan->control) {
++        EPRINTK("v2v_listen - out of memory making rings\n");
++        goto err_nomem;
++    }
++
++    memset(chan->prod_sring, 0, prod_ring_size);
++    memset(chan->cons_sring, 0, cons_ring_size);
++    memset(chan->control, 0, PAGE_SIZE);
++
++    chan->nr_prod_ring_pages = 1 << prod_ring_page_order;
++    chan->nr_cons_ring_pages = 1 << cons_ring_page_order;
++
++    /* pre-allocate the granf refs we are going to need below */
++    err = gnttab_alloc_grant_references(chan->nr_prod_ring_pages + chan->nr_cons_ring_pages + 1, 
++                                        &chan->u.temple.gref_head);
++    if (err) {
++        EPRINTK("gnttab_alloc_grant_references call failed - err: %d\n", err);
++        goto err_out;
++    }
++    chan->u.temple.has_grant_alloc = 1;
++
++    v2v_nc2_attach_rings_temple(&chan->nc2_rings,
++                                chan->cons_sring,
++                                cons_ring_size,
++                                chan->prod_sring,
++                                prod_ring_size,
++                                chan->control);
++
++    for (;;) {
++        xenbus_transaction_start(&xbt);
++        xbt_pending = 1;
++        
++        err = v2v_connect_channel_xenbus(chan, xbt);
++        if (err) {
++            EPRINTK("v2v_listen - failed to connect channel - err: %d\n", err);
++            goto err_out;
++        }
++
++        for (x = 0; x < 1u << prod_ring_page_order; x++) {           
++            mfn = virt_to_mfn((void *)((off_t)chan->prod_sring + x * PAGE_SIZE));
++            ref = gnttab_claim_grant_reference(&chan->u.temple.gref_head);
++            BUG_ON((signed short)ref < 0);
++            chan->u.temple.prod_grefs[x] = ref;
++            gnttab_grant_foreign_access_ref(ref, chan->peer_domid, mfn, GTF_readonly);
++            snprintf(buf, GREF_STRING_LEN, "prod-gref-%d", x);
++            err = xenbus_printf(xbt, chan->local_prefix, buf, "%d", ref);
++            if (err) {
++                EPRINTK("xenbus_printf(prod-gref) failed - err: %d\n", err);
++                goto err_out;
++            }
++        }
++        DPRINTK("created %d producer grant refs\n", 1u << prod_ring_page_order);
++
++        for (x = 0; x < 1u << cons_ring_page_order; x++) {           
++            mfn = virt_to_mfn((void *)((off_t)chan->cons_sring + x * PAGE_SIZE));
++            ref = gnttab_claim_grant_reference(&chan->u.temple.gref_head);
++            BUG_ON((signed short)ref < 0);
++            chan->u.temple.cons_grefs[x] = ref;
++            gnttab_grant_foreign_access_ref(ref, chan->peer_domid, mfn, 0);
++            snprintf(buf, GREF_STRING_LEN, "cons-gref-%d", x);
++            err = xenbus_printf(xbt, chan->local_prefix, buf, "%d", ref);
++            if (err) {
++                EPRINTK("xenbus_printf(cons-gref) failed - err: %d\n", err);
++                goto err_out;
++            }
++        }
++        DPRINTK("created %d consumer grant refs\n", 1u << cons_ring_page_order);
++
++        mfn = virt_to_mfn((void *)((off_t)chan->control));
++        ref = gnttab_claim_grant_reference(&chan->u.temple.gref_head);
++        BUG_ON((signed short)ref < 0);
++        chan->u.temple.control_gref = ref;
++        gnttab_grant_foreign_access_ref(ref, chan->peer_domid, mfn, 0);
++        DPRINTK("created control grant ref\n");
++
++        err = 
++            bind_listening_port_to_irqhandler(chan->peer_domid, receive_int, 0, "v2v", chan);
++        if (err < 0) {
++            EPRINTK("bind_listening_port_to_irqhandler(receive) failed - err: %d\n", err);
++            goto err_out;
++        }
++        chan->receive_evtchn_irq = err;
++        DPRINTK("created listening port for receive: %d\n", chan->receive_evtchn_irq);
++
++        err = 
++            bind_listening_port_to_irqhandler(chan->peer_domid, send_int, 0, "v2v", chan);
++        if (err < 0) {
++            EPRINTK("bind_listening_port_to_irqhandler(send) failed - err: %d\n", err);
++            goto err_out;
++        }
++        chan->send_evtchn_irq = err;
++        DPRINTK("created listening port for send: %d\n", chan->send_evtchn_irq);
++
++        err = 
++            v2v_xenstore_scatter(xbt, chan->local_prefix,
++                                 "prod-order", xenstore_scatter_type_int,
++                                     prod_ring_page_order,
++                                 "cons-order", xenstore_scatter_type_int,
++                                     cons_ring_page_order,
++                                 "control-gref", xenstore_scatter_type_grant_ref,
++                                     chan->u.temple.control_gref,
++                                 "prod-evtchn",xenstore_scatter_type_evtchn_irq,
++                                     chan->send_evtchn_irq,
++                                 "cons-evtchn",xenstore_scatter_type_evtchn_irq,
++                                     chan->receive_evtchn_irq,
++                                 NULL);
++        if (err) {
++            EPRINTK("v2v_xenstore_scatter failed - err: %d\n", err);
++            goto err_out;
++        }
++        DPRINTK("scattered listen values to: %s\n", chan->local_prefix);
++        DPRINTK("  -- prod-order=%d cons-order=%d control-gref=%d prod-evtchn=%d cons-evtchn=%d\n",
++                prod_ring_page_order, cons_ring_page_order, chan->u.temple.control_gref,
++                chan->send_evtchn_irq, chan->receive_evtchn_irq);
++        
++        err = v2v_change_local_state(chan, xbt, v2v_state_listening);
++        if (err) {
++            EPRINTK("v2v_change_local_state(listening) failed - err: %d\n", err);
++            goto err_out;
++        }
++
++        err = xenbus_transaction_end(xbt, 0);
++        xbt_pending = 0;
++        if (err == 0)
++            break;
++        if (err != -EAGAIN) {
++            EPRINTK("v2v_listen - error commiting xs transaction - err: %d\n", err);
++            goto err_out;
++        }
++
++        DPRINTK("v2v_listen - attempting listen again...\n");
++
++        /* cleanup for retry, gref cannot be busy since a supplicant has never connected */
++        for (x = 0; x < 1u << prod_ring_page_order; x++) {          
++            gnttab_end_foreign_access_ref(chan->u.temple.prod_grefs[x]);
++            gnttab_release_grant_reference(&chan->u.temple.gref_head,
++                                           chan->u.temple.prod_grefs[x]);
++            chan->u.temple.prod_grefs[x] = GRANT_INVALID_REF;
++        }
++        
++        for (x = 0; x < 1u << cons_ring_page_order; x++) {          
++            gnttab_end_foreign_access_ref(chan->u.temple.cons_grefs[x]);
++            gnttab_release_grant_reference(&chan->u.temple.gref_head,
++                                           chan->u.temple.cons_grefs[x]);
++            chan->u.temple.cons_grefs[x] = GRANT_INVALID_REF;
++        }
++
++        gnttab_end_foreign_access_ref(chan->u.temple.control_gref);
++        gnttab_release_grant_reference(&chan->u.temple.gref_head,
++                                       chan->u.temple.control_gref);
++        chan->u.temple.control_gref = GRANT_INVALID_REF;
++
++        unbind_from_irqhandler(chan->receive_evtchn_irq, chan);
++        chan->receive_evtchn_irq = 0;
++        unbind_from_irqhandler(chan->send_evtchn_irq, chan);
++        chan->send_evtchn_irq = 0;
++
++        /* undo what v2v_connect_channel_xenbus did */
++        unregister_xenbus_watch(&chan->remote_state_watch);
++        kfree(chan->remote_state_watch.node);
++        chan->has_watch = 0;
++
++        kfree(chan->remote_prefix);
++        chan->remote_prefix = NULL;
++    }
++
++    *channel = chan;
++
++    DPRINTK("listening, channel: %p\n", chan);
++
++    return 0;
++
++err_nomem:
++    err = -ENOMEM;
++err_out:
++    if (xbt_pending)
++        xenbus_transaction_end(xbt, 1);
++    /* since the channel has never been connected here, it is safe 
++       to free any temple resources that may have been allocated in 
++       this routine */
++    v2v_destroy_channel(chan, 1);
++    return err;
++}
++EXPORT_SYMBOL_GPL(v2v_listen);
++
++static int
++v2v_get_remote_state_internal(struct xenbus_transaction xbt,
++                              struct v2v_channel *channel,
++                              enum v2v_endpoint_state *state)
++{
++    char *raw;
++    
++    if (!state)
++        return -EINVAL;
++    *state = v2v_state_unknown;
++
++    raw = xenbus_read(xbt, channel->remote_prefix, "state", NULL);
++    if (IS_ERR(raw))
++        return PTR_ERR(raw);
++        
++    if (!strcmp(raw, "unready"))
++        *state = v2v_state_unready;
++    else if (!strcmp(raw, "listening"))
++        *state = v2v_state_listening;
++    else if (!strcmp(raw, "connected"))
++        *state = v2v_state_connected;
++    else if (!strcmp(raw, "disconnecting"))
++        *state = v2v_state_disconnecting;
++    else if (!strcmp(raw, "disconnected"))
++        *state = v2v_state_disconnected;
++    else if (!strcmp(raw, "crashed"))
++        *state = v2v_state_crashed;
++    else
++        *state = v2v_state_unknown;    
++
++    kfree(raw);
++    
++    return (*state != v2v_state_unknown ? 0 : -EBADMSG); 
++}
++
++int
++v2v_get_remote_state(struct v2v_channel *channel, enum v2v_endpoint_state *state)
++{
++    return v2v_get_remote_state_internal(XBT_NIL, channel, state);
++}
++EXPORT_SYMBOL_GPL(v2v_get_remote_state);
++
++int
++v2v_accept(struct v2v_channel *channel)
++{
++    int err = 0;
++    struct xenbus_transaction xbt = {0};
++    enum v2v_endpoint_state remote_state;   
++    u8 reason;
++
++    for (;;) {
++        xenbus_transaction_start(&xbt);        
++        err = v2v_get_remote_state_internal(xbt, channel, &remote_state);
++        switch (remote_state) {
++        case v2v_state_unready:
++        case v2v_state_disconnected:
++        case v2v_state_crashed:
++            xenbus_transaction_end(xbt, 1);
++            wait_event(channel->wait_state.wait_event,
++                       atomic_xchg(&channel->wait_state.wait_condition, 0) == 1);
++            /* ### get the event reason, should be only the control event right now */
++            reason = v2v_wrq_dequeue(channel, V2V_WAKE_REASON_ANY);
++            if (reason != V2V_WAKE_REASON_CONTROL) {
++                EPRINTK("v2v_accept - invalid wake reason in; aborting - reason: %2.2x\n", reason);
++                return -EINVAL;
++            }
++            break;
++        case v2v_state_listening:
++            xenbus_transaction_end(xbt, 1);
++            EPRINTK("v2v_accept - v2v_state_listening implies possible deadlock!\n");
++            return -EDEADLK;
++        case v2v_state_disconnecting:
++            xenbus_transaction_end(xbt, 1);
++            EPRINTK("v2v_accept - v2v_state_disconnecting during accept, remote end shutting down\n");
++            return -ENOLINK;
++        case v2v_state_unknown:
++            xenbus_transaction_end(xbt, 1);
++            EPRINTK("v2v_accept - v2v_state_unknown - err: %d\n", err);
++            return err; /* return the error from get state call */
++        case v2v_state_connected:
++            err = v2v_change_local_state(channel, xbt, v2v_state_connected);
++            if (err) {                
++                xenbus_transaction_end(xbt, 1);
++                EPRINTK("v2v_accept - v2v_change_local_state(connected) failed - err: %d\n", err);
++                return err;
++            }
++            err = xenbus_transaction_end(xbt, 0);
++            if (err == 0)
++                return 0;
++            if (err != -EAGAIN) {
++                EPRINTK("v2v_accept - error commiting xs transaction - err: %d\n", err);
++                return err;
++            }
++            break; /* try again */                     
++        }          
++    }
++
++    /* the initial state of the send event is set to wakeup for send since there
++       is always send room on startup of the rings */
++    err = v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND);
++    if (err)
++        return err;
++    v2v_set_event(channel);
++
++    DPRINTK("accepted, channel: %p\n", channel);
++
++    return 0;
++}
++EXPORT_SYMBOL_GPL(v2v_accept);
++
++int
++v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel)
++{
++    int err = 0;
++    struct xenbus_transaction xbt = {0};
++    struct v2v_channel *chan;
++    enum v2v_endpoint_state remote_state;
++    int producer_ring_order;
++    int consumer_ring_order;
++    unsigned x;
++    int xbt_pending = 0;
++    char buf[GREF_STRING_LEN];
++
++    if (!channel || !xenbus_prefix) {
++        EPRINTK("v2v_connect - invalid arguments\n");
++        return -EINVAL;
++    }
++
++    *channel = NULL;
++
++    for (;;) {
++        chan = v2v_make_channel(xenbus_prefix);
++        if (!chan) {
++            EPRINTK("v2v_connect - out of memory making channel\n");
++            return -ENOMEM;
++        }
++        /* initialize the gref handles, an invalid value is all bits set */
++        memset(chan->u.supplicant.prod_shmem_handles, 0xFF,
++               sizeof(grant_handle_t)*MAX_RING_PAGES);
++        memset(chan->u.supplicant.cons_shmem_handles, 0xFF,
++               sizeof(grant_handle_t)*MAX_RING_PAGES);
++
++        xenbus_transaction_start(&xbt);
++        xbt_pending = 1;
++
++        err = v2v_connect_channel_xenbus(chan, xbt);
++        if (err) {
++            EPRINTK("v2v_connect - failed to connect channel - err: %d\n", err);
++            goto err_out;
++        }
++        
++        err = v2v_get_remote_state_internal(xbt, chan, &remote_state);
++        if (remote_state == v2v_state_unknown) {
++            EPRINTK("v2v_connect - v2v_change_local_state failed - err: %d\n", err);
++            goto err_out; /* err set to error code */
++        }
++        if (remote_state != v2v_state_listening) {
++            WPRINTK("v2v_connect - remote side is not listening, state: %d\n", remote_state);
++            err = -ENODEV;
++            goto err_out;
++        }
++
++        err = 
++            v2v_xenstore_gather(xbt, chan->remote_prefix,
++                             "prod-order",
++                                 xenstore_gather_type_int,
++                                 &producer_ring_order,
++                             "cons-order",
++                                 xenstore_gather_type_int,
++                                 &consumer_ring_order,
++                             "control-gref",
++                                  xenstore_gather_type_alien_grant_ref,
++                                  &chan->u.supplicant.control_gref,
++                             "prod-evtchn",
++                                  xenstore_gather_type_alien_evtchn_port,
++                                  &chan->u.supplicant.prod_evtchn_port,
++                             "cons-evtchn",
++                                  xenstore_gather_type_alien_evtchn_port,
++                                  &chan->u.supplicant.cons_evtchn_port,
++                             NULL);
++        if (err) {
++            EPRINTK("v2v_xenstore_gather failed - err: %d\n", err);
++            goto err_out;
++        }
++
++        if (producer_ring_order > MAX_RING_PAGE_ORDER ||
++            consumer_ring_order > MAX_RING_PAGE_ORDER) {
++            EPRINTK("v2v_connect - invalid ring order(s) gathered - prod: %d cons: %d\n",
++                    producer_ring_order, consumer_ring_order);
++            err = -EINVAL;
++            goto err_out;
++        }
++        DPRINTK("gathered listen values from: %s\n", chan->remote_prefix);
++        DPRINTK("  -- prod-order=%d cons-order=%d control-gref=%d prod-evtchn=%d cons-evtchn=%d\n",
++                producer_ring_order, consumer_ring_order, chan->u.supplicant.control_gref,
++                chan->u.supplicant.prod_evtchn_port, chan->u.supplicant.cons_evtchn_port);
++
++        for (x = 0; x < 1 << producer_ring_order; x++) {
++            snprintf(buf, GREF_STRING_LEN, "prod-gref-%d", x);
++            err = xenbus_scanf(xbt, chan->remote_prefix, buf,
++                              "%d", &chan->u.supplicant.prod_grefs[x]);
++            if (err != 1) {
++                EPRINTK("xenbus_scanf(prod-gref) failed on index %d - err: %d\n", x, err);
++                err = -EINVAL;
++                goto err_out;
++            }
++        }
++
++        for (x = 0; x < 1 << consumer_ring_order; x++) {
++            snprintf(buf, GREF_STRING_LEN, "cons-gref-%d", x);
++            err = xenbus_scanf(xbt, chan->remote_prefix, buf,
++                              "%d", &chan->u.supplicant.cons_grefs[x]);
++            if (err != 1) {
++                EPRINTK("xenbus_scanf(cons-gref) failed on index %d - err: %d\n", x, err);
++                err = -EINVAL;
++                goto err_out;
++            }
++        }
++
++        /* Swap them round: *_ring_order is from the point of view of the
++           temple, but we need the supplicant's viewpoint. Note have to set 
++           these up here - need the ring page numbers to successfully clean
++           up any grant mappings */
++        chan->nr_prod_ring_pages = 1 << consumer_ring_order;
++        chan->nr_cons_ring_pages = 1 << producer_ring_order;
++
++        err = 
++            v2v_xenops_grant_map(&chan->u.supplicant.prod_area,
++                                 chan->u.supplicant.prod_shmem_handles,
++                                 chan->peer_domid,
++                                 1 << producer_ring_order,
++                                 chan->u.supplicant.prod_grefs,
++                                 1);
++        if (err) {
++            EPRINTK("v2v_xenops_grant_map(producer) failed - err: %d\n", err);
++            goto err_out;
++        }
++
++        /* foriegn producer ring on this end is our cons sring */
++        chan->cons_sring = chan->u.supplicant.prod_area->addr;
++        DPRINTK("mapped %d producer grant refs\n", 1 << producer_ring_order);
++
++        err = 
++            v2v_xenops_grant_map(&chan->u.supplicant.cons_area,
++                                 chan->u.supplicant.cons_shmem_handles,
++                                 chan->peer_domid,
++                                 1 << consumer_ring_order,
++                                 chan->u.supplicant.cons_grefs,
++                                 0);
++        if (err) {
++            EPRINTK("v2v_xenops_grant_map(consumer) failed - err: %d\n", err);
++            goto err_out;
++        }
++
++        /* foriegn consumer ring on this end is our prod sring */
++        chan->prod_sring = chan->u.supplicant.cons_area->addr;
++        DPRINTK("mapped %d consumer grant refs\n", 1 << consumer_ring_order);
++
++        err = 
++            v2v_xenops_grant_map(&chan->u.supplicant.control_area,
++                                 &chan->u.supplicant.control_shmem_handle,
++                                 chan->peer_domid,
++                                 1,
++                                 &chan->u.supplicant.control_gref,
++                                 0);
++        if (err) {
++            EPRINTK("v2v_xenops_grant_map(control) failed - err: %d\n", err);
++            goto err_out;
++        }
++
++        chan->control = chan->u.supplicant.control_area->addr;
++        DPRINTK("mapped controle grant ref\n");
++
++        err = 
++            bind_interdomain_evtchn_to_irqhandler(chan->peer_domid, chan->u.supplicant.prod_evtchn_port,
++                                                  receive_int, 0, "v2v", chan);
++        if (err < 0) {
++            EPRINTK("bind_interdomain_evtchn_to_irqhandler(receive_int) failed - err: %d\n", err);
++            goto err_out;
++        }
++        chan->receive_evtchn_irq = err;
++        DPRINTK("bound interdomain port for receive: %d\n", chan->receive_evtchn_irq);
++
++        err = 
++            bind_interdomain_evtchn_to_irqhandler(chan->peer_domid, chan->u.supplicant.cons_evtchn_port,
++                                                  send_int, 0, "v2v", chan);
++        if (err < 0) {
++            EPRINTK("bind_interdomain_evtchn_to_irqhandler(send_int) failed - err: %d\n", err);
++            goto err_out;
++        }
++        chan->send_evtchn_irq = err;
++        DPRINTK("bound interdomain port for send: %d\n", chan->send_evtchn_irq);
++
++        err = v2v_change_local_state(chan, xbt, v2v_state_connected);
++        if (err) {
++            EPRINTK("v2v_change_local_state(connected) failed - err: %d\n", err);
++            goto err_out;
++        }
++
++        err = xenbus_transaction_end(xbt, 0);
++        xbt_pending = 0;
++        if (err == 0)
++            break;
++        if (err != -EAGAIN) {
++            EPRINTK("v2v_connect - error commiting xs transaction - err: %d\n", err);
++            goto err_out;
++        }
++
++        DPRINTK("v2v_connect - attempting connect again...\n");
++
++        /* cleanup and try again */        
++        v2v_destroy_channel(chan, 0);
++    }
++
++    /* the initial state of the send event is set to wakeup for send since there
++       is always send room on startup of the rings */
++    err = v2v_wrq_queue(chan, V2V_WAKE_REASON_SEND);
++    if (err)
++        goto err_out;
++    v2v_set_event(chan);
++
++    v2v_nc2_attach_rings_supplicant(&chan->nc2_rings,
++                                    chan->cons_sring,
++                                    PAGE_SIZE << producer_ring_order,
++                                    chan->prod_sring,
++                                    PAGE_SIZE << consumer_ring_order,
++                                    chan->control);
++
++    *channel = chan;
++    DPRINTK("connected, channel: %p\n", chan);
++
++    return 0;
++
++err_out:
++    if (xbt_pending)
++        xenbus_transaction_end(xbt, 1);
++    v2v_destroy_channel(chan, 0);
++    return err;
++}
++EXPORT_SYMBOL_GPL(v2v_connect);
++
++static int
++v2v_disconnect_temple(const struct v2v_channel *_channel)
++{
++    int err = 0;
++    struct xenbus_transaction xbt = {0};
++    struct v2v_channel *channel = (struct v2v_channel *)_channel;
++    enum v2v_endpoint_state remote_state;
++    int failed, any_failed = 0;
++    unsigned x;
++    u8 reason = V2V_WAKE_REASON_NONE;
++
++    err = v2v_change_local_state(channel, XBT_NIL, v2v_state_disconnecting);
++    if (err)
++        return err;
++    
++    /* Get the other end to disconnect */
++    for (;;) {
++        xenbus_transaction_start(&xbt);
++        err = v2v_get_remote_state_internal(xbt, channel, &remote_state);
++        switch (remote_state) {
++        case v2v_state_unknown:            
++            if (XENBUS_EXIST_ERR(err)) {
++                WPRINTK("v2v_disconnect_temple - cannot find remote path\n");
++                break;
++            }
++            xenbus_transaction_end(xbt, 1);
++            EPRINTK("v2v_disconnect_temple - v2v_state_unknown - err: %d\n", err);
++            return err;
++
++            /* The first two shouldn't really happen, but sometimes
++               can if we've managed to screw (e.g.  if two processes
++               try to use the same endpoint).  Try to recover. */
++        case v2v_state_unready:
++        case v2v_state_listening:
++        case v2v_state_disconnecting:
++
++        case v2v_state_disconnected:
++        case v2v_state_crashed:
++            break;
++        case v2v_state_connected:
++            xenbus_transaction_end(xbt, 1);
++            while (reason != V2V_WAKE_REASON_CONTROL) {
++                wait_event(channel->wait_state.wait_event,
++                           atomic_xchg(&channel->wait_state.wait_condition, 0) == 1);
++                /* ### get the event reason, only care about control event right now - we are shutting down */
++                reason = v2v_wrq_dequeue(channel, V2V_WAKE_REASON_CONTROL);
++            }
++            continue;
++        }
++        err = v2v_change_local_state(channel, xbt, v2v_state_disconnected);
++        if (err) {
++            xenbus_transaction_end(xbt, 1);
++            EPRINTK("v2v_disconnect_temple - v2v_change_local_state(disconnected) failed - err: %d\n", err);
++            return err;
++        }
++
++        err = xenbus_transaction_end(xbt, 0);
++        if (err == 0)
++            break; /* drop out of loop and do rest */
++        if (err == -EAGAIN)
++            continue; /* try again */
++
++        EPRINTK("v2v_disconnect_temple - error commiting xs transaction - err: %d\n", err);
++        return err; /* else return the error */
++    }
++
++    BUG_ON(channel->u.temple.has_grant_alloc == 0);
++
++    failed = 0;
++    for (x = 0; x < channel->nr_prod_ring_pages; x++) {
++        if (channel->u.temple.prod_grefs[x] != GRANT_INVALID_REF) {
++            if (gnttab_query_foreign_access(channel->u.temple.prod_grefs[x]) == 0) {
++                gnttab_end_foreign_access_ref(channel->u.temple.prod_grefs[x]);
++                gnttab_release_grant_reference(&channel->u.temple.gref_head,
++                                               channel->u.temple.prod_grefs[x]);
++                channel->u.temple.prod_grefs[x] = GRANT_INVALID_REF;
++            }
++            else {
++                WPRINTK("v2v_disconnect_temple -- prod grant %u still in use by backend "
++                        "domain during listener disconnect\n", channel->u.temple.prod_grefs[x]);
++                failed = any_failed = 1;
++            }
++        }
++    }
++    if (!failed) {
++        kfree(channel->prod_sring);
++        channel->prod_sring = NULL;
++    }
++
++    failed = 0;
++    for (x = 0; x < channel->nr_cons_ring_pages; x++) {
++        if (channel->u.temple.cons_grefs[x] != GRANT_INVALID_REF) {
++            if (gnttab_query_foreign_access(channel->u.temple.cons_grefs[x]) == 0) {
++                gnttab_end_foreign_access_ref(channel->u.temple.cons_grefs[x]);
++                gnttab_release_grant_reference(&channel->u.temple.gref_head,
++                                               channel->u.temple.cons_grefs[x]);
++                channel->u.temple.cons_grefs[x] = GRANT_INVALID_REF;
++            }
++            else {                
++                WPRINTK("v2v_disconnect_temple -- cons grant %u still in use by backend "
++                        "domain during listener disconnect\n", channel->u.temple.cons_grefs[x]);
++                failed = any_failed = 1;
++            }
++        }
++    }
++    if (!failed) {
++        kfree(channel->cons_sring);
++        channel->cons_sring = NULL;
++    }
++
++    if (channel->u.temple.control_gref != GRANT_INVALID_REF) {
++        if (gnttab_query_foreign_access(channel->u.temple.control_gref) == 0) {
++            gnttab_end_foreign_access_ref(channel->u.temple.control_gref);
++            gnttab_release_grant_reference(&channel->u.temple.gref_head,
++                                           channel->u.temple.control_gref);
++            channel->u.temple.control_gref = GRANT_INVALID_REF;
++            kfree(channel->control);
++            channel->control = NULL;
++        }
++        else {
++            WPRINTK("v2v_disconnect_temple -- control grant %u still in use by backend "
++                    "domain during listener disconnect\n", channel->u.temple.control_gref);
++            any_failed = 1;
++        }
++    }
++
++    if (!any_failed)
++        gnttab_free_grant_references(channel->u.temple.gref_head);
++
++    if (channel->receive_evtchn_irq != 0) {
++        unbind_from_irqhandler(channel->receive_evtchn_irq, channel);
++        channel->receive_evtchn_irq = 0;
++    }
++    if (channel->send_evtchn_irq != 0) {
++        unbind_from_irqhandler(channel->send_evtchn_irq, channel);
++        channel->send_evtchn_irq = 0;
++    }
++
++    DPRINTK("finished disconnecting temple, channel: %p\n", channel);
++
++    /* We either freed the rings here or they could not be freed. Prevent
++       v2v_destroy_channel() from trying to free grants/rings with 
++       outstanding grant refs */
++    v2v_destroy_channel(channel, 0);
++    
++    return 0;
++}
++
++static int
++v2v_disconnect_supplicant(const struct v2v_channel *_channel)
++{
++    int err;
++    struct v2v_channel *channel = (struct v2v_channel *)_channel;
++
++    /* The grant mapping and vm areas are cleaned up in v2v_destroy_channel() */ 
++    channel->prod_sring = NULL;
++    channel->cons_sring = NULL;
++    channel->control = NULL;
++
++    if (channel->receive_evtchn_irq != 0) {
++        unbind_from_irqhandler(channel->receive_evtchn_irq, channel);
++        channel->receive_evtchn_irq = 0;
++    }
++    if (channel->send_evtchn_irq != 0) {
++        unbind_from_irqhandler(channel->send_evtchn_irq, channel);
++        channel->send_evtchn_irq = 0;
++    }
++
++    err = v2v_change_local_state(channel, XBT_NIL, v2v_state_disconnected);
++    if (err) {
++        EPRINTK("v2v_disconnect_supplicant - v2v_change_local_state(disconnected) failed - err: %d\n", err);
++        return err;
++    }
++    
++    DPRINTK("finished disconnecting supplicant, channel: %p\n", channel);
++
++    v2v_destroy_channel(channel, 0);
++
++    return 0;
++}
++
++int
++v2v_disconnect(const struct v2v_channel *channel)
++{
++    if (channel->is_temple)
++        return v2v_disconnect_temple(channel);
++    else
++        return v2v_disconnect_supplicant(channel);
++}
++EXPORT_SYMBOL_GPL(v2v_disconnect);
++
++struct v2v_wait*
++v2v_get_wait_state(struct v2v_channel *channel)
++{
++    return &channel->wait_state;
++}
++EXPORT_SYMBOL_GPL(v2v_get_wait_state);
++
++u8
++v2v_get_wake_reason(struct v2v_channel *channel, u8 reasons)
++{
++    return v2v_wrq_dequeue(channel, reasons);
++}
++EXPORT_SYMBOL_GPL(v2v_get_wake_reason);
++
++int
++v2v_set_wake_reason(struct v2v_channel *channel, u8 reason)
++{
++    int err;
++
++    err = v2v_wrq_queue(channel, reason);
++    if (err)
++        return err;
++
++    v2v_set_event(channel);
++
++    return 0;
++}
++EXPORT_SYMBOL_GPL(v2v_set_wake_reason);
++
++void
++v2v_clear_wake_reason(struct v2v_channel *channel, u8 reasons)
++{
++    v2v_wrq_clear(channel, reasons);
++}
++EXPORT_SYMBOL_GPL(v2v_clear_wake_reason);
++
++int
++v2v_nc2_get_message(struct v2v_channel *channel,
++                    const volatile void **msg,
++                    size_t *out_size,
++                    unsigned *type,
++                    unsigned *flags)
++{
++    RING_IDX prod;
++    RING_IDX cons_pvt;
++    const volatile struct netchannel2_msg_hdr *hdr;
++    unsigned size;
++    unsigned counter;
++
++    counter = 0;
++
++retry:
++    cons_pvt = channel->nc2_rings.local_cons_pvt;
++    prod = channel->nc2_rings.remote_endpoint->prod;
++    rmb();
++    if (prod == cons_pvt) {
++        if (channel->nc2_rings.remote_endpoint->producer_active &&
++            counter < CONSUMER_SPIN_LIMIT) {
++            channel->nc2_rings.local_endpoint->consumer_spinning = 1;
++            while (channel->nc2_rings.remote_endpoint->producer_active &&
++                   counter < CONSUMER_SPIN_LIMIT)
++                ;
++            channel->nc2_rings.local_endpoint->consumer_spinning = 0;
++            /* The write to local_endpoint->consumer_spinning needs to
++               come before any write of prod_event which might happen
++               shortly.  Fortunately, they're both volatile, so happen
++               in-order, and we don't need any explicit barriers. */
++            goto retry;
++        }
++        /* ### clear the event (reset) */
++        v2v_wrq_clear(channel, V2V_WAKE_REASON_RECEIVE);
++
++        if (nc2_final_check_for_messages(&channel->nc2_rings, prod)) {
++            /* ### now set the event */
++            v2v_wrq_queue(channel, V2V_WAKE_REASON_RECEIVE);
++            v2v_set_event(channel);
++            goto retry;
++        }
++        return -ENODATA;
++    }
++    hdr = __nc2_incoming_message(&channel->nc2_rings);
++    if (!__nc2_contained_in_cons_ring(&channel->nc2_rings,
++                                      hdr,
++                                      sizeof(*hdr))) {
++        /* This can't happen, unless the other end is misbehaving. */
++invalid_message:
++        EPRINTK("v2v_nc2_get_message - received invalid data!\n");
++        return -EBADMSG;
++    }
++    size = hdr->size;
++    if (size < sizeof(*hdr) ||
++        !__nc2_contained_in_cons_ring(&channel->nc2_rings, hdr, size))
++        goto invalid_message;
++    if (hdr->type == NETCHANNEL2_MSG_PAD) {
++        /* Discard pad message */
++        channel->nc2_rings.local_cons_pvt += size;
++        goto retry;
++    }
++
++    *msg = hdr + 1;
++    *out_size = channel->current_message_size = size - sizeof(*hdr);
++    *type = hdr->type;
++    *flags = hdr->flags;
++
++    return 0;
++}
++EXPORT_SYMBOL_GPL(v2v_nc2_get_message);
++
++void
++v2v_nc2_finish_message(struct v2v_channel *channel)
++{
++    channel->nc2_rings.local_cons_pvt +=
++        (channel->current_message_size + sizeof(struct netchannel2_msg_hdr) + 7) & ~7;
++
++    if (nc2_finish_messages(&channel->nc2_rings)&&(channel->receive_evtchn_irq != 0))
++        notify_remote_via_irq(channel->receive_evtchn_irq);
++}
++EXPORT_SYMBOL_GPL(v2v_nc2_finish_message);
++
++int
++v2v_nc2_prep_message(struct v2v_channel *channel,
++                     size_t msg_size,
++                     unsigned char type,
++                     unsigned char flags,
++                     volatile void **payload)
++{
++    volatile struct netchannel2_msg_hdr *hdr;
++    unsigned short size;
++    unsigned short rounded_size;
++
++    msg_size += sizeof(*hdr);
++    if ( ((msg_size + 7) & ~7) >
++         channel->nc2_rings.producer_payload_bytes ) {
++        EPRINTK("v2v_nc2_prep_message - invalid input\n");
++        return -EINVAL;
++    }
++
++    if (type >= NETCHANNEL2_MSG_PAD) {
++        EPRINTK("v2v_nc2_prep_message - invalid message type: %2.2x\n", type);
++        return -ENOSYS;
++    }
++
++    size = (unsigned short)msg_size;
++    rounded_size = (size + 7) & ~7;
++
++    if (channel->nc2_rings.remote_endpoint->consumer_active)
++        v2v_nc2_send_messages(channel);
++    if (!nc2_can_send_payload_bytes(&channel->nc2_rings, rounded_size)) {
++        /* ### clear the event (reset) */
++        v2v_wrq_clear(channel, V2V_WAKE_REASON_SEND);
++
++        if (!nc2_can_send_payload_bytes(&channel->nc2_rings, rounded_size))
++            return -EAGAIN;
++        
++        /* ### now set the event */
++        v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND);
++        v2v_set_event(channel);
++    }
++    __nc2_avoid_ring_wrap(&channel->nc2_rings, rounded_size);
++    hdr = __nc2_get_message_ptr(&channel->nc2_rings);
++    hdr->size = size;
++    hdr->type = type;
++    hdr->flags = flags;
++    *payload = hdr + 1;
++    channel->nc2_rings.local_prod_pvt += rounded_size;
++    channel->nc2_rings.local_prod_bytes_available -= rounded_size;
++
++    if (channel->nc2_rings.remote_endpoint->consumer_active &&
++        !channel->nc2_rings.local_producer_active &&
++        __nc2_flush_would_trigger_event(&channel->nc2_rings)) {
++        channel->nc2_rings.local_endpoint->producer_active = 1;
++        channel->nc2_rings.local_producer_active = 1;
++
++        if (channel->receive_evtchn_irq != 0)
++            notify_remote_via_irq(channel->receive_evtchn_irq);
++    }
++
++    return 0;
++}
++EXPORT_SYMBOL_GPL(v2v_nc2_prep_message);
++
++/* A rough estimate of the largest size you can pass to prep_message()
++   without needing to either block or generate a pad message */
++unsigned
++v2v_nc2_producer_bytes_available(struct v2v_channel *channel)
++{
++    RING_IDX cons;
++    RING_IDX prod;
++    unsigned mask;
++    unsigned res;
++
++    cons = channel->nc2_rings.remote_endpoint->cons;
++    prod = channel->nc2_rings.local_prod_pvt;
++    mask = channel->nc2_rings.producer_payload_bytes - 1;
++    if ( (cons & mask) > (prod & mask) ) {
++        res = (cons & mask) - (prod & mask);
++    } else {
++        res = channel->nc2_rings.producer_payload_bytes - (prod & mask);
++        if (res < 16)
++            res = cons & mask;
++    }
++    if (res < sizeof(struct netchannel2_msg_hdr) + 8)
++        return 0;
++    else
++        return res - sizeof(struct netchannel2_msg_hdr) - 8;
++}
++EXPORT_SYMBOL_GPL(v2v_nc2_producer_bytes_available);
++
++void
++v2v_nc2_send_messages(struct v2v_channel *channel)
++{
++    if (nc2_flush_ring(&channel->nc2_rings)) {
++        /* The read of consumer_spinning needs to be after the read of
++         * prod_event in nc2_flush_ring().  Both fields are volatile,
++         * so the compiler gives us that for free and we don't need
++         * explicit barriers. */
++        if (!channel->nc2_rings.remote_endpoint->consumer_spinning) {
++            if (channel->send_evtchn_irq != 0)
++                notify_remote_via_irq(channel->send_evtchn_irq);
++        }
++        if (channel->nc2_rings.local_producer_active) {
++            channel->nc2_rings.local_producer_active = 0;
++            channel->nc2_rings.local_endpoint->producer_active = 0;
++        }
++    }
++}
++EXPORT_SYMBOL_GPL(v2v_nc2_send_messages);
++
++void
++v2v_nc2_request_fast_receive(struct v2v_channel *channel)
++{
++    channel->nc2_rings.local_endpoint->consumer_active = 1;
++}
++EXPORT_SYMBOL_GPL(v2v_nc2_request_fast_receive);
++
++void
++v2v_nc2_cancel_fast_receive(struct v2v_channel *channel)
++{
++    channel->nc2_rings.local_endpoint->consumer_active = 0;
++}
++EXPORT_SYMBOL_GPL(v2v_nc2_cancel_fast_receive);
++
++int
++v2v_nc2_remote_requested_fast_wakeup(struct v2v_channel *channel)
++{
++    if (channel->nc2_rings.remote_endpoint->consumer_active)
++        return 1;
++    else
++        return 0;
++}
++EXPORT_SYMBOL_GPL(v2v_nc2_remote_requested_fast_wakeup);
++
++static int __init v2v_init(void)
++{
++    if (!is_running_on_xen())
++        return -ENODEV;
++
++    printk("Xen V2V driver installed.\n");
++
++    return 0;
++}
++
++static void __exit v2v_cleanup(void)
++{
++}
++
++module_init(v2v_init);
++module_exit(v2v_cleanup);
++
++MODULE_LICENSE("Dual BSD/GPL");
++
+diff --git a/drivers/xen/v2v/v2v_private.h b/drivers/xen/v2v/v2v_private.h
+new file mode 100644
+index 0000000..b2f2a90
+--- /dev/null
++++ b/drivers/xen/v2v/v2v_private.h
+@@ -0,0 +1,82 @@
++/******************************************************************************
++ * drivers/xen/v2v/v2v_private.h
++ *
++ * V2V interdomain communication internal definitions.
++ * 
++ * Copyright (c) 2009 Ross Philipson
++ * Copyright (c) 2009 Citrix Systems, Inc.
++ *
++ * This program is free software; you can redistribute it and/or
++ * modify it under the terms of the GNU General Public License version 2
++ * as published by the Free Software Foundation; or, when distributed
++ * separately from the Linux kernel or incorporated into other
++ * software packages, subject to the following license:
++ *
++ * Permission is hereby granted, free of charge, to any person obtaining a copy
++ * of this source file (the "Software"), to deal in the Software without
++ * restriction, including without limitation the rights to use, copy, modify,
++ * merge, publish, distribute, sublicense, and/or sell copies of the Software,
++ * and to permit persons to whom the Software is furnished to do so, subject to
++ * the following conditions:
++ *
++ * The above copyright notice and this permission notice shall be included in
++ * all copies or substantial portions of the Software.
++ *
++ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
++ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
++ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
++ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
++ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
++ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
++ * IN THE SOFTWARE.
++ */
++#ifndef V2V_PRIVATE_H__
++#define V2V_PRIVATE_H__
++
++enum xenstore_scatter_type {
++    xenstore_scatter_type_bad,
++    xenstore_scatter_type_grant_ref = 0xbeef,
++    xenstore_scatter_type_evtchn_irq,
++    xenstore_scatter_type_string,
++    xenstore_scatter_type_int
++};
++int v2v_xenstore_scatter(struct xenbus_transaction xbt, const char *prefix, ...);
++
++enum xenstore_gather_type {
++    xenstore_gather_type_bad,
++    xenstore_gather_type_alien_grant_ref = 0xfeeb,
++    xenstore_gather_type_alien_evtchn_port,
++    xenstore_gather_type_int
++};
++int v2v_xenstore_gather(struct xenbus_transaction xbt, const char *prefix, ...);
++
++int v2v_xenops_grant_map(struct vm_struct **vm_area_out,
++                         grant_handle_t *ghandles_out,
++                         domid_t domid,
++                         unsigned int nr_grefs,
++                         grant_ref_t *grefs,
++                         int readonly);
++
++void v2v_xenops_grant_unmap(struct vm_struct *vm_area,
++                            grant_handle_t *ghandles,
++                            unsigned int nr_grefs,
++                            int readonly);
++
++
++#define MAX_RING_PAGE_ORDER   4
++#define MAX_RING_PAGES        (1 << MAX_RING_PAGE_ORDER)
++
++#define       INVALID_GRANT_HANDLE  ((grant_handle_t)~0U)
++#define GRANT_INVALID_REF     0
++
++#define DPRINTK(fmt, args...)                         \
++    pr_debug("v2v (%s:%d) " fmt,                  \
++         __FUNCTION__, __LINE__, ##args)
++#define IPRINTK(fmt, args...)                         \
++    printk(KERN_INFO "v2v: " fmt, ##args)
++#define WPRINTK(fmt, args...)                         \
++    printk(KERN_WARNING "v2v: " fmt, ##args)
++#define EPRINTK(fmt, args...)                         \
++    printk(KERN_ERR "v2v: " fmt, ##args)
++
++#endif /* !V2V_PRIVATE_H__ */
+diff --git a/drivers/xen/v2v/v2vdrv.c b/drivers/xen/v2v/v2vdrv.c
+new file mode 100644
+index 0000000..0c362bb
+--- /dev/null
++++ b/drivers/xen/v2v/v2vdrv.c
+@@ -0,0 +1,369 @@
++/******************************************************************************
++ * drivers/xen/v2v/v2vdrv.c
++ *
++ * V2V sample client driver.
++ * 
++ * Copyright (c) 2009 Ross Philipson
++ * Copyright (c) 2009 Citrix Systems, Inc.
++ *
++ * This program is free software; you can redistribute it and/or
++ * modify it under the terms of the GNU General Public License version 2
++ * as published by the Free Software Foundation; or, when distributed
++ * separately from the Linux kernel or incorporated into other
++ * software packages, subject to the following license:
++ *
++ * Permission is hereby granted, free of charge, to any person obtaining a copy
++ * of this source file (the "Software"), to deal in the Software without
++ * restriction, including without limitation the rights to use, copy, modify,
++ * merge, publish, distribute, sublicense, and/or sell copies of the Software,
++ * and to permit persons to whom the Software is furnished to do so, subject to
++ * the following conditions:
++ *
++ * The above copyright notice and this permission notice shall be included in
++ * all copies or substantial portions of the Software.
++ *
++ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
++ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
++ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
++ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
++ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
++ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
++ * IN THE SOFTWARE.
++ */
++
++#include <linux/init.h>
++#include <linux/module.h>
++#include <linux/kernel.h>
++#include <linux/sched.h>
++#include <linux/slab.h>
++#include <linux/string.h>
++#include <linux/errno.h>
++#include <linux/fs.h>
++#include <linux/miscdevice.h>
++#include <linux/major.h>
++#include <linux/proc_fs.h>
++#include <linux/poll.h>
++#include <linux/jiffies.h>
++#include <xen/xenbus.h>
++#include "v2vdrv.h"
++
++static const char v2vdrv_usage[] = \
++    "Run as either the listener or connector by writing the following information:\n" \
++    "listener,<local_prefix>\n" \
++    "connector,<local_prefix>,[count],[size],[timout]\n" \
++    "    local_prefix: XenStore local path for the V2V endpoint\n" \
++    "    count: (opt) number of messages to send\n" \
++    "    size: (opt) size of each message\n" \
++    "    timout: (opt) timeout in ms for awaiting response\n" \
++    "Also a xenstore reader test routine:\n" \
++    "reader,<path>[:value]\n";
++
++extern void v2vdrv_run_connector(struct v2vdrv_config *config);
++extern void v2vdrv_run_listener(struct v2vdrv_config *config);
++
++enum v2vdrv_role {
++    role_unknown = 0,
++    role_listener,
++    role_connector,
++    role_reader
++};
++
++static void v2vdrv_run_reader(struct v2vdrv_config *config)
++{
++    char *path = NULL;
++    char *value = NULL;
++    char *result = NULL;
++    int i;
++    size_t len;
++
++    len = strlen(config->local_prefix);
++
++    for (i = 0; i < len; i++)
++        if (config->local_prefix[i] == ':')
++            break;
++
++    printk("Running reader for: %s len: %d i: %d\n", config->local_prefix, len, i);
++
++    if (i < len) {
++        path = kmalloc(i + 1, GFP_KERNEL);
++        if (!path) {
++            printk("reader out of memory\n");
++            goto out;
++        }
++        strncpy(path, config->local_prefix, i);
++        path[i] = '\0';
++        value = kmalloc(len - i, GFP_KERNEL);
++        if (!value) {
++            printk("reader out of memory\n");
++            goto out;
++        }
++        strncpy(value, config->local_prefix + i + 1, len - i - 1);
++        value[len - i - 1] = '\0';
++        printk("reader path: %s value: %s\n", path, value);
++    }
++    else {
++        path = kmalloc(len + 1, GFP_KERNEL);
++        if (!path) {
++            printk("reader out of memory\n");
++            goto out;
++        }
++        strncpy(path, config->local_prefix, len);
++        path[len] = '\0';
++        printk("reader path: %s value: (null)\n", path);
++    }
++
++    result = xenbus_read(XBT_NIL, path, (value == NULL ? "" : value), NULL);
++    if (IS_ERR(result)) {
++        printk("reader xenbus read failed - err: %li\n", PTR_ERR(result));
++        goto out;
++    }
++    printk("reader result: %s\n", result);
++    kfree(result);
++
++out:
++    if (path)
++        kfree(path);
++    if (value)
++        kfree(value);
++}
++
++static enum v2vdrv_role v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_config *config)
++{
++    size_t len;
++    enum v2vdrv_role role = role_unknown;
++    int i, err, val, parsed;
++
++    len = strlen(cfgstr);
++
++    do {
++        if ((len > 9)&&(strncmp(cfgstr, "listener,", 9) == 0)) {
++            cfgstr += 9;
++            len -= 9;     
++            role = role_listener;
++        }
++        else if ((len > 10)&&(strncmp(cfgstr, "connector,", 10) == 0)) {
++            cfgstr += 10;
++            len -= 10;
++            role = role_connector;
++        }
++        else if ((len > 7)&&(strncmp(cfgstr, "reader,", 7) == 0)) {
++            cfgstr += 7;
++            len -= 7;
++            role = role_reader;
++        }
++        else
++            break;
++
++        /* some defaults */
++        config->xfer_count = 0; /* connect only, no data */
++        config->xfer_size = 512;
++        config->timeout = V2VDRV_RESPONSE_WAIT_TIMEOUT; 
++
++        for (i = 0; i < len; i++)
++            if (cfgstr[i] == ',')
++                break;
++
++        /* this is our local prefix */
++        config->local_prefix = kmalloc(i + 1, GFP_KERNEL);
++        if (!config->local_prefix) {
++            role = role_unknown;
++            break;
++        }
++        strncpy(config->local_prefix, cfgstr, i);
++        config->local_prefix[i] = '\0';
++
++        if (role == role_listener || role == role_reader)
++            break;
++
++        if (i == len) /* no opt args */
++            break;
++
++        /* else this is a connector with more opt args */
++        cfgstr += i + 1;
++        parsed = 0;
++        err = sscanf(cfgstr, "%d,%n", &val, &parsed);
++        if (err < 1)
++            break;
++                
++        config->xfer_count = (uint32_t)val;
++        if (parsed == 0)
++            break;
++        
++        cfgstr += parsed;
++        parsed = 0;
++        err = sscanf(cfgstr, "%d,%n", &val, &parsed);
++        if (err < 1)
++            break;
++        config->xfer_size = (uint32_t)val;
++        if (parsed == 0)
++            break;
++
++        cfgstr += parsed;
++        parsed = 0;
++        err = sscanf(cfgstr, "%d,%n", &val, &parsed);
++        if (err < 1)
++            break;
++        config->timeout = msecs_to_jiffies((const unsigned long)val);
++    } while (0);
++
++    return role;
++}
++
++static ssize_t v2vdrv_read(struct file *file, char __user *buf,
++                         size_t count, loff_t *ppos)
++{
++    size_t len;
++    unsigned int i;
++    char __user *p = buf;
++
++    len = strlen(v2vdrv_usage);
++
++    if (*ppos >= len)
++        return 0;
++
++    for (i = *ppos; count > 0 && i < len; ++i, ++p, --count)
++        if (__put_user(v2vdrv_usage[i], p))
++              return -EFAULT;
++
++    *ppos = i;
++
++    return p - buf;
++}
++
++static ssize_t v2vdrv_write(struct file *file, const char __user *buf,
++                          size_t count, loff_t *ppos)
++{
++    char *cfgstr = NULL;
++    struct v2vdrv_config *config = NULL;
++    enum v2vdrv_role role;
++    size_t written = -EFAULT;
++    
++    if (count == 0)
++        return 0;
++    
++    cfgstr = kmalloc(count + 1, GFP_KERNEL);
++    if (!cfgstr) {
++        written = -ENOMEM;
++        goto out;
++    }
++
++    config = kmalloc(sizeof(struct v2vdrv_config), GFP_KERNEL);
++    if (!cfgstr) {
++        written = -ENOMEM;
++        goto out;
++    }
++    memset(config, 0, sizeof(struct v2vdrv_config));
++
++    if (copy_from_user(cfgstr, buf, count) != 0)
++        goto out;
++    cfgstr[count] = '\0';
++    written = count;
++    /* strip off trailing newline */
++    if (cfgstr[count - 1] == '\n')
++        cfgstr[count - 1] = '\0';
++
++    role = v2vdrv_parse_config(cfgstr, config);
++
++    if (role == role_listener) {
++        printk("V2V-DRV loaded listener config...\n");
++        printk("        local prefix: %s\n", config->local_prefix);
++        v2vdrv_run_listener(config);
++    }
++    else if (role == role_connector) {
++        printk("V2V-DRV loaded connector config...\n");
++        printk("        local prefix: %s\n", config->local_prefix);
++        printk("        count: %d size: %d timeout: %lu\n",
++               config->xfer_count, config->xfer_size, config->timeout);
++        v2vdrv_run_connector(config);
++    }
++    else if (role == role_reader) {
++        printk("V2V-DRV loaded reader config...\n");
++        printk("        local prefix: %s\n", config->local_prefix);
++        v2vdrv_run_reader(config);
++    }
++    else
++        printk("V2V-DRV invalid configuration written: %s\n", cfgstr);
++    
++out:
++    if (config) {
++        if (config->local_prefix)
++            kfree(config->local_prefix);
++        kfree(config);
++    }
++    if (cfgstr)
++        kfree(cfgstr);
++    *ppos = 0;
++
++    return written;
++}
++
++static long v2vdrv_ioctl(struct file *file,
++                       unsigned int cmd, unsigned long arg)
++{   
++    int rc;
++
++    switch (cmd) {
++    default:
++        rc = -ENOSYS;
++        break;
++    }
++
++    return rc;
++}
++
++static int v2vdrv_open(struct inode *inode, struct file *filp)
++{
++    printk("V2V-DRV open request\n");
++    //filp->private_data = whatever;
++    return 0;
++}
++
++static int v2vdrv_release(struct inode *inode, struct file *filp)
++{
++    printk("V2V-DRV release request\n");
++    //filp->private_data
++    return 0;
++}
++
++static const struct file_operations v2vdrv_fops = {
++    .owner   = THIS_MODULE,
++    .write   = v2vdrv_write,
++    .read    = v2vdrv_read,
++    .unlocked_ioctl = v2vdrv_ioctl,
++    .open    = v2vdrv_open,
++    .release = v2vdrv_release,
++};
++
++static struct miscdevice v2vdrv_miscdev = {
++    .minor        = MISC_DYNAMIC_MINOR,
++    .name         = "v2vdrv",
++    .fops         = &v2vdrv_fops,
++};
++
++static int __init v2vdrv_init(void)
++{
++    int err = 0;
++
++    if (!is_running_on_xen())
++        return -ENODEV;
++
++    err = misc_register(&v2vdrv_miscdev);
++    if (err != 0) {
++        printk(KERN_ALERT "Could not register /dev/misc/v2vdrv\n");
++        return err;
++    }
++    
++    printk("Xen V2V sample device installed.\n");
++
++    return 0;
++}
++
++static void __exit v2vdrv_cleanup(void)
++{
++    misc_deregister(&v2vdrv_miscdev);
++}
++
++module_init(v2vdrv_init);
++module_exit(v2vdrv_cleanup);
++
++MODULE_LICENSE("Dual BSD/GPL");
+diff --git a/drivers/xen/v2v/v2vdrv.h b/drivers/xen/v2v/v2vdrv.h
+new file mode 100644
+index 0000000..8c3cd06
+--- /dev/null
++++ b/drivers/xen/v2v/v2vdrv.h
+@@ -0,0 +1,93 @@
++/******************************************************************************
++ * drivers/xen/v2v/v2vdrv.h
++ *
++ * V2V sample client driver definitions.
++ * 
++ * Copyright (c) 2009 Ross Philipson
++ * Copyright (c) 2009 Citrix Systems, Inc.
++ *
++ * This program is free software; you can redistribute it and/or
++ * modify it under the terms of the GNU General Public License version 2
++ * as published by the Free Software Foundation; or, when distributed
++ * separately from the Linux kernel or incorporated into other
++ * software packages, subject to the following license:
++ *
++ * Permission is hereby granted, free of charge, to any person obtaining a copy
++ * of this source file (the "Software"), to deal in the Software without
++ * restriction, including without limitation the rights to use, copy, modify,
++ * merge, publish, distribute, sublicense, and/or sell copies of the Software,
++ * and to permit persons to whom the Software is furnished to do so, subject to
++ * the following conditions:
++ *
++ * The above copyright notice and this permission notice shall be included in
++ * all copies or substantial portions of the Software.
++ *
++ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
++ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
++ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
++ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
++ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
++ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
++ * IN THE SOFTWARE.
++ */
++#ifndef V2V_DRV_H__
++#define V2V_DRV_H__
++
++#define V2VDRV_LOGTAG "V2V-DRV"
++
++#define V2VDRV_RESPONSE_WAIT_TIMEOUT 2000
++
++#define V2V_MESSAGE_TYPE_INTERNAL 10
++#define V2V_MESSAGE_TYPE_FILE     15
++
++#define V2V_MESSAGE_STATUS_OK        0
++#define V2V_MESSAGE_STATUS_EOF       1
++#define V2V_MESSAGE_STATUS_MORE      2
++#define V2V_MESSAGE_STATUS_BADCS     0xFFFFF100
++#define V2V_MESSAGE_STATUS_BADSEQ    0xFFFFF101
++#define V2V_MESSAGE_STATUS_NODATA    0xFFFFF102
++#define V2V_MESSAGE_STATUS_WRITE_ERR 0xFFFFF103
++
++struct v2vdrv_frame_header {
++    uint16_t id;
++    uint8_t  type;
++    uint8_t  cs;
++    uint32_t length;
++};
++
++struct v2vdrv_post_internal {
++    struct v2vdrv_frame_header header;
++    uint8_t guid[16];
++    /* data */
++};
++
++struct v2vdrv_resp_internal {
++    struct v2vdrv_frame_header header;
++    uint32_t status;
++    uint8_t guid[16];
++};
++
++struct v2vdrv_listener_resp_item {    
++    struct v2vdrv_resp_internal resp;
++    struct v2vdrv_listener_resp_item *next;
++};
++
++struct v2vdrv_config {
++    char *local_prefix;
++    uint32_t xfer_count;
++    uint32_t xfer_size;
++    unsigned long timeout;
++};
++
++static inline uint8_t v2vdrv_checksum(const uint8_t *ptr, uint32_t length)
++{
++    uint32_t count;
++    uint8_t sum;
++
++    for (count = 0, sum = 0; count < length; count++)
++        sum = sum + ptr[count];
++
++    return -sum;
++}
++
++#endif /* !V2V_DRV_H__ */
+diff --git a/drivers/xen/v2v/v2vops.c b/drivers/xen/v2v/v2vops.c
+new file mode 100644
+index 0000000..ff06deb
+--- /dev/null
++++ b/drivers/xen/v2v/v2vops.c
+@@ -0,0 +1,698 @@
++/******************************************************************************
++ * drivers/xen/v2v/v2vops.c
++ *
++ * V2V sample client driver synchronous operations.
++ * 
++ * Copyright (c) 2009 Ross Philipson
++ * Copyright (c) 2009 Citrix Systems, Inc.
++ *
++ * This program is free software; you can redistribute it and/or
++ * modify it under the terms of the GNU General Public License version 2
++ * as published by the Free Software Foundation; or, when distributed
++ * separately from the Linux kernel or incorporated into other
++ * software packages, subject to the following license:
++ *
++ * Permission is hereby granted, free of charge, to any person obtaining a copy
++ * of this source file (the "Software"), to deal in the Software without
++ * restriction, including without limitation the rights to use, copy, modify,
++ * merge, publish, distribute, sublicense, and/or sell copies of the Software,
++ * and to permit persons to whom the Software is furnished to do so, subject to
++ * the following conditions:
++ *
++ * The above copyright notice and this permission notice shall be included in
++ * all copies or substantial portions of the Software.
++ *
++ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
++ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
++ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
++ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
++ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
++ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
++ * IN THE SOFTWARE.
++ */
++
++#include <linux/init.h>
++#include <linux/module.h>
++#include <linux/kernel.h>
++#include <linux/string.h>
++#include <linux/errno.h>
++#include <linux/sched.h>
++#include <linux/wait.h>
++#include <linux/time.h>
++#include <linux/random.h>
++#include <xen/xenbus.h>
++#include <xen/v2v.h>
++#include <xen/interface/io/uring.h>
++#include "v2vdrv.h"
++
++/************************** GENERAL **************************/
++static int
++v2vdrv_message_header_check(const char *rstr,
++                            struct v2vdrv_frame_header *header,
++                            size_t msg_size,
++                            size_t min_size,
++                            uint32_t rx_counter)
++{
++    if ((msg_size < sizeof(struct v2vdrv_frame_header))||(msg_size < min_size)) {
++        printk("%s (%s) response #%d is too small!!!\n", V2VDRV_LOGTAG, rstr, rx_counter);
++        return 0;
++    }
++    if (header->length < msg_size) {
++        printk("%s (%s) response #%d header length incorrect!!!\n", V2VDRV_LOGTAG, rstr, rx_counter);
++        return 0;
++    }       
++    
++    printk("%s (%s) received message #%d\n", V2VDRV_LOGTAG, rstr, rx_counter);
++    printk("------ id=%d type=%d length=0x%x\n", header->id, header->type, header->length);
++
++    return 1;
++}
++
++/************************* CONNECTOR *************************/
++struct v2vdrv_connector_context {
++    struct v2vdrv_config *config;
++    struct v2v_channel *channel;
++    uint32_t tx_counter;
++    uint32_t rx_counter;
++};
++
++static int
++v2vdrv_connect(struct v2vdrv_connector_context *vcc)
++{
++    int err = 0;    
++    enum v2v_endpoint_state state;
++    struct v2v_wait *wait_state;
++    u8 reasons;
++    unsigned long to, td, rc;
++    struct timespec ts = {0}, tsz = {0}, now, delta;
++
++    /* Connect to the listener, get back a channel handle */
++    err = v2v_connect(vcc->config->local_prefix, &vcc->channel);
++    if (err) {
++        printk("%s connector(%p) failure in v2v_connect() - error: %d\n", V2VDRV_LOGTAG, vcc, err);
++        return err;
++    }
++
++    BUG_ON(vcc->channel == NULL);
++
++    printk("%s connector(%p) connected to listener; wait for listenter to indicate it has accepted the connection...\n", V2VDRV_LOGTAG, vcc);    
++
++    wait_state = v2v_get_wait_state(vcc->channel);
++    to = vcc->config->timeout << 2; /* in jiffies x4*/
++    
++    do {
++        if (!timespec_equal(&ts, &tsz)) {
++            now = current_kernel_time();
++            delta = timespec_sub(now, ts);
++            td = timespec_to_jiffies(&delta);
++            if (td < to) /* rundown timer */ 
++                to -= td;
++            else
++                to = 0;            
++        }
++        else {
++            /* initial state */            
++            ts = current_kernel_time();           
++        }
++
++        rc = wait_event_timeout(wait_state->wait_event,
++                                atomic_xchg(&wait_state->wait_condition, 0) == 1,
++                                to);
++        if (rc == 0) {
++            printk("%s connector(%p) timed out waiting for accept from listener; disconnecting\n",
++                   V2VDRV_LOGTAG, vcc);
++            err = -ETIMEDOUT;
++            break;
++        }
++
++        reasons = v2v_get_wake_reason(vcc->channel, V2V_WAKE_REASON_CONTROL);
++        if (reasons & V2V_WAKE_REASON_CONTROL) {
++            err = v2v_get_remote_state(vcc->channel, &state);
++            if (err) {
++                printk("%s connector(%p) failure in v2v_get_remote_state(); aborting - error: %d\n",
++                       V2VDRV_LOGTAG, vcc, err);
++                break;
++            }
++            printk("%s connector(%p) state changed for other end - new state: %s\n",
++                   V2VDRV_LOGTAG, vcc, v2v_endpoint_state_name(state));
++            if (state == v2v_state_connected) {
++                printk("%s connector(%p) listener reports connected; begin processing messages.\n",
++                       V2VDRV_LOGTAG, vcc);
++                err = 0;
++                break;
++            }
++        }
++    } while (1);
++
++    if (err)
++        v2v_disconnect(vcc->channel);
++
++    return err;
++}
++
++static int
++v2vdrv_connector_process_internal_rx(struct v2vdrv_connector_context *vcc)
++{
++    int err;
++    volatile void *msg;
++    size_t size;
++    unsigned type;
++    unsigned flags;
++    struct v2vdrv_frame_header *header;
++    struct v2vdrv_resp_internal *vri;
++    uint8_t sum;
++
++    while ((err = v2v_nc2_get_message(vcc->channel, (const volatile void **)&msg, &size, &type, &flags))
++            == 0) {
++        vcc->rx_counter++;   
++        header = (struct v2vdrv_frame_header*)msg;
++        if (!v2vdrv_message_header_check("connector", header, size,
++                                         sizeof(struct v2vdrv_resp_internal),
++                                         vcc->rx_counter)) {
++            v2v_nc2_finish_message(vcc->channel);
++            return -EBADMSG;
++        }
++
++        vri = (struct v2vdrv_resp_internal*)msg;
++        printk("------ message status=%d\n", vri->status);
++        printk("------ GUID1=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n",
++               vri->guid[0], vri->guid[1], vri->guid[2], vri->guid[3],
++               vri->guid[4], vri->guid[5], vri->guid[6], vri->guid[7]);
++        printk("------ GUID2=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n",
++               vri->guid[8], vri->guid[9], vri->guid[10], vri->guid[11], 
++               vri->guid[12], vri->guid[13], vri->guid[14], vri->guid[15]);
++
++        sum = v2vdrv_checksum((const uint8_t*)msg, header->length);
++        if (sum != 0)         
++            printk("%s connector(%p) bad checksumm on response #%d!!!\n", V2VDRV_LOGTAG, vcc, vcc->rx_counter);
++
++        v2v_nc2_finish_message(vcc->channel);
++    }
++    if (err == -ENODATA) {
++        /* No more messages */
++        printk("%s connector(%p) no more messages, returning\n", V2VDRV_LOGTAG, vcc);
++        return 0;
++    }
++
++    printk("%s connector(%p) receive internal data failure; abort processing - error: %d\n",
++           V2VDRV_LOGTAG, vcc, err);
++    return err; /* failure */
++}
++
++static int
++v2vdrv_connector_process_internal_tx(struct v2vdrv_connector_context *vcc)
++{
++    int err;
++    unsigned available;
++    volatile void *msg;
++    uint8_t *msgp;
++    struct v2vdrv_frame_header *header;
++    struct v2vdrv_post_internal *vpi;
++
++    printk("%s connector(%p) sending internal message #%d\n", V2VDRV_LOGTAG, vcc, vcc->tx_counter + 1);
++    available = v2v_nc2_producer_bytes_available(vcc->channel);
++    printk("%s connector(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, vcc, available);
++
++    err = v2v_nc2_prep_message(vcc->channel, vcc->config->xfer_size, V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
++    if (err) {
++        if (err == -EAGAIN) {
++            /* No room right now, return and try again later */
++            printk("%s connector(%p) not enough buffer space to send message #%d; retry\n",
++                   V2VDRV_LOGTAG, vcc, vcc->tx_counter + 1);
++            return -EAGAIN;
++        }
++        printk("%s connector(%p) transmit internal data failure; abort processing - error: %d\n",
++               V2VDRV_LOGTAG, vcc, err);
++        return err; /* failure */
++    }
++    vcc->tx_counter++; /* next message */
++    header = (struct v2vdrv_frame_header*)msg;
++    header->id = (uint16_t)vcc->tx_counter;
++    header->type = V2V_MESSAGE_TYPE_INTERNAL;
++    header->cs = 0;
++    header->length = vcc->config->xfer_size;
++    vpi = (struct v2vdrv_post_internal*)msg;
++    generate_random_uuid(vpi->guid);
++    
++    /* Fill it up with some data and send it */
++    msgp = (uint8_t*)msg + sizeof(struct v2vdrv_post_internal);
++    memset(msgp, 'X', (vcc->config->xfer_size - sizeof(struct v2vdrv_post_internal)));
++    header->cs = v2vdrv_checksum((const uint8_t*)msg, vcc->config->xfer_size);
++    v2v_nc2_send_messages(vcc->channel);
++
++    /* Keep the send loop going by setting the event. If there is no more room, the prep message call
++       will return ERROR_RETRY and just land us back in the wait. */
++    v2v_set_wake_reason(vcc->channel, V2V_WAKE_REASON_SEND);
++    
++    return 0;
++}
++
++static void
++v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc)
++{
++    int err = 0, wait_to = 0, done = 0;
++    struct v2v_wait *wait_state;
++    u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE|V2V_WAKE_REASON_SEND;
++    u8 reasons;
++    enum v2v_endpoint_state state;
++    unsigned long to, td, rc;
++    struct timespec ts = {0}, tsz = {0}, now, delta;
++
++    /* A transfer count of 0 is used to just test connecting and disconnecting
++       w/o sending any data */
++    if (vcc->config->xfer_count == 0) {      
++        printk("%s connector(%p) tranfer count set to 0; disconnecting.\n", V2VDRV_LOGTAG, vcc);
++        return;
++    }
++
++    wait_state = v2v_get_wait_state(vcc->channel);
++    to = vcc->config->timeout << 2; /* in jiffies x4*/
++
++    /* Send our first file chunk to the listener to start things off */
++    err = v2vdrv_connector_process_internal_tx(vcc);
++    if (err) {        
++        /* we should not get an -EAGAIN on first message, return it as an error */
++        BUG_ON(err == -EAGAIN); /* SNO on first message */
++        printk("%s connector(%p) transmit internal data failure on first message; abort processing - error: %d\n",
++               V2VDRV_LOGTAG, vcc, err);
++        return;
++    }
++
++    /* Start out processing loop, wait for a response and send more file chunks */
++    do {
++        /* When the tx counter reaches the transfer count value, stop sending and wait for 
++           the rest of the responses */
++        if (vcc->tx_counter == vcc->config->xfer_count) {
++            /* First see if we are done */
++            if (vcc->rx_counter == vcc->tx_counter) {
++                printk("%s connector(%p) received all remaing responses from listener; disconnecting.\n", V2VDRV_LOGTAG, vcc);
++                err = 0;
++                break;
++            }
++            if (!timespec_equal(&ts, &tsz)) {
++                now = current_kernel_time();
++                delta = timespec_sub(now, ts);
++                td = timespec_to_jiffies(&delta);
++                if (td < to) /* rundown timer */ 
++                    to -= td;
++                else
++                    to = 0;            
++            }
++            else {
++                /* initial state */
++                wait_to = 1;
++                reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE;                
++                ts = current_kernel_time();           
++            }
++        }
++
++        if (!wait_to) {
++            rc = 1;
++            wait_event(wait_state->wait_event,
++                       atomic_xchg(&wait_state->wait_condition, 0) == 1);
++        }
++        else {
++            rc = wait_event_timeout(wait_state->wait_event,
++                                    atomic_xchg(&wait_state->wait_condition, 0) == 1,
++                                    to);
++        }
++        if (rc == 0) {
++            printk("%s connector(%p) timed out waiting for ack responses from listener; disconnecting\n",
++                   V2VDRV_LOGTAG, vcc);
++            break;         
++        }
++
++        do {
++            reasons = v2v_get_wake_reason(vcc->channel, reasons_mask);
++
++            if (reasons & V2V_WAKE_REASON_CONTROL) {
++                err = v2v_get_remote_state(vcc->channel, &state);
++                if (err) {
++                    printk("%s connector(%p) failure in v2v_get_remote_state(); aborting - error: %d\n",
++                           V2VDRV_LOGTAG, vcc, err);
++                    done = 1;
++                    break;
++                }
++                printk("%s connector(%p) state changed for other end - new state: %s\n",
++                       V2VDRV_LOGTAG, vcc, v2v_endpoint_state_name(state));
++                if (v2v_state_requests_disconnect(state)) {
++                    printk("%s connector(%p) main processing loop ending for disconnect request...\n",
++                           V2VDRV_LOGTAG, vcc);
++                    err = 0;
++                    done = 1;
++                    break;
++                }
++            }
++
++            if (reasons & V2V_WAKE_REASON_SEND) {
++                if (vcc->tx_counter != vcc->config->xfer_count) {
++                    err = v2vdrv_connector_process_internal_tx(vcc);
++                    if ((err != 0)&&(err != -EAGAIN)) {
++                        done = 1;
++                        break;
++                    }
++                }
++            }
++
++            if (reasons & V2V_WAKE_REASON_RECEIVE) {
++                err = v2vdrv_connector_process_internal_rx(vcc);
++                if (err) {
++                    done = 1;
++                    break;
++                }
++            }
++        } while (reasons != V2V_WAKE_REASON_NONE);
++
++        if (done)
++            break;
++
++    } while (1);
++}
++
++static void
++v2vdrv_connector_disconnect(struct v2vdrv_connector_context *vcc)
++{
++    int err;
++
++    printk("%s connector(%p) Disconnecting...\n", V2VDRV_LOGTAG, vcc);
++    err = v2v_disconnect(vcc->channel);
++    printk("%s connector(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, vcc, err);
++
++    printk("%s connector(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, vcc, vcc->tx_counter);
++    printk("%s connector(%p) Received response counter: %d\n", V2VDRV_LOGTAG, vcc, vcc->rx_counter);
++
++    if (vcc->tx_counter != vcc->rx_counter)
++        printk("%s connector(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, vcc);
++}
++
++void v2vdrv_run_connector(struct v2vdrv_config *config)
++{
++    struct v2vdrv_connector_context *vcc;
++    int err;
++
++    vcc = kmalloc(sizeof(struct v2vdrv_connector_context), GFP_KERNEL);
++    if (!vcc) {
++        printk("%s connector out of memory\n", V2VDRV_LOGTAG);
++        return;
++    }
++    memset(vcc, 0, sizeof(struct v2vdrv_connector_context));
++    vcc->config = config;
++
++    err = v2vdrv_connect(vcc);
++    if (err)
++        return;
++
++    /* This runs the main processing loop, when it is done we disconnect
++       and cleanup regardless of what may have occured */
++    v2vdrv_connector_process_messages(vcc);
++
++    v2vdrv_connector_disconnect(vcc);
++
++    kfree(vcc);
++}
++
++/************************* LISTENER *************************/
++struct v2vdrv_listener_context {
++    struct v2vdrv_config *config;
++    struct v2v_channel *channel;
++    uint32_t tx_counter;
++    uint32_t rx_counter;
++    struct v2vdrv_listener_resp_item *resp_list;
++    struct v2vdrv_listener_resp_item *resp_tail;
++};
++
++static int
++v2vdrv_listen_accept(struct v2vdrv_listener_context *vlc)
++{
++    int err, err2;
++
++    /* Start the listener, get back a channel handle */
++    err = v2v_listen(vlc->config->local_prefix, &vlc->channel, 0, 0);
++    if (err) {
++        printk("%s listener(%p) failure in v2v_listen() - error: %d\n", V2VDRV_LOGTAG, vlc, err);
++        return err;
++    }
++    BUG_ON(vlc->channel == NULL);
++    printk("%s listener(%p) listener started, wait to accept...\n", V2VDRV_LOGTAG, vlc);
++    
++    /* Wait to accept the connection from the connector end */
++    err = v2v_accept(vlc->channel);
++    if (err) {
++        if (err != -ENOLINK)
++            printk("%s listener(%p) failure in v2v_accept() - error: %d\n", V2VDRV_LOGTAG, vlc, err);
++        else
++            printk("%s listener(%p) remote end disconnected while waiting to accept\n", V2VDRV_LOGTAG, vlc);
++
++        err2 = v2v_disconnect(vlc->channel);
++        if (err2) {
++            printk("%s listener(%p) secondary failure in v2v_disconnect() after accept failed - error: %d\n",
++                     V2VDRV_LOGTAG, vlc, err2);
++        }
++        return err;
++    }
++   
++    printk("%s listener(%p) accepted connection, ready to process incoming data.\n", V2VDRV_LOGTAG, vlc);
++
++    return 0;
++}
++
++static int
++v2vdrv_listener_process_internal_rx(struct v2vdrv_listener_context *vlc)
++{
++    int err;
++    volatile void *msg;
++    size_t size;
++    unsigned type;
++    unsigned flags;
++    struct v2vdrv_frame_header *header;
++    struct v2vdrv_post_internal *vpi;
++    struct v2vdrv_listener_resp_item *vlri;
++    uint8_t sum;
++
++    while ((err = v2v_nc2_get_message(vlc->channel, (const volatile void**)&msg, &size, &type, &flags))
++            == 0) {
++        vlc->rx_counter++;   
++        header = (struct v2vdrv_frame_header*)msg;
++        if (!v2vdrv_message_header_check("listener", header, size,
++                                         sizeof(struct v2vdrv_post_internal),
++                                         vlc->rx_counter)) {
++            v2v_nc2_finish_message(vlc->channel);
++            return -EBADMSG;
++        }        
++
++        vpi = (struct v2vdrv_post_internal*)msg;
++        printk("------ GUID1=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n",
++               vpi->guid[0], vpi->guid[1], vpi->guid[2], vpi->guid[3],
++               vpi->guid[4], vpi->guid[5], vpi->guid[6], vpi->guid[7]);
++        printk("------ GUID2=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n",
++               vpi->guid[8], vpi->guid[9], vpi->guid[10], vpi->guid[11], 
++               vpi->guid[12], vpi->guid[13], vpi->guid[14], vpi->guid[15]);
++
++        sum = v2vdrv_checksum((const uint8_t*)msg, header->length);
++        if (sum != 0)
++            printk("%s listener(%p) bad checksumm on message #%d!!!\n", V2VDRV_LOGTAG, vlc, vlc->rx_counter);
++
++        /* Queue a response */
++        vlri = kmalloc(sizeof(struct v2vdrv_listener_resp_item), GFP_KERNEL);
++        if (vlri) {
++            vlri->next = NULL;
++            vlri->resp.header.id = header->id;
++            vlri->resp.header.type = V2V_MESSAGE_TYPE_INTERNAL;
++            vlri->resp.header.cs = 0;
++            vlri->resp.header.length = sizeof(struct v2vdrv_resp_internal); /* header + resp data */
++            vlri->resp.status = (sum == 0 ? V2V_MESSAGE_STATUS_OK : V2V_MESSAGE_STATUS_BADCS);
++            memcpy(&vlri->resp.guid, vpi->guid, 16);
++            vlri->resp.header.cs = v2vdrv_checksum((const uint8_t*)vlri, sizeof(struct v2vdrv_resp_internal));
++            if (vlc->resp_list) {
++                vlc->resp_tail->next = vlri;
++                vlc->resp_tail = vlri;
++            }
++            else {
++                vlc->resp_list = vlri;
++                vlc->resp_tail = vlri;
++            }            
++        }
++        else
++            printk("%s listener(%p) cannot queue response; out of memory\n", V2VDRV_LOGTAG, vlc);
++
++        v2v_nc2_finish_message(vlc->channel);
++    }
++    if (err == -ENODATA) {
++        /* No more messages */
++        printk("%s listener(%p) no more messages, returning\n", V2VDRV_LOGTAG, vlc);
++        return 0;
++    }
++
++    printk("%s listener(%p) receive internal data failure; abort processing - error: %d\n",
++           V2VDRV_LOGTAG, vlc, err);
++    return err; /* failure */
++}
++
++static int
++v2vdrv_listener_process_internal_tx(struct v2vdrv_listener_context *vlc)
++{
++    int err;
++    unsigned available;
++    volatile void *msg;
++    uint8_t *msgp;
++    struct v2vdrv_listener_resp_item *vlri; 
++
++    printk("%s listener(%p) sending internal response #%d\n", V2VDRV_LOGTAG, vlc, vlc->tx_counter + 1);
++    available = v2v_nc2_producer_bytes_available(vlc->channel);
++    printk("%s listener(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, vlc, available);
++    BUG_ON(vlc->resp_list == NULL);
++
++    err = v2v_nc2_prep_message(vlc->channel, sizeof(struct v2vdrv_resp_internal), V2V_MESSAGE_TYPE_INTERNAL, 0, &msg);
++    if (err) {
++        if (err == -EAGAIN) {
++            /* No room right now, return and try again later */
++            printk("%s listener(%p) not enough buffer space to send response #%d; retry\n",
++                   V2VDRV_LOGTAG, vlc, vlc->tx_counter + 1);
++            return -EAGAIN;
++        }
++        printk("%s listener(%p) transmit internal response failure; abort processing - error: %d\n",
++               V2VDRV_LOGTAG, vlc, err);
++        return err; /* failure */
++    }
++    vlc->tx_counter++; /* next message */
++    vlri = vlc->resp_list;
++    vlc->resp_list = vlri->next;
++    if (!vlc->resp_list)
++         vlc->resp_tail = NULL;
++
++    /* Response already formed, just copy it in */
++    mb();
++    msgp = (uint8_t*)msg;
++    memcpy(msgp, vlri, sizeof(struct v2vdrv_resp_internal));
++    mb();
++    kfree(vlri);
++
++    v2v_nc2_send_messages(vlc->channel);
++
++    /* Keep the send loop going by setting the event. If there is no more room, the prep message call
++       will return ERROR_RETRY and just land us back in the wait. */
++    v2v_set_wake_reason(vlc->channel, V2V_WAKE_REASON_SEND);
++    
++    return 0;
++}
++
++static void
++v2vdrv_listener_process_messages(struct v2vdrv_listener_context *vlc)
++{
++    int err = 0, done = 0;
++    enum v2v_endpoint_state state;
++    struct v2v_wait *wait_state;
++    u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE;
++    u8 reasons;
++
++    wait_state = v2v_get_wait_state(vlc->channel);
++
++    /* Start out processing loop, wait for message */
++    do {
++        if (vlc->resp_list)
++            reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE|V2V_WAKE_REASON_SEND;
++        else
++            reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE;
++
++        wait_event(wait_state->wait_event, atomic_xchg(&wait_state->wait_condition, 0) == 1);
++
++        do {
++            reasons = v2v_get_wake_reason(vlc->channel, reasons_mask);
++            
++            if (reasons & V2V_WAKE_REASON_CONTROL) {
++                err = v2v_get_remote_state(vlc->channel, &state);
++                if (err) {
++                    printk("%s listener(%p) failure in v2v_get_remote_state(); aborting - error: %d\n",
++                           V2VDRV_LOGTAG, vlc, err);
++                    done = 1;
++                    break;
++                }
++                printk("%s listener(%p) state changed for other end - new state: %s\n",
++                       V2VDRV_LOGTAG, vlc, v2v_endpoint_state_name(state));
++                if (v2v_state_requests_disconnect(state)) {
++                    printk("%s listener(%p) main processing loop ending for disconnect request...\n",
++                           V2VDRV_LOGTAG, vlc);
++                    err = 0;
++                    done = 1;
++                    break;
++                }
++            }
++
++            if (reasons & V2V_WAKE_REASON_SEND) {
++                if (vlc->resp_list) {
++                    err = v2vdrv_listener_process_internal_tx(vlc);
++                    if ((err != 0)&&(err != -EAGAIN)) {
++                        done = 1;
++                        break;
++                    }
++                }
++            }
++
++            if (reasons & V2V_WAKE_REASON_RECEIVE) {
++                err = v2vdrv_listener_process_internal_rx(vlc);
++                if (err) {
++                    done = 1;
++                    break;
++                }
++                /* we now have data, set the event again to process sends */
++                v2v_set_wake_reason(vlc->channel, V2V_WAKE_REASON_SEND);
++            }
++        } while (reasons != V2V_WAKE_REASON_NONE);
++
++        if (done)
++            break;
++    } while (1);
++}
++
++static void
++v2vdrv_listener_disconnect(struct v2vdrv_listener_context *vlc)
++{
++    int err;
++    uint32_t i = 0;
++    struct v2vdrv_listener_resp_item *resp;
++
++    printk("%s listener(%p) Disconnecting...\n", V2VDRV_LOGTAG, vlc);
++    err = v2v_disconnect(vlc->channel);
++    printk("%s listener(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, vlc, err);
++
++    printk("%s listener(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, vlc, vlc->tx_counter);
++    printk("%s listener(%p) Received response counter: %d\n", V2VDRV_LOGTAG, vlc, vlc->rx_counter);
++    if (vlc->tx_counter != vlc->rx_counter)
++        printk("%s listener(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, vlc);
++
++    while (vlc->resp_list) {
++        resp = vlc->resp_list;
++        vlc->resp_list = resp->next;
++        kfree(resp);
++        i++;
++    }
++    if (i > 0)
++        printk("%s listener(%p) WARNING Found %d unsent responses\n", V2VDRV_LOGTAG, vlc, i);
++}
++
++void v2vdrv_run_listener(struct v2vdrv_config *config)
++{
++    struct v2vdrv_listener_context *vlc;
++    int err;
++
++    vlc = kmalloc(sizeof(struct v2vdrv_listener_context), GFP_KERNEL);
++    if (!vlc) {
++        printk("%s listener out of memory\n", V2VDRV_LOGTAG);
++        return;
++    }
++    memset(vlc, 0, sizeof(struct v2vdrv_listener_context));
++    vlc->config = config;
++
++    err = v2vdrv_listen_accept(vlc);
++    if (err)
++        return;
++
++    /* This runs the main processing loop, when it is done we disconnect
++       and cleanup regardless of what may have occured */
++    v2vdrv_listener_process_messages(vlc);
++
++    v2vdrv_listener_disconnect(vlc);
++
++    kfree(vlc);
++}
++
+diff --git a/drivers/xen/v2v/v2vutl.c b/drivers/xen/v2v/v2vutl.c
+new file mode 100644
+index 0000000..7eb95d5
+--- /dev/null
++++ b/drivers/xen/v2v/v2vutl.c
+@@ -0,0 +1,225 @@
++/******************************************************************************
++ * drivers/xen/v2v/v2v.c
++ *
++ * V2V interdomain communication driver utilities.
++ * 
++ * Copyright (c) 2009 Ross Philipson
++ * Copyright (c) 2009 Citrix Systems, Inc.
++ *
++ * This program is free software; you can redistribute it and/or
++ * modify it under the terms of the GNU General Public License version 2
++ * as published by the Free Software Foundation; or, when distributed
++ * separately from the Linux kernel or incorporated into other
++ * software packages, subject to the following license:
++ *
++ * Permission is hereby granted, free of charge, to any person obtaining a copy
++ * of this source file (the "Software"), to deal in the Software without
++ * restriction, including without limitation the rights to use, copy, modify,
++ * merge, publish, distribute, sublicense, and/or sell copies of the Software,
++ * and to permit persons to whom the Software is furnished to do so, subject to
++ * the following conditions:
++ *
++ * The above copyright notice and this permission notice shall be included in
++ * all copies or substantial portions of the Software.
++ *
++ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
++ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
++ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
++ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
++ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
++ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
++ * IN THE SOFTWARE.
++ */
++
++#include <linux/mm.h>
++#include <linux/vmalloc.h>
++#include <xen/xenbus.h>
++#include <xen/evtchn.h>
++#include <xen/gnttab.h>
++#include <xen/v2v.h>
++#include "v2v_private.h"
++
++int
++v2v_xenstore_scatter(struct xenbus_transaction xbt, const char *prefix, ...)
++{
++    va_list args;
++    int err = 0;
++    enum xenstore_scatter_type type;
++    const char *name;
++
++    va_start(args, prefix);
++    while (1) {
++        name = va_arg(args, const char *);
++        if (!name)
++            break;
++        type = va_arg(args, enum xenstore_scatter_type);
++        switch (type) {
++        case xenstore_scatter_type_grant_ref: {
++            grant_ref_t gref;
++            gref = va_arg(args, grant_ref_t);
++            err = xenbus_printf(xbt, prefix, name, "%u", gref);
++            break;
++        }
++        case xenstore_scatter_type_evtchn_irq: {            
++            int irq;
++            irq = va_arg(args, int);
++            err = xenbus_printf(xbt, prefix, name, "%u", irq_to_evtchn_port(irq));
++            break;
++        }
++        case xenstore_scatter_type_string: {
++            const char *str;
++            str = va_arg(args, const char *);
++            err = xenbus_printf(xbt, prefix, name, "%s", str);
++            break;
++        }
++        case xenstore_scatter_type_int: {
++            int i;
++            i = va_arg(args, int);
++            err = xenbus_printf(xbt, prefix, name, "%d", i);
++            break;
++        }
++        default: {
++            err = -ENOSYS;
++            break;
++        }
++        }
++        if (err)
++            break;
++    }
++    va_end(args);
++    return err;
++}
++
++int
++v2v_xenstore_gather(struct xenbus_transaction xbt, const char *prefix, ...)
++{
++    va_list args;
++    const char *name;
++    enum xenstore_gather_type type;
++    char *raw_data;
++    int err = 0, r;
++
++    va_start(args, prefix);
++    while (1) {
++        name = va_arg(args, const char *);
++        if (!name)
++            break;
++        type = va_arg(args, enum xenstore_gather_type);
++        raw_data = xenbus_read(xbt, prefix, name, NULL);    
++        if (IS_ERR(raw_data)) {
++            err = PTR_ERR(raw_data);
++            break;
++        }
++
++        switch (type) {
++        case xenstore_gather_type_alien_grant_ref: {
++            grant_ref_t *gref;
++            unsigned raw;
++            gref = va_arg(args, grant_ref_t *);
++            r = sscanf(raw_data, "%d", &raw);
++            *gref = raw;
++            break;
++        }
++
++        case xenstore_gather_type_alien_evtchn_port: {
++            unsigned int *out;
++            unsigned raw;
++            out = va_arg(args, unsigned int *);
++            r = sscanf(raw_data, "%d", &raw);
++            *out = raw;
++            break;
++        }
++
++        case xenstore_gather_type_int: {
++            int *out;
++            out = va_arg(args, int *);
++            r = sscanf(raw_data, "%d", out);
++            break;
++        }
++
++        default: {
++            err = -ENOSYS;
++            break;
++        }
++        }
++
++        kfree(raw_data);
++        if (r != 1)
++            err = -EINVAL;            
++
++        if (err)
++            break;
++    }
++    va_end(args);
++    return err;
++}
++
++int
++v2v_xenops_grant_map(struct vm_struct **vm_area_out, grant_handle_t *ghandles_out,
++                     domid_t domid, unsigned int nr_grefs, grant_ref_t *grefs, int readonly)
++{
++    struct gnttab_map_grant_ref op[MAX_RING_PAGES];
++    uint8_t *addr;
++    unsigned int flags = GNTMAP_host_map;
++    unsigned x;
++    int err = 0;    
++    
++    if (readonly)
++        flags |= GNTMAP_readonly;    
++
++    /* create our virt memory area for the grant mapping */
++    *vm_area_out = alloc_vm_area(nr_grefs * PAGE_SIZE);
++    if (*vm_area_out == NULL)
++        return -ENOMEM;
++
++    for (x = 0; x < nr_grefs; x++) {
++        addr = (uint8_t*)(*vm_area_out)->addr + (x * PAGE_SIZE);
++        gnttab_set_map_op(&op[x], (unsigned long)addr, flags, grefs[x], domid);
++    }
++
++    if (HYPERVISOR_grant_table_op(GNTTABOP_map_grant_ref, op, nr_grefs))
++        BUG();
++
++    for (x = 0; x < nr_grefs; x++) {
++        if (op[x].status != 0) {
++            err = op[x].status;
++            ghandles_out[x] = INVALID_GRANT_HANDLE;
++        }
++        else
++            ghandles_out[x] = op[x].handle;
++      }
++    
++    if (err)
++        v2v_xenops_grant_unmap(*vm_area_out, ghandles_out, nr_grefs, readonly);
++
++    return err;
++}
++
++void
++v2v_xenops_grant_unmap(struct vm_struct *vm_area, grant_handle_t *ghandles, unsigned int nr_grefs, int readonly)
++{
++    struct gnttab_unmap_grant_ref op[MAX_RING_PAGES];
++    uint8_t *addr;
++    unsigned int flags = GNTMAP_host_map;
++    unsigned x, y = 0;
++
++    if (readonly)
++        flags |= GNTMAP_readonly; 
++
++    for (x = 0; x < nr_grefs; x++) {
++        addr = (uint8_t*)vm_area->addr + (x * PAGE_SIZE);
++
++        if (ghandles[x] != INVALID_GRANT_HANDLE) {
++            gnttab_set_unmap_op(&op[y++], (unsigned long)addr, flags, ghandles[x]);
++            ghandles[x] = INVALID_GRANT_HANDLE;
++        }
++    }
++    
++    if (y != 0) {
++        if (HYPERVISOR_grant_table_op(GNTTABOP_unmap_grant_ref, op, y))
++            BUG();
++    }
++
++    free_vm_area(vm_area);
++}
++
+diff --git a/include/xen/interface/io/uring.h b/include/xen/interface/io/uring.h
+new file mode 100644
+index 0000000..5d9d60a
+--- /dev/null
++++ b/include/xen/interface/io/uring.h
+@@ -0,0 +1,411 @@
++/******************************************************************************
++ * uring.h
++ *
++ * Shared producer-consumer ring pair macros.
++ *
++ * Permission is hereby granted, free of charge, to any person obtaining a copy
++ * of this software and associated documentation files (the "Software"), to
++ * deal in the Software without restriction, including without limitation the
++ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
++ * sell copies of the Software, and to permit persons to whom the Software is
++ * furnished to do so, subject to the following conditions:
++ *
++ * The above copyright notice and this permission notice shall be included in
++ * all copies or substantial portions of the Software.
++ *
++ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
++ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
++ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
++ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
++ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
++ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
++ * DEALINGS IN THE SOFTWARE.
++ *
++ * Copyright (c) 2009 Citrix Systems, Inc.
++ */
++
++#ifndef __XEN_PUBLIC_IO_URING_H__
++#define __XEN_PUBLIC_IO_URING_H__
++
++typedef unsigned RING_IDX;
++
++#define NETCHANNEL2_MSG_PAD 255
++
++/* The sring structures themselves.  The _cons and _prod variants are
++   different views of the same bit of shared memory, and are supposed
++   to provide better checking of the expected use patterns.  Fields in
++   the shared ring are owned by either the producer end or the
++   consumer end.  If a field is owned by your end, the other end will
++   never modify it.  If it's owned by the other end, the other end is
++   allowed to modify it whenever it likes, and you can never do so.
++
++   Fields owned by the other end are always const (because you can't
++   change them).  They're also volatile, because there are a bunch
++   of places where we go:
++
++   local_x = sring->x;
++   validate(local_x);
++   use(local_x);
++
++   and it would be very bad if the compiler turned that into:
++
++   local_x = sring->x;
++   validate(sring->x);
++   use(local_x);
++
++   because that contains a potential TOCTOU race (hard to exploit, but
++   still present).  The compiler is only allowed to do that
++   optimisation because it knows that local_x == sring->x at the start
++   of the call to validate(), and it only knows that if it can reorder
++   the read of sring->x over the sequence point at the end of the
++   first statement.  In other words, it can only do the bad
++   optimisation if it knows that reads of sring->x are side-effect
++   free.  volatile stops it from making that assumption.
++
++   We don't need a full memory barrier here, because it's sufficient
++   to copy the volatile data into stable guest-local storage, and
++   volatile achieves that.  i.e. we don't need local_x to be precisely
++   sring->x, but we do need it to be a stable snapshot of some
++   previous valud of sring->x.
++
++   Note that there are still plenty of other places where we *do* need
++   full barriers.  volatile just deals with this one, specific, case.
++
++   We could also deal with it by putting compiler barriers in all over
++   the place.  The downside of that approach is that you need to put
++   the barrier()s in lots of different places (basically, everywhere
++   which needs to access these fields), and it's easy to forget one.
++   barrier()s also have somewhat heavier semantics than volatile
++   (because they prevent all reordering, rather than just reordering
++   on this one field), although that's pretty much irrelevant because
++   gcc usually treats pretty much any volatile access as a call to
++   barrier().
++*/
++
++/* Messages are sent over sring pairs.  Each sring in a pair provides
++ * a unidirectional byte stream which can generate events when either
++ * the producer or consumer pointers cross a particular threshold.
++ *
++ * We define both sring_prod and sring_cons structures.  The two
++ * structures will always map onto the same physical bytes in memory,
++ * but they provide different views of that memory which are
++ * appropriate to either producers or consumers.
++ *
++ * Obviously, the endpoints need to agree on which end produces
++ * messages on which ring.  The endpoint which provided the memory
++ * backing the ring always produces on the first sring, and the one
++ * which just mapped the ring produces on the second.  By convention,
++ * these are known as the frontend and backend, respectively.
++ */
++
++/* For both rings, the producer (consumer) pointers point at the
++ * *next* byte which is going to be produced (consumed).  An endpoint
++ * must generate an event on the event channel port if it moves the
++ * producer pointer (consumer pointer) across prod_event (cons_event).
++ *
++ * i.e if an endpoint ever updates a pointer so that the old pointer
++ * is strictly less than the event, and the new pointer is greater
++ * than or equal to the event then the remote must be notified.  If
++ * the pointer overflows the ring, treat the new value as if it were
++ * (actual new value) + (1 << 32).
++ */
++struct netchannel2_sring_fields {
++        RING_IDX prod;
++        RING_IDX prod_event;
++        RING_IDX cons;
++        RING_IDX cons_event;
++
++        /* Setting consumer_waiting gives the other end a hint that it
++           should flush messages more aggressively, because the other
++           end is sitting waiting for them. */
++        unsigned consumer_active;
++        unsigned consumer_spinning;
++        unsigned producer_active;
++        unsigned char pad[36];
++};
++struct netchannel2_frontend_shared {
++        struct netchannel2_sring_fields a;
++        volatile const struct netchannel2_sring_fields b;
++};
++struct netchannel2_backend_shared {
++        volatile const struct netchannel2_sring_fields b;
++        struct netchannel2_sring_fields a;
++};
++
++struct nc2_ring_pair {
++        volatile struct netchannel2_sring_fields *local_endpoint;
++        volatile const struct netchannel2_sring_fields *remote_endpoint;
++
++        void *producer_payload;
++        volatile const void *consumer_payload;
++
++        /* The previous value written to local_endpoint->prod */
++        RING_IDX local_prod;
++
++        /* Will get written to local_endpoint->prod next time
++           we flush. */
++        RING_IDX local_prod_pvt;
++
++        /* The previous value written to local_endpoint->cons */
++        RING_IDX local_cons;
++
++        /* Will get written to local_endpoint->cons next time we
++           finish. */
++        RING_IDX local_cons_pvt;
++
++        /* This is the number of bytes available after local_prod_pvt
++           last time we checked, minus the number of bytes which we've
++           consumed since then.  It's used to a avoid a bunch of
++           memory barriers when checking for ring space. */
++        unsigned local_prod_bytes_available;
++
++        /* shadow of local_endpoint->producer_active */
++        unsigned local_producer_active;
++
++        unsigned producer_payload_bytes;
++        unsigned consumer_payload_bytes;
++};
++
++/* A message header.  There is one of these at the start of every
++ * message.  @type is one of the #define's below, and @size is the
++ * size of the message, including the header and any padding.
++ * size should be a multiple of 8 so we avoid unaligned memory copies.
++ * structs defining message formats should have sizes multiple of 8
++ * bytes and should use paddding fields if needed.
++ */
++struct netchannel2_msg_hdr {
++        unsigned char type;
++        unsigned char flags;
++        unsigned short size;
++};
++
++static __inline const volatile void *
++__nc2_incoming_message(struct nc2_ring_pair *ring)
++{
++        return (const volatile void *)((off_t)ring->consumer_payload +
++                                       (ring->local_cons_pvt &
++                                        (ring->consumer_payload_bytes - 1)));
++}
++
++static __inline int
++__nc2_contained_in_cons_ring(struct nc2_ring_pair *ring,
++                             const volatile void *msg,
++                             size_t size)
++{
++        if (msg < ring->consumer_payload ||
++            size > ring->consumer_payload_bytes ||
++            (off_t)msg + size >
++                (off_t)ring->consumer_payload +
++                ring->consumer_payload_bytes)
++                return 0;
++        else
++                return 1;
++}
++
++static __inline volatile void *
++__nc2_get_message_ptr(struct nc2_ring_pair *ncrp)
++{
++        return (volatile void *)((off_t)ncrp->producer_payload +
++                                 (ncrp->local_prod_pvt & (ncrp->producer_payload_bytes-1)));
++}
++
++static __inline void
++__nc2_send_pad(struct nc2_ring_pair *ncrp, unsigned short nr_bytes)
++{
++        volatile struct netchannel2_msg_hdr *msg;
++        msg = __nc2_get_message_ptr(ncrp);
++        msg->type = NETCHANNEL2_MSG_PAD;
++        msg->flags = 0;
++        msg->size = nr_bytes;
++        ncrp->local_prod_pvt += nr_bytes;
++        ncrp->local_prod_bytes_available -= nr_bytes;
++}
++
++static __inline int
++__nc2_ring_would_wrap(struct nc2_ring_pair *ring, unsigned short nr_bytes)
++{
++        RING_IDX mask;
++        mask = ~(ring->producer_payload_bytes - 1);
++        return (ring->local_prod_pvt & mask) != ((ring->local_prod_pvt + nr_bytes) & mask);
++}
++
++static __inline unsigned short
++__nc2_pad_needed(struct nc2_ring_pair *ring)
++{
++        return (unsigned short)(ring->producer_payload_bytes -
++                                (ring->local_prod_pvt &
++                                 (ring->producer_payload_bytes - 1)));
++}
++
++static __inline void
++__nc2_avoid_ring_wrap(struct nc2_ring_pair *ring, unsigned short nr_bytes)
++{
++        if (!__nc2_ring_would_wrap(ring, nr_bytes))
++                return;
++        __nc2_send_pad(ring, __nc2_pad_needed(ring));
++
++}
++
++/* A quick test of whether calling nc2_flush_ring() is likely to
++   trigger an event channel notification.  This is *not* guaranteed to
++   be correct, in either direction; constantly returning 0 and
++   constantly returning 1 would both be correct implementations. */
++static __inline int
++__nc2_flush_would_trigger_event(const struct nc2_ring_pair *ring)
++{
++        if ( (RING_IDX)(ring->local_prod_pvt - ring->remote_endpoint->prod_event) <
++             (RING_IDX)(ring->local_prod_pvt - ring->local_prod) )
++                return 1;
++        else
++                return 0;
++}
++
++/* Copy the private producer pointer to the shared producer pointer,
++ * with a suitable memory barrier such that all messages placed on the
++ * ring are stable before we do the copy.  This effectively pushes any
++ * messages which we've just sent out to the other end.  Returns 1 if
++ * we need to notify the other end and 0 otherwise.
++ */
++static __inline int
++nc2_flush_ring(struct nc2_ring_pair *ring)
++{
++        RING_IDX old_prod, new_prod;
++
++        new_prod = ring->local_prod_pvt;
++        old_prod = ring->local_prod;
++
++        ring->local_endpoint->prod = new_prod;
++        ring->local_prod = new_prod;
++
++        /* Need to publish our new producer pointer before checking
++           event. */
++        mb();
++
++        /* We notify if the producer pointer moves across the event
++         * pointer. */
++        if ( (RING_IDX)(new_prod - ring->remote_endpoint->prod_event) <
++             (RING_IDX)(new_prod - old_prod) ) {
++                return 1;
++        } else {
++                return 0;
++        }
++}
++
++/* Copy the private consumer pointer to the shared consumer pointer,
++ * with a memory barrier so that any previous reads from the ring
++ * complete before the pointer is updated.  This tells the other end
++ * that we're finished with the messages, and that it can re-use the
++ * ring space for more messages.  Returns 1 if we need to notify the
++ * other end and 0 otherwise.
++ */
++static __inline int
++nc2_finish_messages(struct nc2_ring_pair *ring)
++{
++        RING_IDX old_cons, new_cons;
++
++        old_cons = ring->local_cons;
++        new_cons = ring->local_cons_pvt;
++
++        ring->local_endpoint->cons = new_cons;
++        ring->local_cons = new_cons;
++
++        /* Need to publish our new consumer pointer before checking
++           event. */
++        mb();
++        if ( (RING_IDX)(new_cons - ring->remote_endpoint->cons_event) <
++             (RING_IDX)(new_cons - old_cons) )
++                return 1;
++        else
++                return 0;
++}
++
++/* Check whether there are any unconsumed messages left on the shared
++ * ring.  Returns 1 if there are, and 0 if there aren't.  If there are
++ * no more messages, set the producer event so that we'll get a
++ * notification as soon as another one gets sent.  It is assumed that
++ * all messages up to @prod have been processed, and none of the ones
++ * after it have been. */
++static __inline int
++nc2_final_check_for_messages(struct nc2_ring_pair *ring, RING_IDX prod)
++{
++        if (prod != ring->remote_endpoint->prod)
++                return 1;
++        /* Request an event when more stuff gets poked on the ring. */
++        ring->local_endpoint->prod_event = prod + 1;
++
++        /* Publish event before final check for responses. */
++        mb();
++        if (prod != ring->remote_endpoint->prod)
++                return 1;
++        else
++                return 0;
++}
++
++/* Can we send a message with @nr_bytes payload bytes?  Returns 1 if
++ * we can or 0 if we can't.  If there isn't space right now, set the
++ * consumer event so that we'll get notified when space is
++ * available. */
++static __inline int
++nc2_can_send_payload_bytes(struct nc2_ring_pair *ring,
++                           unsigned short nr_bytes)
++{
++        unsigned space;
++        RING_IDX cons;
++        /* Times 2 because we might need to send a pad message */
++        if (ring->local_prod_bytes_available > (unsigned)(nr_bytes * 2))
++                return 1;
++        if (__nc2_ring_would_wrap(ring, nr_bytes))
++                nr_bytes = nr_bytes + __nc2_pad_needed(ring);
++retry:
++        cons = ring->remote_endpoint->cons;
++        space = ring->producer_payload_bytes - (ring->local_prod_pvt - cons);
++        if (space >= nr_bytes) {
++                /* We have enough space to send the message. */
++
++                /* No memory barrier: we need the read of cons to have
++                   acquire semantics, which it does, because it's
++                   volatile. */
++
++                ring->local_prod_bytes_available = space;
++
++                return 1;
++        } else {
++                /* Not enough space available.  Set an event pointer
++                   when cons changes.  We need to be sure that the
++                   @cons used here is the same as the cons used to
++                   calculate @space above, and the volatile modifier
++                   on sring->cons achieves that. */
++                ring->local_endpoint->cons_event = cons + 1;
++
++                /* Check whether more space became available while we
++                   were messing about. */
++
++                /* Need the event pointer to be stable before we do
++                   the check. */
++                mb();
++                if (cons != ring->remote_endpoint->cons) {
++                        /* Cons pointer changed.  Try again. */
++                        goto retry;
++                }
++
++                /* There definitely isn't space on the ring now, and
++                   an event has been set such that we'll be notified
++                   if more space becomes available. */
++                /* XXX we get a notification as soon as any more space
++                   becomes available.  We could maybe optimise by
++                   setting the event such that we only get notified
++                   when we know that enough space is available.  The
++                   main complication is handling the case where you
++                   try to send a message of size A, fail due to lack
++                   of space, and then try to send one of size B, where
++                   B < A.  It's not clear whether you want to set the
++                   event for A bytes or B bytes.  The obvious answer
++                   is B, but that means moving the event pointer
++                   backwards, and it's not clear that that's always
++                   safe.  Always setting for a single byte is safe, so
++                   stick with that for now. */
++                return 0;
++        }
++}
++
++#endif /* __XEN_PUBLIC_IO_URING_H__ */
+diff --git a/include/xen/v2v.h b/include/xen/v2v.h
+new file mode 100644
+index 0000000..77f4472
+--- /dev/null
++++ b/include/xen/v2v.h
+@@ -0,0 +1,366 @@
++/******************************************************************************
++ * include/xen/v2v.h
++ *
++ * V2V interdomain communication API.
++ *
++ * Copyright (c) 2009 Citrix Systems, Inc.
++ *
++ * This program is free software; you can redistribute it and/or
++ * modify it under the terms of the GNU General Public License version 2
++ * as published by the Free Software Foundation; or, when distributed
++ * separately from the Linux kernel or incorporated into other
++ * software packages, subject to the following license:
++ *
++ * Permission is hereby granted, free of charge, to any person obtaining a copy
++ * of this source file (the "Software"), to deal in the Software without
++ * restriction, including without limitation the rights to use, copy, modify,
++ * merge, publish, distribute, sublicense, and/or sell copies of the Software,
++ * and to permit persons to whom the Software is furnished to do so, subject to
++ * the following conditions:
++ *
++ * The above copyright notice and this permission notice shall be included in
++ * all copies or substantial portions of the Software.
++ *
++ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
++ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
++ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
++ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
++ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
++ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
++ * IN THE SOFTWARE.
++ */
++#ifndef _XEN_V2V_H
++#define _XEN_V2V_H
++
++#include <linux/wait.h>
++
++/* An opaque type representing a low-level VM-to-VM communication
++ * channel.  These are point-to-point, record-oriented connections
++ * using shared memory and event channels.  They are created with
++ * v2v_listen() and v2v_connect(), and should be destroyed with
++ * v2v_disconnect() when no longer needed.
++ *
++ * They will not, in general, survive hibernation, suspend/resume, or
++ * migration, and will become disconnected following any such events.
++ * They must still be torn down in the usual way, but will not be
++ * functional.
++ */
++struct v2v_channel;
++
++/* Wait state structure returned by v2v_get_wait_state. See this
++ * function for more details.
++ */
++struct v2v_wait {
++    wait_queue_head_t wait_event;
++    atomic_t          wait_condition;
++};
++
++/* Start listening for incoming VM-to-VM connections using parameters
++ * from the xenstore area @xenbus_prefix, which should have been
++ * previously set up by the toolstack.  The newly constructed channel
++ * is placed in *@channel.  The size of the outgoing message ring is
++ * 2**@prod_ring_page_order pages, and the size of the incoming one
++ * 2**@cons_ring_page_order pages.
++ *
++ * It is generally a mistake to have several processes listening on
++ * the same xenbus prefix.
++ *
++ * Returns 0 on success and an appropriate errno code on failure.
++ */
++int v2v_listen(const char *xenbus_prefix,
++               struct v2v_channel **channel,
++               unsigned prod_ring_page_order,
++               unsigned cons_ring_page_order);
++
++/* Wait for a remote domain to connect to the channel @channel, which
++ * should have been allocated with v2v_listen().
++ *
++ * Returns 0 on success and an appropriate errno code on failure.
++ */
++int v2v_accept(struct v2v_channel *channel);
++
++/* Connect to a VM-to-VM channel specified by @xenbus_prefix, which
++ * should previously have been initialised by the tools, and place the
++ * newly constructed channel structure in *@channel.  Note that
++ * @xenbus_prefix specifies the *local* xenstore prefix, and not the
++ * server prefix.
++ *
++ * This will fail if the remote endpoint is not currently listening on
++ * the specified channel.
++ *
++ * Returns 0 on success and an appropriate errno code on failure.
++ */
++int v2v_connect(const char *xenbus_prefix,
++                struct v2v_channel **channel);
++
++/* Disconnect from a VM-to-VM channel @channel which was previously
++ * established using either v2v_connect() or v2v_listen().  The channel
++ * is no longer valid once this returns, and should not be used.
++ *
++ * If the channel was constructued by v2v_connect(), this is
++ * instaneous.  If it was constructed by v2v_listen(), and a remote
++ * endpoint is currently connected, we must wait for the peer to call
++ * v2v_disconnect() on their end of the connection before this can
++ * return.  In that case, the local state is set to
++ * v2v_endpoint_state_disconnecting, so as to encourage the remote to
++ * disconnect quickly.
++ *
++ * Note that this can fail if the remote endpoint is misbehaving.  In
++ * that case, there is no reliable way to destroy the connection, and
++ * it must be leaked.
++ *
++ * Returns 0 on success and an appropriate errno code on failure.
++ */
++int v2v_disconnect(const struct v2v_channel *channel);
++
++/* An enumeration representing the states which a remote endpoint can
++ * be in.  Obviously, the remote endpoint can change its state at any
++ * time; if this happens, the local control event will be notified.
++ */
++enum v2v_endpoint_state {
++
++    /* There was an error reading the remote endpoint state, and it is
++     * currently unknown.
++     */
++    v2v_state_unknown,
++
++    /* The remote endpoint is not ready yet, and doesn't appear to
++     * have ever been ready.  New endpoints are placed in this state
++     * before they connect or listen for the first time.  Note that if
++     * the endpoint has previously connected and disconnected, the
++     * state will be either v2v_state_disconnected or
++     * v2v_state_crashed, and not v2v_state_unready.
++     */
++    v2v_state_unready = 0x123,
++
++    /* The remote endpoint is currently listening for connections. */
++    v2v_state_listening,
++
++    /* The remote endpoint has connected, and is willing to accept any
++     * messages which we send.
++     */
++    v2v_state_connected,
++
++    /* The remote endpoint is connected, but is asking us to
++     * disconnect as quickly as possible.  This is used as part of
++     * connection teardown.
++     */
++    v2v_state_disconnecting,
++
++    /* The remote endpoint has disconnected cleanly. */
++    v2v_state_disconnected,
++
++    /* The remote endpoint has disconnected uncleanly.  This should be
++     * treated similarly to v2v_state_disconnected, except possibly
++     * for providing more informative error messages.
++     */
++    v2v_state_crashed
++};
++
++/* Returns 1 if, upon seeing state @state, the local endpoint is supposed
++ * to start disconnecting.
++ */
++static inline int
++v2v_state_requests_disconnect(enum v2v_endpoint_state state)
++{
++    if (state >= v2v_state_disconnecting)
++        return 1;
++    else
++        return 0;
++}
++
++/* Get the current remote state for channel @channel, which should
++ * have been constructed with v2v_connect() or v2v_listen(). The
++ * argument *@state receives the current state value. 
++ *
++ * The value of *@state will set to v2v_state_unknown when there
++ * is no remote state returned internall or an error occurs.  This
++ * includes when the remote end is not present.  In this case the
++ * function will return -ENOENT or -ERANGE. The XENBUS_EXIST_ERR()
++ * macro in xenbus.h can be used to test for this condition.
++ *
++ * Note that there is no guarantee that the returned state will remain
++ * valid after this returns.  However, we do guarantee to set the
++ * channel's control event whenever the remote state changes.
++ *
++ * Returns 0 on success and an appropriate errno code on failure.
++ */
++int v2v_get_remote_state(struct v2v_channel *channel,
++                         enum v2v_endpoint_state *state);
++
++/* Convert a v2v_endpoint_state @state to a string.  The resulting string
++ * is static, and the caller should not attempt to modify it in any
++ * way or to free it.
++ *
++ * Always succeeds, provided that @state is a valid memory of the
++ * v2v_endpoint_state enumeration.
++ */
++const char *v2v_endpoint_state_name(enum v2v_endpoint_state state);
++
++/* Wake reasons. The values can be OR'd together in the functions below
++ * to specify more than one wait reason. 
++ */
++#define V2V_WAKE_REASON_NONE    0x00
++#define V2V_WAKE_REASON_CONTROL 0x01
++#define V2V_WAKE_REASON_SEND    0x02
++#define V2V_WAKE_REASON_RECEIVE 0x04
++#define V2V_WAKE_REASON_ANY     0xFF
++
++/* Retrieve the wait state object for the @channel.  This object allows
++ * a thread to wait until one of several conditions is true.  Normally
++ * the event would be used in a similar manner to the following following:
++ *
++ *   wait_event(wait_state->wait_event,
++ *              atomic_xchg(&wait_state->wait_condition, 0) == 1);
++ * 
++ * Following the wait,  the function v2v_get_wake_reason() would be used
++ * to determine the reasons the wait was satisfied.  One or more of the
++ * wake reasons above indicated the reason the wait was satified.
++ *
++ * V2V_WAKE_REASON_CONTROL: This event reason will be set whenever there 
++ * is any possibility that the remote endpoint state has changed.  Note 
++ * that it may also be set when the state has not changed.
++ *
++ * V2V_WAKE_REASON_SEND: Send event reasons are set whenever space is
++ * available in the ring to send messages, and cleared whenever
++ * v2v_nc2_prep_message() fails due to a lack of outgoing ring space.
++ * Note that this event being set does not imply that it will be
++ * possible to send a message (because it doesn't tell you how *much*
++ * space is available in the ring, which might not even be enough for
++ * a header), but if it is possible to send a message then this event
++ * will be set.
++ *
++ * V2V_WAKE_REASON_RECEIVE: This reason is set when data is recieved.  Note 
++ * that it may sometimes be set when there are no messages incoming, and
++ * will remain so until v2v_nc2_get_message() is called.
++ */
++struct v2v_wait *v2v_get_wait_state(struct v2v_channel *channel);
++
++/* Retrive the @reasons that a wait was satisfied.  The @reasons value
++ * is an OR'ed list list of the V2V_WAKE_REASONE_* values above.  Each
++ * call to v2v_get_wake_reason will return that next wake reason in the 
++ * internal queue that matches the @reasons mask.  This routine can be
++ * called multiple times before and after a wait depending on the specific
++ * use case involved.
++ *
++ * Returns one of V2V_WAKE_REASONE_CONTROL, V2V_WAKE_REASONE_SEND or
++ * V2V_WAKE_REASONE_RECEIVE from the internal queue. When no more wake
++ * reasons are present, V2V_WAKE_REASON_NONE is returned.
++ */
++u8 v2v_get_wake_reason(struct v2v_channel *channel, u8 reasons);
++
++/* This routine can be used to explicitly set a wake @reason in the 
++ * internal queue.
++ *
++ * Returns 0 on success and an appropriate errno code on failure.
++ */
++int v2v_set_wake_reason(struct v2v_channel *channel, u8 reason);
++
++/* This routine can be used to explicitly set a wake @reason in the 
++ * internal queue.
++ */
++void v2v_clear_wake_reason(struct v2v_channel *channel, u8 reasons);
++
++/* Try to fetch a message from the incoming message queue.  On
++ * success, *@payload is set to a pointer to the message payload,
++ * *@size is set to its size, *@type is set to its type, and *@flags
++ * is set to its flags.  Note thath *@payload will point at read-only
++ * memory which is shared with the remote endpoint; it must not be
++ * written to, and a malicious remote could cause its contents to
++ * change at any time.
++ *
++ * Returns 0 on success. If no more messages are available, -ENODATA
++ * is returned. On failure an errno error code is returned.
++ *
++ * Once the client has finished consuming the message, it should call
++ * v2v_nc2_finish_message().
++ */
++int v2v_nc2_get_message(struct v2v_channel *channel,
++                        const volatile void **payload,
++                        size_t *size,
++                        unsigned *type,
++                        unsigned *flags);
++
++/* Finish consuming the message which was most recently returned by
++ * v2v_nc2_get_message() on channel @channel.  The ring space is
++ * released, so that the remote can use it to send another message,
++ * and the channel advances to the next incoming message.
++ *
++ * The payload returned by v2v_nc2_get_message() must not be touched
++ * once this returns.
++ */
++void v2v_nc2_finish_message(struct v2v_channel *channel);
++
++/* Prepare to send a message of size @msg_size on the ring, using type
++ * @type and flags @flags.  Space for @msg_size bytes of payload is
++ * allocated on the ring, and returned in *@payload.  Note that
++ * *@payload will point to shared memory, and so the remote endpoint may
++ * be able to modify it under certain circumstances.
++ *
++ * The message is not actually sent until v2v_nc2_send_messages() is
++ * called.
++ *
++ * If there is insufficient space in the ring, this routine requests
++ * that the remote endpoint set the local ring's send event when more
++ * space becomes available, the call returns -EAGAIN.
++ *
++ * Returns 0 on success and an appropriate errno code on failure.
++ */
++int v2v_nc2_prep_message(struct v2v_channel *channel,
++                         size_t msg_size,
++                         unsigned char type,
++                         unsigned char flags,
++                         volatile void **payload);
++
++/* Flush the current batch of outgoing messages to the ring.  The
++ * messages are made visible to the correctly behaving remote endpoint
++ * (incorrectly behaving remote endpoints can always look ahead) and
++ * the remote is woken up if appropriate.
++ *
++ * The client must not touch the payload pointers returned by
++ * v2v_nc2_prep_message() after calling this function.
++ */
++void v2v_nc2_send_messages(struct v2v_channel *channel);
++
++/* Estimate the largest message size which could be passed to
++ * v2v_nc2_prep_message() such that it would succeed without
++ * generating a large pad message.
++ *
++ * This is a lower bound on the message size.  Larger sends may
++ * sometimes succeed.  It is guaranteed that a send at the returned
++ * size will never fail.
++ */
++unsigned v2v_nc2_producer_bytes_available(struct v2v_channel *channel);
++
++/* Check whether the remote endpoint is currently in fast-receive
++ * mode.  Returns 1 if they have and 0 otherwise.
++ *
++ * Note that there is no way of preventing the remote's fast-receive
++ * state from changing after this has been called, and so the value
++ * returned is inherently inaccurate.  It should only be used for
++ * performance tuning, and not for correctness.
++ */
++int v2v_nc2_remote_requested_fast_wakeup(struct v2v_channel *channel);
++
++/* Enter fast-receive mode on channel @channel.  In this mode, we
++ * optimise for latency rather than throughput, by, for instance,
++ * flushing outgoing messages to the ring more aggressively (which
++ * means they get sent sooner, but in smaller batches).  This is
++ * generally worthwhile if the local endpoint is blocked waiting to
++ * receive messages, but not during normal operation.
++ *
++ * Entering fast receive mode should never affect the correctness of
++ * clients, but may have a significant performance impact.
++ */
++void v2v_nc2_request_fast_receive(struct v2v_channel *channel);
++
++/* Exit fast-receive mode on channel @channel.  This will undo the
++ * effects of v2v_nc2_request_fast_receive().  Fast-receive mode is
++ * not reference counted in any way, and so calling
++ * v2v_nc2_cancel_fast_receive() will cause the channel to leave
++ * fast-receive mode regardless of how many times
++ * v2v_nc2_request_fast_receive() has been called.
++ */
++void v2v_nc2_cancel_fast_receive(struct v2v_channel *channel);
++
++#endif /* !_XEN_V2V_H */