1 module libasync.posix;
2 
3 version (Posix):
4 
5 import libasync.types;
6 import std.string : toStringz;
7 import std.conv : to;
8 import std.datetime : Duration, msecs, seconds, SysTime;
9 import std.traits : isIntegral;
10 import std.typecons : Tuple, tuple;
11 import std.container : Array;
12 
13 import core.stdc.errno;
14 import libasync.events;
15 import libasync.internals.path;
16 import core.sys.posix.signal;
17 import libasync.posix2;
18 import core.sync.mutex;
19 import memutils.utils;
20 import memutils.hashmap;
21 
/// Failure value (-1) that the socket wrappers in this module compare call results against.
22 enum SOCKET_ERROR = -1;
/// Integer type used for file descriptors throughout this module.
23 alias fd_t = int;
24 
25 
version(linux) {
	import libasync.internals.epoll;

	/// Selects the epoll code paths of every `static if (EPOLL)` in this module.
	const EPOLL = true;

	// glibc accessors for the bounds of the real-time signal range.
	extern(C) nothrow @nogc int __libc_current_sigrtmin();
	extern(C) nothrow @nogc int __libc_current_sigrtmax();

	/// Thread-local flag, set after the module constructor below has run
	/// blockSignals() for the current thread.
	bool g_signalsBlocked;

	/// Adds SIGRTMIN to the calling thread's blocked-signal mask so that it
	/// stays reserved for AsyncSignal delivery.
	///
	/// todo (kept from the original): use more signals for more event loops
	/// per thread, i.e. block the whole SIGRTMIN .. SIGRTMAX range
	/// (is this necessary?)
	package nothrow void blockSignals() {
		try {
			sigset_t rtSignals;
			sigemptyset(&rtSignals);

			const int firstRtSignal = cast(int) __libc_current_sigrtmin();
			sigaddset(&rtSignals, firstRtSignal);

			// the previous mask is not needed, hence the null out-parameter
			pthread_sigmask(SIG_BLOCK, &rtSignals, null);
		} catch {}
	}

	// `static this` without `shared` is a thread-local module constructor,
	// so every newly started thread blocks the signal before running user code.
	static this() {
		blockSignals();
		g_signalsBlocked = true;
	}
}
// macOS build: pull in the kqueue bindings; EPOLL = false selects the
// non-epoll branch of every `static if (EPOLL)` in this module.
53 version(OSX) {
54 	import libasync.internals.kqueue;
55 	const EPOLL = false;
56 }
// FreeBSD build: same kqueue configuration as the OSX block.
57 version(FreeBSD) {
58 	import libasync.internals.kqueue;
59 	const EPOLL = false;
60 }
61 
/// Global (not thread-local) mutex; created lazily by EventLoopImpl.init()
/// the first time an event loop is initialized.
62 __gshared Mutex g_mutex;
63 
// Record stored in EventLoopImpl.m_dwFiles (keyed by the descriptor from
// open(file)); only compiled for the kqueue backend.
64 static if (!EPOLL) {
65 	private struct DWFileInfo {
66 		fd_t folder; // key of the owning entry in m_dwFolders (looked up that way in loop())
67 		Path path; // presumably the path of this entry — TODO confirm against watch()
68 		SysTime lastModified; // presumably the last observed modification time — TODO confirm
69 		bool is_dir; // presumably set when the entry is a directory — TODO confirm
70 	}
71 }
72 
// Record stored in EventLoopImpl.m_dwFolders for every watched folder.
// Built positionally as DWFolderInfo(WatchInfo(...), fd) in watch(), so the
// field order must not change.
73 private struct DWFolderInfo {
74 	WatchInfo wi; // watch parameters (events, path, recursive flag, watch descriptor)
75 	fd_t fd; // descriptor the watch was registered on (the `fd` argument of watch())
76 }
77 
78 package struct EventLoopImpl {
// Compile-time message reporting which kernel event API this build uses.
// NOTE(review): the else branch is also taken for version(OSX), so macOS
// builds print the "FreeBSD KQueue" message as well.
79 	static if (EPOLL) {
80 		pragma(msg, "Using Linux EPOLL for events");
81 	}
82 	else /* if KQUEUE */
83 	{
84 		pragma(msg, "Using FreeBSD KQueue for events");
85 	}
86 
87 package:
88 	alias error_t = EPosix;
89 
90 nothrow:
91 private:
92 
93 	/// members
94 	EventLoop m_evLoop; // owning event loop handle, stored by init()
95 	ushort m_instanceId; // id taken from the shared counter in init()
96 	bool m_started; // must be false when init() is called (see its in-contract)
97 	StatusInfo m_status; // status of the most recent operation; reset at the start of most calls
98 	error_t m_error = EPosix.EOK; // last error code; error() maps it to a message
99 	EventInfo* m_evSignal; // EventInfo of the signal source registered in init()
100 	static if (EPOLL){
101 		fd_t m_epollfd; // descriptor returned by epoll_create1() in init()
102 		HashMap!(Tuple!(fd_t, uint), DWFolderInfo) m_dwFolders; // uint = inotify_add_watch(Path)
103 	}
104 	else /* if KQUEUE */
105 	{
106 		fd_t m_kqueuefd; // descriptor returned by kqueue() in init()
107 		HashMap!(fd_t, EventInfo*) m_watchers; // fd_t = id++ per AsyncDirectoryWatcher
108 		HashMap!(fd_t, DWFolderInfo) m_dwFolders; // fd_t = open(folder)
109 		HashMap!(fd_t, DWFileInfo) m_dwFiles; // fd_t = open(file)
110 		HashMap!(fd_t, Array!(DWChangeInfo)*) m_changes; // fd_t = id++ per AsyncDirectoryWatcher
111 
112 	}
113 
114 package:
115 	
116 	/// workaround for IDE indent bug on too big files
117 	mixin RunKill!();
118 
/// Reports the current value of the m_started flag.
@property bool started() const
{
	return m_started;
}
122 
/// Initializes this event loop instance on the calling thread.
///
/// Reserves a unique instance id, raises the thread priority, lazily creates
/// the global mutex, then creates the kernel queue (epoll or kqueue) and
/// registers the signal source used for AsyncSignal notifications.
///
/// Params:
///   evl = owning EventLoop; stored in m_evLoop
///
/// Returns: true on success, false when one of the checked system calls
/// failed (catchError records the details).
bool init(EventLoop evl)
in { assert(!m_started); }
body
{
	import core.atomic;
	import core.thread;

	shared static ushort i;

	// Reserve the id with a single atomic read-modify-write. Reading `i` and
	// incrementing it in two separate steps would let two threads that
	// initialize concurrently obtain the same id. atomicOp returns the new
	// value, so the id reserved for this instance is one less.
	m_instanceId = cast(ushort) (core.atomic.atomicOp!"+="(i, cast(ushort) 1) - 1);
	static if (!EPOLL) g_threadId = new size_t(cast(size_t)m_instanceId);

	m_evLoop = evl;

	try Thread.getThis().priority = Thread.PRIORITY_MAX;
	catch (Exception e) { assert(false, "Could not set thread priority"); }

	// NOTE(review): this check-then-create on a __gshared variable is not
	// synchronized; confirm that event loops are never initialized
	// concurrently before the first one has finished.
	try
		if (!g_mutex)
			g_mutex = new Mutex;
	catch {}

	static if (EPOLL)
	{
		if (!g_signalsBlocked)
			blockSignals();
		assert(m_instanceId <= __libc_current_sigrtmax(), "An additional event loop is unsupported due to SIGRTMAX restrictions in Linux Kernel");

		m_epollfd = epoll_create1(0);
		if (catchError!"epoll_create1"(m_epollfd))
			return false;

		import core.sys.linux.sys.signalfd;

		fd_t err;
		fd_t sfd;
		sigset_t mask;

		// SIGRTMIN must be blocked for normal delivery before it can be
		// consumed through the signalfd created below.
		try {
			sigemptyset(&mask);
			sigaddset(&mask, __libc_current_sigrtmin());
			err = pthread_sigmask(SIG_BLOCK, &mask, null);
			if (catchError!"sigprocmask"(err))
			{
				m_status.code = Status.EVLOOP_FAILURE;
				return false;
			}
		} catch { }

		sfd = signalfd(-1, &mask, SFD_NONBLOCK);
		// Report the failure at runtime as well: an assert alone disappears
		// in release builds and the invalid descriptor would be used below.
		if (catchError!"signalfd"(sfd))
			return false;

		EventType evtype = EventType.Signal;
		try
			m_evSignal = ThreadMem.alloc!EventInfo(sfd, evtype, EventObject.init, m_instanceId);
		catch (Exception e){
			assert(false, "Allocation error");
		}

		// The EventInfo pointer travels with the epoll event so that loop()
		// can dispatch on its evType.
		epoll_event _event;
		_event.events = EPOLLIN;
		_event.data.ptr = cast(void*) m_evSignal;

		err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, sfd, &_event);
		if (catchError!"EPOLL_CTL_ADD(sfd)"(err))
		{
			return false;
		}
	}
	else /* if KQUEUE */
	{
		// One-time setup of the signal queues and the index pool.
		try {
			if (!gs_queueMutex) {
				gs_queueMutex = ThreadMem.alloc!ReadWriteMutex();
				gs_signalQueue = Array!(Array!AsyncSignal)();
				gs_idxQueue = Array!(Array!size_t)();
			}
			if (g_evIdxAvailable.empty) {
				g_evIdxAvailable.reserve(32);

				foreach (k; g_evIdxAvailable.length .. g_evIdxAvailable.capacity) {
					g_evIdxAvailable.insertBack(k + 1);
				}
				g_evIdxCapacity = 32;
				g_idxCapacity = 32;
			}
		} catch { assert(false, "Initialization failed"); }

		m_kqueuefd = kqueue();
		// Checked the same way as epoll_create1() in the epoll branch.
		if (catchError!"kqueue"(m_kqueuefd))
			return false;

		int err;
		try {
			sigset_t mask;
			sigemptyset(&mask);
			sigaddset(&mask, SIGXCPU);

			err = sigprocmask(SIG_BLOCK, &mask, null);
		} catch {}

		// Evaluate the result right after the call, before anything else can
		// touch errno and before resources are allocated that an early
		// return would leak.
		if (catchError!"sigprocmask"(err))
			return false;

		EventType evtype = EventType.Signal;

		try
			m_evSignal = ThreadMem.alloc!EventInfo(SIGXCPU, evtype, EventObject.init, m_instanceId);
		catch (Exception e) {
			assert(false, "Failed to allocate resources");
		}

		// SIGXCPU is the signal raised by notify(fd, shared AsyncSignal).
		kevent_t _event;
		EV_SET(&_event, SIGXCPU, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, m_evSignal);
		err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
		if (catchError!"kevent_add(SIGXCPU)"(err))
			assert(false, "Add SIGXCPU failed at kevent call");
	}

	try log("init in thread " ~ Thread.getThis().name); catch {}

	return true;
}
253 
/// Closes the kernel event queue descriptor owned by this loop
/// (m_epollfd on epoll builds, m_kqueuefd otherwise).
void exit()
{
	import core.sys.posix.unistd : close;

	// `static if` without braces does not open a scope, so the selected
	// declaration is visible to the close() call below.
	static if (EPOLL)
		fd_t queueFd = m_epollfd; // closing it may not be necessary (original remark)
	else
		fd_t queueFd = m_kqueuefd;

	// m_evSignal is intentionally left allocated; the original marks freeing
	// it here as "not necessary".
	close(queueFd);
}
267 
/// Returns a const copy of the status recorded by the most recent operation.
@property const(StatusInfo) status() const
{
	return m_status;
}
271 
/// Looks up the message registered for m_error in EPosixMessages.
/// Returns: the message, or string.init when the code has no entry.
@property string error() const
{
	if (auto message = m_error in EPosixMessages)
		return *message;

	return string.init;
}
276 
/// Polls the kernel event queue once and dispatches every returned event to
/// the handler stored in its EventInfo.
///
/// Params:
///   timeout = maximum time to wait for events; on epoll a zero duration
///             polls without blocking (timeout_ms = 0)
///
/// Returns: false when catchEvLoopErrors reports a polling failure or when
/// deleting a one-shot kqueue timer fails; otherwise the value of `success`
/// left by the last dispatched event (true when no event was returned).
///
/// NOTE(review): `success` is reset to false for every event and only
/// assigned by the TCPAccept, UDPSocket and TCPTraffic cases, so the function
/// returns false whenever the last event was a Notifier, DirectoryWatcher,
/// Timer or Signal — confirm that callers expect this.
277 	bool loop(Duration timeout = 0.seconds)
278 		//in { assert(Fiber.getThis() is null); }
279 	{
280 
281 		import libasync.internals.memory;
282 		bool success = true;
283 		int num;
284 
285 		static if (EPOLL) {
286 
			// event buffer, allocated on first use and reused by later calls
287 			static align(1) epoll_event[] events;
288 			if (events is null)
289 			{
290 				try events = new epoll_event[128];
291 				catch (Exception e) {
292 					assert(false, "Could not allocate events array: " ~ e.msg);
293 				}
294 			}
295 			int timeout_ms;
296 			if (timeout == 0.seconds) // return immediately
297 				timeout_ms = 0;
298 			else timeout_ms = cast(int)timeout.total!"msecs";
299 			/// Retrieve pending events
300 			num = epoll_wait(m_epollfd, cast(epoll_event*)&events[0], 128, timeout_ms);
301 			assert(events !is null && events.length <= 128);
302 
303 			
304 		}
305 		else /* if KQUEUE */ {
306 			import core.sys.posix.time : time_t;
307 			import core.sys.posix.config : c_long;
			// event buffer, allocated on first use and reused by later calls
308 			static kevent_t[] events;
309 			if (events.length == 0) {
310 				try events = allocArray!kevent_t(manualAllocator(), 128); 
311 				catch (Exception e) { assert(false, "Could not allocate events array"); }				
312 			}
			// kevent() takes the timeout as seconds + nanoseconds
313 			time_t secs = timeout.split!("seconds", "nsecs")().seconds;
314 			c_long ns = timeout.split!("seconds", "nsecs")().nsecs;
315 			auto tspec = libasync.internals.kqueue.timespec(secs, ns);
316 
317 			num = kevent(m_kqueuefd, null, 0, cast(kevent_t*) events, cast(int) events.length, &tspec);
318 
319 		}
320 
		// errno values that catchEvLoopErrors should translate: an interrupted
		// wait (EINTR) is mapped to Status.EVLOOP_TIMEOUT
321 		auto errors = [	tuple(EINTR, Status.EVLOOP_TIMEOUT) ];
322 		
323 		if (catchEvLoopErrors!"event_poll'ing"(num, errors)) 
324 			return false;
325 
326 		if (num > 0)
327 			log("Got " ~ num.to!string ~ " event(s)");
328 
329 		foreach(i; 0 .. num) {
			// every event starts from a clean slate; see the NOTE(review) in
			// the function header about cases that never set `success`
330 			success = false;
331 			m_status = StatusInfo.init;
			// both backends carry the EventInfo pointer in the event's user data;
			// event_flags is the backend-specific flag word passed to the handlers
332 			static if (EPOLL) 
333 			{
334 				epoll_event _event = events[i];
335 				try log("Event " ~ i.to!string ~ " of: " ~ events.length.to!string); catch {}
336 				EventInfo* info = cast(EventInfo*) _event.data.ptr;
337 				int event_flags = cast(int) _event.events;
338 			}
339 			else /* if KQUEUE */
340 			{
341 				kevent_t _event = events[i];
342 				EventInfo* info = cast(EventInfo*) _event.udata;
343 				//log("Got info");
				// filter in the upper 16 bits, flags in the lower 16 bits
344 				int event_flags = (_event.filter << 16) | (_event.flags & 0xffff);
345 				//log("event flags");
346 			}
347 
348 			//if (info.owner != m_instanceId)
349 			//	try log("Event " ~ (cast(int)(info.evType)).to!string ~ " is invalid: supposidly created in instance #" ~ info.owner.to!string ~ ", received in " ~ m_instanceId.to!string ~ " event: " ~ event_flags.to!string);
350 			//	catch{}
351 			//log("owner");
352 			switch (info.evType) {
353 				case EventType.TCPAccept:
					// a descriptor of 0 is treated as "no listener" and skipped
354 					if (info.fd == 0)
355 						break;
356 					success = onTCPAccept(info.fd, info.evObj.tcpAcceptHandler, event_flags);
357 					break;
358 
359 				case EventType.Notifier:
360 
361 					log("Got notifier!");
362 					try info.evObj.notifierHandler();
363 					catch (Exception e) {
364 						setInternalError!"notifierHandler"(Status.ERROR);
365 					}
366 					break;
367 
368 				case EventType.DirectoryWatcher:
369 					log("Got DirectoryWatcher event!");
370 					static if (!EPOLL) {
371 						// in KQUEUE all events will be consumed here, because they must be pre-processed
372 						try {
							// Map the kqueue note flags to a DWFileEvent.
							// NOTE(review): NOTE_WRITE is already matched by the first test,
							// and the second NOTE_RENAME test (MOVED_TO) can never be reached
							// because the identical test above it wins — confirm intent.
373 							DWFileEvent fevent;
374 							if (_event.fflags & (NOTE_LINK | NOTE_WRITE))
375 								fevent = DWFileEvent.CREATED;
376 							else if (_event.fflags & NOTE_DELETE)
377 								fevent = DWFileEvent.DELETED;
378 							else if (_event.fflags & (NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE))
379 								fevent = DWFileEvent.MODIFIED;
380 							else if (_event.fflags & NOTE_RENAME)
381 								fevent = DWFileEvent.MOVED_FROM;
382 							else if (_event.fflags & NOTE_RENAME)
383 								fevent = DWFileEvent.MOVED_TO;
384 							else
385 								assert(false, "No event found?");
386 
							// the ident is either a watched folder's descriptor or the
							// descriptor of a tracked file, whose folder is then looked up
387 							DWFolderInfo fi = m_dwFolders.get(cast(fd_t)_event.ident, DWFolderInfo.init);
388 
389 							if (fi == DWFolderInfo.init) {
390 								DWFileInfo tmp = m_dwFiles.get(cast(fd_t)_event.ident, DWFileInfo.init);
391 								assert(tmp != DWFileInfo.init, "The event loop returned an invalid file's file descriptor for the directory watcher");
392 								fi = m_dwFolders.get(cast(fd_t) tmp.folder, DWFolderInfo.init);
393 								assert(fi != DWFolderInfo.init, "The event loop returned an invalid folder file descriptor for the directory watcher");
394 							}
395 
396 							// all recursive events will be generated here
							// a false result skips the handler call for this event
397 							if (!compareFolderFiles(fi, fevent)) {
398 								continue;
399 							}
400 
401 						} catch (Exception e) {
402 							log("Could not process DirectoryWatcher event: " ~ e.msg);
403 							break;
404 						}
405 
406 					}
407 
408 					try info.evObj.dwHandler();
409 					catch (Exception e) {
410 						setInternalError!"dwHandler"(Status.ERROR);
411 					}
412 					break;
413 
414 				case EventType.Timer:
415 					try log("Got timer! " ~ info.fd.to!string); catch {}
416 					static if (EPOLL) {
						// read 8 bytes from the timer's descriptor before calling the
						// handler; the value read and the return code are not used
417 						static long val;
418 						import core.sys.posix.unistd : read;
419 						read(info.evObj.timerHandler.ctxt.id, &val, long.sizeof);
420 					} else {
421 					}
422 					try info.evObj.timerHandler();
423 					catch (Exception e) {
424 						setInternalError!"timerHandler"(Status.ERROR);
425 					}
426 					static if (!EPOLL) {
						// a one-shot timer that the handler did not re-arm is removed
						// from the kqueue
427 						auto ctxt = info.evObj.timerHandler.ctxt;
428 						if (ctxt && ctxt.oneShot && !ctxt.rearmed) {
429 							kevent_t __event;
430 							EV_SET(&__event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null);
431 							int err = kevent(m_kqueuefd, &__event, 1, null, 0, null);
432 							if (catchError!"kevent_del(timer)"(err))
433 								return false;
434 						}
435 					}
436 					break;
437 
438 				case EventType.Signal:
439 					try log("Got signal!"); catch {}
440 
441 					static if (EPOLL) {
442 						
443 						try log("Got signal: " ~ info.fd.to!string ~ " of type: " ~ info.evType.to!string); catch {}
444 						import core.sys.linux.sys.signalfd : signalfd_siginfo;
445 						import core.sys.posix.unistd : read;
						// the AsyncSignal pointer queued by notify() arrives in ssi_ptr
						// NOTE(review): the read() result `err` is never checked; after a
						// failed read fdsi stays zero-initialized and `sig` would be null
446 						signalfd_siginfo fdsi;
447 						fd_t err = cast(fd_t)read(info.fd, &fdsi, fdsi.sizeof);
448 						shared AsyncSignal sig = cast(shared AsyncSignal) cast(void*) fdsi.ssi_ptr;
449 
450 						try sig.handler();
451 						catch (Exception e) {
452 							setInternalError!"signal handler"(Status.ERROR);
453 						}
454 
455 						
456 					}
457 					else /* if KQUEUE */
458 					{
						// reusable buffer that popSignals() fills with pending signals
459 						static AsyncSignal[] sigarr;
460 						
461 						if (sigarr.length == 0) {
462 							try sigarr = new AsyncSignal[32]; 
463 							catch (Exception e) { assert(false, "Could not allocate signals array"); }		
464 						}
465 
						// NOTE(review): `more` is not used afterwards — presumably it tells
						// whether signals remain queued; confirm they are not lost
466 						bool more = popSignals(sigarr);
467 						foreach (AsyncSignal sig; sigarr)
468 						{
							// the first null entry ends the batch
469 							shared AsyncSignal ptr = cast(shared AsyncSignal) sig;
470 							if (ptr is null)
471 								break;
472 							try (cast(shared AsyncSignal)sig).handler();
473 							catch (Exception e) {
474 								setInternalError!"signal handler"(Status.ERROR);
475 							}
476 						}
477 					}
478 					break;
479 
480 				case EventType.UDPSocket:
481 					import core.sys.posix.unistd : close;
482 					success = onUDPTraffic(info.fd, info.evObj.udpHandler, event_flags);
483 
					// closes the socket, reports UDPEvent.ERROR to the handler and
					// frees the EventInfo; the `graceful` argument is not used
484 					nothrow void abortHandler(bool graceful) {
485 
486 						close(info.fd);
487 						info.evObj.udpHandler.conn.socket = 0;
488 						try info.evObj.udpHandler(UDPEvent.ERROR);
489 						catch (Exception e) { }
490 						try ThreadMem.free(info);
491 						catch (Exception e){ assert(false, "Error freeing resources"); }
492 					}
493 					
494 					if (!success && m_status.code == Status.ABORT) {
495 						abortHandler(true);
496 						
497 					}
498 					else if (!success && m_status.code == Status.ERROR) {
499 						abortHandler(false); 
500 					}
501 					break;
502 				case EventType.TCPTraffic:
503 					assert(info.evObj.tcpEvHandler.conn !is null, "TCP Connection invalid");
504 
505 					success = onTCPTraffic(info.fd, info.evObj.tcpEvHandler, event_flags, info.evObj.tcpEvHandler.conn);
506 
					// notifies the handler (CLOSE when graceful, ERROR otherwise),
					// closes the socket and releases the connection resources
507 					nothrow void abortTCPHandler(bool graceful) {
508 
509 						nothrow void closeAll() {
510 							try log("closeAll()"); catch {}
511 							if (info.evObj.tcpEvHandler.conn.connected)
512 								closeSocket(info.fd, true, true);
513 							
514 							info.evObj.tcpEvHandler.conn.socket = 0;
515 						}
516 
517 						/// Close the connection after an unexpected socket error
518 						if (graceful) {
519 							try info.evObj.tcpEvHandler(TCPEvent.CLOSE);
520 							catch (Exception e) { static if(LOG) log("Close failure"); }
521 							closeAll();
522 						}
523 
524 						/// Kill the connection after an internal error
525 						else {
526 							try info.evObj.tcpEvHandler(TCPEvent.ERROR);
527 							catch (Exception e) { static if(LOG) log("Error failure"); }
528 							closeAll();
529 						}
530 
						// only inbound connection objects are freed here
531 						if (info.evObj.tcpEvHandler.conn.inbound) {
532 							log("Freeing inbound connection FD#" ~ info.fd.to!string);
533 							try ThreadMem.free(info.evObj.tcpEvHandler.conn);
534 							catch (Exception e){ assert(false, "Error freeing resources"); }
535 						}
536 						try ThreadMem.free(info);
537 						catch (Exception e){ assert(false, "Error freeing resources"); }
538 					}
539 
540 					if (!success && m_status.code == Status.ABORT) {
541 						abortTCPHandler(true);
542 					}
543 					else if (!success && m_status.code == Status.ERROR) {
544 						abortTCPHandler(false);
545 					}
546 					break;
547 				default:
548 					break;
549 			}
550 
551 		}
552 		return success;
553 	}
554 
/// Applies a TCP/socket option to `fd` through setsockopt.
///
/// The type required for `value` depends on `option`:
///   bool              - NODELAY, REUSEADDR, QUICK_ACK, KEEPALIVE_ENABLE
///   integral          - KEEPALIVE_COUNT, BUFFER_RECV, BUFFER_SEND,
///                       CONGESTION, CORK, DEFER_ACCEPT
///   Duration          - KEEPALIVE_INTERVAL, KEEPALIVE_DEFER, TIMEOUT_RECV,
///                       TIMEOUT_SEND, TIMEOUT_HALFOPEN
///   Tuple!(bool, int) - LINGER (on/off, seconds)
/// Selecting an option with any other type runs into that case's
/// `assert(false, ...)`.
///
/// Params:
///   fd     = socket descriptor to configure
///   option = option to set
///   value  = option value, typed as listed above
///
/// Returns: true when the option was applied. false when setsockopt failed
/// (the option name is then appended to m_status.text), when the value does
/// not fit the C type handed to setsockopt, or for QUICK_ACK on builds
/// without epoll.
bool setOption(T)(fd_t fd, TCPOption option, in T value) {
	m_status = StatusInfo.init;
	import std.traits : isIntegral;
	import core.sys.posix.sys.socket : linger;
	import core.sys.posix.sys.time : timeval;
	import core.sys.posix.sys.types : time_t, suseconds_t;

	import libasync.internals.socket_compat : socklen_t, setsockopt, SO_REUSEADDR, SO_KEEPALIVE, SO_RCVBUF, SO_SNDBUF, SO_RCVTIMEO, SO_SNDTIMEO, SO_LINGER, SOL_SOCKET, IPPROTO_TCP, TCP_NODELAY, TCP_QUICKACK, TCP_KEEPCNT, TCP_KEEPINTVL, TCP_KEEPIDLE, TCP_CONGESTION, TCP_CORK, TCP_DEFER_ACCEPT;
	int err;

	// Turns the result of the last setsockopt call (in `err`) into the
	// function result and tags a failure with the option name.
	nothrow bool errorHandler() {
		if (catchError!"setOption:"(err)) {
			try m_status.text ~= option.to!string;
			catch (Exception e){ assert(false, "to!string conversion failure"); }
			return false;
		}

		return true;
	}

	static if (is(T == bool)) {
		// Sets a boolean option as the C int 0/1.
		nothrow bool setFlag(int level, int optname) {
			int val = value?1:0;
			socklen_t len = val.sizeof;
			err = setsockopt(fd, level, optname, &val, len);
			return errorHandler();
		}
	}

	static if (isIntegral!T) {
		// Sets an integral option; values that do not fit a C int are rejected.
		nothrow bool setInt(int level, int optname) {
			int val;
			try val = value.to!int; catch (Exception) { return false; }
			socklen_t len = val.sizeof;
			err = setsockopt(fd, level, optname, &val, len);
			return errorHandler();
		}
	}

	static if (is(T == Duration)) {
		// Sets a TCP-level option expressed in whole seconds; negative or
		// oversized durations are rejected.
		nothrow bool setSeconds(int optname) {
			int val;
			try val = value.total!"seconds".to!uint.to!int; catch (Exception) { return false; }
			socklen_t len = val.sizeof;
			err = setsockopt(fd, IPPROTO_TCP, optname, &val, len);
			return errorHandler();
		}

		// Sets a socket-level timeout option that takes a timeval.
		nothrow bool setTimeval(int optname) {
			timeval t;
			try {
				auto parts = value.split!("seconds", "usecs")();
				t = timeval(parts.seconds.to!time_t, parts.usecs.to!suseconds_t);
			}
			catch (Exception) { return false; }
			socklen_t len = t.sizeof;
			err = setsockopt(fd, SOL_SOCKET, optname, &t, len);
			return errorHandler();
		}
	}

	final switch (option) {
		case TCPOption.NODELAY: // true/false
			static if (!is(T == bool))
				assert(false, "NODELAY value type must be bool, not " ~ T.stringof);
			else
				return setFlag(IPPROTO_TCP, TCP_NODELAY);
		case TCPOption.REUSEADDR: // true/false
			static if (!is(T == bool))
				assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof);
			else {
				if (!setFlag(SOL_SOCKET, SO_REUSEADDR))
					return false;

				// BSD systems have SO_REUSEPORT
				import libasync.internals.socket_compat : SO_REUSEPORT;
				int val = value?1:0;
				socklen_t len = val.sizeof;
				err = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, len);

				// Not all linux kernels support SO_REUSEPORT
				version(linux) {
					// ignore invalid and not supported errors on linux; errno is
					// only meaningful when the call actually failed
					if (err == SOCKET_ERROR && (errno == EINVAL || errno == ENOPROTOOPT)) {
						return true;
					}
				}

				return errorHandler();
			}
		case TCPOption.QUICK_ACK:
			static if (!is(T == bool))
				assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof);
			else {
				static if (EPOLL) {
					return setFlag(IPPROTO_TCP, TCP_QUICKACK);
				}
				else /* not linux */ {
					return false;
				}
			}
		case TCPOption.KEEPALIVE_ENABLE: // true/false
			static if (!is(T == bool))
				assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof);
			else
				return setFlag(SOL_SOCKET, SO_KEEPALIVE);
		case TCPOption.KEEPALIVE_COUNT: // ##
			static if (!isIntegral!T)
				assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof);
			else
				// the value is a plain probe count, not a Duration
				return setInt(IPPROTO_TCP, TCP_KEEPCNT);
		case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds
			static if (!is(T == Duration))
				assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof);
			else
				return setSeconds(TCP_KEEPINTVL);
		case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start
			static if (!is(T == Duration))
				assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof);
			else
				return setSeconds(TCP_KEEPIDLE);
		case TCPOption.BUFFER_RECV: // bytes
			static if (!isIntegral!T)
				assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof);
			else
				return setInt(SOL_SOCKET, SO_RCVBUF);
		case TCPOption.BUFFER_SEND: // bytes
			static if (!isIntegral!T)
				assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof);
			else
				return setInt(SOL_SOCKET, SO_SNDBUF);
		case TCPOption.TIMEOUT_RECV:
			static if (!is(T == Duration))
				assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof);
			else
				return setTimeval(SO_RCVTIMEO);
		case TCPOption.TIMEOUT_SEND:
			static if (!is(T == Duration))
				assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
			else
				return setTimeval(SO_SNDTIMEO);
		case TCPOption.TIMEOUT_HALFOPEN:
			static if (!is(T == Duration))
				assert(false, "TIMEOUT_HALFOPEN value type must be Duration, not " ~ T.stringof);
			else {
				// NOTE(review): this passes a millisecond count to SO_SNDTIMEO,
				// the same option TIMEOUT_SEND sets with a timeval — confirm
				// which socket option is really meant for half-open timeouts.
				uint val;
				try val = value.total!"msecs".to!uint; catch (Exception) {
					return false;
				}
				socklen_t len = val.sizeof;
				err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len);
				return errorHandler();
			}
		case TCPOption.LINGER: // bool onOff, int seconds
			static if (!is(T == Tuple!(bool, int)))
				assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof);
			else {
				linger l = linger(value[0]?1:0, value[1]);
				socklen_t llen = l.sizeof;
				err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen);
				return errorHandler();
			}
		case TCPOption.CONGESTION:
			static if (!isIntegral!T)
				assert(false, "CONGESTION value type must be integral, not " ~ T.stringof);
			else
				// NOTE(review): Linux documents TCP_CONGESTION as taking the
				// algorithm name as a string; verify that an int is accepted.
				return setInt(IPPROTO_TCP, TCP_CONGESTION);
		case TCPOption.CORK:
			static if (!isIntegral!T)
				assert(false, "CORK value type must be int, not " ~ T.stringof);
			else {
				static if (EPOLL) {
					return setInt(IPPROTO_TCP, TCP_CORK);
				}
				else /* if KQUEUE */ {
					return setInt(IPPROTO_TCP, TCP_NOPUSH);
				}
			}
		case TCPOption.DEFER_ACCEPT: // seconds
			static if (!isIntegral!T)
				assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof);
			else {
				static if (EPOLL) {
					return setInt(IPPROTO_TCP, TCP_DEFER_ACCEPT);
				}
				else /* if KQUEUE */ {
					// todo: Emulate DEFER_ACCEPT with ACCEPT_FILTER(9)
					/*int val = value.to!int;
					 socklen_t len = val.sizeof;
					 err = setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &val, len);
					 return errorHandler();
					 */
					assert(false, "TCPOption.DEFER_ACCEPT is not implemented");
				}
			}
	}

}
772 
/// Receives up to data.length bytes from a connected socket into `data`.
///
/// Params:
///   fd   = socket descriptor to read from
///   data = destination buffer
///
/// Returns: the number of bytes received. 0 is returned when the call
/// failed; m_status.code is then Status.ASYNC if the error was
/// EWOULDBLOCK/EAGAIN, otherwise whatever catchError recorded. On success
/// m_status.code is Status.OK.
uint recv(in fd_t fd, ref ubyte[] data)
{
	try log("Recv from FD: " ~ fd.to!string); catch {}
	m_status = StatusInfo.init;
	import libasync.internals.socket_compat : recv;
	int ret = cast(int) recv(fd, cast(void*) data.ptr, data.length, cast(int)0);

	static if (LOG) log(".recv " ~ ret.to!string ~ " bytes of " ~ data.length.to!string ~ " @ " ~ fd.to!string);
	if (catchError!".recv"(ret)){ // ret == SOCKET_ERROR == -1 ?
		if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
			m_status.code = Status.ASYNC;

		return 0; // TODO: handle some errors more specifically
	}

	m_status.code = Status.OK;

	// Clamp on the signed value: `cast(uint) ret < 0` compared an unsigned
	// number with zero and therefore never clamped anything.
	return ret < 0 ? 0 : cast(uint) ret;
}
792 	
/// Sends the bytes of `data` on a connected socket.
///
/// Params:
///   fd   = socket descriptor to write to
///   data = bytes to send
///
/// Returns: the number of bytes accepted by the socket. 0 is returned when
/// the call failed; m_status.code is then Status.ASYNC if the error was
/// EWOULDBLOCK/EAGAIN, otherwise whatever catchError recorded. On success
/// m_status.code is Status.OK.
uint send(in fd_t fd, in ubyte[] data)
{
	try log("Send to FD: " ~ fd.to!string); catch {}
	m_status = StatusInfo.init;
	import libasync.internals.socket_compat : send;
	int ret = cast(int) send(fd, cast(const(void)*) data.ptr, data.length, cast(int)0);

	if (catchError!"send"(ret)) { // ret == -1
		if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
			m_status.code = Status.ASYNC;

		// Every failure returns 0. Previously only EWOULDBLOCK/EAGAIN did;
		// other errors fell through and returned -1 converted to uint.
		return 0;
	}

	m_status.code = Status.OK;

	// Clamp on the signed value before converting to the unsigned result.
	return ret < 0 ? 0 : cast(uint) ret;
}
809 
/// Receives one datagram into `data` and stores the sender in `addr`.
///
/// `addr` is prepared as an IPv6 address before the call; when the reported
/// address length is smaller than that, the family is switched to AF_INET.
///
/// Returns: the number of bytes received, or 0 on failure (m_status.code is
/// Status.ASYNC for EWOULDBLOCK/EAGAIN). On success m_status.code is Status.OK.
uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr)
{
	import libasync.internals.socket_compat : recvfrom, AF_INET6, AF_INET, socklen_t;

	m_status = StatusInfo.init;

	// offer the larger (IPv6) address storage to the kernel
	addr.family = AF_INET6;
	socklen_t reportedLen = addr.sockAddrLen;

	long received = recvfrom(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, &reportedLen);

	// a shorter address than offered is taken to be an IPv4 sender
	if (reportedLen < addr.sockAddrLen) {
		addr.family = AF_INET;
	}

	try log("RECVFROM " ~ received.to!string ~ "B"); catch {}

	if (!catchError!".recvfrom"(received)) {
		m_status.code = Status.OK;
		return cast(uint) received;
	}

	// received == -1
	if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
		m_status.code = Status.ASYNC;
	return 0;
}
835 	
/// Sends `data` as one datagram to `addr`.
///
/// Returns: the number of bytes sent, or 0 on failure (m_status.code is
/// Status.ASYNC for EWOULDBLOCK/EAGAIN). On success m_status.code is Status.OK.
uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
{
	import libasync.internals.socket_compat : sendto;

	m_status = StatusInfo.init;

	try log("SENDTO " ~ data.length.to!string ~ "B");
	catch{}

	long sent = sendto(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, addr.sockAddrLen);

	if (!catchError!".sendto"(sent)) {
		m_status.code = Status.OK;
		return cast(uint) sent;
	}

	// sent == -1
	if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
		m_status.code = Status.ASYNC;
	return 0;
}
855 
/// Queries the local address a socket is bound to via getsockname.
///
/// Params:
///   fd   = socket descriptor to inspect
///   ipv6 = address family to assume before the call
///
/// Returns: the local address, or NetworkAddress.init when getsockname failed.
NetworkAddress localAddr(in fd_t fd, bool ipv6) {
	import libasync.internals.socket_compat : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr;

	NetworkAddress local;

	if (!ipv6)
		local.family = AF_INET;
	else
		local.family = AF_INET6;

	socklen_t reportedLen = local.sockAddrLen;
	int rc = getsockname(fd, local.sockAddr, &reportedLen);
	if (catchError!"getsockname"(rc))
		return NetworkAddress.init;

	// a longer address than assumed means the socket is an IPv6 one
	if (reportedLen > local.sockAddrLen)
		local.family = AF_INET6;

	return local;
}
874 
/// Triggers the event registered for an AsyncNotifier.
///
/// epoll builds write a 64-bit 1 to `fd`; kqueue builds trigger the
/// EVFILT_USER event identified by `fd`, passing ctxt.evInfo as user data.
///
/// Returns: true when the notification was issued, false when the system
/// call failed.
bool notify(in fd_t fd, AsyncNotifier ctxt)
{
	static if (EPOLL)
	{
		import core.sys.posix.unistd : write;

		long increment = 1;
		fd_t written = cast(fd_t) write(fd, &increment, long.sizeof);

		return !catchError!"write(notify)"(written);
	}
	else /* if KQUEUE */
	{
		kevent_t trigger;
		EV_SET(&trigger, fd, EVFILT_USER, EV_ENABLE | EV_CLEAR, NOTE_TRIGGER | 0x1, 0, ctxt.evInfo);
		int rc = kevent(m_kqueuefd, &trigger, 1, null, 0, null);

		return !catchError!"kevent_notify"(rc);
	}
}
901 
/// Delivers an AsyncSignal notification to the thread that owns `ctxt`.
///
/// epoll builds queue signal number `fd` to ctxt.pthreadId with the signal
/// object as payload; kqueue builds store the signal with addSignal() and
/// raise SIGXCPU on the current process.
///
/// Returns: false when queueing the signal failed on epoll builds, true
/// otherwise.
bool notify(in fd_t fd, shared AsyncSignal ctxt)
{
	static if (EPOLL)
	{
		// the signal object itself travels as the sigval payload
		sigval payload;
		payload.sival_ptr = cast(void*) ctxt;

		fd_t rc;
		try rc = pthread_sigqueue(ctxt.pthreadId, fd, payload); catch {}

		if (catchError!"sigqueue"(rc)) {
			return false;
		}
	}
	else /* if KQUEUE */
	{
		import core.thread : getpid;

		addSignal(ctxt);

		try {
			log("Notified fd: " ~ fd.to!string ~ " of PID " ~ getpid().to!string);
			int rc = core.sys.posix.signal.kill(getpid(), SIGXCPU);
			if (catchError!"notify(signal)"(rc))
				assert(false, "Signal could not be raised");
		} catch {}
	}

	return true;
}
932 
/// Placeholder with no known uses: resets m_status and reports 0 bytes read.
/// Neither `fd` nor `data` is touched.
uint read(in fd_t fd, ref ubyte[] data)
{
	m_status = StatusInfo.init;

	return 0;
}
939 
/// Placeholder with no known uses: resets m_status and reports 0 bytes written.
/// Neither `fd` nor `data` is touched.
uint write(in fd_t fd, in ubyte[] data)
{
	m_status = StatusInfo.init;

	return 0;
}
946 
947 	uint watch(in fd_t fd, in WatchInfo info) {
948 		// note: info.wd is still 0 at this point.
949 		m_status = StatusInfo.init;
950 		import core.sys.linux.sys.inotify;
951 		import std.file : dirEntries, isDir, SpanMode;
952 
953 		static if (EPOLL) {
954 			// Manually handle recursivity... All events show up under the same inotify
955 			uint events = info.events; // values for this API were pulled from inotify
956 			if (events & IN_DELETE)
957 				events |= IN_DELETE_SELF;
958 			if (events & IN_MOVED_FROM)
959 				events |= IN_MOVE_SELF;
960 
961 			nothrow fd_t addFolderRecursive(Path path) {
962 				fd_t ret;
963 				try {
964 					ret = inotify_add_watch(fd, path.toNativeString().toStringz, events);
965 					if (catchError!"inotify_add_watch"(ret))
966 						return fd_t.init;
967 					try log("inotify_add_watch(" ~ DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd).to!string ~ ")"); catch {}
968 					assert(m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint)ret), DWFolderInfo.init) == DWFolderInfo.init, "Could not get a unique watch descriptor for path, got: " ~ m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)].to!string);
969 					m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd);
970 					if (info.recursive) {
971 						foreach (de; path.toNativeString().dirEntries(SpanMode.shallow))
972 						{
973 							Path de_path = Path(de.name);
974 							if (!de_path.absolute)
975 								de_path = path ~ Path(de.name);
976 							if (isDir(de_path.toNativeString()))
977 								if (addFolderRecursive(de_path) == 0)
978 									return 0;
979 						}
980 					}
981 
982 				} catch (Exception e) { 
983 					try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.toString() ); catch {}
984 					return 0; 
985 				}
986 
987 				return ret;
988 			}
989 
990 			return addFolderRecursive(info.path);
991 
992 		} else /* if KQUEUE */ {
993 			/// Manually handle recursivity & file tracking. Each folder is an event! 
994 			/// E.g. file creation shows up as a folder change, we must be prepared to seek the file.
995 			import core.sys.posix.fcntl;
996 			import libasync.internals.kqueue;
997 
998 			uint events;
999 			if (info.events & DWFileEvent.CREATED)
1000 				events |= NOTE_LINK | NOTE_WRITE;
1001 			if (info.events & DWFileEvent.DELETED)
1002 				events |= NOTE_DELETE;
1003 			if (info.events & DWFileEvent.MODIFIED)
1004 				events |= NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE;
1005 			if (info.events & DWFileEvent.MOVED_FROM)
1006 				events |= NOTE_RENAME;
1007 			if (info.events & DWFileEvent.MOVED_TO)
1008 				events |= NOTE_RENAME;
1009 
1010 			EventInfo* evinfo;
1011 			try evinfo = m_watchers[fd]; catch { assert(false, "Could retrieve event info, directory watcher was not initialized properly, or you are operating on a closed directory watcher."); }
1012 
1013 			/// we need a file descriptor for the containers, so we open files but we don't monitor them
1014 			/// todo: track indexes internally?
1015 			nothrow fd_t addRecursive(Path path, bool is_dir) {
1016 				int ret;
1017 				try {
1018 					log("Adding path: " ~ path.toNativeString());
1019 
1020 					ret = open(path.toNativeString().toStringz, O_EVTONLY);
1021 					if (catchError!"open(watch)"(ret))
1022 						return 0;
1023 
1024 					if (is_dir)
1025 						m_dwFolders[ret] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd);
1026 
1027 					kevent_t _event;
1028 					
1029 					EV_SET(&_event, ret, EVFILT_VNODE, EV_ADD | EV_CLEAR, events, 0, cast(void*) evinfo);
1030 					
1031 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1032 					
1033 					if (catchError!"kevent_timer_add"(err))
1034 						return 0;
1035 
1036 					
1037 					if (is_dir) foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
1038 						Path filePath = Path(de.name);
1039 						if (!filePath.absolute)
1040 							filePath = path ~ filePath;
1041 						fd_t fwd;
1042 						if (info.recursive && isDir(filePath.toNativeString()))
1043 							fwd = addRecursive(filePath, true);
1044 						else {
1045 							fwd = addRecursive(filePath, false); // gets an ID but will not scan
1046 							m_dwFiles[fwd] = DWFileInfo(ret, filePath, de.timeLastModified, isDir(filePath.toNativeString()));
1047 						}
1048 						
1049 					}
1050 
1051 				} catch (Exception e) { 
1052 					try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.msg);  catch {}
1053 					return 0; 
1054 				}
1055 				return ret;
1056 			}
1057 
1058 			fd_t wd;
1059 
1060 			try {
1061 				wd = addRecursive(info.path, isDir(info.path.toNativeString()));
1062 
1063 				if (wd == 0)
1064 					return 0;
1065 
1066 			}
1067 			catch (Exception e) {
1068 				setInternalError!"dw.watch"(Status.ERROR, "Failed to watch directory: " ~ e.msg); 
1069 			}
1070 
1071 			return cast(uint) wd;
1072 		}
1073 	}
1074 
1075 	bool unwatch(in fd_t fd, in uint wd) {
1076 		// the wd can be used with m_dwFolders to find the DWFolderInfo
1077 		// and unwatch everything recursively.
1078 
1079 		m_status = StatusInfo.init;
1080 		static if (EPOLL) {
1081 			/// If recursive, all subfolders must also be unwatched recursively by removing them
1082 			/// from containers and from inotify
1083 			import core.sys.linux.sys.inotify;
1084 
1085 			nothrow bool removeAll(DWFolderInfo fi) {
1086 				int err;
1087 				try {
1088 
1089 					bool inotify_unwatch(uint wd) {
1090 						err = inotify_rm_watch(fd, wd);
1091 
1092 						if (catchError!"inotify_rm_watch"(err))
1093 							return false;
1094 						return true;
1095 					}
1096 
1097 					if (!inotify_unwatch(fi.wi.wd))
1098 						return false;
1099 
1100 					/*foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles)
1101 					 {
1102 					 if (file.folder == fi.wi.wd) {
1103 					 inotify_unwatch(id);
1104 					 m_dwFiles.remove(id);
1105 					 }
1106 					 }*/
1107 					m_dwFolders.remove(tuple(cast(fd_t)fd, fi.wi.wd)); 
1108 
1109 					if (fi.wi.recursive) {
1110 						// find all subdirectories by comparing the path
1111 						Array!uint remove_list;
1112 						foreach (ref const DWFolderInfo folder; m_dwFolders) {
1113 							if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) {
1114 
1115 								if (!inotify_unwatch(folder.wi.wd))
1116 									return false;
1117 
1118 								remove_list.insertBack(fi.wi.wd);
1119 							}
1120 						}
1121 						foreach (rm_wd; remove_list[])
1122 							m_dwFolders.remove(tuple(cast(fd_t) fd, rm_wd));
1123 
1124 					}
1125 					return true;
1126 				} catch (Exception e) { 
1127 					try setInternalError!"inotify_rm_watch"(Status.ERROR, "Could not unwatch directory: " ~ e.toString()); catch {}
1128 					return false; 
1129 				}
1130 			}
1131 
1132 			DWFolderInfo info;
1133 
1134 			try {
1135 				info = m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint) wd), DWFolderInfo.init);
1136 				if (info == DWFolderInfo.init) {
1137 					setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not find watch info for wd " ~ wd.to!string);
1138 					return false;
1139 				}
1140 			} catch { }
1141 
1142 			return removeAll(info);
1143 		}
1144 		else /* if KQUEUE */ {
1145 
1146 			/// Recursivity must be handled manually, so we must unwatch subfiles and subfolders
1147 			/// recursively, remove the container entries, close the file descriptor, and disable the vnode events.
1148 
1149 			nothrow bool removeAll(DWFolderInfo fi) {
1150 				import core.sys.posix.unistd : close;
1151 
1152 				
1153 				bool event_unset(uint id) {
1154 					kevent_t _event;
1155 					EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
1156 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1157 					if (catchError!"kevent_unwatch"(err))
1158 						return false;
1159 					return true;
1160 				}
1161 
1162 				bool removeFolder(uint wd) {
1163 					if (!event_unset(fi.wi.wd))
1164 						return false;
1165 					m_dwFolders.remove(fi.wi.wd);
1166 					int err = close(fi.wi.wd);
1167 					if (catchError!"close dir"(err))
1168 						return false;
1169 					return true;
1170 				}
1171 
1172 				try {
1173 					removeFolder(fi.wi.wd);
1174 
1175 					if (fi.wi.recursive) {
1176 						import std.container.array;
1177 						Array!fd_t remove_list; // keep track of unwatched folders recursively
1178 						Array!fd_t remove_file_list;
1179 						// search for subfolders and unset them / close their wd
1180 						foreach (ref const DWFolderInfo folder; m_dwFolders) {
1181 							if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) {
1182 								
1183 								if (!event_unset(folder.wi.wd))
1184 									return false;
1185 
1186 								// search for subfiles, close their descriptors and remove them from the file list
1187 								foreach (ref const fd_t fwd, ref const DWFileInfo file; m_dwFiles) {
1188 									if (file.folder == folder.wi.wd) {
1189 										close(fwd);
1190 										remove_file_list.insertBack(fwd); // to be removed from m_dwFiles without affecting the loop
1191 									}
1192 								}
1193 
1194 								remove_list.insertBack(folder.wi.wd); // to be removed from m_dwFolders without affecting the loop
1195 							}
1196 						}
1197 
1198 						foreach (wd; remove_file_list[])
1199 							m_dwFiles.remove(wd);
1200 
1201 						foreach (rm_wd; remove_list[])
1202 							removeFolder(rm_wd);
1203 
1204 						
1205 					}
1206 				} catch (Exception e) {
1207 					try setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not close the folder " ~ fi.to!string ~ ": " ~ e.toString()); catch {}
1208 					return false;
1209 				}
1210 
1211 				return true;
1212 			}
1213 
1214 			DWFolderInfo info;
1215 			try info = m_dwFolders.get(wd, DWFolderInfo.init); catch {}
1216 
1217 			if (!removeAll(info))
1218 				return false;
1219 			return true;
1220 		}
1221 	}
1222 
1223 	// returns the amount of changes
	/**
	 * Copies pending directory changes of the watcher `fd` into `dst`.
	 *
	 * epoll build: reads raw inotify events from `fd`, converts each one to a
	 * DWChangeInfo and stores it in `dst`; `dst` may be appended to when a
	 * recursive folder produces additional entries.
	 * kqueue build: moves up to `dst.length` entries out of the change queue
	 * stored in `m_changes[fd]`.
	 *
	 * Returns: the number of entries written to `dst` (0 on error).
	 */
	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
		m_status = StatusInfo.init;

		static if (EPOLL) {
			assert(dst.length > 0, "DirectoryWatcher called with 0 length DWChangeInfo array");
			import core.sys.linux.sys.inotify;
			import core.sys.posix.unistd : read;
			import core.stdc.stdio : FILENAME_MAX;
			import core.stdc.string : strlen;
			// stack buffer sized for one event header plus a maximum-length name
			ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void;
			ssize_t nread = read(fd, buf.ptr, cast(uint)buf.sizeof);
			if (catchError!"read()"(nread))
			{
				// nothing to read right now: report ASYNC instead of a hard error
				if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) 
					m_status.code = Status.ASYNC;
				return 0;
			}
			assert(nread > 0);
			

			/// starts (recursively) watching all newly created folders in a recursive entry,
			/// creates events for additional files/folders founds, and unwatches all deleted folders
			// NOTE(review): this enumerates fi.wi.path (the watched folder the event was
			// reported for), not the path of the entry that changed - confirm this is intended.
			void recurseInto(DWFolderInfo fi, DWFileEvent ev, ref Array!DWChangeInfo changes) {
				import std.file : dirEntries, SpanMode, isDir;
				assert(fi.wi.recursive);
				// get a list of stuff in the created/moved folder
				if (ev == DWFileEvent.CREATED || ev == DWFileEvent.MOVED_TO) {
					foreach (de; dirEntries(fi.wi.path.toNativeString(), SpanMode.shallow)) {
						Path entryPath = fi.wi.path ~ Path(de.name);
						
						if (fi.wi.recursive && isDir(entryPath.toNativeString())) {
							
							watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, 0) );
							// emits a CREATED change for everything found below `subpath`
							void genEvents(Path subpath) {
								foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) {
									auto subsubpath = subpath ~ Path(de.name);
									changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
									if (isDir(subsubpath.toNativeString()))
										genEvents(subsubpath);
								}
							}
							
							genEvents(entryPath);
							
						}
					}
				}
			}

			// i counts the entries written to dst so far
			size_t i;
			do
			{
				// walk the variable-length inotify_event records in the buffer
				for (auto p = buf.ptr; p < buf.ptr + nread; )
				{
					inotify_event* ev = cast(inotify_event*)p;
					p += inotify_event.sizeof + ev.len;

					// Map the inotify mask to a DWFileEvent; later checks override earlier
					// ones, and CREATED is the fallback when no bit matches.
					DWFileEvent evtype;
					evtype = DWFileEvent.CREATED;
					if (ev.mask & IN_CREATE)
						evtype = DWFileEvent.CREATED;
					if (ev.mask & IN_DELETE || ev.mask & IN_DELETE_SELF)
						evtype = DWFileEvent.DELETED;
					if (ev.mask & IN_MOVED_FROM || ev.mask & IN_MOVE_SELF)
						evtype = DWFileEvent.MOVED_FROM;
					if (ev.mask & (IN_MOVED_TO))
						evtype = DWFileEvent.MOVED_TO;
					if (ev.mask & IN_MODIFY)
						evtype = DWFileEvent.MODIFIED;

					import std.path : buildPath;
					import core.stdc.string : strlen;
					string name = cast(string) ev.name.ptr[0 .. cast(size_t) ev.name.ptr.strlen].idup;
					DWFolderInfo fi;
					Path path;
					try {
						// find the watched folder this event belongs to
						fi = m_dwFolders.get(tuple(cast(fd_t)fd,cast(uint)ev.wd), DWFolderInfo.init);
						if (fi == DWFolderInfo.init) {
							setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders: " ~ ev.wd.to!string); 
							continue;
						}
						path = fi.wi.path ~ Path(name); 
					}
					catch (Exception e) { 
						setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders"); 
						return 0; 
					}

					dst[i] = DWChangeInfo(evtype, path);
					import std.file : isDir;
					bool is_dir;
					// isDir throws for a path that no longer exists; is_dir then stays false
					try is_dir = isDir(path.toNativeString()); catch {}
					if (fi.wi.recursive && is_dir) {

						try {
							Array!DWChangeInfo changes;
							recurseInto(fi, evtype, changes);
							// stop watching if the folder was deleted
							if (evtype == DWFileEvent.DELETED || evtype == DWFileEvent.MOVED_FROM) {
								unwatch(fi.fd, fi.wi.wd);
							}
							// append the generated changes, growing dst when it is too short
							foreach (change; changes[]) {
								i++;
								if (dst.length <= i)
									dst ~= change;
								else dst[i] = change;
							}
						}
						catch (Exception e) {
							setInternalError!"recurseInto"(Status.ERROR, "Failed to watch/unwatch contents of folder recursively."); 
							return 0; 
						}

					}

					
					i++;
					// NOTE(review): returning here leaves any remaining events already read
					// into buf unprocessed - confirm callers tolerate that.
					if (i >= dst.length)
						return cast(uint) i;
				}
				static if (LOG) foreach (j; 0 .. i) {
					try log("Change occured for FD#" ~ fd.to!string ~ ": " ~ dst[j].to!string); catch {}
				}
				// keep reading until the descriptor has nothing more to deliver
				nread = read(fd, buf.ptr, buf.sizeof);
				if (catchError!"read()"(nread)) {
					if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
						m_status.code = Status.ASYNC;
					return cast(uint) i;
				}
			} while (nread > 0);

			return cast(uint) i;
		}
		else /* if KQUEUE */ {
			Array!(DWChangeInfo)* changes;
			size_t i;
			try {
				changes = m_changes[fd];
				import std.algorithm : min;
				// hand over as many queued changes as dst can hold, then drop them from the queue
				size_t cnt = min(dst.length, changes.length);
				foreach (DWChangeInfo change; (*changes)[0 .. cnt]) {
					dst[i] = (*changes)[i];
					i++;
				}
				changes.linearRemove((*changes)[0 .. cnt]);
			}
			catch (Exception e) {
				setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg);
				// `false` converts to 0 for the uint return type
				return false;
			}
			return cast(uint) i;
		}
	}
