1 module libasync.posix;
2 
3 version (Posix):
4 
5 import libasync.types;
6 import std.string : toStringz;
7 import std.conv : to;
8 import std.datetime : Duration, msecs, seconds, SysTime;
9 import std.traits : isIntegral;
10 import std.typecons : Tuple, tuple;
11 import std.container : Array;
12 import std.exception;
13 
14 import core.stdc.errno;
15 import libasync.events;
16 import libasync.internals.path;
17 import core.sys.posix.signal;
18 import libasync.posix2;
19 import libasync.internals.logging;
20 import core.sync.mutex;
21 import memutils.utils;
22 import memutils.hashmap;
23 
24 alias fd_t = int;
25 
26 
version(linux) {
	import libasync.internals.epoll;
	// glibc bindings exposing the runtime bounds of the real-time signal
	// range (SIGRTMIN/SIGRTMAX vary with the threading implementation).
	extern(C) nothrow @nogc {
		int __libc_current_sigrtmin();
		int __libc_current_sigrtmax();
		version(CRuntime_Glibc) int __res_init();
	}
	// Thread-local flag: set once this thread has blocked SIGRTMIN
	// (module-level variables are thread-local in D).
	bool g_signalsBlocked;
	// Blocks SIGRTMIN in the calling thread so it can be consumed through
	// a signalfd by the event loop instead of an async signal handler.
	package nothrow void blockSignals() {
		try {
			/// Block signals to reserve SIGRTMIN .. " +30 for AsyncSignal
			sigset_t mask;
			// todo: use more signals for more event loops per thread.. (is this necessary?)
			//foreach (j; __libc_current_sigrtmin() .. __libc_current_sigrtmax() + 1) {
			//import std.stdio : writeln;
			//try writeln("Blocked signal " ~ (__libc_current_sigrtmin() + j).to!string ~ " in instance " ~ m_instanceId.to!string); catch (Throwable e) {}
			sigemptyset(&mask);
			sigaddset(&mask, cast(int) __libc_current_sigrtmin());
			pthread_sigmask(SIG_BLOCK, &mask, null);
			//}
		} catch (Throwable) {}
	}
	// Per-thread module constructor: block the signal as early as possible
	// so AsyncSignal delivery is never racing a default disposition.
	static this() {
		blockSignals();
		g_signalsBlocked = true;
	}
}
54 static if (is_OSX || is_iOS) {
55 	import libasync.internals.kqueue;
56 }
57 version(FreeBSD) {
58 	import libasync.internals.kqueue;
59 }
60 version(DragonFlyBSD) {
61 	import libasync.internals.kqueue;
62 }
63 
64 __gshared Mutex g_mutex;
65 
static if (!EPOLL) {
	// Per-file bookkeeping for the kqueue-based directory watcher, which
	// must track individual files itself (no inotify equivalent).
	private struct DWFileInfo {
		fd_t folder;          // fd of the containing watched folder (key into m_dwFolders)
		Path path;            // full path of the watched entry
		SysTime lastModified; // last observed modification time
		bool is_dir;          // true when the entry is itself a directory
	}
}
74 
// Associates a watched folder's watch registration with the file
// descriptor the kernel watch was created on.
private struct DWFolderInfo {
	WatchInfo wi; // watch parameters/identity for this folder
	fd_t fd;      // descriptor associated with the kernel-level watch
}
79 
80 package struct EventLoopImpl {
81 	static if (EPOLL) {
82 		pragma(msg, "Using Linux EPOLL for events");
83 	}
84 	else /* if KQUEUE */
85 	{
86 		pragma(msg, "Using FreeBSD KQueue for events");
87 	}
88 
89 package:
90 	alias error_t = EPosix;
91 
92 nothrow:
93 private:
94 
	/// members
	EventLoop m_evLoop;           // owning high-level event loop wrapper
	ushort m_instanceId;          // unique per-instance id, assigned in init()
	bool m_started;               // set once init() completed successfully
	StatusInfo m_status;          // status of the most recent operation
	error_t m_error = EPosix.EOK; // last recorded POSIX error code
	EventInfo* m_evSignal;        // registration for the signaling fd/signal
	static if (EPOLL){
		fd_t m_epollfd;           // epoll instance descriptor (see init())
		HashMap!(Tuple!(fd_t, uint), DWFolderInfo) m_dwFolders; // uint = inotify_add_watch(Path)
	}
	else /* if KQUEUE */
	{
		fd_t m_kqueuefd;          // kqueue instance descriptor (see init())
		HashMap!(fd_t, EventInfo*) m_watchers; // fd_t = id++ per AsyncDirectoryWatcher
		HashMap!(fd_t, DWFolderInfo) m_dwFolders; // fd_t = open(folder)
		HashMap!(fd_t, DWFileInfo) m_dwFiles; // fd_t = open(file)
		HashMap!(fd_t, Array!(DWChangeInfo)*) m_changes; // fd_t = id++ per AsyncDirectoryWatcher

	}

	// Socket requests that completed without a kernel wakeup; these queues
	// are drained inside loop() after the (non-blocking) event poll.
	AsyncAcceptRequest.Queue m_completedSocketAccepts;
	AsyncReceiveRequest.Queue m_completedSocketReceives;
	AsyncSendRequest.Queue m_completedSocketSends;
119 
120 package:
121 
122 	/// workaround for IDE indent bug on too big files
123 	mixin RunKill!();
124 
	/// Whether this event loop instance has been initialized (see init()).
	@property bool started() const {
		return m_started;
	}
128 
	/// Initializes the platform backend: creates the epoll or kqueue
	/// descriptor and registers the signal-based cross-thread messaging
	/// channel. Returns false if the kernel queue or signal mask could
	/// not be set up.
	bool init(EventLoop evl)
	in { assert(!m_started); }
	body
	{

		import core.atomic;
		shared static ushort i;
		string* failer = null;


		// Every instance gets a unique, monotonically increasing id.
		m_instanceId = i;
		static if (!EPOLL) g_threadId = new size_t(cast(size_t)m_instanceId);

		core.atomic.atomicOp!"+="(i, cast(ushort) 1);
		m_evLoop = evl;

		import core.thread;
		//try Thread.getThis().priority = Thread.PRIORITY_MAX;
		//catch (Exception e) { assert(false, "Could not set thread priority"); }

		try
			if (!g_mutex)
				g_mutex = new Mutex;
		catch (Throwable) {}

		static if (EPOLL)
		{

			if (!g_signalsBlocked)
				blockSignals();
			assert(m_instanceId <= __libc_current_sigrtmax(), "An additional event loop is unsupported due to SIGRTMAX restrictions in Linux Kernel");
			m_epollfd = epoll_create1(EPOLL_CLOEXEC);

			if (catchError!"epoll_create1"(m_epollfd))
				return false;

			import core.sys.linux.sys.signalfd;
			import core.thread : getpid;

			fd_t err;
			fd_t sfd;

			sigset_t mask;

			// Block SIGRTMIN so it is delivered through the signalfd below
			// rather than a conventional signal handler.
			try {
				sigemptyset(&mask);
				sigaddset(&mask, __libc_current_sigrtmin());
				err = pthread_sigmask(SIG_BLOCK, &mask, null);
				if (catchError!"sigprocmask"(err))
				{
					m_status.code = Status.EVLOOP_FAILURE;
					return false;
				}
			} catch (Throwable) { }



			sfd = signalfd(-1, &mask, SFD_NONBLOCK);
			assert(sfd > 0, "Failed to setup signalfd in epoll");

			EventType evtype;

			epoll_event _event;
			_event.events = EPOLLIN;
			evtype = EventType.Signal;
			try
				m_evSignal = ThreadMem.alloc!EventInfo(sfd, evtype, EventObject.init, m_instanceId);
			catch (Exception e){
				assert(false, "Allocation error");
			}
			_event.data.ptr = cast(void*) m_evSignal;

			// Register the signalfd with epoll so signals wake the loop.
			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, sfd, &_event);
			if (catchError!"EPOLL_CTL_ADD(sfd)"(err))
			{
				return false;
			}

		}
			else /* if KQUEUE */
		{
			// Lazily create the process-global signal queues shared by all
			// kqueue-based event loops.
			try {
				if (!gs_queueMutex) {
					gs_queueMutex = ThreadMem.alloc!ReadWriteMutex();
					gs_signalQueue = Array!(Array!AsyncSignal)();
					gs_idxQueue = Array!(Array!size_t)();
				}
				if (g_evIdxAvailable.empty) {
					g_evIdxAvailable.reserve(32);

					foreach (k; g_evIdxAvailable.length .. g_evIdxAvailable.capacity) {
						g_evIdxAvailable.insertBack(k + 1);
					}
					g_evIdxCapacity = 32;
					g_idxCapacity = 32;
				}
			} catch (Throwable) { assert(false, "Initialization failed"); }
			m_kqueuefd = kqueue();
			int err;
			// SIGXCPU is repurposed as the cross-thread notification signal.
			try {
				sigset_t mask;
				sigemptyset(&mask);
				sigaddset(&mask, SIGXCPU);

				err = sigprocmask(SIG_BLOCK, &mask, null);
			} catch (Throwable) {}

			EventType evtype = EventType.Signal;

			// use GC because ThreadMem fails at emplace for shared objects
			try
				m_evSignal = ThreadMem.alloc!EventInfo(SIGXCPU, evtype, EventObject.init, m_instanceId);
			catch (Exception e) {
				assert(false, "Failed to allocate resources");
			}

			// NOTE(review): early exit returns 0 (false); the EventInfo
			// allocated above is not freed on this path — confirm acceptable.
			if (catchError!"siprocmask"(err))
				return 0;

			kevent_t _event;
			EV_SET(&_event, SIGXCPU, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, m_evSignal);
			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
			if (catchError!"kevent_add(SIGXCPU)"(err))
				assert(false, "Add SIGXCPU failed at kevent call");
		}

		static if (LOG) try log("init in thread " ~ Thread.getThis().name); catch (Throwable) {}

		return true;
	}
259 
260 	void exit() {
261 		import core.sys.posix.unistd : close;
262 		static if (EPOLL) {
263 			close(m_epollfd); // not necessary?
264 
265 			// not necessary:
266 			//try ThreadMem.free(m_evSignal);
267 			//catch (Exception e) { assert(false, "Failed to free resources"); }
268 
269 		}
270 		else
271 			close(m_kqueuefd);
272 	}
273 
	/// Status information recorded by the most recent operation.
	@property const(StatusInfo) status() const {
		return m_status;
	}
