aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2023-06-29 13:27:50 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2023-06-29 13:27:50 -0700
commit0a37714f96d5746268dc09bdd400a215f180ba9b (patch)
tree7ac11e8bef8992d9ec551e6e6c8e61156c841e00 /fs/dlm
parent9e06150d3c04d1a5028a485263912ea892545d2f (diff)
parentfc4ea4229c2b2dca0bffc12eee6973e353c20086 (diff)
downloadlinux-0a37714f96d5746268dc09bdd400a215f180ba9b.tar.gz
Merge tag 'dlm-6.5' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm
Pull dlm updates from David Teigland: "The dlm posix lock handling (for gfs2) has three notable changes: - Local pids returned from GETLK are no longer negated. A previous patch negating remote pids mistakenly changed local pids also. - SETLKW operations can now be interrupted only when the process is killed, and not from other signals. General interruption was resulting in previously acquired locks being cleared, not just the in-progress lock. Handling this correctly will require extending a cancel capability to user space (a future feature.) - If multiple threads are requesting posix locks (with SETLKW), fix incorrect matching of results to the requests. The dlm networking has several minor cleanups, and one notable change: - Avoid delaying ack messages for too long (used for message reliability), resulting in a backlog of un-acked messages. These could previously be delayed as a result of either too many or too few other messages being sent. Now an upper and lower threshold is used to determine when an ack should be sent" * tag 'dlm-6.5' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm: fs: dlm: remove filter local comms on close fs: dlm: add send ack threshold and append acks to msgs fs: dlm: handle sequence numbers as atomic fs: dlm: handle lkb wait count as atomic_t fs: dlm: filter ourself midcomms calls fs: dlm: warn about messages from left nodes fs: dlm: move dlm_purge_lkb_callbacks to user module fs: dlm: cleanup STOP_IO bitflag set when stop io fs: dlm: don't check othercon twice fs: dlm: unregister memory at the very last fs: dlm: fix missing pending to false fs: dlm: clear pending bit when queue was empty fs: dlm: revert check required context while close fs: dlm: fix mismatch of plock results from userspace fs: dlm: make F_SETLK use unkillable wait_event fs: dlm: interrupt posix locks only when process is killed fs: dlm: fix cleanup pending ops when interrupted fs: dlm: return positive pid value for F_GETLK dlm: Replace all non-returning strlcpy with strscpy
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/ast.c25
-rw-r--r--fs/dlm/ast.h1
-rw-r--r--fs/dlm/dlm_internal.h2
-rw-r--r--fs/dlm/lock.c36
-rw-r--r--fs/dlm/lockspace.c12
-rw-r--r--fs/dlm/lockspace.h1
-rw-r--r--fs/dlm/lowcomms.c49
-rw-r--r--fs/dlm/main.c2
-rw-r--r--fs/dlm/member.c37
-rw-r--r--fs/dlm/midcomms.c126
-rw-r--r--fs/dlm/plock.c115
-rw-r--r--fs/dlm/user.c18
-rw-r--r--fs/dlm/user.h1
13 files changed, 203 insertions, 222 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 700ff2e0515a1..1f2f70a1b824e 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -36,23 +36,6 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from,
*from = to;
}
-void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
-{
- struct dlm_callback *cb, *safe;
-
- list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
- list_del(&cb->list);
- kref_put(&cb->ref, dlm_release_callback);
- }
-
- clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
-
- /* invalidate */
- dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
- dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
- lkb->lkb_last_bast_mode = -1;
-}
-
int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags)
{
@@ -181,10 +164,12 @@ void dlm_callback_work(struct work_struct *work)
spin_lock(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
- spin_unlock(&lkb->lkb_cb_lock);
-
- if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY))
+ if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) {
+ clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
+ spin_unlock(&lkb->lkb_cb_lock);
goto out;
+ }
+ spin_unlock(&lkb->lkb_cb_lock);
for (;;) {
castfn = lkb->lkb_astfn;
diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h
index 880b118824953..ce007892dc2d3 100644
--- a/fs/dlm/ast.h
+++ b/fs/dlm/ast.h
@@ -26,7 +26,6 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from,
struct dlm_callback *to);
void dlm_release_callback(struct kref *ref);
-void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb);
void dlm_callback_work(struct work_struct *work);
int dlm_callback_start(struct dlm_ls *ls);
void dlm_callback_stop(struct dlm_ls *ls);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 986a9d7b1f332..c8156770205e6 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -246,7 +246,7 @@ struct dlm_lkb {
int8_t lkb_highbast; /* highest mode bast sent for */
int8_t lkb_wait_type; /* type of reply waiting for */
- int8_t lkb_wait_count;
+ atomic_t lkb_wait_count;
int lkb_wait_nodeid; /* for debugging */
struct list_head lkb_statequeue; /* rsb g/c/w list */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index debf8a55ad7d9..f511a9d7d416e 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1407,6 +1407,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error = 0;
+ int wc;
mutex_lock(&ls->ls_waiters_mutex);
@@ -1428,20 +1429,17 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
error = -EBUSY;
goto out;
}
- lkb->lkb_wait_count++;
+ wc = atomic_inc_return(&lkb->lkb_wait_count);
hold_lkb(lkb);
log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
- lkb->lkb_id, lkb->lkb_wait_type, mstype,
- lkb->lkb_wait_count, dlm_iflags_val(lkb));
+ lkb->lkb_id, lkb->lkb_wait_type, mstype, wc,
+ dlm_iflags_val(lkb));
goto out;
}
- DLM_ASSERT(!lkb->lkb_wait_count,
- dlm_print_lkb(lkb);
- printk("wait_count %d\n", lkb->lkb_wait_count););
-
- lkb->lkb_wait_count++;
+ wc = atomic_fetch_inc(&lkb->lkb_wait_count);
+ DLM_ASSERT(!wc, dlm_print_lkb(lkb); printk("wait_count %d\n", wc););
lkb->lkb_wait_type = mstype;
lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
hold_lkb(lkb);
@@ -1504,7 +1502,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
lkb->lkb_id);
lkb->lkb_wait_type = 0;
- lkb->lkb_wait_count--;
+ atomic_dec(&lkb->lkb_wait_count);
unhold_lkb(lkb);
goto out_del;
}
@@ -1531,16 +1529,15 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
if (overlap_done && lkb->lkb_wait_type) {
log_error(ls, "remwait error %x reply %d wait_type %d overlap",
lkb->lkb_id, mstype, lkb->lkb_wait_type);
- lkb->lkb_wait_count--;
+ atomic_dec(&lkb->lkb_wait_count);
unhold_lkb(lkb);
lkb->lkb_wait_type = 0;
}
- DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
+ DLM_ASSERT(atomic_read(&lkb->lkb_wait_count), dlm_print_lkb(lkb););
clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
- lkb->lkb_wait_count--;
- if (!lkb->lkb_wait_count)
+ if (atomic_dec_and_test(&lkb->lkb_wait_count))
list_del_init(&lkb->lkb_wait_reply);
unhold_lkb(lkb);
return 0;
@@ -2669,7 +2666,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
goto out;
/* lock not allowed if there's any op in progress */
- if (lkb->lkb_wait_type || lkb->lkb_wait_count)
+ if (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count))
goto out;
if (is_overlap(lkb))
@@ -2731,7 +2728,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
/* normal unlock not allowed if there's any op in progress */
if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
- (lkb->lkb_wait_type || lkb->lkb_wait_count))
+ (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count)))
goto out;
/* an lkb may be waiting for an rsb lookup to complete where the
@@ -4616,7 +4613,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
{
int error = 0, noent = 0;
- if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) {
+ if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
log_limit(ls, "receive %d from non-member %d %x %x %d",
le32_to_cpu(ms->m_type),
le32_to_cpu(ms->m_header.h_nodeid),
@@ -4754,7 +4751,7 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
/* If we were a member of this lockspace, left, and rejoined,
other nodes may still be sending us messages from the
lockspace generation before we left. */
- if (!ls->ls_generation) {
+ if (WARN_ON_ONCE(!ls->ls_generation)) {
log_limit(ls, "receive %d from %d ignore old gen",
le32_to_cpu(ms->m_type), nodeid);
return;
@@ -5066,10 +5063,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
/* drop all wait_count references we still
* hold a reference for this iteration.
*/
- while (lkb->lkb_wait_count) {
- lkb->lkb_wait_count--;
+ while (!atomic_dec_and_test(&lkb->lkb_wait_count))
unhold_lkb(lkb);
- }
+
mutex_lock(&ls->ls_waiters_mutex);
list_del_init(&lkb->lkb_wait_reply);
mutex_unlock(&ls->ls_waiters_mutex);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 67261b7b1f0e1..0455dddb0797c 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -935,15 +935,3 @@ void dlm_stop_lockspaces(void)
log_print("dlm user daemon left %d lockspaces", count);
}
-void dlm_stop_lockspaces_check(void)
-{
- struct dlm_ls *ls;
-
- spin_lock(&lslist_lock);
- list_for_each_entry(ls, &lslist, ls_list) {
- if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
- !dlm_locking_stopped(ls)))
- break;
- }
- spin_unlock(&lslist_lock);
-}
diff --git a/fs/dlm/lockspace.h b/fs/dlm/lockspace.h
index 03f4a4a3a871c..47ebd44119264 100644
--- a/fs/dlm/lockspace.h
+++ b/fs/dlm/lockspace.h
@@ -27,7 +27,6 @@ struct dlm_ls *dlm_find_lockspace_local(void *id);
struct dlm_ls *dlm_find_lockspace_device(int minor);
void dlm_put_lockspace(struct dlm_ls *ls);
void dlm_stop_lockspaces(void);
-void dlm_stop_lockspaces_check(void);
int dlm_new_user_lockspace(const char *name, const char *cluster,
uint32_t flags, int lvblen,
const struct dlm_lockspace_ops *ops,
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 5c12d8cdfc16d..9f14ea9f63224 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -546,9 +546,6 @@ int dlm_lowcomms_connect_node(int nodeid)
struct connection *con;
int idx;
- if (nodeid == dlm_our_nodeid())
- return 0;
-
idx = srcu_read_lock(&connections_srcu);
con = nodeid2con(nodeid, 0);
if (WARN_ON_ONCE(!con)) {
@@ -735,19 +732,15 @@ static void stop_connection_io(struct connection *con)
if (con->othercon)
stop_connection_io(con->othercon);
+ spin_lock_bh(&con->writequeue_lock);
+ set_bit(CF_IO_STOP, &con->flags);
+ spin_unlock_bh(&con->writequeue_lock);
+
down_write(&con->sock_lock);
if (con->sock) {
lock_sock(con->sock->sk);
restore_callbacks(con->sock->sk);
-
- spin_lock_bh(&con->writequeue_lock);
- set_bit(CF_IO_STOP, &con->flags);
- spin_unlock_bh(&con->writequeue_lock);
release_sock(con->sock->sk);
- } else {
- spin_lock_bh(&con->writequeue_lock);
- set_bit(CF_IO_STOP, &con->flags);
- spin_unlock_bh(&con->writequeue_lock);
}
up_write(&con->sock_lock);
@@ -867,30 +860,8 @@ struct dlm_processed_nodes {
struct list_head list;
};
-static void add_processed_node(int nodeid, struct list_head *processed_nodes)
-{
- struct dlm_processed_nodes *n;
-
- list_for_each_entry(n, processed_nodes, list) {
- /* we already remembered this node */
- if (n->nodeid == nodeid)
- return;
- }
-
- /* if it's fails in worst case we simple don't send an ack back.
- * We try it next time.
- */
- n = kmalloc(sizeof(*n), GFP_NOFS);
- if (!n)
- return;
-
- n->nodeid = nodeid;
- list_add(&n->list, processed_nodes);
-}
-
static void process_dlm_messages(struct work_struct *work)
{
- struct dlm_processed_nodes *n, *n_tmp;
struct processqueue_entry *pentry;
LIST_HEAD(processed_nodes);
@@ -898,6 +869,7 @@ static void process_dlm_messages(struct work_struct *work)
pentry = list_first_entry_or_null(&processqueue,
struct processqueue_entry, list);
if (WARN_ON_ONCE(!pentry)) {
+ process_dlm_messages_pending = false;
spin_unlock(&processqueue_lock);
return;
}
@@ -908,7 +880,6 @@ static void process_dlm_messages(struct work_struct *work)
for (;;) {
dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
pentry->buflen);
- add_processed_node(pentry->nodeid, &processed_nodes);
free_processqueue_entry(pentry);
spin_lock(&processqueue_lock);
@@ -923,13 +894,6 @@ static void process_dlm_messages(struct work_struct *work)
list_del(&pentry->list);
spin_unlock(&processqueue_lock);
}
-
- /* send ack back after we processed couple of messages */
- list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
- list_del(&n->list);
- dlm_midcomms_receive_done(n->nodeid);
- kfree(n);
- }
}
/* Data received from remote end */
@@ -1500,8 +1464,7 @@ int dlm_lowcomms_close(int nodeid)
call_srcu(&connections_srcu, &con->rcu, connection_release);
if (con->othercon) {
clean_one_writequeue(con->othercon);
- if (con->othercon)
- call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
+ call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
}
srcu_read_unlock(&connections_srcu, idx);
diff --git a/fs/dlm/main.c b/fs/dlm/main.c
index 93755f83a30d1..6ca28299c9db1 100644
--- a/fs/dlm/main.c
+++ b/fs/dlm/main.c
@@ -73,10 +73,10 @@ static void __exit exit_dlm(void)
dlm_plock_exit();
dlm_user_exit();
dlm_config_exit();
- dlm_memory_exit();
dlm_lockspace_exit();
dlm_midcomms_exit();
dlm_unregister_debugfs();
+ dlm_memory_exit();
}
module_init(init_dlm);
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 923c01a8a0aa2..77d202e4a02a4 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -307,6 +307,21 @@ static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
}
}
+static int add_remote_member(int nodeid)
+{
+ int error;
+
+ if (nodeid == dlm_our_nodeid())
+ return 0;
+
+ error = dlm_lowcomms_connect_node(nodeid);
+ if (error < 0)
+ return error;
+
+ dlm_midcomms_add_member(nodeid);
+ return 0;
+}
+
static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
struct dlm_member *memb;
@@ -316,16 +331,16 @@ static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
if (!memb)
return -ENOMEM;
- error = dlm_lowcomms_connect_node(node->nodeid);
+ memb->nodeid = node->nodeid;
+ memb->weight = node->weight;
+ memb->comm_seq = node->comm_seq;
+
+ error = add_remote_member(node->nodeid);
if (error < 0) {
kfree(memb);
return error;
}
- memb->nodeid = node->nodeid;
- memb->weight = node->weight;
- memb->comm_seq = node->comm_seq;
- dlm_midcomms_add_member(node->nodeid);
add_ordered_member(ls, memb);
ls->ls_num_nodes++;
return 0;
@@ -370,11 +385,19 @@ static void clear_memb_list(struct list_head *head,
}
}
-static void clear_members_cb(int nodeid)
+static void remove_remote_member(int nodeid)
{
+ if (nodeid == dlm_our_nodeid())
+ return;
+
dlm_midcomms_remove_member(nodeid);
}
+static void clear_members_cb(int nodeid)
+{
+ remove_remote_member(nodeid);
+}
+
void dlm_clear_members(struct dlm_ls *ls)
{
clear_memb_list(&ls->ls_nodes, clear_members_cb);
@@ -562,7 +585,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
neg++;
list_move(&memb->list, &ls->ls_nodes_gone);
- dlm_midcomms_remove_member(memb->nodeid);
+ remove_remote_member(memb->nodeid);
ls->ls_num_nodes--;
dlm_lsop_recover_slot(ls, memb);
}
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index c02c43e4980a7..e1a0df67b5669 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -136,7 +136,6 @@
#include <net/tcp.h>
#include "dlm_internal.h"
-#include "lockspace.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
@@ -149,12 +148,14 @@
/* 5 seconds wait to sync ending of dlm */
#define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(5000)
#define DLM_VERSION_NOT_SET 0
+#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32
+#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8)
struct midcomms_node {
int nodeid;
uint32_t version;
- uint32_t seq_send;
- uint32_t seq_next;
+ atomic_t seq_send;
+ atomic_t seq_next;
/* These queues are unbound because we cannot drop any message in dlm.
* We could send a fence signal for a specific node to the cluster
* manager if queues hits some maximum value, however this handling
@@ -166,7 +167,7 @@ struct midcomms_node {
#define DLM_NODE_FLAG_CLOSE 1
#define DLM_NODE_FLAG_STOP_TX 2
#define DLM_NODE_FLAG_STOP_RX 3
-#define DLM_NODE_ULP_DELIVERED 4
+ atomic_t ulp_delivered;
unsigned long flags;
wait_queue_head_t shutdown_wait;
@@ -318,8 +319,9 @@ static void midcomms_node_reset(struct midcomms_node *node)
{
pr_debug("reset node %d\n", node->nodeid);
- node->seq_next = DLM_SEQ_INIT;
- node->seq_send = DLM_SEQ_INIT;
+ atomic_set(&node->seq_next, DLM_SEQ_INIT);
+ atomic_set(&node->seq_send, DLM_SEQ_INIT);
+ atomic_set(&node->ulp_delivered, 0);
node->version = DLM_VERSION_NOT_SET;
node->flags = 0;
@@ -394,6 +396,28 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
return 0;
}
+static void dlm_send_ack_threshold(struct midcomms_node *node,
+ uint32_t threshold)
+{
+ uint32_t oval, nval;
+ bool send_ack;
+
+ /* let only send one user trigger threshold to send ack back */
+ do {
+ oval = atomic_read(&node->ulp_delivered);
+ send_ack = (oval > threshold);
+ /* abort if threshold is not reached */
+ if (!send_ack)
+ break;
+
+ nval = 0;
+ /* try to reset ulp_delivered counter */
+ } while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval);
+
+ if (send_ack)
+ dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
+}
+
static int dlm_send_fin(struct midcomms_node *node,
void (*ack_rcv)(struct midcomms_node *node))
{
@@ -493,9 +517,19 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
struct midcomms_node *node,
uint32_t seq)
{
- if (seq == node->seq_next) {
- node->seq_next++;
+ bool is_expected_seq;
+ uint32_t oval, nval;
+ do {
+ oval = atomic_read(&node->seq_next);
+ is_expected_seq = (oval == seq);
+ if (!is_expected_seq)
+ break;
+
+ nval = oval + 1;
+ } while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval);
+
+ if (is_expected_seq) {
switch (p->header.h_cmd) {
case DLM_FIN:
spin_lock(&node->state_lock);
@@ -504,7 +538,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
switch (node->state) {
case DLM_ESTABLISHED:
- dlm_send_ack(node->nodeid, node->seq_next);
+ dlm_send_ack(node->nodeid, nval);
/* passive shutdown DLM_LAST_ACK case 1
* additional we check if the node is used by
@@ -523,14 +557,14 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
}
break;
case DLM_FIN_WAIT1:
- dlm_send_ack(node->nodeid, node->seq_next);
+ dlm_send_ack(node->nodeid, nval);
node->state = DLM_CLOSING;
set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
break;
case DLM_FIN_WAIT2:
- dlm_send_ack(node->nodeid, node->seq_next);
+ dlm_send_ack(node->nodeid, nval);
midcomms_node_reset(node);
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
@@ -551,18 +585,20 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
dlm_receive_buffer_3_2_trace(seq, p);
dlm_receive_buffer(p, node->nodeid);
- set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
+ atomic_inc(&node->ulp_delivered);
+ /* unlikely case to send ack back when we don't transmit */
+ dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD);
break;
}
} else {
/* retry to ack message which we already have by sending back
* current node->seq_next number as ack.
*/
- if (seq < node->seq_next)
- dlm_send_ack(node->nodeid, node->seq_next);
+ if (seq < oval)
+ dlm_send_ack(node->nodeid, oval);
log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
- seq, node->seq_next, node->nodeid);
+ seq, oval, node->nodeid);
}
}
@@ -960,49 +996,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
return ret;
}
-void dlm_midcomms_receive_done(int nodeid)
-{
- struct midcomms_node *node;
- int idx;
-
- idx = srcu_read_lock(&nodes_srcu);
- node = nodeid2node(nodeid, 0);
- if (!node) {
- srcu_read_unlock(&nodes_srcu, idx);
- return;
- }
-
- /* old protocol, we do nothing */
- switch (node->version) {
- case DLM_VERSION_3_2:
- break;
- default:
- srcu_read_unlock(&nodes_srcu, idx);
- return;
- }
-
- /* do nothing if we didn't delivered stateful to ulp */
- if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
- &node->flags)) {
- srcu_read_unlock(&nodes_srcu, idx);
- return;
- }
-
- spin_lock(&node->state_lock);
- /* we only ack if state is ESTABLISHED */
- switch (node->state) {
- case DLM_ESTABLISHED:
- spin_unlock(&node->state_lock);
- dlm_send_ack(node->nodeid, node->seq_next);
- break;
- default:
- spin_unlock(&node->state_lock);
- /* do nothing FIN has it's own ack send */
- break;
- }
- srcu_read_unlock(&nodes_srcu, idx);
-}
-
void dlm_midcomms_unack_msg_resend(int nodeid)
{
struct midcomms_node *node;
@@ -1059,7 +1052,7 @@ static void midcomms_new_msg_cb(void *data)
list_add_tail_rcu(&mh->list, &mh->node->send_queue);
spin_unlock_bh(&mh->node->send_queue_lock);
- mh->seq = mh->node->seq_send++;
+ mh->seq = atomic_fetch_inc(&mh->node->seq_send);
}
static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
@@ -1133,6 +1126,8 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
goto err;
}
+ /* send ack back if necessary */
+ dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
break;
default:
dlm_free_mhandle(mh);
@@ -1281,9 +1276,6 @@ void dlm_midcomms_add_member(int nodeid)
struct midcomms_node *node;
int idx;
- if (nodeid == dlm_our_nodeid())
- return;
-
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, GFP_NOFS);
if (!node) {
@@ -1329,9 +1321,6 @@ void dlm_midcomms_remove_member(int nodeid)
struct midcomms_node *node;
int idx;
- if (nodeid == dlm_our_nodeid())
- return;
-
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, 0);
if (!node) {
@@ -1488,11 +1477,6 @@ int dlm_midcomms_close(int nodeid)
struct midcomms_node *node;
int idx, ret;
- if (nodeid == dlm_our_nodeid())
- return 0;
-
- dlm_stop_lockspaces_check();
-
idx = srcu_read_lock(&nodes_srcu);
/* Abort pending close/remove operation */
node = nodeid2node(nodeid, 0);
@@ -1542,7 +1526,7 @@ static void midcomms_new_rawmsg_cb(void *data)
switch (h->h_cmd) {
case DLM_OPTS:
if (!h->u.h_seq)
- h->u.h_seq = cpu_to_le32(rd->node->seq_send++);
+ h->u.h_seq = cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send));
break;
default:
break;
diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c
index ed4357e62f35f..70a4752ed913a 100644
--- a/fs/dlm/plock.c
+++ b/fs/dlm/plock.c
@@ -30,8 +30,6 @@ struct plock_async_data {
struct plock_op {
struct list_head list;
int done;
- /* if lock op got interrupted while waiting dlm_controld reply */
- bool sigint;
struct dlm_plock_info info;
/* if set indicates async handling */
struct plock_async_data *data;
@@ -157,23 +155,29 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
send_op(op);
- rv = wait_event_interruptible(recv_wq, (op->done != 0));
- if (rv == -ERESTARTSYS) {
- spin_lock(&ops_lock);
- /* recheck under ops_lock if we got a done != 0,
- * if so this interrupt case should be ignored
- */
- if (op->done != 0) {
+ if (op->info.wait) {
+ rv = wait_event_killable(recv_wq, (op->done != 0));
+ if (rv == -ERESTARTSYS) {
+ spin_lock(&ops_lock);
+ /* recheck under ops_lock if we got a done != 0,
+ * if so this interrupt case should be ignored
+ */
+ if (op->done != 0) {
+ spin_unlock(&ops_lock);
+ goto do_lock_wait;
+ }
+ list_del(&op->list);
spin_unlock(&ops_lock);
- goto do_lock_wait;
- }
- op->sigint = true;
- spin_unlock(&ops_lock);
- log_debug(ls, "%s: wait interrupted %x %llx pid %d",
- __func__, ls->ls_global_id,
- (unsigned long long)number, op->info.pid);
- goto out;
+ log_debug(ls, "%s: wait interrupted %x %llx pid %d",
+ __func__, ls->ls_global_id,
+ (unsigned long long)number, op->info.pid);
+ do_unlock_close(&op->info);
+ dlm_release_plock_op(op);
+ goto out;
+ }
+ } else {
+ wait_event(recv_wq, (op->done != 0));
}
do_lock_wait:
@@ -360,7 +364,9 @@ int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file,
locks_init_lock(fl);
fl->fl_type = (op->info.ex) ? F_WRLCK : F_RDLCK;
fl->fl_flags = FL_POSIX;
- fl->fl_pid = -op->info.pid;
+ fl->fl_pid = op->info.pid;
+ if (op->info.nodeid != dlm_our_nodeid())
+ fl->fl_pid = -fl->fl_pid;
fl->fl_start = op->info.start;
fl->fl_end = op->info.end;
rv = 0;
@@ -389,7 +395,7 @@ static ssize_t dev_read(struct file *file, char __user *u, size_t count,
if (op->info.flags & DLM_PLOCK_FL_CLOSE)
list_del(&op->list);
else
- list_move(&op->list, &recv_list);
+ list_move_tail(&op->list, &recv_list);
memcpy(&info, &op->info, sizeof(info));
}
spin_unlock(&ops_lock);
@@ -427,34 +433,53 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count,
if (check_version(&info))
return -EINVAL;
+ /*
+ * The results for waiting ops (SETLKW) can be returned in any
+ * order, so match all fields to find the op. The results for
+ * non-waiting ops are returned in the order that they were sent
+ * to userspace, so match the result with the first non-waiting op.
+ */
spin_lock(&ops_lock);
- list_for_each_entry(iter, &recv_list, list) {
- if (iter->info.fsid == info.fsid &&
- iter->info.number == info.number &&
- iter->info.owner == info.owner) {
- if (iter->sigint) {
- list_del(&iter->list);
- spin_unlock(&ops_lock);
-
- pr_debug("%s: sigint cleanup %x %llx pid %d",
- __func__, iter->info.fsid,
- (unsigned long long)iter->info.number,
- iter->info.pid);
- do_unlock_close(&iter->info);
- memcpy(&iter->info, &info, sizeof(info));
- dlm_release_plock_op(iter);
- return count;
+ if (info.wait) {
+ list_for_each_entry(iter, &recv_list, list) {
+ if (iter->info.fsid == info.fsid &&
+ iter->info.number == info.number &&
+ iter->info.owner == info.owner &&
+ iter->info.pid == info.pid &&
+ iter->info.start == info.start &&
+ iter->info.end == info.end &&
+ iter->info.ex == info.ex &&
+ iter->info.wait) {
+ op = iter;
+ break;
+ }
+ }
+ } else {
+ list_for_each_entry(iter, &recv_list, list) {
+ if (!iter->info.wait) {
+ op = iter;
+ break;
}
- list_del_init(&iter->list);
- memcpy(&iter->info, &info, sizeof(info));
- if (iter->data)
- do_callback = 1;
- else
- iter->done = 1;
- op = iter;
- break;
}
}
+
+ if (op) {
+ /* Sanity check that op and info match. */
+ if (info.wait)
+ WARN_ON(op->info.optype != DLM_PLOCK_OP_LOCK);
+ else
+ WARN_ON(op->info.fsid != info.fsid ||
+ op->info.number != info.number ||
+ op->info.owner != info.owner ||
+ op->info.optype != info.optype);
+
+ list_del_init(&op->list);
+ memcpy(&op->info, &info, sizeof(info));
+ if (op->data)
+ do_callback = 1;
+ else
+ op->done = 1;
+ }
spin_unlock(&ops_lock);
if (op) {
@@ -463,8 +488,8 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count,
else
wake_up(&recv_wq);
} else
- log_print("%s: no op %x %llx", __func__,
- info.fsid, (unsigned long long)info.number);
+ pr_debug("%s: no op %x %llx", __func__,
+ info.fsid, (unsigned long long)info.number);
return count;
}
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index d9c09fc0aba11..695e691b38b31 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -145,6 +145,24 @@ static void compat_output(struct dlm_lock_result *res,
}
#endif
+/* should held proc->asts_spin lock */
+void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
+{
+ struct dlm_callback *cb, *safe;
+
+ list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
+ list_del(&cb->list);
+ kref_put(&cb->ref, dlm_release_callback);
+ }
+
+ clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
+
+ /* invalidate */
+ dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
+ dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
+ lkb->lkb_last_bast_mode = -1;
+}
+
/* Figure out if this lock is at the end of its life and no longer
available for the application to use. The lkb still exists until
the final ast is read. A lock becomes EOL in three situations:
diff --git a/fs/dlm/user.h b/fs/dlm/user.h
index 33059452d79e0..2caf8e6e24d51 100644
--- a/fs/dlm/user.h
+++ b/fs/dlm/user.h
@@ -6,6 +6,7 @@
#ifndef __USER_DOT_H__
#define __USER_DOT_H__
+void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb);
void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags);
int dlm_user_init(void);