///
module libasync.uds;

version (Posix):

public import std.socket : UnixAddress;

import libasync.types;
import libasync.events;
import libasync.event;

import core.sys.posix.sys.socket;

/// A unix domain socket connection that is driven by an `EventLoop`.
///
/// The connection is either set up through `run` after a `peer` address was
/// assigned, or wraps a socket that was handed to the constructor
/// (`preInitializedSocket`).
final class AsyncUDSConnection
{
package:
    EventLoop m_evLoop;
    AsyncEvent m_event;

private:
    UnixAddress m_peer;
    // m_socket is 0 (fd_t.init) while the connection is not running.
    fd_t m_socket, m_preInitializedSocket;
    bool m_inbound;

nothrow:

package:
    /// The socket currently used by this connection (0 when not connected).
    @property fd_t socket() const {
        return m_socket;
    }

    /// The socket that was passed to the constructor, if any.
    @property fd_t preInitializedSocket() const {
        return m_preInitializedSocket;
    }

    /// Marks this connection as inbound (see the public `inbound` getter).
    @property void inbound(bool inbound) {
        m_inbound = inbound;
    }

public:
    /// Creates a connection bound to the given event loop.
    ///
    /// Params:
    ///   evl = the event loop driving this connection; must not be null.
    ///   preInitializedSocket = an optional, already existing socket that is
    ///       stored and exposed to the event loop via `preInitializedSocket`.
    this(EventLoop evl, fd_t preInitializedSocket = fd_t.init)
    in { assert(evl !is null); }
    body {
        m_evLoop = evl;
        m_preInitializedSocket = preInitializedSocket;
    }

    // NOTE(review): DefStatus is declared outside this file; presumably it
    // provides the status/error accessors shared by all async objects.
    mixin DefStatus;

    /// Returns false if the connection has gone.
    @property bool isConnected() const {
        return m_socket != fd_t.init;
    }

    /// Returns true if this connection was accepted by an AsyncUDSListener instance.
    @property bool inbound() const {
        return m_inbound;
    }

    /// Returns the address of the remote end as assigned through the setter.
    @property UnixAddress peer() const
    {
        // The cast only strips the const qualifier added by this const getter.
        return cast(UnixAddress) m_peer;
    }

    /// Sets the address of the remote end. Only allowed while not connected.
    @property void peer(UnixAddress addr)
    in {
        assert(!isConnected, "Cannot change remote address on a connected socket");
        assert(addr !is UnixAddress.init);
    }
    body {
        m_peer = addr;
    }

    /// Registers the connection with the event loop and starts delivering
    /// events to `del`.
    ///
    /// Returns: false if the event loop returned no socket (0); otherwise the
    /// result of starting the underlying `AsyncEvent`.
    bool run(void delegate(EventCode) del)
    in { assert(!isConnected); }
    body {
        m_socket = m_evLoop.run(this);
        if (m_socket == 0) return false;

        // NOTE(review): the third argument is `true` here and `false` for the
        // listener; its meaning is defined by AsyncEvent - confirm there.
        m_event = new AsyncEvent(m_evLoop, m_socket, true);
        return m_event.run(del);
    }

    /// Receive data from the underlying stream. To be used when EventCode.READ is received by the
    /// callback handler. IMPORTANT: This must be called until it returns a lower value than the buffer!
    final pragma(inline, true)
    uint recv(ref ubyte[] ub)
    {
        return m_evLoop.recv(m_socket, ub);
    }

    /// Send data through the underlying stream by moving it into the OS buffer.
    ///
    /// Returns: the value reported by the event loop's `send`.
    final pragma(inline, true)
    uint send(in ubyte[] ub)
    {
        uint ret = m_evLoop.send(m_socket, ub);
        // When the event loop reports Status.ASYNC the event is flagged as
        // write-blocked (presumably the OS buffer could not take all data).
        if (m_evLoop.status.code == Status.ASYNC)
            m_event.writeBlocked = true;
        return ret;
    }

    /// Removes the connection from the event loop, closing it if necessary, and
    /// cleans up the underlying resources.
    ///
    /// The stored socket is reset to 0 regardless of the outcome, so
    /// `isConnected` is false afterwards.
    bool kill(bool forced = false)
    in { assert(isConnected); }
    body {
        scope(exit) m_socket = 0;
        return m_event.kill(forced);
    }
}

