fs/dlm/recover.c


DEFINITIONS

This source file includes the following definitions.
  1. dlm_wait_function
  2. dlm_recover_status
  3. _set_recover_status
  4. dlm_set_recover_status
  5. wait_status_all
  6. wait_status_low
  7. wait_status
  8. dlm_recover_members_wait
  9. dlm_recover_directory_wait
  10. dlm_recover_locks_wait
  11. dlm_recover_done_wait
  12. recover_list_empty
  13. recover_list_add
  14. recover_list_del
  15. recover_list_clear
  16. recover_idr_empty
  17. recover_idr_add
  18. recover_idr_del
  19. recover_idr_find
  20. recover_idr_clear
  21. set_lock_master
  22. set_master_lkbs
  23. set_new_master
  24. recover_master
  25. recover_master_static
  26. dlm_recover_masters
  27. dlm_recover_master_reply
  28. recover_locks_queue
  29. recover_locks
  30. dlm_recover_locks
  31. dlm_recovered_lock
  32. recover_lvb
  33. recover_conversion
  34. recover_grant
  35. dlm_recover_rsbs
  36. dlm_create_root_list
  37. dlm_release_root_list
  38. dlm_clear_toss

// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "dir.h"
#include "config.h"
#include "ast.h"
#include "memory.h"
#include "rcom.h"
#include "lock.h"
#include "lowcomms.h"
#include "member.h"
#include "recover.h"

/*
 * Recovery waiting routines: these functions wait for a particular reply from
 * a remote node, or for the remote node to report a certain status.  They need
 * to abort if the lockspace is stopped, indicating that a node has failed
 * (perhaps the one being waited for).
 */

/*
 * Wait until the given function returns non-zero or the lockspace is stopped
 * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes).  When another
 * function thinks it could have completed the waited-on task, it should wake
 * up ls_wait_general to get an immediate response rather than waiting for the
 * timeout.  This uses a timeout so it can check periodically if the wait
 * should abort due to node failure (which doesn't cause a wake_up).
 * This should only be called by the dlm_recoverd thread.
 */

int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
{
	int error = 0;
	int rv;

	while (1) {
		rv = wait_event_timeout(ls->ls_wait_general,
					testfn(ls) || dlm_recovery_stopped(ls),
					dlm_config.ci_recover_timer * HZ);
		if (rv)
			break;
		if (test_bit(LSFL_RCOM_WAIT, &ls->ls_flags)) {
			log_debug(ls, "dlm_wait_function timed out");
			return -ETIMEDOUT;
		}
	}

	if (dlm_recovery_stopped(ls)) {
		log_debug(ls, "dlm_wait_function aborted");
		error = -EINTR;
	}
	return error;
}
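
/*
 * Typical usage, as seen later in this file: pass a predicate such as
 * recover_list_empty or recover_idr_empty,
 *
 *	error = dlm_wait_function(ls, &recover_list_empty);
 *
 * and have the reply handler that can satisfy the predicate call
 * wake_up(&ls->ls_wait_general) to end the wait early.
 */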

/*
 * An efficient way for all nodes to wait for all others to have a certain
 * status.  The node with the lowest nodeid polls all the others for their
 * status (wait_status_all) and all the others poll the node with the low id
 * for its accumulated result (wait_status_low).  When all nodes have set
 * status flag X, then status flag X_ALL will be set on the low nodeid.
 */
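
/*
 * For example, with nodes {1,2,3} waiting on DLM_RS_DIR: node 1 (the
 * low nodeid) polls nodes 2 and 3 with dlm_rcom_status() until each
 * reports DLM_RS_DIR, then sets DLM_RS_DIR_ALL locally; nodes 2 and 3
 * each poll only node 1 until they see DLM_RS_DIR_ALL.  This keeps the
 * polling linear in the number of nodes rather than quadratic.
 */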

uint32_t dlm_recover_status(struct dlm_ls *ls)
{
	uint32_t status;
	spin_lock(&ls->ls_recover_lock);
	status = ls->ls_recover_status;
	spin_unlock(&ls->ls_recover_lock);
	return status;
}

static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
{
	ls->ls_recover_status |= status;
}

void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
{
	spin_lock(&ls->ls_recover_lock);
	_set_recover_status(ls, status);
	spin_unlock(&ls->ls_recover_lock);
}

static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
			   int save_slots)
{
	struct dlm_rcom *rc = ls->ls_recover_buf;
	struct dlm_member *memb;
	int error = 0, delay;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		delay = 0;
		for (;;) {
			if (dlm_recovery_stopped(ls)) {
				error = -EINTR;
				goto out;
			}

			error = dlm_rcom_status(ls, memb->nodeid, 0);
			if (error)
				goto out;

			if (save_slots)
				dlm_slot_save(ls, rc, memb);

			if (rc->rc_result & wait_status)
				break;
			if (delay < 1000)
				delay += 20;
			msleep(delay);
		}
	}
 out:
	return error;
}
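
/*
 * Both pollers (wait_status_all above, wait_status_low below) back off
 * linearly between rcom status requests: the delay grows by 20ms per
 * retry and is capped at 1000ms, i.e. msleep(20), msleep(40), ...,
 * msleep(1000), msleep(1000), ...
 */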

static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
			   uint32_t status_flags)
{
	struct dlm_rcom *rc = ls->ls_recover_buf;
	int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;

	for (;;) {
		if (dlm_recovery_stopped(ls)) {
			error = -EINTR;
			goto out;
		}

		error = dlm_rcom_status(ls, nodeid, status_flags);
		if (error)
			break;

		if (rc->rc_result & wait_status)
			break;
		if (delay < 1000)
			delay += 20;
		msleep(delay);
	}
 out:
	return error;
}

static int wait_status(struct dlm_ls *ls, uint32_t status)
{
	uint32_t status_all = status << 1;
	int error;

	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
		error = wait_status_all(ls, status, 0);
		if (!error)
			dlm_set_recover_status(ls, status_all);
	} else
		error = wait_status_low(ls, status_all, 0);

	return error;
}
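
/*
 * The "status << 1" above depends on the DLM_RS_* flag layout in
 * dlm_internal.h, where each X_ALL flag is the X flag shifted left by
 * one bit, e.g. (values shown for illustration):
 *
 *	DLM_RS_DIR	0x00000004
 *	DLM_RS_DIR_ALL	0x00000008
 *
 * so wait_status(ls, DLM_RS_DIR) has the low nodeid gather DLM_RS_DIR
 * from everyone, and everyone else wait for DLM_RS_DIR_ALL from it.
 */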

int dlm_recover_members_wait(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_slot *slots;
	int num_slots, slots_size;
	int error, rv;
	uint32_t gen;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		memb->slot = -1;
		memb->generation = 0;
	}

	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
		error = wait_status_all(ls, DLM_RS_NODES, 1);
		if (error)
			goto out;

		/* slots array is sparse, slots_size may be > num_slots */

		rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
		if (!rv) {
			spin_lock(&ls->ls_recover_lock);
			_set_recover_status(ls, DLM_RS_NODES_ALL);
			ls->ls_num_slots = num_slots;
			ls->ls_slots_size = slots_size;
			ls->ls_slots = slots;
			ls->ls_generation = gen;
			spin_unlock(&ls->ls_recover_lock);
		} else {
			dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
		}
	} else {
		error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
		if (error)
			goto out;

		dlm_slots_copy_in(ls);
	}
 out:
	return error;
}

int dlm_recover_directory_wait(struct dlm_ls *ls)
{
	return wait_status(ls, DLM_RS_DIR);
}

int dlm_recover_locks_wait(struct dlm_ls *ls)
{
	return wait_status(ls, DLM_RS_LOCKS);
}

int dlm_recover_done_wait(struct dlm_ls *ls)
{
	return wait_status(ls, DLM_RS_DONE);
}

/*
 * The recover_list contains all the rsb's for which we've requested the new
 * master nodeid.  As replies are returned from the resource directories the
 * rsb's are removed from the list.  When the list is empty we're done.
 *
 * The recover_list is later similarly used for all rsb's for which we've sent
 * new lkb's and need to receive new corresponding lkid's.
 *
 * We use the address of the rsb struct as a simple local identifier for the
 * rsb so we can match an rcom reply with the rsb it was sent for.
 */
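
/*
 * For the master lookups, the idr-based variants below now fill the
 * identifier role: recover_idr_add() assigns r->res_id, the id is sent
 * in the rcom request as rc_id, and dlm_recover_master_reply() matches
 * the reply back to its rsb with recover_idr_find(ls, rc->rc_id).
 */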

static int recover_list_empty(struct dlm_ls *ls)
{
	int empty;

	spin_lock(&ls->ls_recover_list_lock);
	empty = list_empty(&ls->ls_recover_list);
	spin_unlock(&ls->ls_recover_list_lock);

	return empty;
}

static void recover_list_add(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;

	spin_lock(&ls->ls_recover_list_lock);
	if (list_empty(&r->res_recover_list)) {
		list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
		ls->ls_recover_list_count++;
		dlm_hold_rsb(r);
	}
	spin_unlock(&ls->ls_recover_list_lock);
}

static void recover_list_del(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;

	spin_lock(&ls->ls_recover_list_lock);
	list_del_init(&r->res_recover_list);
	ls->ls_recover_list_count--;
	spin_unlock(&ls->ls_recover_list_lock);

	dlm_put_rsb(r);
}

static void recover_list_clear(struct dlm_ls *ls)
{
	struct dlm_rsb *r, *s;

	spin_lock(&ls->ls_recover_list_lock);
	list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
		list_del_init(&r->res_recover_list);
		r->res_recover_locks_count = 0;
		dlm_put_rsb(r);
		ls->ls_recover_list_count--;
	}

	if (ls->ls_recover_list_count != 0) {
		log_error(ls, "warning: recover_list_count %d",
			  ls->ls_recover_list_count);
		ls->ls_recover_list_count = 0;
	}
	spin_unlock(&ls->ls_recover_list_lock);
}

static int recover_idr_empty(struct dlm_ls *ls)
{
	int empty = 1;

	spin_lock(&ls->ls_recover_idr_lock);
	if (ls->ls_recover_list_count)
		empty = 0;
	spin_unlock(&ls->ls_recover_idr_lock);

	return empty;
}

static int recover_idr_add(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	int rv;

	idr_preload(GFP_NOFS);
	spin_lock(&ls->ls_recover_idr_lock);
	if (r->res_id) {
		rv = -1;
		goto out_unlock;
	}
	rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT);
	if (rv < 0)
		goto out_unlock;

	r->res_id = rv;
	ls->ls_recover_list_count++;
	dlm_hold_rsb(r);
	rv = 0;
out_unlock:
	spin_unlock(&ls->ls_recover_idr_lock);
	idr_preload_end();
	return rv;
}
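
/*
 * The idr_preload(GFP_NOFS)/idr_alloc(..., GFP_NOWAIT) pairing above is
 * the usual pattern for allocating an idr entry under a spinlock: the
 * memory is preallocated in a context that may sleep, so the allocation
 * done while holding ls_recover_idr_lock cannot block.
 */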

static void recover_idr_del(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;

	spin_lock(&ls->ls_recover_idr_lock);
	idr_remove(&ls->ls_recover_idr, r->res_id);
	r->res_id = 0;
	ls->ls_recover_list_count--;
	spin_unlock(&ls->ls_recover_idr_lock);

	dlm_put_rsb(r);
}

static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
{
	struct dlm_rsb *r;

	spin_lock(&ls->ls_recover_idr_lock);
	r = idr_find(&ls->ls_recover_idr, (int)id);
	spin_unlock(&ls->ls_recover_idr_lock);
	return r;
}

static void recover_idr_clear(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int id;

	spin_lock(&ls->ls_recover_idr_lock);

	idr_for_each_entry(&ls->ls_recover_idr, r, id) {
		idr_remove(&ls->ls_recover_idr, id);
		r->res_id = 0;
		r->res_recover_locks_count = 0;
		ls->ls_recover_list_count--;

		dlm_put_rsb(r);
	}

	if (ls->ls_recover_list_count != 0) {
		log_error(ls, "warning: recover_list_count %d",
			  ls->ls_recover_list_count);
		ls->ls_recover_list_count = 0;
	}
	spin_unlock(&ls->ls_recover_idr_lock);
}


/* Master recovery: find new master node for rsb's that were
   mastered on nodes that have been removed.

   dlm_recover_masters
   recover_master
   dlm_send_rcom_lookup            ->  receive_rcom_lookup
                                       dlm_dir_lookup
   receive_rcom_lookup_reply       <-
   dlm_recover_master_reply
   set_new_master
   set_master_lkbs
   set_lock_master
*/

/*
 * Set the lock master for all LKBs in a lock queue.
 * If we are the new master of the rsb, we may have received new
 * MSTCPY locks from other nodes already, which we need to ignore
 * when setting the new nodeid.
 */

static void set_lock_master(struct list_head *queue, int nodeid)
{
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, queue, lkb_statequeue) {
		if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
			lkb->lkb_nodeid = nodeid;
			lkb->lkb_remid = 0;
		}
	}
}

static void set_master_lkbs(struct dlm_rsb *r)
{
	set_lock_master(&r->res_grantqueue, r->res_nodeid);
	set_lock_master(&r->res_convertqueue, r->res_nodeid);
	set_lock_master(&r->res_waitqueue, r->res_nodeid);
}

/*
 * Propagate the new master nodeid to locks.
 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
 * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
 * rsb's to consider.
 */

static void set_new_master(struct dlm_rsb *r)
{
	set_master_lkbs(r);
	rsb_set_flag(r, RSB_NEW_MASTER);
	rsb_set_flag(r, RSB_NEW_MASTER2);
}

/*
 * We do async lookups on rsb's that need new masters.  The rsb's
 * waiting for a lookup reply are kept on the recover_list.
 *
 * Another node recovering the master may have sent us a rcom lookup,
 * and our dlm_master_lookup() set it as the new master, along with
 * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
 * equals our_nodeid below).
 */

static int recover_master(struct dlm_rsb *r, unsigned int *count)
{
	struct dlm_ls *ls = r->res_ls;
	int our_nodeid, dir_nodeid;
	int is_removed = 0;
	int error;

	if (is_master(r))
		return 0;

	is_removed = dlm_is_removed(ls, r->res_nodeid);

	if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
		return 0;

	our_nodeid = dlm_our_nodeid();
	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid == our_nodeid) {
		if (is_removed) {
			r->res_master_nodeid = our_nodeid;
			r->res_nodeid = 0;
		}

		/* set master of lkbs to ourself when is_removed, or to
		   another new master which we set along with NEW_MASTER
		   in dlm_master_lookup */
		set_new_master(r);
		error = 0;
	} else {
		recover_idr_add(r);
		error = dlm_send_rcom_lookup(r, dir_nodeid);
	}

	(*count)++;
	return error;
}
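
/*
 * Summary of the cases above (res_nodeid == 0 means we are the master):
 *
 *	we are already master		-> nothing to do
 *	master alive, no NEW_MASTER	-> nothing to do
 *	we are dir node, master removed	-> take over mastery ourselves
 *	we are dir node, NEW_MASTER set	-> master was already chosen by
 *					   dlm_master_lookup(); just push
 *					   it to the lkbs
 *	dir node is remote		-> async dlm_send_rcom_lookup()
 */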

/*
 * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
 * This is necessary because recovery can be started, aborted and restarted,
 * causing the master nodeid to briefly change during the aborted recovery, and
 * change back to the original value in the second recovery.  The MSTCPY locks
 * may or may not have been purged during the aborted recovery.  Another node
 * with an outstanding request in the waiters list and a request reply saved in
 * the requestqueue cannot know whether it should ignore the reply and resend
 * the request, or accept the reply and complete the request.  It must do the
 * former if the remote node purged MSTCPY locks, and the latter if it did not.
 * This is solved by always purging MSTCPY locks, in which case the request
 * reply is always ignored and the request resent.
 */

static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
{
	int dir_nodeid = dlm_dir_nodeid(r);
	int new_master = dir_nodeid;

	if (dir_nodeid == dlm_our_nodeid())
		new_master = 0;

	dlm_purge_mstcpy_locks(r);
	r->res_master_nodeid = dir_nodeid;
	r->res_nodeid = new_master;
	set_new_master(r);
	(*count)++;
	return 0;
}
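
/*
 * Example: with no directory, suppose rsb "foo" hashes to dir node 3.
 * After the purge above, node 3 sets res_nodeid = 0 (master is itself)
 * and every other node holding "foo" sets res_nodeid = 3; the master's
 * copy of the lock state is then rebuilt by dlm_recover_locks().
 */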

/*
 * Go through local root resources and for each rsb whose master has departed,
 * get the new master nodeid from the directory.  The dir will assign mastery
 * to the first node to look up the new master.  That means we'll discover in
 * this lookup if we're the new master of any rsb's.
 *
 * We fire off all the dir lookup requests individually and asynchronously to
 * the correct dir node.
 */

int dlm_recover_masters(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	unsigned int total = 0;
	unsigned int count = 0;
	int nodir = dlm_no_directory(ls);
	int error;

	log_rinfo(ls, "dlm_recover_masters");

	down_read(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		if (dlm_recovery_stopped(ls)) {
			up_read(&ls->ls_root_sem);
			error = -EINTR;
			goto out;
		}

		lock_rsb(r);
		if (nodir)
			error = recover_master_static(r, &count);
		else
			error = recover_master(r, &count);
		unlock_rsb(r);
		cond_resched();
		total++;

		if (error) {
			up_read(&ls->ls_root_sem);
			goto out;
		}
	}
	up_read(&ls->ls_root_sem);

	log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);

	error = dlm_wait_function(ls, &recover_idr_empty);
 out:
	if (error)
		recover_idr_clear(ls);
	return error;
}

int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct dlm_rsb *r;
	int ret_nodeid, new_master;

	r = recover_idr_find(ls, rc->rc_id);
	if (!r) {
		log_error(ls, "dlm_recover_master_reply no id %llx",
			  (unsigned long long)rc->rc_id);
		goto out;
	}

	ret_nodeid = rc->rc_result;

	if (ret_nodeid == dlm_our_nodeid())
		new_master = 0;
	else
		new_master = ret_nodeid;

	lock_rsb(r);
	r->res_master_nodeid = ret_nodeid;
	r->res_nodeid = new_master;
	set_new_master(r);
	unlock_rsb(r);
	recover_idr_del(r);

	if (recover_idr_empty(ls))
		wake_up(&ls->ls_wait_general);
 out:
	return 0;
}


/* Lock recovery: rebuild the process-copy locks we hold on a
   remastered rsb on the new rsb master.

   dlm_recover_locks
   recover_locks
   recover_locks_queue
   dlm_send_rcom_lock              ->  receive_rcom_lock
                                       dlm_recover_master_copy
   receive_rcom_lock_reply         <-
   dlm_recover_process_copy
*/


/*
 * Keep a count of the number of lkb's we send to the new master; when we get
 * an equal number of replies, recovery for the rsb is done.
 */

static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
{
	struct dlm_lkb *lkb;
	int error = 0;

	list_for_each_entry(lkb, head, lkb_statequeue) {
		error = dlm_send_rcom_lock(r, lkb);
		if (error)
			break;
		r->res_recover_locks_count++;
	}

	return error;
}
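
/*
 * res_recover_locks_count is incremented here once per lkb sent, and
 * decremented once per reply in dlm_recovered_lock(); when it drops
 * back to zero the rsb is removed from the recover_list and its
 * NEW_MASTER flag is cleared.
 */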

static int recover_locks(struct dlm_rsb *r)
{
	int error = 0;

	lock_rsb(r);

	DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););

	error = recover_locks_queue(r, &r->res_grantqueue);
	if (error)
		goto out;
	error = recover_locks_queue(r, &r->res_convertqueue);
	if (error)
		goto out;
	error = recover_locks_queue(r, &r->res_waitqueue);
	if (error)
		goto out;

	if (r->res_recover_locks_count)
		recover_list_add(r);
	else
		rsb_clear_flag(r, RSB_NEW_MASTER);
 out:
	unlock_rsb(r);
	return error;
}

int dlm_recover_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int error, count = 0;

	down_read(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		if (is_master(r)) {
			rsb_clear_flag(r, RSB_NEW_MASTER);
			continue;
		}

		if (!rsb_flag(r, RSB_NEW_MASTER))
			continue;

		if (dlm_recovery_stopped(ls)) {
			error = -EINTR;
			up_read(&ls->ls_root_sem);
			goto out;
		}

		error = recover_locks(r);
		if (error) {
			up_read(&ls->ls_root_sem);
			goto out;
		}

		count += r->res_recover_locks_count;
	}
	up_read(&ls->ls_root_sem);

	log_rinfo(ls, "dlm_recover_locks %d out", count);

	error = dlm_wait_function(ls, &recover_list_empty);
 out:
	if (error)
		recover_list_clear(ls);
	return error;
}

void dlm_recovered_lock(struct dlm_rsb *r)
{
	DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r););

	r->res_recover_locks_count--;
	if (!r->res_recover_locks_count) {
		rsb_clear_flag(r, RSB_NEW_MASTER);
		recover_list_del(r);
	}

	if (recover_list_empty(r->res_ls))
		wake_up(&r->res_ls->ls_wait_general);
}

/*
 * The lvb needs to be recovered on all master rsb's.  This includes setting
 * the VALNOTVALID flag if necessary, and determining the correct lvb contents
 * based on the lvb's of the locks held on the rsb.
 *
 * RSB_VALNOTVALID is set in two cases:
 *
 * 1. we are master, but not new, and we purged an EX/PW lock held by a
 * failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL)
 *
 * 2. we are a new master, and there are only NL/CR locks left.
 * (We could probably improve this by only invalidating in this way when
 * the previous master left uncleanly.  VMS docs mention that.)
 *
 * The LVB contents are only considered for changing when this is a new master
 * of the rsb (NEW_MASTER2).  Then, the rsb's lvb is taken from any lkb with
 * mode > CR.  If no lkb's exist with mode above CR, the lvb contents are taken
 * from the lkb with the largest lvb sequence number.
 */
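
/*
 * Note the sequence test below: ((int)seq - (int)high_seq) >= 0 is the
 * serial-number comparison idiom (the same idea as time_after() for
 * jiffies), so the "largest" lvbseq is still picked correctly if the
 * 32-bit counter has wrapped.
 */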

static void recover_lvb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *high_lkb = NULL;
	uint32_t high_seq = 0;
	int lock_lvb_exists = 0;
	int big_lock_exists = 0;
	int lvblen = r->res_ls->ls_lvblen;

	if (!rsb_flag(r, RSB_NEW_MASTER2) &&
	    rsb_flag(r, RSB_RECOVER_LVB_INVAL)) {
		/* case 1 above */
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!rsb_flag(r, RSB_NEW_MASTER2))
		return;

	/* we are the new master, so figure out if VALNOTVALID should
	   be set, and set the rsb lvb from the best lkb available. */

	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			continue;

		lock_lvb_exists = 1;

		if (lkb->lkb_grmode > DLM_LOCK_CR) {
			big_lock_exists = 1;
			goto setflag;
		}

		if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
			high_lkb = lkb;
			high_seq = lkb->lkb_lvbseq;
		}
	}

	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			continue;

		lock_lvb_exists = 1;

		if (lkb->lkb_grmode > DLM_LOCK_CR) {
			big_lock_exists = 1;
			goto setflag;
		}

		if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
			high_lkb = lkb;
			high_seq = lkb->lkb_lvbseq;
		}
	}

 setflag:
	if (!lock_lvb_exists)
		goto out;

	/* lvb is invalidated if only NL/CR locks remain */
	if (!big_lock_exists)
		rsb_set_flag(r, RSB_VALNOTVALID);

	if (!r->res_lvbptr) {
		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
		if (!r->res_lvbptr)
			goto out;
	}

	if (big_lock_exists) {
		r->res_lvbseq = lkb->lkb_lvbseq;
		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, lvblen);
	} else if (high_lkb) {
		r->res_lvbseq = high_lkb->lkb_lvbseq;
		memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen);
	} else {
		r->res_lvbseq = 0;
		memset(r->res_lvbptr, 0, lvblen);
	}
 out:
	return;
}

/* All master rsb's flagged RECOVER_CONVERT need to be looked at.  The locks
   converting PR->CW or CW->PR need to have their lkb_grmode set. */

static void recover_conversion(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	struct dlm_lkb *lkb;
	int grmode = -1;

	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_grmode == DLM_LOCK_PR ||
		    lkb->lkb_grmode == DLM_LOCK_CW) {
			grmode = lkb->lkb_grmode;
			break;
		}
	}

	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
		if (lkb->lkb_grmode != DLM_LOCK_IV)
			continue;
		if (grmode == -1) {
			log_debug(ls, "recover_conversion %x set gr to rq %d",
				  lkb->lkb_id, lkb->lkb_rqmode);
			lkb->lkb_grmode = lkb->lkb_rqmode;
		} else {
			log_debug(ls, "recover_conversion %x set gr %d",
				  lkb->lkb_id, grmode);
			lkb->lkb_grmode = grmode;
		}
	}
}
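
/*
 * Example of the logic above: if some granted lkb holds PR, a
 * converting lkb whose granted mode is unknown (grmode == IV) is
 * presumed to have also held PR, since PR and CW are incompatible with
 * each other; if no PR/CW lock is granted at all, the conversion is
 * presumed granted and the requested mode is used.
 */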

/* We've become the new master for this rsb, and waiting/converting locks may
   need to be granted in dlm_recover_grant() due to locks that existed on a
   removed node. */

static void recover_grant(struct dlm_rsb *r)
{
	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
		rsb_set_flag(r, RSB_RECOVER_GRANT);
}

void dlm_recover_rsbs(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	unsigned int count = 0;

	down_read(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		lock_rsb(r);
		if (is_master(r)) {
			if (rsb_flag(r, RSB_RECOVER_CONVERT))
				recover_conversion(r);

			/* recover lvb before granting locks so the updated
			   lvb/VALNOTVALID is presented in the completion */
			recover_lvb(r);

			if (rsb_flag(r, RSB_NEW_MASTER2))
				recover_grant(r);
			count++;
		} else {
			rsb_clear_flag(r, RSB_VALNOTVALID);
		}
		rsb_clear_flag(r, RSB_RECOVER_CONVERT);
		rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL);
		rsb_clear_flag(r, RSB_NEW_MASTER2);
		unlock_rsb(r);
	}
	up_read(&ls->ls_root_sem);

	if (count)
		log_rinfo(ls, "dlm_recover_rsbs %d done", count);
}

/* Create a single list of all root rsb's to be used during recovery */

int dlm_create_root_list(struct dlm_ls *ls)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int i, error = 0;

	down_write(&ls->ls_root_sem);
	if (!list_empty(&ls->ls_root_list)) {
		log_error(ls, "root list not empty");
		error = -EINVAL;
		goto out;
	}

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			list_add(&r->res_root_list, &ls->ls_root_list);
			dlm_hold_rsb(r);
		}

		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
			log_error(ls, "dlm_create_root_list toss not empty");
		spin_unlock(&ls->ls_rsbtbl[i].lock);
	}
 out:
	up_write(&ls->ls_root_sem);
	return error;
}

void dlm_release_root_list(struct dlm_ls *ls)
{
	struct dlm_rsb *r, *safe;

	down_write(&ls->ls_root_sem);
	list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
		list_del_init(&r->res_root_list);
		dlm_put_rsb(r);
	}
	up_write(&ls->ls_root_sem);
}

void dlm_clear_toss(struct dlm_ls *ls)
{
	struct rb_node *n, *next;
	struct dlm_rsb *r;
	unsigned int count = 0;
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
			next = rb_next(n);
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(r);
			count++;
		}
		spin_unlock(&ls->ls_rsbtbl[i].lock);
	}

	if (count)
		log_rinfo(ls, "dlm_clear_toss %u done", count);
}