1377 
1378 	bool broadcast(in fd_t fd, bool b) {
1379 		m_status = StatusInfo.init;
1380 
1381 		import libasync.internals.socket_compat : socklen_t, setsockopt, SO_BROADCAST, SOL_SOCKET;
1382 
1383 		int val = b?1:0;
1384 		socklen_t len = val.sizeof;
1385 		int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len);
1386 		if (catchError!"setsockopt"(err))
1387 			return false;
1388 
1389 		return true;
1390 	}
1391 
1392 	private bool closeRemoteSocket(fd_t fd, bool forced) {
1393 		
1394 		int err;
1395 		log("shutdown");
1396 		import libasync.internals.socket_compat : shutdown, SHUT_WR, SHUT_RDWR, SHUT_RD;
1397 		if (forced) 
1398 			err = shutdown(fd, SHUT_RDWR);
1399 		else
1400 			err = shutdown(fd, SHUT_WR);
1401 
1402 		static if (!EPOLL) {
1403 			kevent_t[2] events;
1404 			try log("!!DISC delete events"); catch {}
1405 			EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
1406 			EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
1407 			kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
1408 
1409 		}
1410 
1411 		if (catchError!"shutdown"(err))
1412 			return false;
1413 
1414 		return true;
1415 	}
1416 
1417 	// for connected sockets
1418 	bool closeSocket(fd_t fd, bool connected, bool forced = false)
1419 	{
1420 		log("closeSocket");
1421 		if (connected && !closeRemoteSocket(fd, forced) && !forced)
1422 			return false;
1423 		
1424 		if (!connected || forced) {
1425 			// todo: flush the socket here?
1426 
1427 			import core.sys.posix.unistd : close;
1428 			log("close");
1429 			int err = close(fd);
1430 			if (catchError!"closesocket"(err)) 
1431 				return false;
1432 		}
1433 		return true;
1434 	}
1435 
1436 	
1437 	NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true) 
1438 	{
1439 		import libasync.internals.socket_compat : addrinfo, AI_NUMERICHOST, AI_NUMERICSERV;
1440 		addrinfo hints;
1441 		hints.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV; // Specific to an IP resolver!
1442 
1443 		return getAddressInfo(ipAddr, port, ipv6, tcp, hints);
1444 	}
1445 
1446 	
1447 	NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true)
1448 		/*in { 
1449 		 debug import libasync.internals.validator : validateHost;
1450 		 debug assert(validateHost(host), "Trying to connect to an invalid domain"); 
1451 		 }
1452 		body */{
1453 		import libasync.internals.socket_compat : addrinfo;
1454 		addrinfo hints;
1455 		return getAddressInfo(host, port, ipv6, tcp, hints);
1456 	}
1457 	
1458 	void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EPosix.EACCES)
1459 	{
1460 		if (details.length > 0)
1461 			m_status.text = TRACE ~ ": " ~ details;
1462 		else m_status.text = TRACE;
1463 		m_error = error;
1464 		m_status.code = s;
1465 		static if(LOG) log(m_status);
1466 	}
1467 private:	
1468 
	/// For DirectoryWatcher
	/// In kqueue/vnode, all we get is the folder in which changes occurred.
	/// We have to figure out what exactly changed and put the results in a container
	/// for the readChanges call.
