Eric Radman : a Journal

Handling TCP Connections with Kqueue Event Notification

Kqueue is a scalable mechanism for registering and responding to process, signal, timer, and file descriptor events in the kernel. Today it's a native facility on every major BSD distribution and MacOS.

The key to using the kqueue API is to understand that every event is identified by a user-supplied integer. As Ted Unangst notes in "Experiences with kqueue" [PDF], you must make sure you understand what this integer identifies. When implementing a TCP server this integer will be a file descriptor created by socket(2) or accept(2).

Listening on a Socket

The main event loop will react to events by calling accept(2) on the listening socket, or close(2) if a client has disconnected. To get started, we first create a socket and then bind it to an address:

#include <sys/socket.h>
#include <sys/un.h>
#include <netdb.h>

struct addrinfo *addr;
struct addrinfo hints;

/* describe the socket we want to listen on: TCP, any address family */
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC; /* any supported address family */
hints.ai_flags = AI_PASSIVE; /* result for bind() */
hints.ai_socktype = SOCK_STREAM; /* TCP */
int error = getaddrinfo("127.0.0.1", "8080", &hints, &addr);
if (error)
    /* getaddrinfo reports its own error codes; translate with gai_strerror */
    errx(1, "getaddrinfo failed: %s", gai_strerror(error));

getaddrinfo(3) returned a pointer to a linked list of addresses that matched the hostname provided; here we'll only use the first match.

local_s = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
bind(local_s, addr->ai_addr, addr->ai_addrlen);
listen(local_s, 5);

socket, bind and listen all return -1 on error. Now that we have a file descriptor to watch, we initialize a queue, then add filters that specify the events to listen for:

#include <sys/event.h>

int kq;
struct kevent evSet;

/* create the kernel event queue */
kq = kqueue();
if (kq == -1)
    err(1, "kqueue");

/* register interest in read events on the listening socket */
EV_SET(&evSet, local_s, EVFILT_READ, EV_ADD, 0, 0, NULL);
if (kevent(kq, &evSet, 1, NULL, 0, NULL) == -1)
    err(1, "kevent");

watch_loop(kq);

EV_SET is a macro that simply fills in the kevent structure. The call to kevent(2) indicates that there is one change to kq: the addition of EVFILT_READ. The following loop waits for events and then uses any combination of meaningful conditions to determine what the event is, and what should be done about it.

/*
 * Service events on kqueue `kq` forever; this function never returns.
 *
 * Every event's ident is treated as a socket descriptor and handled as:
 *   - EV_EOF set: remove the read filter and release the connection
 *     through conn_delete()
 *   - ident is the listening socket local_s: accept the pending connection,
 *     record it with conn_add() and register a read filter for it
 *   - any other read event: pass the descriptor to recv_msg()
 *
 * A kevent(2) or accept(2) failure terminates the process via err(3).
 */
