1 module libasync.posix;
2 
3 version (Posix):
4 
5 import libasync.types;
6 import std.string : toStringz;
7 import std.conv : to;
8 import std.datetime : Duration, msecs, seconds, SysTime;
9 import std.traits : isIntegral;
10 import std.typecons : Tuple, tuple;
11 import std.container : Array;
12 
13 import core.stdc.errno;
14 import libasync.events;
15 import libasync.internals.memory : FreeListObjectAlloc;
16 import libasync.internals.hashmap;
17 import libasync.internals.path;
18 import core.sys.posix.signal;
19 import libasync.posix2;
20 import core.sync.mutex;
21 enum SOCKET_ERROR = -1;
22 alias fd_t = int;
23 
24 
25 version(linux) {
26 	import libasync.internals.epoll;
27 	const EPOLL = true;
28 	extern(C) nothrow @nogc {
29 		int __libc_current_sigrtmin();
30 		int __libc_current_sigrtmax();
31 	}
32 	bool g_signalsBlocked;
33 	package nothrow void blockSignals() {
34 		try {
35 			/// Block signals to reserve SIGRTMIN .. " +30 for AsyncSignal
36 			sigset_t mask;
37 			// todo: use more signals for more event loops per thread.. (is this necessary?)
38 			//foreach (j; __libc_current_sigrtmin() .. __libc_current_sigrtmax() + 1) {
39 				//import std.stdio : writeln; 
40 				//try writeln("Blocked signal " ~ (__libc_current_sigrtmin() + j).to!string ~ " in instance " ~ m_instanceId.to!string); catch {}
41 			sigemptyset(&mask);
42 			sigaddset(&mask, cast(int) __libc_current_sigrtmin());
43 			pthread_sigmask(SIG_BLOCK, &mask, null);
44 			//}
45 		} catch {}
46 	}
47 	static this() {
48 		blockSignals();
49 		g_signalsBlocked = true;
50 	}
51 }
52 version(OSX) {
53 	import libasync.internals.kqueue;
54 	const EPOLL = false;
55 }
56 version(FreeBSD) {
57 	import libasync.internals.kqueue;
58 	const EPOLL = false;
59 }
60 
61 __gshared Mutex g_mutex;
62 
63 static if (!EPOLL) {
	/// Value type of the m_dwFiles map (only compiled when EPOLL is false).
	private struct DWFileInfo {
		/// Key of the containing folder in m_dwFolders; loop() uses it to look the folder up.
		fd_t folder;
		/// NOTE(review): presumably the path of the tracked entry itself — confirm against watch().
		Path path;
		/// NOTE(review): looks like the modification time seen on the last scan — confirm where it is written.
		SysTime lastModified;
		/// NOTE(review): presumably true when the entry is a directory — confirm.
		bool is_dir;
	}
70 }
71 
/// Value type of the m_dwFolders map: one record per watched folder.
private struct DWFolderInfo {
	/// Watch parameters; watch() builds it from (events, path, recursive, watch descriptor).
	WatchInfo wi;
	/// The `fd` argument that watch() received when this folder was registered.
	fd_t fd;
}
76 
77 package struct EventLoopImpl {
78 	static if (EPOLL) {
79 		pragma(msg, "Using Linux EPOLL for events");
80 	}
81 	else /* if KQUEUE */
82 	{
83 		pragma(msg, "Using FreeBSD KQueue for events");
84 	}
85 
86 package:
87 	alias error_t = EPosix;
88 
89 nothrow:
90 private:
91 
92 	/// members
93 	EventLoop m_evLoop;
94 	ushort m_instanceId;
95 	bool m_started;
96 	StatusInfo m_status;
97 	error_t m_error = EPosix.EOK;
98 	EventInfo* m_evSignal;
99 	static if (EPOLL){
100 		fd_t m_epollfd;
101 		HashMap!(Tuple!(fd_t, uint), DWFolderInfo) m_dwFolders; // uint = inotify_add_watch(Path)
102 	}
103 	else /* if KQUEUE */
104 	{
105 		fd_t m_kqueuefd;
106 		HashMap!(fd_t, EventInfo*) m_watchers; // fd_t = id++ per AsyncDirectoryWatcher
107 		HashMap!(fd_t, DWFolderInfo) m_dwFolders; // fd_t = open(folder)
108 		HashMap!(fd_t, DWFileInfo) m_dwFiles; // fd_t = open(file)
109 		HashMap!(fd_t, Array!(DWChangeInfo)*) m_changes; // fd_t = id++ per AsyncDirectoryWatcher
110 
111 	}
112 
113 package:
114 	
115 	/// workaround for IDE indent bug on too big files
116 	mixin RunKill!();
117 
118 	@property bool started() const {
119 		return m_started;
120 	}
121 
122 	bool init(EventLoop evl)
123 	in { assert(!m_started); }
124 	body
125 	{
126 
127 		import core.atomic;
128 		shared static ushort i;
129 		string* failer = null;
130 
131 
132 		m_instanceId = i;
133 		static if (!EPOLL) g_threadId = new size_t(cast(size_t)m_instanceId);
134 
135 		core.atomic.atomicOp!"+="(i, cast(ushort) 1);
136 		m_evLoop = evl;
137 
138 		import core.thread;
139 		try Thread.getThis().priority = Thread.PRIORITY_MAX;
140 		catch (Exception e) { assert(false, "Could not set thread priority"); }
141 
142 		try
143 			if (!g_mutex)
144 				g_mutex = new Mutex;
145 		catch {}
146 
147 		static if (EPOLL)
148 		{
149 
150 			if (!g_signalsBlocked)
151 				blockSignals();
152 			assert(m_instanceId <= __libc_current_sigrtmax(), "An additional event loop is unsupported due to SIGRTMAX restrictions in Linux Kernel");
153 			m_epollfd = epoll_create1(0);
154 
155 			if (catchError!"epoll_create1"(m_epollfd))
156 				return false;
157 				
158 			import core.sys.linux.sys.signalfd;
159 			import core.thread : getpid;
160 
161 			fd_t err;
162 			fd_t sfd;
163 
164 			sigset_t mask;
165 
166 			try {
167 				sigemptyset(&mask);
168 				sigaddset(&mask, __libc_current_sigrtmin());
169 				err = pthread_sigmask(SIG_BLOCK, &mask, null);
170 				if (catchError!"sigprocmask"(err))
171 				{
172 					m_status.code = Status.EVLOOP_FAILURE;
173 					return false;
174 				}
175 			} catch { }
176 
177 
178 
179 			sfd = signalfd(-1, &mask, SFD_NONBLOCK);
180 			assert(sfd > 0, "Failed to setup signalfd in epoll");
181 
182 			EventType evtype;
183 
184 			epoll_event _event;
185 			_event.events = EPOLLIN;
186 			evtype = EventType.Signal;
187 			try 
188 				m_evSignal = FreeListObjectAlloc!EventInfo.alloc(sfd, evtype, EventObject.init, m_instanceId);
189 			catch (Exception e){ 
190 				assert(false, "Allocation error"); 
191 			}
192 			_event.data.ptr = cast(void*) m_evSignal;
193 
194 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, sfd, &_event);
195 			if (catchError!"EPOLL_CTL_ADD(sfd)"(err))
196 			{
197 				return false;
198 			}
199 
200 		}
201 		else /* if KQUEUE */ 
202 		{
203 			try {
204 				if (!gs_queueMutex) {
205 					gs_queueMutex = FreeListObjectAlloc!ReadWriteMutex.alloc();
206 					gs_signalQueue = Array!(Array!AsyncSignal)();
207 					gs_idxQueue = Array!(Array!size_t)();
208 				}
209 				if (g_evIdxAvailable.empty) {
210 					g_evIdxAvailable.reserve(32);
211 					
212 					foreach (k; g_evIdxAvailable.length .. g_evIdxAvailable.capacity) {
213 						g_evIdxAvailable.insertBack(k + 1);
214 					}
215 					g_evIdxCapacity = 32;
216 					g_idxCapacity = 32;
217 				}
218 			} catch { assert(false, "Initialization failed"); }
219 			m_kqueuefd = kqueue();
220 			int err;
221 			try {
222 				sigset_t mask;
223 				sigemptyset(&mask);
224 				sigaddset(&mask, SIGXCPU);
225 				
226 				err = sigprocmask(SIG_BLOCK, &mask, null);
227 			} catch {}
228 
229 			EventType evtype = EventType.Signal;
230 
231 			// use GC because FreeListObjectAlloc fails at emplace for shared objects
232 			try 
233 				m_evSignal = FreeListObjectAlloc!EventInfo.alloc(SIGXCPU, evtype, EventObject.init, m_instanceId);
234 			catch (Exception e) {
235 				assert(false, "Failed to allocate resources");
236 			}
237 
238 			if (catchError!"siprocmask"(err))
239 				return 0;
240 
241 			kevent_t _event;
242 			EV_SET(&_event, SIGXCPU, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, m_evSignal);
243 			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
244 			if (catchError!"kevent_add(SIGXCPU)"(err))
245 				assert(false, "Add SIGXCPU failed at kevent call");
246 		}
247 
248 		try log("init in thread " ~ Thread.getThis().name); catch {}
249 
250 		return true;
251 	}
252 
253 	void exit() {
254 		import core.sys.posix.unistd : close;
255 		static if (EPOLL) {
256 			close(m_epollfd); // not necessary?
257 
258 			// not necessary:
259 			//try FreeListObjectAlloc!EventInfo.free(m_evSignal);
260 			//catch (Exception e) { assert(false, "Failed to free resources"); }
261 
262 		}
263 		else
264 			close(m_kqueuefd);
265 	}
266 
267 	@property const(StatusInfo) status() const {
268 		return m_status;
269 	}
270 
271 	@property string error() const {
272 		string* ptr;
273 		return ((ptr = (m_error in EPosixMessages)) !is null) ? *ptr : string.init;
274 	}
275 
276 	bool loop(Duration timeout = 0.seconds)
277 	//in { assert(Fiber.getThis() is null); }
278 	{
279 
280 		import libasync.internals.memory;
281 		bool success = true;
282 		int num;
283 
284 		static if (EPOLL) {
285 
286 			static align(1) epoll_event[] events;
287 			if (events is null)
288 			{
289 				try events = new epoll_event[128];
290 				catch (Exception e) {
291 					assert(false, "Could not allocate events array: " ~ e.msg);
292 				}
293 			}
294 			int timeout_ms;
295 			if (timeout == 0.seconds)
296 				timeout_ms = -1;
297 			else timeout_ms = cast(int)timeout.total!"msecs";
298 
299 			/// Retrieve pending events
300 			num = epoll_wait(m_epollfd, cast(epoll_event*)&events[0], 128, timeout_ms);
301 
302 			assert(events !is null && events.length <= 128);
303 
304 			
305 		}
306 		else /* if KQUEUE */ {
307 			import core.sys.posix.time : time_t;
308 			import core.sys.posix.config : c_long;
309 			static kevent_t[] events;
310 			if (events.length == 0) {
311 				try events = allocArray!kevent_t(manualAllocator(), 128); 
312 				catch (Exception e) { assert(false, "Could not allocate events array"); }				
313 			}
314 			time_t secs = timeout.split!("seconds", "nsecs")().seconds;
315 			c_long ns = timeout.split!("seconds", "nsecs")().nsecs;
316 			auto tspec = libasync.internals.kqueue.timespec(secs, ns);
317 
318 			num = kevent(m_kqueuefd, null, 0, cast(kevent_t*) events, cast(int) events.length, &tspec);
319 
320 		}
321 
322 		auto errors = [	tuple(EINTR, Status.EVLOOP_TIMEOUT) ];
323 		
324 		if (catchEvLoopErrors!"event_poll'ing"(num, errors)) 
325 			return false;
326 
327 		if (num > 0)
328 			log("Got " ~ num.to!string ~ " event(s)");
329 
330 		foreach(i; 0 .. num) {
331 			success = false;
332 			m_status = StatusInfo.init;
333 			static if (EPOLL) 
334 			{
335 				epoll_event _event = events[i];
336 				try log("Event " ~ i.to!string ~ " of: " ~ events.length.to!string); catch {}
337 				EventInfo* info = cast(EventInfo*) _event.data.ptr;
338 				int event_flags = cast(int) _event.events;
339 			}
340 			else /* if KQUEUE */
341 			{
342 				kevent_t _event = events[i];
343 				EventInfo* info = cast(EventInfo*) _event.udata;
344 				//log("Got info");
345 				int event_flags = (_event.filter << 16) | (_event.flags & 0xffff);
346 				//log("event flags");
347 			}
348 
349 			//if (info.owner != m_instanceId)
350 			//	try log("Event " ~ (cast(int)(info.evType)).to!string ~ " is invalid: supposidly created in instance #" ~ info.owner.to!string ~ ", received in " ~ m_instanceId.to!string ~ " event: " ~ event_flags.to!string);
351 			//	catch{}
352 			//log("owner");
353 			final switch (info.evType) {
354 				case EventType.TCPAccept:
355 					if (info.fd == 0)
356 						break;
357 					success = onTCPAccept(info.fd, info.evObj.tcpAcceptHandler, event_flags);
358 					break;
359 
360 				case EventType.Notifier:
361 
362 					log("Got notifier!");
363 					try info.evObj.notifierHandler();
364 					catch (Exception e) {
365 						setInternalError!"notifierHandler"(Status.ERROR);
366 					}
367 					break;
368 
369 				case EventType.DirectoryWatcher:
370 					log("Got DirectoryWatcher event!");
371 					static if (!EPOLL) {
372 						// in KQUEUE all events will be consumed here, because they must be pre-processed
373 						try {
374 							DWFileEvent fevent;
375 							if (_event.fflags & (NOTE_LINK | NOTE_WRITE))
376 								fevent = DWFileEvent.CREATED;
377 							else if (_event.fflags & NOTE_DELETE)
378 								fevent = DWFileEvent.DELETED;
379 							else if (_event.fflags & (NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE))
380 								fevent = DWFileEvent.MODIFIED;
381 							else if (_event.fflags & NOTE_RENAME)
382 								fevent = DWFileEvent.MOVED_FROM;
383 							else if (_event.fflags & NOTE_RENAME)
384 								fevent = DWFileEvent.MOVED_TO;
385 							else
386 								assert(false, "No event found?");
387 
388 							DWFolderInfo fi = m_dwFolders.get(cast(fd_t)_event.ident, DWFolderInfo.init);
389 
390 							if (fi == DWFolderInfo.init) {
391 								DWFileInfo tmp = m_dwFiles.get(cast(fd_t)_event.ident, DWFileInfo.init);
392 								assert(tmp != DWFileInfo.init, "The event loop returned an invalid file's file descriptor for the directory watcher");
393 								fi = m_dwFolders.get(cast(fd_t) tmp.folder, DWFolderInfo.init);
394 								assert(fi != DWFolderInfo.init, "The event loop returned an invalid folder file descriptor for the directory watcher");
395 							}
396 
397 							// all recursive events will be generated here
398 							if (!compareFolderFiles(fi, fevent)) {
399 								continue;
400 							}
401 
402 						} catch (Exception e) {
403 							log("Could not process DirectoryWatcher event: " ~ e.msg);
404 							break;
405 						}
406 
407 					}
408 
409 					try info.evObj.dwHandler();
410 					catch (Exception e) {
411 						setInternalError!"dwHandler"(Status.ERROR);
412 					}
413 					break;
414 
415 				case EventType.Timer:
416 					try log("Got timer! " ~ info.fd.to!string); catch {}
417 					static if (EPOLL) {
418 						static long val;
419 						import core.sys.posix.unistd : read;
420 						read(info.evObj.timerHandler.ctxt.id, &val, long.sizeof);
421 					}
422 					try info.evObj.timerHandler();
423 					catch (Exception e) {
424 						setInternalError!"timerHandler"(Status.ERROR);
425 					}
426 					static if (!EPOLL) {
427 						if (info.evObj.timerHandler.ctxt.oneShot && !info.evObj.timerHandler.ctxt.rearmed) {
428 							destroyIndex(info.evObj.timerHandler.ctxt);
429 							info.evObj.timerHandler.ctxt.id = 0;
430 						}
431 					}
432 					break;
433 
434 				case EventType.Signal:
435 					try log("Got signal!"); catch {}
436 
437 					static if (EPOLL) {
438 						
439 						try log("Got signal: " ~ info.fd.to!string ~ " of type: " ~ info.evType.to!string); catch {}
440 						import core.sys.linux.sys.signalfd : signalfd_siginfo;
441 						import core.sys.posix.unistd : read;
442 						signalfd_siginfo fdsi;
443 						fd_t err = cast(fd_t)read(info.fd, &fdsi, fdsi.sizeof);
444 						shared AsyncSignal sig = cast(shared AsyncSignal) cast(void*) fdsi.ssi_ptr;
445 
446 						try sig.handler();
447 						catch (Exception e) {
448 							setInternalError!"signal handler"(Status.ERROR);
449 						}
450 
451 
452 					}
453 					else /* if KQUEUE */
454 					{
455 						static AsyncSignal[] sigarr;
456 						
457 						if (sigarr.length == 0) {
458 							try sigarr = new AsyncSignal[32]; 
459 							catch (Exception e) { assert(false, "Could not allocate signals array"); }		
460 						}
461 
462 						bool more = popSignals(sigarr);
463 						foreach (AsyncSignal sig; sigarr)
464 						{
465 							shared AsyncSignal ptr = cast(shared AsyncSignal) sig;
466 							if (ptr is null)
467 								break;
468 							try (cast(shared AsyncSignal)sig).handler();
469 							catch (Exception e) {
470 								setInternalError!"signal handler"(Status.ERROR);
471 							}
472 						}
473 					}
474 					break;
475 
476 				case EventType.UDPSocket:
477 					import core.sys.posix.unistd : close;
478 					success = onUDPTraffic(info.fd, info.evObj.udpHandler, event_flags);
479 
480 					nothrow void abortHandler(bool graceful) {
481 
482 						close(info.fd);
483 						info.evObj.udpHandler.conn.socket = 0;
484 						try info.evObj.udpHandler(UDPEvent.ERROR);
485 						catch (Exception e) { }
486 						try FreeListObjectAlloc!EventInfo.free(info);
487 						catch (Exception e){ assert(false, "Error freeing resources"); }
488 					}
489 					
490 					if (!success && m_status.code == Status.ABORT) {
491 						abortHandler(true);
492 						
493 					}
494 					else if (!success && m_status.code == Status.ERROR) {
495 						abortHandler(false); 
496 					}
497 					break;
498 				case EventType.TCPTraffic:
499 					assert(info.evObj.tcpEvHandler.conn !is null, "TCP Connection invalid");
500 
501 					success = onTCPTraffic(info.fd, info.evObj.tcpEvHandler, event_flags, info.evObj.tcpEvHandler.conn);
502 
503 					nothrow void abortTCPHandler(bool graceful) {
504 
505 						nothrow void closeAll() {
506 							try log("closeAll()"); catch {}
507 							if (info.evObj.tcpEvHandler.conn.connected)
508 								closeSocket(info.fd, true, true);
509 							
510 							info.evObj.tcpEvHandler.conn.socket = 0;
511 						}
512 
513 						/// Close the connection after an unexpected socket error
514 						if (graceful) {
515 							try info.evObj.tcpEvHandler(TCPEvent.CLOSE);
516 							catch (Exception e) { }
517 							closeAll();
518 						}
519 
520 						/// Kill the connection after an internal error
521 						else {
522 							try info.evObj.tcpEvHandler(TCPEvent.ERROR);
523 							catch (Exception e) { }
524 							closeAll();
525 						}
526 
527 						if (info.evObj.tcpEvHandler.conn.inbound) {
528 							log("Freeing inbound connection FD#" ~ info.fd.to!string);
529 							try FreeListObjectAlloc!AsyncTCPConnection.free(info.evObj.tcpEvHandler.conn);
530 							catch (Exception e){ assert(false, "Error freeing resources"); }
531 						}
532 						try FreeListObjectAlloc!EventInfo.free(info);
533 						catch (Exception e){ assert(false, "Error freeing resources"); }
534 					}
535 
536 					if (!success && m_status.code == Status.ABORT) {
537 						abortTCPHandler(true);
538 					}
539 					else if (!success && m_status.code == Status.ERROR) {
540 						abortTCPHandler(false);
541 					}
542 
543 					break;
544 			}
545 
546 		}
547 		return success;
548 	}
549 
550 	bool setOption(T)(fd_t fd, TCPOption option, in T value) {
551 		m_status = StatusInfo.init;
552 		import std.traits : isIntegral;
553 
554 		import libasync.internals.socket_compat : socklen_t, setsockopt, SO_REUSEADDR, SO_KEEPALIVE, SO_RCVBUF, SO_SNDBUF, SO_RCVTIMEO, SO_SNDTIMEO, SO_LINGER, SOL_SOCKET, IPPROTO_TCP, TCP_NODELAY, TCP_QUICKACK, TCP_KEEPCNT, TCP_KEEPINTVL, TCP_KEEPIDLE, TCP_CONGESTION, TCP_CORK, TCP_DEFER_ACCEPT;
555 		int err;
556 		nothrow bool errorHandler() {
557 			if (catchError!"setOption:"(err)) {
558 				try m_status.text ~= option.to!string;
559 				catch (Exception e){ assert(false, "to!string conversion failure"); }
560 				return false;
561 			}
562 
563 			return true;
564 		}
565 		final switch (option) {
566 			case TCPOption.NODELAY: // true/false
567 				static if (!is(T == bool))
568 					assert(false, "NODELAY value type must be bool, not " ~ T.stringof);
569 				else {
570 					int val = value?1:0;
571 					socklen_t len = val.sizeof;
572 					err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len);
573 					return errorHandler();
574 				}
575 			case TCPOption.REUSEADDR: // true/false
576 				static if (!is(T == bool))
577 					assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof);
578 				else {
579 					int val = value?1:0;
580 					socklen_t len = val.sizeof;
581 					err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len);
582 					if (!errorHandler())
583 						return false;
584 
585 					// BSD systems have SO_REUSEPORT
586 					import libasync.internals.socket_compat : SO_REUSEPORT;
587 					err = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, len);
588 					
589 					// Not all linux kernels support SO_REUSEPORT
590 					version(linux) {
591 						// ignore invalid and not supported errors on linux
592 						if (errno == EINVAL || errno == ENOPROTOOPT) {
593 							return true;
594 						}						
595 					} 
596 
597 					return errorHandler();
598 				}
599 			case TCPOption.QUICK_ACK:
600 				static if (!is(T == bool))
601 					assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof);
602 				else {
603 					static if (EPOLL) {
604 						int val = value?1:0;
605 						socklen_t len = val.sizeof;
606 						err = setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &val, len);
607 						return errorHandler();
608 					}
609 					else /* not linux */ {
610 						return false;
611 					}
612 				}
613 			case TCPOption.KEEPALIVE_ENABLE: // true/false
614 				static if (!is(T == bool))
615 					assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof);
616 				else
617 				{
618 					int val = value?1:0;
619 					socklen_t len = val.sizeof;
620 					err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len);
621 					return errorHandler();
622 				}
623 			case TCPOption.KEEPALIVE_COUNT: // ##
624 				static if (!isIntegral!T)
625 					assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof);
626 				else {
627 					int val = value.total!"msecs".to!uint;
628 					socklen_t len = val.sizeof;
629 					err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, len);
630 					return errorHandler();
631 				}
632 			case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds
633 				static if (!is(T == Duration))
634 					assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof);
635 				else {
636 					int val;
637 					try val = value.total!"seconds".to!uint; catch { return false; }
638 					socklen_t len = val.sizeof;
639 					err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, len);
640 					return errorHandler();
641 				}
642 			case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start
643 				static if (!is(T == Duration))
644 					assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof);
645 				else {
646 					int val;
647 					try val = value.total!"seconds".to!uint; catch { return false; }
648 					socklen_t len = val.sizeof;
649 					err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, len);
650 					return errorHandler();
651 				}
652 			case TCPOption.BUFFER_RECV: // bytes
653 				static if (!isIntegral!T)
654 					assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof);
655 				else {
656 					int val = value.to!int;
657 					socklen_t len = val.sizeof;
658 					err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len);
659 					return errorHandler();
660 				}
661 			case TCPOption.BUFFER_SEND: // bytes
662 				static if (!isIntegral!T)
663 					assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof);
664 				else {
665 					int val = value.to!int;
666 					socklen_t len = val.sizeof;
667 					err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len);
668 					return errorHandler();
669 				}
670 			case TCPOption.TIMEOUT_RECV:
671 				static if (!is(T == Duration))
672 					assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof);
673 				else {
674 					import core.sys.posix.sys.time : timeval;
675 					time_t secs = cast(time_t) value.split!("seconds", "usecs")().seconds;
676 					suseconds_t us;
677 					try us = value.split!("seconds", "usecs")().usecs.to!suseconds_t; catch {}
678 					timeval t = timeval(secs, us);
679 					socklen_t len = t.sizeof;
680 					err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &t, len);
681 					return errorHandler();
682 				}
683 			case TCPOption.TIMEOUT_SEND:
684 				static if (!is(T == Duration))
685 					assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
686 				else {
687 					import core.sys.posix.sys.time : timeval;
688 					time_t secs = value.split!("seconds", "usecs")().seconds;
689 					suseconds_t us;
690 					try us = value.split!("seconds", "usecs")().usecs.to!suseconds_t; catch {}
691 					timeval t = timeval(secs, us);
692 					socklen_t len = t.sizeof;
693 					err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &t, len);
694 					return errorHandler();
695 				}
696 			case TCPOption.TIMEOUT_HALFOPEN:
697 				static if (!is(T == Duration))
698 					assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
699 				else {
700 					uint val;
701 					try val = value.total!"msecs".to!uint; catch {
702 						return false;
703 					}
704 					socklen_t len = val.sizeof;
705 					err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len);
706 					return errorHandler();
707 				}
708 			case TCPOption.LINGER: // bool onOff, int seconds
709 				static if (!is(T == Tuple!(bool, int)))
710 					assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof);
711 				else {
712 					linger l = linger(val[0]?1:0, val[1]);
713 					socklen_t llen = l.sizeof;
714 					err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen);
715 					return errorHandler();
716 				}
717 			case TCPOption.CONGESTION:
718 				static if (!isIntegral!T)
719 					assert(false, "CONGESTION value type must be integral, not " ~ T.stringof);
720 				else {
721 					int val = value.to!int;
722 					len = int.sizeof;
723 					err = setsockopt(fd, IPPROTO_TCP, TCP_CONGESTION, &val, len);
724 					return errorHandler();
725 				}
726 			case TCPOption.CORK:
727 				static if (!isIntegral!T)
728 					assert(false, "CORK value type must be int, not " ~ T.stringof);
729 				else {
730 					static if (EPOLL) {
731 						int val = value.to!int;
732 						socklen_t len = val.sizeof;
733 						err = setsockopt(fd, IPPROTO_TCP, TCP_CORK, &val, len);
734 						return errorHandler();
735 					}
736 					else /* if KQUEUE */ {
737 						int val = value.to!int;
738 						socklen_t len = val.sizeof;
739 						err = setsockopt(fd, IPPROTO_TCP, TCP_NOPUSH, &val, len);
740 						return errorHandler();
741 
742 					}
743 				}
744 			case TCPOption.DEFER_ACCEPT: // seconds
745 				static if (!isIntegral!T)
746 					assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof);
747 				else {
748 					static if (EPOLL) {
749 						int val = value.to!int;
750 						socklen_t len = val.sizeof;
751 						err = setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len);
752 						return errorHandler();
753 					}
754 					else /* if KQUEUE */ {
755 						// todo: Emulate DEFER_ACCEPT with ACCEPT_FILTER(9)
756 						/*int val = value.to!int;
757 						socklen_t len = val.sizeof;
758 						err = setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &val, len);
759 						return errorHandler();
760 						*/
761 						assert(false, "TCPOption.DEFER_ACCEPT is not implemented");
762 					}
763 				}
764 		}
765 
766 	}
767 
768 	uint recv(in fd_t fd, ref ubyte[] data)
769 	{
770 		try log("Recv from FD: " ~ fd.to!string); catch {}
771 		m_status = StatusInfo.init;
772 		import libasync.internals.socket_compat : recv;
773 		int ret = cast(int) recv(fd, cast(void*) data.ptr, data.length, cast(int)0);
774 		
775 		static if (LOG) log(".recv " ~ ret.to!string ~ " bytes of " ~ data.length.to!string ~ " @ " ~ fd.to!string);
776 		if (catchError!".recv"(ret)){ // ret == SOCKET_ERROR == -1 ?
777 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
778 				m_status.code = Status.ASYNC;
779 
780 			return 0; // TODO: handle some errors more specifically
781 		}
782 
783 		m_status.code = Status.OK;
784 		
785 		return cast(uint) ret < 0 ? 0 : ret;
786 	}
787 	
788 	uint send(in fd_t fd, in ubyte[] data)
789 	{
790 		try log("Send to FD: " ~ fd.to!string); catch {}
791 		m_status = StatusInfo.init;
792 		import libasync.internals.socket_compat : send;
793 		int ret = cast(int) send(fd, cast(const(void)*) data.ptr, data.length, cast(int)0);
794 
795 		if (catchError!"send"(ret)) { // ret == -1
796 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
797 				m_status.code = Status.ASYNC;
798 				return 0;
799 			}
800 		}
801 		m_status.code = Status.OK;
802 		return cast(uint) ret < 0 ? 0 : ret;
803 	}
804 
805 	uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr)
806 	{
807 		import libasync.internals.socket_compat : recvfrom, AF_INET6, AF_INET, socklen_t;
808 
809 		m_status = StatusInfo.init;
810 
811 		addr.family = AF_INET6;
812 		socklen_t addrLen = addr.sockAddrLen;
813 		long ret = recvfrom(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, &addrLen);
814 		
815 		if (addrLen < addr.sockAddrLen) {
816 			addr.family = AF_INET;
817 		}
818 		
819 		try log("RECVFROM " ~ ret.to!string ~ "B"); catch {}
820 		if (catchError!".recvfrom"(ret)) { // ret == -1
821 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
822 				m_status.code = Status.ASYNC;
823 			return 0;
824 		}
825 
826 		m_status.code = Status.OK;
827 		
828 		return cast(uint) ret;
829 	}
830 	
831 	uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
832 	{
833 		import libasync.internals.socket_compat : sendto;
834 
835 		m_status = StatusInfo.init;
836 
837 		try log("SENDTO " ~ data.length.to!string ~ "B");
838 		catch{}
839 		long ret = sendto(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, addr.sockAddrLen);
840 		
841 		if (catchError!".sendto"(ret)) { // ret == -1
842 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
843 				m_status.code = Status.ASYNC;
844 			return 0;
845 		}
846 
847 		m_status.code = Status.OK;
848 		return cast(uint) ret;
849 	}
850 
851 	NetworkAddress localAddr(in fd_t fd, bool ipv6) {
852 		NetworkAddress ret;
853 		import libasync.internals.socket_compat : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr;
854 
855 		if (ipv6)
856 			ret.family = AF_INET6;
857 		else
858 			ret.family = AF_INET;
859 
860 		socklen_t len = ret.sockAddrLen;
861 		int err = getsockname(fd, ret.sockAddr, &len);
862 		if (catchError!"getsockname"(err))
863 			return NetworkAddress.init;
864 		if (len > ret.sockAddrLen)
865 			ret.family = AF_INET6;
866 
867 		return ret;
868 	}
869 
870 	bool notify(in fd_t fd, AsyncNotifier ctxt)
871 	{
872 		static if (EPOLL)
873 		{
874 			import core.sys.posix.unistd : write;
875 
876 			long val = 1;
877 			fd_t err = cast(fd_t) write(fd, &val, long.sizeof);
878 			
879 			if (catchError!"write(notify)"(err)) {
880 				return false;
881 			}
882 			return true;
883 		}
884 		else /* if KQUEUE */
885 		{			
886 			kevent_t _event;
887 			EV_SET(&_event, fd, EVFILT_USER, EV_ENABLE | EV_CLEAR, NOTE_TRIGGER | 0x1, 0, ctxt.evInfo);
888 			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
889 			
890 			if (catchError!"kevent_notify"(err)) {
891 				return false;
892 			}
893 			return true;
894 		}
895 	}
896 
897 	bool notify(in fd_t fd, shared AsyncSignal ctxt)
898 	{
899 		static if (EPOLL) 
900 		{
901 
902 			sigval sigvl;
903 			fd_t err;
904 			sigvl.sival_ptr = cast(void*) ctxt;
905 			try err = pthread_sigqueue(ctxt.pthreadId, fd, sigvl); catch {}
906 			if (catchError!"sigqueue"(err)) {
907 				return false;
908 			}
909 		}
910 		else /* if KQUEUE */ 
911 		{
912 
913 			import core.thread : getpid;
914 
915 			addSignal(ctxt);
916 
917 			try {
918 				log("Notified fd: " ~ fd.to!string ~ " of PID " ~ getpid().to!string); 
919 				int err = core.sys.posix.signal.kill(getpid(), SIGXCPU);
920 				if (catchError!"notify(signal)"(err))
921 					assert(false, "Signal could not be raised");
922 			} catch {}
923 		}
924 
925 		return true;
926 	}
927 
928 	// no known uses
929 	uint read(in fd_t fd, ref ubyte[] data)
930 	{
931 		m_status = StatusInfo.init;
932 		return 0;
933 	}
934 
935 	// no known uses
936 	uint write(in fd_t fd, in ubyte[] data)
937 	{
938 		m_status = StatusInfo.init;
939 		return 0;
940 	}
941 
942 	uint watch(in fd_t fd, in WatchInfo info) {
943 		// note: info.wd is still 0 at this point.
944 		m_status = StatusInfo.init;
945 		import core.sys.linux.sys.inotify;
946 		import std.file : dirEntries, isDir, SpanMode;
947 
948 		static if (EPOLL) {
949 			// Manually handle recursivity... All events show up under the same inotify
950 			uint events = info.events; // values for this API were pulled from inotify
951 			if (events & IN_DELETE)
952 				events |= IN_DELETE_SELF;
953 			if (events & IN_MOVED_FROM)
954 				events |= IN_MOVE_SELF;
955 
956 			nothrow fd_t addFolderRecursive(Path path) {
957 				fd_t ret;
958 				try {
959 					ret = inotify_add_watch(fd, path.toNativeString().toStringz, events);
960 					if (catchError!"inotify_add_watch"(ret))
961 						return fd_t.init;
962 					try log("inotify_add_watch(" ~ DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd).to!string ~ ")"); catch {}
963 					assert(m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint)ret), DWFolderInfo.init) == DWFolderInfo.init, "Could not get a unique watch descriptor for path, got: " ~ m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)].to!string);
964 					m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd);
965 					if (info.recursive) {
966 						foreach (de; path.toNativeString().dirEntries(SpanMode.shallow))
967 						{
968 							Path de_path = Path(de.name);
969 							if (!de_path.absolute)
970 								de_path = path ~ Path(de.name);
971 							if (isDir(de_path.toNativeString()))
972 								if (addFolderRecursive(de_path) == 0)
973 									return 0;
974 						}
975 					}
976 
977 				} catch (Exception e) { 
978 					try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.toString() ); catch {}
979 					return 0; 
980 				}
981 
982 				return ret;
983 			}
984 
985 			return addFolderRecursive(info.path);
986 
987 		} else /* if KQUEUE */ {
988 			/// Manually handle recursivity & file tracking. Each folder is an event! 
989 			/// E.g. file creation shows up as a folder change, we must be prepared to seek the file.
990 			import core.sys.posix.fcntl;
991 			import libasync.internals.kqueue;
992 
993 			uint events;
994 			if (info.events & DWFileEvent.CREATED)
995 				events |= NOTE_LINK | NOTE_WRITE;
996 			if (info.events & DWFileEvent.DELETED)
997 				events |= NOTE_DELETE;
998 			if (info.events & DWFileEvent.MODIFIED)
999 				events |= NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE;
1000 			if (info.events & DWFileEvent.MOVED_FROM)
1001 				events |= NOTE_RENAME;
1002 			if (info.events & DWFileEvent.MOVED_TO)
1003 				events |= NOTE_RENAME;
1004 
1005 			EventInfo* evinfo;
1006 			try evinfo = m_watchers[fd]; catch { assert(false, "Could retrieve event info, directory watcher was not initialized properly, or you are operating on a closed directory watcher."); }
1007 
1008 			/// we need a file descriptor for the containers, so we open files but we don't monitor them
1009 			/// todo: track indexes internally?
1010 			nothrow fd_t addRecursive(Path path, bool is_dir) {
1011 				int ret;
1012 				try {
1013 					log("Adding path: " ~ path.toNativeString());
1014 
1015 					ret = open(path.toNativeString().toStringz, O_EVTONLY);
1016 					if (catchError!"open(watch)"(ret))
1017 						return 0;
1018 
1019 					if (is_dir)
1020 						m_dwFolders[ret] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd);
1021 
1022 					kevent_t _event;
1023 					
1024 					EV_SET(&_event, ret, EVFILT_VNODE, EV_ADD | EV_CLEAR, events, 0, cast(void*) evinfo);
1025 					
1026 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1027 					
1028 					if (catchError!"kevent_timer_add"(err))
1029 						return 0;
1030 
1031 
1032 					if (is_dir) foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
1033 						Path filePath = Path(de.name);
1034 						if (!filePath.absolute)
1035 							filePath = path ~ filePath;
1036 						fd_t fwd;
1037 						if (info.recursive && isDir(filePath.toNativeString()))
1038 							fwd = addRecursive(filePath, true);
1039 						else {
1040 							fwd = addRecursive(filePath, false); // gets an ID but will not scan
1041 							m_dwFiles[fwd] = DWFileInfo(ret, filePath, de.timeLastModified, isDir(filePath.toNativeString()));
1042 						}
1043 						
1044 					}
1045 
1046 				} catch (Exception e) { 
1047 					try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.msg);  catch {}
1048 					return 0; 
1049 				}
1050 				return ret;
1051 			}
1052 
1053 			fd_t wd;
1054 
1055 			try {
1056 				wd = addRecursive(info.path, isDir(info.path.toNativeString()));
1057 
1058 				if (wd == 0)
1059 					return 0;
1060 
1061 			}
1062 			catch (Exception e) {
1063 				setInternalError!"dw.watch"(Status.ERROR, "Failed to watch directory: " ~ e.msg); 
1064 			}
1065 
1066 			return cast(uint) wd;
1067 		}
1068 	}
1069 
1070 	bool unwatch(in fd_t fd, in uint wd) {
1071 		// the wd can be used with m_dwFolders to find the DWFolderInfo
1072 		// and unwatch everything recursively.
1073 
1074 		m_status = StatusInfo.init;
1075 		static if (EPOLL) {
1076 			/// If recursive, all subfolders must also be unwatched recursively by removing them
1077 			/// from containers and from inotify
1078 			import core.sys.linux.sys.inotify;
1079 
1080 			nothrow bool removeAll(DWFolderInfo fi) {
1081 				int err;
1082 				try {
1083 
1084 					bool inotify_unwatch(uint wd) {
1085 						err = inotify_rm_watch(fd, wd);
1086 
1087 						if (catchError!"inotify_rm_watch"(err))
1088 							return false;
1089 						return true;
1090 					}
1091 
1092 					if (!inotify_unwatch(fi.wi.wd))
1093 						return false;
1094 
1095 					/*foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles)
1096 					{
1097 						if (file.folder == fi.wi.wd) {
1098 							inotify_unwatch(id);
1099 							m_dwFiles.remove(id);
1100 						}
1101 					}*/
1102 					m_dwFolders.remove(tuple(cast(fd_t)fd, fi.wi.wd)); 
1103 
1104 					if (fi.wi.recursive) {
1105 						// find all subdirectories by comparing the path
1106 						Array!uint remove_list;
1107 						foreach (ref const DWFolderInfo folder; m_dwFolders) {
1108 							if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) {
1109 
1110 								if (!inotify_unwatch(folder.wi.wd))
1111 									return false;
1112 
1113 								remove_list.insertBack(fi.wi.wd);
1114 							}
1115 						}
1116 						foreach (rm_wd; remove_list[])
1117 							m_dwFolders.remove(tuple(cast(fd_t) fd, rm_wd));
1118 
1119 					}
1120 					return true;
1121 				} catch (Exception e) { 
1122 					try setInternalError!"inotify_rm_watch"(Status.ERROR, "Could not unwatch directory: " ~ e.toString()); catch {}
1123 					return false; 
1124 				}
1125 			}
1126 
1127 			DWFolderInfo info;
1128 
1129 			try {
1130 				info = m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint) wd), DWFolderInfo.init);
1131 				if (info == DWFolderInfo.init) {
1132 					setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not find watch info for wd " ~ wd.to!string);
1133 					return false;
1134 				}
1135 			} catch { }
1136 
1137 			return removeAll(info);
1138 		}
1139 		else /* if KQUEUE */ {
1140 
1141 			/// Recursivity must be handled manually, so we must unwatch subfiles and subfolders
1142 			/// recursively, remove the container entries, close the file descriptor, and disable the vnode events.
1143 
1144 			nothrow bool removeAll(DWFolderInfo fi) {
1145 				import core.sys.posix.unistd : close;
1146 
1147 
1148 				bool event_unset(uint id) {
1149 					kevent_t _event;
1150 					EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
1151 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1152 					if (catchError!"kevent_unwatch"(err))
1153 						return false;
1154 					return true;
1155 				}
1156 
1157 				bool removeFolder(uint wd) {
1158 					if (!event_unset(fi.wi.wd))
1159 						return false;
1160 					m_dwFolders.remove(fi.wi.wd);
1161 					int err = close(fi.wi.wd);
1162 					if (catchError!"close dir"(err))
1163 						return false;
1164 					return true;
1165 				}
1166 
1167 				try {
1168 					removeFolder(fi.wi.wd);
1169 
1170 					if (fi.wi.recursive) {
1171 						import std.container.array;
1172 						Array!fd_t remove_list; // keep track of unwatched folders recursively
1173 						Array!fd_t remove_file_list;
1174 						// search for subfolders and unset them / close their wd
1175 						foreach (ref const DWFolderInfo folder; m_dwFolders) {
1176 							if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) {
1177 								 
1178 								if (!event_unset(folder.wi.wd))
1179 									return false;
1180 
1181 								// search for subfiles, close their descriptors and remove them from the file list
1182 								foreach (ref const fd_t fwd, ref const DWFileInfo file; m_dwFiles) {
1183 									if (file.folder == folder.wi.wd) {
1184 										close(fwd);
1185 										remove_file_list.insertBack(fwd); // to be removed from m_dwFiles without affecting the loop
1186 									}
1187 								}
1188 
1189 								remove_list.insertBack(folder.wi.wd); // to be removed from m_dwFolders without affecting the loop
1190 							}
1191 						}
1192 
1193 						foreach (wd; remove_file_list[])
1194 							m_dwFiles.remove(wd);
1195 
1196 						foreach (rm_wd; remove_list[])
1197 							removeFolder(rm_wd);
1198 
1199 
1200 					}
1201 				} catch (Exception e) {
1202 					try setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not close the folder " ~ fi.to!string ~ ": " ~ e.toString()); catch {}
1203 					return false;
1204 				}
1205 
1206 				return true;
1207 			}
1208 
1209 			DWFolderInfo info;
1210 			try info = m_dwFolders.get(wd, DWFolderInfo.init); catch {}
1211 
1212 			if (!removeAll(info))
1213 				return false;
1214 			return true;
1215 		}
1216 	}
1217 
1218 	// returns the amount of changes
1219 	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
1220 		m_status = StatusInfo.init;
1221 
1222 		static if (EPOLL) {
1223 			assert(dst.length > 0, "DirectoryWatcher called with 0 length DWChangeInfo array");
1224 			import core.sys.linux.sys.inotify;
1225 			import core.sys.posix.unistd : read;
1226 			import core.stdc.stdio : FILENAME_MAX;
1227 			import core.stdc.string : strlen;
1228 			ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void;
1229 			ssize_t nread = read(fd, buf.ptr, cast(uint)buf.sizeof);
1230 			if (catchError!"read()"(nread))
1231 			{
1232 				if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) 
1233 					m_status.code = Status.ASYNC;
1234 				return 0;
1235 			}
1236 			assert(nread > 0);
1237 		
1238 
1239 			/// starts (recursively) watching all newly created folders in a recursive entry,
1240 			/// creates events for additional files/folders founds, and unwatches all deleted folders
1241 			void recurseInto(DWFolderInfo fi, DWFileEvent ev, ref Array!DWChangeInfo changes) {
1242 				import std.file : dirEntries, SpanMode, isDir;
1243 				assert(fi.wi.recursive);
1244 				// get a list of stuff in the created/moved folder
1245 				if (ev == DWFileEvent.CREATED || ev == DWFileEvent.MOVED_TO) {
1246 					foreach (de; dirEntries(fi.wi.path.toNativeString(), SpanMode.shallow)) {
1247 						Path entryPath = fi.wi.path ~ Path(de.name);
1248 						
1249 						if (fi.wi.recursive && isDir(entryPath.toNativeString())) {
1250 							
1251 							watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, 0) );
1252 							void genEvents(Path subpath) {
1253 								foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) {
1254 									auto subsubpath = subpath ~ Path(de.name);
1255 									changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
1256 									if (isDir(subsubpath.toNativeString()))
1257 										genEvents(subsubpath);
1258 								}
1259 							}
1260 							
1261 							genEvents(entryPath);
1262 							
1263 						}
1264 					}
1265 				}
1266 			}
1267 
1268 			size_t i;
1269 			do
1270 			{
1271 				for (auto p = buf.ptr; p < buf.ptr + nread; )
1272 				{
1273 					inotify_event* ev = cast(inotify_event*)p;
1274 					p += inotify_event.sizeof + ev.len;
1275 
1276 					DWFileEvent evtype;
1277 					evtype = DWFileEvent.CREATED;
1278 					if (ev.mask & IN_CREATE)
1279 						evtype = DWFileEvent.CREATED;
1280 					if (ev.mask & IN_DELETE || ev.mask & IN_DELETE_SELF)
1281 						evtype = DWFileEvent.DELETED;
1282 					if (ev.mask & IN_MOVED_FROM || ev.mask & IN_MOVE_SELF)
1283 						evtype = DWFileEvent.MOVED_FROM;
1284 					if (ev.mask & (IN_MOVED_TO))
1285 						evtype = DWFileEvent.MOVED_TO;
1286 					if (ev.mask & IN_MODIFY)
1287 						evtype = DWFileEvent.MODIFIED;
1288 
1289 					import std.path : buildPath;
1290 					import core.stdc.string : strlen;
1291 					string name = cast(string) ev.name.ptr[0 .. cast(size_t) ev.name.ptr.strlen].idup;
1292 					DWFolderInfo fi;
1293 					Path path;
1294 					try {
1295 						fi = m_dwFolders.get(tuple(cast(fd_t)fd,cast(uint)ev.wd), DWFolderInfo.init);
1296 						if (fi == DWFolderInfo.init) {
1297 							setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders: " ~ ev.wd.to!string); 
1298 							continue;
1299 						}
1300 						path = fi.wi.path ~ Path(name); 
1301 					}
1302 					catch (Exception e) { 
1303 						setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders"); 
1304 						return 0; 
1305 					}
1306 
1307 					dst[i] = DWChangeInfo(evtype, path);
1308 					import std.file : isDir;
1309 					bool is_dir;
1310 					try is_dir = isDir(path.toNativeString()); catch {}
1311 					if (fi.wi.recursive && is_dir) {
1312 
1313 						try {
1314 							Array!DWChangeInfo changes;
1315 							recurseInto(fi, evtype, changes);
1316 							// stop watching if the folder was deleted
1317 							if (evtype == DWFileEvent.DELETED || evtype == DWFileEvent.MOVED_FROM) {
1318 								unwatch(fi.fd, fi.wi.wd);
1319 							}
1320 							foreach (change; changes[]) {
1321 								i++;
1322 								if (dst.length <= i)
1323 									dst ~= change;
1324 								else dst[i] = change;
1325 							}
1326 						}
1327 						catch (Exception e) {
1328 							setInternalError!"recurseInto"(Status.ERROR, "Failed to watch/unwatch contents of folder recursively."); 
1329 							return 0; 
1330 						}
1331 
1332 					}
1333 
1334 
1335 					i++;
1336 					if (i >= dst.length)
1337 						return cast(uint) i;
1338 				}
1339 				static if (LOG) foreach (j; 0 .. i) {
1340 					try log("Change occured for FD#" ~ fd.to!string ~ ": " ~ dst[j].to!string); catch {}
1341 				}
1342 				nread = read(fd, buf.ptr, buf.sizeof);
1343 				if (catchError!"read()"(nread)) {
1344 					if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
1345 						m_status.code = Status.ASYNC;
1346 					return cast(uint) i;
1347 				}
1348 			} while (nread > 0);
1349 
1350 			return cast(uint) i;
1351 		}
1352 		else /* if KQUEUE */ {
1353 			Array!(DWChangeInfo)* changes;
1354 			size_t i;
1355 			try {
1356 				changes = m_changes[fd];
1357 				import std.algorithm : min;
1358 				size_t cnt = min(dst.length, changes.length);
1359 				foreach (DWChangeInfo change; (*changes)[0 .. cnt]) {
1360 					dst[i] = (*changes)[i];
1361 					i++;
1362 				}
1363 				changes.linearRemove((*changes)[0 .. cnt]);
1364 			}
1365 			catch (Exception e) {
1366 				setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg);
1367 				return false;
1368 			}
1369 			return cast(uint) i;
1370 		}
1371 	}
1372 
1373 	bool broadcast(in fd_t fd, bool b) {
1374 		m_status = StatusInfo.init;
1375 
1376 		import libasync.internals.socket_compat : socklen_t, setsockopt, SO_BROADCAST, SOL_SOCKET;
1377 
1378 		int val = b?1:0;
1379 		socklen_t len = val.sizeof;
1380 		int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len);
1381 		if (catchError!"setsockopt"(err))
1382 			return false;
1383 
1384 		return true;
1385 	}
1386 
1387 	private bool closeRemoteSocket(fd_t fd, bool forced) {
1388 		
1389 		int err;
1390 		log("shutdown");
1391 		import libasync.internals.socket_compat : shutdown, SHUT_WR, SHUT_RDWR, SHUT_RD;
1392 		if (forced) 
1393 			err = shutdown(fd, SHUT_RDWR);
1394 		else
1395 			err = shutdown(fd, SHUT_WR);
1396 
1397 		static if (!EPOLL) {
1398 			kevent_t[2] events;
1399 			try log("!!DISC delete events"); catch {}
1400 			EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
1401 			EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
1402 			kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
1403 
1404 		}
1405 
1406 		if (catchError!"shutdown"(err))
1407 			return false;
1408 
1409 		return true;
1410 	}
1411 
1412 	// for connected sockets
1413 	bool closeSocket(fd_t fd, bool connected, bool forced = false)
1414 	{
1415 		log("closeSocket");
1416 		if (connected && !closeRemoteSocket(fd, forced) && !forced)
1417 			return false;
1418 		
1419 		if (!connected || forced) {
1420 			// todo: flush the socket here?
1421 
1422 			import core.sys.posix.unistd : close;
1423 			log("close");
1424 			int err = close(fd);
1425 			if (catchError!"closesocket"(err)) 
1426 				return false;
1427 		}
1428 		return true;
1429 	}
1430 
1431 	
1432 	NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true) 
1433 	in {
1434 		debug import libasync.internals.validator : validateIPv4, validateIPv6;
1435 		debug assert(validateIPv4(ipAddr) || validateIPv6(ipAddr), "Trying to connect to an invalid IP address");
1436 	}
1437 	body {
1438 		import libasync.internals.socket_compat : addrinfo, AI_NUMERICHOST, AI_NUMERICSERV;
1439 		addrinfo hints;
1440 		hints.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV; // Specific to an IP resolver!
1441 
1442 		return getAddressInfo(ipAddr, port, ipv6, tcp, hints);
1443 	}
1444 
1445 
1446 	NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true)
1447 	/*in { 
1448 		debug import libasync.internals.validator : validateHost;
1449 		debug assert(validateHost(host), "Trying to connect to an invalid domain"); 
1450 	}
1451 	body */{
1452 		import libasync.internals.socket_compat : addrinfo;
1453 		addrinfo hints;
1454 		return getAddressInfo(host, port, ipv6, tcp, hints);
1455 	}
1456 	
1457 	void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EPosix.EACCES)
1458 	{
1459 		if (details.length > 0)
1460 			m_status.text = TRACE ~ ": " ~ details;
1461 		else m_status.text = TRACE;
1462 		m_error = error;
1463 		m_status.code = s;
1464 		static if(LOG) log(m_status);
1465 	}
1466 private:	
1467 
1468 	/// For DirectoryWatcher
1469 	/// In kqueue/vnode, all we get is the folder in which changes occured.
1470 	/// We have to figure out what changed exactly and put the results in a container 
1471 	/// for the readChanges call.
1472 	static if (!EPOLL) bool compareFolderFiles(DWFolderInfo fi, DWFileEvent events) {
1473 		import std.file;
1474 		import std.path : buildPath;
1475 		try {
1476 			Array!Path currFiles;
1477 			auto wd = fi.wi.wd;
1478 			auto path = fi.wi.path;
1479 			auto fd = fi.fd;
1480 			Array!(DWChangeInfo)* changes = m_changes.get(fd, null);
1481 			assert(changes !is null, "Invalid wd, could not find changes array.");
1482 			//import std.stdio : writeln;
1483 			//writeln("Scanning path: ", path.toNativeString());
1484 			//writeln("m_dwFiles length: ", m_dwFiles.length);
1485 		
1486 			// get a list of the folder
1487 			foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
1488 				//writeln(de.name);
1489 				Path entryPath = Path(de.name);
1490 				if (!entryPath.absolute)
1491 					entryPath = path ~ entryPath;
1492 				bool found;
1493 
1494 				// compare it to the cached list fixme: make it faster using another container?
1495 				foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
1496 					if (file.folder != wd) continue; // this file isn't in the evented folder
1497 					if (file.path == entryPath) {
1498 						found = true;
1499 						log("File modified? " ~ entryPath.toNativeString() ~ " at: " ~ de.timeLastModified.to!string ~ " vs: " ~ file.lastModified.to!string);
1500 						// Check if it was modified
1501 						if (!isDir(entryPath.toNativeString()) && de.timeLastModified > file.lastModified)
1502 						{
1503 							DWFileInfo dwf = file;
1504 							dwf.lastModified = de.timeLastModified;
1505 							m_dwFiles[id] = dwf;
1506 							changes.insertBack(DWChangeInfo(DWFileEvent.MODIFIED, file.path));
1507 						}
1508 						break;
1509 					}
1510 				}
1511 
1512 				// This file/folder is new in the folder
1513 				if (!found) {
1514 					changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, entryPath));
1515 
1516 					if (fi.wi.recursive && isDir(entryPath.toNativeString())) {
1517 						/// This is the complicated part. The folder needs to be watched, and all the events
1518 						/// generated for every file/folder found recursively inside it,
1519 						/// Useful e.g. when mkdir -p is used.
1520 						watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, wd) );
1521 						void genEvents(Path subpath) {
1522 							foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) {
1523 								auto subsubpath = Path(de.name);
1524 								if (!subsubpath.absolute())
1525 									subsubpath = subpath ~ subsubpath;
1526 								changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
1527 								if (isDir(subsubpath.toNativeString()))
1528 									genEvents(subsubpath);
1529 							}
1530 						}
1531 
1532 						genEvents(entryPath);
1533 
1534 					}
1535 					else {
1536 						EventInfo* evinfo;
1537 						try evinfo = m_watchers[fd]; catch { assert(false, "Could retrieve event info, directory watcher was not initialized properly, or you are operating on a closed directory watcher."); }
1538 
1539 						log("Adding path: " ~ path.toNativeString());
1540 
1541 						import core.sys.posix.fcntl : open;
1542 						fd_t fwd = open(entryPath.toNativeString().toStringz, O_EVTONLY);
1543 						if (catchError!"open(watch)"(fwd))
1544 							return 0;
1545 
1546 						kevent_t _event;
1547 						
1548 						EV_SET(&_event, fwd, EVFILT_VNODE, EV_ADD | EV_CLEAR, fi.wi.events, 0, cast(void*) evinfo);
1549 						
1550 						int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1551 
1552 						if (catchError!"kevent_timer_add"(err))
1553 							return 0;
1554 
1555 						m_dwFiles[fwd] = DWFileInfo(fi.wi.wd, entryPath, de.timeLastModified, false);
1556 						
1557 					}
1558 				}
1559 
1560 				// This file/folder is now current. This avoids a deletion event.
1561 				currFiles.insert(entryPath);
1562 			}
1563 
1564 			/// Now search for files/folders that were deleted in this directory (no recursivity needed). 
1565 			/// Unwatch this directory and generate delete event only for the root dir
1566 			foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
1567 				if (file.folder != wd) continue; // skip those files in another folder than the evented one
1568 				bool found;
1569 				foreach (Path curr; currFiles) {
1570 					if (file.path == curr){
1571 						found = true;
1572 						break;
1573 					}
1574 				}
1575 				// this file/folder was in the folder but it's not there anymore
1576 				if (!found) {
1577 					// writeln("Deleting: ", file.path.toNativeString());
1578 					kevent_t _event;
1579 					EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
1580 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1581 					if (catchError!"kevent_unwatch"(err))
1582 						return false;
1583 					import core.sys.posix.unistd : close;
1584 					err = close(id);
1585 					if (catchError!"close(dwFile)"(err))
1586 						return false;
1587 					changes.insert(DWChangeInfo(DWFileEvent.DELETED, file.path));
1588 
1589 					if (fi.wi.recursive && file.is_dir) 
1590 						unwatch(fd, id);
1591 
1592 					m_dwFiles.remove(id);
1593 
1594 				}
1595 				
1596 			}
1597 			if(changes.empty)
1598 				return false; // unhandled event, skip the callback
1599 
1600 			// fixme: how to implement moved_from moved_to for rename?
1601 		}
1602 		catch (Exception e) 
1603 		{
1604 			try setInternalError!"compareFiles"(Status.ERROR, "Fatal error in file comparison: " ~ e.toString()); catch {}
1605 			return false;
1606 		}
1607 		return true;
1608 	}
1609 
1610 	// socket must not be connected
1611 	bool setNonBlock(fd_t fd) {
1612 		import core.sys.posix.fcntl : fcntl, F_GETFL, F_SETFL, O_NONBLOCK;
1613 		int flags = fcntl(fd, F_GETFL);
1614 		flags |= O_NONBLOCK;
1615 		int err = fcntl(fd, F_SETFL, flags);
1616 		if (catchError!"F_SETFL O_NONBLOCK"(err)) {
1617 			closeSocket(fd, false);
1618 			return false;
1619 		}
1620 		return true;
1621 	}
1622 	
1623 	bool onTCPAccept(fd_t fd, TCPAcceptHandler del, int events)
1624 	{
1625 		import libasync.internals.socket_compat : AF_INET, AF_INET6, socklen_t, accept4, accept;
1626 		enum O_NONBLOCK     = 0x800;    // octal    04000
1627 
1628 		static if (EPOLL) 
1629 		{
1630 			const uint epoll_events = cast(uint) events;
1631 			const bool incoming = cast(bool) (epoll_events & EPOLLIN);
1632 			const bool error = cast(bool) (epoll_events & EPOLLERR);
1633 		}
1634 		else 
1635 		{
1636 			const short kqueue_events = cast(short) (events >> 16);
1637 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
1638 			const bool incoming = cast(bool)(kqueue_events & EVFILT_READ);
1639 			const bool error = cast(bool)(kqueue_flags & EV_ERROR);
1640 		}
1641 		
1642 		if (incoming) { // accept incoming connection
1643 			do {
1644 				NetworkAddress addr;
1645 				addr.family = AF_INET;
1646 				socklen_t addrlen = addr.sockAddrLen;
1647 
1648 				bool ret;
1649 				static if (EPOLL) {
1650 					/// Accept the connection and create a client socket
1651 					fd_t csock = accept4(fd, addr.sockAddr, &addrlen, O_NONBLOCK);
1652 
1653 					if (catchError!".accept"(csock)) {
1654 						ret = false;
1655 						return ret;
1656 					}
1657 				} else /* if KQUEUE */ {
1658 					fd_t csock = accept(fd, addr.sockAddr, &addrlen);
1659 					
1660 					if (catchError!".accept"(csock)) {
1661 						ret = false;
1662 						return ret;
1663 					}
1664 					
1665 					// Make non-blocking so subsequent calls to recv/send return immediately
1666 					if (!setNonBlock(csock)) {
1667 						ret = false;
1668 						return ret;
1669 					}
1670 				}
1671 
1672 				// Set client address family based on address length
1673 				if (addrlen > addr.sockAddrLen)
1674 					addr.family = AF_INET6;
1675 				if (addrlen == socklen_t.init) {
1676 					setInternalError!"addrlen"(Status.ABORT);
1677 					import core.sys.posix.unistd : close;
1678 					close(csock);
1679 					{
1680 						ret = false;
1681 						return ret;
1682 					}
1683 				}
1684 
1685 				// Allocate a new connection handler object
1686 				AsyncTCPConnection conn;
1687 				try conn = FreeListObjectAlloc!AsyncTCPConnection.alloc(m_evLoop);
1688 				catch (Exception e){ assert(false, "Allocation failure"); }
1689 				conn.peer = addr;
1690 				conn.socket = csock;
1691 				conn.inbound = true;
1692 
1693 				nothrow bool closeClient() {
1694 					try FreeListObjectAlloc!AsyncTCPConnection.free(conn);
1695 					catch (Exception e){ assert(false, "Free failure"); }
1696 					closeSocket(csock, true, true);
1697 					{
1698 						ret = false;
1699 						return ret;
1700 					}
1701 				}
1702 
1703 				// Get the connection handler from the callback
1704 				TCPEventHandler evh;
1705 				try {
1706 					evh = del(conn);
1707 					if (evh == TCPEventHandler.init || !initTCPConnection(csock, conn, evh, true)) {
1708 						try log("Failed to connect"); catch {}
1709 						return closeClient();
1710 					}
1711 					try log("Connection Started with " ~ csock.to!string); catch {}
1712 				}
1713 				catch (Exception e) {
1714 					log("Close socket");
1715 					return closeClient();
1716 				}
1717 
1718 				// Announce connection state to the connection handler
1719 				try {
1720 					log("Connected to: " ~ addr.toString());
1721 					evh.conn.connected = true;
1722 					evh(TCPEvent.CONNECT);
1723 				}
1724 				catch (Exception e) {
1725 					setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
1726 					{
1727 						ret = false;
1728 						return ret;
1729 					}
1730 				}
1731 			} while(true);
1732 
1733 		}
1734 		
1735 		if (error) { // socket failure
1736 			m_status.text = "listen socket error";
1737 			int err;
1738 			import libasync.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR;
1739 			socklen_t len = int.sizeof;
1740 			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
1741 			m_error = cast(error_t) err;
1742 			m_status.code = Status.ABORT;
1743 			static if(LOG) log(m_status);
1744 
1745 			// call with null to announce a failure
1746 			try del(null);
1747 			catch(Exception e){ assert(false, "Failure calling TCPAcceptHandler(null)"); }
1748 
1749 			/// close the listener?
1750 			// closeSocket(fd, false);
1751 		}
1752 		return true;
1753 	}
1754 
1755 	bool onUDPTraffic(fd_t fd, UDPHandler del, int events) 
1756 	{
1757 		static if (EPOLL) 
1758 		{
1759 			const uint epoll_events = cast(uint) events;
1760 			const bool read = cast(bool) (epoll_events & EPOLLIN);
1761 			const bool write = cast(bool) (epoll_events & EPOLLOUT);
1762 			const bool error = cast(bool) (epoll_events & EPOLLERR);
1763 		}
1764 		else 
1765 		{
1766 			const short kqueue_events = cast(short) (events >> 16);
1767 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
1768 			const bool read = cast(bool) (kqueue_events & EVFILT_READ);
1769 			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
1770 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
1771 		}
1772 
1773 		if (read) {
1774 			try {
1775 				del(UDPEvent.READ);
1776 			}
1777 			catch (Exception e) {
1778 				setInternalError!"del@UDPEvent.READ"(Status.ABORT);
1779 				return false;
1780 			}
1781 		}
1782 
1783 		if (write) { 
1784 
1785 			try {
1786 				del(UDPEvent.WRITE);
1787 			}
1788 			catch (Exception e) {
1789 				setInternalError!"del@UDPEvent.WRITE"(Status.ABORT);
1790 				return false;
1791 			}
1792 		}
1793 
1794 		if (error) // socket failure
1795 		{ 
1796 
1797 			import libasync.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR;
1798 			import core.sys.posix.unistd : close;
1799 			int err;
1800 			socklen_t errlen = err.sizeof;
1801 			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
1802 			setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err);
1803 			close(fd);
1804 		}
1805 		
1806 		return true;
1807 	}
1808 
	/// Dispatches an event on the TCP connection socket `fd` to the handler `del`.
	///
	/// The event bits are decoded into connect/read/write/error/close conditions
	/// and handled in this order: ERROR (returns false), CONNECT (returns true),
	/// WRITE, READ, CLOSE. On CLOSE the socket is closed, the connection flags are
	/// reset and the event info (plus the connection object for inbound
	/// connections) is freed.
	///
	/// Returns: false when an error event was received or a handler threw,
	///          true otherwise.
	bool onTCPTraffic(fd_t fd, TCPEventHandler del, int events, AsyncTCPConnection conn) 
	{
		log("TCP Traffic at FD#" ~ fd.to!string);

		static if (EPOLL) 
		{
			const uint epoll_events = cast(uint) events;
			// readable or writable while neither connected nor disconnecting => connection established
			const bool connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !conn.disconnecting && !conn.connected;
			bool read = cast(bool) (epoll_events & EPOLLIN);
			const bool write = cast(bool) (epoll_events & EPOLLOUT);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
			const bool close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP));
		}
		else /* if KQUEUE */
		{
			// filter in the upper 16 bits, flags in the lower 16 bits
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			const bool connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !conn.disconnecting && !conn.connected);
			bool read = cast(bool) (kqueue_events & EVFILT_READ) && !connect;
			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
			const bool close = cast(bool) (kqueue_flags & EV_EOF);
		}

		// An error event takes precedence over everything else: store the socket
		// error, notify the handler and stop.
		if (error) 
		{
			import libasync.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR;
			int err;
			try log("Also got events: " ~ connect.to!string ~ " c " ~ read.to!string ~ " r " ~ write.to!string ~ " write"); catch {}
			socklen_t errlen = err.sizeof;
			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
			setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err);
			try 
				del(TCPEvent.ERROR);
			catch (Exception e)
			{ 
				setInternalError!"del@TCPEvent.ERROR"(Status.ABORT);
				// ignore failure...
			}
			return false;
		}

		
		// First activity on a not-yet-connected socket: mark it connected and
		// announce CONNECT; nothing else is dispatched for this event.
		if (connect) 
		{
			try log("!connect"); catch {}
			conn.connected = true;
			try del(TCPEvent.CONNECT);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
				return false;
			}
			return true;
		}


		// WRITE is only announced when a previous write was blocked.
		if (write && conn.connected && !conn.disconnecting && conn.writeBlocked) 
		{
			conn.writeBlocked = false;
			try log("!write"); catch {}
			try del(TCPEvent.WRITE);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.WRITE"(Status.ABORT);
				return false;
			}
		}
		else {
			// no WRITE was dispatched: force the READ branch below to be considered
			read = true;
		}

		if (read && conn.connected && !conn.disconnecting)
		{
			try log("!read"); catch {}
			try del(TCPEvent.READ);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.READ"(Status.ABORT);
				return false;
			}
		}

		// Peer hang-up: notify the handler, close the socket and release resources.
		if (close && conn.connected && !conn.disconnecting) 
		{
			try log("!close"); catch {}
			// todo: See if this hack is still necessary
			if (!conn.connected && conn.disconnecting)
				return true;
			
			try del(TCPEvent.CLOSE);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT);
				return false;
			}
			// the flags are read after the CLOSE handler ran
			closeSocket(fd, !conn.disconnecting, conn.connected);
			
			m_status.code = Status.ABORT;
			conn.disconnecting = true;
			conn.connected = false;
			conn.writeBlocked = true;
			del.conn.socket = 0;
			
			try FreeListObjectAlloc!EventInfo.free(del.conn.evInfo);
			catch (Exception e){ assert(false, "Error freeing resources"); }
			
			// only inbound connections are freed here
			if (del.conn.inbound) {
				log("Freeing inbound connection");
				try FreeListObjectAlloc!AsyncTCPConnection.free(del.conn);
				catch (Exception e){ assert(false, "Error freeing resources"); }
			}
		}
		return true;
	}
