1 module libasync.windows;
2 
3 version (Windows):
4 
5 import core.atomic;
6 import core.thread : Fiber;
7 import libasync.types;
8 import std.container : Array;
import std.string : toStringz;
10 import std.conv : to;
11 import std.datetime : Duration, msecs, seconds;
12 import std.algorithm : min;
13 import std.exception;
14 import libasync.internals.win32;
15 import libasync.internals.logging;
16 import std.traits : isIntegral;
17 import std.typecons : Tuple, tuple;
18 import std.utf : toUTFz;
19 import core.sync.mutex;
20 import libasync.events;
21 import memutils.utils;
22 import memutils.hashmap;
23 import memutils.vector;
24 pragma(lib, "ws2_32");
25 pragma(lib, "ole32");
26 alias fd_t = SIZE_T;
27 alias error_t = EWIN;
28 
// todo: see if new connections with SO_REUSEADDR are evenly distributed between threads
30 
31 
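// Windows implementation of the libasync event loop. It combines a hidden
// message-only window (socket, timer and user-signal notifications arrive as
// window messages), alertable waits for overlapped I/O completion routines,
// and the Winsock extension functions (AcceptEx, GetAcceptExSockaddrs,
// ConnectEx, DisconnectEx) that are loaded in init().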
32 package struct EventLoopImpl {
33 	pragma(msg, "Using Windows message-based notifications and alertable IO for events");
34 
35 private:
36 	HashMap!(fd_t, TCPAcceptHandler) m_connHandlers; // todo: Change this to an array
37 	HashMap!(fd_t, TCPEventHandler) m_tcpHandlers;
38 	HashMap!(fd_t, TimerHandler) m_timerHandlers;
39 	HashMap!(fd_t, UDPHandler) m_udpHandlers;
40 	HashMap!(fd_t, DWHandlerInfo) m_dwHandlers; // todo: Change this to an array too
41 	HashMap!(uint, DWFolderWatcher) m_dwFolders;
42 	HashMap!(fd_t, tcp_keepalive)* kcache;
43 	~this() { kcache.destroy(); }
44 nothrow:
45 private:
46 	struct TimerCache {
47 		TimerHandler cb;
48 		fd_t fd;
49 	}
50 	TimerCache m_timer;
51 
52 	EventLoop m_evLoop;
53 	bool m_started;
54 	wstring m_window;
55 	HWND m_hwnd;
56 	DWORD m_threadId;
57 	ushort m_instanceId;
58 	StatusInfo m_status;
59 	error_t m_error = EWIN.WSA_OK;
60 	__gshared Mutex gs_mtx;
61 
62 	HANDLE[] m_waitObjects;
63 	AsyncOverlapped*[AsyncSocket] m_pendingConnects;
64 	bool[AsyncOverlapped*] m_pendingAccepts;
65 
66 	@property HANDLE pendingConnectEvent()
67 	{ return m_waitObjects[0]; }
68 
69 	@property HANDLE pendingAcceptEvent()
70 	{ return m_waitObjects[1]; }
71 
72 	AsyncAcceptRequest.Queue  m_completedSocketAccepts;
73 	AsyncReceiveRequest.Queue m_completedSocketReceives;
74 	AsyncSendRequest.Queue    m_completedSocketSends;
75 package:
76 	@property bool started() const {
77 		return m_started;
78 	}
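	// Sets up per-thread state for the loop: registers a hidden message-only
	// window whose wndProc receives socket/timer notifications, starts Winsock
	// (WSAStartup), loads the AcceptEx/GetAcceptExSockaddrs/ConnectEx/DisconnectEx
	// extension pointers through WSAIoctl on a temporary socket, and creates the
	// two auto-reset events used to track pending ConnectEx/AcceptEx requests.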
79 	bool init(EventLoop evl)
80 	in { assert(!m_started); }
81 	body
82 	{
83 		try if (!gs_mtx)
84 			gs_mtx = new Mutex; catch (Throwable) {}
85 		static ushort j;
86 		assert (j == 0, "Current implementation is only tested with 1 event loop per thread. There are known issues with signals on linux.");
87 		j += 1;
88 		m_status = StatusInfo.init;
89 
90 		import core.thread;
91 		//try Thread.getThis().priority = Thread.PRIORITY_MAX;
92 		//catch (Exception e) { assert(false, "Could not set thread priority"); }
93 		SetThreadPriority(GetCurrentThread(), 31);
94 		m_evLoop = evl;
95 		shared static ushort i;
96 		m_instanceId = i;
97 		core.atomic.atomicOp!"+="(i, cast(ushort) 1);
98 		wstring inststr;
99 		import std.conv : to;
100 		try { inststr = m_instanceId.to!wstring; }
101 		catch (Exception e) {
102 			return false;
103 		}
104 		m_window = "VibeWin32MessageWindow" ~ inststr;
105 		wstring classname = "VibeWin32MessageWindow" ~ inststr;
106 
107 		LPCWSTR wnz;
108 		LPCWSTR clsn;
109 		try {
110 			wnz = cast(LPCWSTR) m_window.toUTFz!(immutable(wchar)*);
111 			clsn = cast(LPCWSTR) classname.toUTFz!(immutable(wchar)*);
112 		} catch (Exception e) {
113 			setInternalError!"toUTFz"(Status.ERROR, e.msg);
114 			return false;
115 		}
116 
117 		m_threadId = GetCurrentThreadId();
118 		WNDCLASSW wc;
119 		wc.lpfnWndProc = &wndProc;
120 		wc.lpszClassName = clsn;
121 		RegisterClassW(&wc);
122 		m_hwnd = CreateWindowW(wnz, clsn, 0, 0, 0, 385, 375, HWND_MESSAGE,
123 		                       cast(HMENU) null, null, null);
124 		static if (LOG) try log("Window registered: " ~ m_hwnd.to!string); catch (Throwable) {}
125 		auto ptr = cast(ULONG_PTR)cast(void*)&this;
126 		SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, ptr);
127 		assert( cast(EventLoopImpl*)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is &this );
128 		WSADATA wd;
129 		m_error = cast(error_t) WSAStartup(0x0202, &wd);
130 		if (m_error == EWIN.WSA_OK)
131 			m_status.code = Status.OK;
132 		else {
133 			m_status.code = Status.ABORT;
134 			static if(LOG) log(m_status);
135 			return false;
136 		}
137 		assert(wd.wVersion == 0x0202);
138 
139 		auto dummySocket = socket(AF_INET6, SOCK_STREAM, 0);
140 		if (dummySocket == INVALID_SOCKET) return false;
141 		scope (exit) closesocket(dummySocket);
142 
143 		DWORD bytesReturned;
144 
145 		if (WSAIoctl(dummySocket,
146 		             SIO_GET_EXTENSION_FUNCTION_POINTER,
147 		             &WSAID_ACCEPTEX, GUID.sizeof,
148 		             &AcceptEx, AcceptEx.sizeof,
149 		             &bytesReturned,
150 		             null, null) == SOCKET_ERROR) {
151 			m_error = WSAGetLastErrorSafe();
152 			m_status.code = Status.ABORT;
153 			return false;
154 		}
155 
156 		if (WSAIoctl(dummySocket,
157 		             SIO_GET_EXTENSION_FUNCTION_POINTER,
158 		             &WSAID_GETACCEPTEXSOCKADDRS, GUID.sizeof,
159 		             &GetAcceptExSockaddrs, GetAcceptExSockaddrs.sizeof,
160 		             &bytesReturned,
161 		             null, null) == SOCKET_ERROR) {
162 			m_error = WSAGetLastErrorSafe();
163 			m_status.code = Status.ABORT;
164 			return false;
165 		}
166 
167 		if (WSAIoctl(dummySocket,
168 		             SIO_GET_EXTENSION_FUNCTION_POINTER,
169 		             &WSAID_CONNECTEX, GUID.sizeof,
170 		             &ConnectEx, ConnectEx.sizeof,
171 		             &bytesReturned,
172 		             null, null) == SOCKET_ERROR) {
173 			m_error = WSAGetLastErrorSafe();
174 			m_status.code = Status.ABORT;
175 			return false;
176 		}
177 
178 		if (WSAIoctl(dummySocket,
179 		             SIO_GET_EXTENSION_FUNCTION_POINTER,
180 		             &WSAID_DISCONNECTEX, GUID.sizeof,
181 		             &DisconnectEx, DisconnectEx.sizeof,
182 		             &bytesReturned,
183 		             null, null) == SOCKET_ERROR) {
184 			m_error = WSAGetLastErrorSafe();
185 			m_status.code = Status.ABORT;
186 			return false;
187 		}
188 
189 		// Event for pending ConnectEx requests
190 		m_waitObjects ~= CreateEvent(null, false, false, null);
191 		// Event for pending AcceptEx requests
192 		m_waitObjects ~= CreateEvent(null, false, false, null);
193 
194 		m_started = true;
195 		return true;
196 	}
197 
198 	// todo: find where to call this
199 	void exit() {
200 		cast(void)PostThreadMessageW(m_threadId, WM_QUIT, 0, 0);
201 	}
202 
203 	@property StatusInfo status() const {
204 		return m_status;
205 	}
206 
207 	@property string error() const {
208 		string* ptr;
209 		string pv = ((ptr = (m_error in EWSAMessages)) !is null) ? *ptr : string.init;
210 		return pv;
211 	}
212 
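	// Runs a single iteration of the event loop: blocks in
	// MsgWaitForMultipleObjectsEx (alertable, so overlapped completion routines
	// can run as APCs), then dispatches completed receives/sends, finished
	// ConnectEx/AcceptEx requests and any queued window messages.
	//
	// Illustrative driving sketch (assumes an initialized instance named `impl`):
	//
	//     while (impl.loop(10.msecs)) { /* handlers are invoked from inside loop() */ }
	//
	// Returns false once a fatal error has been recorded in m_status.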
213 	bool loop(Duration timeout = 0.seconds)
214 	in {
215 		assert(Fiber.getThis() is null);
216 		assert(m_started);
217 	}
218 	body {
219 		DWORD msTimeout;
220 
221 		if (timeout == -1.seconds)
222 			msTimeout = DWORD.max;
223 		else msTimeout = cast(DWORD) min(timeout.total!"msecs", DWORD.max);
224 
225 		/*
226 		 * Waits until one or all of the specified objects are in the signaled state
227 		 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms684245%28v=vs.85%29.aspx
228 		*/
229 		m_status = StatusInfo.init;
230 		DWORD signal = MsgWaitForMultipleObjectsEx(
231 			cast(DWORD) m_waitObjects.length,
232 			m_waitObjects.ptr,
233 			msTimeout,
234 			QS_ALLEVENTS,
235 			MWMO_ALERTABLE | MWMO_INPUTAVAILABLE		// MWMO_ALERTABLE: Wakes up to execute overlapped hEvent (i/o completion)
236 			// MWMO_INPUTAVAILABLE: Processes key/mouse input to avoid window ghosting
237 			);
238 
239 		auto errors =
240 		[ tuple(WAIT_FAILED, Status.EVLOOP_FAILURE) ];	/* WAIT_FAILED: Failed to call MsgWait..() */
241 
242 		if (signal == WAIT_TIMEOUT) {
243 			return true;
244 		}
245 
246 		if (signal == WAIT_IO_COMPLETION) {
247 			if (m_status.code != Status.OK) return false;
248 
249 			foreach (request; m_completedSocketReceives) {
250 				if (request.socket.receiveContinuously) {
251 					m_completedSocketReceives.removeFront();
252 					assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
253 					if (request.socket.receiveContinuously && request.socket.alive) {
254 						request.message.count = 0;
255 						submitRequest(request);
256 					} else {
257 						assumeWontThrow(NetworkMessage.free(request.message));
258 						assumeWontThrow(AsyncReceiveRequest.free(request));
259 					}
260 				} else {
261 					m_completedSocketReceives.removeFront();
262 					if (request.message) {
263 						assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
264 						assumeWontThrow(NetworkMessage.free(request.message));
265 					} else {
266 						assumeWontThrow(request.onComplete.get!1)();
267 					}
268 					assumeWontThrow(AsyncReceiveRequest.free(request));
269 				}
270 			}
271 
272 			foreach (request; m_completedSocketSends) {
273 				m_completedSocketSends.removeFront();
274 				request.onComplete();
275 				assumeWontThrow(NetworkMessage.free(request.message));
276 				assumeWontThrow(AsyncSendRequest.free(request));
277 			}
278 
279 			signal = MsgWaitForMultipleObjectsEx(
280 				cast(DWORD) m_waitObjects.length,
281 				m_waitObjects.ptr,
282 				0,
283 				QS_ALLEVENTS,
284 				MWMO_INPUTAVAILABLE // MWMO_INPUTAVAILABLE: Processes key/mouse input to avoid window ghosting
285 				);
286 			if (signal == WAIT_TIMEOUT) {
287 				return true;
288 			}
289 		}
290 
291 		if (catchErrors!"MsgWaitForMultipleObjectsEx"(signal, errors)) {
292 			static if (LOG) log("Event Loop Exiting because of error");
293 			return false;
294 		}
295 
296 		// Input messages
297 		if (signal == WAIT_OBJECT_0 + m_waitObjects.length) {
298 			MSG msg;
299 			while (PeekMessageW(&msg, null, 0, 0, PM_REMOVE)) {
300 				m_status = StatusInfo.init;
301 				TranslateMessage(&msg);
302 				DispatchMessageW(&msg);
303 
304 				if (m_status.code == Status.ERROR) {
305 					static if (LOG) log(m_status.text);
306 					return false;
307 				}
308 			}
309 			return true;
310 		}
311 
312 		// Events
313 		DWORD transferred, flags;
314 		switch (signal - WAIT_OBJECT_0) {
315 			// ConnectEx completion
316 			case 0:
317 				foreach (ref pendingConnect; m_pendingConnects.byKeyValue()) {
318 					auto socket = pendingConnect.key;
319 					auto overlapped = pendingConnect.value;
320 
321 					if (WSAGetOverlappedResult(socket.handle,
322 					                           &overlapped.overlapped,
323 					                           &transferred,
324 					                           false,
325 					                           &flags)) {
326 						m_pendingConnects.remove(socket);
327 						assumeWontThrow(AsyncOverlapped.free(overlapped));
328 						if (updateConnectContext(socket.handle)) {
329 							socket.handleConnect();
330 							return true;
331 						} else {
332 							socket.kill();
333 							socket.handleError();
334 							return false;
335 						}
336 					} else {
337 						m_error = WSAGetLastErrorSafe();
338 						if (m_error == WSA_IO_INCOMPLETE) {
339 							continue;
340 						} else {
341 							m_status.code = Status.ABORT;
342 							socket.kill();
343 							socket.handleError();
344 							return false;
345 						}
346 					}
347 				}
348 				break;
349 			// AcceptEx completion
350 			case 1:
351 				foreach (overlapped; cast(AsyncOverlapped*[]) m_pendingAccepts.keys) {
352 					auto request = overlapped.accept;
353 					auto socket = request.socket;
354 
355 					if (WSAGetOverlappedResult(socket.handle,
356 					                           &overlapped.overlapped,
357 											   &transferred,
358 											   false,
359 											   &flags)) {
360 						m_pendingAccepts.remove(overlapped);
361 						assumeWontThrow(AsyncOverlapped.free(overlapped));
362 						m_completedSocketAccepts.insertBack(request);
363 					} else {
364 						m_error = WSAGetLastErrorSafe();
365 						if (m_error == WSA_IO_INCOMPLETE) {
366 							continue;
367 						} else {
368 							m_status.code = Status.ABORT;
369 							m_pendingAccepts.remove(overlapped);
370 							assumeWontThrow(AsyncOverlapped.free(overlapped));
371 							assumeWontThrow(AsyncAcceptRequest.free(request));
372 							socket.kill();
373 							socket.handleError();
374 							return false;
375 						}
376 					}
377 				}
378 				foreach (request; m_completedSocketAccepts) {
379 					sockaddr* localAddress, remoteAddress;
380 					socklen_t localAddressLength, remoteAddressLength;
381 
382 					GetAcceptExSockaddrs(request.buffer.ptr,
383 										 0,
384 										 cast(DWORD) request.buffer.length / 2,
385 										 cast(DWORD) request.buffer.length / 2,
386 										 &localAddress,
387 										 &localAddressLength,
388 										 &remoteAddress,
389 										 &remoteAddressLength);
390 
391 					m_completedSocketAccepts.removeFront();
392 					if (!onAccept(request.socket.handle, request, remoteAddress)) {
393 						return false;
394 					}
395 				}
396 				break;
397 			default:
398 				.warning("Unknown event was triggered: ", signal);
399 				break;
400 		}
401 
402 		return true;
403 	}
404 
405 	bool run(AsyncEvent ctxt, EventHandler del)
406 	{
407 		return true;
408 	}
409 
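	// Creates (or reuses) an overlapped TCP socket for the listener, applies
	// REUSEADDR/NODELAY as requested, binds and listens through initTCPListener(),
	// and registers the accept handler. Returns the socket descriptor, or 0 on failure.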
410 	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
411 	{
412 		m_status = StatusInfo.init;
413 		fd_t fd = ctxt.socket;
414 		bool reusing;
415 		if (fd == fd_t.init) {
416 
417 			fd = WSASocketW(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
418 
			if (catchSocketError!("run AsyncTCPListener")(fd, INVALID_SOCKET))
420 				return 0;
421 
422 			if (!setOption(fd, TCPOption.REUSEADDR, true)) {
423 				closeSocket(fd, false);
424 				return 0;
425 			}
426 			// todo: defer accept?
427 
428 			if (ctxt.noDelay) {
429 				if (!setOption(fd, TCPOption.NODELAY, true)) {
430 					closeSocket(fd, false);
431 					return 0;
432 				}
433 			}
434 		} else reusing = true;
435 
436 		if (initTCPListener(fd, ctxt, reusing))
437 		{
438 			try {
439 				static if (LOG) log("Running listener on socket fd#" ~ fd.to!string);
440 				m_connHandlers[fd] = del;
441 				version(Distributed)ctxt.init(m_hwnd, fd);
442 			}
443 			catch (Exception e) {
444 				setInternalError!"m_connHandlers assign"(Status.ERROR, e.msg);
445 				closeSocket(fd, false);
446 				return 0;
447 			}
448 		}
449 		else
450 		{
451 			return 0;
452 		}
453 
454 
455 		return fd;
456 	}
457 
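	// Creates an overlapped TCP socket (unless one was pre-initialized), registers
	// the event handler and starts the connection attempt through initTCPConnection().
	// Returns the socket descriptor, or 0 on failure.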
458 	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
459 	in {
460 		assert(ctxt.socket == fd_t.init);
461 		assert(ctxt.peer.family != AF_UNSPEC);
462 	}
463 	body {
464 		m_status = StatusInfo.init;
465 		fd_t fd = ctxt.preInitializedSocket;
466 
467 		if (fd == fd_t.init)
468 			fd = WSASocketW(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
469 		static if (LOG) log("Starting connection at: " ~ fd.to!string);
470 		if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET))
471 			return 0;
472 
473 		try {
474 			(m_tcpHandlers)[fd] = del;
475 		}
476 		catch (Exception e) {
477 			setInternalError!"m_tcpHandlers assign"(Status.ERROR, e.msg);
478 			closeSocket(fd, false);
479 			return 0;
480 		}
481 
482 		nothrow void closeAll() {
483 			try {
484 				static if (LOG) log("Remove event handler for " ~ fd.to!string);
485 				m_tcpHandlers.remove(fd);
486 			}
487 			catch (Exception e) {
488 				setInternalError!"m_tcpHandlers remove"(Status.ERROR, e.msg);
489 			}
490 			closeSocket(fd, false);
491 		}
492 
493 		if (ctxt.noDelay) {
494 			if (!setOption(fd, TCPOption.NODELAY, true)) {
495 				closeAll();
496 				return 0;
497 			}
498 		}
499 
500 		if (!initTCPConnection(fd, ctxt)) {
501 			closeAll();
502 			return 0;
503 		}
504 
505 
506 		static if (LOG) try log("Client started FD#" ~ fd.to!string);
507 		catch (Throwable) {}
508 		return fd;
509 	}
510 
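	// Creates an overlapped UDP socket (unless pre-initialized), binds it through
	// initUDPSocket() and registers the UDP event handler. Returns the descriptor,
	// or 0 on failure.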
511 	fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
512 		m_status = StatusInfo.init;
513 		fd_t fd = ctxt.preInitializedSocket;
514 
515 		if (fd == fd_t.init)
516 			fd = WSASocketW(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED);
517 
518 		if (catchSocketError!("run AsyncUDPSocket")(fd, INVALID_SOCKET))
519 			return 0;
520 
521 		if (initUDPSocket(fd, ctxt))
522 		{
523 			try {
524 				(m_udpHandlers)[fd] = del;
525 			}
526 			catch (Exception e) {
527 				setInternalError!"m_udpHandlers assign"(Status.ERROR, e.msg);
528 				closesocket(fd);
529 				return 0;
530 			}
531 		}
532 		else return 0;
533 
534 		static if (LOG) try log("UDP Socket started FD#" ~ fd.to!string);
535 		catch (Throwable) {}
536 
537 		return fd;
538 	}
539 
540 	fd_t run(shared AsyncSignal ctxt) {
541 		m_status = StatusInfo.init;
542 		static if (LOG) try log("Signal subscribed to: " ~ m_hwnd.to!string); catch (Throwable) {}
543 		return (cast(fd_t)m_hwnd);
544 	}
545 
546 	fd_t run(AsyncNotifier ctxt) {
547 		m_status = StatusInfo.init;
548 		//static if (LOG) try log("Running signal " ~ (cast(AsyncNotifier)ctxt).to!string); catch (Throwable) {}
549 		return cast(fd_t) m_hwnd;
550 	}
551 
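	// Arms a timer with SetTimer() on the message window; the resulting WM_TIMER
	// messages are routed back to the registered handler. A single timer is cached
	// in m_timer as a fast path; any further timers are tracked in m_timerHandlers.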
552 	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
553 		if (timeout < 0.seconds)
554 			timeout = 0.seconds;
555 		m_status = StatusInfo.init;
556 		fd_t timer_id = ctxt.id;
557 		if (timer_id == fd_t.init) {
558 			timer_id = createIndex();
559 		}
560 		static if (LOG) try log("Timer created: " ~ timer_id.to!string ~ " with timeout: " ~ timeout.total!"msecs".to!string ~ " msecs"); catch (Throwable) {}
561 
562 		BOOL err;
563 		try err = cast(int)SetTimer(m_hwnd, timer_id, timeout.total!"msecs".to!uint, null);
564 		catch(Exception e) {
565 			setInternalError!"SetTimer"(Status.ERROR);
566 			return 0;
567 		}
568 
569 		if (err == 0)
570 		{
571 			m_error = GetLastErrorSafe();
572 			m_status.code = Status.ERROR;
			m_status.text = "run(AsyncTimer)";
574 			static if (LOG) log(m_status);
575 			return 0;
576 		}
577 
578 		if (m_timer.fd == fd_t.init || m_timer.fd == timer_id)
579 		{
580 			m_timer.fd = timer_id;
581 			m_timer.cb = del;
582 		}
583 		else {
584 			try
585 			{
586 				(m_timerHandlers)[timer_id] = del;
587 			}
588 			catch (Exception e) {
589 				setInternalError!"HashMap assign"(Status.ERROR);
590 				return 0;
591 			}
592 		}
593 
594 
595 		return timer_id;
596 	}
597 
598 	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del)
599 	{
600 		static fd_t ids;
601 		auto fd = ++ids;
602 
603 		try (m_dwHandlers)[fd] = new DWHandlerInfo(del);
604 		catch (Exception e) {
605 			setInternalError!"AsyncDirectoryWatcher.hashMap(run)"(Status.ERROR, "Could not add handler to hashmap: " ~ e.msg);
606 		}
607 
608 		return fd;
609 
610 	}
611 
612 	bool kill(AsyncDirectoryWatcher ctxt) {
613 
614 		try {
615 			Array!DWFolderWatcher toFree;
616 			foreach (ref const uint k, const DWFolderWatcher v; m_dwFolders) {
617 				if (v.fd == ctxt.fd) {
618 					CloseHandle(v.handle);
619 					m_dwFolders.remove(k);
620 				}
621 			}
622 
623 			foreach (DWFolderWatcher obj; toFree[])
624 				ThreadMem.free(obj);
625 
626 			// todo: close all the handlers...
627 			m_dwHandlers.remove(ctxt.fd);
628 		}
629 		catch (Exception e) {
630 			setInternalError!"in kill(AsyncDirectoryWatcher)"(Status.ERROR, e.msg);
631 			return false;
632 		}
633 
634 		return true;
635 	}
636 
637 	bool kill(AsyncTCPConnection ctxt, bool forced = false)
638 	{
639 
640 		m_status = StatusInfo.init;
641 		fd_t fd = ctxt.socket;
642 
643 		static if (LOG) log("Killing socket "~ fd.to!string);
644 		try {
645 			auto cb = m_tcpHandlers.get(ctxt.socket);
646 			if (cb != TCPEventHandler.init){
647 				*cb.conn.connected = false;
648 				*cb.conn.connecting = false;
649 				return closeSocket(fd, true, forced);
650 			}
651 		} catch (Exception e) {
652 			setInternalError!"in m_tcpHandlers"(Status.ERROR, e.msg);
653 			assert(false);
654 			//return false;
655 		}
656 
657 		return true;
658 	}
659 
660 	bool kill(AsyncTCPListener ctxt)
661 	{
662 		m_status = StatusInfo.init;
663 		fd_t fd = ctxt.socket;
664 		try {
665 			if ((ctxt.socket in m_connHandlers) !is null) {
666 				return closeSocket(fd, false, true);
667 			}
668 		} catch (Exception e) {
669 			setInternalError!"in m_connHandlers"(Status.ERROR, e.msg);
670 			return false;
671 		}
672 
673 		return true;
674 	}
675 
676 	bool kill(shared AsyncSignal ctxt) {
677 		return true;
678 	}
679 
680 	bool kill(AsyncNotifier ctxt) {
681 		return true;
682 	}
683 
684 	bool kill(AsyncTimer ctxt) {
685 		m_status = StatusInfo.init;
686 
		static if (LOG) try log("Kill timer " ~ ctxt.id.to!string); catch (Throwable) {}
688 
689 		BOOL err = KillTimer(m_hwnd, ctxt.id);
690 		if (err == 0)
691 		{
692 			m_error = GetLastErrorSafe();
693 			m_status.code = Status.ERROR;
694 			m_status.text = "kill(AsyncTimer)";
695 			static if (LOG) log(m_status);
696 			return false;
697 		}
698 
699 		destroyIndex(ctxt);
700 		scope(exit)
701 			ctxt.id = fd_t.init;
702 		if (m_timer.fd == ctxt.id) {
703 			ctxt.id = 0;
704 			m_timer = TimerCache.init;
705 		} else {
706 			try {
707 				m_timerHandlers.remove(ctxt.id);
708 			}
709 			catch (Exception e) {
710 				setInternalError!"HashMap remove"(Status.ERROR);
				return false;
712 			}
713 		}
714 
715 
716 		return true;
717 	}
718 
719 	bool kill(AsyncEvent ctxt, bool forced = false) {
720 		return true;
721 	}
722 
723 	bool kill(AsyncUDPSocket ctxt) {
724 		m_status = StatusInfo.init;
725 
726 		fd_t fd = ctxt.socket;
727 		INT err = closesocket(fd);
728 		if (catchSocketError!"closesocket"(err))
729 			return false;
730 
731 		try m_udpHandlers.remove(ctxt.socket);
732 		catch (Exception e) {
733 			setInternalError!"HashMap remove"(Status.ERROR);
			return false;
735 		}
736 
737 		return true;
738 	}
739 
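	// Applies a TCPOption to the given socket, dispatching on the value type T at
	// compile time; unsupported options report Status.NOT_IMPLEMENTED. On failure
	// m_status/m_error are set and false is returned. Typical use, as seen in
	// run(AsyncTCPListener) above:
	//
	//     setOption(fd, TCPOption.REUSEADDR, true);
	//     setOption(fd, TCPOption.NODELAY, true);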
740 	bool setOption(T)(fd_t fd, TCPOption option, in T value) {
741 		m_status = StatusInfo.init;
742 		int err;
743 		try {
744 			nothrow bool errorHandler() {
745 				if (catchSocketError!"setOption:"(err)) {
746 					try m_status.text ~= option.to!string;
747 					catch (Exception e){ assert(false, "to!string conversion failure"); }
748 					return false;
749 				}
750 
751 				return true;
752 			}
753 
754 
755 
756 			final switch (option) {
757 
758 				case TCPOption.NODELAY: // true/false
759 					static if (!is(T == bool))
760 						assert(false, "NODELAY value type must be bool, not " ~ T.stringof);
761 					else {
762 						BOOL val = value?1:0;
763 						socklen_t len = val.sizeof;
764 						err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len);
765 						return errorHandler();
766 					}
767 				case TCPOption.REUSEPORT:
768 				case TCPOption.REUSEADDR: // true/false
769 					static if (!is(T == bool))
770 						assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof);
771 					else
772 					{
773 						BOOL val = value?1:0;
774 						socklen_t len = val.sizeof;
775 						err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len);
776 						return errorHandler();
777 					}
778 				case TCPOption.QUICK_ACK:
779 					static if (!is(T == bool))
780 						assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof);
781 					else {
782 						m_status.code = Status.NOT_IMPLEMENTED;
783 						return false; // quick ack is not implemented
784 					}
785 				case TCPOption.KEEPALIVE_ENABLE: // true/false
786 					static if (!is(T == bool))
787 						assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof);
788 					else
789 					{
790 						BOOL val = value?1:0;
791 						socklen_t len = val.sizeof;
792 						err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len);
793 						return errorHandler();
794 					}
795 				case TCPOption.KEEPALIVE_COUNT: // retransmit 10 times before dropping half-open conn
796 					static if (!isIntegral!T)
797 						assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof);
798 					else {
799 						m_status.code = Status.NOT_IMPLEMENTED;
800 						return false;
801 					}
				case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds between keepalive packets
803 					static if (!is(T == Duration))
804 						assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof);
805 					else {
806 
807 						if (!kcache)
808 							kcache = new HashMap!(fd_t, tcp_keepalive)();
809 
810 						tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init);
811 						tcp_keepalive sReturned;
812 						DWORD dwBytes;
813 						kaSettings.onoff = ULONG(1);
814 						if (kaSettings.keepalivetime == ULONG.init) {
815 							kaSettings.keepalivetime = 1000;
816 						}
817 						kaSettings.keepaliveinterval = value.total!"msecs".to!ULONG;
818 						(*kcache)[fd] = kaSettings;
819 						err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null);
820 
821 						return errorHandler();
822 					}
823 				case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start
824 					static if (!is(T == Duration))
825 						assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof);
826 					else {
827 
828 						if (!kcache)
829 							kcache = new HashMap!(fd_t, tcp_keepalive)();
830 
831 						tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init);
832 						tcp_keepalive sReturned;
833 						DWORD dwBytes;
834 						kaSettings.onoff = ULONG(1);
835 						if (kaSettings.keepaliveinterval == ULONG.init) {
836 							kaSettings.keepaliveinterval = 75*1000;
837 						}
838 						kaSettings.keepalivetime = value.total!"msecs".to!ULONG;
839 
840 						(*kcache)[fd] = kaSettings;
841 						err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null);
842 
843 						return errorHandler();
844 					}
845 				case TCPOption.BUFFER_RECV: // bytes
846 					static if (!isIntegral!T)
847 						assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof);
848 					else {
849 						int val = value.to!int;
850 						socklen_t len = val.sizeof;
851 						err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len);
852 						return errorHandler();
853 					}
854 				case TCPOption.BUFFER_SEND: // bytes
855 					static if (!isIntegral!T)
856 						assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof);
857 					else {
858 						int val = value.to!int;
859 						socklen_t len = val.sizeof;
860 						err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len);
861 						return errorHandler();
862 					}
863 				case TCPOption.TIMEOUT_RECV:
864 					static if (!is(T == Duration))
865 						assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof);
866 					else {
867 						DWORD val = value.total!"msecs".to!DWORD;
868 						socklen_t len = val.sizeof;
869 						err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &val, len);
870 						return errorHandler();
871 					}
872 				case TCPOption.TIMEOUT_SEND:
873 					static if (!is(T == Duration))
874 						assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
875 					else {
876 						DWORD val = value.total!"msecs".to!DWORD;
877 						socklen_t len = val.sizeof;
878 						err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len);
879 						return errorHandler();
880 					}
881 				case TCPOption.TIMEOUT_HALFOPEN:
882 					static if (!is(T == Duration))
						assert(false, "TIMEOUT_HALFOPEN value type must be Duration, not " ~ T.stringof);
884 					else {
885 						m_status.code = Status.NOT_IMPLEMENTED;
886 						return false;
887 					}
888 				case TCPOption.LINGER: // bool onOff, int seconds
889 					static if (!is(T == Tuple!(bool, int)))
890 						assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof);
891 					else {
						linger l = linger(value[0]?1:0, value[1].to!USHORT);
893 						socklen_t llen = l.sizeof;
894 						err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen);
895 						return errorHandler();
896 					}
897 				case TCPOption.CONGESTION:
898 					static if (!isIntegral!T)
899 						assert(false, "CONGESTION value type must be integral, not " ~ T.stringof);
900 					else {
901 						m_status.code = Status.NOT_IMPLEMENTED;
902 						return false;
903 					}
904 				case TCPOption.CORK:
905 					static if (!isIntegral!T)
906 						assert(false, "CORK value type must be int, not " ~ T.stringof);
907 					else {
908 						m_status.code = Status.NOT_IMPLEMENTED;
909 						return false;
910 					}
911 				case TCPOption.DEFER_ACCEPT: // seconds
912 					static if (!isIntegral!T)
913 						assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof);
914 					else {
915 						int val = value.to!int;
916 						socklen_t len = val.sizeof;
917 						err = setsockopt(fd, SOL_SOCKET, SO_CONDITIONAL_ACCEPT, &val, len);
918 						return errorHandler();
919 					}
920 			}
921 
922 		}
923 		catch (Exception e) {
924 			return false;
925 		}
926 
927 	}
928 
929 	uint read(in fd_t fd, ref ubyte[] data)
930 	{
931 		return 0;
932 	}
933 
934 	uint write(in fd_t fd, in ubyte[] data)
935 	{
936 		return 0;
937 	}
938 
939 	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
940 		size_t i;
941 		Array!DWChangeInfo* changes;
942 		try {
943 			changes = &(m_dwHandlers.get(fd, DWHandlerInfo.init).buffer);
944 			if ((*changes).empty)
945 				return 0;
946 
947 			import std.algorithm : min;
948 			size_t cnt = min(dst.length, changes.length);
949 			foreach (DWChangeInfo change; (*changes)[0 .. cnt]) {
950 				static if (LOG) try log("reading change: " ~ change.path); catch (Throwable) {}
951 				dst[i] = (*changes)[i];
952 				i++;
953 			}
954 			changes.linearRemove((*changes)[0 .. cnt]);
955 		}
956 		catch (Exception e) {
957 			setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg);
958 			return 0;
959 		}
960 		static if (LOG) try log("Changes returning with: " ~ i.to!string); catch (Throwable) {}
961 		return cast(uint) i;
962 	}
963 
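	// Opens the directory with FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED and
	// hands the resulting handle to a DWFolderWatcher; the handle value doubles as
	// the watch descriptor returned to the caller.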
964 	uint watch(in fd_t fd, in WatchInfo info) {
965 		m_status = StatusInfo.init;
966 		uint wd;
967 		try {
968 			HANDLE hndl = CreateFileW(toUTFz!(const(wchar)*)(info.path.toNativeString()),
969 			                          FILE_LIST_DIRECTORY,
970 			                          FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
971 			                          null,
972 			                          OPEN_EXISTING,
973 			                          FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
974 			                          null);
975 			wd = cast(uint) hndl;
976 			DWHandlerInfo handler = m_dwHandlers.get(fd, DWHandlerInfo.init);
977 			assert(handler !is null);
978 			static if (LOG) log("Watching: " ~ info.path.toNativeString());
979 			(m_dwFolders)[wd] = ThreadMem.alloc!DWFolderWatcher(m_evLoop, fd, hndl, info.path, info.events, handler, info.recursive);
980 		} catch (Exception e) {
981 			setInternalError!"watch"(Status.ERROR, "Could not start watching directory: " ~ e.msg);
982 			return 0;
983 		}
984 		return wd;
985 	}
986 
987 	bool unwatch(in fd_t fd, in fd_t _wd) {
988 		uint wd = cast(uint) _wd;
989 		m_status = StatusInfo.init;
990 		try {
991 			DWFolderWatcher fw = m_dwFolders.get(wd, null);
992 			assert(fw !is null);
993 			m_dwFolders.remove(wd);
994 			fw.close();
995 			ThreadMem.free(fw);
996 		} catch (Exception e) {
997 			setInternalError!"unwatch"(Status.ERROR, "Failed when unwatching directory: " ~ e.msg);
998 			return false;
999 		}
1000 		return true;
1001 	}
1002 
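	// Cross-thread wake-up: the payload pointer is split across WPARAM (low 32 bits)
	// and LPARAM (high 32 bits) and posted to the target loop's message window as
	// WM_USER_SIGNAL (AsyncNotifier) or WM_USER_EVENT (shared AsyncSignal).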
1003 	bool notify(T)(in fd_t fd, in T payload)
1004 		if (is(T == shared AsyncSignal) || is(T == AsyncNotifier))
1005 	{
1006 		m_status = StatusInfo.init;
1007 		import std.conv;
1008 
1009 		auto payloadPtr = cast(ubyte*)payload;
1010 		auto payloadAddr = cast(ulong)payloadPtr;
1011 
1012 		WPARAM wparam = payloadAddr & 0xffffffff;
1013 		LPARAM lparam = cast(uint) (payloadAddr >> 32);
1014 
1015 		BOOL err;
1016 		static if (is(T == AsyncNotifier))
1017 			err = PostMessageA(cast(HWND)fd, WM_USER_SIGNAL, wparam, lparam);
1018 		else
1019 			err = PostMessageA(cast(HWND)fd, WM_USER_EVENT, wparam, lparam);
1020 		static if (LOG) try log("Sending notification to: " ~ (cast(HWND)fd).to!string); catch (Throwable) {}
1021 		if (err == 0)
1022 		{
1023 			m_error = GetLastErrorSafe();
1024 			m_status.code = Status.ERROR;
1025 			m_status.text = "notify";
1026 			static if (LOG) log(m_status);
1027 			return false;
1028 		}
1029 		return true;
1030 	}
1031 
1032 	fd_t run(AsyncSocket ctxt)
1033 	{
1034 		m_status = StatusInfo.init;
1035 
1036 		auto fd = ctxt.preInitializedHandle;
1037 
1038 		if (fd == INVALID_SOCKET) {
1039 			fd = WSASocketW(ctxt.info.domain, ctxt.info.type, ctxt.info.protocol, null, 0, WSA_FLAG_OVERLAPPED);
1040 		}
1041 
1042 		if (catchErrors!"socket"(fd)) {
1043 			.error("Failed to create socket: ", error);
1044 			return INVALID_SOCKET;
1045 		}
1046 
1047 		return fd;
1048 	}
1049 
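	// Tears down an AsyncSocket: flushes any already-completed accept/receive/send
	// requests for it, cancels outstanding overlapped I/O (CancelIo), drops pending
	// ConnectEx/AcceptEx bookkeeping, and finally shuts down and/or closes the handle.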
1050 	bool kill(AsyncSocket ctxt, bool forced = false)
1051 	{
1052 		m_status = StatusInfo.init;
1053 
1054 		auto handle = ctxt.resetHandle();
1055 
1056 		if (ctxt.connectionOriented && ctxt.passive) {
1057 			foreach (request; m_completedSocketAccepts) if (request.socket is ctxt) {
1058 				sockaddr* localAddress, remoteAddress;
1059 				socklen_t localAddressLength, remoteAddressLength;
1060 
1061 				GetAcceptExSockaddrs(request.buffer.ptr,
1062 									 0,
1063 									 cast(DWORD) request.buffer.length / 2,
1064 									 cast(DWORD) request.buffer.length / 2,
1065 									 &localAddress,
1066 									 &localAddressLength,
1067 									 &remoteAddress,
1068 									 &remoteAddressLength);
1069 
1070 				m_completedSocketAccepts.removeFront();
1071 				if (!onAccept(handle, request, remoteAddress)) {
1072 					.warning("Failed to accept incoming connection request while killing listener");
1073 				}
1074 			}
1075 		}
1076 
1077 		if (!ctxt.passive) {
1078 			foreach (request; m_completedSocketReceives) if (request.socket is ctxt) {
1079 				m_completedSocketReceives.removeFront();
1080 				if (request.message) {
1081 					assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
1082 					assumeWontThrow(NetworkMessage.free(request.message));
1083 				} else {
1084 					assumeWontThrow(request.onComplete.get!1)();
1085 				}
1086 				assumeWontThrow(AsyncReceiveRequest.free(request));
1087 			}
1088 
1089 			foreach (request; m_completedSocketSends) if (request.socket is ctxt) {
1090 				m_completedSocketSends.removeFront();
1091 				request.onComplete();
1092 				assumeWontThrow(NetworkMessage.free(request.message));
1093 				assumeWontThrow(AsyncSendRequest.free(request));
1094 			}
1095 
1096 			if(!CancelIo(cast(HANDLE) handle)) {
1097 				m_status.code = Status.ABORT;
1098 				m_error = GetLastErrorSafe();
1099 				.error("Failed to cancel outstanding overlapped I/O requests: ", this.error);
1100 				return false;
1101 			}
1102 		}
1103 
1104 		if (ctxt.connectionOriented && ctxt.passive) {
1105 			foreach (overlapped; cast(AsyncOverlapped*[]) m_pendingAccepts.keys) {
1106 				if (overlapped.accept.socket is ctxt) {
1107 					m_pendingAccepts.remove(overlapped);
1108 					assumeWontThrow(AsyncOverlapped.free(overlapped));
1109 				}
1110 			}
1111 		} else if (ctxt.connectionOriented && !ctxt.passive && ctxt in m_pendingConnects) {
1112 			auto overlapped = cast(AsyncOverlapped*) m_pendingConnects[ctxt];
1113 			m_pendingConnects.remove(ctxt);
1114 			assumeWontThrow(AsyncOverlapped.free(overlapped));
1115 		}
1116 
1117 		if (ctxt.connectionOriented && !ctxt.passive) {
1118 			*ctxt.connected = false;
1119 		}
1120 
1121 		INT err;
1122 		if (ctxt.connectionOriented) {
1123 			if (forced) {
1124 				err = shutdown(handle, SD_BOTH);
				closesocket(handle);
1126 			} else {
1127 				err = shutdown(handle, SD_SEND);
1128 			}
1129 			if (catchSocketError!"shutdown"(err)) {
1130 				return false;
1131 			}
1132 		} else {
1133 			closesocket(handle);
1134 		}
1135 
1136 		return true;
1137 	}
1138 
1139 	bool bind(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen)
1140 	{
1141 		import libasync.internals.socket_compat : bind;
1142 
1143 		auto err = bind(ctxt.handle, addr, addrlen);
1144 		if (catchSocketError!"bind"(err)) {
1145 			.error("Failed to bind socket: ", error);
1146 			return false;
1147 		}
1148 
1149 		return true;
1150 	}
1151 
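	// For connectionless sockets, connect() only sets the default remote address.
	// Connection-oriented sockets go through ConnectEx, which requires a bound
	// socket, so an unbound socket is first bound to the wildcard address; the
	// pending-connect event in m_waitObjects[0] signals completion to loop().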
1152 	bool connect(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen)
1153 	{
1154 		m_status = StatusInfo.init;
1155 
1156 		// Connectionless sockets can be connected immediately,
1157 		// as this only sets the default remote address.
1158 		if (!ctxt.connectionOriented) {
1159 			import libasync.internals.socket_compat : connect;
1160 
1161 			auto err = connect(ctxt.handle, addr, addrlen);
1162 			if (catchSocketError!"connect"(err)) {
1163 				.error("Failed to connect socket: ", error);
1164 				return false;
1165 			}
1166 			return true;
1167 		}
1168 
1169 		// ConnectEx requires a bound connection-oriented socket.
1170 		try ctxt.localAddress; catch (SocketOSException) {
1171 			NetworkAddress local;
1172 			switch (ctxt.info.domain) {
1173 				case AF_INET:
1174 					local.addr_ip4.sin_family = AF_INET;
1175 					local.addr_ip4.sin_addr.s_addr = INADDR_ANY;
1176 					local.addr_ip4.sin_port = 0;
1177 					break;
1178 				case AF_INET6:
1179 					local.addr_ip6.sin6_family = AF_INET6;
1180 					local.addr_ip6.sin6_addr = IN6ADDR_ANY;
1181 					local.addr_ip6.sin6_port = 0;
1182 					break;
1183 				default:
1184 					assert(false, "Unsupported address family");
1185 			}
1186 
1187 			if (!bind(ctxt, local.sockAddr, local.sockAddrLen)) {
1188 				return false;
1189 			}
1190 		} catch (Exception e) assert(false);
1191 
1192 		auto overlapped = assumeWontThrow(AsyncOverlapped.alloc());
1193 		overlapped.hEvent = pendingConnectEvent;
1194 		if (ConnectEx(ctxt.handle, addr, addrlen, null, 0, null, &overlapped.overlapped)) {
1195 			assumeWontThrow(AsyncOverlapped.free(overlapped));
1196 			if (updateConnectContext(ctxt.handle)) {
1197 				ctxt.handleConnect();
1198 				return true;
1199 			} else {
1200 				ctxt.kill();
1201 				ctxt.handleError();
1202 				return false;
1203 			}
1204 		} else {
1205 			m_error = WSAGetLastErrorSafe();
1206 			if (m_error == WSA_IO_PENDING) {
1207 				m_pendingConnects[ctxt] = overlapped;
1208 				return true;
1209 			} else {
1210 				m_status.code = Status.ABORT;
1211 				ctxt.kill();
1212 				ctxt.handleError();
1213 				return false;
1214 			}
1215 		}
1216 	}
1217 
1218 	auto updateAcceptContext(fd_t listener, fd_t socket)
1219 	{
1220 		auto err = setsockopt(socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, &listener, listener.sizeof);
1221 		if (catchSocketError!"accept"(err)) {
1222 			.error("Failed to setup accepted socket: ", error);
1223 			return false;
1224 		}
1225 		else return true;
1226 	}
1227 
1228 	auto updateConnectContext(fd_t socket)
1229 	{
1230 		auto err = setsockopt(socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, null, 0);
1231 		if (catchSocketError!"connect"(err)) {
1232 			.error("Failed to setup connected socket: ", error);
1233 			return false;
1234 		}
1235 		else return true;
1236 	}
1237 
1238 	/+
1239 	bool setupConnectedCOASocket(AsyncSocket ctxt, AsyncSocket incomingOn = null)
1240 	{
1241 		fd_t err;
1242 
1243 		*ctxt.connected = true;
1244 
1245 		if (incomingOn) {
1246 			auto listenerHandle = incomingOn.handle;
1247 			err = setsockopt(ctxt.handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, &listenerHandle, listenerHandle.sizeof);
1248 			if (catchSocketError!"connect"(err)) {
1249 				.error("Failed to setup connected socket: ", error);
1250 				ctxt.handleError();
1251 				return false;
1252 			}
1253 		} else {
1254 			err = setsockopt(ctxt.handle, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, null, 0);
1255 			if (catchSocketError!"connect"(err)) {
1256 				.error("Failed to setup connected socket: ", error);
1257 				ctxt.handleError();
1258 				return false;
1259 			}
1260 		}
1261 
1262 		return true;
1263 	}
1264 	+/
1265 
1266 	bool listen(AsyncSocket ctxt, int backlog)
1267 	{
1268 		import libasync.internals.socket_compat : listen;
1269 
1270 		auto err = listen(ctxt.handle, backlog);
1271 		if (catchSocketError!"listen"(err)) {
1272 			.error("Failed to listen on socket: ", error);
1273 			return false;
1274 		}
1275 		return true;
1276 	}
1277 
1278 	bool onAccept(fd_t listener, AsyncAcceptRequest* request, sockaddr* remoteAddress)
1279 	{
1280 		auto socket = request.socket;
1281 		scope (exit) assumeWontThrow(AsyncAcceptRequest.free(request));
1282 
1283 		if (!updateAcceptContext(listener, request.peer)) {
1284 			if (socket.alive) {
1285 				m_status.code = Status.ABORT;
1286 				socket.kill();
1287 				socket.handleError();
1288 			}
1289 			return false;
1290 		}
1291 
1292 		auto peer = request.onComplete(request.peer, remoteAddress.sa_family, socket.info.type, socket.info.protocol);
1293 		if (peer.run()) {
1294 			peer.handleConnect();
1295 			return true;
1296 		} else {
1297 			peer.kill();
1298 			peer.handleError();
1299 			return false;
1300 		}
1301 	}
1302 
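	// Queues an AcceptEx on the listener: the peer socket is created up front (as
	// AcceptEx requires), and completion is signaled either immediately or through
	// the pending-accept event in m_waitObjects[1]. WSAECONNRESET (peer aborted
	// before the accept completed) causes an immediate resubmission, see the
	// comment further below.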
1303 	void submitRequest(AsyncAcceptRequest* request)
1304 	{
1305 		auto overlapped = assumeWontThrow(AsyncOverlapped.alloc());
1306 		overlapped.accept = request;
1307 		overlapped.hEvent = pendingAcceptEvent;
1308 
1309 		auto socket = request.socket;
1310 
1311 		request.peer = WSASocketW(request.socket.info.domain,
1312 								  request.socket.info.type,
1313 								  request.socket.info.protocol,
1314 								  null, 0, WSA_FLAG_OVERLAPPED);
1315 
		if (request.peer == INVALID_SOCKET) {
1317 			m_error = WSAGetLastErrorSafe();
1318 
1319 			assumeWontThrow(AsyncOverlapped.free(overlapped));
1320 			assumeWontThrow(AsyncAcceptRequest.free(request));
1321 
1322 			.errorf("Failed to create peer socket with WSASocket: %s", error);
1323 			m_status.code = Status.ABORT;
1324 			socket.kill();
1325 			socket.handleError();
1326 			return;
1327 		}
1328 
1329 		DWORD bytesReceived;
1330 	retry:
1331 		if (AcceptEx(socket.handle,
1332 		             request.peer,
1333 		             request.buffer.ptr,
1334 		             0,
1335 		             cast(DWORD) request.buffer.length / 2,
1336 		             cast(DWORD) request.buffer.length / 2,
1337 		             &bytesReceived,
1338 		             &overlapped.overlapped)) {
1339 			assumeWontThrow(AsyncOverlapped.free(overlapped));
1340 			m_completedSocketAccepts.insertBack(request);
1341 			return;
1342 		} else {
1343 			m_error = WSAGetLastErrorSafe();
1344 			if (m_error == WSA_IO_PENDING) {
1345 				m_pendingAccepts[overlapped] = true;
1346 				return;
1347 			// AcceptEx documentation states this error happens if "an incoming connection was indicated,
1348 			// but was subsequently terminated by the remote peer prior to accepting the call".
1349 			// This means there is no pending accept and we have to call AcceptEx again; this,
1350 			// however, is a potential avenue for a denial-of-service attack, in which clients start
1351 			// a connection to us but immediately terminate it, resulting in a (theoretically) infinite
1352 			// loop here. The alternative to continuous resubmitting is closing the socket
			// (either immediately, or after a finite number of tries to resubmit); that, however, also opens up
			// a denial-of-service attack vector (a finite number of such malicious connection attempts
			// can bring down any of our listening sockets). Of the two, the latter is a lot easier to exploit,
			// so for now we go with the first option of continuous resubmission.
			// TODO: Try to think of a better way to handle this.
1358 			} else if (m_error == WSAECONNRESET) {
1359 				goto retry;
1360 			} else {
1361 				m_status.code = Status.ABORT;
1362 				assumeWontThrow(AsyncOverlapped.free(overlapped));
1363 				assumeWontThrow(AsyncAcceptRequest.free(request));
1364 				socket.kill();
1365 				socket.handleError();
1366 			}
1367 		}
1368 	}
1369 
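	// Queues an overlapped receive. A request without a message becomes a zero-byte
	// WSARecv (used to wait for readability); messages carrying a name use
	// WSARecvFrom. Completion is delivered through onOverlappedReceiveComplete as an
	// APC during the alertable wait in loop().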
1370 	void submitRequest(AsyncReceiveRequest* request)
1371 	{
1372 		auto overlapped = assumeWontThrow(AsyncOverlapped.alloc());
1373 		overlapped.receive = request;
1374 		auto socket = request.socket;
1375 
1376 		int err = void;
1377 		if (!request.message) {
1378 			.tracef("WSARecv on FD %s with zero byte buffer", socket.handle);
1379 			WSABUF buffer;
1380 			DWORD flags;
1381 			err = WSARecv(socket.handle,
1382 			              &buffer,
1383 			              1,
1384 			              null,
1385 			              &flags,
1386 			              cast(const(WSAOVERLAPPEDX*)) overlapped,
1387 			              cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedReceiveComplete);
1388 		} else if (request.message.name) {
1389 			.tracef("WSARecvFrom on FD %s with buffer size %s",
1390 			        socket.handle, request.message.header.msg_iov.len);
1391 			err = WSARecvFrom(socket.handle,
1392 			                  request.message.buffers,
1393 			                  cast(DWORD) request.message.bufferCount,
1394 			                  null,
1395 			                  &request.message.header.msg_flags,
1396 			                  request.message.name,
1397 			                  &request.message.header.msg_namelen,
1398 			                  cast(const(WSAOVERLAPPEDX*)) overlapped,
1399 			                  cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedReceiveComplete);
1400 		} else {
1401 			.tracef("WSARecv on FD %s with buffer size %s",
1402 			        socket.handle, request.message.header.msg_iov.len);
1403 			err = WSARecv(socket.handle,
1404 			              request.message.buffers,
1405 			              cast(DWORD) request.message.bufferCount,
1406 			              null,
1407 			              &request.message.header.msg_flags,
1408 			              cast(const(WSAOVERLAPPEDX*)) overlapped,
1409 			              cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedReceiveComplete);
1410 		}
1411 		if (err == SOCKET_ERROR) {
1412 			m_error = WSAGetLastErrorSafe();
1413 			if (m_error == WSA_IO_PENDING) return;
1414 
1415 			assumeWontThrow(AsyncOverlapped.free(overlapped));
1416 			if (request.message) assumeWontThrow(NetworkMessage.free(request.message));
1417 			assumeWontThrow(AsyncReceiveRequest.free(request));
1418 
1419 			// TODO: Possibly deal with WSAEWOULDBLOCK, which supposedly signals
1420 			//       too many pending overlapped I/O requests.
1421 			if (m_error == WSAECONNRESET ||
1422 			    m_error == WSAECONNABORTED ||
1423 			    m_error == WSAENOTSOCK) {
1424 				socket.handleClose();
1425 
1426 				*socket.connected = false;
1427 
1428 				closesocket(socket.handle);
1429 				return;
1430 			}
1431 
1432 			.errorf("WSARecv* on FD %d encountered socket error: %s", socket.handle, this.error);
1433 			m_status.code = Status.ABORT;
1434 			socket.kill();
1435 			socket.handleError();
1436 		}
1437 	}
1438 
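	// Queues an overlapped send, using WSASendTo when a destination address is set
	// and WSASend otherwise; completion is delivered via onOverlappedSendComplete
	// as an APC during the alertable wait in loop().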
1439 	void submitRequest(AsyncSendRequest* request)
1440 	{
1441 		auto overlapped = assumeWontThrow(AsyncOverlapped.alloc());
1442 		overlapped.send = request;
1443 		auto socket = request.socket;
1444 
1445 		int err = void;
1446 		if (request.message.name) {
1447 			.tracef("WSASendTo on FD %s for %s with buffer size %s",
1448 			        socket.handle,
1449 			        NetworkAddress(request.message.name, request.message.header.msg_namelen),
1450 			        request.message.header.msg_iov.len);
1451 			err = WSASendTo(socket.handle,
1452 		                    request.message.buffers,
1453 		                    cast(DWORD) request.message.bufferCount,
1454 		                    null,
1455 		                    request.message.header.msg_flags,
1456 		                    request.message.name,
1457 		                    request.message.nameLength,
1458 		                    cast(const(WSAOVERLAPPEDX*)) overlapped,
1459 		                    cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedSendComplete);
1460 		} else {
1461 			.tracef("WSASend on FD %s with buffer size %s", socket.handle, request.message.header.msg_iov.len);
1462 			err = WSASend(socket.handle,
1463 		                    request.message.buffers,
1464 		                    cast(DWORD) request.message.bufferCount,
1465 		                    null,
1466 		                    request.message.header.msg_flags,
1467 		                    cast(const(WSAOVERLAPPEDX*)) overlapped,
1468 		                    cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedSendComplete);
1469 		}
1470 
1471 		if (err == SOCKET_ERROR) {
1472 			m_error = WSAGetLastErrorSafe();
1473 			if (m_error == WSA_IO_PENDING) return;
1474 
1475 			assumeWontThrow(AsyncOverlapped.free(overlapped));
1476 			assumeWontThrow(NetworkMessage.free(request.message));
1477 			assumeWontThrow(AsyncSendRequest.free(request));
1478 
1479 			// TODO: Possibly deal with WSAEWOULDBLOCK, which supposedly signals
1480 			//       too many pending overlapped I/O requests.
1481 			if (m_error == WSAECONNRESET ||
1482 			    m_error == WSAECONNABORTED ||
1483 			    m_error == WSAENOTSOCK) {
1484 				socket.handleClose();
1485 
1486 				*socket.connected = false;
1487 
1488 				closesocket(socket.handle);
1489 				return;
1490 			}
1491 
1492 			.errorf("WSASend* on FD %d encountered socket error: %s", socket.handle, this.error);
1493 			m_status.code = Status.ABORT;
1494 			socket.kill();
1495 			socket.handleError();
1496 		}
1497 	}
1498 
1499 	pragma(inline, true)
1500 	uint recv(in fd_t fd, void[] data)
1501 	{
1502 		m_status = StatusInfo.init;
1503 		int ret = .recv(fd, cast(void*) data.ptr, cast(INT) data.length, 0);
1504 
1505 		//static if (LOG) try log("RECV " ~ ret.to!string ~ "B FD#" ~ fd.to!string); catch (Throwable) {}
1506 		if (catchSocketError!".recv"(ret)) { // ret == -1
1507 			if (m_error == error_t.WSAEWOULDBLOCK)
1508 				m_status.code = Status.ASYNC;
1509 			else if (m_error == error_t.WSAEINTR)
1510 				m_status.code = Status.RETRY;
1511 			return 0; // TODO: handle some errors more specifically
1512 		}
1513 
1514 		return cast(uint) ret;
1515 	}
1516 
1517 	pragma(inline, true)
1518 	uint send(in fd_t fd, in void[] data)
1519 	{
1520 		m_status = StatusInfo.init;
1521 		static if (LOG) try log("SEND " ~ data.length.to!string ~ "B FD#" ~ fd.to!string);
1522 		catch (Throwable) {}
1523 		int ret = .send(fd, cast(const(void)*) data.ptr, cast(INT) data.length, 0);
1524 
1525 		if (catchSocketError!"send"(ret)) {
1526 			if (m_error == error_t.WSAEWOULDBLOCK)
1527 				m_status.code = Status.ASYNC;
			else if (m_error == error_t.WSAEINTR)
1529 				m_status.code = Status.RETRY;
1530 			return 0; // TODO: handle some errors more specifically
1531 		}
1532 		return cast(uint) ret;
1533 	}
1534 
1535 	bool broadcast(in fd_t fd, bool b) {
1536 	
1537 		int val = b?1:0;
1538 		socklen_t len = val.sizeof;
1539 		int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len);
1540 		if (catchSocketError!"setsockopt"(err))
1541 			return false;
1542 	
1543 		return true;
1544 
1545 	}
1546 
1547 	uint recvFrom(in fd_t fd, void[] data, ref NetworkAddress addr)
1548 	{
1549 		m_status = StatusInfo.init;
1550 
1551 		addr.family = AF_INET6;
1552 		socklen_t addrLen = addr.sockAddrLen;
1553 		int ret = .recvfrom(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, &addrLen);
1554 
1555 		if (addrLen < addr.sockAddrLen) {
1556 			addr.family = AF_INET;
1557 		}
1558 
1559 		static if (LOG) try log("RECVFROM " ~ ret.to!string ~ "B"); catch (Throwable) {}
1560 		if (catchSocketError!".recvfrom"(ret)) { // ret == -1
1561 			if (m_error == WSAEWOULDBLOCK)
1562 				m_status.code = Status.ASYNC;
1563 			else if (m_error == WSAEINTR)
1564 				m_status.code = Status.RETRY;
1565 			return 0; // TODO: handle some errors more specifically
1566 		}
1567 		m_status.code = Status.OK;
1568 
1569 		return cast(uint) ret;
1570 	}
1571 
1572 	uint sendTo(in fd_t fd, in void[] data, in NetworkAddress addr)
1573 	{
1574 		m_status = StatusInfo.init;
1575 		static if (LOG) try log("SENDTO " ~ data.length.to!string ~ "B " ~ addr.toString()); catch (Throwable) {}
1576 		int ret;
1577 		if (addr != NetworkAddress.init)
1578 			ret = .sendto(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, addr.sockAddrLen);
1579 		else
1580 			ret = .send(fd, cast(void*) data.ptr, cast(INT) data.length, 0);
1581 
1582 		if (catchSocketError!".sendTo"(ret)) { // ret == -1
1583 			if (m_error == WSAEWOULDBLOCK)
1584 				m_status.code = Status.ASYNC;
1585 			else if (m_error == WSAEINTR)
1586 				m_status.code = Status.RETRY;
1587 			return 0; // TODO: handle some errors more specifically
1588 		}
1589 
1590 		m_status.code = Status.OK;
1591 		return cast(uint) ret;
1592 	}
1593 
1594 	NetworkAddress localAddr(in fd_t fd, bool ipv6) {
1595 		NetworkAddress ret;
1596 		import libasync.internals.win32 : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr;
1597 		if (ipv6)
1598 			ret.family = AF_INET6;
1599 		else
1600 			ret.family = AF_INET;
1601 		socklen_t len = ret.sockAddrLen;
1602 		int err = getsockname(fd, ret.sockAddr, &len);
1603 		if (catchSocketError!"getsockname"(err))
1604 			return NetworkAddress.init;
1605 		if (len > ret.sockAddrLen)
1606 			ret.family = AF_INET6;
1607 		return ret;
1608 	}
1609 
1610 	void noDelay(in fd_t fd, bool b) {
1611 		m_status = StatusInfo.init;
		BOOL val = b ? 1 : 0;
		socklen_t len = val.sizeof;
		setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len);
1613 	}
1614 
1615 	private bool closeRemoteSocket(fd_t fd, bool forced) {
1616 
1617 		INT err;
1618 
1619 		static if (LOG) try log("Shutdown FD#" ~ fd.to!string);
1620 		catch (Throwable) {}
1621 		if (forced) {
1622 			err = shutdown(fd, SD_BOTH);
1623 			closesocket(fd);
1624 		}
1625 		else
1626 			err = shutdown(fd, SD_SEND);
1627 
1628 		try {
1629 			TCPEventHandler* evh = fd in m_tcpHandlers;
1630 			if (evh) {
1631 				if (evh.conn.inbound) {
1632 					try ThreadMem.free(evh.conn);
1633 					catch(Exception e) { assert(false, "Failed to free resources"); }
1634 				}
1635 
1636 				evh.conn = null;
1637 				//static if (LOG) log("Remove event handler for " ~ fd.to!string);
1638 				m_tcpHandlers.remove(fd);
1639 			}
1640 		}
1641 		catch (Exception e) {
1642 			setInternalError!"m_tcpHandlers.remove"(Status.ERROR);
1643 			return false;
1644 		}
1645 		if (catchSocketError!"shutdown"(err))
1646 			return false;
1647 		return true;
1648 	}
1649 
1650 	// for connected sockets
1651 	bool closeSocket(fd_t fd, bool connected, bool forced = false)
1652 	{
1653 		m_status = StatusInfo.init;
1654 		if (!connected && forced) {
1655 			try {
1656 				if (fd in m_connHandlers) {
1657 					static if (LOG) log("Removing connection handler for: " ~ fd.to!string);
1658 					m_connHandlers.remove(fd);
1659 				}
1660 			}
1661 			catch (Exception e) {
1662 				setInternalError!"m_connHandlers.remove"(Status.ERROR);
1663 				return false;
1664 			}
1665 		}
1666 		else if (connected)
1667 			closeRemoteSocket(fd, forced);
1668 
1669 		if (!connected || forced) {
1670 			// todo: flush the socket here?
1671 
1672 			INT err = closesocket(fd);
1673 			if (catchSocketError!"closesocket"(err))
1674 				return false;
1675 
1676 		}
1677 		return true;
1678 	}
1679 
1680 	bool closeConnection(fd_t fd) {
1681 		return closeSocket(fd, true);
1682 	}
1683 
1684 	NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true)
1685 	{
1686 		m_status = StatusInfo.init;
1687 
1688 		NetworkAddress addr;
1689 		WSAPROTOCOL_INFOW hints;
1690 		import std.conv : to;
1691 		if (ipv6) {
1692 			addr.family = AF_INET6;
1693 		}
1694 		else {
1695 			addr.family = AF_INET;
1696 		}
1697 
1698 		INT addrlen = addr.sockAddrLen;
1699 
1700 		LPWSTR str;
1701 		try {
1702 			str = cast(LPWSTR) toUTFz!(wchar*)(ipAddr);
1703 		} catch (Exception e) {
			setInternalError!"toUTFz"(Status.ERROR, e.msg);
1705 			return NetworkAddress.init;
1706 		}
1707 
1708 		INT err = WSAStringToAddressW(str, cast(INT) addr.family, null, addr.sockAddr, &addrlen);
1709 		if (port != 0) addr.port = port;
1710 		static if (LOG) try log(addr.toString());
1711 		catch (Throwable) {}
1712 		if( catchSocketError!"getAddressFromIP"(err) )
1713 			return NetworkAddress.init;
1714 		else assert(addrlen == addr.sockAddrLen);
1715 		return addr;
1716 	}
1717 
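	// Resolves a host name synchronously with GetAddrInfoW, constraining the hints
	// to the requested address family and protocol, and copies the first result
	// into a NetworkAddress.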
1718 	NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true, in bool force = true)
1719 		/*in {
1720 		debug import libasync.internals.validator : validateHost;
1721 		debug assert(validateHost(host), "Trying to connect to an invalid domain");
1722 	}
1723 	body */{
1724 		m_status = StatusInfo.init;
1725 		import std.conv : to;
1726 		NetworkAddress addr;
1727 		ADDRINFOW hints;
1728 		ADDRINFOW* infos;
1729 		LPCWSTR wPort = port.to!(wchar[]).toUTFz!(const(wchar)*);
1730 		if (ipv6) {
1731 			hints.ai_family = AF_INET6;
1732 			addr.family = AF_INET6;
1733 		}
1734 		else {
1735 			hints.ai_family = AF_INET;
1736 			addr.family = AF_INET;
1737 		}
1738 
1739 		if (tcp) {
1740 			hints.ai_protocol = IPPROTO_TCP;
1741 			hints.ai_socktype = SOCK_STREAM;
1742 		}
1743 		else {
1744 			hints.ai_protocol = IPPROTO_UDP;
1745 			hints.ai_socktype = SOCK_DGRAM;
1746 		}
1747 		if (port != 0) addr.port = port;
1748 
1749 		LPCWSTR str;
1750 
1751 		try {
1752 			str = cast(LPCWSTR) toUTFz!(immutable(wchar)*)(host);
1753 		} catch (Exception e) {
1754 			setInternalError!"toUTFz"(Status.ERROR, e.msg);
1755 			return NetworkAddress.init;
1756 		}
1757 
1758 		error_t err = cast(error_t) GetAddrInfoW(str, cast(LPCWSTR) wPort, &hints, &infos);
1759 		scope(exit) FreeAddrInfoW(infos);
1760 		if (err != EWIN.WSA_OK) {
1761 			setInternalError!"GetAddrInfoW"(Status.ABORT, string.init, err);
1762 			return NetworkAddress.init;
1763 		}
1764 
1765 		ubyte* pAddr = cast(ubyte*) infos.ai_addr;
1766 		ubyte* data = cast(ubyte*) addr.sockAddr;
1767 		data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy
1768 		static if (LOG) try log("GetAddrInfoW Successfully resolved DNS to: " ~ addr.toAddressString());
1769 		catch (Exception e){}
1770 		return addr;
1771 	}
1772 
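	/// Records an internal failure: TRACE (plus optional details) goes into
	/// m_status.text, the Status code into m_status.code and the Windows error
	/// code into m_error, which callers can read back through the status and
	/// error properties.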
1773 	pragma(inline, true)
1774 	void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EWIN.ERROR_ACCESS_DENIED)
1775 	{
1776 		if (details.length > 0)
1777 			m_status.text = TRACE ~ ": " ~ details;
1778 		else
1779 			m_status.text = TRACE;
1780 		m_error = error;
1781 		m_status.code = s;
1782 		static if(LOG) log(m_status);
1783 	}
1784 private:
1785 	bool onMessage(MSG msg)
1786 	{
1787 		m_status = StatusInfo.init;
1788 		switch (msg.message) {
1789 			case WM_TCP_SOCKET:
1790 				auto evt = LOWORD(msg.lParam);
1791 				auto err = HIWORD(msg.lParam);
1792 				if (!onTCPEvent(evt, err, cast(fd_t)msg.wParam)) {
1793 
1794 					if (evt == FD_ACCEPT)
1795 						setInternalError!"del@TCPAccept.ERROR"(Status.ERROR);
1796 					else {
1797 						try {
1798 							TCPEventHandler cb = m_tcpHandlers.get(cast(fd_t)msg.wParam);
1799 							cb(TCPEvent.ERROR);
1800 						}
1801 						catch (Exception e) {
1802 							// An Error callback should never fail...
1803 							setInternalError!"del@TCPEvent.ERROR"(Status.ERROR);
1804 							// assert(false, evt.to!string ~ " & " ~ m_status.to!string ~ " & " ~ m_error.to!string);
1805 						}
1806 					}
1807 				}
1808 				break;
1809 			case WM_UDP_SOCKET:
1810 				auto evt = LOWORD(msg.lParam);
1811 				auto err = HIWORD(msg.lParam);
1812 				if (!onUDPEvent(evt, err, cast(fd_t)msg.wParam)) {
1813 					try {
1814 						UDPHandler cb = m_udpHandlers.get(cast(fd_t)msg.wParam);
1815 						cb(UDPEvent.ERROR);
1816 					}
1817 					catch (Exception e) {
1818 						// An Error callback should never fail...
1819 						setInternalError!"del@UDPEvent.ERROR"(Status.ERROR);
1820 					}
1821 				}
1822 				break;
1823 			case WM_TIMER:
1824 				static if (LOG) try log("Timer callback: " ~ m_timer.fd.to!string); catch (Throwable) {}
1825 				TimerHandler cb;
1826 				bool cached = (m_timer.fd == cast(fd_t)msg.wParam);
1827 				try {
1828 					if (cached)
1829 						cb = m_timer.cb;
1830 					else
1831 						cb = m_timerHandlers.get(cast(fd_t)msg.wParam);
1832 					cb.ctxt.rearmed = false;
1833 					cb();
1834 
1835 					if (cb.ctxt.oneShot && !cb.ctxt.rearmed)
1836 						kill(cb.ctxt);
1837 
1838 				}
1839 				catch (Exception e) {
1840 					// An Error callback should never fail...
1841 					setInternalError!"del@TimerHandler"(Status.ERROR, e.msg);
1842 				}
1843 
1844 				break;
1845 			case WM_USER_EVENT:
1846 				static if (LOG) log("User event");
1847 
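				// The poster split the AsyncSignal pointer across the message's
				// wParam (low 32 bits) and lParam (high 32 bits); rebuild it here.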
1848 				ulong uwParam = cast(ulong)msg.wParam;
1849 				ulong ulParam = cast(ulong)msg.lParam;
1850 
1851 				ulong payloadAddr = (ulParam << 32) | uwParam;
1852 				void* payloadPtr = cast(void*) payloadAddr;
1853 				shared AsyncSignal ctxt = cast(shared AsyncSignal) payloadPtr;
1854 
1855 				static if (LOG) try log("Got notification in : " ~ m_hwnd.to!string ~ " pointer: " ~ payloadPtr.to!string); catch (Throwable) {}
1856 				try {
1857 					assert(ctxt.id != 0);
1858 					ctxt.handler();
1859 				}
1860 				catch (Exception e) {
1861 					setInternalError!"WM_USER_EVENT@handler"(Status.ERROR);
1862 				}
1863 				break;
1864 			case WM_USER_SIGNAL:
1865 				static if (LOG) log("User signal");
1866 
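				// Same wParam/lParam pointer split as WM_USER_EVENT, except the
				// payload is an AsyncNotifier.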
1867 				ulong uwParam = cast(ulong)msg.wParam;
1868 				ulong ulParam = cast(ulong)msg.lParam;
1869 
1870 				ulong payloadAddr = (ulParam << 32) | uwParam;
1871 				void* payloadPtr = cast(void*) payloadAddr;
1872 				AsyncNotifier ctxt = cast(AsyncNotifier) payloadPtr;
1873 
1874 				try {
1875 					ctxt.handler();
1876 				}
1877 				catch (Exception e) {
1878 					setInternalError!"WM_USER_SIGNAL@handler"(Status.ERROR);
1879 				}
1880 				break;
1881 			default: return false; // not handled, sends to wndProc
1882 		}
1883 		return true;
1884 	}
1885 
1886 	bool onUDPEvent(WORD evt, WORD err, fd_t sock) {
1887 		m_status = StatusInfo.init;
		try {
			if (m_udpHandlers.get(sock) == UDPHandler.init)
				return false;
		} catch (Throwable) {}
1892 		if (sock == 0) { // highly unlikely...
1893 			setInternalError!"onUDPEvent"(Status.ERROR, "no socket defined");
1894 			return false;
1895 		}
1896 		if (err) {
1897 			setInternalError!"onUDPEvent"(Status.ERROR, string.init, cast(error_t)err);
1898 			try {
1899 				//log("CLOSE FD#" ~ sock.to!string);
1900 				(m_udpHandlers)[sock](UDPEvent.ERROR);
1901 			} catch (Throwable) { // can't do anything about this...
1902 			}
1903 			return false;
1904 		}
1905 
1906 		UDPHandler cb;
1907 		switch(evt) {
1908 			default: break;
1909 			case FD_READ:
1910 				try {
1911 					static if (LOG) log("READ FD#" ~ sock.to!string);
1912 					cb = m_udpHandlers.get(sock);
1913 					assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1914 					cb(UDPEvent.READ);
1915 				}
1916 				catch (Exception e) {
					setInternalError!"del@UDPEvent.READ"(Status.ABORT);
1918 					return false;
1919 				}
1920 				break;
1921 			case FD_WRITE:
1922 				try {
1923 					static if (LOG) log("WRITE FD#" ~ sock.to!string);
1924 					cb = m_udpHandlers.get(sock);
1925 					assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1926 					cb(UDPEvent.WRITE);
1927 				}
1928 				catch (Exception e) {
					setInternalError!"del@UDPEvent.WRITE"(Status.ABORT);
1930 					return false;
1931 				}
1932 				break;
1933 		}
1934 		return true;
1935 	}
1936 
1937 	bool onTCPEvent(WORD evt, WORD err, fd_t sock) {
1938 		m_status = StatusInfo.init;
		try {
			if (m_tcpHandlers.get(sock) == TCPEventHandler.init && m_connHandlers.get(sock) == TCPAcceptHandler.init)
				return false;
		} catch (Throwable) {}
1943 		if (sock == 0) { // highly unlikely...
1944 			setInternalError!"onTCPEvent"(Status.ERROR, "no socket defined");
1945 			return false;
1946 		}
1947 		if (err) {
1948 			setInternalError!"onTCPEvent"(Status.ERROR, string.init, cast(error_t)err);
1949 			try {
1950 				//log("CLOSE FD#" ~ sock.to!string);
1951 				(m_tcpHandlers)[sock](TCPEvent.ERROR);
1952 			} catch (Throwable) { // can't do anything about this...
1953 			}
1954 			return false;
1955 		}
1956 
1957 		TCPEventHandler cb;
1958 		switch(evt) {
1959 			default: break;
1960 			case FD_ACCEPT:
1961 				version(Distributed) gs_mtx.lock_nothrow();
1962 
1963 				static if (LOG) log("TCP Handlers: " ~ m_tcpHandlers.length.to!string);
1964 				static if (LOG) log("Accepting connection");
1965 				/// Let another listener take the next connection
1966 				TCPAcceptHandler list;
1967 				try list = m_connHandlers[sock]; catch (Throwable) { assert(false, "Listening on an invalid socket..."); }
1968 				scope(exit) {
1969 					/// The connection rotation mechanism is handled by the TCPListenerDistMixins
1970 					/// when registering the same AsyncTCPListener object on multiple event loops.
					/// This helps even out the CPU usage on a server instance.
1972 					version(Distributed)
1973 					{
1974 						HWND hwnd = list.ctxt.next(m_hwnd);
1975 						if (hwnd !is HWND.init) {
1976 							int error = WSAAsyncSelect(sock, hwnd, WM_TCP_SOCKET, FD_ACCEPT);
1977 							if (catchSocketError!"WSAAsyncSelect.NEXT()=> HWND"(error)) {
1978 								error = WSAAsyncSelect(sock, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT);
1979 								if (catchSocketError!"WSAAsyncSelect"(error))
1980 									assert(false, "Could not set listener back to window HANDLE " ~ m_hwnd.to!string);
1981 							}
1982 						}
1983 						else static if (LOG) log("Returned init!!");
1984 						gs_mtx.unlock_nothrow();
1985 					}
1986 				}
1987 
1988 				NetworkAddress addr;
1989 				addr.family = AF_INET;
1990 				int addrlen = addr.sockAddrLen;
1991 				fd_t csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);
1992 
1993 				if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET)) {
1994 					if (m_error == WSAEFAULT) { // not enough space for sockaddr
1995 						addr.family = AF_INET6;
1996 						addrlen = addr.sockAddrLen;
1997 						csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);
1998 						if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET))
1999 							return false;
2000 					}
2001 					else return false;
2002 				}
2003 
2004 				int ok = WSAAsyncSelect(csock, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE);
2005 				if ( catchSocketError!"WSAAsyncSelect"(ok) )
2006 					return false;
2007 
2008 				static if (LOG) log("Connection accepted: " ~ csock.to!string);
2009 
2010 				AsyncTCPConnection conn;
2011 				try conn = ThreadMem.alloc!AsyncTCPConnection(m_evLoop);
2012 				catch (Exception e) { assert(false, "Failed allocation"); }
2013 				conn.peer = addr;
2014 				conn.socket = csock;
2015 				conn.inbound = true;
2016 
2017 				try {
2018 					// Do the callback to get a handler
2019 					cb = list(conn);
2020 				}
2021 				catch(Exception e) {
2022 					setInternalError!"onConnected"(Status.EVLOOP_FAILURE);
2023 					return false;
2024 				}
2025 
2026 				try {
					m_tcpHandlers[csock] = cb; // keep the handler to set up the connection
2028 					static if (LOG) log("ACCEPT&CONNECT FD#" ~ csock.to!string);
2029 					*conn.connected = true;
2030 					cb(TCPEvent.CONNECT);
2031 				}
2032 				catch (Exception e) {
2033 					setInternalError!"m_tcpHandlers.opIndexAssign"(Status.ABORT);
2034 					return false;
2035 				}
2036 				break;
2037 			case FD_CONNECT:
2038 				try {
2039 					static if (LOG) log("CONNECT FD#" ~ sock.to!string);
2040 					cb = m_tcpHandlers.get(sock);
					if (cb == TCPEventHandler.init) break; // no handler registered for this socket
2042 					*cb.conn.connecting = true;
2043 				}
2044 				catch(Exception e) {
2045 					setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
2046 					return false;
2047 				}
2048 				break;
2049 			case FD_READ:
2050 				try {
2051 					static if (LOG) log("READ FD#" ~ sock.to!string);
2052 					cb = m_tcpHandlers.get(sock);
					if (cb == TCPEventHandler.init) break; // no handler registered for this socket
2054 					if (!cb.conn) break;
2055 					if (*cb.conn.connected == false && *cb.conn.connecting) {
2056 						static if (LOG) log("TCPEvent CONNECT FD#" ~ sock.to!string);
2057 
2058 						*cb.conn.connecting = false;
2059 						*cb.conn.connected = true;
2060 						cb(TCPEvent.CONNECT);
2061 					}
2062 					else {
2063 						static if (LOG) log("TCPEvent READ FD#" ~ sock.to!string);
2064 						cb(TCPEvent.READ);
2065 					}
2066 				}
2067 				catch (Exception e) {
2068 					setInternalError!"del@TCPEvent.READ"(Status.ABORT);
2069 					return false;
2070 				}
2071 				break;
2072 			case FD_WRITE:
2073 				// todo: don't send the first write for consistency with epoll?
2074 
2075 				try {
2076 					//import std.stdio;
2077 					static if (LOG) log("WRITE FD#" ~ sock.to!string);
2078 					cb = m_tcpHandlers.get(sock);
					if (cb == TCPEventHandler.init) break; // no handler registered for this socket
2080 					if (!cb.conn) break;
2081 					if (*cb.conn.connected == false && *cb.conn.connecting) {
2082 						*cb.conn.connecting = false;
2083 						*cb.conn.connected = true;
2084 						cb(TCPEvent.CONNECT);
2085 					}
2086 					else {
2087 						cb(TCPEvent.WRITE);
2088 					}
2089 				}
2090 				catch (Exception e) {
2091 					setInternalError!"del@TCPEvent.WRITE"(Status.ABORT);
2092 					return false;
2093 				}
2094 				break;
2095 			case FD_CLOSE:
2096 				// called after shutdown()
2097 				INT ret;
2098 				bool connected = true;
2099 				try {
2100 					static if (LOG) log("CLOSE FD#" ~ sock.to!string);
2101 					if (sock in m_tcpHandlers) {
2102 						cb = m_tcpHandlers.get(sock);
2103 						if (*cb.conn.connected || *cb.conn.connecting) {
2104 							cb(TCPEvent.CLOSE);
2105 							*cb.conn.connecting = false;
2106 							*cb.conn.connected = false;
2107 						} else
2108 							connected = false;
2109 					}
2110 					else
2111 						connected = false;
2112 				}
2113 				catch (Exception e) {
2114 					if (m_status.code == Status.OK)
2115 						setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT);
2116 					return false;
2117 				}
2118 
2119 				closeSocket(sock, connected, true); // as necessary: invokes m_tcpHandlers.remove(fd), shutdown, closesocket
2120 
2121 				break;
2122 		}
2123 		return true;
2124 	}
2125 
2126 	bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt)
2127 	{
2128 		INT err;
2129 		static if (LOG) log("Binding to UDP " ~ ctxt.local.toString());
2130 
2131 		if (!setOption(fd, TCPOption.REUSEADDR, true)) {
2132 			closesocket(fd);
2133 			return false;
2134 		}
2135 
2136 		err = .bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
2137 		if (catchSocketError!"bind"(err)) {
2138 			closesocket(fd);
2139 			return false;
2140 		}
2141 		err = WSAAsyncSelect(fd, m_hwnd, WM_UDP_SOCKET, FD_READ | FD_WRITE);
2142 		if (catchSocketError!"WSAAsyncSelect"(err)) {
2143 			closesocket(fd);
2144 			return false;
2145 		}
2146 
2147 		return true;
2148 	}
2149 
2150 	bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, bool reusing = false)
2151 	in {
2152 		assert(m_threadId == GetCurrentThreadId());
2153 		assert(ctxt.local !is NetworkAddress.init);
2154 	}
2155 	body {
2156 		INT err;
2157 		if (!reusing) {
2158 			err = .bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
2159 			if (catchSocketError!"bind"(err)) {
2160 				closesocket(fd);
2161 				return false;
2162 			}
2163 
2164 			err = .listen(fd, 128);
2165 			if (catchSocketError!"listen"(err)) {
2166 				closesocket(fd);
2167 				return false;
2168 			}
2169 
2170 			err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT);
2171 			if (catchSocketError!"WSAAsyncSelect"(err)) {
2172 				closesocket(fd);
2173 				return false;
2174 			}
2175 		}
2176 
2177 		return true;
2178 	}
2179 
2180 	bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt)
2181 	in {
2182 		assert(ctxt.peer !is NetworkAddress.init);
2183 		assert(ctxt.peer.port != 0, "Connecting to an invalid port");
2184 	}
2185 	body {
2186 		INT err;
2187 		NetworkAddress bind_addr;
2188 		bind_addr.family = ctxt.peer.family;
2189 
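		// Give the socket a local name on the wildcard address of the peer's
		// address family before issuing the asynchronous connect below.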
2190 		if (ctxt.peer.family == AF_INET)
2191 			bind_addr.sockAddrInet4.sin_addr.s_addr = 0;
2192 		else if (ctxt.peer.family == AF_INET6)
2193 			bind_addr.sockAddrInet6.sin6_addr.s6_addr[] = 0;
2194 		else {
			m_status.code = Status.ERROR;
			m_status.text = "Invalid NetworkAddress.family " ~ ctxt.peer.family.to!string;
2197 			return false;
2198 		}
2199 
2200 		err = .bind(fd, bind_addr.sockAddr, bind_addr.sockAddrLen);
2201 		if ( catchSocketError!"bind"(err) )
2202 			return false;
2203 		err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE);
2204 		if ( catchSocketError!"WSAAsyncSelect"(err) )
2205 			return false;
2206 		err = .connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen);
2207 
2208 		auto errors = [	tuple(cast(size_t) SOCKET_ERROR, EWIN.WSAEWOULDBLOCK, Status.ASYNC) ];
2209 
2210 		if (catchSocketErrorsEq!"connectEQ"(err, errors)) {
2211 			*ctxt.connecting = true;
2212 			return true;
2213 		}
2214 		else if (catchSocketError!"connect"(err))
2215 			return false;
2216 
2217 		return true;
2218 	}
2219 
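	/// Error-translation helpers: each compares a raw return value against the
	/// given sentinel(s) and, on a match, records the trace tag together with the
	/// thread's last Win32/WSA error code in m_status/m_error and returns true so
	/// the caller can bail out early.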
2220 	bool catchErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
2221 		if (isIntegral!T)
2222 	{
2223 		foreach (validator ; cmp) {
2224 			if (val == validator[0]) {
2225 				m_status.text = TRACE;
2226 				m_status.code = validator[1];
2227 				if (m_status.code == Status.EVLOOP_TIMEOUT) {
2228 					static if (LOG) log(m_status);
2229 					break;
2230 				}
2231 				m_error = GetLastErrorSafe();
2232 				static if(LOG) log(m_status);
2233 				return true;
2234 			}
2235 		}
2236 		return false;
2237 	}
2238 
2239 	pragma(inline, true)
2240 	bool catchSocketErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
2241 		if (isIntegral!T)
2242 	{
2243 		foreach (validator ; cmp) {
2244 			if (val == validator[0]) {
2245 				m_status.text = TRACE;
2246 				m_error = WSAGetLastErrorSafe();
				m_status.code = validator[1];
2248 				static if(LOG) log(m_status);
2249 				return true;
2250 			}
2251 		}
2252 		return false;
2253 	}
2254 
2255 	bool catchSocketErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...)
2256 		if (isIntegral!T)
2257 	{
2258 		error_t err;
2259 		foreach (validator ; cmp) {
2260 			if (val == validator[0]) {
2261 				if (err is EWIN.init) err = WSAGetLastErrorSafe();
2262 				if (err == validator[1]) {
2263 					m_status.text = TRACE;
2264 					m_error = WSAGetLastErrorSafe();
2265 					m_status.code = validator[2];
2266 					static if(LOG) log(m_status);
2267 					return true;
2268 				}
2269 			}
2270 		}
2271 		return false;
2272 	}
2273 
2274 	pragma(inline, true)
2275 	bool catchSocketError(string TRACE, T)(T val, T cmp = SOCKET_ERROR)
2276 		if (isIntegral!T)
2277 	{
2278 		if (val == cmp) {
2279 			m_status.text = TRACE;
2280 			m_error = WSAGetLastErrorSafe();
2281 			m_status.code = Status.ABORT;
2282 			static if(LOG) log(m_status);
2283 			return true;
2284 		}
2285 		return false;
2286 	}
2287 
2288 	pragma(inline, true)
2289 	error_t WSAGetLastErrorSafe() {
2290 		try {
2291 			return cast(error_t) WSAGetLastError();
2292 		} catch(Exception e) {
2293 			return EWIN.ERROR_ACCESS_DENIED;
2294 		}
2295 	}
2296 
2297 	pragma(inline, true)
2298 	error_t GetLastErrorSafe() {
2299 		try {
2300 			return cast(error_t) GetLastError();
2301 		} catch(Exception e) {
2302 			return EWIN.ERROR_ACCESS_DENIED;
2303 		}
2304 	}
2305 
2306 	void log(StatusInfo val)
2307 	{
2308 		static if (LOG) {
2309 			import std.stdio;
2310 			try {
2311 				writeln("Backtrace: ", m_status.text);
2312 				writeln(" | Status:  ", m_status.code);
2313 				writeln(" | Error: " , m_error);
2314 				if ((m_error in EWSAMessages) !is null)
2315 					writeln(" | Message: ", EWSAMessages[m_error]);
2316 			} catch(Exception e) {
2317 				return;
2318 			}
2319 		}
2320 	}
2321 
2322 	void log(T)(lazy T val)
2323 	{
2324 		static if (LOG) {
2325 			import std.stdio;
2326 			try {
2327 				writeln(val);
2328 			} catch(Exception e) {
2329 				return;
2330 			}
2331 		}
2332 	}
2333 
2334 }
2335 
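/// Intended to be mixed into connection-oriented socket types to hold their
/// connection state; the event loop reads and flips the flags through the
/// connecting()/connected() pointers (e.g. `*ctxt.connecting = true;` once an
/// asynchronous connect is pending).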
2336 mixin template COSocketMixins() {
2337 
2338 	private CleanupData m_impl;
2339 
2340 	struct CleanupData {
2341 		bool connected;
2342 		bool connecting;
2343 	}
2344 
2345 	@property bool* connecting() {
2346 		return &m_impl.connecting;
2347 	}
2348 
2349 	@property bool* connected() {
2350 		return &m_impl.connected;
2351 	}
2352 
2353 }
2354 /*
2355 mixin template TCPListenerDistMixins()
2356 {
2357 	import core.sys.windows.windows : HWND;
2358 	import libasync.internals.hashmap : HashMap;
2359 	import core.sync.mutex;
2360 	private {
2361 		bool m_dist;
2362 
2363 		Tuple!(WinReference, bool*) m_handles;
2364 		__gshared HashMap!(fd_t, Tuple!(WinReference, bool*)) gs_dist;
2365 		__gshared Mutex gs_mutex;
2366 	}
2367 
2368 	/// The TCP Listener schedules distributed connection handlers based on
2369 	/// the event loops that are using the same AsyncTCPListener object.
2370 	/// This is done by using WSAAsyncSelect on a different window after each
2371 	/// accept TCPEvent.
2372 	class WinReference {
2373 		private {
2374 			struct Item {
2375 				HWND handle;
2376 				bool active;
2377 			}
2378 
2379 			Item[] m_items;
2380 		}
2381 
2382 		this(HWND hndl, bool b) {
2383 			append(hndl, b);
2384 		}
2385 
2386 		void append(HWND hndl, bool b) {
2387 			m_items ~= Item(hndl, b);
2388 		}
2389 
2390 		HWND next(HWND me) {
2391 			Item[] items;
2392 			synchronized(gs_mutex)
2393 				items = m_items;
2394 			if (items.length == 1)
2395 				return me;
2396 			foreach (i, item; items) {
2397 				if (item.active == true) {
2398 					m_items[i].active = false; // remove responsibility
2399 					if (m_items.length <= i + 1) {
2400 						m_items[0].active = true; // set responsibility
2401 						auto ret = m_items[0].handle;
2402 						return ret;
2403 					}
2404 					else {
2405 						m_items[i + 1].active = true;
2406 						auto ret = m_items[i + 1].handle;
2407 						return ret;
2408 					}
2409 				}
2410 
2411 			}
2412 			assert(false);
2413 		}
2414 
2415 	}
2416 
2417 	void init(HWND hndl, fd_t sock) {
2418 		try {
2419 			if (!gs_mutex) {
2420 				gs_mutex = new Mutex;
2421 			}
2422 			synchronized(gs_mutex) {
2423 				m_handles = gs_dist.get(sock);
2424 				if (m_handles == typeof(m_handles).init) {
2425 					gs_dist[sock] = Tuple!(WinReference, bool*)(new WinReference(hndl, true), &m_dist);
2426 					m_handles = gs_dist.get(sock);
2427 					assert(m_handles != typeof(m_handles).init);
2428 				}
2429 				else {
2430 					m_handles[0].append(hndl, false);
2431 					*m_handles[1] = true; // set first thread to dist
2432 					m_dist = true; // set this thread to dist
2433 				}
2434 			}
2435 		} catch (Exception e) {
2436 			assert(false, e.toString());
2437 		}
2438 
2439 	}
2440 
2441 	HWND next(HWND me) {
2442 		try {
2443 			if (!m_dist)
2444 				return HWND.init;
2445 			return m_handles[0].next(me);
2446 		}
2447 		catch (Exception e) {
2448 			assert(false, e.toString());
2449 		}
2450 	}
2451 
2452 }*/
2453 private class DWHandlerInfo {
2454 	DWHandler handler;
2455 	Array!DWChangeInfo buffer;
2456 
2457 	this(DWHandler cb) {
2458 		handler = cb;
2459 	}
2460 }
2461 
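/// Watches a single directory through overlapped ReadDirectoryChangesW calls,
/// converts each FILE_NOTIFY_INFORMATION entry into a DWChangeInfo pushed onto
/// the owning DWHandlerInfo buffer, then re-arms the watch and fires the handler.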
2462 private final class DWFolderWatcher {
2463 	import libasync.internals.path;
2464 private:
2465 	EventLoop m_evLoop;
2466 	fd_t m_fd;
2467 	bool m_recursive;
2468 	HANDLE m_handle;
2469 	Path m_path;
2470 	DWFileEvent m_events;
2471 	DWHandlerInfo m_handler; // contains buffer
2472 	shared AsyncSignal m_signal;
2473 	ubyte[FILE_NOTIFY_INFORMATION.sizeof + MAX_PATH + 1] m_buffer;
2474 	DWORD m_bytesTransferred;
2475 public:
2476 	this(EventLoop evl, in fd_t fd, in HANDLE hndl, in Path path, in DWFileEvent events, DWHandlerInfo handler, bool recursive) {
2477 		m_fd = fd;
2478 		m_recursive = recursive;
2479 		m_handle = cast(HANDLE)hndl;
2480 		m_evLoop = evl;
2481 		m_path = path;
2482 		m_handler = handler;
2483 
2484 		m_signal = new shared AsyncSignal(m_evLoop);
2485 		m_signal.run(&onChanged);
2486 		triggerWatch();
2487 	}
2488 package:
2489 	void close() {
2490 		CloseHandle(m_handle);
2491 		m_signal.kill();
2492 	}
2493 
2494 	void triggerChanged() {
2495 		m_signal.trigger();
2496 	}
2497 
2498 	void onChanged() {
2499 		ubyte[] result = m_buffer.ptr[0 .. m_bytesTransferred];
2500 		do {
2501 			assert(result.length >= FILE_NOTIFY_INFORMATION.sizeof);
2502 			auto fni = cast(FILE_NOTIFY_INFORMATION*)result.ptr;
2503 			DWFileEvent kind;
			switch( fni.Action ){
				default: kind = DWFileEvent.MODIFIED; break;
				case 0x1: kind = DWFileEvent.CREATED; break;    // FILE_ACTION_ADDED
				case 0x2: kind = DWFileEvent.DELETED; break;    // FILE_ACTION_REMOVED
				case 0x3: kind = DWFileEvent.MODIFIED; break;   // FILE_ACTION_MODIFIED
				case 0x4: kind = DWFileEvent.MOVED_FROM; break; // FILE_ACTION_RENAMED_OLD_NAME
				case 0x5: kind = DWFileEvent.MOVED_TO; break;   // FILE_ACTION_RENAMED_NEW_NAME
			}
2512 			string filename = to!string(fni.FileName.ptr[0 .. fni.FileNameLength/2]); // FileNameLength = #bytes, FileName=WCHAR[]
2513 			m_handler.buffer.insert(DWChangeInfo(kind, m_path ~ Path(filename)));
2514 			if( fni.NextEntryOffset == 0 ) break;
2515 			result = result[fni.NextEntryOffset .. $];
2516 		} while(result.length > 0);
2517 
2518 		triggerWatch();
2519 
2520 		m_handler.handler();
2521 	}
2522 
2523 	void triggerWatch() {
2524 
2525 		static UINT notifications = FILE_NOTIFY_CHANGE_FILE_NAME|FILE_NOTIFY_CHANGE_DIR_NAME|
2526 			FILE_NOTIFY_CHANGE_SIZE|FILE_NOTIFY_CHANGE_LAST_WRITE;
2527 
2528 		OVERLAPPED* overlapped = ThreadMem.alloc!OVERLAPPED();
2529 		overlapped.Internal = 0;
2530 		overlapped.InternalHigh = 0;
2531 		overlapped.Offset = 0;
2532 		overlapped.OffsetHigh = 0;
2533 		overlapped.Pointer = cast(void*)this;
2534 		import std.stdio;
2535 		DWORD bytesReturned;
2536 		BOOL success = ReadDirectoryChangesW(m_handle, m_buffer.ptr, m_buffer.length, cast(BOOL) m_recursive, notifications, &bytesReturned, overlapped, &onIOCompleted);
2537 
2538 		static if (DEBUG) {
2539 			import std.stdio;
2540 			if (!success)
2541 				writeln("Failed to call ReadDirectoryChangesW: " ~ EWSAMessages[GetLastError().to!EWIN]);
2542 		}
2543 	}
2544 
2545 	@property fd_t fd() const {
2546 		return m_fd;
2547 	}
2548 
2549 	@property HANDLE handle() const {
2550 		return cast(HANDLE) m_handle;
2551 	}
2552 
2553 	static nothrow extern(System)
2554 	{
2555 		void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped)
2556 		{
2557 			import std.stdio;
2558 			DWFolderWatcher watcher = cast(DWFolderWatcher)(overlapped.Pointer);
2559 			watcher.m_bytesTransferred = cbTransferred;
2560 			try ThreadMem.free(overlapped); catch (Throwable) {}
2561 
2562 			static if (DEBUG) {
2563 				if (dwError != 0)
					try writeln("Directory watcher error: "~EWSAMessages[dwError.to!EWIN]); catch (Throwable) {}
2565 			}
2566 			try watcher.triggerChanged();
2567 			catch (Exception e) {
2568 				static if (DEBUG) {
2569 					try writeln("Failed to trigger change"); catch (Throwable) {}
2570 				}
2571 			}
2572 		}
2573 	}
2574 }
2575 
2576 /// Information for a single Windows overlapped I/O request;
2577 /// uses a freelist to minimize allocations.
2578 struct AsyncOverlapped
2579 {
2580 	align (1):
2581 	/// Required for Windows overlapped I/O requests
2582 	OVERLAPPED overlapped;
2583 	align:
2584 
2585 	union
2586 	{
2587 		AsyncAcceptRequest* accept;
2588 		AsyncReceiveRequest* receive;
2589 		AsyncSendRequest* send;
2590 	}
2591 
2592 	@property void hEvent(HANDLE hEvent) @safe pure @nogc nothrow
2593 	{ overlapped.hEvent = hEvent; }
2594 
2595 	import libasync.internals.freelist;
2596 	mixin FreeList!1_000;
2597 }
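// AsyncOverlapped instances are recycled rather than left to the GC: the
// completion routines below hand them back with
// `assumeWontThrow(AsyncOverlapped.free(overlapped))`, and the FreeList mixin is
// assumed to provide the matching static alloc() used at the submission sites
// elsewhere in this module.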
2598 
2599 nothrow extern(System)
2600 {
2601 	void onOverlappedReceiveComplete(error_t error, DWORD recvCount, AsyncOverlapped* overlapped, DWORD flags)
2602 	{
2603 		.tracef("onOverlappedReceiveComplete: error: %s, recvCount: %s, flags: %s", error, recvCount, flags);
2604 
2605 		auto request = overlapped.receive;
2606 
2607 		if (error == EWIN.WSA_OPERATION_ABORTED) {
2608 			if (request.message) assumeWontThrow(NetworkMessage.free(request.message));
2609 			assumeWontThrow(AsyncReceiveRequest.free(request));
2610 			return;
2611 		}
2612 
2613 		auto socket = overlapped.receive.socket;
2614 		auto eventLoop = &socket.m_evLoop.m_evLoop;
2615 		if (eventLoop.m_status.code != Status.OK) return;
2616 
2617 		eventLoop.m_status = StatusInfo.init;
2618 
2619 		assumeWontThrow(AsyncOverlapped.free(overlapped));
2620 		if (error == 0) {
2621 			if (!request.message) {
2622 				eventLoop.m_completedSocketReceives.insertBack(request);
2623 				return;
2624 			} else if (recvCount > 0 || !socket.connectionOriented) {
2625 				request.message.count = request.message.count + recvCount;
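				// Exact-size receives are resubmitted until the message buffer is
				// completely filled; everything else is queued for delivery.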
2626 				if (request.exact && !request.message.receivedAll) {
2627 					eventLoop.submitRequest(request);
2628 					return;
2629 				} else {
2630 					eventLoop.m_completedSocketReceives.insertBack(request);
2631 					return;
2632 				}
2633 			} 
2634 		} else if (recvCount > 0) {
2635 			eventLoop.m_completedSocketReceives.insertBack(request);
2636 			return;
2637 		}
2638 
2639 		assumeWontThrow(NetworkMessage.free(request.message));
2640 		assumeWontThrow(AsyncReceiveRequest.free(request));
2641 
2642 		if (error == WSAECONNRESET || error == WSAECONNABORTED || recvCount == 0) {
2643 			socket.kill();
2644 			socket.handleClose();
2645 			return;
2646 		}
2647 
2648 		eventLoop.m_status.code = Status.ABORT;
2649 		socket.kill();
2650 		socket.handleError();
2651 	}
2652 
2653 	void onOverlappedSendComplete(error_t error, DWORD sentCount, AsyncOverlapped* overlapped, DWORD flags)
2654 	{
2655 		.tracef("onOverlappedSendComplete: error: %s, sentCount: %s, flags: %s", error, sentCount, flags);
2656 
2657 		auto request = overlapped.send;
2658 
2659 		if (error == EWIN.WSA_OPERATION_ABORTED) {
2660 			assumeWontThrow(NetworkMessage.free(request.message));
2661 			assumeWontThrow(AsyncSendRequest.free(request));
2662 			return;
2663 		}
2664 
2665 		auto socket = overlapped.send.socket;
2666 		auto eventLoop = &socket.m_evLoop.m_evLoop;
2667 		if (eventLoop.m_status.code != Status.OK) return;
2668 
2669 		eventLoop.m_status = StatusInfo.init;
2670 
2671 		assumeWontThrow(AsyncOverlapped.free(overlapped));
2672 		if (error == 0) {
2673 			request.message.count = request.message.count + sentCount;
2674 			assert(request.message.sent);
2675 			eventLoop.m_completedSocketSends.insertBack(request);
2676 			return;
2677 		}
2678 
2679 		assumeWontThrow(NetworkMessage.free(request.message));
2680 		assumeWontThrow(AsyncSendRequest.free(request));
2681 
2682 		if (error == WSAECONNRESET || error == WSAECONNABORTED) {
2683 			socket.kill();
2684 			socket.handleClose();
2685 			return;
2686 		}
2687 
2688 		eventLoop.m_status.code = Status.ABORT;
2689 		socket.kill();
2690 		socket.handleError();
2691 	}
2692 }
2693 
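// Custom window messages (offsets in the WM_USER range) used to deliver socket
// notifications registered through WSAAsyncSelect, as well as cross-thread user
// events and signals, to each event loop's hidden message window.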
2694 enum WM_TCP_SOCKET = WM_USER+102;
2695 enum WM_UDP_SOCKET = WM_USER+103;
2696 enum WM_USER_EVENT = WM_USER+104;
2697 enum WM_USER_SIGNAL = WM_USER+105;
2698 
2699 nothrow:
2700 
2701 __gshared Vector!(size_t, Malloc) gs_availID;
2702 __gshared size_t gs_maxID;
2703 __gshared core.sync.mutex.Mutex gs_mutex;
2704 
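/// Hands out a unique ID from the shared free list guarded by gs_mutex. When the
/// list runs dry it is refilled with the next block of IDs, roughly doubling the
/// range each time: for example, with gs_maxID == 32 the refill inserts IDs 33
/// through 64 and gs_maxID becomes 64.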
2705 private size_t createIndex() {
2706 	size_t idx;
2707 	import std.algorithm : max;
2708 	try {
2709 		size_t getIdx() {
2710 			if (!gs_availID.empty) {
2711 				immutable size_t ret = gs_availID.back;
2712 				gs_availID.removeBack();
2713 				return ret;
2714 			}
2715 			return 0;
2716 		}
2717 
2718 		synchronized(gs_mutex) {
2719 			idx = getIdx();
2720 			if (idx == 0) {
2721 				import std.range : iota;
2722 				gs_availID.insert( iota(gs_maxID + 1, max(32, gs_maxID * 2 + 1), 1) );
2723 				gs_maxID = gs_availID[$-1];
2724 				idx = getIdx();
2725 			}
2726 		}
2727 	} catch (Exception e) {
2728 		assert(false, "Failed to generate necessary ID for Manual Event waiters: " ~ e.msg);
2729 	}
2730 
2731 	return idx;
2732 }
2733 
2734 void destroyIndex(AsyncTimer ctxt) {
2735 	try {
2736 		synchronized(gs_mutex) gs_availID ~= ctxt.id;
2737 	}
2738 	catch (Exception e) {
2739 		assert(false, "Error destroying index: " ~ e.msg);
2740 	}
2741 }
2742 
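/// Seeds the shared ID free list with its first block of IDs and creates the
/// mutex guarding it before any event loop thread can request an index.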
2743 shared static this() {
2744 
2745 	try {
2746 		if (!gs_mutex) {
2747 			import core.sync.mutex;
2748 			gs_mutex = new core.sync.mutex.Mutex;
2749 
2750 			gs_availID.reserve(32);
2751 
2752 			foreach (i; gs_availID.length .. gs_availID.capacity) {
2753 				gs_availID.insertBack(i + 1);
2754 			}
2755 
2756 			gs_maxID = 32;
2757 		}
2758 	}
2759 	catch (Throwable) {
2760 		assert(false, "Couldn't reserve necessary space for available Manual Events");
2761 	}
2762 
2763 }
2764 
2765 nothrow extern(System) {
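	/// Window procedure of the hidden message window: recovers the owning
	/// EventLoopImpl from GWLP_USERDATA and forwards the message to onMessage,
	/// falling back to DefWindowProcA for anything left unhandled.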
2766 	LRESULT wndProc(HWND wnd, UINT msg, WPARAM wparam, LPARAM lparam)
2767 	{
2768 		auto ptr = cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA);
2769 		if (ptr is null)
2770 			return DefWindowProcA(wnd, msg, wparam, lparam);
2771 		auto appl = cast(EventLoopImpl*)ptr;
2772 		MSG obj = MSG(wnd, msg, wparam, lparam, DWORD.init, POINT.init);
2773 		if (appl.onMessage(obj)) {
2774 			static if (DEBUG) {
2775 				if (appl.status.code != Status.OK && appl.status.code != Status.ASYNC) {
2776 					import std.stdio : writeln;
2777 					try { writeln(appl.error, ": ", appl.m_status.text); } catch (Throwable) {}
2778 				}
2779 			}
2780 			return 0;
2781 		}
2782 		else return DefWindowProcA(wnd, msg, wparam, lparam);
2783 	}
2784 
2785 	BOOL PostMessageA(HWND hWnd, UINT Msg, WPARAM wParam, LPARAM lParam);
2786 
2787 }