
view tools/remus/kmod/sch_queue.c @ 20486:1c8c18ae1d3b

Remus: support for network buffering

This currently relies on the third-party IMQ patch (linuximq.net)
being present in dom0. The plan is to replace this with a direct hook
into netback eventually.

This patch includes a pared-down and patched copy of ebtables to
install IMQ on a VIF.

Signed-off-by: Brendan Cully <brendan@cs.ubc.ca>
author Keir Fraser <keir.fraser@citrix.com>
date Fri Nov 13 15:34:46 2009 +0000 (2009-11-13)
/*
 * sch_queue.c Queue traffic until an explicit release command
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 * The operation of the buffer is as follows:
 * When a checkpoint begins, a barrier is inserted into the
 * network queue by a netlink request (it operates by storing
 * a pointer to the next packet which arrives and blocking dequeue
 * when that packet is at the head of the queue).
 * When a checkpoint completes (the backup acknowledges receipt),
 * currently-queued packets are released.
 * So it supports two operations, barrier and release.
 */

#include <linux/config.h>
#include <linux/module.h>
#include <linux/types.h>
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/netdevice.h>
#include <linux/skbuff.h>
#include <net/pkt_sched.h>

/* upper bound on the number of buffered bytes per queue */
#define FIFO_BUF (10*1024*1024)

#define TCQ_CHECKPOINT 0
#define TCQ_DEQUEUE 1
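
/* The two netlink actions map onto the code below as follows:
 * TCQ_CHECKPOINT arms the barrier: queue_change() resets q->stop, and
 * queue_enqueue() latches the next packet it sees as the stop marker.
 * TCQ_DEQUEUE releases the buffer: queue_change() clears TCQ_F_THROTTLED,
 * and queue_dequeue() hands packets back until the stop marker reaches the
 * head of the queue, at which point the queue is throttled again. */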

struct queue_sched_data {
        /* this packet is the first packet which should not be delivered.
         * If it is NULL, queue_enqueue will set it to the next packet it sees. */
        struct sk_buff *stop;
};

struct tc_queue_qopt {
        /* 0: reset stop packet pointer
         * 1: dequeue to stop pointer */
        int action;
};

/* borrowed from drivers/xen/netback/loopback.c */
static int is_foreign(unsigned long pfn)
{
        /* NB. Play it safe for auto-translation mode. */
        return (xen_feature(XENFEAT_auto_translated_physmap) ||
                (phys_to_machine_mapping[pfn] & FOREIGN_FRAME_BIT));
}
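
/* Copy any fragment that lives in a foreign (other-domain) page into a
 * freshly allocated local page, so that an skb held in the buffer until the
 * next release does not keep referencing a foreign frame. Returns 0 if a
 * replacement page could not be allocated, 1 on success. */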
static int skb_remove_foreign_references(struct sk_buff *skb)
{
        struct page *page;
        unsigned long pfn;
        int i, off;
        char *vaddr;

        BUG_ON(skb_shinfo(skb)->frag_list);

        for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
                pfn = page_to_pfn(skb_shinfo(skb)->frags[i].page);
                if (!is_foreign(pfn))
                        continue;
                /*
                printk("foreign ref found\n");
                */
                page = alloc_page(GFP_ATOMIC | __GFP_NOWARN);
                if (unlikely(!page))
                        return 0;

                vaddr = kmap_skb_frag(&skb_shinfo(skb)->frags[i]);
                off = skb_shinfo(skb)->frags[i].page_offset;
                memcpy(page_address(page) + off, vaddr + off,
                       skb_shinfo(skb)->frags[i].size);
                kunmap_skb_frag(vaddr);

                put_page(skb_shinfo(skb)->frags[i].page);
                skb_shinfo(skb)->frags[i].page = page;
        }

        return 1;
}

static int queue_enqueue(struct sk_buff *skb, struct Qdisc* sch)
{
        struct queue_sched_data *q = qdisc_priv(sch);

        if (likely(sch->qstats.backlog + skb->len <= FIFO_BUF))
        {
                if (!q->stop)
                        q->stop = skb;

                if (!skb_remove_foreign_references(skb)) {
                        printk("error removing foreign ref\n");
                        return qdisc_reshape_fail(skb, sch);
                }

                return qdisc_enqueue_tail(skb, sch);
        }
        printk("queue reported full: %d,%d\n", sch->qstats.backlog, skb->len);

        return qdisc_reshape_fail(skb, sch);
}

/* dequeue doesn't actually dequeue until the release command is
 * received. */
static inline struct sk_buff *queue_dequeue(struct Qdisc* sch)
{
        struct queue_sched_data *q = qdisc_priv(sch);
        struct sk_buff* peek;
        /*
        struct timeval tv;

        if (!q->stop) {
                do_gettimeofday(&tv);
                printk("packet dequeued at %lu.%06lu\n", tv.tv_sec, tv.tv_usec);
        }
        */

        if (sch->flags & TCQ_F_THROTTLED)
                return NULL;

        peek = (struct sk_buff *)((sch->q).next);

        /* this pointer comparison may be shady */
        if (peek == q->stop) {
                /*
                do_gettimeofday(&tv);
                printk("stop packet at %lu.%06lu\n", tv.tv_sec, tv.tv_usec);
                */

                /* this is the tail of the last round. Release it and block the queue */
                sch->flags |= TCQ_F_THROTTLED;
                return NULL;
        }

        return qdisc_dequeue_head(sch);
}

static int queue_init(struct Qdisc *sch, struct rtattr *opt)
{
        sch->flags |= TCQ_F_THROTTLED;

        return 0;
}

/* receives two messages:
 * 0: checkpoint queue (set stop to next packet)
 * 1: dequeue until stop */
static int queue_change(struct Qdisc* sch, struct rtattr* opt)
{
        struct queue_sched_data *q = qdisc_priv(sch);
        struct tc_queue_qopt* msg;
        /*
        struct timeval tv;
        */

        if (!opt || RTA_PAYLOAD(opt) < sizeof(*msg))
                return -EINVAL;

        msg = RTA_DATA(opt);

        if (msg->action == TCQ_CHECKPOINT) {
                /* reset stop */
                q->stop = NULL;
        } else if (msg->action == TCQ_DEQUEUE) {
                /* dequeue */
                sch->flags &= ~TCQ_F_THROTTLED;
                netif_schedule(sch->dev);
                /*
                do_gettimeofday(&tv);
                printk("queue release at %lu.%06lu (%d bytes)\n", tv.tv_sec, tv.tv_usec,
                       sch->qstats.backlog);
                */
        } else {
                return -EINVAL;
        }

        return 0;
}

struct Qdisc_ops queue_qdisc_ops = {
        .id        = "queue",
        .priv_size = sizeof(struct queue_sched_data),
        .enqueue   = queue_enqueue,
        .dequeue   = queue_dequeue,
        .init      = queue_init,
        .change    = queue_change,
        .owner     = THIS_MODULE,
};

static int __init queue_module_init(void)
{
        printk("loading queue\n");
        return register_qdisc(&queue_qdisc_ops);
}

static void __exit queue_module_exit(void)
{
        printk("queue unloaded\n");
        unregister_qdisc(&queue_qdisc_ops);
}

module_init(queue_module_init);
module_exit(queue_module_exit);
MODULE_LICENSE("GPL");
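
The qdisc above is controlled entirely from user space: the barrier and release operations described in the header comment arrive as qdisc "change" requests over rtnetlink, and queue_change() reads the struct tc_queue_qopt carried in TCA_OPTIONS. As a rough illustration, the sketch below shows one way a control process in dom0 could issue the two commands. It is not taken from the Remus tools: the device name "imq0", the helper names queue_send_action()/addattr(), and the absence of any ack or error handling are assumptions made for the example, and the "queue" qdisc is assumed to have been installed as the device's root qdisc beforehand.

/* remus_queue_ctl.c - illustrative sketch only, not part of the patch */
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <linux/netlink.h>
#include <linux/rtnetlink.h>
#include <linux/pkt_sched.h>
#include <net/if.h>

/* must mirror the kernel module's message format and action values */
struct tc_queue_qopt { int action; };
#define TCQ_CHECKPOINT 0
#define TCQ_DEQUEUE    1

/* append one attribute to a netlink request (same idea as iproute2's addattr_l) */
static void addattr(struct nlmsghdr *n, int type, const void *data, int len)
{
        struct rtattr *rta = (struct rtattr *)((char *)n + NLMSG_ALIGN(n->nlmsg_len));

        rta->rta_type = type;
        rta->rta_len = RTA_LENGTH(len);
        memcpy(RTA_DATA(rta), data, len);
        n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + RTA_ALIGN(rta->rta_len);
}

/* send a change request to the root "queue" qdisc on the given interface */
static int queue_send_action(int nlfd, int ifindex, int action)
{
        struct {
                struct nlmsghdr n;
                struct tcmsg    t;
                char            buf[64];
        } req;
        struct tc_queue_qopt opt = { .action = action };

        memset(&req, 0, sizeof(req));
        req.n.nlmsg_len   = NLMSG_LENGTH(sizeof(struct tcmsg));
        req.n.nlmsg_type  = RTM_NEWQDISC;  /* no NLM_F_CREATE, so this is a change */
        req.n.nlmsg_flags = NLM_F_REQUEST;
        req.t.tcm_family  = AF_UNSPEC;
        req.t.tcm_ifindex = ifindex;
        req.t.tcm_parent  = TC_H_ROOT;

        addattr(&req.n, TCA_KIND, "queue", sizeof("queue"));
        addattr(&req.n, TCA_OPTIONS, &opt, sizeof(opt));

        return send(nlfd, &req, req.n.nlmsg_len, 0) < 0 ? -1 : 0;
}

int main(void)
{
        /* "imq0" stands in for whatever device carries the guest's outbound
         * traffic with the "queue" qdisc already installed as its root */
        int ifindex = if_nametoindex("imq0");
        int nlfd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);

        if (nlfd < 0 || ifindex == 0)
                return 1;

        queue_send_action(nlfd, ifindex, TCQ_CHECKPOINT); /* insert barrier */
        /* ... checkpoint the domain and wait for the backup's acknowledgement ... */
        queue_send_action(nlfd, ifindex, TCQ_DEQUEUE);    /* release buffered packets */

        close(nlfd);
        return 0;
}

Installing the qdisc in the first place is the same RTM_NEWQDISC request with NLM_F_CREATE and NLM_F_EXCL set and no TCA_OPTIONS attribute, since queue_init() ignores its options argument.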