root/fs/ocfs2/stack_user.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. ocfs2_control_set_handshake_state
  2. ocfs2_control_get_handshake_state
  3. ocfs2_connection_find
  4. ocfs2_live_connection_attach
  5. ocfs2_live_connection_drop
  6. ocfs2_control_cfu
  7. ocfs2_control_validate_protocol
  8. ocfs2_control_send_down
  9. ocfs2_control_install_private
  10. ocfs2_control_get_this_node
  11. ocfs2_control_do_setnode_msg
  12. ocfs2_control_do_setversion_msg
  13. ocfs2_control_do_down_msg
  14. ocfs2_control_message
  15. ocfs2_control_write
  16. ocfs2_control_read
  17. ocfs2_control_release
  18. ocfs2_control_open
  19. ocfs2_control_init
  20. ocfs2_control_exit
  21. fsdlm_lock_ast_wrapper
  22. fsdlm_blocking_ast_wrapper
  23. user_dlm_lock
  24. user_dlm_unlock
  25. user_dlm_lock_status
  26. user_dlm_lvb_valid
  27. user_dlm_lvb
  28. user_dlm_dump_lksb
  29. user_plock
  30. fs_protocol_compare
  31. lvb_to_version
  32. version_to_lvb
  33. sync_wait_cb
  34. sync_unlock
  35. sync_lock
  36. version_lock
  37. version_unlock
  38. get_protocol_version
  39. user_recover_prep
  40. user_recover_slot
  41. user_recover_done
  42. user_cluster_disconnect
  43. user_cluster_connect
  44. user_cluster_this_node
  45. ocfs2_user_plugin_init
  46. ocfs2_user_plugin_exit

   1 // SPDX-License-Identifier: GPL-2.0-only
   2 /* -*- mode: c; c-basic-offset: 8; -*-
   3  * vim: noexpandtab sw=8 ts=8 sts=0:
   4  *
   5  * stack_user.c
   6  *
   7  * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
   8  *
   9  * Copyright (C) 2007 Oracle.  All rights reserved.
  10  */
  11 
  12 #include <linux/module.h>
  13 #include <linux/fs.h>
  14 #include <linux/miscdevice.h>
  15 #include <linux/mutex.h>
  16 #include <linux/slab.h>
  17 #include <linux/reboot.h>
  18 #include <linux/sched.h>
  19 #include <linux/uaccess.h>
  20 
  21 #include "stackglue.h"
  22 
  23 #include <linux/dlm_plock.h>
  24 
  25 /*
  26  * The control protocol starts with a handshake.  Until the handshake
  27  * is complete, the control device will fail all write(2)s.
  28  *
  29  * The handshake is simple.  First, the client reads until EOF.  Each line
  30  * of output is a supported protocol tag.  All protocol tags are a single
  31  * character followed by a two hex digit version number.  Currently the
  32  * only things supported is T01, for "Text-base version 0x01".  Next, the
  33  * client writes the version they would like to use, including the newline.
  34  * Thus, the protocol tag is 'T01\n'.  If the version tag written is
  35  * unknown, -EINVAL is returned.  Once the negotiation is complete, the
  36  * client can start sending messages.
  37  *
  38  * The T01 protocol has three messages.  First is the "SETN" message.
  39  * It has the following syntax:
  40  *
  41  *  SETN<space><8-char-hex-nodenum><newline>
  42  *
  43  * This is 14 characters.
  44  *
  45  * The "SETN" message must be the first message following the protocol.
  46  * It tells ocfs2_control the local node number.
  47  *
  48  * Next comes the "SETV" message.  It has the following syntax:
  49  *
  50  *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
  51  *
  52  * This is 11 characters.
  53  *
  54  * The "SETV" message sets the filesystem locking protocol version as
  55  * negotiated by the client.  The client negotiates based on the maximum
  56  * version advertised in /sys/fs/ocfs2/max_locking_protocol.  The major
  57  * number from the "SETV" message must match
  58  * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
  59  * must be less than or equal to ...sp_max_version.pv_minor.
  60  *
  61  * Once this information has been set, mounts will be allowed.  From this
  62  * point on, the "DOWN" message can be sent for node down notification.
  63  * It has the following syntax:
  64  *
  65  *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
  66  *
  67  * eg:
  68  *
  69  *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
  70  *
  71  * This is 47 characters.
  72  */
  73 
  74 /*
  75  * Whether or not the client has done the handshake.
  76  * For now, we have just one protocol version.
  77  */
  78 #define OCFS2_CONTROL_PROTO                     "T01\n"
  79 #define OCFS2_CONTROL_PROTO_LEN                 4
  80 
  81 /* Handshake states */
  82 #define OCFS2_CONTROL_HANDSHAKE_INVALID         (0)
  83 #define OCFS2_CONTROL_HANDSHAKE_READ            (1)
  84 #define OCFS2_CONTROL_HANDSHAKE_PROTOCOL        (2)
  85 #define OCFS2_CONTROL_HANDSHAKE_VALID           (3)
  86 
  87 /* Messages */
  88 #define OCFS2_CONTROL_MESSAGE_OP_LEN            4
  89 #define OCFS2_CONTROL_MESSAGE_SETNODE_OP        "SETN"
  90 #define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN 14
  91 #define OCFS2_CONTROL_MESSAGE_SETVERSION_OP     "SETV"
  92 #define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN      11
  93 #define OCFS2_CONTROL_MESSAGE_DOWN_OP           "DOWN"
  94 #define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN    47
  95 #define OCFS2_TEXT_UUID_LEN                     32
  96 #define OCFS2_CONTROL_MESSAGE_VERNUM_LEN        2
  97 #define OCFS2_CONTROL_MESSAGE_NODENUM_LEN       8
  98 #define VERSION_LOCK                            "version_lock"
  99 
 100 enum ocfs2_connection_type {
 101         WITH_CONTROLD,
 102         NO_CONTROLD
 103 };
 104 
 105 /*
 106  * ocfs2_live_connection is refcounted because the filesystem and
 107  * miscdevice sides can detach in different order.  Let's just be safe.
 108  */
 109 struct ocfs2_live_connection {
 110         struct list_head                oc_list;
 111         struct ocfs2_cluster_connection *oc_conn;
 112         enum ocfs2_connection_type      oc_type;
 113         atomic_t                        oc_this_node;
 114         int                             oc_our_slot;
 115         struct dlm_lksb                 oc_version_lksb;
 116         char                            oc_lvb[DLM_LVB_LEN];
 117         struct completion               oc_sync_wait;
 118         wait_queue_head_t               oc_wait;
 119 };
 120 
 121 struct ocfs2_control_private {
 122         struct list_head op_list;
 123         int op_state;
 124         int op_this_node;
 125         struct ocfs2_protocol_version op_proto;
 126 };
 127 
 128 /* SETN<space><8-char-hex-nodenum><newline> */
 129 struct ocfs2_control_message_setn {
 130         char    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
 131         char    space;
 132         char    nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
 133         char    newline;
 134 };
 135 
 136 /* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */
 137 struct ocfs2_control_message_setv {
 138         char    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
 139         char    space1;
 140         char    major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
 141         char    space2;
 142         char    minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
 143         char    newline;
 144 };
 145 
 146 /* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */
 147 struct ocfs2_control_message_down {
 148         char    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
 149         char    space1;
 150         char    uuid[OCFS2_TEXT_UUID_LEN];
 151         char    space2;
 152         char    nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
 153         char    newline;
 154 };
 155 
 156 union ocfs2_control_message {
 157         char                                    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
 158         struct ocfs2_control_message_setn       u_setn;
 159         struct ocfs2_control_message_setv       u_setv;
 160         struct ocfs2_control_message_down       u_down;
 161 };
 162 
 163 static struct ocfs2_stack_plugin ocfs2_user_plugin;
 164 
 165 static atomic_t ocfs2_control_opened;
 166 static int ocfs2_control_this_node = -1;
 167 static struct ocfs2_protocol_version running_proto;
 168 
 169 static LIST_HEAD(ocfs2_live_connection_list);
 170 static LIST_HEAD(ocfs2_control_private_list);
 171 static DEFINE_MUTEX(ocfs2_control_lock);
 172 
 173 static inline void ocfs2_control_set_handshake_state(struct file *file,
 174                                                      int state)
 175 {
 176         struct ocfs2_control_private *p = file->private_data;
 177         p->op_state = state;
 178 }
 179 
 180 static inline int ocfs2_control_get_handshake_state(struct file *file)
 181 {
 182         struct ocfs2_control_private *p = file->private_data;
 183         return p->op_state;
 184 }
 185 
 186 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
 187 {
 188         size_t len = strlen(name);
 189         struct ocfs2_live_connection *c;
 190 
 191         BUG_ON(!mutex_is_locked(&ocfs2_control_lock));
 192 
 193         list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
 194                 if ((c->oc_conn->cc_namelen == len) &&
 195                     !strncmp(c->oc_conn->cc_name, name, len))
 196                         return c;
 197         }
 198 
 199         return NULL;
 200 }
 201 
 202 /*
 203  * ocfs2_live_connection structures are created underneath the ocfs2
 204  * mount path.  Since the VFS prevents multiple calls to
 205  * fill_super(), we can't get dupes here.
 206  */
 207 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
 208                                      struct ocfs2_live_connection *c)
 209 {
 210         int rc = 0;
 211 
 212         mutex_lock(&ocfs2_control_lock);
 213         c->oc_conn = conn;
 214 
 215         if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
 216                 list_add(&c->oc_list, &ocfs2_live_connection_list);
 217         else {
 218                 printk(KERN_ERR
 219                        "ocfs2: Userspace control daemon is not present\n");
 220                 rc = -ESRCH;
 221         }
 222 
 223         mutex_unlock(&ocfs2_control_lock);
 224         return rc;
 225 }
 226 
 227 /*
 228  * This function disconnects the cluster connection from ocfs2_control.
 229  * Afterwards, userspace can't affect the cluster connection.
 230  */
 231 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
 232 {
 233         mutex_lock(&ocfs2_control_lock);
 234         list_del_init(&c->oc_list);
 235         c->oc_conn = NULL;
 236         mutex_unlock(&ocfs2_control_lock);
 237 
 238         kfree(c);
 239 }
 240 
 241 static int ocfs2_control_cfu(void *target, size_t target_len,
 242                              const char __user *buf, size_t count)
 243 {
 244         /* The T01 expects write(2) calls to have exactly one command */
 245         if ((count != target_len) ||
 246             (count > sizeof(union ocfs2_control_message)))
 247                 return -EINVAL;
 248 
 249         if (copy_from_user(target, buf, target_len))
 250                 return -EFAULT;
 251 
 252         return 0;
 253 }
 254 
 255 static ssize_t ocfs2_control_validate_protocol(struct file *file,
 256                                                const char __user *buf,
 257                                                size_t count)
 258 {
 259         ssize_t ret;
 260         char kbuf[OCFS2_CONTROL_PROTO_LEN];
 261 
 262         ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
 263                                 buf, count);
 264         if (ret)
 265                 return ret;
 266 
 267         if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
 268                 return -EINVAL;
 269 
 270         ocfs2_control_set_handshake_state(file,
 271                                           OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
 272 
 273         return count;
 274 }
 275 
 276 static void ocfs2_control_send_down(const char *uuid,
 277                                     int nodenum)
 278 {
 279         struct ocfs2_live_connection *c;
 280 
 281         mutex_lock(&ocfs2_control_lock);
 282 
 283         c = ocfs2_connection_find(uuid);
 284         if (c) {
 285                 BUG_ON(c->oc_conn == NULL);
 286                 c->oc_conn->cc_recovery_handler(nodenum,
 287                                                 c->oc_conn->cc_recovery_data);
 288         }
 289 
 290         mutex_unlock(&ocfs2_control_lock);
 291 }
 292 
 293 /*
 294  * Called whenever configuration elements are sent to /dev/ocfs2_control.
 295  * If all configuration elements are present, try to set the global
 296  * values.  If there is a problem, return an error.  Skip any missing
 297  * elements, and only bump ocfs2_control_opened when we have all elements
 298  * and are successful.
 299  */
 300 static int ocfs2_control_install_private(struct file *file)
 301 {
 302         int rc = 0;
 303         int set_p = 1;
 304         struct ocfs2_control_private *p = file->private_data;
 305 
 306         BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
 307 
 308         mutex_lock(&ocfs2_control_lock);
 309 
 310         if (p->op_this_node < 0) {
 311                 set_p = 0;
 312         } else if ((ocfs2_control_this_node >= 0) &&
 313                    (ocfs2_control_this_node != p->op_this_node)) {
 314                 rc = -EINVAL;
 315                 goto out_unlock;
 316         }
 317 
 318         if (!p->op_proto.pv_major) {
 319                 set_p = 0;
 320         } else if (!list_empty(&ocfs2_live_connection_list) &&
 321                    ((running_proto.pv_major != p->op_proto.pv_major) ||
 322                     (running_proto.pv_minor != p->op_proto.pv_minor))) {
 323                 rc = -EINVAL;
 324                 goto out_unlock;
 325         }
 326 
 327         if (set_p) {
 328                 ocfs2_control_this_node = p->op_this_node;
 329                 running_proto.pv_major = p->op_proto.pv_major;
 330                 running_proto.pv_minor = p->op_proto.pv_minor;
 331         }
 332 
 333 out_unlock:
 334         mutex_unlock(&ocfs2_control_lock);
 335 
 336         if (!rc && set_p) {
 337                 /* We set the global values successfully */
 338                 atomic_inc(&ocfs2_control_opened);
 339                 ocfs2_control_set_handshake_state(file,
 340                                         OCFS2_CONTROL_HANDSHAKE_VALID);
 341         }
 342 
 343         return rc;
 344 }
 345 
 346 static int ocfs2_control_get_this_node(void)
 347 {
 348         int rc;
 349 
 350         mutex_lock(&ocfs2_control_lock);
 351         if (ocfs2_control_this_node < 0)
 352                 rc = -EINVAL;
 353         else
 354                 rc = ocfs2_control_this_node;
 355         mutex_unlock(&ocfs2_control_lock);
 356 
 357         return rc;
 358 }
 359 
 360 static int ocfs2_control_do_setnode_msg(struct file *file,
 361                                         struct ocfs2_control_message_setn *msg)
 362 {
 363         long nodenum;
 364         char *ptr = NULL;
 365         struct ocfs2_control_private *p = file->private_data;
 366 
 367         if (ocfs2_control_get_handshake_state(file) !=
 368             OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
 369                 return -EINVAL;
 370 
 371         if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
 372                     OCFS2_CONTROL_MESSAGE_OP_LEN))
 373                 return -EINVAL;
 374 
 375         if ((msg->space != ' ') || (msg->newline != '\n'))
 376                 return -EINVAL;
 377         msg->space = msg->newline = '\0';
 378 
 379         nodenum = simple_strtol(msg->nodestr, &ptr, 16);
 380         if (!ptr || *ptr)
 381                 return -EINVAL;
 382 
 383         if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
 384             (nodenum > INT_MAX) || (nodenum < 0))
 385                 return -ERANGE;
 386         p->op_this_node = nodenum;
 387 
 388         return ocfs2_control_install_private(file);
 389 }
 390 
 391 static int ocfs2_control_do_setversion_msg(struct file *file,
 392                                            struct ocfs2_control_message_setv *msg)
 393 {
 394         long major, minor;
 395         char *ptr = NULL;
 396         struct ocfs2_control_private *p = file->private_data;
 397         struct ocfs2_protocol_version *max =
 398                 &ocfs2_user_plugin.sp_max_proto;
 399 
 400         if (ocfs2_control_get_handshake_state(file) !=
 401             OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
 402                 return -EINVAL;
 403 
 404         if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
 405                     OCFS2_CONTROL_MESSAGE_OP_LEN))
 406                 return -EINVAL;
 407 
 408         if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
 409             (msg->newline != '\n'))
 410                 return -EINVAL;
 411         msg->space1 = msg->space2 = msg->newline = '\0';
 412 
 413         major = simple_strtol(msg->major, &ptr, 16);
 414         if (!ptr || *ptr)
 415                 return -EINVAL;
 416         minor = simple_strtol(msg->minor, &ptr, 16);
 417         if (!ptr || *ptr)
 418                 return -EINVAL;
 419 
 420         /*
 421          * The major must be between 1 and 255, inclusive.  The minor
 422          * must be between 0 and 255, inclusive.  The version passed in
 423          * must be within the maximum version supported by the filesystem.
 424          */
 425         if ((major == LONG_MIN) || (major == LONG_MAX) ||
 426             (major > (u8)-1) || (major < 1))
 427                 return -ERANGE;
 428         if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
 429             (minor > (u8)-1) || (minor < 0))
 430                 return -ERANGE;
 431         if ((major != max->pv_major) ||
 432             (minor > max->pv_minor))
 433                 return -EINVAL;
 434 
 435         p->op_proto.pv_major = major;
 436         p->op_proto.pv_minor = minor;
 437 
 438         return ocfs2_control_install_private(file);
 439 }
 440 
 441 static int ocfs2_control_do_down_msg(struct file *file,
 442                                      struct ocfs2_control_message_down *msg)
 443 {
 444         long nodenum;
 445         char *p = NULL;
 446 
 447         if (ocfs2_control_get_handshake_state(file) !=
 448             OCFS2_CONTROL_HANDSHAKE_VALID)
 449                 return -EINVAL;
 450 
 451         if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
 452                     OCFS2_CONTROL_MESSAGE_OP_LEN))
 453                 return -EINVAL;
 454 
 455         if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
 456             (msg->newline != '\n'))
 457                 return -EINVAL;
 458         msg->space1 = msg->space2 = msg->newline = '\0';
 459 
 460         nodenum = simple_strtol(msg->nodestr, &p, 16);
 461         if (!p || *p)
 462                 return -EINVAL;
 463 
 464         if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
 465             (nodenum > INT_MAX) || (nodenum < 0))
 466                 return -ERANGE;
 467 
 468         ocfs2_control_send_down(msg->uuid, nodenum);
 469 
 470         return 0;
 471 }
 472 
 473 static ssize_t ocfs2_control_message(struct file *file,
 474                                      const char __user *buf,
 475                                      size_t count)
 476 {
 477         ssize_t ret;
 478         union ocfs2_control_message msg;
 479 
 480         /* Try to catch padding issues */
 481         WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
 482                 (sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));
 483 
 484         memset(&msg, 0, sizeof(union ocfs2_control_message));
 485         ret = ocfs2_control_cfu(&msg, count, buf, count);
 486         if (ret)
 487                 goto out;
 488 
 489         if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
 490             !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
 491                      OCFS2_CONTROL_MESSAGE_OP_LEN))
 492                 ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
 493         else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
 494                  !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
 495                           OCFS2_CONTROL_MESSAGE_OP_LEN))
 496                 ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
 497         else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
 498                  !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
 499                           OCFS2_CONTROL_MESSAGE_OP_LEN))
 500                 ret = ocfs2_control_do_down_msg(file, &msg.u_down);
 501         else
 502                 ret = -EINVAL;
 503 
 504 out:
 505         return ret ? ret : count;
 506 }
 507 
 508 static ssize_t ocfs2_control_write(struct file *file,
 509                                    const char __user *buf,
 510                                    size_t count,
 511                                    loff_t *ppos)
 512 {
 513         ssize_t ret;
 514 
 515         switch (ocfs2_control_get_handshake_state(file)) {
 516                 case OCFS2_CONTROL_HANDSHAKE_INVALID:
 517                         ret = -EINVAL;
 518                         break;
 519 
 520                 case OCFS2_CONTROL_HANDSHAKE_READ:
 521                         ret = ocfs2_control_validate_protocol(file, buf,
 522                                                               count);
 523                         break;
 524 
 525                 case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
 526                 case OCFS2_CONTROL_HANDSHAKE_VALID:
 527                         ret = ocfs2_control_message(file, buf, count);
 528                         break;
 529 
 530                 default:
 531                         BUG();
 532                         ret = -EIO;
 533                         break;
 534         }
 535 
 536         return ret;
 537 }
 538 
 539 /*
 540  * This is a naive version.  If we ever have a new protocol, we'll expand
 541  * it.  Probably using seq_file.
 542  */
 543 static ssize_t ocfs2_control_read(struct file *file,
 544                                   char __user *buf,
 545                                   size_t count,
 546                                   loff_t *ppos)
 547 {
 548         ssize_t ret;
 549 
 550         ret = simple_read_from_buffer(buf, count, ppos,
 551                         OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN);
 552 
 553         /* Have we read the whole protocol list? */
 554         if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN)
 555                 ocfs2_control_set_handshake_state(file,
 556                                                   OCFS2_CONTROL_HANDSHAKE_READ);
 557 
 558         return ret;
 559 }
 560 
 561 static int ocfs2_control_release(struct inode *inode, struct file *file)
 562 {
 563         struct ocfs2_control_private *p = file->private_data;
 564 
 565         mutex_lock(&ocfs2_control_lock);
 566 
 567         if (ocfs2_control_get_handshake_state(file) !=
 568             OCFS2_CONTROL_HANDSHAKE_VALID)
 569                 goto out;
 570 
 571         if (atomic_dec_and_test(&ocfs2_control_opened)) {
 572                 if (!list_empty(&ocfs2_live_connection_list)) {
 573                         /* XXX: Do bad things! */
 574                         printk(KERN_ERR
 575                                "ocfs2: Unexpected release of ocfs2_control!\n"
 576                                "       Loss of cluster connection requires "
 577                                "an emergency restart!\n");
 578                         emergency_restart();
 579                 }
 580                 /*
 581                  * Last valid close clears the node number and resets
 582                  * the locking protocol version
 583                  */
 584                 ocfs2_control_this_node = -1;
 585                 running_proto.pv_major = 0;
 586                 running_proto.pv_minor = 0;
 587         }
 588 
 589 out:
 590         list_del_init(&p->op_list);
 591         file->private_data = NULL;
 592 
 593         mutex_unlock(&ocfs2_control_lock);
 594 
 595         kfree(p);
 596 
 597         return 0;
 598 }
 599 
 600 static int ocfs2_control_open(struct inode *inode, struct file *file)
 601 {
 602         struct ocfs2_control_private *p;
 603 
 604         p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL);
 605         if (!p)
 606                 return -ENOMEM;
 607         p->op_this_node = -1;
 608 
 609         mutex_lock(&ocfs2_control_lock);
 610         file->private_data = p;
 611         list_add(&p->op_list, &ocfs2_control_private_list);
 612         mutex_unlock(&ocfs2_control_lock);
 613 
 614         return 0;
 615 }
 616 
 617 static const struct file_operations ocfs2_control_fops = {
 618         .open    = ocfs2_control_open,
 619         .release = ocfs2_control_release,
 620         .read    = ocfs2_control_read,
 621         .write   = ocfs2_control_write,
 622         .owner   = THIS_MODULE,
 623         .llseek  = default_llseek,
 624 };
 625 
 626 static struct miscdevice ocfs2_control_device = {
 627         .minor          = MISC_DYNAMIC_MINOR,
 628         .name           = "ocfs2_control",
 629         .fops           = &ocfs2_control_fops,
 630 };
 631 
 632 static int ocfs2_control_init(void)
 633 {
 634         int rc;
 635 
 636         atomic_set(&ocfs2_control_opened, 0);
 637 
 638         rc = misc_register(&ocfs2_control_device);
 639         if (rc)
 640                 printk(KERN_ERR
 641                        "ocfs2: Unable to register ocfs2_control device "
 642                        "(errno %d)\n",
 643                        -rc);
 644 
 645         return rc;
 646 }
 647 
 648 static void ocfs2_control_exit(void)
 649 {
 650         misc_deregister(&ocfs2_control_device);
 651 }
 652 
 653 static void fsdlm_lock_ast_wrapper(void *astarg)
 654 {
 655         struct ocfs2_dlm_lksb *lksb = astarg;
 656         int status = lksb->lksb_fsdlm.sb_status;
 657 
 658         /*
 659          * For now we're punting on the issue of other non-standard errors
 660          * where we can't tell if the unlock_ast or lock_ast should be called.
 661          * The main "other error" that's possible is EINVAL which means the
 662          * function was called with invalid args, which shouldn't be possible
 663          * since the caller here is under our control.  Other non-standard
 664          * errors probably fall into the same category, or otherwise are fatal
 665          * which means we can't carry on anyway.
 666          */
 667 
 668         if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
 669                 lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
 670         else
 671                 lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
 672 }
 673 
 674 static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
 675 {
 676         struct ocfs2_dlm_lksb *lksb = astarg;
 677 
 678         lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
 679 }
 680 
 681 static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
 682                          int mode,
 683                          struct ocfs2_dlm_lksb *lksb,
 684                          u32 flags,
 685                          void *name,
 686                          unsigned int namelen)
 687 {
 688         int ret;
 689 
 690         if (!lksb->lksb_fsdlm.sb_lvbptr)
 691                 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
 692                                              sizeof(struct dlm_lksb);
 693 
 694         ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
 695                        flags|DLM_LKF_NODLCKWT, name, namelen, 0,
 696                        fsdlm_lock_ast_wrapper, lksb,
 697                        fsdlm_blocking_ast_wrapper);
 698         return ret;
 699 }
 700 
 701 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
 702                            struct ocfs2_dlm_lksb *lksb,
 703                            u32 flags)
 704 {
 705         int ret;
 706 
 707         ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
 708                          flags, &lksb->lksb_fsdlm, lksb);
 709         return ret;
 710 }
 711 
 712 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
 713 {
 714         return lksb->lksb_fsdlm.sb_status;
 715 }
 716 
 717 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
 718 {
 719         int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;
 720 
 721         return !invalid;
 722 }
 723 
 724 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
 725 {
 726         if (!lksb->lksb_fsdlm.sb_lvbptr)
 727                 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
 728                                              sizeof(struct dlm_lksb);
 729         return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
 730 }
 731 
 732 static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
 733 {
 734 }
 735 
 736 static int user_plock(struct ocfs2_cluster_connection *conn,
 737                       u64 ino,
 738                       struct file *file,
 739                       int cmd,
 740                       struct file_lock *fl)
 741 {
 742         /*
 743          * This more or less just demuxes the plock request into any
 744          * one of three dlm calls.
 745          *
 746          * Internally, fs/dlm will pass these to a misc device, which
 747          * a userspace daemon will read and write to.
 748          *
 749          * For now, cancel requests (which happen internally only),
 750          * are turned into unlocks. Most of this function taken from
 751          * gfs2_lock.
 752          */
 753 
 754         if (cmd == F_CANCELLK) {
 755                 cmd = F_SETLK;
 756                 fl->fl_type = F_UNLCK;
 757         }
 758 
 759         if (IS_GETLK(cmd))
 760                 return dlm_posix_get(conn->cc_lockspace, ino, file, fl);
 761         else if (fl->fl_type == F_UNLCK)
 762                 return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl);
 763         else
 764                 return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl);
 765 }
 766 
 767 /*
 768  * Compare a requested locking protocol version against the current one.
 769  *
 770  * If the major numbers are different, they are incompatible.
 771  * If the current minor is greater than the request, they are incompatible.
 772  * If the current minor is less than or equal to the request, they are
 773  * compatible, and the requester should run at the current minor version.
 774  */
 775 static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
 776                                struct ocfs2_protocol_version *request)
 777 {
 778         if (existing->pv_major != request->pv_major)
 779                 return 1;
 780 
 781         if (existing->pv_minor > request->pv_minor)
 782                 return 1;
 783 
 784         if (existing->pv_minor < request->pv_minor)
 785                 request->pv_minor = existing->pv_minor;
 786 
 787         return 0;
 788 }
 789 
 790 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver)
 791 {
 792         struct ocfs2_protocol_version *pv =
 793                 (struct ocfs2_protocol_version *)lvb;
 794         /*
 795          * ocfs2_protocol_version has two u8 variables, so we don't
 796          * need any endian conversion.
 797          */
 798         ver->pv_major = pv->pv_major;
 799         ver->pv_minor = pv->pv_minor;
 800 }
 801 
 802 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb)
 803 {
 804         struct ocfs2_protocol_version *pv =
 805                 (struct ocfs2_protocol_version *)lvb;
 806         /*
 807          * ocfs2_protocol_version has two u8 variables, so we don't
 808          * need any endian conversion.
 809          */
 810         pv->pv_major = ver->pv_major;
 811         pv->pv_minor = ver->pv_minor;
 812 }
 813 
 814 static void sync_wait_cb(void *arg)
 815 {
 816         struct ocfs2_cluster_connection *conn = arg;
 817         struct ocfs2_live_connection *lc = conn->cc_private;
 818         complete(&lc->oc_sync_wait);
 819 }
 820 
 821 static int sync_unlock(struct ocfs2_cluster_connection *conn,
 822                 struct dlm_lksb *lksb, char *name)
 823 {
 824         int error;
 825         struct ocfs2_live_connection *lc = conn->cc_private;
 826 
 827         error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn);
 828         if (error) {
 829                 printk(KERN_ERR "%s lkid %x error %d\n",
 830                                 name, lksb->sb_lkid, error);
 831                 return error;
 832         }
 833 
 834         wait_for_completion(&lc->oc_sync_wait);
 835 
 836         if (lksb->sb_status != -DLM_EUNLOCK) {
 837                 printk(KERN_ERR "%s lkid %x status %d\n",
 838                                 name, lksb->sb_lkid, lksb->sb_status);
 839                 return -1;
 840         }
 841         return 0;
 842 }
 843 
 844 static int sync_lock(struct ocfs2_cluster_connection *conn,
 845                 int mode, uint32_t flags,
 846                 struct dlm_lksb *lksb, char *name)
 847 {
 848         int error, status;
 849         struct ocfs2_live_connection *lc = conn->cc_private;
 850 
 851         error = dlm_lock(conn->cc_lockspace, mode, lksb, flags,
 852                         name, strlen(name),
 853                         0, sync_wait_cb, conn, NULL);
 854         if (error) {
 855                 printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n",
 856                                 name, lksb->sb_lkid, flags, mode, error);
 857                 return error;
 858         }
 859 
 860         wait_for_completion(&lc->oc_sync_wait);
 861 
 862         status = lksb->sb_status;
 863 
 864         if (status && status != -EAGAIN) {
 865                 printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n",
 866                                 name, lksb->sb_lkid, flags, mode, status);
 867         }
 868 
 869         return status;
 870 }
 871 
 872 
 873 static int version_lock(struct ocfs2_cluster_connection *conn, int mode,
 874                 int flags)
 875 {
 876         struct ocfs2_live_connection *lc = conn->cc_private;
 877         return sync_lock(conn, mode, flags,
 878                         &lc->oc_version_lksb, VERSION_LOCK);
 879 }
 880 
 881 static int version_unlock(struct ocfs2_cluster_connection *conn)
 882 {
 883         struct ocfs2_live_connection *lc = conn->cc_private;
 884         return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
 885 }
 886 
 887 /* get_protocol_version()
 888  *
 889  * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
 890  * The algorithm is:
 891  * 1. Attempt to take the lock in EX mode (non-blocking).
 892  * 2. If successful (which means it is the first mount), write the
 893  *    version number and downconvert to PR lock.
 894  * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
 895  *    taking the PR lock.
 896  */
 897 
 898 static int get_protocol_version(struct ocfs2_cluster_connection *conn)
 899 {
 900         int ret;
 901         struct ocfs2_live_connection *lc = conn->cc_private;
 902         struct ocfs2_protocol_version pv;
 903 
 904         running_proto.pv_major =
 905                 ocfs2_user_plugin.sp_max_proto.pv_major;
 906         running_proto.pv_minor =
 907                 ocfs2_user_plugin.sp_max_proto.pv_minor;
 908 
 909         lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
 910         ret = version_lock(conn, DLM_LOCK_EX,
 911                         DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
 912         if (!ret) {
 913                 conn->cc_version.pv_major = running_proto.pv_major;
 914                 conn->cc_version.pv_minor = running_proto.pv_minor;
 915                 version_to_lvb(&running_proto, lc->oc_lvb);
 916                 version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
 917         } else if (ret == -EAGAIN) {
 918                 ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
 919                 if (ret)
 920                         goto out;
 921                 lvb_to_version(lc->oc_lvb, &pv);
 922 
 923                 if ((pv.pv_major != running_proto.pv_major) ||
 924                                 (pv.pv_minor > running_proto.pv_minor)) {
 925                         ret = -EINVAL;
 926                         goto out;
 927                 }
 928 
 929                 conn->cc_version.pv_major = pv.pv_major;
 930                 conn->cc_version.pv_minor = pv.pv_minor;
 931         }
 932 out:
 933         return ret;
 934 }
 935 
 936 static void user_recover_prep(void *arg)
 937 {
 938 }
 939 
 940 static void user_recover_slot(void *arg, struct dlm_slot *slot)
 941 {
 942         struct ocfs2_cluster_connection *conn = arg;
 943         printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n",
 944                         slot->nodeid, slot->slot);
 945         conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data);
 946 
 947 }
 948 
 949 static void user_recover_done(void *arg, struct dlm_slot *slots,
 950                 int num_slots, int our_slot,
 951                 uint32_t generation)
 952 {
 953         struct ocfs2_cluster_connection *conn = arg;
 954         struct ocfs2_live_connection *lc = conn->cc_private;
 955         int i;
 956 
 957         for (i = 0; i < num_slots; i++)
 958                 if (slots[i].slot == our_slot) {
 959                         atomic_set(&lc->oc_this_node, slots[i].nodeid);
 960                         break;
 961                 }
 962 
 963         lc->oc_our_slot = our_slot;
 964         wake_up(&lc->oc_wait);
 965 }
 966 
 967 static const struct dlm_lockspace_ops ocfs2_ls_ops = {
 968         .recover_prep = user_recover_prep,
 969         .recover_slot = user_recover_slot,
 970         .recover_done = user_recover_done,
 971 };
 972 
 973 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
 974 {
 975         version_unlock(conn);
 976         dlm_release_lockspace(conn->cc_lockspace, 2);
 977         conn->cc_lockspace = NULL;
 978         ocfs2_live_connection_drop(conn->cc_private);
 979         conn->cc_private = NULL;
 980         return 0;
 981 }
 982 
 983 static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
 984 {
 985         dlm_lockspace_t *fsdlm;
 986         struct ocfs2_live_connection *lc;
 987         int rc, ops_rv;
 988 
 989         BUG_ON(conn == NULL);
 990 
 991         lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
 992         if (!lc)
 993                 return -ENOMEM;
 994 
 995         init_waitqueue_head(&lc->oc_wait);
 996         init_completion(&lc->oc_sync_wait);
 997         atomic_set(&lc->oc_this_node, 0);
 998         conn->cc_private = lc;
 999         lc->oc_type = NO_CONTROLD;
1000 
1001         rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
1002                                DLM_LSFL_FS | DLM_LSFL_NEWEXCL, DLM_LVB_LEN,
1003                                &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
1004         if (rc) {
1005                 if (rc == -EEXIST || rc == -EPROTO)
1006                         printk(KERN_ERR "ocfs2: Unable to create the "
1007                                 "lockspace %s (%d), because a ocfs2-tools "
1008                                 "program is running on this file system "
1009                                 "with the same name lockspace\n",
1010                                 conn->cc_name, rc);
1011                 goto out;
1012         }
1013 
1014         if (ops_rv == -EOPNOTSUPP) {
1015                 lc->oc_type = WITH_CONTROLD;
1016                 printk(KERN_NOTICE "ocfs2: You seem to be using an older "
1017                                 "version of dlm_controld and/or ocfs2-tools."
1018                                 " Please consider upgrading.\n");
1019         } else if (ops_rv) {
1020                 rc = ops_rv;
1021                 goto out;
1022         }
1023         conn->cc_lockspace = fsdlm;
1024 
1025         rc = ocfs2_live_connection_attach(conn, lc);
1026         if (rc)
1027                 goto out;
1028 
1029         if (lc->oc_type == NO_CONTROLD) {
1030                 rc = get_protocol_version(conn);
1031                 if (rc) {
1032                         printk(KERN_ERR "ocfs2: Could not determine"
1033                                         " locking version\n");
1034                         user_cluster_disconnect(conn);
1035                         goto out;
1036                 }
1037                 wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
1038         }
1039 
1040         /*
1041          * running_proto must have been set before we allowed any mounts
1042          * to proceed.
1043          */
1044         if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
1045                 printk(KERN_ERR
1046                        "Unable to mount with fs locking protocol version "
1047                        "%u.%u because negotiated protocol is %u.%u\n",
1048                        conn->cc_version.pv_major, conn->cc_version.pv_minor,
1049                        running_proto.pv_major, running_proto.pv_minor);
1050                 rc = -EPROTO;
1051                 ocfs2_live_connection_drop(lc);
1052                 lc = NULL;
1053         }
1054 
1055 out:
1056         if (rc)
1057                 kfree(lc);
1058         return rc;
1059 }
1060 
1061 
1062 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
1063                                   unsigned int *this_node)
1064 {
1065         int rc;
1066         struct ocfs2_live_connection *lc = conn->cc_private;
1067 
1068         if (lc->oc_type == WITH_CONTROLD)
1069                 rc = ocfs2_control_get_this_node();
1070         else if (lc->oc_type == NO_CONTROLD)
1071                 rc = atomic_read(&lc->oc_this_node);
1072         else
1073                 rc = -EINVAL;
1074 
1075         if (rc < 0)
1076                 return rc;
1077 
1078         *this_node = rc;
1079         return 0;
1080 }
1081 
1082 static struct ocfs2_stack_operations ocfs2_user_plugin_ops = {
1083         .connect        = user_cluster_connect,
1084         .disconnect     = user_cluster_disconnect,
1085         .this_node      = user_cluster_this_node,
1086         .dlm_lock       = user_dlm_lock,
1087         .dlm_unlock     = user_dlm_unlock,
1088         .lock_status    = user_dlm_lock_status,
1089         .lvb_valid      = user_dlm_lvb_valid,
1090         .lock_lvb       = user_dlm_lvb,
1091         .plock          = user_plock,
1092         .dump_lksb      = user_dlm_dump_lksb,
1093 };
1094 
1095 static struct ocfs2_stack_plugin ocfs2_user_plugin = {
1096         .sp_name        = "user",
1097         .sp_ops         = &ocfs2_user_plugin_ops,
1098         .sp_owner       = THIS_MODULE,
1099 };
1100 
1101 
1102 static int __init ocfs2_user_plugin_init(void)
1103 {
1104         int rc;
1105 
1106         rc = ocfs2_control_init();
1107         if (!rc) {
1108                 rc = ocfs2_stack_glue_register(&ocfs2_user_plugin);
1109                 if (rc)
1110                         ocfs2_control_exit();
1111         }
1112 
1113         return rc;
1114 }
1115 
1116 static void __exit ocfs2_user_plugin_exit(void)
1117 {
1118         ocfs2_stack_glue_unregister(&ocfs2_user_plugin);
1119         ocfs2_control_exit();
1120 }
1121 
1122 MODULE_AUTHOR("Oracle");
1123 MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
1124 MODULE_LICENSE("GPL");
1125 module_init(ocfs2_user_plugin_init);
1126 module_exit(ocfs2_user_plugin_exit);

/* [<][>][^][v][top][bottom][index][help] */