1473 	static if (!EPOLL) bool compareFolderFiles(DWFolderInfo fi, DWFileEvent events) {
1474 		import std.file;
1475 		import std.path : buildPath;
1476 		try {
1477 			Array!Path currFiles;
1478 			auto wd = fi.wi.wd;
1479 			auto path = fi.wi.path;
1480 			auto fd = fi.fd;
1481 			Array!(DWChangeInfo)* changes = m_changes.get(fd, null);
1482 			assert(changes !is null, "Invalid wd, could not find changes array.");
1483 			//import std.stdio : writeln;
1484 			//writeln("Scanning path: ", path.toNativeString());
1485 			//writeln("m_dwFiles length: ", m_dwFiles.length);
1486 			
1487 			// get a list of the folder
1488 			foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
1489 				//writeln(de.name);
1490 				Path entryPath = Path(de.name);
1491 				if (!entryPath.absolute)
1492 					entryPath = path ~ entryPath;
1493 				bool found;
1494 
1495 				// compare it to the cached list fixme: make it faster using another container?
1496 				foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
1497 					if (file.folder != wd) continue; // this file isn't in the evented folder
1498 					if (file.path == entryPath) {
1499 						found = true;
1500 						log("File modified? " ~ entryPath.toNativeString() ~ " at: " ~ de.timeLastModified.to!string ~ " vs: " ~ file.lastModified.to!string);
1501 						// Check if it was modified
1502 						if (!isDir(entryPath.toNativeString()) && de.timeLastModified > file.lastModified)
1503 						{
1504 							DWFileInfo dwf = file;
1505 							dwf.lastModified = de.timeLastModified;
1506 							m_dwFiles[id] = dwf;
1507 							changes.insertBack(DWChangeInfo(DWFileEvent.MODIFIED, file.path));
1508 						}
1509 						break;
1510 					}
1511 				}
1512 
1513 				// This file/folder is new in the folder
1514 				if (!found) {
1515 					changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, entryPath));
1516 
1517 					if (fi.wi.recursive && isDir(entryPath.toNativeString())) {
1518 						/// This is the complicated part. The folder needs to be watched, and all the events
1519 						/// generated for every file/folder found recursively inside it,
1520 						/// Useful e.g. when mkdir -p is used.
1521 						watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, wd) );
1522 						void genEvents(Path subpath) {
1523 							foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) {
1524 								auto subsubpath = Path(de.name);
1525 								if (!subsubpath.absolute())
1526 									subsubpath = subpath ~ subsubpath;
1527 								changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
1528 								if (isDir(subsubpath.toNativeString()))
1529 									genEvents(subsubpath);
1530 							}
1531 						}
1532 
1533 						genEvents(entryPath);
1534 
1535 					}
1536 					else {
1537 						EventInfo* evinfo;
1538 						try evinfo = m_watchers[fd]; catch { assert(false, "Could retrieve event info, directory watcher was not initialized properly, or you are operating on a closed directory watcher."); }
1539 
1540 						log("Adding path: " ~ path.toNativeString());
1541 
1542 						import core.sys.posix.fcntl : open;
1543 						fd_t fwd = open(entryPath.toNativeString().toStringz, O_EVTONLY);
1544 						if (catchError!"open(watch)"(fwd))
1545 							return 0;
1546 
1547 						kevent_t _event;
1548 						
1549 						EV_SET(&_event, fwd, EVFILT_VNODE, EV_ADD | EV_CLEAR, fi.wi.events, 0, cast(void*) evinfo);
1550 						
1551 						int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1552 
1553 						if (catchError!"kevent_timer_add"(err))
1554 							return 0;
1555 
1556 						m_dwFiles[fwd] = DWFileInfo(fi.wi.wd, entryPath, de.timeLastModified, false);
1557 						
1558 					}
1559 				}
1560 
1561 				// This file/folder is now current. This avoids a deletion event.
1562 				currFiles.insert(entryPath);
1563 			}
1564 
1565 			/// Now search for files/folders that were deleted in this directory (no recursivity needed). 
1566 			/// Unwatch this directory and generate delete event only for the root dir
1567 			foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
1568 				if (file.folder != wd) continue; // skip those files in another folder than the evented one
1569 				bool found;
1570 				foreach (Path curr; currFiles) {
1571 					if (file.path == curr){
1572 						found = true;
1573 						break;
1574 					}
1575 				}
1576 				// this file/folder was in the folder but it's not there anymore
1577 				if (!found) {
1578 					// writeln("Deleting: ", file.path.toNativeString());
1579 					kevent_t _event;
1580 					EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
1581 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1582 					if (catchError!"kevent_unwatch"(err))
1583 						return false;
1584 					import core.sys.posix.unistd : close;
1585 					err = close(id);
1586 					if (catchError!"close(dwFile)"(err))
1587 						return false;
1588 					changes.insert(DWChangeInfo(DWFileEvent.DELETED, file.path));
1589 
1590 					if (fi.wi.recursive && file.is_dir) 
1591 						unwatch(fd, id);
1592 
1593 					m_dwFiles.remove(id);
1594 
1595 				}
1596 				
1597 			}
1598 			if(changes.empty)
1599 				return false; // unhandled event, skip the callback
1600 
1601 			// fixme: how to implement moved_from moved_to for rename?
1602 		}
1603 		catch (Exception e) 
1604 		{
1605 			try setInternalError!"compareFiles"(Status.ERROR, "Fatal error in file comparison: " ~ e.toString()); catch {}
1606 			return false;
1607 		}
1608 		return true;
1609 	}
1610 
1611 	// socket must not be connected
1612 	bool setNonBlock(fd_t fd) {
1613 		import core.sys.posix.fcntl : fcntl, F_GETFL, F_SETFL, O_NONBLOCK;
1614 		int flags = fcntl(fd, F_GETFL);
1615 		flags |= O_NONBLOCK;
1616 		int err = fcntl(fd, F_SETFL, flags);
1617 		if (catchError!"F_SETFL O_NONBLOCK"(err)) {
1618 			closeSocket(fd, false);
1619 			return false;
1620 		}
1621 		return true;
1622 	}
1623 	
	/**
	 * Handles an event reported for the listening socket `fd`.
	 *
	 * When the event signals incoming data, connections are accepted in a loop;
	 * for each one an AsyncTCPConnection is allocated, `del` is asked for a
	 * TCPEventHandler, the connection is initialized and TCPEvent.CONNECT is
	 * delivered. When the event signals an error, the socket error is stored in
	 * the status record and `del(null)` is called.
	 *
	 * Params:
	 *   fd     = listening socket
	 *   del    = callback producing the handler for each accepted connection
	 *   events = raw epoll event mask, or packed kqueue filter (high 16 bits)
	 *            and flags (low 16 bits)
	 */
	bool onTCPAccept(fd_t fd, TCPAcceptHandler del, int events)
	{
		import libasync.internals.socket_compat : AF_INET, AF_INET6, socklen_t, accept4, accept;
		// NOTE(review): local hard-coded value handed to accept4 as its flags argument -
		// confirm it matches the platform's non-blocking flag on every supported target.
		enum O_NONBLOCK     = 0x800;    // octal    04000

		static if (EPOLL) 
		{
			const uint epoll_events = cast(uint) events;
			const bool incoming = cast(bool) (epoll_events & EPOLLIN);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
		}
		else 
		{
			// filter in the upper half, flags in the lower half of `events`
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			const bool incoming = cast(bool)(kqueue_events & EVFILT_READ);
			const bool error = cast(bool)(kqueue_flags & EV_ERROR);
		}
		
		if (incoming) { // accept incoming connection
			// NOTE(review): this loop has no break; it is left only through the return
			// statements below, i.e. once accept (or a later step) reports a failure.
			do {
				NetworkAddress addr;
				addr.family = AF_INET;
				socklen_t addrlen = addr.sockAddrLen;

				// `ret` is only ever assigned false in this function
				bool ret;
				static if (EPOLL) {
					/// Accept the connection and create a client socket
					fd_t csock = accept4(fd, addr.sockAddr, &addrlen, O_NONBLOCK);

					if (catchError!".accept"(csock)) {
						ret = false;
						return ret;
					}
				} else /* if KQUEUE */ {
					fd_t csock = accept(fd, addr.sockAddr, &addrlen);
					
					if (catchError!".accept"(csock)) {
						ret = false;
						return ret;
					}
					
					// Make non-blocking so subsequent calls to recv/send return immediately
					if (!setNonBlock(csock)) {
						ret = false;
						return ret;
					}
				}

				// Set client address family based on address length
				if (addrlen > addr.sockAddrLen)
					addr.family = AF_INET6;
				if (addrlen == socklen_t.init) {
					// no peer address was returned: abort and drop the client socket
					setInternalError!"addrlen"(Status.ABORT);
					import core.sys.posix.unistd : close;
					close(csock);
					{
						ret = false;
						return ret;
					}
				}

				// Allocate a new connection handler object
				AsyncTCPConnection conn;
				try conn = ThreadMem.alloc!AsyncTCPConnection(m_evLoop);
				catch (Exception e){ assert(false, "Allocation failure"); }
				conn.peer = addr;
				conn.socket = csock;
				conn.inbound = true;

				// frees the connection object, force-closes the client socket, yields false
				nothrow bool closeClient() {
					try ThreadMem.free(conn);
					catch (Exception e){ assert(false, "Free failure"); }
					closeSocket(csock, true, true);
					{
						ret = false;
						return ret;
					}
				}

				// Get the connection handler from the callback
				TCPEventHandler evh;
				try {
					evh = del(conn);
					if (evh == TCPEventHandler.init || !initTCPConnection(csock, conn, evh, true)) {
						try log("Failed to connect"); catch {}
						return closeClient();
					}
					try log("Connection Started with " ~ csock.to!string); catch {}
				}
				catch (Exception e) {
					log("Close socket");
					return closeClient();
				}

				// Announce connection state to the connection handler
				try {
					log("Connected to: " ~ addr.toString());
					evh.conn.connected = true;
					evh(TCPEvent.CONNECT);
				}
				catch (Exception e) {
					// NOTE(review): neither the connection object nor the client socket is
					// released on this path - confirm ownership has moved to the handler.
					setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
					{
						ret = false;
						return ret;
					}
				}
			} while(true);

		}
		
		if (error) { // socket failure
			m_status.text = "listen socket error";
			int err;
			import libasync.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR;
			socklen_t len = int.sizeof;
			// fetch the pending socket error and store it in the status record
			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
			m_error = cast(error_t) err;
			m_status.code = Status.ABORT;
			static if(LOG) log(m_status);

			// call with null to announce a failure
			try del(null);
			catch(Exception e){ assert(false, "Failure calling TCPAcceptHandler(null)"); }

			/// close the listener?
			// closeSocket(fd, false);
		}
		return true;
	}