void
watch_loop(int kq) {
    struct kevent evSet;
    struct kevent evList[32];
    int nev, i;
    struct sockaddr_storage addr;
    socklen_t socklen;
    int fd;

    while(1) {
        /* NULL timeout: block until at least one event is pending */
        nev = kevent(kq, NULL, 0, evList, 32, NULL);
        if (nev < 1)
            err(1, "kevent");
        for (i=0; i<nev; i++) {
            if (evList[i].flags & EV_EOF) {
                printf("disconnect\n");
                fd = (int)evList[i].ident;
                EV_SET(&evSet, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
                if (kevent(kq, &evSet, 1, NULL, 0, NULL) == -1)
                    err(1, "kevent");
                conn_delete(fd);
            }
            else if ((int)evList[i].ident == local_s) {
                /*
                 * accept(2) overwrites socklen with the size of the peer
                 * address, so it must be reset to the buffer size before
                 * every call.
                 */
                socklen = sizeof(addr);
                fd = accept(local_s, (struct sockaddr *)&addr, &socklen);
                if (fd == -1)
                    err(1, "accept");
                if (conn_add(fd) == 0) {
                    EV_SET(&evSet, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
                    if (kevent(kq, &evSet, 1, NULL, 0, NULL) == -1)
                        err(1, "kevent");
                    send_msg(fd, "welcome!\n");
                } else {
                    /* conn_add() failed: drop the client */
                    printf("connection refused\n");
                    close(fd);
                }
            }
            else if (evList[i].filter == EVFILT_READ) {
                recv_msg((int)evList[i].ident);
            }
        }
    }
}

The two utility methods send_msg and recv_msg in this example could be defined like so

/*
 * Format a printf-style message and send it on socket `s`.
 *
 * The formatted text is limited to sizeof(buf) - 1 bytes; anything longer
 * is truncated. Nothing is sent if formatting fails. The result of send(2)
 * is not checked: delivery is best-effort.
 */
void
send_msg(int s, char *message, ...) {
    char buf[256];
    int len;

    va_list ap;
    va_start(ap, message);
    len = vsnprintf(buf, sizeof(buf), message, ap);
    va_end(ap);

    if (len < 0)
        return;
    /*
     * vsnprintf returns the length the full output would have had, which
     * can exceed the buffer; clamp so send() never reads past buf.
     */
    if ((size_t)len >= sizeof(buf))
        len = (int)(sizeof(buf) - 1);
    send(s, buf, (size_t)len, 0);
}

/*
 * Read up to sizeof(buf) bytes from socket `s` and print how many bytes
 * arrived; the data itself is discarded. A recv(2) failure is reported
 * with perror() instead.
 */
void
recv_msg(int s) {
    char buf[256];
    ssize_t bytes_read; /* signed: recv() returns -1 on error */

    bytes_read = recv(s, buf, sizeof(buf), 0);
    if (bytes_read < 0) {
        perror("recv");
        return;
    }
    printf("%d bytes read\n", (int)bytes_read);
}

A Simple Connection Pool

Sometimes writing a highly general mechanism is a wise choice, but in C it sometimes makes more sense to implement data structures and functions that implement functionality for a specific task. Not only does this produce very concise code, but such specialized code can be used to remove line noise from the main event loop.

/* connpool.c */

#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* number of slots in the users[] connection table */
#define NUSERS 10

/* forwards */

/* index of the slot holding a given fd, or -1 when no slot matches */
static int conn_index(int);
/* store an fd in a free slot; 0 on success, -1 on failure */
static int conn_add(int);
/* clear the slot holding an fd and close the fd */
static int conn_delete(int);

/* globals */

/*
 * Connection table: one entry per connection slot. Having static storage,
 * every slot starts out zeroed, i.e. free.
 */
struct uc {
    int uc_fd; /* descriptor of the connection; 0 marks a free slot */
    char *uc_addr; /* user IP address; only ever set to 0/NULL in this file */
} users[NUSERS];

/*
 * Look up the slot whose uc_fd equals `fd`. Passing fd=0 finds the first
 * free slot. Returns the slot index, or -1 when no slot matches.
 */
int
conn_index(int fd) {
    int slot = 0;

    while (slot < NUSERS) {
        if (users[slot].uc_fd == fd)
            return slot;
        slot++;
    }
    return -1;
}

/*
 * Record a new connection in the first free slot.
 *
 * fd must be >= 1 because 0 is the marker for a free slot. Returns 0 on
 * success, or -1 if fd is invalid or the table is full. The fd is never
 * closed here; on failure that is left to the caller.
 */
int
conn_add(int fd) {
    int uidx;
    if (fd < 1) return -1;
    /* conn_index(0) yields a free slot, or -1 when every slot is taken */
    if ((uidx = conn_index(0)) == -1)
        return -1;
    users[uidx].uc_fd = fd; /* users file descriptor */
    users[uidx].uc_addr = NULL; /* user IP address (not recorded yet) */
    return 0;
}

/*
 * Forget the connection that owns `fd` and close the descriptor.
 *
 * Returns -1 when fd is below 1 or is not present in the table (in which
 * case it is not closed); otherwise returns the result of close(2).
 * uc_addr is only reset to NULL, it is not freed.
 */
int
conn_delete(int fd) {
    int slot;

    if (fd < 1)
        return -1;

    slot = conn_index(fd);
    if (slot == -1)
        return -1;

    /* mark the slot as free again */
    users[slot].uc_addr = NULL;
    users[slot].uc_fd = 0;

    return close(fd);
}

Using UNIX Domain Sockets

All of the mechanisms listed so far work identically for a TCP and a local socket connection. The only difference is the way in which the local socket is initialized:

struct kevent evSet;
struct sockaddr_un sun;

/* open a UNIX socket */
local_s = socket(AF_UNIX, SOCK_STREAM, 0);
memset(&sun, 0, sizeof(struct sockaddr_un));
sun.sun_family = AF_UNIX;
strlcpy(sun.sun_path, "local.s", sizeof(sun.sun_path));
bind(local_s, (struct sockaddr *)&sun, SUN_LEN(&sun))
listen(local_s, 5)

watch_loop(kq);

Note that there's no reason kevent() cannot be called twice in order to register a local socket as well as a socket listening on a TCP port.

Binding and Reacting on Multiple Address Families

More than ten years after IPv6 was standardized very few corporate systems have adopted it, but the programming interfaces that emerged to support it have benefited system programmers because the interfaces themselves have become more general so as to support multiple protocols. getaddrinfo(3) provides a mechanism for binding sockets to addresses specified in their native, numeric format, or by a hostname that is resolved according to the order specified in /etc/resolv.conf.

To start, establish an addrinfo structure with some data about the kind of connection you're trying to make and a pointer to a list of results that the OS is going to give us. *ai will refer to an ordered linked list. On Linux, IPv6 is sorted first. If we actually wanted to listen on more than one address we would have to loop through the results by following the *ai_next pointer.

Since getaddrinfo() resolves hostnames as well as numeric addresses it can return a linked list of results. This is handy, because you can use name resolution to determine what services listen on multiple addresses. Start off by looping through the results:

struct addrinfo *ai0, *ai;
struct addrinfo hints;
int s[MAXSOCK];
int nsock;

nsock = 0;
memset(&s, 0, MAXSOCK);
for (ai = ai0; ai && nsock < MAXSOCK; ai = ai->ai_next) {
    if((s[nsock] = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0)
        continue;
    if(bind(s[nsock], ai->ai_addr, ai->ai_addrlen) < 0)
        continue;
    listen(s[nsock], 5);
    nsock++;
}

The kevent structure is very helpful here because the last field is a user-defined typeless pointer. When we call kevent() once for each listening descriptor, this field can be used to tag the event with whatever value ai0 has, identifying it as one of our listening sockets.

/* register a read filter for every listening socket, tagged with ai0 */
for (i=0; i<nsock; i++) {
    EV_SET(&evSet, s[i], EVFILT_READ, EV_ADD, 0, 0, (void *)ai0);
    if (kevent(kq, &evSet, 1, NULL, 0, NULL) == -1)
        err(1, "kevent");
}

It doesn't matter what the pointer refers to in this case, I'm just using the 32- or 64-bit address as a unique identifier. The event loop doesn't look much different, it just uses a different equality test.

for (;;) {
    /* block until events arrive; at most 8 are fetched per call */
    nev = kevent(kq, NULL, 0, evlist, 8, NULL);
    if (nev == -1)
        err(1, "kevent"); /* otherwise a failure would spin forever */
    for (i = 0; i < nev; i++) {
        /* udata carries the tag given at registration time */
        if (evlist[i].udata == ai0) {
            /* Connection on a listening socket, call accept() */
        }
    }
}

If getaddrinfo() is called with a name that resolves to both an IPv4 and an IPv6 address, the program will listen on both families.

$ netstat -an | grep 8080
tcp        0      0  127.0.0.1.8080         *.*                    LISTEN
tcp6       0      0  ::1.8080               *.*                    LISTEN

Cleanup

It's not always necessary to explicitly delete kqueue filters, because calling close() on a file descriptor will remove any kevents that reference the descriptor. It is proper to free the linked list created by getaddrinfo(3):

/* release the result list obtained from getaddrinfo(3) */
freeaddrinfo(ai0);

For a listening socket you may, but are not required to, close every descriptor created by socket(2); these will also be closed when the process terminates.