From 4c56742c9456d13d227a8a39a07dfabd6b77d3a7 Mon Sep 17 00:00:00 2001
From: venaas <venaas>
Date: Tue, 16 Sep 2008 12:38:09 +0000
Subject: simplifying code a bit by using lock per rqout

git-svn-id: https://svn.testnett.uninett.no/radsecproxy/trunk@381 e88ac4ed-0b26-0410-9574-a7f39faa03bf
---
 radsecproxy.c | 130 ++++++++++++++++++++++++++++++++++++++--------------------
 radsecproxy.h |   3 +-
 2 files changed, 88 insertions(+), 45 deletions(-)

diff --git a/radsecproxy.c b/radsecproxy.c
index b59b3f2..5ce8504 100644
--- a/radsecproxy.c
+++ b/radsecproxy.c
@@ -618,15 +618,13 @@ void removeclientrqs(struct client *client) {
 	server = ((struct clsrvconf *)entry->data)->servers;
 	if (!server)
 	    continue;
-	pthread_mutex_lock(&server->newrq_mutex);
 	for (i = 0; i < MAX_REQUESTS; i++) {
 	    rqout = server->requests + i;
-	    if (rqout->rq && rqout->rq->from == client) {
-		freerq(rqout->rq);
-		rqout->rq = NULL;
-	    }
+	    pthread_mutex_lock(rqout->lock);
+	    if (rqout->rq && rqout->rq->from == client)
+		freerqoutdata(rqout->rq);
+	    pthread_mutex_unlock(rqout->lock);
 	}
-	pthread_mutex_unlock(&server->newrq_mutex);
     }
 }
 
