root/fs/ocfs2/dlm/dlmthread.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. __dlm_wait_on_lockres_flags
  2. __dlm_lockres_has_locks
  3. __dlm_lockres_unused
  4. __dlm_lockres_calc_usage
  5. dlm_lockres_calc_usage
  6. __dlm_do_purge_lockres
  7. dlm_purge_lockres
  8. dlm_run_purge_list
  9. dlm_shuffle_lists
  10. dlm_kick_thread
  11. __dlm_dirty_lockres
  12. dlm_launch_thread
  13. dlm_complete_thread
  14. dlm_dirty_list_empty
  15. dlm_flush_asts
  16. dlm_thread

   1 // SPDX-License-Identifier: GPL-2.0-or-later
   2 /* -*- mode: c; c-basic-offset: 8; -*-
   3  * vim: noexpandtab sw=8 ts=8 sts=0:
   4  *
   5  * dlmthread.c
   6  *
   7  * standalone DLM module
   8  *
   9  * Copyright (C) 2004 Oracle.  All rights reserved.
  10  */
  11 
  12 
  13 #include <linux/module.h>
  14 #include <linux/fs.h>
  15 #include <linux/types.h>
  16 #include <linux/highmem.h>
  17 #include <linux/init.h>
  18 #include <linux/sysctl.h>
  19 #include <linux/random.h>
  20 #include <linux/blkdev.h>
  21 #include <linux/socket.h>
  22 #include <linux/inet.h>
  23 #include <linux/timer.h>
  24 #include <linux/kthread.h>
  25 #include <linux/delay.h>
  26 
  27 
  28 #include "../cluster/heartbeat.h"
  29 #include "../cluster/nodemanager.h"
  30 #include "../cluster/tcp.h"
  31 
  32 #include "dlmapi.h"
  33 #include "dlmcommon.h"
  34 #include "dlmdomain.h"
  35 
  36 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
  37 #include "../cluster/masklog.h"
  38 
  39 static int dlm_thread(void *data);
  40 static void dlm_flush_asts(struct dlm_ctxt *dlm);
  41 
  42 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
  43 
  44 /* will exit holding res->spinlock, but may drop in function */
  45 /* waits until flags are cleared on res->state */
  46 void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
  47 {
  48         DECLARE_WAITQUEUE(wait, current);
  49 
  50         assert_spin_locked(&res->spinlock);
  51 
  52         add_wait_queue(&res->wq, &wait);
  53 repeat:
  54         set_current_state(TASK_UNINTERRUPTIBLE);
  55         if (res->state & flags) {
  56                 spin_unlock(&res->spinlock);
  57                 schedule();
  58                 spin_lock(&res->spinlock);
  59                 goto repeat;
  60         }
  61         remove_wait_queue(&res->wq, &wait);
  62         __set_current_state(TASK_RUNNING);
  63 }
  64 
  65 int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
  66 {
  67         if (list_empty(&res->granted) &&
  68             list_empty(&res->converting) &&
  69             list_empty(&res->blocked))
  70                 return 0;
  71         return 1;
  72 }
  73 
  74 /* "unused": the lockres has no locks, is not on the dirty list,
  75  * has no inflight locks (in the gap between mastery and acquiring
  76  * the first lock), and has no bits in its refmap.
  77  * truly ready to be freed. */
  78 int __dlm_lockres_unused(struct dlm_lock_resource *res)
  79 {
  80         int bit;
  81 
  82         assert_spin_locked(&res->spinlock);
  83 
  84         if (__dlm_lockres_has_locks(res))
  85                 return 0;
  86 
  87         /* Locks are in the process of being created */
  88         if (res->inflight_locks)
  89                 return 0;
  90 
  91         if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY)
  92                 return 0;
  93 
  94         if (res->state & (DLM_LOCK_RES_RECOVERING|
  95                         DLM_LOCK_RES_RECOVERY_WAITING))
  96                 return 0;
  97 
  98         /* Another node has this resource with this node as the master */
  99         bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
 100         if (bit < O2NM_MAX_NODES)
 101                 return 0;
 102 
 103         return 1;
 104 }
 105 
 106 
 107 /* Call whenever you may have added or deleted something from one of
 108  * the lockres queue's. This will figure out whether it belongs on the
 109  * unused list or not and does the appropriate thing. */
 110 void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 111                               struct dlm_lock_resource *res)
 112 {
 113         assert_spin_locked(&dlm->spinlock);
 114         assert_spin_locked(&res->spinlock);
 115 
 116         if (__dlm_lockres_unused(res)){
 117                 if (list_empty(&res->purge)) {
 118                         mlog(0, "%s: Adding res %.*s to purge list\n",
 119                              dlm->name, res->lockname.len, res->lockname.name);
 120 
 121                         res->last_used = jiffies;
 122                         dlm_lockres_get(res);
 123                         list_add_tail(&res->purge, &dlm->purge_list);
 124                         dlm->purge_count++;
 125                 }
 126         } else if (!list_empty(&res->purge)) {
 127                 mlog(0, "%s: Removing res %.*s from purge list\n",
 128                      dlm->name, res->lockname.len, res->lockname.name);
 129 
 130                 list_del_init(&res->purge);
 131                 dlm_lockres_put(res);
 132                 dlm->purge_count--;
 133         }
 134 }
 135 
 136 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 137                             struct dlm_lock_resource *res)
 138 {
 139         spin_lock(&dlm->spinlock);
 140         spin_lock(&res->spinlock);
 141 
 142         __dlm_lockres_calc_usage(dlm, res);
 143 
 144         spin_unlock(&res->spinlock);
 145         spin_unlock(&dlm->spinlock);
 146 }
 147 
 148 /*
 149  * Do the real purge work:
 150  *     unhash the lockres, and
 151  *     clear flag DLM_LOCK_RES_DROPPING_REF.
 152  * It requires dlm and lockres spinlock to be taken.
 153  */
 154 void __dlm_do_purge_lockres(struct dlm_ctxt *dlm,
 155                 struct dlm_lock_resource *res)
 156 {
 157         assert_spin_locked(&dlm->spinlock);
 158         assert_spin_locked(&res->spinlock);
 159 
 160         if (!list_empty(&res->purge)) {
 161                 mlog(0, "%s: Removing res %.*s from purgelist\n",
 162                      dlm->name, res->lockname.len, res->lockname.name);
 163                 list_del_init(&res->purge);
 164                 dlm_lockres_put(res);
 165                 dlm->purge_count--;
 166         }
 167 
 168         if (!__dlm_lockres_unused(res)) {
 169                 mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
 170                      dlm->name, res->lockname.len, res->lockname.name);
 171                 __dlm_print_one_lock_resource(res);
 172                 BUG();
 173         }
 174 
 175         __dlm_unhash_lockres(dlm, res);
 176 
 177         spin_lock(&dlm->track_lock);
 178         if (!list_empty(&res->tracking))
 179                 list_del_init(&res->tracking);
 180         else {
 181                 mlog(ML_ERROR, "%s: Resource %.*s not on the Tracking list\n",
 182                      dlm->name, res->lockname.len, res->lockname.name);
 183                 __dlm_print_one_lock_resource(res);
 184         }
 185         spin_unlock(&dlm->track_lock);
 186 
 187         /*
 188          * lockres is not in the hash now. drop the flag and wake up
 189          * any processes waiting in dlm_get_lock_resource.
 190          */
 191         res->state &= ~DLM_LOCK_RES_DROPPING_REF;
 192 }
 193 
 194 static void dlm_purge_lockres(struct dlm_ctxt *dlm,
 195                              struct dlm_lock_resource *res)
 196 {
 197         int master;
 198         int ret = 0;
 199 
 200         assert_spin_locked(&dlm->spinlock);
 201         assert_spin_locked(&res->spinlock);
 202 
 203         master = (res->owner == dlm->node_num);
 204 
 205         mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name,
 206              res->lockname.len, res->lockname.name, master);
 207 
 208         if (!master) {
 209                 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
 210                         mlog(ML_NOTICE, "%s: res %.*s already in DLM_LOCK_RES_DROPPING_REF state\n",
 211                                 dlm->name, res->lockname.len, res->lockname.name);
 212                         spin_unlock(&res->spinlock);
 213                         return;
 214                 }
 215 
 216                 res->state |= DLM_LOCK_RES_DROPPING_REF;
 217                 /* drop spinlock...  retake below */
 218                 spin_unlock(&res->spinlock);
 219                 spin_unlock(&dlm->spinlock);
 220 
 221                 spin_lock(&res->spinlock);
 222                 /* This ensures that clear refmap is sent after the set */
 223                 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
 224                 spin_unlock(&res->spinlock);
 225 
 226                 /* clear our bit from the master's refmap, ignore errors */
 227                 ret = dlm_drop_lockres_ref(dlm, res);
 228                 if (ret < 0) {
 229                         if (!dlm_is_host_down(ret))
 230                                 BUG();
 231                 }
 232                 spin_lock(&dlm->spinlock);
 233                 spin_lock(&res->spinlock);
 234         }
 235 
 236         if (!list_empty(&res->purge)) {
 237                 mlog(0, "%s: Removing res %.*s from purgelist, master %d\n",
 238                      dlm->name, res->lockname.len, res->lockname.name, master);
 239                 list_del_init(&res->purge);
 240                 dlm_lockres_put(res);
 241                 dlm->purge_count--;
 242         }
 243 
 244         if (!master && ret == DLM_DEREF_RESPONSE_INPROG) {
 245                 mlog(0, "%s: deref %.*s in progress\n",
 246                         dlm->name, res->lockname.len, res->lockname.name);
 247                 spin_unlock(&res->spinlock);
 248                 return;
 249         }
 250 
 251         if (!__dlm_lockres_unused(res)) {
 252                 mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
 253                      dlm->name, res->lockname.len, res->lockname.name);
 254                 __dlm_print_one_lock_resource(res);
 255                 BUG();
 256         }
 257 
 258         __dlm_unhash_lockres(dlm, res);
 259 
 260         spin_lock(&dlm->track_lock);
 261         if (!list_empty(&res->tracking))
 262                 list_del_init(&res->tracking);
 263         else {
 264                 mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
 265                                 res->lockname.len, res->lockname.name);
 266                 __dlm_print_one_lock_resource(res);
 267         }
 268         spin_unlock(&dlm->track_lock);
 269 
 270         /* lockres is not in the hash now.  drop the flag and wake up
 271          * any processes waiting in dlm_get_lock_resource. */
 272         if (!master) {
 273                 res->state &= ~DLM_LOCK_RES_DROPPING_REF;
 274                 spin_unlock(&res->spinlock);
 275                 wake_up(&res->wq);
 276         } else
 277                 spin_unlock(&res->spinlock);
 278 }
 279 
 280 static void dlm_run_purge_list(struct dlm_ctxt *dlm,
 281                                int purge_now)
 282 {
 283         unsigned int run_max, unused;
 284         unsigned long purge_jiffies;
 285         struct dlm_lock_resource *lockres;
 286 
 287         spin_lock(&dlm->spinlock);
 288         run_max = dlm->purge_count;
 289 
 290         while(run_max && !list_empty(&dlm->purge_list)) {
 291                 run_max--;
 292 
 293                 lockres = list_entry(dlm->purge_list.next,
 294                                      struct dlm_lock_resource, purge);
 295 
 296                 spin_lock(&lockres->spinlock);
 297 
 298                 purge_jiffies = lockres->last_used +
 299                         msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
 300 
 301                 /* Make sure that we want to be processing this guy at
 302                  * this time. */
 303                 if (!purge_now && time_after(purge_jiffies, jiffies)) {
 304                         /* Since resources are added to the purge list
 305                          * in tail order, we can stop at the first
 306                          * unpurgable resource -- anyone added after
 307                          * him will have a greater last_used value */
 308                         spin_unlock(&lockres->spinlock);
 309                         break;
 310                 }
 311 
 312                 /* Status of the lockres *might* change so double
 313                  * check. If the lockres is unused, holding the dlm
 314                  * spinlock will prevent people from getting and more
 315                  * refs on it. */
 316                 unused = __dlm_lockres_unused(lockres);
 317                 if (!unused ||
 318                     (lockres->state & DLM_LOCK_RES_MIGRATING) ||
 319                     (lockres->inflight_assert_workers != 0)) {
 320                         mlog(0, "%s: res %.*s is in use or being remastered, "
 321                              "used %d, state %d, assert master workers %u\n",
 322                              dlm->name, lockres->lockname.len,
 323                              lockres->lockname.name,
 324                              !unused, lockres->state,
 325                              lockres->inflight_assert_workers);
 326                         list_move_tail(&lockres->purge, &dlm->purge_list);
 327                         spin_unlock(&lockres->spinlock);
 328                         continue;
 329                 }
 330 
 331                 dlm_lockres_get(lockres);
 332 
 333                 dlm_purge_lockres(dlm, lockres);
 334 
 335                 dlm_lockres_put(lockres);
 336 
 337                 /* Avoid adding any scheduling latencies */
 338                 cond_resched_lock(&dlm->spinlock);
 339         }
 340 
 341         spin_unlock(&dlm->spinlock);
 342 }
 343 
 344 static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
 345                               struct dlm_lock_resource *res)
 346 {
 347         struct dlm_lock *lock, *target;
 348         int can_grant = 1;
 349 
 350         /*
 351          * Because this function is called with the lockres
 352          * spinlock, and because we know that it is not migrating/
 353          * recovering/in-progress, it is fine to reserve asts and
 354          * basts right before queueing them all throughout
 355          */
 356         assert_spin_locked(&dlm->ast_lock);
 357         assert_spin_locked(&res->spinlock);
 358         BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
 359                               DLM_LOCK_RES_RECOVERING|
 360                               DLM_LOCK_RES_IN_PROGRESS)));
 361 
 362 converting:
 363         if (list_empty(&res->converting))
 364                 goto blocked;
 365         mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name,
 366              res->lockname.len, res->lockname.name);
 367 
 368         target = list_entry(res->converting.next, struct dlm_lock, list);
 369         if (target->ml.convert_type == LKM_IVMODE) {
 370                 mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n",
 371                      dlm->name, res->lockname.len, res->lockname.name);
 372                 BUG();
 373         }
 374         list_for_each_entry(lock, &res->granted, list) {
 375                 if (lock==target)
 376                         continue;
 377                 if (!dlm_lock_compatible(lock->ml.type,
 378                                          target->ml.convert_type)) {
 379                         can_grant = 0;
 380                         /* queue the BAST if not already */
 381                         if (lock->ml.highest_blocked == LKM_IVMODE) {
 382                                 __dlm_lockres_reserve_ast(res);
 383                                 __dlm_queue_bast(dlm, lock);
 384                         }
 385                         /* update the highest_blocked if needed */
 386                         if (lock->ml.highest_blocked < target->ml.convert_type)
 387                                 lock->ml.highest_blocked =
 388                                         target->ml.convert_type;
 389                 }
 390         }
 391 
 392         list_for_each_entry(lock, &res->converting, list) {
 393                 if (lock==target)
 394                         continue;
 395                 if (!dlm_lock_compatible(lock->ml.type,
 396                                          target->ml.convert_type)) {
 397                         can_grant = 0;
 398                         if (lock->ml.highest_blocked == LKM_IVMODE) {
 399                                 __dlm_lockres_reserve_ast(res);
 400                                 __dlm_queue_bast(dlm, lock);
 401                         }
 402                         if (lock->ml.highest_blocked < target->ml.convert_type)
 403                                 lock->ml.highest_blocked =
 404                                         target->ml.convert_type;
 405                 }
 406         }
 407 
 408         /* we can convert the lock */
 409         if (can_grant) {
 410                 spin_lock(&target->spinlock);
 411                 BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
 412 
 413                 mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type "
 414                      "%d => %d, node %u\n", dlm->name, res->lockname.len,
 415                      res->lockname.name,
 416                      dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
 417                      dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
 418                      target->ml.type,
 419                      target->ml.convert_type, target->ml.node);
 420 
 421                 target->ml.type = target->ml.convert_type;
 422                 target->ml.convert_type = LKM_IVMODE;
 423                 list_move_tail(&target->list, &res->granted);
 424 
 425                 BUG_ON(!target->lksb);
 426                 target->lksb->status = DLM_NORMAL;
 427 
 428                 spin_unlock(&target->spinlock);
 429 
 430                 __dlm_lockres_reserve_ast(res);
 431                 __dlm_queue_ast(dlm, target);
 432                 /* go back and check for more */
 433                 goto converting;
 434         }
 435 
 436 blocked:
 437         if (list_empty(&res->blocked))
 438                 goto leave;
 439         target = list_entry(res->blocked.next, struct dlm_lock, list);
 440 
 441         list_for_each_entry(lock, &res->granted, list) {
 442                 if (lock==target)
 443                         continue;
 444                 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
 445                         can_grant = 0;
 446                         if (lock->ml.highest_blocked == LKM_IVMODE) {
 447                                 __dlm_lockres_reserve_ast(res);
 448                                 __dlm_queue_bast(dlm, lock);
 449                         }
 450                         if (lock->ml.highest_blocked < target->ml.type)
 451                                 lock->ml.highest_blocked = target->ml.type;
 452                 }
 453         }
 454 
 455         list_for_each_entry(lock, &res->converting, list) {
 456                 if (lock==target)
 457                         continue;
 458                 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
 459                         can_grant = 0;
 460                         if (lock->ml.highest_blocked == LKM_IVMODE) {
 461                                 __dlm_lockres_reserve_ast(res);
 462                                 __dlm_queue_bast(dlm, lock);
 463                         }
 464                         if (lock->ml.highest_blocked < target->ml.type)
 465                                 lock->ml.highest_blocked = target->ml.type;
 466                 }
 467         }
 468 
 469         /* we can grant the blocked lock (only
 470          * possible if converting list empty) */
 471         if (can_grant) {
 472                 spin_lock(&target->spinlock);
 473                 BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
 474 
 475                 mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, "
 476                      "node %u\n", dlm->name, res->lockname.len,
 477                      res->lockname.name,
 478                      dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
 479                      dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
 480                      target->ml.type, target->ml.node);
 481 
 482                 /* target->ml.type is already correct */
 483                 list_move_tail(&target->list, &res->granted);
 484 
 485                 BUG_ON(!target->lksb);
 486                 target->lksb->status = DLM_NORMAL;
 487 
 488                 spin_unlock(&target->spinlock);
 489 
 490                 __dlm_lockres_reserve_ast(res);
 491                 __dlm_queue_ast(dlm, target);
 492                 /* go back and check for more */
 493                 goto converting;
 494         }
 495 
 496 leave:
 497         return;
 498 }
 499 
 500 /* must have NO locks when calling this with res !=NULL * */
 501 void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 502 {
 503         if (res) {
 504                 spin_lock(&dlm->spinlock);
 505                 spin_lock(&res->spinlock);
 506                 __dlm_dirty_lockres(dlm, res);
 507                 spin_unlock(&res->spinlock);
 508                 spin_unlock(&dlm->spinlock);
 509         }
 510         wake_up(&dlm->dlm_thread_wq);
 511 }
 512 
 513 void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 514 {
 515         assert_spin_locked(&dlm->spinlock);
 516         assert_spin_locked(&res->spinlock);
 517 
 518         /* don't shuffle secondary queues */
 519         if (res->owner == dlm->node_num) {
 520                 if (res->state & (DLM_LOCK_RES_MIGRATING |
 521                                   DLM_LOCK_RES_BLOCK_DIRTY))
 522                     return;
 523 
 524                 if (list_empty(&res->dirty)) {
 525                         /* ref for dirty_list */
 526                         dlm_lockres_get(res);
 527                         list_add_tail(&res->dirty, &dlm->dirty_list);
 528                         res->state |= DLM_LOCK_RES_DIRTY;
 529                 }
 530         }
 531 
 532         mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len,
 533              res->lockname.name);
 534 }
 535 
 536 
 537 /* Launch the NM thread for the mounted volume */
 538 int dlm_launch_thread(struct dlm_ctxt *dlm)
 539 {
 540         mlog(0, "Starting dlm_thread...\n");
 541 
 542         dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm-%s",
 543                         dlm->name);
 544         if (IS_ERR(dlm->dlm_thread_task)) {
 545                 mlog_errno(PTR_ERR(dlm->dlm_thread_task));
 546                 dlm->dlm_thread_task = NULL;
 547                 return -EINVAL;
 548         }
 549 
 550         return 0;
 551 }
 552 
 553 void dlm_complete_thread(struct dlm_ctxt *dlm)
 554 {
 555         if (dlm->dlm_thread_task) {
 556                 mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n");
 557                 kthread_stop(dlm->dlm_thread_task);
 558                 dlm->dlm_thread_task = NULL;
 559         }
 560 }
 561 
 562 static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
 563 {
 564         int empty;
 565 
 566         spin_lock(&dlm->spinlock);
 567         empty = list_empty(&dlm->dirty_list);
 568         spin_unlock(&dlm->spinlock);
 569 
 570         return empty;
 571 }
 572 
 573 static void dlm_flush_asts(struct dlm_ctxt *dlm)
 574 {
 575         int ret;
 576         struct dlm_lock *lock;
 577         struct dlm_lock_resource *res;
 578         u8 hi;
 579 
 580         spin_lock(&dlm->ast_lock);
 581         while (!list_empty(&dlm->pending_asts)) {
 582                 lock = list_entry(dlm->pending_asts.next,
 583                                   struct dlm_lock, ast_list);
 584                 /* get an extra ref on lock */
 585                 dlm_lock_get(lock);
 586                 res = lock->lockres;
 587                 mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, "
 588                      "node %u\n", dlm->name, res->lockname.len,
 589                      res->lockname.name,
 590                      dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
 591                      dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
 592                      lock->ml.type, lock->ml.node);
 593 
 594                 BUG_ON(!lock->ast_pending);
 595 
 596                 /* remove from list (including ref) */
 597                 list_del_init(&lock->ast_list);
 598                 dlm_lock_put(lock);
 599                 spin_unlock(&dlm->ast_lock);
 600 
 601                 if (lock->ml.node != dlm->node_num) {
 602                         ret = dlm_do_remote_ast(dlm, res, lock);
 603                         if (ret < 0)
 604                                 mlog_errno(ret);
 605                 } else
 606                         dlm_do_local_ast(dlm, res, lock);
 607 
 608                 spin_lock(&dlm->ast_lock);
 609 
 610                 /* possible that another ast was queued while
 611                  * we were delivering the last one */
 612                 if (!list_empty(&lock->ast_list)) {
 613                         mlog(0, "%s: res %.*s, AST queued while flushing last "
 614                              "one\n", dlm->name, res->lockname.len,
 615                              res->lockname.name);
 616                 } else
 617                         lock->ast_pending = 0;
 618 
 619                 /* drop the extra ref.
 620                  * this may drop it completely. */
 621                 dlm_lock_put(lock);
 622                 dlm_lockres_release_ast(dlm, res);
 623         }
 624 
 625         while (!list_empty(&dlm->pending_basts)) {
 626                 lock = list_entry(dlm->pending_basts.next,
 627                                   struct dlm_lock, bast_list);
 628                 /* get an extra ref on lock */
 629                 dlm_lock_get(lock);
 630                 res = lock->lockres;
 631 
 632                 BUG_ON(!lock->bast_pending);
 633 
 634                 /* get the highest blocked lock, and reset */
 635                 spin_lock(&lock->spinlock);
 636                 BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
 637                 hi = lock->ml.highest_blocked;
 638                 lock->ml.highest_blocked = LKM_IVMODE;
 639                 spin_unlock(&lock->spinlock);
 640 
 641                 /* remove from list (including ref) */
 642                 list_del_init(&lock->bast_list);
 643                 dlm_lock_put(lock);
 644                 spin_unlock(&dlm->ast_lock);
 645 
 646                 mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, "
 647                      "blocked %d, node %u\n",
 648                      dlm->name, res->lockname.len, res->lockname.name,
 649                      dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
 650                      dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
 651                      hi, lock->ml.node);
 652 
 653                 if (lock->ml.node != dlm->node_num) {
 654                         ret = dlm_send_proxy_bast(dlm, res, lock, hi);
 655                         if (ret < 0)
 656                                 mlog_errno(ret);
 657                 } else
 658                         dlm_do_local_bast(dlm, res, lock, hi);
 659 
 660                 spin_lock(&dlm->ast_lock);
 661 
 662                 /* possible that another bast was queued while
 663                  * we were delivering the last one */
 664                 if (!list_empty(&lock->bast_list)) {
 665                         mlog(0, "%s: res %.*s, BAST queued while flushing last "
 666                              "one\n", dlm->name, res->lockname.len,
 667                              res->lockname.name);
 668                 } else
 669                         lock->bast_pending = 0;
 670 
 671                 /* drop the extra ref.
 672                  * this may drop it completely. */
 673                 dlm_lock_put(lock);
 674                 dlm_lockres_release_ast(dlm, res);
 675         }
 676         wake_up(&dlm->ast_wq);
 677         spin_unlock(&dlm->ast_lock);
 678 }
 679 
 680 
 681 #define DLM_THREAD_TIMEOUT_MS (4 * 1000)
 682 #define DLM_THREAD_MAX_DIRTY  100
 683 #define DLM_THREAD_MAX_ASTS   10
 684 
 685 static int dlm_thread(void *data)
 686 {
 687         struct dlm_lock_resource *res;
 688         struct dlm_ctxt *dlm = data;
 689         unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);
 690 
 691         mlog(0, "dlm thread running for %s...\n", dlm->name);
 692 
 693         while (!kthread_should_stop()) {
 694                 int n = DLM_THREAD_MAX_DIRTY;
 695 
 696                 /* dlm_shutting_down is very point-in-time, but that
 697                  * doesn't matter as we'll just loop back around if we
 698                  * get false on the leading edge of a state
 699                  * transition. */
 700                 dlm_run_purge_list(dlm, dlm_shutting_down(dlm));
 701 
 702                 /* We really don't want to hold dlm->spinlock while
 703                  * calling dlm_shuffle_lists on each lockres that
 704                  * needs to have its queues adjusted and AST/BASTs
 705                  * run.  So let's pull each entry off the dirty_list
 706                  * and drop dlm->spinlock ASAP.  Once off the list,
 707                  * res->spinlock needs to be taken again to protect
 708                  * the queues while calling dlm_shuffle_lists.  */
 709                 spin_lock(&dlm->spinlock);
 710                 while (!list_empty(&dlm->dirty_list)) {
 711                         int delay = 0;
 712                         res = list_entry(dlm->dirty_list.next,
 713                                          struct dlm_lock_resource, dirty);
 714 
 715                         /* peel a lockres off, remove it from the list,
 716                          * unset the dirty flag and drop the dlm lock */
 717                         BUG_ON(!res);
 718                         dlm_lockres_get(res);
 719 
 720                         spin_lock(&res->spinlock);
 721                         /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
 722                         list_del_init(&res->dirty);
 723                         spin_unlock(&res->spinlock);
 724                         spin_unlock(&dlm->spinlock);
 725                         /* Drop dirty_list ref */
 726                         dlm_lockres_put(res);
 727 
 728                         /* lockres can be re-dirtied/re-added to the
 729                          * dirty_list in this gap, but that is ok */
 730 
 731                         spin_lock(&dlm->ast_lock);
 732                         spin_lock(&res->spinlock);
 733                         if (res->owner != dlm->node_num) {
 734                                 __dlm_print_one_lock_resource(res);
 735                                 mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d,"
 736                                      " dirty %d\n", dlm->name,
 737                                      !!(res->state & DLM_LOCK_RES_IN_PROGRESS),
 738                                      !!(res->state & DLM_LOCK_RES_MIGRATING),
 739                                      !!(res->state & DLM_LOCK_RES_RECOVERING),
 740                                      !!(res->state & DLM_LOCK_RES_DIRTY));
 741                         }
 742                         BUG_ON(res->owner != dlm->node_num);
 743 
 744                         /* it is now ok to move lockreses in these states
 745                          * to the dirty list, assuming that they will only be
 746                          * dirty for a short while. */
 747                         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
 748                         if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
 749                                           DLM_LOCK_RES_RECOVERING |
 750                                           DLM_LOCK_RES_RECOVERY_WAITING)) {
 751                                 /* move it to the tail and keep going */
 752                                 res->state &= ~DLM_LOCK_RES_DIRTY;
 753                                 spin_unlock(&res->spinlock);
 754                                 spin_unlock(&dlm->ast_lock);
 755                                 mlog(0, "%s: res %.*s, inprogress, delay list "
 756                                      "shuffle, state %d\n", dlm->name,
 757                                      res->lockname.len, res->lockname.name,
 758                                      res->state);
 759                                 delay = 1;
 760                                 goto in_progress;
 761                         }
 762 
 763                         /* at this point the lockres is not migrating/
 764                          * recovering/in-progress.  we have the lockres
 765                          * spinlock and do NOT have the dlm lock.
 766                          * safe to reserve/queue asts and run the lists. */
 767 
 768                         /* called while holding lockres lock */
 769                         dlm_shuffle_lists(dlm, res);
 770                         res->state &= ~DLM_LOCK_RES_DIRTY;
 771                         spin_unlock(&res->spinlock);
 772                         spin_unlock(&dlm->ast_lock);
 773 
 774                         dlm_lockres_calc_usage(dlm, res);
 775 
 776 in_progress:
 777 
 778                         spin_lock(&dlm->spinlock);
 779                         /* if the lock was in-progress, stick
 780                          * it on the back of the list */
 781                         if (delay) {
 782                                 spin_lock(&res->spinlock);
 783                                 __dlm_dirty_lockres(dlm, res);
 784                                 spin_unlock(&res->spinlock);
 785                         }
 786                         dlm_lockres_put(res);
 787 
 788                         /* unlikely, but we may need to give time to
 789                          * other tasks */
 790                         if (!--n) {
 791                                 mlog(0, "%s: Throttling dlm thread\n",
 792                                      dlm->name);
 793                                 break;
 794                         }
 795                 }
 796 
 797                 spin_unlock(&dlm->spinlock);
 798                 dlm_flush_asts(dlm);
 799 
 800                 /* yield and continue right away if there is more work to do */
 801                 if (!n) {
 802                         cond_resched();
 803                         continue;
 804                 }
 805 
 806                 wait_event_interruptible_timeout(dlm->dlm_thread_wq,
 807                                                  !dlm_dirty_list_empty(dlm) ||
 808                                                  kthread_should_stop(),
 809                                                  timeout);
 810         }
 811 
 812         mlog(0, "quitting DLM thread\n");
 813         return 0;
 814 }

/* [<][>][^][v][top][bottom][index][help] */