1616#include <netinet/tcp.h>
1717#include <netinet/in.h>
1818
19+ #ifdef USE_EPOLL
20+ #include <sys/epoll.h>
21+ #endif
22+
1923#include "server.h"
2024#include "limits.h"
2125#include "util.h"
2226#include "sockhub.h"
2327
2428typedef struct buffer_t {
25- int ready ; // number of bytes that are ready to be sent/processed
29+ int ready ; /* number of bytes that are ready to be sent/processed */
2630 ShubMessageHdr * curmessage ;
27- char * data ; // dynamically allocated buffer
31+ char * data ; /* dynamically allocated buffer */
2832} buffer_t ;
2933
3034typedef struct stream_data_t * stream_t ;
3135
3236typedef struct client_data_t {
33- stream_t stream ; // NULL: client value is empty
37+ stream_t stream ; /* NULL: client value is empty */
3438 void * userdata ;
3539 unsigned int chan ;
3640} client_data_t ;
3741
3842typedef struct stream_data_t {
3943 int fd ;
40- bool good ; // 'false': stop serving this stream and disconnect when possible
44+ bool good ; /* 'false': stop serving this stream and disconnect when possible */
4145 buffer_t input ;
4246 buffer_t output ;
4347
44- // a map: 'chan' -> client_data_t
45- // 'chan' is expected to be < MAX_FDS which is pretty low
46- client_data_t * clients ; // dynamically allocated
48+ /* a map: 'chan' -> client_data_t */
49+ /* 'chan' is expected to be < MAX_FDS which is pretty low */
50+ client_data_t * clients ; /* dynamically allocated */
51+ struct stream_data_t * next ;
4752} stream_data_t ;
4853
4954typedef struct server_data_t {
5055 char * host ;
5156 int port ;
5257
53- int listener ; // the listening socket
54- fd_set all ; // all sockets including the listener
58+ int listener ; /* the listening socket */
59+ #ifdef USE_EPOLL
60+ int epollfd ;
61+ #else
62+ fd_set all ; /* all sockets including the listener */
5563 int maxfd ;
56-
57- int streamsnum ;
58- stream_data_t streams [ MAX_STREAMS ] ;
64+ #endif
65+ stream_t used_chain ;
66+ stream_t free_chain ;
5967
6068 onmessage_callback_t onmessage ;
6169 onconnect_callback_t onconnect ;
6270 ondisconnect_callback_t ondisconnect ;
6371} server_data_t ;
6472
65- // Returns the created socket, or -1 if failed.
73+ /* Returns the created socket, or -1 if failed. */
6674static int create_listening_socket (const char * host , int port ) {
6775 int s = socket (AF_INET , SOCK_STREAM , 0 );
6876 if (s == -1 ) {
@@ -113,32 +121,56 @@ server_t server_init(
113121 return server ;
114122}
115123
124+ bool register_socket (server_t server , int fd , stream_t stream )
125+ {
126+ #ifdef USE_EPOLL
127+ struct epoll_event ev ;
128+ ev .events = EPOLLIN ;
129+ ev .data .ptr = (void * )stream ;
130+ if (epoll_ctl (server -> epollfd , EPOLL_CTL_ADD , fd , & ev ) < 0 ) {
131+ return false;
132+ }
133+ #else
134+ FD_SET (fd , & server -> all );
135+ if (fd > server -> maxfd ) {
136+ server -> maxfd = fd ;
137+ }
138+ #endif
139+ return true;
140+ }
141+
116142bool server_start (server_t server ) {
117143 debug ("starting the server\n" );
118- server -> streamsnum = 0 ;
119-
144+ server -> free_chain = NULL ;
145+ server -> used_chain = NULL ;
146+
120147 server -> listener = create_listening_socket (server -> host , server -> port );
121148 if (server -> listener == -1 ) {
122149 return false;
123150 }
124151
152+ #ifdef USE_EPOLL
153+ server -> epollfd = epoll_create (MAX_EVENTS );
154+ if (server -> epollfd < 0 ) {
155+ return false;
156+ }
157+ #else
125158 FD_ZERO (& server -> all );
126- FD_SET (server -> listener , & server -> all );
127- server -> maxfd = server -> listener ;
128-
129- return true;
159+ server -> maxfd = 0 ;
160+ #endif
161+ return register_socket (server , server -> listener , NULL );
130162}
131163
132164static bool stream_flush (stream_t stream ) {
133165 int tosend = stream -> output .ready ;
134166 if (tosend == 0 ) {
135- // nothing to do
167+ /* nothing to do */
136168 return true;
137169 }
138170
139171 char * cursor = stream -> output .data ;
140172 while (tosend > 0 ) {
141- // repeat sending until we send everything
173+ /* repeat sending until we send everything */
142174 int sent = send (stream -> fd , cursor , tosend , 0 );
143175 if (sent == -1 ) {
144176 shout ("failed to flush the stream\n" );
@@ -153,7 +185,7 @@ static bool stream_flush(stream_t stream) {
153185 stream -> output .ready = 0 ;
154186 ShubMessageHdr * msg = stream -> output .curmessage ;
155187 if (msg ) {
156- // move the unfinished message to the start of the buffer
188+ /* move the unfinished message to the start of the buffer */
157189 memmove (stream -> output .data , msg , msg -> size + sizeof (ShubMessageHdr ));
158190 stream -> output .curmessage = (ShubMessageHdr * )stream -> output .data ;
159191 }
@@ -163,10 +195,9 @@ static bool stream_flush(stream_t stream) {
163195
164196static void server_flush (server_t server ) {
165197 debug ("flushing the streams\n" );
166- int i ;
167- for (i = 0 ; i < server -> streamsnum ; i ++ ) {
168- stream_t stream = server -> streams + i ;
169- stream_flush (stream );
198+ stream_t s ;
199+ for (s = server -> used_chain ; s != NULL ; s = s -> next ) {
200+ stream_flush (s );
170201 }
171202}
172203
@@ -187,7 +218,7 @@ static void stream_init(stream_t stream, int fd) {
187218
188219 stream -> clients = malloc (MAX_TRANSACTIONS * sizeof (client_data_t ));
189220 assert (stream -> clients );
190- // mark all clients as empty
221+ /* mark all clients as empty */
191222 for (i = 0 ; i < MAX_TRANSACTIONS ; i ++ ) {
192223 stream -> clients [i ].stream = NULL ;
193224 }
@@ -207,36 +238,28 @@ static void server_stream_destroy(server_t server, stream_t stream) {
207238 }
208239 }
209240 }
210-
211- FD_CLR (stream -> fd , & server -> all );
241+ #ifdef USE_EPOLL
242+ epoll_ctl (server -> epollfd , EPOLL_CTL_DEL , stream -> fd , NULL );
243+ #else
244+ FD_CLR (stream -> fd , & server -> all );
245+ #endif
212246 close (stream -> fd );
213247 free (stream -> clients );
214248 free (stream -> input .data );
215249 free (stream -> output .data );
216250}
217251
218- static void stream_move (stream_t dst , stream_t src ) {
219- int i ;
220- * dst = * src ;
221- for (i = 0 ; i < MAX_TRANSACTIONS ; i ++ ) {
222- if (dst -> clients [i ].stream ) {
223- dst -> clients [i ].stream = dst ;
224- }
225- }
226- }
227-
228252static void server_close_bad_streams (server_t server ) {
229- int i ;
230- for (i = server -> streamsnum - 1 ; i >= 0 ; i -- ) {
231- stream_t stream = server -> streams + i ;
232- if (!stream -> good ) {
233- server_stream_destroy (server , stream );
234- if (i != server -> streamsnum - 1 ) {
235- // move the last one here
236- * stream = server -> streams [server -> streamsnum - 1 ];
237- stream_move (stream , server -> streams + server -> streamsnum - 1 );
238- }
239- server -> streamsnum -- ;
253+ stream_t s , next , * spp ;
254+ for (spp = & server -> used_chain ; (s = * spp ) != NULL ; s = next ) {
255+ next = s -> next ;
256+ if (!s -> good ) {
257+ server_stream_destroy (server , s );
258+ * spp = next ;
259+ s -> next = server -> free_chain ;
260+ server -> free_chain = s ;
261+ } else {
262+ spp = & s -> next ;
240263 }
241264 }
242265}
@@ -279,7 +302,7 @@ static bool stream_message_append(stream_t stream, size_t len, void *data) {
279302
280303 int newsize = stream -> output .curmessage -> size + sizeof (ShubMessageHdr ) + len ;
281304 if (newsize > BUFFER_SIZE ) {
282- // the flushing will not help here
305+ /* the flushing will not help here */
283306 shout ("the message cannot be bigger than the buffer size\n" );
284307 stream -> good = false;
285308 return false;
@@ -326,7 +349,8 @@ bool client_message_finish(client_t client) {
326349 return stream_message_finish (client -> stream );
327350}
328351
329- bool client_message_shortcut (client_t client , xid_t arg ) {
352+ bool client_message_shortcut (client_t client , xid_t arg )
353+ {
330354 if (!stream_message_start (client -> stream , client -> chan )) {
331355 return false;
332356 }
@@ -348,36 +372,33 @@ static bool server_accept(server_t server) {
348372 return false;
349373 }
350374 debug ("a new connection accepted\n" );
351-
352- if (server -> streamsnum >= MAX_STREAMS ) {
353- shout ("streams limit hit, disconnecting the accepted connection\n" );
354- close (fd );
355- return false;
375+
376+ stream_t s = server -> free_chain ;
377+ if (s == NULL ) {
378+ s = malloc (sizeof (stream_data_t ));
379+ } else {
380+ server -> free_chain = s -> next ;
356381 }
382+ /* add new stream */
383+ s -> next = server -> used_chain ;
384+ server -> used_chain = s ;
357385
358- // add new stream
359- stream_t s = server -> streams + server -> streamsnum ++ ;
360386 stream_init (s , fd );
361387
362- FD_SET (fd , & server -> all );
363- if (fd > server -> maxfd ) {
364- server -> maxfd = fd ;
365- }
366-
367- return true;
388+ return register_socket (server , fd , s );
368389}
369390
370391static client_t stream_get_client (stream_t stream , unsigned int chan , bool * isnew ) {
371392 assert (chan < MAX_TRANSACTIONS );
372393 client_t client = stream -> clients + chan ;
373394 if (client -> stream == NULL ) {
374- // client is new
395+ /* client is new */
375396 client -> stream = stream ;
376397 client -> chan = chan ;
377398 * isnew = true;
378399 client -> userdata = NULL ;
379400 } else {
380- // collisions should not happen
401+ /* collisions should not happen */
381402 assert (client -> chan == chan );
382403 * isnew = false;
383404 }
@@ -412,7 +433,7 @@ static bool server_stream_handle(server_t server, stream_t stream) {
412433 ShubMessageHdr * msg = (ShubMessageHdr * )cursor ;
413434 int header_and_data = sizeof (ShubMessageHdr ) + msg -> size ;
414435 if (header_and_data <= toprocess ) {
415- // handle message
436+ /* handle message */
416437 bool isnew ;
417438 client_t client = stream_get_client (stream , msg -> chan , & isnew );
418439 if (isnew ) {
@@ -457,9 +478,30 @@ static bool server_stream_handle(server_t server, stream_t stream) {
457478void server_loop (server_t server ) {
458479 while (1 ) {
459480 int i ;
481+ int numready ;
482+ #ifdef USE_EPOLL
483+ struct epoll_event events [MAX_EVENTS ];
484+ numready = epoll_wait (server -> epollfd , events , MAX_EVENTS , -1 );
485+ if (numready < 0 ) {
486+ shout ("failed to select: %s\n" , strerror (errno ));
487+ return ;
488+ }
489+ for (i = 0 ; i < numready ; i ++ ) {
490+ stream_t stream = (stream_t )events [i ].data .ptr ;
491+ if (stream == NULL ) {
492+ server_accept (server );
493+ } else {
494+ if (events [i ].events & EPOLLERR ) {
495+ stream -> good = false;
496+ } else if (events [i ].events & EPOLLIN ) {
497+ server_stream_handle (server , stream );
498+ }
499+ }
500+ }
501+ #else
460502 fd_set readfds = server -> all ;
461- debug ("selecting\n" );
462503 int numready = select (server -> maxfd + 1 , & readfds , NULL , NULL , NULL );
504+ stream_t s ;
463505 if (numready == -1 ) {
464506 shout ("failed to select: %s\n" , strerror (errno ));
465507 return ;
@@ -470,14 +512,13 @@ void server_loop(server_t server) {
470512 server_accept (server );
471513 }
472514
473- for (i = 0 ; (i < server -> streamsnum ) && (numready > 0 ); i ++ ) {
474- stream_t stream = server -> streams + i ;
475- if (FD_ISSET (stream -> fd , & readfds )) {
476- server_stream_handle (server , stream );
515+ for (s = server_used_chain ; s != NULL && numready > 0 ; s = s -> next ) {
516+ if (FD_ISSET (s -> fd , & readfds )) {
517+ server_stream_handle (server , s );
477518 numready -- ;
478519 }
479520 }
480-
521+ #endif
481522 server_close_bad_streams (server );
482523 server_flush (server );
483524 }
@@ -501,7 +542,7 @@ unsigned client_get_ip_addr(client_t client)
501542}
502543
503544#if 0
504- // usage example
545+ /* usage example */
505546
506547void test_onconnect (client_t client ) {
507548 char * name = "hello" ;
0 commit comments