1 ///
2 module libasync.uds;
3
4 version (Posix):
5
6 public import std.socket : UnixAddress;
7
8 import libasync.types;
9 import libasync.events;
10 import libasync.event;
11
12 import core.sys.posix.sys.socket;
13
14 ///
/// A unix domain socket stream connection driven by an `EventLoop`.
///
/// The connection is either started by the user through `run`, or handed out
/// by an `AsyncUDSListener` (in which case `inbound` reports true).
final class AsyncUDSConnection
{
package:
    EventLoop m_evLoop;
    AsyncEvent m_event;

private:
    UnixAddress m_peer;
    fd_t m_socket;
    fd_t m_preInitializedSocket;
    bool m_inbound;

nothrow:

package:
    /// Descriptor assigned by `run`; reset to 0 by `kill`.
    @property fd_t socket() const
    {
        return m_socket;
    }

    /// Descriptor that was passed to the constructor (`fd_t.init` if none was given).
    @property fd_t preInitializedSocket() const
    {
        return m_preInitializedSocket;
    }

    /// Records whether this connection originates from a listener.
    @property void inbound(bool inbound)
    {
        m_inbound = inbound;
    }

public:
    /// Creates a connection bound to the event loop `evl`.
    ///
    /// Params:
    ///   evl = event loop that will drive this connection; must not be null.
    ///   preInitializedSocket = optional already-existing descriptor, stored
    ///       as-is and exposed through `preInitializedSocket`.
    this(EventLoop evl, fd_t preInitializedSocket = fd_t.init)
    in { assert(evl !is null); }
    body {
        m_preInitializedSocket = preInitializedSocket;
        m_evLoop = evl;
    }

    mixin DefStatus;

    /// Returns false if the connection has gone (no socket is currently assigned).
    @property bool isConnected() const
    {
        immutable bool unassigned = (m_socket == fd_t.init);
        return !unassigned;
    }

    /// Returns true if this connection was accepted by an AsyncUDSListener instance.
    @property bool inbound() const
    {
        return m_inbound;
    }

    /// The remote address previously stored with the `peer` setter.
    @property UnixAddress peer() const
    {
        // The cast drops the `const` that this const getter adds to the field.
        return cast(UnixAddress) m_peer;
    }

    /// Stores the remote address. Only allowed while the connection is not
    /// running, and `addr` must not be null.
    @property void peer(UnixAddress addr)
    in {
        assert(!isConnected, "Cannot change remote address on a connected socket");
        assert(addr !is UnixAddress.init);
    }
    body {
        m_peer = addr;
    }

    /// Hands the connection to the event loop and attaches `del` as its
    /// event callback.
    ///
    /// Returns: false if the event loop did not yield a socket; otherwise the
    /// result of starting the underlying `AsyncEvent` with `del`.
    bool run(void delegate(EventCode) del)
    in { assert(!isConnected); }
    body {
        m_socket = m_evLoop.run(this);

        if (m_socket != 0) {
            m_event = new AsyncEvent(m_evLoop, m_socket, true);
            return m_event.run(del);
        }

        return false;
    }

    /// Receive data from the underlying stream. To be used when EventCode.READ is received by the
    /// callback handler. IMPORTANT: This must be called until it returns a lower value than the buffer!
    final pragma(inline, true)
    uint recv(ref ubyte[] ub)
    {
        uint received = m_evLoop.recv(m_socket, ub);
        return received;
    }

    /// Send data through the underlying stream by moving it into the OS buffer.
    ///
    /// When the event loop reports `Status.ASYNC` after the call, the
    /// underlying event is flagged as write-blocked.
    final pragma(inline, true)
    uint send(in ubyte[] ub)
    {
        uint sent = m_evLoop.send(m_socket, ub);

        if (m_evLoop.status.code == Status.ASYNC) {
            m_event.writeBlocked = true;
        }

        return sent;
    }

    /// Removes the connection from the event loop, closing it if necessary, and
    /// cleans up the underlying resources.
    ///
    /// The stored socket is cleared regardless of the outcome, so
    /// `isConnected` is false afterwards.
    bool kill(bool forced = false)
    in { assert(isConnected); }
    body {
        immutable bool killed = m_event.kill(forced);
        m_socket = 0;
        return killed;
    }
}
116
117 ///
/// Accepts incoming unix domain socket connections on a local address and
/// turns each of them into an `AsyncUDSConnection`.
final class AsyncUDSListener
{
package:
    EventLoop m_evLoop;

private:
    AsyncEvent m_event;