277 
278 	@property string error() const {
279 		string* ptr;
280 		return ((ptr = (m_error in EPosixMessages)) !is null) ? *ptr : string.init;
281 	}
282 
	/// Runs one iteration of the event loop: waits up to `timeout` for
	/// kernel events (epoll_wait / kevent) and dispatches each one to its
	/// registered handler. `0.seconds` polls without blocking; `-1.seconds`
	/// waits indefinitely. Completed out-of-band socket requests are
	/// drained after dispatch. Returns false when the kernel wait or an
	/// event handler failed.
	bool loop(Duration timeout = 0.seconds)
		//in { assert(Fiber.getThis() is null); }
	{
		import libasync.internals.memory;

		int num = void;

		static if (EPOLL) {
			// Thread-local event buffer, allocated once and reused.
			static align(1) epoll_event[] events;
			if (events is null)
			{
				try events = new epoll_event[128];
				catch (Exception e) {
					assert(false, "Could not allocate events array: " ~ e.msg);
				}
			}
		} else /* if KQUEUE */ {
			import core.sys.posix.time : time_t;
			import core.sys.posix.config : c_long;

			static kevent_t[] events;
			if (events.length == 0) {
				try events = allocArray!kevent_t(manualAllocator(), 128);
				catch (Exception e) { assert(false, "Could not allocate events array"); }
			}
		}

		// Blocks (up to `timeout`) until the kernel reports pending events;
		// returns the number of events written into `events`.
		auto waitForEvents(Duration timeout)
		{
			static if (EPOLL) {
				int timeout_ms;
				if (timeout == 0.seconds) // return immediately
					timeout_ms = 0;
				else if (timeout == -1.seconds) // wait indefinitely
					timeout_ms = -1;
				else timeout_ms = cast(int) timeout.total!"msecs";
				/// Retrieve pending events
				scope (exit) assert(events !is null && events.length <= 128);
				return epoll_wait(m_epollfd, cast(epoll_event*) &events[0], 128, timeout_ms);
			} else /* if KQUEUE */ {
				if (timeout != -1.seconds) {
					time_t secs = timeout.split!("seconds", "nsecs")().seconds;
					c_long ns = timeout.split!("seconds", "nsecs")().nsecs;
					auto tspec = libasync.internals.kqueue.timespec(secs, ns);

					return kevent(m_kqueuefd, null, 0, cast(kevent_t*) events, cast(int) events.length, &tspec);
				} else {
					return kevent(m_kqueuefd, null, 0, cast(kevent_t*) events, cast(int) events.length, null);
				}
			}
		}

		// Dispatches the `num` events collected by waitForEvents() to their
		// handlers, keyed by the EventInfo attached to each kernel event.
		auto handleEvents()
		{
			bool success = true;

			static Tuple!(int, Status)[] errors = [	tuple(EINTR, Status.EVLOOP_TIMEOUT) ];

			if (catchEvLoopErrors!"event_poll'ing"(num, errors))
				return false;

			if (num > 0)
				static if (LOG) log("Got " ~ num.to!string ~ " event(s)");

			foreach(i; 0 .. num) {
				success = false;
				m_status = StatusInfo.init;
				static if (EPOLL)
				{
					epoll_event _event = events[i];
					static if (LOG) try log("Event " ~ i.to!string ~ " of: " ~ events.length.to!string); catch (Throwable e) {}
					EventInfo* info = cast(EventInfo*) _event.data.ptr;
					int event_flags = cast(int) _event.events;
				}
				else /* if KQUEUE */
				{
					kevent_t _event = events[i];
					EventInfo* info = cast(EventInfo*) _event.udata;
					//log("Got info");
					// Pack the kqueue filter and flags into one int so the
					// downstream handlers share a single flag format.
					int event_flags = (_event.filter << 16) | (_event.flags & 0xffff);
					//log("event flags");
				}

				//if (info.owner != m_instanceId)
				//	static if (LOG) try log("Event " ~ (cast(int)(info.evType)).to!string ~ " is invalid: supposidly created in instance #" ~ info.owner.to!string ~ ", received in " ~ m_instanceId.to!string ~ " event: " ~ event_flags.to!string);
				//	catch{}
				//log("owner");
				switch (info.evType) {
					case EventType.Event:
						if (info.fd == 0)
							break;

						import core.sys.posix.unistd : close;
						success = onEvent(info.fd, info.evObj.eventHandler, event_flags);

						if (!success) {
							close(info.fd);
							assumeWontThrow(ThreadMem.free(info));
						}
						break;
					case EventType.Socket:
						// Route by socket role: passive listener, connection-
						// oriented active socket, or connection-less socket.
						auto socket = info.evObj.socket;
						if (socket.passive) {
							success = onCOPSocketEvent(socket, event_flags);
						} else if (socket.connectionOriented) {
							success = onCOASocketEvent(socket, event_flags);
						} else {
							success = onCLSocketEvent(socket, event_flags);
						}
						break;
					case EventType.TCPAccept:
						if (info.fd == 0)
							break;
						success = onTCPAccept(info.fd, info.evObj.tcpAcceptHandler, event_flags);
						break;

					case EventType.Notifier:

						static if (LOG) log("Got notifier!");
						try info.evObj.notifierHandler();
						catch (Exception e) {
							setInternalError!"notifierHandler"(Status.ERROR);
						}
						break;

					case EventType.DirectoryWatcher:
						static if (LOG) log("Got DirectoryWatcher event!");
						static if (!EPOLL) {
							// in KQUEUE all events will be consumed here, because they must be pre-processed
							try {
								DWFileEvent fevent;
								if (_event.fflags & (NOTE_LINK | NOTE_WRITE))
									fevent = DWFileEvent.CREATED;
								else if (_event.fflags & NOTE_DELETE)
									fevent = DWFileEvent.DELETED;
								else if (_event.fflags & (NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE))
									fevent = DWFileEvent.MODIFIED;
								else if (_event.fflags & NOTE_RENAME)
									fevent = DWFileEvent.MOVED_FROM;
								// NOTE(review): this branch repeats the NOTE_RENAME
								// test above and is therefore unreachable — MOVED_TO
								// is never produced here; confirm intended.
								else if (_event.fflags & NOTE_RENAME)
									fevent = DWFileEvent.MOVED_TO;
								else
									assert(false, "No event found?");

								DWFolderInfo fi = m_dwFolders.get(cast(fd_t)_event.ident, DWFolderInfo.init);

								if (fi == DWFolderInfo.init) {
									// Not a folder fd: resolve the file entry, then
									// its parent folder's watch info.
									DWFileInfo tmp = m_dwFiles.get(cast(fd_t)_event.ident, DWFileInfo.init);
									assert(tmp != DWFileInfo.init, "The event loop returned an invalid file's file descriptor for the directory watcher");
									fi = m_dwFolders.get(cast(fd_t) tmp.folder, DWFolderInfo.init);
									assert(fi != DWFolderInfo.init, "The event loop returned an invalid folder file descriptor for the directory watcher");
								}

								// all recursive events will be generated here
								if (!compareFolderFiles(fi, fevent)) {
									continue;
								}

							} catch (Exception e) {
								static if (LOG) log("Could not process DirectoryWatcher event: " ~ e.msg);
								break;
							}

						}

						try info.evObj.dwHandler();
						catch (Exception e) {
							setInternalError!"dwHandler"(Status.ERROR);
						}
						break;

					case EventType.Timer:
						static if (LOG) try log("Got timer! " ~ info.fd.to!string); catch (Throwable e) {}
						static if (EPOLL) {
							// timerfd requires reading the expiration count to
							// rearm/clear the readiness state.
							static long val;
							import core.sys.posix.unistd : read;
							read(info.evObj.timerHandler.ctxt.id, &val, long.sizeof);
						} else {
						}
						try info.evObj.timerHandler();
						catch (Exception e) {
							setInternalError!"timerHandler"(Status.ERROR);
						}
						static if (!EPOLL) {
							// One-shot kqueue timers that were not rearmed by the
							// handler must be removed explicitly.
							auto ctxt = info.evObj.timerHandler.ctxt;
							if (ctxt && ctxt.oneShot && !ctxt.rearmed) {
								kevent_t __event;
								EV_SET(&__event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null);
								int err = kevent(m_kqueuefd, &__event, 1, null, 0, null);
								if (catchError!"kevent_del(timer)"(err))
									return false;
							}
						}
						break;

					case EventType.Signal:
						static if (LOG) try log("Got signal!"); catch (Throwable e) {}

						static if (EPOLL) {

							static if (LOG) try log("Got signal: " ~ info.fd.to!string ~ " of type: " ~ info.evType.to!string); catch (Throwable e) {}
							import core.sys.linux.sys.signalfd : signalfd_siginfo;
							import core.sys.posix.unistd : read;
							// The AsyncSignal pointer travels in the siginfo
							// payload written by the sender.
							signalfd_siginfo fdsi;
							fd_t err = cast(fd_t)read(info.fd, &fdsi, fdsi.sizeof);
							shared AsyncSignal sig = cast(shared AsyncSignal) cast(void*) fdsi.ssi_ptr;

							try sig.handler();
							catch (Exception e) {
								setInternalError!"signal handler"(Status.ERROR);
							}


						}
						else /* if KQUEUE */
						{
							static AsyncSignal[] sigarr;

							if (sigarr.length == 0) {
								try sigarr = new AsyncSignal[32];
								catch (Exception e) { assert(false, "Could not allocate signals array"); }
							}

							// Drain the global signal queue; a null entry marks
							// the end of the batch.
							bool more = popSignals(sigarr);
							foreach (AsyncSignal sig; sigarr)
							{
								shared AsyncSignal ptr = cast(shared AsyncSignal) sig;
								if (ptr is null)
									break;
								try (cast(shared AsyncSignal)sig).handler();
								catch (Exception e) {
									setInternalError!"signal handler"(Status.ERROR);
								}
							}
						}
						break;

					case EventType.UDPSocket:
						import core.sys.posix.unistd : close;
						success = onUDPTraffic(info.fd, info.evObj.udpHandler, event_flags);

						// Closes the socket and releases its EventInfo after a
						// fatal error reported by onUDPTraffic.
						nothrow void abortHandler(bool graceful) {

							close(info.fd);
							info.evObj.udpHandler.conn.socket = 0;
							try info.evObj.udpHandler(UDPEvent.ERROR);
							catch (Exception e) { }
							try ThreadMem.free(info);
							catch (Exception e){ assert(false, "Error freeing resources"); }
						}

						if (!success && m_status.code == Status.ABORT) {
							abortHandler(true);

						}
						else if (!success && m_status.code == Status.ERROR) {
							abortHandler(false);
						}
						break;
					case EventType.TCPTraffic:
						assert(info.evObj.tcpEvHandler.conn !is null, "TCP Connection invalid");

						success = onTCPTraffic(info.fd, info.evObj.tcpEvHandler, event_flags, info.evObj.tcpEvHandler.conn);

						// Notifies the handler (CLOSE when graceful, ERROR
						// otherwise), closes the connection and frees the
						// associated resources.
						nothrow void abortTCPHandler(bool graceful) {

							nothrow void closeAll() {
								static if (LOG) try log("closeAll()"); catch (Throwable e) {}
								if (info.evObj.tcpEvHandler.conn.connected)
									closeSocket(info.fd, true, true);

								info.evObj.tcpEvHandler.conn.socket = 0;
							}

							/// Close the connection after an unexpected socket error
							if (graceful) {
								try info.evObj.tcpEvHandler(TCPEvent.CLOSE);
								catch (Exception e) { static if(LOG) log("Close failure"); }
								closeAll();
							}

							/// Kill the connection after an internal error
							else {
								try info.evObj.tcpEvHandler(TCPEvent.ERROR);
								catch (Exception e) { static if(LOG) log("Error failure"); }
								closeAll();
							}

							if (info.evObj.tcpEvHandler.conn.inbound) {
								static if (LOG) log("Freeing inbound connection FD#" ~ info.fd.to!string);
								try ThreadMem.free(info.evObj.tcpEvHandler.conn);
								catch (Exception e){ assert(false, "Error freeing resources"); }
							}
							try ThreadMem.free(info);
							catch (Exception e){ assert(false, "Error freeing resources"); }
						}

						if (!success && m_status.code == Status.ABORT) {
							abortTCPHandler(true);
						}
						else if (!success && m_status.code == Status.ERROR) {
							abortTCPHandler(false);
						}
						break;
					default:
						break;
				}

			}

			return success;
		}

		// Fast path: with no out-of-band completions pending, just wait and
		// dispatch. Otherwise poll without blocking, dispatch, then drain
		// the completed accept/receive/send queues.
		if (m_completedSocketAccepts.empty && m_completedSocketReceives.empty && m_completedSocketSends.empty) {
			num = waitForEvents(timeout);
			return handleEvents();
		} else {
			num = waitForEvents(0.seconds);
			if (num != 0 && !handleEvents()) return false;

			foreach (request; m_completedSocketAccepts) {
				m_completedSocketAccepts.removeFront();
				auto socket = request.socket;
				auto peer = request.onComplete(request.peer, request.family, socket.info.type, socket.info.protocol);
				assumeWontThrow(AsyncAcceptRequest.free(request));
				if (!peer.run) {
					m_status.code = Status.ABORT;
					peer.kill();
					peer.handleError();
					return false;
				}
			}

			foreach (request; m_completedSocketReceives) {
				if (request.socket.receiveContinuously) {
					m_completedSocketReceives.removeFront();
					assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
					// Continuous receives are resubmitted while the socket
					// stays alive; otherwise the request is released.
					if (request.socket.receiveContinuously && request.socket.alive) {
						request.message.count = 0;
						submitRequest(request);
					} else {
						assumeWontThrow(NetworkMessage.free(request.message));
						assumeWontThrow(AsyncReceiveRequest.free(request));
					}
				} else {
					m_completedSocketReceives.removeFront();
					if (request.message) {
						assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
						assumeWontThrow(NetworkMessage.free(request.message));
					} else {
						assumeWontThrow(request.onComplete.get!1)();
					}
					assumeWontThrow(AsyncReceiveRequest.free(request));
				}
			}

			foreach (request; m_completedSocketSends) {
				m_completedSocketSends.removeFront();
				request.onComplete();
				assumeWontThrow(NetworkMessage.free(request.message));
				assumeWontThrow(AsyncSendRequest.free(request));
			}

			return true;
		}
	}
649 
650 	bool setOption(T)(fd_t fd, TCPOption option, in T value) {
651 		m_status = StatusInfo.init;
652 		import std.traits : isIntegral;
653 
654 		import libasync.internals.socket_compat : socklen_t, setsockopt, SO_REUSEADDR, SO_KEEPALIVE, SO_RCVBUF, SO_SNDBUF, SO_RCVTIMEO, SO_SNDTIMEO, SO_LINGER, SOL_SOCKET, IPPROTO_TCP, TCP_NODELAY, TCP_QUICKACK, TCP_KEEPCNT, TCP_KEEPINTVL, TCP_KEEPIDLE, TCP_CONGESTION, TCP_CORK, TCP_DEFER_ACCEPT;
655 		int err;
656 		nothrow bool errorHandler() {
657 			if (catchError!"setOption:"(err)) {
658 				try m_status.text ~= option.to!string;
659 				catch (Exception e){ assert(false, "to!string conversion failure"); }
660 				return false;
661 			}
662 
663 			return true;
664 		}
665 		final switch (option) {
666 			case TCPOption.NODELAY: // true/false
667 				static if (!is(T == bool))
668 					assert(false, "NODELAY value type must be bool, not " ~ T.stringof);
669 				else {
670 					int val = value?1:0;
671 					socklen_t len = val.sizeof;
672 					err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len);
673 					return errorHandler();
674 				}
675 			case TCPOption.REUSEADDR: // true/false
676 				static if (!is(T == bool))
677 					assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof);
678 				else {
679 					int val = value?1:0;
680 					socklen_t len = val.sizeof;
681 					err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len);
682 					if (!errorHandler())
683 						return false;
684 					version (Posix) {
685 						version (linux) {
686 							return true;
687 						} else {
688 							// BSD systems have SO_REUSEPORT
689 							import libasync.internals.socket_compat : SO_REUSEPORT;
690 							err = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, len);
691 							return errorHandler();
692 						}
693 					}
694 				}
695 			case TCPOption.REUSEPORT: // true/false
696 				// use a standalone REUSEPORT option to handle SO_REUSEPORT on linux
697 				version (linux) {
698 					static if (!is(T == bool))
699 						assert(false, "REUSEPORT value type must be bool, not " ~ T.stringof);
700 					else {
701 						// BSD systems have SO_REUSEPORT
702 						import libasync.internals.socket_compat : SO_REUSEPORT;
703 						int val = value?1:0;
704 						err = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, val.sizeof);
705 
706 						// Not all linux kernels support SO_REUSEPORT
707 						// ignore invalid and not supported errors on linux
708 						if (errno == EINVAL || errno == ENOPROTOOPT) {
709 							return true;
710 						}
711 
712 						return errorHandler();
713 					}
714 				} else return true;
715 			case TCPOption.QUICK_ACK:
716 				static if (!is(T == bool))
717 					assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof);
718 				else {
719 					static if (EPOLL) {
720 						int val = value?1:0;
721 						socklen_t len = val.sizeof;
722 						err = setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &val, len);
723 						return errorHandler();
724 					}
725 					else /* not linux */ {
726 						return false;
727 					}
728 				}
729 			case TCPOption.KEEPALIVE_ENABLE: // true/false
730 				static if (!is(T == bool))
731 					assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof);
732 				else
733 				{
734 					int val = value?1:0;
735 					socklen_t len = val.sizeof;
736 					err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len);
737 					return errorHandler();
738 				}
739 			case TCPOption.KEEPALIVE_COUNT: // ##
740 				static if (!isIntegral!T)
741 					assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof);
742 				else {
743 					int val = value.total!"msecs".to!uint;
744 					socklen_t len = val.sizeof;
745 					err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, len);
746 					return errorHandler();
747 				}
748 			case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds
749 				static if (!is(T == Duration))
750 					assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof);
751 				else {
752 					int val;
753 					try val = value.total!"seconds".to!uint; catch (Throwable e) { return false; }
754 					socklen_t len = val.sizeof;
755 					err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, len);
756 					return errorHandler();
757 				}
758 			case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start
759 				static if (!is(T == Duration))
760 					assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof);
761 				else {
762 					int val;
763 					try val = value.total!"seconds".to!uint; catch (Throwable e) { return false; }
764 					socklen_t len = val.sizeof;
765 					err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, len);
766 					return errorHandler();
767 				}
768 			case TCPOption.BUFFER_RECV: // bytes
769 				static if (!isIntegral!T)
770 					assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof);
771 				else {
772 					int val = value.to!int;
773 					socklen_t len = val.sizeof;
774 					err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len);
775 					return errorHandler();
776 				}
777 			case TCPOption.BUFFER_SEND: // bytes
778 				static if (!isIntegral!T)
779 					assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof);
780 				else {
781 					int val = value.to!int;
782 					socklen_t len = val.sizeof;
783 					err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len);
784 					return errorHandler();
785 				}
786 			case TCPOption.TIMEOUT_RECV:
787 				static if (!is(T == Duration))
788 					assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof);
789 				else {
790 					import core.sys.posix.sys.time : timeval;
791 					time_t secs = cast(time_t) value.split!("seconds", "usecs")().seconds;
792 					suseconds_t us;
793 					try us = value.split!("seconds", "usecs")().usecs.to!suseconds_t; catch (Throwable e) {}
794 					timeval t = timeval(secs, us);
795 					socklen_t len = t.sizeof;
796 					err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &t, len);
797 					return errorHandler();
798 				}
799 			case TCPOption.TIMEOUT_SEND:
800 				static if (!is(T == Duration))
801 					assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
802 				else {
803 					import core.sys.posix.sys.time : timeval;
804 					auto timeout = value.split!("seconds", "usecs")();
805 					timeval t;
806 					try t = timeval(timeout.seconds.to!time_t, timeout.usecs.to!suseconds_t);
807 					catch (Exception) { return false; }
808 					socklen_t len = t.sizeof;
809 					err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &t, len);
810 					return errorHandler();
811 				}
812 			case TCPOption.TIMEOUT_HALFOPEN:
813 				static if (!is(T == Duration))
814 					assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
815 				else {
816 					uint val;
817 					try val = value.total!"msecs".to!uint; catch (Throwable e) {
818 						return false;
819 					}
820 					socklen_t len = val.sizeof;
821 					err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len);
822 					return errorHandler();
823 				}
824 			case TCPOption.LINGER: // bool onOff, int seconds
825 				static if (!is(T == Tuple!(bool, int)))
826 					assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof);
827 				else {
828 					linger l = linger(val[0]?1:0, val[1]);
829 					socklen_t llen = l.sizeof;
830 					err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen);
831 					return errorHandler();
832 				}
833 			case TCPOption.CONGESTION:
834 				static if (!isIntegral!T)
835 					assert(false, "CONGESTION value type must be integral, not " ~ T.stringof);
836 				else {
837 					int val = value.to!int;
838 					len = int.sizeof;
839 					err = setsockopt(fd, IPPROTO_TCP, TCP_CONGESTION, &val, len);
840 					return errorHandler();
841 				}
842 			case TCPOption.CORK:
843 				static if (!isIntegral!T)
844 					assert(false, "CORK value type must be int, not " ~ T.stringof);
845 				else {
846 					static if (EPOLL) {
847 						int val = value.to!int;
848 						socklen_t len = val.sizeof;
849 						err = setsockopt(fd, IPPROTO_TCP, TCP_CORK, &val, len);
850 						return errorHandler();
851 					}
852 					else /* if KQUEUE */ {
853 						int val = value.to!int;
854 						socklen_t len = val.sizeof;
855 						err = setsockopt(fd, IPPROTO_TCP, TCP_NOPUSH, &val, len);
856 						return errorHandler();
857 
858 					}
859 				}
860 			case TCPOption.DEFER_ACCEPT: // seconds
861 				static if (!isIntegral!T)
862 					assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof);
863 				else {
864 					static if (EPOLL) {
865 						int val = value.to!int;
866 						socklen_t len = val.sizeof;
867 						err = setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len);
868 						return errorHandler();
869 					}
870 					else /* if KQUEUE */ {
871 						// todo: Emulate DEFER_ACCEPT with ACCEPT_FILTER(9)
872 						/*int val = value.to!int;
873 						 socklen_t len = val.sizeof;
874 						 err = setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &val, len);
875 						 return errorHandler();
876 						 */
877 						assert(false, "TCPOption.DEFER_ACCEPT is not implemented");
878 					}
879 				}
880 		}
881 
882 	}
883 
884 	pragma(inline, true)
885 	uint recv(in fd_t fd, ref ubyte[] data)
886 	{
887 			static if (LOG) try log("Recv from FD: " ~ fd.to!string); catch (Throwable e) {}
888 			m_status = StatusInfo.init;
889 			import libasync.internals.socket_compat : recv;
890 			int ret = cast(int) recv(fd, cast(void*) data.ptr, data.length, cast(int)0);
891 
892 			static if (LOG) log(".recv " ~ ret.to!string ~ " bytes of " ~ data.length.to!string ~ " @ " ~ fd.to!string);
893 			if (catchError!".recv"(ret)){
894 					if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
895 							m_status.code = Status.ASYNC;
896 					else if (m_error == EPosix.EINTR)
897 							m_status.code = Status.RETRY;
898 
899 					return 0;
900 			}
901 
902 			return cast(uint) ret < 0 ? 0 : ret;
903 	}
904 
905 	pragma(inline, true)
906 	uint send(in fd_t fd, in ubyte[] data)
907 	{
908 		static if (LOG) try log("Send to FD: " ~ fd.to!string); catch (Throwable e) {}
909 		m_status = StatusInfo.init;
910 		import libasync.internals.socket_compat : send;
911 		int ret = cast(int) send(fd, cast(const(void)*) data.ptr, data.length, cast(int)0);
912 		static if (LOG) try log("Sent: " ~ ret.to!string); catch (Throwable e) {}
913 		if (catchError!"send"(ret)) { // ret == -1
914 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
915 				m_status.code = Status.ASYNC;
916 			else if (m_error == EPosix.EINTR)
917 				m_status.code = Status.RETRY;
918 			return 0;
919 
920 		}
921 		return cast(uint) ret < 0 ? 0 : ret;
922 	}
923 	
	/// Receives a message on `fd` via recvmsg(2), transparently retrying on
	/// EINTR. Returns the number of bytes received, or 0 when the call
	/// would block (m_status.code = ASYNC) or a non-fatal error occurred.
	/// NOTE(review): unlike sendMsg, m_status is not reset to
	/// StatusInfo.init on entry — confirm this is intentional.
	size_t recvMsg(in fd_t fd, NetworkMessage* msg)
	{
		import libasync.internals.socket_compat : recvmsg, msghdr, iovec, sockaddr_storage;

		while (true) {
			auto err = recvmsg(fd, msg.header, 0);

			.tracef("recvmsg system call on FD %d returned %d", fd, err);
			if (err == SOCKET_ERROR) {
				m_error = lastError();

				if (m_error == EPosix.EINTR) {
					.tracef("recvmsg system call on FD %d was interrupted before any transfer occured", fd);
					continue;
				} else if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
					.tracef("recvmsg system call on FD %d would have blocked", fd);
					m_status.code = Status.ASYNC;
					return 0;
				} else if (m_error == EBADF ||
				           m_error == EFAULT ||
				           m_error == EINVAL ||
				           m_error == ENOTCONN ||
				           m_error == ENOTSOCK) {
					// These errno values indicate programming errors rather
					// than runtime conditions: treat as unrecoverable.
					.errorf("recvmsg system call on FD %d encountered fatal socket error: %s", fd, this.error);
					assert(false);
				} else if (catchError!"Receive message"(err)) {
					.errorf("recvmsg system call on FD %d encountered socket error: %s", fd, this.error);
					return 0;
				}
			} else {
				.tracef("Received %d bytes on FD %d", err, fd);
				m_status.code = Status.OK;
				return err;
			}
		}
	}
