1 module libasync.tcp; 2 import std.traits : isPointer; 3 import libasync.types; 4 import libasync.events; 5 import std.typecons : Tuple; 6 7 /// Wraps a TCP stream between 2 network adapters, using a custom handler to 8 /// signal related events. Many of these objects can be active concurrently 9 /// in a thread if the event loop is running and the handlers do not block. 10 final class AsyncTCPConnection 11 { 12 package: 13 EventLoop m_evLoop; 14 15 private: 16 NetworkAddress m_peer; 17 18 nothrow: 19 fd_t m_socket; 20 fd_t m_preInitializedSocket; 21 bool m_noDelay; 22 bool m_inbound; 23 debug bool m_dataRemaining; 24 public: 25 this(EventLoop evl, fd_t preInitializedSocket = fd_t.init) 26 in { assert(evl !is null); } 27 body { 28 m_evLoop = evl; 29 m_preInitializedSocket = preInitializedSocket; 30 } 31 32 mixin DefStatus; 33 34 /// Returns false if the connection has gone. 35 @property bool isConnected() const { 36 return m_socket != fd_t.init; 37 } 38 39 /// Returns true if this connection was accepted by an AsyncTCPListener instance. 40 @property bool inbound() const { 41 return m_inbound; 42 } 43 44 /// Disables(true)/enables(false) nagle's algorithm (default:enabled). 45 @property void noDelay(bool b) 46 { 47 if (m_socket == fd_t.init) 48 m_noDelay = b; 49 else 50 setOption(TCPOption.NODELAY, true); 51 } 52 53 /// Changes the default OS configurations for this underlying TCP Socket. 54 bool setOption(T)(TCPOption op, in T val) 55 in { assert(isConnected, "No socket to operate on"); } 56 body { 57 return m_evLoop.setOption(m_socket, op, val); 58 } 59 60 /// Returns the OS-specific structure of the internet address 61 /// of the remote network adapter 62 @property NetworkAddress peer() const 63 { 64 return m_peer; 65 } 66 67 /// Returns the OS-specific structure of the internet address 68 /// for the local end of the connection. 69 @property NetworkAddress local() 70 in { 71 assert(isConnected && m_peer != NetworkAddress.init, "Cannot get local address from a non-connected socket"); 72 } 73 body { 74 return m_evLoop.localAddr(m_socket, m_peer.ipv6); 75 } 76 77 /// Sets the remote address as an OS-specific structure (only usable before connecting). 78 @property void peer(NetworkAddress addr) 79 in { 80 assert(!isConnected, "Cannot change remote address on a connected socket"); 81 assert(addr != NetworkAddress.init); 82 } 83 body { 84 m_peer = addr; 85 } 86 87 /// (Blocking) Resolves the specified host and resets the peer to this address. 88 /// Use AsyncDNS for a non-blocking resolver. (only usable before connecting). 89 typeof(this) host(string hostname, size_t port) 90 in { 91 assert(!isConnected, "Cannot change remote address on a connected socket"); 92 } 93 body { 94 m_peer = m_evLoop.resolveHost(hostname, cast(ushort) port); 95 return this; 96 } 97 98 /// Sets the peer to the specified IP address and port. (only usable before connecting). 99 typeof(this) ip(string ip, size_t port) 100 in { 101 assert(!isConnected, "Cannot change remote address on a connected socket"); 102 } 103 body { 104 m_peer = m_evLoop.resolveIP(ip, cast(ushort) port); 105 return this; 106 } 107 108 /// Starts the connection by registering the associated callback handler in the 109 /// underlying OS event loop. 110 bool run(void delegate(TCPEvent) del) { 111 TCPEventHandler handler; 112 handler.del = del; 113 handler.conn = this; 114 return run(handler); 115 } 116 117 bool run(TCPEventHandler del) 118 in { assert(!isConnected); } 119 body { 120 m_socket = m_evLoop.run(this, del); 121 if (m_socket == 0) 122 return false; 123 else 124 return true; 125 126 } 127 128 /// Receive data from the underlying stream. To be used when TCPEvent.READ is received by the 129 /// callback handler. IMPORTANT: This must be called until is returns a lower value than the buffer! 130 uint recv(ref ubyte[] ub) 131 in { assert(isConnected, "No socket to operate on"); } 132 body { 133 uint cnt = m_evLoop.recv(m_socket, ub); 134 debug { 135 if (ub.length > cnt) 136 m_dataRemaining = false; 137 } 138 return cnt; 139 } 140 141 /// Send data through the underlying stream by moving it into the OS buffer. 142 uint send(in ubyte[] ub) 143 in { assert(isConnected, "No socket to operate on"); } 144 body { 145 version(Posix) 146 scope(exit) 147 if (m_evLoop.status.code == Status.ASYNC) 148 this.writeBlocked = true; 149 return m_evLoop.send(m_socket, ub); 150 } 151 152 /// Removes the connection from the event loop, closing it if necessary, and 153 /// cleans up the underlying resources. 154 bool kill(bool forced = false) 155 in { assert(isConnected); } 156 body { 157 bool ret = m_evLoop.kill(this, forced); 158 scope(exit) m_socket = 0; 159 return ret; 160 } 161 162 package: 163 mixin TCPConnectionMixins; 164 165 @property void inbound(bool b) { 166 m_inbound = b; 167 } 168 169 @property bool noDelay() const 170 { 171 return m_noDelay; 172 } 173 174 @property fd_t socket() const { 175 return m_socket; 176 } 177 178 @property void socket(fd_t sock) { 179 m_socket = sock; 180 } 181 182 @property fd_t preInitializedSocket() const { 183 return m_preInitializedSocket; 184 } 185 } 186 187 /// Accepts connections on a single IP:PORT tuple by sending a new inbound AsyncTCPConnection 188 /// object to the handler for every newly completed handshake. 189 /// 190 /// Note: If multiple threads are listening to the same IP:PORT tuple, the connections will 191 /// be distributed evenly between them. However, this behavior on Windows is not implemented yet. 192 final class AsyncTCPListener 193 { 194 private: 195 nothrow: 196 EventLoop m_evLoop; 197 fd_t m_socket; 198 NetworkAddress m_local; 199 bool m_noDelay; 200 bool m_started; 201 202 public: 203 204 this(EventLoop evl, fd_t sock = fd_t.init) { m_evLoop = evl; m_socket = sock; } 205 206 mixin DefStatus; 207 208 /// Sets the default value for nagle's algorithm on new connections. 209 @property void noDelay(bool b) 210 in { assert(!m_started, "Cannot set noDelay on a running object."); } 211 body { 212 m_noDelay = b; 213 } 214 215 /// Returns the local internet address as an OS-specific structure. 216 @property NetworkAddress local() const 217 { 218 return m_local; 219 } 220 221 /// Sets the local internet address as an OS-specific structure. 222 @property void local(NetworkAddress addr) 223 in { assert(!m_started, "Cannot rebind a listening socket"); } 224 body { 225 m_local = addr; 226 } 227 228 /// Sets the local listening interface to the specified hostname/port. 229 typeof(this) host(string hostname, size_t port) 230 in { assert(!m_started, "Cannot rebind a listening socket"); } 231 body { 232 m_local = m_evLoop.resolveHost(hostname, cast(ushort) port); 233 return this; 234 } 235 236 /// Sets the local listening interface to the specified ip/port. 237 typeof(this) ip(string ip, size_t port) 238 in { assert(!m_started, "Cannot rebind a listening socket"); } 239 body { 240 m_local = m_evLoop.resolveIP(ip, cast(ushort) port); 241 return this; 242 } 243 244 /// Starts accepting connections by registering the given handler with the underlying OS event. 245 bool run(void delegate(TCPEvent) delegate(AsyncTCPConnection) del) { 246 TCPAcceptHandler handler; 247 handler.ctxt = this; 248 handler.del = del; 249 return run(handler); 250 } 251 252 private bool run(TCPAcceptHandler del) 253 in { 254 assert(m_local != NetworkAddress.init, "Cannot bind without an address. Please run .host() or .ip()"); 255 } 256 body { 257 m_socket = m_evLoop.run(this, del); 258 if (m_socket == fd_t.init) 259 return false; 260 else { 261 m_started = true; 262 return true; 263 } 264 } 265 266 /// Use to implement distributed servicing of connections 267 @property fd_t socket() const { 268 return m_socket; 269 } 270 271 /// Stops accepting connections and cleans up the underlying OS resources. 272 bool kill() 273 in { assert(m_socket != 0); } 274 body { 275 bool ret = m_evLoop.kill(this); 276 if (ret) 277 m_started = false; 278 return ret; 279 } 280 281 package: 282 version(Posix) mixin EvInfoMixins; 283 version(Windows) mixin TCPListenerDistMixins; 284 @property bool noDelay() const 285 { 286 return m_noDelay; 287 } 288 } 289 290 package struct TCPEventHandler { 291 AsyncTCPConnection conn; 292 293 /// Use getContext/setContext to persist the context in each activity. Using AsyncTCPConnection in args 294 /// allows the EventLoop implementation to create and pass a new object, which is necessary for listeners. 295 void delegate(TCPEvent) del; 296 297 void opCall(TCPEvent ev){ 298 if (conn is null) return; //, "Connection was disposed before shutdown could be completed"); 299 if (!conn.isConnected) 300 return; 301 debug conn.m_dataRemaining = true; 302 del(ev); 303 //debug assert(!conn.m_dataRemaining, "You must recv the whole buffer, because TCP events are edge triggered!"); 304 return; 305 } 306 } 307 308 package struct TCPAcceptHandler { 309 AsyncTCPListener ctxt; 310 void delegate(TCPEvent) delegate(AsyncTCPConnection) del; 311 312 TCPEventHandler opCall(AsyncTCPConnection conn){ // conn is null = error! 313 assert(ctxt !is null); 314 315 void delegate(TCPEvent) ev_handler = del(conn); 316 TCPEventHandler handler; 317 handler.del = ev_handler; 318 handler.conn = conn; 319 return handler; 320 } 321 } 322 323 enum TCPEvent : char { 324 ERROR = 0, // The connection will be forcefully closed, this is debugging information 325 CONNECT, // indicates write will not block, although recv may or may not have data 326 READ, // called once when new bytes are in the buffer 327 WRITE, // only called when send returned Status.ASYNC 328 CLOSE // The connection is being shutdown 329 } 330 331 enum TCPOption : char { 332 NODELAY = 0, // Don't delay send to coalesce packets 333 REUSEADDR = 1, 334 CORK, 335 LINGER, 336 BUFFER_RECV, 337 BUFFER_SEND, 338 TIMEOUT_RECV, 339 TIMEOUT_SEND, 340 TIMEOUT_HALFOPEN, 341 KEEPALIVE_ENABLE, 342 KEEPALIVE_DEFER, // Start keeplives after this period 343 KEEPALIVE_COUNT, // Number of keepalives before death 344 KEEPALIVE_INTERVAL, // Interval between keepalives 345 DEFER_ACCEPT, 346 QUICK_ACK, // Bock/reenable quick ACKs. 347 CONGESTION 348 }