1 module libasync.posix;
2 
3 version (Posix):
4 
5 import libasync.types;
6 import std.string : toStringz;
7 import std.conv : to;
8 import std.datetime : Duration, msecs, seconds, SysTime;
9 import std.traits : isIntegral;
10 import std.typecons : Tuple, tuple;
11 import std.container : Array;
12 
13 import core.stdc.errno;
14 import libasync.events;
15 import libasync.internals.memory : FreeListObjectAlloc;
16 import libasync.internals.hashmap;
17 import libasync.internals.path;
18 import core.sys.posix.signal;
19 import libasync.posix2;
20 import core.sync.mutex;
21 enum SOCKET_ERROR = -1;
22 alias fd_t = int;
23 
24 
25 version(linux) {
26 	import libasync.internals.epoll;
27 	const EPOLL = true;
28 	extern(C) nothrow @nogc {
29 		int __libc_current_sigrtmin();
30 		int __libc_current_sigrtmax();
31 	}
32 	bool g_signalsBlocked;
33 	package nothrow void blockSignals() {
34 		try {
35 			/// Block signals to reserve SIGRTMIN .. " +30 for AsyncSignal
36 			sigset_t mask;
37 			// todo: use more signals for more event loops per thread.. (is this necessary?)
38 			//foreach (j; __libc_current_sigrtmin() .. __libc_current_sigrtmax() + 1) {
39 				//import std.stdio : writeln; 
40 				//try writeln("Blocked signal " ~ (__libc_current_sigrtmin() + j).to!string ~ " in instance " ~ m_instanceId.to!string); catch {}
41 			sigemptyset(&mask);
42 			sigaddset(&mask, cast(int) __libc_current_sigrtmin());
43 			pthread_sigmask(SIG_BLOCK, &mask, null);
44 			//}
45 		} catch {}
46 	}
47 	static this() {
48 		blockSignals();
49 		g_signalsBlocked = true;
50 	}
51 }
52 version(OSX) {
53 	import libasync.internals.kqueue;
54 	const EPOLL = false;
55 }
56 version(FreeBSD) {
57 	import libasync.internals.kqueue;
58 	const EPOLL = false;
59 }
60 
61 __gshared Mutex g_mutex;
62 
63 static if (!EPOLL) {
64 	private struct DWFileInfo {
65 		fd_t folder;
66 		Path path;
67 		SysTime lastModified;
68 		bool is_dir;
69 	}
70 }
71 
/// Bookkeeping record for one watched folder. The field order matters
/// because the struct is built positionally: DWFolderInfo(WatchInfo(...), fd).
private struct DWFolderInfo {
	/// Watch parameters (events, path, recursive flag, watch descriptor).
	WatchInfo wi;
	/// Descriptor of the directory watcher that owns this folder.
	fd_t fd;
}
76 
77 package struct EventLoopImpl {
78 	static if (EPOLL) {
79 		pragma(msg, "Using Linux EPOLL for events");
80 	}
81 	else /* if KQUEUE */
82 	{
83 		pragma(msg, "Using FreeBSD KQueue for events");
84 	}
85 
86 package:
87 	alias error_t = EPosix;
88 
89 nothrow:
90 private:
91 
92 	/// members
93 	EventLoop m_evLoop;
94 	ushort m_instanceId;
95 	bool m_started;
96 	StatusInfo m_status;
97 	error_t m_error = EPosix.EOK;
98 	EventInfo* m_evSignal;
99 	static if (EPOLL){
100 		fd_t m_epollfd;
101 		HashMap!(Tuple!(fd_t, uint), DWFolderInfo) m_dwFolders; // uint = inotify_add_watch(Path)
102 	}
103 	else /* if KQUEUE */
104 	{
105 		fd_t m_kqueuefd;
106 		HashMap!(fd_t, EventInfo*) m_watchers; // fd_t = id++ per AsyncDirectoryWatcher
107 		HashMap!(fd_t, DWFolderInfo) m_dwFolders; // fd_t = open(folder)
108 		HashMap!(fd_t, DWFileInfo) m_dwFiles; // fd_t = open(file)
109 		HashMap!(fd_t, Array!(DWChangeInfo)*) m_changes; // fd_t = id++ per AsyncDirectoryWatcher
110 
111 	}
112 
113 package:
114 	
115 	/// workaround for IDE indent bug on too big files
116 	mixin RunKill!();
117 
118 	@property bool started() const {
119 		return m_started;
120 	}
121 
122 	bool init(EventLoop evl)
123 	in { assert(!m_started); }
124 	body
125 	{
126 
127 		import core.atomic;
128 		shared static ushort i;
129 		string* failer = null;
130 
131 
132 		m_instanceId = i;
133 		static if (!EPOLL) g_threadId = new size_t(cast(size_t)m_instanceId);
134 
135 		core.atomic.atomicOp!"+="(i, cast(ushort) 1);
136 		m_evLoop = evl;
137 
138 		import core.thread;
139 		try Thread.getThis().priority = Thread.PRIORITY_MAX;
140 		catch (Exception e) { assert(false, "Could not set thread priority"); }
141 
142 		try
143 			if (!g_mutex)
144 				g_mutex = new Mutex;
145 		catch {}
146 
147 		static if (EPOLL)
148 		{
149 
150 			if (!g_signalsBlocked)
151 				blockSignals();
152 			assert(m_instanceId <= __libc_current_sigrtmax(), "An additional event loop is unsupported due to SIGRTMAX restrictions in Linux Kernel");
153 			m_epollfd = epoll_create1(0);
154 
155 			if (catchError!"epoll_create1"(m_epollfd))
156 				return false;
157 				
158 			import core.sys.linux.sys.signalfd;
159 			import core.thread : getpid;
160 
161 			fd_t err;
162 			fd_t sfd;
163 
164 			sigset_t mask;
165 
166 			try {
167 				sigemptyset(&mask);
168 				sigaddset(&mask, __libc_current_sigrtmin());
169 				err = pthread_sigmask(SIG_BLOCK, &mask, null);
170 				if (catchError!"sigprocmask"(err))
171 				{
172 					m_status.code = Status.EVLOOP_FAILURE;
173 					return false;
174 				}
175 			} catch { }
176 
177 
178 
179 			sfd = signalfd(-1, &mask, SFD_NONBLOCK);
180 			assert(sfd > 0, "Failed to setup signalfd in epoll");
181 
182 			EventType evtype;
183 
184 			epoll_event _event;
185 			_event.events = EPOLLIN;
186 			evtype = EventType.Signal;
187 			try 
188 				m_evSignal = FreeListObjectAlloc!EventInfo.alloc(sfd, evtype, EventObject.init, m_instanceId);
189 			catch (Exception e){ 
190 				assert(false, "Allocation error"); 
191 			}
192 			_event.data.ptr = cast(void*) m_evSignal;
193 
194 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, sfd, &_event);
195 			if (catchError!"EPOLL_CTL_ADD(sfd)"(err))
196 			{
197 				return false;
198 			}
199 
200 		}
201 		else /* if KQUEUE */ 
202 		{
203 			try {
204 				if (!gs_queueMutex) {
205 					gs_queueMutex = FreeListObjectAlloc!ReadWriteMutex.alloc();
206 					gs_signalQueue = Array!(Array!AsyncSignal)();
207 					gs_idxQueue = Array!(Array!size_t)();
208 				}
209 				if (g_evIdxAvailable.empty) {
210 					g_evIdxAvailable.reserve(32);
211 					
212 					foreach (k; g_evIdxAvailable.length .. g_evIdxAvailable.capacity) {
213 						g_evIdxAvailable.insertBack(k + 1);
214 					}
215 					g_evIdxCapacity = 32;
216 					g_idxCapacity = 32;
217 				}
218 			} catch { assert(false, "Initialization failed"); }
219 			m_kqueuefd = kqueue();
220 			int err;
221 			try {
222 				sigset_t mask;
223 				sigemptyset(&mask);
224 				sigaddset(&mask, SIGXCPU);
225 				
226 				err = sigprocmask(SIG_BLOCK, &mask, null);
227 			} catch {}
228 
229 			EventType evtype = EventType.Signal;
230 
231 			// use GC because FreeListObjectAlloc fails at emplace for shared objects
232 			try 
233 				m_evSignal = FreeListObjectAlloc!EventInfo.alloc(SIGXCPU, evtype, EventObject.init, m_instanceId);
234 			catch (Exception e) {
235 				assert(false, "Failed to allocate resources");
236 			}
237 
238 			if (catchError!"siprocmask"(err))
239 				return 0;
240 
241 			kevent_t _event;
242 			EV_SET(&_event, SIGXCPU, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, m_evSignal);
243 			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
244 			if (catchError!"kevent_add(SIGXCPU)"(err))
245 				assert(false, "Add SIGXCPU failed at kevent call");
246 		}
247 
248 		try log("init in thread " ~ Thread.getThis().name); catch {}
249 
250 		return true;
251 	}
252 
253 	void exit() {
254 		import core.sys.posix.unistd : close;
255 		static if (EPOLL) {
256 			close(m_epollfd); // not necessary?
257 
258 			// not necessary:
259 			//try FreeListObjectAlloc!EventInfo.free(m_evSignal);
260 			//catch (Exception e) { assert(false, "Failed to free resources"); }
261 
262 		}
263 		else
264 			close(m_kqueuefd);
265 	}
266 
267 	@property const(StatusInfo) status() const {
268 		return m_status;
269 	}
270 
271 	@property string error() const {
272 		string* ptr;
273 		return ((ptr = (m_error in EPosixMessages)) !is null) ? *ptr : string.init;
274 	}
275 
276 	bool loop(Duration timeout = 0.seconds)
277 	//in { assert(Fiber.getThis() is null); }
278 	{
279 
280 		import libasync.internals.memory;
281 		bool success = true;
282 		int num;
283 
284 		static if (EPOLL) {
285 
286 			static align(1) epoll_event[] events;
287 			if (events is null)
288 			{
289 				try events = new epoll_event[128];
290 				catch (Exception e) {
291 					assert(false, "Could not allocate events array: " ~ e.msg);
292 				}
293 			}
294 			int timeout_ms;
295 			if (timeout == 0.seconds)
296 				timeout_ms = -1;
297 			else timeout_ms = cast(int)timeout.total!"msecs";
298 
299 			/// Retrieve pending events
300 			num = epoll_wait(m_epollfd, cast(epoll_event*)&events[0], 128, timeout_ms);
301 
302 			assert(events !is null && events.length <= 128);
303 
304 			
305 		}
306 		else /* if KQUEUE */ {
307 			import core.sys.posix.time : time_t;
308 			import core.sys.posix.config : c_long;
309 			static kevent_t[] events;
310 			if (events.length == 0) {
311 				try events = allocArray!kevent_t(manualAllocator(), 128); 
312 				catch (Exception e) { assert(false, "Could not allocate events array"); }				
313 			}
314 			time_t secs = timeout.split!("seconds", "nsecs")().seconds;
315 			c_long ns = timeout.split!("seconds", "nsecs")().nsecs;
316 			auto tspec = libasync.internals.kqueue.timespec(secs, ns);
317 
318 			num = kevent(m_kqueuefd, null, 0, cast(kevent_t*) events, cast(int) events.length, &tspec);
319 
320 		}
321 
322 		auto errors = [	tuple(EINTR, Status.EVLOOP_TIMEOUT) ];
323 		
324 		if (catchEvLoopErrors!"event_poll'ing"(num, errors)) 
325 			return false;
326 
327 		if (num > 0)
328 			log("Got " ~ num.to!string ~ " event(s)");
329 
330 		foreach(i; 0 .. num) {
331 			success = false;
332 			m_status = StatusInfo.init;
333 			static if (EPOLL) 
334 			{
335 				epoll_event _event = events[i];
336 				try log("Event " ~ i.to!string ~ " of: " ~ events.length.to!string); catch {}
337 				EventInfo* info = cast(EventInfo*) _event.data.ptr;
338 				int event_flags = cast(int) _event.events;
339 			}
340 			else /* if KQUEUE */
341 			{
342 				kevent_t _event = events[i];
343 				EventInfo* info = cast(EventInfo*) _event.udata;
344 				//log("Got info");
345 				int event_flags = (_event.filter << 16) | (_event.flags & 0xffff);
346 				//log("event flags");
347 			}
348 
349 			//if (info.owner != m_instanceId)
350 			//	try log("Event " ~ (cast(int)(info.evType)).to!string ~ " is invalid: supposidly created in instance #" ~ info.owner.to!string ~ ", received in " ~ m_instanceId.to!string ~ " event: " ~ event_flags.to!string);
351 			//	catch{}
352 			//log("owner");
353 			final switch (info.evType) {
354 				case EventType.TCPAccept:
355 					if (info.fd == 0)
356 						break;
357 					success = onTCPAccept(info.fd, info.evObj.tcpAcceptHandler, event_flags);
358 					break;
359 
360 				case EventType.Notifier:
361 
362 					log("Got notifier!");
363 					try info.evObj.notifierHandler();
364 					catch (Exception e) {
365 						setInternalError!"notifierHandler"(Status.ERROR);
366 					}
367 					break;
368 
369 				case EventType.DirectoryWatcher:
370 					log("Got DirectoryWatcher event!");
371 					static if (!EPOLL) {
372 						// in KQUEUE all events will be consumed here, because they must be pre-processed
373 						try {
374 							DWFileEvent fevent;
375 							if (_event.fflags & (NOTE_LINK | NOTE_WRITE))
376 								fevent = DWFileEvent.CREATED;
377 							else if (_event.fflags & NOTE_DELETE)
378 								fevent = DWFileEvent.DELETED;
379 							else if (_event.fflags & (NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE))
380 								fevent = DWFileEvent.MODIFIED;
381 							else if (_event.fflags & NOTE_RENAME)
382 								fevent = DWFileEvent.MOVED_FROM;
383 							else if (_event.fflags & NOTE_RENAME)
384 								fevent = DWFileEvent.MOVED_TO;
385 							else
386 								assert(false, "No event found?");
387 
388 							DWFolderInfo fi = m_dwFolders.get(cast(fd_t)_event.ident, DWFolderInfo.init);
389 
390 							if (fi == DWFolderInfo.init) {
391 								DWFileInfo tmp = m_dwFiles.get(cast(fd_t)_event.ident, DWFileInfo.init);
392 								assert(tmp != DWFileInfo.init, "The event loop returned an invalid file's file descriptor for the directory watcher");
393 								fi = m_dwFolders.get(cast(fd_t) tmp.folder, DWFolderInfo.init);
394 								assert(fi != DWFolderInfo.init, "The event loop returned an invalid folder file descriptor for the directory watcher");
395 							}
396 
397 							// all recursive events will be generated here
398 							if (!compareFolderFiles(fi, fevent)) {
399 								continue;
400 							}
401 
402 						} catch (Exception e) {
403 							log("Could not process DirectoryWatcher event: " ~ e.msg);
404 							break;
405 						}
406 
407 					}
408 
409 					try info.evObj.dwHandler();
410 					catch (Exception e) {
411 						setInternalError!"dwHandler"(Status.ERROR);
412 					}
413 					break;
414 
415 				case EventType.Timer:
416 					try log("Got timer! " ~ info.fd.to!string); catch {}
417 					static if (EPOLL) {
418 						static long val;
419 						import core.sys.posix.unistd : read;
420 						read(info.evObj.timerHandler.ctxt.id, &val, long.sizeof);
421 					}
422 					try info.evObj.timerHandler();
423 					catch (Exception e) {
424 						setInternalError!"timerHandler"(Status.ERROR);
425 					}
426 					static if (!EPOLL) {
427 						if (info.evObj.timerHandler.ctxt.oneShot && !info.evObj.timerHandler.ctxt.rearmed) {
428 							destroyIndex(info.evObj.timerHandler.ctxt);
429 							info.evObj.timerHandler.ctxt.id = 0;
430 						}
431 					}
432 					break;
433 
434 				case EventType.Signal:
435 					try log("Got signal!"); catch {}
436 
437 					static if (EPOLL) {
438 						
439 						try log("Got signal: " ~ info.fd.to!string ~ " of type: " ~ info.evType.to!string); catch {}
440 						import core.sys.linux.sys.signalfd : signalfd_siginfo;
441 						import core.sys.posix.unistd : read;
442 						signalfd_siginfo fdsi;
443 						fd_t err = cast(fd_t)read(info.fd, &fdsi, fdsi.sizeof);
444 						shared AsyncSignal sig = cast(shared AsyncSignal) cast(void*) fdsi.ssi_ptr;
445 
446 						try sig.handler();
447 						catch (Exception e) {
448 							setInternalError!"signal handler"(Status.ERROR);
449 						}
450 
451 
452 					}
453 					else /* if KQUEUE */
454 					{
455 						static AsyncSignal[] sigarr;
456 						
457 						if (sigarr.length == 0) {
458 							try sigarr = new AsyncSignal[32]; 
459 							catch (Exception e) { assert(false, "Could not allocate signals array"); }		
460 						}
461 
462 						bool more = popSignals(sigarr);
463 						foreach (AsyncSignal sig; sigarr)
464 						{
465 							shared AsyncSignal ptr = cast(shared AsyncSignal) sig;
466 							if (ptr is null)
467 								break;
468 							try (cast(shared AsyncSignal)sig).handler();
469 							catch (Exception e) {
470 								setInternalError!"signal handler"(Status.ERROR);
471 							}
472 						}
473 					}
474 					break;
475 
476 				case EventType.UDPSocket:
477 					import core.sys.posix.unistd : close;
478 					success = onUDPTraffic(info.fd, info.evObj.udpHandler, event_flags);
479 
480 					nothrow void abortHandler(bool graceful) {
481 
482 						close(info.fd);
483 						info.evObj.udpHandler.conn.socket = 0;
484 						try info.evObj.udpHandler(UDPEvent.ERROR);
485 						catch (Exception e) { }
486 						try FreeListObjectAlloc!EventInfo.free(info);
487 						catch (Exception e){ assert(false, "Error freeing resources"); }
488 					}
489 					
490 					if (!success && m_status.code == Status.ABORT) {
491 						abortHandler(true);
492 						
493 					}
494 					else if (!success && m_status.code == Status.ERROR) {
495 						abortHandler(false); 
496 					}
497 					break;
498 				case EventType.TCPTraffic:
499 					assert(info.evObj.tcpEvHandler.conn !is null, "TCP Connection invalid");
500 
501 					success = onTCPTraffic(info.fd, info.evObj.tcpEvHandler, event_flags, info.evObj.tcpEvHandler.conn);
502 
503 					nothrow void abortTCPHandler(bool graceful) {
504 
505 						nothrow void closeAll() {
506 							try log("closeAll()"); catch {}
507 							if (info.evObj.tcpEvHandler.conn.connected)
508 								closeSocket(info.fd, true, true);
509 							
510 							info.evObj.tcpEvHandler.conn.socket = 0;
511 						}
512 
513 						/// Close the connection after an unexpected socket error
514 						if (graceful) {
515 							try info.evObj.tcpEvHandler(TCPEvent.CLOSE);
516 							catch (Exception e) { }
517 							closeAll();
518 						}
519 
520 						/// Kill the connection after an internal error
521 						else {
522 							try info.evObj.tcpEvHandler(TCPEvent.ERROR);
523 							catch (Exception e) { }
524 							closeAll();
525 						}
526 
527 						if (info.evObj.tcpEvHandler.conn.inbound) {
528 							log("Freeing inbound connection FD#" ~ info.fd.to!string);
529 							try FreeListObjectAlloc!AsyncTCPConnection.free(info.evObj.tcpEvHandler.conn);
530 							catch (Exception e){ assert(false, "Error freeing resources"); }
531 						}
532 						try FreeListObjectAlloc!EventInfo.free(info);
533 						catch (Exception e){ assert(false, "Error freeing resources"); }
534 					}
535 
536 					if (!success && m_status.code == Status.ABORT) {
537 						abortTCPHandler(true);
538 					}
539 					else if (!success && m_status.code == Status.ERROR) {
540 						abortTCPHandler(false);
541 					}
542 
543 					break;
544 			}
545 
546 		}
547 		return success;
548 	}
549 
550 	bool setOption(T)(fd_t fd, TCPOption option, in T value) {
551 		m_status = StatusInfo.init;
552 		import std.traits : isIntegral;
553 
554 		import libasync.internals.socket_compat : socklen_t, setsockopt, SO_REUSEADDR, SO_KEEPALIVE, SO_RCVBUF, SO_SNDBUF, SO_RCVTIMEO, SO_SNDTIMEO, SO_LINGER, SOL_SOCKET, IPPROTO_TCP, TCP_NODELAY, TCP_QUICKACK, TCP_KEEPCNT, TCP_KEEPINTVL, TCP_KEEPIDLE, TCP_CONGESTION, TCP_CORK, TCP_DEFER_ACCEPT;
555 		int err;
556 		nothrow bool errorHandler() {
557 			if (catchError!"setOption:"(err)) {
558 				try m_status.text ~= option.to!string;
559 				catch (Exception e){ assert(false, "to!string conversion failure"); }
560 				return false;
561 			}
562 
563 			return true;
564 		}
565 		final switch (option) {
566 			case TCPOption.NODELAY: // true/false
567 				static if (!is(T == bool))
568 					assert(false, "NODELAY value type must be bool, not " ~ T.stringof);
569 				else {
570 					int val = value?1:0;
571 					socklen_t len = val.sizeof;
572 					err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len);
573 					return errorHandler();
574 				}
575 			case TCPOption.REUSEADDR: // true/false
576 				static if (!is(T == bool))
577 					assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof);
578 				else {
579 					int val = value?1:0;
580 					socklen_t len = val.sizeof;
581 					err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len);
582 					if (!errorHandler())
583 						return false;
584 					// BSD systems have SO_REUSEPORT
585 					import libasync.internals.socket_compat : SO_REUSEPORT;
586 					err = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, len);
587 
588 					return errorHandler();
589 				}
590 			case TCPOption.QUICK_ACK:
591 				static if (!is(T == bool))
592 					assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof);
593 				else {
594 					static if (EPOLL) {
595 						int val = value?1:0;
596 						socklen_t len = val.sizeof;
597 						err = setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &val, len);
598 						return errorHandler();
599 					}
600 					else /* not linux */ {
601 						return false;
602 					}
603 				}
604 			case TCPOption.KEEPALIVE_ENABLE: // true/false
605 				static if (!is(T == bool))
606 					assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof);
607 				else
608 				{
609 					int val = value?1:0;
610 					socklen_t len = val.sizeof;
611 					err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len);
612 					return errorHandler();
613 				}
614 			case TCPOption.KEEPALIVE_COUNT: // ##
615 				static if (!isIntegral!T)
616 					assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof);
617 				else {
618 					int val = value.total!"msecs".to!uint;
619 					socklen_t len = val.sizeof;
620 					err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, len);
621 					return errorHandler();
622 				}
623 			case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds
624 				static if (!is(T == Duration))
625 					assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof);
626 				else {
627 					int val;
628 					try val = value.total!"seconds".to!uint; catch { return false; }
629 					socklen_t len = val.sizeof;
630 					err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, len);
631 					return errorHandler();
632 				}
633 			case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start
634 				static if (!is(T == Duration))
635 					assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof);
636 				else {
637 					int val;
638 					try val = value.total!"seconds".to!uint; catch { return false; }
639 					socklen_t len = val.sizeof;
640 					err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, len);
641 					return errorHandler();
642 				}
643 			case TCPOption.BUFFER_RECV: // bytes
644 				static if (!isIntegral!T)
645 					assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof);
646 				else {
647 					int val = value.to!int;
648 					socklen_t len = val.sizeof;
649 					err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len);
650 					return errorHandler();
651 				}
652 			case TCPOption.BUFFER_SEND: // bytes
653 				static if (!isIntegral!T)
654 					assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof);
655 				else {
656 					int val = value.to!int;
657 					socklen_t len = val.sizeof;
658 					err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len);
659 					return errorHandler();
660 				}
661 			case TCPOption.TIMEOUT_RECV:
662 				static if (!is(T == Duration))
663 					assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof);
664 				else {
665 					import core.sys.posix.sys.time : timeval;
666 					time_t secs = value.split!("seconds", "usecs")().seconds;
667 					suseconds_t us;
668 					try us = value.split!("seconds", "usecs")().usecs.to!suseconds_t; catch {}
669 					timeval t = timeval(secs, us);
670 					socklen_t len = t.sizeof;
671 					err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &t, len);
672 					return errorHandler();
673 				}
674 			case TCPOption.TIMEOUT_SEND:
675 				static if (!is(T == Duration))
676 					assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
677 				else {
678 					import core.sys.posix.sys.time : timeval;
679 					time_t secs = value.split!("seconds", "usecs")().seconds;
680 					suseconds_t us;
681 					try us = value.split!("seconds", "usecs")().usecs.to!suseconds_t; catch {}
682 					timeval t = timeval(secs, us);
683 					socklen_t len = t.sizeof;
684 					err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &t, len);
685 					return errorHandler();
686 				}
687 			case TCPOption.TIMEOUT_HALFOPEN:
688 				static if (!is(T == Duration))
689 					assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
690 				else {
691 					uint val;
692 					try val = value.total!"msecs".to!uint; catch {
693 						return false;
694 					}
695 					socklen_t len = val.sizeof;
696 					err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len);
697 					return errorHandler();
698 				}
699 			case TCPOption.LINGER: // bool onOff, int seconds
700 				static if (!is(T == Tuple!(bool, int)))
701 					assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof);
702 				else {
703 					linger l = linger(val[0]?1:0, val[1]);
704 					socklen_t llen = l.sizeof;
705 					err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen);
706 					return errorHandler();
707 				}
708 			case TCPOption.CONGESTION:
709 				static if (!isIntegral!T)
710 					assert(false, "CONGESTION value type must be integral, not " ~ T.stringof);
711 				else {
712 					int val = value.to!int;
713 					len = int.sizeof;
714 					err = setsockopt(fd, IPPROTO_TCP, TCP_CONGESTION, &val, len);
715 					return errorHandler();
716 				}
717 			case TCPOption.CORK:
718 				static if (!isIntegral!T)
719 					assert(false, "CORK value type must be int, not " ~ T.stringof);
720 				else {
721 					static if (EPOLL) {
722 						int val = value.to!int;
723 						socklen_t len = val.sizeof;
724 						err = setsockopt(fd, IPPROTO_TCP, TCP_CORK, &val, len);
725 						return errorHandler();
726 					}
727 					else /* if KQUEUE */ {
728 						int val = value.to!int;
729 						socklen_t len = val.sizeof;
730 						err = setsockopt(fd, IPPROTO_TCP, TCP_NOPUSH, &val, len);
731 						return errorHandler();
732 
733 					}
734 				}
735 			case TCPOption.DEFER_ACCEPT: // seconds
736 				static if (!isIntegral!T)
737 					assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof);
738 				else {
739 					static if (EPOLL) {
740 						int val = value.to!int;
741 						socklen_t len = val.sizeof;
742 						err = setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len);
743 						return errorHandler();
744 					}
745 					else /* if KQUEUE */ {
746 						// todo: Emulate DEFER_ACCEPT with ACCEPT_FILTER(9)
747 						/*int val = value.to!int;
748 						socklen_t len = val.sizeof;
749 						err = setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &val, len);
750 						return errorHandler();
751 						*/
752 						assert(false, "TCPOption.DEFER_ACCEPT is not implemented");
753 					}
754 				}
755 		}
756 
757 	}
758 
759 	uint recv(in fd_t fd, ref ubyte[] data)
760 	{
761 		try log("Recv from FD: " ~ fd.to!string); catch {}
762 		m_status = StatusInfo.init;
763 		import libasync.internals.socket_compat : recv;
764 		int ret = cast(int) recv(fd, cast(void*) data.ptr, data.length, cast(int)0);
765 		
766 		static if (LOG) log(".recv " ~ ret.to!string ~ " bytes of " ~ data.length.to!string ~ " @ " ~ fd.to!string);
767 		if (catchError!".recv"(ret)){ // ret == SOCKET_ERROR == -1 ?
768 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
769 				m_status.code = Status.ASYNC;
770 
771 			return 0; // TODO: handle some errors more specifically
772 		}
773 
774 		m_status.code = Status.OK;
775 		
776 		return cast(uint) ret < 0 ? 0 : ret;
777 	}
778 	
779 	uint send(in fd_t fd, in ubyte[] data)
780 	{
781 		try log("Send to FD: " ~ fd.to!string); catch {}
782 		m_status = StatusInfo.init;
783 		import libasync.internals.socket_compat : send;
784 		int ret = cast(int) send(fd, cast(const(void)*) data.ptr, data.length, cast(int)0);
785 
786 		if (catchError!"send"(ret)) { // ret == -1
787 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
788 				m_status.code = Status.ASYNC;
789 				return 0;
790 			}
791 		}
792 		m_status.code = Status.OK;
793 		return cast(uint) ret < 0 ? 0 : ret;
794 	}
795 
796 	uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr)
797 	{
798 		import libasync.internals.socket_compat : recvfrom, AF_INET6, AF_INET, socklen_t;
799 
800 		m_status = StatusInfo.init;
801 
802 		addr.family = AF_INET6;
803 		socklen_t addrLen = addr.sockAddrLen;
804 		long ret = recvfrom(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, &addrLen);
805 		
806 		if (addrLen < addr.sockAddrLen) {
807 			addr.family = AF_INET;
808 		}
809 		
810 		try log("RECVFROM " ~ ret.to!string ~ "B"); catch {}
811 		if (catchError!".recvfrom"(ret)) { // ret == -1
812 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
813 				m_status.code = Status.ASYNC;
814 			return 0;
815 		}
816 
817 		m_status.code = Status.OK;
818 		
819 		return cast(uint) ret;
820 	}
821 	
822 	uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
823 	{
824 		import libasync.internals.socket_compat : sendto;
825 
826 		m_status = StatusInfo.init;
827 
828 		try log("SENDTO " ~ data.length.to!string ~ "B");
829 		catch{}
830 		long ret = sendto(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, addr.sockAddrLen);
831 		
832 		if (catchError!".sendto"(ret)) { // ret == -1
833 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
834 				m_status.code = Status.ASYNC;
835 			return 0;
836 		}
837 
838 		m_status.code = Status.OK;
839 		return cast(uint) ret;
840 	}
841 
842 	NetworkAddress localAddr(in fd_t fd, bool ipv6) {
843 		NetworkAddress ret;
844 		import libasync.internals.socket_compat : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr;
845 
846 		if (ipv6)
847 			ret.family = AF_INET6;
848 		else
849 			ret.family = AF_INET;
850 
851 		socklen_t len = ret.sockAddrLen;
852 		int err = getsockname(fd, ret.sockAddr, &len);
853 		if (catchError!"getsockname"(err))
854 			return NetworkAddress.init;
855 		if (len > ret.sockAddrLen)
856 			ret.family = AF_INET6;
857 
858 		return ret;
859 	}
860 
861 	bool notify(in fd_t fd, AsyncNotifier ctxt)
862 	{
863 		static if (EPOLL)
864 		{
865 			import core.sys.posix.unistd : write;
866 
867 			long val = 1;
868 			fd_t err = cast(fd_t) write(fd, &val, long.sizeof);
869 			
870 			if (catchError!"write(notify)"(err)) {
871 				return false;
872 			}
873 			return true;
874 		}
875 		else /* if KQUEUE */
876 		{			
877 			kevent_t _event;
878 			EV_SET(&_event, fd, EVFILT_USER, EV_ENABLE | EV_CLEAR, NOTE_TRIGGER | 0x1, 0, ctxt.evInfo);
879 			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
880 			
881 			if (catchError!"kevent_notify"(err)) {
882 				return false;
883 			}
884 			return true;
885 		}
886 	}
887 
888 	bool notify(in fd_t fd, shared AsyncSignal ctxt)
889 	{
890 		static if (EPOLL) 
891 		{
892 
893 			sigval sigvl;
894 			fd_t err;
895 			sigvl.sival_ptr = cast(void*) ctxt;
896 			try err = pthread_sigqueue(ctxt.pthreadId, fd, sigvl); catch {}
897 			if (catchError!"sigqueue"(err)) {
898 				return false;
899 			}
900 		}
901 		else /* if KQUEUE */ 
902 		{
903 
904 			import core.thread : getpid;
905 
906 			addSignal(ctxt);
907 
908 			try {
909 				log("Notified fd: " ~ fd.to!string ~ " of PID " ~ getpid().to!string); 
910 				int err = core.sys.posix.signal.kill(getpid(), SIGXCPU);
911 				if (catchError!"notify(signal)"(err))
912 					assert(false, "Signal could not be raised");
913 			} catch {}
914 		}
915 
916 		return true;
917 	}
918 
919 	// no known uses
920 	uint read(in fd_t fd, ref ubyte[] data)
921 	{
922 		m_status = StatusInfo.init;
923 		return 0;
924 	}
925 
926 	// no known uses
927 	uint write(in fd_t fd, in ubyte[] data)
928 	{
929 		m_status = StatusInfo.init;
930 		return 0;
931 	}
932 
933 	uint watch(in fd_t fd, in WatchInfo info) {
934 		// note: info.wd is still 0 at this point.
935 		m_status = StatusInfo.init;
936 		import core.sys.linux.sys.inotify;
937 		import std.file : dirEntries, isDir, SpanMode;
938 
939 		static if (EPOLL) {
940 			// Manually handle recursivity... All events show up under the same inotify
941 			uint events = info.events; // values for this API were pulled from inotify
942 			if (events & IN_DELETE)
943 				events |= IN_DELETE_SELF;
944 			if (events & IN_MOVED_FROM)
945 				events |= IN_MOVE_SELF;
946 
947 			nothrow fd_t addFolderRecursive(Path path) {
948 				fd_t ret;
949 				try {
950 					ret = inotify_add_watch(fd, path.toNativeString().toStringz, events);
951 					if (catchError!"inotify_add_watch"(ret))
952 						return fd_t.init;
953 					try log("inotify_add_watch(" ~ DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd).to!string ~ ")"); catch {}
954 					assert(m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint)ret), DWFolderInfo.init) == DWFolderInfo.init, "Could not get a unique watch descriptor for path, got: " ~ m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)].to!string);
955 					m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd);
956 					if (info.recursive) {
957 						foreach (de; path.toNativeString().dirEntries(SpanMode.shallow))
958 						{
959 							Path de_path = Path(de.name);
960 							if (!de_path.absolute)
961 								de_path = path ~ Path(de.name);
962 							if (isDir(de_path.toNativeString()))
963 								if (addFolderRecursive(de_path) == 0)
964 									return 0;
965 						}
966 					}
967 
968 				} catch (Exception e) { 
969 					try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.toString() ); catch {}
970 					return 0; 
971 				}
972 
973 				return ret;
974 			}
975 
976 			return addFolderRecursive(info.path);
977 
978 		} else /* if KQUEUE */ {
979 			/// Manually handle recursivity & file tracking. Each folder is an event! 
980 			/// E.g. file creation shows up as a folder change, we must be prepared to seek the file.
981 			import core.sys.posix.fcntl;
982 			import libasync.internals.kqueue;
983 
984 			uint events;
985 			if (info.events & DWFileEvent.CREATED)
986 				events |= NOTE_LINK | NOTE_WRITE;
987 			if (info.events & DWFileEvent.DELETED)
988 				events |= NOTE_DELETE;
989 			if (info.events & DWFileEvent.MODIFIED)
990 				events |= NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE;
991 			if (info.events & DWFileEvent.MOVED_FROM)
992 				events |= NOTE_RENAME;
993 			if (info.events & DWFileEvent.MOVED_TO)
994 				events |= NOTE_RENAME;
995 
996 			EventInfo* evinfo;
997 			try evinfo = m_watchers[fd]; catch { assert(false, "Could retrieve event info, directory watcher was not initialized properly, or you are operating on a closed directory watcher."); }
998 
999 			/// we need a file descriptor for the containers, so we open files but we don't monitor them
1000 			/// todo: track indexes internally?
1001 			nothrow fd_t addRecursive(Path path, bool is_dir) {
1002 				int ret;
1003 				try {
1004 					log("Adding path: " ~ path.toNativeString());
1005 
1006 					ret = open(path.toNativeString().toStringz, O_EVTONLY);
1007 					if (catchError!"open(watch)"(ret))
1008 						return 0;
1009 
1010 					if (is_dir)
1011 						m_dwFolders[ret] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd);
1012 
1013 					kevent_t _event;
1014 					
1015 					EV_SET(&_event, ret, EVFILT_VNODE, EV_ADD | EV_CLEAR, events, 0, cast(void*) evinfo);
1016 					
1017 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1018 					
1019 					if (catchError!"kevent_timer_add"(err))
1020 						return 0;
1021 
1022 
1023 					if (is_dir) foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
1024 						Path filePath = Path(de.name);
1025 						if (!filePath.absolute)
1026 							filePath = path ~ filePath;
1027 						fd_t fwd;
1028 						if (info.recursive && isDir(filePath.toNativeString()))
1029 							fwd = addRecursive(filePath, true);
1030 						else {
1031 							fwd = addRecursive(filePath, false); // gets an ID but will not scan
1032 							m_dwFiles[fwd] = DWFileInfo(ret, filePath, de.timeLastModified, isDir(filePath.toNativeString()));
1033 						}
1034 						
1035 					}
1036 
1037 				} catch (Exception e) { 
1038 					try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.msg);  catch {}
1039 					return 0; 
1040 				}
1041 				return ret;
1042 			}
1043 
1044 			fd_t wd;
1045 
1046 			try {
1047 				wd = addRecursive(info.path, isDir(info.path.toNativeString()));
1048 
1049 				if (wd == 0)
1050 					return 0;
1051 
1052 			}
1053 			catch (Exception e) {
1054 				setInternalError!"dw.watch"(Status.ERROR, "Failed to watch directory: " ~ e.msg); 
1055 			}
1056 
1057 			return cast(uint) wd;
1058 		}
1059 	}
1060 
1061 	bool unwatch(in fd_t fd, in uint wd) {
1062 		// the wd can be used with m_dwFolders to find the DWFolderInfo
1063 		// and unwatch everything recursively.
1064 
1065 		m_status = StatusInfo.init;
1066 		static if (EPOLL) {
1067 			/// If recursive, all subfolders must also be unwatched recursively by removing them
1068 			/// from containers and from inotify
1069 			import core.sys.linux.sys.inotify;
1070 
1071 			nothrow bool removeAll(DWFolderInfo fi) {
1072 				int err;
1073 				try {
1074 
1075 					bool inotify_unwatch(uint wd) {
1076 						err = inotify_rm_watch(fd, wd);
1077 
1078 						if (catchError!"inotify_rm_watch"(err))
1079 							return false;
1080 						return true;
1081 					}
1082 
1083 					if (!inotify_unwatch(fi.wi.wd))
1084 						return false;
1085 
1086 					/*foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles)
1087 					{
1088 						if (file.folder == fi.wi.wd) {
1089 							inotify_unwatch(id);
1090 							m_dwFiles.remove(id);
1091 						}
1092 					}*/
1093 					m_dwFolders.remove(tuple(cast(fd_t)fd, fi.wi.wd)); 
1094 
1095 					if (fi.wi.recursive) {
1096 						// find all subdirectories by comparing the path
1097 						Array!uint remove_list;
1098 						foreach (ref const DWFolderInfo folder; m_dwFolders) {
1099 							if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) {
1100 
1101 								if (!inotify_unwatch(folder.wi.wd))
1102 									return false;
1103 
1104 								remove_list.insertBack(fi.wi.wd);
1105 							}
1106 						}
1107 						foreach (rm_wd; remove_list[])
1108 							m_dwFolders.remove(tuple(cast(fd_t) fd, rm_wd));
1109 
1110 					}
1111 					return true;
1112 				} catch (Exception e) { 
1113 					try setInternalError!"inotify_rm_watch"(Status.ERROR, "Could not unwatch directory: " ~ e.toString()); catch {}
1114 					return false; 
1115 				}
1116 			}
1117 
1118 			DWFolderInfo info;
1119 
1120 			try {
1121 				info = m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint) wd), DWFolderInfo.init);
1122 				if (info == DWFolderInfo.init) {
1123 					setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not find watch info for wd " ~ wd.to!string);
1124 					return false;
1125 				}
1126 			} catch { }
1127 
1128 			return removeAll(info);
1129 		}
1130 		else /* if KQUEUE */ {
1131 
1132 			/// Recursivity must be handled manually, so we must unwatch subfiles and subfolders
1133 			/// recursively, remove the container entries, close the file descriptor, and disable the vnode events.
1134 
1135 			nothrow bool removeAll(DWFolderInfo fi) {
1136 				import core.sys.posix.unistd : close;
1137 
1138 
1139 				bool event_unset(uint id) {
1140 					kevent_t _event;
1141 					EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
1142 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1143 					if (catchError!"kevent_unwatch"(err))
1144 						return false;
1145 					return true;
1146 				}
1147 
1148 				bool removeFolder(uint wd) {
1149 					if (!event_unset(fi.wi.wd))
1150 						return false;
1151 					m_dwFolders.remove(fi.wi.wd);
1152 					int err = close(fi.wi.wd);
1153 					if (catchError!"close dir"(err))
1154 						return false;
1155 					return true;
1156 				}
1157 
1158 				try {
1159 					removeFolder(fi.wi.wd);
1160 
1161 					if (fi.wi.recursive) {
1162 						import std.container.array;
1163 						Array!fd_t remove_list; // keep track of unwatched folders recursively
1164 						Array!fd_t remove_file_list;
1165 						// search for subfolders and unset them / close their wd
1166 						foreach (ref const DWFolderInfo folder; m_dwFolders) {
1167 							if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) {
1168 								 
1169 								if (!event_unset(folder.wi.wd))
1170 									return false;
1171 
1172 								// search for subfiles, close their descriptors and remove them from the file list
1173 								foreach (ref const fd_t fwd, ref const DWFileInfo file; m_dwFiles) {
1174 									if (file.folder == folder.wi.wd) {
1175 										close(fwd);
1176 										remove_file_list.insertBack(fwd); // to be removed from m_dwFiles without affecting the loop
1177 									}
1178 								}
1179 
1180 								remove_list.insertBack(folder.wi.wd); // to be removed from m_dwFolders without affecting the loop
1181 							}
1182 						}
1183 
1184 						foreach (wd; remove_file_list[])
1185 							m_dwFiles.remove(wd);
1186 
1187 						foreach (rm_wd; remove_list[])
1188 							removeFolder(rm_wd);
1189 
1190 
1191 					}
1192 				} catch (Exception e) {
1193 					try setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not close the folder " ~ fi.to!string ~ ": " ~ e.toString()); catch {}
1194 					return false;
1195 				}
1196 
1197 				return true;
1198 			}
1199 
1200 			DWFolderInfo info;
1201 			try info = m_dwFolders.get(wd, DWFolderInfo.init); catch {}
1202 
1203 			if (!removeAll(info))
1204 				return false;
1205 			return true;
1206 		}
1207 	}
1208 
1209 	// returns the amount of changes
1210 	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
1211 		m_status = StatusInfo.init;
1212 
1213 		static if (EPOLL) {
1214 			assert(dst.length > 0, "DirectoryWatcher called with 0 length DWChangeInfo array");
1215 			import core.sys.linux.sys.inotify;
1216 			import core.sys.posix.unistd : read;
1217 			import core.stdc.stdio : FILENAME_MAX;
1218 			import core.stdc.string : strlen;
1219 			ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void;
1220 			ssize_t nread = read(fd, buf.ptr, cast(uint)buf.sizeof);
1221 			if (catchError!"read()"(nread))
1222 			{
1223 				if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) 
1224 					m_status.code = Status.ASYNC;
1225 				return 0;
1226 			}
1227 			assert(nread > 0);
1228 		
1229 
1230 			/// starts (recursively) watching all newly created folders in a recursive entry,
1231 			/// creates events for additional files/folders founds, and unwatches all deleted folders
1232 			void recurseInto(DWFolderInfo fi, DWFileEvent ev, ref Array!DWChangeInfo changes) {
1233 				import std.file : dirEntries, SpanMode, isDir;
1234 				assert(fi.wi.recursive);
1235 				// get a list of stuff in the created/moved folder
1236 				if (ev == DWFileEvent.CREATED || ev == DWFileEvent.MOVED_TO) {
1237 					foreach (de; dirEntries(fi.wi.path.toNativeString(), SpanMode.shallow)) {
1238 						Path entryPath = fi.wi.path ~ Path(de.name);
1239 						
1240 						if (fi.wi.recursive && isDir(entryPath.toNativeString())) {
1241 							
1242 							watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, 0) );
1243 							void genEvents(Path subpath) {
1244 								foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) {
1245 									auto subsubpath = subpath ~ Path(de.name);
1246 									changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
1247 									if (isDir(subsubpath.toNativeString()))
1248 										genEvents(subsubpath);
1249 								}
1250 							}
1251 							
1252 							genEvents(entryPath);
1253 							
1254 						}
1255 					}
1256 				}
1257 			}
1258 
1259 			size_t i;
1260 			do
1261 			{
1262 				for (auto p = buf.ptr; p < buf.ptr + nread; )
1263 				{
1264 					inotify_event* ev = cast(inotify_event*)p;
1265 					p += inotify_event.sizeof + ev.len;
1266 
1267 					DWFileEvent evtype;
1268 					evtype = DWFileEvent.CREATED;
1269 					if (ev.mask & IN_CREATE)
1270 						evtype = DWFileEvent.CREATED;
1271 					if (ev.mask & IN_DELETE || ev.mask & IN_DELETE_SELF)
1272 						evtype = DWFileEvent.DELETED;
1273 					if (ev.mask & IN_MOVED_FROM || ev.mask & IN_MOVE_SELF)
1274 						evtype = DWFileEvent.MOVED_FROM;
1275 					if (ev.mask & (IN_MOVED_TO))
1276 						evtype = DWFileEvent.MOVED_TO;
1277 					if (ev.mask & IN_MODIFY)
1278 						evtype = DWFileEvent.MODIFIED;
1279 
1280 					import std.path : buildPath;
1281 					import core.stdc.string : strlen;
1282 					string name = cast(string) ev.name.ptr[0 .. cast(size_t) ev.name.ptr.strlen].idup;
1283 					DWFolderInfo fi;
1284 					Path path;
1285 					try {
1286 						fi = m_dwFolders.get(tuple(cast(fd_t)fd,cast(uint)ev.wd), DWFolderInfo.init);
1287 						if (fi == DWFolderInfo.init) {
1288 							setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders: " ~ ev.wd.to!string); 
1289 							continue;
1290 						}
1291 						path = fi.wi.path ~ Path(name); 
1292 					}
1293 					catch (Exception e) { 
1294 						setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders"); 
1295 						return 0; 
1296 					}
1297 
1298 					dst[i] = DWChangeInfo(evtype, path);
1299 					import std.file : isDir;
1300 					bool is_dir;
1301 					try is_dir = isDir(path.toNativeString()); catch {}
1302 					if (fi.wi.recursive && is_dir) {
1303 
1304 						try {
1305 							Array!DWChangeInfo changes;
1306 							recurseInto(fi, evtype, changes);
1307 							// stop watching if the folder was deleted
1308 							if (evtype == DWFileEvent.DELETED || evtype == DWFileEvent.MOVED_FROM) {
1309 								unwatch(fi.fd, fi.wi.wd);
1310 							}
1311 							foreach (change; changes[]) {
1312 								i++;
1313 								if (dst.length <= i)
1314 									dst ~= change;
1315 								else dst[i] = change;
1316 							}
1317 						}
1318 						catch (Exception e) {
1319 							setInternalError!"recurseInto"(Status.ERROR, "Failed to watch/unwatch contents of folder recursively."); 
1320 							return 0; 
1321 						}
1322 
1323 					}
1324 
1325 
1326 					i++;
1327 					if (i >= dst.length)
1328 						return cast(uint) i;
1329 				}
1330 				static if (LOG) foreach (j; 0 .. i) {
1331 					try log("Change occured for FD#" ~ fd.to!string ~ ": " ~ dst[j].to!string); catch {}
1332 				}
1333 				nread = read(fd, buf.ptr, buf.sizeof);
1334 				if (catchError!"read()"(nread)) {
1335 					if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
1336 						m_status.code = Status.ASYNC;
1337 					return cast(uint) i;
1338 				}
1339 			} while (nread > 0);
1340 
1341 			return cast(uint) i;
1342 		}
1343 		else /* if KQUEUE */ {
1344 			Array!(DWChangeInfo)* changes;
1345 			size_t i;
1346 			try {
1347 				changes = m_changes[fd];
1348 				import std.algorithm : min;
1349 				size_t cnt = min(dst.length, changes.length);
1350 				foreach (DWChangeInfo change; (*changes)[0 .. cnt]) {
1351 					dst[i] = (*changes)[i];
1352 					i++;
1353 				}
1354 				changes.linearRemove((*changes)[0 .. cnt]);
1355 			}
1356 			catch (Exception e) {
1357 				setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg);
1358 				return false;
1359 			}
1360 			return cast(uint) i;
1361 		}
1362 	}
1363 
1364 	bool broadcast(in fd_t fd, bool b) {
1365 		m_status = StatusInfo.init;
1366 
1367 		import libasync.internals.socket_compat : socklen_t, setsockopt, SO_BROADCAST, SOL_SOCKET;
1368 
1369 		int val = b?1:0;
1370 		socklen_t len = val.sizeof;
1371 		int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len);
1372 		if (catchError!"setsockopt"(err))
1373 			return false;
1374 
1375 		return true;
1376 	}
1377 
1378 	private bool closeRemoteSocket(fd_t fd, bool forced) {
1379 		
1380 		int err;
1381 		log("shutdown");
1382 		import libasync.internals.socket_compat : shutdown, SHUT_WR, SHUT_RDWR, SHUT_RD;
1383 		if (forced) 
1384 			err = shutdown(fd, SHUT_RDWR);
1385 		else
1386 			err = shutdown(fd, SHUT_WR);
1387 
1388 		static if (!EPOLL) {
1389 			kevent_t[2] events;
1390 			try log("!!DISC delete events"); catch {}
1391 			EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
1392 			EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
1393 			kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
1394 
1395 		}
1396 
1397 		if (catchError!"shutdown"(err))
1398 			return false;
1399 
1400 		return true;
1401 	}
1402 
1403 	// for connected sockets
1404 	bool closeSocket(fd_t fd, bool connected, bool forced = false)
1405 	{
1406 		log("closeSocket");
1407 		if (connected && !closeRemoteSocket(fd, forced) && !forced)
1408 			return false;
1409 		
1410 		if (!connected || forced) {
1411 			// todo: flush the socket here?
1412 
1413 			import core.sys.posix.unistd : close;
1414 			log("close");
1415 			int err = close(fd);
1416 			if (catchError!"closesocket"(err)) 
1417 				return false;
1418 		}
1419 		return true;
1420 	}
1421 
1422 	
1423 	NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true) 
1424 	in {
1425 		debug import libasync.internals.validator : validateIPv4, validateIPv6;
1426 		debug assert(validateIPv4(ipAddr) || validateIPv6(ipAddr), "Trying to connect to an invalid IP address");
1427 	}
1428 	body {
1429 		import libasync.internals.socket_compat : addrinfo, AI_NUMERICHOST, AI_NUMERICSERV;
1430 		addrinfo hints;
1431 		hints.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV; // Specific to an IP resolver!
1432 
1433 		return getAddressInfo(ipAddr, port, ipv6, tcp, hints);
1434 	}
1435 
1436 
1437 	NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true)
1438 	/*in { 
1439 		debug import libasync.internals.validator : validateHost;
1440 		debug assert(validateHost(host), "Trying to connect to an invalid domain"); 
1441 	}
1442 	body */{
1443 		import libasync.internals.socket_compat : addrinfo;
1444 		addrinfo hints;
1445 		return getAddressInfo(host, port, ipv6, tcp, hints);
1446 	}
1447 	
1448 	void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EPosix.EACCES)
1449 	{
1450 		if (details.length > 0)
1451 			m_status.text = TRACE ~ ": " ~ details;
1452 		else m_status.text = TRACE;
1453 		m_error = error;
1454 		m_status.code = s;
1455 		static if(LOG) log(m_status);
1456 	}
1457 private:	
1458 
1459 	/// For DirectoryWatcher
1460 	/// In kqueue/vnode, all we get is the folder in which changes occured.
1461 	/// We have to figure out what changed exactly and put the results in a container 
1462 	/// for the readChanges call.
1463 	static if (!EPOLL) bool compareFolderFiles(DWFolderInfo fi, DWFileEvent events) {
1464 		import std.file;
1465 		import std.path : buildPath;
1466 		try {
1467 			Array!Path currFiles;
1468 			auto wd = fi.wi.wd;
1469 			auto path = fi.wi.path;
1470 			auto fd = fi.fd;
1471 			Array!(DWChangeInfo)* changes = m_changes.get(fd, null);
1472 			assert(changes !is null, "Invalid wd, could not find changes array.");
1473 			//import std.stdio : writeln;
1474 			//writeln("Scanning path: ", path.toNativeString());
1475 			//writeln("m_dwFiles length: ", m_dwFiles.length);
1476 		
1477 			// get a list of the folder
1478 			foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
1479 				//writeln(de.name);
1480 				Path entryPath = Path(de.name);
1481 				if (!entryPath.absolute)
1482 					entryPath = path ~ entryPath;
1483 				bool found;
1484 
1485 				// compare it to the cached list fixme: make it faster using another container?
1486 				foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
1487 					if (file.folder != wd) continue; // this file isn't in the evented folder
1488 					if (file.path == entryPath) {
1489 						found = true;
1490 						log("File modified? " ~ entryPath.toNativeString() ~ " at: " ~ de.timeLastModified.to!string ~ " vs: " ~ file.lastModified.to!string);
1491 						// Check if it was modified
1492 						if (!isDir(entryPath.toNativeString()) && de.timeLastModified > file.lastModified)
1493 						{
1494 							DWFileInfo dwf = file;
1495 							dwf.lastModified = de.timeLastModified;
1496 							m_dwFiles[id] = dwf;
1497 							changes.insertBack(DWChangeInfo(DWFileEvent.MODIFIED, file.path));
1498 						}
1499 						break;
1500 					}
1501 				}
1502 
1503 				// This file/folder is new in the folder
1504 				if (!found) {
1505 					changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, entryPath));
1506 
1507 					if (fi.wi.recursive && isDir(entryPath.toNativeString())) {
1508 						/// This is the complicated part. The folder needs to be watched, and all the events
1509 						/// generated for every file/folder found recursively inside it,
1510 						/// Useful e.g. when mkdir -p is used.
1511 						watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, wd) );
1512 						void genEvents(Path subpath) {
1513 							foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) {
1514 								auto subsubpath = Path(de.name);
1515 								if (!subsubpath.absolute())
1516 									subsubpath = subpath ~ subsubpath;
1517 								changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
1518 								if (isDir(subsubpath.toNativeString()))
1519 									genEvents(subsubpath);
1520 							}
1521 						}
1522 
1523 						genEvents(entryPath);
1524 
1525 					}
1526 					else {
1527 						EventInfo* evinfo;
1528 						try evinfo = m_watchers[fd]; catch { assert(false, "Could retrieve event info, directory watcher was not initialized properly, or you are operating on a closed directory watcher."); }
1529 
1530 						log("Adding path: " ~ path.toNativeString());
1531 
1532 						import core.sys.posix.fcntl : open;
1533 						fd_t fwd = open(entryPath.toNativeString().toStringz, O_EVTONLY);
1534 						if (catchError!"open(watch)"(fwd))
1535 							return 0;
1536 
1537 						kevent_t _event;
1538 						
1539 						EV_SET(&_event, fwd, EVFILT_VNODE, EV_ADD | EV_CLEAR, fi.wi.events, 0, cast(void*) evinfo);
1540 						
1541 						int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1542 
1543 						if (catchError!"kevent_timer_add"(err))
1544 							return 0;
1545 
1546 						m_dwFiles[fwd] = DWFileInfo(fi.wi.wd, entryPath, de.timeLastModified, false);
1547 						
1548 					}
1549 				}
1550 
1551 				// This file/folder is now current. This avoids a deletion event.
1552 				currFiles.insert(entryPath);
1553 			}
1554 
1555 			/// Now search for files/folders that were deleted in this directory (no recursivity needed). 
1556 			/// Unwatch this directory and generate delete event only for the root dir
1557 			foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
1558 				if (file.folder != wd) continue; // skip those files in another folder than the evented one
1559 				bool found;
1560 				foreach (Path curr; currFiles) {
1561 					if (file.path == curr){
1562 						found = true;
1563 						break;
1564 					}
1565 				}
1566 				// this file/folder was in the folder but it's not there anymore
1567 				if (!found) {
1568 					// writeln("Deleting: ", file.path.toNativeString());
1569 					kevent_t _event;
1570 					EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
1571 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1572 					if (catchError!"kevent_unwatch"(err))
1573 						return false;
1574 					import core.sys.posix.unistd : close;
1575 					err = close(id);
1576 					if (catchError!"close(dwFile)"(err))
1577 						return false;
1578 					changes.insert(DWChangeInfo(DWFileEvent.DELETED, file.path));
1579 
1580 					if (fi.wi.recursive && file.is_dir) 
1581 						unwatch(fd, id);
1582 
1583 					m_dwFiles.remove(id);
1584 
1585 				}
1586 				
1587 			}
1588 			if(changes.empty)
1589 				return false; // unhandled event, skip the callback
1590 
1591 			// fixme: how to implement moved_from moved_to for rename?
1592 		}
1593 		catch (Exception e) 
1594 		{
1595 			try setInternalError!"compareFiles"(Status.ERROR, "Fatal error in file comparison: " ~ e.toString()); catch {}
1596 			return false;
1597 		}
1598 		return true;
1599 	}
1600 
1601 	// socket must not be connected
1602 	bool setNonBlock(fd_t fd) {
1603 		import core.sys.posix.fcntl : fcntl, F_GETFL, F_SETFL, O_NONBLOCK;
1604 		int flags = fcntl(fd, F_GETFL);
1605 		flags |= O_NONBLOCK;
1606 		int err = fcntl(fd, F_SETFL, flags);
1607 		if (catchError!"F_SETFL O_NONBLOCK"(err)) {
1608 			closeSocket(fd, false);
1609 			return false;
1610 		}
1611 		return true;
1612 	}
1613 	
1614 	bool onTCPAccept(fd_t fd, TCPAcceptHandler del, int events)
1615 	{
1616 		import libasync.internals.socket_compat : AF_INET, AF_INET6, socklen_t, accept4, accept;
1617 		enum O_NONBLOCK     = 0x800;    // octal    04000
1618 
1619 		static if (EPOLL) 
1620 		{
1621 			const uint epoll_events = cast(uint) events;
1622 			const bool incoming = cast(bool) (epoll_events & EPOLLIN);
1623 			const bool error = cast(bool) (epoll_events & EPOLLERR);
1624 		}
1625 		else 
1626 		{
1627 			const short kqueue_events = cast(short) (events >> 16);
1628 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
1629 			const bool incoming = cast(bool)(kqueue_events & EVFILT_READ);
1630 			const bool error = cast(bool)(kqueue_flags & EV_ERROR);
1631 		}
1632 		
1633 		if (incoming) { // accept incoming connection
1634 			do {
1635 				NetworkAddress addr;
1636 				addr.family = AF_INET;
1637 				socklen_t addrlen = addr.sockAddrLen;
1638 
1639 				bool ret;
1640 				static if (EPOLL) {
1641 					/// Accept the connection and create a client socket
1642 					fd_t csock = accept4(fd, addr.sockAddr, &addrlen, O_NONBLOCK);
1643 
1644 					if (catchError!".accept"(csock)) {
1645 						ret = false;
1646 						return ret;
1647 					}
1648 				} else /* if KQUEUE */ {
1649 					fd_t csock = accept(fd, addr.sockAddr, &addrlen);
1650 					
1651 					if (catchError!".accept"(csock)) {
1652 						ret = false;
1653 						return ret;
1654 					}
1655 					
1656 					// Make non-blocking so subsequent calls to recv/send return immediately
1657 					if (!setNonBlock(csock)) {
1658 						ret = false;
1659 						return ret;
1660 					}
1661 				}
1662 
1663 				// Set client address family based on address length
1664 				if (addrlen > addr.sockAddrLen)
1665 					addr.family = AF_INET6;
1666 				if (addrlen == socklen_t.init) {
1667 					setInternalError!"addrlen"(Status.ABORT);
1668 					import core.sys.posix.unistd : close;
1669 					close(csock);
1670 					{
1671 						ret = false;
1672 						return ret;
1673 					}
1674 				}
1675 
1676 				// Allocate a new connection handler object
1677 				AsyncTCPConnection conn;
1678 				try conn = FreeListObjectAlloc!AsyncTCPConnection.alloc(m_evLoop);
1679 				catch (Exception e){ assert(false, "Allocation failure"); }
1680 				conn.peer = addr;
1681 				conn.socket = csock;
1682 				conn.inbound = true;
1683 
1684 				nothrow bool closeClient() {
1685 					try FreeListObjectAlloc!AsyncTCPConnection.free(conn);
1686 					catch (Exception e){ assert(false, "Free failure"); }
1687 					closeSocket(csock, true, true);
1688 					{
1689 						ret = false;
1690 						return ret;
1691 					}
1692 				}
1693 
1694 				// Get the connection handler from the callback
1695 				TCPEventHandler evh;
1696 				try {
1697 					evh = del(conn);
1698 					if (evh == TCPEventHandler.init || !initTCPConnection(csock, conn, evh, true)) {
1699 						try log("Failed to connect"); catch {}
1700 						return closeClient();
1701 					}
1702 					try log("Connection Started with " ~ csock.to!string); catch {}
1703 				}
1704 				catch (Exception e) {
1705 					log("Close socket");
1706 					return closeClient();
1707 				}
1708 
1709 				// Announce connection state to the connection handler
1710 				try {
1711 					log("Connected to: " ~ addr.toString());
1712 					evh.conn.connected = true;
1713 					evh(TCPEvent.CONNECT);
1714 				}
1715 				catch (Exception e) {
1716 					setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
1717 					{
1718 						ret = false;
1719 						return ret;
1720 					}
1721 				}
1722 			} while(true);
1723 
1724 		}
1725 		
1726 		if (error) { // socket failure
1727 			m_status.text = "listen socket error";
1728 			int err;
1729 			import libasync.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR;
1730 			socklen_t len = int.sizeof;
1731 			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
1732 			m_error = cast(error_t) err;
1733 			m_status.code = Status.ABORT;
1734 			static if(LOG) log(m_status);
1735 
1736 			// call with null to announce a failure
1737 			try del(null);
1738 			catch(Exception e){ assert(false, "Failure calling TCPAcceptHandler(null)"); }
1739 
1740 			/// close the listener?
1741 			// closeSocket(fd, false);
1742 		}
1743 		return true;
1744 	}
1745 
1746 	bool onUDPTraffic(fd_t fd, UDPHandler del, int events) 
1747 	{
1748 		static if (EPOLL) 
1749 		{
1750 			const uint epoll_events = cast(uint) events;
1751 			const bool read = cast(bool) (epoll_events & EPOLLIN);
1752 			const bool write = cast(bool) (epoll_events & EPOLLOUT);
1753 			const bool error = cast(bool) (epoll_events & EPOLLERR);
1754 		}
1755 		else 
1756 		{
1757 			const short kqueue_events = cast(short) (events >> 16);
1758 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
1759 			const bool read = cast(bool) (kqueue_events & EVFILT_READ);
1760 			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
1761 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
1762 		}
1763 
1764 		if (read) {
1765 			try {
1766 				del(UDPEvent.READ);
1767 			}
1768 			catch (Exception e) {
1769 				setInternalError!"del@UDPEvent.READ"(Status.ABORT);
1770 				return false;
1771 			}
1772 		}
1773 
1774 		if (write) { 
1775 
1776 			try {
1777 				del(UDPEvent.WRITE);
1778 			}
1779 			catch (Exception e) {
1780 				setInternalError!"del@UDPEvent.WRITE"(Status.ABORT);
1781 				return false;
1782 			}
1783 		}
1784 
1785 		if (error) // socket failure
1786 		{ 
1787 
1788 			import libasync.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR;
1789 			import core.sys.posix.unistd : close;
1790 			int err;
1791 			socklen_t errlen = err.sizeof;
1792 			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
1793 			setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err);
1794 			close(fd);
1795 		}
1796 		
1797 		return true;
1798 	}
1799 
	/// Dispatches an event on the TCP connection socket `fd` to the handler `del`.
	///
	/// The event bits are decoded into connect/read/write/error/close flags and
	/// handled in this order: ERROR (returns false), CONNECT (returns true),
	/// WRITE, READ, CLOSE. A handler exception sets an ABORT status and returns false.
	///
	/// Params:
	///   fd     = socket descriptor the event was reported for
	///   del    = event handler of the connection
	///   events = raw event bits (epoll mask, or kqueue filter in the high 16
	///            bits and flags in the low 16 bits)
	///   conn   = connection state (connected / disconnecting / writeBlocked)
	///
	/// Returns: false on a socket error or when a handler threw, true otherwise.
	bool onTCPTraffic(fd_t fd, TCPEventHandler del, int events, AsyncTCPConnection conn) 
	{
		log("TCP Traffic at FD#" ~ fd.to!string);

		static if (EPOLL) 
		{
			const uint epoll_events = cast(uint) events;
			// any in/out event on a socket that is neither connected nor disconnecting is a connect
			const bool connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !conn.disconnecting && !conn.connected;
			bool read = cast(bool) (epoll_events & EPOLLIN);
			const bool write = cast(bool) (epoll_events & EPOLLOUT);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
			const bool close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP));
		}
		else /* if KQUEUE */
		{
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			const bool connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !conn.disconnecting && !conn.connected);
			bool read = cast(bool) (kqueue_events & EVFILT_READ) && !connect;
			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
			const bool close = cast(bool) (kqueue_flags & EV_EOF);
		}

		// Socket error: store SO_ERROR in the status, notify the handler, give up.
		if (error) 
		{
			import libasync.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR;
			int err;
			try log("Also got events: " ~ connect.to!string ~ " c " ~ read.to!string ~ " r " ~ write.to!string ~ " write"); catch {}
			socklen_t errlen = err.sizeof;
			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
			setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err);
			try 
				del(TCPEvent.ERROR);
			catch (Exception e)
			{ 
				setInternalError!"del@TCPEvent.ERROR"(Status.ABORT);
				// ignore failure...
			}
			return false;
		}

		
		// First event on a not-yet-connected socket: mark it connected and announce it.
		// Nothing else is dispatched for this event.
		if (connect) 
		{
			try log("!connect"); catch {}
			conn.connected = true;
			try del(TCPEvent.CONNECT);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
				return false;
			}
			return true;
		}


		// WRITE is only announced when a previous write was blocked.
		if (write && conn.connected && !conn.disconnecting && conn.writeBlocked) 
		{
			conn.writeBlocked = false;
			try log("!write"); catch {}
			try del(TCPEvent.WRITE);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.WRITE"(Status.ABORT);
				return false;
			}
		}
		else {
			// no WRITE was dispatched: force a READ notification below
			read = true;
		}

		if (read && conn.connected && !conn.disconnecting)
		{
			try log("!read"); catch {}
			try del(TCPEvent.READ);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.READ"(Status.ABORT);
				return false;
			}
		}

		// Peer closed: announce CLOSE, close the socket and release the resources.
		if (close && conn.connected && !conn.disconnecting) 
		{
			try log("!close"); catch {}
			// todo: See if this hack is still necessary
			// NOTE(review): this condition contradicts the enclosing one, so it looks unreachable - confirm
			if (!conn.connected && conn.disconnecting)
				return true;
			
			try del(TCPEvent.CLOSE);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT);
				return false;
			}
			closeSocket(fd, !conn.disconnecting, conn.connected);
			
			// mark the connection as gone
			m_status.code = Status.ABORT;
			conn.disconnecting = true;
			conn.connected = false;
			conn.writeBlocked = true;
			del.conn.socket = 0;
			
			try FreeListObjectAlloc!EventInfo.free(del.conn.evInfo);
			catch (Exception e){ assert(false, "Error freeing resources"); }
			
			// the connection object itself is freed here only for inbound connections
			if (del.conn.inbound) {
				log("Freeing inbound connection");
				try FreeListObjectAlloc!AsyncTCPConnection.free(del.conn);
				catch (Exception e){ assert(false, "Error freeing resources"); }
			}
		}
		return true;
	}