960 
	/// Sends the given NetworkMessage on `fd` via sendmsg(2).
	/// Transparently retries on EINTR. On EWOULDBLOCK/EAGAIN sets Status.ASYNC and
	/// returns 0. A peer disconnect (ECONNRESET/EPIPE) silently returns 0; fatal
	/// usage errors abort via assert(false). Returns bytes sent on success.
	size_t sendMsg(in fd_t fd, NetworkMessage* msg) {
		import libasync.internals.socket_compat : sendmsg;

		.tracef("Send message on FD %d with size %d", fd, msg.header.msg_iov.iov_len);
		m_status = StatusInfo.init;

		while (true) {
			auto err = sendmsg(fd, msg.header, 0);

			.tracef("sendmsg system call on FD %d returned %d", fd, err);
			if (err == SOCKET_ERROR) {
				m_error = lastError();

				if (m_error == EPosix.EINTR) {
					// Interrupted before any transfer: simply retry the call.
					.tracef("sendmsg system call on FD %d was interrupted before any transfer occured", fd);
					continue;
				} else if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
					.tracef("sendmsg system call on FD %d would have blocked", fd);
					m_status.code = Status.ASYNC;
					return 0;
				} else if (m_error == ECONNRESET ||
				           m_error == EPIPE) {
					// Peer went away; treated as a benign zero-byte send.
					return 0;
				} else if (m_error == EBADF ||
				           m_error == EDESTADDRREQ ||
				           m_error == EFAULT ||
				           m_error == EINVAL ||
				           m_error == EISCONN ||
				           m_error == ENOTSOCK ||
				           m_error == EOPNOTSUPP) {
					// Programming errors (bad fd/pointer/arguments): unrecoverable.
					.errorf("sendmsg system call on FD %d encountered fatal socket error: %s", fd, this.error);
					assert(false);
				// ENOTCONN, EMSGSIZE
				} else if (catchError!"Send message"(err)) {
					.errorf("sendmsg system call on FD %d encountered socket error: %s", fd, this.error);
					return 0;
				}
			} else {
				.tracef("Sent %d bytes on FD %d", err, fd);
				m_status.code = Status.OK;
				return err;
			}
		}
	}
1005 
1006 	uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr)
1007 	{
1008 			import libasync.internals.socket_compat : recvfrom, AF_INET6, AF_INET, socklen_t;
1009 
1010 			m_status = StatusInfo.init;
1011 
1012 			addr.family = AF_INET6;
1013 			socklen_t addrLen = addr.sockAddrLen;
1014 			long ret = recvfrom(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, &addrLen);
1015 
1016 			if (addrLen < addr.sockAddrLen) {
1017 					addr.family = AF_INET;
1018 			}
1019 
1020 			static if (LOG) try log("RECVFROM " ~ ret.to!string ~ "B"); catch (Throwable e) {}
1021 			if (catchError!".recvfrom"(ret)) { // ret == -1
1022 					if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
1023 							m_status.code = Status.ASYNC;
1024 					return 0;
1025 			}
1026 
1027 			m_status.code = Status.OK;
1028 
1029 			return cast(uint) ret;
1030 	}
1031 
1032 	uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
1033 	{
1034 			import libasync.internals.socket_compat : sendto;
1035 
1036 			m_status = StatusInfo.init;
1037 
1038 			static if (LOG) try log("SENDTO " ~ data.length.to!string ~ "B");
1039 			catch (Throwable e) {}
1040 			long ret = sendto(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, addr.sockAddrLen);
1041 
1042 			if (catchError!".sendto"(ret)) { // ret == -1
1043 					if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
1044 							m_status.code = Status.ASYNC;
1045 					return 0;
1046 			}
1047 
1048 			m_status.code = Status.OK;
1049 			return cast(uint) ret;
1050 	}
1051 
1052 	NetworkAddress localAddr(in fd_t fd, bool ipv6) {
1053 		NetworkAddress ret;
1054 		import libasync.internals.socket_compat : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr;
1055 
1056 		if (ipv6)
1057 			ret.family = AF_INET6;
1058 		else
1059 			ret.family = AF_INET;
1060 
1061 		socklen_t len = ret.sockAddrLen;
1062 		int err = getsockname(fd, ret.sockAddr, &len);
1063 		if (catchError!"getsockname"(err))
1064 			return NetworkAddress.init;
1065 		if (len > ret.sockAddrLen)
1066 			ret.family = AF_INET6;
1067 
1068 		return ret;
1069 	}
1070 
	/// Wakes the event loop for the given notifier.
	/// EPOLL: `fd` is an eventfd; writing an 8-byte counter increment wakes
	/// the poller. kqueue: triggers the EVFILT_USER event registered for `fd`.
	/// Returns false when the wakeup could not be delivered.
	bool notify(in fd_t fd, AsyncNotifier ctxt)
	{
		static if (EPOLL)
		{
			import core.sys.posix.unistd : write;

			long val = 1;
			fd_t err = cast(fd_t) write(fd, &val, long.sizeof);

			if (catchError!"write(notify)"(err)) {
				return false;
			}
			return true;
		}
		else /* if KQUEUE */
		{
			kevent_t _event;
			EV_SET(&_event, fd, EVFILT_USER, EV_ENABLE | EV_CLEAR, NOTE_TRIGGER | 0x1, 0, ctxt.evInfo);
			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

			if (catchError!"kevent_notify"(err)) {
				return false;
			}
			return true;
		}
	}
1097 
	/// Wakes the target thread's event loop for the given cross-thread signal.
	/// EPOLL: queues a real-time signal (`fd` is the signal number here) on the
	/// owning thread, carrying the context pointer in the signal value.
	/// kqueue: registers the context and raises SIGXCPU on this process, which
	/// the loop's EVFILT_SIGNAL handler picks up.
	bool notify(in fd_t fd, shared AsyncSignal ctxt)
	{
		static if (EPOLL)
		{

			sigval sigvl;
			fd_t err;
			// The receiving signal handler recovers the context from sival_ptr.
			sigvl.sival_ptr = cast(void*) ctxt;
			try err = pthread_sigqueue(ctxt.pthreadId, fd, sigvl); catch (Throwable) {}
			if (catchError!"sigqueue"(err)) {
				return false;
			}
		}
		else /* if KQUEUE */
		{

			import core.thread : getpid;

			addSignal(ctxt);

			try {
				static if (LOG) log("Notified fd: " ~ fd.to!string ~ " of PID " ~ getpid().to!string);
				int err = core.sys.posix.signal.kill(getpid(), SIGXCPU);
				if (catchError!"notify(signal)"(err))
					assert(false, "Signal could not be raised");
			} catch (Throwable) {}
		}

		return true;
	}
1128 
	// no known uses
	/// Unimplemented placeholder: resets the status and reports 0 bytes read.
	uint read(in fd_t fd, ref ubyte[] data)
	{
		m_status = StatusInfo.init;
		return 0;
	}
1135 
	// no known uses
	/// Unimplemented placeholder: resets the status and reports 0 bytes written.
	uint write(in fd_t fd, in ubyte[] data)
	{
		m_status = StatusInfo.init;
		return 0;
	}
1142 
	/// Starts watching the path described by `info` for the directory watcher
	/// identified by `fd`. Returns the watch descriptor, or 0 on failure.
	/// EPOLL: backed by inotify; recursion is emulated by registering one watch
	/// per subfolder under the same inotify instance. kqueue: every file/folder
	/// is opened with O_EVTONLY and registered for EVFILT_VNODE events.
	uint watch(in fd_t fd, in WatchInfo info) {
		// note: info.wd is still 0 at this point.
		m_status = StatusInfo.init;
		import core.sys.linux.sys.inotify;
		import std.file : dirEntries, isDir, SpanMode;

		static if (EPOLL) {
			// Manually handle recursivity... All events show up under the same inotify
			uint events = info.events; // values for this API were pulled from inotify
			// Also watch for self-deletion/self-move so the watched root itself
			// is reported, not only its children.
			if (events & IN_DELETE)
				events |= IN_DELETE_SELF;
			if (events & IN_MOVED_FROM)
				events |= IN_MOVE_SELF;

			// Adds an inotify watch for `path` and, when recursive, for every
			// subdirectory; records each watch in m_dwFolders keyed by (fd, wd).
			nothrow fd_t addFolderRecursive(Path path) {
				fd_t ret;
				try {
					ret = inotify_add_watch(fd, path.toNativeString().toStringz, events);
					if (catchError!"inotify_add_watch"(ret))
						return fd_t.init;
					static if (LOG) try log("inotify_add_watch(" ~ DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd).to!string ~ ")"); catch (Throwable) {}
					assert(m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint)ret), DWFolderInfo.init) == DWFolderInfo.init, "Could not get a unique watch descriptor for path, got: " ~ m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)].to!string);
					m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd);
				} catch (Exception e) {
					try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.toString() ); catch (Throwable) {}
					return 0;
				}

				if (info.recursive) {
					try {
						foreach (de; path.toNativeString().dirEntries(SpanMode.shallow))
						{
							Path de_path = Path(de.name);
							if (!de_path.absolute)
								de_path = path ~ Path(de.name);
							if (isDir(de_path.toNativeString()))
								if (addFolderRecursive(de_path) == 0)
									continue;
						}
					} catch (Exception e) {
						try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add sub-directories of " ~ path.toNativeString() ~ ": " ~ e.toString() ); catch (Throwable) {}
					}
				}

				return ret;
			}

			return addFolderRecursive(info.path);

		} else /* if KQUEUE */ {
			/// Manually handle recursivity & file tracking. Each folder is an event!
			/// E.g. file creation shows up as a folder change, we must be prepared to seek the file.
			import core.sys.posix.fcntl;
			import libasync.internals.kqueue;

			// Translate the portable DWFileEvent flags to EVFILT_VNODE fflags.
			uint events;
			if (info.events & DWFileEvent.CREATED)
				events |= NOTE_LINK | NOTE_WRITE;
			if (info.events & DWFileEvent.DELETED)
				events |= NOTE_DELETE;
			if (info.events & DWFileEvent.MODIFIED)
				events |= NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE;
			if (info.events & DWFileEvent.MOVED_FROM)
				events |= NOTE_RENAME;
			if (info.events & DWFileEvent.MOVED_TO)
				events |= NOTE_RENAME;

			EventInfo* evinfo;
			try {
				evinfo = m_watchers[fd];
			} catch (Throwable) {
				assert(false, "Could retrieve event info, directory watcher was not initialized properly, or you are operating on a closed directory watcher.");
			}

			/// we need a file descriptor for the containers, so we open files but we don't monitor them
			/// todo: track indexes internally?
			nothrow fd_t addRecursive(Path path, bool is_dir) {
				int ret;
				try {
					static if (LOG) log("Adding path: " ~ path.toNativeString());

					ret = open(path.toNativeString().toStringz, O_EVTONLY);
					if (catchError!"open(watch)"(ret))
						return 0;

					if (is_dir)
						m_dwFolders[ret] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd);

					kevent_t _event;

					EV_SET(&_event, ret, EVFILT_VNODE, EV_ADD | EV_CLEAR, events, 0, cast(void*) evinfo);

					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

					if (catchError!"kevent_timer_add"(err))
						return 0;

					// Folders are scanned shallowly; files get a descriptor and
					// a m_dwFiles entry so later changes can be attributed.
					if (is_dir) foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
						Path filePath = Path(de.name);
						if (!filePath.absolute)
							filePath = path ~ filePath;
						fd_t fwd;
						if (info.recursive && isDir(filePath.toNativeString()))
							fwd = addRecursive(filePath, true);
						else {
							fwd = addRecursive(filePath, false); // gets an ID but will not scan
							m_dwFiles[fwd] = DWFileInfo(ret, filePath, de.timeLastModified, isDir(filePath.toNativeString()));
						}

					}

				} catch (Exception e) {
					try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.msg); 
					catch (Throwable) {}
					return 0;
				}
				return ret;
			}

			fd_t wd;

			try {
				wd = addRecursive(info.path, isDir(info.path.toNativeString()));

				if (wd == 0)
					return 0;

			}
			catch (Exception e) {
				setInternalError!"dw.watch"(Status.ERROR, "Failed to watch directory: " ~ e.msg);
			}

			return cast(uint) wd;
		}
	}