    UnixAddress m_local;
    fd_t m_socket;
    bool m_started, m_unlinkFirst;
    // Factory supplied to `run`: given a freshly accepted connection it
    // returns the event callback for that connection.
    void delegate(EventCode) delegate(AsyncUDSConnection) nothrow m_del;

nothrow:

private:
    /// Event callback of the listening socket. On READ it accepts
    /// connections until the event loop has none left and starts each one
    /// with the callback produced by `m_del`. Other event codes are ignored.
    void handler(EventCode code)
    {
        switch (code) {
            case EventCode.READ:
                AsyncUDSConnection conn = void;
                while ((conn = m_evLoop.accept(this)) !is null) {
                    // NOTE(review): the result of conn.run is not checked here;
                    // confirm whether a failed start needs extra cleanup.
                    conn.run(m_del(conn));
                }
                break;
            default:
                break;
        }
    }

package:
    /// Value of the `unlinkFirst` flag passed to the constructor.
    @property bool unlinkFirst() const {
        return m_unlinkFirst;
    }

    /// Descriptor assigned by `run`; 0 before `run` and after a successful `kill`.
    @property fd_t socket() const {
        return m_socket;
    }

public:
    /// Creates a listener bound to the event loop `evl`.
    ///
    /// Params:
    ///   evl = event loop that will drive this listener; must not be null.
    ///   unlinkFirst = stored and exposed (package-level) via `unlinkFirst`;
    ///       presumably tells the event loop to remove a stale socket path
    ///       before binding - confirm against the event loop implementation.
    this(EventLoop evl, bool unlinkFirst = true)
    in { assert(evl !is null); }
    body {
        m_evLoop = evl;
        m_unlinkFirst = unlinkFirst;
    }

    mixin DefStatus;

    /// Returns the unix domain socket address as an OS-specific structure.
    @property UnixAddress local() const
    {
        // The cast drops the `const` that this const getter adds to the field.
        return cast(UnixAddress) m_local;
    }

    /// Sets the local unix domain socket address as an OS-specific structure.
    /// Not allowed once the listener has been started.
    @property void local(UnixAddress addr)
    in { assert(!m_started, "Cannot rebind a listening socket"); }
    body {
        m_local = addr;
    }

    /// Starts accepting connections by registering the given handler with the underlying OS event.
    ///
    /// Params:
    ///   del = called for every accepted connection; the delegate it returns
    ///       becomes that connection's event callback.
    ///
    /// Returns: false if the event loop did not yield a socket; otherwise
    /// whether the underlying `AsyncEvent` was started.
    bool run(void delegate(EventCode) delegate(AsyncUDSConnection) nothrow del)
    in {
        assert(m_local !is UnixAddress.init, "Cannot bind without an address. Please set .local");
    }
    body {
        m_del = del;
        m_socket = m_evLoop.run(this);
        if (m_socket == 0) return false;

        m_event = new AsyncEvent(m_evLoop, m_socket, false);
        m_started = m_event.run(&handler);
        return m_started;
    }

    /// Stops accepting connections and cleans up the underlying OS resources.
    /// NOTE: MUST be called to clean up the domain socket path
    ///
    /// Returns: the result of removing the listener's event from the event
    /// loop. The socket path is unlinked regardless of that result.
    bool kill()
    in { assert(m_socket != 0); }
    body {
        import core.sys.posix.unistd : unlink;
        import core.sys.posix.sys.un : sockaddr_un;

        bool ret = m_evLoop.kill(m_event);
        if (ret) {
            m_started = false;
            // Forget the descriptor once it has been released, mirroring
            // AsyncUDSConnection.kill: a repeated kill() then trips the
            // contract instead of acting on a stale (possibly reused) fd.
            // On failure the descriptor is kept so the caller may retry.
            m_socket = 0;
        }
        // Result of unlink is deliberately ignored (best-effort path cleanup).
        unlink(cast(char*) (cast(sockaddr_un*) m_local.name).sun_path);
        return ret;
    }
}