@@ -638,8 +636,11 @@ void freeserver(struct server *server, uint8_t destroymutex) {
 
     if (server->requests) {
 	rqout = server->requests;
-	for (end = rqout + MAX_REQUESTS; rqout < end; rqout++)
+	for (end = rqout + MAX_REQUESTS; rqout < end; rqout++) {
 	    freerqoutdata(rqout);
+	    pthread_mutex_destroy(rqout->lock);
+	    free(rqout->lock);
+	}
 	free(server->requests);
     }
     if (server->rbios)
@@ -658,6 +659,7 @@ void freeserver(struct server *server, uint8_t destroymutex) {
 int addserver(struct clsrvconf *conf) {
     struct clsrvconf *res;
     uint8_t type;
+    int i;
     
     if (conf->servers) {
 	debug(DBG_ERR, "addserver: currently works with just one server per conf");
@@ -691,6 +693,19 @@ int addserver(struct clsrvconf *conf) {
 	debug(DBG_ERR, "malloc failed");
 	goto errexit;
     }
+    for (i = 0; i < MAX_REQUESTS; i++) {
+	conf->servers->requests[i].lock = malloc(sizeof(pthread_mutex_t));
+	if (!conf->servers->requests[i].lock) {
+	    debug(DBG_ERR, "malloc failed");
+	    goto errexit;
+	}
+	if (pthread_mutex_init(conf->servers->requests[i].lock, NULL)) {
+	    debug(DBG_ERR, "mutex init failed");
+	    free(conf->servers->requests[i].lock);
+	    conf->servers->requests[i].lock = NULL;
+	    goto errexit;
+	}
+    }
     if (pthread_mutex_init(&conf->servers->lock, NULL)) {
 	debug(DBG_ERR, "mutex init failed");
 	goto errexit;
@@ -936,26 +951,45 @@ void freerq(struct request *rq) {
 void freerqoutdata(struct rqout *rqout) {
     if (!rqout)
 	return;
-    if (rqout->msg)
+    if (rqout->msg) {
 	radmsg_free(rqout->msg);
-    if (rqout->buf)
+	rqout->msg = NULL;
+    }
+    if (rqout->buf) {
 	free(rqout->buf);
-    if (rqout->rq)
+	rqout->buf = NULL;
+    }
+    if (rqout->rq) {
 	freerq(rqout->rq);
+	rqout->rq = NULL;
+    }
+    rqout->tries = 0;
+    rqout->received = 0;
+    memset(&rqout->expiry, 0, sizeof(struct timeval));
 }
 
 void sendrq(struct server *to, struct rqout *rqout) {
     int i;
-
+    
     pthread_mutex_lock(&to->newrq_mutex);
     /* might simplify if only try nextid, might be ok */
-    for (i = to->nextid; i < MAX_REQUESTS; i++)
-	if (!to->requests[i].buf)
-	    break;
-    if (i == MAX_REQUESTS) {
-	for (i = 0; i < to->nextid; i++)
+    for (i = to->nextid; i < MAX_REQUESTS; i++) {
+	if (!to->requests[i].buf) {
+	    pthread_mutex_lock(to->requests[i].lock);
 	    if (!to->requests[i].buf)
 		break;
+	    pthread_mutex_unlock(to->requests[i].lock);
+	}
+    }
+    if (i == MAX_REQUESTS) {
+	for (i = 0; i < to->nextid; i++) {
+	    if (!to->requests[i].buf) {
+		pthread_mutex_lock(to->requests[i].lock);
+		if (!to->requests[i].buf)
+		    break;
+		pthread_mutex_unlock(to->requests[i].lock);
+	    }
+	}
 	if (i == to->nextid) {
 	    debug(DBG_WARN, "sendrq: no room in queue, dropping request");
 	    freerqoutdata(rqout);
@@ -966,13 +1000,17 @@ void sendrq(struct server *to, struct rqout *rqout) {
     rqout->msg->id = (uint8_t)i;
     rqout->buf = radmsg2buf(rqout->msg, (uint8_t *)to->conf->secret);
     if (!rqout->buf) {
+	pthread_mutex_unlock(to->requests[i].lock);
 	debug(DBG_ERR, "sendrq: radmsg2buf failed");
 	freerqoutdata(rqout);
 	goto exit;
     }
     
     debug(DBG_DBG, "sendrq: inserting packet with id %d in queue for %s", i, to->conf->host);
-    to->requests[i] = *rqout;
+    to->requests[i].buf = rqout->buf;
+    to->requests[i].msg = rqout->msg;
+    to->requests[i].rq = rqout->rq;
+    pthread_mutex_unlock(to->requests[i].lock);
     to->nextid = i + 1;
 
     if (!to->newrq) {
@@ -1936,26 +1974,27 @@ void replyh(struct server *server, unsigned char *buf) {
     server->lostrqs = 0;
 
     rqout = server->requests + buf[1];
+    pthread_mutex_lock(rqout->lock);
+    if (!rqout->tries) {
+	free(buf);
+	buf = NULL;
+	debug(DBG_INFO, "replyh: no matching request sent with this id, ignoring reply");
+	goto errunlock;
+    }
+	
     msg = buf2radmsg(buf, (uint8_t *)server->conf->secret, rqout->msg->auth);
     free(buf);
     buf = NULL;
     if (!msg) {
         debug(DBG_WARN, "replyh: message validation failed, ignoring packet");
-	return;
+	goto errunlock;
     }
     if (msg->code != RAD_Access_Accept && msg->code != RAD_Access_Reject && msg->code != RAD_Access_Challenge
 	&& msg->code != RAD_Accounting_Response) {
 	debug(DBG_INFO, "replyh: discarding message type %s, accepting only access accept, access reject, access challenge and accounting response messages", radmsgtype2string(msg->code));
-	radmsg_free(msg);
-	return;
-    }
-    debug(DBG_DBG, "got %s message with id %d", radmsgtype2string(msg->code), msg->id);
-
-    pthread_mutex_lock(&server->newrq_mutex);
-    if (!rqout->buf || !rqout->tries) {
-	debug(DBG_INFO, "replyh: no matching request sent with this id, ignoring reply");
 	goto errunlock;
     }
+    debug(DBG_DBG, "got %s message with id %d", radmsgtype2string(msg->code), msg->id);
 
     if (rqout->received) {
 	debug(DBG_INFO, "replyh: already received, ignoring reply");
@@ -2047,12 +2086,12 @@ void replyh(struct server *server, unsigned char *buf) {
 
     debug(DBG_INFO, "replyh: passing reply to client %s", from->conf->name);
     sendreply(from, msg, &fromsa, rqout->rq->fromudpsock);
-    pthread_mutex_unlock(&server->newrq_mutex);
+    pthread_mutex_unlock(rqout->lock);
     return;
 
  errunlock:
     radmsg_free(msg);
-    pthread_mutex_unlock(&server->newrq_mutex);
+    pthread_mutex_unlock(rqout->lock);
     return;
 }
 
@@ -2156,24 +2195,28 @@ void *clientwr(void *arg) {
 		pthread_join(clientrdth, NULL);
 		goto errexit;
 	    }
-	    pthread_mutex_lock(&server->newrq_mutex);
-	    while (i < MAX_REQUESTS && !server->requests[i].buf)
-		i++;
-	    if (i == MAX_REQUESTS) {
-		pthread_mutex_unlock(&server->newrq_mutex);
-		break;
+
+	    for (; i < MAX_REQUESTS; i++) {
+		rqout = server->requests + i;
+		if (rqout->buf) {
+		    pthread_mutex_lock(rqout->lock);
+		    if (rqout->buf)
+			break;
+		    pthread_mutex_unlock(rqout->lock);
+		}
 	    }
-	    rqout = server->requests + i;
+		
+	    if (i == MAX_REQUESTS)
+		break;
 
             if (rqout->received) {
 		debug(DBG_DBG, "clientwr: packet %d in queue is marked as received", i);
 		if (rqout->buf) {
 		    debug(DBG_DBG, "clientwr: freeing received packet %d from queue", i);
 		    freerqoutdata(rqout);
-		    /* setting this to NULL means that it can be reused */
-		    rqout->buf = NULL;
+		    /* rqout->buf becomes NULL, means can be reused */
 		}
-                pthread_mutex_unlock(&server->newrq_mutex);
+                pthread_mutex_unlock(rqout->lock);
                 continue;
             }
 	    
@@ -2181,7 +2224,7 @@ void *clientwr(void *arg) {
             if (now.tv_sec < rqout->expiry.tv_sec) {
 		if (!timeout.tv_sec || rqout->expiry.tv_sec < timeout.tv_sec)
 		    timeout.tv_sec = rqout->expiry.tv_sec;
-		pthread_mutex_unlock(&server->newrq_mutex);
+                pthread_mutex_unlock(rqout->lock);
 		continue;
 	    }
 
@@ -2199,18 +2242,17 @@ void *clientwr(void *arg) {
 			server->lostrqs++;
 		}
 		freerqoutdata(rqout);
-		/* setting this to NULL means that it can be reused */
-		rqout->buf = NULL;
-		pthread_mutex_unlock(&server->newrq_mutex);
+		/* rqout->buf becomes NULL, means can be reused */
+                pthread_mutex_unlock(rqout->lock);
 		continue;
 	    }
-            pthread_mutex_unlock(&server->newrq_mutex);
 
 	    rqout->expiry.tv_sec = now.tv_sec + conf->retryinterval;
 	    if (!timeout.tv_sec || rqout->expiry.tv_sec < timeout.tv_sec)
 		timeout.tv_sec = rqout->expiry.tv_sec;
 	    rqout->tries++;
-	    conf->pdef->clientradput(server, server->requests[i].buf);
+	    conf->pdef->clientradput(server, rqout->buf);
+	    pthread_mutex_unlock(rqout->lock);
 	}
 	if (conf->statusserver) {
 	    secs = server->lastrcv.tv_sec > laststatsrv.tv_sec ? server->lastrcv.tv_sec : laststatsrv.tv_sec;
diff --git a/radsecproxy.h b/radsecproxy.h
index d1dcaef..a4eabfc 100644
--- a/radsecproxy.h
+++ b/radsecproxy.h
@@ -56,12 +56,13 @@ struct request {
 
 /* requests that our client will send */
 struct rqout {
+    pthread_mutex_t *lock; /* used when modifying buf/msg/rq */
     unsigned char *buf;
     struct radmsg *msg;
+    struct request *rq;
     uint8_t tries;
     uint8_t received;
     struct timeval expiry;
-    struct request *rq;
 };
 
 /* replies that a server will send */
-- 
cgit v1.1