1 module libasync.posix2;
2 
3 // workaround for IDE indent bug on too big files
4 mixin template RunKill()
5 {
6 
7 	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
8 	in { assert(ctxt.socket == fd_t.init, "TCP Connection is active. Use another instance."); }
9 	body {
10 		m_status = StatusInfo.init;
11 		import libasync.internals.socket_compat : socket, SOCK_STREAM, IPPROTO_TCP;
12 		import core.sys.posix.unistd : close;
13 		fd_t fd = ctxt.preInitializedSocket;
14 
15 		if (fd == fd_t.init)
16 			fd = socket(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP);
17 		
18 		if (catchError!("run AsyncTCPConnection")(fd)) 
19 			return 0;
20 		
21 		/// Make sure the socket doesn't block on recv/send
22 		if (!setNonBlock(fd)) {
23 			log("Close socket");
24 			close(fd);
25 			return 0;
26 		}
27 		
28 		/// Enable Nagel's algorithm if specified
29 		if (ctxt.noDelay) {
30 			if (!setOption(fd, TCPOption.NODELAY, true)) {
31 				try log("Closing connection"); catch {}
32 				close(fd);
33 				return 0;
34 			}
35 		}
36 		
37 		/// Trigger the connection setup instructions
38 		if (!initTCPConnection(fd, ctxt, del)) {
39 			close(fd);
40 			return 0;
41 		}
42 		
43 		return fd;
44 		
45 	}
46 	
47 	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
48 	in { 
49 		//assert(ctxt.socket == fd_t.init, "TCP Listener already bound. Please run another instance."); 
50 		assert(ctxt.local.addr !is typeof(ctxt.local.addr).init, "No locally binding address specified. Use AsyncTCPListener.local = EventLoop.resolve*"); 
51 	}
52 	body {
53 		m_status = StatusInfo.init;
54 		import libasync.internals.socket_compat : socket, SOCK_STREAM, socklen_t, setsockopt, SOL_SOCKET, SO_REUSEADDR, IPPROTO_TCP;
55 		import core.sys.posix.unistd : close;
56 		import core.sync.mutex;
57 		__gshared Mutex g_mtx;
58 		fd_t fd = ctxt.socket;
59 		bool reusing = true;
60 		try if (fd == 0) {
61 			reusing = false;
62 			/// Create the listening socket
63 			synchronized(g_mutex) {
64 				fd = socket(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP);
65 				if (catchError!("run AsyncTCPAccept")(fd))
66 					return 0;
67 				/// Allow multiple threads to listen on this address
68 				if (!setOption(fd, TCPOption.REUSEADDR, true)) {
69 					log("Close socket");
70 					close(fd);
71 					return 0;
72 				}
73 			} 
74 
75 			/// Make sure the socket returns instantly when calling listen()
76 			if (!setNonBlock(fd)) {
77 				log("Close socket");
78 				close(fd);
79 				return 0;
80 			}
81 
82 			// todo: defer accept
83 			
84 			/// Automatically starts connections with noDelay if specified
85 			if (ctxt.noDelay) {
86 				if (!setOption(fd, TCPOption.NODELAY, true)) {
87 					try log("Closing connection"); catch {}
88 					close(fd);
89 					return 0;
90 				}
91 			}
92 
93 		} catch { assert(false, "Error in synchronized listener starter"); }
94 
95 		/// Setup the event polling
96 		if (!initTCPListener(fd, ctxt, del, reusing)) {
97 			log("Close socket");
98 			close(fd);
99 			return 0;
100 		}
101 
102 
103 		return fd;
104 		
105 	}
106 	
107 	fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
108 		m_status = StatusInfo.init;
109 		
110 		import libasync.internals.socket_compat : socket, SOCK_DGRAM, IPPROTO_UDP;
111 		import core.sys.posix.unistd;
112 		fd_t fd = ctxt.preInitializedSocket;
113 
114 		try log("Address: " ~ ctxt.local.toString()); catch {}
115 
116 		if (fd == fd_t.init)
117 			fd = socket(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP);
118 
119 
120 		if (catchError!("run AsyncUDPSocket")(fd))
121 			return 0;
122 		
123 		if (!setNonBlock(fd))
124 			return 0;
125 		
126 		if (!initUDPSocket(fd, ctxt, del))
127 			return 0;
128 
129 		try log("UDP Socket started FD#" ~ fd.to!string);
130 		catch{}
131 		/*
132 		static if (!EPOLL) {
133 			gs_fdPool.insert(fd);
134 		}*/
135 		
136 		return fd;
137 	}
138 	
139 	fd_t run(AsyncNotifier ctxt)
140 	{
141 		
142 		m_status = StatusInfo.init;
143 		
144 		fd_t err;
145 		static if (EPOLL) {
146 			fd_t fd = eventfd(0, EFD_NONBLOCK);
147 			
148 			if (catchSocketError!("run AsyncNotifier")(fd))
149 				return 0;
150 			
151 			epoll_event _event;
152 			_event.events = EPOLLIN | EPOLLET;
153 		}	
154 		else /* if KQUEUE */
155 		{
156 			kevent_t _event;
157 			fd_t fd = cast(fd_t)createIndex();
158 		}
159 		EventType evtype = EventType.Notifier;
160 		NotifierHandler evh;
161 		evh.ctxt = ctxt;
162 		
163 		evh.fct = (AsyncNotifier ctxt) {
164 			try {
165 				ctxt.handler();
166 			} catch (Exception e) {
167 				//setInternalError!"AsyncTimer handler"(Status.ERROR);
168 			}
169 		};
170 		
171 		EventObject eobj;
172 		eobj.notifierHandler = evh;
173 		
174 		EventInfo* evinfo;
175 		
176 		if (!ctxt.evInfo) {
177 			try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
178 			catch (Exception e) {
179 				assert(false, "Failed to allocate resources: " ~ e.msg);
180 			}
181 			
182 			ctxt.evInfo = evinfo;
183 		}
184 		
185 		static if (EPOLL) {
186 			_event.data.ptr = cast(void*) evinfo;
187 			
188 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
189 			if (catchSocketError!("epoll_add(eventfd)")(err))
190 				return fd_t.init;
191 		}
192 		else /* if KQUEUE */
193 		{
194 			EV_SET(&_event, fd, EVFILT_USER, EV_ADD | EV_CLEAR, NOTE_FFCOPY, 0, evinfo);
195 			
196 			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
197 			
198 			if (catchError!"kevent_addSignal"(err))
199 				return fd_t.init;
200 		}
201 		
202 		return fd;
203 		
204 		
205 	}
206 	
207 	fd_t run(shared AsyncSignal ctxt) 
208 	{
209 		static if (EPOLL) {
210 			
211 			m_status = StatusInfo.init;
212 			
213 			ctxt.evInfo = cast(shared) m_evSignal;
214 			
215 			return cast(fd_t) (__libc_current_sigrtmin());
216 		}
217 		else
218 		{
219 			m_status = StatusInfo.init;
220 			
221 			ctxt.evInfo = cast(shared) m_evSignal;
222 
223 			return cast(fd_t) createIndex(ctxt);
224 			
225 		}
226 	}
227 	
228 	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
229 		m_status = StatusInfo.init;
230 		
231 		static if (EPOLL)
232 		{
233 			import core.sys.posix.time : itimerspec, CLOCK_REALTIME;
234 			
235 			fd_t fd = ctxt.id;
236 			itimerspec its;
237 			
238 			its.it_value.tv_sec = cast(typeof(its.it_value.tv_sec)) timeout.split!("seconds", "nsecs")().seconds;
239 			its.it_value.tv_nsec = cast(typeof(its.it_value.tv_nsec)) timeout.split!("seconds", "nsecs")().nsecs;
240 			if (!ctxt.oneShot)
241 			{
242 				its.it_interval.tv_sec = its.it_value.tv_sec;
243 				its.it_interval.tv_nsec = its.it_value.tv_nsec;
244 				
245 			}
246 			
247 			if (fd == fd_t.init) {
248 				fd = timerfd_create(CLOCK_REALTIME, 0);
249 				if (catchError!"timer_create"(fd))
250 					return 0;
251 			}
252 			int err = timerfd_settime(fd, 0, &its, null);
253 			
254 			if (catchError!"timer_settime"(err))
255 				return 0;
256 			epoll_event _event;
257 
258 			EventType evtype;			
259 			evtype = EventType.Timer;
260 			EventObject eobj;
261 			eobj.timerHandler = del;
262 			
263 			EventInfo* evinfo;
264 			
265 			if (!ctxt.evInfo) {
266 				try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
267 				catch (Exception e) {
268 					assert(false, "Failed to allocate resources: " ~ e.msg);
269 				}
270 				
271 				ctxt.evInfo = evinfo;
272 			}
273 			else {
274 				evinfo = ctxt.evInfo;
275 				evinfo.evObj = eobj;
276 			}
277 			_event.events |= EPOLLIN | EPOLLET;
278 			_event.data.ptr = evinfo;
279 			if (ctxt.id > 0) {
280 				err = epoll_ctl(m_epollfd, EPOLL_CTL_DEL, ctxt.id, null); 
281 				
282 				if (catchError!"epoll_ctl"(err))
283 					return fd_t.init;
284 			}
285 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 
286 			
287 			if (catchError!"timer_epoll_add"(err))
288 				return 0;
289 			return fd;
290 		}
291 		else /* if KQUEUE */
292 		{
293 			fd_t fd = ctxt.id;
294 			
295 			if (ctxt.id == 0)
296 				fd = cast(fd_t) createIndex();
297 			EventType evtype;			
298 			evtype = EventType.Timer;
299 			
300 			EventObject eobj;
301 			eobj.timerHandler = del;
302 			
303 			EventInfo* evinfo;
304 			
305 			if (!ctxt.evInfo) {
306 				try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
307 				catch (Exception e) {
308 					assert(false, "Failed to allocate resources: " ~ e.msg);
309 				}
310 				
311 				ctxt.evInfo = evinfo;
312 			}
313 			else {
314 				evinfo = ctxt.evInfo;
315 				evinfo.evObj = eobj;
316 			}
317 
318 			kevent_t _event;
319 			int msecs = cast(int) timeout.total!"msecs";
320 			ushort flags_ = EV_ADD | EV_ENABLE;
321 			//if (ctxt.oneShot)
322 			//	flags_ |= EV_CLEAR;
323 
324 			// www.khmere.com/freebsd_book/html/ch06.html - EV_CLEAR set internally
325 			EV_SET(&_event, fd, EVFILT_TIMER, flags_, 0, msecs + 30, cast(void*) evinfo);	
326 
327 			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
328 			
329 			if (catchError!"kevent_timer_add"(err))
330 				return 0;
331 			
332 			return fd;
333 			
334 		}
335 		
336 	}
337 	
338 	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) {
339 
340 
341 		static if (EPOLL) 
342 		{
343 			import core.sys.linux.sys.inotify;
344 			enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect
345 			assert(ctxt.fd == fd_t.init);
346 			int fd = inotify_init1(IN_NONBLOCK);
347 			if (catchError!"inotify_init1"(fd)) {
348 				return fd_t.init;
349 			}
350 			epoll_event _event;
351 			
352 			EventType evtype;
353 			
354 			evtype = EventType.DirectoryWatcher;
355 			EventObject eobj;
356 			eobj.dwHandler = del;
357 			
358 			EventInfo* evinfo;
359 
360 			assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");
361 			
362 			try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
363 			catch (Exception e) {
364 				assert(false, "Failed to allocate resources: " ~ e.msg);
365 			}
366 			
367 			ctxt.evInfo = evinfo;
368 			
369 			_event.events |= EPOLLIN;
370 			_event.data.ptr = evinfo;
371 			
372 			int err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 
373 			
374 			if (catchError!"epoll_ctl"(err))
375 				return fd_t.init;
376 			return fd;
377 		} 
378 		else /* if KQUEUE */ {
379 			static size_t id = 0;
380 
381 			fd_t fd = cast(uint)++id;
382 
383 			EventType evtype;
384 			
385 			evtype = EventType.DirectoryWatcher;
386 			EventObject eobj;
387 			eobj.dwHandler = del;
388 			
389 			EventInfo* evinfo;
390 
391 			assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");
392 
393 			try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
394 			catch (Exception e) {
395 				assert(false, "Failed to allocate resources: " ~ e.msg);
396 			}
397 			ctxt.evInfo = evinfo;
398 			try m_watchers[fd] = evinfo; catch {}
399 
400 			try m_changes[fd] = ThreadMem.alloc!(Array!DWChangeInfo)();
401 			catch (Exception e) {
402 				assert(false, "Failed to allocate resources: " ~ e.msg);
403 			}
404 
405 			/// events will be created in watch()
406 
407 			return fd;
408 
409 		}
410 	}
411 	
412 	bool kill(AsyncDirectoryWatcher ctxt) {
413 
414 		static if (EPOLL) {
415 			import core.sys.posix.unistd : close;
416 			try 
417 			{
418 				Array!(Tuple!(fd_t, uint)) remove_list;
419 				foreach (ref const Tuple!(fd_t, uint) key, ref const DWFolderInfo info; m_dwFolders) {
420 					if (info.fd == ctxt.fd)
421 						remove_list.insertBack(key);
422 				}
423 
424 				foreach (Tuple!(fd_t, uint) key; remove_list[]) {
425 					unwatch(key[0] /*fd_t*/, key[1]);
426 				}
427 
428 				close(ctxt.fd);
429 				ThreadMem.free(ctxt.evInfo);
430 				ctxt.evInfo = null;
431 			}
432 			catch (Exception e)
433 			{ 
434 				setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher"); 
435 				return false;
436 			}
437 
438 		} else /* if KQUEUE */ {
439 			try {
440 				Array!fd_t remove_list;
441 
442 				foreach (ref const fd_t wd, ref const DWFolderInfo info; m_dwFolders) {
443 					if (info.fd == ctxt.fd)
444 						remove_list.insertBack(wd);
445 				}
446 				
447 				foreach (wd; remove_list[]) {
448 					unwatch(ctxt.fd, wd); // deletes all related m_dwFolders and m_dwFiles entries
449 				}
450 
451 				ThreadMem.free(m_changes[ctxt.fd]);
452 				m_watchers.remove(ctxt.fd);	
453 				m_changes.remove(ctxt.fd);		
454 			}
455 			catch (Exception e) {
456 				setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher"); 
457 				return false;
458 			}
459 		}
460 		return true;
461 	}
462 	
463 	bool kill(AsyncTCPConnection ctxt, bool forced = false)
464 	{
465 		log("Kill socket");
466 		m_status = StatusInfo.init;
467 		fd_t fd = ctxt.socket;
468 		bool has_socket = fd > 0;
469 		ctxt.disconnecting = true;
470 		if (forced) {
471 			ctxt.connected = false;
472 			ctxt.disconnecting = false;
473 			if (ctxt.evInfo) {
474 				try ThreadMem.free(ctxt.evInfo);
475 				catch { assert(false, "Failed to free resources"); }
476 				ctxt.evInfo = null;
477 			}
478 			if (ctxt.inbound) try ThreadMem.free(ctxt);
479 			catch (Throwable t) { assert(false, "Failed to free resources for context " ~ (cast(void*)ctxt).to!string ~ ": " ~ t.to!string); }
480 		}
481 		return has_socket ? closeSocket(fd, true, forced) : true;
482 	}
483 
484 	bool kill(AsyncTCPListener ctxt)
485 	{
486 		log("Kill listener");
487 		m_status = StatusInfo.init;
488 		nothrow void cleanup() {
489 			try ThreadMem.free(ctxt.evInfo);
490 			catch { assert(false, "Failed to free resources"); }
491 			ctxt.evInfo = null;
492 		}
493 		scope(exit) {
494 			cleanup();
495 		}
496 		
497 		fd_t fd = ctxt.socket;
498 		
499 		return closeSocket(fd, false, true);
500 	}
501 	
502 	bool kill(AsyncNotifier ctxt)
503 	{
504 		static if (EPOLL)
505 		{
506 			import core.sys.posix.unistd : close;
507 			fd_t err = close(ctxt.id);
508 			
509 			if (catchError!"close(eventfd)"(err))
510 				return false;
511 			
512 			try ThreadMem.free(ctxt.evInfo);
513 			catch (Exception e){ assert(false, "Error freeing resources"); }
514 			
515 			return true;
516 		}
517 		else /* if KQUEUE */
518 		{
519 			scope(exit) destroyIndex(ctxt);
520 			
521 			if (ctxt.id == fd_t.init)
522 				return false;
523 			
524 			kevent_t _event;
525 			EV_SET(&_event, ctxt.id, EVFILT_USER, EV_DELETE | EV_DISABLE, 0, 0, null);
526 			
527 			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
528 			
529 			try ThreadMem.free(ctxt.evInfo);
530 			catch (Exception e){ assert(false, "Error freeing resources"); }
531 			
532 			if (catchError!"kevent_del(notifier)"(err)) {
533 				return false;
534 			}
535 			return true;
536 		}
537 		
538 	}
539 	
540 	bool kill(shared AsyncSignal ctxt)
541 	{
542 		
543 		static if (EPOLL) {
544 			ctxt.evInfo = null;
545 		}
546 		else 
547 		{
548 			m_status = StatusInfo.init;
549 			destroyIndex(ctxt);
550 		}
551 		return true;
552 	}
553 	
554 	bool kill(AsyncTimer ctxt) {
555 		import core.sys.posix.time;
556 		m_status = StatusInfo.init;
557 		
558 		static if (EPOLL)
559 		{
560 			import core.sys.posix.unistd : close;
561 			fd_t err = close(ctxt.id);
562 			if (catchError!"timer_kill"(err))
563 				return false;
564 			
565 			if (ctxt.evInfo) {
566 				try ThreadMem.free(ctxt.evInfo);
567 				catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
568 				ctxt.evInfo = null;
569 			}
570 			
571 		}
572 		else /* if KQUEUE */
573 		{
574 			scope(exit) 
575 				destroyIndex(ctxt);
576 			
577 			if (ctxt.evInfo) {
578 				try ThreadMem.free(ctxt.evInfo);
579 				catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
580 				ctxt.evInfo = null;
581 			}
582 			kevent_t _event;
583 			EV_SET(&_event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null);
584 			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
585 			if (catchError!"kevent_del(timer)"(err))
586 				return false;
587 		}
588 		return true;
589 	}
590 	
591 	bool kill(AsyncUDPSocket ctxt) {
592 		import core.sys.posix.unistd : close;
593 		
594 		
595 		m_status = StatusInfo.init;
596 		
597 		fd_t fd = ctxt.socket;
598 		fd_t err = close(fd);
599 		
600 		if (catchError!"udp close"(err)) 
601 			return false;
602 		
603 		static if (!EPOLL)
604 		{
605 			kevent_t[2] events;
606 			EV_SET(&(events[0]), ctxt.socket, EVFILT_READ, EV_DELETE, 0, 0, null);
607 			EV_SET(&(events[1]), ctxt.socket, EVFILT_WRITE, EV_DELETE, 0, 0, null);
608 			err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
609 			
610 			if (catchError!"event_del(udp)"(err)) 
611 				return false;
612 		}
613 		
614 		
615 		try ThreadMem.free(ctxt.evInfo);
616 		catch (Exception e){
617 			assert(false, "Failed to free resources: " ~ e.msg);
618 		}
619 		ctxt.evInfo = null;
620 		return true;
621 	}
622 
623 }