1 module libasync.posix2;
2 
// Workaround for an IDE indentation bug that shows up in very large files
4 mixin template RunKill()
5 {
6 
7 	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
8 	in { assert(ctxt.socket == fd_t.init, "TCP Connection is active. Use another instance."); }
9 	body {
10 		m_status = StatusInfo.init;
11 		import libasync.internals.socket_compat : socket, SOCK_STREAM, IPPROTO_TCP;
12 		import core.sys.posix.unistd : close;
13 		fd_t fd = ctxt.preInitializedSocket;
14 
15 		if (fd == fd_t.init)
16 			fd = socket(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP);
17 		
18 		if (catchError!("run AsyncTCPConnection")(fd)) 
19 			return 0;
20 		
21 		/// Make sure the socket doesn't block on recv/send
22 		if (!setNonBlock(fd)) {
23 			log("Close socket");
24 			close(fd);
25 			return 0;
26 		}
27 		
28 		/// Enable Nagel's algorithm if specified
29 		if (ctxt.noDelay) {
30 			if (!setOption(fd, TCPOption.NODELAY, true)) {
31 				try log("Closing connection"); catch {}
32 				close(fd);
33 				return 0;
34 			}
35 		}
36 		
37 		/// Trigger the connection setup instructions
38 		if (!initTCPConnection(fd, ctxt, del)) {
39 			close(fd);
40 			return 0;
41 		}
42 		
43 		return fd;
44 		
45 	}
46 	
47 	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
48 	in { 
49 		//assert(ctxt.socket == fd_t.init, "TCP Listener already bound. Please run another instance."); 
50 		assert(ctxt.local.addr !is typeof(ctxt.local.addr).init, "No locally binding address specified. Use AsyncTCPListener.local = EventLoop.resolve*"); 
51 	}
52 	body {
53 		m_status = StatusInfo.init;
54 		import libasync.internals.socket_compat : socket, SOCK_STREAM, socklen_t, setsockopt, SOL_SOCKET, SO_REUSEADDR, IPPROTO_TCP;
55 		import core.sys.posix.unistd : close;
56 		import core.sync.mutex;
57 		__gshared Mutex g_mtx;
58 		fd_t fd = ctxt.socket;
59 		bool reusing = true;
60 		try if (fd == 0) {
61 			reusing = false;
62 			/// Create the listening socket
63 			synchronized(g_mutex) {
64 				fd = socket(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP);
65 				if (catchError!("run AsyncTCPAccept")(fd))
66 					return 0;
67 				/// Allow multiple threads to listen on this address
68 				if (!setOption(fd, TCPOption.REUSEADDR, true)) {
69 					log("Close socket");
70 					close(fd);
71 					return 0;
72 				}
73 			} 
74 
75 			/// Make sure the socket returns instantly when calling listen()
76 			if (!setNonBlock(fd)) {
77 				log("Close socket");
78 				close(fd);
79 				return 0;
80 			}
81 
82 			// todo: defer accept
83 			
84 			/// Automatically starts connections with noDelay if specified
85 			if (ctxt.noDelay) {
86 				if (!setOption(fd, TCPOption.NODELAY, true)) {
87 					try log("Closing connection"); catch {}
88 					close(fd);
89 					return 0;
90 				}
91 			}
92 
93 		} catch { assert(false, "Error in synchronized listener starter"); }
94 
95 		/// Setup the event polling
96 		if (!initTCPListener(fd, ctxt, del, reusing)) {
97 			log("Close socket");
98 			close(fd);
99 			return 0;
100 		}
101 
102 
103 		return fd;
104 		
105 	}
106 	
107 	fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
108 		m_status = StatusInfo.init;
109 		
110 		import libasync.internals.socket_compat : socket, SOCK_DGRAM, IPPROTO_UDP;
111 		import core.sys.posix.unistd;
112 		fd_t fd = ctxt.preInitializedSocket;
113 
114 		try log("Address: " ~ ctxt.local.toString()); catch {}
115 
116 		if (fd == fd_t.init)
117 			fd = socket(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP);
118 
119 
120 		if (catchError!("run AsyncUDPSocket")(fd))
121 			return 0;
122 		
123 		if (!setNonBlock(fd))
124 			return 0;
125 		
126 		if (!initUDPSocket(fd, ctxt, del))
127 			return 0;
128 
129 		try log("UDP Socket started FD#" ~ fd.to!string);
130 		catch{}
131 		/*
132 		static if (!EPOLL) {
133 			gs_fdPool.insert(fd);
134 		}*/
135 		
136 		return fd;
137 	}
138 	
139 	fd_t run(AsyncNotifier ctxt)
140 	{
141 		
142 		m_status = StatusInfo.init;
143 		
144 		fd_t err;
145 		static if (EPOLL) {
146 			fd_t fd = eventfd(0, EFD_NONBLOCK);
147 			
148 			if (catchSocketError!("run AsyncNotifier")(fd))
149 				return 0;
150 			
151 			epoll_event _event;
152 			_event.events = EPOLLIN | EPOLLET;
153 		}	
154 		else /* if KQUEUE */
155 		{
156 			kevent_t _event;
157 			fd_t fd = cast(fd_t)createIndex();
158 		}
159 		EventType evtype = EventType.Notifier;
160 		NotifierHandler evh;
161 		evh.ctxt = ctxt;
162 		
163 		evh.fct = (AsyncNotifier ctxt) {
164 			try {
165 				ctxt.handler();
166 			} catch (Exception e) {
167 				//setInternalError!"AsyncTimer handler"(Status.ERROR);
168 			}
169 		};
170 		
171 		EventObject eobj;
172 		eobj.notifierHandler = evh;
173 		
174 		EventInfo* evinfo;
175 		
176 		if (!ctxt.evInfo) {
177 			try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
178 			catch (Exception e) {
179 				assert(false, "Failed to allocate resources: " ~ e.msg);
180 			}
181 			
182 			ctxt.evInfo = evinfo;
183 		}
184 		
185 		static if (EPOLL) {
186 			_event.data.ptr = cast(void*) evinfo;
187 			
188 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
189 			if (catchSocketError!("epoll_add(eventfd)")(err))
190 				return fd_t.init;
191 		}
192 		else /* if KQUEUE */
193 		{
194 			EV_SET(&_event, fd, EVFILT_USER, EV_ADD | EV_CLEAR, NOTE_FFCOPY, 0, evinfo);
195 			
196 			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
197 			
198 			if (catchError!"kevent_addSignal"(err))
199 				return fd_t.init;
200 		}
201 		
202 		return fd;
203 		
204 		
205 	}
206 	
207 	fd_t run(shared AsyncSignal ctxt) 
208 	{
209 		static if (EPOLL) {
210 			
211 			m_status = StatusInfo.init;
212 			
213 			ctxt.evInfo = cast(shared) m_evSignal;
214 			
215 			return cast(fd_t) (__libc_current_sigrtmin());
216 		}
217 		else
218 		{
219 			m_status = StatusInfo.init;
220 			
221 			ctxt.evInfo = cast(shared) m_evSignal;
222 
223 			return cast(fd_t) createIndex(ctxt);
224 			
225 		}
226 	}
227 	
228 	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
229 		m_status = StatusInfo.init;
230 		
231 		static if (EPOLL)
232 		{
233 			import core.sys.posix.time : itimerspec, CLOCK_REALTIME;
234 			
235 			fd_t fd = ctxt.id;
236 			itimerspec its;
237 			
238 			its.it_value.tv_sec = cast(typeof(its.it_value.tv_sec)) timeout.split!("seconds", "nsecs")().seconds;
239 			its.it_value.tv_nsec = cast(typeof(its.it_value.tv_nsec)) timeout.split!("seconds", "nsecs")().nsecs;
240 			if (!ctxt.oneShot)
241 			{
242 				its.it_interval.tv_sec = its.it_value.tv_sec;
243 				its.it_interval.tv_nsec = its.it_value.tv_nsec;
244 				
245 			}
246 			
247 			if (fd == fd_t.init) {
248 				fd = timerfd_create(CLOCK_REALTIME, 0);
249 				if (catchError!"timer_create"(fd))
250 					return 0;
251 			}
252 			int err = timerfd_settime(fd, 0, &its, null);
253 			
254 			if (catchError!"timer_settime"(err))
255 				return 0;
256 			epoll_event _event;
257 
258 			EventType evtype;			
259 			evtype = EventType.Timer;
260 			EventObject eobj;
261 			eobj.timerHandler = del;
262 			
263 			EventInfo* evinfo;
264 			
265 			if (!ctxt.evInfo) {
266 				try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
267 				catch (Exception e) {
268 					assert(false, "Failed to allocate resources: " ~ e.msg);
269 				}
270 				
271 				ctxt.evInfo = evinfo;
272 			}
273 			else {
274 				evinfo = ctxt.evInfo;
275 				evinfo.evObj = eobj;
276 			}
277 			_event.events |= EPOLLIN | EPOLLET;
278 			_event.data.ptr = evinfo;
279 			if (ctxt.id > 0) {
280 				err = epoll_ctl(m_epollfd, EPOLL_CTL_DEL, ctxt.id, null); 
281 				
282 				if (catchError!"epoll_ctl"(err))
283 					return fd_t.init;
284 			}
285 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 
286 			
287 			if (catchError!"timer_epoll_add"(err))
288 				return 0;
289 			return fd;
290 		}
291 		else /* if KQUEUE */
292 		{
293 			fd_t fd = ctxt.id;
294 			
295 			if (ctxt.id == 0)
296 				fd = cast(fd_t) createIndex();
297 			EventType evtype;			
298 			evtype = EventType.Timer;
299 			
300 			EventObject eobj;
301 			eobj.timerHandler = del;
302 			
303 			EventInfo* evinfo;
304 			
305 			if (!ctxt.evInfo) {
306 				try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
307 				catch (Exception e) {
308 					assert(false, "Failed to allocate resources: " ~ e.msg);
309 				}
310 				
311 				ctxt.evInfo = evinfo;
312 			}
313 			else {
314 				evinfo = ctxt.evInfo;
315 				evinfo.evObj = eobj;
316 			}
317 
318 			kevent_t _event;
319 			
320 			int msecs = cast(int) timeout.total!"msecs";
321 			
322 			// www.khmere.com/freebsd_book/html/ch06.html - EV_CLEAR set internally
323 			EV_SET(&_event, fd, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, msecs, cast(void*) evinfo);
324 			
325 			if (ctxt.oneShot)
326 				_event.flags |= EV_ONESHOT;
327 			
328 			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
329 			
330 			if (catchError!"kevent_timer_add"(err))
331 				return 0;
332 			
333 			return fd;
334 			
335 		}
336 		
337 	}
338 	
339 	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) {
340 
341 
342 		static if (EPOLL) 
343 		{
344 			import core.sys.linux.sys.inotify;
345 			enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect
346 			assert(ctxt.fd == fd_t.init);
347 			int fd = inotify_init1(IN_NONBLOCK);
348 			if (catchError!"inotify_init1"(fd)) {
349 				return fd_t.init;
350 			}
351 			epoll_event _event;
352 			
353 			EventType evtype;
354 			
355 			evtype = EventType.DirectoryWatcher;
356 			EventObject eobj;
357 			eobj.dwHandler = del;
358 			
359 			EventInfo* evinfo;
360 
361 			assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");
362 			
363 			try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
364 			catch (Exception e) {
365 				assert(false, "Failed to allocate resources: " ~ e.msg);
366 			}
367 			
368 			ctxt.evInfo = evinfo;
369 			
370 			_event.events |= EPOLLIN;
371 			_event.data.ptr = evinfo;
372 			
373 			int err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 
374 			
375 			if (catchError!"epoll_ctl"(err))
376 				return fd_t.init;
377 			return fd;
378 		} 
379 		else /* if KQUEUE */ {
380 			static size_t id = 0;
381 
382 			fd_t fd = cast(uint)++id;
383 
384 			EventType evtype;
385 			
386 			evtype = EventType.DirectoryWatcher;
387 			EventObject eobj;
388 			eobj.dwHandler = del;
389 			
390 			EventInfo* evinfo;
391 
392 			assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");
393 
394 			try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
395 			catch (Exception e) {
396 				assert(false, "Failed to allocate resources: " ~ e.msg);
397 			}
398 			ctxt.evInfo = evinfo;
399 			try m_watchers[fd] = evinfo; catch {}
400 
401 			try m_changes[fd] = FreeListObjectAlloc!(Array!DWChangeInfo).alloc();
402 			catch (Exception e) {
403 				assert(false, "Failed to allocate resources: " ~ e.msg);
404 			}
405 
406 			/// events will be created in watch()
407 
408 			return fd;
409 
410 		}
411 	}
412 	
413 	bool kill(AsyncDirectoryWatcher ctxt) {
414 
415 		static if (EPOLL) {
416 			import core.sys.posix.unistd : close;
417 			try 
418 			{
419 				Array!(Tuple!(fd_t, uint)) remove_list;
420 				foreach (ref const Tuple!(fd_t, uint) key, ref const DWFolderInfo info; m_dwFolders) {
421 					if (info.fd == ctxt.fd)
422 						remove_list.insertBack(key);
423 				}
424 
425 				foreach (Tuple!(fd_t, uint) key; remove_list[]) {
426 					unwatch(key[0] /*fd_t*/, key[1]);
427 				}
428 
429 				close(ctxt.fd);
430 				FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
431 				ctxt.evInfo = null;
432 			}
433 			catch (Exception e)
434 			{ 
435 				setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher"); 
436 				return false;
437 			}
438 
439 		} else /* if KQUEUE */ {
440 			try {
441 				Array!fd_t remove_list;
442 
443 				foreach (ref const fd_t wd, ref const DWFolderInfo info; m_dwFolders) {
444 					if (info.fd == ctxt.fd)
445 						remove_list.insertBack(wd);
446 				}
447 				
448 				foreach (wd; remove_list[]) {
449 					unwatch(ctxt.fd, wd); // deletes all related m_dwFolders and m_dwFiles entries
450 				}
451 
452 				FreeListObjectAlloc!(Array!DWChangeInfo).free(m_changes[ctxt.fd]);
453 				m_watchers.remove(ctxt.fd);	
454 				m_changes.remove(ctxt.fd);		
455 			}
456 			catch (Exception e) {
457 				setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher"); 
458 				return false;
459 			}
460 		}
461 		return true;
462 	}
463 	
464 	bool kill(AsyncTCPConnection ctxt, bool forced = false)
465 	{
466 		log("Kill socket");
467 		m_status = StatusInfo.init;
468 		fd_t fd = ctxt.socket;
469 		if (ctxt.connected) {
470 			ctxt.disconnecting = true;
471 			if (forced) {
472 				if (ctxt.evInfo)
473 					try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
474 					catch { assert(false, "Failed to free resources"); }
475 				try FreeListObjectAlloc!AsyncTCPConnection.free(ctxt);
476 				catch { assert(false, "Failed to free resources"); }
477 			}
478 			return closeSocket(fd, true, forced);
479 		}
480 		else {
481 			ctxt.disconnecting = true;
482 			if (forced) {
483 				if (ctxt.evInfo)
484 					try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
485 					catch { assert(false, "Failed to free resources"); }
486 				try FreeListObjectAlloc!AsyncTCPConnection.free(ctxt);
487 				catch { assert(false, "Failed to free resources"); }
488 			}
489 			return true;
490 		}
491 	}
492 
493 	bool kill(AsyncTCPListener ctxt)
494 	{
495 		log("Kill listener");
496 		m_status = StatusInfo.init;
497 		nothrow void cleanup() {
498 			try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
499 			catch { assert(false, "Failed to free resources"); }
500 			ctxt.evInfo = null;
501 		}
502 		scope(exit) {
503 			cleanup();
504 		}
505 		
506 		fd_t fd = ctxt.socket;
507 		
508 		return closeSocket(fd, false, true);
509 	}
510 	
511 	bool kill(AsyncNotifier ctxt)
512 	{
513 		static if (EPOLL)
514 		{
515 			import core.sys.posix.unistd : close;
516 			fd_t err = close(ctxt.id);
517 			
518 			if (catchError!"close(eventfd)"(err))
519 				return false;
520 			
521 			try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
522 			catch (Exception e){ assert(false, "Error freeing resources"); }
523 			
524 			return true;
525 		}
526 		else /* if KQUEUE */
527 		{
528 			scope(exit) destroyIndex(ctxt);
529 			
530 			if (ctxt.id == fd_t.init)
531 				return false;
532 			
533 			kevent_t _event;
534 			EV_SET(&_event, ctxt.id, EVFILT_USER, EV_DELETE | EV_DISABLE, 0, 0, null);
535 			
536 			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
537 			
538 			try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
539 			catch (Exception e){ assert(false, "Error freeing resources"); }
540 			
541 			if (catchError!"kevent_del(notifier)"(err)) {
542 				return false;
543 			}
544 			return true;
545 		}
546 		
547 	}
548 	
549 	bool kill(shared AsyncSignal ctxt)
550 	{
551 		
552 		static if (EPOLL) {
553 			ctxt.evInfo = null;
554 		}
555 		else 
556 		{
557 			m_status = StatusInfo.init;
558 			destroyIndex(ctxt);
559 		}
560 		return true;
561 	}
562 	
563 	bool kill(AsyncTimer ctxt) {
564 		import core.sys.posix.time;
565 		m_status = StatusInfo.init;
566 		
567 		static if (EPOLL)
568 		{
569 			import core.sys.posix.unistd : close;
570 			fd_t err = close(ctxt.id);
571 			if (catchError!"timer_kill"(err))
572 				return false;
573 			
574 			if (ctxt.evInfo) {
575 				try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
576 				catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
577 				ctxt.evInfo = null;
578 			}
579 			
580 		}
581 		else /* if KQUEUE */
582 		{
583 			scope(exit) 
584 				destroyIndex(ctxt);
585 			
586 			if (ctxt.evInfo) {
587 				try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
588 				catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
589 				ctxt.evInfo = null;
590 			}
591 			kevent_t _event;
592 			EV_SET(&_event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null);
593 			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
594 			if (catchError!"kevent_del(timer)"(err))
595 				return false;
596 		}
597 		return true;
598 	}
599 	
600 	bool kill(AsyncUDPSocket ctxt) {
601 		import core.sys.posix.unistd : close;
602 		
603 		
604 		m_status = StatusInfo.init;
605 		
606 		fd_t fd = ctxt.socket;
607 		fd_t err = close(fd);
608 		
609 		if (catchError!"udp close"(err)) 
610 			return false;
611 		
612 		static if (!EPOLL)
613 		{
614 			kevent_t[2] events;
615 			EV_SET(&(events[0]), ctxt.socket, EVFILT_READ, EV_DELETE, 0, 0, null);
616 			EV_SET(&(events[1]), ctxt.socket, EVFILT_WRITE, EV_DELETE, 0, 0, null);
617 			err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
618 			
619 			if (catchError!"event_del(udp)"(err)) 
620 				return false;
621 		}
622 		
623 		
624 		try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
625 		catch (Exception e){
626 			assert(false, "Failed to free resources: " ~ e.msg);
627 		}
628 		ctxt.evInfo = null;
629 		return true;
630 	}
631 
632 }