Brushing Up My C. Building A Unix Domain Socket Client/Server (PART II)

In this previous blog post I described how to build a simple Unix Domain Socket client/server application.
The drawback of that approach is that the server can only handle one connection at a time (i.e. it is not concurrent).

This blog post explains how this can be improved by using mechanisms like select(), epoll(), kqueue() etc.
All these mechanisms let a program monitor multiple file descriptors and be notified when one or more of them are ready for I/O, so that an action (read, write etc.) can be performed on them.
They mainly differ in characteristics like:

  • Synchronous vs Asynchronous paradigms
  • Underlying data structures in the internals of those system calls, which play an important role on performance
  • Platform/OS support, as not every OS supports all of the above. Some are portable (e.g. select(), which is specified by POSIX), some are platform specific (e.g. epoll() is only implemented on Linux, kqueue() on the BSDs and macOS)

A superb blog post to understand the differences is this one by Julia Evans

select()

I tried to just enhance the server part of the previous blog post and I went with the select() option.
The select() system call is simpler than epoll() or kqueue(). It allows for registering a number of file descriptors which are then monitored for I/O events. On calling select() without a timeout the thread blocks, and it only unblocks when one or more of the file descriptors are ready for I/O.
The file descriptors have to be registered manually on an fd_set, which in turn is passed to the select() call. The macros below can be used to manipulate the fd_set:

  • void FD_ZERO(fd_set *set): Initialize an fd_set
  • void FD_SET(int fd, fd_set *set): Add a file descriptor to an fd_set
  • void FD_CLR(int fd, fd_set *set): Remove a file descriptor from the fd_set
  • int FD_ISSET(int fd, fd_set *set): Check if a specific file descriptor is part of the fd_set, which after select() returns means that it is ready for I/O
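
To make the four macros more concrete, here is a minimal sketch that exercises them without calling select() at all. The function names count_members and fd_set_demo are made up for this illustration, and the numbers 3 and 5 are plain integers standing in for real descriptors (the macros only flip bits, so nothing needs to be open).

```c
#include <assert.h>
#include <sys/select.h>

/* Hypothetical helper: counts how many descriptors in 0..maxFd are members of the set. */
int count_members(fd_set *set, int maxFd) {
    int count = 0;
    for (int fd = 0; fd <= maxFd; fd++) {
        if (FD_ISSET(fd, set)) {
            count++;
        }
    }
    return count;
}

/* Walks through the four macros and returns how many members are left at the end. */
int fd_set_demo(void) {
    fd_set set;

    FD_ZERO(&set);      /* always start from an empty set */
    FD_SET(3, &set);    /* register descriptor 3 */
    FD_SET(5, &set);    /* register descriptor 5 */
    FD_SET(5, &set);    /* registering it again changes nothing */
    assert(count_members(&set, 10) == 2);

    FD_CLR(3, &set);    /* unregister descriptor 3 */
    assert(!FD_ISSET(3, &set));
    assert(FD_ISSET(5, &set));

    return count_members(&set, 10);
}
```

Calling fd_set_demo() leaves only descriptor 5 in the set. Note that every descriptor handed to these macros must be smaller than FD_SETSIZE.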

The main caveat with select() is that every call modifies the fd_set in place: the file descriptors that have no I/O data on that cycle are cleared from it. Hence the developer has to manually re-register all the file descriptors before the next call, as described in the select() documentation:

Note well: Upon return, each of the file descriptor sets is modified in place to indicate which file descriptors are currently “ready”. Thus, if using select() within a loop, the sets must be reinitialized before each call.
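
This behaviour is easy to observe with a small experiment. The sketch below is not part of the server; it uses two pipes as stand-ins for connections, writes to only one of them and then checks what select() left in the set. The function name select_prunes_idle_fds is an assumption made up for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Hypothetical experiment: returns 1 when select() left only the readable
 * descriptor in the set, 0 when it did not, and -1 on a setup error.
 */
int select_prunes_idle_fds(void) {
    int busy[2], idle[2];
    if (pipe(busy) == -1) {
        return -1;
    }
    if (pipe(idle) == -1) {
        close(busy[0]);
        close(busy[1]);
        return -1;
    }

    int result = -1;
    if (write(busy[1], "x", 1) == 1) {   /* only the "busy" pipe gets data */
        int maxFd = busy[0] > idle[0] ? busy[0] : idle[0];
        assert(maxFd < FD_SETSIZE);      /* larger descriptors do not fit in an fd_set */

        fd_set readfds;
        FD_ZERO(&readfds);
        FD_SET(busy[0], &readfds);       /* both read ends are registered ... */
        FD_SET(idle[0], &readfds);

        struct timeval timeout = {0, 0}; /* zero timeout: poll, never block */
        int ready = select(maxFd + 1, &readfds, NULL, NULL, &timeout);
        if (ready >= 0) {
            /* ... but only the one with pending data is still in the set */
            result = ready == 1
                  && FD_ISSET(busy[0], &readfds)
                  && !FD_ISSET(idle[0], &readfds);
        }
    }

    close(busy[0]);
    close(busy[1]);
    close(idle[0]);
    close(idle[1]);
    return result;
}
```

The idle descriptor was registered before the call but is gone afterwards, which is exactly why a loop around select() has to rebuild the set on every iteration.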

Having said that, the server.c file now looks like:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stddef.h>

#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <sys/syscall.h>
#include <sys/select.h>
#include <errno.h>

#include "af_unix_sockets_common.h"

int pumpData(int fd);
int cleanupConnections(int *connections, int idx);

/*
* Open an `AF_UNIX` socket on the `path` specified, `bind()` to that address and `listen()` for incoming connections.
* Then use `select()` to `accept()` new connections and to print the input of every open connection to `stdout`.
*/
void server(char *path) {
    printf("Starting AF_UNIX server on Path=%s\n", path);
    AFUnixAddress *domainSocketAddress = open_af_unix_socket(path);

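    // the length has to cover the whole AF_UNIX address (this assumes `address` points to a struct sockaddr_un), not just a generic struct sockaddr
    int hasBind = bind(domainSocketAddress->fd, (struct sockaddr *)domainSocketAddress->address, sizeof(struct sockaddr_un));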
    if(hasBind == -1){
        fprintf(stderr, "Failed to bind AF_UNIX socket on Path=%s. ErrorNo=%d\n", path, errno);
        cleanup(domainSocketAddress->fd, path);
        exit(errno);
    }

    int isListening = listen(domainSocketAddress->fd,  10);
    if(isListening == -1) {
        fprintf(stderr, "Failed to listen to AF_UNIX socket on Path=%s. ErrorNo=%d\n", path, errno);
        cleanup(domainSocketAddress->fd, path);
        exit(errno);
    }

    fd_set readfds;
    int maxFD = domainSocketAddress->fd;
    FD_ZERO(&readfds);
    FD_SET(domainSocketAddress->fd, &readfds);

    int openConnections[FD_SETSIZE];
    int closedConnections[FD_SETSIZE] = {0};// indices to openConnections that have closed (not used in this version)
    int nextIdx = 0;

    fprintf(stdout, "Start accepting connections on Path=%s\n", path);
    while(TRUE) {
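        int retVal = select(maxFD + 1, &readfds, NULL, NULL, NULL);
        if(retVal == -1) {
            // on error the contents of readfds are unspecified, so they must not be inspected
            fprintf(stderr, "select() failed. Error=%s, ErrorNo=%d\n", strerror(errno), errno);
            cleanup(domainSocketAddress->fd, path);
            exit(EXIT_FAILURE);
        }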
        if(FD_ISSET(domainSocketAddress->fd, &readfds)) {

            int connFd = accept(domainSocketAddress->fd, NULL, NULL);
            if(connFd == -1) {
                fprintf(stderr, "Error while accepting connection. Error=%s, ErrorNo=%d\n", strerror(errno), errno);
                cleanup(domainSocketAddress->fd, path);
                exit(errno);
            }
            fprintf(stdout, "New AF_UNIX connection added\n");

            openConnections[nextIdx++] = connFd;
            maxFD = maxFD >= connFd ? maxFD : connFd;
            FD_SET(connFd, &readfds);
        } else {
            for(int i = 0; i < nextIdx;i ++) {
                if(FD_ISSET(openConnections[i], &readfds)) {

                    if(!pumpData(openConnections[i])){
                        FD_CLR(openConnections[i], &readfds);
                        openConnections[i] = -1;// denotes that connection has closed
                    }
                }
            }

            nextIdx = cleanupConnections(openConnections, nextIdx);
        }

        // re-add all active FDs to fd_set
        FD_SET(domainSocketAddress->fd, &readfds);
        for(int i = 0; i < nextIdx;i ++) {
            FD_SET(openConnections[i], &readfds);
        }
    }

    cleanup(domainSocketAddress->fd, path);
}


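/*
* Read whatever is available on the connection and print it to `stdout`.
* Returns FALSE (after closing the descriptor) when the peer has gone away or the read failed.
*/
int pumpData(int connFd) {
    char buf[BUFSIZ];
    ssize_t bytes = read(connFd, buf, sizeof(buf));
    if(bytes <= 0) {
        // 0 means the peer closed the connection, -1 means a read error
        fprintf(stdout, "Connection closed\n");
        close(connFd);// release the descriptor, otherwise it leaks
        return FALSE;
    }
    write(STDOUT_FILENO, buf, (size_t)bytes);
    return TRUE;
}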

/*
* Compact the array in place by dropping the entries marked with -1.
* Returns the new number of open connections.
*/
int cleanupConnections(int *connections, int idx) {
    int next = 0;
    for(int i = 0; i < idx;i++) {
        if(connections[i] != -1) {
            connections[next++] = connections[i];
        }
    }

    return next;
}

The Changes

The few changes worth mentioning are the following:

  • We first register the AF_UNIX socket's file descriptor on the fd_set that is passed into the select() call.
  • After every call to select(), the first check is whether the listening socket's file descriptor is readable, which means that a new connection is pending. If so, the server accept()s that connection
if(FD_ISSET(domainSocketAddress->fd, &readfds)) {
    int connFd = accept(domainSocketAddress->fd, NULL, NULL);
    ...
  • After accepting a connection, that connection’s file descriptor has to be added to the fd_set so that it can be monitored for I/O events
FD_SET(connFd, &readfds);
  • For every open connection, the program checks if the corresponding file descriptor has I/O data, and if so the server reads that data. It is worth noting that a closing connection also makes its file descriptor readable (read() then returns 0), hence the program needs to detect this and remove the closed file descriptor from the monitored fd_set
if(FD_ISSET(openConnections[i], &readfds)) {
    if(!pumpData(openConnections[i])){
        FD_CLR(openConnections[i], &readfds);
        openConnections[i] = -1;// denotes that connection has closed
    }
}
  • Finally, as mentioned above, after select() returns the fd_set only contains the file descriptors that are ready. Any previously added file descriptors that did not have I/O data on that cycle are removed, hence they need to be re-added. Luckily, according to the select() documentation there is no harm in setting a file descriptor that is already in the fd_set, hence we just loop over the known file descriptors and re-add them all to the fd_set

FD_SET() This macro adds the file descriptor fd to set. Adding a file descriptor that is already present in the set is a no-op, and does not produce an error.

// re-add all active FDs to fd_set
FD_SET(domainSocketAddress->fd, &readfds);
for(int i = 0; i < nextIdx;i ++) {
    FD_SET(openConnections[i], &readfds);
}
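
An alternative to re-adding the descriptors one by one, often seen in select()-based code, is to keep a "master" set that is never passed to select() and to hand select() a fresh copy on every iteration. This is not what the server above does; the sketch below only illustrates the idea, with two pipes standing in for client connections and master_set_demo being a made-up name.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Hypothetical demo: runs two select() rounds against a copy of the same
 * master set and returns how many rounds saw the expected descriptor ready.
 */
int master_set_demo(void) {
    int first[2], second[2];
    if (pipe(first) == -1) {
        return -1;
    }
    if (pipe(second) == -1) {
        close(first[0]);
        close(first[1]);
        return -1;
    }

    int readers[2] = { first[0], second[0] };
    int writers[2] = { first[1], second[1] };
    int maxFd = readers[0] > readers[1] ? readers[0] : readers[1];
    assert(maxFd < FD_SETSIZE);

    fd_set master;                       /* registered once, never modified by select() */
    FD_ZERO(&master);
    FD_SET(readers[0], &master);
    FD_SET(readers[1], &master);

    int handled = 0;
    for (int round = 0; round < 2; round++) {
        if (write(writers[round], "x", 1) != 1) {
            break;
        }

        fd_set readfds = master;         /* fd_set is a struct, so assignment copies it */
        struct timeval timeout = {0, 0}; /* zero timeout: poll, never block */
        if (select(maxFd + 1, &readfds, NULL, NULL, &timeout) != 1) {
            break;
        }

        char byte;
        if (FD_ISSET(readers[round], &readfds) && read(readers[round], &byte, 1) == 1) {
            handled++;                   /* drained, so it is idle again next round */
        }
    }

    for (int i = 0; i < 2; i++) {
        close(readers[i]);
        close(writers[i]);
    }
    return handled;
}
```

With this layout the bookkeeping moves from "re-add everything after each call" to "update the master set only when a connection is accepted or closed".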

Conclusion

The changes needed to allow for multiplexing of different connections were minimal and did not radically affect the program's logic. Anyone can take this example and enhance it further. Some suggestions would be:

  • Try epoll() instead of select()
  • Instead of just reading what the client has sent and printing it out to the console, broadcast the message to all clients connected at that time
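
As a starting point for the broadcast suggestion, here is a rough sketch of what such a helper could look like. The names broadcast and broadcast_demo are hypothetical and not part of the server above; the helper assumes the same convention as openConnections, where -1 marks a closed slot. The demo uses socketpair() to fake two connected clients.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helper: writes the message to every open connection and
 * returns the number of connections that received it completely.
 * Slots holding -1 are treated as closed and skipped.
 */
int broadcast(const int *connections, int count, const char *msg, size_t len) {
    assert(connections != NULL && count >= 0);
    int delivered = 0;
    for (int i = 0; i < count; i++) {
        if (connections[i] == -1) {
            continue;
        }
        /* a real server would also handle partial writes and peers that have gone away */
        if (write(connections[i], msg, len) == (ssize_t)len) {
            delivered++;
        }
    }
    return delivered;
}

/* Demo with two fake clients; returns 1 when both received the message. */
int broadcast_demo(void) {
    int a[2], b[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, a) == -1) {
        return -1;
    }
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, b) == -1) {
        close(a[0]);
        close(a[1]);
        return -1;
    }

    /* index 0 of each pair plays the server side, index 1 the client side */
    int connections[3] = { a[0], -1, b[0] };
    int delivered = broadcast(connections, 3, "hi", 2);

    char bufA[2], bufB[2];
    int ok = delivered == 2
          && read(a[1], bufA, sizeof(bufA)) == 2 && memcmp(bufA, "hi", 2) == 0
          && read(b[1], bufB, sizeof(bufB)) == 2 && memcmp(bufB, "hi", 2) == 0;

    close(a[0]);
    close(a[1]);
    close(b[0]);
    close(b[1]);
    return ok;
}
```

In the server, a call like this would take the place of the write() to stdout once the data has been read from one of the clients.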