1755 
1756 	bool onUDPTraffic(fd_t fd, UDPHandler del, int events) 
1757 	{
1758 		static if (EPOLL) 
1759 		{
1760 			const uint epoll_events = cast(uint) events;
1761 			const bool read = cast(bool) (epoll_events & EPOLLIN);
1762 			const bool write = cast(bool) (epoll_events & EPOLLOUT);
1763 			const bool error = cast(bool) (epoll_events & EPOLLERR);
1764 		}
1765 		else 
1766 		{
1767 			const short kqueue_events = cast(short) (events >> 16);
1768 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
1769 			const bool read = cast(bool) (kqueue_events & EVFILT_READ);
1770 			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
1771 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
1772 		}
1773 
1774 		if (read) {
1775 			try {
1776 				del(UDPEvent.READ);
1777 			}
1778 			catch (Exception e) {
1779 				setInternalError!"del@UDPEvent.READ"(Status.ABORT);
1780 				return false;
1781 			}
1782 		}
1783 
1784 		if (write) { 
1785 
1786 			try {
1787 				del(UDPEvent.WRITE);
1788 			}
1789 			catch (Exception e) {
1790 				setInternalError!"del@UDPEvent.WRITE"(Status.ABORT);
1791 				return false;
1792 			}
1793 		}
1794 
1795 		if (error) // socket failure
1796 		{ 
1797 
1798 			import libasync.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR;
1799 			import core.sys.posix.unistd : close;
1800 			int err;
1801 			socklen_t errlen = err.sizeof;
1802 			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
1803 			setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err);
1804 			close(fd);
1805 		}
1806 		
1807 		return true;
1808 	}
1809 
1810 	bool onTCPTraffic(fd_t fd, TCPEventHandler del, int events, AsyncTCPConnection conn) 
1811 	{
1812 		log("TCP Traffic at FD#" ~ fd.to!string);
1813 
1814 		static if (EPOLL) 
1815 		{
1816 			const uint epoll_events = cast(uint) events;
1817 			const bool connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !conn.disconnecting && !conn.connected;
1818 			bool read = cast(bool) (epoll_events & EPOLLIN);
1819 			const bool write = cast(bool) (epoll_events & EPOLLOUT);
1820 			const bool error = cast(bool) (epoll_events & EPOLLERR);
1821 			const bool close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP));
1822 		}
1823 		else /* if KQUEUE */
1824 		{
1825 			const short kqueue_events = cast(short) (events >> 16);
1826 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
1827 			const bool connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !conn.disconnecting && !conn.connected);
1828 			bool read = cast(bool) (kqueue_events & EVFILT_READ) && !connect;
1829 			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
1830 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
1831 			const bool close = cast(bool) (kqueue_flags & EV_EOF);
1832 		}
1833 
1834 		if (error) 
1835 		{
1836 			import libasync.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR;
1837 			int err;
1838 			try log("Also got events: " ~ connect.to!string ~ " c " ~ read.to!string ~ " r " ~ write.to!string ~ " write"); catch {}
1839 			socklen_t errlen = err.sizeof;
1840 			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
1841 			setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err);
1842 			try 
1843 				del(TCPEvent.ERROR);
1844 			catch (Exception e)
1845 			{ 
1846 				setInternalError!"del@TCPEvent.ERROR"(Status.ABORT);
1847 				// ignore failure...
1848 			}
1849 			return false;
1850 		}
1851 
1852 		
1853 		if (connect) 
1854 		{
1855 			try log("!connect"); catch {}
1856 			conn.connected = true;
1857 			try del(TCPEvent.CONNECT);
1858 			catch (Exception e) {
1859 				setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
1860 				return false;
1861 			}
1862 			return true;
1863 		}
1864 
1865 		
1866 		if (write && conn.connected && !conn.disconnecting && conn.writeBlocked) 
1867 		{
1868 			conn.writeBlocked = false;
1869 			try log("!write"); catch {}
1870 			try del(TCPEvent.WRITE);
1871 			catch (Exception e) {
1872 				setInternalError!"del@TCPEvent.WRITE"(Status.ABORT);
1873 				return false;
1874 			}
1875 		}
1876 		else {
1877 			read = true;
1878 		}
1879 
1880 		if (read && conn.connected && !conn.disconnecting)
1881 		{
1882 			try log("!read"); catch {}
1883 			try del(TCPEvent.READ);
1884 			catch (Exception e) {
1885 				setInternalError!"del@TCPEvent.READ"(Status.ABORT);
1886 				return false;
1887 			}
1888 		}
1889 
1890 		if (close && conn.connected && !conn.disconnecting) 
1891 		{
1892 			try log("!close"); catch {}
1893 			// todo: See if this hack is still necessary
1894 			if (!conn.connected && conn.disconnecting)
1895 				return true;
1896 			
1897 			try del(TCPEvent.CLOSE);
1898 			catch (Exception e) {
1899 				setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT);
1900 				return false;
1901 			}
1902 
1903 			// Careful here, the delegate might have closed the connection already
1904 			if (conn.connected) {
1905 				closeSocket(fd, !conn.disconnecting, conn.connected);
1906 
1907 				m_status.code = Status.ABORT;
1908 				conn.disconnecting = true;
1909 				conn.connected = false;
1910 				conn.writeBlocked = true;
1911 				del.conn.socket = 0;
1912 				
1913 				try ThreadMem.free(del.conn.evInfo);
1914 				catch (Exception e){ assert(false, "Error freeing resources"); }
1915 				
1916 				if (del.conn.inbound) {
1917 					log("Freeing inbound connection");
1918 					try ThreadMem.free(del.conn);
1919 					catch (Exception e){ assert(false, "Error freeing resources"); }
1920 				}
1921 			}
1922 		}
1923 		return true;
1924 	}
1925 	
1926 	bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt, UDPHandler del)
1927 	{
1928 		import libasync.internals.socket_compat : bind;
1929 		import core.sys.posix.unistd;
1930 
1931 		fd_t err;
1932 
1933 		EventObject eo;
1934 		eo.udpHandler = del;
1935 		EventInfo* ev;
1936 		try ev = ThreadMem.alloc!EventInfo(fd, EventType.UDPSocket, eo, m_instanceId);
1937 		catch (Exception e){ assert(false, "Allocation error"); }
1938 		ctxt.evInfo = ev;
1939 		nothrow bool closeAll() {
1940 			try ThreadMem.free(ev);
1941 			catch(Exception e){ assert(false, "Failed to free resources"); }
1942 			ctxt.evInfo = null;
1943 			// socket will be closed by caller if return false
1944 			return false;
1945 		}
1946 
1947 		static if (EPOLL)
1948 		{
1949 			epoll_event _event;
1950 			_event.data.ptr = ev;
1951 			_event.events = EPOLLIN | EPOLLOUT | EPOLLET;
1952 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
1953 			if (catchError!"epoll_ctl"(err)) {
1954 				return closeAll();
1955 			}
1956 			nothrow void deregisterEvent()
1957 			{
1958 				epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, &_event);
1959 			}
1960 		}
1961 		else /* if KQUEUE */
1962 		{
1963 			kevent_t[2] _event;
1964 			EV_SET(&(_event[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ev);
1965 			EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, ev);
1966 			err = kevent(m_kqueuefd, &(_event[0]), 2, null, 0, null);
1967 			if (catchError!"kevent_add_udp"(err))
1968 				return closeAll();
1969 			
1970 			nothrow void deregisterEvent() {
1971 				EV_SET(&(_event[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
1972 				EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
1973 				kevent(m_kqueuefd, &(_event[0]), 2, null, 0, cast(libasync.internals.kqueue.timespec*) null);
1974 			}
1975 
1976 		}
1977 
1978 		/// Start accepting packets
1979 		err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
1980 		if (catchError!"bind"(err)) {
1981 			deregisterEvent();
1982 			return closeAll();
1983 		}
1984 
1985 		return true;
1986 	}
1987 
1988 	
1989 	bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, TCPAcceptHandler del, bool reusing = false)
1990 	in {
1991 		assert(ctxt.local !is NetworkAddress.init);
1992 	}
1993 	body {
1994 		import libasync.internals.socket_compat : bind, listen, SOMAXCONN;
1995 		fd_t err;
1996 
1997 		/// Create callback object
1998 		EventObject eo;
1999 		eo.tcpAcceptHandler = del;
2000 		EventInfo* ev;
2001 
2002 		try ev = ThreadMem.alloc!EventInfo(fd, EventType.TCPAccept, eo, m_instanceId);
2003 		catch (Exception e){ assert(false, "Allocation error"); }
2004 		ctxt.evInfo = ev;
2005 		nothrow bool closeAll() {
2006 			try ThreadMem.free(ev);
2007 			catch(Exception e){ assert(false, "Failed free"); }
2008 			ctxt.evInfo = null;
2009 			// Socket is closed by run()
2010 			//closeSocket(fd, false);
2011 			return false;
2012 		}
2013 
2014 		/// Add socket to event loop
2015 		static if (EPOLL)
2016 		{
2017 			epoll_event _event;
2018 			_event.data.ptr = ev;
2019 			_event.events = EPOLLIN | EPOLLET;
2020 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
2021 			if (catchError!"epoll_ctl_add"(err))
2022 				return closeAll();
2023 
2024 			nothrow void deregisterEvent() {
2025 				// epoll cleans itself when closing the socket
2026 			}
2027 		}
2028 		else /* if KQUEUE */
2029 		{
2030 			kevent_t _event;
2031 			EV_SET(&_event, fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ev);
2032 			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
2033 			if (catchError!"kevent_add_listener"(err))
2034 				return closeAll();
2035 
2036 			nothrow void deregisterEvent() {
2037 				EV_SET(&_event, fd, EVFILT_READ, EV_CLEAR | EV_DISABLE, 0, 0, null);
2038 				kevent(m_kqueuefd, &_event, 1, null, 0, null);
2039 				// wouldn't know how to deal with errors here...
2040 			}
2041 		}
2042 
2043 		/// Bind and listen to socket
2044 		if (!reusing) {
2045 			err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
2046 			if (catchError!"bind"(err)) {
2047 				deregisterEvent();
2048 				return closeAll();
2049 			}
2050 
2051 			err = listen(fd, SOMAXCONN);
2052 			if (catchError!"listen"(err)) {
2053 				deregisterEvent();
2054 				return closeAll();
2055 			}
2056 
2057 		}
2058 		return true;
2059 	}
2060 
2061 	bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt, TCPEventHandler del, bool inbound = false)
2062 	in { 
2063 		assert(ctxt.peer.port != 0, "Connecting to an invalid port");
2064 	}
2065 	body {
2066 
2067 		fd_t err;
2068 
2069 		/// Create callback object
2070 		import libasync.internals.socket_compat : connect;
2071 		EventObject eo;
2072 		eo.tcpEvHandler = del;
2073 		EventInfo* ev;
2074 
2075 		try ev = ThreadMem.alloc!EventInfo(fd, EventType.TCPTraffic, eo, m_instanceId);
2076 		catch (Exception e){ assert(false, "Allocation error"); }
2077 		assert(ev !is null);
2078 		ctxt.evInfo = ev;
2079 		nothrow bool destroyEvInfo() {
2080 			try ThreadMem.free(ev);
2081 			catch(Exception e){ assert(false, "Failed to free resources"); }
2082 			ctxt.evInfo = null;
2083 
2084 			// Socket will be closed by run()
2085 			// closeSocket(fd, false);
2086 			return false;
2087 		}
2088 
2089 		/// Add socket and callback object to event loop
2090 		static if (EPOLL)
2091 		{
2092 			epoll_event _event = void;
2093 			_event.data.ptr = ev;
2094 			_event.events = 0 | EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLET;
2095 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
2096 			log("Connection FD#" ~ fd.to!string ~ " added to " ~ m_epollfd.to!string);
2097 			if (catchError!"epoll_ctl_add"(err))
2098 				return destroyEvInfo();
2099 
2100 			nothrow void deregisterEvent() {
2101 				// will be handled automatically when socket is closed
2102 			}
2103 		}
2104 		else /* if KQUEUE */
2105 		{
2106 			kevent_t[2] events = void;
2107 			try log("Register event ptr " ~ ev.to!string); catch {}
2108 			assert(ev.evType == EventType.TCPTraffic, "Bad event type for TCP Connection");
2109 			EV_SET(&(events[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev);
2110 			EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev);
2111 			assert((cast(EventInfo*)events[0].udata) == ev && (cast(EventInfo*)events[1].udata) == ev);
2112 			assert((cast(EventInfo*)events[0].udata).owner == m_instanceId && (cast(EventInfo*)events[1].udata).owner == m_instanceId);
2113 			err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
2114 			if (catchError!"kevent_add_tcp"(err))
2115 				return destroyEvInfo();
2116 
2117 			// todo: verify if this allocates on the GC?
2118 			nothrow void deregisterEvent() {
2119 				EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
2120 				EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
2121 				kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
2122 				// wouldn't know how to deal with errors here...
2123 			}
2124 		}
2125 
2126 		// Inbound objects are already connected
2127 		if (inbound) return true;
2128 
2129 		// Connect is blocking, but this makes the socket non-blocking for send/recv
2130 		if (!setNonBlock(fd)) {
2131 			deregisterEvent();
2132 			return destroyEvInfo();
2133 		}
2134 
2135 		/// Start the connection
2136 		err = connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen);
2137 		if (catchErrorsEq!"connect"(err, [ tuple(cast(fd_t)SOCKET_ERROR, EPosix.EINPROGRESS, Status.ASYNC) ]))
2138 			return true;
2139 		if (catchError!"connect"(err)) {
2140 			deregisterEvent();
2141 			return destroyEvInfo();
2142 		}
2143 
2144 		return true;
2145 	}
2146 
2147 	bool catchError(string TRACE, T)(T val, T cmp = SOCKET_ERROR)
2148 		if (isIntegral!T)
2149 	{
2150 		if (val == cmp) {
2151 			m_status.text = TRACE;
2152 			m_error = lastError();
2153 			m_status.code = Status.ABORT;
2154 			static if(LOG) log(m_status);
2155 			return true;
2156 		}
2157 		return false;
2158 	}
2159 
2160 	bool catchSocketError(string TRACE)(fd_t fd)
2161 	{
2162 		m_status.text = TRACE;
2163 		int err;
2164 		import libasync.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR;
2165 		socklen_t len = int.sizeof;
2166 		getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
2167 		m_error = cast(error_t) err;
2168 		if (m_error != EPosix.EOK) {
2169 			m_status.code = Status.ABORT;
2170 			static if(LOG) log(m_status);
2171 			return true;
2172 		}
2173 
2174 		return false;
2175 	}
2176 
2177 	bool catchEvLoopErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
2178 		if (isIntegral!T)
2179 	{
2180 		if (val == SOCKET_ERROR) {
2181 			int err = errno;
2182 			foreach (validator ; cmp) {
2183 				if (errno == validator[0]) {
2184 					m_status.text = TRACE;
2185 					m_error = lastError();
2186 					m_status.code = validator[1];
2187 					static if(LOG) log(m_status);
2188 					return true;
2189 				}
2190 			}
2191 
2192 			m_status.text = TRACE;
2193 			m_status.code = Status.EVLOOP_FAILURE;
2194 			m_error = lastError();
2195 			log(m_status);
2196 			return true;
2197 		}
2198 		return false;
2199 	}
2200 
2201 	/**
2202 	 * If the value at val matches the tuple first argument T, get the last error,
2203 	 * and if the last error matches tuple second argument error_t, set the Status as
2204 	 * tuple third argument Status.
2205 	 * 
2206 	 * Repeats for each comparison tuple until a match in which case returns true.
2207 	 */
2208 	bool catchErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...)
2209 		if (isIntegral!T)
2210 	{
2211 		error_t err;
2212 		foreach (validator ; cmp) {
2213 			if (val == validator[0]) {
2214 				if (err is EPosix.init) err = lastError();
2215 				if (err == validator[1]) {
2216 					m_status.text = TRACE;
2217 					m_status.code = validator[2];
2218 					if (m_status.code == Status.EVLOOP_TIMEOUT) {
2219 						log(m_status);
2220 						break;
2221 					}
2222 					m_error = lastError();
2223 					static if(LOG) log(m_status);
2224 					return true;
2225 				}
2226 			}
2227 		}
2228 		return false;
2229 	}
2230 
2231 	
2232 	error_t lastError() {
2233 		try {
2234 			return cast(error_t) errno;
2235 		} catch(Exception e) {
2236 			return EPosix.EACCES;
2237 		}
2238 
2239 	}
2240 	
2241 	void log(StatusInfo val)
2242 	{
2243 		static if (LOG) {
2244 			import std.stdio;
2245 			try {
2246 				writeln("Backtrace: ", m_status.text);
2247 				writeln(" | Status:  ", m_status.code);
2248 				writeln(" | Error: " , m_error);
2249 				if ((m_error in EPosixMessages) !is null)
2250 					writeln(" | Message: ", EPosixMessages[m_error]);
2251 			} catch(Exception e) {
2252 				return;
2253 			}
2254 		}
2255 	}
2256 
2257 	void log(T)(T val)
2258 	{
2259 		static if (LOG) {
2260 			import std.stdio;
2261 			try {
2262 				writeln(val);
2263 			} catch(Exception e) {
2264 				return;
2265 			}
2266 		}
2267 	}
2268 
2269 	NetworkAddress getAddressInfo(addrinfo)(in string host, ushort port, bool ipv6, bool tcp, ref addrinfo hints) 
2270 	{
2271 		m_status = StatusInfo.init;
2272 		import libasync.internals.socket_compat : AF_INET, AF_INET6, SOCK_DGRAM, SOCK_STREAM, IPPROTO_TCP, IPPROTO_UDP, freeaddrinfo, getaddrinfo;
2273 
2274 		NetworkAddress addr;
2275 		addrinfo* infos;
2276 		error_t err;
2277 		if (ipv6) {
2278 			addr.family = AF_INET6;
2279 			hints.ai_family = AF_INET6;
2280 		}
2281 		else {
2282 			addr.family = AF_INET;
2283 			hints.ai_family = AF_INET;
2284 		}
2285 		if (tcp) {
2286 			hints.ai_socktype = SOCK_STREAM;
2287 			hints.ai_protocol = IPPROTO_TCP;
2288 		}
2289 		else {
2290 			hints.ai_socktype = SOCK_DGRAM;
2291 			hints.ai_protocol = IPPROTO_UDP;
2292 		}
2293 
2294 		static if (LOG) {
2295 			log("Resolving " ~ host ~ ":" ~ port.to!string);
2296 		}
2297 
2298 		auto chost = host.toStringz();
2299 
2300 		if (port != 0) {
2301 			addr.port = port;
2302 			const(char)* cPort = cast(const(char)*) port.to!string.toStringz;
2303 			err = cast(error_t) getaddrinfo(chost, cPort, &hints, &infos);
2304 		}
2305 		else {
2306 			err = cast(error_t) getaddrinfo(chost, null, &hints, &infos);
2307 		}
2308 
2309 		if (err != EPosix.EOK) {
2310 			setInternalError!"getAddressInfo"(Status.ERROR, string.init, err);
2311 			return NetworkAddress.init;
2312 		}
2313 		ubyte* pAddr = cast(ubyte*) infos.ai_addr;
2314 		ubyte* data = cast(ubyte*) addr.sockAddr;
2315 		data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy
2316 		freeaddrinfo(infos);
2317 		return addr;
2318 	}
2319 
2320 	
2321 	
2322 }
2323 
2324 
2325 static if (!EPOLL)
2326 {
2327 	import std.container : Array;
2328 	import core.sync.mutex : Mutex;
2329 	import core.sync.rwmutex : ReadWriteMutex;
	// Pool state for the indices handed out by createIndex(). Neither variable
	// is __gshared/shared, so every thread has its own copy.
	// Upper bound of the index range that has been generated so far.
	size_t g_evIdxCapacity;
	// Indices that are currently free; destroyIndex() puts released ids back here.
	Array!size_t g_evIdxAvailable;
2332 
2333 	// called on run
2334 	nothrow size_t createIndex() {
2335 		size_t idx;
2336 		import std.algorithm : max;
2337 		try {
2338 			
2339 			size_t getIdx() {
2340 				
2341 				if (!g_evIdxAvailable.empty) {
2342 					immutable size_t ret = g_evIdxAvailable.back;
2343 					g_evIdxAvailable.removeBack();
2344 					return ret;
2345 				}
2346 				return 0;
2347 			}
2348 			
2349 			idx = getIdx();
2350 			if (idx == 0) {
2351 				import std.range : iota;
2352 				g_evIdxAvailable.insert( iota(g_evIdxCapacity, max(32, g_evIdxCapacity * 2), 1) );
2353 				g_evIdxCapacity = max(32, g_evIdxCapacity * 2);
2354 				idx = getIdx();
2355 			}
2356 			
2357 		} catch (Throwable e) {
2358 			static if (DEBUG) {
2359 				import std.stdio : writeln;
2360 				try writeln(e.toString()); catch {}
2361 			}
2362 
2363 		}
2364 		return idx;
2365 	}
2366 
2367 	nothrow void destroyIndex(AsyncNotifier ctxt) {
2368 		try {
2369 			g_evIdxAvailable.insert(ctxt.id);		
2370 		}
2371 		catch (Exception e) {
2372 			assert(false, "Error destroying index: " ~ e.msg);
2373 		}
2374 	}
2375 
2376 	nothrow void destroyIndex(AsyncTimer ctxt) {
2377 		try {
2378 			g_evIdxAvailable.insert(ctxt.id);		
2379 		}
2380 		catch (Exception e) {
2381 			assert(false, "Error destroying index: " ~ e.msg);
2382 		}
2383 	}
2384 
	// Per-thread state (not __gshared): the pointed-to value is used as this
	// thread's row index into gs_signalQueue / gs_idxQueue.
	size_t* g_threadId;
	// Pool of free AsyncSignal indices for this thread, see createIndex(shared AsyncSignal).
	size_t g_idxCapacity;
	Array!size_t g_idxAvailable;

	// Process-wide state; accesses in this file go through gs_queueMutex.reader / .writer.
	__gshared ReadWriteMutex gs_queueMutex;
	// Indexed as [thread id][signal index]: the registered AsyncSignal objects.
	__gshared Array!(Array!AsyncSignal) gs_signalQueue;
	__gshared Array!(Array!size_t) gs_idxQueue; // signals notified
2392 
2393 	
	// loop
	/// Fills `sigarr` with the signals that were notified for the current
	/// thread and removes the corresponding entries from gs_idxQueue.
	///
	/// Returns: true when more notifications were queued than fit in `sigarr`
	/// (the remainder stays queued); false otherwise, including when nothing
	/// was queued.
	nothrow bool popSignals(ref AsyncSignal[] sigarr) {
		bool more;
		try {
			// Clear the leading non-null entries (stops at the first null slot)
			foreach (ref AsyncSignal sig; sigarr) {
				if (!sig)
					break;
				sig = null;
			}
			size_t len;
			synchronized(gs_queueMutex.reader) {

				// No queue row for this thread yet, or nothing pending
				if (gs_idxQueue.length <= *g_threadId || gs_idxQueue[*g_threadId].empty)
					return false;

				len = gs_idxQueue[*g_threadId].length;
				import std.stdio;
				// Cap the batch at the capacity of the output array
				if (sigarr.length < len) {
					more = true;
					len = sigarr.length;
				}

				// Translate each queued signal index into its AsyncSignal object
				size_t i;
				foreach (size_t idx; gs_idxQueue[*g_threadId][0 .. len]){
					sigarr[i] = gs_signalQueue[*g_threadId][idx];
					i++;
				}
			}

			// Drop the entries that were just copied out
			synchronized (gs_queueMutex.writer) {
				gs_idxQueue[*g_threadId].linearRemove(gs_idxQueue[*g_threadId][0 .. len]);
			}
		}
		catch (Exception e) {
			assert(false, "Could not get pending signals: " ~ e.msg);
		}
		return more;
	}
2432 
2433 	// notify
2434 	nothrow void addSignal(shared AsyncSignal ctxt) {
2435 		try {
2436 			size_t thread_id = ctxt.threadId;
2437 			bool must_resize;
2438 			import std.stdio;
2439 			synchronized (gs_queueMutex.writer) {
2440 				if (gs_idxQueue.empty || gs_idxQueue.length < thread_id + 1) {
2441 					gs_idxQueue.reserve(thread_id + 1);
2442 					foreach (i; gs_idxQueue.length .. gs_idxQueue.capacity) {
2443 						gs_idxQueue.insertBack(Array!size_t.init);
2444 					}
2445 				}
2446 				if (gs_idxQueue[thread_id].empty)
2447 				{
2448 					gs_idxQueue[thread_id].reserve(32);
2449 				}
2450 
2451 				gs_idxQueue[thread_id].insertBack(ctxt.id);
2452 
2453 			}
2454 
2455 		}
2456 		catch (Exception e) {
2457 			assert(false, "Array error: " ~ e.msg);
2458 		}
2459 	}
2460 
2461 	// called on run
2462 	nothrow size_t createIndex(shared AsyncSignal ctxt) {
2463 		size_t idx;
2464 		import std.algorithm : max;
2465 		try {
2466 			bool must_resize;
2467 
2468 			synchronized (gs_queueMutex.reader) {
2469 				if (gs_signalQueue.length < *g_threadId)
2470 					must_resize = true;
2471 			}
2472 
2473 			/// make sure the signal queue is big enough for this thread ID
2474 			if (must_resize) {
2475 				synchronized (gs_queueMutex.writer) {
2476 					while (gs_signalQueue.length <= *g_threadId) 
2477 						gs_signalQueue.insertBack(Array!AsyncSignal.init);
2478 				}
2479 			}
2480 
2481 			size_t getIdx() {
2482 
2483 				if (!g_idxAvailable.empty) {
2484 					immutable size_t ret = g_idxAvailable.back;
2485 					g_idxAvailable.removeBack();
2486 					return ret;
2487 				}
2488 				return 0;
2489 			}
2490 
2491 			idx = getIdx();
2492 			if (idx == 0) {
2493 				import std.range : iota;
2494 				g_idxAvailable.insert( iota(g_idxCapacity + 1,  max(32, g_idxCapacity * 2), 1) );
2495 				g_idxCapacity = g_idxAvailable[$-1];
2496 				idx = getIdx();
2497 			}
2498 
2499 			synchronized (gs_queueMutex.writer) {
2500 				if (gs_signalQueue.empty || gs_signalQueue.length < *g_threadId + 1) {
2501 					
2502 					gs_signalQueue.reserve(*g_threadId + 1);
2503 					foreach (i; gs_signalQueue.length .. gs_signalQueue.capacity) {
2504 						gs_signalQueue.insertBack(Array!AsyncSignal.init);
2505 					}
2506 					
2507 				}
2508 
2509 				if (gs_signalQueue[*g_threadId].empty || gs_signalQueue[*g_threadId].length < idx + 1) {
2510 					
2511 					gs_signalQueue[*g_threadId].reserve(idx + 1);
2512 					foreach (i; gs_signalQueue[*g_threadId].length .. gs_signalQueue[*g_threadId].capacity) {
2513 						gs_signalQueue[*g_threadId].insertBack(cast(AsyncSignal)null);
2514 					}
2515 					
2516 				}
2517 
2518 				gs_signalQueue[*g_threadId][idx] = cast(AsyncSignal) ctxt;
2519 			}
2520 		} catch {}
2521 
2522 		return idx;
2523 	}
2524 
2525 	// called on kill
2526 	nothrow void destroyIndex(shared AsyncSignal ctxt) {
2527 		try {
2528 			g_idxAvailable.insert(ctxt.id);
2529 			synchronized (gs_queueMutex.writer) {
2530 				gs_signalQueue[*g_threadId][ctxt.id] = null;
2531 			}
2532 		}
2533 		catch (Exception e) {
2534 			assert(false, "Error destroying index: " ~ e.msg);
2535 		}
2536 	}
2537 }
2538 
/// Adds the per-connection event-loop state (EventInfo pointer and the
/// connected / disconnecting / writeBlocked flags) plus property accessors
/// to the type it is mixed into.
mixin template TCPConnectionMixins() {

	/// State kept on behalf of the event loop
	struct CleanupData {
		EventInfo* evInfo;
		bool connected;
		bool disconnecting;
		bool writeBlocked;
	}

	private CleanupData m_impl;

	@property {
		/// Event registration data owned by the event loop
		EventInfo* evInfo() {
			return m_impl.evInfo;
		}

		/// ditto
		void evInfo(EventInfo* info) {
			m_impl.evInfo = info;
		}

		/// Connected flag
		bool connected() const {
			return m_impl.connected;
		}

		/// ditto
		void connected(bool b) {
			m_impl.connected = b;
		}

		/// Disconnecting flag
		bool disconnecting() const {
			return m_impl.disconnecting;
		}

		/// ditto
		void disconnecting(bool b) {
			m_impl.disconnecting = b;
		}

		/// Write-blocked flag
		bool writeBlocked() const {
			return m_impl.writeBlocked;
		}

		/// ditto
		void writeBlocked(bool b) {
			m_impl.writeBlocked = b;
		}
	}

}
2583 
/// Mixin for shared event objects: stores the EventInfo pointer and exposes an
/// identifier of the owning thread (pthreadId on epoll builds, threadId on
/// kqueue builds).
mixin template EvInfoMixinsShared() {

	private CleanupData m_impl;
	
	// Declared `shared`, so the stored pointer is typed as shared as well
	shared struct CleanupData {
		EventInfo* evInfo;
	}

	static if (EPOLL) {
		import core.sys.posix.pthread : pthread_t;
		private pthread_t m_pthreadId;
		/// Returns the stored pthread id (the cast yields an unqualified pthread_t).
		synchronized @property pthread_t pthreadId() {
			return cast(pthread_t) m_pthreadId;
		}
		/* todo: support multiple event loops per thread?
		 private ushort m_sigId;
		 synchronized @property ushort sigId() {
		 return cast(ushort)m_loopId;
		 }
		 synchronized @property void sigId(ushort id) {
		 m_loopId = cast(shared)id;
		 }
		 */
	} 
	else /* if KQUEUE */
	{
		// Pointer to the owner's id; it is only dereferenced here, never assigned
		private shared(size_t)* m_owner_id;
		/// Returns the value currently stored behind m_owner_id.
		synchronized @property size_t threadId() {
			return cast(size_t) *m_owner_id;
		}
	}

	/// Event registration data
	@property shared(EventInfo*) evInfo() {
		return m_impl.evInfo;
	}
	
	/// ditto
	@property void evInfo(shared(EventInfo*) info) {
		m_impl.evInfo = info;
	}

}
2625 
/// Adds storage for an EventInfo pointer, together with its getter and
/// setter, to the type it is mixed into.
mixin template EvInfoMixins() {

	/// State kept on behalf of the event loop
	struct CleanupData {
		EventInfo* evInfo;
	}

	private CleanupData m_impl;

	@property {
		/// Event registration data
		EventInfo* evInfo() {
			return m_impl.evInfo;
		}

		/// ditto
		void evInfo(EventInfo* info) {
			m_impl.evInfo = info;
		}
	}
}
2642 
/// Callback storage for an EventInfo. Being a union, it holds exactly one
/// handler at a time; the init* functions assign the member that matches the
/// EventType they register (e.g. udpHandler for EventType.UDPSocket).
union EventObject {
	TCPAcceptHandler tcpAcceptHandler; // assigned by initTCPListener
	TCPEventHandler tcpEvHandler;      // assigned by initTCPConnection
	TimerHandler timerHandler;
	DWHandler dwHandler;
	UDPHandler udpHandler;             // assigned by initUDPSocket
	NotifierHandler notifierHandler;
}
2651 
/// Kind of event source an EventInfo describes (stored in EventInfo.evType).
enum EventType : char {
	TCPAccept,        // listening socket, see initTCPListener
	TCPTraffic,       // TCP connection, see initTCPConnection
	UDPSocket,        // UDP socket, see initUDPSocket
	Notifier,         // presumably AsyncNotifier events -- registration is not visible here
	Signal,           // presumably AsyncSignal events -- registration is not visible here
	Timer,            // presumably AsyncTimer events -- registration is not visible here
	DirectoryWatcher  // presumably directory watcher events -- registration is not visible here
}
2661 
/// Record attached to an epoll/kqueue registration as its user data pointer.
/// The init* functions build it positionally via ThreadMem.alloc!EventInfo(fd,
/// type, handler, m_instanceId), so the field order is significant.
struct EventInfo {
	fd_t fd;            // descriptor the event was registered for
	EventType evType;   // selects which EventObject member is valid
	EventObject evObj;  // the callback to run for this event
	ushort owner;       // m_instanceId of the event loop that created the record
}
2668 
2669 
2670 
/**
 Represents a network/socket address. (taken from vibe.core.net)

 The address is stored in a union of sockaddr, sockaddr_in and sockaddr_in6;
 the address family decides which view is valid. Only AF_INET and AF_INET6
 are supported by the accessors below.
 */
