linux-kernel.vger.kernel.org archive mirror
From: Daniel Jordan <daniel.m.jordan@oracle.com>
To: Steffen Klassert <steffen.klassert@secunet.com>,
	Herbert Xu <herbert@gondor.apana.org.au>
Cc: Tejun Heo <tj@kernel.org>, Lai Jiangshan <jiangshanlai@gmail.com>,
	Peter Zijlstra <peterz@infradead.org>,
	linux-crypto@vger.kernel.org, linux-kernel@vger.kernel.org,
	Daniel Jordan <daniel.m.jordan@oracle.com>
Subject: [RFC 8/9] padata: unbind parallel jobs from specific CPUs
Date: Thu, 25 Jul 2019 17:25:04 -0400
Message-ID: <20190725212505.15055-9-daniel.m.jordan@oracle.com>
In-Reply-To: <20190725212505.15055-1-daniel.m.jordan@oracle.com>

Padata binds the parallel part of a job to a single CPU.  Though the
serial parts rely on per-CPU queues for correct ordering, that binding
isn't necessary for the parallel part, and it's beneficial to run the
job locally on NUMA machines and let the scheduler pick the CPU within
a node on a busy system.

So, make the parallel workqueue unbound.

Update the parallel workqueue's cpumask when the instance's parallel
cpumask changes.
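
For reference, the cpumask reaches the unbound workqueue through
workqueue_attrs.  A minimal sketch of the pattern, assuming the attrs
helpers exported earlier in this series and mirroring the
padata_setup_cpumasks() hunk below:

	struct workqueue_attrs *attrs;
	int err;

	attrs = alloc_workqueue_attrs();
	if (!attrs)
		return -ENOMEM;

	/* Restrict the parallel workqueue's workers to pd->cpumask.pcpu. */
	cpumask_copy(attrs->cpumask, pd->cpumask.pcpu);
	err = apply_workqueue_attrs(pinst->parallel_wq, attrs);
	free_workqueue_attrs(attrs);
	if (err < 0)
		return err;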

Now that parallel jobs no longer run on max_active=1 workqueues, two or
more parallel works that hash to the same CPU may run simultaneously,
finish out of order, and so be serialized out of order.  Prevent this by
keeping the works sorted on the reorder list by sequence number and
teaching padata_get_next about it.
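
To make the ordering concrete, here is a toy, userspace-only model of
one reorder queue (not the kernel code; the kernel's list handling and
locking are omitted): works arrive in completion order, are kept sorted
by seq_nr, and are handed to the serial stage only when the next
expected sequence number sits at the head.

	#include <stdio.h>
	#include <stdlib.h>

	struct work {
		unsigned int seq_nr;
		struct work *next;
	};

	static struct work *reorder;	/* singly linked, sorted by seq_nr */
	static unsigned int processed;	/* next seq_nr to serialize */

	static void complete(unsigned int seq_nr)
	{
		struct work *w = malloc(sizeof(*w)), **p = &reorder;

		w->seq_nr = seq_nr;
		/* Sorted insert: stop at the first entry with a larger seq_nr. */
		while (*p && (*p)->seq_nr < seq_nr)
			p = &(*p)->next;
		w->next = *p;
		*p = w;

		/* Serialize everything that is now in order. */
		while (reorder && reorder->seq_nr == processed) {
			struct work *done = reorder;

			printf("serializing seq_nr %u\n", done->seq_nr);
			reorder = done->next;
			processed++;
			free(done);
		}
	}

	int main(void)
	{
		complete(0);
		complete(2);	/* finished early; held back until 1 arrives */
		complete(1);	/* 1 and 2 are now serialized in order */
		return 0;
	}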

The ENODATA case in padata_get_next no longer makes sense because
parallel jobs aren't bound to specific CPUs.  The EINPROGRESS case takes
care of the scenario where a parallel job is potentially running on the
same CPU as padata_get_next.

Signed-off-by: Daniel Jordan <daniel.m.jordan@oracle.com>
---
 include/linux/padata.h |  4 +-
 kernel/padata.c        | 97 +++++++++++++++++++++++-------------------
 2 files changed, 57 insertions(+), 44 deletions(-)

diff --git a/include/linux/padata.h b/include/linux/padata.h
index e7978f8942ca..cc420064186f 100644
--- a/include/linux/padata.h
+++ b/include/linux/padata.h
@@ -35,6 +35,7 @@ struct padata_priv {
 	struct parallel_data	*pd;
 	int			cb_cpu;
 	int			cpu;
+	unsigned int		seq_nr;
 	int			info;
 	void                    (*parallel)(struct padata_priv *padata);
 	void                    (*serial)(struct padata_priv *padata);
@@ -104,7 +105,7 @@ struct padata_cpumask {
  * @squeue: percpu padata queues used for serialuzation.
  * @reorder_objects: Number of objects waiting in the reorder queues.
  * @refcnt: Number of objects holding a reference on this parallel_data.
- * @max_seq_nr:  Maximal used sequence number.
+ * @processed: Number of already processed objects.
  * @cpu: Next CPU to be processed.
  * @cpumask: The cpumasks in use for parallel and serial workers.
  * @reorder_work: work struct for reordering.
@@ -117,6 +118,7 @@ struct parallel_data {
 	atomic_t			reorder_objects;
 	atomic_t			refcnt;
 	atomic_t			seq_nr;
+	unsigned int			processed;
 	int				cpu;
 	struct padata_cpumask		cpumask;
 	struct work_struct		reorder_work;
diff --git a/kernel/padata.c b/kernel/padata.c
index 44c647f7300e..09a7dbdd9678 100644
--- a/kernel/padata.c
+++ b/kernel/padata.c
@@ -46,18 +46,13 @@ static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
 	return target_cpu;
 }
 
-static int padata_cpu_hash(struct parallel_data *pd)
+static int padata_cpu_hash(struct parallel_data *pd, unsigned int seq_nr)
 {
-	unsigned int seq_nr;
-	int cpu_index;
-
 	/*
 	 * Hash the sequence numbers to the cpus by taking
 	 * seq_nr mod. number of cpus in use.
 	 */
-
-	seq_nr = atomic_inc_return(&pd->seq_nr);
-	cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);
+	int cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);
 
 	return padata_index_to_cpu(pd, cpu_index);
 }
@@ -144,7 +139,8 @@ int padata_do_parallel(struct padata_instance *pinst,
 	padata->pd = pd;
 	padata->cb_cpu = *cb_cpu;
 
-	target_cpu = padata_cpu_hash(pd);
+	padata->seq_nr = atomic_inc_return(&pd->seq_nr);
+	target_cpu = padata_cpu_hash(pd, padata->seq_nr);
 	padata->cpu = target_cpu;
 	queue = per_cpu_ptr(pd->pqueue, target_cpu);
 
@@ -152,7 +148,7 @@ int padata_do_parallel(struct padata_instance *pinst,
 	list_add_tail(&padata->list, &queue->parallel.list);
 	spin_unlock(&queue->parallel.lock);
 
-	queue_work_on(target_cpu, pinst->parallel_wq, &queue->work);
+	queue_work(pinst->parallel_wq, &queue->work);
 
 out:
 	rcu_read_unlock_bh();
@@ -172,9 +168,6 @@ EXPORT_SYMBOL(padata_do_parallel);
  * -EINPROGRESS, if the next object that needs serialization will
  *  be parallel processed by another cpu and is not yet present in
  *  the cpu's reorder queue.
- *
- * -ENODATA, if this cpu has to do the parallel processing for
- *  the next object.
  */
 static struct padata_priv *padata_get_next(struct parallel_data *pd)
 {
@@ -191,22 +184,25 @@ static struct padata_priv *padata_get_next(struct parallel_data *pd)
 		padata = list_entry(reorder->list.next,
 				    struct padata_priv, list);
 
-		list_del_init(&padata->list);
-		atomic_dec(&pd->reorder_objects);
+		/*
+		 * The check fails in the unlikely event that two or more
+		 * parallel jobs have hashed to the same CPU and one of the
+		 * later ones finishes first.
+		 */
+		if (padata->seq_nr == pd->processed) {
+			list_del_init(&padata->list);
+			atomic_dec(&pd->reorder_objects);
 
-		pd->cpu = cpumask_next_wrap(cpu, pd->cpumask.pcpu, -1,
-					    false);
+			++pd->processed;
+			pd->cpu = cpumask_next_wrap(cpu, pd->cpumask.pcpu, -1,
+						    false);
 
-		spin_unlock(&reorder->lock);
-		goto out;
+			spin_unlock(&reorder->lock);
+			goto out;
+		}
 	}
 	spin_unlock(&reorder->lock);
 
-	if (__this_cpu_read(pd->pqueue->cpu_index) == next_queue->cpu_index) {
-		padata = ERR_PTR(-ENODATA);
-		goto out;
-	}
-
 	padata = ERR_PTR(-EINPROGRESS);
 out:
 	return padata;
@@ -244,16 +240,6 @@ static void padata_reorder(struct parallel_data *pd)
 		if (PTR_ERR(padata) == -EINPROGRESS)
 			break;
 
-		/*
-		 * This cpu has to do the parallel processing of the next
-		 * object. It's waiting in the cpu's parallelization queue,
-		 * so exit immediately.
-		 */
-		if (PTR_ERR(padata) == -ENODATA) {
-			spin_unlock_bh(&pd->lock);
-			return;
-		}
-
 		cb_cpu = padata->cb_cpu;
 		squeue = per_cpu_ptr(pd->squeue, cb_cpu);
 
@@ -332,9 +318,14 @@ void padata_do_serial(struct padata_priv *padata)
 	struct parallel_data *pd = padata->pd;
 	struct padata_parallel_queue *pqueue = per_cpu_ptr(pd->pqueue,
 							   padata->cpu);
+	struct padata_priv *cur;
 
 	spin_lock(&pqueue->reorder.lock);
-	list_add_tail(&padata->list, &pqueue->reorder.list);
+	/* Sort in ascending order of sequence number. */
+	list_for_each_entry_reverse(cur, &pqueue->reorder.list, list)
+		if (cur->seq_nr < padata->seq_nr)
+			break;
+	list_add(&padata->list, &cur->list);
 	atomic_inc(&pd->reorder_objects);
 	spin_unlock(&pqueue->reorder.lock);
 
@@ -353,17 +344,36 @@ static int padata_setup_cpumasks(struct parallel_data *pd,
 				 const struct cpumask *pcpumask,
 				 const struct cpumask *cbcpumask)
 {
-	if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
-		return -ENOMEM;
+	struct workqueue_attrs *attrs;
+	int err = -ENOMEM;
 
+	if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
+		goto out;
 	cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_online_mask);
-	if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) {
-		free_cpumask_var(pd->cpumask.pcpu);
-		return -ENOMEM;
-	}
 
+	if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL))
+		goto free_pcpu_mask;
 	cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_online_mask);
+
+	attrs = alloc_workqueue_attrs();
+	if (!attrs)
+		goto free_cbcpu_mask;
+
+	/* Restrict parallel_wq workers to pd->cpumask.pcpu. */
+	cpumask_copy(attrs->cpumask, pd->cpumask.pcpu);
+	err = apply_workqueue_attrs(pd->pinst->parallel_wq, attrs);
+	free_workqueue_attrs(attrs);
+	if (err < 0)
+		goto free_cbcpu_mask;
+
 	return 0;
+
+free_cbcpu_mask:
+	free_cpumask_var(pd->cpumask.cbcpu);
+free_pcpu_mask:
+	free_cpumask_var(pd->cpumask.pcpu);
+out:
+	return err;
 }
 
 static void __padata_list_init(struct padata_list *pd_list)
@@ -429,6 +439,8 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
 	pd->squeue = alloc_percpu(struct padata_serial_queue);
 	if (!pd->squeue)
 		goto err_free_pqueue;
+
+	pd->pinst = pinst;
 	if (padata_setup_cpumasks(pd, pcpumask, cbcpumask) < 0)
 		goto err_free_squeue;
 
@@ -437,7 +449,6 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
 	atomic_set(&pd->seq_nr, -1);
 	atomic_set(&pd->reorder_objects, 0);
 	atomic_set(&pd->refcnt, 0);
-	pd->pinst = pinst;
 	spin_lock_init(&pd->lock);
 	pd->cpu = cpumask_first(pcpumask);
 	INIT_WORK(&pd->reorder_work, invoke_padata_reorder);
@@ -968,8 +979,8 @@ static struct padata_instance *padata_alloc(const char *name,
 	if (!pinst)
 		goto err;
 
-	pinst->parallel_wq = alloc_workqueue("%s_parallel", WQ_MEM_RECLAIM |
-					     WQ_CPU_INTENSIVE, 1, name);
+	pinst->parallel_wq = alloc_workqueue("%s_parallel", WQ_UNBOUND, 0,
+					     name);
 	if (!pinst->parallel_wq)
 		goto err_free_inst;
 
-- 
2.22.0


Thread overview: 13+ messages
2019-07-25 21:24 [RFC 0/9] padata: use unbound workqueues for parallel jobs Daniel Jordan
2019-07-25 21:24 ` [RFC 1/9] padata: allocate workqueue internally Daniel Jordan
2019-07-25 21:24 ` [RFC 2/9] workqueue: unconfine alloc/apply/free_workqueue_attrs() Daniel Jordan
2019-07-29 19:46   ` Tejun Heo
2019-07-25 21:24 ` [RFC 3/9] workqueue: require CPU hotplug read exclusion for apply_workqueue_attrs Daniel Jordan
2019-07-29 19:47   ` Tejun Heo
2019-07-30  1:53     ` Daniel Jordan
2019-07-25 21:25 ` [RFC 4/9] padata: make padata_do_parallel find alternate callback CPU Daniel Jordan
2019-07-25 21:25 ` [RFC 5/9] pcrypt: remove padata cpumask notifier Daniel Jordan
2019-07-25 21:25 ` [RFC 6/9] padata, pcrypt: take CPU hotplug lock internally in padata_alloc_possible Daniel Jordan
2019-07-25 21:25 ` [RFC 7/9] padata: use separate workqueues for parallel and serial work Daniel Jordan
2019-07-25 21:25 ` Daniel Jordan [this message]
2019-07-25 21:25 ` [RFC 9/9] padata: remove cpu_index from the parallel_queue Daniel Jordan
