/* FS-Cache worker operation management routines
 *
 * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 * See Documentation/filesystems/caching/operations.txt
 */

#define FSCACHE_DEBUG_LEVEL OPERATION
#include <linux/module.h>
#include <linux/seq_file.h>
#include <linux/slab.h>
#include "internal.h"

atomic_t fscache_op_debug_id;
EXPORT_SYMBOL(fscache_op_debug_id);

/**
 * fscache_enqueue_operation - Enqueue an operation for processing
 * @op: The operation to enqueue
 *
 * Enqueue an operation for processing by the FS-Cache thread pool.
 *
 * This will get its own ref on the op.
 */
void fscache_enqueue_operation(struct fscache_operation *op)
{
	_enter("{OBJ%x OP%x,%u}",
	       op->object->debug_id, op->debug_id, atomic_read(&op->usage));

	ASSERT(list_empty(&op->pend_link));
	ASSERT(op->processor != NULL);
	ASSERT(fscache_object_is_available(op->object));
	ASSERTCMP(atomic_read(&op->usage), >, 0);
	ASSERTCMP(op->state, ==, FSCACHE_OP_ST_IN_PROGRESS);

	fscache_stat(&fscache_n_op_enqueue);
	switch (op->flags & FSCACHE_OP_TYPE) {
	case FSCACHE_OP_ASYNC:
		_debug("queue async");
		atomic_inc(&op->usage);
		if (!queue_work(fscache_op_wq, &op->work))
			fscache_put_operation(op);
		break;
	case FSCACHE_OP_MYTHREAD:
		_debug("queue for caller's attention");
		break;
	default:
		pr_err("Unexpected op type %lx", op->flags);
		BUG();
		break;
	}
}
EXPORT_SYMBOL(fscache_enqueue_operation);

/*
 * start an op running
 */
static void fscache_run_op(struct fscache_object *object,
			   struct fscache_operation *op)
{
	ASSERTCMP(op->state, ==, FSCACHE_OP_ST_PENDING);

	op->state = FSCACHE_OP_ST_IN_PROGRESS;
	object->n_in_progress++;
	if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
		wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
	if (op->processor)
		fscache_enqueue_operation(op);
	fscache_stat(&fscache_n_op_run);
}

/*
 * submit an exclusive operation for an object
 * - other ops are excluded from running simultaneously with this one
 * - this gets any extra refs it needs on an op
 */
int fscache_submit_exclusive_op(struct fscache_object *object,
				struct fscache_operation *op)
{
	int ret;

	_enter("{OBJ%x OP%x},", object->debug_id, op->debug_id);

	ASSERTCMP(op->state, ==, FSCACHE_OP_ST_INITIALISED);
	ASSERTCMP(atomic_read(&op->usage), >, 0);

	spin_lock(&object->lock);
	ASSERTCMP(object->n_ops, >=, object->n_in_progress);
	ASSERTCMP(object->n_ops, >=, object->n_exclusive);
	ASSERT(list_empty(&op->pend_link));

	op->state = FSCACHE_OP_ST_PENDING;
	if (fscache_object_is_active(object)) {
		op->object = object;
		object->n_ops++;
		object->n_exclusive++;	/* reads and writes must wait */

		if (object->n_in_progress > 0) {
			atomic_inc(&op->usage);
			list_add_tail(&op->pend_link, &object->pending_ops);
			fscache_stat(&fscache_n_op_pend);
		} else if (!list_empty(&object->pending_ops)) {
			atomic_inc(&op->usage);
			list_add_tail(&op->pend_link, &object->pending_ops);
			fscache_stat(&fscache_n_op_pend);
			fscache_start_operations(object);
		} else {
			ASSERTCMP(object->n_in_progress, ==, 0);
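			/* nothing is queued and nothing is in progress, so
			 * the exclusive op can be started straight away */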
			fscache_run_op(object, op);
		}

		/* need to issue a new write op after this */
		clear_bit(FSCACHE_OBJECT_PENDING_WRITE, &object->flags);
		ret = 0;
	} else if (test_bit(FSCACHE_OBJECT_IS_LOOKED_UP, &object->flags)) {
		op->object = object;
		object->n_ops++;
		object->n_exclusive++;	/* reads and writes must wait */
		atomic_inc(&op->usage);
		list_add_tail(&op->pend_link, &object->pending_ops);
		fscache_stat(&fscache_n_op_pend);
		ret = 0;
	} else {
		/* If we're in any other state, there must have been an I/O
		 * error of some nature.
		 */
		ASSERT(test_bit(FSCACHE_IOERROR, &object->cache->flags));
		ret = -EIO;
	}

	spin_unlock(&object->lock);
	return ret;
}

/*
 * report an unexpected submission
 */
static void fscache_report_unexpected_submission(struct fscache_object *object,
						 struct fscache_operation *op,
						 const struct fscache_state *ostate)
{
	static bool once_only;
	struct fscache_operation *p;
	unsigned n;

	if (once_only)
		return;
	once_only = true;

	kdebug("unexpected submission OP%x [OBJ%x %s]",
	       op->debug_id, object->debug_id, object->state->name);
	kdebug("objstate=%s [%s]", object->state->name, ostate->name);
	kdebug("objflags=%lx", object->flags);
	kdebug("objevent=%lx [%lx]", object->events, object->event_mask);
	kdebug("ops=%u inp=%u exc=%u",
	       object->n_ops, object->n_in_progress, object->n_exclusive);

	if (!list_empty(&object->pending_ops)) {
		n = 0;
		list_for_each_entry(p, &object->pending_ops, pend_link) {
			ASSERTCMP(p->object, ==, object);
			kdebug("%p %p", p->processor, p->release);
			n++;
		}

		kdebug("n=%u", n);
	}

	dump_stack();
}

/*
 * submit an operation for an object
 * - objects may be submitted only in the following states:
 *   - during object creation (write ops may be submitted)
 *   - whilst the object is active
 *   - after an I/O error incurred in one of the two above states (op rejected)
 * - this gets any extra refs it needs on an op
 */
int fscache_submit_op(struct fscache_object *object,
		      struct fscache_operation *op)
{
	const struct fscache_state *ostate;
	int ret;

	_enter("{OBJ%x OP%x},{%u}",
	       object->debug_id, op->debug_id, atomic_read(&op->usage));

	ASSERTCMP(op->state, ==, FSCACHE_OP_ST_INITIALISED);
	ASSERTCMP(atomic_read(&op->usage), >, 0);

	spin_lock(&object->lock);
	ASSERTCMP(object->n_ops, >=, object->n_in_progress);
	ASSERTCMP(object->n_ops, >=, object->n_exclusive);
	ASSERT(list_empty(&op->pend_link));

	ostate = object->state;
	smp_rmb();

	op->state = FSCACHE_OP_ST_PENDING;
	if (fscache_object_is_active(object)) {
		op->object = object;
		object->n_ops++;

		if (object->n_exclusive > 0) {
			atomic_inc(&op->usage);
			list_add_tail(&op->pend_link, &object->pending_ops);
			fscache_stat(&fscache_n_op_pend);
		} else if (!list_empty(&object->pending_ops)) {
			atomic_inc(&op->usage);
			list_add_tail(&op->pend_link, &object->pending_ops);
			fscache_stat(&fscache_n_op_pend);
			fscache_start_operations(object);
		} else {
			ASSERTCMP(object->n_exclusive, ==, 0);
			fscache_run_op(object, op);
		}
		ret = 0;
	} else if (test_bit(FSCACHE_OBJECT_IS_LOOKED_UP, &object->flags)) {
		op->object = object;
		object->n_ops++;
		atomic_inc(&op->usage);
		list_add_tail(&op->pend_link, &object->pending_ops);
		fscache_stat(&fscache_n_op_pend);
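		/* the object has been looked up but isn't usable yet, so
		 * leave the op queued; it will be started when the object
		 * becomes available */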
		ret = 0;
	} else if (fscache_object_is_dying(object)) {
		fscache_stat(&fscache_n_op_rejected);
		op->state = FSCACHE_OP_ST_CANCELLED;
		ret = -ENOBUFS;
	} else if (!test_bit(FSCACHE_IOERROR, &object->cache->flags)) {
		fscache_report_unexpected_submission(object, op, ostate);
		ASSERT(!fscache_object_is_active(object));
		op->state = FSCACHE_OP_ST_CANCELLED;
		ret = -ENOBUFS;
	} else {
		op->state = FSCACHE_OP_ST_CANCELLED;
		ret = -ENOBUFS;
	}

	spin_unlock(&object->lock);
	return ret;
}

/*
 * queue an object for withdrawal on error, aborting all following
 * asynchronous operations
 */
void fscache_abort_object(struct fscache_object *object)
{
	_enter("{OBJ%x}", object->debug_id);

	fscache_raise_event(object, FSCACHE_OBJECT_EV_ERROR);
}

/*
 * Jump start the operation processing on an object.  The caller must hold
 * object->lock.
 */
void fscache_start_operations(struct fscache_object *object)
{
	struct fscache_operation *op;
	bool stop = false;

	while (!list_empty(&object->pending_ops) && !stop) {
		op = list_entry(object->pending_ops.next,
				struct fscache_operation, pend_link);

		if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags)) {
			if (object->n_in_progress > 0)
				break;
			stop = true;
		}
		list_del_init(&op->pend_link);
		fscache_run_op(object, op);

		/* the pending queue was holding a ref on the op */
		fscache_put_operation(op);
	}

	ASSERTCMP(object->n_in_progress, <=, object->n_ops);

	_debug("woke %d ops on OBJ%x",
	       object->n_in_progress, object->debug_id);
}

/*
 * cancel an operation that's pending on an object
 */
int fscache_cancel_op(struct fscache_operation *op,
		      void (*do_cancel)(struct fscache_operation *))
{
	struct fscache_object *object = op->object;
	int ret;

	_enter("{OBJ%x OP%x}", op->object->debug_id, op->debug_id);

	ASSERTCMP(op->state, >=, FSCACHE_OP_ST_PENDING);
	ASSERTCMP(op->state, !=, FSCACHE_OP_ST_CANCELLED);
	ASSERTCMP(atomic_read(&op->usage), >, 0);

	spin_lock(&object->lock);

	ret = -EBUSY;
	if (op->state == FSCACHE_OP_ST_PENDING) {
		ASSERT(!list_empty(&op->pend_link));
		fscache_stat(&fscache_n_op_cancelled);
		list_del_init(&op->pend_link);
		if (do_cancel)
			do_cancel(op);
		op->state = FSCACHE_OP_ST_CANCELLED;
		if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
			object->n_exclusive--;
		if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
			wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
		fscache_put_operation(op);
		ret = 0;
	}

	spin_unlock(&object->lock);
	_leave(" = %d", ret);
	return ret;
}

/*
 * Cancel all pending operations on an object
 */
void fscache_cancel_all_ops(struct fscache_object *object)
{
	struct fscache_operation *op;

	_enter("OBJ%x", object->debug_id);

	spin_lock(&object->lock);

	while (!list_empty(&object->pending_ops)) {
		op = list_entry(object->pending_ops.next,
				struct fscache_operation, pend_link);
		fscache_stat(&fscache_n_op_cancelled);
		list_del_init(&op->pend_link);

		ASSERTCMP(op->state, ==, FSCACHE_OP_ST_PENDING);
		op->state = FSCACHE_OP_ST_CANCELLED;

		if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
			object->n_exclusive--;
		if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
			wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
		fscache_put_operation(op);
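		/* cond_resched_lock() drops object->lock and reschedules if
		 * need be, so a long pending list doesn't hog the CPU */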
		cond_resched_lock(&object->lock);
	}

	spin_unlock(&object->lock);
	_leave("");
}

/*
 * Record the completion or cancellation of an in-progress operation.
 */
void fscache_op_complete(struct fscache_operation *op, bool cancelled)
{
	struct fscache_object *object = op->object;

	_enter("OBJ%x", object->debug_id);

	ASSERTCMP(op->state, ==, FSCACHE_OP_ST_IN_PROGRESS);
	ASSERTCMP(object->n_in_progress, >, 0);
	ASSERTIFCMP(test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags),
		    object->n_exclusive, >, 0);
	ASSERTIFCMP(test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags),
		    object->n_in_progress, ==, 1);

	spin_lock(&object->lock);

	op->state = cancelled ?
		FSCACHE_OP_ST_CANCELLED : FSCACHE_OP_ST_COMPLETE;

	if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
		object->n_exclusive--;
	object->n_in_progress--;
	if (object->n_in_progress == 0)
		fscache_start_operations(object);

	spin_unlock(&object->lock);
	_leave("");
}
EXPORT_SYMBOL(fscache_op_complete);

/*
 * release an operation
 * - queues pending ops if this is the last in-progress op
 */
void fscache_put_operation(struct fscache_operation *op)
{
	struct fscache_object *object;
	struct fscache_cache *cache;

	_enter("{OBJ%x OP%x,%d}",
	       op->object->debug_id, op->debug_id, atomic_read(&op->usage));

	ASSERTCMP(atomic_read(&op->usage), >, 0);

	if (!atomic_dec_and_test(&op->usage))
		return;

	_debug("PUT OP");
	ASSERTIFCMP(op->state != FSCACHE_OP_ST_COMPLETE,
		    op->state, ==, FSCACHE_OP_ST_CANCELLED);
	op->state = FSCACHE_OP_ST_DEAD;

	fscache_stat(&fscache_n_op_release);

	if (op->release) {
		op->release(op);
		op->release = NULL;
	}

	object = op->object;

	if (test_bit(FSCACHE_OP_DEC_READ_CNT, &op->flags))
		atomic_dec(&object->n_reads);
	if (test_bit(FSCACHE_OP_UNUSE_COOKIE, &op->flags))
		fscache_unuse_cookie(object);
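	/* note that fscache_cancel_op(), fscache_cancel_all_ops() and
	 * fscache_start_operations() all drop op refs whilst holding
	 * object->lock - hence the trylock-or-defer dance below */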
	/* now... we may get called with the object spinlock held, so we
	 * complete the cleanup here only if we can immediately acquire the
	 * lock, and defer it otherwise */
	if (!spin_trylock(&object->lock)) {
		_debug("defer put");
		fscache_stat(&fscache_n_op_deferred_release);

		cache = object->cache;
		spin_lock(&cache->op_gc_list_lock);
		list_add_tail(&op->pend_link, &cache->op_gc_list);
		spin_unlock(&cache->op_gc_list_lock);
		schedule_work(&cache->op_gc);
		_leave(" [defer]");
		return;
	}

	ASSERTCMP(object->n_ops, >, 0);
	object->n_ops--;
	if (object->n_ops == 0)
		fscache_raise_event(object, FSCACHE_OBJECT_EV_CLEARED);

	spin_unlock(&object->lock);

	kfree(op);
	_leave(" [done]");
}
EXPORT_SYMBOL(fscache_put_operation);

/*
 * garbage collect operations that have had their release deferred
 */
void fscache_operation_gc(struct work_struct *work)
{
	struct fscache_operation *op;
	struct fscache_object *object;
	struct fscache_cache *cache =
		container_of(work, struct fscache_cache, op_gc);
	int count = 0;

	_enter("");

	do {
		spin_lock(&cache->op_gc_list_lock);
		if (list_empty(&cache->op_gc_list)) {
			spin_unlock(&cache->op_gc_list_lock);
			break;
		}

		op = list_entry(cache->op_gc_list.next,
				struct fscache_operation, pend_link);
		list_del(&op->pend_link);
		spin_unlock(&cache->op_gc_list_lock);

		object = op->object;
		spin_lock(&object->lock);

		_debug("GC DEFERRED REL OBJ%x OP%x",
		       object->debug_id, op->debug_id);
		fscache_stat(&fscache_n_op_gc);

		ASSERTCMP(atomic_read(&op->usage), ==, 0);
		ASSERTCMP(op->state, ==, FSCACHE_OP_ST_DEAD);

		ASSERTCMP(object->n_ops, >, 0);
		object->n_ops--;
		if (object->n_ops == 0)
			fscache_raise_event(object, FSCACHE_OBJECT_EV_CLEARED);

		spin_unlock(&object->lock);
		kfree(op);

	} while (count++ < 20);

	if (!list_empty(&cache->op_gc_list))
		schedule_work(&cache->op_gc);

	_leave("");
}

/*
 * execute an operation using fscache_op_wq to provide processing context -
 * the caller holds a ref to this object, so we don't need to hold one
 */
void fscache_op_work_func(struct work_struct *work)
{
	struct fscache_operation *op =
		container_of(work, struct fscache_operation, work);
	unsigned long start;

	_enter("{OBJ%x OP%x,%d}",
	       op->object->debug_id, op->debug_id, atomic_read(&op->usage));

	ASSERT(op->processor != NULL);
	start = jiffies;
	op->processor(op);
	fscache_hist(fscache_ops_histogram, start);
	fscache_put_operation(op);

	_leave("");
}
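
/*
 * Illustrative usage sketch (not part of this file): a caller would
 * typically initialise an embedded operation, mark it FSCACHE_OP_ASYNC so
 * that fscache_enqueue_operation() punts it to fscache_op_wq, submit it and
 * then drop its initial ref - whether or not submission succeeded:
 *
 *	static void my_processor(struct fscache_operation *op)
 *	{
 *		... do the actual work ...
 *		fscache_op_complete(op, false);
 *	}
 *
 *	fscache_operation_init(my_op, my_processor, my_release);
 *	my_op->flags = FSCACHE_OP_ASYNC;
 *	ret = fscache_submit_op(object, my_op);
 *	fscache_put_operation(my_op);
 *
 * fscache_op_work_func() above drops the work queue's ref once the
 * processor has run.  my_op, my_processor and my_release are hypothetical,
 * and fscache_operation_init() is assumed from fscache-cache.h.
 */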