1 module libasync.windows;
2 
3 version (Windows):
4 
5 import core.atomic;
6 import core.thread : Fiber;
7 import libasync.types;
8 import libasync.internals.hashmap;
9 import libasync.internals.memory;
10 import std.container : Array;
11 import std.string : toStringz;
12 import std.conv : to;
13 import std.datetime : Duration, msecs, seconds;
14 import std.algorithm : min;
15 import libasync.internals.win32;
16 import std.traits : isIntegral;
17 import std.typecons : Tuple, tuple;
18 import std.utf : toUTFz;
19 import core.sync.mutex;
20 import libasync.events;
21 pragma(lib, "ws2_32");
22 pragma(lib, "ole32");
23 alias fd_t = SIZE_T;
24 alias error_t = EWIN;
25 
26 //todo :  see if new connections with SO_REUSEADDR are evenly distributed between threads
27 
28 
29 package struct EventLoopImpl {
30 	pragma(msg, "Using Windows IOCP for events");
31 	
32 private:
33 	HashMap!(fd_t, TCPAcceptHandler) m_connHandlers; // todo: Change this to an array
34 	HashMap!(fd_t, TCPEventHandler) m_tcpHandlers;
35 	HashMap!(fd_t, TimerHandler) m_timerHandlers;
36 	HashMap!(fd_t, UDPHandler) m_udpHandlers;
37 	HashMap!(fd_t, DWHandlerInfo) m_dwHandlers; // todo: Change this to an array too
38 	HashMap!(uint, DWFolderWatcher) m_dwFolders;
39 nothrow:
40 private:
41 	struct TimerCache {
42 		TimerHandler cb;
43 		fd_t fd;
44 	}
45 	TimerCache m_timer;
46 	
47 	EventLoop m_evLoop;
48 	bool m_started;
49 	wstring m_window;
50 	HWND m_hwnd;
51 	DWORD m_threadId;
52 	HANDLE[] m_waitObjects;
53 	ushort m_instanceId;
54 	StatusInfo m_status;
55 	error_t m_error = EWIN.WSA_OK;
56 	__gshared Mutex gs_mtx;
57 package:
58 	@property bool started() const {
59 		return m_started;
60 	}
61 	bool init(EventLoop evl) 
62 	in { assert(!m_started); }
63 	body
64 	{
65 		try if (!gs_mtx)
66 			gs_mtx = new Mutex; catch {}
67 		static ushort j;
68 		assert (j == 0, "Current implementation is only tested with 1 event loop per thread. There are known issues with signals on linux.");
69 		j += 1;
70 		m_status = StatusInfo.init;
71 		
72 		import core.thread;
73 		try Thread.getThis().priority = Thread.PRIORITY_MAX;
74 		catch (Exception e) { assert(false, "Could not set thread priority"); }
75 
76 		m_evLoop = evl;
77 		shared static ushort i;
78 		m_instanceId = i;
79 		core.atomic.atomicOp!"+="(i, cast(ushort) 1);
80 		wstring inststr;
81 		try { inststr = m_instanceId.to!wstring; }
82 		catch (Exception e) {
83 			return false;
84 		}
85 		m_window = "VibeWin32MessageWindow" ~ inststr;
86 		wstring classname = "VibeWin32MessageWindow" ~ inststr;
87 		
88 		LPCWSTR wnz;
89 		LPCWSTR clsn;
90 		try {
91 			wnz = cast(LPCWSTR) m_window.toUTFz!(immutable(wchar)*);
92 			clsn = cast(LPCWSTR) classname.toUTFz!(immutable(wchar)*);
93 		} catch (Exception e) {
94 			setInternalError!"toUTFz"(Status.ERROR, e.msg);
95 			return false;
96 		}
97 		
98 		m_threadId = GetCurrentThreadId();
99 		WNDCLASSW wc;
100 		wc.lpfnWndProc = &wndProc;
101 		wc.lpszClassName = clsn;
102 		RegisterClassW(&wc);
103 		m_hwnd = CreateWindowW(wnz, clsn, 0, 0, 0, 385, 375, HWND_MESSAGE,
104 		                       cast(HMENU) null, null, null);
105 		try log("Window registered: " ~ m_hwnd.to!string); catch{}
106 		auto ptr = cast(ULONG_PTR)cast(void*)&this;
107 		SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, ptr);
108 		assert( cast(EventLoopImpl*)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is &this );
109 		WSADATA wd;
110 		m_error = cast(error_t) WSAStartup(0x0202, &wd);
111 		if (m_error == EWIN.WSA_OK)	
112 			m_status.code = Status.OK;
113 		else {
114 			m_status.code = Status.ABORT;
115 			static if(LOG) log(m_status);
116 			return false;
117 		}
118 		assert(wd.wVersion == 0x0202);
119 		m_started = true;
120 		return true;
121 	}
122 	
123 	// todo: find where to call this
124 	void exit() {
125 		cast(void)PostThreadMessageW(m_threadId, WM_QUIT, 0, 0);
126 	}
127 	
128 	@property StatusInfo status() const {
129 		return m_status;
130 	}
131 	
132 	@property string error() const {
133 		string* ptr;
134 		string pv = ((ptr = (m_error in EWSAMessages)) !is null) ? *ptr : string.init;
135 		return pv;
136 	}
137 	
138 	bool loop(Duration timeout = 0.seconds)
139 	in { 
140 		assert(Fiber.getThis() is null); 
141 		assert(m_started);
142 	}
143 	body {
144 		DWORD msTimeout = cast(DWORD) min(timeout.total!"msecs", DWORD.max);
145 		/* 
146 		 * Waits until one or all of the specified objects are in the signaled state
147 		 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms684245%28v=vs.85%29.aspx
148 		*/
149 		DWORD signal = MsgWaitForMultipleObjectsEx(
150 			cast(DWORD)0,
151 			null,
152 			msTimeout,
153 			QS_ALLEVENTS,								
154 			MWMO_ALERTABLE | MWMO_INPUTAVAILABLE		// MWMO_ALERTABLE: Wakes up to execute overlapped hEvent (i/o completion)
155 			// MWMO_INPUTAVAILABLE: Processes key/mouse input to avoid window ghosting
156 			);
157 		
158 		auto errors = 
159 		[ tuple(WAIT_FAILED, Status.EVLOOP_FAILURE) ];	/* WAIT_FAILED: Failed to call MsgWait..() */
160 		
161 		if (signal == WAIT_TIMEOUT)
162 			return true;
163 		
164 		if (catchErrors!"MsgWaitForMultipleObjectsEx"(signal, errors)) {
165 			log("Event Loop Exiting because of error");
166 			return false; 
167 		}
168 		
169 		MSG msg;
170 		while (PeekMessageW(&msg, null, 0, 0, PM_REMOVE)) {
171 			m_status = StatusInfo.init;
172 			TranslateMessage(&msg);
173 			DispatchMessageW(&msg);
174 
175 			if (m_status.code == Status.ERROR) {
176 				log(m_status.text);
177 				return false;
178 			}
179 		}
180 		return true;
181 	}
182 	
183 	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
184 	{
185 		m_status = StatusInfo.init;
186 		fd_t fd = ctxt.socket;
187 		bool reusing;
188 		if (fd == fd_t.init) {
189 
190 			fd = WSASocketW(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
191 			
192 			if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET))
193 				return 0;
194 			
195 			if (!setOption(fd, TCPOption.REUSEADDR, true))
196 				return 0;
197 			
198 			// todo: defer accept?
199 			
200 			if (ctxt.noDelay) {
201 				if (!setOption(fd, TCPOption.NODELAY, true))
202 					return 0;
203 			}
204 		} else reusing = true;
205 
206 		if (initTCPListener(fd, ctxt, reusing))
207 		{
208 			try {
209 				log("Running listener on socket fd#" ~ fd.to!string);
210 				m_connHandlers[fd] = del;
211 				ctxt.init(m_hwnd, fd);
212 			}
213 			catch (Exception e) {
214 				setInternalError!"m_connHandlers assign"(Status.ERROR, e.msg);
215 				closeSocket(fd, false);
216 				return 0;
217 			}
218 		}
219 		else
220 		{
221 			return 0;
222 		}
223 
224 
225 		return fd;
226 	}
227 	
228 	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
229 	in { 
230 		assert(ctxt.socket == fd_t.init); 
231 		assert(ctxt.peer.family != AF_UNSPEC);
232 	}
233 	body {
234 		m_status = StatusInfo.init;
235 		fd_t fd = ctxt.preInitializedSocket;
236 
237 		if (fd == fd_t.init)
238 			fd = WSASocketW(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
239 		log("Starting connection at: " ~ fd.to!string);
240 		if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET))
241 			return 0;
242 		
243 		try {
244 			(m_tcpHandlers)[fd] = del;
245 		}
246 		catch (Exception e) {
247 			setInternalError!"m_tcpHandlers assign"(Status.ERROR, e.msg);
248 			return 0;
249 		}
250 		
251 		debug {
252 			TCPEventHandler evh;
253 			try evh = m_tcpHandlers.get(fd);
254 			catch (Exception e) { log("Failed"); return 0; }
255 			assert( evh !is TCPEventHandler.init);
256 		}
257 		
258 		if (ctxt.noDelay) {
259 			if (!setOption(fd, TCPOption.NODELAY, true))
260 				return 0;
261 		}
262 		
263 		if (!initTCPConnection(fd, ctxt)) {
264 			try {
265 				log("Remove event handler for " ~ fd.to!string);
266 				m_tcpHandlers.remove(fd);
267 			}
268 			catch (Exception e) {
269 				setInternalError!"m_tcpHandlers remove"(Status.ERROR, e.msg);
270 			}
271 			
272 			closeSocket(fd, false);
273 			return 0;
274 		}
275 		
276 		
277 		try log("Client started FD#" ~ fd.to!string);
278 		catch{}
279 		return fd;
280 	}
281 	
282 	fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
283 		m_status = StatusInfo.init;
284 		fd_t fd = ctxt.preInitializedSocket;
285 
286 		if (fd == fd_t.init)
287 			fd = WSASocketW(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED);
288 		
289 		if (catchSocketError!("run AsyncUDPSocket")(fd, INVALID_SOCKET))
290 			return 0;
291 		
292 		if (initUDPSocket(fd, ctxt))
293 		{
294 			try {
295 				(m_udpHandlers)[fd] = del;
296 			}
297 			catch (Exception e) {
298 				setInternalError!"m_udpHandlers assign"(Status.ERROR, e.msg);
299 				closeSocket(fd, false);
300 				return 0;
301 			}
302 		}
303 		
304 		try log("UDP Socket started FD#" ~ fd.to!string);
305 		catch{}
306 		
307 		return fd;
308 	}
309 	
310 	fd_t run(shared AsyncSignal ctxt) {
311 		m_status = StatusInfo.init;
312 		try log("Signal subscribed to: " ~ m_hwnd.to!string); catch {}
313 		return (cast(fd_t)m_hwnd);
314 	}
315 	
316 	fd_t run(AsyncNotifier ctxt) {
317 		m_status = StatusInfo.init;
318 		//try log("Running signal " ~ (cast(AsyncNotifier)ctxt).to!string); catch {}
319 		return cast(fd_t) m_hwnd;
320 	}
321 	
322 	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
323 		if (timeout < 0.seconds)
324 			timeout = 0.seconds;
325 		timeout += 10.msecs(); // round up to the next 10 msecs to avoid premature timer events
326 		m_status = StatusInfo.init;
327 		fd_t timer_id = ctxt.id;
328 		if (timer_id == fd_t.init) {
329 			timer_id = createIndex();
330 		}
331 		try log("Timer created: " ~ timer_id.to!string ~ " with timeout: " ~ timeout.total!"msecs".to!string ~ " msecs"); catch {}
332 		
333 		BOOL err;
334 		try err = cast(int)SetTimer(m_hwnd, timer_id, timeout.total!"msecs".to!uint, null);
335 		catch(Exception e) {
336 			setInternalError!"SetTimer"(Status.ERROR);
337 			return 0;
338 		}
339 		
340 		if (err == 0)
341 		{
342 			m_error = GetLastErrorSafe();
343 			m_status.code = Status.ERROR;
344 			m_status.text = "kill(AsyncTimer)";
345 			log(m_status);
346 			return 0;
347 		}
348 		
349 		if (m_timer.fd == fd_t.init) 
350 		{
351 			m_timer.fd = timer_id;
352 			m_timer.cb = del;
353 		}
354 		else {
355 			try
356 			{
357 				(m_timerHandlers)[timer_id] = del;
358 			}
359 			catch (Exception e) {
360 				setInternalError!"HashMap assign"(Status.ERROR);
361 				return 0;
362 			}
363 		}
364 		
365 		
366 		return timer_id;
367 	}
368 	
369 	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del)
370 	{
371 		static fd_t ids;
372 		auto fd = ++ids;
373 		
374 		try (m_dwHandlers)[fd] = new DWHandlerInfo(del); 
375 		catch (Exception e) {
376 			setInternalError!"AsyncDirectoryWatcher.hashMap(run)"(Status.ERROR, "Could not add handler to hashmap: " ~ e.msg);
377 		}
378 		
379 		return fd;
380 		
381 	}
382 	
383 	bool kill(AsyncDirectoryWatcher ctxt) {
384 		
385 		try {
386 			Array!DWFolderWatcher toFree;
387 			foreach (ref const uint k, const DWFolderWatcher v; m_dwFolders) {
388 				if (v.fd == ctxt.fd) {
389 					CloseHandle(v.handle);
390 					m_dwFolders.remove(k);
391 				}
392 			}
393 			
394 			foreach (DWFolderWatcher obj; toFree[])
395 				FreeListObjectAlloc!DWFolderWatcher.free(obj);
396 			
397 			// todo: close all the handlers...
398 			m_dwHandlers.remove(ctxt.fd);
399 		}
400 		catch (Exception e) {
401 			setInternalError!"in kill(AsyncDirectoryWatcher)"(Status.ERROR, e.msg);
402 			return false;
403 		}
404 		
405 		return true;
406 	}
407 	
408 	bool kill(AsyncTCPConnection ctxt, bool forced = false)
409 	{
410 		
411 		m_status = StatusInfo.init;
412 		fd_t fd = ctxt.socket;
413 		
414 		log("Killing socket "~ fd.to!string);
415 		try { 
416 			auto cb = m_tcpHandlers.get(ctxt.socket);
417 			if (cb != TCPEventHandler.init){
418 				*cb.conn.connected = false;
419 				*cb.conn.connecting = false;
420 				return closeSocket(fd, true, forced);
421 			}
422 		} catch (Exception e) {
423 			setInternalError!"in m_tcpHandlers"(Status.ERROR, e.msg);
424 			assert(false);
425 			//return false;
426 		}
427 		
428 		return true;
429 	}
430 	
431 	bool kill(AsyncTCPListener ctxt)
432 	{
433 		m_status = StatusInfo.init;
434 		fd_t fd = ctxt.socket;
435 		try { 
436 			if ((ctxt.socket in m_connHandlers) !is null) {
437 				return closeSocket(fd, false, true);
438 			}
439 		} catch (Exception e) {
440 			setInternalError!"in m_connHandlers"(Status.ERROR, e.msg);
441 			return false;
442 		}
443 		
444 		return true;
445 	}
446 	
447 	bool kill(shared AsyncSignal ctxt) {
448 		return true;
449 	}
450 	
451 	bool kill(AsyncNotifier ctxt) {
452 		return true;
453 	}
454 	
455 	bool kill(AsyncTimer ctxt) {
456 		m_status = StatusInfo.init;
457 		
458 		log("Kill timer");
459 		
460 		BOOL err = KillTimer(m_hwnd, ctxt.id);
461 		if (err == 0)
462 		{
463 			m_error = GetLastErrorSafe();
464 			m_status.code = Status.ERROR;
465 			m_status.text = "kill(AsyncTimer)";
466 			log(m_status);
467 			return false;
468 		}
469 		
470 		destroyIndex(ctxt);
471 		
472 		if (m_timer.fd == ctxt.id) {
473 			ctxt.id = 0;
474 			m_timer = TimerCache.init;
475 		} else {
476 			try {
477 				m_timerHandlers.remove(ctxt.id);
478 			}
479 			catch (Exception e) {
480 				setInternalError!"HashMap remove"(Status.ERROR);
481 				return 0;
482 			}
483 		}
484 		
485 		
486 		return true;
487 	}
488 	
489 	bool kill(AsyncUDPSocket ctxt) {
490 		m_status = StatusInfo.init;
491 		
492 		fd_t fd = ctxt.socket;
493 		INT err = closesocket(fd);
494 		if (catchSocketError!"closesocket"(err)) 
495 			return false;
496 		
497 		try m_udpHandlers.remove(ctxt.socket);
498 		catch (Exception e) {
499 			setInternalError!"HashMap remove"(Status.ERROR);
500 			return 0;
501 		}
502 		
503 		return true;
504 	}
505 	
506 	bool setOption(T)(fd_t fd, TCPOption option, in T value) {
507 		m_status = StatusInfo.init;
508 		int err;
509 		try {
510 			nothrow bool errorHandler() {
511 				if (catchSocketError!"setOption:"(err)) {
512 					try m_status.text ~= option.to!string;
513 					catch (Exception e){ assert(false, "to!string conversion failure"); }
514 					return false;
515 				}
516 				
517 				return true;
518 			}
519 			
520 			
521 			static HashMap!(fd_t, tcp_keepalive)* kcache;
522 			
523 			final switch (option) {
524 				
525 				case TCPOption.NODELAY: // true/false
526 					static if (!is(T == bool))
527 						assert(false, "NODELAY value type must be bool, not " ~ T.stringof);
528 					else {
529 						BOOL val = value?1:0;
530 						socklen_t len = val.sizeof;
531 						err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len);
532 						return errorHandler();
533 					}
534 				case TCPOption.REUSEADDR: // true/false
535 					static if (!is(T == bool))
536 						assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof);
537 					else
538 					{
539 						BOOL val = value?1:0;
540 						socklen_t len = val.sizeof;
541 						err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len);
542 						return errorHandler();
543 					}
544 				case TCPOption.QUICK_ACK:
545 					static if (!is(T == bool))
546 						assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof);
547 					else {
548 						m_status.code = Status.NOT_IMPLEMENTED;
549 						return false; // quick ack is not implemented
550 					}
551 				case TCPOption.KEEPALIVE_ENABLE: // true/false
552 					static if (!is(T == bool))
553 						assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof);
554 					else
555 					{
556 						BOOL val = value?1:0;
557 						socklen_t len = val.sizeof;
558 						err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len);
559 						return errorHandler();
560 					}
561 				case TCPOption.KEEPALIVE_COUNT: // retransmit 10 times before dropping half-open conn
562 					static if (!isIntegral!T)
563 						assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof);
564 					else {
565 						m_status.code = Status.NOT_IMPLEMENTED;
566 						return false;
567 					}
568 				case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds between each keepalive packets
569 					static if (!is(T == Duration))
570 						assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof);
571 					else {
572 						
573 						if (!kcache)
574 							kcache = new HashMap!(fd_t, tcp_keepalive)(defaultAllocator());
575 						
576 						tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init);
577 						tcp_keepalive sReturned;
578 						DWORD dwBytes;
579 						kaSettings.onoff = ULONG(1);
580 						if (kaSettings.keepalivetime == ULONG.init) {
581 							kaSettings.keepalivetime = 1000;
582 						}
583 						kaSettings.keepaliveinterval = value.total!"msecs".to!ULONG;
584 						(*kcache)[fd] = kaSettings;
585 						err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null);
586 						
587 						return errorHandler();
588 					}
589 				case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start
590 					static if (!is(T == Duration))
591 						assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof);
592 					else {
593 						
594 						if (!kcache)
595 							kcache = new HashMap!(fd_t, tcp_keepalive)(defaultAllocator());
596 						
597 						tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init);
598 						tcp_keepalive sReturned;
599 						DWORD dwBytes;
600 						kaSettings.onoff = ULONG(1);
601 						if (kaSettings.keepaliveinterval == ULONG.init) {
602 							kaSettings.keepaliveinterval = 75*1000;
603 						}
604 						kaSettings.keepalivetime = value.total!"msecs".to!ULONG;
605 						
606 						(*kcache)[fd] = kaSettings;
607 						err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null);
608 						
609 						return errorHandler();
610 					}
611 				case TCPOption.BUFFER_RECV: // bytes
612 					static if (!isIntegral!T)
613 						assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof);
614 					else {
615 						int val = value.to!int;
616 						socklen_t len = val.sizeof;
617 						err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len);
618 						return errorHandler();
619 					}
620 				case TCPOption.BUFFER_SEND: // bytes
621 					static if (!isIntegral!T)
622 						assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof);
623 					else {
624 						int val = value.to!int;
625 						socklen_t len = val.sizeof;
626 						err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len);
627 						return errorHandler();
628 					}
629 				case TCPOption.TIMEOUT_RECV:
630 					static if (!is(T == Duration))
631 						assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof);
632 					else {
633 						DWORD val = value.total!"msecs".to!DWORD;
634 						socklen_t len = val.sizeof;
635 						err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &val, len);
636 						return errorHandler();
637 					}
638 				case TCPOption.TIMEOUT_SEND:
639 					static if (!is(T == Duration))
640 						assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
641 					else {
642 						DWORD val = value.total!"msecs".to!DWORD;
643 						socklen_t len = val.sizeof;
644 						err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len);
645 						return errorHandler();
646 					}
647 				case TCPOption.TIMEOUT_HALFOPEN:
648 					static if (!is(T == Duration))
649 						assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
650 					else {
651 						m_status.code = Status.NOT_IMPLEMENTED;
652 						return false;
653 					}
654 				case TCPOption.LINGER: // bool onOff, int seconds
655 					static if (!is(T == Tuple!(bool, int)))
656 						assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof);
657 					else {
658 						linger l = linger(val[0]?1:0, val[1].to!USHORT);
659 						socklen_t llen = l.sizeof;
660 						err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen);
661 						return errorHandler();
662 					}
663 				case TCPOption.CONGESTION:
664 					static if (!isIntegral!T)
665 						assert(false, "CONGESTION value type must be integral, not " ~ T.stringof);
666 					else {
667 						m_status.code = Status.NOT_IMPLEMENTED;
668 						return false;
669 					}
670 				case TCPOption.CORK:
671 					static if (!isIntegral!T)
672 						assert(false, "CORK value type must be int, not " ~ T.stringof);
673 					else {
674 						m_status.code = Status.NOT_IMPLEMENTED;
675 						return false;
676 					}
677 				case TCPOption.DEFER_ACCEPT: // seconds
678 					static if (!isIntegral!T)
679 						assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof);
680 					else {
681 						int val = value.to!int;
682 						socklen_t len = val.sizeof;
683 						err = setsockopt(fd, SOL_SOCKET, SO_CONDITIONAL_ACCEPT, &val, len);
684 						return errorHandler();
685 					}
686 			}
687 			
688 		}
689 		catch (Exception e) {
690 			return false;
691 		}
692 		
693 	}
694 	
695 	uint read(in fd_t fd, ref ubyte[] data)
696 	{
697 		return 0;
698 	}
699 	
700 	uint write(in fd_t fd, in ubyte[] data)
701 	{
702 		return 0;
703 	}
704 	
705 	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
706 		size_t i;
707 		Array!DWChangeInfo* changes;
708 		try {
709 			changes = &(m_dwHandlers.get(fd, DWHandlerInfo.init).buffer);
710 			if ((*changes).empty)
711 				return 0;
712 			
713 			import std.algorithm : min;
714 			size_t cnt = min(dst.length, changes.length);
715 			foreach (DWChangeInfo change; (*changes)[0 .. cnt]) {
716 				try log("reading change: " ~ change.path); catch {}
717 				dst[i] = (*changes)[i];
718 				i++;
719 			}
720 			changes.linearRemove((*changes)[0 .. cnt]);
721 		}
722 		catch (Exception e) {
723 			setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg);
724 			return 0;
725 		}
726 		try log("Changes returning with: " ~ i.to!string); catch {}
727 		return cast(uint) i;
728 	}
729 	
730 	uint watch(in fd_t fd, in WatchInfo info) {
731 		m_status = StatusInfo.init;
732 		uint wd;
733 		try {
734 			HANDLE hndl = CreateFileW(toUTFz!(const(wchar)*)(info.path.toNativeString()),
735 			                          FILE_LIST_DIRECTORY,
736 			                          FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
737 			                          null,
738 			                          OPEN_EXISTING,
739 			                          FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
740 			                          null);
741 			wd = cast(uint) hndl;
742 			DWHandlerInfo handler = m_dwHandlers.get(fd, DWHandlerInfo.init);
743 			assert(handler !is null);
744 			log("Watching: " ~ info.path.toNativeString());
745 			(m_dwFolders)[wd] = FreeListObjectAlloc!DWFolderWatcher.alloc(m_evLoop, fd, hndl, info.path, info.events, handler, info.recursive);
746 		} catch (Exception e) {
747 			setInternalError!"watch"(Status.ERROR, "Could not start watching directory: " ~ e.msg);
748 			return 0;
749 		}
750 		return wd;
751 	}
752 	
753 	bool unwatch(in fd_t fd, in fd_t _wd) {
754 		uint wd = cast(uint) _wd;
755 		m_status = StatusInfo.init;
756 		try {
757 			DWFolderWatcher fw = m_dwFolders.get(wd, null);
758 			assert(fw !is null);
759 			m_dwFolders.remove(wd);
760 			fw.close();
761 			FreeListObjectAlloc!DWFolderWatcher.free(fw);
762 		} catch (Exception e) {
763 			setInternalError!"unwatch"(Status.ERROR, "Failed when unwatching directory: " ~ e.msg);
764 			return false;
765 		}
766 		return true;
767 	}
768 	
769 	bool notify(T)(in fd_t fd, in T payload) 
770 		if (is(T == shared AsyncSignal) || is(T == AsyncNotifier))
771 	{
772 		m_status = StatusInfo.init;
773 		import std.conv;
774 
775 		auto payloadPtr = cast(ubyte*)payload;
776 		auto payloadAddr = cast(ulong)payloadPtr;
777 
778 		WPARAM wparam = payloadAddr & 0xffffffff;
779 		LPARAM lparam = cast(uint) (payloadAddr >> 32);
780 
781 		BOOL err;
782 		static if (is(T == AsyncNotifier))
783 			err = PostMessageA(cast(HWND)fd, WM_USER_SIGNAL, wparam, lparam);
784 		else
785 			err = PostMessageA(cast(HWND)fd, WM_USER_EVENT, wparam, lparam);
786 		try log("Sending notification to: " ~ (cast(HWND)fd).to!string); catch {}
787 		if (err == 0)
788 		{
789 			m_error = GetLastErrorSafe();
790 			m_status.code = Status.ERROR;
791 			m_status.text = "notify";
792 			log(m_status);
793 			return false;
794 		}
795 		return true;
796 	}
797 	
798 	uint recv(in fd_t fd, ref ubyte[] data)
799 	{
800 		m_status = StatusInfo.init;
801 		int ret = .recv(fd, cast(void*) data.ptr, cast(INT) data.length, 0);
802 		
803 		//try log("RECV " ~ ret.to!string ~ "B FD#" ~ fd.to!string); catch {}
804 		if (catchSocketError!".recv"(ret)) { // ret == -1
805 			if (m_error == error_t.WSAEWOULDBLOCK)
806 				m_status.code = Status.ASYNC;
807 			return 0; // TODO: handle some errors more specifically
808 		}
809 		m_status.code = Status.OK;
810 		
811 		return cast(uint) ret;
812 	}
813 	
814 	uint send(in fd_t fd, in ubyte[] data)
815 	{
816 		m_status = StatusInfo.init;
817 		//try log("SEND " ~ data.length.to!string ~ "B FD#" ~ fd.to!string);
818 		//catch{}
819 		int ret = .send(fd, cast(const(void)*) data.ptr, cast(INT) data.length, 0);
820 		
821 		if (catchSocketError!"send"(ret)) {
822 			if (m_error == error_t.WSAEWOULDBLOCK)
823 				m_status.code = Status.ASYNC;
824 			return 0; // TODO: handle some errors more specifically
825 		}
826 		m_status.code = Status.ASYNC;
827 		return cast(uint) ret;
828 	}
829 	
830 	bool broadcast(in fd_t fd, bool b) {
831 		int val = b?1:0;
832 		socklen_t len = val.sizeof;
833 		int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len);
834 		if (catchSocketError!"setsockopt"(err))
835 			return false;
836 		
837 		return true;
838 		
839 	}
840 	
841 	uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr)
842 	{
843 		m_status = StatusInfo.init;
844 		socklen_t addrLen;
845 		addr.family = AF_INET;
846 		int ret = .recvfrom(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, &addrLen);
847 		
848 		if (addrLen > addr.sockAddrLen) {
849 			addr.family = AF_INET6;
850 		}
851 		
852 		try log("RECVFROM " ~ ret.to!string ~ "B"); catch {}
853 		if (catchSocketError!".recvfrom"(ret)) { // ret == -1
854 			if (m_error == WSAEWOULDBLOCK)
855 				m_status.code = Status.ASYNC;
856 			return 0; // TODO: handle some errors more specifically
857 		}
858 		m_status.code = Status.OK;
859 		
860 		return cast(uint) ret;
861 	}
862 	
863 	uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
864 	{
865 		m_status = StatusInfo.init;
866 		try log("SENDTO " ~ data.length.to!string ~ "B"); catch{}
867 		int ret;
868 		if (addr != NetworkAddress.init)
869 			ret = .sendto(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, addr.sockAddrLen);
870 		else
871 			ret = .send(fd, cast(void*) data.ptr, cast(INT) data.length, 0);
872 		
873 		if (catchSocketError!".sendTo"(ret)) { // ret == -1
874 			if (m_error == WSAEWOULDBLOCK)
875 				m_status.code = Status.ASYNC;
876 			return 0; // TODO: handle some errors more specifically
877 		}
878 		
879 		m_status.code = Status.OK;
880 		return cast(uint) ret;
881 	}
882 	
883 	NetworkAddress localAddr(in fd_t fd, bool ipv6) {
884 		NetworkAddress ret;
885 		import libasync.internals.win32 : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr;
886 		if (ipv6)
887 			ret.family = AF_INET6;
888 		else
889 			ret.family = AF_INET;
890 		socklen_t len = ret.sockAddrLen;
891 		int err = getsockname(fd, ret.sockAddr, &len);
892 		if (catchSocketError!"getsockname"(err))
893 			return NetworkAddress.init;
894 		if (len > ret.sockAddrLen)
895 			ret.family = AF_INET6;
896 		return ret;
897 	}
898 	
899 	void noDelay(in fd_t fd, bool b) {
900 		m_status = StatusInfo.init;
901 		setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &b, b.sizeof);
902 	}
903 	
904 	private bool closeRemoteSocket(fd_t fd, bool forced) {
905 		
906 		INT err;
907 		
908 		try log("Shutdown FD#" ~ fd.to!string);
909 		catch{}
910 		if (forced) {
911 			err = shutdown(fd, SD_BOTH);
912 			closesocket(fd);
913 		}
914 		else
915 			err = shutdown(fd, SD_SEND);
916 		
917 		try {
918 			TCPEventHandler* evh = fd in m_tcpHandlers;
919 			if (evh && evh.conn.inbound) {
920 				try FreeListObjectAlloc!AsyncTCPConnection.free(evh.conn);
921 				catch(Exception e) { assert(false, "Failed to free resources"); }
922 				evh.conn = null;
923 				//log("Remove event handler for " ~ fd.to!string);
924 				m_tcpHandlers.remove(fd);
925 			}
926 		}
927 		catch (Exception e) {
928 			setInternalError!"m_tcpHandlers.remove"(Status.ERROR);
929 			return false;
930 		}
931 		if (catchSocketError!"shutdown"(err))
932 			return false;
933 		return true;
934 	}
935 	
936 	// for connected sockets
937 	bool closeSocket(fd_t fd, bool connected, bool forced = false)
938 	{
939 		m_status = StatusInfo.init;
940 		if (!connected && forced) {
941 			try {
942 				if (fd in m_connHandlers) {
943 					log("Removing connection handler for: " ~ fd.to!string);
944 					m_connHandlers.remove(fd);
945 				}
946 			}
947 			catch (Exception e) {
948 				setInternalError!"m_connHandlers.remove"(Status.ERROR);
949 				return false;
950 			}
951 		}
952 		else if (connected)
953 			closeRemoteSocket(fd, forced);
954 
955 		if (!connected || forced) {
956 			// todo: flush the socket here?
957 			
958 			INT err = closesocket(fd);
959 			if (catchSocketError!"closesocket"(err)) 
960 				return false;
961 			
962 		}
963 		return true;
964 	}
965 	
966 	bool closeConnection(fd_t fd) {
967 		return closeSocket(fd, true);
968 	}
969 	
970 	NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true)
971 	in {
972 		debug import libasync.internals.validator : validateIPv4, validateIPv6;
973 		debug assert( validateIPv4(ipAddr) || validateIPv6(ipAddr), "Trying to connect to an invalid IP address");
974 	}
975 	body {
976 		m_status = StatusInfo.init;
977 		
978 		NetworkAddress addr;
979 		WSAPROTOCOL_INFOW hints;
980 		import std.conv : to;
981 		if (ipv6) {
982 			addr.family = AF_INET6;
983 		}
984 		else {
985 			addr.family = AF_INET;
986 		}
987 		
988 		INT addrlen = addr.sockAddrLen;
989 		
990 		LPWSTR str;
991 		try {
992 			str = cast(LPWSTR) toUTFz!(wchar*)(ipAddr);
993 		} catch (Exception e) {
994 			setInternalError!"toStringz"(Status.ERROR, e.msg);
995 			return NetworkAddress.init;
996 		}
997 		
998 		INT err = WSAStringToAddressW(str, cast(INT) addr.family, null, addr.sockAddr, &addrlen); 
999 		if (port != 0) addr.port = port;
1000 		try log(addr.toString());
1001 		catch {}
1002 		if( catchSocketError!"getAddressFromIP"(err) )
1003 			return NetworkAddress.init;
1004 		else assert(addrlen == addr.sockAddrLen);
1005 		return addr;
1006 	}
1007 	
1008 	NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true, in bool force = true)
1009 		/*in { 
1010 		debug import libasync.internals.validator : validateHost;
1011 		debug assert(validateHost(host), "Trying to connect to an invalid domain");
1012 	}
1013 	body */{
1014 		m_status = StatusInfo.init;
1015 		import std.conv : to;
1016 		NetworkAddress addr;
1017 		ADDRINFOW hints;
1018 		ADDRINFOW* infos;
1019 		LPCWSTR wPort = port.to!(wchar[]).toUTFz!(const(wchar)*);
1020 		if (ipv6) {
1021 			hints.ai_family = AF_INET6;
1022 			addr.family = AF_INET6;
1023 		}
1024 		else {
1025 			hints.ai_family = AF_INET;
1026 			addr.family = AF_INET;
1027 		}
1028 		
1029 		if (tcp) {
1030 			hints.ai_protocol = IPPROTO_TCP;
1031 			hints.ai_socktype = SOCK_STREAM;
1032 		}
1033 		else {
1034 			hints.ai_protocol = IPPROTO_UDP;
1035 			hints.ai_socktype = SOCK_DGRAM;
1036 		}
1037 		if (port != 0) addr.port = port;
1038 		
1039 		LPCWSTR str;
1040 		
1041 		try {
1042 			str = cast(LPCWSTR) toUTFz!(immutable(wchar)*)(host);
1043 		} catch (Exception e) {
1044 			setInternalError!"toUTFz"(Status.ERROR, e.msg);
1045 			return NetworkAddress.init;
1046 		}
1047 		
1048 		error_t err = cast(error_t) GetAddrInfoW(str, cast(LPCWSTR) wPort, &hints, &infos);
1049 		if (err != EWIN.WSA_OK) {
1050 			setInternalError!"GetAddrInfoW"(Status.ABORT, string.init, err);
1051 			return NetworkAddress.init;
1052 		}
1053 		
1054 		ubyte* pAddr = cast(ubyte*) infos.ai_addr;
1055 		ubyte* data = cast(ubyte*) addr.sockAddr;
1056 		data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy
1057 		FreeAddrInfoW(infos);
1058 		try log("GetAddrInfoW Successfully resolved DNS to: " ~ addr.toAddressString());
1059 		catch (Exception e){}
1060 		return addr;
1061 	}
1062 	
1063 	void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EWIN.ERROR_ACCESS_DENIED)
1064 	{
1065 		if (details.length > 0)
1066 			m_status.text = TRACE ~ ": " ~ details;
1067 		else
1068 			m_status.text = TRACE;
1069 		m_error = error;
1070 		m_status.code = s;
1071 		static if(LOG) log(m_status);
1072 	}
1073 private:
1074 	bool onMessage(MSG msg) 
1075 	{
1076 		m_status = StatusInfo.init;
1077 		switch (msg.message) {
1078 			case WM_TCP_SOCKET:
1079 				auto evt = LOWORD(msg.lParam);
1080 				auto err = HIWORD(msg.lParam);
1081 				if (!onTCPEvent(evt, err, cast(fd_t)msg.wParam)) {
1082 
1083 					if (evt == FD_ACCEPT)
1084 						setInternalError!"del@TCPAccept.ERROR"(Status.ERROR); 
1085 					else {
1086 						try {
1087 							TCPEventHandler cb = m_tcpHandlers.get(cast(fd_t)msg.wParam);
1088 							cb(TCPEvent.ERROR);
1089 						}
1090 						catch (Exception e) {
1091 							// An Error callback should never fail...
1092 							setInternalError!"del@TCPEvent.ERROR"(Status.ERROR); 
1093 							// assert(false, evt.to!string ~ " & " ~ m_status.to!string ~ " & " ~ m_error.to!string); 
1094 						}
1095 					}
1096 				}
1097 				break;
1098 			case WM_UDP_SOCKET:
1099 				auto evt = LOWORD(msg.lParam);
1100 				auto err = HIWORD(msg.lParam);
1101 				if (!onUDPEvent(evt, err, cast(fd_t)msg.wParam)) {
1102 					try {
1103 						UDPHandler cb = m_udpHandlers.get(cast(fd_t)msg.wParam);
1104 						cb(UDPEvent.ERROR);
1105 					}
1106 					catch (Exception e) {
1107 						// An Error callback should never fail...
1108 						setInternalError!"del@UDPEvent.ERROR"(Status.ERROR); 
1109 					}
1110 				}
1111 				break;
1112 			case WM_TIMER:
1113 				log("Timer callback");
1114 				TimerHandler cb;
1115 				bool cached = (m_timer.fd == cast(fd_t)msg.wParam);
1116 				try {
1117 					if (cached)
1118 						cb = m_timer.cb;
1119 					else
1120 						cb = m_timerHandlers.get(cast(fd_t)msg.wParam);
1121 					
1122 					cb.ctxt.rearmed = false;
1123 					
1124 					if (cb.ctxt.oneShot)
1125 						kill(cb.ctxt);
1126 					
1127 					cb();
1128 					
1129 					if (cb.ctxt.oneShot && cb.ctxt.rearmed)
1130 						run(cb.ctxt, cb, cb.ctxt.timeout);
1131 					
1132 				}
1133 				catch (Exception e) {
1134 					// An Error callback should never fail...
1135 					setInternalError!"del@TimerHandler"(Status.ERROR, e.msg);  
1136 				}
1137 				
1138 				break;
1139 			case WM_USER_EVENT:
1140 				log("User event");
1141 
1142 				ulong uwParam = cast(ulong)msg.wParam;
1143 				ulong ulParam = cast(ulong)msg.lParam;
1144 
1145 				ulong payloadAddr = (ulParam << 32) | uwParam;
1146 				void* payloadPtr = cast(void*) payloadAddr;
1147 				shared AsyncSignal ctxt = cast(shared AsyncSignal) payloadPtr;
1148 
1149 				try log("Got notification in : " ~ m_hwnd.to!string ~ " pointer: " ~ payloadPtr.to!string); catch {}
1150 				try {
1151 					assert(ctxt.id != 0);
1152 					ctxt.handler();
1153 				}
1154 				catch (Exception e) {
1155 					setInternalError!"WM_USER_EVENT@handler"(Status.ERROR); 
1156 				}
1157 				break;
1158 			case WM_USER_SIGNAL:
1159 				log("User signal");
1160 
1161 				ulong uwParam = cast(ulong)msg.wParam;
1162 				ulong ulParam = cast(ulong)msg.lParam;
1163 
1164 				ulong payloadAddr = (ulParam << 32) | uwParam;
1165 				void* payloadPtr = cast(void*) payloadAddr;
1166 				AsyncNotifier ctxt = cast(AsyncNotifier) payloadPtr;
1167 
1168 				try {
1169 					ctxt.handler();
1170 				}
1171 				catch (Exception e) {
1172 					setInternalError!"WM_USER_SIGNAL@handler"(Status.ERROR); 
1173 				}
1174 				break;
1175 			default: return false; // not handled, sends to wndProc
1176 		}
1177 		return true;
1178 	}
1179 	
1180 	bool onUDPEvent(WORD evt, WORD err, fd_t sock) {
1181 		m_status = StatusInfo.init;
1182 		try{
1183 			if (m_udpHandlers.get(sock) == UDPHandler.init)
1184 				return false;
1185 		}	catch {}
1186 		if (sock == 0) { // highly unlikely...
1187 			setInternalError!"onUDPEvent"(Status.ERROR, "no socket defined");
1188 			return false;
1189 		}
1190 		if (err) {
1191 			setInternalError!"onUDPEvent"(Status.ERROR, string.init, cast(error_t)err);
1192 			try {
1193 				//log("CLOSE FD#" ~ sock.to!string);
1194 				(m_udpHandlers)[sock](UDPEvent.ERROR);
1195 			} catch { // can't do anything about this...
1196 			}
1197 			return false;
1198 		}
1199 		
1200 		UDPHandler cb;
1201 		switch(evt) {
1202 			default: break;
1203 			case FD_READ:
1204 				try {
1205 					log("READ FD#" ~ sock.to!string);
1206 					cb = m_udpHandlers.get(sock);
1207 					assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1208 					cb(UDPEvent.READ);
1209 				}
1210 				catch (Exception e) {
1211 					setInternalError!"del@TCPEvent.READ"(Status.ABORT); 
1212 					return false;
1213 				}
1214 				break;
1215 			case FD_WRITE:
1216 				try {
1217 					log("WRITE FD#" ~ sock.to!string);
1218 					cb = m_udpHandlers.get(sock);
1219 					assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1220 					cb(UDPEvent.WRITE);
1221 				}
1222 				catch (Exception e) {
1223 					setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 
1224 					return false;
1225 				}
1226 				break;
1227 		}
1228 		return true;
1229 	}
1230 	
1231 	bool onTCPEvent(WORD evt, WORD err, fd_t sock) {
1232 		m_status = StatusInfo.init;
1233 		try{
1234 			if (m_tcpHandlers.get(sock) == TCPEventHandler.init && m_connHandlers.get(sock) == TCPAcceptHandler.init)
1235 				assert( false ); 
1236 		}	catch {}
1237 		if (sock == 0) { // highly unlikely...
1238 			setInternalError!"onTCPEvent"(Status.ERROR, "no socket defined");
1239 			assert(false);
1240 		}
1241 		if (err) {
1242 			setInternalError!"onTCPEvent"(Status.ERROR, string.init, cast(error_t)err);
1243 			try {
1244 				//log("CLOSE FD#" ~ sock.to!string);
1245 				(m_tcpHandlers)[sock](TCPEvent.ERROR);
1246 			} catch { // can't do anything about this...
1247 			}
1248 			return false;
1249 		}
1250 		
1251 		TCPEventHandler cb;
1252 		switch(evt) {
1253 			default: break;
1254 			case FD_ACCEPT:
1255 				gs_mtx.lock_nothrow();
1256 
1257 				log("TCP Handlers: " ~ m_tcpHandlers.length.to!string);
1258 				log("Accepting connection");
1259 				/// Let another listener take the next connection
1260 				TCPAcceptHandler list;
1261 				try list = m_connHandlers[sock]; catch { assert(false, "Listening on an invalid socket..."); }
1262 				scope(exit) {
1263 					/// The connection rotation mechanism is handled by the TCPListenerDistMixins
1264 					/// when registering the same AsyncTCPListener object on multiple event loops.
1265 					/// This allows to even out the CPU usage on a server instance.
1266 					HWND hwnd = list.ctxt.next(m_hwnd);
1267 					if (hwnd !is HWND.init) {
1268 						int error = WSAAsyncSelect(sock, hwnd, WM_TCP_SOCKET, FD_ACCEPT);
1269 						if (catchSocketError!"WSAAsyncSelect.NEXT()=> HWND"(error)) {
1270 							error = WSAAsyncSelect(sock, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT);
1271 							if (catchSocketError!"WSAAsyncSelect"(error))
1272 								assert(false, "Could not set listener back to window HANDLE " ~ m_hwnd.to!string); 
1273 						}
1274 					}
1275 					else log("Returned init!!");
1276 
1277 					gs_mtx.unlock_nothrow();
1278 				}
1279 
1280 				NetworkAddress addr;
1281 				addr.family = AF_INET;
1282 				int addrlen = addr.sockAddrLen;
1283 				fd_t csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);
1284 
1285 				if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET)) {
1286 					if (m_error == WSAEFAULT) { // not enough space for sockaddr
1287 						addr.family = AF_INET6;
1288 						addrlen = addr.sockAddrLen;
1289 						csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);
1290 						if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET))
1291 							return false;
1292 					}
1293 					else return false;
1294 				}
1295 
1296 				int ok = WSAAsyncSelect(csock, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE);
1297 				if ( catchSocketError!"WSAAsyncSelect"(ok) ) 
1298 					return false;
1299 
1300 				log("Connection accepted: " ~ csock.to!string);
1301 
1302 				AsyncTCPConnection conn;
1303 				try conn = FreeListObjectAlloc!AsyncTCPConnection.alloc(m_evLoop);
1304 				catch (Exception e) { assert(false, "Failed allocation"); }
1305 				conn.peer = addr;
1306 				conn.socket = csock;
1307 				conn.inbound = true;
1308 
1309 				try {
1310 					// Do the callback to get a handler
1311 					cb = list(conn); 
1312 				} 
1313 				catch(Exception e) {
1314 					setInternalError!"onConnected"(Status.EVLOOP_FAILURE); 
1315 					return false; 
1316 				}
1317 
1318 				try {
1319 					m_tcpHandlers[csock] = cb; // keep the handler to setup the connection
1320 					log("ACCEPT&CONNECT FD#" ~ csock.to!string);
1321 					*conn.connecting = true;
1322 					//cb(TCPEvent.CONNECT);
1323 				}
1324 				catch (Exception e) { 
1325 					setInternalError!"m_tcpHandlers.opIndexAssign"(Status.ABORT); 
1326 					return false; 
1327 				}
1328 				break;
1329 			case FD_CONNECT:
1330 				try {
1331 					log("CONNECT FD#" ~ sock.to!string);
1332 					cb = m_tcpHandlers.get(sock);
1333 					if (cb == TCPEventHandler.init) break;//, "Socket " ~ sock.to!string ~ " could not yield a callback");
1334 					*cb.conn.connecting = true;
1335 				} 
1336 				catch(Exception e) {
1337 					setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
1338 					return false;
1339 				}
1340 				break;
1341 			case FD_READ:
1342 				try {
1343 					log("READ FD#" ~ sock.to!string);
1344 					cb = m_tcpHandlers.get(sock);
1345 					if (cb == TCPEventHandler.init) break; //, "Socket " ~ sock.to!string ~ " could not yield a callback");
1346 					if (!cb.conn) break;
1347 					if (*cb.conn.connected == false && *cb.conn.connecting) {
1348 						*cb.conn.connecting = false;
1349 						*cb.conn.connected = true;
1350 						cb(TCPEvent.CONNECT);
1351 					}
1352 					else 
1353 						cb(TCPEvent.READ);
1354 				}
1355 				catch (Exception e) {
1356 					setInternalError!"del@TCPEvent.READ"(Status.ABORT); 
1357 					return false;
1358 				}
1359 				break;
1360 			case FD_WRITE:
1361 				// todo: don't send the first write for consistency with epoll?
1362 				
1363 				try {
1364 					//import std.stdio;
1365 					log("WRITE FD#" ~ sock.to!string);
1366 					cb = m_tcpHandlers.get(sock);
1367 					if (cb == TCPEventHandler.init) break;//assert(cb != TCPEventHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1368 					if (!cb.conn) break;
1369 					if (*cb.conn.connected == false && *cb.conn.connecting) {
1370 						*cb.conn.connecting = false;
1371 						*cb.conn.connected = true;
1372 						cb(TCPEvent.CONNECT);
1373 					}
1374 					else {
1375 						cb(TCPEvent.WRITE);
1376 					}
1377 				}
1378 				catch (Exception e) {
1379 					setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 
1380 					return false;
1381 				}
1382 				break;
1383 			case FD_CLOSE:
1384 				// called after shutdown()
1385 				INT ret;
1386 				bool connected = true;
1387 				try {
1388 					log("CLOSE FD#" ~ sock.to!string);
1389 					if (sock in m_tcpHandlers) {
1390 						cb = m_tcpHandlers.get(sock);
1391 						if (*cb.conn.connected) {
1392 							cb(TCPEvent.CLOSE);
1393 							*cb.conn.connecting = false;
1394 							*cb.conn.connected = false;
1395 						} else
1396 							connected = false;
1397 					}
1398 					else
1399 						connected = false;
1400 				}
1401 				catch (Exception e) {
1402 					if (m_status.code == Status.OK)
1403 						setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT); 
1404 					return false;
1405 				}
1406 				
1407 				closeSocket(sock, connected, true); // as necessary: invokes m_tcpHandlers.remove(fd), shutdown, closesocket
1408 				
1409 				break;
1410 		}
1411 		return true;
1412 	}
1413 	
1414 	bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt)
1415 	{
1416 		INT err;
1417 		err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
1418 		if (catchSocketError!"bind"(err)) {
1419 			closesocket(fd);
1420 			return false;
1421 		}
1422 		err = listen(fd, 128);
1423 		if (catchSocketError!"listen"(err)) {
1424 			closesocket(fd);
1425 			return false;
1426 		}
1427 		err = WSAAsyncSelect(fd, m_hwnd, WM_UDP_SOCKET, FD_READ | FD_WRITE);
1428 		if (catchSocketError!"WSAAsyncSelect"(err)) {
1429 			closesocket(fd);
1430 			return false;
1431 		}
1432 		
1433 		return true;
1434 	}
1435 	
1436 	bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, bool reusing = false)
1437 	in { 
1438 		assert(m_threadId == GetCurrentThreadId());
1439 		assert(ctxt.local !is NetworkAddress.init);
1440 	}
1441 	body {
1442 		INT err;
1443 		if (!reusing) {
1444 			err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
1445 			if (catchSocketError!"bind"(err)) {
1446 				closesocket(fd);
1447 				return false;
1448 			}
1449 
1450 			err = listen(fd, 128);
1451 			if (catchSocketError!"listen"(err)) {
1452 				closesocket(fd);
1453 				return false;
1454 			}
1455 			
1456 			err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT);
1457 			if (catchSocketError!"WSAAsyncSelect"(err)) {
1458 				closesocket(fd);
1459 				return false;
1460 			}
1461 		}
1462 		
1463 		return true;
1464 	}
1465 	
1466 	bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt)
1467 	in { 
1468 		assert(ctxt.peer !is NetworkAddress.init);
1469 		assert(ctxt.peer.port != 0, "Connecting to an invalid port");
1470 	}
1471 	body {
1472 		INT err;
1473 		NetworkAddress bind_addr;
1474 		bind_addr.family = ctxt.peer.family;
1475 		
1476 		if (ctxt.peer.family == AF_INET) 
1477 			bind_addr.sockAddrInet4.sin_addr.s_addr = 0;
1478 		else if (ctxt.peer.family == AF_INET6) 
1479 			bind_addr.sockAddrInet6.sin6_addr.s6_addr[] = 0;
1480 		else assert(false, "Invalid NetworkAddress.family " ~ ctxt.peer.family.to!string);
1481 		
1482 		err = .bind(fd, bind_addr.sockAddr, bind_addr.sockAddrLen);
1483 		if ( catchSocketError!"bind"(err) ) 
1484 			return false;
1485 		err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE);
1486 		if ( catchSocketError!"WSAAsyncSelect"(err) ) 
1487 			return false;
1488 		err = .connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen);
1489 		
1490 		auto errors = [	tuple(cast(size_t) SOCKET_ERROR, EWIN.WSAEWOULDBLOCK, Status.ASYNC) ];		
1491 		
1492 		if (catchSocketErrorsEq!"connectEQ"(err, errors))
1493 			return true;
1494 		else if (catchSocketError!"connect"(err))
1495 			return false;
1496 		
1497 		return true;
1498 	}
1499 	
1500 	bool catchErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
1501 		if (isIntegral!T)
1502 	{
1503 		foreach (validator ; cmp) {
1504 			if (val == validator[0]) {
1505 				m_status.text = TRACE;
1506 				m_status.code = validator[1];
1507 				if (m_status.code == Status.EVLOOP_TIMEOUT) {
1508 					log(m_status);
1509 					break;
1510 				}
1511 				m_error = GetLastErrorSafe();
1512 				static if(LOG) log(m_status);
1513 				return true;
1514 			}
1515 		}
1516 		return false;
1517 	}
1518 	
1519 	bool catchSocketErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
1520 		if (isIntegral!T)
1521 	{
1522 		foreach (validator ; cmp) {
1523 			if (val == validator[0]) {
1524 				m_status.text = TRACE;
1525 				m_error = WSAGetLastErrorSafe();
1526 				m_status.status = validator[1];
1527 				static if(LOG) log(m_status);
1528 				return true;
1529 			}
1530 		}
1531 		return false;
1532 	}
1533 	
1534 	bool catchSocketErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...)
1535 		if (isIntegral!T)
1536 	{
1537 		error_t err;
1538 		foreach (validator ; cmp) {
1539 			if (val == validator[0]) {
1540 				if (err is EWIN.init) err = WSAGetLastErrorSafe();
1541 				if (err == validator[1]) {
1542 					m_status.text = TRACE;
1543 					m_error = WSAGetLastErrorSafe();
1544 					m_status.code = validator[2];
1545 					static if(LOG) log(m_status);
1546 					return true;
1547 				}
1548 			}
1549 		}
1550 		return false;
1551 	}
1552 	
1553 	
1554 	bool catchSocketError(string TRACE, T)(T val, T cmp = SOCKET_ERROR)
1555 		if (isIntegral!T)
1556 	{
1557 		if (val == cmp) {
1558 			m_status.text = TRACE;
1559 			m_error = WSAGetLastErrorSafe();
1560 			m_status.code = Status.ABORT;
1561 			static if(LOG) log(m_status);
1562 			return true;
1563 		}
1564 		return false;
1565 	}
1566 	
1567 	error_t WSAGetLastErrorSafe() {
1568 		try {
1569 			return cast(error_t) WSAGetLastError();
1570 		} catch(Exception e) {
1571 			return EWIN.ERROR_ACCESS_DENIED;
1572 		}
1573 	}
1574 	
1575 	error_t GetLastErrorSafe() {
1576 		try {
1577 			return cast(error_t) GetLastError();
1578 		} catch(Exception e) {
1579 			return EWIN.ERROR_ACCESS_DENIED;
1580 		}
1581 	}
1582 	
1583 	void log(StatusInfo val)
1584 	{
1585 		static if (LOG) {
1586 			import std.stdio;
1587 			try {
1588 				writeln("Backtrace: ", m_status.text);
1589 				writeln(" | Status:  ", m_status.code);
1590 				writeln(" | Error: " , m_error);
1591 				if ((m_error in EWSAMessages) !is null)
1592 					writeln(" | Message: ", EWSAMessages[m_error]);
1593 			} catch(Exception e) {
1594 				return;
1595 			}
1596 		}
1597 	}
1598 	
1599 	void log(T)(T val)
1600 	{
1601 		static if (LOG) {
1602 			import std.stdio;
1603 			try {
1604 				writeln(val);
1605 			} catch(Exception e) {
1606 				return;
1607 			}
1608 		}
1609 	}
1610 	
1611 }
1612 
1613 mixin template TCPConnectionMixins() {
1614 	
1615 	private CleanupData m_impl;
1616 	
1617 	struct CleanupData {
1618 		bool connected;
1619 		bool connecting;
1620 	}
1621 	
1622 	@property bool* connecting() {
1623 		return &m_impl.connecting;
1624 	}
1625 	
1626 	@property bool* connected() {
1627 		return &m_impl.connected;
1628 	}
1629 	
1630 }
1631 
1632 mixin template TCPListenerDistMixins()
1633 {
1634 	import std.c.windows.windows : HWND;
1635 	import libasync.internals.hashmap : HashMap;
1636 	import core.sync.mutex;
1637 	private {
1638 		bool m_dist;
1639 		
1640 		Tuple!(WinReference, bool*) m_handles;
1641 		__gshared HashMap!(fd_t, Tuple!(WinReference, bool*)) gs_dist;
1642 		__gshared Mutex gs_mutex;
1643 	}
1644 
1645 	/// The TCP Listener schedules distributed connection handlers based on
1646 	/// the event loops that are using the same AsyncTCPListener object.
1647 	/// This is done by using WSAAsyncSelect on a different window after each
1648 	/// accept TCPEvent.
1649 	class WinReference {
1650 		private {
1651 			struct Item {
1652 				HWND handle;
1653 				bool active;
1654 			}
1655 
1656 			Item[] m_items;
1657 		}
1658 
1659 		this(HWND hndl, bool b) {
1660 			append(hndl, b);
1661 		}
1662 
1663 		void append(HWND hndl, bool b) {
1664 			m_items ~= Item(hndl, b);
1665 		}
1666 
1667 		HWND next(HWND me) {
1668 			Item[] items;
1669 			synchronized(gs_mutex)
1670 				items = m_items;
1671 			if (items.length == 1)
1672 				return me;
1673 			foreach (i, item; items) {
1674 				if (item.active == true) {
1675 					m_items[i].active = false; // remove responsibility
1676 					if (m_items.length <= i + 1) {
1677 						m_items[0].active = true; // set responsibility
1678 						auto ret = m_items[0].handle;
1679 						return ret;
1680 					}
1681 					else {
1682 						m_items[i + 1].active = true;
1683 						auto ret = m_items[i + 1].handle;
1684 						return ret;
1685 					}
1686 				}
1687 				
1688 			}
1689 			assert(false);
1690 		}
1691 
1692 	}
1693 
1694 	void init(HWND hndl, fd_t sock) {
1695 		try {
1696 			if (!gs_mutex) {
1697 				gs_mutex = new Mutex;
1698 			}
1699 			synchronized(gs_mutex) {
1700 				m_handles = gs_dist.get(sock);
1701 				if (m_handles == typeof(m_handles).init) {
1702 					gs_dist[sock] = Tuple!(WinReference, bool*)(new WinReference(hndl, true), &m_dist);
1703 					m_handles = gs_dist.get(sock);
1704 					assert(m_handles != typeof(m_handles).init);
1705 				}
1706 				else {
1707 					m_handles[0].append(hndl, false);
1708 					*m_handles[1] = true; // set first thread to dist
1709 					m_dist = true; // set this thread to dist
1710 				}
1711 			}
1712 		} catch (Exception e) {
1713 			assert(false, e.toString());
1714 		}
1715 		
1716 	}
1717 	
1718 	HWND next(HWND me) {
1719 		try {
1720 			if (!m_dist)
1721 				return HWND.init;
1722 			return m_handles[0].next(me);
1723 		}
1724 		catch (Exception e) {
1725 			assert(false, e.toString());
1726 		}
1727 	}
1728 	
1729 }
1730 private class DWHandlerInfo {
1731 	DWHandler handler;
1732 	Array!DWChangeInfo buffer;
1733 	
1734 	this(DWHandler cb) {
1735 		handler = cb;
1736 	}
1737 }
1738 
1739 private final class DWFolderWatcher {
1740 	import libasync.internals.path;
1741 private:
1742 	EventLoop m_evLoop;
1743 	fd_t m_fd;
1744 	bool m_recursive;
1745 	HANDLE m_handle;
1746 	Path m_path;
1747 	DWFileEvent m_events;
1748 	DWHandlerInfo m_handler; // contains buffer
1749 	shared AsyncSignal m_signal;
1750 	ubyte[FILE_NOTIFY_INFORMATION.sizeof + MAX_PATH + 1] m_buffer;
1751 	DWORD m_bytesTransferred;
1752 public:
1753 	this(EventLoop evl, in fd_t fd, in HANDLE hndl, in Path path, in DWFileEvent events, DWHandlerInfo handler, bool recursive) {
1754 		m_fd = fd;
1755 		m_recursive = recursive;
1756 		m_handle = cast(HANDLE)hndl;
1757 		m_evLoop = evl;
1758 		m_path = path;
1759 		m_handler = handler;
1760 		
1761 		m_signal = new shared AsyncSignal(m_evLoop);
1762 		m_signal.run(&onChanged);
1763 		triggerWatch();
1764 	}
1765 package:
1766 	void close() {
1767 		CloseHandle(m_handle);
1768 		m_signal.kill();
1769 	}
1770 	
1771 	void triggerChanged() {
1772 		m_signal.trigger();
1773 	}
1774 	
1775 	void onChanged() {
1776 		ubyte[] result = m_buffer.ptr[0 .. m_bytesTransferred];
1777 		do {
1778 			assert(result.length >= FILE_NOTIFY_INFORMATION.sizeof);
1779 			auto fni = cast(FILE_NOTIFY_INFORMATION*)result.ptr;
1780 			DWFileEvent kind;
1781 			switch( fni.Action ){
1782 				default: kind = DWFileEvent.MODIFIED; break;
1783 				case 0x1: kind = DWFileEvent.CREATED; break;
1784 				case 0x2: kind = DWFileEvent.DELETED; break;
1785 				case 0x3: kind = DWFileEvent.MODIFIED; break;
1786 				case 0x4: kind = DWFileEvent.MOVED_FROM; break;
1787 				case 0x5: kind = DWFileEvent.MOVED_TO; break;
1788 			}
1789 			string filename = to!string(fni.FileName.ptr[0 .. fni.FileNameLength/2]); // FileNameLength = #bytes, FileName=WCHAR[]
1790 			m_handler.buffer.insert(DWChangeInfo(kind, m_path ~ Path(filename)));
1791 			if( fni.NextEntryOffset == 0 ) break;
1792 			result = result[fni.NextEntryOffset .. $];
1793 		} while(result.length > 0);
1794 		
1795 		triggerWatch();
1796 		
1797 		m_handler.handler();
1798 	}
1799 	
1800 	void triggerWatch() {
1801 		
1802 		static UINT notifications = FILE_NOTIFY_CHANGE_FILE_NAME|FILE_NOTIFY_CHANGE_DIR_NAME|
1803 			FILE_NOTIFY_CHANGE_SIZE|FILE_NOTIFY_CHANGE_LAST_WRITE;
1804 		
1805 		OVERLAPPED* overlapped = FreeListObjectAlloc!OVERLAPPED.alloc();
1806 		overlapped.Internal = 0;
1807 		overlapped.InternalHigh = 0;
1808 		overlapped.Offset = 0;
1809 		overlapped.OffsetHigh = 0;
1810 		overlapped.Pointer = cast(void*)this;
1811 		import std.stdio;
1812 		DWORD bytesReturned;
1813 		BOOL success = ReadDirectoryChangesW(m_handle, m_buffer.ptr, m_buffer.length, cast(BOOL) m_recursive, notifications, &bytesReturned, overlapped, &onIOCompleted);
1814 
1815 		import std.stdio;
1816 		if (!success)
1817 			writeln("Failed to call ReadDirectoryChangesW: " ~ EWSAMessages[GetLastError().to!EWIN]);
1818 		
1819 	}
1820 	
1821 	@property fd_t fd() const {
1822 		return m_fd;
1823 	}
1824 	
1825 	@property HANDLE handle() const {
1826 		return cast(HANDLE) m_handle;
1827 	}
1828 	
1829 	static nothrow extern(System)
1830 	{
1831 		void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped)
1832 		{
1833 			import std.stdio;
1834 			DWFolderWatcher watcher = cast(DWFolderWatcher)(overlapped.Pointer);
1835 			watcher.m_bytesTransferred = cbTransferred;
1836 			try FreeListObjectAlloc!OVERLAPPED.free(overlapped); catch {}
1837 			if (dwError != 0)
1838 				try writeln("Diretory watcher error: "~EWSAMessages[dwError.to!EWIN]); catch{}
1839 			try watcher.triggerChanged();
1840 			catch (Exception e) {
1841 				try writeln("Failed to trigger change"); catch {}
1842 			}
1843 		}
1844 	}
1845 	
1846 }
1847 
1848 /**
1849 		Represents a network/socket address. (taken from vibe.core.net)
1850 */
1851 public struct NetworkAddress {
1852 	import std.c.windows.winsock : sockaddr, sockaddr_in, sockaddr_in6;
1853 	private union {
1854 		sockaddr addr;
1855 		sockaddr_in addr_ip4;
1856 		sockaddr_in6 addr_ip6;
1857 	}
1858 	
1859 	@property bool ipv6() const pure nothrow { return this.family == AF_INET6; }
1860 	
1861 	/** Family (AF_) of the socket address.
1862 		*/
1863 	@property ushort family() const pure nothrow { return addr.sa_family; }
1864 	/// ditto
1865 	@property void family(ushort val) pure nothrow { addr.sa_family = cast(ubyte)val; }
1866 	
1867 	/** The port in host byte order.
1868 		*/
1869 	@property ushort port()
1870 	const pure nothrow {
1871 		switch (this.family) {
1872 			default: assert(false, "port() called for invalid address family.");
1873 			case AF_INET: return ntoh(addr_ip4.sin_port);
1874 			case AF_INET6: return ntoh(addr_ip6.sin6_port);
1875 		}
1876 	}
1877 	/// ditto
1878 	@property void port(ushort val)
1879 	pure nothrow {
1880 		switch (this.family) {
1881 			default: assert(false, "port() called for invalid address family.");
1882 			case AF_INET: addr_ip4.sin_port = hton(val); break;
1883 			case AF_INET6: addr_ip6.sin6_port = hton(val); break;
1884 		}
1885 	}
1886 	
1887 	/** A pointer to a sockaddr struct suitable for passing to socket functions.
1888 		*/
1889 	@property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; }
1890 	
1891 	/** Size of the sockaddr struct that is returned by sockAddr().
1892 		*/
1893 	@property uint sockAddrLen()
1894 	const pure nothrow {
1895 		switch (this.family) {
1896 			default: assert(false, "sockAddrLen() called for invalid address family.");
1897 			case AF_INET: return addr_ip4.sizeof;
1898 			case AF_INET6: return addr_ip6.sizeof;
1899 		}
1900 	}
1901 	
1902 	@property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow
1903 	in { assert (family == AF_INET); }
1904 	body { return &addr_ip4; }
1905 	
1906 	@property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow
1907 	in { assert (family == AF_INET6); }
1908 	body { return &addr_ip6; }
1909 	
1910 	/** Returns a string representation of the IP address
1911 		*/
1912 	string toAddressString()
1913 	const {
1914 		import std.array : appender;
1915 		import std.string : format;
1916 		import std.format : formattedWrite;
1917 		
1918 		switch (this.family) {
1919 			default: assert(false, "toAddressString() called for invalid address family.");
1920 			case AF_INET:
1921 				ubyte[4] ip = (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4];
1922 				return format("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]);
1923 			case AF_INET6:
1924 				ubyte[16] ip = addr_ip6.sin6_addr.s6_addr;
1925 				auto ret = appender!string();
1926 				ret.reserve(40);
1927 				foreach (i; 0 .. 8) {
1928 					if (i > 0) ret.put(':');
1929 					ret.formattedWrite("%x", bigEndianToNative!ushort(cast(ubyte[2])ip[i*2 .. i*2+2].ptr[0 .. 2]));
1930 				}
1931 				return ret.data;
1932 		}
1933 	}
1934 	
1935 	/** Returns a full string representation of the address, including the port number.
1936 		*/
1937 	string toString()
1938 	const {
1939 		
1940 		import std.string : format;
1941 		
1942 		auto ret = toAddressString();
1943 		switch (this.family) {
1944 			default: assert(false, "toString() called for invalid address family.");
1945 			case AF_INET: return ret ~ format(":%s", port);
1946 			case AF_INET6: return format("[%s]:%s", ret, port);
1947 		}
1948 	}
1949 	
1950 }
1951 
1952 private pure nothrow {
1953 	import std.bitmanip;
1954 	
1955 	ushort ntoh(ushort val)
1956 	{
1957 		version (LittleEndian) return swapEndian(val);
1958 		else version (BigEndian) return val;
1959 		else static assert(false, "Unknown endianness.");
1960 	}
1961 	
1962 	ushort hton(ushort val)
1963 	{
1964 		version (LittleEndian) return swapEndian(val);
1965 		else version (BigEndian) return val;
1966 		else static assert(false, "Unknown endianness.");
1967 	}
1968 }
1969 enum WM_TCP_SOCKET = WM_USER+102;
1970 enum WM_UDP_SOCKET = WM_USER+103;
1971 enum WM_USER_EVENT = WM_USER+104;
1972 enum WM_USER_SIGNAL = WM_USER+105;
1973 
1974 nothrow:
1975 size_t g_idxCapacity = 8;
1976 Array!size_t g_idxAvailable;
1977 
1978 // called on run
1979 size_t createIndex() {
1980 	size_t idx;
1981 	import std.algorithm : max;
1982 	import std.range : iota;
1983 	try {
1984 		
1985 		size_t getIdx() {
1986 			
1987 			if (!g_idxAvailable.empty) {
1988 				immutable size_t ret = g_idxAvailable.back;
1989 				g_idxAvailable.removeBack();
1990 				return ret;
1991 			}
1992 			return 0;
1993 		}
1994 		
1995 		idx = getIdx();
1996 		if (idx == 0) {
1997 			import std.range : iota;
1998 			// todo: not sure about this
1999 			g_idxAvailable.insert( iota(g_idxCapacity, max(32, g_idxCapacity * 2), 1) );
2000 			g_idxCapacity = max(32, g_idxCapacity * 2);
2001 			idx = getIdx();
2002 		}
2003 		
2004 	} catch {}
2005 	
2006 	return idx;
2007 }
2008 
2009 void destroyIndex(AsyncTimer ctxt) {
2010 	try {
2011 		g_idxAvailable.insert(ctxt.id);		
2012 	}
2013 	catch (Exception e) {
2014 		assert(false, "Error destroying index: " ~ e.msg);
2015 	}
2016 }
2017 
2018 
2019 nothrow extern(System) {
2020 	LRESULT wndProc(HWND wnd, UINT msg, WPARAM wparam, LPARAM lparam)
2021 	{
2022 		auto ptr = cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA);
2023 		if (ptr is null) 
2024 			return DefWindowProcA(wnd, msg, wparam, lparam);
2025 		auto appl = cast(EventLoopImpl*)ptr;
2026 		MSG obj = MSG(wnd, msg, wparam, lparam, DWORD.init, POINT.init);
2027 		if (appl.onMessage(obj)) {
2028 			static if (DEBUG) {
2029 				if (appl.status.code != Status.OK && appl.status.code != Status.ASYNC) {
2030 				import std.stdio : writeln;
2031 					try { writeln(appl.error, ": ", appl.m_status.text); } catch {}
2032 				}
2033 			}
2034 			return 0;
2035 		}
2036 		else return DefWindowProcA(wnd, msg, wparam, lparam);
2037 	}
2038 	
2039 	BOOL PostMessageA(HWND hWnd, UINT Msg, WPARAM wParam, LPARAM lParam);
2040 	
2041 }