1920 	
1921 	bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt, UDPHandler del)
1922 	{
1923 		import libasync.internals.socket_compat : bind;
1924 		import core.sys.posix.unistd;
1925 
1926 		fd_t err;
1927 
1928 		EventObject eo;
1929 		eo.udpHandler = del;
1930 		EventInfo* ev;
1931 		try ev = FreeListObjectAlloc!EventInfo.alloc(fd, EventType.UDPSocket, eo, m_instanceId);
1932 		catch (Exception e){ assert(false, "Allocation error"); }
1933 		ctxt.evInfo = ev;
1934 		nothrow bool closeAll() {
1935 			try FreeListObjectAlloc!EventInfo.free(ev);
1936 			catch(Exception e){ assert(false, "Failed to free resources"); }
1937 			ctxt.evInfo = null;
1938 			// socket will be closed by caller if return false
1939 			return false;
1940 		}
1941 
1942 		static if (EPOLL)
1943 		{
1944 			epoll_event _event;
1945 			_event.data.ptr = ev;
1946 			_event.events = EPOLLIN | EPOLLOUT | EPOLLET;
1947 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
1948 			if (catchError!"epoll_ctl"(err)) {
1949 				return closeAll();
1950 			}
1951 			nothrow void deregisterEvent() {}
1952 		}
1953 		else /* if KQUEUE */
1954 		{
1955 			kevent_t[2] _event;
1956 			EV_SET(&(_event[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ev);
1957 			EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, ev);
1958 			err = kevent(m_kqueuefd, &(_event[0]), 2, null, 0, null);
1959 			if (catchError!"kevent_add_udp"(err))
1960 				return closeAll();
1961 			
1962 			nothrow void deregisterEvent() {
1963 				EV_SET(&(_event[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
1964 				EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
1965 				kevent(m_kqueuefd, &(_event[0]), 2, null, 0, cast(libasync.internals.kqueue.timespec*) null);
1966 			}
1967 
1968 		}
1969 
1970 		/// Start accepting packets
1971 		err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
1972 		if (catchError!"bind"(err)) {
1973 			deregisterEvent();
1974 			return closeAll();
1975 		}
1976 
1977 		return true;
1978 	}
1979 
1980 
1981 	bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, TCPAcceptHandler del, bool reusing = false)
1982 	in {
1983 		assert(ctxt.local !is NetworkAddress.init);
1984 	}
1985 	body {
1986 		import libasync.internals.socket_compat : bind, listen, SOMAXCONN;
1987 		fd_t err;
1988 
1989 		/// Create callback object
1990 		EventObject eo;
1991 		eo.tcpAcceptHandler = del;
1992 		EventInfo* ev;
1993 
1994 		try ev = FreeListObjectAlloc!EventInfo.alloc(fd, EventType.TCPAccept, eo, m_instanceId);
1995 		catch (Exception e){ assert(false, "Allocation error"); }
1996 		ctxt.evInfo = ev;
1997 		nothrow bool closeAll() {
1998 			try FreeListObjectAlloc!EventInfo.free(ev);
1999 			catch(Exception e){ assert(false, "Failed free"); }
2000 			ctxt.evInfo = null;
2001 			// Socket is closed by run()
2002 			//closeSocket(fd, false);
2003 			return false;
2004 		}
2005 
2006 		/// Add socket to event loop
2007 		static if (EPOLL)
2008 		{
2009 			epoll_event _event;
2010 			_event.data.ptr = ev;
2011 			_event.events = EPOLLIN | EPOLLET;
2012 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
2013 			if (catchError!"epoll_ctl_add"(err))
2014 				return closeAll();
2015 
2016 			nothrow void deregisterEvent() {
2017 				// epoll cleans itself when closing the socket
2018 			}
2019 		}
2020 		else /* if KQUEUE */
2021 		{
2022 			kevent_t _event;
2023 			EV_SET(&_event, fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ev);
2024 			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
2025 			if (catchError!"kevent_add_listener"(err))
2026 				return closeAll();
2027 
2028 			nothrow void deregisterEvent() {
2029 				EV_SET(&_event, fd, EVFILT_READ, EV_CLEAR | EV_DISABLE, 0, 0, null);
2030 				kevent(m_kqueuefd, &_event, 1, null, 0, null);
2031 				// wouldn't know how to deal with errors here...
2032 			}
2033 		}
2034 
2035 		/// Bind and listen to socket
2036 		if (!reusing) {
2037 			err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
2038 			if (catchError!"bind"(err)) {
2039 				deregisterEvent();
2040 				return closeAll();
2041 			}
2042 
2043 			err = listen(fd, SOMAXCONN);
2044 			if (catchError!"listen"(err)) {
2045 				deregisterEvent();
2046 				return closeAll();
2047 			}
2048 
2049 		}
2050 		return true;
2051 	}
2052 
2053 	bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt, TCPEventHandler del, bool inbound = false)
2054 	in { 
2055 		assert(ctxt.peer.port != 0, "Connecting to an invalid port");
2056 	}
2057 	body {
2058 
2059 		fd_t err;
2060 
2061 		/// Create callback object
2062 		import libasync.internals.socket_compat : connect;
2063 		EventObject eo;
2064 		eo.tcpEvHandler = del;
2065 		EventInfo* ev;
2066 
2067 		try ev = FreeListObjectAlloc!EventInfo.alloc(fd, EventType.TCPTraffic, eo, m_instanceId);
2068 		catch (Exception e){ assert(false, "Allocation error"); }
2069 		assert(ev !is null);
2070 		ctxt.evInfo = ev;
2071 		nothrow bool destroyEvInfo() {
2072 			try FreeListObjectAlloc!EventInfo.free(ev);
2073 			catch(Exception e){ assert(false, "Failed to free resources"); }
2074 			ctxt.evInfo = null;
2075 
2076 			// Socket will be closed by run()
2077 			// closeSocket(fd, false);
2078 			return false;
2079 		}
2080 
2081 		/// Add socket and callback object to event loop
2082 		static if (EPOLL)
2083 		{
2084 			epoll_event _event = void;
2085 			_event.data.ptr = ev;
2086 			_event.events = 0 | EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLET;
2087 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
2088 			log("Connection FD#" ~ fd.to!string ~ " added to " ~ m_epollfd.to!string);
2089 			if (catchError!"epoll_ctl_add"(err))
2090 				return destroyEvInfo();
2091 
2092 			nothrow void deregisterEvent() {
2093 				// will be handled automatically when socket is closed
2094 			}
2095 		}
2096 		else /* if KQUEUE */
2097 		{
2098 			kevent_t[2] events = void;
2099 			try log("Register event ptr " ~ ev.to!string); catch {}
2100 			assert(ev.evType == EventType.TCPTraffic, "Bad event type for TCP Connection");
2101 			EV_SET(&(events[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev);
2102 			EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev);
2103 			assert((cast(EventInfo*)events[0].udata) == ev && (cast(EventInfo*)events[1].udata) == ev);
2104 			assert((cast(EventInfo*)events[0].udata).owner == m_instanceId && (cast(EventInfo*)events[1].udata).owner == m_instanceId);
2105 			err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
2106 			if (catchError!"kevent_add_tcp"(err))
2107 				return destroyEvInfo();
2108 
2109 			// todo: verify if this allocates on the GC?
2110 			nothrow void deregisterEvent() {
2111 				EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
2112 				EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
2113 				kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
2114 				// wouldn't know how to deal with errors here...
2115 			}
2116 		}
2117 
2118 		// Inbound objects are already connected
2119 		if (inbound) return true;
2120 
2121 		// Connect is blocking, but this makes the socket non-blocking for send/recv
2122 		if (!setNonBlock(fd)) {
2123 			deregisterEvent();
2124 			return destroyEvInfo();
2125 		}
2126 
2127 		/// Start the connection
2128 		err = connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen);
2129 		if (catchErrorsEq!"connect"(err, [ tuple(cast(fd_t)SOCKET_ERROR, EPosix.EINPROGRESS, Status.ASYNC) ]))
2130 			return true;
2131 		if (catchError!"connect"(err)) {
2132 			deregisterEvent();
2133 			return destroyEvInfo();
2134 		}
2135 
2136 		return true;
2137 	}
2138 
2139 	bool catchError(string TRACE, T)(T val, T cmp = SOCKET_ERROR)
2140 		if (isIntegral!T)
2141 	{
2142 		if (val == cmp) {
2143 			m_status.text = TRACE;
2144 			m_error = lastError();
2145 			m_status.code = Status.ABORT;
2146 			static if(LOG) log(m_status);
2147 			return true;
2148 		}
2149 		return false;
2150 	}
2151 
2152 	bool catchSocketError(string TRACE)(fd_t fd)
2153 	{
2154 		m_status.text = TRACE;
2155 		int err;
2156 		import libasync.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR;
2157 		socklen_t len = int.sizeof;
2158 		getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
2159 		m_error = cast(error_t) err;
2160 		if (m_error != EPosix.EOK) {
2161 			m_status.code = Status.ABORT;
2162 			static if(LOG) log(m_status);
2163 			return true;
2164 		}
2165 
2166 		return false;
2167 	}
2168 
2169 	bool catchEvLoopErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
2170 		if (isIntegral!T)
2171 	{
2172 		if (val == SOCKET_ERROR) {
2173 			int err = errno;
2174 			foreach (validator ; cmp) {
2175 				if (errno == validator[0]) {
2176 					m_status.text = TRACE;
2177 					m_error = lastError();
2178 					m_status.code = validator[1];
2179 					static if(LOG) log(m_status);
2180 					return true;
2181 				}
2182 			}
2183 
2184 			m_status.text = TRACE;
2185 			m_status.code = Status.EVLOOP_FAILURE;
2186 			m_error = lastError();
2187 			log(m_status);
2188 			return true;
2189 		}
2190 		return false;
2191 	}
2192 
2193 	/**
2194 	 * If the value at val matches the tuple first argument T, get the last error,
2195 	 * and if the last error matches tuple second argument error_t, set the Status as
2196 	 * tuple third argument Status.
2197 	 * 
2198 	 * Repeats for each comparison tuple until a match in which case returns true.
2199 	*/
2200 	bool catchErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...)
2201 		if (isIntegral!T)
2202 	{
2203 		error_t err;
2204 		foreach (validator ; cmp) {
2205 			if (val == validator[0]) {
2206 				if (err is EPosix.init) err = lastError();
2207 				if (err == validator[1]) {
2208 					m_status.text = TRACE;
2209 					m_status.code = validator[2];
2210 					if (m_status.code == Status.EVLOOP_TIMEOUT) {
2211 						log(m_status);
2212 						break;
2213 					}
2214 					m_error = lastError();
2215 					static if(LOG) log(m_status);
2216 					return true;
2217 				}
2218 			}
2219 		}
2220 		return false;
2221 	}
2222 
2223 
2224 	error_t lastError() {
2225 		try {
2226 			return cast(error_t) errno;
2227 		} catch(Exception e) {
2228 			return EPosix.EACCES;
2229 		}
2230 
2231 	}
2232 	
2233 	void log(StatusInfo val)
2234 	{
2235 		static if (LOG) {
2236 			import std.stdio;
2237 			try {
2238 				writeln("Backtrace: ", m_status.text);
2239 				writeln(" | Status:  ", m_status.code);
2240 				writeln(" | Error: " , m_error);
2241 				if ((m_error in EPosixMessages) !is null)
2242 					writeln(" | Message: ", EPosixMessages[m_error]);
2243 			} catch(Exception e) {
2244 				return;
2245 			}
2246 		}
2247 	}
2248 
2249 	void log(T)(T val)
2250 	{
2251 		static if (LOG) {
2252 			import std.stdio;
2253 			try {
2254 				writeln(val);
2255 			} catch(Exception e) {
2256 				return;
2257 			}
2258 		}
2259 	}
2260 
2261 	NetworkAddress getAddressInfo(addrinfo)(in string host, ushort port, bool ipv6, bool tcp, ref addrinfo hints) 
2262 	{
2263 		m_status = StatusInfo.init;
2264 		import libasync.internals.socket_compat : AF_INET, AF_INET6, SOCK_DGRAM, SOCK_STREAM, IPPROTO_TCP, IPPROTO_UDP, freeaddrinfo, getaddrinfo;
2265 
2266 		NetworkAddress addr;
2267 		addrinfo* infos;
2268 		error_t err;
2269 		if (ipv6) {
2270 			addr.family = AF_INET6;
2271 			hints.ai_family = AF_INET6;
2272 		}
2273 		else {
2274 			addr.family = AF_INET;
2275 			hints.ai_family = AF_INET;
2276 		}
2277 		if (tcp) {
2278 			hints.ai_socktype = SOCK_STREAM;
2279 			hints.ai_protocol = IPPROTO_TCP;
2280 		}
2281 		else {
2282 			hints.ai_socktype = SOCK_DGRAM;
2283 			hints.ai_protocol = IPPROTO_UDP;
2284 		}
2285 
2286 		static if (LOG) {
2287 			log("Resolving " ~ host ~ ":" ~ port.to!string);
2288 		}
2289 
2290 		auto chost = host.toStringz();
2291 
2292 		if (port != 0) {
2293 			addr.port = port;
2294 			const(char)* cPort = cast(const(char)*) port.to!string.toStringz;
2295 			err = cast(error_t) getaddrinfo(chost, cPort, &hints, &infos);
2296 		}
2297 		else {
2298 			err = cast(error_t) getaddrinfo(chost, null, &hints, &infos);
2299 		}
2300 
2301 		if (err != EPosix.EOK) {
2302 			setInternalError!"getAddressInfo"(Status.ERROR, string.init, err);
2303 			return NetworkAddress.init;
2304 		}
2305 		ubyte* pAddr = cast(ubyte*) infos.ai_addr;
2306 		ubyte* data = cast(ubyte*) addr.sockAddr;
2307 		data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy
2308 		freeaddrinfo(infos);
2309 		return addr;
2310 	}
2311 
2312 
2313 	
2314 }
2315 
2316 
2317 static if (!EPOLL)
2318 {
2319 	import std.container : Array;
2320 	import core.sync.mutex : Mutex;
2321 	import core.sync.rwmutex : ReadWriteMutex;
	// Per-thread pool of event indexes handed out by createIndex() and given
	// back by destroyIndex(AsyncNotifier) / destroyIndex(AsyncTimer).
	// Upper bound of the index range generated so far (doubled on each growth, minimum 32).
	size_t g_evIdxCapacity;
	// Indexes currently free for reuse; createIndex() takes from the back.
	Array!size_t g_evIdxAvailable;
2324 
2325 	// called on run
2326 	nothrow size_t createIndex() {
2327 		size_t idx;
2328 		import std.algorithm : max;
2329 		try {
2330 			
2331 			size_t getIdx() {
2332 				
2333 				if (!g_evIdxAvailable.empty) {
2334 					immutable size_t ret = g_evIdxAvailable.back;
2335 					g_evIdxAvailable.removeBack();
2336 					return ret;
2337 				}
2338 				return 0;
2339 			}
2340 			
2341 			idx = getIdx();
2342 			if (idx == 0) {
2343 				import std.range : iota;
2344 				g_evIdxAvailable.insert( iota(g_evIdxCapacity, max(32, g_evIdxCapacity * 2), 1) );
2345 				g_evIdxCapacity = max(32, g_evIdxCapacity * 2);
2346 				idx = getIdx();
2347 			}
2348 			
2349 		} catch (Throwable e) {
2350 
2351 			import std.stdio;
2352 			
2353 			try writeln(e.toString()); catch {}
2354 
2355 		}
2356 		return idx;
2357 	}
2358 
2359 	nothrow void destroyIndex(AsyncNotifier ctxt) {
2360 		try {
2361 			g_evIdxAvailable.insert(ctxt.id);		
2362 		}
2363 		catch (Exception e) {
2364 			assert(false, "Error destroying index: " ~ e.msg);
2365 		}
2366 	}
2367 
2368 	nothrow void destroyIndex(AsyncTimer ctxt) {
2369 		try {
2370 			g_evIdxAvailable.insert(ctxt.id);		
2371 		}
2372 		catch (Exception e) {
2373 			assert(false, "Error destroying index: " ~ e.msg);
2374 		}
2375 	}
2376 
	// Per-thread state for AsyncSignal bookkeeping.
	// Pointer to this thread's id; dereferenced to select this thread's slot in
	// the queues below. NOTE(review): where it is assigned is not visible here.
	size_t* g_threadId;
	// Upper bound of the signal index range generated so far for this thread.
	size_t g_idxCapacity;
	// Signal indexes currently free for reuse on this thread.
	Array!size_t g_idxAvailable;

	// State shared by all threads (__gshared); accessed under gs_queueMutex.
	__gshared ReadWriteMutex gs_queueMutex;
	// Indexed by thread id, then by signal index: the registered AsyncSignal objects.
	__gshared Array!(Array!AsyncSignal) gs_signalQueue;
	// Indexed by thread id: indexes of signals queued by addSignal and not yet popped.
	__gshared Array!(Array!size_t) gs_idxQueue; // signals notified
2384 
2385 
	// loop
	// Moves pending signals of the current thread into `sigarr`.
	//
	// The leading non-null entries of `sigarr` are cleared first. Then, under the
	// reader lock, up to sigarr.length queued signal indexes of this thread are
	// resolved to their AsyncSignal objects and stored in `sigarr`; afterwards
	// the consumed indexes are removed from the front of the queue under the
	// writer lock.
	//
	// Returns: true when more signals were queued than fit into `sigarr` (the
	// remainder stays queued), false otherwise or when nothing was pending.
	nothrow bool popSignals(ref AsyncSignal[] sigarr) {
		bool more;
		try {
			// reset the entries filled by a previous call (stops at the first null)
			foreach (ref AsyncSignal sig; sigarr) {
				if (!sig)
					break;
				sig = null;
			}
			size_t len;
			synchronized(gs_queueMutex.reader) {

				// nothing queued for this thread
				if (gs_idxQueue.length <= *g_threadId || gs_idxQueue[*g_threadId].empty)
					return false;

				len = gs_idxQueue[*g_threadId].length;
				import std.stdio;
				// cap the batch at the size of the output array
				if (sigarr.length < len) {
					more = true;
					len = sigarr.length;
				}

				size_t i;
				foreach (size_t idx; gs_idxQueue[*g_threadId][0 .. len]){
					sigarr[i] = gs_signalQueue[*g_threadId][idx];
					i++;
				}
			}

			// drop the indexes that were just handed out
			synchronized (gs_queueMutex.writer) {
				gs_idxQueue[*g_threadId].linearRemove(gs_idxQueue[*g_threadId][0 .. len]);
			}
		}
		catch (Exception e) {
			assert(false, "Could not get pending signals: " ~ e.msg);
		}
		return more;
	}
2424 
2425 	// notify
2426 	nothrow void addSignal(shared AsyncSignal ctxt) {
2427 		try {
2428 			size_t thread_id = ctxt.threadId;
2429 			bool must_resize;
2430 			import std.stdio;
2431 			synchronized (gs_queueMutex.writer) {
2432 				if (gs_idxQueue.empty || gs_idxQueue.length < thread_id + 1) {
2433 					gs_idxQueue.reserve(thread_id + 1);
2434 					foreach (i; gs_idxQueue.length .. gs_idxQueue.capacity) {
2435 						gs_idxQueue.insertBack(Array!size_t.init);
2436 					}
2437 				}
2438 				if (gs_idxQueue[thread_id].empty)
2439 				{
2440 					gs_idxQueue[thread_id].reserve(32);
2441 				}
2442 
2443 				gs_idxQueue[thread_id].insertBack(ctxt.id);
2444 
2445 			}
2446 
2447 		}
2448 		catch (Exception e) {
2449 			assert(false, "Array error: " ~ e.msg);
2450 		}
2451 	}
2452 
	// called on run
	// Allocates a signal index for `ctxt` on the current thread and stores the
	// signal in gs_signalQueue[*g_threadId][idx].
	//
	// The index is taken from the per-thread free list g_idxAvailable, which is
	// grown (doubling, minimum 32) when exhausted. The value 0 doubles as the
	// "list empty" marker of getIdx, so popping index 0 also triggers a growth.
	// The shared queues are resized under the writer lock as needed.
	//
	// Returns: the index assigned to the signal. All exceptions are swallowed,
	// in which case the index obtained so far (possibly 0) is returned.
	nothrow size_t createIndex(shared AsyncSignal ctxt) {
		size_t idx;
		import std.algorithm : max;
		try {
			bool must_resize;

			// NOTE(review): this pre-check uses `<` while the loop below uses `<=`;
			// the length == *g_threadId case is only covered by the reserve/fill
			// step inside the final writer section.
			synchronized (gs_queueMutex.reader) {
				if (gs_signalQueue.length < *g_threadId)
					must_resize = true;
			}

			/// make sure the signal queue is big enough for this thread ID
			if (must_resize) {
				synchronized (gs_queueMutex.writer) {
					while (gs_signalQueue.length <= *g_threadId) 
						gs_signalQueue.insertBack(Array!AsyncSignal.init);
				}
			}

			// Removes and returns the last free index, or 0 when none is left
			size_t getIdx() {

				if (!g_idxAvailable.empty) {
					immutable size_t ret = g_idxAvailable.back;
					g_idxAvailable.removeBack();
					return ret;
				}
				return 0;
			}

			idx = getIdx();
			if (idx == 0) {
				import std.range : iota;
				// grow the free list with the next range of indexes
				g_idxAvailable.insert( iota(g_idxCapacity,  max(32, g_idxCapacity * 2), 1) );
				g_idxCapacity = max(32, g_idxCapacity * 2);
				idx = getIdx();
			}

			synchronized (gs_queueMutex.writer) {
				// ensure a per-thread list exists for this thread id
				if (gs_signalQueue.empty || gs_signalQueue.length < *g_threadId + 1) {
					
					gs_signalQueue.reserve(*g_threadId + 1);
					foreach (i; gs_signalQueue.length .. gs_signalQueue.capacity) {
						gs_signalQueue.insertBack(Array!AsyncSignal.init);
					}
					
				}

				// ensure this thread's list has a slot for idx (filled with nulls)
				if (gs_signalQueue[*g_threadId].empty || gs_signalQueue[*g_threadId].length < idx + 1) {
					
					gs_signalQueue[*g_threadId].reserve(idx + 1);
					foreach (i; gs_signalQueue[*g_threadId].length .. gs_signalQueue[*g_threadId].capacity) {
						gs_signalQueue[*g_threadId].insertBack(cast(AsyncSignal)null);
					}
					
				}

				gs_signalQueue[*g_threadId][idx] = cast(AsyncSignal) ctxt;
			}
		} catch {} // NOTE(review): failures are silently ignored here, unlike the sibling functions

		return idx;
	}
2516 
2517 	// called on kill
2518 	nothrow void destroyIndex(shared AsyncSignal ctxt) {
2519 		try {
2520 			g_idxAvailable.insert(ctxt.id);
2521 			synchronized (gs_queueMutex.writer) {
2522 				gs_signalQueue[*g_threadId][ctxt.id] = null;
2523 			}
2524 		}
2525 		catch (Exception e) {
2526 			assert(false, "Error destroying index: " ~ e.msg);
2527 		}
2528 	}
2529 }
2530 
/// Members mixed into TCP connection objects: the event registration record
/// and the connection state flags, with property accessors for each.
mixin template TCPConnectionMixins() {

	/// Backing storage for the properties below.
	struct CleanupData {
		EventInfo* evInfo;
		bool connected;
		bool disconnecting;
		bool writeBlocked;
	}

	private CleanupData m_impl;

	/// Event registration record of this connection.
	@property EventInfo* evInfo() { return m_impl.evInfo; }
	/// ditto
	@property void evInfo(EventInfo* info) { m_impl.evInfo = info; }

	/// The `connected` state flag.
	@property bool connected() const { return m_impl.connected; }
	/// ditto
	@property void connected(bool b) { m_impl.connected = b; }

	/// The `disconnecting` state flag.
	@property bool disconnecting() const { return m_impl.disconnecting; }
	/// ditto
	@property void disconnecting(bool b) { m_impl.disconnecting = b; }

	/// The `writeBlocked` state flag.
	@property bool writeBlocked() const { return m_impl.writeBlocked; }
	/// ditto
	@property void writeBlocked(bool b) { m_impl.writeBlocked = b; }

}
2575 
/// Members mixed into shared event objects: the shared event registration
/// record plus the owner identification used by the active backend
/// (pthread id with epoll, owner thread id with kqueue).
mixin template EvInfoMixinsShared() {

	/// Backing storage for the evInfo property.
	shared struct CleanupData {
		EventInfo* evInfo;
	}

	private CleanupData m_impl;

	/// Event registration record of this object.
	@property shared(EventInfo*) evInfo() { return m_impl.evInfo; }
	/// ditto
	@property void evInfo(shared(EventInfo*) info) { m_impl.evInfo = info; }

	static if (EPOLL) {
		import core.sys.posix.pthread : pthread_t;
		private pthread_t m_pthreadId;

		/// The stored pthread id.
		synchronized @property pthread_t pthreadId() { return cast(pthread_t) m_pthreadId; }
		/* todo: support multiple event loops per thread?
		private ushort m_sigId;
		synchronized @property ushort sigId() {
			return cast(ushort)m_loopId;
		}
		synchronized @property void sigId(ushort id) {
			m_loopId = cast(shared)id;
		}
		*/
	} 
	else /* if KQUEUE */
	{
		private shared(size_t)* m_owner_id;

		/// The thread id read through the stored owner pointer.
		synchronized @property size_t threadId() { return cast(size_t) *m_owner_id; }
	}

}
2617 
/// Members mixed into event objects that only need to carry their event
/// registration record.
mixin template EvInfoMixins() {

	/// Backing storage for the evInfo property.
	struct CleanupData {
		EventInfo* evInfo;
	}

	private CleanupData m_impl;

	/// Event registration record of this object.
	@property EventInfo* evInfo() { return m_impl.evInfo; }
	/// ditto
	@property void evInfo(EventInfo* info) { m_impl.evInfo = info; }
}
2634 
2635 
2636 
/// Callback slot of an EventInfo: a union with one handler member per kind of
/// event source. The init functions in this file set the member that matches
/// the EventType they register (e.g. udpHandler with EventType.UDPSocket).
union EventObject {
	TCPAcceptHandler tcpAcceptHandler;
	TCPEventHandler tcpEvHandler;
	TimerHandler timerHandler;
	DWHandler dwHandler;
	UDPHandler udpHandler;
	NotifierHandler notifierHandler;
}
2645 
/// Kind of event source an EventInfo was registered for. Stored as a char;
/// the numeric values follow the declaration order.
enum EventType : char {
	TCPAccept,
	TCPTraffic,
	UDPSocket,
	Notifier,
	Signal,
	Timer,
	DirectoryWatcher
}
2655 
/// Record attached to an event-loop registration (epoll data pointer / kqueue
/// udata). The init functions allocate it positionally as
/// (fd, evType, evObj, owner), so the field order must not change.
struct EventInfo {
	// descriptor the event was registered for
	fd_t fd;
	// kind of event source
	EventType evType;
	// handler to invoke
	EventObject evObj;
	// instance id (m_instanceId) of the event loop that created the record
	ushort owner;
}
2662 
2663 
2664 
2665 /**
2666 		Represents a network/socket address. (taken from vibe.core.net)
2667 */
2668 public struct NetworkAddress {
2669 	import libasync.internals.socket_compat : sockaddr, sockaddr_in, sockaddr_in6, AF_INET, AF_INET6;
2670 	private union {
2671 		sockaddr addr;
2672 		sockaddr_in addr_ip4;
2673 		sockaddr_in6 addr_ip6;
2674 	}
2675 
2676 	@property bool ipv6() const pure nothrow { return this.family == AF_INET6; }
2677 
2678 	/** Family (AF_) of the socket address.
2679 		*/
2680 	@property ushort family() const pure nothrow { return addr.sa_family; }
2681 	/// ditto
2682 	@property void family(ushort val) pure nothrow { addr.sa_family = cast(ubyte)val; }
2683 	
2684 	/** The port in host byte order.
2685 		*/
2686 	@property ushort port()
2687 	const pure nothrow {
2688 		switch (this.family) {
2689 			default: assert(false, "port() called for invalid address family.");
2690 			case AF_INET: return ntoh(addr_ip4.sin_port);
2691 			case AF_INET6: return ntoh(addr_ip6.sin6_port);
2692 		}
2693 	}
2694 	/// ditto
2695 	@property void port(ushort val)
2696 	pure nothrow {
2697 		switch (this.family) {
2698 			default: assert(false, "port() called for invalid address family.");
2699 			case AF_INET: addr_ip4.sin_port = hton(val); break;
2700 			case AF_INET6: addr_ip6.sin6_port = hton(val); break;
2701 		}
2702 	}
2703 	
2704 	/** A pointer to a sockaddr struct suitable for passing to socket functions.
2705 		*/
2706 	@property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; }
2707 	
2708 	/** Size of the sockaddr struct that is returned by sockAddr().
2709 		*/
2710 	@property uint sockAddrLen()
2711 	const pure nothrow {
2712 		switch (this.family) {
2713 			default: assert(false, "sockAddrLen() called for invalid address family.");
2714 			case AF_INET: return addr_ip4.sizeof;
2715 			case AF_INET6: return addr_ip6.sizeof;
2716 		}
2717 	}
2718 	
2719 	@property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow
2720 	in { assert (family == AF_INET); }
2721 	body { return &addr_ip4; }
2722 	
2723 	@property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow
2724 	in { assert (family == AF_INET6); }
2725 	body { return &addr_ip6; }
2726 	
2727 	/** Returns a string representation of the IP address
2728 		*/
2729 	string toAddressString()
2730 	const {
2731 		import std.array : appender;
2732 		import std.string : format;
2733 		import std.format : formattedWrite;
2734 		
2735 		switch (this.family) {
2736 			default: assert(false, "toAddressString() called for invalid address family.");
2737 			case AF_INET:
2738 				ubyte[4] ip = (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4];
2739 				return format("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]);
2740 			case AF_INET6:
2741 				ubyte[16] ip = addr_ip6.sin6_addr.s6_addr;
2742 				auto ret = appender!string();
2743 				ret.reserve(40);
2744 				foreach (i; 0 .. 8) {
2745 					if (i > 0) ret.put(':');
2746 					ret.formattedWrite("%x", bigEndianToNative!ushort(cast(ubyte[2])ip[i*2 .. i*2+2].ptr[0 .. 2]));
2747 				}
2748 				return ret.data;
2749 		}
2750 	}
2751 	
2752 	/** Returns a full string representation of the address, including the port number.
2753 		*/
2754 	string toString()
2755 	const {
2756 		
2757 		import std.string : format;
2758 		
2759 		auto ret = toAddressString();
2760 		switch (this.family) {
2761 			default: assert(false, "toString() called for invalid address family.");
2762 			case AF_INET: return ret ~ format(":%s", port);
2763 			case AF_INET6: return format("[%s]:%s", ret, port);
2764 		}
2765 	}
2766 	
2767 }
2768 
private pure nothrow {
	import std.bitmanip;

	/// Converts a 16-bit value from network (big-endian) to host byte order.
	ushort ntoh(ushort val)
	{
		version (BigEndian) {
			return val;
		}
		else version (LittleEndian) {
			return swapEndian(val);
		}
		else {
			static assert(false, "Unknown endianness.");
		}
	}

	/// Converts a 16-bit value from host to network (big-endian) byte order.
	ushort hton(ushort val)
	{
		version (BigEndian) {
			return val;
		}
		else version (LittleEndian) {
			return swapEndian(val);
		}
		else {
			static assert(false, "Unknown endianness.");
		}
	}
}