1 module libasync.windows;
2 
3 version (Windows):
4 
5 import core.atomic;
6 import core.thread : Fiber;
7 import libasync.types;
8 import std.container : Array;
9 import std.string : toStringz;
10 import std.conv : to;
11 import std.datetime : Duration, msecs, seconds;
12 import std.algorithm : min;
13 import libasync.internals.win32;
14 import std.traits : isIntegral;
15 import std.typecons : Tuple, tuple;
16 import std.utf : toUTFz;
17 import core.sync.mutex;
18 import libasync.events;
19 import memutils.utils;
20 import memutils.hashmap;
21 import memutils.vector;
22 pragma(lib, "ws2_32");
23 pragma(lib, "ole32");
24 alias fd_t = SIZE_T;
25 alias error_t = EWIN;
26 
27 //todo :  see if new connections with SO_REUSEADDR are evenly distributed between threads
28 
29 
30 package struct EventLoopImpl {
31 	pragma(msg, "Using Windows IOCP for events");
32 	
33 private:
34 	HashMap!(fd_t, TCPAcceptHandler) m_connHandlers; // todo: Change this to an array
35 	HashMap!(fd_t, TCPEventHandler) m_tcpHandlers;
36 	HashMap!(fd_t, TimerHandler) m_timerHandlers;
37 	HashMap!(fd_t, UDPHandler) m_udpHandlers;
38 	HashMap!(fd_t, DWHandlerInfo) m_dwHandlers; // todo: Change this to an array too
39 	HashMap!(uint, DWFolderWatcher) m_dwFolders;
40 	HashMap!(fd_t, tcp_keepalive)* kcache;
41 	~this() { kcache.destroy(); }
42 nothrow:
43 private:
44 	struct TimerCache {
45 		TimerHandler cb;
46 		fd_t fd;
47 	}
48 	TimerCache m_timer;
49 	
50 	EventLoop m_evLoop;
51 	bool m_started;
52 	wstring m_window;
53 	HWND m_hwnd;
54 	DWORD m_threadId;
55 	HANDLE[] m_waitObjects;
56 	ushort m_instanceId;
57 	StatusInfo m_status;
58 	error_t m_error = EWIN.WSA_OK;
59 	__gshared Mutex gs_mtx;
60 package:
61 	@property bool started() const {
62 		return m_started;
63 	}
64 	bool init(EventLoop evl) 
65 	in { assert(!m_started); }
66 	body
67 	{
68 		try if (!gs_mtx)
69 			gs_mtx = new Mutex; catch {}
70 		static ushort j;
71 		assert (j == 0, "Current implementation is only tested with 1 event loop per thread. There are known issues with signals on linux.");
72 		j += 1;
73 		m_status = StatusInfo.init;
74 		
75 		import core.thread;
76 		//try Thread.getThis().priority = Thread.PRIORITY_MAX;
77 		//catch (Exception e) { assert(false, "Could not set thread priority"); }
78 		SetThreadPriority(GetCurrentThread(), 31);
79 		m_evLoop = evl;
80 		shared static ushort i;
81 		m_instanceId = i;
82 		core.atomic.atomicOp!"+="(i, cast(ushort) 1);
83 		wstring inststr;
84 		import std.conv : to;
85 		try { inststr = m_instanceId.to!wstring; }
86 		catch (Exception e) {
87 			return false;
88 		}
89 		m_window = "VibeWin32MessageWindow" ~ inststr;
90 		wstring classname = "VibeWin32MessageWindow" ~ inststr;
91 		
92 		LPCWSTR wnz;
93 		LPCWSTR clsn;
94 		try {
95 			wnz = cast(LPCWSTR) m_window.toUTFz!(immutable(wchar)*);
96 			clsn = cast(LPCWSTR) classname.toUTFz!(immutable(wchar)*);
97 		} catch (Exception e) {
98 			setInternalError!"toUTFz"(Status.ERROR, e.msg);
99 			return false;
100 		}
101 		
102 		m_threadId = GetCurrentThreadId();
103 		WNDCLASSW wc;
104 		wc.lpfnWndProc = &wndProc;
105 		wc.lpszClassName = clsn;
106 		RegisterClassW(&wc);
107 		m_hwnd = CreateWindowW(wnz, clsn, 0, 0, 0, 385, 375, HWND_MESSAGE,
108 		                       cast(HMENU) null, null, null);
109 		try log("Window registered: " ~ m_hwnd.to!string); catch{}
110 		auto ptr = cast(ULONG_PTR)cast(void*)&this;
111 		SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, ptr);
112 		assert( cast(EventLoopImpl*)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is &this );
113 		WSADATA wd;
114 		m_error = cast(error_t) WSAStartup(0x0202, &wd);
115 		if (m_error == EWIN.WSA_OK)	
116 			m_status.code = Status.OK;
117 		else {
118 			m_status.code = Status.ABORT;
119 			static if(LOG) log(m_status);
120 			return false;
121 		}
122 		assert(wd.wVersion == 0x0202);
123 		m_started = true;
124 		return true;
125 	}
126 	
127 	// todo: find where to call this
128 	void exit() {
129 		cast(void)PostThreadMessageW(m_threadId, WM_QUIT, 0, 0);
130 	}
131 	
132 	@property StatusInfo status() const {
133 		return m_status;
134 	}
135 	
136 	@property string error() const {
137 		string* ptr;
138 		string pv = ((ptr = (m_error in EWSAMessages)) !is null) ? *ptr : string.init;
139 		return pv;
140 	}
141 	
142 	bool loop(Duration timeout = 0.seconds)
143 	in { 
144 		assert(Fiber.getThis() is null); 
145 		assert(m_started);
146 	}
147 	body {
148 		DWORD msTimeout;
149 
150 		if (timeout == -1.seconds)
151 			msTimeout = DWORD.max;
152 		else msTimeout = cast(DWORD) min(timeout.total!"msecs", DWORD.max);
153 
154 		/* 
155 		 * Waits until one or all of the specified objects are in the signaled state
156 		 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms684245%28v=vs.85%29.aspx
157 		*/
158 		DWORD signal = MsgWaitForMultipleObjectsEx(
159 			cast(DWORD)0,
160 			null,
161 			msTimeout,
162 			QS_ALLEVENTS,								
163 			MWMO_ALERTABLE | MWMO_INPUTAVAILABLE		// MWMO_ALERTABLE: Wakes up to execute overlapped hEvent (i/o completion)
164 			// MWMO_INPUTAVAILABLE: Processes key/mouse input to avoid window ghosting
165 			);
166 		
167 		auto errors = 
168 		[ tuple(WAIT_FAILED, Status.EVLOOP_FAILURE) ];	/* WAIT_FAILED: Failed to call MsgWait..() */
169 		
170 		if (signal == WAIT_TIMEOUT)
171 			return true;
172 		
173 		if (catchErrors!"MsgWaitForMultipleObjectsEx"(signal, errors)) {
174 			log("Event Loop Exiting because of error");
175 			return false; 
176 		}
177 		
178 		MSG msg;
179 		while (PeekMessageW(&msg, null, 0, 0, PM_REMOVE)) {
180 			m_status = StatusInfo.init;
181 			TranslateMessage(&msg);
182 			DispatchMessageW(&msg);
183 
184 			if (m_status.code == Status.ERROR) {
185 				log(m_status.text);
186 				return false;
187 			}
188 		}
189 		return true;
190 	}
191 	
192 	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
193 	{
194 		m_status = StatusInfo.init;
195 		fd_t fd = ctxt.socket;
196 		bool reusing;
197 		if (fd == fd_t.init) {
198 
199 			fd = WSASocketW(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
200 			
201 			if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET))
202 				return 0;
203 			
204 			if (!setOption(fd, TCPOption.REUSEADDR, true))
205 				return 0;
206 			
207 			// todo: defer accept?
208 			
209 			if (ctxt.noDelay) {
210 				if (!setOption(fd, TCPOption.NODELAY, true))
211 					return 0;
212 			}
213 		} else reusing = true;
214 
215 		if (initTCPListener(fd, ctxt, reusing))
216 		{
217 			try {
218 				log("Running listener on socket fd#" ~ fd.to!string);
219 				m_connHandlers[fd] = del;
220 				version(Distributed)ctxt.init(m_hwnd, fd);
221 			}
222 			catch (Exception e) {
223 				setInternalError!"m_connHandlers assign"(Status.ERROR, e.msg);
224 				closeSocket(fd, false);
225 				return 0;
226 			}
227 		}
228 		else
229 		{
230 			return 0;
231 		}
232 
233 
234 		return fd;
235 	}
236 	
237 	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
238 	in { 
239 		assert(ctxt.socket == fd_t.init); 
240 		assert(ctxt.peer.family != AF_UNSPEC);
241 	}
242 	body {
243 		m_status = StatusInfo.init;
244 		fd_t fd = ctxt.preInitializedSocket;
245 
246 		if (fd == fd_t.init)
247 			fd = WSASocketW(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
248 		log("Starting connection at: " ~ fd.to!string);
249 		if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET))
250 			return 0;
251 		
252 		try {
253 			(m_tcpHandlers)[fd] = del;
254 		}
255 		catch (Exception e) {
256 			setInternalError!"m_tcpHandlers assign"(Status.ERROR, e.msg);
257 			return 0;
258 		}
259 		
260 		if (ctxt.noDelay) {
261 			if (!setOption(fd, TCPOption.NODELAY, true))
262 				return 0;
263 		}
264 		
265 		if (!initTCPConnection(fd, ctxt)) {
266 			try {
267 				log("Remove event handler for " ~ fd.to!string);
268 				m_tcpHandlers.remove(fd);
269 			}
270 			catch (Exception e) {
271 				setInternalError!"m_tcpHandlers remove"(Status.ERROR, e.msg);
272 			}
273 			
274 			closeSocket(fd, false);
275 			return 0;
276 		}
277 		
278 		
279 		try log("Client started FD#" ~ fd.to!string);
280 		catch{}
281 		return fd;
282 	}
283 	
284 	fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
285 		m_status = StatusInfo.init;
286 		fd_t fd = ctxt.preInitializedSocket;
287 
288 		if (fd == fd_t.init)
289 			fd = WSASocketW(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED);
290 		
291 		if (catchSocketError!("run AsyncUDPSocket")(fd, INVALID_SOCKET))
292 			return 0;
293 		
294 		if (initUDPSocket(fd, ctxt))
295 		{
296 			try {
297 				(m_udpHandlers)[fd] = del;
298 			}
299 			catch (Exception e) {
300 				setInternalError!"m_udpHandlers assign"(Status.ERROR, e.msg);
301 				closeSocket(fd, false);
302 				return 0;
303 			}
304 		}
305 		
306 		try log("UDP Socket started FD#" ~ fd.to!string);
307 		catch{}
308 		
309 		return fd;
310 	}
311 	
312 	fd_t run(shared AsyncSignal ctxt) {
313 		m_status = StatusInfo.init;
314 		try log("Signal subscribed to: " ~ m_hwnd.to!string); catch {}
315 		return (cast(fd_t)m_hwnd);
316 	}
317 	
318 	fd_t run(AsyncNotifier ctxt) {
319 		m_status = StatusInfo.init;
320 		//try log("Running signal " ~ (cast(AsyncNotifier)ctxt).to!string); catch {}
321 		return cast(fd_t) m_hwnd;
322 	}
323 	
324 	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
325 		if (timeout < 0.seconds)
326 			timeout = 0.seconds;
327 		m_status = StatusInfo.init;
328 		fd_t timer_id = ctxt.id;
329 		if (timer_id == fd_t.init) {
330 			timer_id = createIndex();
331 		}
332 		try log("Timer created: " ~ timer_id.to!string ~ " with timeout: " ~ timeout.total!"msecs".to!string ~ " msecs"); catch {}
333 		
334 		BOOL err;
335 		try err = cast(int)SetTimer(m_hwnd, timer_id, timeout.total!"msecs".to!uint+30, null);
336 		catch(Exception e) {
337 			setInternalError!"SetTimer"(Status.ERROR);
338 			return 0;
339 		}
340 		
341 		if (err == 0)
342 		{
343 			m_error = GetLastErrorSafe();
344 			m_status.code = Status.ERROR;
345 			m_status.text = "kill(AsyncTimer)";
346 			log(m_status);
347 			return 0;
348 		}
349 		
350 		if (m_timer.fd == fd_t.init) 
351 		{
352 			m_timer.fd = timer_id;
353 			m_timer.cb = del;
354 		}
355 		else {
356 			try
357 			{
358 				(m_timerHandlers)[timer_id] = del;
359 			}
360 			catch (Exception e) {
361 				setInternalError!"HashMap assign"(Status.ERROR);
362 				return 0;
363 			}
364 		}
365 		
366 		
367 		return timer_id;
368 	}
369 	
370 	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del)
371 	{
372 		static fd_t ids;
373 		auto fd = ++ids;
374 		
375 		try (m_dwHandlers)[fd] = new DWHandlerInfo(del); 
376 		catch (Exception e) {
377 			setInternalError!"AsyncDirectoryWatcher.hashMap(run)"(Status.ERROR, "Could not add handler to hashmap: " ~ e.msg);
378 		}
379 		
380 		return fd;
381 		
382 	}
383 	
384 	bool kill(AsyncDirectoryWatcher ctxt) {
385 		
386 		try {
387 			Array!DWFolderWatcher toFree;
388 			foreach (ref const uint k, const DWFolderWatcher v; m_dwFolders) {
389 				if (v.fd == ctxt.fd) {
390 					CloseHandle(v.handle);
391 					m_dwFolders.remove(k);
392 				}
393 			}
394 			
395 			foreach (DWFolderWatcher obj; toFree[])
396 				ThreadMem.free(obj);
397 			
398 			// todo: close all the handlers...
399 			m_dwHandlers.remove(ctxt.fd);
400 		}
401 		catch (Exception e) {
402 			setInternalError!"in kill(AsyncDirectoryWatcher)"(Status.ERROR, e.msg);
403 			return false;
404 		}
405 		
406 		return true;
407 	}
408 	
409 	bool kill(AsyncTCPConnection ctxt, bool forced = false)
410 	{
411 		
412 		m_status = StatusInfo.init;
413 		fd_t fd = ctxt.socket;
414 		
415 		log("Killing socket "~ fd.to!string);
416 		try { 
417 			auto cb = m_tcpHandlers.get(ctxt.socket);
418 			if (cb != TCPEventHandler.init){
419 				*cb.conn.connected = false;
420 				*cb.conn.connecting = false;
421 				return closeSocket(fd, true, forced);
422 			}
423 		} catch (Exception e) {
424 			setInternalError!"in m_tcpHandlers"(Status.ERROR, e.msg);
425 			assert(false);
426 			//return false;
427 		}
428 		
429 		return true;
430 	}
431 	
432 	bool kill(AsyncTCPListener ctxt)
433 	{
434 		m_status = StatusInfo.init;
435 		fd_t fd = ctxt.socket;
436 		try { 
437 			if ((ctxt.socket in m_connHandlers) !is null) {
438 				return closeSocket(fd, false, true);
439 			}
440 		} catch (Exception e) {
441 			setInternalError!"in m_connHandlers"(Status.ERROR, e.msg);
442 			return false;
443 		}
444 		
445 		return true;
446 	}
447 	
448 	bool kill(shared AsyncSignal ctxt) {
449 		return true;
450 	}
451 	
452 	bool kill(AsyncNotifier ctxt) {
453 		return true;
454 	}
455 	
456 	bool kill(AsyncTimer ctxt) {
457 		m_status = StatusInfo.init;
458 		
459 		log("Kill timer");
460 		
461 		BOOL err = KillTimer(m_hwnd, ctxt.id);
462 		if (err == 0)
463 		{
464 			m_error = GetLastErrorSafe();
465 			m_status.code = Status.ERROR;
466 			m_status.text = "kill(AsyncTimer)";
467 			log(m_status);
468 			return false;
469 		}
470 		
471 		destroyIndex(ctxt);
472 		
473 		if (m_timer.fd == ctxt.id) {
474 			ctxt.id = 0;
475 			m_timer = TimerCache.init;
476 		} else {
477 			try {
478 				m_timerHandlers.remove(ctxt.id);
479 			}
480 			catch (Exception e) {
481 				setInternalError!"HashMap remove"(Status.ERROR);
482 				return 0;
483 			}
484 		}
485 		
486 		
487 		return true;
488 	}
489 	
490 	bool kill(AsyncUDPSocket ctxt) {
491 		m_status = StatusInfo.init;
492 		
493 		fd_t fd = ctxt.socket;
494 		INT err = closesocket(fd);
495 		if (catchSocketError!"closesocket"(err)) 
496 			return false;
497 		
498 		try m_udpHandlers.remove(ctxt.socket);
499 		catch (Exception e) {
500 			setInternalError!"HashMap remove"(Status.ERROR);
501 			return 0;
502 		}
503 		
504 		return true;
505 	}
506 	
507 	bool setOption(T)(fd_t fd, TCPOption option, in T value) {
508 		m_status = StatusInfo.init;
509 		int err;
510 		try {
511 			nothrow bool errorHandler() {
512 				if (catchSocketError!"setOption:"(err)) {
513 					try m_status.text ~= option.to!string;
514 					catch (Exception e){ assert(false, "to!string conversion failure"); }
515 					return false;
516 				}
517 				
518 				return true;
519 			}
520 			
521 
522 			
523 			final switch (option) {
524 				
525 				case TCPOption.NODELAY: // true/false
526 					static if (!is(T == bool))
527 						assert(false, "NODELAY value type must be bool, not " ~ T.stringof);
528 					else {
529 						BOOL val = value?1:0;
530 						socklen_t len = val.sizeof;
531 						err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len);
532 						return errorHandler();
533 					}
534 				case TCPOption.REUSEADDR: // true/false
535 					static if (!is(T == bool))
536 						assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof);
537 					else
538 					{
539 						BOOL val = value?1:0;
540 						socklen_t len = val.sizeof;
541 						err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len);
542 						return errorHandler();
543 					}
544 				case TCPOption.QUICK_ACK:
545 					static if (!is(T == bool))
546 						assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof);
547 					else {
548 						m_status.code = Status.NOT_IMPLEMENTED;
549 						return false; // quick ack is not implemented
550 					}
551 				case TCPOption.KEEPALIVE_ENABLE: // true/false
552 					static if (!is(T == bool))
553 						assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof);
554 					else
555 					{
556 						BOOL val = value?1:0;
557 						socklen_t len = val.sizeof;
558 						err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len);
559 						return errorHandler();
560 					}
561 				case TCPOption.KEEPALIVE_COUNT: // retransmit 10 times before dropping half-open conn
562 					static if (!isIntegral!T)
563 						assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof);
564 					else {
565 						m_status.code = Status.NOT_IMPLEMENTED;
566 						return false;
567 					}
568 				case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds between each keepalive packets
569 					static if (!is(T == Duration))
570 						assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof);
571 					else {
572 						
573 						if (!kcache)
574 							kcache = new HashMap!(fd_t, tcp_keepalive)();
575 						
576 						tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init);
577 						tcp_keepalive sReturned;
578 						DWORD dwBytes;
579 						kaSettings.onoff = ULONG(1);
580 						if (kaSettings.keepalivetime == ULONG.init) {
581 							kaSettings.keepalivetime = 1000;
582 						}
583 						kaSettings.keepaliveinterval = value.total!"msecs".to!ULONG;
584 						(*kcache)[fd] = kaSettings;
585 						err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null);
586 						
587 						return errorHandler();
588 					}
589 				case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start
590 					static if (!is(T == Duration))
591 						assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof);
592 					else {
593 						
594 						if (!kcache)
595 							kcache = new HashMap!(fd_t, tcp_keepalive)();
596 						
597 						tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init);
598 						tcp_keepalive sReturned;
599 						DWORD dwBytes;
600 						kaSettings.onoff = ULONG(1);
601 						if (kaSettings.keepaliveinterval == ULONG.init) {
602 							kaSettings.keepaliveinterval = 75*1000;
603 						}
604 						kaSettings.keepalivetime = value.total!"msecs".to!ULONG;
605 						
606 						(*kcache)[fd] = kaSettings;
607 						err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null);
608 						
609 						return errorHandler();
610 					}
611 				case TCPOption.BUFFER_RECV: // bytes
612 					static if (!isIntegral!T)
613 						assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof);
614 					else {
615 						int val = value.to!int;
616 						socklen_t len = val.sizeof;
617 						err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len);
618 						return errorHandler();
619 					}
620 				case TCPOption.BUFFER_SEND: // bytes
621 					static if (!isIntegral!T)
622 						assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof);
623 					else {
624 						int val = value.to!int;
625 						socklen_t len = val.sizeof;
626 						err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len);
627 						return errorHandler();
628 					}
629 				case TCPOption.TIMEOUT_RECV:
630 					static if (!is(T == Duration))
631 						assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof);
632 					else {
633 						DWORD val = value.total!"msecs".to!DWORD;
634 						socklen_t len = val.sizeof;
635 						err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &val, len);
636 						return errorHandler();
637 					}
638 				case TCPOption.TIMEOUT_SEND:
639 					static if (!is(T == Duration))
640 						assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
641 					else {
642 						DWORD val = value.total!"msecs".to!DWORD;
643 						socklen_t len = val.sizeof;
644 						err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len);
645 						return errorHandler();
646 					}
647 				case TCPOption.TIMEOUT_HALFOPEN:
648 					static if (!is(T == Duration))
649 						assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
650 					else {
651 						m_status.code = Status.NOT_IMPLEMENTED;
652 						return false;
653 					}
654 				case TCPOption.LINGER: // bool onOff, int seconds
655 					static if (!is(T == Tuple!(bool, int)))
656 						assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof);
657 					else {
658 						linger l = linger(val[0]?1:0, val[1].to!USHORT);
659 						socklen_t llen = l.sizeof;
660 						err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen);
661 						return errorHandler();
662 					}
663 				case TCPOption.CONGESTION:
664 					static if (!isIntegral!T)
665 						assert(false, "CONGESTION value type must be integral, not " ~ T.stringof);
666 					else {
667 						m_status.code = Status.NOT_IMPLEMENTED;
668 						return false;
669 					}
670 				case TCPOption.CORK:
671 					static if (!isIntegral!T)
672 						assert(false, "CORK value type must be int, not " ~ T.stringof);
673 					else {
674 						m_status.code = Status.NOT_IMPLEMENTED;
675 						return false;
676 					}
677 				case TCPOption.DEFER_ACCEPT: // seconds
678 					static if (!isIntegral!T)
679 						assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof);
680 					else {
681 						int val = value.to!int;
682 						socklen_t len = val.sizeof;
683 						err = setsockopt(fd, SOL_SOCKET, SO_CONDITIONAL_ACCEPT, &val, len);
684 						return errorHandler();
685 					}
686 			}
687 			
688 		}
689 		catch (Exception e) {
690 			return false;
691 		}
692 		
693 	}
694 	
695 	uint read(in fd_t fd, ref ubyte[] data)
696 	{
697 		return 0;
698 	}
699 	
700 	uint write(in fd_t fd, in ubyte[] data)
701 	{
702 		return 0;
703 	}
704 	
705 	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
706 		size_t i;
707 		Array!DWChangeInfo* changes;
708 		try {
709 			changes = &(m_dwHandlers.get(fd, DWHandlerInfo.init).buffer);
710 			if ((*changes).empty)
711 				return 0;
712 			
713 			import std.algorithm : min;
714 			size_t cnt = min(dst.length, changes.length);
715 			foreach (DWChangeInfo change; (*changes)[0 .. cnt]) {
716 				try log("reading change: " ~ change.path); catch {}
717 				dst[i] = (*changes)[i];
718 				i++;
719 			}
720 			changes.linearRemove((*changes)[0 .. cnt]);
721 		}
722 		catch (Exception e) {
723 			setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg);
724 			return 0;
725 		}
726 		try log("Changes returning with: " ~ i.to!string); catch {}
727 		return cast(uint) i;
728 	}
729 	
730 	uint watch(in fd_t fd, in WatchInfo info) {
731 		m_status = StatusInfo.init;
732 		uint wd;
733 		try {
734 			HANDLE hndl = CreateFileW(toUTFz!(const(wchar)*)(info.path.toNativeString()),
735 			                          FILE_LIST_DIRECTORY,
736 			                          FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
737 			                          null,
738 			                          OPEN_EXISTING,
739 			                          FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
740 			                          null);
741 			wd = cast(uint) hndl;
742 			DWHandlerInfo handler = m_dwHandlers.get(fd, DWHandlerInfo.init);
743 			assert(handler !is null);
744 			log("Watching: " ~ info.path.toNativeString());
745 			(m_dwFolders)[wd] = ThreadMem.alloc!DWFolderWatcher(m_evLoop, fd, hndl, info.path, info.events, handler, info.recursive);
746 		} catch (Exception e) {
747 			setInternalError!"watch"(Status.ERROR, "Could not start watching directory: " ~ e.msg);
748 			return 0;
749 		}
750 		return wd;
751 	}
752 	
753 	bool unwatch(in fd_t fd, in fd_t _wd) {
754 		uint wd = cast(uint) _wd;
755 		m_status = StatusInfo.init;
756 		try {
757 			DWFolderWatcher fw = m_dwFolders.get(wd, null);
758 			assert(fw !is null);
759 			m_dwFolders.remove(wd);
760 			fw.close();
761 			ThreadMem.free(fw);
762 		} catch (Exception e) {
763 			setInternalError!"unwatch"(Status.ERROR, "Failed when unwatching directory: " ~ e.msg);
764 			return false;
765 		}
766 		return true;
767 	}
768 	
769 	bool notify(T)(in fd_t fd, in T payload) 
770 		if (is(T == shared AsyncSignal) || is(T == AsyncNotifier))
771 	{
772 		m_status = StatusInfo.init;
773 		import std.conv;
774 
775 		auto payloadPtr = cast(ubyte*)payload;
776 		auto payloadAddr = cast(ulong)payloadPtr;
777 
778 		WPARAM wparam = payloadAddr & 0xffffffff;
779 		LPARAM lparam = cast(uint) (payloadAddr >> 32);
780 
781 		BOOL err;
782 		static if (is(T == AsyncNotifier))
783 			err = PostMessageA(cast(HWND)fd, WM_USER_SIGNAL, wparam, lparam);
784 		else
785 			err = PostMessageA(cast(HWND)fd, WM_USER_EVENT, wparam, lparam);
786 		try log("Sending notification to: " ~ (cast(HWND)fd).to!string); catch {}
787 		if (err == 0)
788 		{
789 			m_error = GetLastErrorSafe();
790 			m_status.code = Status.ERROR;
791 			m_status.text = "notify";
792 			log(m_status);
793 			return false;
794 		}
795 		return true;
796 	}
797 	
798 	uint recv(in fd_t fd, ref ubyte[] data)
799 	{
800 		m_status = StatusInfo.init;
801 		int ret = .recv(fd, cast(void*) data.ptr, cast(INT) data.length, 0);
802 		
803 		//try log("RECV " ~ ret.to!string ~ "B FD#" ~ fd.to!string); catch {}
804 		if (catchSocketError!".recv"(ret)) { // ret == -1
805 			if (m_error == error_t.WSAEWOULDBLOCK)
806 				m_status.code = Status.ASYNC;
807 			return 0; // TODO: handle some errors more specifically
808 		}
809 		m_status.code = Status.OK;
810 		
811 		return cast(uint) ret;
812 	}
813 	
814 	uint send(in fd_t fd, in ubyte[] data)
815 	{
816 		m_status = StatusInfo.init;
817 		//try log("SEND " ~ data.length.to!string ~ "B FD#" ~ fd.to!string);
818 		//catch{}
819 		int ret = .send(fd, cast(const(void)*) data.ptr, cast(INT) data.length, 0);
820 		
821 		if (catchSocketError!"send"(ret)) {
822 			if (m_error == error_t.WSAEWOULDBLOCK)
823 				m_status.code = Status.ASYNC;
824 			return 0; // TODO: handle some errors more specifically
825 		}
826 		m_status.code = Status.ASYNC;
827 		return cast(uint) ret;
828 	}
829 	
830 	bool broadcast(in fd_t fd, bool b) {
831 		int val = b?1:0;
832 		socklen_t len = val.sizeof;
833 		int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len);
834 		if (catchSocketError!"setsockopt"(err))
835 			return false;
836 		
837 		return true;
838 		
839 	}
840 	
841 	uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr)
842 	{
843 		m_status = StatusInfo.init;
844 		socklen_t addrLen;
845 		addr.family = AF_INET;
846 		int ret = .recvfrom(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, &addrLen);
847 		
848 		if (addrLen > addr.sockAddrLen) {
849 			addr.family = AF_INET6;
850 		}
851 		
852 		try log("RECVFROM " ~ ret.to!string ~ "B"); catch {}
853 		if (catchSocketError!".recvfrom"(ret)) { // ret == -1
854 			if (m_error == WSAEWOULDBLOCK)
855 				m_status.code = Status.ASYNC;
856 			return 0; // TODO: handle some errors more specifically
857 		}
858 		m_status.code = Status.OK;
859 		
860 		return cast(uint) ret;
861 	}
862 	
863 	uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
864 	{
865 		m_status = StatusInfo.init;
866 		try log("SENDTO " ~ data.length.to!string ~ "B"); catch{}
867 		int ret;
868 		if (addr != NetworkAddress.init)
869 			ret = .sendto(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, addr.sockAddrLen);
870 		else
871 			ret = .send(fd, cast(void*) data.ptr, cast(INT) data.length, 0);
872 		
873 		if (catchSocketError!".sendTo"(ret)) { // ret == -1
874 			if (m_error == WSAEWOULDBLOCK)
875 				m_status.code = Status.ASYNC;
876 			return 0; // TODO: handle some errors more specifically
877 		}
878 		
879 		m_status.code = Status.OK;
880 		return cast(uint) ret;
881 	}
882 	
883 	NetworkAddress localAddr(in fd_t fd, bool ipv6) {
884 		NetworkAddress ret;
885 		import libasync.internals.win32 : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr;
886 		if (ipv6)
887 			ret.family = AF_INET6;
888 		else
889 			ret.family = AF_INET;
890 		socklen_t len = ret.sockAddrLen;
891 		int err = getsockname(fd, ret.sockAddr, &len);
892 		if (catchSocketError!"getsockname"(err))
893 			return NetworkAddress.init;
894 		if (len > ret.sockAddrLen)
895 			ret.family = AF_INET6;
896 		return ret;
897 	}
898 	
899 	void noDelay(in fd_t fd, bool b) {
900 		m_status = StatusInfo.init;
901 		setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &b, b.sizeof);
902 	}
903 	
904 	private bool closeRemoteSocket(fd_t fd, bool forced) {
905 		
906 		INT err;
907 		
908 		try log("Shutdown FD#" ~ fd.to!string);
909 		catch{}
910 		if (forced) {
911 			err = shutdown(fd, SD_BOTH);
912 			closesocket(fd);
913 		}
914 		else
915 			err = shutdown(fd, SD_SEND);
916 		
917 		try {
918 			TCPEventHandler* evh = fd in m_tcpHandlers;
919 			if (evh) {
920 				if (evh.conn.inbound) {
921 					try ThreadMem.free(evh.conn);
922 					catch(Exception e) { assert(false, "Failed to free resources"); }
923 				}
924 
925 				evh.conn = null;
926 				//log("Remove event handler for " ~ fd.to!string);
927 				m_tcpHandlers.remove(fd);
928 			} 
929 		}
930 		catch (Exception e) {
931 			setInternalError!"m_tcpHandlers.remove"(Status.ERROR);
932 			return false;
933 		}
934 		if (catchSocketError!"shutdown"(err))
935 			return false;
936 		return true;
937 	}
938 	
939 	// for connected sockets
940 	bool closeSocket(fd_t fd, bool connected, bool forced = false)
941 	{
942 		m_status = StatusInfo.init;
943 		if (!connected && forced) {
944 			try {
945 				if (fd in m_connHandlers) {
946 					log("Removing connection handler for: " ~ fd.to!string);
947 					m_connHandlers.remove(fd);
948 				}
949 			}
950 			catch (Exception e) {
951 				setInternalError!"m_connHandlers.remove"(Status.ERROR);
952 				return false;
953 			}
954 		}
955 		else if (connected)
956 			closeRemoteSocket(fd, forced);
957 
958 		if (!connected || forced) {
959 			// todo: flush the socket here?
960 			
961 			INT err = closesocket(fd);
962 			if (catchSocketError!"closesocket"(err)) 
963 				return false;
964 			
965 		}
966 		return true;
967 	}
968 	
969 	bool closeConnection(fd_t fd) {
970 		return closeSocket(fd, true);
971 	}
972 	
973 	NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true)
974 	{
975 		m_status = StatusInfo.init;
976 		
977 		NetworkAddress addr;
978 		WSAPROTOCOL_INFOW hints;
979 		import std.conv : to;
980 		if (ipv6) {
981 			addr.family = AF_INET6;
982 		}
983 		else {
984 			addr.family = AF_INET;
985 		}
986 		
987 		INT addrlen = addr.sockAddrLen;
988 		
989 		LPWSTR str;
990 		try {
991 			str = cast(LPWSTR) toUTFz!(wchar*)(ipAddr);
992 		} catch (Exception e) {
993 			setInternalError!"toStringz"(Status.ERROR, e.msg);
994 			return NetworkAddress.init;
995 		}
996 		
997 		INT err = WSAStringToAddressW(str, cast(INT) addr.family, null, addr.sockAddr, &addrlen); 
998 		if (port != 0) addr.port = port;
999 		try log(addr.toString());
1000 		catch {}
1001 		if( catchSocketError!"getAddressFromIP"(err) )
1002 			return NetworkAddress.init;
1003 		else assert(addrlen == addr.sockAddrLen);
1004 		return addr;
1005 	}
1006 	
1007 	NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true, in bool force = true)
1008 		/*in { 
1009 		debug import libasync.internals.validator : validateHost;
1010 		debug assert(validateHost(host), "Trying to connect to an invalid domain");
1011 	}
1012 	body */{
1013 		m_status = StatusInfo.init;
1014 		import std.conv : to;
1015 		NetworkAddress addr;
1016 		ADDRINFOW hints;
1017 		ADDRINFOW* infos;
1018 		LPCWSTR wPort = port.to!(wchar[]).toUTFz!(const(wchar)*);
1019 		if (ipv6) {
1020 			hints.ai_family = AF_INET6;
1021 			addr.family = AF_INET6;
1022 		}
1023 		else {
1024 			hints.ai_family = AF_INET;
1025 			addr.family = AF_INET;
1026 		}
1027 		
1028 		if (tcp) {
1029 			hints.ai_protocol = IPPROTO_TCP;
1030 			hints.ai_socktype = SOCK_STREAM;
1031 		}
1032 		else {
1033 			hints.ai_protocol = IPPROTO_UDP;
1034 			hints.ai_socktype = SOCK_DGRAM;
1035 		}
1036 		if (port != 0) addr.port = port;
1037 		
1038 		LPCWSTR str;
1039 		
1040 		try {
1041 			str = cast(LPCWSTR) toUTFz!(immutable(wchar)*)(host);
1042 		} catch (Exception e) {
1043 			setInternalError!"toUTFz"(Status.ERROR, e.msg);
1044 			return NetworkAddress.init;
1045 		}
1046 		
1047 		error_t err = cast(error_t) GetAddrInfoW(str, cast(LPCWSTR) wPort, &hints, &infos);
1048 		scope(exit) FreeAddrInfoW(infos);
1049 		if (err != EWIN.WSA_OK) {
1050 			setInternalError!"GetAddrInfoW"(Status.ABORT, string.init, err);
1051 			return NetworkAddress.init;
1052 		}
1053 		
1054 		ubyte* pAddr = cast(ubyte*) infos.ai_addr;
1055 		ubyte* data = cast(ubyte*) addr.sockAddr;
1056 		data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy
1057 		try log("GetAddrInfoW Successfully resolved DNS to: " ~ addr.toAddressString());
1058 		catch (Exception e){}
1059 		return addr;
1060 	}
1061 	
1062 	void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EWIN.ERROR_ACCESS_DENIED)
1063 	{
1064 		if (details.length > 0)
1065 			m_status.text = TRACE ~ ": " ~ details;
1066 		else
1067 			m_status.text = TRACE;
1068 		m_error = error;
1069 		m_status.code = s;
1070 		static if(LOG) log(m_status);
1071 	}
1072 private:
1073 	bool onMessage(MSG msg) 
1074 	{
1075 		m_status = StatusInfo.init;
1076 		switch (msg.message) {
1077 			case WM_TCP_SOCKET:
1078 				auto evt = LOWORD(msg.lParam);
1079 				auto err = HIWORD(msg.lParam);
1080 				if (!onTCPEvent(evt, err, cast(fd_t)msg.wParam)) {
1081 
1082 					if (evt == FD_ACCEPT)
1083 						setInternalError!"del@TCPAccept.ERROR"(Status.ERROR); 
1084 					else {
1085 						try {
1086 							TCPEventHandler cb = m_tcpHandlers.get(cast(fd_t)msg.wParam);
1087 							cb(TCPEvent.ERROR);
1088 						}
1089 						catch (Exception e) {
1090 							// An Error callback should never fail...
1091 							setInternalError!"del@TCPEvent.ERROR"(Status.ERROR); 
1092 							// assert(false, evt.to!string ~ " & " ~ m_status.to!string ~ " & " ~ m_error.to!string); 
1093 						}
1094 					}
1095 				}
1096 				break;
1097 			case WM_UDP_SOCKET:
1098 				auto evt = LOWORD(msg.lParam);
1099 				auto err = HIWORD(msg.lParam);
1100 				if (!onUDPEvent(evt, err, cast(fd_t)msg.wParam)) {
1101 					try {
1102 						UDPHandler cb = m_udpHandlers.get(cast(fd_t)msg.wParam);
1103 						cb(UDPEvent.ERROR);
1104 					}
1105 					catch (Exception e) {
1106 						// An Error callback should never fail...
1107 						setInternalError!"del@UDPEvent.ERROR"(Status.ERROR); 
1108 					}
1109 				}
1110 				break;
1111 			case WM_TIMER:
1112 				log("Timer callback");
1113 				TimerHandler cb;
1114 				bool cached = (m_timer.fd == cast(fd_t)msg.wParam);
1115 				try {
1116 					if (cached)
1117 						cb = m_timer.cb;
1118 					else
1119 						cb = m_timerHandlers.get(cast(fd_t)msg.wParam);
1120 					
1121 					cb.ctxt.rearmed = false;
1122 					
1123 					if (cb.ctxt.oneShot)
1124 						kill(cb.ctxt);
1125 					
1126 					cb();
1127 					
1128 					if (cb.ctxt.oneShot && cb.ctxt.rearmed)
1129 						run(cb.ctxt, cb, cb.ctxt.timeout);
1130 					
1131 				}
1132 				catch (Exception e) {
1133 					// An Error callback should never fail...
1134 					setInternalError!"del@TimerHandler"(Status.ERROR, e.msg);  
1135 				}
1136 				
1137 				break;
1138 			case WM_USER_EVENT:
1139 				log("User event");
1140 
1141 				ulong uwParam = cast(ulong)msg.wParam;
1142 				ulong ulParam = cast(ulong)msg.lParam;
1143 
1144 				ulong payloadAddr = (ulParam << 32) | uwParam;
1145 				void* payloadPtr = cast(void*) payloadAddr;
1146 				shared AsyncSignal ctxt = cast(shared AsyncSignal) payloadPtr;
1147 
1148 				try log("Got notification in : " ~ m_hwnd.to!string ~ " pointer: " ~ payloadPtr.to!string); catch {}
1149 				try {
1150 					assert(ctxt.id != 0);
1151 					ctxt.handler();
1152 				}
1153 				catch (Exception e) {
1154 					setInternalError!"WM_USER_EVENT@handler"(Status.ERROR); 
1155 				}
1156 				break;
1157 			case WM_USER_SIGNAL:
1158 				log("User signal");
1159 
1160 				ulong uwParam = cast(ulong)msg.wParam;
1161 				ulong ulParam = cast(ulong)msg.lParam;
1162 
1163 				ulong payloadAddr = (ulParam << 32) | uwParam;
1164 				void* payloadPtr = cast(void*) payloadAddr;
1165 				AsyncNotifier ctxt = cast(AsyncNotifier) payloadPtr;
1166 
1167 				try {
1168 					ctxt.handler();
1169 				}
1170 				catch (Exception e) {
1171 					setInternalError!"WM_USER_SIGNAL@handler"(Status.ERROR); 
1172 				}
1173 				break;
1174 			default: return false; // not handled, sends to wndProc
1175 		}
1176 		return true;
1177 	}
1178 	
1179 	bool onUDPEvent(WORD evt, WORD err, fd_t sock) {
1180 		m_status = StatusInfo.init;
1181 		try{
1182 			if (m_udpHandlers.get(sock) == UDPHandler.init)
1183 				return false;
1184 		}	catch {}
1185 		if (sock == 0) { // highly unlikely...
1186 			setInternalError!"onUDPEvent"(Status.ERROR, "no socket defined");
1187 			return false;
1188 		}
1189 		if (err) {
1190 			setInternalError!"onUDPEvent"(Status.ERROR, string.init, cast(error_t)err);
1191 			try {
1192 				//log("CLOSE FD#" ~ sock.to!string);
1193 				(m_udpHandlers)[sock](UDPEvent.ERROR);
1194 			} catch { // can't do anything about this...
1195 			}
1196 			return false;
1197 		}
1198 		
1199 		UDPHandler cb;
1200 		switch(evt) {
1201 			default: break;
1202 			case FD_READ:
1203 				try {
1204 					log("READ FD#" ~ sock.to!string);
1205 					cb = m_udpHandlers.get(sock);
1206 					assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1207 					cb(UDPEvent.READ);
1208 				}
1209 				catch (Exception e) {
1210 					setInternalError!"del@TCPEvent.READ"(Status.ABORT); 
1211 					return false;
1212 				}
1213 				break;
1214 			case FD_WRITE:
1215 				try {
1216 					log("WRITE FD#" ~ sock.to!string);
1217 					cb = m_udpHandlers.get(sock);
1218 					assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1219 					cb(UDPEvent.WRITE);
1220 				}
1221 				catch (Exception e) {
1222 					setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 
1223 					return false;
1224 				}
1225 				break;
1226 		}
1227 		return true;
1228 	}
1229 	
1230 	bool onTCPEvent(WORD evt, WORD err, fd_t sock) {
1231 		m_status = StatusInfo.init;
1232 		try{
1233 			if (m_tcpHandlers.get(sock) == TCPEventHandler.init && m_connHandlers.get(sock) == TCPAcceptHandler.init)
1234 				return false; 
1235 		} catch {}
1236 		if (sock == 0) { // highly unlikely...
1237 			setInternalError!"onTCPEvent"(Status.ERROR, "no socket defined");
1238 			return false;
1239 		}
1240 		if (err) {
1241 			setInternalError!"onTCPEvent"(Status.ERROR, string.init, cast(error_t)err);
1242 			try {
1243 				//log("CLOSE FD#" ~ sock.to!string);
1244 				(m_tcpHandlers)[sock](TCPEvent.ERROR);
1245 			} catch { // can't do anything about this...
1246 			}
1247 			return false;
1248 		}
1249 		
1250 		TCPEventHandler cb;
1251 		switch(evt) {
1252 			default: break;
1253 			case FD_ACCEPT:
1254 				version(Distributed) gs_mtx.lock_nothrow();
1255 
1256 				log("TCP Handlers: " ~ m_tcpHandlers.length.to!string);
1257 				log("Accepting connection");
1258 				/// Let another listener take the next connection
1259 				TCPAcceptHandler list;
1260 				try list = m_connHandlers[sock]; catch { assert(false, "Listening on an invalid socket..."); }
1261 				scope(exit) {
1262 					/// The connection rotation mechanism is handled by the TCPListenerDistMixins
1263 					/// when registering the same AsyncTCPListener object on multiple event loops.
1264 					/// This allows to even out the CPU usage on a server instance.
1265 					version(Distributed)
1266 					{
1267 						HWND hwnd = list.ctxt.next(m_hwnd);
1268 						if (hwnd !is HWND.init) {
1269 							int error = WSAAsyncSelect(sock, hwnd, WM_TCP_SOCKET, FD_ACCEPT);
1270 							if (catchSocketError!"WSAAsyncSelect.NEXT()=> HWND"(error)) {
1271 								error = WSAAsyncSelect(sock, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT);
1272 								if (catchSocketError!"WSAAsyncSelect"(error))
1273 									assert(false, "Could not set listener back to window HANDLE " ~ m_hwnd.to!string); 
1274 							}
1275 						}
1276 						else log("Returned init!!");
1277 						gs_mtx.unlock_nothrow();
1278 					}
1279 				}
1280 
1281 				NetworkAddress addr;
1282 				addr.family = AF_INET;
1283 				int addrlen = addr.sockAddrLen;
1284 				fd_t csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);
1285 
1286 				if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET)) {
1287 					if (m_error == WSAEFAULT) { // not enough space for sockaddr
1288 						addr.family = AF_INET6;
1289 						addrlen = addr.sockAddrLen;
1290 						csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);
1291 						if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET))
1292 							return false;
1293 					}
1294 					else return false;
1295 				}
1296 
1297 				int ok = WSAAsyncSelect(csock, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE);
1298 				if ( catchSocketError!"WSAAsyncSelect"(ok) ) 
1299 					return false;
1300 
1301 				log("Connection accepted: " ~ csock.to!string);
1302 
1303 				AsyncTCPConnection conn;
1304 				try conn = ThreadMem.alloc!AsyncTCPConnection(m_evLoop);
1305 				catch (Exception e) { assert(false, "Failed allocation"); }
1306 				conn.peer = addr;
1307 				conn.socket = csock;
1308 				conn.inbound = true;
1309 
1310 				try {
1311 					// Do the callback to get a handler
1312 					cb = list(conn); 
1313 				} 
1314 				catch(Exception e) {
1315 					setInternalError!"onConnected"(Status.EVLOOP_FAILURE); 
1316 					return false; 
1317 				}
1318 
1319 				try {
1320 					m_tcpHandlers[csock] = cb; // keep the handler to setup the connection
1321 					log("ACCEPT&CONNECT FD#" ~ csock.to!string);
1322 					*conn.connected = true;
1323 					cb(TCPEvent.CONNECT);
1324 				}
1325 				catch (Exception e) { 
1326 					setInternalError!"m_tcpHandlers.opIndexAssign"(Status.ABORT); 
1327 					return false; 
1328 				}
1329 				break;
1330 			case FD_CONNECT:
1331 				try {
1332 					log("CONNECT FD#" ~ sock.to!string);
1333 					cb = m_tcpHandlers.get(sock);
1334 					if (cb == TCPEventHandler.init) break;//, "Socket " ~ sock.to!string ~ " could not yield a callback");
1335 					*cb.conn.connecting = true;
1336 				} 
1337 				catch(Exception e) {
1338 					setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
1339 					return false;
1340 				}
1341 				break;
1342 			case FD_READ:
1343 				try {
1344 					log("READ FD#" ~ sock.to!string);
1345 					cb = m_tcpHandlers.get(sock);
1346 					if (cb == TCPEventHandler.init) break; //, "Socket " ~ sock.to!string ~ " could not yield a callback");
1347 					if (!cb.conn) break;
1348 					if (*cb.conn.connected == false && *cb.conn.connecting) {
1349 						*cb.conn.connecting = false;
1350 						*cb.conn.connected = true;
1351 						cb(TCPEvent.CONNECT);
1352 					}
1353 					else 
1354 						cb(TCPEvent.READ);
1355 				}
1356 				catch (Exception e) {
1357 					setInternalError!"del@TCPEvent.READ"(Status.ABORT); 
1358 					return false;
1359 				}
1360 				break;
1361 			case FD_WRITE:
1362 				// todo: don't send the first write for consistency with epoll?
1363 				
1364 				try {
1365 					//import std.stdio;
1366 					log("WRITE FD#" ~ sock.to!string);
1367 					cb = m_tcpHandlers.get(sock);
1368 					if (cb == TCPEventHandler.init) break;//assert(cb != TCPEventHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1369 					if (!cb.conn) break;
1370 					if (*cb.conn.connected == false && *cb.conn.connecting) {
1371 						*cb.conn.connecting = false;
1372 						*cb.conn.connected = true;
1373 						cb(TCPEvent.CONNECT);
1374 					}
1375 					else {
1376 						cb(TCPEvent.WRITE);
1377 					}
1378 				}
1379 				catch (Exception e) {
1380 					setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 
1381 					return false;
1382 				}
1383 				break;
1384 			case FD_CLOSE:
1385 				// called after shutdown()
1386 				INT ret;
1387 				bool connected = true;
1388 				try {
1389 					log("CLOSE FD#" ~ sock.to!string);
1390 					if (sock in m_tcpHandlers) {
1391 						cb = m_tcpHandlers.get(sock);
1392 						if (*cb.conn.connected) {
1393 							cb(TCPEvent.CLOSE);
1394 							*cb.conn.connecting = false;
1395 							*cb.conn.connected = false;
1396 						} else
1397 							connected = false;
1398 					}
1399 					else
1400 						connected = false;
1401 				}
1402 				catch (Exception e) {
1403 					if (m_status.code == Status.OK)
1404 						setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT); 
1405 					return false;
1406 				}
1407 				
1408 				closeSocket(sock, connected, true); // as necessary: invokes m_tcpHandlers.remove(fd), shutdown, closesocket
1409 				
1410 				break;
1411 		}
1412 		return true;
1413 	}
1414 	
1415 	bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt)
1416 	{
1417 		INT err;
1418 		err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
1419 		if (catchSocketError!"bind"(err)) {
1420 			closesocket(fd);
1421 			return false;
1422 		}
1423 		err = listen(fd, 128);
1424 		if (catchSocketError!"listen"(err)) {
1425 			closesocket(fd);
1426 			return false;
1427 		}
1428 		err = WSAAsyncSelect(fd, m_hwnd, WM_UDP_SOCKET, FD_READ | FD_WRITE);
1429 		if (catchSocketError!"WSAAsyncSelect"(err)) {
1430 			closesocket(fd);
1431 			return false;
1432 		}
1433 		
1434 		return true;
1435 	}
1436 	
1437 	bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, bool reusing = false)
1438 	in { 
1439 		assert(m_threadId == GetCurrentThreadId());
1440 		assert(ctxt.local !is NetworkAddress.init);
1441 	}
1442 	body {
1443 		INT err;
1444 		if (!reusing) {
1445 			err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
1446 			if (catchSocketError!"bind"(err)) {
1447 				closesocket(fd);
1448 				return false;
1449 			}
1450 
1451 			err = listen(fd, 128);
1452 			if (catchSocketError!"listen"(err)) {
1453 				closesocket(fd);
1454 				return false;
1455 			}
1456 			
1457 			err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT);
1458 			if (catchSocketError!"WSAAsyncSelect"(err)) {
1459 				closesocket(fd);
1460 				return false;
1461 			}
1462 		}
1463 		
1464 		return true;
1465 	}
1466 	
1467 	bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt)
1468 	in { 
1469 		assert(ctxt.peer !is NetworkAddress.init);
1470 		assert(ctxt.peer.port != 0, "Connecting to an invalid port");
1471 	}
1472 	body {
1473 		INT err;
1474 		NetworkAddress bind_addr;
1475 		bind_addr.family = ctxt.peer.family;
1476 		
1477 		if (ctxt.peer.family == AF_INET) 
1478 			bind_addr.sockAddrInet4.sin_addr.s_addr = 0;
1479 		else if (ctxt.peer.family == AF_INET6) 
1480 			bind_addr.sockAddrInet6.sin6_addr.s6_addr[] = 0;
1481 		else {
1482 			status.code = Status.ERROR;
1483 			status.text = "Invalid NetworkAddress.family " ~ ctxt.peer.family.to!string;
1484 			return false;
1485 		}
1486 		
1487 		err = .bind(fd, bind_addr.sockAddr, bind_addr.sockAddrLen);
1488 		if ( catchSocketError!"bind"(err) ) 
1489 			return false;
1490 		err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE);
1491 		if ( catchSocketError!"WSAAsyncSelect"(err) ) 
1492 			return false;
1493 		err = .connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen);
1494 		
1495 		auto errors = [	tuple(cast(size_t) SOCKET_ERROR, EWIN.WSAEWOULDBLOCK, Status.ASYNC) ];		
1496 		
1497 		if (catchSocketErrorsEq!"connectEQ"(err, errors))
1498 			return true;
1499 		else if (catchSocketError!"connect"(err))
1500 			return false;
1501 		
1502 		return true;
1503 	}
1504 	
1505 	bool catchErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
1506 		if (isIntegral!T)
1507 	{
1508 		foreach (validator ; cmp) {
1509 			if (val == validator[0]) {
1510 				m_status.text = TRACE;
1511 				m_status.code = validator[1];
1512 				if (m_status.code == Status.EVLOOP_TIMEOUT) {
1513 					log(m_status);
1514 					break;
1515 				}
1516 				m_error = GetLastErrorSafe();
1517 				static if(LOG) log(m_status);
1518 				return true;
1519 			}
1520 		}
1521 		return false;
1522 	}
1523 	
1524 	bool catchSocketErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
1525 		if (isIntegral!T)
1526 	{
1527 		foreach (validator ; cmp) {
1528 			if (val == validator[0]) {
1529 				m_status.text = TRACE;
1530 				m_error = WSAGetLastErrorSafe();
1531 				m_status.status = validator[1];
1532 				static if(LOG) log(m_status);
1533 				return true;
1534 			}
1535 		}
1536 		return false;
1537 	}
1538 	
1539 	bool catchSocketErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...)
1540 		if (isIntegral!T)
1541 	{
1542 		error_t err;
1543 		foreach (validator ; cmp) {
1544 			if (val == validator[0]) {
1545 				if (err is EWIN.init) err = WSAGetLastErrorSafe();
1546 				if (err == validator[1]) {
1547 					m_status.text = TRACE;
1548 					m_error = WSAGetLastErrorSafe();
1549 					m_status.code = validator[2];
1550 					static if(LOG) log(m_status);
1551 					return true;
1552 				}
1553 			}
1554 		}
1555 		return false;
1556 	}
1557 	
1558 	
1559 	bool catchSocketError(string TRACE, T)(T val, T cmp = SOCKET_ERROR)
1560 		if (isIntegral!T)
1561 	{
1562 		if (val == cmp) {
1563 			m_status.text = TRACE;
1564 			m_error = WSAGetLastErrorSafe();
1565 			m_status.code = Status.ABORT;
1566 			static if(LOG) log(m_status);
1567 			return true;
1568 		}
1569 		return false;
1570 	}
1571 	
1572 	error_t WSAGetLastErrorSafe() {
1573 		try {
1574 			return cast(error_t) WSAGetLastError();
1575 		} catch(Exception e) {
1576 			return EWIN.ERROR_ACCESS_DENIED;
1577 		}
1578 	}
1579 	
1580 	error_t GetLastErrorSafe() {
1581 		try {
1582 			return cast(error_t) GetLastError();
1583 		} catch(Exception e) {
1584 			return EWIN.ERROR_ACCESS_DENIED;
1585 		}
1586 	}
1587 	
1588 	void log(StatusInfo val)
1589 	{
1590 		static if (LOG) {
1591 			import std.stdio;
1592 			try {
1593 				writeln("Backtrace: ", m_status.text);
1594 				writeln(" | Status:  ", m_status.code);
1595 				writeln(" | Error: " , m_error);
1596 				if ((m_error in EWSAMessages) !is null)
1597 					writeln(" | Message: ", EWSAMessages[m_error]);
1598 			} catch(Exception e) {
1599 				return;
1600 			}
1601 		}
1602 	}
1603 	
1604 	void log(T)(lazy T val)
1605 	{
1606 		static if (LOG) {
1607 			import std.stdio;
1608 			try {
1609 				writeln(val);
1610 			} catch(Exception e) {
1611 				return;
1612 			}
1613 		}
1614 	}
1615 	
1616 }
1617 
1618 mixin template TCPConnectionMixins() {
1619 	
1620 	private CleanupData m_impl;
1621 	
1622 	struct CleanupData {
1623 		bool connected;
1624 		bool connecting;
1625 	}
1626 	
1627 	@property bool* connecting() {
1628 		return &m_impl.connecting;
1629 	}
1630 	
1631 	@property bool* connected() {
1632 		return &m_impl.connected;
1633 	}
1634 	
1635 }
1636 /*
1637 mixin template TCPListenerDistMixins()
1638 {
1639 	import std.c.windows.windows : HWND;
1640 	import libasync.internals.hashmap : HashMap;
1641 	import core.sync.mutex;
1642 	private {
1643 		bool m_dist;
1644 		
1645 		Tuple!(WinReference, bool*) m_handles;
1646 		__gshared HashMap!(fd_t, Tuple!(WinReference, bool*)) gs_dist;
1647 		__gshared Mutex gs_mutex;
1648 	}
1649 
1650 	/// The TCP Listener schedules distributed connection handlers based on
1651 	/// the event loops that are using the same AsyncTCPListener object.
1652 	/// This is done by using WSAAsyncSelect on a different window after each
1653 	/// accept TCPEvent.
1654 	class WinReference {
1655 		private {
1656 			struct Item {
1657 				HWND handle;
1658 				bool active;
1659 			}
1660 
1661 			Item[] m_items;
1662 		}
1663 
1664 		this(HWND hndl, bool b) {
1665 			append(hndl, b);
1666 		}
1667 
1668 		void append(HWND hndl, bool b) {
1669 			m_items ~= Item(hndl, b);
1670 		}
1671 
1672 		HWND next(HWND me) {
1673 			Item[] items;
1674 			synchronized(gs_mutex)
1675 				items = m_items;
1676 			if (items.length == 1)
1677 				return me;
1678 			foreach (i, item; items) {
1679 				if (item.active == true) {
1680 					m_items[i].active = false; // remove responsibility
1681 					if (m_items.length <= i + 1) {
1682 						m_items[0].active = true; // set responsibility
1683 						auto ret = m_items[0].handle;
1684 						return ret;
1685 					}
1686 					else {
1687 						m_items[i + 1].active = true;
1688 						auto ret = m_items[i + 1].handle;
1689 						return ret;
1690 					}
1691 				}
1692 				
1693 			}
1694 			assert(false);
1695 		}
1696 
1697 	}
1698 
1699 	void init(HWND hndl, fd_t sock) {
1700 		try {
1701 			if (!gs_mutex) {
1702 				gs_mutex = new Mutex;
1703 			}
1704 			synchronized(gs_mutex) {
1705 				m_handles = gs_dist.get(sock);
1706 				if (m_handles == typeof(m_handles).init) {
1707 					gs_dist[sock] = Tuple!(WinReference, bool*)(new WinReference(hndl, true), &m_dist);
1708 					m_handles = gs_dist.get(sock);
1709 					assert(m_handles != typeof(m_handles).init);
1710 				}
1711 				else {
1712 					m_handles[0].append(hndl, false);
1713 					*m_handles[1] = true; // set first thread to dist
1714 					m_dist = true; // set this thread to dist
1715 				}
1716 			}
1717 		} catch (Exception e) {
1718 			assert(false, e.toString());
1719 		}
1720 		
1721 	}
1722 	
1723 	HWND next(HWND me) {
1724 		try {
1725 			if (!m_dist)
1726 				return HWND.init;
1727 			return m_handles[0].next(me);
1728 		}
1729 		catch (Exception e) {
1730 			assert(false, e.toString());
1731 		}
1732 	}
1733 	
1734 }*/
1735 private class DWHandlerInfo {
1736 	DWHandler handler;
1737 	Array!DWChangeInfo buffer;
1738 	
1739 	this(DWHandler cb) {
1740 		handler = cb;
1741 	}
1742 }
1743 
1744 private final class DWFolderWatcher {
1745 	import libasync.internals.path;
1746 private:
1747 	EventLoop m_evLoop;
1748 	fd_t m_fd;
1749 	bool m_recursive;
1750 	HANDLE m_handle;
1751 	Path m_path;
1752 	DWFileEvent m_events;
1753 	DWHandlerInfo m_handler; // contains buffer
1754 	shared AsyncSignal m_signal;
1755 	ubyte[FILE_NOTIFY_INFORMATION.sizeof + MAX_PATH + 1] m_buffer;
1756 	DWORD m_bytesTransferred;
1757 public:
1758 	this(EventLoop evl, in fd_t fd, in HANDLE hndl, in Path path, in DWFileEvent events, DWHandlerInfo handler, bool recursive) {
1759 		m_fd = fd;
1760 		m_recursive = recursive;
1761 		m_handle = cast(HANDLE)hndl;
1762 		m_evLoop = evl;
1763 		m_path = path;
1764 		m_handler = handler;
1765 		
1766 		m_signal = new shared AsyncSignal(m_evLoop);
1767 		m_signal.run(&onChanged);
1768 		triggerWatch();
1769 	}
1770 package:
1771 	void close() {
1772 		CloseHandle(m_handle);
1773 		m_signal.kill();
1774 	}
1775 	
1776 	void triggerChanged() {
1777 		m_signal.trigger();
1778 	}
1779 	
1780 	void onChanged() {
1781 		ubyte[] result = m_buffer.ptr[0 .. m_bytesTransferred];
1782 		do {
1783 			assert(result.length >= FILE_NOTIFY_INFORMATION.sizeof);
1784 			auto fni = cast(FILE_NOTIFY_INFORMATION*)result.ptr;
1785 			DWFileEvent kind;
1786 			switch( fni.Action ){
1787 				default: kind = DWFileEvent.MODIFIED; break;
1788 				case 0x1: kind = DWFileEvent.CREATED; break;
1789 				case 0x2: kind = DWFileEvent.DELETED; break;
1790 				case 0x3: kind = DWFileEvent.MODIFIED; break;
1791 				case 0x4: kind = DWFileEvent.MOVED_FROM; break;
1792 				case 0x5: kind = DWFileEvent.MOVED_TO; break;
1793 			}
1794 			string filename = to!string(fni.FileName.ptr[0 .. fni.FileNameLength/2]); // FileNameLength = #bytes, FileName=WCHAR[]
1795 			m_handler.buffer.insert(DWChangeInfo(kind, m_path ~ Path(filename)));
1796 			if( fni.NextEntryOffset == 0 ) break;
1797 			result = result[fni.NextEntryOffset .. $];
1798 		} while(result.length > 0);
1799 		
1800 		triggerWatch();
1801 		
1802 		m_handler.handler();
1803 	}
1804 	
1805 	void triggerWatch() {
1806 		
1807 		static UINT notifications = FILE_NOTIFY_CHANGE_FILE_NAME|FILE_NOTIFY_CHANGE_DIR_NAME|
1808 			FILE_NOTIFY_CHANGE_SIZE|FILE_NOTIFY_CHANGE_LAST_WRITE;
1809 		
1810 		OVERLAPPED* overlapped = ThreadMem.alloc!OVERLAPPED();
1811 		overlapped.Internal = 0;
1812 		overlapped.InternalHigh = 0;
1813 		overlapped.Offset = 0;
1814 		overlapped.OffsetHigh = 0;
1815 		overlapped.Pointer = cast(void*)this;
1816 		import std.stdio;
1817 		DWORD bytesReturned;
1818 		BOOL success = ReadDirectoryChangesW(m_handle, m_buffer.ptr, m_buffer.length, cast(BOOL) m_recursive, notifications, &bytesReturned, overlapped, &onIOCompleted);
1819 
1820 		static if (DEBUG) {
1821 			import std.stdio;
1822 			if (!success)
1823 				writeln("Failed to call ReadDirectoryChangesW: " ~ EWSAMessages[GetLastError().to!EWIN]);
1824 		}
1825 	}
1826 	
1827 	@property fd_t fd() const {
1828 		return m_fd;
1829 	}
1830 	
1831 	@property HANDLE handle() const {
1832 		return cast(HANDLE) m_handle;
1833 	}
1834 	
1835 	static nothrow extern(System)
1836 	{
1837 		void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped)
1838 		{
1839 			import std.stdio;
1840 			DWFolderWatcher watcher = cast(DWFolderWatcher)(overlapped.Pointer);
1841 			watcher.m_bytesTransferred = cbTransferred;
1842 			try ThreadMem.free(overlapped); catch {}
1843 
1844 			static if (DEBUG) {
1845 				if (dwError != 0)
1846 					try writeln("Diretory watcher error: "~EWSAMessages[dwError.to!EWIN]); catch{}
1847 			}
1848 			try watcher.triggerChanged();
1849 			catch (Exception e) {
1850 				static if (DEBUG) {
1851 					try writeln("Failed to trigger change"); catch {}
1852 				}
1853 			}
1854 		}
1855 	}
1856 	
1857 }
1858 
1859 /**
1860 		Represents a network/socket address. (taken from vibe.core.net)
1861 */
1862 public struct NetworkAddress {
1863 	import std.c.windows.winsock : sockaddr, sockaddr_in, sockaddr_in6;
1864 	private union {
1865 		sockaddr addr;
1866 		sockaddr_in addr_ip4;
1867 		sockaddr_in6 addr_ip6;
1868 	}
1869 	
1870 	@property bool ipv6() const pure nothrow { return this.family == AF_INET6; }
1871 	
1872 	/** Family (AF_) of the socket address.
1873 		*/
1874 	@property ushort family() const pure nothrow { return addr.sa_family; }
1875 	/// ditto
1876 	@property void family(ushort val) pure nothrow { addr.sa_family = cast(ubyte)val; }
1877 	
1878 	/** The port in host byte order.
1879 		*/
1880 	@property ushort port()
1881 	const pure nothrow {
1882 		switch (this.family) {
1883 			default: assert(false, "port() called for invalid address family: " ~ this.family.to!string);
1884 			case AF_INET: return ntoh(addr_ip4.sin_port);
1885 			case AF_INET6: return ntoh(addr_ip6.sin6_port);
1886 		}
1887 	}
1888 	/// ditto
1889 	@property void port(ushort val)
1890 	pure nothrow {
1891 		switch (this.family) {
1892 			default: assert(false, "port() called for invalid address family.");
1893 			case AF_INET: addr_ip4.sin_port = hton(val); break;
1894 			case AF_INET6: addr_ip6.sin6_port = hton(val); break;
1895 		}
1896 	}
1897 	
1898 	/** A pointer to a sockaddr struct suitable for passing to socket functions.
1899 		*/
1900 	@property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; }
1901 	
1902 	/** Size of the sockaddr struct that is returned by sockAddr().
1903 		*/
1904 	@property uint sockAddrLen()
1905 	const pure nothrow {
1906 		switch (this.family) {
1907 			default: assert(false, "sockAddrLen() called for invalid address family.");
1908 			case AF_INET: return addr_ip4.sizeof;
1909 			case AF_INET6: return addr_ip6.sizeof;
1910 		}
1911 	}
1912 	
1913 	@property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow
1914 	in { assert (family == AF_INET); }
1915 	body { return &addr_ip4; }
1916 	
1917 	@property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow
1918 	in { assert (family == AF_INET6); }
1919 	body { return &addr_ip6; }
1920 	
1921 	/** Returns a string representation of the IP address
1922 		*/
1923 	string toAddressString()
1924 	const {
1925 		import std.array : appender;
1926 		import std.string : format;
1927 		import std.format : formattedWrite;
1928 		
1929 		switch (this.family) {
1930 			default: assert(false, "toAddressString() called for invalid address family.");
1931 			case AF_INET:
1932 				ubyte[4] ip = (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4];
1933 				return format("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]);
1934 			case AF_INET6:
1935 				ubyte[16] ip = addr_ip6.sin6_addr.s6_addr;
1936 				auto ret = appender!string();
1937 				ret.reserve(40);
1938 				foreach (i; 0 .. 8) {
1939 					if (i > 0) ret.put(':');
1940 					ret.formattedWrite("%x", bigEndianToNative!ushort(cast(ubyte[2])ip[i*2 .. i*2+2].ptr[0 .. 2]));
1941 				}
1942 				return ret.data;
1943 		}
1944 	}
1945 	
1946 	/** Returns a full string representation of the address, including the port number.
1947 		*/
1948 	string toString()
1949 	const {
1950 		
1951 		import std.string : format;
1952 		
1953 		auto ret = toAddressString();
1954 		switch (this.family) {
1955 			default: assert(false, "toString() called for invalid address family.");
1956 			case AF_INET: return ret ~ format(":%s", port);
1957 			case AF_INET6: return format("[%s]:%s", ret, port);
1958 		}
1959 	}
1960 	
1961 }
1962 
1963 private pure nothrow {
1964 	import std.bitmanip;
1965 	
1966 	ushort ntoh(ushort val)
1967 	{
1968 		version (LittleEndian) return swapEndian(val);
1969 		else version (BigEndian) return val;
1970 		else static assert(false, "Unknown endianness.");
1971 	}
1972 	
1973 	ushort hton(ushort val)
1974 	{
1975 		version (LittleEndian) return swapEndian(val);
1976 		else version (BigEndian) return val;
1977 		else static assert(false, "Unknown endianness.");
1978 	}
1979 }
1980 enum WM_TCP_SOCKET = WM_USER+102;
1981 enum WM_UDP_SOCKET = WM_USER+103;
1982 enum WM_USER_EVENT = WM_USER+104;
1983 enum WM_USER_SIGNAL = WM_USER+105;
1984 
1985 nothrow:
1986 size_t g_idxCapacity = 8;
1987 Array!size_t g_idxAvailable;
1988 
1989 // called on run
1990 size_t createIndex() {
1991 	size_t idx;
1992 	import std.algorithm : max;
1993 	import std.range : iota;
1994 	try {
1995 		
1996 		size_t getIdx() {
1997 			
1998 			if (!g_idxAvailable.empty) {
1999 				immutable size_t ret = g_idxAvailable.back;
2000 				g_idxAvailable.removeBack();
2001 				return ret;
2002 			}
2003 			return 0;
2004 		}
2005 		
2006 		idx = getIdx();
2007 		if (idx == 0) {
2008 			import std.range : iota;
2009 			// todo: not sure about this
2010 			g_idxAvailable.insert( iota(g_idxCapacity, max(32, g_idxCapacity * 2), 1) );
2011 			g_idxCapacity = max(32, g_idxCapacity * 2);
2012 			idx = getIdx();
2013 		}
2014 		
2015 	} catch {}
2016 	
2017 	return idx;
2018 }
2019 
2020 void destroyIndex(AsyncTimer ctxt) {
2021 	try {
2022 		g_idxAvailable.insert(ctxt.id);		
2023 	}
2024 	catch (Exception e) {
2025 		assert(false, "Error destroying index: " ~ e.msg);
2026 	}
2027 }
2028 
2029 
2030 nothrow extern(System) {
2031 	LRESULT wndProc(HWND wnd, UINT msg, WPARAM wparam, LPARAM lparam)
2032 	{
2033 		auto ptr = cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA);
2034 		if (ptr is null)
2035 			return DefWindowProcA(wnd, msg, wparam, lparam);
2036 		auto appl = cast(EventLoopImpl*)ptr;
2037 		MSG obj = MSG(wnd, msg, wparam, lparam, DWORD.init, POINT.init);
2038 		if (appl.onMessage(obj)) {
2039 			static if (DEBUG) {
2040 				if (appl.status.code != Status.OK && appl.status.code != Status.ASYNC) {
2041 					import std.stdio : writeln;
2042 					try { writeln(appl.error, ": ", appl.m_status.text); } catch {}
2043 				}
2044 			}
2045 			return 0;
2046 		}
2047 		else return DefWindowProcA(wnd, msg, wparam, lparam);
2048 	}
2049 	
2050 	BOOL PostMessageA(HWND hWnd, UINT Msg, WPARAM wParam, LPARAM lParam);
2051 	
2052 }