/// Listens on a unix domain socket address and hands every accepted
/// connection to a user supplied delegate.
final class AsyncUDSListener
{
package:
    EventLoop m_evLoop;

private:
    AsyncEvent m_event;

    UnixAddress m_local;
    // m_socket is 0 while the listener is not running.
    fd_t m_socket;
    bool m_started, m_unlinkFirst;
    // Called once per accepted connection; returns that connection's event handler.
    void delegate(EventCode) delegate(AsyncUDSConnection) nothrow m_del;

nothrow:

private:
    /// Event handler of the listening socket: on READ, accepts connections
    /// until the event loop returns null and starts each of them with the
    /// handler obtained from the user delegate. Other event codes are ignored.
    void handler(EventCode code)
    {
        switch (code) {
            case EventCode.READ:
                AsyncUDSConnection conn = void;
                while ((conn = m_evLoop.accept(this)) !is null) {
                    conn.run(m_del(conn));
                }
                break;
            default:
                break;
        }
    }

package:
    /// The `unlinkFirst` flag given to the constructor.
    // NOTE(review): presumably tells the event loop to unlink an existing
    // socket path before binding - confirm in the event loop implementation.
    @property bool unlinkFirst() const {
        return m_unlinkFirst;
    }

    /// The listening socket (0 when not running).
    @property fd_t socket() const {
        return m_socket;
    }

public:
    /// Creates a listener bound to the given event loop.
    ///
    /// Params:
    ///   evl = the event loop driving this listener; must not be null.
    ///   unlinkFirst = stored and exposed to the event loop via `unlinkFirst`.
    this(EventLoop evl, bool unlinkFirst = true)
    in { assert(evl !is null); }
    body {
        m_evLoop = evl;
        m_unlinkFirst = unlinkFirst;
    }

    // NOTE(review): DefStatus is declared outside this file; presumably it
    // provides the status/error accessors shared by all async objects.
    mixin DefStatus;

    /// Returns the unix domain socket address as an OS-specific structure.
    @property UnixAddress local() const
    {
        // The cast only strips the const qualifier added by this const getter.
        return cast(UnixAddress) m_local;
    }

    /// Sets the local unix domain socket address as an OS-specific structure.
    /// Must be called before `run`; not allowed while the listener is started.
    @property void local(UnixAddress addr)
    in { assert(!m_started, "Cannot rebind a listening socket"); }
    body {
        m_local = addr;
    }

    /// Starts accepting connections by registering the given handler with the underlying OS event.
    ///
    /// Params:
    ///   del = invoked for every accepted connection; the delegate it returns
    ///         becomes the event handler of that connection.
    ///
    /// Returns: false if the event loop returned no socket (0); otherwise
    /// whether the underlying `AsyncEvent` could be started.
    bool run(void delegate(EventCode) delegate(AsyncUDSConnection) nothrow del)
    in {
        assert(m_local !is UnixAddress.init, "Cannot bind without an address. Please set .local");
    }
    body {
        m_del = del;
        m_socket = m_evLoop.run(this);
        if (m_socket == 0) return false;

        m_event = new AsyncEvent(m_evLoop, m_socket, false);
        m_started = m_event.run(&handler);
        return m_started;
    }

    /// Stops accepting connections and cleans up the underlying OS resources.
    /// NOTE: MUST be called to clean up the domain socket path
    ///
    /// Returns: the result of removing the listener's event from the event
    /// loop. The socket path is unlinked regardless of that result.
    bool kill()
    in { assert(m_socket != 0); }
    body {
        import core.sys.posix.unistd : unlink;
        import core.sys.posix.sys.un : sockaddr_un;

        bool ret = m_evLoop.kill(m_event);
        if (ret) {
            m_started = false;
            // Forget the socket so that the precondition above rejects a
            // second kill(), mirroring AsyncUDSConnection.kill.
            m_socket = 0;
        }
        unlink(cast(char*) (cast(sockaddr_un*) m_local.name).sun_path);
        return ret;
    }
}