diff options
Diffstat (limited to 'lib/udp.c')
-rw-r--r-- | lib/udp.c | 133 |
1 files changed, 74 insertions, 59 deletions
@@ -1,4 +1,4 @@ -/* Copyright 2011 NORDUnet A/S. All rights reserved. +/* Copyright 2011,2013 NORDUnet A/S. All rights reserved. See LICENSE for licensing information. */ #if defined HAVE_CONFIG_H @@ -16,38 +16,39 @@ #include "event.h" #include "compat.h" #include "udp.h" +#include "conn.h" /* Send one packet, the first in queue. */ static int _send (struct rs_connection *conn, int fd) { ssize_t r = 0; - struct rs_packet *pkt = conn->out_queue; + struct rs_message *msg = conn->out_queue; - assert (pkt->rpkt); - assert (pkt->rpkt->data); + assert (msg->rpkt); + assert (msg->rpkt->data); /* Send. */ - r = compat_send (fd, pkt->rpkt->data, pkt->rpkt->length, 0); + r = compat_send (fd, msg->rpkt->data, msg->rpkt->length, 0); if (r == -1) { - int sockerr = evutil_socket_geterror (pkt->conn->fd); + int sockerr = evutil_socket_geterror (msg->conn->fd); if (sockerr != EAGAIN) - return rs_err_conn_push_fl (pkt->conn, RSE_SOCKERR, __FILE__, __LINE__, + return rs_err_conn_push_fl (msg->conn, RSE_SOCKERR, __FILE__, __LINE__, "%d: send: %d (%s)", fd, sockerr, evutil_socket_error_to_string (sockerr)); } - assert (r == pkt->rpkt->length); - /* Unlink the packet. */ - conn->out_queue = pkt->next; + assert (r == msg->rpkt->length); + /* Unlink the message. */ + conn->out_queue = msg->next; - /* If there are more packets in queue, add the write event again. */ - if (pkt->conn->out_queue) + /* If there are more messages in queue, add the write event again. */ + if (msg->conn->out_queue) { - r = event_add (pkt->conn->wev, NULL); + r = event_add (msg->conn->base_.wev, NULL); if (r < 0) - return rs_err_conn_push_fl (pkt->conn, RSE_EVENT, __FILE__, __LINE__, + return rs_err_conn_push_fl (msg->conn, RSE_EVENT, __FILE__, __LINE__, "event_add: %s", evutil_gai_strerror (r)); rs_debug (("%s: re-adding the write event\n", __func__)); } @@ -55,17 +56,14 @@ _send (struct rs_connection *conn, int fd) return RSE_OK; } -/* Callback for conn->wev and conn->rev. FIXME: Rename. +/** Callback for conn->wev and conn->rev. FIXME: Rename. - USER_DATA contains connection for EV_READ and a packet for - EV_WRITE. This is because we don't have a connect/establish entry - point at the user level -- send implies connect so when we're - connected we need the packet to send. */ + \a user_data holds a message. */ static void _evcb (evutil_socket_t fd, short what, void *user_data) { int err; - struct rs_packet *pkt = (struct rs_packet *) user_data; + struct rs_message *msg = (struct rs_message *) user_data; rs_debug (("%s: fd=%d what =", __func__, fd)); if (what & EV_TIMEOUT) rs_debug ((" TIMEOUT -- shouldn't happen!")); @@ -73,20 +71,21 @@ _evcb (evutil_socket_t fd, short what, void *user_data) if (what & EV_WRITE) rs_debug ((" WRITE")); rs_debug (("\n")); - assert (pkt); - assert (pkt->conn); + assert (msg); + assert (msg->conn); if (what & EV_READ) { - /* Read a single UDP packet and stick it in USER_DATA. */ + /* Read a single UDP packet and stick it in the struct + rs_message passed in user_data. */ /* TODO: Verify that unsolicited packets are dropped. */ ssize_t r = 0; + assert (msg->rpkt); + assert (msg->rpkt->data); - assert (pkt->rpkt->data); - - r = compat_recv (fd, pkt->rpkt->data, RS_MAX_PACKET_LEN, MSG_TRUNC); + r = compat_recv (fd, msg->rpkt->data, RS_MAX_PACKET_LEN, MSG_TRUNC); if (r == -1) { - int sockerr = evutil_socket_geterror (pkt->conn->fd); + int sockerr = evutil_socket_geterror (msg->conn->fd); if (sockerr == EAGAIN) { /* FIXME: Really shouldn't happen since we've been told @@ -96,61 +95,74 @@ _evcb (evutil_socket_t fd, short what, void *user_data) } /* Hard error. */ - rs_err_conn_push_fl (pkt->conn, RSE_SOCKERR, __FILE__, __LINE__, - "%d: recv: %d (%s)", fd, sockerr, - evutil_socket_error_to_string (sockerr)); - event_del (pkt->conn->tev); + rs_err_conn_push (msg->conn, RSE_SOCKERR, + "%d: recv: %d (%s)", fd, sockerr, + evutil_socket_error_to_string (sockerr)); + event_del (msg->conn->tev); goto err_out; } - event_del (pkt->conn->tev); + event_del (msg->conn->tev); if (r < 20 || r > RS_MAX_PACKET_LEN) /* Short or long packet. */ { - rs_err_conn_push (pkt->conn, RSE_INVALID_PKT, - "invalid packet length: %d", r); + rs_err_conn_push (msg->conn, RSE_INVALID_MSG, + "invalid message length: %d", r); goto err_out; } - pkt->rpkt->length = (pkt->rpkt->data[2] << 8) + pkt->rpkt->data[3]; - err = nr_packet_ok (pkt->rpkt); + msg->rpkt->length = (msg->rpkt->data[2] << 8) + msg->rpkt->data[3]; + err = nr_packet_ok (msg->rpkt); if (err) { - rs_err_conn_push_fl (pkt->conn, -err, __FILE__, __LINE__, - "invalid packet"); + rs_err_conn_push (msg->conn, -err, "invalid message"); goto err_out; } - /* Hand over message to user. This changes ownership of pkt. + /* Hand over message to user. This changes ownership of msg. Don't touch it afterwards -- it might have been freed. */ - if (pkt->conn->callbacks.received_cb) - pkt->conn->callbacks.received_cb (pkt, pkt->conn->user_data); + if (msg->conn->callbacks.received_cb) + msg->conn->callbacks.received_cb (msg, msg->conn->base_.user_data); } else if (what & EV_WRITE) { - if (!pkt->conn->is_connected) - event_on_connect (pkt->conn, pkt); - - if (pkt->conn->out_queue) - if (_send (pkt->conn, fd) == RSE_OK) - if (pkt->conn->callbacks.sent_cb) - pkt->conn->callbacks.sent_cb (pkt->conn->user_data); + if (conn_originating_p (msg->conn)) + { + /* We're a client. */ + if (msg->conn->state == RS_CONN_STATE_CONNECTING) + event_on_connect_orig (msg->conn, msg); + } + else + { + /* We're a server. */ + rs_debug (("%s: write event on terminating conn %p\n", + __func__, msg->conn)); + } + + if (msg->conn->out_queue) + if (_send (msg->conn, fd) == RSE_OK) + if (msg->conn->callbacks.sent_cb) + msg->conn->callbacks.sent_cb (msg->conn->base_.user_data); } return; err_out: - rs_conn_disconnect (pkt->conn); + rs_conn_disconnect (msg->conn); } int -udp_init (struct rs_connection *conn, struct rs_packet *pkt) +udp_init (struct rs_connection *conn, struct rs_message *msg) { - assert (!conn->bev); - - conn->rev = event_new (conn->evb, conn->fd, EV_READ|EV_PERSIST, _evcb, NULL); - conn->wev = event_new (conn->evb, conn->fd, EV_WRITE, _evcb, NULL); - if (!conn->rev || !conn->wev) + assert (!conn->base_.bev); + + /* FIXME: Explain why we set EV_PERSIST on the read event but not on + the write event. */ + conn->base_.rev = event_new (conn->base_.ctx->evb, conn->base_.fd, + EV_READ|EV_PERSIST, _evcb, NULL); + conn->base_.wev = event_new (conn->base_.ctx->evb, conn->base_.fd, + EV_WRITE, _evcb, NULL); + if (!conn->base_.rev || !conn->base_.wev) { - if (conn->rev) + if (conn->base_.rev) { - event_free (conn->rev); - conn->rev = NULL; + event_free (conn->base_.rev); + conn->base_.rev = NULL; } /* ENOMEM _or_ EINVAL but EINVAL only if we use EV_SIGNAL, at least for now (libevent-2.0.5). */ @@ -163,10 +175,13 @@ int udp_init_retransmit_timer (struct rs_connection *conn) { assert (conn); + assert (conn->base_.ctx); + assert (conn->base_.ctx->evb); if (conn->tev) event_free (conn->tev); - conn->tev = evtimer_new (conn->evb, event_retransmit_timeout_cb, conn); + conn->tev = + evtimer_new (conn->base_.ctx->evb, event_retransmit_timeout_cb, conn); if (!conn->tev) return rs_err_conn_push_fl (conn, RSE_EVENT, __FILE__, __LINE__, "evtimer_new"); |