1 module libasync.posix2;
2 
3 // workaround for IDE indent bug on too big files
4 mixin template RunKill()
5 {
6 
7 	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
8 	in { assert(ctxt.socket == fd_t.init, "TCP Connection is active. Use another instance."); }
9 	body {
10 		m_status = StatusInfo.init;
11 		import libasync.internals.socket_compat : socket, SOCK_STREAM;
12 		import core.sys.posix.unistd : close;
13 		
14 		fd_t fd = socket(cast(int)ctxt.peer.family, SOCK_STREAM, 0);
15 		
16 		if (catchError!("run AsyncTCPConnection")(fd)) 
17 			return 0;
18 		
19 		/// Make sure the socket doesn't block on recv/send
20 		if (!setNonBlock(fd)) {
21 			log("Close socket");
22 			close(fd);
23 			return 0;
24 		}
25 		
26 		/// Enable Nagel's algorithm if specified
27 		if (ctxt.noDelay) {
28 			if (!setOption(fd, TCPOption.NODELAY, true)) {
29 				try log("Closing connection"); catch {}
30 				close(fd);
31 				return 0;
32 			}
33 		}
34 		
35 		/// Trigger the connection setup instructions
36 		if (!initTCPConnection(fd, ctxt, del)) {
37 			close(fd);
38 			return 0;
39 		}
40 		
41 		return fd;
42 		
43 	}
44 	
45 	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
46 	in { 
47 		//assert(ctxt.socket == fd_t.init, "TCP Listener already bound. Please run another instance."); 
48 		assert(ctxt.local.addr !is typeof(ctxt.local.addr).init, "No locally binding address specified. Use AsyncTCPListener.local = EventLoop.resolve*"); 
49 	}
50 	body {
51 		m_status = StatusInfo.init;
52 		import libasync.internals.socket_compat : socket, SOCK_STREAM, socklen_t, setsockopt, SOL_SOCKET, SO_REUSEADDR, IPPROTO_TCP;
53 		import core.sys.posix.unistd : close;
54 		import core.sync.mutex;
55 		__gshared Mutex g_mtx;
56 		fd_t fd = ctxt.socket;
57 		bool reusing = true;
58 		try if (fd == 0) {
59 			reusing = false;
60 			/// Create the listening socket
61 			synchronized(g_mutex) {
62 				fd = socket(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP);
63 				if (catchError!("run AsyncTCPAccept")(fd))
64 					return 0;
65 				/// Allow multiple threads to listen on this address
66 				if (!setOption(fd, TCPOption.REUSEADDR, true)) {
67 					log("Close socket");
68 					close(fd);
69 					return 0;
70 				}
71 			} 
72 
73 			/// Make sure the socket returns instantly when calling listen()
74 			if (!setNonBlock(fd)) {
75 				log("Close socket");
76 				close(fd);
77 				return 0;
78 			}
79 
80 			// todo: defer accept
81 			
82 			/// Automatically starts connections with noDelay if specified
83 			if (ctxt.noDelay) {
84 				if (!setOption(fd, TCPOption.NODELAY, true)) {
85 					try log("Closing connection"); catch {}
86 					close(fd);
87 					return 0;
88 				}
89 			}
90 
91 		} catch { assert(false, "Error in synchronized listener starter"); }
92 
93 		/// Setup the event polling
94 		if (!initTCPListener(fd, ctxt, del, reusing)) {
95 			log("Close socket");
96 			close(fd);
97 			return 0;
98 		}
99 
100 
101 		return fd;
102 		
103 	}
104 	
105 	fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
106 		m_status = StatusInfo.init;
107 		
108 		import libasync.internals.socket_compat : socket, SOCK_DGRAM, IPPROTO_UDP;
109 		import core.sys.posix.unistd;
110 
111 		try log("Address: " ~ ctxt.local.toString()); catch {}
112 
113 		fd_t fd = socket(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP);
114 
115 
116 		if (catchError!("run AsyncUDPSocket")(fd))
117 			return 0;
118 		
119 		if (!setNonBlock(fd))
120 			return 0;
121 		
122 		if (!initUDPSocket(fd, ctxt, del))
123 			return 0;
124 
125 		try log("UDP Socket started FD#" ~ fd.to!string);
126 		catch{}
127 		/*
128 		static if (!EPOLL) {
129 			gs_fdPool.insert(fd);
130 		}*/
131 		
132 		return fd;
133 	}
134 	
135 	fd_t run(AsyncNotifier ctxt)
136 	{
137 		
138 		m_status = StatusInfo.init;
139 		
140 		fd_t err;
141 		static if (EPOLL) {
142 			fd_t fd = eventfd(0, EFD_NONBLOCK);
143 			
144 			if (catchSocketError!("run AsyncNotifier")(fd))
145 				return 0;
146 			
147 			epoll_event _event;
148 			_event.events = EPOLLIN | EPOLLET;
149 		}	
150 		else /* if KQUEUE */
151 		{
152 			kevent_t _event;
153 			fd_t fd = cast(fd_t)createIndex();
154 		}
155 		EventType evtype = EventType.Notifier;
156 		NotifierHandler evh;
157 		evh.ctxt = ctxt;
158 		
159 		evh.fct = (AsyncNotifier ctxt) {
160 			try {
161 				ctxt.handler();
162 			} catch (Exception e) {
163 				//setInternalError!"AsyncTimer handler"(Status.ERROR);
164 			}
165 		};
166 		
167 		EventObject eobj;
168 		eobj.notifierHandler = evh;
169 		
170 		EventInfo* evinfo;
171 		
172 		if (!ctxt.evInfo) {
173 			try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
174 			catch (Exception e) {
175 				assert(false, "Failed to allocate resources: " ~ e.msg);
176 			}
177 			
178 			ctxt.evInfo = evinfo;
179 		}
180 		
181 		static if (EPOLL) {
182 			_event.data.ptr = cast(void*) evinfo;
183 			
184 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
185 			if (catchSocketError!("epoll_add(eventfd)")(err))
186 				return fd_t.init;
187 		}
188 		else /* if KQUEUE */
189 		{
190 			EV_SET(&_event, fd, EVFILT_USER, EV_ADD | EV_CLEAR, NOTE_FFCOPY, 0, evinfo);
191 			
192 			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
193 			
194 			if (catchError!"kevent_addSignal"(err))
195 				return fd_t.init;
196 		}
197 		
198 		return fd;
199 		
200 		
201 	}
202 	
203 	fd_t run(shared AsyncSignal ctxt) 
204 	{
205 		static if (EPOLL) {
206 			
207 			m_status = StatusInfo.init;
208 			
209 			ctxt.evInfo = cast(shared) m_evSignal;
210 			
211 			return cast(fd_t) (__libc_current_sigrtmin());
212 		}
213 		else
214 		{
215 			m_status = StatusInfo.init;
216 			
217 			ctxt.evInfo = cast(shared) m_evSignal;
218 
219 			return cast(fd_t) createIndex(ctxt);
220 			
221 		}
222 	}
223 	
224 	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
225 		m_status = StatusInfo.init;
226 		
227 		static if (EPOLL)
228 		{
229 			import core.sys.posix.time : itimerspec, CLOCK_REALTIME;
230 			
231 			fd_t fd = ctxt.id;
232 			itimerspec its;
233 			
234 			its.it_value.tv_sec = timeout.split!("seconds", "nsecs")().seconds;
235 			its.it_value.tv_nsec = timeout.split!("seconds", "nsecs")().nsecs;
236 			if (!ctxt.oneShot)
237 			{
238 				its.it_interval.tv_sec = its.it_value.tv_sec;
239 				its.it_interval.tv_nsec = its.it_value.tv_nsec;
240 				
241 			}
242 			
243 			if (fd == fd_t.init) {
244 				fd = timerfd_create(CLOCK_REALTIME, 0);
245 				if (catchError!"timer_create"(fd))
246 					return 0;
247 			}
248 			int err = timerfd_settime(fd, 0, &its, null);
249 			
250 			if (catchError!"timer_settime"(err))
251 				return 0;
252 			epoll_event _event;
253 
254 			EventType evtype;			
255 			evtype = EventType.Timer;
256 			EventObject eobj;
257 			eobj.timerHandler = del;
258 			
259 			EventInfo* evinfo;
260 			
261 			if (!ctxt.evInfo) {
262 				try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
263 				catch (Exception e) {
264 					assert(false, "Failed to allocate resources: " ~ e.msg);
265 				}
266 				
267 				ctxt.evInfo = evinfo;
268 			}
269 			else {
270 				evinfo = ctxt.evInfo;
271 				evinfo.evObj = eobj;
272 			}
273 			_event.events |= EPOLLIN | EPOLLET;
274 			_event.data.ptr = evinfo;
275 			if (ctxt.id > 0) {
276 				err = epoll_ctl(m_epollfd, EPOLL_CTL_DEL, ctxt.id, null); 
277 				
278 				if (catchError!"epoll_ctl"(err))
279 					return fd_t.init;
280 			}
281 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 
282 			
283 			if (catchError!"timer_epoll_add"(err))
284 				return 0;
285 			return fd;
286 		}
287 		else /* if KQUEUE */
288 		{
289 			fd_t fd = ctxt.id;
290 			
291 			if (ctxt.id == 0)
292 				fd = cast(fd_t) createIndex();
293 			EventType evtype;			
294 			evtype = EventType.Timer;
295 			
296 			EventObject eobj;
297 			eobj.timerHandler = del;
298 			
299 			EventInfo* evinfo;
300 			
301 			if (!ctxt.evInfo) {
302 				try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
303 				catch (Exception e) {
304 					assert(false, "Failed to allocate resources: " ~ e.msg);
305 				}
306 				
307 				ctxt.evInfo = evinfo;
308 			}
309 			else {
310 				evinfo = ctxt.evInfo;
311 				evinfo.evObj = eobj;
312 			}
313 
314 			kevent_t _event;
315 			
316 			int msecs = cast(int) timeout.total!"msecs";
317 			
318 			// www.khmere.com/freebsd_book/html/ch06.html - EV_CLEAR set internally
319 			EV_SET(&_event, fd, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, msecs, cast(void*) evinfo);
320 			
321 			if (ctxt.oneShot)
322 				_event.flags |= EV_ONESHOT;
323 			
324 			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
325 			
326 			if (catchError!"kevent_timer_add"(err))
327 				return 0;
328 			
329 			return fd;
330 			
331 		}
332 		
333 	}
334 	
335 	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) {
336 
337 
338 		static if (EPOLL) 
339 		{
340 			import core.sys.linux.sys.inotify;
341 			enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect
342 			assert(ctxt.fd == fd_t.init);
343 			int fd = inotify_init1(IN_NONBLOCK);
344 			if (catchError!"inotify_init1"(fd)) {
345 				return fd_t.init;
346 			}
347 			epoll_event _event;
348 			
349 			EventType evtype;
350 			
351 			evtype = EventType.DirectoryWatcher;
352 			EventObject eobj;
353 			eobj.dwHandler = del;
354 			
355 			EventInfo* evinfo;
356 
357 			assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");
358 			
359 			try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
360 			catch (Exception e) {
361 				assert(false, "Failed to allocate resources: " ~ e.msg);
362 			}
363 			
364 			ctxt.evInfo = evinfo;
365 			
366 			_event.events |= EPOLLIN;
367 			_event.data.ptr = evinfo;
368 			
369 			int err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 
370 			
371 			if (catchError!"epoll_ctl"(err))
372 				return fd_t.init;
373 			return fd;
374 		} 
375 		else /* if KQUEUE */ {
376 			static size_t id = 0;
377 
378 			fd_t fd = cast(uint)++id;
379 
380 			EventType evtype;
381 			
382 			evtype = EventType.DirectoryWatcher;
383 			EventObject eobj;
384 			eobj.dwHandler = del;
385 			
386 			EventInfo* evinfo;
387 
388 			assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");
389 
390 			try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
391 			catch (Exception e) {
392 				assert(false, "Failed to allocate resources: " ~ e.msg);
393 			}
394 			ctxt.evInfo = evinfo;
395 			try m_watchers[fd] = evinfo; catch {}
396 
397 			try m_changes[fd] = FreeListObjectAlloc!(Array!DWChangeInfo).alloc();
398 			catch (Exception e) {
399 				assert(false, "Failed to allocate resources: " ~ e.msg);
400 			}
401 
402 			/// events will be created in watch()
403 
404 			return fd;
405 
406 		}
407 	}
408 	
409 	bool kill(AsyncDirectoryWatcher ctxt) {
410 
411 		static if (EPOLL) {
412 			import core.sys.posix.unistd : close;
413 			try 
414 			{
415 				Array!(Tuple!(fd_t, uint)) remove_list;
416 				foreach (ref const Tuple!(fd_t, uint) key, ref const DWFolderInfo info; m_dwFolders) {
417 					if (info.fd == ctxt.fd)
418 						remove_list.insertBack(key);
419 				}
420 
421 				foreach (Tuple!(fd_t, uint) key; remove_list[]) {
422 					unwatch(key[0] /*fd_t*/, key[1]);
423 				}
424 
425 				close(ctxt.fd);
426 				FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
427 				ctxt.evInfo = null;
428 			}
429 			catch (Exception e)
430 			{ 
431 				setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher"); 
432 				return false;
433 			}
434 
435 		} else /* if KQUEUE */ {
436 			try {
437 				Array!fd_t remove_list;
438 
439 				foreach (ref const fd_t wd, ref const DWFolderInfo info; m_dwFolders) {
440 					if (info.fd == ctxt.fd)
441 						remove_list.insertBack(wd);
442 				}
443 				
444 				foreach (wd; remove_list[]) {
445 					unwatch(ctxt.fd, wd); // deletes all related m_dwFolders and m_dwFiles entries
446 				}
447 
448 				FreeListObjectAlloc!(Array!DWChangeInfo).free(m_changes[ctxt.fd]);
449 				m_watchers.remove(ctxt.fd);	
450 				m_changes.remove(ctxt.fd);		
451 			}
452 			catch (Exception e) {
453 				setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher"); 
454 				return false;
455 			}
456 		}
457 		return true;
458 	}
459 	
460 	bool kill(AsyncTCPConnection ctxt, bool forced = false)
461 	{
462 		log("Kill socket");
463 		m_status = StatusInfo.init;
464 			fd_t fd = ctxt.socket;
465 				scope(exit) {
466 					if (forced) {
467 					ctxt.connected = false;
468 			}
469 			ctxt.disconnecting = true;
470 		}
471 		if (ctxt.connected)
472 			return closeSocket(fd, true, forced);
473 		else {
474 			return true;
475 		}
476 	}
477 
478 	bool kill(AsyncTCPListener ctxt)
479 	{
480 		log("Kill listener");
481 		m_status = StatusInfo.init;
482 		nothrow void cleanup() {
483 			try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
484 			catch { assert(false, "Failed to free resources"); }
485 			ctxt.evInfo = null;
486 		}
487 		scope(exit) {
488 			cleanup();
489 		}
490 		
491 		fd_t fd = ctxt.socket;
492 		
493 		return closeSocket(fd, false, true);
494 	}
495 	
496 	bool kill(AsyncNotifier ctxt)
497 	{
498 		static if (EPOLL)
499 		{
500 			import core.sys.posix.unistd : close;
501 			fd_t err = close(ctxt.id);
502 			
503 			if (catchError!"close(eventfd)"(err))
504 				return false;
505 			
506 			try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
507 			catch (Exception e){ assert(false, "Error freeing resources"); }
508 			
509 			return true;
510 		}
511 		else /* if KQUEUE */
512 		{
513 			scope(exit) destroyIndex(ctxt);
514 			
515 			if (ctxt.id == fd_t.init)
516 				return false;
517 			
518 			kevent_t _event;
519 			EV_SET(&_event, ctxt.id, EVFILT_USER, EV_DELETE | EV_DISABLE, 0, 0, null);
520 			
521 			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
522 			
523 			try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
524 			catch (Exception e){ assert(false, "Error freeing resources"); }
525 			
526 			if (catchError!"kevent_del(notifier)"(err)) {
527 				return false;
528 			}
529 			return true;
530 		}
531 		
532 	}
533 	
534 	bool kill(shared AsyncSignal ctxt)
535 	{
536 		
537 		static if (EPOLL) {
538 			ctxt.evInfo = null;
539 		}
540 		else 
541 		{
542 			m_status = StatusInfo.init;
543 			destroyIndex(ctxt);
544 		}
545 		return true;
546 	}
547 	
548 	bool kill(AsyncTimer ctxt) {
549 		import core.sys.posix.time;
550 		m_status = StatusInfo.init;
551 		
552 		static if (EPOLL)
553 		{
554 			import core.sys.posix.unistd : close;
555 			fd_t err = close(ctxt.id);
556 			if (catchError!"timer_kill"(err))
557 				return false;
558 			
559 			if (ctxt.evInfo) {
560 				try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
561 				catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
562 				ctxt.evInfo = null;
563 			}
564 			
565 		}
566 		else /* if KQUEUE */
567 		{
568 			scope(exit) 
569 				destroyIndex(ctxt);
570 			
571 			if (ctxt.evInfo) {
572 				try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
573 				catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
574 				ctxt.evInfo = null;
575 			}
576 			kevent_t _event;
577 			EV_SET(&_event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null);
578 			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
579 			if (catchError!"kevent_del(timer)"(err))
580 				return false;
581 		}
582 		return true;
583 	}
584 	
585 	bool kill(AsyncUDPSocket ctxt) {
586 		import core.sys.posix.unistd : close;
587 		
588 		
589 		m_status = StatusInfo.init;
590 		
591 		fd_t fd = ctxt.socket;
592 		fd_t err = close(fd);
593 		
594 		if (catchError!"udp close"(err)) 
595 			return false;
596 		
597 		static if (!EPOLL)
598 		{
599 			kevent_t[2] events;
600 			EV_SET(&(events[0]), ctxt.socket, EVFILT_READ, EV_DELETE, 0, 0, null);
601 			EV_SET(&(events[1]), ctxt.socket, EVFILT_WRITE, EV_DELETE, 0, 0, null);
602 			err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
603 			
604 			if (catchError!"event_del(udp)"(err)) 
605 				return false;
606 		}
607 		
608 		
609 		try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
610 		catch (Exception e){
611 			assert(false, "Failed to free resources: " ~ e.msg);
612 		}
613 		ctxt.evInfo = null;
614 		return true;
615 	}
616 
617 }