1 module libasync.bufferedtcp; 2 3 import std.algorithm : copy; 4 import std.array : array, empty, front, popFront; 5 import std.range : isInputRange, isOutputRange; 6 7 import memutils.circularbuffer : CircularBuffer; 8 9 import libasync.events; 10 import libasync.tcp : AsyncTCPConnection, TCPEvent, TCPEventHandler; 11 import libasync.types : StatusInfo; 12 13 final class BufferedTCPConnection(size_t size = 4092) 14 { 15 alias OnEvent = void delegate(BufferedTCPConnection!size conn); 16 alias OnRead = 17 void delegate(BufferedTCPConnection!size conn, in ubyte[] msg); 18 19 private 20 { 21 AsyncTCPConnection asyncConn; 22 23 OnEvent onConnectCb; 24 OnEvent onCloseCb; 25 OnEvent onErrorCb; 26 OnReadInfo[] onReadCbs; 27 OnWriteInfo[] onWriteCbs; 28 29 CircularBuffer!(ubyte, size) readBuffer; 30 CircularBuffer!(ubyte, size) writeBuffer; 31 ubyte[] workBuffer = new ubyte[size]; 32 } 33 34 this( 35 EventLoop evl, 36 fd_t preInitializedSocket = fd_t.init, 37 in OnEvent onConnectCb = null, 38 in OnEvent onCloseCb = null, 39 in OnEvent onErrorCb = null) 40 in 41 { 42 assert(evl !is null); 43 } 44 body 45 { 46 asyncConn = new AsyncTCPConnection(evl, preInitializedSocket); 47 48 this.onConnectCb = onConnectCb; 49 this.onCloseCb = onCloseCb; 50 this.onErrorCb = onErrorCb; 51 } 52 53 this( 54 AsyncTCPConnection conn, 55 in OnEvent onConnectCb = null, 56 in OnEvent onCloseCb = null, 57 in OnEvent onErrorCb = null) 58 in 59 { 60 assert(conn !is null); 61 } 62 body 63 { 64 asyncConn = conn; 65 66 this.onConnectCb = onConnectCb; 67 this.onCloseCb = onCloseCb; 68 this.onErrorCb = onErrorCb; 69 } 70 71 @property bool hasError() const 72 { 73 return asyncConn.hasError; 74 } 75 76 /** 77 * The status code is Status.ASYNC if the call is delayed (yield), 78 * Status.ABORT if an unrecoverable socket/fd error occurs (throw), or 79 * Status.ERROR if an internal error occured (assert). 80 */ 81 @property StatusInfo status() const 82 { 83 return asyncConn.status; 84 } 85 86 /** 87 * Returns: Human-readable error message from the underlying operating 88 * system. 89 */ 90 @property string error() const 91 { 92 return asyncConn.error; 93 } 94 95 @property bool isConnected() const nothrow 96 { 97 return asyncConn.isConnected; 98 } 99 100 /** 101 * Returns: true if this connection was accepted by an AsyncTCPListener 102 * instance. 103 */ 104 @property bool inbound() const 105 { 106 return asyncConn.inbound; 107 } 108 109 /// Disables(true)/enables(false) nagle's algorithm (default:enabled). 110 @property void noDelay(bool b) 111 { 112 asyncConn.noDelay(b); 113 } 114 115 /// Changes the default OS configurations for this underlying TCP Socket. 116 bool setOption(T)(TCPOption op, in T val) 117 { 118 return asyncConn.setOption(op, val); 119 } 120 121 /// Returns the OS-specific structure of the internet address 122 /// of the remote network adapter 123 @property NetworkAddress peer() const 124 { 125 return asyncConn.peer; 126 } 127 128 /// Returns the OS-specific structure of the internet address 129 /// for the local end of the connection. 130 @property NetworkAddress local() 131 { 132 return asyncConn.local; 133 } 134 135 /// Sets the remote address as an OS-specific structure (only usable before connecting). 136 @property void peer(NetworkAddress addr) 137 { 138 asyncConn.peer = addr; 139 } 140 141 /// (Blocking) Resolves the specified host and resets the peer to this address. 142 /// Use AsyncDNS for a non-blocking resolver. (only usable before connecting). 143 typeof(this) host(string hostname, size_t port) 144 { 145 asyncConn.host(hostname, port); 146 return this; 147 } 148 149 /// Sets the peer to the specified IP address and port. (only usable before connecting). 150 typeof(this) ip(string ip, size_t port) 151 { 152 asyncConn.ip(ip, port); 153 return this; 154 } 155 156 /// Starts the connection by registering the associated callback handler in the 157 /// underlying OS event loop. 158 bool run(void delegate(TCPEvent) del) 159 { 160 TCPEventHandler handler; 161 handler.conn = asyncConn; 162 handler.del = del; 163 return run(handler); 164 } 165 166 private bool run(TCPEventHandler del) 167 { 168 return asyncConn.run(del); 169 } 170 171 /** 172 * Receive data from the underlying stream. To be used when TCPEvent.READ 173 * is received by the callback handler. 174 * IMPORTANT: This must be called until is returns a lower value than the 175 * buffer! 176 */ 177 private uint recv() 178 in 179 { 180 assert(isConnected, "No socket to operate on"); 181 } 182 body 183 { 184 uint cnt = asyncConn.recv(workBuffer); 185 if (cnt > 0) 186 copy(workBuffer[0..cnt], &readBuffer); 187 return cnt; 188 } 189 190 /** 191 * Send data through the underlying stream by moving it into the OS buffer. 192 */ 193 private uint send() 194 in 195 { 196 assert(isConnected, "No socket to operate on"); 197 } 198 body 199 { 200 copy(writeBuffer[], workBuffer); 201 return asyncConn.send(workBuffer[0..writeBuffer.length].array); 202 } 203 204 /** 205 * Removes the connection from the event loop, closing it if necessary, and 206 * cleans up the underlying resources. 207 */ 208 private bool kill(in bool forced = false) 209 in 210 { 211 assert(isConnected); 212 } 213 body 214 { 215 bool ret = asyncConn.kill(forced); 216 return ret; 217 } 218 219 void read(in size_t len, in OnRead onReadCb) 220 { 221 onReadCbs ~= OnReadInfo(len, onReadCb); 222 } 223 224 // Note: All buffers must be empty when returning from TCPEvent.READ 225 private void onRead() 226 { 227 uint read; 228 do read = recv(); 229 while (read == readBuffer.capacity); 230 231 if (onReadCbs.empty) 232 return; 233 234 foreach (ref info; onReadCbs) with (info) 235 if (readBuffer.length >= len) 236 { 237 cb(this, readBuffer[0..len].array); 238 readBuffer.popFrontN(len); 239 onReadCbs.popFront(); 240 } 241 else 242 break; 243 } 244 245 void write(R)(in R msg, in size_t len, in OnEvent cb = null) 246 if (isInputRange!R) 247 { 248 writeBuffer.put(msg[0..len]); 249 250 onWriteCbs ~= OnWriteInfo(len, cb); 251 onWrite(); 252 } 253 254 private void onWrite() 255 { 256 if (writeBuffer.length == 0) 257 return; 258 259 uint sent = send(); 260 writeBuffer.popFrontN(sent); 261 262 foreach (ref info; onWriteCbs) with (info) 263 if (sent >= len) 264 { 265 cb(this); 266 onWriteCbs.popFront(); 267 sent -= len; 268 } 269 else 270 break; 271 } 272 273 private void onConnect() 274 { 275 if (onConnectCb !is null) 276 onConnectCb(this); 277 } 278 279 private void onError() 280 { 281 if (onErrorCb !is null) 282 onErrorCb(this); 283 } 284 285 void close() 286 { 287 kill(); 288 onClose(); 289 } 290 291 private void onClose() 292 { 293 if (onCloseCb !is null) 294 onCloseCb(this); 295 } 296 297 void handle(TCPEvent ev) 298 { 299 final switch (ev) 300 { 301 case TCPEvent.CONNECT: 302 onConnect(); 303 break; 304 case TCPEvent.READ: 305 onRead(); 306 break; 307 case TCPEvent.WRITE: 308 onWrite(); 309 break; 310 case TCPEvent.CLOSE: 311 onClose(); 312 break; 313 case TCPEvent.ERROR: 314 onError(); 315 break; 316 } 317 } 318 319 private struct OnReadInfo 320 { 321 const size_t len; 322 const OnRead cb; 323 } 324 325 private struct OnWriteInfo 326 { 327 const size_t len; 328 const OnEvent cb; 329 } 330 }