1911 	
1912 	bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt, UDPHandler del)
1913 	{
1914 		import libasync.internals.socket_compat : bind;
1915 		import core.sys.posix.unistd;
1916 
1917 		fd_t err;
1918 
1919 		EventObject eo;
1920 		eo.udpHandler = del;
1921 		EventInfo* ev;
1922 		try ev = FreeListObjectAlloc!EventInfo.alloc(fd, EventType.UDPSocket, eo, m_instanceId);
1923 		catch (Exception e){ assert(false, "Allocation error"); }
1924 		ctxt.evInfo = ev;
1925 		nothrow bool closeAll() {
1926 			try FreeListObjectAlloc!EventInfo.free(ev);
1927 			catch(Exception e){ assert(false, "Failed to free resources"); }
1928 			ctxt.evInfo = null;
1929 			// socket will be closed by caller if return false
1930 			return false;
1931 		}
1932 
1933 		static if (EPOLL)
1934 		{
1935 			epoll_event _event;
1936 			_event.data.ptr = ev;
1937 			_event.events = EPOLLIN | EPOLLOUT | EPOLLET;
1938 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
1939 			if (catchError!"epoll_ctl"(err)) {
1940 				return closeAll();
1941 			}
1942 			nothrow void deregisterEvent() {}
1943 		}
1944 		else /* if KQUEUE */
1945 		{
1946 			kevent_t[2] _event;
1947 			EV_SET(&(_event[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ev);
1948 			EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, ev);
1949 			err = kevent(m_kqueuefd, &(_event[0]), 2, null, 0, null);
1950 			if (catchError!"kevent_add_udp"(err))
1951 				return closeAll();
1952 			
1953 			nothrow void deregisterEvent() {
1954 				EV_SET(&(_event[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
1955 				EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
1956 				kevent(m_kqueuefd, &(_event[0]), 2, null, 0, cast(libasync.internals.kqueue.timespec*) null);
1957 			}
1958 
1959 		}
1960 
1961 		/// Start accepting packets
1962 		err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
1963 		if (catchError!"bind"(err)) {
1964 			deregisterEvent();
1965 			return closeAll();
1966 		}
1967 
1968 		return true;
1969 	}
1970 
1971 
1972 	bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, TCPAcceptHandler del, bool reusing = false)
1973 	in {
1974 		assert(ctxt.local !is NetworkAddress.init);
1975 	}
1976 	body {
1977 		import libasync.internals.socket_compat : bind, listen, SOMAXCONN;
1978 		fd_t err;
1979 
1980 		/// Create callback object
1981 		EventObject eo;
1982 		eo.tcpAcceptHandler = del;
1983 		EventInfo* ev;
1984 
1985 		try ev = FreeListObjectAlloc!EventInfo.alloc(fd, EventType.TCPAccept, eo, m_instanceId);
1986 		catch (Exception e){ assert(false, "Allocation error"); }
1987 		ctxt.evInfo = ev;
1988 		nothrow bool closeAll() {
1989 			try FreeListObjectAlloc!EventInfo.free(ev);
1990 			catch(Exception e){ assert(false, "Failed free"); }
1991 			ctxt.evInfo = null;
1992 			// Socket is closed by run()
1993 			//closeSocket(fd, false);
1994 			return false;
1995 		}
1996 
1997 		/// Add socket to event loop
1998 		static if (EPOLL)
1999 		{
2000 			epoll_event _event;
2001 			_event.data.ptr = ev;
2002 			_event.events = EPOLLIN | EPOLLET;
2003 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
2004 			if (catchError!"epoll_ctl_add"(err))
2005 				return closeAll();
2006 
2007 			nothrow void deregisterEvent() {
2008 				// epoll cleans itself when closing the socket
2009 			}
2010 		}
2011 		else /* if KQUEUE */
2012 		{
2013 			kevent_t _event;
2014 			EV_SET(&_event, fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ev);
2015 			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
2016 			if (catchError!"kevent_add_listener"(err))
2017 				return closeAll();
2018 
2019 			nothrow void deregisterEvent() {
2020 				EV_SET(&_event, fd, EVFILT_READ, EV_CLEAR | EV_DISABLE, 0, 0, null);
2021 				kevent(m_kqueuefd, &_event, 1, null, 0, null);
2022 				// wouldn't know how to deal with errors here...
2023 			}
2024 		}
2025 
2026 		/// Bind and listen to socket
2027 		if (!reusing) {
2028 			err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
2029 			if (catchError!"bind"(err)) {
2030 				deregisterEvent();
2031 				return closeAll();
2032 			}
2033 
2034 			err = listen(fd, SOMAXCONN);
2035 			if (catchError!"listen"(err)) {
2036 				deregisterEvent();
2037 				return closeAll();
2038 			}
2039 
2040 		}
2041 		return true;
2042 	}
2043 
2044 	bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt, TCPEventHandler del, bool inbound = false)
2045 	in { 
2046 		assert(ctxt.peer.port != 0, "Connecting to an invalid port");
2047 	}
2048 	body {
2049 
2050 		fd_t err;
2051 
2052 		/// Create callback object
2053 		import libasync.internals.socket_compat : connect;
2054 		EventObject eo;
2055 		eo.tcpEvHandler = del;
2056 		EventInfo* ev;
2057 
2058 		try ev = FreeListObjectAlloc!EventInfo.alloc(fd, EventType.TCPTraffic, eo, m_instanceId);
2059 		catch (Exception e){ assert(false, "Allocation error"); }
2060 		assert(ev !is null);
2061 		ctxt.evInfo = ev;
2062 		nothrow bool destroyEvInfo() {
2063 			try FreeListObjectAlloc!EventInfo.free(ev);
2064 			catch(Exception e){ assert(false, "Failed to free resources"); }
2065 			ctxt.evInfo = null;
2066 
2067 			// Socket will be closed by run()
2068 			// closeSocket(fd, false);
2069 			return false;
2070 		}
2071 
2072 		/// Add socket and callback object to event loop
2073 		static if (EPOLL)
2074 		{
2075 			epoll_event _event = void;
2076 			_event.data.ptr = ev;
2077 			_event.events = 0 | EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLET;
2078 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
2079 			log("Connection FD#" ~ fd.to!string ~ " added to " ~ m_epollfd.to!string);
2080 			if (catchError!"epoll_ctl_add"(err))
2081 				return destroyEvInfo();
2082 
2083 			nothrow void deregisterEvent() {
2084 				// will be handled automatically when socket is closed
2085 			}
2086 		}
2087 		else /* if KQUEUE */
2088 		{
2089 			kevent_t[2] events = void;
2090 			try log("Register event ptr " ~ ev.to!string); catch {}
2091 			assert(ev.evType == EventType.TCPTraffic, "Bad event type for TCP Connection");
2092 			EV_SET(&(events[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev);
2093 			EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev);
2094 			assert((cast(EventInfo*)events[0].udata) == ev && (cast(EventInfo*)events[1].udata) == ev);
2095 			assert((cast(EventInfo*)events[0].udata).owner == m_instanceId && (cast(EventInfo*)events[1].udata).owner == m_instanceId);
2096 			err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
2097 			if (catchError!"kevent_add_tcp"(err))
2098 				return destroyEvInfo();
2099 
2100 			// todo: verify if this allocates on the GC?
2101 			nothrow void deregisterEvent() {
2102 				EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
2103 				EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
2104 				kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
2105 				// wouldn't know how to deal with errors here...
2106 			}
2107 		}
2108 
2109 		// Inbound objects are already connected
2110 		if (inbound) return true;
2111 
2112 		// Connect is blocking, but this makes the socket non-blocking for send/recv
2113 		if (!setNonBlock(fd)) {
2114 			deregisterEvent();
2115 			return destroyEvInfo();
2116 		}
2117 
2118 		/// Start the connection
2119 		err = connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen);
2120 		if (catchErrorsEq!"connect"(err, [ tuple(cast(fd_t)SOCKET_ERROR, EPosix.EINPROGRESS, Status.ASYNC) ]))
2121 			return true;
2122 		if (catchError!"connect"(err)) {
2123 			deregisterEvent();
2124 			return destroyEvInfo();
2125 		}
2126 
2127 		return true;
2128 	}
2129 
2130 	bool catchError(string TRACE, T)(T val, T cmp = SOCKET_ERROR)
2131 		if (isIntegral!T)
2132 	{
2133 		if (val == cmp) {
2134 			m_status.text = TRACE;
2135 			m_error = lastError();
2136 			m_status.code = Status.ABORT;
2137 			static if(LOG) log(m_status);
2138 			return true;
2139 		}
2140 		return false;
2141 	}
2142 
2143 	bool catchSocketError(string TRACE)(fd_t fd)
2144 	{
2145 		m_status.text = TRACE;
2146 		int err;
2147 		import libasync.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR;
2148 		socklen_t len = int.sizeof;
2149 		getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
2150 		m_error = cast(error_t) err;
2151 		if (m_error != EPosix.EOK) {
2152 			m_status.code = Status.ABORT;
2153 			static if(LOG) log(m_status);
2154 			return true;
2155 		}
2156 
2157 		return false;
2158 	}
2159 
2160 	bool catchEvLoopErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
2161 		if (isIntegral!T)
2162 	{
2163 		if (val == SOCKET_ERROR) {
2164 			int err = errno;
2165 			foreach (validator ; cmp) {
2166 				if (errno == validator[0]) {
2167 					m_status.text = TRACE;
2168 					m_error = lastError();
2169 					m_status.code = validator[1];
2170 					static if(LOG) log(m_status);
2171 					return true;
2172 				}
2173 			}
2174 
2175 			m_status.text = TRACE;
2176 			m_status.code = Status.EVLOOP_FAILURE;
2177 			m_error = lastError();
2178 			log(m_status);
2179 			return true;
2180 		}
2181 		return false;
2182 	}
2183 
2184 	/**
2185 	 * If the value at val matches the tuple first argument T, get the last error,
2186 	 * and if the last error matches tuple second argument error_t, set the Status as
2187 	 * tuple third argument Status.
2188 	 * 
2189 	 * Repeats for each comparison tuple until a match in which case returns true.
2190 	*/
2191 	bool catchErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...)
2192 		if (isIntegral!T)
2193 	{
2194 		error_t err;
2195 		foreach (validator ; cmp) {
2196 			if (val == validator[0]) {
2197 				if (err is EPosix.init) err = lastError();
2198 				if (err == validator[1]) {
2199 					m_status.text = TRACE;
2200 					m_status.code = validator[2];
2201 					if (m_status.code == Status.EVLOOP_TIMEOUT) {
2202 						log(m_status);
2203 						break;
2204 					}
2205 					m_error = lastError();
2206 					static if(LOG) log(m_status);
2207 					return true;
2208 				}
2209 			}
2210 		}
2211 		return false;
2212 	}
2213 
2214 
2215 	error_t lastError() {
2216 		try {
2217 			return cast(error_t) errno;
2218 		} catch(Exception e) {
2219 			return EPosix.EACCES;
2220 		}
2221 
2222 	}
2223 	
2224 	void log(StatusInfo val)
2225 	{
2226 		static if (LOG) {
2227 			import std.stdio;
2228 			try {
2229 				writeln("Backtrace: ", m_status.text);
2230 				writeln(" | Status:  ", m_status.code);
2231 				writeln(" | Error: " , m_error);
2232 				if ((m_error in EPosixMessages) !is null)
2233 					writeln(" | Message: ", EPosixMessages[m_error]);
2234 			} catch(Exception e) {
2235 				return;
2236 			}
2237 		}
2238 	}
2239 
2240 	void log(T)(T val)
2241 	{
2242 		static if (LOG) {
2243 			import std.stdio;
2244 			try {
2245 				writeln(val);
2246 			} catch(Exception e) {
2247 				return;
2248 			}
2249 		}
2250 	}
2251 
2252 	NetworkAddress getAddressInfo(addrinfo)(in string host, ushort port, bool ipv6, bool tcp, ref addrinfo hints) 
2253 	{
2254 		m_status = StatusInfo.init;
2255 		import libasync.internals.socket_compat : AF_INET, AF_INET6, SOCK_DGRAM, SOCK_STREAM, IPPROTO_TCP, IPPROTO_UDP, freeaddrinfo, getaddrinfo;
2256 
2257 		NetworkAddress addr;
2258 		addrinfo* infos;
2259 		error_t err;
2260 		if (ipv6) {
2261 			addr.family = AF_INET6;
2262 			hints.ai_family = AF_INET6;
2263 		}
2264 		else {
2265 			addr.family = AF_INET;
2266 			hints.ai_family = AF_INET;
2267 		}
2268 		if (tcp) {
2269 			hints.ai_socktype = SOCK_STREAM;
2270 			hints.ai_protocol = IPPROTO_TCP;
2271 		}
2272 		else {
2273 			hints.ai_socktype = SOCK_DGRAM;
2274 			hints.ai_protocol = IPPROTO_UDP;
2275 		}
2276 
2277 		static if (LOG) {
2278 			log("Resolving " ~ host ~ ":" ~ port.to!string);
2279 		}
2280 
2281 		auto chost = host.toStringz();
2282 
2283 		if (port != 0) {
2284 			addr.port = port;
2285 			const(char)* cPort = cast(const(char)*) port.to!string.toStringz;
2286 			err = cast(error_t) getaddrinfo(chost, cPort, &hints, &infos);
2287 		}
2288 		else {
2289 			err = cast(error_t) getaddrinfo(chost, null, &hints, &infos);
2290 		}
2291 
2292 		if (err != EPosix.EOK) {
2293 			setInternalError!"getAddressInfo"(Status.ERROR, string.init, err);
2294 			return NetworkAddress.init;
2295 		}
2296 		ubyte* pAddr = cast(ubyte*) infos.ai_addr;
2297 		ubyte* data = cast(ubyte*) addr.sockAddr;
2298 		data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy
2299 		freeaddrinfo(infos);
2300 		return addr;
2301 	}
2302 
2303 
2304 	
2305 }
2306 
2307 
2308 static if (!EPOLL)
2309 {
2310 	import std.container : Array;
2311 	import core.sync.mutex : Mutex;
2312 	import core.sync.rwmutex : ReadWriteMutex;
2313 	size_t g_evIdxCapacity;
2314 	Array!size_t g_evIdxAvailable;
2315 
2316 	// called on run
2317 	nothrow size_t createIndex() {
2318 		size_t idx;
2319 		import std.algorithm : max;
2320 		try {
2321 			
2322 			size_t getIdx() {
2323 				
2324 				if (!g_evIdxAvailable.empty) {
2325 					immutable size_t ret = g_evIdxAvailable.back;
2326 					g_evIdxAvailable.removeBack();
2327 					return ret;
2328 				}
2329 				return 0;
2330 			}
2331 			
2332 			idx = getIdx();
2333 			if (idx == 0) {
2334 				import std.range : iota;
2335 				g_evIdxAvailable.insert( iota(g_evIdxCapacity, max(32, g_evIdxCapacity * 2), 1) );
2336 				g_evIdxCapacity = max(32, g_evIdxCapacity * 2);
2337 				idx = getIdx();
2338 			}
2339 			
2340 		} catch (Throwable e) {
2341 
2342 			import std.stdio;
2343 			
2344 			try writeln(e.toString()); catch {}
2345 
2346 		}
2347 		return idx;
2348 	}
2349 
2350 	nothrow void destroyIndex(AsyncNotifier ctxt) {
2351 		try {
2352 			g_evIdxAvailable.insert(ctxt.id);		
2353 		}
2354 		catch (Exception e) {
2355 			assert(false, "Error destroying index: " ~ e.msg);
2356 		}
2357 	}
2358 
2359 	nothrow void destroyIndex(AsyncTimer ctxt) {
2360 		try {
2361 			g_evIdxAvailable.insert(ctxt.id);		
2362 		}
2363 		catch (Exception e) {
2364 			assert(false, "Error destroying index: " ~ e.msg);
2365 		}
2366 	}
2367 
2368 	size_t* g_threadId;
2369 	size_t g_idxCapacity;
2370 	Array!size_t g_idxAvailable;
2371 
2372 	__gshared ReadWriteMutex gs_queueMutex;
2373 	__gshared Array!(Array!AsyncSignal) gs_signalQueue;
2374 	__gshared Array!(Array!size_t) gs_idxQueue; // signals notified
2375 
2376 
2377 	// loop
	/**
	 * Fills `sigarr` with the signals queued for the calling thread and removes
	 * the consumed entries from that thread's index queue.
	 *
	 * The thread's slot is selected with `*g_threadId`. Queued indices are read
	 * from `gs_idxQueue` and resolved through `gs_signalQueue`.
	 *
	 * Returns: true when more indices were queued than `sigarr` can hold (the
	 *          remainder stays queued), false otherwise, including when
	 *          nothing was queued.
	 */
	nothrow bool popSignals(ref AsyncSignal[] sigarr) {
		bool more;
		try {
			// Clear the leading run of non-null entries; stops at the first null.
			foreach (ref AsyncSignal sig; sigarr) {
				if (!sig)
					break;
				sig = null;
			}
			size_t len;
			// Phase 1 (reader lock): copy out as many pending signals as fit.
			synchronized(gs_queueMutex.reader) {

				// No queue for this thread yet, or nothing pending.
				if (gs_idxQueue.length <= *g_threadId || gs_idxQueue[*g_threadId].empty)
					return false;

				len = gs_idxQueue[*g_threadId].length;
				import std.stdio;
				// Clamp to the capacity of the output array and remember the overflow.
				if (sigarr.length < len) {
					more = true;
					len = sigarr.length;
				}

				size_t i;
				foreach (size_t idx; gs_idxQueue[*g_threadId][0 .. len]){
					sigarr[i] = gs_signalQueue[*g_threadId][idx];
					i++;
				}
			}

			// Phase 2 (writer lock): drop the first `len` indices, i.e. those just copied.
			// NOTE(review): the lock is released between the two phases — confirm that
			// only entries appended at the back can appear in between.
			synchronized (gs_queueMutex.writer) {
				gs_idxQueue[*g_threadId].linearRemove(gs_idxQueue[*g_threadId][0 .. len]);
			}
		}
		catch (Exception e) {
			assert(false, "Could not get pending signals: " ~ e.msg);
		}
		return more;
	}
2415 
2416 	// notify
2417 	nothrow void addSignal(shared AsyncSignal ctxt) {
2418 		try {
2419 			size_t thread_id = ctxt.threadId;
2420 			bool must_resize;
2421 			import std.stdio;
2422 			synchronized (gs_queueMutex.writer) {
2423 				if (gs_idxQueue.empty || gs_idxQueue.length < thread_id + 1) {
2424 					gs_idxQueue.reserve(thread_id + 1);
2425 					foreach (i; gs_idxQueue.length .. gs_idxQueue.capacity) {
2426 						gs_idxQueue.insertBack(Array!size_t.init);
2427 					}
2428 				}
2429 				if (gs_idxQueue[thread_id].empty)
2430 				{
2431 					gs_idxQueue[thread_id].reserve(32);
2432 				}
2433 
2434 				gs_idxQueue[thread_id].insertBack(ctxt.id);
2435 
2436 			}
2437 
2438 		}
2439 		catch (Exception e) {
2440 			assert(false, "Array error: " ~ e.msg);
2441 		}
2442 	}
2443 
2444 	// called on run
2445 	nothrow size_t createIndex(shared AsyncSignal ctxt) {
2446 		size_t idx;
2447 		import std.algorithm : max;
2448 		try {
2449 			bool must_resize;
2450 
2451 			synchronized (gs_queueMutex.reader) {
2452 				if (gs_signalQueue.length < *g_threadId)
2453 					must_resize = true;
2454 			}
2455 
2456 			/// make sure the signal queue is big enough for this thread ID
2457 			if (must_resize) {
2458 				synchronized (gs_queueMutex.writer) {
2459 					while (gs_signalQueue.length <= *g_threadId) 
2460 						gs_signalQueue.insertBack(Array!AsyncSignal.init);
2461 				}
2462 			}
2463 
2464 			size_t getIdx() {
2465 
2466 				if (!g_idxAvailable.empty) {
2467 					immutable size_t ret = g_idxAvailable.back;
2468 					g_idxAvailable.removeBack();
2469 					return ret;
2470 				}
2471 				return 0;
2472 			}
2473 
2474 			idx = getIdx();
2475 			if (idx == 0) {
2476 				import std.range : iota;
2477 				g_idxAvailable.insert( iota(g_idxCapacity,  max(32, g_idxCapacity * 2), 1) );
2478 				g_idxCapacity = max(32, g_idxCapacity * 2);
2479 				idx = getIdx();
2480 			}
2481 
2482 			synchronized (gs_queueMutex.writer) {
2483 				if (gs_signalQueue.empty || gs_signalQueue.length < *g_threadId + 1) {
2484 					
2485 					gs_signalQueue.reserve(*g_threadId + 1);
2486 					foreach (i; gs_signalQueue.length .. gs_signalQueue.capacity) {
2487 						gs_signalQueue.insertBack(Array!AsyncSignal.init);
2488 					}
2489 					
2490 				}
2491 
2492 				if (gs_signalQueue[*g_threadId].empty || gs_signalQueue[*g_threadId].length < idx + 1) {
2493 					
2494 					gs_signalQueue[*g_threadId].reserve(idx + 1);
2495 					foreach (i; gs_signalQueue[*g_threadId].length .. gs_signalQueue[*g_threadId].capacity) {
2496 						gs_signalQueue[*g_threadId].insertBack(cast(AsyncSignal)null);
2497 					}
2498 					
2499 				}
2500 
2501 				gs_signalQueue[*g_threadId][idx] = cast(AsyncSignal) ctxt;
2502 			}
2503 		} catch {}
2504 
2505 		return idx;
2506 	}
2507 
2508 	// called on kill
2509 	nothrow void destroyIndex(shared AsyncSignal ctxt) {
2510 		try {
2511 			g_idxAvailable.insert(ctxt.id);
2512 			synchronized (gs_queueMutex.writer) {
2513 				gs_signalQueue[*g_threadId][ctxt.id] = null;
2514 			}
2515 		}
2516 		catch (Exception e) {
2517 			assert(false, "Error destroying index: " ~ e.msg);
2518 		}
2519 	}
2520 }
2521 
/**
 * Mixes in the platform-specific state of a TCP connection: the EventInfo
 * pointer plus the `connected`, `disconnecting` and `writeBlocked` flags,
 * each exposed through a getter/setter property pair.
 */
