From: Holger Smolinski <smolinski@de.ibm.com>

This patch replaces the single instance of kmirrord by one instance per
mirror set.  This change is required to avoid a deadlock in kmirrord when
the persistent dirty log of a mirror itself resides on a mirror. The single
instance of kmirrord then issues a sync write to the dirty log in
write_bits which gets deferred to kmirrord itself later in the call chain.
But kmirrord never does the deferred work because it is still waiting for
the sync write_bits.

_mirror_sets is removed as it no longer needed, and we always flush the
workqueue before destroying it to ensure all work is complete before
destroying it.

Signed-off-by: Holger Smolinski <smolinski@de.ibm.com>
Index: linux-2.6.19/drivers/md/dm-raid1.c
===================================================================
--- linux-2.6.19.orig/drivers/md/dm-raid1.c	2006-12-06 20:55:40.000000000 +0000
+++ linux-2.6.19/drivers/md/dm-raid1.c	2006-12-13 18:18:51.000000000 +0000
@@ -24,15 +24,8 @@
 
 #define DM_RAID1_HANDLE_ERRORS 0x01
 
-static struct workqueue_struct *_kmirrord_wq;
-static struct work_struct _kmirrord_work;
 static DECLARE_WAIT_QUEUE_HEAD(_kmirrord_recovery_stopped);
 
-static inline void wake(void)
-{
-	queue_work(_kmirrord_wq, &_kmirrord_work);
-}
-
 /*-----------------------------------------------------------------
  * Region hash
  *
@@ -139,6 +132,9 @@ struct mirror_set {
 
 	struct mirror *default_mirror;	/* Default mirror */
 
+	struct workqueue_struct *kmirrord_wq;
+	struct work_struct kmirrord_work;
+
 	unsigned int nr_mirrors;
 	struct mirror mirror[0];
 };
@@ -156,6 +152,11 @@ static inline sector_t region_to_sector(
 	return region << rh->region_shift;
 }
 
+static void wake(struct mirror_set *ms)
+{
+	queue_work(ms->kmirrord_wq, &ms->kmirrord_work);
+}
+
 /* FIXME move this */
 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw);
 
@@ -474,7 +475,7 @@ static void rh_dec(struct region_hash *r
 	spin_unlock_irqrestore(&rh->region_lock, flags);
 
 	if (should_wake)
-		wake();
+		wake(rh->ms);
 }
 
 /*
@@ -561,7 +562,7 @@ static void rh_recovery_end(struct regio
 	list_add(&reg->list, &reg->rh->recovered_regions);
 	spin_unlock_irq(&rh->region_lock);
 
-	wake();
+	wake(rh->ms);
 }
 
 static void rh_flush(struct region_hash *rh)
@@ -595,7 +596,7 @@ static void rh_start_recovery(struct reg
 	for (i = 0; i < MAX_RECOVERY; i++)
 		up(&rh->recovery_count);
 
-	wake();
+	wake(rh->ms);
 }
 
 /*
@@ -873,11 +874,9 @@ static void do_writes(struct mirror_set 
 /*-----------------------------------------------------------------
  * kmirrord
  *---------------------------------------------------------------*/
-static LIST_HEAD(_mirror_sets);
-static DECLARE_RWSEM(_mirror_sets_lock);
-
-static void do_mirror(struct mirror_set *ms)
+static void do_mirror(void *data)
 {
+	struct mirror_set *ms = data;
 	struct bio_list reads, writes;
 
 	spin_lock(&ms->lock);
@@ -893,16 +892,6 @@ static void do_mirror(struct mirror_set 
 	do_writes(ms, &writes);
 }
 
-static void do_work(void *ignored)
-{
-	struct mirror_set *ms;
-
-	down_read(&_mirror_sets_lock);
-	list_for_each_entry (ms, &_mirror_sets, list)
-		do_mirror(ms);
-	up_read(&_mirror_sets_lock);
-}
-
 /*-----------------------------------------------------------------
  * Target functions
  *---------------------------------------------------------------*/
@@ -981,23 +970,6 @@ static int get_mirror(struct mirror_set 
 	return 0;
 }
 
-static int add_mirror_set(struct mirror_set *ms)
-{
-	down_write(&_mirror_sets_lock);
-	list_add_tail(&ms->list, &_mirror_sets);
-	up_write(&_mirror_sets_lock);
-	wake();
-
-	return 0;
-}
-
-static void del_mirror_set(struct mirror_set *ms)
-{
-	down_write(&_mirror_sets_lock);
-	list_del(&ms->list);
-	up_write(&_mirror_sets_lock);
-}
-
 /*
  * Create dirty log: log_type #log_params <log_params>
  */
@@ -1154,13 +1126,22 @@ static int mirror_ctr(struct dm_target *
 		return -EINVAL;
 	}
 
+	ms->kmirrord_wq = create_singlethread_workqueue("kmirrord");
+	if (!ms->kmirrord_wq) {
+		DMERR("couldn't start kmirrord");
+		free_context(ms, ti, m);
+		return -ENOMEM;
+	}
+	INIT_WORK(&ms->kmirrord_work, do_mirror, ms);
+
 	r = kcopyd_client_create(DM_IO_PAGES, &ms->kcopyd_client);
 	if (r) {
+		destroy_workqueue(ms->kmirrord_wq);
 		free_context(ms, ti, ms->nr_mirrors);
 		return r;
 	}
 
-	add_mirror_set(ms);
+	wake(ms);
 	return 0;
 }
 
@@ -1168,8 +1149,9 @@ static void mirror_dtr(struct dm_target 
 {
 	struct mirror_set *ms = (struct mirror_set *) ti->private;
 
-	del_mirror_set(ms);
+	flush_workqueue(ms->kmirrord_wq);
 	kcopyd_client_destroy(ms->kcopyd_client);
+	destroy_workqueue(ms->kmirrord_wq);
 	free_context(ms, ti, ms->nr_mirrors);
 }
 
@@ -1185,7 +1167,7 @@ static void queue_bio(struct mirror_set 
 	spin_unlock(&ms->lock);
 
 	if (should_wake)
-		wake();
+		wake(ms);
 }
 
 /*
@@ -1334,20 +1316,11 @@ static int __init dm_mirror_init(void)
 	if (r)
 		return r;
 
-	_kmirrord_wq = create_singlethread_workqueue("kmirrord");
-	if (!_kmirrord_wq) {
-		DMERR("couldn't start kmirrord");
-		dm_dirty_log_exit();
-		return r;
-	}
-	INIT_WORK(&_kmirrord_work, do_work, NULL);
-
 	r = dm_register_target(&mirror_target);
 	if (r < 0) {
 		DMERR("%s: Failed to register mirror target",
 		      mirror_target.name);
 		dm_dirty_log_exit();
-		destroy_workqueue(_kmirrord_wq);
 	}
 
 	return r;
@@ -1361,7 +1334,6 @@ static void __exit dm_mirror_exit(void)
 	if (r < 0)
 		DMERR("%s: unregister failed %d", mirror_target.name, r);
 
-	destroy_workqueue(_kmirrord_wq);
 	dm_dirty_log_exit();
 }