1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmmod.c
5 *
6 * standalone DLM module
7 *
8 * Copyright (C) 2004 Oracle.  All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 02111-1307, USA.
24 *
25 */
26
27
28#include <linux/module.h>
29#include <linux/fs.h>
30#include <linux/types.h>
31#include <linux/slab.h>
32#include <linux/highmem.h>
33#include <linux/init.h>
34#include <linux/sysctl.h>
35#include <linux/random.h>
36#include <linux/blkdev.h>
37#include <linux/socket.h>
38#include <linux/inet.h>
39#include <linux/spinlock.h>
40#include <linux/delay.h>
41
42
43#include "cluster/heartbeat.h"
44#include "cluster/nodemanager.h"
45#include "cluster/tcp.h"
46
47#include "dlmapi.h"
48#include "dlmcommon.h"
49#include "dlmdomain.h"
50#include "dlmdebug.h"
51
52#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
53#include "cluster/masklog.h"
54
55static void dlm_mle_node_down(struct dlm_ctxt *dlm,
56			      struct dlm_master_list_entry *mle,
57			      struct o2nm_node *node,
58			      int idx);
59static void dlm_mle_node_up(struct dlm_ctxt *dlm,
60			    struct dlm_master_list_entry *mle,
61			    struct o2nm_node *node,
62			    int idx);
63
64static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
65static int dlm_do_assert_master(struct dlm_ctxt *dlm,
66				struct dlm_lock_resource *res,
67				void *nodemap, u32 flags);
68static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
69
70static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
71				struct dlm_master_list_entry *mle,
72				const char *name,
73				unsigned int namelen)
74{
75	if (dlm != mle->dlm)
76		return 0;
77
78	if (namelen != mle->mnamelen ||
79	    memcmp(name, mle->mname, namelen) != 0)
80		return 0;
81
82	return 1;
83}
84
85static struct kmem_cache *dlm_lockres_cache;
86static struct kmem_cache *dlm_lockname_cache;
87static struct kmem_cache *dlm_mle_cache;
88
89static void dlm_mle_release(struct kref *kref);
90static void dlm_init_mle(struct dlm_master_list_entry *mle,
91			enum dlm_mle_type type,
92			struct dlm_ctxt *dlm,
93			struct dlm_lock_resource *res,
94			const char *name,
95			unsigned int namelen);
96static void dlm_put_mle(struct dlm_master_list_entry *mle);
97static void __dlm_put_mle(struct dlm_master_list_entry *mle);
98static int dlm_find_mle(struct dlm_ctxt *dlm,
99			struct dlm_master_list_entry **mle,
100			char *name, unsigned int namelen);
101
102static int dlm_do_master_request(struct dlm_lock_resource *res,
103				 struct dlm_master_list_entry *mle, int to);
104
105
106static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
107				     struct dlm_lock_resource *res,
108				     struct dlm_master_list_entry *mle,
109				     int *blocked);
110static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
111				    struct dlm_lock_resource *res,
112				    struct dlm_master_list_entry *mle,
113				    int blocked);
114static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
115				 struct dlm_lock_resource *res,
116				 struct dlm_master_list_entry *mle,
117				 struct dlm_master_list_entry **oldmle,
118				 const char *name, unsigned int namelen,
119				 u8 new_master, u8 master);
120
121static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
122				    struct dlm_lock_resource *res);
123static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
124				      struct dlm_lock_resource *res);
125static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
126				       struct dlm_lock_resource *res,
127				       u8 target);
128static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
129				       struct dlm_lock_resource *res);
130
131
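/*
 * classify an errno returned by o2net: nonzero means the error indicates
 * the remote node is dead or unreachable (callers treat the target as
 * down); zero means some other kind of failure.
 */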
132int dlm_is_host_down(int errno)
133{
134	switch (errno) {
135		case -EBADF:
136		case -ECONNREFUSED:
137		case -ENOTCONN:
138		case -ECONNRESET:
139		case -EPIPE:
140		case -EHOSTDOWN:
141		case -EHOSTUNREACH:
142		case -ETIMEDOUT:
143		case -ECONNABORTED:
144		case -ENETDOWN:
145		case -ENETUNREACH:
146		case -ENETRESET:
147		case -ESHUTDOWN:
148		case -ENOPROTOOPT:
149		case -EINVAL:   /* if returned from our tcp code,
150				   this means there is no socket */
151			return 1;
152	}
153	return 0;
154}
155
156
157/*
158 * MASTER LIST FUNCTIONS
159 */
160
161
162/*
163 * regarding master list entries and heartbeat callbacks:
164 *
165 * in order to avoid sleeping and allocation that occurs in
166 * heartbeat, master list entries are simply attached to the
167 * dlm's established heartbeat callbacks.  the mle is attached
168 * when it is created, and since the dlm->spinlock is held at
169 * that time, any heartbeat event will be properly discovered
170 * by the mle.  the mle needs to be detached from the
171 * dlm->mle_hb_events list as soon as heartbeat events are no
172 * longer useful to the mle, and before the mle is freed.
173 *
174 * as a general rule, heartbeat events are no longer needed by
175 * the mle once an "answer" regarding the lock master has been
176 * received.
177 */
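/*
 * roughly, the expected lifecycle is (an illustrative sketch, not a
 * verbatim call chain):
 *
 *	spin_lock(&dlm->spinlock);
 *	dlm_init_mle(mle, ...);		attaches via __dlm_mle_attach_hb_events()
 *	... master requests / responses ...
 *	dlm_mle_detach_hb_events(dlm, mle);	once the master is known
 *	dlm_put_mle(mle);		may free the mle via dlm_mle_release()
 */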
178static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
179					      struct dlm_master_list_entry *mle)
180{
181	assert_spin_locked(&dlm->spinlock);
182
183	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
184}
185
186
187static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
188					      struct dlm_master_list_entry *mle)
189{
190	if (!list_empty(&mle->hb_events))
191		list_del_init(&mle->hb_events);
192}
193
194
195static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
196					    struct dlm_master_list_entry *mle)
197{
198	spin_lock(&dlm->spinlock);
199	__dlm_mle_detach_hb_events(dlm, mle);
200	spin_unlock(&dlm->spinlock);
201}
202
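/*
 * the "inuse" count pins an mle across windows where the caller has to drop
 * and retake the dlm locks.  dlm_get_mle_inuse() bumps the count and takes
 * an extra kref (both dlm->spinlock and dlm->master_lock must be held);
 * dlm_put_mle_inuse() drops both.
 */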
203static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
204{
205	struct dlm_ctxt *dlm;
206	dlm = mle->dlm;
207
208	assert_spin_locked(&dlm->spinlock);
209	assert_spin_locked(&dlm->master_lock);
210	mle->inuse++;
211	kref_get(&mle->mle_refs);
212}
213
214static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
215{
216	struct dlm_ctxt *dlm;
217	dlm = mle->dlm;
218
219	spin_lock(&dlm->spinlock);
220	spin_lock(&dlm->master_lock);
221	mle->inuse--;
222	__dlm_put_mle(mle);
223	spin_unlock(&dlm->master_lock);
224	spin_unlock(&dlm->spinlock);
225
226}
227
228/* remove from list and free */
229static void __dlm_put_mle(struct dlm_master_list_entry *mle)
230{
231	struct dlm_ctxt *dlm;
232	dlm = mle->dlm;
233
234	assert_spin_locked(&dlm->spinlock);
235	assert_spin_locked(&dlm->master_lock);
236	if (!atomic_read(&mle->mle_refs.refcount)) {
237		/* the refcount is already zero, so this mle is bogus;
238		 * log it and BUG. */
239		mlog(ML_ERROR, "bad mle: %p\n", mle);
240		dlm_print_one_mle(mle);
241		BUG();
242	} else
243		kref_put(&mle->mle_refs, dlm_mle_release);
244}
245
246
247/* must not have any spinlocks coming in */
248static void dlm_put_mle(struct dlm_master_list_entry *mle)
249{
250	struct dlm_ctxt *dlm;
251	dlm = mle->dlm;
252
253	spin_lock(&dlm->spinlock);
254	spin_lock(&dlm->master_lock);
255	__dlm_put_mle(mle);
256	spin_unlock(&dlm->master_lock);
257	spin_unlock(&dlm->spinlock);
258}
259
260static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
261{
262	kref_get(&mle->mle_refs);
263}
264
265static void dlm_init_mle(struct dlm_master_list_entry *mle,
266			enum dlm_mle_type type,
267			struct dlm_ctxt *dlm,
268			struct dlm_lock_resource *res,
269			const char *name,
270			unsigned int namelen)
271{
272	assert_spin_locked(&dlm->spinlock);
273
274	mle->dlm = dlm;
275	mle->type = type;
276	INIT_HLIST_NODE(&mle->master_hash_node);
277	INIT_LIST_HEAD(&mle->hb_events);
278	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
279	spin_lock_init(&mle->spinlock);
280	init_waitqueue_head(&mle->wq);
281	atomic_set(&mle->woken, 0);
282	kref_init(&mle->mle_refs);
283	memset(mle->response_map, 0, sizeof(mle->response_map));
284	mle->master = O2NM_MAX_NODES;
285	mle->new_master = O2NM_MAX_NODES;
286	mle->inuse = 0;
287
288	BUG_ON(mle->type != DLM_MLE_BLOCK &&
289	       mle->type != DLM_MLE_MASTER &&
290	       mle->type != DLM_MLE_MIGRATION);
291
292	if (mle->type == DLM_MLE_MASTER) {
293		BUG_ON(!res);
294		mle->mleres = res;
295		memcpy(mle->mname, res->lockname.name, res->lockname.len);
296		mle->mnamelen = res->lockname.len;
297		mle->mnamehash = res->lockname.hash;
298	} else {
299		BUG_ON(!name);
300		mle->mleres = NULL;
301		memcpy(mle->mname, name, namelen);
302		mle->mnamelen = namelen;
303		mle->mnamehash = dlm_lockid_hash(name, namelen);
304	}
305
306	atomic_inc(&dlm->mle_tot_count[mle->type]);
307	atomic_inc(&dlm->mle_cur_count[mle->type]);
308
309	/* copy off the node_map and register hb callbacks on our copy */
310	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
311	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
312	clear_bit(dlm->node_num, mle->vote_map);
313	clear_bit(dlm->node_num, mle->node_map);
314
315	/* attach the mle to the domain node up/down events */
316	__dlm_mle_attach_hb_events(dlm, mle);
317}
318
319void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
320{
321	assert_spin_locked(&dlm->spinlock);
322	assert_spin_locked(&dlm->master_lock);
323
324	if (!hlist_unhashed(&mle->master_hash_node))
325		hlist_del_init(&mle->master_hash_node);
326}
327
328void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
329{
330	struct hlist_head *bucket;
331
332	assert_spin_locked(&dlm->master_lock);
333
334	bucket = dlm_master_hash(dlm, mle->mnamehash);
335	hlist_add_head(&mle->master_hash_node, bucket);
336}
337
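/*
 * look up an mle by lock name.  dlm->master_lock must be held.  on a hit an
 * extra reference is taken on the entry, which the caller must drop with
 * dlm_put_mle() (or __dlm_put_mle() while still holding the locks).
 */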
338/* returns 1 if found, 0 if not */
339static int dlm_find_mle(struct dlm_ctxt *dlm,
340			struct dlm_master_list_entry **mle,
341			char *name, unsigned int namelen)
342{
343	struct dlm_master_list_entry *tmpmle;
344	struct hlist_head *bucket;
345	unsigned int hash;
346
347	assert_spin_locked(&dlm->master_lock);
348
349	hash = dlm_lockid_hash(name, namelen);
350	bucket = dlm_master_hash(dlm, hash);
351	hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
352		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
353			continue;
354		dlm_get_mle(tmpmle);
355		*mle = tmpmle;
356		return 1;
357	}
358	return 0;
359}
360
361void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
362{
363	struct dlm_master_list_entry *mle;
364
365	assert_spin_locked(&dlm->spinlock);
366
367	list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
368		if (node_up)
369			dlm_mle_node_up(dlm, mle, NULL, idx);
370		else
371			dlm_mle_node_down(dlm, mle, NULL, idx);
372	}
373}
374
375static void dlm_mle_node_down(struct dlm_ctxt *dlm,
376			      struct dlm_master_list_entry *mle,
377			      struct o2nm_node *node, int idx)
378{
379	spin_lock(&mle->spinlock);
380
381	if (!test_bit(idx, mle->node_map))
382		mlog(0, "node %u already removed from nodemap!\n", idx);
383	else
384		clear_bit(idx, mle->node_map);
385
386	spin_unlock(&mle->spinlock);
387}
388
389static void dlm_mle_node_up(struct dlm_ctxt *dlm,
390			    struct dlm_master_list_entry *mle,
391			    struct o2nm_node *node, int idx)
392{
393	spin_lock(&mle->spinlock);
394
395	if (test_bit(idx, mle->node_map))
396		mlog(0, "node %u already in node map!\n", idx);
397	else
398		set_bit(idx, mle->node_map);
399
400	spin_unlock(&mle->spinlock);
401}
402
403
404int dlm_init_mle_cache(void)
405{
406	dlm_mle_cache = kmem_cache_create("o2dlm_mle",
407					  sizeof(struct dlm_master_list_entry),
408					  0, SLAB_HWCACHE_ALIGN,
409					  NULL);
410	if (dlm_mle_cache == NULL)
411		return -ENOMEM;
412	return 0;
413}
414
415void dlm_destroy_mle_cache(void)
416{
417	if (dlm_mle_cache)
418		kmem_cache_destroy(dlm_mle_cache);
419}
420
421static void dlm_mle_release(struct kref *kref)
422{
423	struct dlm_master_list_entry *mle;
424	struct dlm_ctxt *dlm;
425
426	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
427	dlm = mle->dlm;
428
429	assert_spin_locked(&dlm->spinlock);
430	assert_spin_locked(&dlm->master_lock);
431
432	mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
433	     mle->type);
434
435	/* remove from list if not already */
436	__dlm_unlink_mle(dlm, mle);
437
438	/* detach the mle from the domain node up/down events */
439	__dlm_mle_detach_hb_events(dlm, mle);
440
441	atomic_dec(&dlm->mle_cur_count[mle->type]);
442
443	/* NOTE: kfree under spinlock here.
444	 * if this is bad, we can move this to a freelist. */
445	kmem_cache_free(dlm_mle_cache, mle);
446}
447
448
449/*
450 * LOCK RESOURCE FUNCTIONS
451 */
452
453int dlm_init_master_caches(void)
454{
455	dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
456					      sizeof(struct dlm_lock_resource),
457					      0, SLAB_HWCACHE_ALIGN, NULL);
458	if (!dlm_lockres_cache)
459		goto bail;
460
461	dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
462					       DLM_LOCKID_NAME_MAX, 0,
463					       SLAB_HWCACHE_ALIGN, NULL);
464	if (!dlm_lockname_cache)
465		goto bail;
466
467	return 0;
468bail:
469	dlm_destroy_master_caches();
470	return -ENOMEM;
471}
472
473void dlm_destroy_master_caches(void)
474{
475	if (dlm_lockname_cache) {
476		kmem_cache_destroy(dlm_lockname_cache);
477		dlm_lockname_cache = NULL;
478	}
479
480	if (dlm_lockres_cache) {
481		kmem_cache_destroy(dlm_lockres_cache);
482		dlm_lockres_cache = NULL;
483	}
484}
485
486static void dlm_lockres_release(struct kref *kref)
487{
488	struct dlm_lock_resource *res;
489	struct dlm_ctxt *dlm;
490
491	res = container_of(kref, struct dlm_lock_resource, refs);
492	dlm = res->dlm;
493
494	/* This should not happen -- all lock resources have a name
495	 * associated with them at init time. */
496	BUG_ON(!res->lockname.name);
497
498	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
499	     res->lockname.name);
500
501	spin_lock(&dlm->track_lock);
502	if (!list_empty(&res->tracking))
503		list_del_init(&res->tracking);
504	else {
505		mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
506		     res->lockname.len, res->lockname.name);
507		dlm_print_one_lock_resource(res);
508	}
509	spin_unlock(&dlm->track_lock);
510
511	atomic_dec(&dlm->res_cur_count);
512
513	if (!hlist_unhashed(&res->hash_node) ||
514	    !list_empty(&res->granted) ||
515	    !list_empty(&res->converting) ||
516	    !list_empty(&res->blocked) ||
517	    !list_empty(&res->dirty) ||
518	    !list_empty(&res->recovering) ||
519	    !list_empty(&res->purge)) {
520		mlog(ML_ERROR,
521		     "Going to BUG for resource %.*s."
522		     "  We're on a list! [%c%c%c%c%c%c%c]\n",
523		     res->lockname.len, res->lockname.name,
524		     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
525		     !list_empty(&res->granted) ? 'G' : ' ',
526		     !list_empty(&res->converting) ? 'C' : ' ',
527		     !list_empty(&res->blocked) ? 'B' : ' ',
528		     !list_empty(&res->dirty) ? 'D' : ' ',
529		     !list_empty(&res->recovering) ? 'R' : ' ',
530		     !list_empty(&res->purge) ? 'P' : ' ');
531
532		dlm_print_one_lock_resource(res);
533	}
534
535	/* By the time we are ready to free this resource, it must not
536	 * be on any lists. */
537	BUG_ON(!hlist_unhashed(&res->hash_node));
538	BUG_ON(!list_empty(&res->granted));
539	BUG_ON(!list_empty(&res->converting));
540	BUG_ON(!list_empty(&res->blocked));
541	BUG_ON(!list_empty(&res->dirty));
542	BUG_ON(!list_empty(&res->recovering));
543	BUG_ON(!list_empty(&res->purge));
544
545	kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
546
547	kmem_cache_free(dlm_lockres_cache, res);
548}
549
550void dlm_lockres_put(struct dlm_lock_resource *res)
551{
552	kref_put(&res->refs, dlm_lockres_release);
553}
554
555static void dlm_init_lockres(struct dlm_ctxt *dlm,
556			     struct dlm_lock_resource *res,
557			     const char *name, unsigned int namelen)
558{
559	char *qname;
560
561	/* If we memset here, we lose our reference to the kmalloc'd
562	 * res->lockname.name, so be sure to init every field
563	 * correctly! */
564
565	qname = (char *) res->lockname.name;
566	memcpy(qname, name, namelen);
567
568	res->lockname.len = namelen;
569	res->lockname.hash = dlm_lockid_hash(name, namelen);
570
571	init_waitqueue_head(&res->wq);
572	spin_lock_init(&res->spinlock);
573	INIT_HLIST_NODE(&res->hash_node);
574	INIT_LIST_HEAD(&res->granted);
575	INIT_LIST_HEAD(&res->converting);
576	INIT_LIST_HEAD(&res->blocked);
577	INIT_LIST_HEAD(&res->dirty);
578	INIT_LIST_HEAD(&res->recovering);
579	INIT_LIST_HEAD(&res->purge);
580	INIT_LIST_HEAD(&res->tracking);
581	atomic_set(&res->asts_reserved, 0);
582	res->migration_pending = 0;
583	res->inflight_locks = 0;
584	res->inflight_assert_workers = 0;
585
586	res->dlm = dlm;
587
588	kref_init(&res->refs);
589
590	atomic_inc(&dlm->res_tot_count);
591	atomic_inc(&dlm->res_cur_count);
592
593	/* just for consistency */
594	spin_lock(&res->spinlock);
595	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
596	spin_unlock(&res->spinlock);
597
598	res->state = DLM_LOCK_RES_IN_PROGRESS;
599
600	res->last_used = 0;
601
602	spin_lock(&dlm->spinlock);
603	list_add_tail(&res->tracking, &dlm->tracking_list);
604	spin_unlock(&dlm->spinlock);
605
606	memset(res->lvb, 0, DLM_LVB_LEN);
607	memset(res->refmap, 0, sizeof(res->refmap));
608}
609
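/*
 * allocate a new lock resource: the lockres itself comes from
 * dlm_lockres_cache and the name buffer from dlm_lockname_cache.  both are
 * freed again in dlm_lockres_release() once the last ref is dropped.
 */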
610struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
611				   const char *name,
612				   unsigned int namelen)
613{
614	struct dlm_lock_resource *res = NULL;
615
616	res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
617	if (!res)
618		goto error;
619
620	res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
621	if (!res->lockname.name)
622		goto error;
623
624	dlm_init_lockres(dlm, res, name, namelen);
625	return res;
626
627error:
628	if (res)
629		kmem_cache_free(dlm_lockres_cache, res);
630	return NULL;
631}
632
633void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
634				struct dlm_lock_resource *res, int bit)
635{
636	assert_spin_locked(&res->spinlock);
637
638	mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
639	     res->lockname.name, bit, __builtin_return_address(0));
640
641	set_bit(bit, res->refmap);
642}
643
644void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
645				  struct dlm_lock_resource *res, int bit)
646{
647	assert_spin_locked(&res->spinlock);
648
649	mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
650	     res->lockname.name, bit, __builtin_return_address(0));
651
652	clear_bit(bit, res->refmap);
653}
654
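/*
 * inflight_locks counts lock operations currently using this lockres; while
 * it is nonzero the resource must not be purged.  dropping the last inflight
 * ref wakes res->wq so that waiters (e.g. the purge path) can re-check.
 */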
655static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
656				   struct dlm_lock_resource *res)
657{
658	res->inflight_locks++;
659
660	mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
661	     res->lockname.len, res->lockname.name, res->inflight_locks,
662	     __builtin_return_address(0));
663}
664
665void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
666				   struct dlm_lock_resource *res)
667{
668	assert_spin_locked(&res->spinlock);
669	__dlm_lockres_grab_inflight_ref(dlm, res);
670}
671
672void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
673				   struct dlm_lock_resource *res)
674{
675	assert_spin_locked(&res->spinlock);
676
677	BUG_ON(res->inflight_locks == 0);
678
679	res->inflight_locks--;
680
681	mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
682	     res->lockname.len, res->lockname.name, res->inflight_locks,
683	     __builtin_return_address(0));
684
685	wake_up(&res->wq);
686}
687
688void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
689		struct dlm_lock_resource *res)
690{
691	assert_spin_locked(&res->spinlock);
692	res->inflight_assert_workers++;
693	mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
694			dlm->name, res->lockname.len, res->lockname.name,
695			res->inflight_assert_workers);
696}
697
698static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
699		struct dlm_lock_resource *res)
700{
701	assert_spin_locked(&res->spinlock);
702	BUG_ON(res->inflight_assert_workers == 0);
703	res->inflight_assert_workers--;
704	mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
705			dlm->name, res->lockname.len, res->lockname.name,
706			res->inflight_assert_workers);
707}
708
709static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
710		struct dlm_lock_resource *res)
711{
712	spin_lock(&res->spinlock);
713	__dlm_lockres_drop_inflight_worker(dlm, res);
714	spin_unlock(&res->spinlock);
715}
716
717/*
718 * look up a lock resource by name.  the lockid is
719 * null terminated.  the resource may already exist
720 * in the hashtable.
721 *
722 * if not, allocate enough for the lockres and for
723 * the temporary structure used in doing the mastering.
724 *
725 * also, do a lookup in the dlm->master_list to see
726 * if another node has begun mastering the same lock.
727 * if so, there should be a block entry in there
728 * for this name, and we should *not* attempt to master
729 * the lock here.  instead, wait for that node
730 * to assert_master (or die).
731 *
732 */
733struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
734					  const char *lockid,
735					  int namelen,
736					  int flags)
737{
738	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
739	struct dlm_master_list_entry *mle = NULL;
740	struct dlm_master_list_entry *alloc_mle = NULL;
741	int blocked = 0;
742	int ret, nodenum;
743	struct dlm_node_iter iter;
744	unsigned int hash;
745	int tries = 0;
746	int bit, wait_on_recovery = 0;
747
748	BUG_ON(!lockid);
749
750	hash = dlm_lockid_hash(lockid, namelen);
751
752	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
753
754lookup:
755	spin_lock(&dlm->spinlock);
756	tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
757	if (tmpres) {
758		spin_unlock(&dlm->spinlock);
759		spin_lock(&tmpres->spinlock);
760
761		/*
762		 * Right after dlm spinlock was released, dlm_thread could have
763		 * purged the lockres. Check if lockres got unhashed. If so
764		 * start over.
765		 */
766		if (hlist_unhashed(&tmpres->hash_node)) {
767			spin_unlock(&tmpres->spinlock);
768			dlm_lockres_put(tmpres);
769			tmpres = NULL;
770			goto lookup;
771		}
772
773		/* Wait on the thread that is mastering the resource */
774		if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
775			__dlm_wait_on_lockres(tmpres);
776			BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
777			spin_unlock(&tmpres->spinlock);
778			dlm_lockres_put(tmpres);
779			tmpres = NULL;
780			goto lookup;
781		}
782
783		/* Wait on the resource purge to complete before continuing */
784		if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
785			BUG_ON(tmpres->owner == dlm->node_num);
786			__dlm_wait_on_lockres_flags(tmpres,
787						    DLM_LOCK_RES_DROPPING_REF);
788			spin_unlock(&tmpres->spinlock);
789			dlm_lockres_put(tmpres);
790			tmpres = NULL;
791			goto lookup;
792		}
793
794		/* Grab inflight ref to pin the resource */
795		dlm_lockres_grab_inflight_ref(dlm, tmpres);
796
797		spin_unlock(&tmpres->spinlock);
798		if (res)
799			dlm_lockres_put(res);
800		res = tmpres;
801		goto leave;
802	}
803
804	if (!res) {
805		spin_unlock(&dlm->spinlock);
806		mlog(0, "allocating a new resource\n");
807		/* nothing found and we need to allocate one. */
808		alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
809		if (!alloc_mle)
810			goto leave;
811		res = dlm_new_lockres(dlm, lockid, namelen);
812		if (!res)
813			goto leave;
814		goto lookup;
815	}
816
817	mlog(0, "no lockres found, allocated our own: %p\n", res);
818
819	if (flags & LKM_LOCAL) {
820		/* caller knows it's safe to assume it's not mastered elsewhere
821		 * DONE!  return right away */
822		spin_lock(&res->spinlock);
823		dlm_change_lockres_owner(dlm, res, dlm->node_num);
824		__dlm_insert_lockres(dlm, res);
825		dlm_lockres_grab_inflight_ref(dlm, res);
826		spin_unlock(&res->spinlock);
827		spin_unlock(&dlm->spinlock);
828		/* lockres still marked IN_PROGRESS */
829		goto wake_waiters;
830	}
831
832	/* check master list to see if another node has started mastering it */
833	spin_lock(&dlm->master_lock);
834
835	/* if we found a block, wait for lock to be mastered by another node */
836	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
837	if (blocked) {
838		int mig;
839		if (mle->type == DLM_MLE_MASTER) {
840			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
841			BUG();
842		}
843		mig = (mle->type == DLM_MLE_MIGRATION);
844		/* if there is a migration in progress, let the migration
845		 * finish before continuing.  we can wait for the absence
846		 * of the MIGRATION mle: either the migrate finished or
847		 * one of the nodes died and the mle was cleaned up.
848		 * if there is a BLOCK here, but it already has a master
849		 * set, we are too late.  the master does not have a ref
850		 * for us in the refmap.  detach the mle and drop it.
851		 * either way, go back to the top and start over. */
852		if (mig || mle->master != O2NM_MAX_NODES) {
853			BUG_ON(mig && mle->master == dlm->node_num);
854			/* we arrived too late.  the master does not
855			 * have a ref for us. retry. */
856			mlog(0, "%s:%.*s: late on %s\n",
857			     dlm->name, namelen, lockid,
858			     mig ?  "MIGRATION" : "BLOCK");
859			spin_unlock(&dlm->master_lock);
860			spin_unlock(&dlm->spinlock);
861
862			/* master is known, detach */
863			if (!mig)
864				dlm_mle_detach_hb_events(dlm, mle);
865			dlm_put_mle(mle);
866			mle = NULL;
867			/* this is lame, but we can't wait on either
868			 * the mle or lockres waitqueue here */
869			if (mig)
870				msleep(100);
871			goto lookup;
872		}
873	} else {
874		/* go ahead and try to master lock on this node */
875		mle = alloc_mle;
876		/* make sure this does not get freed below */
877		alloc_mle = NULL;
878		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
879		set_bit(dlm->node_num, mle->maybe_map);
880		__dlm_insert_mle(dlm, mle);
881
882		/* still holding the dlm spinlock, check the recovery map
883		 * to see if there are any nodes that still need to be
884		 * considered.  these will not appear in the mle nodemap
885		 * but they might own this lockres.  wait on them. */
886		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
887		if (bit < O2NM_MAX_NODES) {
888			mlog(0, "%s: res %.*s, At least one node (%d) "
889			     "to recover before lock mastery can begin\n",
890			     dlm->name, namelen, (char *)lockid, bit);
891			wait_on_recovery = 1;
892		}
893	}
894
895	/* at this point there is either a DLM_MLE_BLOCK or a
896	 * DLM_MLE_MASTER on the master list, so it's safe to add the
897	 * lockres to the hashtable.  anyone who finds the lock will
898	 * still have to wait on the IN_PROGRESS. */
899
900	/* finally add the lockres to its hash bucket */
901	__dlm_insert_lockres(dlm, res);
902
903	/* since this lockres is new it does not require the spinlock */
904	__dlm_lockres_grab_inflight_ref(dlm, res);
905
906	/* get an extra ref on the mle in case this is a BLOCK.
907	 * if so, the creator of the BLOCK may try to put the last
908	 * ref at this time in the assert master handler, so we
909	 * need an extra one to avoid a bad pointer dereference. */
910	dlm_get_mle_inuse(mle);
911	spin_unlock(&dlm->master_lock);
912	spin_unlock(&dlm->spinlock);
913
914redo_request:
915	while (wait_on_recovery) {
916		/* any cluster changes that occurred after dropping the
917		 * dlm spinlock would be detectable by a change on the mle,
918		 * so we only need to clear out the recovery map once. */
919		if (dlm_is_recovery_lock(lockid, namelen)) {
920			mlog(0, "%s: Recovery map is not empty, but must "
921			     "master $RECOVERY lock now\n", dlm->name);
922			if (!dlm_pre_master_reco_lockres(dlm, res))
923				wait_on_recovery = 0;
924			else {
925				mlog(0, "%s: waiting 500ms for heartbeat state "
926				    "change\n", dlm->name);
927				msleep(500);
928			}
929			continue;
930		}
931
932		dlm_kick_recovery_thread(dlm);
933		msleep(1000);
934		dlm_wait_for_recovery(dlm);
935
936		spin_lock(&dlm->spinlock);
937		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
938		if (bit < O2NM_MAX_NODES) {
939			mlog(0, "%s: res %.*s, At least one node (%d) "
940			     "to recover before lock mastery can begin\n",
941			     dlm->name, namelen, (char *)lockid, bit);
942			wait_on_recovery = 1;
943		} else
944			wait_on_recovery = 0;
945		spin_unlock(&dlm->spinlock);
946
947		if (wait_on_recovery)
948			dlm_wait_for_node_recovery(dlm, bit, 10000);
949	}
950
951	/* must wait for lock to be mastered elsewhere */
952	if (blocked)
953		goto wait;
954
955	ret = -EINVAL;
956	dlm_node_iter_init(mle->vote_map, &iter);
957	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
958		ret = dlm_do_master_request(res, mle, nodenum);
959		if (ret < 0)
960			mlog_errno(ret);
961		if (mle->master != O2NM_MAX_NODES) {
962			/* found a master ! */
963			if (mle->master <= nodenum)
964				break;
965			/* if our master request has not reached the master
966			 * yet, keep going until it does.  this is how the
967			 * master will know that asserts are needed back to
968			 * the lower nodes. */
969			mlog(0, "%s: res %.*s, Requests only up to %u but "
970			     "master is %u, keep going\n", dlm->name, namelen,
971			     lockid, nodenum, mle->master);
972		}
973	}
974
975wait:
976	/* keep going until the response map includes all nodes */
977	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
978	if (ret < 0) {
979		wait_on_recovery = 1;
980		mlog(0, "%s: res %.*s, Node map changed, redo the master "
981		     "request now, blocked=%d\n", dlm->name, res->lockname.len,
982		     res->lockname.name, blocked);
983		if (++tries > 20) {
984			mlog(ML_ERROR, "%s: res %.*s, Spinning on "
985			     "dlm_wait_for_lock_mastery, blocked = %d\n",
986			     dlm->name, res->lockname.len,
987			     res->lockname.name, blocked);
988			dlm_print_one_lock_resource(res);
989			dlm_print_one_mle(mle);
990			tries = 0;
991		}
992		goto redo_request;
993	}
994
995	mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
996	     res->lockname.name, res->owner);
997	/* make sure we never continue without this */
998	BUG_ON(res->owner == O2NM_MAX_NODES);
999
1000	/* master is known, detach if not already detached */
1001	dlm_mle_detach_hb_events(dlm, mle);
1002	dlm_put_mle(mle);
1003	/* put the extra ref */
1004	dlm_put_mle_inuse(mle);
1005
1006wake_waiters:
1007	spin_lock(&res->spinlock);
1008	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
1009	spin_unlock(&res->spinlock);
1010	wake_up(&res->wq);
1011
1012leave:
1013	/* need to free the unused mle */
1014	if (alloc_mle)
1015		kmem_cache_free(dlm_mle_cache, alloc_mle);
1016
1017	return res;
1018}
1019
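/*
 * an illustrative caller (a rough sketch, not a quote of the dlmlock path):
 * the lockres comes back holding both a kref and an inflight ref, so a
 * caller would do roughly
 *
 *	res = dlm_get_lock_resource(dlm, lockid, namelen, flags);
 *	if (!res)
 *		return DLM_IVLOCKID;
 *	... attach or queue the lock on res ...
 *	dlm_lockres_drop_inflight_ref(dlm, res);   (under res->spinlock)
 *	dlm_lockres_put(res);
 */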
1020
1021#define DLM_MASTERY_TIMEOUT_MS   5000
1022
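/*
 * spin here until mastery of the resource is settled: if another node has
 * already asserted, we are done; if the node map changed, restart the vote;
 * if every vote is in and this node owns the lowest bit in maybe_map, claim
 * mastery and send the asserts ourselves; otherwise sleep for up to
 * DLM_MASTERY_TIMEOUT_MS and re-check.
 */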
1023static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
1024				     struct dlm_lock_resource *res,
1025				     struct dlm_master_list_entry *mle,
1026				     int *blocked)
1027{
1028	u8 m;
1029	int ret, bit;
1030	int map_changed, voting_done;
1031	int assert, sleep;
1032
1033recheck:
1034	ret = 0;
1035	assert = 0;
1036
1037	/* check if another node has already become the owner */
1038	spin_lock(&res->spinlock);
1039	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1040		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
1041		     res->lockname.len, res->lockname.name, res->owner);
1042		spin_unlock(&res->spinlock);
1043		/* this will cause the master to re-assert across
1044		 * the whole cluster, freeing up mles */
1045		if (res->owner != dlm->node_num) {
1046			ret = dlm_do_master_request(res, mle, res->owner);
1047			if (ret < 0) {
1048				/* give recovery a chance to run */
1049				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
1050				msleep(500);
1051				goto recheck;
1052			}
1053		}
1054		ret = 0;
1055		goto leave;
1056	}
1057	spin_unlock(&res->spinlock);
1058
1059	spin_lock(&mle->spinlock);
1060	m = mle->master;
1061	map_changed = (memcmp(mle->vote_map, mle->node_map,
1062			      sizeof(mle->vote_map)) != 0);
1063	voting_done = (memcmp(mle->vote_map, mle->response_map,
1064			     sizeof(mle->vote_map)) == 0);
1065
1066	/* restart if we hit any errors */
1067	if (map_changed) {
1068		int b;
1069		mlog(0, "%s: %.*s: node map changed, restarting\n",
1070		     dlm->name, res->lockname.len, res->lockname.name);
1071		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1072		b = (mle->type == DLM_MLE_BLOCK);
1073		if ((*blocked && !b) || (!*blocked && b)) {
1074			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
1075			     dlm->name, res->lockname.len, res->lockname.name,
1076			     *blocked, b);
1077			*blocked = b;
1078		}
1079		spin_unlock(&mle->spinlock);
1080		if (ret < 0) {
1081			mlog_errno(ret);
1082			goto leave;
1083		}
1084		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
1085		     "rechecking now\n", dlm->name, res->lockname.len,
1086		     res->lockname.name);
1087		goto recheck;
1088	} else {
1089		if (!voting_done) {
1090			mlog(0, "map not changed and voting not done "
1091			     "for %s:%.*s\n", dlm->name, res->lockname.len,
1092			     res->lockname.name);
1093		}
1094	}
1095
1096	if (m != O2NM_MAX_NODES) {
1097		/* another node has done an assert!
1098		 * all done! */
1099		sleep = 0;
1100	} else {
1101		sleep = 1;
1102		/* have all nodes responded? */
1103		if (voting_done && !*blocked) {
1104			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1105			if (dlm->node_num <= bit) {
1106				/* my node number is lowest.
1107				 * now tell other nodes that I am
1108				 * mastering this. */
1109				mle->master = dlm->node_num;
1110				/* ref was grabbed in get_lock_resource
1111				 * will be dropped in dlmlock_master */
1112				assert = 1;
1113				sleep = 0;
1114			}
1115			/* if voting is done, but we have not received
1116			 * an assert master yet, we must sleep */
1117		}
1118	}
1119
1120	spin_unlock(&mle->spinlock);
1121
1122	/* sleep if we haven't finished voting yet */
1123	if (sleep) {
1124		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
1125
1126		/*
1127		if (atomic_read(&mle->mle_refs.refcount) < 2)
1128			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
1129			atomic_read(&mle->mle_refs.refcount),
1130			res->lockname.len, res->lockname.name);
1131		*/
1132		atomic_set(&mle->woken, 0);
1133		(void)wait_event_timeout(mle->wq,
1134					 (atomic_read(&mle->woken) == 1),
1135					 timeo);
1136		if (res->owner == O2NM_MAX_NODES) {
1137			mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1138			     res->lockname.len, res->lockname.name);
1139			goto recheck;
1140		}
1141		mlog(0, "done waiting, master is %u\n", res->owner);
1142		ret = 0;
1143		goto leave;
1144	}
1145
1146	ret = 0;   /* done */
1147	if (assert) {
1148		m = dlm->node_num;
1149		mlog(0, "about to master %.*s here, this=%u\n",
1150		     res->lockname.len, res->lockname.name, m);
1151		ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1152		if (ret) {
1153			/* This is a failure in the network path,
1154			 * not in the response to the assert_master
1155			 * (any nonzero response is a BUG on this node).
1156			 * Most likely a socket just got disconnected
1157			 * due to node death. */
1158			mlog_errno(ret);
1159		}
1160		/* no longer need to restart lock mastery.
1161		 * all living nodes have been contacted. */
1162		ret = 0;
1163	}
1164
1165	/* set the lockres owner */
1166	spin_lock(&res->spinlock);
1167	/* mastery reference obtained either during
1168	 * assert_master_handler or in get_lock_resource */
1169	dlm_change_lockres_owner(dlm, res, m);
1170	spin_unlock(&res->spinlock);
1171
1172leave:
1173	return ret;
1174}
1175
1176struct dlm_bitmap_diff_iter
1177{
1178	int curnode;
1179	unsigned long *orig_bm;
1180	unsigned long *cur_bm;
1181	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1182};
1183
1184enum dlm_node_state_change
1185{
1186	NODE_DOWN = -1,
1187	NODE_NO_CHANGE = 0,
1188	NODE_UP
1189};
1190
1191static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1192				      unsigned long *orig_bm,
1193				      unsigned long *cur_bm)
1194{
1195	unsigned long p1, p2;
1196	int i;
1197
1198	iter->curnode = -1;
1199	iter->orig_bm = orig_bm;
1200	iter->cur_bm = cur_bm;
1201
1202	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
1203		p1 = iter->orig_bm[i];
1204		p2 = iter->cur_bm[i];
1205		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1206	}
1207}
1208
1209static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1210				     enum dlm_node_state_change *state)
1211{
1212	int bit;
1213
1214	if (iter->curnode >= O2NM_MAX_NODES)
1215		return -ENOENT;
1216
1217	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1218			    iter->curnode+1);
1219	if (bit >= O2NM_MAX_NODES) {
1220		iter->curnode = O2NM_MAX_NODES;
1221		return -ENOENT;
1222	}
1223
1224	/* if it was there in the original then this node died */
1225	if (test_bit(bit, iter->orig_bm))
1226		*state = NODE_DOWN;
1227	else
1228		*state = NODE_UP;
1229
1230	iter->curnode = bit;
1231	return bit;
1232}
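/*
 * example: with orig_bm = 0110b and cur_bm = 0101b the diff is 0011b, so the
 * iterator first returns bit 0 as NODE_UP (set only in cur_bm) and then
 * bit 1 as NODE_DOWN (set only in orig_bm).
 */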
1233
1234
1235static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1236				    struct dlm_lock_resource *res,
1237				    struct dlm_master_list_entry *mle,
1238				    int blocked)
1239{
1240	struct dlm_bitmap_diff_iter bdi;
1241	enum dlm_node_state_change sc;
1242	int node;
1243	int ret = 0;
1244
1245	mlog(0, "something happened such that the "
1246	     "master process may need to be restarted!\n");
1247
1248	assert_spin_locked(&mle->spinlock);
1249
1250	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1251	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1252	while (node >= 0) {
1253		if (sc == NODE_UP) {
1254			/* a node came up.  clear any old vote from
1255			 * the response map and set it in the vote map
1256			 * then restart the mastery. */
1257			mlog(ML_NOTICE, "node %d up while restarting\n", node);
1258
1259			/* redo the master request, but only for the new node */
1260			mlog(0, "sending request to new node\n");
1261			clear_bit(node, mle->response_map);
1262			set_bit(node, mle->vote_map);
1263		} else {
1264			mlog(ML_ERROR, "node down! %d\n", node);
1265			if (blocked) {
1266				int lowest = find_next_bit(mle->maybe_map,
1267						       O2NM_MAX_NODES, 0);
1268
1269				/* act like it was never there */
1270				clear_bit(node, mle->maybe_map);
1271
1272			       	if (node == lowest) {
1273					mlog(0, "expected master %u died"
1274					    " while this node was blocked "
1275					    "waiting on it!\n", node);
1276					lowest = find_next_bit(mle->maybe_map,
1277						       	O2NM_MAX_NODES,
1278						       	lowest+1);
1279					if (lowest < O2NM_MAX_NODES) {
1280						mlog(0, "%s:%.*s:still "
1281						     "blocked. waiting on %u "
1282						     "now\n", dlm->name,
1283						     res->lockname.len,
1284						     res->lockname.name,
1285						     lowest);
1286					} else {
1287						/* mle is an MLE_BLOCK, but
1288						 * there is now nothing left to
1289						 * block on.  we need to return
1290						 * all the way back out and try
1291						 * again with an MLE_MASTER.
1292						 * dlm_do_local_recovery_cleanup
1293						 * has already run, so the mle
1294						 * refcount is ok */
1295						mlog(0, "%s:%.*s: no "
1296						     "longer blocking. try to "
1297						     "master this here\n",
1298						     dlm->name,
1299						     res->lockname.len,
1300						     res->lockname.name);
1301						mle->type = DLM_MLE_MASTER;
1302						mle->mleres = res;
1303					}
1304				}
1305			}
1306
1307			/* now blank out everything, as if we had never
1308			 * contacted anyone */
1309			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1310			memset(mle->response_map, 0, sizeof(mle->response_map));
1311			/* reset the vote_map to the current node_map */
1312			memcpy(mle->vote_map, mle->node_map,
1313			       sizeof(mle->node_map));
1314			/* put myself into the maybe map */
1315			if (mle->type != DLM_MLE_BLOCK)
1316				set_bit(dlm->node_num, mle->maybe_map);
1317		}
1318		ret = -EAGAIN;
1319		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1320	}
1321	return ret;
1322}
1323
1324
1325/*
1326 * DLM_MASTER_REQUEST_MSG
1327 *
1328 * returns: 0 on success,
1329 *          -errno on a network error
1330 *
1331 * on error, the caller should assume the target node is "dead"
1332 *
1333 */
1334
1335static int dlm_do_master_request(struct dlm_lock_resource *res,
1336				 struct dlm_master_list_entry *mle, int to)
1337{
1338	struct dlm_ctxt *dlm = mle->dlm;
1339	struct dlm_master_request request;
1340	int ret, response=0, resend;
1341
1342	memset(&request, 0, sizeof(request));
1343	request.node_idx = dlm->node_num;
1344
1345	BUG_ON(mle->type == DLM_MLE_MIGRATION);
1346
1347	request.namelen = (u8)mle->mnamelen;
1348	memcpy(request.name, mle->mname, request.namelen);
1349
1350again:
1351	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1352				 sizeof(request), to, &response);
1353	if (ret < 0)  {
1354		if (ret == -ESRCH) {
1355			/* should never happen */
1356			mlog(ML_ERROR, "TCP stack not ready!\n");
1357			BUG();
1358		} else if (ret == -EINVAL) {
1359			mlog(ML_ERROR, "bad args passed to o2net!\n");
1360			BUG();
1361		} else if (ret == -ENOMEM) {
1362			mlog(ML_ERROR, "out of memory while trying to send "
1363			     "network message!  retrying\n");
1364			/* this is totally crude */
1365			msleep(50);
1366			goto again;
1367		} else if (!dlm_is_host_down(ret)) {
1368			/* not a network error. bad. */
1369			mlog_errno(ret);
1370			mlog(ML_ERROR, "unhandled error!");
1371			BUG();
1372		}
1373		/* all other errors should be network errors,
1374		 * and likely indicate node death */
1375		mlog(ML_ERROR, "link to %d went down!\n", to);
1376		goto out;
1377	}
1378
1379	ret = 0;
1380	resend = 0;
1381	spin_lock(&mle->spinlock);
1382	switch (response) {
1383		case DLM_MASTER_RESP_YES:
1384			set_bit(to, mle->response_map);
1385			mlog(0, "node %u is the master, response=YES\n", to);
1386			mlog(0, "%s:%.*s: master node %u now knows I have a "
1387			     "reference\n", dlm->name, res->lockname.len,
1388			     res->lockname.name, to);
1389			mle->master = to;
1390			break;
1391		case DLM_MASTER_RESP_NO:
1392			mlog(0, "node %u not master, response=NO\n", to);
1393			set_bit(to, mle->response_map);
1394			break;
1395		case DLM_MASTER_RESP_MAYBE:
1396			mlog(0, "node %u not master, response=MAYBE\n", to);
1397			set_bit(to, mle->response_map);
1398			set_bit(to, mle->maybe_map);
1399			break;
1400		case DLM_MASTER_RESP_ERROR:
1401			mlog(0, "node %u hit an error, resending\n", to);
1402			resend = 1;
1403			response = 0;
1404			break;
1405		default:
1406			mlog(ML_ERROR, "bad response! %u\n", response);
1407			BUG();
1408	}
1409	spin_unlock(&mle->spinlock);
1410	if (resend) {
1411		/* this is also totally crude */
1412		msleep(50);
1413		goto again;
1414	}
1415
1416out:
1417	return ret;
1418}
1419
1420/*
1421 * locks that can be taken here:
1422 * dlm->spinlock
1423 * res->spinlock
1424 * mle->spinlock
1425 * dlm->master_lock
1426 *
1427 * if possible, TRIM THIS DOWN!!!
1428 */
1429int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1430			       void **ret_data)
1431{
1432	u8 response = DLM_MASTER_RESP_MAYBE;
1433	struct dlm_ctxt *dlm = data;
1434	struct dlm_lock_resource *res = NULL;
1435	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1436	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1437	char *name;
1438	unsigned int namelen, hash;
1439	int found, ret;
1440	int set_maybe;
1441	int dispatch_assert = 0;
1442	int dispatched = 0;
1443
1444	if (!dlm_grab(dlm))
1445		return DLM_MASTER_RESP_NO;
1446
1447	if (!dlm_domain_fully_joined(dlm)) {
1448		response = DLM_MASTER_RESP_NO;
1449		goto send_response;
1450	}
1451
1452	name = request->name;
1453	namelen = request->namelen;
1454	hash = dlm_lockid_hash(name, namelen);
1455
1456	if (namelen > DLM_LOCKID_NAME_MAX) {
1457		response = DLM_IVBUFLEN;
1458		goto send_response;
1459	}
1460
1461way_up_top:
1462	spin_lock(&dlm->spinlock);
1463	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1464	if (res) {
1465		spin_unlock(&dlm->spinlock);
1466
1467		/* take care of the easy cases up front */
1468		spin_lock(&res->spinlock);
1469
1470		/*
1471		 * Right after dlm spinlock was released, dlm_thread could have
1472		 * purged the lockres. Check if lockres got unhashed. If so
1473		 * start over.
1474		 */
1475		if (hlist_unhashed(&res->hash_node)) {
1476			spin_unlock(&res->spinlock);
1477			dlm_lockres_put(res);
1478			goto way_up_top;
1479		}
1480
1481		if (res->state & (DLM_LOCK_RES_RECOVERING|
1482				  DLM_LOCK_RES_MIGRATING)) {
1483			spin_unlock(&res->spinlock);
1484			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1485			     "being recovered/migrated\n");
1486			response = DLM_MASTER_RESP_ERROR;
1487			if (mle)
1488				kmem_cache_free(dlm_mle_cache, mle);
1489			goto send_response;
1490		}
1491
1492		if (res->owner == dlm->node_num) {
1493			dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
1494			spin_unlock(&res->spinlock);
1495			response = DLM_MASTER_RESP_YES;
1496			if (mle)
1497				kmem_cache_free(dlm_mle_cache, mle);
1498
1499			/* this node is the owner.
1500			 * there is some extra work that needs to
1501			 * happen now.  the requesting node has
1502			 * caused all nodes up to this one to
1503			 * create mles.  this node now needs to
1504			 * go back and clean those up. */
1505			dispatch_assert = 1;
1506			goto send_response;
1507		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1508			spin_unlock(&res->spinlock);
1509			// mlog(0, "node %u is the master\n", res->owner);
1510			response = DLM_MASTER_RESP_NO;
1511			if (mle)
1512				kmem_cache_free(dlm_mle_cache, mle);
1513			goto send_response;
1514		}
1515
1516		/* ok, there is no owner.  either this node is
1517		 * being blocked, or it is actively trying to
1518		 * master this lock. */
1519		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1520			mlog(ML_ERROR, "lock with no owner should be "
1521			     "in-progress!\n");
1522			BUG();
1523		}
1524
1525		// mlog(0, "lockres is in progress...\n");
1526		spin_lock(&dlm->master_lock);
1527		found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1528		if (!found) {
1529			mlog(ML_ERROR, "no mle found for this lock!\n");
1530			BUG();
1531		}
1532		set_maybe = 1;
1533		spin_lock(&tmpmle->spinlock);
1534		if (tmpmle->type == DLM_MLE_BLOCK) {
1535			// mlog(0, "this node is waiting for "
1536			// "lockres to be mastered\n");
1537			response = DLM_MASTER_RESP_NO;
1538		} else if (tmpmle->type == DLM_MLE_MIGRATION) {
1539			mlog(0, "node %u is master, but trying to migrate to "
1540			     "node %u.\n", tmpmle->master, tmpmle->new_master);
1541			if (tmpmle->master == dlm->node_num) {
1542				mlog(ML_ERROR, "no owner on lockres, but this "
1543				     "node is trying to migrate it to %u?!\n",
1544				     tmpmle->new_master);
1545				BUG();
1546			} else {
1547				/* the real master can respond on its own */
1548				response = DLM_MASTER_RESP_NO;
1549			}
1550		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1551			set_maybe = 0;
1552			if (tmpmle->master == dlm->node_num) {
1553				response = DLM_MASTER_RESP_YES;
1554				/* this node will be the owner.
1555				 * go back and clean the mles on any
1556				 * other nodes */
1557				dispatch_assert = 1;
1558				dlm_lockres_set_refmap_bit(dlm, res,
1559							   request->node_idx);
1560			} else
1561				response = DLM_MASTER_RESP_NO;
1562		} else {
1563			// mlog(0, "this node is attempting to "
1564			// "master lockres\n");
1565			response = DLM_MASTER_RESP_MAYBE;
1566		}
1567		if (set_maybe)
1568			set_bit(request->node_idx, tmpmle->maybe_map);
1569		spin_unlock(&tmpmle->spinlock);
1570
1571		spin_unlock(&dlm->master_lock);
1572		spin_unlock(&res->spinlock);
1573
1574		/* keep the mle attached to heartbeat events */
1575		dlm_put_mle(tmpmle);
1576		if (mle)
1577			kmem_cache_free(dlm_mle_cache, mle);
1578		goto send_response;
1579	}
1580
1581	/*
1582	 * lockres doesn't exist on this node
1583	 * if there is an MLE_BLOCK, return NO
1584	 * if there is an MLE_MASTER, return MAYBE
1585	 * otherwise, add an MLE_BLOCK, return NO
1586	 */
1587	spin_lock(&dlm->master_lock);
1588	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1589	if (!found) {
1590		/* this lockid has never been seen on this node yet */
1591		// mlog(0, "no mle found\n");
1592		if (!mle) {
1593			spin_unlock(&dlm->master_lock);
1594			spin_unlock(&dlm->spinlock);
1595
1596			mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1597			if (!mle) {
1598				response = DLM_MASTER_RESP_ERROR;
1599				mlog_errno(-ENOMEM);
1600				goto send_response;
1601			}
1602			goto way_up_top;
1603		}
1604
1605		// mlog(0, "this is second time thru, already allocated, "
1606		// "add the block.\n");
1607		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1608		set_bit(request->node_idx, mle->maybe_map);
1609		__dlm_insert_mle(dlm, mle);
1610		response = DLM_MASTER_RESP_NO;
1611	} else {
1612		// mlog(0, "mle was found\n");
1613		set_maybe = 1;
1614		spin_lock(&tmpmle->spinlock);
1615		if (tmpmle->master == dlm->node_num) {
1616			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1617			BUG();
1618		}
1619		if (tmpmle->type == DLM_MLE_BLOCK)
1620			response = DLM_MASTER_RESP_NO;
1621		else if (tmpmle->type == DLM_MLE_MIGRATION) {
1622			mlog(0, "migration mle was found (%u->%u)\n",
1623			     tmpmle->master, tmpmle->new_master);
1624			/* real master can respond on its own */
1625			response = DLM_MASTER_RESP_NO;
1626		} else
1627			response = DLM_MASTER_RESP_MAYBE;
1628		if (set_maybe)
1629			set_bit(request->node_idx, tmpmle->maybe_map);
1630		spin_unlock(&tmpmle->spinlock);
1631	}
1632	spin_unlock(&dlm->master_lock);
1633	spin_unlock(&dlm->spinlock);
1634
1635	if (found) {
1636		/* keep the mle attached to heartbeat events */
1637		dlm_put_mle(tmpmle);
1638	}
1639send_response:
1640	/*
1641	 * __dlm_lookup_lockres() grabbed a reference to this lockres.
1642	 * The reference is released by dlm_assert_master_worker() under
1643	 * the call to dlm_dispatch_assert_master().  If
1644	 * dlm_assert_master_worker() isn't called, we drop it here.
1645	 */
1646	if (dispatch_assert) {
1647		if (response != DLM_MASTER_RESP_YES)
1648			mlog(ML_ERROR, "invalid response %d\n", response);
1649		if (!res) {
1650			mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1651			BUG();
1652		}
1653		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1654			     dlm->node_num, res->lockname.len, res->lockname.name);
1655		spin_lock(&res->spinlock);
1656		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1657						 DLM_ASSERT_MASTER_MLE_CLEANUP);
1658		if (ret < 0) {
1659			mlog(ML_ERROR, "failed to dispatch assert master work\n");
1660			response = DLM_MASTER_RESP_ERROR;
1661			dlm_lockres_put(res);
1662		} else {
1663			dispatched = 1;
1664			__dlm_lockres_grab_inflight_worker(dlm, res);
1665		}
1666		spin_unlock(&res->spinlock);
1667	} else {
1668		if (res)
1669			dlm_lockres_put(res);
1670	}
1671
1672	if (!dispatched)
1673		dlm_put(dlm);
1674	return response;
1675}
1676
1677/*
1678 * DLM_ASSERT_MASTER_MSG
1679 */
1680
1681
1682/*
1683 * NOTE: this can be used for debugging
1684 * can periodically run all locks owned by this node
1685 * and re-assert across the cluster...
1686 */
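/*
 * no such debug pass exists in this file; a sketch of one might scan
 * dlm->tracking_list under dlm->track_lock, take a ref on each res owned by
 * dlm->node_num, then drop the lock and call dlm_do_assert_master(dlm, res,
 * nodemap, 0) with a nodemap copied from dlm->domain_map (the call can
 * block, so it must not run under a spinlock).
 */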
1687static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1688				struct dlm_lock_resource *res,
1689				void *nodemap, u32 flags)
1690{
1691	struct dlm_assert_master assert;
1692	int to, tmpret;
1693	struct dlm_node_iter iter;
1694	int ret = 0;
1695	int reassert;
1696	const char *lockname = res->lockname.name;
1697	unsigned int namelen = res->lockname.len;
1698
1699	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1700
1701	spin_lock(&res->spinlock);
1702	res->state |= DLM_LOCK_RES_SETREF_INPROG;
1703	spin_unlock(&res->spinlock);
1704
1705again:
1706	reassert = 0;
1707
1708	/* note that if this nodemap is empty, it returns 0 */
1709	dlm_node_iter_init(nodemap, &iter);
1710	while ((to = dlm_node_iter_next(&iter)) >= 0) {
1711		int r = 0;
1712		struct dlm_master_list_entry *mle = NULL;
1713
1714		mlog(0, "sending assert master to %d (%.*s)\n", to,
1715		     namelen, lockname);
1716		memset(&assert, 0, sizeof(assert));
1717		assert.node_idx = dlm->node_num;
1718		assert.namelen = namelen;
1719		memcpy(assert.name, lockname, namelen);
1720		assert.flags = cpu_to_be32(flags);
1721
1722		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1723					    &assert, sizeof(assert), to, &r);
1724		if (tmpret < 0) {
1725			mlog(ML_ERROR, "Error %d when sending message %u (key "
1726			     "0x%x) to node %u\n", tmpret,
1727			     DLM_ASSERT_MASTER_MSG, dlm->key, to);
1728			if (!dlm_is_host_down(tmpret)) {
1729				mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1730				BUG();
1731			}
1732			/* a node died.  finish out the rest of the nodes. */
1733			mlog(0, "link to %d went down!\n", to);
1734			/* any nonzero status return will do */
1735			ret = tmpret;
1736			r = 0;
1737		} else if (r < 0) {
1738			/* ok, something is horribly wrong.  kill this node. */
1739			mlog(ML_ERROR,"during assert master of %.*s to %u, "
1740			     "got %d.\n", namelen, lockname, to, r);
1741			spin_lock(&dlm->spinlock);
1742			spin_lock(&dlm->master_lock);
1743			if (dlm_find_mle(dlm, &mle, (char *)lockname,
1744					 namelen)) {
1745				dlm_print_one_mle(mle);
1746				__dlm_put_mle(mle);
1747			}
1748			spin_unlock(&dlm->master_lock);
1749			spin_unlock(&dlm->spinlock);
1750			BUG();
1751		}
1752
1753		if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1754		    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1755				mlog(ML_ERROR, "%.*s: very strange, "
1756				     "master MLE but no lockres on %u\n",
1757				     namelen, lockname, to);
1758		}
1759
1760		if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1761			mlog(0, "%.*s: node %u create mles on other "
1762			     "nodes and requests a re-assert\n",
1763			     namelen, lockname, to);
1764			reassert = 1;
1765		}
1766		if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1767			mlog(0, "%.*s: node %u has a reference to this "
1768			     "lockres, set the bit in the refmap\n",
1769			     namelen, lockname, to);
1770			spin_lock(&res->spinlock);
1771			dlm_lockres_set_refmap_bit(dlm, res, to);
1772			spin_unlock(&res->spinlock);
1773		}
1774	}
1775
1776	if (reassert)
1777		goto again;
1778
1779	spin_lock(&res->spinlock);
1780	res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1781	spin_unlock(&res->spinlock);
1782	wake_up(&res->wq);
1783
1784	return ret;
1785}
1786
1787/*
1788 * locks that can be taken here:
1789 * dlm->spinlock
1790 * res->spinlock
1791 * mle->spinlock
1792 * dlm->master_lock
1793 *
1794 * if possible, TRIM THIS DOWN!!!
1795 */
1796int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1797			      void **ret_data)
1798{
1799	struct dlm_ctxt *dlm = data;
1800	struct dlm_master_list_entry *mle = NULL;
1801	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1802	struct dlm_lock_resource *res = NULL;
1803	char *name;
1804	unsigned int namelen, hash;
1805	u32 flags;
1806	int master_request = 0, have_lockres_ref = 0;
1807	int ret = 0;
1808
1809	if (!dlm_grab(dlm))
1810		return 0;
1811
1812	name = assert->name;
1813	namelen = assert->namelen;
1814	hash = dlm_lockid_hash(name, namelen);
1815	flags = be32_to_cpu(assert->flags);
1816
1817	if (namelen > DLM_LOCKID_NAME_MAX) {
1818		mlog(ML_ERROR, "Invalid name length!");
1819		goto done;
1820	}
1821
1822	spin_lock(&dlm->spinlock);
1823
1824	if (flags)
1825		mlog(0, "assert_master with flags: %u\n", flags);
1826
1827	/* find the MLE */
1828	spin_lock(&dlm->master_lock);
1829	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1830		/* not an error, could be master just re-asserting */
1831		mlog(0, "just got an assert_master from %u, but no "
1832		     "MLE for it! (%.*s)\n", assert->node_idx,
1833		     namelen, name);
1834	} else {
1835		int bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1836		if (bit >= O2NM_MAX_NODES) {
1837			/* not necessarily an error, though less likely.
1838			 * could be master just re-asserting. */
1839			mlog(0, "no bits set in the maybe_map, but %u "
1840			     "is asserting! (%.*s)\n", assert->node_idx,
1841			     namelen, name);
1842		} else if (bit != assert->node_idx) {
1843			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1844				mlog(0, "master %u was found, %u should "
1845				     "back off\n", assert->node_idx, bit);
1846			} else {
1847				/* with the fix for bug 569, a higher node
1848				 * number winning the mastery will respond
1849				 * YES to mastery requests, but this node
1850				 * had no way of knowing.  let it pass. */
1851				mlog(0, "%u is the lowest node, "
1852				     "%u is asserting. (%.*s)  %u must "
1853				     "have begun after %u won.\n", bit,
1854				     assert->node_idx, namelen, name, bit,
1855				     assert->node_idx);
1856			}
1857		}
1858		if (mle->type == DLM_MLE_MIGRATION) {
1859			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1860				mlog(0, "%s:%.*s: got cleanup assert"
1861				     " from %u for migration\n",
1862				     dlm->name, namelen, name,
1863				     assert->node_idx);
1864			} else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1865				mlog(0, "%s:%.*s: got unrelated assert"
1866				     " from %u for migration, ignoring\n",
1867				     dlm->name, namelen, name,
1868				     assert->node_idx);
1869				__dlm_put_mle(mle);
1870				spin_unlock(&dlm->master_lock);
1871				spin_unlock(&dlm->spinlock);
1872				goto done;
1873			}
1874		}
1875	}
1876	spin_unlock(&dlm->master_lock);
1877
1878	/* ok everything checks out with the MLE
1879	 * now check to see if there is a lockres */
1880	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1881	if (res) {
1882		spin_lock(&res->spinlock);
1883		if (res->state & DLM_LOCK_RES_RECOVERING)  {
1884			mlog(ML_ERROR, "%u asserting but %.*s is "
1885			     "RECOVERING!\n", assert->node_idx, namelen, name);
1886			goto kill;
1887		}
1888		if (!mle) {
1889			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1890			    res->owner != assert->node_idx) {
1891				mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1892				     "but current owner is %u! (%.*s)\n",
1893				     assert->node_idx, res->owner, namelen,
1894				     name);
1895				__dlm_print_one_lock_resource(res);
1896				BUG();
1897			}
1898		} else if (mle->type != DLM_MLE_MIGRATION) {
1899			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1900				/* owner is just re-asserting */
1901				if (res->owner == assert->node_idx) {
1902					mlog(0, "owner %u re-asserting on "
1903					     "lock %.*s\n", assert->node_idx,
1904					     namelen, name);
1905					goto ok;
1906				}
1907				mlog(ML_ERROR, "got assert_master from "
1908				     "node %u, but %u is the owner! "
1909				     "(%.*s)\n", assert->node_idx,
1910				     res->owner, namelen, name);
1911				goto kill;
1912			}
1913			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1914				mlog(ML_ERROR, "got assert from %u, but lock "
1915				     "with no owner should be "
1916				     "in-progress! (%.*s)\n",
1917				     assert->node_idx,
1918				     namelen, name);
1919				goto kill;
1920			}
1921		} else /* mle->type == DLM_MLE_MIGRATION */ {
1922			/* should only be getting an assert from new master */
1923			if (assert->node_idx != mle->new_master) {
1924				mlog(ML_ERROR, "got assert from %u, but "
1925				     "new master is %u, and old master "
1926				     "was %u (%.*s)\n",
1927				     assert->node_idx, mle->new_master,
1928				     mle->master, namelen, name);
1929				goto kill;
1930			}
1931
1932		}
1933ok:
1934		spin_unlock(&res->spinlock);
1935	}
1936
1937	// mlog(0, "woo!  got an assert_master from node %u!\n",
1938	// 	     assert->node_idx);
1939	if (mle) {
1940		int extra_ref = 0;
1941		int nn = -1;
1942		int rr, err = 0;
1943
1944		spin_lock(&mle->spinlock);
1945		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1946			extra_ref = 1;
1947		else {
1948			/* MASTER mle: if any bits set in the response map
1949			 * then the calling node needs to re-assert to clear
1950			 * up nodes that this node contacted */
1951			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
1952						    nn+1)) < O2NM_MAX_NODES) {
1953				if (nn != dlm->node_num && nn != assert->node_idx) {
1954					master_request = 1;
1955					break;
1956				}
1957			}
1958		}
1959		mle->master = assert->node_idx;
1960		atomic_set(&mle->woken, 1);
1961		wake_up(&mle->wq);
1962		spin_unlock(&mle->spinlock);
1963
1964		if (res) {
1965			int wake = 0;
1966			spin_lock(&res->spinlock);
1967			if (mle->type == DLM_MLE_MIGRATION) {
1968				mlog(0, "finishing off migration of lockres %.*s, "
1969			     		"from %u to %u\n",
1970			       		res->lockname.len, res->lockname.name,
1971			       		dlm->node_num, mle->new_master);
1972				res->state &= ~DLM_LOCK_RES_MIGRATING;
1973				wake = 1;
1974				dlm_change_lockres_owner(dlm, res, mle->new_master);
1975				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1976			} else {
1977				dlm_change_lockres_owner(dlm, res, mle->master);
1978			}
1979			spin_unlock(&res->spinlock);
1980			have_lockres_ref = 1;
1981			if (wake)
1982				wake_up(&res->wq);
1983		}
1984
1985		/* master is known, detach if not already detached.
1986		 * ensures that only one assert_master call will happen
1987		 * on this mle. */
1988		spin_lock(&dlm->master_lock);
1989
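		/* sanity check the refcount: we expect at least one ref for
		 * being in the master hash, one more if the mle is marked
		 * in-use, and one more for BLOCK/MIGRATION mles (the
		 * extra_ref dropped below).  anything less means the refs
		 * are already unbalanced. */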
1990		rr = atomic_read(&mle->mle_refs.refcount);
1991		if (mle->inuse > 0) {
1992			if (extra_ref && rr < 3)
1993				err = 1;
1994			else if (!extra_ref && rr < 2)
1995				err = 1;
1996		} else {
1997			if (extra_ref && rr < 2)
1998				err = 1;
1999			else if (!extra_ref && rr < 1)
2000				err = 1;
2001		}
2002		if (err) {
2003			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
2004			     "that will mess up this node, refs=%d, extra=%d, "
2005			     "inuse=%d\n", dlm->name, namelen, name,
2006			     assert->node_idx, rr, extra_ref, mle->inuse);
2007			dlm_print_one_mle(mle);
2008		}
2009		__dlm_unlink_mle(dlm, mle);
2010		__dlm_mle_detach_hb_events(dlm, mle);
2011		__dlm_put_mle(mle);
2012		if (extra_ref) {
2013			/* the assert master message now balances the extra
2014		 	 * ref given by the master / migration request message.
2015		 	 * if this is the last put, it will be removed
2016		 	 * from the list. */
2017			__dlm_put_mle(mle);
2018		}
2019		spin_unlock(&dlm->master_lock);
2020	} else if (res) {
2021		if (res->owner != assert->node_idx) {
2022			mlog(0, "assert_master from %u, but current "
2023			     "owner is %u (%.*s), no mle\n", assert->node_idx,
2024			     res->owner, namelen, name);
2025		}
2026	}
2027	spin_unlock(&dlm->spinlock);
2028
2029done:
2030	ret = 0;
2031	if (res) {
2032		spin_lock(&res->spinlock);
2033		res->state |= DLM_LOCK_RES_SETREF_INPROG;
2034		spin_unlock(&res->spinlock);
2035		*ret_data = (void *)res;
2036	}
2037	dlm_put(dlm);
2038	if (master_request) {
2039		mlog(0, "need to tell master to reassert\n");
2040		/* positive. negative would shoot down the node. */
2041		ret |= DLM_ASSERT_RESPONSE_REASSERT;
2042		if (!have_lockres_ref) {
2043			mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2044			     "mle present here for %s:%.*s, but no lockres!\n",
2045			     assert->node_idx, dlm->name, namelen, name);
2046		}
2047	}
2048	if (have_lockres_ref) {
2049		/* let the master know we have a reference to the lockres */
2050		ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2051		mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2052		     dlm->name, namelen, name, assert->node_idx);
2053	}
2054	return ret;
2055
2056kill:
2057	/* kill the caller! */
2058	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
2059	     "and killing the other node now!  This node is OK and can continue.\n");
2060	__dlm_print_one_lock_resource(res);
2061	spin_unlock(&res->spinlock);
2062	spin_lock(&dlm->master_lock);
2063	if (mle)
2064		__dlm_put_mle(mle);
2065	spin_unlock(&dlm->master_lock);
2066	spin_unlock(&dlm->spinlock);
2067	*ret_data = (void *)res;
2068	dlm_put(dlm);
2069	return -EINVAL;
2070}
2071
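/* called once the assert_master reply has been sent: clear the
 * SETREF_INPROG state set in dlm_assert_master_handler and drop the
 * lockres reference taken by the lookup there */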
2072void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2073{
2074	struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2075
2076	if (ret_data) {
2077		spin_lock(&res->spinlock);
2078		res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2079		spin_unlock(&res->spinlock);
2080		wake_up(&res->wq);
2081		dlm_lockres_put(res);
2082	}
2083	return;
2084}
2085
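/* queue up an assert_master to be sent from the dlm worker thread.
 * allocated GFP_ATOMIC since callers may be holding spinlocks. */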
2086int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2087			       struct dlm_lock_resource *res,
2088			       int ignore_higher, u8 request_from, u32 flags)
2089{
2090	struct dlm_work_item *item;
2091	item = kzalloc(sizeof(*item), GFP_ATOMIC);
2092	if (!item)
2093		return -ENOMEM;
2094
2095
2096	/* queue up work for dlm_assert_master_worker */
2097	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2098	item->u.am.lockres = res; /* already have a ref */
2099	/* can optionally ignore node numbers higher than this node */
2100	item->u.am.ignore_higher = ignore_higher;
2101	item->u.am.request_from = request_from;
2102	item->u.am.flags = flags;
2103
2104	if (ignore_higher)
2105		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2106		     res->lockname.name);
2107
2108	spin_lock(&dlm->work_lock);
2109	list_add_tail(&item->list, &dlm->work_list);
2110	spin_unlock(&dlm->work_lock);
2111
2112	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2113	return 0;
2114}
2115
2116static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2117{
2118	struct dlm_ctxt *dlm = data;
2119	int ret = 0;
2120	struct dlm_lock_resource *res;
2121	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2122	int ignore_higher;
2123	int bit;
2124	u8 request_from;
2125	u32 flags;
2126
2127	dlm = item->dlm;
2128	res = item->u.am.lockres;
2129	ignore_higher = item->u.am.ignore_higher;
2130	request_from = item->u.am.request_from;
2131	flags = item->u.am.flags;
2132
2133	spin_lock(&dlm->spinlock);
2134	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2135	spin_unlock(&dlm->spinlock);
2136
2137	clear_bit(dlm->node_num, nodemap);
2138	if (ignore_higher) {
		/* if this is just to clear up mles for nodes below
2140		 * this node, do not send the message to the original
2141		 * caller or any node number higher than this */
2142		clear_bit(request_from, nodemap);
2143		bit = dlm->node_num;
2144		while (1) {
2145			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2146					    bit+1);
2147		       	if (bit >= O2NM_MAX_NODES)
2148				break;
2149			clear_bit(bit, nodemap);
2150		}
2151	}
2152
2153	/*
2154	 * If we're migrating this lock to someone else, we are no
	 * longer allowed to assert our own mastery.  OTOH, we need to
2156	 * prevent migration from starting while we're still asserting
2157	 * our dominance.  The reserved ast delays migration.
2158	 */
2159	spin_lock(&res->spinlock);
2160	if (res->state & DLM_LOCK_RES_MIGRATING) {
2161		mlog(0, "Someone asked us to assert mastery, but we're "
2162		     "in the middle of migration.  Skipping assert, "
2163		     "the new master will handle that.\n");
2164		spin_unlock(&res->spinlock);
2165		goto put;
2166	} else
2167		__dlm_lockres_reserve_ast(res);
2168	spin_unlock(&res->spinlock);
2169
2170	/* this call now finishes out the nodemap
2171	 * even if one or more nodes die */
2172	mlog(0, "worker about to master %.*s here, this=%u\n",
2173		     res->lockname.len, res->lockname.name, dlm->node_num);
2174	ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2175	if (ret < 0) {
2176		/* no need to restart, we are done */
2177		if (!dlm_is_host_down(ret))
2178			mlog_errno(ret);
2179	}
2180
2181	/* Ok, we've asserted ourselves.  Let's let migration start. */
2182	dlm_lockres_release_ast(dlm, res);
2183
2184put:
2185	dlm_lockres_drop_inflight_worker(dlm, res);
2186
2187	dlm_lockres_put(res);
2188
2189	mlog(0, "finished with dlm_assert_master_worker\n");
2190}
2191
2192/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2193 * We cannot wait for node recovery to complete to begin mastering this
2194 * lockres because this lockres is used to kick off recovery! ;-)
2195 * So, do a pre-check on all living nodes to see if any of those nodes
2196 * think that $RECOVERY is currently mastered by a dead node.  If so,
2197 * we wait a short time to allow that node to get notified by its own
2198 * heartbeat stack, then check again.  All $RECOVERY lock resources
 * mastered by dead nodes are purged when the heartbeat callback is
2200 * fired, so we can know for sure that it is safe to continue once
2201 * the node returns a live node or no node.  */
2202static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2203				       struct dlm_lock_resource *res)
2204{
2205	struct dlm_node_iter iter;
2206	int nodenum;
2207	int ret = 0;
2208	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2209
2210	spin_lock(&dlm->spinlock);
2211	dlm_node_iter_init(dlm->domain_map, &iter);
2212	spin_unlock(&dlm->spinlock);
2213
2214	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2215		/* do not send to self */
2216		if (nodenum == dlm->node_num)
2217			continue;
2218		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2219		if (ret < 0) {
2220			mlog_errno(ret);
2221			if (!dlm_is_host_down(ret))
2222				BUG();
2223			/* host is down, so answer for that node would be
2224			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2225			ret = 0;
2226		}
2227
2228		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2229			/* check to see if this master is in the recovery map */
2230			spin_lock(&dlm->spinlock);
2231			if (test_bit(master, dlm->recovery_map)) {
2232				mlog(ML_NOTICE, "%s: node %u has not seen "
2233				     "node %u go down yet, and thinks the "
2234				     "dead node is mastering the recovery "
2235				     "lock.  must wait.\n", dlm->name,
2236				     nodenum, master);
2237				ret = -EAGAIN;
2238			}
2239			spin_unlock(&dlm->spinlock);
2240			mlog(0, "%s: reco lock master is %u\n", dlm->name,
2241			     master);
2242			break;
2243		}
2244	}
2245	return ret;
2246}
2247
2248/*
2249 * DLM_DEREF_LOCKRES_MSG
2250 */
2251
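/* tell the owner of this lockres that this node no longer holds a
 * reference, so the owner can clear this node's bit in its refmap.
 * a negative status back means the owner thought we had no reference
 * at all, which is fatal. */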
2252int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2253{
2254	struct dlm_deref_lockres deref;
2255	int ret = 0, r;
2256	const char *lockname;
2257	unsigned int namelen;
2258
2259	lockname = res->lockname.name;
2260	namelen = res->lockname.len;
2261	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2262
2263	memset(&deref, 0, sizeof(deref));
2264	deref.node_idx = dlm->node_num;
2265	deref.namelen = namelen;
2266	memcpy(deref.name, lockname, namelen);
2267
2268	ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2269				 &deref, sizeof(deref), res->owner, &r);
2270	if (ret < 0)
2271		mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
2272		     dlm->name, namelen, lockname, ret, res->owner);
2273	else if (r < 0) {
2274		/* BAD.  other node says I did not have a ref. */
2275		mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2276		     dlm->name, namelen, lockname, res->owner, r);
2277		dlm_print_one_lock_resource(res);
2278		BUG();
2279	}
2280	return ret;
2281}
2282
2283int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2284			      void **ret_data)
2285{
2286	struct dlm_ctxt *dlm = data;
2287	struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2288	struct dlm_lock_resource *res = NULL;
2289	char *name;
2290	unsigned int namelen;
2291	int ret = -EINVAL;
2292	u8 node;
2293	unsigned int hash;
2294	struct dlm_work_item *item;
2295	int cleared = 0;
2296	int dispatch = 0;
2297
2298	if (!dlm_grab(dlm))
2299		return 0;
2300
2301	name = deref->name;
2302	namelen = deref->namelen;
2303	node = deref->node_idx;
2304
2305	if (namelen > DLM_LOCKID_NAME_MAX) {
2306		mlog(ML_ERROR, "Invalid name length!");
2307		goto done;
2308	}
2309	if (deref->node_idx >= O2NM_MAX_NODES) {
2310		mlog(ML_ERROR, "Invalid node number: %u\n", node);
2311		goto done;
2312	}
2313
2314	hash = dlm_lockid_hash(name, namelen);
2315
2316	spin_lock(&dlm->spinlock);
2317	res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2318	if (!res) {
2319		spin_unlock(&dlm->spinlock);
2320		mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2321		     dlm->name, namelen, name);
2322		goto done;
2323	}
2324	spin_unlock(&dlm->spinlock);
2325
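	/* if an assert_master is still being processed for this lockres
	 * (SETREF_INPROG), punt the deref to the worker thread, which
	 * waits for that flag to clear before touching the refmap */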
2326	spin_lock(&res->spinlock);
2327	if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2328		dispatch = 1;
2329	else {
2330		BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2331		if (test_bit(node, res->refmap)) {
2332			dlm_lockres_clear_refmap_bit(dlm, res, node);
2333			cleared = 1;
2334		}
2335	}
2336	spin_unlock(&res->spinlock);
2337
2338	if (!dispatch) {
2339		if (cleared)
2340			dlm_lockres_calc_usage(dlm, res);
2341		else {
2342			mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
			     "but it is already dropped!\n", dlm->name,
			     res->lockname.len, res->lockname.name, node);
2345			dlm_print_one_lock_resource(res);
2346		}
2347		ret = 0;
2348		goto done;
2349	}
2350
2351	item = kzalloc(sizeof(*item), GFP_NOFS);
2352	if (!item) {
2353		ret = -ENOMEM;
2354		mlog_errno(ret);
2355		goto done;
2356	}
2357
2358	dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2359	item->u.dl.deref_res = res;
2360	item->u.dl.deref_node = node;
2361
2362	spin_lock(&dlm->work_lock);
2363	list_add_tail(&item->list, &dlm->work_list);
2364	spin_unlock(&dlm->work_lock);
2365
2366	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2367	return 0;
2368
2369done:
2370	if (res)
2371		dlm_lockres_put(res);
2372	dlm_put(dlm);
2373
2374	return ret;
2375}
2376
2377static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2378{
2379	struct dlm_ctxt *dlm;
2380	struct dlm_lock_resource *res;
2381	u8 node;
2382	u8 cleared = 0;
2383
2384	dlm = item->dlm;
2385	res = item->u.dl.deref_res;
2386	node = item->u.dl.deref_node;
2387
2388	spin_lock(&res->spinlock);
2389	BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2390	if (test_bit(node, res->refmap)) {
2391		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2392		dlm_lockres_clear_refmap_bit(dlm, res, node);
2393		cleared = 1;
2394	}
2395	spin_unlock(&res->spinlock);
2396
2397	if (cleared) {
2398		mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2399		     dlm->name, res->lockname.len, res->lockname.name, node);
2400		dlm_lockres_calc_usage(dlm, res);
2401	} else {
2402		mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2403		     "but it is already dropped!\n", dlm->name,
2404		     res->lockname.len, res->lockname.name, node);
2405		dlm_print_one_lock_resource(res);
2406	}
2407
2408	dlm_lockres_put(res);
2409}
2410
2411/*
 * A migrateable resource is one that:
 * 1. is locally mastered, and
 * 2. has zero local locks, and
 * 3. has one or more non-local locks, or one or more references
2416 * Returns 1 if yes, 0 if not.
2417 */
2418static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
2419				      struct dlm_lock_resource *res)
2420{
2421	enum dlm_lockres_list idx;
2422	int nonlocal = 0, node_ref;
2423	struct list_head *queue;
2424	struct dlm_lock *lock;
2425	u64 cookie;
2426
2427	assert_spin_locked(&res->spinlock);
2428
2429	/* delay migration when the lockres is in MIGRATING state */
2430	if (res->state & DLM_LOCK_RES_MIGRATING)
2431		return 0;
2432
	/* delay migration when the lockres is in RECOVERING state */
2434	if (res->state & DLM_LOCK_RES_RECOVERING)
2435		return 0;
2436
2437	if (res->owner != dlm->node_num)
2438		return 0;
2439
	for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2441		queue = dlm_list_idx_to_ptr(res, idx);
2442		list_for_each_entry(lock, queue, list) {
2443			if (lock->ml.node != dlm->node_num) {
2444				nonlocal++;
2445				continue;
2446			}
2447			cookie = be64_to_cpu(lock->ml.cookie);
2448			mlog(0, "%s: Not migrateable res %.*s, lock %u:%llu on "
2449			     "%s list\n", dlm->name, res->lockname.len,
2450			     res->lockname.name,
2451			     dlm_get_lock_cookie_node(cookie),
2452			     dlm_get_lock_cookie_seq(cookie),
2453			     dlm_list_in_text(idx));
2454			return 0;
2455		}
2456	}
2457
2458	if (!nonlocal) {
2459		node_ref = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
2460		if (node_ref >= O2NM_MAX_NODES)
2461			return 0;
2462	}
2463
2464	mlog(0, "%s: res %.*s, Migrateable\n", dlm->name, res->lockname.len,
2465	     res->lockname.name);
2466
2467	return 1;
2468}
2469
2470/*
2471 * DLM_MIGRATE_LOCKRES
2472 */
2473
2474
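/* hand mastery of a locally mastered lockres off to 'target': register
 * a migration mle, set the MIGRATING state, send all lock state to the
 * target, then wait for the target's assert_master before setting the
 * new owner and dropping the non-local locks */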
2475static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2476			       struct dlm_lock_resource *res, u8 target)
2477{
2478	struct dlm_master_list_entry *mle = NULL;
2479	struct dlm_master_list_entry *oldmle = NULL;
2480 	struct dlm_migratable_lockres *mres = NULL;
2481	int ret = 0;
2482	const char *name;
2483	unsigned int namelen;
2484	int mle_added = 0;
2485	int wake = 0;
2486
2487	if (!dlm_grab(dlm))
2488		return -EINVAL;
2489
2490	BUG_ON(target == O2NM_MAX_NODES);
2491
2492	name = res->lockname.name;
2493	namelen = res->lockname.len;
2494
2495	mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
2496	     target);
2497
2498	/* preallocate up front. if this fails, abort */
2499	ret = -ENOMEM;
2500	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2501	if (!mres) {
2502		mlog_errno(ret);
2503		goto leave;
2504	}
2505
2506	mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
2507	if (!mle) {
2508		mlog_errno(ret);
2509		goto leave;
2510	}
2511	ret = 0;
2512
2513	/*
2514	 * clear any existing master requests and
2515	 * add the migration mle to the list
2516	 */
2517	spin_lock(&dlm->spinlock);
2518	spin_lock(&dlm->master_lock);
2519	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2520				    namelen, target, dlm->node_num);
2521	/* get an extra reference on the mle.
2522	 * otherwise the assert_master from the new
2523	 * master will destroy this.
2524	 */
2525	dlm_get_mle_inuse(mle);
2526	spin_unlock(&dlm->master_lock);
2527	spin_unlock(&dlm->spinlock);
2528
2529	if (ret == -EEXIST) {
2530		mlog(0, "another process is already migrating it\n");
2531		goto fail;
2532	}
2533	mle_added = 1;
2534
2535	/*
2536	 * set the MIGRATING flag and flush asts
2537	 * if we fail after this we need to re-dirty the lockres
2538	 */
2539	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2540		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2541		     "the target went down.\n", res->lockname.len,
2542		     res->lockname.name, target);
2543		spin_lock(&res->spinlock);
2544		res->state &= ~DLM_LOCK_RES_MIGRATING;
2545		wake = 1;
2546		spin_unlock(&res->spinlock);
2547		ret = -EINVAL;
2548	}
2549
2550fail:
2551	if (oldmle) {
2552		/* master is known, detach if not already detached */
2553		dlm_mle_detach_hb_events(dlm, oldmle);
2554		dlm_put_mle(oldmle);
2555	}
2556
2557	if (ret < 0) {
2558		if (mle_added) {
2559			dlm_mle_detach_hb_events(dlm, mle);
2560			dlm_put_mle(mle);
2561			dlm_put_mle_inuse(mle);
2562		} else if (mle) {
2563			kmem_cache_free(dlm_mle_cache, mle);
2564			mle = NULL;
2565		}
2566		goto leave;
2567	}
2568
2569	/*
2570	 * at this point, we have a migration target, an mle
2571	 * in the master list, and the MIGRATING flag set on
2572	 * the lockres
2573	 */
2574
2575	/* now that remote nodes are spinning on the MIGRATING flag,
2576	 * ensure that all assert_master work is flushed. */
2577	flush_workqueue(dlm->dlm_worker);
2578
2579	/* notify new node and send all lock state */
2580	/* call send_one_lockres with migration flag.
2581	 * this serves as notice to the target node that a
2582	 * migration is starting. */
2583	ret = dlm_send_one_lockres(dlm, res, mres, target,
2584				   DLM_MRES_MIGRATION);
2585
2586	if (ret < 0) {
2587		mlog(0, "migration to node %u failed with %d\n",
2588		     target, ret);
2589		/* migration failed, detach and clean up mle */
2590		dlm_mle_detach_hb_events(dlm, mle);
2591		dlm_put_mle(mle);
2592		dlm_put_mle_inuse(mle);
2593		spin_lock(&res->spinlock);
2594		res->state &= ~DLM_LOCK_RES_MIGRATING;
2595		wake = 1;
2596		spin_unlock(&res->spinlock);
2597		if (dlm_is_host_down(ret))
2598			dlm_wait_for_node_death(dlm, target,
2599						DLM_NODE_DEATH_WAIT_MAX);
2600		goto leave;
2601	}
2602
2603	/* at this point, the target sends a message to all nodes,
2604	 * (using dlm_do_migrate_request).  this node is skipped since
2605	 * we had to put an mle in the list to begin the process.  this
2606	 * node now waits for target to do an assert master.  this node
2607	 * will be the last one notified, ensuring that the migration
2608	 * is complete everywhere.  if the target dies while this is
2609	 * going on, some nodes could potentially see the target as the
2610	 * master, so it is important that my recovery finds the migration
2611	 * mle and sets the master to UNKNOWN. */
2612
2613
2614	/* wait for new node to assert master */
2615	while (1) {
2616		ret = wait_event_interruptible_timeout(mle->wq,
2617					(atomic_read(&mle->woken) == 1),
2618					msecs_to_jiffies(5000));
2619
2620		if (ret >= 0) {
2621		       	if (atomic_read(&mle->woken) == 1 ||
2622			    res->owner == target)
2623				break;
2624
2625			mlog(0, "%s:%.*s: timed out during migration\n",
2626			     dlm->name, res->lockname.len, res->lockname.name);
2627			/* avoid hang during shutdown when migrating lockres
2628			 * to a node which also goes down */
2629			if (dlm_is_node_dead(dlm, target)) {
2630				mlog(0, "%s:%.*s: expected migration "
2631				     "target %u is no longer up, restarting\n",
2632				     dlm->name, res->lockname.len,
2633				     res->lockname.name, target);
2634				ret = -EINVAL;
2635				/* migration failed, detach and clean up mle */
2636				dlm_mle_detach_hb_events(dlm, mle);
2637				dlm_put_mle(mle);
2638				dlm_put_mle_inuse(mle);
2639				spin_lock(&res->spinlock);
2640				res->state &= ~DLM_LOCK_RES_MIGRATING;
2641				wake = 1;
2642				spin_unlock(&res->spinlock);
2643				goto leave;
2644			}
2645		} else
2646			mlog(0, "%s:%.*s: caught signal during migration\n",
2647			     dlm->name, res->lockname.len, res->lockname.name);
2648	}
2649
2650	/* all done, set the owner, clear the flag */
2651	spin_lock(&res->spinlock);
2652	dlm_set_lockres_owner(dlm, res, target);
2653	res->state &= ~DLM_LOCK_RES_MIGRATING;
2654	dlm_remove_nonlocal_locks(dlm, res);
2655	spin_unlock(&res->spinlock);
2656	wake_up(&res->wq);
2657
2658	/* master is known, detach if not already detached */
2659	dlm_mle_detach_hb_events(dlm, mle);
2660	dlm_put_mle_inuse(mle);
2661	ret = 0;
2662
2663	dlm_lockres_calc_usage(dlm, res);
2664
2665leave:
2666	/* re-dirty the lockres if we failed */
2667	if (ret < 0)
2668		dlm_kick_thread(dlm, res);
2669
2670	/* wake up waiters if the MIGRATING flag got set
2671	 * but migration failed */
2672	if (wake)
2673		wake_up(&res->wq);
2674
2675	if (mres)
2676		free_page((unsigned long)mres);
2677
2678	dlm_put(dlm);
2679
2680	mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
2681	     name, target, ret);
2682	return ret;
2683}
2684
2685#define DLM_MIGRATION_RETRY_MS  100
2686
2687/*
2688 * Should be called only after beginning the domain leave process.
2689 * There should not be any remaining locks on nonlocal lock resources,
2690 * and there should be no local locks left on locally mastered resources.
2691 *
2692 * Called with the dlm spinlock held, may drop it to do migration, but
2693 * will re-acquire before exit.
2694 *
2695 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
2696 */
2697int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2698{
2699	int ret;
2700	int lock_dropped = 0;
2701	u8 target = O2NM_MAX_NODES;
2702
2703	assert_spin_locked(&dlm->spinlock);
2704
2705	spin_lock(&res->spinlock);
2706	if (dlm_is_lockres_migrateable(dlm, res))
2707		target = dlm_pick_migration_target(dlm, res);
2708	spin_unlock(&res->spinlock);
2709
2710	if (target == O2NM_MAX_NODES)
2711		goto leave;
2712
2713	/* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2714	spin_unlock(&dlm->spinlock);
2715	lock_dropped = 1;
2716	ret = dlm_migrate_lockres(dlm, res, target);
2717	if (ret)
2718		mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
2719		     dlm->name, res->lockname.len, res->lockname.name,
2720		     target, ret);
2721	spin_lock(&dlm->spinlock);
2722leave:
2723	return lock_dropped;
2724}
2725
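/* returns nonzero once no basts remain queued or pending for this lock */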
2726int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2727{
2728	int ret;
2729	spin_lock(&dlm->ast_lock);
2730	spin_lock(&lock->spinlock);
2731	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2732	spin_unlock(&lock->spinlock);
2733	spin_unlock(&dlm->ast_lock);
2734	return ret;
2735}
2736
2737static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2738				     struct dlm_lock_resource *res,
2739				     u8 mig_target)
2740{
2741	int can_proceed;
2742	spin_lock(&res->spinlock);
2743	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2744	spin_unlock(&res->spinlock);
2745
2746	/* target has died, so make the caller break out of the
2747	 * wait_event, but caller must recheck the domain_map */
2748	spin_lock(&dlm->spinlock);
2749	if (!test_bit(mig_target, dlm->domain_map))
2750		can_proceed = 1;
2751	spin_unlock(&dlm->spinlock);
2752	return can_proceed;
2753}
2754
2755static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2756				struct dlm_lock_resource *res)
2757{
2758	int ret;
2759	spin_lock(&res->spinlock);
2760	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2761	spin_unlock(&res->spinlock);
2762	return ret;
2763}
2764
2765
2766static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2767				       struct dlm_lock_resource *res,
2768				       u8 target)
2769{
2770	int ret = 0;
2771
2772	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2773	       res->lockname.len, res->lockname.name, dlm->node_num,
2774	       target);
2775	/* need to set MIGRATING flag on lockres.  this is done by
2776	 * ensuring that all asts have been flushed for this lockres. */
2777	spin_lock(&res->spinlock);
2778	BUG_ON(res->migration_pending);
2779	res->migration_pending = 1;
2780	/* strategy is to reserve an extra ast then release
2781	 * it below, letting the release do all of the work */
2782	__dlm_lockres_reserve_ast(res);
2783	spin_unlock(&res->spinlock);
2784
2785	/* now flush all the pending asts */
2786	dlm_kick_thread(dlm, res);
2787	/* before waiting on DIRTY, block processes which may
2788	 * try to dirty the lockres before MIGRATING is set */
2789	spin_lock(&res->spinlock);
2790	BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2791	res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2792	spin_unlock(&res->spinlock);
2793	/* now wait on any pending asts and the DIRTY state */
2794	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2795	dlm_lockres_release_ast(dlm, res);
2796
2797	mlog(0, "about to wait on migration_wq, dirty=%s\n",
2798	       res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
	/* if the ast reservation we just released was the final one,
	 * this will pass thru immediately.  otherwise, we need to wait
2801	 * for the last ast to finish. */
2802again:
2803	ret = wait_event_interruptible_timeout(dlm->migration_wq,
2804		   dlm_migration_can_proceed(dlm, res, target),
2805		   msecs_to_jiffies(1000));
2806	if (ret < 0) {
2807		mlog(0, "woken again: migrating? %s, dead? %s\n",
2808		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2809		       test_bit(target, dlm->domain_map) ? "no":"yes");
2810	} else {
2811		mlog(0, "all is well: migrating? %s, dead? %s\n",
2812		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2813		       test_bit(target, dlm->domain_map) ? "no":"yes");
2814	}
2815	if (!dlm_migration_can_proceed(dlm, res, target)) {
2816		mlog(0, "trying again...\n");
2817		goto again;
2818	}
2819
2820	ret = 0;
2821	/* did the target go down or die? */
2822	spin_lock(&dlm->spinlock);
2823	if (!test_bit(target, dlm->domain_map)) {
2824		mlog(ML_ERROR, "aha. migration target %u just went down\n",
2825		     target);
2826		ret = -EHOSTDOWN;
2827	}
2828	spin_unlock(&dlm->spinlock);
2829
2830	/*
2831	 * if target is down, we need to clear DLM_LOCK_RES_BLOCK_DIRTY for
2832	 * another try; otherwise, we are sure the MIGRATING state is there,
	 * so drop the unneeded state which blocked threads trying to DIRTY
2834	 */
2835	spin_lock(&res->spinlock);
2836	BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2837	res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2838	if (!ret)
2839		BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2840	spin_unlock(&res->spinlock);
2841
2842	/*
2843	 * at this point:
2844	 *
2845	 *   o the DLM_LOCK_RES_MIGRATING flag is set if target not down
2846	 *   o there are no pending asts on this lockres
2847	 *   o all processes trying to reserve an ast on this
2848	 *     lockres must wait for the MIGRATING flag to clear
2849	 */
2850	return ret;
2851}
2852
2853/* last step in the migration process.
2854 * original master calls this to free all of the dlm_lock
2855 * structures that used to be for other nodes. */
2856static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2857				      struct dlm_lock_resource *res)
2858{
2859	struct list_head *queue = &res->granted;
2860	int i, bit;
2861	struct dlm_lock *lock, *next;
2862
2863	assert_spin_locked(&res->spinlock);
2864
2865	BUG_ON(res->owner == dlm->node_num);
2866
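	/* walk the granted, converting and blocked queues in turn */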
2867	for (i=0; i<3; i++) {
2868		list_for_each_entry_safe(lock, next, queue, list) {
2869			if (lock->ml.node != dlm->node_num) {
2870				mlog(0, "putting lock for node %u\n",
2871				     lock->ml.node);
2872				/* be extra careful */
2873				BUG_ON(!list_empty(&lock->ast_list));
2874				BUG_ON(!list_empty(&lock->bast_list));
2875				BUG_ON(lock->ast_pending);
2876				BUG_ON(lock->bast_pending);
2877				dlm_lockres_clear_refmap_bit(dlm, res,
2878							     lock->ml.node);
2879				list_del_init(&lock->list);
2880				dlm_lock_put(lock);
2881				/* In a normal unlock, we would have added a
2882				 * DLM_UNLOCK_FREE_LOCK action. Force it. */
2883				dlm_lock_put(lock);
2884			}
2885		}
2886		queue++;
2887	}
2888	bit = 0;
2889	while (1) {
2890		bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2891		if (bit >= O2NM_MAX_NODES)
2892			break;
2893		/* do not clear the local node reference, if there is a
2894		 * process holding this, let it drop the ref itself */
2895		if (bit != dlm->node_num) {
2896			mlog(0, "%s:%.*s: node %u had a ref to this "
2897			     "migrating lockres, clearing\n", dlm->name,
2898			     res->lockname.len, res->lockname.name, bit);
2899			dlm_lockres_clear_refmap_bit(dlm, res, bit);
2900		}
2901		bit++;
2902	}
2903}
2904
2905/*
2906 * Pick a node to migrate the lock resource to. This function selects a
 * potential target based first on the locks and then on the refmap. It skips
2908 * nodes that are in the process of exiting the domain.
2909 */
2910static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2911				    struct dlm_lock_resource *res)
2912{
2913	enum dlm_lockres_list idx;
2914	struct list_head *queue = &res->granted;
2915	struct dlm_lock *lock;
2916	int noderef;
2917	u8 nodenum = O2NM_MAX_NODES;
2918
2919	assert_spin_locked(&dlm->spinlock);
2920	assert_spin_locked(&res->spinlock);
2921
2922	/* Go through all the locks */
2923	for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2924		queue = dlm_list_idx_to_ptr(res, idx);
2925		list_for_each_entry(lock, queue, list) {
2926			if (lock->ml.node == dlm->node_num)
2927				continue;
2928			if (test_bit(lock->ml.node, dlm->exit_domain_map))
2929				continue;
2930			nodenum = lock->ml.node;
2931			goto bail;
2932		}
2933	}
2934
2935	/* Go thru the refmap */
2936	noderef = -1;
2937	while (1) {
2938		noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
2939					noderef + 1);
2940		if (noderef >= O2NM_MAX_NODES)
2941			break;
2942		if (noderef == dlm->node_num)
2943			continue;
2944		if (test_bit(noderef, dlm->exit_domain_map))
2945			continue;
2946		nodenum = noderef;
2947		goto bail;
2948	}
2949
2950bail:
2951	return nodenum;
2952}
2953
2954/* this is called by the new master once all lockres
2955 * data has been received */
2956static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2957				  struct dlm_lock_resource *res,
2958				  u8 master, u8 new_master,
2959				  struct dlm_node_iter *iter)
2960{
2961	struct dlm_migrate_request migrate;
2962	int ret, skip, status = 0;
2963	int nodenum;
2964
2965	memset(&migrate, 0, sizeof(migrate));
2966	migrate.namelen = res->lockname.len;
2967	memcpy(migrate.name, res->lockname.name, migrate.namelen);
2968	migrate.new_master = new_master;
2969	migrate.master = master;
2970
2971	ret = 0;
2972
2973	/* send message to all nodes, except the master and myself */
2974	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2975		if (nodenum == master ||
2976		    nodenum == new_master)
2977			continue;
2978
2979		/* We could race exit domain. If exited, skip. */
2980		spin_lock(&dlm->spinlock);
2981		skip = (!test_bit(nodenum, dlm->domain_map));
2982		spin_unlock(&dlm->spinlock);
2983		if (skip) {
2984			clear_bit(nodenum, iter->node_map);
2985			continue;
2986		}
2987
2988		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2989					 &migrate, sizeof(migrate), nodenum,
2990					 &status);
2991		if (ret < 0) {
2992			mlog(ML_ERROR, "%s: res %.*s, Error %d send "
2993			     "MIGRATE_REQUEST to node %u\n", dlm->name,
2994			     migrate.namelen, migrate.name, ret, nodenum);
2995			if (!dlm_is_host_down(ret)) {
2996				mlog(ML_ERROR, "unhandled error=%d!\n", ret);
2997				BUG();
2998			}
2999			clear_bit(nodenum, iter->node_map);
3000			ret = 0;
3001		} else if (status < 0) {
3002			mlog(0, "migrate request (node %u) returned %d!\n",
3003			     nodenum, status);
3004			ret = status;
3005		} else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
3006			/* during the migration request we short-circuited
3007			 * the mastery of the lockres.  make sure we have
3008			 * a mastery ref for nodenum */
3009			mlog(0, "%s:%.*s: need ref for node %u\n",
3010			     dlm->name, res->lockname.len, res->lockname.name,
3011			     nodenum);
3012			spin_lock(&res->spinlock);
3013			dlm_lockres_set_refmap_bit(dlm, res, nodenum);
3014			spin_unlock(&res->spinlock);
3015		}
3016	}
3017
3018	if (ret < 0)
3019		mlog_errno(ret);
3020
3021	mlog(0, "returning ret=%d\n", ret);
3022	return ret;
3023}
3024
3025
3026/* if there is an existing mle for this lockres, we now know who the master is.
3027 * (the one who sent us *this* message) we can clear it up right away.
3028 * since the process that put the mle on the list still has a reference to it,
3029 * we can unhash it now, set the master and wake the process.  as a result,
3030 * we will have no mle in the list to start with.  now we can add an mle for
3031 * the migration and this should be the only one found for those scanning the
3032 * list.  */
3033int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3034				void **ret_data)
3035{
3036	struct dlm_ctxt *dlm = data;
3037	struct dlm_lock_resource *res = NULL;
3038	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3039	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3040	const char *name;
3041	unsigned int namelen, hash;
3042	int ret = 0;
3043
3044	if (!dlm_grab(dlm))
3045		return -EINVAL;
3046
3047	name = migrate->name;
3048	namelen = migrate->namelen;
3049	hash = dlm_lockid_hash(name, namelen);
3050
3051	/* preallocate.. if this fails, abort */
3052	mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
3053
3054	if (!mle) {
3055		ret = -ENOMEM;
3056		goto leave;
3057	}
3058
3059	/* check for pre-existing lock */
3060	spin_lock(&dlm->spinlock);
3061	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3062	if (res) {
3063		spin_lock(&res->spinlock);
3064		if (res->state & DLM_LOCK_RES_RECOVERING) {
3065			/* if all is working ok, this can only mean that we got
			 * a migrate request from a node that we now see as
			 * dead.  what can we do here?  drop it to the floor? */
3068			spin_unlock(&res->spinlock);
3069			mlog(ML_ERROR, "Got a migrate request, but the "
3070			     "lockres is marked as recovering!");
3071			kmem_cache_free(dlm_mle_cache, mle);
3072			ret = -EINVAL; /* need a better solution */
3073			goto unlock;
3074		}
3075		res->state |= DLM_LOCK_RES_MIGRATING;
3076		spin_unlock(&res->spinlock);
3077	}
3078
3079	spin_lock(&dlm->master_lock);
3080	/* ignore status.  only nonzero status would BUG. */
3081	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3082				    name, namelen,
3083				    migrate->new_master,
3084				    migrate->master);
3085
3086	spin_unlock(&dlm->master_lock);
3087unlock:
3088	spin_unlock(&dlm->spinlock);
3089
3090	if (oldmle) {
3091		/* master is known, detach if not already detached */
3092		dlm_mle_detach_hb_events(dlm, oldmle);
3093		dlm_put_mle(oldmle);
3094	}
3095
3096	if (res)
3097		dlm_lockres_put(res);
3098leave:
3099	dlm_put(dlm);
3100	return ret;
3101}
3102
3103/* must be holding dlm->spinlock and dlm->master_lock
3104 * when adding a migration mle, we can clear any other mles
3105 * in the master list because we know with certainty that
3106 * the master is "master".  so we remove any old mle from
 * the list after setting its master field, and then add
 * the new migration mle.  this way we can hold to the rule
3109 * of having only one mle for a given lock name at all times. */
3110static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3111				 struct dlm_lock_resource *res,
3112				 struct dlm_master_list_entry *mle,
3113				 struct dlm_master_list_entry **oldmle,
3114				 const char *name, unsigned int namelen,
3115				 u8 new_master, u8 master)
3116{
3117	int found;
3118	int ret = 0;
3119
3120	*oldmle = NULL;
3121
3122	assert_spin_locked(&dlm->spinlock);
3123	assert_spin_locked(&dlm->master_lock);
3124
3125	/* caller is responsible for any ref taken here on oldmle */
3126	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3127	if (found) {
3128		struct dlm_master_list_entry *tmp = *oldmle;
3129		spin_lock(&tmp->spinlock);
3130		if (tmp->type == DLM_MLE_MIGRATION) {
3131			if (master == dlm->node_num) {
3132				/* ah another process raced me to it */
3133				mlog(0, "tried to migrate %.*s, but some "
3134				     "process beat me to it\n",
3135				     namelen, name);
3136				ret = -EEXIST;
3137			} else {
3138				/* bad.  2 NODES are trying to migrate! */
3139				mlog(ML_ERROR, "migration error  mle: "
3140				     "master=%u new_master=%u // request: "
3141				     "master=%u new_master=%u // "
3142				     "lockres=%.*s\n",
3143				     tmp->master, tmp->new_master,
3144				     master, new_master,
3145				     namelen, name);
3146				BUG();
3147			}
3148		} else {
3149			/* this is essentially what assert_master does */
3150			tmp->master = master;
3151			atomic_set(&tmp->woken, 1);
3152			wake_up(&tmp->wq);
3153			/* remove it so that only one mle will be found */
3154			__dlm_unlink_mle(dlm, tmp);
3155			__dlm_mle_detach_hb_events(dlm, tmp);
3156			if (tmp->type == DLM_MLE_MASTER) {
3157				ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3158				mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3159						"telling master to get ref "
3160						"for cleared out mle during "
3161						"migration\n", dlm->name,
3162						namelen, name, master,
3163						new_master);
3164			}
3165		}
3166		spin_unlock(&tmp->spinlock);
3167	}
3168
3169	/* now add a migration mle to the tail of the list */
3170	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3171	mle->new_master = new_master;
3172	/* the new master will be sending an assert master for this.
3173	 * at that point we will get the refmap reference */
3174	mle->master = master;
3175	/* do this for consistency with other mle types */
3176	set_bit(new_master, mle->maybe_map);
3177	__dlm_insert_mle(dlm, mle);
3178
3179	return ret;
3180}
3181
3182/*
 * Sets the owner of the lockres associated with the mle to UNKNOWN and
 * moves it to the recovery list.  Drops and re-takes dlm->master_lock
 * if a lockres is found.
3184 */
3185static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3186					struct dlm_master_list_entry *mle)
3187{
3188	struct dlm_lock_resource *res;
3189
	/* Find the lockres associated with the mle and set its owner to UNK */
3191	res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3192				   mle->mnamehash);
3193	if (res) {
3194		spin_unlock(&dlm->master_lock);
3195
3196		/* move lockres onto recovery list */
3197		spin_lock(&res->spinlock);
3198		dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3199		dlm_move_lockres_to_recovery_list(dlm, res);
3200		spin_unlock(&res->spinlock);
3201		dlm_lockres_put(res);
3202
3203		/* about to get rid of mle, detach from heartbeat */
3204		__dlm_mle_detach_hb_events(dlm, mle);
3205
3206		/* dump the mle */
3207		spin_lock(&dlm->master_lock);
3208		__dlm_put_mle(mle);
3209		spin_unlock(&dlm->master_lock);
3210	}
3211
3212	return res;
3213}
3214
3215static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3216				    struct dlm_master_list_entry *mle)
3217{
3218	__dlm_mle_detach_hb_events(dlm, mle);
3219
3220	spin_lock(&mle->spinlock);
3221	__dlm_unlink_mle(dlm, mle);
3222	atomic_set(&mle->woken, 1);
3223	spin_unlock(&mle->spinlock);
3224
3225	wake_up(&mle->wq);
3226}
3227
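/* a BLOCK mle needs cleanup only when the dead node is the one that was
 * expected to become master, i.e. the sole bit set in maybe_map */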
3228static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3229				struct dlm_master_list_entry *mle, u8 dead_node)
3230{
3231	int bit;
3232
3233	BUG_ON(mle->type != DLM_MLE_BLOCK);
3234
3235	spin_lock(&mle->spinlock);
3236	bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
3237	if (bit != dead_node) {
3238		mlog(0, "mle found, but dead node %u would not have been "
3239		     "master\n", dead_node);
3240		spin_unlock(&mle->spinlock);
3241	} else {
3242		/* Must drop the refcount by one since the assert_master will
3243		 * never arrive. This may result in the mle being unlinked and
3244		 * freed, but there may still be a process waiting in the
3245		 * dlmlock path which is fine. */
3246		mlog(0, "node %u was expected master\n", dead_node);
3247		atomic_set(&mle->woken, 1);
3248		spin_unlock(&mle->spinlock);
3249		wake_up(&mle->wq);
3250
3251		/* Do not need events any longer, so detach from heartbeat */
3252		__dlm_mle_detach_hb_events(dlm, mle);
3253		__dlm_put_mle(mle);
3254	}
3255}
3256
3257void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3258{
3259	struct dlm_master_list_entry *mle;
3260	struct dlm_lock_resource *res;
3261	struct hlist_head *bucket;
3262	struct hlist_node *tmp;
3263	unsigned int i;
3264
3265	mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
3266top:
3267	assert_spin_locked(&dlm->spinlock);
3268
3269	/* clean the master list */
3270	spin_lock(&dlm->master_lock);
3271	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3272		bucket = dlm_master_hash(dlm, i);
3273		hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3274			BUG_ON(mle->type != DLM_MLE_BLOCK &&
3275			       mle->type != DLM_MLE_MASTER &&
3276			       mle->type != DLM_MLE_MIGRATION);
3277
3278			/* MASTER mles are initiated locally. The waiting
3279			 * process will notice the node map change shortly.
3280			 * Let that happen as normal. */
3281			if (mle->type == DLM_MLE_MASTER)
3282				continue;
3283
3284			/* BLOCK mles are initiated by other nodes. Need to
3285			 * clean up if the dead node would have been the
3286			 * master. */
3287			if (mle->type == DLM_MLE_BLOCK) {
3288				dlm_clean_block_mle(dlm, mle, dead_node);
3289				continue;
3290			}
3291
3292			/* Everything else is a MIGRATION mle */
3293
3294			/* The rule for MIGRATION mles is that the master
3295			 * becomes UNKNOWN if *either* the original or the new
3296			 * master dies. All UNKNOWN lockres' are sent to
3297			 * whichever node becomes the recovery master. The new
3298			 * master is responsible for determining if there is
3299			 * still a master for this lockres, or if he needs to
3300			 * take over mastery. Either way, this node should
3301			 * expect another message to resolve this. */
3302
3303			if (mle->master != dead_node &&
3304			    mle->new_master != dead_node)
3305				continue;
3306
3307			if (mle->new_master == dead_node && mle->inuse) {
3308				mlog(ML_NOTICE, "%s: target %u died during "
3309						"migration from %u, the MLE is "
						"still in use, ignore it!\n",
3311						dlm->name, dead_node,
3312						mle->master);
3313				continue;
3314			}
3315
3316			/* If we have reached this point, this mle needs to be
3317			 * removed from the list and freed. */
3318			dlm_clean_migration_mle(dlm, mle);
3319
3320			mlog(0, "%s: node %u died during migration from "
3321			     "%u to %u!\n", dlm->name, dead_node, mle->master,
3322			     mle->new_master);
3323
3324			/* If we find a lockres associated with the mle, we've
3325			 * hit this rare case that messes up our lock ordering.
3326			 * If so, we need to drop the master lock so that we can
3327			 * take the lockres lock, meaning that we will have to
3328			 * restart from the head of list. */
3329			res = dlm_reset_mleres_owner(dlm, mle);
3330			if (res)
3331				/* restart */
3332				goto top;
3333
3334			/* This may be the last reference */
3335			__dlm_put_mle(mle);
3336		}
3337	}
3338	spin_unlock(&dlm->master_lock);
3339}
3340
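/* run on the new master (the migration target) once all lock state has
 * arrived: tell the other nodes who the new master is, assert mastery
 * to everyone but the old master, then assert back to the old master
 * so it can finish off the migration on its end */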
3341int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3342			 u8 old_master)
3343{
3344	struct dlm_node_iter iter;
3345	int ret = 0;
3346
3347	spin_lock(&dlm->spinlock);
3348	dlm_node_iter_init(dlm->domain_map, &iter);
3349	clear_bit(old_master, iter.node_map);
3350	clear_bit(dlm->node_num, iter.node_map);
3351	spin_unlock(&dlm->spinlock);
3352
3353	/* ownership of the lockres is changing.  account for the
3354	 * mastery reference here since old_master will briefly have
3355	 * a reference after the migration completes */
3356	spin_lock(&res->spinlock);
3357	dlm_lockres_set_refmap_bit(dlm, res, old_master);
3358	spin_unlock(&res->spinlock);
3359
3360	mlog(0, "now time to do a migrate request to other nodes\n");
3361	ret = dlm_do_migrate_request(dlm, res, old_master,
3362				     dlm->node_num, &iter);
3363	if (ret < 0) {
3364		mlog_errno(ret);
3365		goto leave;
3366	}
3367
3368	mlog(0, "doing assert master of %.*s to all except the original node\n",
3369	     res->lockname.len, res->lockname.name);
3370	/* this call now finishes out the nodemap
3371	 * even if one or more nodes die */
3372	ret = dlm_do_assert_master(dlm, res, iter.node_map,
3373				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3374	if (ret < 0) {
3375		/* no longer need to retry.  all living nodes contacted. */
3376		mlog_errno(ret);
3377		ret = 0;
3378	}
3379
3380	memset(iter.node_map, 0, sizeof(iter.node_map));
3381	set_bit(old_master, iter.node_map);
3382	mlog(0, "doing assert master of %.*s back to %u\n",
3383	     res->lockname.len, res->lockname.name, old_master);
3384	ret = dlm_do_assert_master(dlm, res, iter.node_map,
3385				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3386	if (ret < 0) {
3387		mlog(0, "assert master to original master failed "
3388		     "with %d.\n", ret);
3389		/* the only nonzero status here would be because of
3390		 * a dead original node.  we're done. */
3391		ret = 0;
3392	}
3393
3394	/* all done, set the owner, clear the flag */
3395	spin_lock(&res->spinlock);
3396	dlm_set_lockres_owner(dlm, res, dlm->node_num);
3397	res->state &= ~DLM_LOCK_RES_MIGRATING;
3398	spin_unlock(&res->spinlock);
3399	/* re-dirty it on the new master */
3400	dlm_kick_thread(dlm, res);
3401	wake_up(&res->wq);
3402leave:
3403	return ret;
3404}
3405
3406/*
3407 * LOCKRES AST REFCOUNT
3408 * this is integral to migration
3409 */
3410
3411/* for future intent to call an ast, reserve one ahead of time.
3412 * this should be called only after waiting on the lockres
3413 * with dlm_wait_on_lockres, and while still holding the
3414 * spinlock after the call. */
3415void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3416{
3417	assert_spin_locked(&res->spinlock);
3418	if (res->state & DLM_LOCK_RES_MIGRATING) {
3419		__dlm_print_one_lock_resource(res);
3420	}
3421	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3422
3423	atomic_inc(&res->asts_reserved);
3424}
3425
3426/*
3427 * used to drop the reserved ast, either because it went unused,
3428 * or because the ast/bast was actually called.
3429 *
3430 * also, if there is a pending migration on this lockres,
3431 * and this was the last pending ast on the lockres,
3432 * atomically set the MIGRATING flag before we drop the lock.
3433 * this is how we ensure that migration can proceed with no
3434 * asts in progress.  note that it is ok if the state of the
3435 * queues is such that a lock should be granted in the future
3436 * or that a bast should be fired, because the new master will
3437 * shuffle the lists on this lockres as soon as it is migrated.
3438 */
3439void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3440			     struct dlm_lock_resource *res)
3441{
3442	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3443		return;
3444
3445	if (!res->migration_pending) {
3446		spin_unlock(&res->spinlock);
3447		return;
3448	}
3449
3450	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3451	res->migration_pending = 0;
3452	res->state |= DLM_LOCK_RES_MIGRATING;
3453	spin_unlock(&res->spinlock);
3454	wake_up(&res->wq);
3455	wake_up(&dlm->migration_wq);
3456}
3457
3458void dlm_force_free_mles(struct dlm_ctxt *dlm)
3459{
3460	int i;
3461	struct hlist_head *bucket;
3462	struct dlm_master_list_entry *mle;
3463	struct hlist_node *tmp;
3464
3465	/*
3466	 * We notified all other nodes that we are exiting the domain and
3467	 * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still
3468	 * around we force free them and wake any processes that are waiting
3469	 * on the mles
3470	 */
3471	spin_lock(&dlm->spinlock);
3472	spin_lock(&dlm->master_lock);
3473
3474	BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3475	BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES));
3476
3477	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3478		bucket = dlm_master_hash(dlm, i);
3479		hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3480			if (mle->type != DLM_MLE_BLOCK) {
3481				mlog(ML_ERROR, "bad mle: %p\n", mle);
3482				dlm_print_one_mle(mle);
3483			}
3484			atomic_set(&mle->woken, 1);
3485			wake_up(&mle->wq);
3486
3487			__dlm_unlink_mle(dlm, mle);
3488			__dlm_mle_detach_hb_events(dlm, mle);
3489			__dlm_put_mle(mle);
3490		}
3491	}
3492	spin_unlock(&dlm->master_lock);
3493	spin_unlock(&dlm->spinlock);
3494}
3495