
xen/common/rcupdate.c @ 22810:75b6287626ee

rcupdate: Make rcu_barrier() more paranoia-proof

I'm not sure my original barrier function is correct. It may allow a
CPU to exit the barrier loop, with no local work to do, while RCU work
is still pending on other CPUs and needs one or more quiescent periods
to be flushed through.

Although rcu_pending() may handle this, it is easiest to follow
Linux's example and simply call_rcu() a callback function on every
CPU. When the callback has executed on every CPU, we know that all
previously-queued RCU work is completed, and we can exit the barrier.

Signed-off-by: Keir Fraser <keir@xen.org>
author    Keir Fraser <keir@xen.org>
date      Fri Jan 14 16:38:51 2011 +0000 (2011-01-14)
parents   bb0d0141ebf7
children
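
For callers, the guarantee looks roughly like the sketch below. It is only an
illustration: struct foo, foo_free() and foo_pool_teardown() are hypothetical
names, not part of this file. An object handed to call_rcu() may still have
its callback queued on some CPU long after the caller has moved on, so a
tear-down path must wait for those callbacks to finish before releasing
whatever they depend on.

    struct foo {
        struct rcu_head rcu;
        /* ... payload ... */
    };

    static void free_foo_rcu(struct rcu_head *head)
    {
        xfree(container_of(head, struct foo, rcu));
    }

    static void foo_free(struct foo *f)
    {
        /* Reclaim only after any in-flight readers have finished. */
        call_rcu(&f->rcu, free_foo_rcu);
    }

    static void foo_pool_teardown(void)
    {
        /*
         * No new foo_free() calls can happen, but callbacks queued by
         * earlier ones may still be pending on some CPUs; wait for all
         * of them to run.
         */
        rcu_barrier();
        /* Now nothing can still reference the pool's backing resources. */
    }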
/*
 * Read-Copy Update mechanism for mutual exclusion
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * Copyright (C) IBM Corporation, 2001
 *
 * Authors: Dipankar Sarma <dipankar@in.ibm.com>
 *          Manfred Spraul <manfred@colorfullife.com>
 *
 * Modifications for Xen: Jose Renato Santos
 * Copyright (C) Hewlett-Packard, 2006
 *
 * Based on the original work by Paul McKenney <paulmck@us.ibm.com>
 * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
 * Papers:
 * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
 * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
 *
 * For detailed explanation of Read-Copy Update mechanism see -
 * http://lse.sourceforge.net/locking/rcupdate.html
 */
#include <xen/types.h>
#include <xen/kernel.h>
#include <xen/init.h>
#include <xen/spinlock.h>
#include <xen/smp.h>
#include <xen/rcupdate.h>
#include <xen/sched.h>
#include <asm/atomic.h>
#include <xen/bitops.h>
#include <xen/percpu.h>
#include <xen/softirq.h>
#include <xen/cpu.h>
#include <xen/stop_machine.h>

/* Definition for rcupdate control block. */
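/*
 * The initial batch number is arbitrary (carried over from Linux); only
 * relative comparisons of batch numbers, e.g. via rcu_batch_before(),
 * are meaningful.
 */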
struct rcu_ctrlblk rcu_ctrlblk = {
    .cur = -300,
    .completed = -300,
    .lock = SPIN_LOCK_UNLOCKED,
    .cpumask = CPU_MASK_NONE,
};

DEFINE_PER_CPU(struct rcu_data, rcu_data);

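/*
 * Tunables (compile-time constants here):
 *  blimit     - default cap on callbacks invoked per RCU_SOFTIRQ pass.
 *  qhimark    - per-CPU queue length beyond which the cap is lifted and
 *               other CPUs are pushed towards a quiescent state.
 *  qlowmark   - queue length at or below which the cap is restored.
 *  rsinterval - minimum growth in queue length between the reschedule
 *               IPIs sent by force_quiescent_state().
 */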
static int blimit = 10;
static int qhimark = 10000;
static int qlowmark = 100;
static int rsinterval = 1000;

struct rcu_barrier_data {
    struct rcu_head head;
    atomic_t *cpu_count;
};

static void rcu_barrier_callback(struct rcu_head *head)
{
    struct rcu_barrier_data *data = container_of(
        head, struct rcu_barrier_data, head);
    atomic_inc(data->cpu_count);
}

static int rcu_barrier_action(void *_cpu_count)
{
    struct rcu_barrier_data data = { .cpu_count = _cpu_count };

    ASSERT(!local_irq_is_enabled());
    local_irq_enable();

    /*
     * When callback is executed, all previously-queued RCU work on this CPU
     * is completed. When all CPUs have executed their callback, data.cpu_count
     * will have been incremented to include every online CPU.
     */
    call_rcu(&data.head, rcu_barrier_callback);

    while ( atomic_read(data.cpu_count) != cpus_weight(cpu_online_map) )
    {
        process_pending_softirqs();
        cpu_relax();
    }

    local_irq_disable();

    return 0;
}

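/*
 * Wait for all in-flight RCU callbacks on all CPUs to complete: each online
 * CPU queues a counting callback and spins until the count reaches the number
 * of online CPUs, with the whole exchange run under stop_machine_run().
 */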
int rcu_barrier(void)
{
    atomic_t cpu_count = ATOMIC_INIT(0);
    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
}

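/*
 * Encourage quiescent states: reschedule the local CPU and, at most roughly
 * once per rsinterval newly queued callbacks, raise reschedule softirqs on
 * the other CPUs still recorded in rcp->cpumask.
 */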
static void force_quiescent_state(struct rcu_data *rdp,
                                  struct rcu_ctrlblk *rcp)
{
    cpumask_t cpumask;
    raise_softirq(SCHEDULE_SOFTIRQ);
    if (unlikely(rdp->qlen - rdp->last_rs_qlen > rsinterval)) {
        rdp->last_rs_qlen = rdp->qlen;
        /*
         * Don't send IPI to itself. With irqs disabled,
         * rdp->cpu is the current cpu.
         */
        cpumask = rcp->cpumask;
        cpu_clear(rdp->cpu, cpumask);
        cpumask_raise_softirq(cpumask, SCHEDULE_SOFTIRQ);
    }
}

/**
 * call_rcu - Queue an RCU callback for invocation after a grace period.
 * @head: structure to be used for queueing the RCU updates.
 * @func: actual update function to be invoked after the grace period
 *
 * The update function will be invoked some time after a full grace
 * period elapses, in other words after all currently executing RCU
 * read-side critical sections have completed. RCU read-side critical
 * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
 * and may be nested.
 */
void fastcall call_rcu(struct rcu_head *head,
                       void (*func)(struct rcu_head *rcu))
{
    unsigned long flags;
    struct rcu_data *rdp;

    head->func = func;
    head->next = NULL;
    local_irq_save(flags);
    rdp = &__get_cpu_var(rcu_data);
    *rdp->nxttail = head;
    rdp->nxttail = &head->next;
    if (unlikely(++rdp->qlen > qhimark)) {
        rdp->blimit = INT_MAX;
        force_quiescent_state(rdp, &rcu_ctrlblk);
    }
    local_irq_restore(flags);
}

/*
 * Invoke the completed RCU callbacks. They are expected to be in
 * a per-cpu list.
 */
static void rcu_do_batch(struct rcu_data *rdp)
{
    struct rcu_head *next, *list;
    int count = 0;

    list = rdp->donelist;
    while (list) {
        next = rdp->donelist = list->next;
        list->func(list);
        list = next;
        rdp->qlen--;
        if (++count >= rdp->blimit)
            break;
    }
    if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
        rdp->blimit = blimit;
    if (!rdp->donelist)
        rdp->donetail = &rdp->donelist;
    else
        raise_softirq(RCU_SOFTIRQ);
}

/*
 * Grace period handling:
 * The grace period handling consists of two steps:
 * - A new grace period is started.
 *   This is done by rcu_start_batch. The start is not broadcast to
 *   all cpus; they must pick this up by comparing rcp->cur with
 *   rdp->quiescbatch. All cpus are recorded in the
 *   rcu_ctrlblk.cpumask bitmap.
 * - All cpus must go through a quiescent state.
 *   Since the start of the grace period is not broadcast, at least two
 *   calls to rcu_check_quiescent_state are required:
 *   The first call just notices that a new grace period is running. The
 *   following calls check if there was a quiescent state since the beginning
 *   of the grace period. If so, it updates rcu_ctrlblk.cpumask. If
 *   the bitmap is empty, then the grace period is completed.
 *   rcu_check_quiescent_state then calls rcu_start_batch() to start the next
 *   grace period (if necessary).
 */
/*
 * Register a new batch of callbacks, and start it up if there is currently no
 * active batch and the batch to be registered has not already occurred.
 * Caller must hold rcu_ctrlblk.lock.
 */
static void rcu_start_batch(struct rcu_ctrlblk *rcp)
{
    if (rcp->next_pending &&
        rcp->completed == rcp->cur) {
        rcp->next_pending = 0;
        /*
         * next_pending == 0 must be visible in
         * __rcu_process_callbacks() before it can see new value of cur.
         */
        smp_wmb();
        rcp->cur++;

        rcp->cpumask = cpu_online_map;
    }
}

/*
 * cpu went through a quiescent state since the beginning of the grace period.
 * Clear it from the cpu mask and complete the grace period if it was the last
 * cpu. Start another grace period if someone has further entries pending
 */
static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
{
    cpu_clear(cpu, rcp->cpumask);
    if (cpus_empty(rcp->cpumask)) {
        /* batch completed ! */
        rcp->completed = rcp->cur;
        rcu_start_batch(rcp);
    }
}

/*
 * Check if the cpu has gone through a quiescent state (say context
 * switch). If so, and if it hasn't already done so in this RCU
 * quiescent cycle, then indicate that it has done so.
 */
static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
                                      struct rcu_data *rdp)
{
    if (rdp->quiescbatch != rcp->cur) {
        /* start new grace period: */
        rdp->qs_pending = 1;
        rdp->quiescbatch = rcp->cur;
        return;
    }

    /* Grace period already completed for this cpu?
     * qs_pending is checked instead of the actual bitmap to avoid
     * cacheline thrashing.
     */
    if (!rdp->qs_pending)
        return;

    rdp->qs_pending = 0;

    spin_lock(&rcp->lock);
    /*
     * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
     * during cpu startup. Ignore the quiescent state.
     */
    if (likely(rdp->quiescbatch == rcp->cur))
        cpu_quiet(rdp->cpu, rcp);

    spin_unlock(&rcp->lock);
}

/*
 * This does the RCU processing work from softirq context.
 */
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
                                    struct rcu_data *rdp)
{
    if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
        *rdp->donetail = rdp->curlist;
        rdp->donetail = rdp->curtail;
        rdp->curlist = NULL;
        rdp->curtail = &rdp->curlist;
    }

    local_irq_disable();
    if (rdp->nxtlist && !rdp->curlist) {
        rdp->curlist = rdp->nxtlist;
        rdp->curtail = rdp->nxttail;
        rdp->nxtlist = NULL;
        rdp->nxttail = &rdp->nxtlist;
        local_irq_enable();

        /*
         * start the next batch of callbacks
         */

        /* determine batch number */
        rdp->batch = rcp->cur + 1;
        /* see the comment and corresponding wmb() in
         * the rcu_start_batch()
         */
        smp_rmb();

        if (!rcp->next_pending) {
            /* and start it/schedule start if it's a new batch */
            spin_lock(&rcp->lock);
            rcp->next_pending = 1;
            rcu_start_batch(rcp);
            spin_unlock(&rcp->lock);
        }
    } else {
        local_irq_enable();
    }
    rcu_check_quiescent_state(rcp, rdp);
    if (rdp->donelist)
        rcu_do_batch(rdp);
}

static void rcu_process_callbacks(void)
{
    __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
}

static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
{
    /* This cpu has pending rcu entries and the grace period
     * for them has completed.
     */
    if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
        return 1;

    /* This cpu has no pending entries, but there are new entries */
    if (!rdp->curlist && rdp->nxtlist)
        return 1;

    /* This cpu has finished callbacks to invoke */
    if (rdp->donelist)
        return 1;

    /* The rcu core waits for a quiescent state from the cpu */
    if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
        return 1;

    /* nothing to do */
    return 0;
}

int rcu_pending(int cpu)
{
    return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu));
}

/*
 * Check to see if any future RCU-related work will need to be done
 * by the current CPU, even if none need be done immediately, returning
 * 1 if so. This function is part of the RCU implementation; it is -not-
 * an exported member of the RCU API.
 */
int rcu_needs_cpu(int cpu)
{
    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);

    return (!!rdp->curlist || rcu_pending(cpu));
}

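/*
 * Hook called when this CPU may need RCU attention: no work is done here
 * directly; everything is deferred to RCU_SOFTIRQ context
 * (rcu_process_callbacks()).
 */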
void rcu_check_callbacks(int cpu)
{
    raise_softirq(RCU_SOFTIRQ);
}

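/*
 * Splice a callback list taken over from another CPU onto this CPU's
 * nxtlist; the tail pointer is only advanced if the list is non-empty.
 */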
static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
                           struct rcu_head **tail)
{
    local_irq_disable();
    *this_rdp->nxttail = list;
    if (list)
        this_rdp->nxttail = tail;
    local_irq_enable();
}

static void rcu_offline_cpu(struct rcu_data *this_rdp,
                            struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
{
    /* If the cpu going offline owns the grace period we can block
     * indefinitely waiting for it, so flush it here.
     */
    spin_lock(&rcp->lock);
    if (rcp->cur != rcp->completed)
        cpu_quiet(rdp->cpu, rcp);
    spin_unlock(&rcp->lock);

    rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
    rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
    rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);

    local_irq_disable();
    this_rdp->qlen += rdp->qlen;
    local_irq_enable();
}

static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
                                 struct rcu_data *rdp)
{
    memset(rdp, 0, sizeof(*rdp));
    rdp->curtail = &rdp->curlist;
    rdp->nxttail = &rdp->nxtlist;
    rdp->donetail = &rdp->donelist;
    rdp->quiescbatch = rcp->completed;
    rdp->qs_pending = 0;
    rdp->cpu = cpu;
    rdp->blimit = blimit;
}

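/*
 * CPU notifier: set up per-CPU RCU state for a CPU that is being brought up,
 * and adopt the pending callbacks of a CPU whose bring-up was cancelled or
 * which has died.
 */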
static int cpu_callback(
    struct notifier_block *nfb, unsigned long action, void *hcpu)
{
    unsigned int cpu = (unsigned long)hcpu;
    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);

    switch ( action )
    {
    case CPU_UP_PREPARE:
        rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
        break;
    case CPU_UP_CANCELED:
    case CPU_DEAD:
        rcu_offline_cpu(&this_cpu(rcu_data), &rcu_ctrlblk, rdp);
        break;
    default:
        break;
    }

    return NOTIFY_DONE;
}

static struct notifier_block cpu_nfb = {
    .notifier_call = cpu_callback
};

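/*
 * Boot-time setup: initialise the boot CPU's per-CPU data directly (the
 * notifier only fires for CPUs brought up later), then register the hotplug
 * notifier and the RCU softirq handler.
 */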
void __init rcu_init(void)
{
    void *cpu = (void *)(long)smp_processor_id();
    cpu_callback(&cpu_nfb, CPU_UP_PREPARE, cpu);
    register_cpu_notifier(&cpu_nfb);
    open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
}