From: Holger Smolinski <smolinski@de.ibm.com>

This patch replaces the single global instance of kmirrord with one instance
per mirror set. This change is required because the old implementation could
deadlock in kmirrord when the persistent dirty log of a mirror set itself
resided on a mirror set: the single kmirrord instance issued a synchronous
write to the dirty log in write_bits, and that write was deferred back to
kmirrord later in the call chain. kmirrord never performed the deferred work,
because it was still blocked waiting for the synchronous write_bits to
complete.
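
In other words, each mirror set now owns its own kmirrord workqueue and work
item, so work deferred for one set can run even while another set's thread is
blocked. A minimal sketch of the pattern, using the 2.6-era workqueue API the
patch relies on (alloc_kmirrord is a hypothetical helper shown only for
illustration; the patch itself does this inline in the mirror constructor):

	struct mirror_set {
		struct workqueue_struct *_kmirrord_wq;	/* one worker thread per set */
		struct work_struct _kmirrord_work;	/* executes do_mirror(ms) */
		/* ... other fields as in the patch ... */
	};

	/* Waking a mirror set queues work on that set's own thread only. */
	static inline void wake(struct mirror_set *ms)
	{
		queue_work(ms->_kmirrord_wq, &ms->_kmirrord_work);
	}

	/* Hypothetical helper: give each set its own singlethreaded workqueue,
	 * so a sync write to a dirty log that lives on another mirror set is
	 * serviced by that other set's kmirrord and can no longer wait on
	 * itself.
	 */
	static int alloc_kmirrord(struct mirror_set *ms)
	{
		ms->_kmirrord_wq = create_singlethread_workqueue("kmirrord");
		if (!ms->_kmirrord_wq)
			return -ENOMEM;
		INIT_WORK(&ms->_kmirrord_work, do_mirror, ms);
		return 0;
	}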

--- a/drivers/md/dm-raid1.c	16 Mar 2006 15:02:16 -0000	1.4
+++ b/drivers/md/dm-raid1.c	23 Mar 2006 13:49:41 -0000	1.4.2.2
@@ -21,53 +21,6 @@
 #include <linux/vmalloc.h>
 #include <linux/workqueue.h>
 
-static struct workqueue_struct *_kmirrord_wq;
-static struct work_struct _kmirrord_work;
-
-static inline void wake(void)
-{
-	queue_work(_kmirrord_wq, &_kmirrord_work);
-}
-
-static struct workqueue_struct *_kmir_mon_wq;
-
-/*-----------------------------------------------------------------
- * Region hash
- *
- * The mirror splits itself up into discrete regions.  Each
- * region can be in one of three states: clean, dirty,
- * nosync.  There is no need to put clean regions in the hash.
- *
- * In addition to being present in the hash table a region _may_
- * be present on one of three lists.
- *
- *   clean_regions: Regions on this list have no io pending to
- *   them, they are in sync, we are no longer interested in them,
- *   they are dull.  rh_update_states() will remove them from the
- *   hash table.
- *
- *   quiesced_regions: These regions have been spun down, ready
- *   for recovery.  rh_recovery_start() will remove regions from
- *   this list and hand them to kmirrord, which will schedule the
- *   recovery io with kcopyd.
- *
- *   recovered_regions: Regions that kcopyd has successfully
- *   recovered.  rh_update_states() will now schedule any delayed
- *   io, up the recovery_count, and remove the region from the
- *   hash.
- *
- * There are 2 locks:
- *   A rw spin lock 'hash_lock' protects just the hash table,
- *   this is never held in write mode from interrupt context,
- *   which I believe means that we only have to disable irqs when
- *   doing a write lock.
- *
- *   An ordinary spin lock 'region_lock' that protects the three
- *   lists in the region_hash, with the 'state', 'list' and
- *   'bhs_delayed' fields of the regions.  This is used from irq
- *   context, so all other uses will have to suspend local irqs.
- *---------------------------------------------------------------*/
-struct mirror_set;
 struct region_hash {
 	struct mirror_set *ms;
 	uint32_t region_size;
@@ -109,6 +62,84 @@
 	struct bio_list delayed_bios;
 };
 
+struct mirror {
+	atomic_t error_count;  /* Error counter to flag mirror failure */
+	struct mirror_set *ms;
+	struct dm_dev *dev;
+	sector_t offset;
+};
+
+struct mirror_set {
+	struct dm_target *ti;
+	struct list_head list;
+	struct region_hash rh;
+	struct kcopyd_client *kcopyd_client;
+
+	spinlock_t lock;	/* protects the lists */
+	struct bio_list reads;
+	struct bio_list writes;
+	struct bio_list failures;
+	struct work_struct failure_work;
+	struct completion failure_completion;
+
+	/* recovery */
+	atomic_t suspended;
+	region_t nr_regions;
+	int in_sync;
+
+	unsigned int nr_mirrors;
+	spinlock_t choose_lock; /* protects select in choose_mirror(). */
+	atomic_t read_count;    /* Read counter for read balancing. */
+	unsigned int read_mirror;       /* Last mirror read. */
+	struct mirror *default_mirror;  /* Default mirror. */
+	struct workqueue_struct *_kmirrord_wq;
+	struct work_struct _kmirrord_work;
+	struct workqueue_struct *_kmir_mon_wq;
+ 	struct mirror mirror[0];
+};
+
+static inline void wake(struct mirror_set *ms)
+{
+	queue_work(ms->_kmirrord_wq, &ms->_kmirrord_work);
+}
+
+/*-----------------------------------------------------------------
+ * Region hash
+ *
+ * The mirror splits itself up into discrete regions.  Each
+ * region can be in one of three states: clean, dirty,
+ * nosync.  There is no need to put clean regions in the hash.
+ *
+ * In addition to being present in the hash table a region _may_
+ * be present on one of three lists.
+ *
+ *   clean_regions: Regions on this list have no io pending to
+ *   them, they are in sync, we are no longer interested in them,
+ *   they are dull.  rh_update_states() will remove them from the
+ *   hash table.
+ *
+ *   quiesced_regions: These regions have been spun down, ready
+ *   for recovery.  rh_recovery_start() will remove regions from
+ *   this list and hand them to kmirrord, which will schedule the
+ *   recovery io with kcopyd.
+ *
+ *   recovered_regions: Regions that kcopyd has successfully
+ *   recovered.  rh_update_states() will now schedule any delayed
+ *   io, up the recovery_count, and remove the region from the
+ *   hash.
+ *
+ * There are 2 locks:
+ *   A rw spin lock 'hash_lock' protects just the hash table,
+ *   this is never held in write mode from interrupt context,
+ *   which I believe means that we only have to disable irqs when
+ *   doing a write lock.
+ *
+ *   An ordinary spin lock 'region_lock' that protects the three
+ *   lists in the region_hash, with the 'state', 'list' and
+ *   'bhs_delayed' fields of the regions.  This is used from irq
+ *   context, so all other uses will have to suspend local irqs.
+ *---------------------------------------------------------------*/
+
 /*
  * Conversion fns
  */
@@ -425,7 +456,7 @@
 	spin_unlock_irqrestore(&rh->region_lock, flags);
 
 	if (should_wake)
-		wake();
+		wake(rh->ms);
 }
 
 /*
@@ -504,7 +535,7 @@
 	list_add(&reg->list, &reg->rh->recovered_regions);
 	spin_unlock_irq(&rh->region_lock);
 
-	wake();
+	wake(rh->ms);
 }
 
 static void rh_flush(struct region_hash *rh)
@@ -538,44 +569,9 @@
 	for (i = 0; i < MAX_RECOVERY; i++)
 		up(&rh->recovery_count);
 
-	wake();
+	wake(rh->ms);
 }
 
-/*-----------------------------------------------------------------
- * Mirror set structures.
- *---------------------------------------------------------------*/
-struct mirror {
-	atomic_t error_count;  /* Error counter to flag mirror failure */
-	struct mirror_set *ms;
-	struct dm_dev *dev;
-	sector_t offset;
-};
-
-struct mirror_set {
-	struct dm_target *ti;
-	struct list_head list;
-	struct region_hash rh;
-	struct kcopyd_client *kcopyd_client;
-
-	spinlock_t lock;	/* protects the lists */
-	struct bio_list reads;
-	struct bio_list writes;
-	struct bio_list failures;
-	struct work_struct failure_work;
-	struct completion failure_completion;
-
-	/* recovery */
-	atomic_t suspended;
-	region_t nr_regions;
-	int in_sync;
-
-	unsigned int nr_mirrors;
-	spinlock_t choose_lock; /* protects select in choose_mirror(). */
-	atomic_t read_count;    /* Read counter for read balancing. */
-	unsigned int read_mirror;       /* Last mirror read. */
-	struct mirror *default_mirror;  /* Default mirror. */
- 	struct mirror mirror[0];
-};
 
 struct bio_map_info {
 	struct mirror *bmi_m;
@@ -947,7 +943,7 @@
 				spin_unlock(&ms->lock);
 
 				if (run)
-					queue_work(_kmir_mon_wq,
+					queue_work(ms->_kmir_mon_wq,
 						   &ms->failure_work);
 
 				return;
@@ -1082,16 +1078,6 @@
 	do_writes(ms, &writes);
 }
 
-static void do_work(void *ignored)
-{
-	struct mirror_set *ms;
-
-	down_read(&_mirror_sets_lock);
-	list_for_each_entry (ms, &_mirror_sets, list)
-		do_mirror(ms);
-	up_read(&_mirror_sets_lock);
-}
-
 /*-----------------------------------------------------------------
  * Target functions
  *---------------------------------------------------------------*/
@@ -1188,7 +1174,7 @@
 	down_write(&_mirror_sets_lock);
 	list_add_tail(&ms->list, &_mirror_sets);
 	up_write(&_mirror_sets_lock);
-	wake();
+	wake(ms);
 
 	return 0;
 }
@@ -1301,9 +1287,30 @@
 	ti->private = ms;
  	ti->split_io = ms->rh.region_size;
 
+	ms->_kmirrord_wq = create_singlethread_workqueue("kmirrord");
+	if (!ms->_kmirrord_wq) {
+		DMERR("couldn't start kmirrord");
+		free_context(ms, ti, ms->nr_mirrors);
+		dm_destroy_dirty_log(dl);
+		return -ENOMEM;
+	}
+	INIT_WORK(&ms->_kmirrord_work, do_mirror, ms);
+
+	ms->_kmir_mon_wq = create_singlethread_workqueue("kmir_mon");
+	if (!ms->_kmir_mon_wq) {
+		DMERR("couldn't start kmir_mon");
+		destroy_workqueue(ms->_kmirrord_wq);
+		free_context(ms, ti, ms->nr_mirrors);
+		dm_destroy_dirty_log(dl);
+		return -ENOMEM;
+	}
+
 	r = kcopyd_client_create(DM_IO_PAGES, &ms->kcopyd_client);
 	if (r) {
+		destroy_workqueue(ms->_kmir_mon_wq);
+		destroy_workqueue(ms->_kmirrord_wq);
 		free_context(ms, ti, ms->nr_mirrors);
+		dm_destroy_dirty_log(dl);
 		return r;
 	}
 
@@ -1317,7 +1324,10 @@
 
 	del_mirror_set(ms);
 	kcopyd_client_destroy(ms->kcopyd_client);
+	destroy_workqueue(ms->_kmir_mon_wq);
+	destroy_workqueue(ms->_kmirrord_wq);
 	free_context(ms, ti, ms->nr_mirrors);
+	/* FIXME: how do we destroy the dirty log here? dm_destroy_dirty_log(dl); */
 }
 
 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
@@ -1333,7 +1343,7 @@
 	spin_unlock_irqrestore(&ms->lock, flags);
 
 	if (should_wake)
-		wake();
+		wake(ms);
 }
 
 /*
@@ -1576,29 +1586,11 @@
 	if (r)
 		return r;
 
-	_kmirrord_wq = create_singlethread_workqueue("kmirrord");
-	if (!_kmirrord_wq) {
-		DMERR("couldn't start kmirrord");
-		dm_dirty_log_exit();
-		return -ENOMEM;
-	}
-	INIT_WORK(&_kmirrord_work, do_work, NULL);
-
-	_kmir_mon_wq = create_singlethread_workqueue("kmir_mon");
-	if (!_kmir_mon_wq) {
-		DMERR("couldn't start kmir_mon");
-		dm_dirty_log_exit();
-		destroy_workqueue(_kmirrord_wq);
-		return -ENOMEM;
-	}
-
 	r = dm_register_target(&mirror_target);
 	if (r < 0) {
 		DMERR("%s: Failed to register mirror target",
 		      mirror_target.name);
 		dm_dirty_log_exit();
-		destroy_workqueue(_kmirrord_wq);
-		destroy_workqueue(_kmir_mon_wq);
 	}
 
 	return r;
@@ -1612,7 +1604,6 @@
 	if (r < 0)
 		DMERR("%s: unregister failed %d", mirror_target.name, r);
 
-	destroy_workqueue(_kmirrord_wq);
 	dm_dirty_log_exit();
 }