mixin template TCPConnectionMixins() {

	/// Storage for the connection state managed by the event loop.
	struct CleanupData {
		EventInfo* evInfo;
		bool connected;
		bool disconnecting;
		bool writeBlocked;
	}

	private CleanupData m_impl;

	/// Event registration record of this connection.
	@property EventInfo* evInfo() {
		return m_impl.evInfo;
	}

	/// ditto
	@property void evInfo(EventInfo* info) {
		m_impl.evInfo = info;
	}

	/// The `connected` flag.
	@property bool connected() const {
		return m_impl.connected;
	}

	/// ditto
	@property void connected(bool b) {
		m_impl.connected = b;
	}

	/// The `disconnecting` flag.
	@property bool disconnecting() const {
		return m_impl.disconnecting;
	}

	/// ditto
	@property void disconnecting(bool b) {
		m_impl.disconnecting = b;
	}

	/// The `writeBlocked` flag.
	@property bool writeBlocked() const {
		return m_impl.writeBlocked;
	}

	/// ditto
	@property void writeBlocked(bool b) {
		m_impl.writeBlocked = b;
	}

}
2566 
/**
 * Mixes in an EventInfo pointer for `shared` objects, plus an identifier of
 * the owning thread: a pthread id on the epoll build, or a thread index read
 * through a pointer on the kqueue build.
 */