1279 
1280 	bool unwatch(in fd_t fd, in uint wd) {
1281 		// the wd can be used with m_dwFolders to find the DWFolderInfo
1282 		// and unwatch everything recursively.
1283 
1284 		m_status = StatusInfo.init;
1285 		static if (EPOLL) {
1286 			/// If recursive, all subfolders must also be unwatched recursively by removing them
1287 			/// from containers and from inotify
1288 			import core.sys.linux.sys.inotify;
1289 
1290 			nothrow bool removeAll(DWFolderInfo fi) {
1291 				int err;
1292 				try {
1293 
1294 					bool inotify_unwatch(uint wd) {
1295 						err = inotify_rm_watch(fd, wd);
1296 
1297 						if (catchError!"inotify_rm_watch"(err))
1298 							return false;
1299 						return true;
1300 					}
1301 
1302 					if (!inotify_unwatch(fi.wi.wd))
1303 						return false;
1304 
1305 					/*foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles)
1306 					 {
1307 					 if (file.folder == fi.wi.wd) {
1308 					 inotify_unwatch(id);
1309 					 m_dwFiles.remove(id);
1310 					 }
1311 					 }*/
1312 					m_dwFolders.remove(tuple(cast(fd_t)fd, fi.wi.wd));
1313 
1314 					if (fi.wi.recursive) {
1315 						// find all subdirectories by comparing the path
1316 						Array!(Tuple!(fd_t, uint)) remove_list;
1317 						foreach (ref const key, ref const DWFolderInfo folder; m_dwFolders) {
1318 							if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) {
1319 
1320 								if (!inotify_unwatch(folder.wi.wd))
1321 									return false;
1322 
1323 								remove_list.insertBack(key);
1324 							}
1325 						}
1326 						foreach (rm_wd; remove_list[])
1327 							m_dwFolders.remove(rm_wd);
1328 
1329 					}
1330 					return true;
1331 				} catch (Exception e) {
1332 					try setInternalError!"inotify_rm_watch"(Status.ERROR, "Could not unwatch directory: " ~ e.toString());
1333 					catch (Throwable) {}
1334 					return false;
1335 				}
1336 			}
1337 
1338 			DWFolderInfo info;
1339 
1340 			try {
1341 				info = m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint) wd), DWFolderInfo.init);
1342 				if (info == DWFolderInfo.init) {
1343 					setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not find watch info for wd " ~ wd.to!string);
1344 					return false;
1345 				}
1346 			} catch (Throwable) { }
1347 
1348 			return removeAll(info);
1349 		}
1350 		else /* if KQUEUE */ {
1351 
1352 			/// Recursivity must be handled manually, so we must unwatch subfiles and subfolders
1353 			/// recursively, remove the container entries, close the file descriptor, and disable the vnode events.
1354 
1355 			nothrow bool removeAll(DWFolderInfo fi) {
1356 				import core.sys.posix.unistd : close;
1357 
1358 
1359 				bool event_unset(uint id) {
1360 					kevent_t _event;
1361 					EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
1362 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1363 					if (catchError!"kevent_unwatch"(err))
1364 						return false;
1365 					return true;
1366 				}
1367 
1368 				bool removeFolder(uint wd) {
1369 					if (!event_unset(fi.wi.wd))
1370 						return false;
1371 					m_dwFolders.remove(fi.wi.wd);
1372 					int err = close(fi.wi.wd);
1373 					if (catchError!"close dir"(err))
1374 						return false;
1375 					return true;
1376 				}
1377 
1378 				try {
1379 					removeFolder(fi.wi.wd);
1380 
1381 					if (fi.wi.recursive) {
1382 						import std.container.array;
1383 						Array!fd_t remove_list; // keep track of unwatched folders recursively
1384 						Array!fd_t remove_file_list;
1385 						// search for subfolders and unset them / close their wd
1386 						foreach (ref const DWFolderInfo folder; m_dwFolders) {
1387 							if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) {
1388 
1389 								if (!event_unset(folder.wi.wd))
1390 									return false;
1391 
1392 								// search for subfiles, close their descriptors and remove them from the file list
1393 								foreach (ref const fd_t fwd, ref const DWFileInfo file; m_dwFiles) {
1394 									if (file.folder == folder.wi.wd) {
1395 										close(fwd);
1396 										remove_file_list.insertBack(fwd); // to be removed from m_dwFiles without affecting the loop
1397 									}
1398 								}
1399 
1400 								remove_list.insertBack(folder.wi.wd); // to be removed from m_dwFolders without affecting the loop
1401 							}
1402 						}
1403 
1404 						foreach (wd; remove_file_list[])
1405 							m_dwFiles.remove(wd);
1406 
1407 						foreach (rm_wd; remove_list[])
1408 							removeFolder(rm_wd);
1409 
1410 
1411 					}
1412 				} catch (Exception e) {
1413 					try setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not close the folder " ~ fi.to!string ~ ": " ~ e.toString());
1414 					catch (Throwable) {}
1415 					return false;
1416 				}
1417 
1418 				return true;
1419 			}
1420 
1421 			DWFolderInfo info;
1422 			try info = m_dwFolders.get(wd, DWFolderInfo.init); catch (Throwable) {}
1423 
1424 			if (!removeAll(info))
1425 				return false;
1426 			return true;
1427 		}
1428 	}
1429 
	// returns the amount of changes
	/// Drains pending directory-change events for watcher `fd` into `dst`.
	/// EPOLL: reads raw inotify_event records from the inotify fd, maps them to
	/// DWChangeInfo entries, and handles recursive watches (newly created
	/// subfolders are watched, deleted ones unwatched). `dst` may be grown when
	/// recursion produces more events than it can hold.
	/// kqueue: events were already queued by the loop; this copies up to
	/// dst.length of them out of m_changes[fd].
	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
		m_status = StatusInfo.init;

		static if (EPOLL) {
			assert(dst.length > 0, "DirectoryWatcher called with 0 length DWChangeInfo array");
			import core.sys.linux.sys.inotify;
			import core.sys.posix.unistd : read;
			import core.stdc.stdio : FILENAME_MAX;
			import core.stdc.string : strlen;
			// Large enough for one inotify_event plus the longest file name.
			ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void;
			ssize_t nread = read(fd, buf.ptr, cast(uint)buf.sizeof);
			if (catchError!"read()"(nread))
			{
				if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
					m_status.code = Status.ASYNC;
				return 0;
			}
			assert(nread > 0);


			/// starts (recursively) watching all newly created folders in a recursive entry,
			/// creates events for additional files/folders founds, and unwatches all deleted folders
			void recurseInto(DWFolderInfo fi, DWFileEvent ev, ref Array!DWChangeInfo changes) {
				import std.file : dirEntries, SpanMode, isDir;
				assert(fi.wi.recursive);
				// get a list of stuff in the created/moved folder
				if (ev == DWFileEvent.CREATED || ev == DWFileEvent.MOVED_TO) {
					foreach (de; dirEntries(fi.wi.path.toNativeString(), SpanMode.shallow)) {
						Path entryPath = Path(de.name);
						if (!entryPath.absolute)
							entryPath = fi.wi.path ~ entryPath;

						if (fi.wi.recursive && isDir(entryPath.toNativeString())) {

							watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, 0) );
							// Synthesize CREATED events for everything already
							// inside the new folder; inotify won't report them.
							void genEvents(Path subpath) {
								foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) {
									auto subsubpath = Path(de.name);
									if (!subsubpath.absolute)
										subsubpath = subpath ~ subsubpath;
									changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
									if (isDir(subsubpath.toNativeString()))
										genEvents(subsubpath);
								}
							}

							genEvents(entryPath);

						}
					}
				}
			}

			size_t i;
			do
			{
				// Walk the variable-length inotify_event records in the buffer.
				for (auto p = buf.ptr; p < buf.ptr + nread; )
				{
					inotify_event* ev = cast(inotify_event*)p;
					p += inotify_event.sizeof + ev.len;

					DWFileEvent evtype;
					evtype = DWFileEvent.CREATED;
					if (ev.mask & IN_CREATE)
						evtype = DWFileEvent.CREATED;
					if (ev.mask & IN_DELETE || ev.mask & IN_DELETE_SELF)
						evtype = DWFileEvent.DELETED;
					if (ev.mask & IN_MOVED_FROM || ev.mask & IN_MOVE_SELF)
						evtype = DWFileEvent.MOVED_FROM;
					if (ev.mask & (IN_MOVED_TO))
						evtype = DWFileEvent.MOVED_TO;
					if (ev.mask & IN_MODIFY)
						evtype = DWFileEvent.MODIFIED;

					import std.path : buildPath;
					import core.stdc.string : strlen;
					// ev.name is NUL-terminated and padded; copy only the name.
					string name = cast(string) ev.name.ptr[0 .. cast(size_t) ev.name.ptr.strlen].idup;
					DWFolderInfo fi;
					Path path;
					try {
						fi = m_dwFolders.get(tuple(cast(fd_t)fd,cast(uint)ev.wd), DWFolderInfo.init);
						if (fi == DWFolderInfo.init) {
							setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders: " ~ ev.wd.to!string);
							continue;
						}
						path = fi.wi.path ~ Path(name);
					}
					catch (Exception e) {
						setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders");
						return 0;
					}

					dst[i] = DWChangeInfo(evtype, path);
					import std.file : isDir;
					bool is_dir;
					try is_dir = isDir(path.toNativeString()); catch (Throwable) {}
					if (fi.wi.recursive && is_dir) {

						try {
							Array!DWChangeInfo changes;
							recurseInto(fi, evtype, changes);
							// stop watching if the folder was deleted
							if (evtype == DWFileEvent.DELETED || evtype == DWFileEvent.MOVED_FROM) {
								unwatch(fi.fd, fi.wi.wd);
							}
							// Append the synthesized events, growing dst if needed.
							foreach (change; changes[]) {
								i++;
								if (dst.length <= i)
									dst ~= change;
								else dst[i] = change;
							}
						}
						catch (Exception e) {
							setInternalError!"recurseInto"(Status.ERROR, "Failed to watch/unwatch contents of folder recursively.");
							return 0;
						}

					}


					i++;
					if (i >= dst.length)
						return cast(uint) i;
				}
				static if (LOG) foreach (j; 0 .. i) {
					static if (LOG) try log("Change occured for FD#" ~ fd.to!string ~ ": " ~ dst[j].to!string); catch (Throwable e) {}
				}
				// Keep draining until the inotify fd would block.
				nread = read(fd, buf.ptr, buf.sizeof);
				if (catchError!"read()"(nread)) {
					if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
						m_status.code = Status.ASYNC;
					return cast(uint) i;
				}
			} while (nread > 0);

			return cast(uint) i;
		}
		else /* if KQUEUE */ {
			Array!(DWChangeInfo)* changes;
			size_t i;
			try {
				changes = m_changes[fd];
				import std.algorithm : min;
				size_t cnt = min(dst.length, changes.length);
				foreach (DWChangeInfo change; (*changes)[0 .. cnt]) {
					dst[i] = (*changes)[i];
					i++;
				}
				// Drop the copied entries from the pending queue.
				changes.linearRemove((*changes)[0 .. cnt]);
			}
			catch (Exception e) {
				setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg);
				return false;
			}
			return cast(uint) i;
		}
	}
1588 
	/// Queues an accept request on its socket and immediately tries to
	/// fulfill any pending accepts.
	void submitRequest(AsyncAcceptRequest* request)
	{
		request.socket.m_pendingAccepts.insertBack(request);
		processPendingAccepts(request.socket);
	}
1594 
	/// Queues a receive request on its socket and immediately tries to
	/// fulfill any pending receives.
	void submitRequest(AsyncReceiveRequest* request)
	{
		request.socket.m_pendingReceives.insertBack(request);
		processPendingReceives(request.socket);
	}
1600 
	/// Queues a send request on its socket and immediately tries to
	/// fulfill any pending sends.
	void submitRequest(AsyncSendRequest* request)
	{
		request.socket.m_pendingSends.insertBack(request);
		processPendingSends(request.socket);
	}
1606 
1607 	bool broadcast(in fd_t fd, bool b) {
1608 		m_status = StatusInfo.init;
1609 
1610 		import libasync.internals.socket_compat : socklen_t, setsockopt, SO_BROADCAST, SOL_SOCKET;
1611 
1612 		int val = b?1:0;
1613 		socklen_t len = val.sizeof;
1614 		int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len);
1615 		if (catchError!"setsockopt"(err))
1616 			return false;
1617 
1618 		return true;
1619 	}
1620 
	/// Shuts down the remote end of a connected socket: SHUT_RDWR when forced,
	/// SHUT_WR (half-close) otherwise. kqueue backends also deregister the
	/// socket's read/write kevents. A socket that is already shut down
	/// (ENOTCONN) is not treated as an error.
	private bool closeRemoteSocket(fd_t fd, bool forced) {

		int err;
		static if (LOG) log("shutdown");
		import libasync.internals.socket_compat : shutdown, SHUT_WR, SHUT_RDWR, SHUT_RD;
		if (forced)
			err = shutdown(fd, SHUT_RDWR);
		else
			err = shutdown(fd, SHUT_WR);

		static if (!EPOLL) {
			// Remove the kevent registrations before the fd becomes invalid.
			kevent_t[2] events;
			static if (LOG) try log("!!DISC delete events"); catch (Throwable e) {}
			EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
			EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
			kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
		}

		if (err == SOCKET_ERROR && errno == ENOTCONN) {
			// The socket has already been shut down, we can recover from that
		} else  if (catchError!"shutdown"(err)) {
			return false;
		}

		return true;
	}
1647 
1648 	// for connected sockets
1649 	bool closeSocket(fd_t fd, bool connected, bool forced = false)
1650 	{
1651 		static if (LOG) log("closeSocket");
1652 		if (connected && !closeRemoteSocket(fd, forced) && !forced)
1653 			return false;
1654 
1655 		if (!connected || forced) {
1656 			// todo: flush the socket here?
1657 
1658 			import core.sys.posix.unistd : close;
1659 			static if (LOG) log("close");
1660 			int err = close(fd);
1661 			if (catchError!"closesocket"(err))
1662 				return false;
1663 		}
1664 		return true;
1665 	}
1666 
1667 
	/// Resolves a numeric IP string (and optional port) to a NetworkAddress.
	/// AI_NUMERICHOST | AI_NUMERICSERV forbid DNS/service-name lookups, so this
	/// never blocks on name resolution.
	NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true)
	{
		import libasync.internals.socket_compat : addrinfo, AI_NUMERICHOST, AI_NUMERICSERV;
		addrinfo hints;
		hints.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV; // Specific to an IP resolver!
		return getAddressInfo(ipAddr, port, ipv6, tcp, hints);
	}
