#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/file.h>
#include <linux/mount.h>
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/falloc.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"

/*
 * Ceph file operations
 *
 * Implement basic open/close functionality, and implement
 * read/write.
 *
 * We implement three modes of file I/O:
 *  - buffered uses the generic_file_read_iter and
 *    generic_perform_write helpers
 *
 *  - synchronous is used when there is multi-client read/write
 *    sharing, avoids the page cache, and synchronously waits for an
 *    ack from the OSD.
 *
 *  - direct io takes the variant of the sync path that references
 *    user pages directly.
 *
 * fsync() flushes and waits on dirty pages, but just queues metadata
 * for writeback: since the MDS can recover size and mtime there is no
 * need to wait for MDS acknowledgement.
 */


/*
 * Prepare an open request.  Preallocate ceph_cap to avoid an
 * inopportune ENOMEM later.
 */
static struct ceph_mds_request *
prepare_open_request(struct super_block *sb, int flags, int create_mode)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	int want_auth = USE_ANY_MDS;
	int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN;

	if (flags & (O_WRONLY|O_RDWR|O_CREAT|O_TRUNC))
		want_auth = USE_AUTH_MDS;

	req = ceph_mdsc_create_request(mdsc, op, want_auth);
	if (IS_ERR(req))
		goto out;
	req->r_fmode = ceph_flags_to_mode(flags);
	req->r_args.open.flags = cpu_to_le32(flags);
	req->r_args.open.mode = cpu_to_le32(create_mode);
out:
	return req;
}

/*
 * initialize private struct file data.
 * if we fail, clean up by dropping fmode reference on the ceph_inode
 */
static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
{
	struct ceph_file_info *cf;
	int ret = 0;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;

	switch (inode->i_mode & S_IFMT) {
	case S_IFREG:
		/* The first open request creates the fscache cookie; we
		 * want to keep this cookie around for the lifetime of the
		 * inode so as not to have to worry about fscache
		 * register / revoke / operation races.
		 *
		 * Also, if we know the operation is going to invalidate
		 * data (any open that is not read-only), just nuke the
		 * cache right away.
		 */
		ceph_fscache_register_inode_cookie(mdsc->fsc, ci);
		if ((fmode & CEPH_FILE_MODE_WR))
			ceph_fscache_invalidate(inode);
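		/* fall through: regular files need the ceph_file_info too */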
	case S_IFDIR:
		dout("init_file %p %p 0%o (regular)\n", inode, file,
		     inode->i_mode);
		cf = kmem_cache_alloc(ceph_file_cachep, GFP_NOFS | __GFP_ZERO);
		if (!cf) {
			ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
			return -ENOMEM;
		}
		cf->fmode = fmode;
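		/* readdir offsets 0 and 1 are reserved for "." and ".." */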
		cf->next_offset = 2;
		file->private_data = cf;
		BUG_ON(inode->i_fop->release != ceph_release);
		break;

	case S_IFLNK:
		dout("init_file %p %p 0%o (symlink)\n", inode, file,
		     inode->i_mode);
		ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
		break;

	default:
		dout("init_file %p %p 0%o (special)\n", inode, file,
		     inode->i_mode);
		/*
		 * we need to drop the open ref now, since we don't
		 * have .release set to ceph_release.
		 */
		ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
		BUG_ON(inode->i_fop->release == ceph_release);

		/* call the proper open fop */
		ret = inode->i_fop->open(inode, file);
	}
	return ret;
}

/*
 * If we already have the requisite capabilities, we can satisfy
 * the open request locally (no need to request new caps from the
 * MDS).  We do, however, need to inform the MDS (asynchronously)
 * if our wanted caps set expands.
 */
int ceph_open(struct inode *inode, struct file *file)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	struct ceph_file_info *cf = file->private_data;
	struct inode *parent_inode = NULL;
	int err;
	int flags, fmode, wanted;

	if (cf) {
		dout("open file %p is already opened\n", file);
		return 0;
	}

	/* filter out O_CREAT|O_EXCL; vfs did that already.  yuck. */
	flags = file->f_flags & ~(O_CREAT|O_EXCL);
	if (S_ISDIR(inode->i_mode))
		flags = O_DIRECTORY;  /* mds likes to know */

	dout("open inode %p ino %llx.%llx file %p flags %d (%d)\n", inode,
	     ceph_vinop(inode), file, flags, file->f_flags);
	fmode = ceph_flags_to_mode(flags);
	wanted = ceph_caps_for_mode(fmode);

	/* snapped files are read-only */
	if (ceph_snap(inode) != CEPH_NOSNAP && (file->f_mode & FMODE_WRITE))
		return -EROFS;

	/* trivially open snapdir */
	if (ceph_snap(inode) == CEPH_SNAPDIR) {
		spin_lock(&ci->i_ceph_lock);
		__ceph_get_fmode(ci, fmode);
		spin_unlock(&ci->i_ceph_lock);
		return ceph_init_file(inode, file, fmode);
	}

	/*
	 * No need to block if we have caps on the auth MDS (for
	 * write) or any MDS (for read).  Update wanted set
	 * asynchronously.
	 */
	spin_lock(&ci->i_ceph_lock);
	if (__ceph_is_any_real_caps(ci) &&
	    (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) {
		int mds_wanted = __ceph_caps_mds_wanted(ci);
		int issued = __ceph_caps_issued(ci, NULL);

		dout("open %p fmode %d want %s issued %s using existing\n",
		     inode, fmode, ceph_cap_string(wanted),
		     ceph_cap_string(issued));
		__ceph_get_fmode(ci, fmode);
		spin_unlock(&ci->i_ceph_lock);

		/* adjust wanted? */
		if ((issued & wanted) != wanted &&
		    (mds_wanted & wanted) != wanted &&
		    ceph_snap(inode) != CEPH_SNAPDIR)
			ceph_check_caps(ci, 0, NULL);

		return ceph_init_file(inode, file, fmode);
	} else if (ceph_snap(inode) != CEPH_NOSNAP &&
		   (ci->i_snap_caps & wanted) == wanted) {
		__ceph_get_fmode(ci, fmode);
		spin_unlock(&ci->i_ceph_lock);
		return ceph_init_file(inode, file, fmode);
	}

	spin_unlock(&ci->i_ceph_lock);

	dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted));
	req = prepare_open_request(inode->i_sb, flags, 0);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}
	req->r_inode = inode;
	ihold(inode);

	req->r_num_caps = 1;
	if (flags & O_CREAT)
		parent_inode = ceph_get_dentry_parent_inode(file->f_path.dentry);
	err = ceph_mdsc_do_request(mdsc, parent_inode, req);
	iput(parent_inode);
	if (!err)
		err = ceph_init_file(inode, file, req->r_fmode);
	ceph_mdsc_put_request(req);
	dout("open result=%d on %llx.%llx\n", err, ceph_vinop(inode));
out:
	return err;
}


/*
 * Do a lookup + open with a single request.  If we get a non-existent
 * file or symlink, return 1 so the VFS can retry.
 */
int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
		     struct file *file, unsigned flags, umode_t mode,
		     int *opened)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	struct dentry *dn;
	struct ceph_acls_info acls = {};
	int err;

	dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
	     dir, dentry, dentry,
	     d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode);

	if (dentry->d_name.len > NAME_MAX)
		return -ENAMETOOLONG;

	err = ceph_init_dentry(dentry);
	if (err < 0)
		return err;

	if (flags & O_CREAT) {
		err = ceph_pre_init_acls(dir, &mode, &acls);
		if (err < 0)
			return err;
	}

	/* do the open */
	req = prepare_open_request(dir->i_sb, flags, mode);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out_acl;
	}
	req->r_dentry = dget(dentry);
	req->r_num_caps = 2;
	if (flags & O_CREAT) {
		req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
		req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
		if (acls.pagelist) {
			req->r_pagelist = acls.pagelist;
			acls.pagelist = NULL;
		}
	}
	req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
	err = ceph_mdsc_do_request(mdsc,
				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
				   req);
	err = ceph_handle_snapdir(req, dentry, err);
	if (err)
		goto out_req;

	if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
		err = ceph_handle_notrace_create(dir, dentry);

	if (d_unhashed(dentry)) {
		dn = ceph_finish_lookup(req, dentry, err);
		if (IS_ERR(dn))
			err = PTR_ERR(dn);
	} else {
		/* we were given a hashed negative dentry */
		dn = NULL;
	}
	if (err)
		goto out_req;
	if (dn || d_really_is_negative(dentry) || d_is_symlink(dentry)) {
		/* make vfs retry on splice, ENOENT, or symlink */
		dout("atomic_open finish_no_open on dn %p\n", dn);
		err = finish_no_open(file, dn);
	} else {
		dout("atomic_open finish_open on dn %p\n", dn);
		if (req->r_op == CEPH_MDS_OP_CREATE &&
		    req->r_reply_info.has_create_ino) {
			ceph_init_inode_acls(d_inode(dentry), &acls);
			*opened |= FILE_CREATED;
		}
		err = finish_open(file, dentry, ceph_open, opened);
	}
out_req:
	if (!req->r_err && req->r_target_inode)
		ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
	ceph_mdsc_put_request(req);
out_acl:
	ceph_release_acls_info(&acls);
	dout("atomic_open result=%d\n", err);
	return err;
}

int ceph_release(struct inode *inode, struct file *file)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *cf = file->private_data;

	dout("release inode %p file %p\n", inode, file);
	ceph_put_fmode(ci, cf->fmode);
	if (cf->last_readdir)
		ceph_mdsc_put_request(cf->last_readdir);
	kfree(cf->last_name);
	kfree(cf->dir_info);
	dput(cf->dentry);
	kmem_cache_free(ceph_file_cachep, cf);

	/* wake up anyone waiting for caps on this inode */
	wake_up_all(&ci->i_cap_wq);
	return 0;
}

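/*
 * Values handed back through the sync read path's 'checkeof' /
 * 'retry_op' pointer: CHECK_EOF means a short read may need to be
 * retried against an updated i_size; READ_INLINE means the file data
 * is inlined in the MDS and must be fetched with getattr rather than
 * from the OSDs.
 */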
enum {
	CHECK_EOF = 1,
	READ_INLINE = 2,
};

/*
 * Read a range of bytes striped over one or more objects, iterating
 * over each object we stripe across.  (That's not atomic, but it's
 * good enough for now.)
 *
 * If we get a short result from the OSD, check it against i_size; we
 * should only return a short read to the caller if we actually hit EOF.
 */
static int striped_read(struct inode *inode,
			u64 off, u64 len,
			struct page **pages, int num_pages,
			int *checkeof, bool o_direct,
			unsigned long buf_align)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 pos, this_len, left;
	int io_align, page_align;
	int pages_left;
	int read;
	struct page **page_pos;
	int ret;
	bool hit_stripe, was_short;

	/*
	 * we may need to do multiple reads.  not atomic, unfortunately.
	 */
	pos = off;
	left = len;
	page_pos = pages;
	pages_left = num_pages;
	read = 0;
	io_align = off & ~PAGE_MASK;

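	/*
	 * io_align is the offset of 'off' within its page.  For O_DIRECT
	 * reads the data must land at the same sub-page offset as the
	 * user buffer (buf_align); e.g. with 4K pages, off = 5000 gives
	 * io_align = 904, and a buffer at sub-page offset 100 then needs
	 * page_align = 100 for the first read.
	 */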
more:
	if (o_direct)
		page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
	else
		page_align = pos & ~PAGE_MASK;
	this_len = left;
	ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
				  &ci->i_layout, pos, &this_len,
				  ci->i_truncate_seq,
				  ci->i_truncate_size,
				  page_pos, pages_left, page_align);
	if (ret == -ENOENT)
		ret = 0;
	hit_stripe = this_len < left;
	was_short = ret >= 0 && ret < this_len;
	dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
	     ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");

	if (ret >= 0) {
		int didpages;
		if (was_short && (pos + ret < inode->i_size)) {
			int zlen = min(this_len - ret,
				       inode->i_size - pos - ret);
			int zoff = (o_direct ? buf_align : io_align) +
				    read + ret;
			dout(" zero gap %llu to %llu\n",
				pos + ret, pos + ret + zlen);
			ceph_zero_page_vector_range(zoff, zlen, pages);
			ret += zlen;
		}

		didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
		pos += ret;
		read = pos - off;
		left -= ret;
		page_pos += didpages;
		pages_left -= didpages;

		/* hit a stripe boundary and need to continue */
		if (left && hit_stripe && pos < inode->i_size)
			goto more;
	}

	if (read > 0) {
		ret = read;
		/* did we bounce off eof? */
		if (pos + left > inode->i_size)
			*checkeof = CHECK_EOF;
	}

	dout("striped_read returns %d\n", ret);
	return ret;
}

/*
 * Completely synchronous read and write methods.  Direct from __user
 * buffer to osd, or directly to user pages (if O_DIRECT).
 *
 * If the read spans object boundary, just do multiple reads.
 */
static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
				int *checkeof)
{
	struct file *file = iocb->ki_filp;
	struct inode *inode = file_inode(file);
	struct page **pages;
	u64 off = iocb->ki_pos;
	int num_pages, ret;
	size_t len = iov_iter_count(i);

	dout("sync_read on file %p %llu~%u %s\n", file, off,
	     (unsigned)len,
	     (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");

	if (!len)
		return 0;
	/*
	 * flush any page cache pages in this range.  this
	 * will make concurrent normal and sync io slow,
	 * but it will at least behave sensibly when they are
	 * in sequence.
	 */
	ret = filemap_write_and_wait_range(inode->i_mapping, off,
						off + len);
	if (ret < 0)
		return ret;

	if (iocb->ki_flags & IOCB_DIRECT) {
		while (iov_iter_count(i)) {
			size_t start;
			ssize_t n;

			n = iov_iter_get_pages_alloc(i, &pages, INT_MAX, &start);
			if (n < 0)
				return n;

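			/*
			 * The iter handed back n bytes starting at
			 * offset 'start' within the first pinned page;
			 * round up to the number of whole pages covered.
			 */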
			num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;

			ret = striped_read(inode, off, n,
					   pages, num_pages, checkeof,
					   true, start);

			ceph_put_page_vector(pages, num_pages, true);

			if (ret <= 0)
				break;
			off += ret;
			iov_iter_advance(i, ret);
			if (ret < n)
				break;
		}
	} else {
		num_pages = calc_pages_for(off, len);
		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
		if (IS_ERR(pages))
			return PTR_ERR(pages);
		ret = striped_read(inode, off, len, pages,
					num_pages, checkeof, false, 0);
		if (ret > 0) {
			int l, k = 0;
			size_t left = ret;

			while (left) {
				size_t page_off = off & ~PAGE_MASK;
				size_t copy = min_t(size_t,
						    PAGE_SIZE - page_off, left);
				l = copy_page_to_iter(pages[k++], page_off,
						      copy, i);
				off += l;
				left -= l;
				if (l < copy)
					break;
			}
		}
		ceph_release_page_vector(pages, num_pages);
	}

	if (off > iocb->ki_pos) {
		ret = off - iocb->ki_pos;
		iocb->ki_pos = off;
	}

	dout("sync_read result %d\n", ret);
	return ret;
}

/*
 * Write commit request unsafe callback, called to tell us when a
 * request is unsafe (that is, in flight--has been handed to the
 * messenger to send to its target osd).  It is called again when
 * we've received a response message indicating the request is
 * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request
 * is completed early (and unsuccessfully) due to a timeout or
 * interrupt.
 *
 * This is used if we requested both an ACK and ONDISK commit reply
 * from the OSD.
 */
static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
{
	struct ceph_inode_info *ci = ceph_inode(req->r_inode);

	dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid,
		unsafe ? "un" : "");
	if (unsafe) {
		ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
		spin_lock(&ci->i_unsafe_lock);
		list_add_tail(&req->r_unsafe_item,
			      &ci->i_unsafe_writes);
		spin_unlock(&ci->i_unsafe_lock);
	} else {
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_item);
		spin_unlock(&ci->i_unsafe_lock);
		ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
	}
}


/*
 * Synchronous write, straight from __user pointer or user pages.
 *
 * If write spans object boundary, just do multiple writes.  (For a
 * correct atomic write, we should e.g. take write locks on all
 * objects, rollback on failure, etc.)
 */
static ssize_t
ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
{
	struct file *file = iocb->ki_filp;
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_snap_context *snapc;
	struct ceph_vino vino;
	struct ceph_osd_request *req;
	struct page **pages;
	int num_pages;
	int written = 0;
	int flags;
	int check_caps = 0;
	int ret;
	struct timespec mtime = CURRENT_TIME;
	size_t count = iov_iter_count(from);

	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
		return -EROFS;

	dout("sync_direct_write on file %p %lld~%u\n", file, pos,
	     (unsigned)count);

	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
	if (ret < 0)
		return ret;

	ret = invalidate_inode_pages2_range(inode->i_mapping,
					    pos >> PAGE_CACHE_SHIFT,
					    (pos + count) >> PAGE_CACHE_SHIFT);
	if (ret < 0)
		dout("invalidate_inode_pages2_range returned %d\n", ret);

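	/*
	 * Only an ONDISK commit reply is requested here: this path
	 * always waits for the final commit, so an early ACK would be
	 * of no use (compare ceph_sync_write() below, which asks for
	 * both).
	 */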
	flags = CEPH_OSD_FLAG_ORDERSNAP |
		CEPH_OSD_FLAG_ONDISK |
		CEPH_OSD_FLAG_WRITE;

	while (iov_iter_count(from) > 0) {
		u64 len = iov_iter_single_seg_count(from);
		size_t start;
		ssize_t n;

		snapc = ci->i_snap_realm->cached_context;
		vino = ceph_vino(inode);
		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
					    vino, pos, &len, 0,
					    2, /* include a 'startsync' command */
					    CEPH_OSD_OP_WRITE, flags, snapc,
					    ci->i_truncate_seq,
					    ci->i_truncate_size,
					    false);
		if (IS_ERR(req)) {
			ret = PTR_ERR(req);
			break;
		}

		osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);

		n = iov_iter_get_pages_alloc(from, &pages, len, &start);
		if (unlikely(n < 0)) {
			ret = n;
			ceph_osdc_put_request(req);
			break;
		}

		num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
		/*
		 * throw out any page cache pages in this range.  this
		 * may block.
		 */
		truncate_inode_pages_range(inode->i_mapping, pos,
					   (pos + n) | (PAGE_CACHE_SIZE - 1));
		osd_req_op_extent_osd_data_pages(req, 0, pages, n, start,
						 false, false);

		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);

		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
		if (!ret)
			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);

		ceph_put_page_vector(pages, num_pages, false);

		ceph_osdc_put_request(req);
		if (ret)
			break;
		pos += n;
		written += n;
		iov_iter_advance(from, n);

		if (pos > i_size_read(inode)) {
			check_caps = ceph_inode_set_size(inode, pos);
			if (check_caps)
				ceph_check_caps(ceph_inode(inode),
						CHECK_CAPS_AUTHONLY,
						NULL);
		}
	}

	if (ret != -EOLDSNAPC && written > 0) {
		iocb->ki_pos = pos;
		ret = written;
	}
	return ret;
}


/*
 * Synchronous write, straight from __user pointer or user pages.
 *
 * If write spans object boundary, just do multiple writes.  (For a
 * correct atomic write, we should e.g. take write locks on all
 * objects, rollback on failure, etc.)
 */
static ssize_t
ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
{
	struct file *file = iocb->ki_filp;
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_snap_context *snapc;
	struct ceph_vino vino;
	struct ceph_osd_request *req;
	struct page **pages;
	u64 len;
	int num_pages;
	int written = 0;
	int flags;
	int check_caps = 0;
	int ret;
	struct timespec mtime = CURRENT_TIME;
	size_t count = iov_iter_count(from);

	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
		return -EROFS;

	dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);

	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
	if (ret < 0)
		return ret;

	ret = invalidate_inode_pages2_range(inode->i_mapping,
					    pos >> PAGE_CACHE_SHIFT,
					    (pos + count) >> PAGE_CACHE_SHIFT);
	if (ret < 0)
		dout("invalidate_inode_pages2_range returned %d\n", ret);

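	/*
	 * Unlike the O_DIRECT path above, also request an ACK reply so
	 * that ceph_sync_write_unsafe() can track the window between
	 * the in-flight ack and the final ondisk commit.
	 */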
	flags = CEPH_OSD_FLAG_ORDERSNAP |
		CEPH_OSD_FLAG_ONDISK |
		CEPH_OSD_FLAG_WRITE |
		CEPH_OSD_FLAG_ACK;

	while ((len = iov_iter_count(from)) > 0) {
		size_t left;
		int n;

		snapc = ci->i_snap_realm->cached_context;
		vino = ceph_vino(inode);
		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
					    vino, pos, &len, 0, 1,
					    CEPH_OSD_OP_WRITE, flags, snapc,
					    ci->i_truncate_seq,
					    ci->i_truncate_size,
					    false);
		if (IS_ERR(req)) {
			ret = PTR_ERR(req);
			break;
		}

		/*
		 * write from beginning of first page,
		 * regardless of io alignment
		 */
		num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;

		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
		if (IS_ERR(pages)) {
			ret = PTR_ERR(pages);
			goto out;
		}

		left = len;
		for (n = 0; n < num_pages; n++) {
			size_t plen = min_t(size_t, left, PAGE_SIZE);
			ret = copy_page_from_iter(pages[n], 0, plen, from);
			if (ret != plen) {
				ret = -EFAULT;
				break;
			}
			left -= ret;
		}

		if (ret < 0) {
			ceph_release_page_vector(pages, num_pages);
			goto out;
		}

		/* get a second commit callback */
		req->r_unsafe_callback = ceph_sync_write_unsafe;
		req->r_inode = inode;

		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
						 false, true);

		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);

		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
		if (!ret)
			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);

out:
		ceph_osdc_put_request(req);
		if (ret == 0) {
			pos += len;
			written += len;

			if (pos > i_size_read(inode)) {
				check_caps = ceph_inode_set_size(inode, pos);
				if (check_caps)
					ceph_check_caps(ceph_inode(inode),
							CHECK_CAPS_AUTHONLY,
							NULL);
			}
		} else {
			break;
		}
	}

	if (ret != -EOLDSNAPC && written > 0) {
		ret = written;
		iocb->ki_pos = pos;
	}
	return ret;
}

/*
 * Wrap generic_file_read_iter with checks for cap bits on the inode.
 * Atomically grab references, so that those bits are not released
 * back to the MDS mid-read.
 *
 * Hmm, the sync read case isn't actually async... should it be?
 */
static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
{
	struct file *filp = iocb->ki_filp;
	struct ceph_file_info *fi = filp->private_data;
	size_t len = iov_iter_count(to);
	struct inode *inode = file_inode(filp);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct page *pinned_page = NULL;
	ssize_t ret;
	int want, got = 0;
	int retry_op = 0, read = 0;

again:
	dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
	     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);

	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_CACHE;
	ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
	if (ret < 0)
		return ret;

	if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
	    (iocb->ki_flags & IOCB_DIRECT) ||
	    (fi->flags & CEPH_F_SYNC)) {

		dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n",
		     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
		     ceph_cap_string(got));

		if (ci->i_inline_version == CEPH_INLINE_NONE) {
			/* hmm, this isn't really async... */
			ret = ceph_sync_read(iocb, to, &retry_op);
		} else {
			retry_op = READ_INLINE;
		}
	} else {
		dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
		     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
		     ceph_cap_string(got));

		ret = generic_file_read_iter(iocb, to);
	}
	dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
	     inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
	if (pinned_page) {
		page_cache_release(pinned_page);
		pinned_page = NULL;
	}
	ceph_put_cap_refs(ci, got);
	if (retry_op && ret >= 0) {
		int statret;
		struct page *page = NULL;
		loff_t i_size;
		if (retry_op == READ_INLINE) {
			page = __page_cache_alloc(GFP_NOFS);
			if (!page)
				return -ENOMEM;
		}

		statret = __ceph_do_getattr(inode, page,
					    CEPH_STAT_CAP_INLINE_DATA, !!page);
		if (statret < 0) {
			if (page)
				__free_page(page);
			if (statret == -ENODATA) {
				BUG_ON(retry_op != READ_INLINE);
				goto again;
			}
			return statret;
		}

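		/*
		 * The inline data handled here fits in a single page:
		 * reads of it are capped at PAGE_CACHE_SIZE below, and
		 * statret is the number of inline bytes the getattr
		 * above copied into 'page'.
		 */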
		i_size = i_size_read(inode);
		if (retry_op == READ_INLINE) {
			BUG_ON(ret > 0 || read > 0);
			if (iocb->ki_pos < i_size &&
			    iocb->ki_pos < PAGE_CACHE_SIZE) {
				loff_t end = min_t(loff_t, i_size,
						   iocb->ki_pos + len);
				end = min_t(loff_t, end, PAGE_CACHE_SIZE);
				if (statret < end)
					zero_user_segment(page, statret, end);
				ret = copy_page_to_iter(page,
						iocb->ki_pos & ~PAGE_MASK,
						end - iocb->ki_pos, to);
				iocb->ki_pos += ret;
				read += ret;
			}
			if (iocb->ki_pos < i_size && read < len) {
				size_t zlen = min_t(size_t, len - read,
						    i_size - iocb->ki_pos);
				ret = iov_iter_zero(zlen, to);
				iocb->ki_pos += ret;
				read += ret;
			}
			__free_page(page);
			return read;
		}

		/* hit EOF or hole? */
		if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
		    ret < len) {
			dout("sync_read hit hole, ppos %lld < size %lld"
			     ", reading more\n", iocb->ki_pos,
			     inode->i_size);

			read += ret;
			len -= ret;
			retry_op = 0;
			goto again;
		}
	}

	if (ret >= 0)
		ret += read;

	return ret;
}

/*
 * Take cap references to avoid releasing caps to the MDS mid-write.
 *
 * If we are synchronous, and write with an old snap context, the OSD
 * may return EOLDSNAPC.  In that case, retry the write... _after_
 * dropping our cap refs and allowing the pending snap to logically
 * complete _before_ this write occurs.
 *
 * If we are near ENOSPC, write synchronously.
 */
static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
{
	struct file *file = iocb->ki_filp;
	struct ceph_file_info *fi = file->private_data;
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_client *osdc =
		&ceph_sb_to_client(inode->i_sb)->client->osdc;
	ssize_t count, written = 0;
	int err, want, got;
	loff_t pos;

	if (ceph_snap(inode) != CEPH_NOSNAP)
		return -EROFS;

	mutex_lock(&inode->i_mutex);

	/* We can write back this queue in page reclaim */
	current->backing_dev_info = inode_to_bdi(inode);

	err = generic_write_checks(iocb, from);
	if (err <= 0)
		goto out;

	pos = iocb->ki_pos;
	count = iov_iter_count(from);
	err = file_remove_suid(file);
	if (err)
		goto out;

	err = file_update_time(file);
	if (err)
		goto out;

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		err = ceph_uninline_data(file, NULL);
		if (err < 0)
			goto out;
	}

retry_snap:
	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
		err = -ENOSPC;
		goto out;
	}

	dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
	     inode, ceph_vinop(inode), pos, count, inode->i_size);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_BUFFER;
	got = 0;
	err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
			    &got, NULL);
	if (err < 0)
		goto out;

	dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));

	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
	    (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
		struct iov_iter data;
		mutex_unlock(&inode->i_mutex);
		/* we might need to revert back to that point */
		data = *from;
		if (iocb->ki_flags & IOCB_DIRECT)
			written = ceph_sync_direct_write(iocb, &data, pos);
		else
			written = ceph_sync_write(iocb, &data, pos);
		if (written == -EOLDSNAPC) {
			dout("aio_write %p %llx.%llx %llu~%u"
				" got EOLDSNAPC, retrying\n",
				inode, ceph_vinop(inode),
				pos, (unsigned)count);
			mutex_lock(&inode->i_mutex);
			goto retry_snap;
		}
		if (written > 0)
			iov_iter_advance(from, written);
	} else {
		loff_t old_size = inode->i_size;
		/*
		 * No need to acquire the i_truncate_mutex: the MDS
		 * revokes Fwb caps before sending us a truncate
		 * message, and we can't hold the Fwb cap while a
		 * vmtruncate is pending, so write and vmtruncate
		 * cannot run at the same time.
		 */
		written = generic_perform_write(file, from, pos);
		if (likely(written >= 0))
			iocb->ki_pos = pos + written;
		if (inode->i_size > old_size)
			ceph_fscache_update_objectsize(inode);
		mutex_unlock(&inode->i_mutex);
	}

	if (written >= 0) {
		int dirty;
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}

	dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n",
	     inode, ceph_vinop(inode), pos, (unsigned)count,
	     ceph_cap_string(got));
	ceph_put_cap_refs(ci, got);

	if (written >= 0 &&
	    ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) ||
	     ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
		err = vfs_fsync_range(file, pos, pos + written - 1, 1);
		if (err < 0)
			written = err;
	}

	goto out_unlocked;

out:
	mutex_unlock(&inode->i_mutex);
out_unlocked:
	current->backing_dev_info = NULL;
	return written ? written : err;
}

/*
 * llseek.  be sure to verify file size on SEEK_END.
 */
static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
{
	struct inode *inode = file->f_mapping->host;
	int ret;

	mutex_lock(&inode->i_mutex);

	if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
		ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
		if (ret < 0) {
			offset = ret;
			goto out;
		}
	}

	switch (whence) {
	case SEEK_END:
		offset += inode->i_size;
		break;
	case SEEK_CUR:
		/*
		 * Here we special-case the lseek(fd, 0, SEEK_CUR)
		 * position-querying operation.  Avoid rewriting the "same"
		 * f_pos value back to the file because a concurrent read(),
		 * write() or lseek() might have altered it.
		 */
		if (offset == 0) {
			offset = file->f_pos;
			goto out;
		}
		offset += file->f_pos;
		break;
	case SEEK_DATA:
		if (offset >= inode->i_size) {
			offset = -ENXIO;
			goto out;
		}
		break;
	case SEEK_HOLE:
		if (offset >= inode->i_size) {
			offset = -ENXIO;
			goto out;
		}
		offset = inode->i_size;
		break;
	}

	offset = vfs_setpos(file, offset, inode->i_sb->s_maxbytes);

out:
	mutex_unlock(&inode->i_mutex);
	return offset;
}

static inline void ceph_zero_partial_page(
	struct inode *inode, loff_t offset, unsigned size)
{
	struct page *page;
	pgoff_t index = offset >> PAGE_CACHE_SHIFT;

	page = find_lock_page(inode->i_mapping, index);
	if (page) {
		wait_on_page_writeback(page);
		zero_user(page, offset & (PAGE_CACHE_SIZE - 1), size);
		unlock_page(page);
		page_cache_release(page);
	}
}

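/*
 * Zero a range of the page cache in (up to) three steps: a partial
 * page at the head, whole pages in the middle (dropped outright via
 * truncate_pagecache_range), and a partial page at the tail.
 */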
static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset,
				      loff_t length)
{
	loff_t nearly = round_up(offset, PAGE_CACHE_SIZE);
	if (offset < nearly) {
		loff_t size = nearly - offset;
		if (length < size)
			size = length;
		ceph_zero_partial_page(inode, offset, size);
		offset += size;
		length -= size;
	}
	if (length >= PAGE_CACHE_SIZE) {
		loff_t size = round_down(length, PAGE_CACHE_SIZE);
		truncate_pagecache_range(inode, offset, offset + size - 1);
		offset += size;
		length -= size;
	}
	if (length)
		ceph_zero_partial_page(inode, offset, length);
}

static int ceph_zero_partial_object(struct inode *inode,
				    loff_t offset, loff_t *length)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req;
	int ret = 0;
	loff_t zero = 0;
	int op;

	if (!length) {
		op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE;
		length = &zero;
	} else {
		op = CEPH_OSD_OP_ZERO;
	}

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
					ceph_vino(inode),
					offset, length,
					0, 1, op,
					CEPH_OSD_FLAG_WRITE |
					CEPH_OSD_FLAG_ONDISK,
					NULL, 0, 0, false);
	if (IS_ERR(req)) {
		ret = PTR_ERR(req);
		goto out;
	}

	ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap,
				&inode->i_mtime);

	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!ret) {
		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
		if (ret == -ENOENT)
			ret = 0;
	}
	ceph_osdc_put_request(req);

out:
	return ret;
}

static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length)
{
	int ret = 0;
	struct ceph_inode_info *ci = ceph_inode(inode);
	s32 stripe_unit = ceph_file_layout_su(ci->i_layout);
	s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
	s32 object_size = ceph_file_layout_object_size(ci->i_layout);
	u64 object_set_size = object_size * stripe_count;
	u64 nearly, t;

	/* round offset up to next period boundary */
	nearly = offset + object_set_size - 1;
	t = nearly;
	nearly -= do_div(t, object_set_size);
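	/*
	 * e.g. with 4M objects and stripe_count 2, object_set_size is
	 * 8M; an offset of 5M rounds up to nearly = 8M, and the first
	 * loop below zeroes up to that boundary one object at a time.
	 */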

	while (length && offset < nearly) {
		loff_t size = length;
		ret = ceph_zero_partial_object(inode, offset, &size);
		if (ret < 0)
			return ret;
		offset += size;
		length -= size;
	}
	while (length >= object_set_size) {
		int i;
		loff_t pos = offset;
		for (i = 0; i < stripe_count; ++i) {
			ret = ceph_zero_partial_object(inode, pos, NULL);
			if (ret < 0)
				return ret;
			pos += stripe_unit;
		}
		offset += object_set_size;
		length -= object_set_size;
	}
	while (length) {
		loff_t size = length;
		ret = ceph_zero_partial_object(inode, offset, &size);
		if (ret < 0)
			return ret;
		offset += size;
		length -= size;
	}
	return ret;
}

static long ceph_fallocate(struct file *file, int mode,
				loff_t offset, loff_t length)
{
	struct ceph_file_info *fi = file->private_data;
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
	int want, got = 0;
	int dirty;
	int ret = 0;
	loff_t endoff = 0;
	loff_t size;

	if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
		return -EOPNOTSUPP;

	if (!S_ISREG(inode->i_mode))
		return -EOPNOTSUPP;

	mutex_lock(&inode->i_mutex);

	if (ceph_snap(inode) != CEPH_NOSNAP) {
		ret = -EROFS;
		goto unlock;
	}

	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) &&
	    !(mode & FALLOC_FL_PUNCH_HOLE)) {
		ret = -ENOSPC;
		goto unlock;
	}

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		ret = ceph_uninline_data(file, NULL);
		if (ret < 0)
			goto unlock;
	}

	size = i_size_read(inode);
	if (!(mode & FALLOC_FL_KEEP_SIZE))
		endoff = offset + length;

	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_BUFFER;

	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
	if (ret < 0)
		goto unlock;

	if (mode & FALLOC_FL_PUNCH_HOLE) {
		if (offset < size)
			ceph_zero_pagecache_range(inode, offset, length);
		ret = ceph_zero_objects(inode, offset, length);
	} else if (endoff > size) {
		truncate_pagecache_range(inode, size, -1);
		if (ceph_inode_set_size(inode, endoff))
			ceph_check_caps(ceph_inode(inode),
				CHECK_CAPS_AUTHONLY, NULL);
	}

	if (!ret) {
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}

	ceph_put_cap_refs(ci, got);
unlock:
	mutex_unlock(&inode->i_mutex);
	return ret;
}

const struct file_operations ceph_file_fops = {
	.open = ceph_open,
	.release = ceph_release,
	.llseek = ceph_llseek,
	.read_iter = ceph_read_iter,
	.write_iter = ceph_write_iter,
	.mmap = ceph_mmap,
	.fsync = ceph_fsync,
	.lock = ceph_lock,
	.flock = ceph_flock,
	.splice_read = generic_file_splice_read,
	.splice_write = iter_file_splice_write,
	.unlocked_ioctl = ceph_ioctl,
	.compat_ioctl = ceph_ioctl,
	.fallocate = ceph_fallocate,
};