mixin template EvInfoMixinsShared() {

	private CleanupData m_impl;
	
	/// Storage for the event registration record.
	shared struct CleanupData {
		EventInfo* evInfo;
	}

	static if (EPOLL) {
		import core.sys.posix.pthread : pthread_t;
		private pthread_t m_pthreadId;
		/// Stored pthread id (m_pthreadId); the cast presumably strips `shared` — TODO confirm.
		synchronized @property pthread_t pthreadId() {
			return cast(pthread_t) m_pthreadId;
		}
		/* todo: support multiple event loops per thread?
		private ushort m_sigId;
		synchronized @property ushort sigId() {
			return cast(ushort)m_loopId;
		}
		synchronized @property void sigId(ushort id) {
			m_loopId = cast(shared)id;
		}
		*/
	} 
	else /* if KQUEUE */
	{
		// Pointer to the thread index; this template only dereferences it.
		private shared(size_t)* m_owner_id;
		/// Value currently stored behind m_owner_id.
		synchronized @property size_t threadId() {
			return cast(size_t) *m_owner_id;
		}
	}

	/// Event registration record of this object.
	@property shared(EventInfo*) evInfo() {
		return m_impl.evInfo;
	}
	
	/// ditto
	@property void evInfo(shared(EventInfo*) info) {
		m_impl.evInfo = info;
	}

}
2608 
/// Mixes in storage for an EventInfo pointer together with its getter/setter.
mixin template EvInfoMixins() {

	/// Storage for the event registration record.
	struct CleanupData {
		EventInfo* evInfo;
	}

	private CleanupData m_impl;

	/// ditto
	@property void evInfo(EventInfo* info) {
		m_impl.evInfo = info;
	}

	/// Event registration record of this object.
	@property EventInfo* evInfo() {
		return m_impl.evInfo;
	}
}
2625 
2626 
2627 
/**
 * Handler slot stored inside an EventInfo record. All members share the same
 * storage; presumably the accompanying EventInfo.evType tells which member is
 * the valid one — verify against the event dispatcher.
 */
