Handling TCP Connections with Kqueue Event Notification
Kqueue is a scalable mechanism for registering and responding to process, signal, timer, and file descriptor events in the kernel. Today it's a native facility on every major BSD distribution and on macOS.
The key to using the kqueue API is to understand that every event is identified
by a user-supplied integer. As Ted Unangst notes in "Experiences with kqueue"
[PDF], you must make sure you understand what this integer identifies. When
implementing a TCP server this integer will be a file descriptor created by
socket(2) or returned by accept(2).
Listening on a Socket
The main event loop will react to events on the listening socket by calling
accept(2), and to a client disconnect by calling close(2). To get started, we
first create a socket and then bind it to an address. The address comes from
getaddrinfo(3):
#include <sys/socket.h>
#include <sys/un.h>
#include <netdb.h>
#include <err.h>      /* errx() */
#include <string.h>   /* memset() */

int local_s;          /* listening socket, also used by watch_loop() */
struct addrinfo *addr;
struct addrinfo hints;

/* open a TCP socket */
memset(&hints, 0, sizeof hints);
hints.ai_family = PF_UNSPEC;     /* any supported protocol */
hints.ai_flags = AI_PASSIVE;     /* result for bind() */
hints.ai_socktype = SOCK_STREAM; /* TCP */
int error = getaddrinfo("127.0.0.1", "8080", &hints, &addr);
if (error)
    errx(1, "getaddrinfo failed: %s", gai_strerror(error));
getaddrinfo(3) returns a pointer to a linked list of addresses that match the
hostname provided; here we'll only use the first match.
local_s = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
bind(local_s, addr->ai_addr, addr->ai_addrlen);
listen(local_s, 5);
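The three calls above are shown without error handling. As a minimal sketch, assuming a hypothetical helper named listen_on (it is not part of the original example), the same steps with every return value checked might look like this:

```c
#include <netdb.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * listen_on is a hypothetical helper, not part of the original example.
 * It resolves host/port, then creates, binds, and listens on the first
 * match. Returns the listening descriptor, or -1 on any failure.
 */
int
listen_on(const char *host, const char *port)
{
    struct addrinfo hints, *addr;
    int error, fd;

    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;     /* any supported protocol */
    hints.ai_flags = AI_PASSIVE;     /* result for bind() */
    hints.ai_socktype = SOCK_STREAM; /* TCP */

    error = getaddrinfo(host, port, &hints, &addr);
    if (error) {
        fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error));
        return -1;
    }

    fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
    if (fd == -1) {
        perror("socket");
        freeaddrinfo(addr);
        return -1;
    }
    if (bind(fd, addr->ai_addr, addr->ai_addrlen) == -1 ||
        listen(fd, 5) == -1) {
        perror("bind/listen");
        close(fd);           /* don't leak the descriptor on failure */
        freeaddrinfo(addr);
        return -1;
    }
    freeaddrinfo(addr);      /* the socket keeps its own copy of the address */
    return fd;
}
```

A caller would then write something like local_s = listen_on("127.0.0.1", "8080") and bail out if the result is -1.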
socket, bind and listen all return -1 on error. Now that we have a file
descriptor to watch, we initialize a queue, then add filters that specify the
events to listen for:
#include <sys/event.h>

int kq;
struct kevent evSet;

/* create the queue, then register interest in reads on local_s */
if ((kq = kqueue()) == -1)
    err(1, "kqueue");
EV_SET(&evSet, local_s, EVFILT_READ, EV_ADD, 0, 0, NULL);
if (kevent(kq, &evSet, 1, NULL, 0, NULL) == -1)
    err(1, "kevent");

watch_loop(kq);
EV_SET is a macro that simply fills in the kevent structure. The call to
kevent(2) indicates that there is one change to kq: the addition of an
EVFILT_READ filter on local_s.
The following loop waits for events and then uses any combination of
meaningful conditions to determine what the event is, and what should be done
about it.
void
watch_loop(int kq)
{
    struct kevent evSet;
    struct kevent evList[32];
    int nev, i;
    struct sockaddr_storage addr;
    socklen_t socklen;
    int fd;

    while (1) {
        /* block until at least one event is ready */
        nev = kevent(kq, NULL, 0, evList, 32, NULL);
        if (nev < 1)
            err(1, "kevent");
        for (i = 0; i < nev; i++) {
            if (evList[i].flags & EV_EOF) {
                /* the peer closed the connection */
                printf("disconnect\n");
                fd = evList[i].ident;
                EV_SET(&evSet, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
                if (kevent(kq, &evSet, 1, NULL, 0, NULL) == -1)
                    err(1, "kevent");
                conn_delete(fd);
            }
            else if (evList[i].ident == local_s) {
                /* new connection on the listening socket */
                socklen = sizeof(addr); /* accept() overwrites it, so reset */
                fd = accept(evList[i].ident, (struct sockaddr *)&addr,
                    &socklen);
                if (fd == -1)
                    err(1, "accept");
                if (conn_add(fd) == 0) {
                    EV_SET(&evSet, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
                    if (kevent(kq, &evSet, 1, NULL, 0, NULL) == -1)
                        err(1, "kevent");
                    send_msg(fd, "welcome!\n");
                } else {
                    printf("connection refused\n");
                    close(fd);
                }
            }
            else if (evList[i].filter == EVFILT_READ) {
                /* data is waiting on an established connection */
                recv_msg(evList[i].ident);
            }
        }
    }
}
The two utility functions send_msg and recv_msg in this example could be
defined like so:
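#include <stdarg.h>
#include <stdio.h>
#include <sys/socket.h>

void
send_msg(int s, char *message, ...)
{
    char buf[256];
    int len;
    va_list ap;

    va_start(ap, message);
    len = vsnprintf(buf, sizeof(buf), message, ap);
    va_end(ap);
    if (len < 0)
        return;                 /* formatting failed */
    if ((size_t)len >= sizeof(buf))
        len = sizeof(buf) - 1;  /* output was truncated to fit buf */
    send(s, buf, len, 0);
}

void
recv_msg(int s)
{
    char buf[256];
    ssize_t bytes_read;         /* recv() returns -1 on error */

    bytes_read = recv(s, buf, sizeof(buf), 0);
    if (bytes_read != -1)
        printf("%d bytes read\n", (int)bytes_read);
}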
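Note that send(2) may write fewer bytes than requested, which the short welcome message above is unlikely to trigger but longer messages can. As a rough sketch, assuming a blocking socket and a hypothetical helper named send_all (not part of the original example), a loop that keeps writing until the whole buffer is sent could look like this:

```c
#include <errno.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>

/*
 * send_all is a hypothetical helper, not part of the original example.
 * It calls send(2) repeatedly until len bytes have been written.
 * Returns 0 on success or -1 on error (errno is left as set by send).
 */
int
send_all(int s, const char *buf, size_t len)
{
    size_t sent = 0;
    ssize_t n;

    while (sent < len) {
        n = send(s, buf + sent, len - sent, 0);
        if (n == -1) {
            if (errno == EINTR)
                continue;   /* interrupted by a signal, try again */
            return -1;
        }
        sent += (size_t)n;  /* advance past the bytes already written */
    }
    return 0;
}
```

send_msg could call send_all(s, buf, len) in place of its single send() call. A non-blocking socket would need extra handling for EAGAIN, which this sketch leaves out.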
A Simple Connection Pool
Sometimes writing a highly general mechanism is a wise choice, but in C it often makes more sense to write data structures and functions for one specific task. Not only does this produce very concise code, but such specialized code keeps line noise out of the main event loop.
/* connpool.c */

#include <stdio.h>
#include <string.h>
#include <unistd.h>

#define NUSERS 10

/* forwards */
static int conn_index(int);
static int conn_add(int);
static int conn_delete(int);

/* globals */
struct uc {
    int uc_fd;
    char *uc_addr;
} users[NUSERS];

/* find the index of a file descriptor or a new slot if fd=0 */
int
conn_index(int fd)
{
    int uidx;

    for (uidx = 0; uidx < NUSERS; uidx++)
        if (users[uidx].uc_fd == fd)
            return uidx;
    return -1;
}

/* add a new connection storing the IP address */
int
conn_add(int fd)
{
    int uidx;

    if (fd < 1)
        return -1;
    /* -1 here means every slot is taken; the caller closes fd */
    if ((uidx = conn_index(0)) == -1)
        return -1;
    users[uidx].uc_fd = fd;  /* user's file descriptor */
    users[uidx].uc_addr = 0; /* user IP address */
    return 0;
}

/* remove a connection and close its fd */
int
conn_delete(int fd)
{
    int uidx;

    if (fd < 1)
        return -1;
    if ((uidx = conn_index(fd)) == -1)
        return -1;
    users[uidx].uc_fd = 0;
    users[uidx].uc_addr = NULL;
    /* free(users[uidx].uc_addr); */
    return close(fd);
}
Using UNIX Domain Sockets
All of the mechanisms listed so far work identically for a TCP and a local socket connection. The only difference is the way in which the local socket is initialized:
struct kevent evSet;
struct sockaddr_un sun;

/* open a UNIX socket */
local_s = socket(AF_UNIX, SOCK_STREAM, 0);
memset(&sun, 0, sizeof(struct sockaddr_un));
sun.sun_family = AF_UNIX;
strlcpy(sun.sun_path, "local.s", sizeof(sun.sun_path));
bind(local_s, (struct sockaddr *)&sun, SUN_LEN(&sun));
listen(local_s, 5);

/* register local_s with kq using EV_SET and kevent() as before */
watch_loop(kq);
Note that there's no reason kevent() cannot be called twice in order to
register a local socket as well as a socket listening on a TCP port.
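The UNIX domain setup above skips error checks, and bind(2) will fail if the socket file is left over from an earlier run. As a sketch under those observations, assuming a hypothetical helper named listen_unix and assuming the path belongs to this program and is safe to remove, the initialization might be written like this. It uses memcpy after a length check instead of strlcpy, since strlcpy is not available in every C library:

```c
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/*
 * listen_unix is a hypothetical helper, not part of the original example.
 * It creates a listening UNIX domain socket at path.
 * Returns the descriptor, or -1 on failure.
 */
int
listen_unix(const char *path)
{
    struct sockaddr_un sa;
    size_t plen = strlen(path);
    int fd;

    if (plen >= sizeof(sa.sun_path))
        return -1;                    /* path would not fit */

    memset(&sa, 0, sizeof(sa));
    sa.sun_family = AF_UNIX;
    memcpy(sa.sun_path, path, plen + 1);

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd == -1)
        return -1;

    unlink(path);  /* assumption: a stale socket file here is ours to remove */
    if (bind(fd, (struct sockaddr *)&sa, sizeof(sa)) == -1 ||
        listen(fd, 5) == -1) {
        close(fd);
        return -1;
    }
    return fd;
}
```

The returned descriptor is registered with EV_SET and kevent() exactly like the TCP listening socket.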
Binding and Reacting on Multiple Address Families
More than ten years after IPv6 was standardized, very few corporate systems
have adopted it, but the programming interfaces that emerged to support it have
benefited system programmers because the interfaces themselves have become more
general so as to support multiple protocols. getaddrinfo(3) provides a
mechanism for binding sockets to addresses specified in their native, numeric
format, or by a hostname that is resolved according to the order specified in
/etc/resolv.conf.
To start, establish an addrinfo structure with some data about the kind of
connection you're trying to make and a pointer to the list of results that the
OS is going to give us. *ai will refer to an ordered linked list. On Linux,
IPv6 is sorted first. If we actually wanted to listen on more than one address
we would have to loop through the results by following the ai_next pointer.
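To see the ordering on a given system, the list can simply be walked and printed. The following is a minimal sketch, assuming a hypothetical helper named print_addrs (not part of the original example), that follows ai_next and prints each result's family and numeric address in the order getaddrinfo(3) returned it:

```c
#include <netdb.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>

/*
 * print_addrs is a hypothetical helper, not part of the original example.
 * It resolves host/port and prints one line per result, in list order.
 * Returns the number of results printed, or -1 if resolution fails.
 */
int
print_addrs(const char *host, const char *port)
{
    struct addrinfo hints, *ai0, *ai;
    char numeric[128];
    const char *family;
    int error, n = 0;

    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;     /* ask for IPv4 and IPv6 */
    hints.ai_socktype = SOCK_STREAM; /* TCP */
    hints.ai_flags = AI_PASSIVE;     /* results are meant for bind() */

    error = getaddrinfo(host, port, &hints, &ai0);
    if (error) {
        fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error));
        return -1;
    }

    for (ai = ai0; ai != NULL; ai = ai->ai_next) {
        /* convert the binary address back to its numeric text form */
        if (getnameinfo(ai->ai_addr, ai->ai_addrlen, numeric,
            sizeof(numeric), NULL, 0, NI_NUMERICHOST) != 0)
            continue;
        if (ai->ai_family == AF_INET6)
            family = "IPv6";
        else if (ai->ai_family == AF_INET)
            family = "IPv4";
        else
            family = "other";
        printf("%s %s\n", family, numeric);
        n++;
    }
    freeaddrinfo(ai0);
    return n;
}
```

Calling print_addrs("localhost", "8080") on a dual-stack machine should list one line per address family, which is a quick way to check what the listening loop will bind to.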
Since getaddrinfo() resolves hostnames as well as numeric addresses it can
return a linked list of results. This is handy, because name resolution alone
can determine which addresses a service listens on. Start off by looping
through the results:
struct addrinfo *ai0, *ai;
struct addrinfo hints;
int s[MAXSOCK];
int nsock;

/* ai0 must already point at the results of getaddrinfo(3) */
nsock = 0;
memset(s, 0, sizeof(s));
for (ai = ai0; ai && nsock < MAXSOCK; ai = ai->ai_next) {
    if ((s[nsock] = socket(ai->ai_family, ai->ai_socktype,
        ai->ai_protocol)) < 0)
        continue;
    if (bind(s[nsock], ai->ai_addr, ai->ai_addrlen) < 0) {
        close(s[nsock]); /* don't leak a socket we could not bind */
        continue;
    }
    listen(s[nsock], 5);
    nsock++;
}
The kevent structure is very helpful here because the last field is a
user-defined typeless pointer. When we call kevent() once for each descriptor
passed to listen(), this field can be used to mark the event as belonging to a
listening descriptor, using whatever value ai0 has.
for (i = 0; i < nsock; i++) {
    EV_SET(&evSet, s[i], EVFILT_READ, EV_ADD, 0, 0, (void *)ai0);
    kevent(kq, &evSet, 1, (void *)0, 0, (struct timespec *)0);
}
It doesn't matter what the pointer refers to in this case; I'm just using the 32- or 64-bit address as a unique identifier. The event loop doesn't look much different; it just uses a different equality test:
for (;;) {
    nev = kevent(kq, (void *)0, 0, evlist, 8, (void *)0);
    for (i = 0; i < nev; i++) {
        if (evlist[i].udata == ai0) {
            /* Connection on a listening FD, call accept() */
        }
    }
}
If getaddrinfo() is called with a name that resolves to both an IPv4 and an
IPv6 address, the program will listen on both families.
$ netstat -an | grep 8080
tcp   0  0  127.0.0.1.8080  *.*  LISTEN
tcp6  0  0  ::1.8080        *.*  LISTEN
Cleanup
It's not always necessary to explicitly delete kqueue filters, because calling
close() on a file descriptor will remove any kevents that reference the
descriptor. It is proper to free the linked list created by getaddrinfo(3):
freeaddrinfo(ai0);
For a listening socket you may close every descriptor created by socket(2),
but you are not required to, since these will also be closed when the process
terminates.