1 module libasync.tcp; 2 import std.traits : isPointer; 3 import libasync.types; 4 import libasync.events; 5 import std.typecons : Tuple; 6 7 /// Wraps a TCP stream between 2 network adapters, using a custom handler to 8 /// signal related events. Many of these objects can be active concurrently 9 /// in a thread if the event loop is running and the handlers do not block. 10 final class AsyncTCPConnection 11 { 12 package: 13 EventLoop m_evLoop; 14 15 private: 16 NetworkAddress m_peer; 17 18 nothrow: 19 fd_t m_socket; 20 bool m_noDelay; 21 bool m_inbound; 22 debug bool m_dataRemaining; 23 public: 24 this(EventLoop evl) 25 in { assert(evl !is null); } 26 body { m_evLoop = evl; } 27 28 mixin DefStatus; 29 30 /// Returns false if the connection has gone. 31 @property bool isConnected() const { 32 return m_socket != fd_t.init; 33 } 34 35 /// Returns true if this connection was accepted by an AsyncTCPListener instance. 36 @property bool inbound() const { 37 return m_inbound; 38 } 39 40 /// Disables(true)/enables(false) nagle's algorithm (default:enabled). 41 @property void noDelay(bool b) 42 { 43 if (m_socket == fd_t.init) 44 m_noDelay = b; 45 else 46 setOption(TCPOption.NODELAY, true); 47 } 48 49 /// Changes the default OS configurations for this underlying TCP Socket. 50 bool setOption(T)(TCPOption op, in T val) 51 in { assert(isConnected, "No socket to operate on"); } 52 body { 53 return m_evLoop.setOption(m_socket, op, val); 54 } 55 56 /// Returns the OS-specific structure of the internet address 57 /// of the remote network adapter 58 @property NetworkAddress peer() const 59 { 60 return m_peer; 61 } 62 63 /// Returns the OS-specific structure of the internet address 64 /// for the local end of the connection. 65 @property NetworkAddress local() 66 in { 67 assert(isConnected && m_peer != NetworkAddress.init, "Cannot get local address from a non-connected socket"); 68 } 69 body { 70 return m_evLoop.localAddr(m_socket, m_peer.ipv6); 71 } 72 73 /// Sets the remote address as an OS-specific structure (only usable before connecting). 74 @property void peer(NetworkAddress addr) 75 in { 76 assert(!isConnected, "Cannot change remote address on a connected socket"); 77 assert(addr != NetworkAddress.init); 78 } 79 body { 80 m_peer = addr; 81 } 82 83 /// (Blocking) Resolves the specified host and resets the peer to this address. 84 /// Use AsyncDNS for a non-blocking resolver. (only usable before connecting). 85 typeof(this) host(string hostname, size_t port) 86 in { 87 assert(!isConnected, "Cannot change remote address on a connected socket"); 88 } 89 body { 90 m_peer = m_evLoop.resolveHost(hostname, cast(ushort) port); 91 return this; 92 } 93 94 /// Sets the peer to the specified IP address and port. (only usable before connecting). 95 typeof(this) ip(string ip, size_t port) 96 in { 97 assert(!isConnected, "Cannot change remote address on a connected socket"); 98 } 99 body { 100 m_peer = m_evLoop.resolveIP(ip, cast(ushort) port); 101 return this; 102 } 103 104 /// Starts the connection by registering the associated callback handler in the 105 /// underlying OS event loop. 106 bool run(void delegate(TCPEvent) del) { 107 TCPEventHandler handler; 108 handler.del = del; 109 handler.conn = this; 110 return run(handler); 111 } 112 113 private bool run(TCPEventHandler del) 114 in { assert(!isConnected); } 115 body { 116 m_socket = m_evLoop.run(this, del); 117 if (m_socket == 0) 118 return false; 119 else 120 return true; 121 122 } 123 124 /// Receive data from the underlying stream. To be used when TCPEvent.READ is received by the 125 /// callback handler. IMPORTANT: This must be called until is returns a lower value than the buffer! 126 uint recv(ref ubyte[] ub) 127 in { assert(isConnected, "No socket to operate on"); } 128 body { 129 uint cnt = m_evLoop.recv(m_socket, ub); 130 debug { 131 if (ub.length > cnt) 132 m_dataRemaining = false; 133 } 134 return cnt; 135 } 136 137 /// Send data through the underlying stream by moving it into the OS buffer. 138 uint send(in ubyte[] ub) 139 in { assert(isConnected, "No socket to operate on"); } 140 body { 141 version(Posix) 142 scope(exit) 143 if (m_evLoop.status.code == Status.ASYNC) 144 this.writeBlocked = true; 145 return m_evLoop.send(m_socket, ub); 146 } 147 148 /// Removes the connection from the event loop, closing it if necessary, and 149 /// cleans up the underlying resources. 150 bool kill(bool forced = false) 151 in { assert(isConnected); } 152 body { 153 bool ret = m_evLoop.kill(this, forced); 154 scope(exit) m_socket = 0; 155 return ret; 156 } 157 158 package: 159 mixin TCPConnectionMixins; 160 161 @property void inbound(bool b) { 162 m_inbound = b; 163 } 164 165 @property bool noDelay() const 166 { 167 return m_noDelay; 168 } 169 170 @property fd_t socket() const { 171 return m_socket; 172 } 173 174 @property void socket(fd_t sock) { 175 m_socket = sock; 176 } 177 178 } 179 180 /// Accepts connections on a single IP:PORT tuple by sending a new inbound AsyncTCPConnection 181 /// object to the handler for every newly completed handshake. 182 /// 183 /// Note: If multiple threads are listening to the same IP:PORT tuple, the connections will 184 /// be distributed evenly between them. However, this behavior on Windows is not implemented yet. 185 final class AsyncTCPListener 186 { 187 private: 188 nothrow: 189 EventLoop m_evLoop; 190 fd_t m_socket; 191 NetworkAddress m_local; 192 bool m_noDelay; 193 bool m_started; 194 195 public: 196 197 this(EventLoop evl, fd_t sock = fd_t.init) { m_evLoop = evl; m_socket = sock; } 198 199 mixin DefStatus; 200 201 /// Sets the default value for nagle's algorithm on new connections. 202 @property void noDelay(bool b) 203 in { assert(!m_started, "Cannot set noDelay on a running object."); } 204 body { 205 m_noDelay = b; 206 } 207 208 /// Returns the local internet address as an OS-specific structure. 209 @property NetworkAddress local() const 210 { 211 return m_local; 212 } 213 214 /// Sets the local internet address as an OS-specific structure. 215 @property void local(NetworkAddress addr) 216 in { assert(!m_started, "Cannot rebind a listening socket"); } 217 body { 218 m_local = addr; 219 } 220 221 /// Sets the local listening interface to the specified hostname/port. 222 typeof(this) host(string hostname, size_t port) 223 in { assert(!m_started, "Cannot rebind a listening socket"); } 224 body { 225 m_local = m_evLoop.resolveHost(hostname, cast(ushort) port); 226 return this; 227 } 228 229 /// Sets the local listening interface to the specified ip/port. 230 typeof(this) ip(string ip, size_t port) 231 in { assert(!m_started, "Cannot rebind a listening socket"); } 232 body { 233 m_local = m_evLoop.resolveIP(ip, cast(ushort) port); 234 return this; 235 } 236 237 /// Starts accepting connections by registering the given handler with the underlying OS event. 238 bool run(void delegate(TCPEvent) delegate(AsyncTCPConnection) del) { 239 TCPAcceptHandler handler; 240 handler.ctxt = this; 241 handler.del = del; 242 return run(handler); 243 } 244 245 private bool run(TCPAcceptHandler del) 246 in { 247 assert(m_local != NetworkAddress.init, "Cannot bind without an address. Please run .host() or .ip()"); 248 } 249 body { 250 m_socket = m_evLoop.run(this, del); 251 if (m_socket == fd_t.init) 252 return false; 253 else { 254 m_started = true; 255 return true; 256 } 257 } 258 259 /// Use to implement distributed servicing of connections 260 @property fd_t socket() const { 261 return m_socket; 262 } 263 264 /// Stops accepting connections and cleans up the underlying OS resources. 265 bool kill() 266 in { assert(m_socket != 0); } 267 body { 268 bool ret = m_evLoop.kill(this); 269 if (ret) 270 m_started = false; 271 return ret; 272 } 273 274 package: 275 version(Posix) mixin EvInfoMixins; 276 version(Windows) mixin TCPListenerDistMixins; 277 @property bool noDelay() const 278 { 279 return m_noDelay; 280 } 281 } 282 283 package struct TCPEventHandler { 284 AsyncTCPConnection conn; 285 286 /// Use getContext/setContext to persist the context in each activity. Using AsyncTCPConnection in args 287 /// allows the EventLoop implementation to create and pass a new object, which is necessary for listeners. 288 void delegate(TCPEvent) del; 289 290 void opCall(TCPEvent ev){ 291 if (conn is null) return; //, "Connection was disposed before shutdown could be completed"); 292 if (!conn.isConnected) 293 return; 294 debug conn.m_dataRemaining = true; 295 del(ev); 296 //debug assert(!conn.m_dataRemaining, "You must recv the whole buffer, because TCP events are edge triggered!"); 297 return; 298 } 299 } 300 301 package struct TCPAcceptHandler { 302 AsyncTCPListener ctxt; 303 void delegate(TCPEvent) delegate(AsyncTCPConnection) del; 304 305 TCPEventHandler opCall(AsyncTCPConnection conn){ // conn is null = error! 306 assert(ctxt !is null); 307 308 void delegate(TCPEvent) ev_handler = del(conn); 309 TCPEventHandler handler; 310 handler.del = ev_handler; 311 handler.conn = conn; 312 return handler; 313 } 314 } 315 316 enum TCPEvent : char { 317 ERROR = 0, // The connection will be forcefully closed, this is debugging information 318 CONNECT, // indicates write will not block, although recv may or may not have data 319 READ, // called once when new bytes are in the buffer 320 WRITE, // only called when send returned Status.ASYNC 321 CLOSE // The connection is being shutdown 322 } 323 324 enum TCPOption : char { 325 NODELAY = 0, // Don't delay send to coalesce packets 326 REUSEADDR = 1, 327 CORK, 328 LINGER, 329 BUFFER_RECV, 330 BUFFER_SEND, 331 TIMEOUT_RECV, 332 TIMEOUT_SEND, 333 TIMEOUT_HALFOPEN, 334 KEEPALIVE_ENABLE, 335 KEEPALIVE_DEFER, // Start keeplives after this period 336 KEEPALIVE_COUNT, // Number of keepalives before death 337 KEEPALIVE_INTERVAL, // Interval between keepalives 338 DEFER_ACCEPT, 339 QUICK_ACK, // Bock/reenable quick ACKs. 340 CONGESTION 341 }