/* $Id$ */
/*
 * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
/*
 * ioqueue_epoll.c
 *
 * This is the implementation of the IOQueue framework using the /dev/epoll
 * API, in both Linux user-mode and kernel-mode.
 */

#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>

#if !defined(PJ_LINUX_KERNEL) || PJ_LINUX_KERNEL==0
    /*
     * Linux user mode
     */
#   include <sys/epoll.h>
#   include <errno.h>
#   include <unistd.h>

#   define epoll_data           data.ptr
#   define epoll_data_type      void*
#   define ioctl_val_type       unsigned long
#   define getsockopt_val_ptr   int*
#   define os_getsockopt        getsockopt
#   define os_ioctl             ioctl
#   define os_read              read
#   define os_close             close
#   define os_epoll_create      epoll_create
#   define os_epoll_ctl         epoll_ctl
#   define os_epoll_wait        epoll_wait
#else
    /*
     * Linux kernel mode.
     */
#   include <linux/config.h>
#   include <linux/version.h>
#   if defined(MODVERSIONS)
#       include <linux/modversions.h>
#   endif
#   include <linux/kernel.h>
#   include <linux/poll.h>
#   include <linux/eventpoll.h>
#   include <linux/syscalls.h>
#   include <linux/errno.h>
#   include <linux/unistd.h>
#   include <asm/ioctls.h>
    enum EPOLL_EVENTS
    {
        EPOLLIN  = 0x001,
        EPOLLOUT = 0x004,
        EPOLLERR = 0x008,
    };
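
    /* In kernel mode the epoll and file syscalls are invoked directly as
     * sys_*() functions. The wrappers below temporarily raise the address
     * limit to KERNEL_DS with get_fs()/set_fs() so that the syscalls accept
     * buffers that live in kernel space, restore the previous limit, and
     * translate the kernel's negative-errno return convention into the
     * usual "-1 plus errno" convention used by the rest of this file.
     */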
#   define os_epoll_create sys_epoll_create
    static int os_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
    {
        long rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = sys_epoll_ctl(epfd, op, fd, event);
        set_fs(oldfs);
        if (rc) {
            errno = -rc;
            return -1;
        } else {
            return 0;
        }
    }
    static int os_epoll_wait(int epfd, struct epoll_event *events,
                             int maxevents, int timeout)
    {
        int count;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        count = sys_epoll_wait(epfd, events, maxevents, timeout);
        set_fs(oldfs);
        return count;
    }
#   define os_close sys_close
#   define os_getsockopt pj_sock_getsockopt
    static int os_read(int fd, void *buf, size_t len)
    {
        long rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = sys_read(fd, buf, len);
        set_fs(oldfs);
        /* sys_read() returns the byte count on success or a negative errno
         * value on failure; map failures to -1/errno and pass the byte
         * count through unchanged.
         */
        if (rc < 0) {
            errno = -rc;
            return -1;
        } else {
            return rc;
        }
    }
#   define socklen_t unsigned
#   define ioctl_val_type unsigned long
    int ioctl(int fd, int opt, ioctl_val_type value);
    static int os_ioctl(int fd, int opt, ioctl_val_type value)
    {
        int rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = ioctl(fd, opt, value);
        set_fs(oldfs);
        if (rc < 0) {
            errno = -rc;
            return rc;
        } else
            return rc;
    }
#   define getsockopt_val_ptr char*

#   define epoll_data           data
#   define epoll_data_type      __u32
#endif

#define THIS_FILE   "ioq_epoll"

//#define TRACE_(expr) PJ_LOG(3,expr)
#define TRACE_(expr)

/*
 * Include common ioqueue abstraction.
 */
#include "ioqueue_common_abs.h"

/*
 * This describes each key.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};

/*
 * This describes the I/O queue.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE

    unsigned            max, count;
    pj_ioqueue_key_t    hlist;
    int                 epfd;
};

/* Include implementation for common abstraction after we declare
 * pj_ioqueue_key_t and pj_ioqueue_t.
 */
#include "ioqueue_common_abs.c"
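
/* The generic read/write/accept/connect machinery (pending operation
 * bookkeeping, key initialization, callback dispatching) lives in
 * ioqueue_common_abs.c, which is compiled into this translation unit above.
 * This file only supplies the epoll-specific pieces: creating and
 * destroying the epoll descriptor, registering and unregistering sockets,
 * and the polling loop that turns epoll events into dispatch calls.
 */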

/*
 * pj_ioqueue_name()
 */
PJ_DEF(const char*) pj_ioqueue_name(void)
{
#if defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0
    return "epoll-kernel";
#else
    return "epoll";
#endif
}

/*
 * pj_ioqueue_create()
 *
 * Create epoll ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                       pj_size_t max_fd,
                                       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioqueue;
    pj_status_t rc;
    pj_lock_t *lock;

    /* Check that arguments are valid. */
    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
                     max_fd > 0, PJ_EINVAL);

    /* Check that size of pj_ioqueue_op_key_t is sufficient */
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
                     sizeof(union operation_key), PJ_EBUG);

    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));

    ioqueue_init(ioqueue);

    ioqueue->max = max_fd;
    ioqueue->count = 0;
    pj_list_init(&ioqueue->hlist);

    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
    if (rc != PJ_SUCCESS)
        return rc;

    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
    if (rc != PJ_SUCCESS)
        return rc;

    ioqueue->epfd = os_epoll_create(max_fd);
    if (ioqueue->epfd < 0) {
        ioqueue_destroy(ioqueue);
        return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
    }

    PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue));

    *p_ioqueue = ioqueue;
    return PJ_SUCCESS;
}
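
/* A minimal usage sketch of the ioqueue API implemented here, from an
 * application's point of view (the callback wiring and names such as
 * on_read_complete follow the pjlib ioqueue interface; pool/socket setup,
 * buffers and error handling are omitted or assumed):
 *
 *   static void on_read_complete(pj_ioqueue_key_t *key,
 *                                pj_ioqueue_op_key_t *op_key,
 *                                pj_ssize_t bytes_read)
 *   {
 *       // consume the data, then restart the asynchronous read
 *   }
 *
 *   pj_ioqueue_t *ioqueue;
 *   pj_ioqueue_key_t *key;
 *   pj_ioqueue_callback cb;
 *
 *   pj_bzero(&cb, sizeof(cb));
 *   cb.on_read_complete = &on_read_complete;
 *
 *   pj_ioqueue_create(pool, 64, &ioqueue);
 *   pj_ioqueue_register_sock(pool, ioqueue, sock, NULL, &cb, &key);
 *
 *   for (;;) {
 *       pj_time_val timeout = { 0, 500 };
 *       pj_ioqueue_poll(ioqueue, &timeout);
 *   }
 */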

/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
    PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP);

    pj_lock_acquire(ioqueue->lock);
    os_close(ioqueue->epfd);
    ioqueue->epfd = 0;
    return ioqueue_destroy(ioqueue);
}

/*
 * pj_ioqueue_register_sock()
 *
 * Register a socket to ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
    pj_uint32_t value;
    struct epoll_event ev;
    int status;
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->count >= ioqueue->max) {
        rc = PJ_ETOOMANY;
        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));
        goto on_return;
    }

    /* Set socket to nonblocking. */
    value = 1;
    if ((rc=os_ioctl(sock, FIONBIO, (ioctl_val_type)&value))) {
        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: ioctl rc=%d",
                rc));
        rc = pj_get_netos_error();
        goto on_return;
    }

    /* Create key. */
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
    if (rc != PJ_SUCCESS) {
        key = NULL;
        goto on_return;
    }

    /* Add the socket to epoll, registered for all events (level-triggered),
     * with the key stored as the per-fd user data.
     */
    ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
    ev.epoll_data = (epoll_data_type)key;
    status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev);
    if (status < 0) {
        rc = pj_get_os_error();
        key = NULL;
        TRACE_((THIS_FILE,
                "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d",
                status));
        goto on_return;
    }

    /* Register the key in the list of active keys. */
    pj_list_insert_before(&ioqueue->hlist, key);
    ++ioqueue->count;

on_return:
    *p_key = key;
    pj_lock_release(ioqueue->lock);

    return rc;
}

/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;
    struct epoll_event ev;
    int status;

    PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL);

    ioqueue = key->ioqueue;
    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
    pj_list_erase(key);

    ev.events = 0;
    ev.epoll_data = (epoll_data_type)key;
    status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev);
    if (status != 0) {
        pj_status_t rc = pj_get_os_error();
        pj_lock_release(ioqueue->lock);
        return rc;
    }

    pj_lock_release(ioqueue->lock);

    /* Destroy the key. */
    ioqueue_destroy_key(key);

    return PJ_SUCCESS;
}
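
/* The two hooks below are required by the common abstraction so that a
 * backend can adjust its descriptor sets as operations are queued and
 * completed (the select() based backend does exactly that with its fd_sets).
 * With epoll this is unnecessary: every socket is registered for EPOLLIN,
 * EPOLLOUT and EPOLLERR once in pj_ioqueue_register_sock(), and
 * pj_ioqueue_poll() filters the reported events against the key's pending
 * operations. Hence both functions are intentionally empty.
 */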
360
361/* ioqueue_remove_from_set()
362 * This function is called from ioqueue_dispatch_event() to instruct
363 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
364 * set for the specified event.
365 */
366static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
367 pj_sock_t fd,
368 enum ioqueue_event_type event_type)
369{
370}
371
372/*
373 * ioqueue_add_to_set()
374 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
375 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
376 * set for the specified event.
377 */
378static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
379 pj_sock_t fd,
380 enum ioqueue_event_type event_type )
381{
382}
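
/* Note on pj_ioqueue_poll() below: events reported by epoll are first
 * collected into a small local queue while the ioqueue lock is held, and
 * only dispatched to the callbacks after the lock has been released. Among
 * other things, this leaves the callbacks free to call back into the
 * ioqueue (for example to restart a read) without contending for the
 * ioqueue lock.
 */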

/*
 * pj_ioqueue_poll()
 *
 * Poll the I/O queue for events, and dispatch them to the registered
 * callbacks.
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    int i, count, processed;
    struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
    int msec;
    struct queue {
        pj_ioqueue_key_t        *key;
        enum ioqueue_event_type  event_type;
    } queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];

    PJ_CHECK_STACK();

    msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;

    count = os_epoll_wait( ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec);
    if (count <= 0)
        return count;

    /* Lock ioqueue. */
    pj_lock_acquire(ioqueue->lock);

    for (processed=0, i=0; i<count; ++i) {
        pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
                              events[i].epoll_data;

        /*
         * Check readability.
         */
        if ((events[i].events & EPOLLIN) &&
            (key_has_pending_read(h) || key_has_pending_accept(h))) {
            queue[processed].key = h;
            queue[processed].event_type = READABLE_EVENT;
            ++processed;
        }

        /*
         * Check writability.
         */
        if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) {
            queue[processed].key = h;
            queue[processed].event_type = WRITEABLE_EVENT;
            ++processed;
        }

#if PJ_HAS_TCP
        /*
         * Check for completion of connect() operation.
         */
        if ((events[i].events & EPOLLOUT) && (h->connecting)) {
            queue[processed].key = h;
            queue[processed].event_type = WRITEABLE_EVENT;
            ++processed;
        }
#endif /* PJ_HAS_TCP */

        /*
         * Check for error condition.
         */
        if ((events[i].events & EPOLLERR) && (h->connecting)) {
            queue[processed].key = h;
            queue[processed].event_type = EXCEPTION_EVENT;
            ++processed;
        }
    }
    pj_lock_release(ioqueue->lock);

    /* Now process the events. */
    for (i=0; i<processed; ++i) {
        switch (queue[i].event_type) {
        case READABLE_EVENT:
            ioqueue_dispatch_read_event(ioqueue, queue[i].key);
            break;
        case WRITEABLE_EVENT:
            ioqueue_dispatch_write_event(ioqueue, queue[i].key);
            break;
        case EXCEPTION_EVENT:
            ioqueue_dispatch_exception_event(ioqueue, queue[i].key);
            break;
        case NO_EVENT:
            pj_assert(!"Invalid event!");
            break;
        }
    }

    return processed;
}