From: Mikulas Patocka <mpatocka@redhat.com>

Make one kcopyd thread per device.

The original shared kcopyd could deadlock.

Configuration:
--------------
A (dm-raid1)
B (dm-raid1)
C (any device)

B is a part of the device A.
C is a part of the device B.
There may be other devices in the mirrors, but they are not relevant to this
deadlock.

Deadlock scenario:
------------------
Both mirror devices A and B are running a recovery.

B's mempool "md->tio_pool" is empty. All the IO requests allocated from this
pool belong to the region that is being synchronized, so they are held on
ms->writes and ms->reads queues.

A makes a kcopyd request to B during A's recovery.
Stacktrace of A's "kmirrord" thread is:
do_mirror
_do_mirror
do_recovery
recover
kcopyd_copy

kcopyd receives the A's request and starts processing it:
do_work
process_jobs(&_io_jobs, run_io_job)
run_io_job
dm_io
async_io
dispatch_io
do_region
submit_bio
generic_make_request
... submit BIO calls the B's request function
q->make_request_fn
dm_request (on device B)
__split_bio
__clone_and_map
alloc_tio
  - alloc_tio waits, until some space is made in B's md->tio_pool

Meanwhile, the device B is doing its own recovery work (sending requests on
device C). B's "kmirrord" thread has this stacktrace:
do_mirror
_do_mirror
do_recovery
recover
kcopyd_copy --- however kcopyd is blocked elsewhere, so it doesn't process
the request immediatelly

The deadlock:
-------------
All B's requests are waiting for B's recovery of the region to complete.
The B's recovery is waiting for kcopyd.
kcopyd is waiting (on behalf of A's request) until some B's request finishes
and makes a room in B's md->tio_pool mempool.

Signed-off-by: Mikulas Patocka <mpatocka@redhat.com>
Signed-off-by: Alasdair G Kergon <agk@redhat.com>

---
 drivers/md/kcopyd.c |  132 ++++++++++++++++++++++++++++------------------------
 1 files changed, 73 insertions(+), 59 deletions(-)

Index: linux-2.6.25/drivers/md/kcopyd.c
===================================================================
--- linux-2.6.25.orig/drivers/md/kcopyd.c	2008-04-24 18:00:29.000000000 +0100
+++ linux-2.6.25/drivers/md/kcopyd.c	2008-04-24 18:00:31.000000000 +0100
@@ -26,14 +26,6 @@
 #include "kcopyd.h"
 #include "dm.h"
 
-static struct workqueue_struct *_kcopyd_wq;
-static struct work_struct _kcopyd_work;
-
-static void wake(void)
-{
-	queue_work(_kcopyd_wq, &_kcopyd_work);
-}
-
 /*-----------------------------------------------------------------
  * Each kcopyd client has its own little pool of preallocated
  * pages for kcopyd io.
@@ -50,8 +42,30 @@ struct dm_kcopyd_client {
 
 	wait_queue_head_t destroyq;
 	atomic_t nr_jobs;
+
+	struct workqueue_struct *kcopyd_wq;
+	struct work_struct kcopyd_work;
+
+/*
+ * We maintain three lists of jobs:
+ *
+ * i)   jobs waiting for pages
+ * ii)  jobs that have pages, and are waiting for the io to be issued.
+ * iii) jobs that have completed.
+ *
+ * All three of these are protected by job_lock.
+ */
+	spinlock_t job_lock;
+	struct list_head complete_jobs;
+	struct list_head io_jobs;
+	struct list_head pages_jobs;
 };
 
