///
module libasync.tcp;
import std.traits : isPointer;
import libasync.types;
import libasync.events;
import std.typecons : Tuple;

/// Wraps a TCP stream between 2 network adapters, using a custom handler to
/// signal related events. Many of these objects can be active concurrently
/// in a thread if the event loop is running and the handlers do not block.
final class AsyncTCPConnection
{
package:
	EventLoop m_evLoop;

private:
	NetworkAddress m_peer;

nothrow:
	fd_t m_socket;                // fd_t.init while not connected (see isConnected)
	fd_t m_preInitializedSocket;  // optional descriptor handed to the constructor
	bool m_noDelay;               // stored TCP_NODELAY preference
	bool m_inbound;
public:
	/// Creates a connection bound to the event loop `evl`.
	/// Params:
	///   evl = the event loop driving this connection, must not be null
	///   preInitializedSocket = optional descriptor, only stored here and
	///       exposed through the package-level `preInitializedSocket` getter
	this(EventLoop evl, fd_t preInitializedSocket = fd_t.init)
	in { assert(evl !is null); }
	body {
		m_evLoop = evl;
		m_preInitializedSocket = preInitializedSocket;
	}

	mixin DefStatus;

	/// Returns false if the connection has gone.
	@property bool isConnected() const {
		return m_socket != fd_t.init;
	}

	/// Returns true if this connection was accepted by an AsyncTCPListener instance.
	@property bool inbound() const {
		return m_inbound;
	}

	/// Disables(true)/enables(false) nagle's algorithm (default:enabled).
	///
	/// Before a socket exists the preference is only stored. Once connected,
	/// the requested value is applied to the socket through setOption and the
	/// stored preference is updated when that call reports success.
	@property void noDelay(bool b)
	{
		if (!isConnected) {
			m_noDelay = b;
		}
		else if (setOption(TCPOption.NODELAY, b)) {
			// Forward the caller's value (not a hard-coded `true`) so that
			// noDelay = false can re-enable nagle's algorithm on a live socket.
			m_noDelay = b;
		}
	}

	/// Changes the default OS configurations for this underlying TCP Socket.
	/// Returns: the result reported by the event loop's setOption.
	bool setOption(T)(TCPOption op, in T val)
	in { assert(isConnected, "No socket to operate on"); }
	body {
		return m_evLoop.setOption(m_socket, op, val);
	}

	/// Returns the OS-specific structure of the internet address
	/// of the remote network adapter
	@property NetworkAddress peer() const
	{
		return m_peer;
	}

	/// Returns the OS-specific structure of the internet address
	/// for the local end of the connection.
	@property NetworkAddress local()
	in {
		assert(isConnected && m_peer != NetworkAddress.init, "Cannot get local address from a non-connected socket");
	}
	body {
		return m_evLoop.localAddr(m_socket, m_peer.ipv6);
	}

	/// Sets the remote address as an OS-specific structure (only usable before connecting).
	@property void peer(NetworkAddress addr)
	in {
		assert(!isConnected, "Cannot change remote address on a connected socket");
		assert(addr != NetworkAddress.init);
	}
	body {
		m_peer = addr;
	}

	/// (Blocking) Resolves the specified host and resets the peer to this address.
	/// Use AsyncDNS for a non-blocking resolver. (only usable before connecting).
	///
	/// `port` must fit in a ushort; it is narrowed before being handed to the resolver.
	typeof(this) host(string hostname, size_t port)
	in {
		assert(!isConnected, "Cannot change remote address on a connected socket");
		// Guard the narrowing cast below against silent truncation.
		assert(port <= ushort.max, "Port number out of range");
	}
	body {
		m_peer = m_evLoop.resolveHost(hostname, cast(ushort) port);
		return this;
	}

	/// Sets the peer to the specified IP address and port. (only usable before connecting).
	///
	/// `port` must fit in a ushort; it is narrowed before being handed to the resolver.
	typeof(this) ip(string ip, size_t port)
	in {
		assert(!isConnected, "Cannot change remote address on a connected socket");
		// Guard the narrowing cast below against silent truncation.
		assert(port <= ushort.max, "Port number out of range");
	}
	body {
		m_peer = m_evLoop.resolveIP(ip, cast(ushort) port);
		return this;
	}

	/// Starts the connection by registering the associated callback handler in the
	/// underlying OS event loop.
	bool run(void delegate(TCPEvent) del) {
		TCPEventHandler handler;
		handler.del = del;
		handler.conn = this;
		return run(handler);
	}

