module libasync.posix2;
import libasync.types;
version (Posix):
// Workaround for an IDE indentation bug on very large files
mixin template RunKill()
{

	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
	in { assert(ctxt.socket == fd_t.init, "TCP Connection is active. Use another instance."); }
	body {
		m_status = StatusInfo.init;
		import libasync.internals.socket_compat : socket, SOCK_STREAM, IPPROTO_TCP;
		import core.sys.posix.unistd : close;
		fd_t fd = ctxt.preInitializedSocket;

		if (fd == fd_t.init)
			fd = socket(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP);

		if (catchError!("run AsyncTCPConnection")(fd))
			return 0;

		/// Make sure the socket doesn't block on recv/send
		if (!setNonBlock(fd)) {
			static if (LOG) log("Close socket");
			close(fd);
			return 0;
		}

		/// Disable Nagle's algorithm if noDelay was requested
		if (ctxt.noDelay) {
			if (!setOption(fd, TCPOption.NODELAY, true)) {
				static if (LOG) try log("Closing connection"); catch (Throwable e) {}
				close(fd);
				return 0;
			}
		}

		/// Trigger the connection setup instructions
		if (!initTCPConnection(fd, ctxt, del)) {
			close(fd);
			return 0;
		}

		return fd;

	}
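
	/+ Illustrative usage sketch (assumes libasync's public wrapper API;
	 + `evl` and the handler body are placeholders):
	 +
	 +   auto conn = new AsyncTCPConnection(evl);
	 +   conn.peer = evl.resolveHost("example.org", 80);
	 +   conn.run((TCPEvent ev) { /* CONNECT, READ, WRITE, CLOSE, ERROR */ });
	 +
	 + On success the internal run() above returns the new non-blocking
	 + socket descriptor; 0 signals failure, with details left in m_status.
	 +/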

	fd_t run(AsyncUDSConnection ctxt)
	in { assert(ctxt.socket == fd_t.init, "UDS Connection is active. Use another instance."); }
	body {
		m_status = StatusInfo.init;
		import libasync.internals.socket_compat : socket, connect, SOCK_STREAM, AF_UNIX;
		import core.sys.posix.unistd : close;

		auto fd = ctxt.preInitializedSocket;
		if (fd == fd_t.init) {
			fd = socket(AF_UNIX, SOCK_STREAM, 0);
		}

		if (catchError!"run AsyncUDSConnection"(fd)) {
			return 0;
		}

		// Inbound sockets are already connected
		if (ctxt.inbound) return fd;

		/// Make sure the socket doesn't block on recv/send
		if (!setNonBlock(fd)) {
			static if (LOG) log("Close socket");
			close(fd);
			return 0;
		}

		/// Start the connection
		auto err = connect(fd, ctxt.peer.name, ctxt.peer.nameLen);
		if (catchError!"connect"(err)) {
			close(fd);
			return 0;
		}

		return fd;
	}

	fd_t run(AsyncUDSListener ctxt)
	in {
		assert(ctxt.socket == fd_t.init, "UDS Listener already bound. Please run another instance.");
		assert(ctxt.local !is UnixAddress.init, "No local binding address specified. Use AsyncUDSListener.local = new UnixAddress(*)");
	}
	body {
		import libasync.internals.socket_compat : socket, bind, listen, SOCK_STREAM, AF_UNIX, SOMAXCONN;
		import core.sys.posix.unistd : close, unlink;
		import core.sys.posix.sys.un : sockaddr_un;

		int err;
		m_status = StatusInfo.init;

		if (ctxt.unlinkFirst) {
			import core.stdc.errno : errno, ENOENT;
			err = unlink(cast(char*) (cast(sockaddr_un*) ctxt.local.name).sun_path);
			if (err == -1 && errno != ENOENT) {
				catchError!"unlink"(err); // records the errno in m_status
				return 0;
			}
		}

		auto fd = ctxt.socket;
		if (fd == fd_t.init) {
			fd = socket(AF_UNIX, SOCK_STREAM, 0);
		}

		if (catchError!"run AsyncUDSListener"(fd)) {
			return 0;
		}

		/// Make sure the socket doesn't block, so that accept() returns instantly
		if (!setNonBlock(fd)) {
			static if (LOG) log("Close socket");
			close(fd);
			return 0;
		}

		/// Bind and listen on the socket
		err = bind(fd, ctxt.local.name, ctxt.local.nameLen);
		if (catchError!"bind"(err)) {
			static if (LOG) log("Close socket");
			close(fd);
			return 0;
		}

		err = listen(fd, SOMAXCONN);
		if (catchError!"listen"(err)) {
			static if (LOG) log("Close socket");
			close(fd);
			return 0;
		}

		return fd;
	}
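
	/+ Illustrative usage sketch (names are placeholders; assumes the public
	 + AsyncUDSListener wrapper):
	 +
	 +   auto listener = new AsyncUDSListener(evl);
	 +   listener.local = new UnixAddress("/tmp/example.sock");
	 +   // listener.run(...) then yields one AsyncUDSConnection per peer
	 +
	 + unlinkFirst removes a stale socket file left behind by a previous run;
	 + without it, bind() would fail with EADDRINUSE.
	 +/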

	fd_t run(AsyncSocket ctxt)
	{
		import core.sys.posix.unistd : close;
		import libasync.internals.socket_compat : socket;

		m_status = StatusInfo.init;

		auto fd = ctxt.preInitializedHandle;

		if (fd == INVALID_SOCKET) {
			fd = socket(ctxt.info.domain, ctxt.info.type, ctxt.info.protocol);
		}

		if (catchError!"socket"(fd)) {
			.error("Failed to create socket: ", error);
			return INVALID_SOCKET;
		}

		if (!setNonBlock(fd)) {
			.error("Failed to set socket non-blocking");
			close(fd);
			return INVALID_SOCKET;
		}

		EventObject eventObject;
		eventObject.socket = ctxt;
		EventInfo* eventInfo = assumeWontThrow(ThreadMem.alloc!EventInfo(fd, EventType.Socket, eventObject, m_instanceId));

		ctxt.evInfo = eventInfo;
		nothrow auto cleanup() {
			assumeWontThrow(ThreadMem.free(eventInfo));
			ctxt.evInfo = null;
			// socket must be closed by the caller
			return INVALID_SOCKET;
		}

		fd_t err;
		static if (EPOLL)
		{
			epoll_event osEvent;
			osEvent.data.ptr = eventInfo;
			osEvent.events = EPOLLET | EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &osEvent);
			if (catchError!"epoll_ctl"(err)) {
				return cleanup();
			}
		}
		else /* if KQUEUE */
		{
			kevent_t[2] osEvent;
			EV_SET(&(osEvent[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, eventInfo);
			EV_SET(&(osEvent[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, eventInfo);
			err = kevent(m_kqueuefd, &(osEvent[0]), 2, null, 0, null);
			if (catchError!"kevent_add_socket"(err)) {
				return cleanup();
			}
		}

		return fd;
	}
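
	/+ Note: both backends arm the descriptor edge-triggered (EPOLLET on
	 + Linux, EV_CLEAR on kqueue), so a readiness event fires once per state
	 + change. Handlers must keep reading or writing until EAGAIN/EWOULDBLOCK
	 + before returning to the loop, or the next event may never arrive.
	 +/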

	import libasync.internals.socket_compat : sockaddr, socklen_t;
	bool bind(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen)
	{
		import libasync.internals.socket_compat : bind;

		auto err = bind(ctxt.handle, addr, addrlen);
		if (catchError!"bind"(err)) {
			.error("Failed to bind socket: ", error);
			assumeWontThrow(ThreadMem.free(ctxt.evInfo));
			ctxt.evInfo = null;
			return false;
		}

		return true;
	}

	bool connect(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen)
	{
		import libasync.internals.socket_compat : connect;

		auto err = connect(ctxt.handle, addr, addrlen);
		if (catchErrorsEq!"connect"(err, [ tuple(cast(fd_t) SOCKET_ERROR, EPosix.EINPROGRESS, Status.ASYNC) ])) {
			return true;
		} else if (catchError!"connect"(err)) {
			.error("Failed to connect socket: ", error);
			assumeWontThrow(ThreadMem.free(ctxt.evInfo));
			ctxt.evInfo = null;
			return false;
		}

		return true;
	}
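
	/+ Note: on a non-blocking socket, connect() normally returns -1 with
	 + EINPROGRESS; catchErrorsEq maps that case to Status.ASYNC and it is
	 + treated as success. Completion is later reported by the poller as
	 + writability, at which point any pending error can be fetched with
	 + getsockopt(fd, SOL_SOCKET, SO_ERROR, ...).
	 +/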

	bool listen(AsyncSocket ctxt, int backlog)
	{
		import libasync.internals.socket_compat : listen;

		auto err = listen(ctxt.handle, backlog);
		if (catchError!"listen"(err)) {
			.error("Failed to listen on socket: ", error);
			assumeWontThrow(ThreadMem.free(ctxt.evInfo));
			ctxt.evInfo = null;
			return false;
		}

		return true;
	}

	bool kill(AsyncSocket ctxt, bool forced = false) {
		m_status = StatusInfo.init;

		import core.sys.posix.unistd : close;

		auto fd = ctxt.resetHandle();

		static if (EPOLL)
		{
			epoll_event osEvent = void;
			epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, &osEvent);
		}
		else /* if KQUEUE */
		{
			kevent_t[2] osEvent;
			EV_SET(&(osEvent[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
			EV_SET(&(osEvent[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
			// Errors are ignored here, as in the epoll branch above
			kevent(m_kqueuefd, &(osEvent[0]), 2, null, 0, null);
		}

		if (ctxt.connectionOriented && ctxt.passive) {
			foreach (request; m_completedSocketAccepts) if (request.socket is ctxt) {
				m_completedSocketAccepts.removeFront();
				auto socket = request.socket;
				auto peer = request.onComplete(request.peer, request.family, socket.info.type, socket.info.protocol);
				assumeWontThrow(AsyncAcceptRequest.free(request));
				if (!peer.run) return false;
			}
		}

		if (!ctxt.passive) {
			foreach (request; m_completedSocketReceives) if (request.socket is ctxt) {
				m_completedSocketReceives.removeFront();
				if (request.message) {
					assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
					assumeWontThrow(NetworkMessage.free(request.message));
				} else {
					assumeWontThrow(request.onComplete.get!1)();
				}
				assumeWontThrow(AsyncReceiveRequest.free(request));
			}

			foreach (request; m_completedSocketSends) if (request.socket is ctxt) {
				m_completedSocketSends.removeFront();
				request.onComplete();
				assumeWontThrow(NetworkMessage.free(request.message));
				assumeWontThrow(AsyncSendRequest.free(request));
			}
		}

		foreach (request; ctxt.m_pendingReceives) {
			ctxt.m_pendingReceives.removeFront();
			if (request.message) assumeWontThrow(NetworkMessage.free(request.message));
			assumeWontThrow(AsyncReceiveRequest.free(request));
		}

		foreach (request; ctxt.m_pendingSends) {
			ctxt.m_pendingSends.removeFront();
			assumeWontThrow(NetworkMessage.free(request.message));
			assumeWontThrow(AsyncSendRequest.free(request));
		}

		if (ctxt.connectionOriented && !ctxt.passive && ctxt.connected) {
			bool has_socket = fd != INVALID_SOCKET;
			ctxt.disconnecting = true;

			if (forced) {
				ctxt.connected = false;
				ctxt.disconnecting = false;
				if (ctxt.evInfo !is null) {
					assumeWontThrow(ThreadMem.free(ctxt.evInfo));
					ctxt.evInfo = null;
				}
			}

			return has_socket ? closeSocket(fd, true, forced) : true;
		}

		fd_t err = close(fd);
		if (catchError!"socket close"(err)) {
			return false;
		}

		if (ctxt.evInfo !is null) {
			assumeWontThrow(ThreadMem.free(ctxt.evInfo));
			ctxt.evInfo = null;
		}

		return true;
	}
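
	/+ Note: closing a descriptor removes it from any epoll interest lists
	 + (once no duplicates remain) and deletes its kqueue filters, which is
	 + why the explicit deregistration above may fail harmlessly and its
	 + result is ignored.
	 +/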

	bool run(AsyncEvent ctxt, EventHandler del)
	{
		fd_t fd = ctxt.id;

		fd_t err;

		EventObject eo;
		eo.eventHandler = del;
		EventInfo* ev;
		try ev = ThreadMem.alloc!EventInfo(fd, EventType.Event, eo, m_instanceId);
		catch (Exception e){ assert(false, "Allocation error"); }
		ctxt.evInfo = ev;
		nothrow bool closeAll() {
			try ThreadMem.free(ev);
			catch(Exception e){ assert(false, "Failed to free resources"); }
			ctxt.evInfo = null;
			// fd must be closed by the caller when this returns false
			return false;
		}

		static if (EPOLL)
		{
			epoll_event _event;
			_event.data.ptr = ev;
			_event.events = EPOLLIN | EPOLLOUT | EPOLLET;
			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
			if (catchError!"epoll_ctl"(err)) {
				return closeAll();
			}
		}
		else /* if KQUEUE */
		{
			kevent_t[2] _event;
			EV_SET(&(_event[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, ev);
			EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, ev);
			err = kevent(m_kqueuefd, &(_event[0]), 2, null, 0, null);
			if (catchError!"kevent_add_event"(err))
				return closeAll();
		}
		return true;
	}

	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
	in {
		//assert(ctxt.socket == fd_t.init, "TCP Listener already bound. Please run another instance.");
		assert(ctxt.local.addr !is typeof(ctxt.local.addr).init, "No local binding address specified. Use AsyncTCPListener.local = EventLoop.resolve*");
	}
	body {
		m_status = StatusInfo.init;
		import libasync.internals.socket_compat : socket, SOCK_STREAM, socklen_t, setsockopt, SOL_SOCKET, SO_REUSEADDR, IPPROTO_TCP;
		import core.sys.posix.unistd : close;
		import core.sync.mutex;
		fd_t fd = ctxt.socket;
		bool reusing = true;
		try if (fd == 0) {
			reusing = false;
			/// Create the listening socket
			synchronized(g_mutex) {
				fd = socket(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP);
				if (catchError!("run AsyncTCPAccept")(fd))
					return 0;
				/// Allow multiple threads to listen on this address
				if (!setOption(fd, TCPOption.REUSEADDR, true)) {
					static if (LOG) log("Close socket");
					close(fd);
					return 0;
				}
				if (!setOption(fd, TCPOption.REUSEPORT, true)) {
					static if (LOG) log("Close socket");
					close(fd);
					return 0;
				}
			}

			/// Make sure the socket doesn't block, so that accept() returns instantly
			if (!setNonBlock(fd)) {
				static if (LOG) log("Close socket");
				close(fd);
				return 0;
			}

			// todo: defer accept

			/// Automatically start connections with noDelay if specified
			if (ctxt.noDelay) {
				if (!setOption(fd, TCPOption.NODELAY, true)) {
					static if (LOG) try log("Closing connection"); catch (Throwable e) {}
					close(fd);
					return 0;
				}
			}

		} catch (Throwable e) { assert(false, "Error in synchronized listener starter"); }

		/// Set up the event polling
		if (!initTCPListener(fd, ctxt, del, reusing)) {
			static if (LOG) log("Close socket");
			close(fd);
			return 0;
		}

		return fd;

	}
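
	/+ Note: with REUSEADDR and REUSEPORT set above, several event-loop
	 + threads can each bind a listener to the same address and the kernel
	 + balances incoming connections between them; `reusing` is true when
	 + this call attaches a handler to an already-created listening socket.
	 +/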

	fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
		m_status = StatusInfo.init;

		import libasync.internals.socket_compat : socket, SOCK_DGRAM, IPPROTO_UDP;
		import core.sys.posix.unistd;
		fd_t fd = ctxt.preInitializedSocket;

		static if (LOG) try log("Address: " ~ ctxt.local.toString()); catch (Throwable e) {}

		if (fd == fd_t.init)
			fd = socket(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP);

		if (catchError!("run AsyncUDPSocket")(fd))
			return 0;

		if (!setNonBlock(fd)) {
			close(fd);
			return 0;
		}

		if (!initUDPSocket(fd, ctxt, del)) {
			close(fd);
			return 0;
		}

		static if (LOG) try log("UDP Socket started FD#" ~ fd.to!string);
		catch (Throwable e) {}
		/*
		static if (!EPOLL) {
			gs_fdPool.insert(fd);
		}*/

		return fd;
	}
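
	/+ Note: initUDPSocket() binds the datagram socket to ctxt.local and
	 + registers it with the poller. Because the descriptor is non-blocking,
	 + the UDPHandler should keep receiving on a READ event until the call
	 + would block, or datagrams may linger until the next event.
	 +/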

	fd_t run(AsyncNotifier ctxt)
	{
		m_status = StatusInfo.init;

		fd_t err;
		static if (EPOLL) {
			fd_t fd = eventfd(0, EFD_NONBLOCK);

			if (catchSocketError!("run AsyncNotifier")(fd))
				return 0;

			epoll_event _event;
			_event.events = EPOLLIN | EPOLLET;
		}
		else /* if KQUEUE */
		{
			kevent_t _event;
			fd_t fd = cast(fd_t)createIndex();
		}
		EventType evtype = EventType.Notifier;
		NotifierHandler evh;
		evh.ctxt = ctxt;

		evh.fct = (AsyncNotifier ctxt) {
			try {
				ctxt.handler();
			} catch (Exception e) {
				//setInternalError!"AsyncNotifier handler"(Status.ERROR);
			}
		};

		EventObject eobj;
		eobj.notifierHandler = evh;

		EventInfo* evinfo;

		if (!ctxt.evInfo) {
			try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
			catch (Exception e) {
				assert(false, "Failed to allocate resources: " ~ e.msg);
			}

			ctxt.evInfo = evinfo;
		}
		else
			evinfo = ctxt.evInfo;

		static if (EPOLL) {
			_event.data.ptr = cast(void*) evinfo;

			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
			if (catchSocketError!("epoll_add(eventfd)")(err))
				return fd_t.init;
		}
		else /* if KQUEUE */
		{
			EV_SET(&_event, fd, EVFILT_USER, EV_ADD | EV_CLEAR, NOTE_FFCOPY, 0, evinfo);

			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

			if (catchError!"kevent_addSignal"(err))
				return fd_t.init;
		}

		return fd;
	}
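
	/+ Note: on Linux the notifier is backed by an eventfd that becomes
	 + readable when another thread writes to it; on kqueue it is a purely
	 + user-level EVFILT_USER event (raised later via NOTE_TRIGGER), so no
	 + real descriptor exists and `fd` is only an internal index.
	 +/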

	fd_t run(shared AsyncSignal ctxt)
	{
		m_status = StatusInfo.init;

		ctxt.evInfo = cast(shared) m_evSignal;

		static if (EPOLL)
			return cast(fd_t) (__libc_current_sigrtmin());
		else
			return cast(fd_t) createIndex(ctxt);
	}

	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
		m_status = StatusInfo.init;

		static if (EPOLL)
		{
			import core.sys.posix.time : itimerspec, CLOCK_REALTIME;

			fd_t fd = ctxt.id;
			itimerspec its;

			auto parts = timeout.split!("seconds", "nsecs")();
			its.it_value.tv_sec = cast(typeof(its.it_value.tv_sec)) parts.seconds;
			its.it_value.tv_nsec = cast(typeof(its.it_value.tv_nsec)) parts.nsecs;
			if (!ctxt.oneShot)
			{
				its.it_interval.tv_sec = its.it_value.tv_sec;
				its.it_interval.tv_nsec = its.it_value.tv_nsec;
			}

			if (fd == fd_t.init) {
				fd = timerfd_create(CLOCK_REALTIME, 0);
				if (catchError!"timer_create"(fd))
					return 0;
			}
			int err = timerfd_settime(fd, 0, &its, null);

			if (catchError!"timer_settime"(err))
				return 0;
			epoll_event _event;

			EventType evtype = EventType.Timer;
			EventObject eobj;
			eobj.timerHandler = del;

			EventInfo* evinfo;

			if (!ctxt.evInfo) {
				try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
				catch (Exception e) {
					assert(false, "Failed to allocate resources: " ~ e.msg);
				}

				ctxt.evInfo = evinfo;
			}
			else {
				evinfo = ctxt.evInfo;
				evinfo.evObj = eobj;
			}
			_event.events |= EPOLLIN | EPOLLET;
			_event.data.ptr = evinfo;
			if (ctxt.id > 0) {
				err = epoll_ctl(m_epollfd, EPOLL_CTL_DEL, ctxt.id, null);

				if (catchError!"epoll_ctl"(err))
					return fd_t.init;
			}
			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);

			if (catchError!"timer_epoll_add"(err))
				return 0;
			return fd;
		}
		else /* if KQUEUE */
		{
			fd_t fd = ctxt.id;

			if (ctxt.id == 0)
				fd = cast(fd_t) createIndex();

			EventType evtype = EventType.Timer;

			EventObject eobj;
			eobj.timerHandler = del;

			EventInfo* evinfo;

			if (!ctxt.evInfo) {
				try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
				catch (Exception e) {
					assert(false, "Failed to allocate resources: " ~ e.msg);
				}

				ctxt.evInfo = evinfo;
			}
			else {
				evinfo = ctxt.evInfo;
				evinfo.evObj = eobj;
			}

			kevent_t _event;
			int msecs = cast(int) timeout.total!"msecs";
			ushort flags_ = EV_ADD | EV_ENABLE;
			//if (ctxt.oneShot)
			//	flags_ |= EV_CLEAR;

			// www.khmere.com/freebsd_book/html/ch06.html - EV_CLEAR set internally
			EV_SET(&_event, fd, EVFILT_TIMER, flags_, 0, msecs + 30, cast(void*) evinfo);

			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

			if (catchError!"kevent_timer_add"(err))
				return 0;

			return fd;
		}
	}
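
	/+ Worked example of the timeout conversion above: a Duration of 1500
	 + msecs splits into seconds == 1 and nsecs == 500_000_000 for
	 + timerfd_settime, while the kqueue path becomes msecs == 1500 for
	 + EVFILT_TIMER's default millisecond granularity.
	 +/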

	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) {

		static if (EPOLL)
		{
			import core.sys.linux.sys.inotify;
			enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect
			assert(ctxt.fd == fd_t.init);
			int fd = inotify_init1(IN_NONBLOCK);
			if (catchError!"inotify_init1"(fd)) {
				return fd_t.init;
			}
			epoll_event _event;

			EventType evtype = EventType.DirectoryWatcher;
			EventObject eobj;
			eobj.dwHandler = del;

			EventInfo* evinfo;

			assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");

			try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
			catch (Exception e) {
				assert(false, "Failed to allocate resources: " ~ e.msg);
			}

			ctxt.evInfo = evinfo;

			_event.events |= EPOLLIN;
			_event.data.ptr = evinfo;

			int err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);

			if (catchError!"epoll_ctl"(err))
				return fd_t.init;
			return fd;
		}
		else /* if KQUEUE */ {
			static size_t id = 0;

			fd_t fd = cast(uint) ++id;

			EventType evtype = EventType.DirectoryWatcher;
			EventObject eobj;
			eobj.dwHandler = del;

			EventInfo* evinfo;

			assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");

			try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
			catch (Exception e) {
				assert(false, "Failed to allocate resources: " ~ e.msg);
			}
			ctxt.evInfo = evinfo;
			try m_watchers[fd] = evinfo; catch (Throwable e) {}

			try m_changes[fd] = ThreadMem.alloc!(Array!DWChangeInfo)();
			catch (Exception e) {
				assert(false, "Failed to allocate resources: " ~ e.msg);
			}

			/// events will be created in watch()

			return fd;
		}
	}
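
	/+ Note: on Linux the watcher is a real inotify descriptor polled through
	 + epoll, whereas the kqueue path hands out a synthetic id, defers the
	 + kernel registration to watch(), and buffers changes in m_changes until
	 + the application reads them.
	 +/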

	AsyncUDSConnection accept(AsyncUDSListener ctxt) {
		import core.stdc.errno : errno, EAGAIN, EWOULDBLOCK;
		import libasync.internals.socket_compat : accept;

		auto clientSocket = accept(ctxt.socket, null, null);
		if (clientSocket == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
			// No more new incoming connections
			return null;
		} else if (catchError!".accept"(clientSocket)) {
			// An error occurred
			return null;
		}

		// Allocate a new connection handler
		AsyncUDSConnection conn;
		try conn = ThreadMem.alloc!AsyncUDSConnection(ctxt.m_evLoop, clientSocket);
		catch (Exception e){ assert(false, "Allocation failure"); }
		conn.inbound = true;

		return conn;
	}
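
	/+ Note: the listening socket is non-blocking, so the event loop should
	 + call this accept() repeatedly until it returns null, which signals
	 + EAGAIN/EWOULDBLOCK, i.e. that the pending backlog has been drained.
	 +/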

	bool kill(AsyncDirectoryWatcher ctxt) {

		static if (EPOLL) {
			import core.sys.posix.unistd : close;
			try
			{
				Array!(Tuple!(fd_t, uint)) remove_list;
				foreach (ref const Tuple!(fd_t, uint) key, ref const DWFolderInfo info; m_dwFolders) {
					if (info.fd == ctxt.fd)
						remove_list.insertBack(key);
				}

				foreach (Tuple!(fd_t, uint) key; remove_list[]) {
					unwatch(key[0] /*fd_t*/, key[1]);
				}

				close(ctxt.fd);
				ThreadMem.free(ctxt.evInfo);
				ctxt.evInfo = null;
			}
			catch (Exception e)
			{
				setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher");
				return false;
			}

		} else /* if KQUEUE */ {
			try {
				Array!fd_t remove_list;

				foreach (ref const fd_t wd, ref const DWFolderInfo info; m_dwFolders) {
					if (info.fd == ctxt.fd)
						remove_list.insertBack(wd);
				}

				foreach (wd; remove_list[]) {
					unwatch(ctxt.fd, wd); // deletes all related m_dwFolders and m_dwFiles entries
				}

				ThreadMem.free(m_changes[ctxt.fd]);
				m_watchers.remove(ctxt.fd);
				m_changes.remove(ctxt.fd);
			}
			catch (Exception e) {
				setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher");
				return false;
			}
		}
		return true;
	}

	bool kill(AsyncTCPConnection ctxt, bool forced = false)
	{
		static if (LOG) log("Kill socket");
		m_status = StatusInfo.init;
		fd_t fd = ctxt.socket;
		bool has_socket = fd > 0;
		ctxt.disconnecting = true;
		if (forced) {
			ctxt.connected = false;
			ctxt.disconnecting = false;
			if (ctxt.evInfo) {
				try ThreadMem.free(ctxt.evInfo);
				catch (Throwable e) { assert(false, "Failed to free resources"); }
				ctxt.evInfo = null;
			}
			if (ctxt.inbound) try ThreadMem.free(ctxt);
			catch (Throwable t) { assert(false, "Failed to free resources for context " ~ (cast(void*)ctxt).to!string ~ ": " ~ t.to!string); }
		}
		return has_socket ? closeSocket(fd, true, forced) : true;
	}

	bool kill(AsyncTCPListener ctxt)
	{
		static if (LOG) log("Kill listener");
		m_status = StatusInfo.init;
		nothrow void cleanup() {
			try ThreadMem.free(ctxt.evInfo);
			catch (Throwable e) { assert(false, "Failed to free resources"); }
			ctxt.evInfo = null;
		}
		scope(exit) {
			cleanup();
		}

		fd_t fd = ctxt.socket;

		return closeSocket(fd, false, true);
	}

	bool kill(AsyncNotifier ctxt)
	{
		static if (EPOLL)
		{
			import core.sys.posix.unistd : close;
			fd_t err = close(ctxt.id);

			if (catchError!"close(eventfd)"(err))
				return false;

			try ThreadMem.free(ctxt.evInfo);
			catch (Exception e){ assert(false, "Error freeing resources"); }
			ctxt.evInfo = null;

			return true;
		}
		else /* if KQUEUE */
		{
			scope(exit) destroyIndex(ctxt);

			if (ctxt.id == fd_t.init)
				return false;

			kevent_t _event;
			EV_SET(&_event, ctxt.id, EVFILT_USER, EV_DELETE | EV_DISABLE, 0, 0, null);

			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

			try ThreadMem.free(ctxt.evInfo);
			catch (Exception e){ assert(false, "Error freeing resources"); }
			ctxt.evInfo = null;

			if (catchError!"kevent_del(notifier)"(err)) {
				return false;
			}
			return true;
		}
	}

	bool kill(shared AsyncSignal ctxt)
	{
		static if (EPOLL) {
			ctxt.evInfo = null;
		}
		else
		{
			m_status = StatusInfo.init;
			destroyIndex(ctxt);
		}
		return true;
	}

	bool kill(AsyncTimer ctxt) {
		import core.sys.posix.time;
		m_status = StatusInfo.init;

		static if (EPOLL)
		{
			import core.sys.posix.unistd : close;
			fd_t err = close(ctxt.id);
			if (catchError!"timer_kill"(err))
				return false;

			if (ctxt.evInfo) {
				try ThreadMem.free(ctxt.evInfo);
				catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
				ctxt.evInfo = null;
			}
		}
		else /* if KQUEUE */
		{
			scope(exit)
				destroyIndex(ctxt);

			if (ctxt.evInfo) {
				try ThreadMem.free(ctxt.evInfo);
				catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
				ctxt.evInfo = null;
			}
			kevent_t _event;
			EV_SET(&_event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null);
			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
			if (catchError!"kevent_del(timer)"(err))
				return false;
		}
		return true;
	}

	bool kill(AsyncUDPSocket ctxt) {
		import core.sys.posix.unistd : close;

		m_status = StatusInfo.init;

		fd_t fd = ctxt.socket;
		fd_t err = close(fd);

		if (catchError!"udp close"(err))
			return false;

		static if (!EPOLL)
		{
			kevent_t[2] events;
			EV_SET(&(events[0]), ctxt.socket, EVFILT_READ, EV_DELETE, 0, 0, null);
			EV_SET(&(events[1]), ctxt.socket, EVFILT_WRITE, EV_DELETE, 0, 0, null);
			err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);

			if (catchError!"event_del(udp)"(err))
				return false;
		}

		try ThreadMem.free(ctxt.evInfo);
		catch (Exception e){
			assert(false, "Failed to free resources: " ~ e.msg);
		}
		ctxt.evInfo = null;
		return true;
	}

	bool kill(AsyncEvent ctxt, bool forced = false) {
		import core.sys.posix.unistd : close;

		static if (LOG) log("Kill event");
		m_status = StatusInfo.init;
		fd_t fd = ctxt.id;

		if (ctxt.stateful) {
			bool has_socket = fd > 0;
			ctxt.disconnecting = true;

			if (forced) {
				ctxt.connected = false;
				ctxt.disconnecting = false;
				if (ctxt.evInfo) {
					try ThreadMem.free(ctxt.evInfo);
					catch (Exception e) {
						assert(false, "Failed to free resources: " ~ e.msg);
					}
					ctxt.evInfo = null;
				}
			}

			return has_socket ? closeSocket(fd, true, forced) : true;
		}

		fd_t err = close(fd);
		if (catchError!"event close"(err))
			return false;

		if (ctxt.evInfo) {
			try ThreadMem.free(ctxt.evInfo);
			catch (Exception e) {
				assert(false, "Failed to free resources: " ~ e.msg);
			}
			ctxt.evInfo = null;
		}

		return true;
	}

}