1 module libasync.windows;
2 
3 version (Windows):
4 
5 import core.atomic;
6 import core.thread : Fiber;
7 import libasync.types;
8 import libasync.internals.hashmap;
9 import libasync.internals.memory;
10 import std.container : Array;
11 import std.string : toStringz;
12 import std.conv : to;
13 import std.datetime : Duration, msecs, seconds;
14 import std.algorithm : min;
15 import libasync.internals.win32;
16 import std.traits : isIntegral;
17 import std.typecons : Tuple, tuple;
18 import std.utf : toUTFz;
19 import core.sync.mutex;
20 import libasync.events;
21 pragma(lib, "ws2_32");
22 pragma(lib, "ole32");
23 alias fd_t = SIZE_T;
24 alias error_t = EWIN;
25 
26 //todo :  see if new connections with SO_REUSEADDR are evenly distributed between threads
27 
28 
29 package struct EventLoopImpl {
30 	pragma(msg, "Using Windows IOCP for events");
31 	
32 private:
33 	HashMap!(fd_t, TCPAcceptHandler) m_connHandlers; // todo: Change this to an array
34 	HashMap!(fd_t, TCPEventHandler) m_tcpHandlers;
35 	HashMap!(fd_t, TimerHandler) m_timerHandlers;
36 	HashMap!(fd_t, UDPHandler) m_udpHandlers;
37 	HashMap!(fd_t, DWHandlerInfo) m_dwHandlers; // todo: Change this to an array too
38 	HashMap!(uint, DWFolderWatcher) m_dwFolders;
39 nothrow:
40 private:
41 	struct TimerCache {
42 		TimerHandler cb;
43 		fd_t fd;
44 	}
45 	TimerCache m_timer;
46 	
47 	EventLoop m_evLoop;
48 	bool m_started;
49 	wstring m_window;
50 	HWND m_hwnd;
51 	DWORD m_threadId;
52 	HANDLE[] m_waitObjects;
53 	ushort m_instanceId;
54 	StatusInfo m_status;
55 	error_t m_error = EWIN.WSA_OK;
56 	__gshared Mutex gs_mtx;
57 package:
58 	@property bool started() const {
59 		return m_started;
60 	}
61 	bool init(EventLoop evl) 
62 	in { assert(!m_started); }
63 	body
64 	{
65 		try if (!gs_mtx)
66 			gs_mtx = new Mutex; catch {}
67 		static ushort j;
68 		assert (j == 0, "Current implementation is only tested with 1 event loop per thread. There are known issues with signals on linux.");
69 		j += 1;
70 		m_status = StatusInfo.init;
71 		
72 		import core.thread;
73 		try Thread.getThis().priority = Thread.PRIORITY_MAX;
74 		catch (Exception e) { assert(false, "Could not set thread priority"); }
75 
76 		m_evLoop = evl;
77 		shared static ushort i;
78 		m_instanceId = i;
79 		core.atomic.atomicOp!"+="(i, cast(ushort) 1);
80 		wstring inststr;
81 		try { inststr = m_instanceId.to!wstring; }
82 		catch (Exception e) {
83 			return false;
84 		}
85 		m_window = "VibeWin32MessageWindow" ~ inststr;
86 		wstring classname = "VibeWin32MessageWindow" ~ inststr;
87 		
88 		LPCWSTR wnz;
89 		LPCWSTR clsn;
90 		try {
91 			wnz = cast(LPCWSTR) m_window.toUTFz!(immutable(wchar)*);
92 			clsn = cast(LPCWSTR) classname.toUTFz!(immutable(wchar)*);
93 		} catch (Exception e) {
94 			setInternalError!"toUTFz"(Status.ERROR, e.msg);
95 			return false;
96 		}
97 		
98 		m_threadId = GetCurrentThreadId();
99 		WNDCLASSW wc;
100 		wc.lpfnWndProc = &wndProc;
101 		wc.lpszClassName = clsn;
102 		RegisterClassW(&wc);
103 		m_hwnd = CreateWindowW(wnz, clsn, 0, 0, 0, 385, 375, HWND_MESSAGE,
104 		                       cast(HMENU) null, null, null);
105 		try log("Window registered: " ~ m_hwnd.to!string); catch{}
106 		auto ptr = cast(ULONG_PTR)cast(void*)&this;
107 		SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, ptr);
108 		assert( cast(EventLoopImpl*)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is &this );
109 		WSADATA wd;
110 		m_error = cast(error_t) WSAStartup(0x0202, &wd);
111 		if (m_error == EWIN.WSA_OK)	
112 			m_status.code = Status.OK;
113 		else {
114 			m_status.code = Status.ABORT;
115 			static if(LOG) log(m_status);
116 			return false;
117 		}
118 		assert(wd.wVersion == 0x0202);
119 		m_started = true;
120 		return true;
121 	}
122 	
123 	// todo: find where to call this
124 	void exit() {
125 		cast(void)PostThreadMessageW(m_threadId, WM_QUIT, 0, 0);
126 	}
127 	
128 	@property StatusInfo status() const {
129 		return m_status;
130 	}
131 	
132 	@property string error() const {
133 		string* ptr;
134 		string pv = ((ptr = (m_error in EWSAMessages)) !is null) ? *ptr : string.init;
135 		return pv;
136 	}
137 	
138 	bool loop(Duration timeout = 0.seconds)
139 	in { 
140 		assert(Fiber.getThis() is null); 
141 		assert(m_started);
142 	}
143 	body {
144 		DWORD msTimeout = cast(DWORD) min(timeout.total!"msecs", DWORD.max);
145 		/* 
146 		 * Waits until one or all of the specified objects are in the signaled state
147 		 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms684245%28v=vs.85%29.aspx
148 		*/
149 		DWORD signal = MsgWaitForMultipleObjectsEx(
150 			cast(DWORD)0,
151 			null,
152 			msTimeout,
153 			QS_ALLEVENTS,								
154 			MWMO_ALERTABLE | MWMO_INPUTAVAILABLE		// MWMO_ALERTABLE: Wakes up to execute overlapped hEvent (i/o completion)
155 			// MWMO_INPUTAVAILABLE: Processes key/mouse input to avoid window ghosting
156 			);
157 		
158 		auto errors = 
159 		[ tuple(WAIT_FAILED, Status.EVLOOP_FAILURE) ];	/* WAIT_FAILED: Failed to call MsgWait..() */
160 		
161 		if (signal == WAIT_TIMEOUT)
162 			return true;
163 		
164 		if (catchErrors!"MsgWaitForMultipleObjectsEx"(signal, errors)) {
165 			log("Event Loop Exiting because of error");
166 			return false; 
167 		}
168 		
169 		MSG msg;
170 		while (PeekMessageW(&msg, null, 0, 0, PM_REMOVE)) {
171 			m_status = StatusInfo.init;
172 			TranslateMessage(&msg);
173 			DispatchMessageW(&msg);
174 
175 			if (m_status.code == Status.ERROR) {
176 				log(m_status.text);
177 				return false;
178 			}
179 		}
180 		return true;
181 	}
182 	
183 	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
184 	{
185 		m_status = StatusInfo.init;
186 		fd_t fd = ctxt.socket;
187 		bool reusing;
188 		if (fd == fd_t.init) {
189 
190 			fd = WSASocketW(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
191 			
192 			if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET))
193 				return 0;
194 			
195 			if (!setOption(fd, TCPOption.REUSEADDR, true))
196 				return 0;
197 			
198 			// todo: defer accept?
199 			
200 			if (ctxt.noDelay) {
201 				if (!setOption(fd, TCPOption.NODELAY, true))
202 					return 0;
203 			}
204 		} else reusing = true;
205 
206 		if (initTCPListener(fd, ctxt, reusing))
207 		{
208 			try {
209 				log("Running listener on socket fd#" ~ fd.to!string);
210 				m_connHandlers[fd] = del;
211 				ctxt.init(m_hwnd, fd);
212 			}
213 			catch (Exception e) {
214 				setInternalError!"m_connHandlers assign"(Status.ERROR, e.msg);
215 				closeSocket(fd, false);
216 				return 0;
217 			}
218 		}
219 		else
220 		{
221 			return 0;
222 		}
223 
224 
225 		return fd;
226 	}
227 	
228 	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
229 	in { 
230 		assert(ctxt.socket == fd_t.init); 
231 		assert(ctxt.peer.family != AF_UNSPEC);
232 	}
233 	body {
234 		m_status = StatusInfo.init;
235 		fd_t fd = WSASocketW(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
236 		log("Starting connection at: " ~ fd.to!string);
237 		if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET))
238 			return 0;
239 		
240 		try {
241 			(m_tcpHandlers)[fd] = del;
242 		}
243 		catch (Exception e) {
244 			setInternalError!"m_tcpHandlers assign"(Status.ERROR, e.msg);
245 			return 0;
246 		}
247 		
248 		debug {
249 			TCPEventHandler evh;
250 			try evh = m_tcpHandlers.get(fd);
251 			catch (Exception e) { log("Failed"); return 0; }
252 			assert( evh !is TCPEventHandler.init);
253 		}
254 		
255 		if (ctxt.noDelay) {
256 			if (!setOption(fd, TCPOption.NODELAY, true))
257 				return 0;
258 		}
259 		
260 		if (!initTCPConnection(fd, ctxt)) {
261 			try {
262 				log("Remove event handler for " ~ fd.to!string);
263 				m_tcpHandlers.remove(fd);
264 			}
265 			catch (Exception e) {
266 				setInternalError!"m_tcpHandlers remove"(Status.ERROR, e.msg);
267 			}
268 			
269 			closeSocket(fd, false);
270 			return 0;
271 		}
272 		
273 		
274 		try log("Client started FD#" ~ fd.to!string);
275 		catch{}
276 		return fd;
277 	}
278 	
279 	fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
280 		m_status = StatusInfo.init;
281 		fd_t fd = WSASocketW(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED);
282 		
283 		if (catchSocketError!("run AsyncUDPSocket")(fd, INVALID_SOCKET))
284 			return 0;
285 		
286 		if (initUDPSocket(fd, ctxt))
287 		{
288 			try {
289 				(m_udpHandlers)[fd] = del;
290 			}
291 			catch (Exception e) {
292 				setInternalError!"m_udpHandlers assign"(Status.ERROR, e.msg);
293 				closeSocket(fd, false);
294 				return 0;
295 			}
296 		}
297 		
298 		try log("UDP Socket started FD#" ~ fd.to!string);
299 		catch{}
300 		
301 		return fd;
302 	}
303 	
304 	fd_t run(shared AsyncSignal ctxt) {
305 		m_status = StatusInfo.init;
306 		try log("Signal subscribed to: " ~ m_hwnd.to!string); catch {}
307 		return (cast(fd_t)m_hwnd);
308 	}
309 	
310 	fd_t run(AsyncNotifier ctxt) {
311 		m_status = StatusInfo.init;
312 		//try log("Running signal " ~ (cast(AsyncNotifier)ctxt).to!string); catch {}
313 		return cast(fd_t) m_hwnd;
314 	}
315 	
316 	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
317 		if (timeout < 0.seconds)
318 			timeout = 0.seconds;
319 		timeout += 10.msecs(); // round up to the next 10 msecs to avoid premature timer events
320 		m_status = StatusInfo.init;
321 		fd_t timer_id = ctxt.id;
322 		if (timer_id == fd_t.init) {
323 			timer_id = createIndex();
324 		}
325 		try log("Timer created: " ~ timer_id.to!string ~ " with timeout: " ~ timeout.total!"msecs".to!string ~ " msecs"); catch {}
326 		
327 		BOOL err;
328 		try err = cast(int)SetTimer(m_hwnd, timer_id, timeout.total!"msecs".to!uint, null);
329 		catch(Exception e) {
330 			setInternalError!"SetTimer"(Status.ERROR);
331 			return 0;
332 		}
333 		
334 		if (err == 0)
335 		{
336 			m_error = GetLastErrorSafe();
337 			m_status.code = Status.ERROR;
338 			m_status.text = "kill(AsyncTimer)";
339 			log(m_status);
340 			return 0;
341 		}
342 		
343 		if (m_timer.fd == fd_t.init) 
344 		{
345 			m_timer.fd = timer_id;
346 			m_timer.cb = del;
347 		}
348 		else {
349 			try
350 			{
351 				(m_timerHandlers)[timer_id] = del;
352 			}
353 			catch (Exception e) {
354 				setInternalError!"HashMap assign"(Status.ERROR);
355 				return 0;
356 			}
357 		}
358 		
359 		
360 		return timer_id;
361 	}
362 	
363 	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del)
364 	{
365 		static fd_t ids;
366 		auto fd = ++ids;
367 		
368 		try (m_dwHandlers)[fd] = new DWHandlerInfo(del); 
369 		catch (Exception e) {
370 			setInternalError!"AsyncDirectoryWatcher.hashMap(run)"(Status.ERROR, "Could not add handler to hashmap: " ~ e.msg);
371 		}
372 		
373 		return fd;
374 		
375 	}
376 	
377 	bool kill(AsyncDirectoryWatcher ctxt) {
378 		
379 		try {
380 			Array!DWFolderWatcher toFree;
381 			foreach (ref const uint k, const DWFolderWatcher v; m_dwFolders) {
382 				if (v.fd == ctxt.fd) {
383 					CloseHandle(v.handle);
384 					m_dwFolders.remove(k);
385 				}
386 			}
387 			
388 			foreach (DWFolderWatcher obj; toFree[])
389 				FreeListObjectAlloc!DWFolderWatcher.free(obj);
390 			
391 			// todo: close all the handlers...
392 			m_dwHandlers.remove(ctxt.fd);
393 		}
394 		catch (Exception e) {
395 			setInternalError!"in kill(AsyncDirectoryWatcher)"(Status.ERROR, e.msg);
396 			return false;
397 		}
398 		
399 		return true;
400 	}
401 	
402 	bool kill(AsyncTCPConnection ctxt, bool forced = false)
403 	{
404 		
405 		m_status = StatusInfo.init;
406 		fd_t fd = ctxt.socket;
407 		
408 		log("Killing socket "~ fd.to!string);
409 		try { 
410 			auto cb = m_tcpHandlers.get(ctxt.socket);
411 			if (cb != TCPEventHandler.init){
412 				*cb.conn.connected = false;
413 				*cb.conn.connecting = false;
414 				return closeSocket(fd, true, forced);
415 			}
416 		} catch (Exception e) {
417 			setInternalError!"in m_tcpHandlers"(Status.ERROR, e.msg);
418 			assert(false);
419 			//return false;
420 		}
421 		
422 		return true;
423 	}
424 	
425 	bool kill(AsyncTCPListener ctxt)
426 	{
427 		m_status = StatusInfo.init;
428 		fd_t fd = ctxt.socket;
429 		try { 
430 			if ((ctxt.socket in m_connHandlers) !is null) {
431 				return closeSocket(fd, false, true);
432 			}
433 		} catch (Exception e) {
434 			setInternalError!"in m_connHandlers"(Status.ERROR, e.msg);
435 			return false;
436 		}
437 		
438 		return true;
439 	}
440 	
441 	bool kill(shared AsyncSignal ctxt) {
442 		return true;
443 	}
444 	
445 	bool kill(AsyncNotifier ctxt) {
446 		return true;
447 	}
448 	
449 	bool kill(AsyncTimer ctxt) {
450 		m_status = StatusInfo.init;
451 		
452 		log("Kill timer");
453 		
454 		BOOL err = KillTimer(m_hwnd, ctxt.id);
455 		if (err == 0)
456 		{
457 			m_error = GetLastErrorSafe();
458 			m_status.code = Status.ERROR;
459 			m_status.text = "kill(AsyncTimer)";
460 			log(m_status);
461 			return false;
462 		}
463 		
464 		destroyIndex(ctxt);
465 		
466 		if (m_timer.fd == ctxt.id) {
467 			ctxt.id = 0;
468 			m_timer = TimerCache.init;
469 		} else {
470 			try {
471 				m_timerHandlers.remove(ctxt.id);
472 			}
473 			catch (Exception e) {
474 				setInternalError!"HashMap remove"(Status.ERROR);
475 				return 0;
476 			}
477 		}
478 		
479 		
480 		return true;
481 	}
482 	
483 	bool kill(AsyncUDPSocket ctxt) {
484 		m_status = StatusInfo.init;
485 		
486 		fd_t fd = ctxt.socket;
487 		INT err = closesocket(fd);
488 		if (catchSocketError!"closesocket"(err)) 
489 			return false;
490 		
491 		try m_udpHandlers.remove(ctxt.socket);
492 		catch (Exception e) {
493 			setInternalError!"HashMap remove"(Status.ERROR);
494 			return 0;
495 		}
496 		
497 		return true;
498 	}
499 	
500 	bool setOption(T)(fd_t fd, TCPOption option, in T value) {
501 		m_status = StatusInfo.init;
502 		int err;
503 		try {
504 			nothrow bool errorHandler() {
505 				if (catchSocketError!"setOption:"(err)) {
506 					try m_status.text ~= option.to!string;
507 					catch (Exception e){ assert(false, "to!string conversion failure"); }
508 					return false;
509 				}
510 				
511 				return true;
512 			}
513 			
514 			
515 			static HashMap!(fd_t, tcp_keepalive)* kcache;
516 			
517 			final switch (option) {
518 				
519 				case TCPOption.NODELAY: // true/false
520 					static if (!is(T == bool))
521 						assert(false, "NODELAY value type must be bool, not " ~ T.stringof);
522 					else {
523 						BOOL val = value?1:0;
524 						socklen_t len = val.sizeof;
525 						err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len);
526 						return errorHandler();
527 					}
528 				case TCPOption.REUSEADDR: // true/false
529 					static if (!is(T == bool))
530 						assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof);
531 					else
532 					{
533 						BOOL val = value?1:0;
534 						socklen_t len = val.sizeof;
535 						err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len);
536 						return errorHandler();
537 					}
538 				case TCPOption.QUICK_ACK:
539 					static if (!is(T == bool))
540 						assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof);
541 					else {
542 						m_status.code = Status.NOT_IMPLEMENTED;
543 						return false; // quick ack is not implemented
544 					}
545 				case TCPOption.KEEPALIVE_ENABLE: // true/false
546 					static if (!is(T == bool))
547 						assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof);
548 					else
549 					{
550 						BOOL val = value?1:0;
551 						socklen_t len = val.sizeof;
552 						err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len);
553 						return errorHandler();
554 					}
555 				case TCPOption.KEEPALIVE_COUNT: // retransmit 10 times before dropping half-open conn
556 					static if (!isIntegral!T)
557 						assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof);
558 					else {
559 						m_status.code = Status.NOT_IMPLEMENTED;
560 						return false;
561 					}
562 				case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds between each keepalive packets
563 					static if (!is(T == Duration))
564 						assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof);
565 					else {
566 						
567 						if (!kcache)
568 							kcache = new HashMap!(fd_t, tcp_keepalive)(defaultAllocator());
569 						
570 						tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init);
571 						tcp_keepalive sReturned;
572 						DWORD dwBytes;
573 						kaSettings.onoff = ULONG(1);
574 						if (kaSettings.keepalivetime == ULONG.init) {
575 							kaSettings.keepalivetime = 1000;
576 						}
577 						kaSettings.keepaliveinterval = value.total!"msecs".to!ULONG;
578 						(*kcache)[fd] = kaSettings;
579 						err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null);
580 						
581 						return errorHandler();
582 					}
583 				case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start
584 					static if (!is(T == Duration))
585 						assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof);
586 					else {
587 						
588 						if (!kcache)
589 							kcache = new HashMap!(fd_t, tcp_keepalive)(defaultAllocator());
590 						
591 						tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init);
592 						tcp_keepalive sReturned;
593 						DWORD dwBytes;
594 						kaSettings.onoff = ULONG(1);
595 						if (kaSettings.keepaliveinterval == ULONG.init) {
596 							kaSettings.keepaliveinterval = 75*1000;
597 						}
598 						kaSettings.keepalivetime = value.total!"msecs".to!ULONG;
599 						
600 						(*kcache)[fd] = kaSettings;
601 						err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null);
602 						
603 						return errorHandler();
604 					}
605 				case TCPOption.BUFFER_RECV: // bytes
606 					static if (!isIntegral!T)
607 						assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof);
608 					else {
609 						int val = value.to!int;
610 						socklen_t len = val.sizeof;
611 						err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len);
612 						return errorHandler();
613 					}
614 				case TCPOption.BUFFER_SEND: // bytes
615 					static if (!isIntegral!T)
616 						assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof);
617 					else {
618 						int val = value.to!int;
619 						socklen_t len = val.sizeof;
620 						err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len);
621 						return errorHandler();
622 					}
623 				case TCPOption.TIMEOUT_RECV:
624 					static if (!is(T == Duration))
625 						assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof);
626 					else {
627 						DWORD val = value.total!"msecs".to!DWORD;
628 						socklen_t len = val.sizeof;
629 						err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &val, len);
630 						return errorHandler();
631 					}
632 				case TCPOption.TIMEOUT_SEND:
633 					static if (!is(T == Duration))
634 						assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
635 					else {
636 						DWORD val = value.total!"msecs".to!DWORD;
637 						socklen_t len = val.sizeof;
638 						err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len);
639 						return errorHandler();
640 					}
641 				case TCPOption.TIMEOUT_HALFOPEN:
642 					static if (!is(T == Duration))
643 						assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
644 					else {
645 						m_status.code = Status.NOT_IMPLEMENTED;
646 						return false;
647 					}
648 				case TCPOption.LINGER: // bool onOff, int seconds
649 					static if (!is(T == Tuple!(bool, int)))
650 						assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof);
651 					else {
652 						linger l = linger(val[0]?1:0, val[1].to!USHORT);
653 						socklen_t llen = l.sizeof;
654 						err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen);
655 						return errorHandler();
656 					}
657 				case TCPOption.CONGESTION:
658 					static if (!isIntegral!T)
659 						assert(false, "CONGESTION value type must be integral, not " ~ T.stringof);
660 					else {
661 						m_status.code = Status.NOT_IMPLEMENTED;
662 						return false;
663 					}
664 				case TCPOption.CORK:
665 					static if (!isIntegral!T)
666 						assert(false, "CORK value type must be int, not " ~ T.stringof);
667 					else {
668 						m_status.code = Status.NOT_IMPLEMENTED;
669 						return false;
670 					}
671 				case TCPOption.DEFER_ACCEPT: // seconds
672 					static if (!isIntegral!T)
673 						assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof);
674 					else {
675 						int val = value.to!int;
676 						socklen_t len = val.sizeof;
677 						err = setsockopt(fd, SOL_SOCKET, SO_CONDITIONAL_ACCEPT, &val, len);
678 						return errorHandler();
679 					}
680 			}
681 			
682 		}
683 		catch (Exception e) {
684 			return false;
685 		}
686 		
687 	}
688 	
689 	uint read(in fd_t fd, ref ubyte[] data)
690 	{
691 		return 0;
692 	}
693 	
694 	uint write(in fd_t fd, in ubyte[] data)
695 	{
696 		return 0;
697 	}
698 	
699 	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
700 		size_t i;
701 		Array!DWChangeInfo* changes;
702 		try {
703 			changes = &(m_dwHandlers.get(fd, DWHandlerInfo.init).buffer);
704 			if ((*changes).empty)
705 				return 0;
706 			
707 			import std.algorithm : min;
708 			size_t cnt = min(dst.length, changes.length);
709 			foreach (DWChangeInfo change; (*changes)[0 .. cnt]) {
710 				try log("reading change: " ~ change.path); catch {}
711 				dst[i] = (*changes)[i];
712 				i++;
713 			}
714 			changes.linearRemove((*changes)[0 .. cnt]);
715 		}
716 		catch (Exception e) {
717 			setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg);
718 			return 0;
719 		}
720 		try log("Changes returning with: " ~ i.to!string); catch {}
721 		return cast(uint) i;
722 	}
723 	
724 	uint watch(in fd_t fd, in WatchInfo info) {
725 		m_status = StatusInfo.init;
726 		uint wd;
727 		try {
728 			HANDLE hndl = CreateFileW(toUTFz!(const(wchar)*)(info.path.toNativeString()),
729 			                          FILE_LIST_DIRECTORY,
730 			                          FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
731 			                          null,
732 			                          OPEN_EXISTING,
733 			                          FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
734 			                          null);
735 			wd = cast(uint) hndl;
736 			DWHandlerInfo handler = m_dwHandlers.get(fd, DWHandlerInfo.init);
737 			assert(handler !is null);
738 			log("Watching: " ~ info.path.toNativeString());
739 			(m_dwFolders)[wd] = FreeListObjectAlloc!DWFolderWatcher.alloc(m_evLoop, fd, hndl, info.path, info.events, handler, info.recursive);
740 		} catch (Exception e) {
741 			setInternalError!"watch"(Status.ERROR, "Could not start watching directory: " ~ e.msg);
742 			return 0;
743 		}
744 		return wd;
745 	}
746 	
747 	bool unwatch(in fd_t fd, in fd_t _wd) {
748 		uint wd = cast(uint) _wd;
749 		m_status = StatusInfo.init;
750 		try {
751 			DWFolderWatcher fw = m_dwFolders.get(wd, null);
752 			assert(fw !is null);
753 			m_dwFolders.remove(wd);
754 			fw.close();
755 			FreeListObjectAlloc!DWFolderWatcher.free(fw);
756 		} catch (Exception e) {
757 			setInternalError!"unwatch"(Status.ERROR, "Failed when unwatching directory: " ~ e.msg);
758 			return false;
759 		}
760 		return true;
761 	}
762 	
763 	bool notify(T)(in fd_t fd, in T payload) 
764 		if (is(T == shared AsyncSignal) || is(T == AsyncNotifier))
765 	{
766 		m_status = StatusInfo.init;
767 		import std.conv;
768 
769 		auto payloadPtr = cast(ubyte*)payload;
770 		auto payloadAddr = cast(ulong)payloadPtr;
771 
772 		WPARAM wparam = payloadAddr & 0xffffffff;
773 		LPARAM lparam = cast(uint) (payloadAddr >> 32);
774 
775 		BOOL err;
776 		static if (is(T == AsyncNotifier))
777 			err = PostMessageA(cast(HWND)fd, WM_USER_SIGNAL, wparam, lparam);
778 		else
779 			err = PostMessageA(cast(HWND)fd, WM_USER_EVENT, wparam, lparam);
780 		try log("Sending notification to: " ~ (cast(HWND)fd).to!string); catch {}
781 		if (err == 0)
782 		{
783 			m_error = GetLastErrorSafe();
784 			m_status.code = Status.ERROR;
785 			m_status.text = "notify";
786 			log(m_status);
787 			return false;
788 		}
789 		return true;
790 	}
791 	
792 	uint recv(in fd_t fd, ref ubyte[] data)
793 	{
794 		m_status = StatusInfo.init;
795 		int ret = .recv(fd, cast(void*) data.ptr, cast(INT) data.length, 0);
796 		
797 		//try log("RECV " ~ ret.to!string ~ "B FD#" ~ fd.to!string); catch {}
798 		if (catchSocketError!".recv"(ret)) { // ret == -1
799 			if (m_error == error_t.WSAEWOULDBLOCK)
800 				m_status.code = Status.ASYNC;
801 			return 0; // TODO: handle some errors more specifically
802 		}
803 		m_status.code = Status.OK;
804 		
805 		return cast(uint) ret;
806 	}
807 	
808 	uint send(in fd_t fd, in ubyte[] data)
809 	{
810 		m_status = StatusInfo.init;
811 		//try log("SEND " ~ data.length.to!string ~ "B FD#" ~ fd.to!string);
812 		//catch{}
813 		int ret = .send(fd, cast(const(void)*) data.ptr, cast(INT) data.length, 0);
814 		
815 		if (catchSocketError!"send"(ret)) {
816 			if (m_error == error_t.WSAEWOULDBLOCK)
817 				m_status.code = Status.ASYNC;
818 			return 0; // TODO: handle some errors more specifically
819 		}
820 		m_status.code = Status.ASYNC;
821 		return cast(uint) ret;
822 	}
823 	
824 	bool broadcast(in fd_t fd, bool b) {
825 		int val = b?1:0;
826 		socklen_t len = val.sizeof;
827 		int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len);
828 		if (catchSocketError!"setsockopt"(err))
829 			return false;
830 		
831 		return true;
832 		
833 	}
834 	
835 	uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr)
836 	{
837 		m_status = StatusInfo.init;
838 		socklen_t addrLen;
839 		addr.family = AF_INET;
840 		int ret = .recvfrom(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, &addrLen);
841 		
842 		if (addrLen > addr.sockAddrLen) {
843 			addr.family = AF_INET6;
844 		}
845 		
846 		try log("RECVFROM " ~ ret.to!string ~ "B"); catch {}
847 		if (catchSocketError!".recvfrom"(ret)) { // ret == -1
848 			if (m_error == WSAEWOULDBLOCK)
849 				m_status.code = Status.ASYNC;
850 			return 0; // TODO: handle some errors more specifically
851 		}
852 		m_status.code = Status.OK;
853 		
854 		return cast(uint) ret;
855 	}
856 	
857 	uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
858 	{
859 		m_status = StatusInfo.init;
860 		try log("SENDTO " ~ data.length.to!string ~ "B"); catch{}
861 		int ret;
862 		if (addr != NetworkAddress.init)
863 			ret = .sendto(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, addr.sockAddrLen);
864 		else
865 			ret = .send(fd, cast(void*) data.ptr, cast(INT) data.length, 0);
866 		
867 		if (catchSocketError!".sendTo"(ret)) { // ret == -1
868 			if (m_error == WSAEWOULDBLOCK)
869 				m_status.code = Status.ASYNC;
870 			return 0; // TODO: handle some errors more specifically
871 		}
872 		
873 		m_status.code = Status.OK;
874 		return cast(uint) ret;
875 	}
876 	
877 	NetworkAddress localAddr(in fd_t fd, bool ipv6) {
878 		NetworkAddress ret;
879 		import libasync.internals.win32 : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr;
880 		if (ipv6)
881 			ret.family = AF_INET6;
882 		else
883 			ret.family = AF_INET;
884 		socklen_t len = ret.sockAddrLen;
885 		int err = getsockname(fd, ret.sockAddr, &len);
886 		if (catchSocketError!"getsockname"(err))
887 			return NetworkAddress.init;
888 		if (len > ret.sockAddrLen)
889 			ret.family = AF_INET6;
890 		return ret;
891 	}
892 	
893 	void noDelay(in fd_t fd, bool b) {
894 		m_status = StatusInfo.init;
895 		setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &b, b.sizeof);
896 	}
897 	
898 	private bool closeRemoteSocket(fd_t fd, bool forced) {
899 		
900 		INT err;
901 		
902 		try log("Shutdown FD#" ~ fd.to!string);
903 		catch{}
904 		if (forced) {
905 			err = shutdown(fd, SD_BOTH);
906 			closesocket(fd);
907 		}
908 		else
909 			err = shutdown(fd, SD_SEND);
910 		
911 		try {
912 			TCPEventHandler* evh = fd in m_tcpHandlers;
913 			if (evh && evh.conn.inbound) {
914 				try FreeListObjectAlloc!AsyncTCPConnection.free(evh.conn);
915 				catch(Exception e) { assert(false, "Failed to free resources"); }
916 				evh.conn = null;
917 				//log("Remove event handler for " ~ fd.to!string);
918 				m_tcpHandlers.remove(fd);
919 			}
920 		}
921 		catch (Exception e) {
922 			setInternalError!"m_tcpHandlers.remove"(Status.ERROR);
923 			return false;
924 		}
925 		if (catchSocketError!"shutdown"(err))
926 			return false;
927 		return true;
928 	}
929 	
930 	// for connected sockets
931 	bool closeSocket(fd_t fd, bool connected, bool forced = false)
932 	{
933 		m_status = StatusInfo.init;
934 		if (!connected && forced) {
935 			try {
936 				if (fd in m_connHandlers) {
937 					log("Removing connection handler for: " ~ fd.to!string);
938 					m_connHandlers.remove(fd);
939 				}
940 			}
941 			catch (Exception e) {
942 				setInternalError!"m_connHandlers.remove"(Status.ERROR);
943 				return false;
944 			}
945 		}
946 		else if (connected)
947 			closeRemoteSocket(fd, forced);
948 
949 		if (!connected || forced) {
950 			// todo: flush the socket here?
951 			
952 			INT err = closesocket(fd);
953 			if (catchSocketError!"closesocket"(err)) 
954 				return false;
955 			
956 		}
957 		return true;
958 	}
959 	
960 	bool closeConnection(fd_t fd) {
961 		return closeSocket(fd, true);
962 	}
963 	
964 	NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true)
965 	in {
966 		debug import libasync.internals.validator : validateIPv4, validateIPv6;
967 		debug assert( validateIPv4(ipAddr) || validateIPv6(ipAddr), "Trying to connect to an invalid IP address");
968 	}
969 	body {
970 		m_status = StatusInfo.init;
971 		
972 		NetworkAddress addr;
973 		WSAPROTOCOL_INFOW hints;
974 		import std.conv : to;
975 		if (ipv6) {
976 			addr.family = AF_INET6;
977 		}
978 		else {
979 			addr.family = AF_INET;
980 		}
981 		
982 		INT addrlen = addr.sockAddrLen;
983 		
984 		LPWSTR str;
985 		try {
986 			str = cast(LPWSTR) toUTFz!(wchar*)(ipAddr);
987 		} catch (Exception e) {
988 			setInternalError!"toStringz"(Status.ERROR, e.msg);
989 			return NetworkAddress.init;
990 		}
991 		
992 		INT err = WSAStringToAddressW(str, cast(INT) addr.family, null, addr.sockAddr, &addrlen); 
993 		if (port != 0) addr.port = port;
994 		try log(addr.toString());
995 		catch {}
996 		if( catchSocketError!"getAddressFromIP"(err) )
997 			return NetworkAddress.init;
998 		else assert(addrlen == addr.sockAddrLen);
999 		return addr;
1000 	}
1001 	
1002 	NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true, in bool force = true)
1003 		/*in { 
1004 		debug import libasync.internals.validator : validateHost;
1005 		debug assert(validateHost(host), "Trying to connect to an invalid domain");
1006 	}
1007 	body */{
1008 		m_status = StatusInfo.init;
1009 		import std.conv : to;
1010 		NetworkAddress addr;
1011 		ADDRINFOW hints;
1012 		ADDRINFOW* infos;
1013 		LPCWSTR wPort = port.to!(wchar[]).toUTFz!(const(wchar)*);
1014 		if (ipv6) {
1015 			hints.ai_family = AF_INET6;
1016 			addr.family = AF_INET6;
1017 		}
1018 		else {
1019 			hints.ai_family = AF_INET;
1020 			addr.family = AF_INET;
1021 		}
1022 		
1023 		if (tcp) {
1024 			hints.ai_protocol = IPPROTO_TCP;
1025 			hints.ai_socktype = SOCK_STREAM;
1026 		}
1027 		else {
1028 			hints.ai_protocol = IPPROTO_UDP;
1029 			hints.ai_socktype = SOCK_DGRAM;
1030 		}
1031 		if (port != 0) addr.port = port;
1032 		
1033 		LPCWSTR str;
1034 		
1035 		try {
1036 			str = cast(LPCWSTR) toUTFz!(immutable(wchar)*)(host);
1037 		} catch (Exception e) {
1038 			setInternalError!"toUTFz"(Status.ERROR, e.msg);
1039 			return NetworkAddress.init;
1040 		}
1041 		
1042 		error_t err = cast(error_t) GetAddrInfoW(str, cast(LPCWSTR) wPort, &hints, &infos);
1043 		if (err != EWIN.WSA_OK) {
1044 			setInternalError!"GetAddrInfoW"(Status.ABORT, string.init, err);
1045 			return NetworkAddress.init;
1046 		}
1047 		
1048 		ubyte* pAddr = cast(ubyte*) infos.ai_addr;
1049 		ubyte* data = cast(ubyte*) addr.sockAddr;
1050 		data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy
1051 		FreeAddrInfoW(infos);
1052 		try log("GetAddrInfoW Successfully resolved DNS to: " ~ addr.toAddressString());
1053 		catch (Exception e){}
1054 		return addr;
1055 	}
1056 	
1057 	void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EWIN.ERROR_ACCESS_DENIED)
1058 	{
1059 		if (details.length > 0)
1060 			m_status.text = TRACE ~ ": " ~ details;
1061 		else
1062 			m_status.text = TRACE;
1063 		m_error = error;
1064 		m_status.code = s;
1065 		static if(LOG) log(m_status);
1066 	}
1067 private:
1068 	bool onMessage(MSG msg) 
1069 	{
1070 		m_status = StatusInfo.init;
1071 		switch (msg.message) {
1072 			case WM_TCP_SOCKET:
1073 				auto evt = LOWORD(msg.lParam);
1074 				auto err = HIWORD(msg.lParam);
1075 				if (!onTCPEvent(evt, err, cast(fd_t)msg.wParam)) {
1076 
1077 					if (evt == FD_ACCEPT)
1078 						setInternalError!"del@TCPAccept.ERROR"(Status.ERROR); 
1079 					else {
1080 						try {
1081 							TCPEventHandler cb = m_tcpHandlers.get(cast(fd_t)msg.wParam);
1082 							cb(TCPEvent.ERROR);
1083 						}
1084 						catch (Exception e) {
1085 							// An Error callback should never fail...
1086 							setInternalError!"del@TCPEvent.ERROR"(Status.ERROR); 
1087 							// assert(false, evt.to!string ~ " & " ~ m_status.to!string ~ " & " ~ m_error.to!string); 
1088 						}
1089 					}
1090 				}
1091 				break;
1092 			case WM_UDP_SOCKET:
1093 				auto evt = LOWORD(msg.lParam);
1094 				auto err = HIWORD(msg.lParam);
1095 				if (!onUDPEvent(evt, err, cast(fd_t)msg.wParam)) {
1096 					try {
1097 						UDPHandler cb = m_udpHandlers.get(cast(fd_t)msg.wParam);
1098 						cb(UDPEvent.ERROR);
1099 					}
1100 					catch (Exception e) {
1101 						// An Error callback should never fail...
1102 						setInternalError!"del@UDPEvent.ERROR"(Status.ERROR); 
1103 					}
1104 				}
1105 				break;
1106 			case WM_TIMER:
1107 				log("Timer callback");
1108 				TimerHandler cb;
1109 				bool cached = (m_timer.fd == cast(fd_t)msg.wParam);
1110 				try {
1111 					if (cached)
1112 						cb = m_timer.cb;
1113 					else
1114 						cb = m_timerHandlers.get(cast(fd_t)msg.wParam);
1115 					
1116 					cb.ctxt.rearmed = false;
1117 					
1118 					if (cb.ctxt.oneShot)
1119 						kill(cb.ctxt);
1120 					
1121 					cb();
1122 					
1123 					if (cb.ctxt.oneShot && cb.ctxt.rearmed)
1124 						run(cb.ctxt, cb, cb.ctxt.timeout);
1125 					
1126 				}
1127 				catch (Exception e) {
1128 					// An Error callback should never fail...
1129 					setInternalError!"del@TimerHandler"(Status.ERROR, e.msg);  
1130 				}
1131 				
1132 				break;
1133 			case WM_USER_EVENT:
1134 				log("User event");
1135 
1136 				ulong uwParam = cast(ulong)msg.wParam;
1137 				ulong ulParam = cast(ulong)msg.lParam;
1138 
1139 				ulong payloadAddr = (ulParam << 32) | uwParam;
1140 				void* payloadPtr = cast(void*) payloadAddr;
1141 				shared AsyncSignal ctxt = cast(shared AsyncSignal) payloadPtr;
1142 
1143 				try log("Got notification in : " ~ m_hwnd.to!string ~ " pointer: " ~ payloadPtr.to!string); catch {}
1144 				try {
1145 					assert(ctxt.id != 0);
1146 					ctxt.handler();
1147 				}
1148 				catch (Exception e) {
1149 					setInternalError!"WM_USER_EVENT@handler"(Status.ERROR); 
1150 				}
1151 				break;
1152 			case WM_USER_SIGNAL:
1153 				log("User signal");
1154 
1155 				ulong uwParam = cast(ulong)msg.wParam;
1156 				ulong ulParam = cast(ulong)msg.lParam;
1157 
1158 				ulong payloadAddr = (ulParam << 32) | uwParam;
1159 				void* payloadPtr = cast(void*) payloadAddr;
1160 				AsyncNotifier ctxt = cast(AsyncNotifier) payloadPtr;
1161 
1162 				try {
1163 					ctxt.handler();
1164 				}
1165 				catch (Exception e) {
1166 					setInternalError!"WM_USER_SIGNAL@handler"(Status.ERROR); 
1167 				}
1168 				break;
1169 			default: return false; // not handled, sends to wndProc
1170 		}
1171 		return true;
1172 	}
1173 	
1174 	bool onUDPEvent(WORD evt, WORD err, fd_t sock) {
1175 		m_status = StatusInfo.init;
1176 		try{
1177 			if (m_udpHandlers.get(sock) == UDPHandler.init)
1178 				return false;
1179 		}	catch {}
1180 		if (sock == 0) { // highly unlikely...
1181 			setInternalError!"onUDPEvent"(Status.ERROR, "no socket defined");
1182 			return false;
1183 		}
1184 		if (err) {
1185 			setInternalError!"onUDPEvent"(Status.ERROR, string.init, cast(error_t)err);
1186 			try {
1187 				//log("CLOSE FD#" ~ sock.to!string);
1188 				(m_udpHandlers)[sock](UDPEvent.ERROR);
1189 			} catch { // can't do anything about this...
1190 			}
1191 			return false;
1192 		}
1193 		
1194 		UDPHandler cb;
1195 		switch(evt) {
1196 			default: break;
1197 			case FD_READ:
1198 				try {
1199 					log("READ FD#" ~ sock.to!string);
1200 					cb = m_udpHandlers.get(sock);
1201 					assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1202 					cb(UDPEvent.READ);
1203 				}
1204 				catch (Exception e) {
1205 					setInternalError!"del@TCPEvent.READ"(Status.ABORT); 
1206 					return false;
1207 				}
1208 				break;
1209 			case FD_WRITE:
1210 				try {
1211 					log("WRITE FD#" ~ sock.to!string);
1212 					cb = m_udpHandlers.get(sock);
1213 					assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1214 					cb(UDPEvent.WRITE);
1215 				}
1216 				catch (Exception e) {
1217 					setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 
1218 					return false;
1219 				}
1220 				break;
1221 		}
1222 		return true;
1223 	}
1224 	
1225 	bool onTCPEvent(WORD evt, WORD err, fd_t sock) {
1226 		m_status = StatusInfo.init;
1227 		try{
1228 			if (m_tcpHandlers.get(sock) == TCPEventHandler.init && m_connHandlers.get(sock) == TCPAcceptHandler.init)
1229 				assert( false ); 
1230 		}	catch {}
1231 		if (sock == 0) { // highly unlikely...
1232 			setInternalError!"onTCPEvent"(Status.ERROR, "no socket defined");
1233 			assert(false);
1234 		}
1235 		if (err) {
1236 			setInternalError!"onTCPEvent"(Status.ERROR, string.init, cast(error_t)err);
1237 			try {
1238 				//log("CLOSE FD#" ~ sock.to!string);
1239 				(m_tcpHandlers)[sock](TCPEvent.ERROR);
1240 			} catch { // can't do anything about this...
1241 			}
1242 			return false;
1243 		}
1244 		
1245 		TCPEventHandler cb;
1246 		switch(evt) {
1247 			default: break;
1248 			case FD_ACCEPT:
1249 				gs_mtx.lock_nothrow();
1250 
1251 				log("TCP Handlers: " ~ m_tcpHandlers.length.to!string);
1252 				log("Accepting connection");
1253 				/// Let another listener take the next connection
1254 				TCPAcceptHandler list;
1255 				try list = m_connHandlers[sock]; catch { assert(false, "Listening on an invalid socket..."); }
1256 				scope(exit) {
1257 					/// The connection rotation mechanism is handled by the TCPListenerDistMixins
1258 					/// when registering the same AsyncTCPListener object on multiple event loops.
1259 					/// This allows to even out the CPU usage on a server instance.
1260 					HWND hwnd = list.ctxt.next(m_hwnd);
1261 					if (hwnd !is HWND.init) {
1262 						int error = WSAAsyncSelect(sock, hwnd, WM_TCP_SOCKET, FD_ACCEPT);
1263 						if (catchSocketError!"WSAAsyncSelect.NEXT()=> HWND"(error)) {
1264 							error = WSAAsyncSelect(sock, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT);
1265 							if (catchSocketError!"WSAAsyncSelect"(error))
1266 								assert(false, "Could not set listener back to window HANDLE " ~ m_hwnd.to!string); 
1267 						}
1268 					}
1269 					else log("Returned init!!");
1270 
1271 					gs_mtx.unlock_nothrow();
1272 				}
1273 
1274 				NetworkAddress addr;
1275 				addr.family = AF_INET;
1276 				int addrlen = addr.sockAddrLen;
1277 				fd_t csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);
1278 
1279 				if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET)) {
1280 					if (m_error == WSAEFAULT) { // not enough space for sockaddr
1281 						addr.family = AF_INET6;
1282 						addrlen = addr.sockAddrLen;
1283 						csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);
1284 						if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET))
1285 							return false;
1286 					}
1287 					else return false;
1288 				}
1289 
1290 				int ok = WSAAsyncSelect(csock, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE);
1291 				if ( catchSocketError!"WSAAsyncSelect"(ok) ) 
1292 					return false;
1293 
1294 				log("Connection accepted: " ~ csock.to!string);
1295 
1296 				AsyncTCPConnection conn;
1297 				try conn = FreeListObjectAlloc!AsyncTCPConnection.alloc(m_evLoop);
1298 				catch (Exception e) { assert(false, "Failed allocation"); }
1299 				conn.peer = addr;
1300 				conn.socket = csock;
1301 				conn.inbound = true;
1302 
1303 				try {
1304 					// Do the callback to get a handler
1305 					cb = list(conn); 
1306 				} 
1307 				catch(Exception e) {
1308 					setInternalError!"onConnected"(Status.EVLOOP_FAILURE); 
1309 					return false; 
1310 				}
1311 
1312 				try {
1313 					m_tcpHandlers[csock] = cb; // keep the handler to setup the connection
1314 					log("ACCEPT&CONNECT FD#" ~ csock.to!string);
1315 					*conn.connecting = true;
1316 					//cb(TCPEvent.CONNECT);
1317 				}
1318 				catch (Exception e) { 
1319 					setInternalError!"m_tcpHandlers.opIndexAssign"(Status.ABORT); 
1320 					return false; 
1321 				}
1322 				break;
1323 			case FD_CONNECT:
1324 				try {
1325 					log("CONNECT FD#" ~ sock.to!string);
1326 					cb = m_tcpHandlers.get(sock);
1327 					if (cb == TCPEventHandler.init) break;//, "Socket " ~ sock.to!string ~ " could not yield a callback");
1328 					*cb.conn.connecting = true;
1329 				} 
1330 				catch(Exception e) {
1331 					setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
1332 					return false;
1333 				}
1334 				break;
1335 			case FD_READ:
1336 				try {
1337 					log("READ FD#" ~ sock.to!string);
1338 					cb = m_tcpHandlers.get(sock);
1339 					if (cb == TCPEventHandler.init) break; //, "Socket " ~ sock.to!string ~ " could not yield a callback");
1340 					if (!cb.conn) break;
1341 					if (*cb.conn.connected == false && *cb.conn.connecting) {
1342 						*cb.conn.connecting = false;
1343 						*cb.conn.connected = true;
1344 						cb(TCPEvent.CONNECT);
1345 					}
1346 					else 
1347 						cb(TCPEvent.READ);
1348 				}
1349 				catch (Exception e) {
1350 					setInternalError!"del@TCPEvent.READ"(Status.ABORT); 
1351 					return false;
1352 				}
1353 				break;
1354 			case FD_WRITE:
1355 				// todo: don't send the first write for consistency with epoll?
1356 				
1357 				try {
1358 					//import std.stdio;
1359 					log("WRITE FD#" ~ sock.to!string);
1360 					cb = m_tcpHandlers.get(sock);
1361 					if (cb == TCPEventHandler.init) break;//assert(cb != TCPEventHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1362 					if (!cb.conn) break;
1363 					if (*cb.conn.connected == false && *cb.conn.connecting) {
1364 						*cb.conn.connecting = false;
1365 						*cb.conn.connected = true;
1366 						cb(TCPEvent.CONNECT);
1367 					}
1368 					else {
1369 						cb(TCPEvent.WRITE);
1370 					}
1371 				}
1372 				catch (Exception e) {
1373 					setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 
1374 					return false;
1375 				}
1376 				break;
1377 			case FD_CLOSE:
1378 				// called after shutdown()
1379 				INT ret;
1380 				bool connected = true;
1381 				try {
1382 					log("CLOSE FD#" ~ sock.to!string);
1383 					if (sock in m_tcpHandlers) {
1384 						cb = m_tcpHandlers.get(sock);
1385 						if (*cb.conn.connected) {
1386 							cb(TCPEvent.CLOSE);
1387 							*cb.conn.connecting = false;
1388 							*cb.conn.connected = false;
1389 						} else
1390 							connected = false;
1391 					}
1392 					else
1393 						connected = false;
1394 				}
1395 				catch (Exception e) {
1396 					if (m_status.code == Status.OK)
1397 						setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT); 
1398 					return false;
1399 				}
1400 				
1401 				closeSocket(sock, connected, true); // as necessary: invokes m_tcpHandlers.remove(fd), shutdown, closesocket
1402 				
1403 				break;
1404 		}
1405 		return true;
1406 	}
1407 	
1408 	bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt)
1409 	{
1410 		INT err;
1411 		err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
1412 		if (catchSocketError!"bind"(err)) {
1413 			closesocket(fd);
1414 			return false;
1415 		}
1416 		err = listen(fd, 128);
1417 		if (catchSocketError!"listen"(err)) {
1418 			closesocket(fd);
1419 			return false;
1420 		}
1421 		err = WSAAsyncSelect(fd, m_hwnd, WM_UDP_SOCKET, FD_READ | FD_WRITE);
1422 		if (catchSocketError!"WSAAsyncSelect"(err)) {
1423 			closesocket(fd);
1424 			return false;
1425 		}
1426 		
1427 		return true;
1428 	}
1429 	
1430 	bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, bool reusing = false)
1431 	in { 
1432 		assert(m_threadId == GetCurrentThreadId());
1433 		assert(ctxt.local !is NetworkAddress.init);
1434 	}
1435 	body {
1436 		INT err;
1437 		if (!reusing) {
1438 			err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
1439 			if (catchSocketError!"bind"(err)) {
1440 				closesocket(fd);
1441 				return false;
1442 			}
1443 
1444 			err = listen(fd, 128);
1445 			if (catchSocketError!"listen"(err)) {
1446 				closesocket(fd);
1447 				return false;
1448 			}
1449 			
1450 			err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT);
1451 			if (catchSocketError!"WSAAsyncSelect"(err)) {
1452 				closesocket(fd);
1453 				return false;
1454 			}
1455 		}
1456 		
1457 		return true;
1458 	}
1459 	
1460 	bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt)
1461 	in { 
1462 		assert(ctxt.peer !is NetworkAddress.init);
1463 		assert(ctxt.peer.port != 0, "Connecting to an invalid port");
1464 	}
1465 	body {
1466 		INT err;
1467 		NetworkAddress bind_addr;
1468 		bind_addr.family = ctxt.peer.family;
1469 		
1470 		if (ctxt.peer.family == AF_INET) 
1471 			bind_addr.sockAddrInet4.sin_addr.s_addr = 0;
1472 		else if (ctxt.peer.family == AF_INET6) 
1473 			bind_addr.sockAddrInet6.sin6_addr.s6_addr[] = 0;
1474 		else assert(false, "Invalid NetworkAddress.family " ~ ctxt.peer.family.to!string);
1475 		
1476 		err = .bind(fd, bind_addr.sockAddr, bind_addr.sockAddrLen);
1477 		if ( catchSocketError!"bind"(err) ) 
1478 			return false;
1479 		err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE);
1480 		if ( catchSocketError!"WSAAsyncSelect"(err) ) 
1481 			return false;
1482 		err = .connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen);
1483 		
1484 		auto errors = [	tuple(cast(size_t) SOCKET_ERROR, EWIN.WSAEWOULDBLOCK, Status.ASYNC) ];		
1485 		
1486 		if (catchSocketErrorsEq!"connectEQ"(err, errors))
1487 			return true;
1488 		else if (catchSocketError!"connect"(err))
1489 			return false;
1490 		
1491 		return true;
1492 	}
1493 	
1494 	bool catchErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
1495 		if (isIntegral!T)
1496 	{
1497 		foreach (validator ; cmp) {
1498 			if (val == validator[0]) {
1499 				m_status.text = TRACE;
1500 				m_status.code = validator[1];
1501 				if (m_status.code == Status.EVLOOP_TIMEOUT) {
1502 					log(m_status);
1503 					break;
1504 				}
1505 				m_error = GetLastErrorSafe();
1506 				static if(LOG) log(m_status);
1507 				return true;
1508 			}
1509 		}
1510 		return false;
1511 	}
1512 	
1513 	bool catchSocketErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
1514 		if (isIntegral!T)
1515 	{
1516 		foreach (validator ; cmp) {
1517 			if (val == validator[0]) {
1518 				m_status.text = TRACE;
1519 				m_error = WSAGetLastErrorSafe();
1520 				m_status.status = validator[1];
1521 				static if(LOG) log(m_status);
1522 				return true;
1523 			}
1524 		}
1525 		return false;
1526 	}
1527 	
1528 	bool catchSocketErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...)
1529 		if (isIntegral!T)
1530 	{
1531 		error_t err;
1532 		foreach (validator ; cmp) {
1533 			if (val == validator[0]) {
1534 				if (err is EWIN.init) err = WSAGetLastErrorSafe();
1535 				if (err == validator[1]) {
1536 					m_status.text = TRACE;
1537 					m_error = WSAGetLastErrorSafe();
1538 					m_status.code = validator[2];
1539 					static if(LOG) log(m_status);
1540 					return true;
1541 				}
1542 			}
1543 		}
1544 		return false;
1545 	}
1546 	
1547 	
1548 	bool catchSocketError(string TRACE, T)(T val, T cmp = SOCKET_ERROR)
1549 		if (isIntegral!T)
1550 	{
1551 		if (val == cmp) {
1552 			m_status.text = TRACE;
1553 			m_error = WSAGetLastErrorSafe();
1554 			m_status.code = Status.ABORT;
1555 			static if(LOG) log(m_status);
1556 			return true;
1557 		}
1558 		return false;
1559 	}
1560 	
1561 	error_t WSAGetLastErrorSafe() {
1562 		try {
1563 			return cast(error_t) WSAGetLastError();
1564 		} catch(Exception e) {
1565 			return EWIN.ERROR_ACCESS_DENIED;
1566 		}
1567 	}
1568 	
1569 	error_t GetLastErrorSafe() {
1570 		try {
1571 			return cast(error_t) GetLastError();
1572 		} catch(Exception e) {
1573 			return EWIN.ERROR_ACCESS_DENIED;
1574 		}
1575 	}
1576 	
1577 	void log(StatusInfo val)
1578 	{
1579 		static if (LOG) {
1580 			import std.stdio;
1581 			try {
1582 				writeln("Backtrace: ", m_status.text);
1583 				writeln(" | Status:  ", m_status.code);
1584 				writeln(" | Error: " , m_error);
1585 				if ((m_error in EWSAMessages) !is null)
1586 					writeln(" | Message: ", EWSAMessages[m_error]);
1587 			} catch(Exception e) {
1588 				return;
1589 			}
1590 		}
1591 	}
1592 	
1593 	void log(T)(T val)
1594 	{
1595 		static if (LOG) {
1596 			import std.stdio;
1597 			try {
1598 				writeln(val);
1599 			} catch(Exception e) {
1600 				return;
1601 			}
1602 		}
1603 	}
1604 	
1605 }
1606 
1607 mixin template TCPConnectionMixins() {
1608 	
1609 	private CleanupData m_impl;
1610 	
1611 	struct CleanupData {
1612 		bool connected;
1613 		bool connecting;
1614 	}
1615 	
1616 	@property bool* connecting() {
1617 		return &m_impl.connecting;
1618 	}
1619 	
1620 	@property bool* connected() {
1621 		return &m_impl.connected;
1622 	}
1623 	
1624 }
1625 
1626 mixin template TCPListenerDistMixins()
1627 {
1628 	import std.c.windows.windows : HWND;
1629 	import libasync.internals.hashmap : HashMap;
1630 	import core.sync.mutex;
1631 	private {
1632 		bool m_dist;
1633 		
1634 		Tuple!(WinReference, bool*) m_handles;
1635 		__gshared HashMap!(fd_t, Tuple!(WinReference, bool*)) gs_dist;
1636 		__gshared Mutex gs_mutex;
1637 	}
1638 
1639 	/// The TCP Listener schedules distributed connection handlers based on
1640 	/// the event loops that are using the same AsyncTCPListener object.
1641 	/// This is done by using WSAAsyncSelect on a different window after each
1642 	/// accept TCPEvent.
1643 	class WinReference {
1644 		private {
1645 			struct Item {
1646 				HWND handle;
1647 				bool active;
1648 			}
1649 
1650 			Item[] m_items;
1651 		}
1652 
1653 		this(HWND hndl, bool b) {
1654 			append(hndl, b);
1655 		}
1656 
1657 		void append(HWND hndl, bool b) {
1658 			m_items ~= Item(hndl, b);
1659 		}
1660 
1661 		HWND next(HWND me) {
1662 			Item[] items;
1663 			synchronized(gs_mutex)
1664 				items = m_items;
1665 			if (items.length == 1)
1666 				return me;
1667 			foreach (i, item; items) {
1668 				if (item.active == true) {
1669 					m_items[i].active = false; // remove responsibility
1670 					if (m_items.length <= i + 1) {
1671 						m_items[0].active = true; // set responsibility
1672 						auto ret = m_items[0].handle;
1673 						return ret;
1674 					}
1675 					else {
1676 						m_items[i + 1].active = true;
1677 						auto ret = m_items[i + 1].handle;
1678 						return ret;
1679 					}
1680 				}
1681 				
1682 			}
1683 			assert(false);
1684 		}
1685 
1686 	}
1687 
1688 	void init(HWND hndl, fd_t sock) {
1689 		try {
1690 			if (!gs_mutex) {
1691 				gs_mutex = new Mutex;
1692 			}
1693 			synchronized(gs_mutex) {
1694 				m_handles = gs_dist.get(sock);
1695 				if (m_handles == typeof(m_handles).init) {
1696 					gs_dist[sock] = Tuple!(WinReference, bool*)(new WinReference(hndl, true), &m_dist);
1697 					m_handles = gs_dist.get(sock);
1698 					assert(m_handles != typeof(m_handles).init);
1699 				}
1700 				else {
1701 					m_handles[0].append(hndl, false);
1702 					*m_handles[1] = true; // set first thread to dist
1703 					m_dist = true; // set this thread to dist
1704 				}
1705 			}
1706 		} catch (Exception e) {
1707 			assert(false, e.toString());
1708 		}
1709 		
1710 	}
1711 	
1712 	HWND next(HWND me) {
1713 		try {
1714 			if (!m_dist)
1715 				return HWND.init;
1716 			return m_handles[0].next(me);
1717 		}
1718 		catch (Exception e) {
1719 			assert(false, e.toString());
1720 		}
1721 	}
1722 	
1723 }
1724 private class DWHandlerInfo {
1725 	DWHandler handler;
1726 	Array!DWChangeInfo buffer;
1727 	
1728 	this(DWHandler cb) {
1729 		handler = cb;
1730 	}
1731 }
1732 
1733 private final class DWFolderWatcher {
1734 	import libasync.internals.path;
1735 private:
1736 	EventLoop m_evLoop;
1737 	fd_t m_fd;
1738 	bool m_recursive;
1739 	HANDLE m_handle;
1740 	Path m_path;
1741 	DWFileEvent m_events;
1742 	DWHandlerInfo m_handler; // contains buffer
1743 	shared AsyncSignal m_signal;
1744 	ubyte[FILE_NOTIFY_INFORMATION.sizeof + MAX_PATH + 1] m_buffer;
1745 	DWORD m_bytesTransferred;
1746 public:
1747 	this(EventLoop evl, in fd_t fd, in HANDLE hndl, in Path path, in DWFileEvent events, DWHandlerInfo handler, bool recursive) {
1748 		import std.stdio : writeln;
1749 		try writeln("Creating directory watcher for path: " ~ path.toNativeString()); catch {}
1750 		m_fd = fd;
1751 		m_recursive = recursive;
1752 		m_handle = cast(HANDLE)hndl;
1753 		m_evLoop = evl;
1754 		m_path = path;
1755 		m_handler = handler;
1756 		
1757 		m_signal = new shared AsyncSignal(m_evLoop);
1758 		m_signal.run(&onChanged);
1759 		triggerWatch();
1760 	}
1761 package:
1762 	void close() {
1763 		CloseHandle(m_handle);
1764 		m_signal.kill();
1765 	}
1766 	
1767 	void triggerChanged() {
1768 		m_signal.trigger();
1769 	}
1770 	
1771 	void onChanged() {
1772 		ubyte[] result = m_buffer.ptr[0 .. m_bytesTransferred];
1773 		do {
1774 			assert(result.length >= FILE_NOTIFY_INFORMATION.sizeof);
1775 			auto fni = cast(FILE_NOTIFY_INFORMATION*)result.ptr;
1776 			DWFileEvent kind;
1777 			switch( fni.Action ){
1778 				default: kind = DWFileEvent.MODIFIED; break;
1779 				case 0x1: kind = DWFileEvent.CREATED; break;
1780 				case 0x2: kind = DWFileEvent.DELETED; break;
1781 				case 0x3: kind = DWFileEvent.MODIFIED; break;
1782 				case 0x4: kind = DWFileEvent.MOVED_FROM; break;
1783 				case 0x5: kind = DWFileEvent.MOVED_TO; break;
1784 			}
1785 			string filename = to!string(fni.FileName.ptr[0 .. fni.FileNameLength/2]); // FileNameLength = #bytes, FileName=WCHAR[]
1786 			m_handler.buffer.insert(DWChangeInfo(kind, m_path ~ Path(filename)));
1787 			if( fni.NextEntryOffset == 0 ) break;
1788 			result = result[fni.NextEntryOffset .. $];
1789 		} while(result.length > 0);
1790 		
1791 		triggerWatch();
1792 		
1793 		m_handler.handler();
1794 	}
1795 	
1796 	void triggerWatch() {
1797 		
1798 		static UINT notifications = FILE_NOTIFY_CHANGE_FILE_NAME|FILE_NOTIFY_CHANGE_DIR_NAME|
1799 			FILE_NOTIFY_CHANGE_SIZE|FILE_NOTIFY_CHANGE_LAST_WRITE;
1800 		
1801 		OVERLAPPED* overlapped = FreeListObjectAlloc!OVERLAPPED.alloc();
1802 		overlapped.Internal = 0;
1803 		overlapped.InternalHigh = 0;
1804 		overlapped.Offset = 0;
1805 		overlapped.OffsetHigh = 0;
1806 		overlapped.Pointer = cast(void*)this;
1807 		import std.stdio;
1808 		DWORD bytesReturned;
1809 		BOOL success = ReadDirectoryChangesW(m_handle, m_buffer.ptr, m_buffer.length, cast(BOOL) m_recursive, notifications, &bytesReturned, overlapped, &onIOCompleted);
1810 
1811 		import std.stdio;
1812 		if (!success)
1813 			writeln("Failed to call ReadDirectoryChangesW: " ~ EWSAMessages[GetLastError().to!EWIN]);
1814 		
1815 	}
1816 	
1817 	@property fd_t fd() const {
1818 		return m_fd;
1819 	}
1820 	
1821 	@property HANDLE handle() const {
1822 		return cast(HANDLE) m_handle;
1823 	}
1824 	
1825 	static nothrow extern(System)
1826 	{
1827 		void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped)
1828 		{
1829 			import std.stdio;
1830 			DWFolderWatcher watcher = cast(DWFolderWatcher)(overlapped.Pointer);
1831 			watcher.m_bytesTransferred = cbTransferred;
1832 			try FreeListObjectAlloc!OVERLAPPED.free(overlapped); catch {}
1833 			if (dwError != 0)
1834 				try writeln("Diretory watcher error: "~EWSAMessages[dwError.to!EWIN]); catch{}
1835 			try watcher.triggerChanged();
1836 			catch (Exception e) {
1837 				try writeln("Failed to trigger change"); catch {}
1838 			}
1839 		}
1840 	}
1841 	
1842 }
1843 
1844 /**
1845 		Represents a network/socket address. (taken from vibe.core.net)
1846 */
1847 public struct NetworkAddress {
1848 	import std.c.windows.winsock : sockaddr, sockaddr_in, sockaddr_in6;
1849 	private union {
1850 		sockaddr addr;
1851 		sockaddr_in addr_ip4;
1852 		sockaddr_in6 addr_ip6;
1853 	}
1854 	
1855 	@property bool ipv6() const pure nothrow { return this.family == AF_INET6; }
1856 	
1857 	/** Family (AF_) of the socket address.
1858 		*/
1859 	@property ushort family() const pure nothrow { return addr.sa_family; }
1860 	/// ditto
1861 	@property void family(ushort val) pure nothrow { addr.sa_family = cast(ubyte)val; }
1862 	
1863 	/** The port in host byte order.
1864 		*/
1865 	@property ushort port()
1866 	const pure nothrow {
1867 		switch (this.family) {
1868 			default: assert(false, "port() called for invalid address family.");
1869 			case AF_INET: return ntoh(addr_ip4.sin_port);
1870 			case AF_INET6: return ntoh(addr_ip6.sin6_port);
1871 		}
1872 	}
1873 	/// ditto
1874 	@property void port(ushort val)
1875 	pure nothrow {
1876 		switch (this.family) {
1877 			default: assert(false, "port() called for invalid address family.");
1878 			case AF_INET: addr_ip4.sin_port = hton(val); break;
1879 			case AF_INET6: addr_ip6.sin6_port = hton(val); break;
1880 		}
1881 	}
1882 	
1883 	/** A pointer to a sockaddr struct suitable for passing to socket functions.
1884 		*/
1885 	@property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; }
1886 	
1887 	/** Size of the sockaddr struct that is returned by sockAddr().
1888 		*/
1889 	@property int sockAddrLen()
1890 	const pure nothrow {
1891 		switch (this.family) {
1892 			default: assert(false, "sockAddrLen() called for invalid address family.");
1893 			case AF_INET: return addr_ip4.sizeof;
1894 			case AF_INET6: return addr_ip6.sizeof;
1895 		}
1896 	}
1897 	
1898 	@property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow
1899 	in { assert (family == AF_INET); }
1900 	body { return &addr_ip4; }
1901 	
1902 	@property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow
1903 	in { assert (family == AF_INET6); }
1904 	body { return &addr_ip6; }
1905 	
1906 	/** Returns a string representation of the IP address
1907 		*/
1908 	string toAddressString()
1909 	const {
1910 		import std.array : appender;
1911 		import std.string : format;
1912 		import std.format : formattedWrite;
1913 		
1914 		switch (this.family) {
1915 			default: assert(false, "toAddressString() called for invalid address family.");
1916 			case AF_INET:
1917 				ubyte[4] ip = (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4];
1918 				return format("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]);
1919 			case AF_INET6:
1920 				ubyte[16] ip = addr_ip6.sin6_addr.s6_addr;
1921 				auto ret = appender!string();
1922 				ret.reserve(40);
1923 				foreach (i; 0 .. 8) {
1924 					if (i > 0) ret.put(':');
1925 					ret.formattedWrite("%x", bigEndianToNative!ushort(cast(ubyte[2])ip[i*2 .. i*2+2]));
1926 				}
1927 				return ret.data;
1928 		}
1929 	}
1930 	
1931 	/** Returns a full string representation of the address, including the port number.
1932 		*/
1933 	string toString()
1934 	const {
1935 		
1936 		import std.string : format;
1937 		
1938 		auto ret = toAddressString();
1939 		switch (this.family) {
1940 			default: assert(false, "toString() called for invalid address family.");
1941 			case AF_INET: return ret ~ format(":%s", port);
1942 			case AF_INET6: return format("[%s]:%s", ret, port);
1943 		}
1944 	}
1945 	
1946 }
1947 
1948 private pure nothrow {
1949 	import std.bitmanip;
1950 	
1951 	ushort ntoh(ushort val)
1952 	{
1953 		version (LittleEndian) return swapEndian(val);
1954 		else version (BigEndian) return val;
1955 		else static assert(false, "Unknown endianness.");
1956 	}
1957 	
1958 	ushort hton(ushort val)
1959 	{
1960 		version (LittleEndian) return swapEndian(val);
1961 		else version (BigEndian) return val;
1962 		else static assert(false, "Unknown endianness.");
1963 	}
1964 }
1965 enum WM_TCP_SOCKET = WM_USER+102;
1966 enum WM_UDP_SOCKET = WM_USER+103;
1967 enum WM_USER_EVENT = WM_USER+104;
1968 enum WM_USER_SIGNAL = WM_USER+105;
1969 
1970 nothrow:
1971 size_t g_idxCapacity = 8;
1972 Array!size_t g_idxAvailable;
1973 
1974 // called on run
1975 size_t createIndex() {
1976 	size_t idx;
1977 	import std.algorithm : max;
1978 	import std.range : iota;
1979 	try {
1980 		
1981 		size_t getIdx() {
1982 			
1983 			if (!g_idxAvailable.empty) {
1984 				immutable size_t ret = g_idxAvailable.back;
1985 				g_idxAvailable.removeBack();
1986 				return ret;
1987 			}
1988 			return 0;
1989 		}
1990 		
1991 		idx = getIdx();
1992 		if (idx == 0) {
1993 			import std.range : iota;
1994 			// todo: not sure about this
1995 			g_idxAvailable.insert( iota(g_idxCapacity, max(32, g_idxCapacity * 2), 1) );
1996 			g_idxCapacity = max(32, g_idxCapacity * 2);
1997 			idx = getIdx();
1998 		}
1999 		
2000 	} catch {}
2001 	
2002 	return idx;
2003 }
2004 
2005 void destroyIndex(AsyncTimer ctxt) {
2006 	try {
2007 		g_idxAvailable.insert(ctxt.id);		
2008 	}
2009 	catch (Exception e) {
2010 		assert(false, "Error destroying index: " ~ e.msg);
2011 	}
2012 }
2013 
2014 
2015 nothrow extern(System) {
2016 	LRESULT wndProc(HWND wnd, UINT msg, WPARAM wparam, LPARAM lparam)
2017 	{
2018 		auto ptr = cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA);
2019 		if (ptr is null) 
2020 			return DefWindowProcA(wnd, msg, wparam, lparam);
2021 		auto appl = cast(EventLoopImpl*)ptr;
2022 		MSG obj = MSG(wnd, msg, wparam, lparam, DWORD.init, POINT.init);
2023 		if (appl.onMessage(obj)) {
2024 			static if (DEBUG) {
2025 				if (appl.status.code != Status.OK && appl.status.code != Status.ASYNC) {
2026 				import std.stdio : writeln;
2027 					try { writeln(appl.error, ": ", appl.m_status.text); } catch {}
2028 				}
2029 			}
2030 			return 0;
2031 		}
2032 		else return DefWindowProcA(wnd, msg, wparam, lparam);
2033 	}
2034 	
2035 	BOOL PostMessageA(HWND hWnd, UINT Msg, WPARAM wParam, LPARAM lParam);
2036 	
2037 }