public struct NetworkAddress {
	import libasync.internals.socket_compat : sockaddr, sockaddr_in, sockaddr_in6, AF_INET, AF_INET6;
	private union {
		sockaddr addr;
		sockaddr_in addr_ip4;
		sockaddr_in6 addr_ip6;
	}

	/// True if the address family is AF_INET6.
	@property bool ipv6() const pure nothrow { return this.family == AF_INET6; }

	/** Family (AF_) of the socket address.
	 */
	@property ushort family() const pure nothrow { return addr.sa_family; }
	/// ditto
	@property void family(ushort val) pure nothrow {
		// Cast to the platform's own sa_family type so the value is not
		// truncated to 8 bits where sa_family is wider than a byte.
		addr.sa_family = cast(typeof(addr.sa_family)) val;
	}
	
	/** The port in host byte order.
	 */
	@property ushort port()
	const pure nothrow {
		switch (this.family) {
			default: assert(false, "port() called for invalid address family.");
			case AF_INET: return ntoh(addr_ip4.sin_port);
			case AF_INET6: return ntoh(addr_ip6.sin6_port);
		}
	}
	/// ditto
	@property void port(ushort val)
	pure nothrow {
		// The family must already be set: it selects which union view is written
		switch (this.family) {
			default: assert(false, "port() called for invalid address family.");
			case AF_INET: addr_ip4.sin_port = hton(val); break;
			case AF_INET6: addr_ip6.sin6_port = hton(val); break;
		}
	}
	