1676 
1677 
	/// Resolves a host name to a NetworkAddress using the default resolver
	/// hints (may perform a blocking DNS lookup).
	NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true)
		/*in {
		 debug import libasync.internals.validator : validateHost;
		 debug assert(validateHost(host), "Trying to connect to an invalid domain");
		 }
		body */{
		import libasync.internals.socket_compat : addrinfo;
		addrinfo hints;
		return getAddressInfo(host, port, ipv6, tcp, hints);
	}
1688 
1689 	void setInternalError(string TRACE)(in Status s, string details = "", error_t error = cast(EPosix) errno())
1690 	{
1691 		if (details.length > 0)
1692 			m_status.text = TRACE ~ ": " ~ details;
1693 		else m_status.text = TRACE;
1694 		m_error = error;
1695 		m_status.code = s;
1696 		static if(LOG) log(m_status);
1697 	}
1698 private:
1699 
	/// Attempts to complete queued accept requests on `socket`, moving each
	/// fulfilled request to m_completedSocketAccepts. Stops at the first
	/// request that would block; kills the socket on a non-blocking error.
	void processPendingAccepts(AsyncSocket socket)
	{
		if (socket.readBlocked) return;
		foreach (request; socket.m_pendingAccepts) {
			// Try to accept a single connection on the socket
			auto result = attemptConnectionAcceptance(socket);
			request.peer = result[0];
			request.family = result[1];

			if (status.code != Status.OK && !socket.readBlocked) {
				// Hard error (not just EWOULDBLOCK): tear the socket down.
				socket.kill();
				socket.handleError();
				return;
			} else if (request.peer != INVALID_SOCKET) {
				socket.m_pendingAccepts.removeFront();
				m_completedSocketAccepts.insertBack(request);
			} else {
				// Would block: leave the request queued for the next readiness event.
				break;
			}
		}
	}
1721 
	/// Attempts to complete queued receive requests on `socket`, moving each
	/// fulfilled request to m_completedSocketReceives. Stops at the first
	/// request that would block; kills the socket on a non-blocking error.
	void processPendingReceives(AsyncSocket socket)
	{
		if (socket.readBlocked) return;
		foreach (request; socket.m_pendingReceives) {
			// Try to fit all bytes available in the OS receive buffer
			// into the current request's message's buffer, or try a
			// a zero byte receive, should there be no such message.
			bool received = void;
			if (request.message) received = attemptMessageReception(socket, request.message);
			else received = attemptZeroByteReceive(socket);

			if (status.code != Status.OK && !socket.readBlocked) {
				// Hard error: still deliver any partially received data first.
				if (received) m_completedSocketReceives.insertBack(request);
				socket.kill();
				socket.handleError();
				return;
			} else if (request.exact) {
				// Exact-size requests only complete once the full buffer is filled.
				if (request.message.receivedAll) {
					socket.m_pendingReceives.removeFront();
					m_completedSocketReceives.insertBack(request);
				} else {
					break;
				}
			// New bytes or zero-sized connectionless datagram
			} else if (received || !socket.connectionOriented && !socket.readBlocked) {
				socket.m_pendingReceives.removeFront();
				m_completedSocketReceives.insertBack(request);
			} else {
				break;
			}
		}
	}
1754 
	/// Attempts to complete queued send requests on `socket`, moving each fully
	/// transmitted request to m_completedSocketSends. Stops at the first request
	/// that would block; kills the socket on a non-blocking error.
	void processPendingSends(AsyncSocket socket)
	{
		if (socket.writeBlocked) return;
		foreach (request; socket.m_pendingSends) {
			// Try to fit all bytes of the current request's buffer
			// into the OS send buffer.
			auto sent = attemptMessageTransmission(socket, request.message);

			if (status.code != Status.OK && !socket.writeBlocked) {
				socket.kill();
				socket.handleError();
				return;
			} else if (sent) {
				socket.m_pendingSends.removeFront();
				m_completedSocketSends.insertBack(request);
			} else {
				// Partially sent / would block: retry on the next writability event.
				break;
			}
		}
	}
1775 
	/// Accepts one incoming connection on `socket`.
	/// Returns a tuple of (peer fd, address family); the peer is INVALID_SOCKET
	/// when the accept would block (readBlocked is then set) or failed.
	/// On Linux accept4() sets O_NONBLOCK atomically; elsewhere accept() is
	/// followed by an explicit setNonBlock().
	auto attemptConnectionAcceptance(AsyncSocket socket)
	{
		import core.sys.posix.fcntl : O_NONBLOCK;
		import libasync.internals.socket_compat : accept, accept4, sockaddr_storage, socklen_t;

		fd_t peer = void;
		sockaddr_storage remote = void;
		socklen_t remoteLength = remote.sizeof;

		// Shared error-handling sequel for both accept variants, mixed in below.
		enum common = q{
			if (peer == SOCKET_ERROR) {
				m_error = lastError();

				if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
					m_status.code = Status.ASYNC;
					socket.readBlocked = true;
					return tuple(INVALID_SOCKET, sockaddr.sa_family.init);
				} else if (m_error == EBADF ||
				           m_error == EINTR ||
				           m_error == EINVAL ||
				           m_error == ENOTSOCK ||
				           m_error == EOPNOTSUPP ||
				           m_error == EFAULT) {
					assert(false, "accept{4} system call on FD " ~ socket.handle.to!string ~ " encountered fatal socket error: " ~ this.error);
				} else if (catchError!"accept"(peer)) {
					.errorf("accept{4} system call on FD %d encountered socket error: %s", socket.handle, this.error);
					return tuple(INVALID_SOCKET, sockaddr.sa_family.init);
				}
			}
		};

		version (linux) {
			peer = accept4(socket.handle, cast(sockaddr*) &remote, &remoteLength, O_NONBLOCK);
			mixin(common);
		} else {
			peer = accept(socket.handle, cast(sockaddr*) &remote, &remoteLength);
			mixin(common);
			if (!setNonBlock(peer)) {
				.error("Failed to set accepted peer socket non-blocking");
				return tuple(INVALID_SOCKET, sockaddr.sa_family.init);
			}
		}

		return tuple(peer, remote.ss_family);
	}
1821 
	/// Probes `socket` with a one-byte MSG_PEEK recv(2) (no data is consumed).
	/// Returns true when at least one byte is available. Retries on EINTR;
	/// sets readBlocked on EWOULDBLOCK/EAGAIN or, for connection-oriented
	/// sockets, on a zero-byte result (orderly shutdown by the peer).
	bool attemptZeroByteReceive(AsyncSocket socket)
	{
		import libasync.internals.socket_compat : recv, MSG_PEEK;

		ubyte buffer = void;
		auto fd = socket.handle;

		while (true) {
			auto err = recv(fd, &buffer, 1, MSG_PEEK);

			.tracef("recv system call on FD %d returned %d", fd, err);
			if (err == SOCKET_ERROR) {
				m_error = lastError();

				if (m_error == EPosix.EINTR) {
					// Interrupted before any transfer: simply retry the call.
					.tracef("recv system call on FD %d was interrupted before any transfer occured", fd);
					continue;
				} else if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
					.tracef("recv system call on FD %d would have blocked", fd);
					m_status.code = Status.ASYNC;
					socket.readBlocked = true;
					return false;
				} else if (m_error == EBADF ||
				           m_error == EFAULT ||
				           m_error == EINVAL ||
				           m_error == ENOTCONN ||
				           m_error == ENOTSOCK) {
					// Programming errors (bad fd/pointer/arguments): unrecoverable.
					.errorf("recv system call on FD %d encountered fatal socket error: %s", fd, this.error);
					assert(false);
				} else if (catchError!"Receive message"(err)) {
					.errorf("recv system call on FD %d encountered socket error: %s", fd, this.error);
					return false;
				}
			} else {
				.tracef("Received %d bytes on FD %d", err, fd);
				m_status.code = Status.OK;
				if (socket.connectionOriented && !err) {
					// Zero bytes on a stream socket: peer performed an orderly shutdown.
					socket.readBlocked = true;
				}
				return err > 0;
			}
		}
	}
1865 
1866 	/**
1867 	 * Appends as much of the bytes currently available in the OS receive
1868 	 * buffer to the given message's transferred bytes as the message's
1869 	 * buffer's remaining free bytes and the state of the OS receive buffer
1870 	 * allow for, advancing the message's count of transferred bytes in the process.
1871 	 * Sets $(D readBlocked) on indication by the OS that there were
1872 	 * not enough bytes available in the OS receive buffer.
1873 	 * Returns: $(D true) if any bytes were transferred.
1874 	 */
1875 	bool attemptMessageReception(AsyncSocket socket, NetworkMessage* msg)
1876 	in {
1877 		assert(socket.connectionOriented && !msg.receivedAll || !msg.receivedAny, "Message already received");
1878 	} body {
1879 		bool received;
1880 		size_t recvCount = void;
1881 
1882 		if (socket.datagramOriented) {
1883 			recvCount = recvMsg(socket.handle, msg);
1884 			msg.count = msg.count + recvCount;
1885 			received = received || recvCount > 0;
1886 		} else do {
1887 			recvCount = recvMsg(socket.handle, msg);
1888 			msg.count = msg.count + recvCount;
1889 			received = received || recvCount > 0;
1890 		} while (recvCount > 0 && !msg.receivedAll);
1891 
1892 		// More bytes may yet become available in the future
1893 		if (status.code == Status.ASYNC) {
1894 			socket.readBlocked = true;
1895 		// Connection was shutdown in an orderly fashion by the remote peer
1896 		} else if (socket.connectionOriented && status.code == Status.OK && !recvCount) {
1897 			socket.readBlocked = true;
1898 		}
1899 
1900 		return received;
1901 	}
1902 
1903 	/**
1904 	 * Transfers as much of the given message's untransferred bytes
1905 	 * into the OS send buffer as the latter's state allows for,
1906 	 * advancing the message's count of transferred bytes in the process.
1907 	 * Sets $(DDOC_MEMBERS writeBlocked) on indication by the OS that
1908 	 * there was not enough space available in the OS send buffer.
1909 	 * Returns: $(D true) if all of the message's bytes
1910 	 *          have been transferred.
1911 	 */
1912 	bool attemptMessageTransmission(AsyncSocket socket, NetworkMessage* msg)
1913 	in { assert(!msg.sent, "Message already sent"); }
1914 	body {
1915 		size_t sentCount = void;
1916 
1917 		do {
1918 			sentCount = sendMsg(socket.handle, msg);
1919 			msg.count = msg.count + sentCount;
1920 		} while (sentCount > 0 && !msg.sent);
1921 
1922 		if (status.code == Status.ASYNC) {
1923 			socket.writeBlocked = true;
1924 		}
1925 
1926 		return msg.sent;
1927 	}
1928 
	/// For DirectoryWatcher (kqueue/vnode backend only).
	/// In kqueue/vnode, all we get is the folder in which changes occured.
	/// We have to figure out what changed exactly and put the results in a container
	/// for the readChanges call.
	///
	/// Scans the folder described by `fi` with dirEntries, diffs it against
	/// the cached state in `m_dwFiles`/`m_dwFolders`, and appends
	/// CREATED/MODIFIED/DELETED entries to the changes array registered for
	/// this watcher in `m_changes`. New files are individually registered
	/// with kqueue; vanished entries are unregistered and closed.
	/// Returns: true if at least one change was queued, false otherwise
	///          (including on error).
	static if (!EPOLL) bool compareFolderFiles(DWFolderInfo fi, DWFileEvent events) {
		import std.file;
		import std.path : buildPath;
		try {
			Array!Path currFiles; // every entry seen on disk this scan; consulted by the deletion pass below
			auto wd = fi.wi.wd;
			auto path = fi.wi.path;
			auto fd = fi.fd;
			Array!(DWChangeInfo)* changes = m_changes.get(fd, null);
			assert(changes !is null, "Invalid wd, could not find changes array.");
			//import std.stdio : writeln;
			//writeln("Scanning path: ", path.toNativeString());
			//writeln("m_dwFiles length: ", m_dwFiles.length);

			// get a list of the folder
			foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
				//writeln(de.name);
				Path entryPath = Path(de.name);
				if (!entryPath.absolute)
					entryPath = path ~ entryPath;
				bool found;

				if (!de.isDir()) {
					// compare it to the cached list fixme: make it faster using another container?
					foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
						if (file.folder != wd) continue; // this file isn't in the evented folder
						if (file.path == entryPath) {
							found = true;
							static if (LOG) log("File modified? " ~ entryPath.toNativeString() ~ " at: " ~ de.timeLastModified.to!string ~ " vs: " ~ file.lastModified.to!string);
							// Check if it was modified
							if (!isDir(entryPath.toNativeString()) && de.timeLastModified > file.lastModified)
							{
								// refresh the cached timestamp so the same edit isn't reported twice
								DWFileInfo dwf = file;
								dwf.lastModified = de.timeLastModified;
								m_dwFiles[id] = dwf;
								changes.insertBack(DWChangeInfo(DWFileEvent.MODIFIED, file.path));
							}
							break;
						}
					}
				} else {
					// directories are cached separately in m_dwFolders
					foreach (ref const DWFolderInfo folder; m_dwFolders) {
						if (folder.wi.path == entryPath) {
							found = true;
							break;
						}
					}
				}

				// This file/folder is new in the folder
				if (!found) {
					changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, entryPath));

					if (fi.wi.recursive && de.isDir()) {
						/// This is the complicated part. The folder needs to be watched, and all the events
						/// generated for every file/folder found recursively inside it,
						/// Useful e.g. when mkdir -p is used.
						watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, wd) );
						// Emit a CREATED event for every entry beneath the new folder
						void genEvents(Path subpath) {
							foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) {
								auto subsubpath = Path(de.name);
								if (!subsubpath.absolute())
									subsubpath = subpath ~ subsubpath;
								changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
								if (isDir(subsubpath.toNativeString()))
									genEvents(subsubpath);
							}
						}

						genEvents(entryPath);

					}
					else {
						EventInfo* evinfo;
						try evinfo = m_watchers[fd]; catch(Throwable) { assert(false, "Could retrieve event info, directory watcher was not initialized properly, or you are operating on a closed directory watcher."); }

						static if (LOG) log("Adding path: " ~ path.toNativeString());

						import core.sys.posix.fcntl : open;
						fd_t fwd = open(entryPath.toNativeString().toStringz, O_EVTONLY);
						if (catchError!"open(watch)"(fwd))
							return 0; // NOTE: integer 0 converts to false — scan aborts, no callback

						kevent_t _event;

						// Register the new file itself for vnode change notifications
						EV_SET(&_event, fwd, EVFILT_VNODE, EV_ADD | EV_CLEAR, fi.wi.events, 0, cast(void*) evinfo);

						int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

						if (catchError!"kevent_timer_add"(err))
							return 0; // NOTE: integer 0 converts to false

						m_dwFiles[fwd] = DWFileInfo(fi.wi.wd, entryPath, de.timeLastModified, false);

					}
				}

				// This file/folder is now current. This avoids a deletion event.
				currFiles.insert(entryPath);
			}

			/// Now search for files/folders that were deleted in this directory (no recursivity needed).
			/// Unwatch this directory and generate delete event only for the root dir
			foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
				if (file.folder != wd) continue; // skip those files in another folder than the evented one
				bool found;
				foreach (Path curr; currFiles) {
					if (file.path == curr){
						found = true;
						break;
					}
				}
				// this file/folder was in the folder but it's not there anymore
				if (!found) {
					// writeln("Deleting: ", file.path.toNativeString());
					// Stop watching the vanished entry and release its descriptor
					kevent_t _event;
					EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
					if (catchError!"kevent_unwatch"(err))
						return false;
					import core.sys.posix.unistd : close;
					err = close(id);
					if (catchError!"close(dwFile)"(err))
						return false;
					changes.insert(DWChangeInfo(DWFileEvent.DELETED, file.path));

					if (fi.wi.recursive && file.is_dir)
						unwatch(fd, id);

					// NOTE(review): removes from m_dwFiles while foreach-iterating it —
					// relies on the HashMap tolerating removal during iteration; verify.
					m_dwFiles.remove(id);

				}

			}
			if(changes.empty)
				return false; // unhandled event, skip the callback

			// fixme: how to implement moved_from moved_to for rename?
		}
		catch (Exception e)
		{
			try setInternalError!"compareFiles"(Status.ERROR, "Fatal error in file comparison: " ~ e.toString()); catch(Throwable) {}
			return false;
		}
		return true;
	}