	/// Registers `del` with the event loop and stores the descriptor it returns.
	/// Returns: true when the event loop handed back a usable descriptor,
	/// i.e. when isConnected becomes true.
	bool run(TCPEventHandler del)
	in { assert(!isConnected); }
	body {
		m_socket = m_evLoop.run(this, del);
		// Same sentinel as isConnected, so both always agree.
		return m_socket != fd_t.init;
	}

	/// Receive data from the underlying stream. To be used when TCPEvent.READ is received by the
	/// callback handler. IMPORTANT: This must be called until it returns a lower value than the buffer!
	final pragma(inline, true)
	uint recv(ref ubyte[] ub)
	//in { assert(isConnected, "No socket to operate on"); }
	//body
	{
		return m_evLoop.recv(m_socket, ub);
	}

	/// Send data through the underlying stream by moving it into the OS buffer.
	/// Returns: the value reported by the event loop's send.
	final pragma(inline, true)
	uint send(in ubyte[] ub)
	//in { assert(isConnected, "No socket to operate on"); }
	//body
	{
		uint ret = m_evLoop.send(m_socket, ub);
		// On Posix, remember that the write would have blocked
		// (writeBlocked is not declared in this class; presumably it comes from a mixin).
		version(Posix)
			if (m_evLoop.status.code == Status.ASYNC)
				this.writeBlocked = true;
		return ret;
	}

	/// Removes the connection from the event loop, closing it if necessary, and
	/// cleans up the underlying resources.
	/// Returns: the result reported by the event loop's kill.
	bool kill(bool forced = false)
	in { assert(isConnected); }
	body {
		// Registered before the call so the descriptor is cleared on every exit
		// path; the event loop still sees the live descriptor during kill.
		scope(exit) m_socket = fd_t.init;
		return m_evLoop.kill(this, forced);
	}

	/// Returns the descriptor currently stored for this connection.
	@property fd_t socket() const {
		return m_socket;
	}

package:
	mixin COSocketMixins;

	@property void inbound(bool b) {
		m_inbound = b;
	}

	@property bool noDelay() const
	{
		return m_noDelay;
	}

	@property void socket(fd_t sock) {
		m_socket = sock;
	}

	@property fd_t preInitializedSocket() const {
		return m_preInitializedSocket;
	}
}

/// Accepts connections on a single IP:PORT tuple by sending a new inbound AsyncTCPConnection
/// object to the handler for every newly completed handshake.
///
/// Note: If multiple threads are listening to the same IP:PORT tuple, the connections will
/// be distributed evenly between them. However, this behavior on Windows is not implemented yet.
final class AsyncTCPListener
{
private:
nothrow:
	EventLoop m_evLoop;
	fd_t m_socket;          // fd_t.init until run() succeeds (or a descriptor is given to the constructor)
	NetworkAddress m_local;
	bool m_noDelay;
	bool m_started;         // true between a successful run() and a successful kill()

public:

	/// Creates a listener bound to the event loop `evl`.
	/// Params:
	///   evl = the event loop driving this listener, must not be null
	///   sock = optional descriptor stored as the listener's socket
	this(EventLoop evl, fd_t sock = fd_t.init)
	in { assert(evl !is null); }
	body {
		m_evLoop = evl;
		m_socket = sock;
	}

	mixin DefStatus;

	/// Sets the default value for nagle's algorithm on new connections.
	@property void noDelay(bool b)
	in { assert(!m_started, "Cannot set noDelay on a running object."); }
	body {
		m_noDelay = b;
	}

	/// Returns the local internet address as an OS-specific structure.
	@property NetworkAddress local() const
	{
		return m_local;
	}

	/// Sets the local internet address as an OS-specific structure.
	@property void local(NetworkAddress addr)
	in { assert(!m_started, "Cannot rebind a listening socket"); }
	body {
		m_local = addr;
	}

	/// Sets the local listening interface to the specified hostname/port.
	///
	/// `port` must fit in a ushort; it is narrowed before being handed to the resolver.
	typeof(this) host(string hostname, size_t port)
	in {
		assert(!m_started, "Cannot rebind a listening socket");
		// Guard the narrowing cast below against silent truncation.
		assert(port <= ushort.max, "Port number out of range");
	}
	body {
		m_local = m_evLoop.resolveHost(hostname, cast(ushort) port);
		return this;
	}