	/** A pointer to a sockaddr struct suitable for passing to socket functions.
	 */
	@property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; }
	
	/** Size of the sockaddr struct that is returned by sockAddr().
	 */
	@property uint sockAddrLen()
	const pure nothrow {
		switch (this.family) {
			default: assert(false, "sockAddrLen() called for invalid address family.");
			case AF_INET: return addr_ip4.sizeof;
			case AF_INET6: return addr_ip6.sizeof;
		}
	}
	
	/// The IPv4 view of the address; the family must be AF_INET.
	@property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow
	in { assert (family == AF_INET); }
	body { return &addr_ip4; }
	
	/// The IPv6 view of the address; the family must be AF_INET6.
	@property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow
	in { assert (family == AF_INET6); }
	body { return &addr_ip6; }
	
	/** Returns a string representation of the IP address
	 *
	 * IPv4 addresses are printed in dotted-decimal form. IPv6 addresses are
	 * printed as eight colon-separated hexadecimal groups without zero
	 * compression.
	 */
	string toAddressString()
	const {
		import std.array : appender;
		import std.string : format;
		import std.format : formattedWrite;
		
		switch (this.family) {
			default: assert(false, "toAddressString() called for invalid address family.");
			case AF_INET:
				ubyte[4] ip = (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4];
				return format("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]);
			case AF_INET6:
				ubyte[16] ip = addr_ip6.sin6_addr.s6_addr;
				auto ret = appender!string();
				ret.reserve(40);
				foreach (i; 0 .. 8) {
					if (i > 0) ret.put(':');
					// Each group is two bytes in network (big-endian) order
					ret.formattedWrite("%x", bigEndianToNative!ushort(cast(ubyte[2])ip[i*2 .. i*2+2].ptr[0 .. 2]));
				}
				return ret.data;
		}
	}
	
	/** Returns a full string representation of the address, including the port number.
	 *
	 * The format is "a.b.c.d:port" for IPv4 and "[address]:port" for IPv6.
	 */
	string toString()
	const {
		
		import std.string : format;
		
		auto ret = toAddressString();
		switch (this.family) {
			default: assert(false, "toString() called for invalid address family.");
			case AF_INET: return ret ~ format(":%s", port);
			case AF_INET6: return format("[%s]:%s", ret, port);
		}
	}
	
}
2774 
private pure nothrow {
	import std.bitmanip;

	/// Converts a 16-bit value from network (big-endian) to host byte order.
	ushort ntoh(ushort val)
	{
		version (LittleEndian) {
			// Host and network order differ: swap the two bytes
			return swapEndian(val);
		}
		else version (BigEndian) {
			return val;
		}
		else static assert(false, "Unknown endianness.");
	}

	/// Converts a 16-bit value from host to network (big-endian) byte order.
	ushort hton(ushort val)
	{
		// For 16-bit values the conversion is its own inverse, so the same
		// byte swap (or identity) applies in both directions.
		return ntoh(val);
	}
}