aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDarrick J. Wong <darrick.wong@oracle.com>2017-12-06 09:17:08 -0600
committerEric Sandeen <sandeen@redhat.com>2017-12-06 09:17:08 -0600
commit62843f3671b5fffa8b07ffa2df5cd98cc8b20912 (patch)
tree4e81c2e3d1e7e629e796a9d27865c36c0a544463
parent852fe0131d1647bb89cf6d894348091d8a8c68be (diff)
downloadxfsprogs-dev-62843f3671b5fffa8b07ffa2df5cd98cc8b20912.tar.gz
xfs_repair: remove old workqueue stuff in favor of libfrog code
Now that we've made a generic workqueue in libfrog, we can remove the implementation in xfs_repair and turn the old functions into wrappers that call do_error if they fail. There are no functional changes in this patch, though some of the names and types have changed. Signed-off-by: Darrick J. Wong <darrick.wong@oracle.com> Reviewed-by: Eric Sandeen <sandeen@redhat.com> Signed-off-by: Eric Sandeen <sandeen@sandeen.net>
-rw-r--r--repair/phase3.c16
-rw-r--r--repair/phase4.c26
-rw-r--r--repair/phase6.c4
-rw-r--r--repair/phase7.c9
-rw-r--r--repair/prefetch.c20
-rw-r--r--repair/prefetch.h4
-rw-r--r--repair/scan.c16
-rw-r--r--repair/slab.c4
-rw-r--r--repair/threads.c125
-rw-r--r--repair/threads.h38
10 files changed, 74 insertions, 188 deletions
diff --git a/repair/phase3.c b/repair/phase3.c
index 17b1c28d33..8ebe1aefea 100644
--- a/repair/phase3.c
+++ b/repair/phase3.c
@@ -66,7 +66,7 @@ process_agi_unlinked(
static void
process_ag_func(
- work_queue_t *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
@@ -76,7 +76,7 @@ process_ag_func(
*/
wait_for_inode_prefetch(arg);
do_log(_(" - agno = %d\n"), agno);
- process_aginodes(wq->mp, arg, agno, 1, 0, 1);
+ process_aginodes(wq->wq_ctx, arg, agno, 1, 0, 1);
blkmap_free_final();
cleanup_inode_prefetch(arg);
}
@@ -90,13 +90,13 @@ process_ags(
static void
do_uncertain_aginodes(
- work_queue_t *wq,
- xfs_agnumber_t agno,
- void *arg)
+ struct workqueue *wq,
+ xfs_agnumber_t agno,
+ void *arg)
{
- int *count = arg;
+ int *count = arg;
- *count = process_uncertain_aginodes(wq->mp, agno);
+ *count = process_uncertain_aginodes(wq->wq_ctx, agno);
#ifdef XR_INODE_TRACE
fprintf(stderr,
@@ -114,7 +114,7 @@ phase3(
{
int i, j;
int *counts;
- work_queue_t wq;
+ struct workqueue wq;
do_log(_("Phase 3 - for each AG...\n"));
if (!no_modify)
diff --git a/repair/phase4.c b/repair/phase4.c
index cc17ec03f7..0a02b7dd3c 100644
--- a/repair/phase4.c
+++ b/repair/phase4.c
@@ -134,13 +134,13 @@ quota_sb_check(xfs_mount_t *mp)
static void
process_ag_func(
- work_queue_t *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
wait_for_inode_prefetch(arg);
do_log(_(" - agno = %d\n"), agno);
- process_aginodes(wq->mp, arg, agno, 0, 1, 0);
+ process_aginodes(wq->wq_ctx, arg, agno, 0, 1, 0);
blkmap_free_final();
cleanup_inode_prefetch(arg);
@@ -169,23 +169,23 @@ _("unable to finish adding attr/data fork reverse-mapping data for AG %u.\n"),
static void
check_rmap_btrees(
- work_queue_t *wq,
+ struct workqueue*wq,
xfs_agnumber_t agno,
void *arg)
{
int error;
- error = rmap_add_fixed_ag_rec(wq->mp, agno);
+ error = rmap_add_fixed_ag_rec(wq->wq_ctx, agno);
if (error)
do_error(
_("unable to add AG %u metadata reverse-mapping data.\n"), agno);
- error = rmap_fold_raw_recs(wq->mp, agno);
+ error = rmap_fold_raw_recs(wq->wq_ctx, agno);
if (error)
do_error(
_("unable to merge AG %u metadata reverse-mapping data.\n"), agno);
- error = rmaps_verify_btree(wq->mp, agno);
+ error = rmaps_verify_btree(wq->wq_ctx, agno);
if (error)
do_error(
_("%s while checking reverse-mappings"),
@@ -194,13 +194,13 @@ _("%s while checking reverse-mappings"),
static void
compute_ag_refcounts(
- work_queue_t *wq,
+ struct workqueue*wq,
xfs_agnumber_t agno,
void *arg)
{
int error;
- error = compute_refcounts(wq->mp, agno);
+ error = compute_refcounts(wq->wq_ctx, agno);
if (error)
do_error(
_("%s while computing reference count records.\n"),
@@ -209,13 +209,13 @@ _("%s while computing reference count records.\n"),
static void
process_inode_reflink_flags(
- struct work_queue *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
int error;
- error = fix_inode_reflink_flags(wq->mp, agno);
+ error = fix_inode_reflink_flags(wq->wq_ctx, agno);
if (error)
do_error(
_("%s while fixing inode reflink flags.\n"),
@@ -224,13 +224,13 @@ _("%s while fixing inode reflink flags.\n"),
static void
check_refcount_btrees(
- work_queue_t *wq,
+ struct workqueue*wq,
xfs_agnumber_t agno,
void *arg)
{
int error;
- error = check_refcounts(wq->mp, agno);
+ error = check_refcounts(wq->wq_ctx, agno);
if (error)
do_error(
_("%s while checking reference counts"),
@@ -241,7 +241,7 @@ static void
process_rmap_data(
struct xfs_mount *mp)
{
- struct work_queue wq;
+ struct workqueue wq;
xfs_agnumber_t i;
if (!rmap_needs_work(mp))
diff --git a/repair/phase6.c b/repair/phase6.c
index f3b837805c..b326929543 100644
--- a/repair/phase6.c
+++ b/repair/phase6.c
@@ -3125,7 +3125,7 @@ check_for_orphaned_inodes(
static void
traverse_function(
- work_queue_t *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
@@ -3154,7 +3154,7 @@ traverse_function(
for (i = 0; i < XFS_INODES_PER_CHUNK; i++) {
if (inode_isadir(irec, i))
- process_dir_inode(wq->mp, agno, irec, i);
+ process_dir_inode(wq->wq_ctx, agno, irec, i);
}
}
cleanup_inode_prefetch(pf_args);
diff --git a/repair/phase7.c b/repair/phase7.c
index 4ffb81a867..b495ec2bd9 100644
--- a/repair/phase7.c
+++ b/repair/phase7.c
@@ -98,10 +98,11 @@ update_inode_nlinks(
*/
static void
do_link_updates(
- struct work_queue *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
+ struct xfs_mount *mp = wq->wq_ctx;
ino_tree_node_t *irec;
int j;
uint32_t nrefs;
@@ -120,8 +121,8 @@ do_link_updates(
ASSERT(no_modify || nrefs > 0);
if (get_inode_disk_nlinks(irec, j) != nrefs)
- update_inode_nlinks(wq->mp,
- XFS_AGINO_TO_INO(wq->mp, agno,
+ update_inode_nlinks(wq->wq_ctx,
+ XFS_AGINO_TO_INO(mp, agno,
irec->ino_startnum + j),
nrefs);
}
@@ -135,7 +136,7 @@ phase7(
struct xfs_mount *mp,
int scan_threads)
{
- struct work_queue wq;
+ struct workqueue wq;
int agno;
if (!no_modify)
diff --git a/repair/prefetch.c b/repair/prefetch.c
index 4c74b6e31b..9c68e35cd9 100644
--- a/repair/prefetch.c
+++ b/repair/prefetch.c
@@ -943,11 +943,11 @@ start_inode_prefetch(
*/
static void
prefetch_ag_range(
- struct work_queue *work,
+ struct workqueue *work,
xfs_agnumber_t start_ag,
xfs_agnumber_t end_ag,
bool dirs_only,
- void (*func)(struct work_queue *,
+ void (*func)(struct workqueue *,
xfs_agnumber_t, void *))
{
int i;
@@ -967,12 +967,12 @@ struct pf_work_args {
xfs_agnumber_t start_ag;
xfs_agnumber_t end_ag;
bool dirs_only;
- void (*func)(struct work_queue *, xfs_agnumber_t, void *);
+ void (*func)(struct workqueue *, xfs_agnumber_t, void *);
};
static void
prefetch_ag_range_work(
- struct work_queue *work,
+ struct workqueue *work,
xfs_agnumber_t unused,
void *args)
{
@@ -991,14 +991,14 @@ void
do_inode_prefetch(
struct xfs_mount *mp,
int stride,
- void (*func)(struct work_queue *,
+ void (*func)(struct workqueue *,
xfs_agnumber_t, void *),
bool check_cache,
bool dirs_only)
{
int i;
- struct work_queue queue;
- struct work_queue *queues;
+ struct workqueue queue;
+ struct workqueue *queues;
int queues_started = 0;
/*
@@ -1008,7 +1008,7 @@ do_inode_prefetch(
* CPU to maximise parallelism of the queue to be processed.
*/
if (check_cache && !libxfs_bcache_overflowed()) {
- queue.mp = mp;
+ queue.wq_ctx = mp;
create_work_queue(&queue, mp, libxfs_nproc());
for (i = 0; i < mp->m_sb.sb_agcount; i++)
queue_work(&queue, func, i, NULL);
@@ -1021,7 +1021,7 @@ do_inode_prefetch(
* directly after each AG is queued.
*/
if (!stride) {
- queue.mp = mp;
+ queue.wq_ctx = mp;
prefetch_ag_range(&queue, 0, mp->m_sb.sb_agcount,
dirs_only, func);
return;
@@ -1030,7 +1030,7 @@ do_inode_prefetch(
/*
* create one worker thread for each segment of the volume
*/
- queues = malloc(thread_count * sizeof(work_queue_t));
+ queues = malloc(thread_count * sizeof(struct workqueue));
for (i = 0; i < thread_count; i++) {
struct pf_work_args *wargs;
diff --git a/repair/prefetch.h b/repair/prefetch.h
index b837752607..8652707e1d 100644
--- a/repair/prefetch.h
+++ b/repair/prefetch.h
@@ -4,7 +4,7 @@
#include <semaphore.h>
#include "incore.h"
-struct work_queue;
+struct workqueue;
extern int do_prefetch;
@@ -45,7 +45,7 @@ void
do_inode_prefetch(
struct xfs_mount *mp,
int stride,
- void (*func)(struct work_queue *,
+ void (*func)(struct workqueue *,
xfs_agnumber_t, void *),
bool check_cache,
bool dirs_only);
diff --git a/repair/scan.c b/repair/scan.c
index 22c733128e..e4ac4a7385 100644
--- a/repair/scan.c
+++ b/repair/scan.c
@@ -2342,7 +2342,7 @@ validate_agi(
*/
static void
scan_ag(
- work_queue_t *wq,
+ struct workqueue*wq,
xfs_agnumber_t agno,
void *arg)
{
@@ -2504,13 +2504,13 @@ scan_ags(
struct xfs_mount *mp,
int scan_threads)
{
- struct aghdr_cnts *agcnts;
- uint64_t fdblocks = 0;
- uint64_t icount = 0;
- uint64_t ifreecount = 0;
- uint64_t usedblocks = 0;
- xfs_agnumber_t i;
- work_queue_t wq;
+ struct aghdr_cnts *agcnts;
+ uint64_t fdblocks = 0;
+ uint64_t icount = 0;
+ uint64_t ifreecount = 0;
+ uint64_t usedblocks = 0;
+ xfs_agnumber_t i;
+ struct workqueue wq;
agcnts = malloc(mp->m_sb.sb_agcount * sizeof(*agcnts));
if (!agcnts) {
diff --git a/repair/slab.c b/repair/slab.c
index d47448a380..b04c3b840e 100644
--- a/repair/slab.c
+++ b/repair/slab.c
@@ -211,7 +211,7 @@ struct qsort_slab {
static void
qsort_slab_helper(
- struct work_queue *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
@@ -231,7 +231,7 @@ qsort_slab(
struct xfs_slab *slab,
int (*compare_fn)(const void *, const void *))
{
- struct work_queue wq;
+ struct workqueue wq;
struct xfs_slab_hdr *hdr;
struct qsort_slab *qs;
diff --git a/repair/threads.c b/repair/threads.c
index 631531f574..7a7f7486d5 100644
--- a/repair/threads.c
+++ b/repair/threads.c
@@ -6,50 +6,6 @@
#include "protos.h"
#include "globals.h"
-static void *
-worker_thread(void *arg)
-{
- work_queue_t *wq;
- work_item_t *wi;
-
- wq = (work_queue_t*)arg;
-
- /*
- * Loop pulling work from the passed in work queue.
- * Check for notification to exit after every chunk of work.
- */
- while (1) {
- pthread_mutex_lock(&wq->lock);
-
- /*
- * Wait for work.
- */
- while (wq->next_item == NULL && !wq->terminate) {
- ASSERT(wq->item_count == 0);
- pthread_cond_wait(&wq->wakeup, &wq->lock);
- }
- if (wq->next_item == NULL && wq->terminate) {
- pthread_mutex_unlock(&wq->lock);
- break;
- }
-
- /*
- * Dequeue work from the head of the list.
- */
- ASSERT(wq->item_count > 0);
- wi = wq->next_item;
- wq->next_item = wi->next;
- wq->item_count--;
-
- pthread_mutex_unlock(&wq->lock);
-
- (wi->function)(wi->queue, wi->agno, wi->arg);
- free(wi);
- }
-
- return NULL;
-}
-
void
thread_init(void)
{
@@ -67,85 +23,36 @@ thread_init(void)
void
create_work_queue(
- work_queue_t *wq,
- xfs_mount_t *mp,
- int nworkers)
+ struct workqueue *wq,
+ struct xfs_mount *mp,
+ unsigned int nworkers)
{
int err;
- int i;
-
- memset(wq, 0, sizeof(work_queue_t));
- pthread_cond_init(&wq->wakeup, NULL);
- pthread_mutex_init(&wq->lock, NULL);
-
- wq->mp = mp;
- wq->thread_count = nworkers;
- wq->threads = malloc(nworkers * sizeof(pthread_t));
- wq->terminate = 0;
-
- for (i = 0; i < nworkers; i++) {
- err = pthread_create(&wq->threads[i], NULL, worker_thread, wq);
- if (err != 0) {
- do_error(_("cannot create worker threads, error = [%d] %s\n"),
+ err = workqueue_create(wq, mp, nworkers);
+ if (err)
+ do_error(_("cannot create worker threads, error = [%d] %s\n"),
err, strerror(err));
- }
- }
-
}
void
queue_work(
- work_queue_t *wq,
- work_func_t func,
- xfs_agnumber_t agno,
- void *arg)
+ struct workqueue *wq,
+ workqueue_func_t func,
+ xfs_agnumber_t agno,
+ void *arg)
{
- work_item_t *wi;
+ int err;
- wi = (work_item_t *)malloc(sizeof(work_item_t));
- if (wi == NULL)
+ err = workqueue_add(wq, func, agno, arg);
+ if (err)
do_error(_("cannot allocate worker item, error = [%d] %s\n"),
- errno, strerror(errno));
-
- wi->function = func;
- wi->agno = agno;
- wi->arg = arg;
- wi->queue = wq;
- wi->next = NULL;
-
- /*
- * Now queue the new work structure to the work queue.
- */
- pthread_mutex_lock(&wq->lock);
- if (wq->next_item == NULL) {
- wq->next_item = wi;
- ASSERT(wq->item_count == 0);
- pthread_cond_signal(&wq->wakeup);
- } else {
- wq->last_item->next = wi;
- }
- wq->last_item = wi;
- wq->item_count++;
- pthread_mutex_unlock(&wq->lock);
+ err, strerror(err));
}
void
destroy_work_queue(
- work_queue_t *wq)
+ struct workqueue *wq)
{
- int i;
-
- pthread_mutex_lock(&wq->lock);
- wq->terminate = 1;
- pthread_mutex_unlock(&wq->lock);
-
- pthread_cond_broadcast(&wq->wakeup);
-
- for (i = 0; i < wq->thread_count; i++)
- pthread_join(wq->threads[i], NULL);
-
- free(wq->threads);
- pthread_mutex_destroy(&wq->lock);
- pthread_cond_destroy(&wq->wakeup);
+ workqueue_destroy(wq);
}
diff --git a/repair/threads.h b/repair/threads.h
index bb0b8f851e..fce520aa22 100644
--- a/repair/threads.h
+++ b/repair/threads.h
@@ -1,47 +1,25 @@
#ifndef _XFS_REPAIR_THREADS_H_
#define _XFS_REPAIR_THREADS_H_
-void thread_init(void);
-
-struct work_queue;
-
-typedef void work_func_t(struct work_queue *, xfs_agnumber_t, void *);
+#include "workqueue.h"
-typedef struct work_item {
- struct work_item *next;
- work_func_t *function;
- struct work_queue *queue;
- xfs_agnumber_t agno;
- void *arg;
-} work_item_t;
-
-typedef struct work_queue {
- work_item_t *next_item;
- work_item_t *last_item;
- int item_count;
- int thread_count;
- pthread_t *threads;
- xfs_mount_t *mp;
- pthread_mutex_t lock;
- pthread_cond_t wakeup;
- int terminate;
-} work_queue_t;
+void thread_init(void);
void
create_work_queue(
- work_queue_t *wq,
- xfs_mount_t *mp,
- int nworkers);
+ struct workqueue *wq,
+ struct xfs_mount *mp,
+ unsigned int nworkers);
void
queue_work(
- work_queue_t *wq,
- work_func_t func,
+ struct workqueue *wq,
+ workqueue_func_t func,
xfs_agnumber_t agno,
void *arg);
void
destroy_work_queue(
- work_queue_t *wq);
+ struct workqueue *wq);
#endif /* _XFS_REPAIR_THREADS_H_ */