2079 
2080 	// socket must not be connected
2081 	bool setNonBlock(fd_t fd) {
2082 		import core.sys.posix.fcntl : fcntl, F_GETFL, F_SETFL, O_NONBLOCK;
2083 		int flags = fcntl(fd, F_GETFL);
2084 		flags |= O_NONBLOCK;
2085 		int err = fcntl(fd, F_SETFL, flags);
2086 		if (catchError!"F_SETFL O_NONBLOCK"(err)) {
2087 			closeSocket(fd, false);
2088 			return false;
2089 		}
2090 		return true;
2091 	}
2092 
	/// Drains a listening TCP socket: accepts every pending connection,
	/// wraps each in a freshly allocated AsyncTCPConnection, obtains a
	/// TCPEventHandler from the accept callback, registers the client with
	/// the event loop and announces TCPEvent.CONNECT. On a listener-level
	/// error, invokes the callback with null to announce the failure.
	/// Always returns true; the accept loop exits from within once accept
	/// reports nothing is left to take.
	bool onTCPAccept(fd_t fd, TCPAcceptHandler del, int events)
	{
		import libasync.internals.socket_compat : AF_INET, AF_INET6, socklen_t, accept4, accept;
		enum O_NONBLOCK     = 0x800;    // octal 04000 — Linux value of O_NONBLOCK, passed to accept4 below

		static if (EPOLL)
		{
			const uint epoll_events = cast(uint) events;
			const bool incoming = cast(bool) (epoll_events & EPOLLIN);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
		}
		else
		{
			// kqueue packs the filter in the high 16 bits and the flags in the low 16
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			const bool incoming = cast(bool)(kqueue_events & EVFILT_READ);
			const bool error = cast(bool)(kqueue_flags & EV_ERROR);
		}

		if (incoming) { // accept incoming connection
			do {
				NetworkAddress addr;
				addr.family = AF_INET;
				socklen_t addrlen = addr.sockAddrLen;

				bool ret; // NOTE(review): unused local
				static if (EPOLL) {
					/// Accept the connection and create a client socket
					// accept4 sets O_NONBLOCK atomically, no extra fcntl needed
					fd_t csock = accept4(fd, addr.sockAddr, &addrlen, O_NONBLOCK);

					if (catchError!".accept"(csock)) {
						return true;// this way we know there's nothing left to accept
					}
				} else /* if KQUEUE */ {
					fd_t csock = accept(fd, addr.sockAddr, &addrlen);

					if (catchError!".accept"(csock)) {
						return true;
					}

					// Make non-blocking so subsequent calls to recv/send return immediately
					if (!setNonBlock(csock)) {
						continue;
					}
				}

				// Set client address family based on address length
				// presumably a length beyond sockaddr_in implies IPv6 — TODO confirm against NetworkAddress.sockAddrLen
				if (addrlen > addr.sockAddrLen)
					addr.family = AF_INET6;
				if (addrlen == socklen_t.init) {
					setInternalError!"addrlen"(Status.ABORT);
					import core.sys.posix.unistd : close;
					close(csock);
					continue;
				}

				// Allocate a new connection handler object
				AsyncTCPConnection conn;
				try conn = ThreadMem.alloc!AsyncTCPConnection(m_evLoop);
				catch (Exception e){ assert(false, "Allocation failure"); }
				conn.peer = addr;
				conn.socket = csock;
				conn.inbound = true;

				// Frees the connection object and closes the client socket on any failure path
				nothrow void closeClient() {
					try ThreadMem.free(conn);
					catch (Exception e){ assert(false, "Free failure"); }
					closeSocket(csock, true, true);
				}

				// Get the connection handler from the callback
				TCPEventHandler evh;
				try {
					evh = del(conn);
					// an init handler means the user refused the connection
					if (evh == TCPEventHandler.init || !initTCPConnection(csock, conn, evh, true)) {
						static if (LOG) try log("Failed to connect"); catch (Throwable e) {}
						closeClient();
						continue;
					}
					static if (LOG) try log("Connection Started with " ~ csock.to!string); catch (Throwable e) {}
				}
				catch (Exception e) {
					static if (LOG) log("Close socket");
					closeClient();
					continue;
				}

				// Announce connection state to the connection handler
				try {
					static if (LOG) log("Connected to: " ~ addr.toString());
					evh.conn.connected = true;
					evh(TCPEvent.CONNECT);
				}
				catch (Exception e) {
					closeClient();
					setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
				}
				/*if (m_status.code == Status.ABORT)
				{
					try evh(TCPEvent.ERROR);
					catch (Throwable e) {}
				}*/
			} while(true);

		}

		if (error) { // socket failure
			m_status.text = "listen socket error";
			int err;
			import libasync.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR;
			socklen_t len = int.sizeof;
			// retrieve the pending error code for diagnostics
			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
			m_error = cast(error_t) err;
			m_status.code = Status.ABORT;
			static if(LOG) log(m_status);

			// call with null to announce a failure
			try del(null);
			catch(Exception e){ assert(false, "Failure calling TCPAcceptHandler(null)"); }

			/// close the listener?
			// closeSocket(fd, false);
		}
		return true;
	}
2218 
2219 	bool onUDPTraffic(fd_t fd, UDPHandler del, int events)
2220 	{
2221 		static if (EPOLL)
2222 		{
2223 			const uint epoll_events = cast(uint) events;
2224 			const bool read = cast(bool) (epoll_events & EPOLLIN);
2225 			const bool write = cast(bool) (epoll_events & EPOLLOUT);
2226 			const bool error = cast(bool) (epoll_events & EPOLLERR);
2227 		}
2228 		else
2229 		{
2230 			const short kqueue_events = cast(short) (events >> 16);
2231 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
2232 			const bool read = cast(bool) (kqueue_events & EVFILT_READ);
2233 			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
2234 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
2235 		}
2236 
2237 		if (read) {
2238 			try {
2239 				del(UDPEvent.READ);
2240 			}
2241 			catch (Exception e) {
2242 				setInternalError!"del@UDPEvent.READ"(Status.ABORT);
2243 				return false;
2244 			}
2245 		}
2246 
2247 		if (write) {
2248 
2249 			try {
2250 				del(UDPEvent.WRITE);
2251 			}
2252 			catch (Exception e) {
2253 				setInternalError!"del@UDPEvent.WRITE"(Status.ABORT);
2254 				return false;
2255 			}
2256 		}
2257 
2258 		if (error) // socket failure
2259 		{
2260 
2261 			import libasync.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR;
2262 			import core.sys.posix.unistd : close;
2263 			int err;
2264 			socklen_t errlen = err.sizeof;
2265 			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
2266 			setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err);
2267 			close(fd);
2268 			return false;
2269 		}
2270 
2271 		return true;
2272 	}
2273 
	/// Dispatches a poll result for a generic event descriptor to its
	/// EventHandler: WRITE/READ on readiness, CLOSE plus full teardown for
	/// stateful (connection-like) events, ERROR on failure, and CONNECT when
	/// a stateful event first becomes ready.
	/// Returns: false if the handler or an internal step failed, else true.
	bool onEvent(fd_t fd, EventHandler del, int events)
	{
		bool connect = void, close = void; // only assigned — and later read — when conn.stateful
		auto conn = del.ev;

		static if (EPOLL)
		{
			const uint epoll_events = cast(uint) events;
			const bool read = cast(bool) (epoll_events & EPOLLIN);
			const bool write = cast(bool) (epoll_events & EPOLLOUT);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
			if (conn.stateful) {
				// first readiness on a not-yet-connected, not-disconnecting event means the connect completed
				connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !conn.disconnecting && !conn.connected;
				close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP));
			}
		}
		else
		{
			// kqueue packs the filter in the high 16 bits, the flags in the low 16
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			const bool read = cast(bool) (kqueue_events & EVFILT_READ);
			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
			if (conn.stateful) {
				connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !conn.disconnecting && !conn.connected);
				close = cast(bool) (kqueue_flags & EV_EOF);
			}
		}

		// deliver WRITE only when a stateful event was actually blocked on write
		if (write && (!conn.stateful || conn.connected && !conn.disconnecting && conn.writeBlocked)) {
			if (conn.stateful) conn.writeBlocked = false;
			static if (LOG) try log("!write"); catch (Throwable e) {}
			try {
				del(EventCode.WRITE);
			}
			catch (Exception e) {
				setInternalError!"del@Event.WRITE"(Status.ABORT);
				return false;
			}
		}

		if (read && (!conn.stateful || conn.connected && !conn.disconnecting)) {
			static if (LOG) try log("!read"); catch (Throwable e) {}
			try {
				del(EventCode.READ);
			}
			catch (Exception e) {
				setInternalError!"del@Event.READ"(Status.ABORT);
				return false;
			}
		}

		if (conn.stateful && close && conn.connected && !conn.disconnecting)
		{
			static if (LOG) try log("!close"); catch (Throwable e) {}
			// todo: See if this hack is still necessary
			// NOTE(review): unreachable — the enclosing if already requires
			// conn.connected && !conn.disconnecting
			if (!conn.connected && conn.disconnecting)
				return true;

			try del(EventCode.CLOSE);
			catch (Exception e) {
				setInternalError!"del@Event.CLOSE"(Status.ABORT);
				return false;
			}

			// Careful here, the delegate might have closed the connection already
			if (conn.connected) {
				closeSocket(fd, !conn.disconnecting, conn.connected);

				m_status.code = Status.ABORT;
				conn.disconnecting = true;
				conn.connected = false;
				conn.writeBlocked = true;
				conn.id = 0;

				try ThreadMem.free(conn.evInfo);
				catch (Exception e){ assert(false, "Error freeing resources"); }
			}
			return true;
		}

		if (error) // failure
		{
			setInternalError!"EPOLLERR"(Status.ABORT, null);
			try {
				del(EventCode.ERROR);
			}
			catch (Exception e)
			{
				setInternalError!"del@Event.ERROR"(Status.ABORT);
				// ignore failure...
			}
			return false;
		}

		if (conn.stateful && connect) {
			static if (LOG) try log("!connect"); catch (Throwable e) {}
			conn.connected = true;
			try del(EventCode.CONNECT);
			catch (Exception e) {
				setInternalError!"del@Event.CONNECT"(Status.ABORT);
				return false;
			}
			return true;
		}

		return true;
	}
2382 
2383 	/// Handle an event for a connectionless socket
2384 	bool onCLSocketEvent(AsyncSocket socket, int events)
2385 	{
2386 		static if (EPOLL)
2387 		{
2388 			const uint epoll_events = cast(uint) events;
2389 			const bool read = cast(bool) (epoll_events & EPOLLIN);
2390 			const bool write = cast(bool) (epoll_events & EPOLLOUT);
2391 			const bool error = cast(bool) (epoll_events & EPOLLERR);
2392 		}
2393 		else
2394 		{
2395 			const short kqueue_events = cast(short) (events >> 16);
2396 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
2397 			const bool read = cast(bool) (kqueue_events & EVFILT_READ);
2398 			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
2399 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
2400 		}
2401 
2402 		if (read) {
2403 			tracef("Read on FD %d", socket.handle);
2404 
2405 			socket.readBlocked = false;
2406 			processPendingReceives(socket);
2407 		}
2408 
2409 		if (write) {
2410 			tracef("Write on FD %d", socket.handle);
2411 
2412 			socket.writeBlocked = false;
2413 			processPendingSends(socket);
2414 		}
2415 
2416 		if (error) {
2417 			tracef("Error on FD %d", socket.handle);
2418 
2419 			auto err = cast(error_t) socket.lastError;
2420 			setInternalError!"AsyncSocket.ERROR"(Status.ABORT, null, cast(error_t) err);
2421 			socket.kill();
2422 			socket.handleError();
2423 			return false;
2424 		}
2425 
2426 		return true;
2427 	}
2428 
	/// Handle an event for a connection-oriented, active socket.
	/// Order matters: error first, then connect completion, then
	/// write-resume, read-resume, and finally close.
	bool onCOASocketEvent(AsyncSocket socket, int events)
	{
		static if (EPOLL) {
			const uint epoll_events = cast(uint) events;
			// read/write are mutable: they get cleared when this readiness completed a connect
			bool read = cast(bool) (epoll_events & EPOLLIN);
			bool write = cast(bool) (epoll_events & EPOLLOUT);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
			// first readiness on a not-yet-connected socket signals connect completion
			const bool connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !socket.disconnecting && !socket.connected;
			const bool close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP));
		} else {
			// kqueue packs the filter in the high 16 bits, the flags in the low 16
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			bool read = cast(bool) (kqueue_events & EVFILT_READ);
			bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
			const bool connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !socket.disconnecting && !socket.connected);
			const bool close = cast(bool) (kqueue_flags & EV_EOF);
		}

		tracef("AsyncSocket events: (read: %s, write: %s, error: %s, connect: %s, close: %s)", read, write, error, connect, close);

		if (error) {
			tracef("Error on FD %d", socket.handle);

			auto err = cast(error_t) socket.lastError;
			// a connection reset / broken pipe is treated as an orderly close, not an error
			if (err == ECONNRESET ||
			    err == EPIPE) {
				socket.kill();
				socket.handleClose();
				return true;
			}

			setInternalError!"AsyncSocket.ERROR"(Status.ABORT, null, cast(error_t) err);
			socket.kill();
			socket.handleError();
			return false;
		}

		if (connect) {
			tracef("Connect on FD %d", socket.handle);

			socket.connected = true;
			socket.readBlocked = false;
			socket.writeBlocked = false;
			socket.handleConnect();
			// the readiness that completed the connect must not also be
			// delivered as read/write below
			read = false;
			write = false;
		}

		if ((/+read ||+/ write) && socket.connected && !socket.disconnecting && socket.writeBlocked) {
			tracef("Write on FD %d", socket.handle);

			socket.writeBlocked = false;
			processPendingSends(socket);
		}/+ else {
			read = true;
		}+/

		if (read && socket.connected && !socket.disconnecting && socket.readBlocked) {
			tracef("Read on FD %d", socket.handle);

			socket.readBlocked = false;
			processPendingReceives(socket);
		}

		if (close && socket.connected && !socket.disconnecting)
		{
			tracef("Close on FD %d", socket.handle);
			socket.kill();
			socket.handleClose();
			return true;
		}

		return true;
	}
