I described in this previous blog post how to build a simplistic Unix Domain Socket client/server application.
The disadvantage of that approach is that the server can only handle one connection at a time (i.e. it is not concurrent).
This blog post explains how this can be improved by using mechanisms like select(), epoll(), kqueue() etc.
Effectively, all these mechanisms let a program monitor multiple file descriptors and be notified when one or more of them are ready for I/O, so that an action (i.e. read, write etc.) can be performed.
The main differences among those are characteristics like:
- Synchronous vs Asynchronous paradigms
- Underlying data structures in the internals of those system calls, which play an important role in performance
- Platform/OS support, as not every OS supports all of the above. Some are platform agnostic (e.g. select()), some are platform specific (e.g. epoll() is only implemented on Linux)
A superb blog post to understand the differences is this one by Julia Evans
select()
I tried to just enhance the server part of the previous blog post and I went with the select() option.
The select() system call is simpler than epoll() or kqueue() and it effectively allows for registering a number of file descriptors which are monitored for I/O events. On calling select() the thread blocks and it only unblocks when one or more file descriptors are ready for I/O.
The file descriptors have to be registered manually on an fd_set, which in turn is passed into the select() call. The macros below can be used to manipulate the fd_set:
- void FD_ZERO(fd_set *set): Initialize an fd_set
- void FD_SET(int fd, fd_set *set): Add a file descriptor to an fd_set
- void FD_CLR(int fd, fd_set *set): Remove a file descriptor from the fd_set
- int FD_ISSET(int fd, fd_set *set): Check if a specific file descriptor, part of the fd_set, is ready with I/O data
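The four macros can be tried on their own, without any sockets involved. The following is a minimal sketch; `count_members` and `fd_set_demo` are hypothetical helpers that are not part of the server, and the descriptors 0, 1 and 2 are only used as set members, nothing is read from or written to them.

```c
#include <sys/select.h>

/* Hypothetical helper: counts how many of the given descriptors are members of `set`. */
int count_members(const int *fds, int n, fd_set *set) {
    int count = 0;
    for (int i = 0; i < n; i++) {
        if (FD_ISSET(fds[i], set)) {
            count++;
        }
    }
    return count;
}

/* Exercises the macros and returns how many of the descriptors 0, 1, 2 remain in the set. */
int fd_set_demo(void) {
    int fds[] = {0, 1, 2};
    fd_set set;

    FD_ZERO(&set);    /* start from an empty set */
    FD_SET(0, &set);  /* set now holds {0} */
    FD_SET(2, &set);  /* set now holds {0, 2} */
    FD_SET(2, &set);  /* adding an existing member changes nothing */
    FD_CLR(0, &set);  /* set now holds {2} */

    return count_members(fds, 3, &set);
}
```

Outside of a select() call, FD_ISSET() simply reports membership; it is only after select() returns that membership means "ready".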
The main caveat with select() is that on every call the fd_set is cleared of the file descriptors that do not have any I/O data on that cycle, hence the developer has to manually re-register all the file descriptors before the next call. This is also described in the select() documentation:
Note well: Upon return, each of the file descriptor sets is modified in place to indicate which file descriptors are currently “ready”. Thus, if using select() within a loop, the sets must be reinitialized before each call.
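The in-place modification described in that note can be observed with two pipes, only one of which has data. The sketch below is not taken from the server; `ready_mask_after_select` is a hypothetical helper, and the one second timeout is only a safety net so the example can never hang. It also uses a common alternative to re-adding descriptors one by one: keep a "master" fd_set and hand select() a copy of it.

```c
#include <sys/select.h>
#include <unistd.h>

/*
 * Creates two pipes, writes one byte into the second one only, then calls
 * select() on both read ends. Returns a bitmask of what select() left in the
 * set: bit 0 for the first pipe, bit 1 for the second. Returns -1 on error.
 */
int ready_mask_after_select(void) {
    int a[2], b[2];
    if (pipe(a) == -1) {
        return -1;
    }
    if (pipe(b) == -1) {
        close(a[0]);
        close(a[1]);
        return -1;
    }

    int mask = -1;
    if (write(b[1], "x", 1) == 1) {
        fd_set master;
        FD_ZERO(&master);
        FD_SET(a[0], &master);
        FD_SET(b[0], &master);
        int maxFD = a[0] > b[0] ? a[0] : b[0];

        /* select() overwrites the set it is given, so pass a copy of master */
        fd_set readfds = master;
        struct timeval timeout = {1, 0};
        if (select(maxFD + 1, &readfds, NULL, NULL, &timeout) > 0) {
            mask = (FD_ISSET(a[0], &readfds) ? 1 : 0)
                 | (FD_ISSET(b[0], &readfds) ? 2 : 0);
            /* master still contains both descriptors and can be copied again */
        }
    }

    close(a[0]);
    close(a[1]);
    close(b[0]);
    close(b[1]);
    return mask;
}
```

Both descriptors were registered, but only the pipe with pending data is still in `readfds` after the call; the idle one has been removed from the set.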
Having said that, the server.c file now looks like:
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stddef.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <sys/syscall.h>
#include <sys/select.h>
#include <errno.h>
#include "af_unix_sockets_common.h"
int pumpData(int fd);
int cleanupConnections(int *connections, int idx);
/*
 * Open an `AF_UNIX` socket on the `path` specified, `bind()` to that address and `listen()` for incoming connections.
 * Then use `select()` to wait on the listening socket and on every open connection: `accept()` new connections
 * and print whatever the connected clients send to `stdout`.
 */
void server(char *path) {
    printf("Starting AF_UNIX server on Path=%s\n", path);
    AFUnixAddress *domainSocketAddress = open_af_unix_socket(path);
    // Pass the size of the AF_UNIX address structure (assuming `address` points to a `struct sockaddr_un`);
    // sizeof(struct sockaddr) is smaller and would cut off longer paths
    int hasBind = bind(domainSocketAddress->fd, (struct sockaddr *)domainSocketAddress->address, sizeof(struct sockaddr_un));
    if(hasBind == -1){
        fprintf(stderr, "Failed to bind AF_UNIX socket on Path=%s. ErrorNo=%d\n", path, errno);
        cleanup(domainSocketAddress->fd, path);
        exit(errno);
    }
    int isListening = listen(domainSocketAddress->fd, 10);
    if(isListening == -1) {
        fprintf(stderr, "Failed to listen to AF_UNIX socket on Path=%s. ErrorNo=%d\n", path, errno);
        cleanup(domainSocketAddress->fd, path);
        exit(errno);
    }
    fd_set readfds;
    int maxFD = domainSocketAddress->fd;
    FD_ZERO(&readfds);
    FD_SET(domainSocketAddress->fd, &readfds);
    int openConnections[FD_SETSIZE];
    int closedConnections[FD_SETSIZE] = {0};// indices to openConnections that have clo
    int nextIdx = 0;
    fprintf(stdout, "Start accepting connections on Path=%s\n", path);
    while(TRUE) {
        // Blocks until the listening socket or at least one open connection is readable
        int retVal = select(maxFD + 1, &readfds, NULL, NULL, NULL);
        if(retVal == -1) {
            fprintf(stderr, "Error while waiting in select(). Error=%s, ErrorNo=%d\n", strerror(errno), errno);
            cleanup(domainSocketAddress->fd, path);
            exit(errno);
        }
        if(FD_ISSET(domainSocketAddress->fd, &readfds)) {
            // The listening socket is readable, which means a new connection is pending
            int connFd = accept(domainSocketAddress->fd, NULL, NULL);
            if(connFd == -1) {
                fprintf(stderr, "Error while accepting connection. Error=%s, ErrorNo=%d\n", strerror(errno), errno);
                cleanup(domainSocketAddress->fd, path);
                exit(errno);
            }
            if(connFd >= FD_SETSIZE) {
                // select() cannot monitor descriptors >= FD_SETSIZE, so reject the connection
                fprintf(stderr, "Too many open connections, rejecting the new one\n");
                close(connFd);
            } else {
                fprintf(stdout, "New AF_UNIX connection added\n");
                openConnections[nextIdx++] = connFd;
                maxFD = maxFD >= connFd ? maxFD : connFd;
                FD_SET(connFd, &readfds);
            }
        } else {
            for(int i = 0; i < nextIdx;i ++) {
                if(FD_ISSET(openConnections[i], &readfds)) {
                    if(!pumpData(openConnections[i])){
                        FD_CLR(openConnections[i], &readfds);
                        openConnections[i] = -1;// denotes that connection has closed
                    }
                }
            }
            nextIdx = cleanupConnections(openConnections, nextIdx);
        }
        // re-add all active FDs to fd_set
        FD_SET(domainSocketAddress->fd, &readfds);
        for(int i = 0; i < nextIdx;i ++) {
            FD_SET(openConnections[i], &readfds);
        }
    }
    cleanup(domainSocketAddress->fd, path);
}
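int pumpData(int connFd) {
    char buf[BUFSIZ];
    ssize_t bytes = read(connFd, buf, BUFSIZ);
    if(bytes <= 0) {
        // 0 means the peer closed the connection, -1 means a read error; release the descriptor either way
        fprintf(stdout, "Connection closed\n");
        close(connFd);
        return FALSE;
    }
    write(1, buf, bytes);
    return TRUE;
}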
int cleanupConnections(int *connections, int idx) {
    // Compact the array in place, dropping the entries marked as closed (-1)
    int next = 0;
    for(int i = 0; i < idx;i++) {
        if(connections[i] != -1) {
            connections[next++] = connections[i];
        }
    }
    return next;
}
The Changes
The few changes worth mentioning are the following:
- We first registered the AF_UNIX socket's file descriptor on the fd_set that is passed into the select() call.
- On every call to select(), the first check is whether the socket file descriptor has I/O data, which means a new connection is pending. If so, the server calls accept() on that connection
if(FD_ISSET(domainSocketAddress->fd, &readfds)) {
int connFd = accept(domainSocketAddress->fd, NULL, NULL);
...
- After accepting a connection, that connection's file descriptor has to be added to the fd_set so that it can be monitored for I/O events
FD_SET(connFd, &readfds);
- For every open connection, the program checks if the corresponding file descriptor has I/O data, and if so the server reads that data. It is worth noting that a closing connection also shows up as an I/O event (the descriptor becomes readable and read() returns 0), hence the program needs to check for that and remove the closed file descriptor from the monitored fd_set
if(FD_ISSET(openConnections[i], &readfds)) {
if(!pumpData(openConnections[i])){
FD_CLR(openConnections[i], &readfds);
openConnections[i] = -1;// denotes that connection has closed
}
}
- Finally, as mentioned above, after select() returns, the fd_set will only contain the file descriptors that have data. Any previously added file descriptors that did not have I/O data on that cycle are removed, hence they need to be re-added. Luckily, according to the select() documentation there is no harm in trying to re-set a file descriptor that is already in the fd_set, hence we just loop over the known file descriptors and re-add them all to the fd_set
FD_SET() This macro adds the file descriptor fd to set. Adding a file descriptor that is already present in the set is a no-op, and does not produce an error.
// re-add all active FDs to fd_set
FD_SET(domainSocketAddress->fd, &readfds);
for(int i = 0; i < nextIdx;i ++) {
FD_SET(openConnections[i], &readfds);
}
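The point about closed connections showing up as I/O events can be checked in isolation. The following is a small sketch that uses socketpair() instead of the server's listening socket; `closed_peer_is_readable` is a hypothetical helper, and the one second timeout is only there so the example cannot block forever.

```c
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Returns 1 if, after the peer has closed its end, select() reports our end
 * as readable and read() returns 0 (end of file). Returns 0 if that is not
 * observed and -1 if the socket pair cannot be created.
 */
int closed_peer_is_readable(void) {
    int pair[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, pair) == -1) {
        return -1;
    }
    close(pair[1]); /* the peer goes away without sending anything */

    fd_set readfds;
    FD_ZERO(&readfds);
    FD_SET(pair[0], &readfds);
    struct timeval timeout = {1, 0};

    int result = 0;
    if (select(pair[0] + 1, &readfds, NULL, NULL, &timeout) == 1
            && FD_ISSET(pair[0], &readfds)) {
        char buf[16];
        ssize_t bytes = read(pair[0], buf, sizeof(buf));
        result = (bytes == 0); /* readable, but there is nothing to read */
    }
    close(pair[0]);
    return result;
}
```

This is why a read() result of 0 has to be treated as "connection closed" rather than as an empty message.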
Conclusion
The changes needed to allow for multiplexing of different connections were minimal and did not radically affect the program's logic. Anyone can take this example and enhance it further. Some suggestions would be:
- Try epoll() instead of select()
- Instead of just reading what the client has sent and printing it out to the console, broadcast the message to all clients connected at that time
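As a starting point for the broadcast suggestion, a helper along the following lines could be called with the bytes that were just read. This is only a sketch: `broadcast` is a hypothetical function that does not exist in the server above, and it assumes the same convention as openConnections, where -1 marks a closed slot.

```c
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>

/*
 * Hypothetical helper: sends `len` bytes from `buf` to every open connection.
 * Entries equal to -1 are skipped. Returns the number of connections that
 * accepted the whole message.
 */
int broadcast(const int *connections, int count, const char *buf, size_t len) {
    int delivered = 0;
    for (int i = 0; i < count; i++) {
        if (connections[i] == -1) {
            continue; /* closed slot, nothing to send to */
        }
        if (send(connections[i], buf, len, 0) == (ssize_t)len) {
            delivered++;
        }
    }
    return delivered;
}
```

A real implementation would also have to decide what to do with partial sends and with clients that disconnect in the middle of a broadcast, since writing to a socket whose peer has gone can raise SIGPIPE.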