	/// Sets the local listening interface to the specified ip/port.
	///
	/// `port` must fit in a ushort; it is narrowed before being handed to the resolver.
	typeof(this) ip(string ip, size_t port)
	in {
		assert(!m_started, "Cannot rebind a listening socket");
		// Guard the narrowing cast below against silent truncation.
		assert(port <= ushort.max, "Port number out of range");
	}
	body {
		m_local = m_evLoop.resolveIP(ip, cast(ushort) port);
		return this;
	}

	/// Starts accepting connections by registering the given handler with the underlying OS event.
	bool run(void delegate(TCPEvent) delegate(AsyncTCPConnection) del) {
		TCPAcceptHandler handler;
		handler.ctxt = this;
		handler.del = del;
		return run(handler);
	}

	// Registers the accept handler with the event loop and records the resulting
	// descriptor. When port 0 was requested, the local address is re-read from the
	// event loop after a successful start.
	private bool run(TCPAcceptHandler del)
	in {
		assert(m_local != NetworkAddress.init, "Cannot bind without an address. Please run .host() or .ip()");
		// Starting twice would overwrite m_socket and lose track of the first descriptor.
		assert(!m_started, "Cannot run a listener that is already started");
	}
	body {
		m_socket = m_evLoop.run(this, del);
		if (m_socket == fd_t.init)
			return false;

		if (m_local.port == 0)
			m_local = m_evLoop.localAddr(m_socket, m_local.ipv6);
		m_started = true;
		return true;
	}

	/// Use to implement distributed servicing of connections
	@property fd_t socket() const {
		return m_socket;
	}

	/// Stops accepting connections and cleans up the underlying OS resources.
	/// Returns: the result reported by the event loop's kill.
	bool kill()
	in { assert(m_socket != fd_t.init); }
	body {
		bool ret = m_evLoop.kill(this);
		if (ret) {
			m_started = false;
			// Forget the descriptor so a repeated kill() trips the contract
			// instead of acting on a stale value, and a later run() starts clean.
			m_socket = fd_t.init;
		}
		return ret;
	}

package:
	version(Posix) mixin EvInfoMixins;
	version(Distributed) version(Windows) mixin TCPListenerDistMixins;
	@property bool noDelay() const
	{
		return m_noDelay;
	}
}

package struct TCPEventHandler {
	AsyncTCPConnection conn;

	/// Use getContext/setContext to persist the context in each activity. Using AsyncTCPConnection in args
	/// allows the EventLoop implementation to create and pass a new object, which is necessary for listeners.
	void delegate(TCPEvent) del;

	/// Forwards `ev` to the delegate, unless the connection is missing or no longer connected.
	void opCall(TCPEvent ev){
		if (conn is null || !conn.isConnected) return; //, "Connection was disposed before shutdown could be completed");
		del(ev);
	}
}

package struct TCPAcceptHandler {
	AsyncTCPListener ctxt;
	void delegate(TCPEvent) delegate(AsyncTCPConnection) del;

	/// Asks the accept delegate for the event delegate of `conn` and wraps both in a TCPEventHandler.
	TCPEventHandler opCall(AsyncTCPConnection conn){ // conn is null = error!
		assert(ctxt !is null);

		void delegate(TCPEvent) ev_handler = del(conn);
		TCPEventHandler handler;
		handler.del = ev_handler;
		handler.conn = conn;
		return handler;
	}
}

///
enum TCPEvent : char {
	ERROR = 0, /// The connection will be forcefully closed, this is debugging information
	CONNECT, /// indicates write will not block, although recv may or may not have data
	READ, /// called once when new bytes are in the buffer
	WRITE, /// only called when send returned Status.ASYNC
	CLOSE /// The connection is being shutdown
}

///
enum TCPOption : char {
	NODELAY = 0,		/// Don't delay send to coalesce packets
	REUSEADDR = 1,		///
	REUSEPORT,			///
	CORK,				///
	LINGER,				///
	BUFFER_RECV,		///
	BUFFER_SEND,		///
	TIMEOUT_RECV,		///
	TIMEOUT_SEND,		///
	TIMEOUT_HALFOPEN,	///
	KEEPALIVE_ENABLE,	///
	KEEPALIVE_DEFER,	/// Start keepalives after this period
	KEEPALIVE_COUNT,	/// Number of keepalives before death
	KEEPALIVE_INTERVAL,	/// Interval between keepalives
	DEFER_ACCEPT,		///
	QUICK_ACK,			/// Block/reenable quick ACKs.
	CONGESTION			///
}