2505 
2506 	/// Handle an event for a connection-oriented, passive socket
2507 	bool onCOPSocketEvent(AsyncSocket socket, int events)
2508 	{
2509 		import core.sys.posix.fcntl : O_NONBLOCK;
2510 		import libasync.internals.socket_compat : accept, accept4, sockaddr, socklen_t;
2511 
2512 		static if (EPOLL) {
2513 			const uint epoll_events = cast(uint) events;
2514 			const bool incoming = cast(bool) (epoll_events & EPOLLIN);
2515 			const bool error = cast(bool) (epoll_events & EPOLLERR);
2516 		} else {
2517 			const short kqueue_events = cast(short) (events >> 16);
2518 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
2519 			const bool incoming = cast(bool) (kqueue_events & EVFILT_READ);
2520 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
2521 		}
2522 
2523 		tracef("AsyncSocket events: (incoming: %s, error: %s)", incoming, error);
2524 
2525 		if (incoming) {
2526 			tracef("Incoming on FD %d", socket.handle);
2527 
2528 			socket.readBlocked = false;
2529 			processPendingAccepts(socket);
2530 		}
2531 
2532 		if (error) {
2533 			tracef("Error on FD %d", socket.handle);
2534 
2535 			auto err = cast(error_t) socket.lastError;
2536 			setInternalError!"AsyncSocket.ERROR"(Status.ABORT, null, cast(error_t) err);
2537 			socket.kill();
2538 			socket.handleError();
2539 			return false;
2540 		}
2541 
2542 		return true;
2543 	}
2544 
	/// Dispatches a poll result for an active TCP connection to its
	/// TCPEventHandler: ERROR, CONNECT, WRITE (when resuming from a blocked
	/// write), READ, then CLOSE with full teardown of the connection state.
	/// Returns: false on a socket or handler failure, else true.
	bool onTCPTraffic(fd_t fd, TCPEventHandler del, int events, AsyncTCPConnection conn)
	{
		//log("TCP Traffic at FD#" ~ fd.to!string);

		static if (EPOLL)
		{
			const uint epoll_events = cast(uint) events;
			// first readiness on a not-yet-connected connection signals connect completion
			const bool connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !conn.disconnecting && !conn.connected;
			bool read = cast(bool) (epoll_events & EPOLLIN);
			const bool write = cast(bool) (epoll_events & EPOLLOUT);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
			const bool close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP));
		}
		else /* if KQUEUE */
		{
			// kqueue packs the filter in the high 16 bits, the flags in the low 16
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			const bool connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !conn.disconnecting && !conn.connected);
			bool read = cast(bool) (kqueue_events & EVFILT_READ) && !connect;
			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
			const bool close = cast(bool) (kqueue_flags & EV_EOF);
		}

		if (error)
		{
			import libasync.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR;
			int err;
			static if (LOG) try log("Also got events: " ~ connect.to!string ~ " c " ~ read.to!string ~ " r " ~ write.to!string ~ " write"); catch (Throwable e) {}
			socklen_t errlen = err.sizeof;
			// retrieve the pending socket error for diagnostics
			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
			setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err);
			try
				del(TCPEvent.ERROR);
			catch (Exception e)
			{
				setInternalError!"del@TCPEvent.ERROR"(Status.ABORT);
				// ignore failure...
			}
			return false;
		}


		if (connect)
		{
			static if (LOG) try log("!connect"); catch (Throwable e) {}
			conn.connected = true;
			try del(TCPEvent.CONNECT);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
				return false;
			}
			// nothing else is delivered on the same poll as CONNECT
			return true;
		}


		if ((read || write) && conn.connected && !conn.disconnecting && conn.writeBlocked)
		{
			conn.writeBlocked = false;
			static if (LOG) try log("!write"); catch (Throwable e) {}
			try del(TCPEvent.WRITE);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.WRITE"(Status.ABORT);
				return false;
			}
		}
		else {
			// NOTE(review): forces a READ dispatch below even when the poll
			// didn't flag readability — presumably to avoid missing
			// edge-triggered data; confirm before changing
			read = true;
		}

		if (read && conn.connected && !conn.disconnecting)
		{
			static if (LOG) try log("!read"); catch (Throwable e) {}
			try del(TCPEvent.READ);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.READ"(Status.ABORT);
				return false;
			}
		}

		if (close && conn.connected && !conn.disconnecting)
		{
			static if (LOG) try log("!close"); catch (Throwable e) {}
			// todo: See if this hack is still necessary
			// NOTE(review): unreachable — guarded above by conn.connected && !conn.disconnecting
			if (!conn.connected && conn.disconnecting)
				return true;

			try del(TCPEvent.CLOSE);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT);
				return false;
			}

			// Careful here, the delegate might have closed the connection already
			if (conn.connected) {
				closeSocket(fd, !conn.disconnecting, conn.connected);

				m_status.code = Status.ABORT;
				conn.disconnecting = true;
				conn.connected = false;
				conn.writeBlocked = true;
				del.conn.socket = 0;

				try ThreadMem.free(del.conn.evInfo);
				catch (Exception e){ assert(false, "Error freeing resources"); }

				if (del.conn.inbound) {
					static if (LOG) log("Freeing inbound connection");
					try ThreadMem.free(del.conn);
					catch (Exception e){ assert(false, "Error freeing resources"); }
				}
			}
		}
		return true;
	}
2660 
	/// Registers a UDP socket with the event loop (epoll or kqueue) and
	/// binds it to ctxt.local so it can start receiving packets.
	/// On failure the event registration is rolled back and the EventInfo
	/// freed; the socket itself is closed by the caller.
	/// Returns: true on success.
	bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt, UDPHandler del)
	{
		import libasync.internals.socket_compat : bind;
		import core.sys.posix.unistd;

		fd_t err;

		// Tie the user's handler to the descriptor through an EventInfo
		EventObject eo;
		eo.udpHandler = del;
		EventInfo* ev;
		try ev = ThreadMem.alloc!EventInfo(fd, EventType.UDPSocket, eo, m_instanceId);
		catch (Exception e){ assert(false, "Allocation error"); }
		ctxt.evInfo = ev;
		// Failure path: releases the EventInfo; always returns false
		nothrow bool closeAll() {
			try ThreadMem.free(ev);
			catch(Exception e){ assert(false, "Failed to free resources"); }
			ctxt.evInfo = null;
			// socket will be closed by caller if return false
			return false;
		}

		static if (EPOLL)
		{
			// edge-triggered registration for both directions
			epoll_event _event;
			_event.data.ptr = ev;
			_event.events = EPOLLIN | EPOLLOUT | EPOLLET;
			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
			if (catchError!"epoll_ctl"(err)) {
				return closeAll();
			}
			nothrow void deregisterEvent()
			{
				epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, &_event);
			}
		}
		else /* if KQUEUE */
		{
			// separate clear-on-retrieve filters for readability and writability
			kevent_t[2] _event;
			EV_SET(&(_event[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, ev);
			EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, ev);
			err = kevent(m_kqueuefd, &(_event[0]), 2, null, 0, null);
			if (catchError!"kevent_add_udp"(err))
				return closeAll();

			nothrow void deregisterEvent() {
				EV_SET(&(_event[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
				EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
				kevent(m_kqueuefd, &(_event[0]), 2, null, 0, cast(libasync.internals.kqueue.timespec*) null);
			}

		}

		/// Start accepting packets
		err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
		if (catchError!"bind"(err)) {
			deregisterEvent();
			return closeAll();
		}

		return true;
	}
2722 
	/// Registers a listening TCP socket with the event loop and, unless the
	/// descriptor is being reused (reusing=true: bind/listen were already
	/// performed on it elsewhere), binds it to ctxt.local and starts
	/// listening with a SOMAXCONN backlog.
	/// On failure the registration is rolled back and the EventInfo freed;
	/// the socket itself is closed by run().
	/// Returns: true on success.
	bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, TCPAcceptHandler del, bool reusing = false)
	in {
		assert(ctxt.local !is NetworkAddress.init);
	}
	body {
		import libasync.internals.socket_compat : bind, listen, SOMAXCONN;
		fd_t err;

		/// Create callback object
		EventObject eo;
		eo.tcpAcceptHandler = del;
		EventInfo* ev;

		try ev = ThreadMem.alloc!EventInfo(fd, EventType.TCPAccept, eo, m_instanceId);
		catch (Exception e){ assert(false, "Allocation error"); }
		ctxt.evInfo = ev;
		// Failure path: releases the EventInfo; always returns false
		nothrow bool closeAll() {
			try ThreadMem.free(ev);
			catch(Exception e){ assert(false, "Failed free"); }
			ctxt.evInfo = null;
			// Socket is closed by run()
			//closeSocket(fd, false);
			return false;
		}

		/// Add socket to event loop
		static if (EPOLL)
		{
			// edge-triggered; only incoming connections are of interest
			epoll_event _event;
			_event.data.ptr = ev;
			_event.events = EPOLLIN | EPOLLET;
			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
			if (catchError!"epoll_ctl_add"(err))
				return closeAll();

			nothrow void deregisterEvent() {
				// epoll cleans itself when closing the socket
			}
		}
		else /* if KQUEUE */
		{
			kevent_t _event;
			EV_SET(&_event, fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ev);
			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
			if (catchError!"kevent_add_listener"(err))
				return closeAll();

			nothrow void deregisterEvent() {
				EV_SET(&_event, fd, EVFILT_READ, EV_CLEAR | EV_DISABLE, 0, 0, null);
				kevent(m_kqueuefd, &_event, 1, null, 0, null);
				// wouldn't know how to deal with errors here...
			}
		}

		/// Bind and listen to socket
		if (!reusing) {
			err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
			if (catchError!"bind"(err)) {
				deregisterEvent();
				return closeAll();
			}

			err = listen(fd, SOMAXCONN);
			if (catchError!"listen"(err)) {
				deregisterEvent();
				return closeAll();
			}

		}
		return true;
	}
2794 
	/// Registers TCP socket `fd` with the event loop and, for outbound sockets,
	/// sets it non-blocking and starts an asynchronous connect() to `ctxt.peer`.
	/// Inbound (already-accepted) sockets only get registered. Returns false on
	/// failure after freeing the EventInfo; the socket is closed by run().
	bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt, TCPEventHandler del, bool inbound = false)
	in {
		assert(ctxt.peer.port != 0, "Connecting to an invalid port");
	}
	body {

		fd_t err;

		/// Create callback object
		import libasync.internals.socket_compat : connect;
		EventObject eo;
		eo.tcpEvHandler = del;
		EventInfo* ev;

		try ev = ThreadMem.alloc!EventInfo(fd, EventType.TCPTraffic, eo, m_instanceId);
		catch (Exception e){ assert(false, "Allocation error"); }
		assert(ev !is null);
		ctxt.evInfo = ev;
		// Failure path: free the EventInfo and report false.
		nothrow bool destroyEvInfo() {
			try ThreadMem.free(ev);
			catch(Exception e){ assert(false, "Failed to free resources"); }
			ctxt.evInfo = null;

			// Socket will be closed by run()
			// closeSocket(fd, false);
			return false;
		}

		/// Add socket and callback object to event loop
		static if (EPOLL)
		{
			epoll_event _event = void;
			_event.data.ptr = ev;
			// Edge-triggered, both directions, plus hangup/error notification.
			_event.events = 0 | EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLET;
			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
			static if (LOG) log("Connection FD#" ~ fd.to!string ~ " added to " ~ m_epollfd.to!string);
			if (catchError!"epoll_ctl_add"(err))
				return destroyEvInfo();

			nothrow void deregisterEvent() {
				// will be handled automatically when socket is closed
			}
		}
		else /* if KQUEUE */
		{
			kevent_t[2] events = void;
			static if (LOG) try log("Register event ptr " ~ ev.to!string); catch (Throwable e) {}
			assert(ev.evType == EventType.TCPTraffic, "Bad event type for TCP Connection");
			// kqueue needs one filter per direction; both carry the same udata.
			EV_SET(&(events[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev);
			EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev);
			assert((cast(EventInfo*)events[0].udata) == ev && (cast(EventInfo*)events[1].udata) == ev);
			assert((cast(EventInfo*)events[0].udata).owner == m_instanceId && (cast(EventInfo*)events[1].udata).owner == m_instanceId);
			err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
			if (catchError!"kevent_add_tcp"(err))
				return destroyEvInfo();

			// todo: verify if this allocates on the GC?
			nothrow void deregisterEvent() {
				EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
				EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
				kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
				// wouldn't know how to deal with errors here...
			}
		}

		// Inbound objects are already connected
		if (inbound) return true;

		// Connect is blocking, but this makes the socket non-blocking for send/recv
		if (!setNonBlock(fd)) {
			deregisterEvent();
			return destroyEvInfo();
		}

		/// Start the connection
		err = connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen);
		// EINPROGRESS is the expected non-blocking-connect outcome, not an error.
		if (catchErrorsEq!"connect"(err, [ tuple(cast(fd_t)SOCKET_ERROR, EPosix.EINPROGRESS, Status.ASYNC) ]))
			return true;
		if (catchError!"connect"(err)) {
			deregisterEvent();
			return destroyEvInfo();
		}

		return true;
	}
2880 
2881 	pragma(inline, true)
2882 	bool catchError(string TRACE, T)(T val, T cmp = SOCKET_ERROR)
2883 		if (isIntegral!T)
2884 	{
2885 		if (val == cmp) {
2886 			m_status.text = TRACE;
2887 			m_error = lastError();
2888 			m_status.code = Status.ABORT;
2889 			static if(LOG) log(m_status);
2890 			return true;
2891 		}
2892 		return false;
2893 	}
2894 
2895 	pragma(inline, true)
2896 	bool catchSocketError(string TRACE)(fd_t fd)
2897 	{
2898 		m_status.text = TRACE;
2899 		int err;
2900 		import libasync.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR;
2901 		socklen_t len = int.sizeof;
2902 		getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
2903 		m_error = cast(error_t) err;
2904 		if (m_error != EPosix.EOK) {
2905 			m_status.code = Status.ABORT;
2906 			static if(LOG) log(m_status);
2907 			return true;
2908 		}
2909 
2910 		return false;
2911 	}
2912 
2913 	bool catchEvLoopErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
2914 		if (isIntegral!T)
2915 	{
2916 		if (val == SOCKET_ERROR) {
2917 			int err = errno;
2918 			foreach (validator ; cmp) {
2919 				if (errno == validator[0]) {
2920 					m_status.text = TRACE;
2921 					m_error = lastError();
2922 					m_status.code = validator[1];
2923 					static if(LOG) log(m_status);
2924 					return true;
2925 				}
2926 			}
2927 
2928 			m_status.text = TRACE;
2929 			m_status.code = Status.EVLOOP_FAILURE;
2930 			m_error = lastError();
2931 			static if (LOG) log(m_status);
2932 			return true;
2933 		}
2934 		return false;
2935 	}
2936 
2937 	/**
2938 	 * If the value at val matches the tuple first argument T, get the last error,
2939 	 * and if the last error matches tuple second argument error_t, set the Status as
2940 	 * tuple third argument Status.
2941 	 *
2942 	 * Repeats for each comparison tuple until a match in which case returns true.
2943 	 */
2944 	bool catchErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...)
2945 		if (isIntegral!T)
2946 	{
2947 		error_t err;
2948 		foreach (validator ; cmp) {
2949 			if (val == validator[0]) {
2950 				if (err is EPosix.init) err = lastError();
2951 				if (err == validator[1]) {
2952 					m_status.text = TRACE;
2953 					m_status.code = validator[2];
2954 					if (m_status.code == Status.EVLOOP_TIMEOUT) {
2955 						static if (LOG) log(m_status);
2956 						break;
2957 					}
2958 					m_error = lastError();
2959 					static if(LOG) log(m_status);
2960 					return true;
2961 				}
2962 			}
2963 		}
2964 		return false;
2965 	}
2966 
2967 	pragma(inline, true)
2968 	error_t lastError() {
2969 		try {
2970 			return cast(error_t) errno;
2971 		} catch(Exception e) {
2972 			return EPosix.EACCES;
2973 		}
2974 
2975 	}
2976 
2977 	void log(StatusInfo val)
2978 	{
2979 		static if (LOG) {
2980 			import std.stdio;
2981 			try {
2982 				writeln("Backtrace: ", m_status.text);
2983 				writeln(" | Status:  ", m_status.code);
2984 				writeln(" | Error: " , m_error);
2985 				if ((m_error in EPosixMessages) !is null)
2986 					writeln(" | Message: ", EPosixMessages[m_error]);
2987 			} catch(Exception e) {
2988 				return;
2989 			}
2990 		}
2991 	}
2992 
2993 	void log(T)(T val)
2994 	{
2995 		static if (LOG) {
2996 			import std.stdio;
2997 			try {
2998 				writeln(val);
2999 			} catch(Exception e) {
3000 				return;
3001 			}
3002 		}
3003 	}
3004 
	/// Resolves host[:port] through getaddrinfo() into a NetworkAddress.
	/// `hints` is filled in with the requested family (IPv4/IPv6) and protocol
	/// (TCP/UDP) before the call. On resolver failure, records the error via
	/// setInternalError and returns NetworkAddress.init. Only the first result
	/// of the returned addrinfo chain is used.
	NetworkAddress getAddressInfo(addrinfo)(in string host, ushort port, bool ipv6, bool tcp, ref addrinfo hints)
	{
		m_status = StatusInfo.init;
		import libasync.internals.socket_compat : AF_INET, AF_INET6, SOCK_DGRAM, SOCK_STREAM, IPPROTO_TCP, IPPROTO_UDP, freeaddrinfo, getaddrinfo;

		NetworkAddress addr;
		addrinfo* infos;
		error_t err;
		if (ipv6) {
			addr.family = AF_INET6;
			hints.ai_family = AF_INET6;
		}
		else {
			addr.family = AF_INET;
			hints.ai_family = AF_INET;
		}
		if (tcp) {
			hints.ai_socktype = SOCK_STREAM;
			hints.ai_protocol = IPPROTO_TCP;
		}
		else {
			hints.ai_socktype = SOCK_DGRAM;
			hints.ai_protocol = IPPROTO_UDP;
		}

		static if (LOG) {
			log("Resolving " ~ host ~ ":" ~ port.to!string);
		}

		auto chost = host.toStringz();

		if (port != 0) {
			addr.port = port;
			// getaddrinfo takes the service/port as a C string.
			const(char)* cPort = cast(const(char)*) port.to!string.toStringz;
			err = cast(error_t) getaddrinfo(chost, cPort, &hints, &infos);
		}
		else {
			err = cast(error_t) getaddrinfo(chost, null, &hints, &infos);
		}

		if (err != EPosix.EOK) {

			/// Unfortunately, glibc < 2.26 has a bug that the DNS resolver caches the contents
			/// of /etc/resolve.conf. (See https://sourceware.org/bugzilla/show_bug.cgi?id=984)
			/// An issue of Pidgen bug tracker(https://developer.pidgin.im/ticket/2825) shows
			/// that calling res_init to refresh the nameserver list.
			version (CRuntime_Glibc)
			{
				version (linux)
				{
					__res_init();
				}
				/// At least res_init isn't thread-safe on OSX/iOS, so nothing to do.
			}
			version(iOS) {
				// ios uses a different error reporting for getaddrinfo
				import libasync.internals.socket_compat : gai_strerror;
				import std.string : fromStringz;
				setInternalError!"getAddressInfo"(Status.ERROR, gai_strerror(cast(int) err).fromStringz.to!string);
			}
			else setInternalError!"getAddressInfo"(Status.ERROR, string.init, err);
			return NetworkAddress.init;
		}
		// Copy the first resolved sockaddr into the NetworkAddress storage.
		ubyte* pAddr = cast(ubyte*) infos.ai_addr;
		ubyte* data = cast(ubyte*) addr.sockAddr;
		data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy
		freeaddrinfo(infos);
		return addr;
	}