union EventObject {
	TCPAcceptHandler tcpAcceptHandler; // set by initTCPListener
	TCPEventHandler tcpEvHandler;      // set by initTCPConnection
	TimerHandler timerHandler;
	DWHandler dwHandler;
	UDPHandler udpHandler;             // set by initUDPSocket
	NotifierHandler notifierHandler;
}
2636 
/// Kind of event source an EventInfo record was registered for.
enum EventType : char {
	TCPAccept,        // listening sockets (used by initTCPListener)
	TCPTraffic,       // TCP connections (used by initTCPConnection)
	UDPSocket,        // UDP sockets (used by initUDPSocket)
	Notifier,         // presumably AsyncNotifier registrations — TODO confirm
	Signal,           // presumably AsyncSignal registrations — TODO confirm
	Timer,            // presumably AsyncTimer registrations — TODO confirm
	DirectoryWatcher  // presumably directory-watcher registrations — TODO confirm
}
2646 
/**
 * Record attached to a kernel event registration (epoll `data.ptr` or kqueue
 * `udata`). The init* functions construct it positionally as
 * (fd, evType, evObj, owner), so the field order is significant.
 */
struct EventInfo {
	fd_t fd;            // descriptor the registration belongs to
	EventType evType;   // kind of event source
	EventObject evObj;  // handler to invoke for this source
	ushort owner;       // the init* functions pass the loop's m_instanceId here
}
2653 
2654 
2655 
2656 /**
2657 		Represents a network/socket address. (taken from vibe.core.net)
2658 */
2659 public struct NetworkAddress {
2660 	import libasync.internals.socket_compat : sockaddr, sockaddr_in, sockaddr_in6, AF_INET, AF_INET6;
2661 	private union {
2662 		sockaddr addr;
2663 		sockaddr_in addr_ip4;
2664 		sockaddr_in6 addr_ip6;
2665 	}
2666 
2667 	@property bool ipv6() const pure nothrow { return this.family == AF_INET6; }
2668 
2669 	/** Family (AF_) of the socket address.
2670 		*/
2671 	@property ushort family() const pure nothrow { return addr.sa_family; }
2672 	/// ditto
2673 	@property void family(ushort val) pure nothrow { addr.sa_family = cast(ubyte)val; }
2674 	
2675 	/** The port in host byte order.
2676 		*/
2677 	@property ushort port()
2678 	const pure nothrow {
2679 		switch (this.family) {
2680 			default: assert(false, "port() called for invalid address family.");
2681 			case AF_INET: return ntoh(addr_ip4.sin_port);
2682 			case AF_INET6: return ntoh(addr_ip6.sin6_port);
2683 		}
2684 	}
2685 	/// ditto
2686 	@property void port(ushort val)
2687 	pure nothrow {
2688 		switch (this.family) {
2689 			default: assert(false, "port() called for invalid address family.");
2690 			case AF_INET: addr_ip4.sin_port = hton(val); break;
2691 			case AF_INET6: addr_ip6.sin6_port = hton(val); break;
2692 		}
2693 	}
2694 	
2695 	/** A pointer to a sockaddr struct suitable for passing to socket functions.
2696 		*/
2697 	@property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; }
2698 	
2699 	/** Size of the sockaddr struct that is returned by sockAddr().
2700 		*/
2701 	@property int sockAddrLen()
2702 	const pure nothrow {
2703 		switch (this.family) {
2704 			default: assert(false, "sockAddrLen() called for invalid address family.");
2705 			case AF_INET: return addr_ip4.sizeof;
2706 			case AF_INET6: return addr_ip6.sizeof;
2707 		}
2708 	}
2709 	
2710 	@property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow
2711 	in { assert (family == AF_INET); }
2712 	body { return &addr_ip4; }
2713 	
2714 	@property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow
2715 	in { assert (family == AF_INET6); }
2716 	body { return &addr_ip6; }
2717 	
2718 	/** Returns a string representation of the IP address
2719 		*/
2720 	string toAddressString()
2721 	const {
2722 		import std.array : appender;
2723 		import std.string : format;
2724 		import std.format : formattedWrite;
2725 		
2726 		switch (this.family) {
2727 			default: assert(false, "toAddressString() called for invalid address family.");
2728 			case AF_INET:
2729 				ubyte[4] ip = (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4];
2730 				return format("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]);
2731 			case AF_INET6:
2732 				ubyte[16] ip = addr_ip6.sin6_addr.s6_addr;
2733 				auto ret = appender!string();
2734 				ret.reserve(40);
2735 				foreach (i; 0 .. 8) {
2736 					if (i > 0) ret.put(':');
2737 					ret.formattedWrite("%x", bigEndianToNative!ushort(cast(ubyte[2])ip[i*2 .. i*2+2]));
2738 				}
2739 				return ret.data;
2740 		}
2741 	}
2742 	
2743 	/** Returns a full string representation of the address, including the port number.
2744 		*/
2745 	string toString()
2746 	const {
2747 		
2748 		import std.string : format;
2749 		
2750 		auto ret = toAddressString();
2751 		switch (this.family) {
2752 			default: assert(false, "toString() called for invalid address family.");
2753 			case AF_INET: return ret ~ format(":%s", port);
2754 			case AF_INET6: return format("[%s]:%s", ret, port);
2755 		}
2756 	}
2757 	
2758 }
2759 
private pure nothrow {
	import std.bitmanip;

	/// Converts a 16-bit value from network (big-endian) to host byte order.
	ushort ntoh(ushort val)
	{
		version (BigEndian) {
			return val;
		}
		else version (LittleEndian) {
			return swapEndian(val);
		}
		else static assert(false, "Unknown endianness.");
	}

	/// Converts a 16-bit value from host to network (big-endian) byte order.
	ushort hton(ushort val)
	{
		version (BigEndian) {
			return val;
		}
		else version (LittleEndian) {
			return swapEndian(val);
		}
		else static assert(false, "Unknown endianness.");
	}
}