From: Mikulas Patocka <mpatocka@redhat.com>

When there is one merging snapshot and other non-merging snapshots,
snapshot_merge_process() must make exceptions in the non-merging
snapshots.

Use a sequence count to resolve the race between the merge process and
I/O to chunks that are about to be merged.  The count increases each
time an exception reallocation finishes.  Use wait_event() to wait until
the count changes.

[FIXME AGK remove goto]

Signed-off-by: Mikulas Patocka <mpatocka@redhat.com>
Signed-off-by: Mike Snitzer <snitzer@redhat.com>

---
drivers/md/dm-snap.c |   32 +++++++++++++++++++++++++++++++-
 drivers/md/dm-snap.c |   74 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 73 insertions(+), 1 deletion(-)

Index: linux-2.6.32/drivers/md/dm-snap.c
===================================================================
--- linux-2.6.32.orig/drivers/md/dm-snap.c
+++ linux-2.6.32/drivers/md/dm-snap.c
@@ -268,6 +268,10 @@ struct origin {
 static struct list_head *_origins;
 static struct rw_semaphore _origins_lock;
 
+static DECLARE_WAIT_QUEUE_HEAD(_pending_exception_done);	/* woken after each count bump */
+static DEFINE_SPINLOCK(_pending_exception_done_spinlock);	/* guards the count below */
+static uint64_t _pending_exception_done_count;	/* bumped per completed exception */
+
 static int init_origin_hash(void)
 {
 	int i;
@@ -760,14 +764,31 @@ static int init_hash_tables(struct dm_sn
 static void flush_bios(struct bio *bio);
 static void error_bios(struct bio *bio);
 
+static int __origin_write(struct list_head *snapshots,
+			  sector_t sector, struct bio *bio);
+
 static void merge_callback(int read_err, unsigned long write_err,
 			   void *context);
 
+static uint64_t read_pending_exception_done_count(void)
+{
+	uint64_t current_count;
+
+	/* Same lock the completion path holds while bumping the counter. */
+	spin_lock(&_pending_exception_done_spinlock);
+	current_count = _pending_exception_done_count;
+	spin_unlock(&_pending_exception_done_spinlock);
+	return current_count;
+}
+
 static void snapshot_merge_process(struct dm_snapshot *s)
 {
 	int r;
-	chunk_t old_chunk, new_chunk;
+	chunk_t old_chunk, new_chunk, n;
+	struct origin *o;
+	int must_wait;
 	struct dm_io_region src, dest;
+	uint64_t previous_count;
 
 	BUG_ON(!test_bit(MERGE_RUNNING, &s->bits));
 	if (unlikely(test_bit(SHUTDOWN_MERGE, &s->bits)))
@@ -797,6 +818,52 @@ static void snapshot_merge_process(struc
 	src.sector = chunk_to_sector(s->store, new_chunk);
 	src.count = dest.count;
 
+	/*
+	 * Reallocate the other snapshots:
+	 *
+	 * The chunk size of the merging snapshot may be larger than the chunk
+	 * size of some other snapshot. So we may need to reallocate multiple
+	 * chunks in a snapshot.
+	 *
+	 * We don't do linking of pending exceptions and waiting for the last
+	 * one --- that would complicate code too much and it would also be
+	 * bug-prone.
+	 *
+	 * Instead, we try to scan all the overlapping exceptions in all
+	 * non-merging snapshots and if something was reallocated then wait
+	 * for any pending exception to complete. Retry after the wait, until
+	 * all exceptions are done.
+	 *
+	 * This may seem inefficient, but in practice, people hardly use more
+	 * than one or two snapshots. In case of two snapshots (one merging and
+	 * one non-merging) with the same chunksize, wait and wakeup is done
+	 * only once.
+	 */
+
+/* FIXME REMOVE label+goto */
+test_again:
+	previous_count = read_pending_exception_done_count();
+	must_wait = 0;
+	/*
+	 * Merging snapshot already has the origin's __minimum_chunk_size()
+	 * stored in split_io (see: snapshot_merge_resume); avoid rediscovery
+	 */
+	BUG_ON(!s->ti->split_io);
+	down_read(&_origins_lock);
+	o = __lookup_origin(s->origin->bdev);
+	for (n = 0; n < s->store->chunk_size; n += s->ti->split_io) {
+		r = __origin_write(&o->snapshots, dest.sector + n, NULL);
+		if (r == DM_MAPIO_SUBMITTED)
+			must_wait = 1;
+	}
+	up_read(&_origins_lock);
+	if (must_wait) {
+		wait_event(_pending_exception_done,
+			   (read_pending_exception_done_count() !=
+			    previous_count));
+		goto test_again;
+	}
+
 	down_write(&s->lock);
 	s->merge_write_interlock = old_chunk;
 	s->merge_write_interlock_n = 1;
@@ -1325,6 +1392,11 @@ static void pending_complete(struct dm_s
 	origin_bios = bio_list_get(&pe->origin_bios);
 	free_pending_exception(pe);
 
+	spin_lock(&_pending_exception_done_spinlock);
+	_pending_exception_done_count++;
+	spin_unlock(&_pending_exception_done_spinlock);
+	wake_up_all(&_pending_exception_done);
+
 	up_write(&s->lock);
 
 	/* Submit any pending write bios */