+static void wake(struct dm_kcopyd_client *kc)
+{
+	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
+}
+
 static struct page_list *alloc_pl(void)
 {
 	struct page_list *pl;
@@ -209,21 +223,6 @@ struct kcopyd_job {
 static struct kmem_cache *_job_cache;
 static mempool_t *_job_pool;
 
-/*
- * We maintain three lists of jobs:
- *
- * i)   jobs waiting for pages
- * ii)  jobs that have pages, and are waiting for the io to be issued.
- * iii) jobs that have completed.
- *
- * All three of these are protected by job_lock.
- */
-static DEFINE_SPINLOCK(_job_lock);
-
-static LIST_HEAD(_complete_jobs);
-static LIST_HEAD(_io_jobs);
-static LIST_HEAD(_pages_jobs);
-
 static int jobs_init(void)
 {
 	_job_cache = KMEM_CACHE(kcopyd_job, 0);
@@ -241,10 +240,6 @@ static int jobs_init(void)
 
 static void jobs_exit(void)
 {
-	BUG_ON(!list_empty(&_complete_jobs));
-	BUG_ON(!list_empty(&_io_jobs));
-	BUG_ON(!list_empty(&_pages_jobs));
-
 	mempool_destroy(_job_pool);
 	kmem_cache_destroy(_job_cache);
 	_job_pool = NULL;
@@ -255,18 +250,19 @@ static void jobs_exit(void)
  * Functions to push and pop a job onto the head of a given job
  * list.
  */
-static struct kcopyd_job *pop(struct list_head *jobs)
+static struct kcopyd_job *pop(struct list_head *jobs,
+			      struct dm_kcopyd_client *kc)
 {
 	struct kcopyd_job *job = NULL;
 	unsigned long flags;
 
-	spin_lock_irqsave(&_job_lock, flags);
+	spin_lock_irqsave(&kc->job_lock, flags);
 
 	if (!list_empty(jobs)) {
 		job = list_entry(jobs->next, struct kcopyd_job, list);
 		list_del(&job->list);
 	}
-	spin_unlock_irqrestore(&_job_lock, flags);
+	spin_unlock_irqrestore(&kc->job_lock, flags);
 
 	return job;
 }
@@ -274,10 +270,11 @@ static struct kcopyd_job *pop(struct lis
 static void push(struct list_head *jobs, struct kcopyd_job *job)
 {
 	unsigned long flags;
+	struct dm_kcopyd_client *kc = job->kc;
 
-	spin_lock_irqsave(&_job_lock, flags);
+	spin_lock_irqsave(&kc->job_lock, flags);
 	list_add_tail(&job->list, jobs);
-	spin_unlock_irqrestore(&_job_lock, flags);
+	spin_unlock_irqrestore(&kc->job_lock, flags);
 }
 
 /*
@@ -310,6 +307,7 @@ static int run_complete_job(struct kcopy
 static void complete_io(unsigned long error, void *context)
 {
 	struct kcopyd_job *job = (struct kcopyd_job *) context;
+	struct dm_kcopyd_client *kc = job->kc;
 
 	if (error) {
 		if (job->rw == WRITE)
@@ -318,21 +316,21 @@ static void complete_io(unsigned long er
 			job->read_err = 1;
 
 		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
-			push(&_complete_jobs, job);
-			wake();
+			push(&kc->complete_jobs, job);
+			wake(kc);
 			return;
 		}
 	}
 
 	if (job->rw == WRITE)
-		push(&_complete_jobs, job);
+		push(&kc->complete_jobs, job);
 
 	else {
 		job->rw = WRITE;
-		push(&_io_jobs, job);
+		push(&kc->io_jobs, job);
 	}
 
-	wake();
+	wake(kc);
 }
 
 /*
@@ -369,7 +367,7 @@ static int run_pages_job(struct kcopyd_j
 	r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
 	if (!r) {
 		/* this job is ready for io */
-		push(&_io_jobs, job);
+		push(&job->kc->io_jobs, job);
 		return 0;
 	}
 
@@ -384,12 +382,13 @@ static int run_pages_job(struct kcopyd_j
  * Run through a list for as long as possible.  Returns the count
  * of successful jobs.
  */
-static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
+static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
+			int (*fn) (struct kcopyd_job *))
 {
 	struct kcopyd_job *job;
 	int r, count = 0;
 
-	while ((job = pop(jobs))) {
+	while ((job = pop(jobs, kc))) {
 
 		r = fn(job);
 
@@ -399,7 +398,7 @@ static int process_jobs(struct list_head
 				job->write_err = (unsigned long) -1L;
 			else
 				job->read_err = 1;
-			push(&_complete_jobs, job);
+			push(&kc->complete_jobs, job);
 			break;
 		}
 
@@ -421,8 +420,11 @@ static int process_jobs(struct list_head
 /*
  * kcopyd does this every time it's woken up.
  */
-static void do_work(struct work_struct *ignored)
+static void do_work(struct work_struct *work)
 {
+	struct dm_kcopyd_client *kc = container_of(work,
+					struct dm_kcopyd_client, kcopyd_work);
+
 	/*
 	 * The order that these are called is *very* important.
 	 * complete jobs can free some pages for pages jobs.
@@ -430,9 +432,9 @@ static void do_work(struct work_struct *
 	 * list.  io jobs call wake when they complete and it all
 	 * starts again.
 	 */
-	process_jobs(&_complete_jobs, run_complete_job);
-	process_jobs(&_pages_jobs, run_pages_job);
-	process_jobs(&_io_jobs, run_io_job);
+	process_jobs(&kc->complete_jobs, kc, run_complete_job);
+	process_jobs(&kc->pages_jobs, kc, run_pages_job);
+	process_jobs(&kc->io_jobs, kc, run_io_job);
 }
 
 /*
@@ -442,9 +444,10 @@ static void do_work(struct work_struct *
  */
 static void dispatch_job(struct kcopyd_job *job)
 {
-	atomic_inc(&job->kc->nr_jobs);
-	push(&_pages_jobs, job);
-	wake();
+	struct dm_kcopyd_client *kc = job->kc;
+	atomic_inc(&kc->nr_jobs);
+	push(&kc->pages_jobs, job);
+	wake(kc);
 }
 
 #define SUB_JOB_SIZE 128
@@ -625,15 +628,7 @@ static int kcopyd_init(void)
 		return r;
 	}
 
-	_kcopyd_wq = create_singlethread_workqueue("kcopyd");
-	if (!_kcopyd_wq) {
-		jobs_exit();
-		mutex_unlock(&kcopyd_init_lock);
-		return -ENOMEM;
-	}
-
 	kcopyd_clients++;
-	INIT_WORK(&_kcopyd_work, do_work);
 	mutex_unlock(&kcopyd_init_lock);
 	return 0;
 }
@@ -644,8 +639,6 @@ static void kcopyd_exit(void)
 	kcopyd_clients--;
 	if (!kcopyd_clients) {
 		jobs_exit();
-		destroy_workqueue(_kcopyd_wq);
-		_kcopyd_wq = NULL;
 	}
 	mutex_unlock(&kcopyd_init_lock);
 }
@@ -662,15 +655,31 @@ int dm_kcopyd_client_create(unsigned int
 
 	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
 	if (!kc) {
+		r = -ENOMEM;
 		kcopyd_exit();
-		return -ENOMEM;
+		return r;
 	}
 
 	spin_lock_init(&kc->lock);
+	spin_lock_init(&kc->job_lock);
+	INIT_LIST_HEAD(&kc->complete_jobs);
+	INIT_LIST_HEAD(&kc->io_jobs);
+	INIT_LIST_HEAD(&kc->pages_jobs);
+
+	INIT_WORK(&kc->kcopyd_work, do_work);
+	kc->kcopyd_wq = create_singlethread_workqueue("kcopyd");
+	if (!kc->kcopyd_wq) {
+		r = -ENOMEM;
+		kfree(kc);
+		kcopyd_exit();
+		return r;
+	}
+
 	kc->pages = NULL;
 	kc->nr_pages = kc->nr_free_pages = 0;
 	r = client_alloc_pages(kc, nr_pages);
 	if (r) {
+		destroy_workqueue(kc->kcopyd_wq);
 		kfree(kc);
 		kcopyd_exit();
 		return r;
@@ -680,6 +689,7 @@ int dm_kcopyd_client_create(unsigned int
 	if (IS_ERR(kc->io_client)) {
 		r = PTR_ERR(kc->io_client);
 		client_free_pages(kc);
+		destroy_workqueue(kc->kcopyd_wq);
 		kfree(kc);
 		kcopyd_exit();
 		return r;
@@ -699,6 +709,10 @@ void dm_kcopyd_client_destroy(struct dm_
 	/* Wait for completion of all jobs submitted by this client. */
 	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
 
+	BUG_ON(!list_empty(&kc->complete_jobs));
+	BUG_ON(!list_empty(&kc->io_jobs));
+	BUG_ON(!list_empty(&kc->pages_jobs));
+	destroy_workqueue(kc->kcopyd_wq);
 	dm_io_client_destroy(kc->io_client);
 	client_free_pages(kc);
 	client_del(kc);
