1 ///
2 module libasync.uds;
3 
4 version (Posix):
5 
6 public import std.socket : UnixAddress;
7 
8 import libasync.types;
9 import libasync.events;
10 import libasync.event;
11 
12 import core.sys.posix.sys.socket;
13 
14 ///
15 final class AsyncUDSConnection
16 {
17 package:
18 	EventLoop m_evLoop;
19 	AsyncEvent m_event;
20 
21 private:
22 	UnixAddress m_peer;
23 	fd_t m_socket, m_preInitializedSocket;
24 	bool m_inbound;
25 
26 nothrow:
27 
28 package:
29 	@property fd_t socket() const {
30 		return m_socket;
31 	}
32 
33 	@property fd_t preInitializedSocket() const {
34 		return m_preInitializedSocket;
35 	}
36 
37 	@property void inbound(bool inbound) {
38 		m_inbound = inbound;
39 	}
40 
41 public:
42 	///
43 	this(EventLoop evl, fd_t preInitializedSocket = fd_t.init)
44 	in { assert(evl !is null); }
45 	body {
46 		m_evLoop = evl;
47 		m_preInitializedSocket = preInitializedSocket;
48 	}
49 
50 	mixin DefStatus;
51 
52 	/// Returns false if the connection has gone.
53 	@property bool isConnected() const {
54 		return m_socket != fd_t.init;
55 	}
56 
57 	/// Returns true if this connection was accepted by an AsyncUDSListener instance.
58 	@property bool inbound() const {
59 		return m_inbound;
60 	}
61 
62 	///
63 	@property UnixAddress peer() const
64 	{
65 		return cast(UnixAddress) m_peer;
66 	}
67 
68 	///
69 	@property void peer(UnixAddress addr)
70 	in {
71 		assert(!isConnected, "Cannot change remote address on a connected socket");
72 		assert(addr !is UnixAddress.init);
73 	}
74 	body {
75 		m_peer = addr;
76 	}
77 
78 	///
79 	bool run(void delegate(EventCode) del)
80 	in { assert(!isConnected); }
81 	body {
82 		m_socket = m_evLoop.run(this);
83 		if (m_socket == 0) return false;
84 
85 		m_event = new AsyncEvent(m_evLoop, m_socket, true);
86 		return m_event.run(del);
87 	}
88 
89 	/// Receive data from the underlying stream. To be used when EventCode.READ is received by the
90 	/// callback handler. IMPORTANT: This must be called until is returns a lower value than the buffer!
91 	final pragma(inline, true)
92 	uint recv(ref ubyte[] ub)
93 	{
94 		return m_evLoop.recv(m_socket, ub);
95 	}
96 
97 	/// Send data through the underlying stream by moving it into the OS buffer.
98 	final pragma(inline, true)
99 	uint send(in ubyte[] ub)
100 	{
101 		uint ret = m_evLoop.send(m_socket, ub);
102 		if (m_evLoop.status.code == Status.ASYNC)
103 			m_event.writeBlocked = true;
104 		return ret;
105 	}
106 
107 	/// Removes the connection from the event loop, closing it if necessary, and
108 	/// cleans up the underlying resources.
109 	bool kill(bool forced = false)
110 	in { assert(isConnected); }
111 	body {
112 		scope(exit) m_socket = 0;
113 		return m_event.kill(forced);
114 	}
115 }
116 
117 ///
118 final class AsyncUDSListener
119 {
120 package:
121 	EventLoop m_evLoop;
122 
123 private:
124 	AsyncEvent m_event;
125 
126 	UnixAddress m_local;
127 	fd_t m_socket;
128 	bool m_started, m_unlinkFirst;
129 	void delegate(EventCode) delegate(AsyncUDSConnection) nothrow m_del;
130 
131 nothrow:
132 
133 private:
134 	void handler(EventCode code)
135 	{
136 		switch (code) {
137 			case EventCode.READ:
138 				AsyncUDSConnection conn = void;
139 				while ((conn = m_evLoop.accept(this)) !is null) {
140 					conn.run(m_del(conn));
141 				}
142 				break;
143 			default:
144 				break;
145 		}
146 	}
147 
148 package:
149 	@property bool unlinkFirst() const {
150 		return m_unlinkFirst;
151 	}
152 
153 	@property fd_t socket() const {
154 		return m_socket;
155 	}
156 
157 public:
158 	///
159 	this(EventLoop evl, bool unlinkFirst = true)
160 	in { assert(evl !is null); }
161 	body {
162 		m_evLoop = evl;
163 		m_unlinkFirst = unlinkFirst;
164 	}
165 
166 	mixin DefStatus;
167 
168 	/// Returns the unix domain socket address as an OS-specific structure.
169 	@property UnixAddress local() const
170 	{
171 		return cast(UnixAddress) m_local;
172 	}
173 
174 	/// Sets the local internet address as an OS-specific structure.
175 	@property void local(UnixAddress addr)
176 	in { assert(!m_started, "Cannot rebind a listening socket"); }
177 	body {
178 		m_local = addr;
179 	}
180 
181 	/// Starts accepting connections by registering the given handler with the underlying OS event.
182 	bool run(void delegate(EventCode) delegate(AsyncUDSConnection) nothrow del)
183 	in {
184 		assert(m_local !is UnixAddress.init, "Cannot bind without an address. Please set .local");
185 	}
186 	body {
187 		m_del = del;
188 		m_socket = m_evLoop.run(this);
189 		if (m_socket == 0) return false;
190 
191 		m_event = new AsyncEvent(m_evLoop, m_socket, false);
192 		m_started = m_event.run(&handler);
193 		return m_started;
194 	}
195 
196 	/// Stops accepting connections and cleans up the underlying OS resources.
197 	/// NOTE: MUST be called to clean up the domain socket path
198 	bool kill()
199 	in { assert(m_socket != 0); }
200 	body {
201 		import core.sys.posix.unistd : unlink;
202 		import core.sys.posix.sys.un : sockaddr_un;
203 
204 		bool ret = m_evLoop.kill(m_event);
205 		if (ret) m_started = false;
206 		unlink(cast(char*) (cast(sockaddr_un*) m_local.name).sun_path);
207 		return ret;
208 	}
209 }