3074 
3075 
3076 
3077 }
3078 
3079 
3080 static if (!EPOLL)
3081 {
3082 	import std.container : Array;
3083 	import core.sync.mutex : Mutex;
3084 	import core.sync.rwmutex : ReadWriteMutex;
	size_t g_evIdxCapacity;        // highest index range allocated so far (thread-local)
	Array!size_t g_evIdxAvailable; // pool of free indices for notifiers/timers
3087 
3088 	// called on run
3089 	nothrow size_t createIndex() {
3090 		size_t idx;
3091 		import std.algorithm : max;
3092 		try {
3093 
3094 			size_t getIdx() {
3095 
3096 				if (!g_evIdxAvailable.empty) {
3097 					immutable size_t ret = g_evIdxAvailable.back;
3098 					g_evIdxAvailable.removeBack();
3099 					return ret;
3100 				}
3101 				return 0;
3102 			}
3103 
3104 			idx = getIdx();
3105 			if (idx == 0) {
3106 				import std.range : iota;
3107 				g_evIdxAvailable.insert( iota(g_evIdxCapacity, max(32, g_evIdxCapacity * 2), 1) );
3108 				g_evIdxCapacity = max(32, g_evIdxCapacity * 2);
3109 				idx = getIdx();
3110 			}
3111 
3112 		} catch (Throwable e) {
3113 			static if (DEBUG) {
3114 				import std.stdio : writeln;
3115 				try writeln(e.toString()); catch (Throwable e) {}
3116 			}
3117 
3118 		}
3119 		return idx;
3120 	}
3121 
3122 	nothrow void destroyIndex(AsyncNotifier ctxt) {
3123 		try {
3124 			g_evIdxAvailable.insert(ctxt.id);
3125 		}
3126 		catch (Exception e) {
3127 			assert(false, "Error destroying index: " ~ e.msg);
3128 		}
3129 	}
3130 
3131 	nothrow void destroyIndex(AsyncTimer ctxt) {
3132 		try {
3133 			g_evIdxAvailable.insert(ctxt.id);
3134 		}
3135 		catch (Exception e) {
3136 			assert(false, "Error destroying index: " ~ e.msg);
3137 		}
3138 	}
3139 
	size_t* g_threadId;          // this thread's slot into the shared queues below
	size_t g_idxCapacity;        // highest signal index allocated so far (thread-local)
	Array!size_t g_idxAvailable; // pool of free signal indices (thread-local)

	__gshared ReadWriteMutex gs_queueMutex;              // guards the two queues below
	__gshared Array!(Array!AsyncSignal) gs_signalQueue;  // per-thread registered signals, indexed by slot
	__gshared Array!(Array!size_t) gs_idxQueue; // signals notified
3147 
3148 
3149 	// loop
3150 	nothrow bool popSignals(ref AsyncSignal[] sigarr) {
3151 		bool more;
3152 		try {
3153 			foreach (ref AsyncSignal sig; sigarr) {
3154 				if (!sig)
3155 					break;
3156 				sig = null;
3157 			}
3158 			size_t len;
3159 			synchronized(gs_queueMutex.reader) {
3160 
3161 				if (gs_idxQueue.length <= *g_threadId || gs_idxQueue[*g_threadId].empty)
3162 					return false;
3163 
3164 				len = gs_idxQueue[*g_threadId].length;
3165 				import std.stdio;
3166 				if (sigarr.length < len) {
3167 					more = true;
3168 					len = sigarr.length;
3169 				}
3170 
3171 				size_t i;
3172 				foreach (size_t idx; gs_idxQueue[*g_threadId][0 .. len]){
3173 					sigarr[i] = gs_signalQueue[*g_threadId][idx];
3174 					i++;
3175 				}
3176 			}
3177 
3178 			synchronized (gs_queueMutex.writer) {
3179 				gs_idxQueue[*g_threadId].linearRemove(gs_idxQueue[*g_threadId][0 .. len]);
3180 			}
3181 		}
3182 		catch (Exception e) {
3183 			assert(false, "Could not get pending signals: " ~ e.msg);
3184 		}
3185 		return more;
3186 	}
3187 
3188 	// notify
3189 	nothrow void addSignal(shared AsyncSignal ctxt) {
3190 		try {
3191 			size_t thread_id = ctxt.threadId;
3192 			bool must_resize;
3193 			import std.stdio;
3194 			synchronized (gs_queueMutex.writer) {
3195 				if (gs_idxQueue.empty || gs_idxQueue.length < thread_id + 1) {
3196 					gs_idxQueue.reserve(thread_id + 1);
3197 					foreach (i; gs_idxQueue.length .. gs_idxQueue.capacity) {
3198 						gs_idxQueue.insertBack(Array!size_t.init);
3199 					}
3200 				}
3201 				if (gs_idxQueue[thread_id].empty)
3202 				{
3203 					gs_idxQueue[thread_id].reserve(32);
3204 				}
3205 
3206 				gs_idxQueue[thread_id].insertBack(ctxt.id);
3207 
3208 			}
3209 
3210 		}
3211 		catch (Exception e) {
3212 			assert(false, "Array error: " ~ e.msg);
3213 		}
3214 	}
3215 
3216 	// called on run
3217 	nothrow size_t createIndex(shared AsyncSignal ctxt) {
3218 		size_t idx;
3219 		import std.algorithm : max;
3220 		try {
3221 			bool must_resize;
3222 
3223 			synchronized (gs_queueMutex.reader) {
3224 				if (gs_signalQueue.length < *g_threadId)
3225 					must_resize = true;
3226 			}
3227 
3228 			/// make sure the signal queue is big enough for this thread ID
3229 			if (must_resize) {
3230 				synchronized (gs_queueMutex.writer) {
3231 					while (gs_signalQueue.length <= *g_threadId)
3232 						gs_signalQueue.insertBack(Array!AsyncSignal.init);
3233 				}
3234 			}
3235 
3236 			size_t getIdx() {
3237 
3238 				if (!g_idxAvailable.empty) {
3239 					immutable size_t ret = g_idxAvailable.back;
3240 					g_idxAvailable.removeBack();
3241 					return ret;
3242 				}
3243 				return 0;
3244 			}
3245 
3246 			idx = getIdx();
3247 			if (idx == 0) {
3248 				import std.range : iota;
3249 				g_idxAvailable.insert( iota(g_idxCapacity + 1,  max(32, g_idxCapacity * 2), 1) );
3250 				g_idxCapacity = g_idxAvailable[$-1];
3251 				idx = getIdx();
3252 			}
3253 
3254 			synchronized (gs_queueMutex.writer) {
3255 				if (gs_signalQueue.empty || gs_signalQueue.length < *g_threadId + 1) {
3256 
3257 					gs_signalQueue.reserve(*g_threadId + 1);
3258 					foreach (i; gs_signalQueue.length .. gs_signalQueue.capacity) {
3259 						gs_signalQueue.insertBack(Array!AsyncSignal.init);
3260 					}
3261 
3262 				}
3263 
3264 				if (gs_signalQueue[*g_threadId].empty || gs_signalQueue[*g_threadId].length < idx + 1) {
3265 
3266 					gs_signalQueue[*g_threadId].reserve(idx + 1);
3267 					foreach (i; gs_signalQueue[*g_threadId].length .. gs_signalQueue[*g_threadId].capacity) {
3268 						gs_signalQueue[*g_threadId].insertBack(cast(AsyncSignal)null);
3269 					}
3270 
3271 				}
3272 
3273 				gs_signalQueue[*g_threadId][idx] = cast(AsyncSignal) ctxt;
3274 			}
3275 		} catch(Throwable) {}
3276 
3277 		return idx;
3278 	}
3279 
3280 	// called on kill
3281 	nothrow void destroyIndex(shared AsyncSignal ctxt) {
3282 		try {
3283 			g_idxAvailable.insert(ctxt.id);
3284 			synchronized (gs_queueMutex.writer) {
3285 				gs_signalQueue[*g_threadId][ctxt.id] = null;
3286 			}
3287 		}
3288 		catch (Exception e) {
3289 			assert(false, "Error destroying index: " ~ e.msg);
3290 		}
3291 	}
3292 }
3293 
/// Per-socket bookkeeping mixed into connection-oriented (TCP) handle types:
/// the event-loop registration pointer plus connection-state flags, exposed
/// through trivial accessor properties.
mixin template COSocketMixins() {

	private CleanupData m_impl;

	struct CleanupData {
		EventInfo* evInfo;    // event-loop registration data owned by this handle
		bool connected;       // set once the connection is established
		bool disconnecting;   // set while a disconnect is in progress
		bool writeBlocked;    // presumably: a send would block — TODO confirm against users
		bool readBlocked;     // presumably: a recv would block — TODO confirm against users
	}

	// True while a disconnect is in progress.
	@property bool disconnecting() const @safe pure @nogc {
		return m_impl.disconnecting;
	}

	@property void disconnecting(bool b) @safe pure @nogc {
		m_impl.disconnecting = b;
	}

	// True once the connection is established.
	@property bool connected() const @safe pure @nogc {
		return m_impl.connected;
	}

	@property void connected(bool b) @safe pure @nogc {
		m_impl.connected = b;
	}

	@property bool writeBlocked() const @safe pure @nogc {
		return m_impl.writeBlocked;
	}

	@property void writeBlocked(bool b) @safe pure @nogc {
		m_impl.writeBlocked = b;
	}

	@property bool readBlocked() const @safe pure @nogc {
		return m_impl.readBlocked;
	}

	@property void readBlocked(bool b) @safe pure @nogc {
		m_impl.readBlocked = b;
	}

	// Event-loop registration data; null when unregistered.
	@property EventInfo* evInfo() @safe pure @nogc {
		return m_impl.evInfo;
	}

	@property void evInfo(EventInfo* info) @safe pure @nogc {
		m_impl.evInfo = info;
	}

}
3347 
/// Event-loop bookkeeping mixed into `shared` (cross-thread) handle types such
/// as AsyncSignal: the EventInfo pointer plus the owner-thread identity needed
/// to route notifications (pthread id under epoll, thread slot id under kqueue).
mixin template EvInfoMixinsShared() {

	private CleanupData m_impl;

	shared struct CleanupData {
		EventInfo* evInfo; // event-loop registration data; null when unregistered
	}

	static if (EPOLL) {
		import core.sys.posix.pthread : pthread_t;
		// Owner thread's pthread id (signals are delivered via pthread_kill
		// elsewhere — TODO confirm against the epoll signal path).
		private pthread_t m_pthreadId;
		synchronized @property pthread_t pthreadId() {
			return cast(pthread_t) m_pthreadId;
		}
		/* todo: support multiple event loops per thread?
		 private ushort m_sigId;
		 synchronized @property ushort sigId() {
		 return cast(ushort)m_loopId;
		 }
		 synchronized @property void sigId(ushort id) {
		 m_loopId = cast(shared)id;
		 }
		 */
	}
	else /* if KQUEUE */
	{
		// Owner thread's slot index into the shared gs_* signal queues.
		private shared(size_t)* m_owner_id;
		synchronized @property size_t threadId() {
			return cast(size_t) *m_owner_id;
		}
	}

	@property shared(EventInfo*) evInfo() {
		return m_impl.evInfo;
	}

	@property void evInfo(shared(EventInfo*) info) {
		m_impl.evInfo = info;
	}

}
3389 
/// Minimal event-loop bookkeeping mixed into thread-local handle types:
/// just the EventInfo registration pointer with accessors.
mixin template EvInfoMixins() {

	private CleanupData m_impl;

	struct CleanupData {
		EventInfo* evInfo; // event-loop registration data; null when unregistered
	}

	@property EventInfo* evInfo() {
		return m_impl.evInfo;
	}

	@property void evInfo(EventInfo* info) {
		m_impl.evInfo = info;
	}
}
3406 
/// Storage for the callback attached to an event; which member is active
/// is determined by the accompanying EventInfo.evType.
union EventObject {
	TCPAcceptHandler tcpAcceptHandler; // EventType.TCPAccept
	TCPEventHandler tcpEvHandler;      // EventType.TCPTraffic
	AsyncSocket socket;                // EventType.Socket
	TimerHandler timerHandler;         // EventType.Timer
	DWHandler dwHandler;               // EventType.DirectoryWatcher
	UDPHandler udpHandler;             // EventType.UDPSocket
	NotifierHandler notifierHandler;   // EventType.Notifier
	EventHandler eventHandler;         // EventType.Event
}
3417 
/// Discriminator for EventInfo: selects the active EventObject member and
/// how the event loop dispatches the kernel notification.
enum EventType : char {
	TCPAccept,        // listening socket became readable (pending accept)
	TCPTraffic,       // established TCP connection I/O
	UDPSocket,        // datagram socket I/O
	Socket,           // generic AsyncSocket I/O
	Notifier,         // cross-task notifier
	Signal,           // POSIX signal delivery
	Timer,            // timer expiry
	DirectoryWatcher, // filesystem change notification
	Event // custom
}
3429 
/// Payload registered with epoll/kqueue (passed as the user-data pointer);
/// identifies the file descriptor, how to dispatch it, and its owning loop.
struct EventInfo {
	fd_t fd;           // file descriptor the event was registered for
	EventType evType;  // selects the active member of evObj
	EventObject evObj; // callback to invoke on dispatch
	ushort owner;      // owning event-loop instance id (m_instanceId)
}