/// Buffered, callback-driven wrapper around libasync's AsyncTCPConnection.
module libasync.bufferedtcp;

import std.algorithm : copy;
import std.array : array, empty, front, popFront;
import std.range : isInputRange, isOutputRange;

import memutils.circularbuffer : CircularBuffer;

import libasync.events;
import libasync.tcp : AsyncTCPConnection, TCPEvent, TCPEventHandler;
import libasync.types : StatusInfo;

/**
 * TCP connection that stages incoming and outgoing bytes in fixed-size
 * circular buffers and reports progress through user supplied delegates.
 *
 * Read requests (see read) and write requests (see write) are queued and
 * completed strictly in the order in which they were issued.
 *
 * Params:
 *  size = number of bytes held by each of the read, write and work buffers.
 */
final class BufferedTCPConnection(size_t size = 4092)
{
    /// Callback invoked for connect, close, error and write-completion events.
    alias OnEvent = void delegate(BufferedTCPConnection!size conn);
    /// Callback invoked with the bytes of a completed read request.
    alias OnRead =
        void delegate(BufferedTCPConnection!size conn, in ubyte[] msg);

    private
    {
        AsyncTCPConnection asyncConn;

        OnEvent onConnectCb;
        OnEvent onCloseCb;
        OnEvent onErrorCb;
        OnReadInfo[] onReadCbs;
        OnWriteInfo[] onWriteCbs;

        CircularBuffer!(ubyte, size) readBuffer;
        CircularBuffer!(ubyte, size) writeBuffer;

        // Scratch space used to move bytes between the socket and the
        // circular buffers. Allocated in the constructors: a `new` expression
        // used as a field initializer is evaluated once at compile time, so
        // every instance would start out sharing the very same array.
        ubyte[] workBuffer;

        // Bytes already handed to the socket that have not yet been credited
        // to a completed write request. Needed because a single request may
        // be flushed by several partial sends.
        size_t sentUnclaimed;
    }

    /**
     * Creates a buffered connection on top of a freshly constructed
     * AsyncTCPConnection.
     *
     * Params:
     *  evl = event loop the connection is bound to, must not be null.
     *  preInitializedSocket = socket handle forwarded to AsyncTCPConnection.
     *  onConnectCb = optional callback for TCPEvent.CONNECT.
     *  onCloseCb = optional callback for TCPEvent.CLOSE and close().
     *  onErrorCb = optional callback for TCPEvent.ERROR.
     */
    this(
        EventLoop evl,
        fd_t preInitializedSocket = fd_t.init,
        in OnEvent onConnectCb = null,
        in OnEvent onCloseCb = null,
        in OnEvent onErrorCb = null)
    in
    {
        assert(evl !is null);
    }
    body
    {
        asyncConn = new AsyncTCPConnection(evl, preInitializedSocket);
        workBuffer = new ubyte[size];

        this.onConnectCb = onConnectCb;
        this.onCloseCb = onCloseCb;
        this.onErrorCb = onErrorCb;
    }

    /**
     * Wraps an already existing AsyncTCPConnection.
     *
     * Params:
     *  conn = connection to wrap, must not be null.
     *  onConnectCb = optional callback for TCPEvent.CONNECT.
     *  onCloseCb = optional callback for TCPEvent.CLOSE and close().
     *  onErrorCb = optional callback for TCPEvent.ERROR.
     */
    this(
        AsyncTCPConnection conn,
        in OnEvent onConnectCb = null,
        in OnEvent onCloseCb = null,
        in OnEvent onErrorCb = null)
    in
    {
        assert(conn !is null);
    }
    body
    {
        asyncConn = conn;
        workBuffer = new ubyte[size];

        this.onConnectCb = onConnectCb;
        this.onCloseCb = onCloseCb;
        this.onErrorCb = onErrorCb;
    }

    /// Forwards to the `hasError` property of the wrapped connection.
    @property bool hasError() const
    {
        return asyncConn.hasError;
    }

    /**
     * The status code is Status.ASYNC if the call is delayed (yield),
     * Status.ABORT if an unrecoverable socket/fd error occurs (throw), or
     * Status.ERROR if an internal error occurred (assert).
     */
    @property StatusInfo status() const
    {
        return asyncConn.status;
    }

    /**
     * Returns: Human-readable error message from the underlying operating
     * system.
     */
    @property string error() const
    {
        return asyncConn.error;
    }

    /// Forwards to the `isConnected` property of the wrapped connection.
    @property bool isConnected() const nothrow
    {
        return asyncConn.isConnected;
    }

    /**
     * Returns: true if this connection was accepted by an AsyncTCPListener
     * instance.
     */
    @property bool inbound() const
    {
        return asyncConn.inbound;
    }

    /// Disables(true)/enables(false) nagle's algorithm (default:enabled).
    @property void noDelay(bool b)
    {
        asyncConn.noDelay(b);
    }

    /// Changes the default OS configurations for this underlying TCP Socket.
    bool setOption(T)(TCPOption op, in T val)
    {
        return asyncConn.setOption(op, val);
    }

    /// Returns the OS-specific structure of the internet address
    /// of the remote network adapter
    @property NetworkAddress peer() const
    {
        return asyncConn.peer;
    }

    /// Returns the OS-specific structure of the internet address
    /// for the local end of the connection.
    @property NetworkAddress local()
    {
        return asyncConn.local;
    }

    /// Sets the remote address as an OS-specific structure (only usable before connecting).
    @property void peer(NetworkAddress addr)
    {
        asyncConn.peer = addr;
    }

    /// (Blocking) Resolves the specified host and resets the peer to this address.
    /// Use AsyncDNS for a non-blocking resolver. (only usable before connecting).
    typeof(this) host(string hostname, size_t port)
    {
        asyncConn.host(hostname, port);
        return this;
    }

    /// Sets the peer to the specified IP address and port. (only usable before connecting).
    typeof(this) ip(string ip, size_t port)
    {
        asyncConn.ip(ip, port);
        return this;
    }

    /// Starts the connection by registering the associated callback handler in the
    /// underlying OS event loop.
    bool run(void delegate(TCPEvent) del)
    {
        TCPEventHandler handler;
        handler.conn = asyncConn;
        handler.del = del;
        return run(handler);
    }

    /// Hands a fully populated handler to the wrapped connection.
    private bool run(TCPEventHandler del)
    {
        return asyncConn.run(del);
    }

    /// Number of bytes that can currently be appended to the read buffer
    /// by one pass through the work buffer.
    private size_t readSpace()
    {
        immutable free = readBuffer.capacity - readBuffer.length;
        return free < workBuffer.length ? free : workBuffer.length;
    }

    /**
     * Receive data from the underlying stream. To be used when TCPEvent.READ
     * is received by the callback handler.
     * IMPORTANT: This must be called until it returns a lower value than the
     * space that was available in the read buffer!
     *
     * At most readSpace() bytes are requested so that the read buffer can
     * never be overrun by data that has not been consumed yet.
     *
     * Returns: number of bytes appended to the read buffer.
     */
    private uint recv()
    in
    {
        assert(isConnected, "No socket to operate on");
    }
    body
    {
        immutable room = readSpace();
        if (room == 0)
            return 0;

        // Named slice rather than a temporary so that it can also be bound
        // to a `ref` parameter.
        ubyte[] dst = workBuffer[0..room];
        immutable cnt = asyncConn.recv(dst);
        if (cnt > 0)
            copy(workBuffer[0..cnt], &readBuffer);
        return cnt;
    }

    /**
     * Send data through the underlying stream by moving it into the OS buffer.
     *
     * The pending bytes are only copied, not removed; the caller drops the
     * bytes that were actually accepted.
     *
     * Returns: the value reported by the wrapped connection's send.
     */
    private uint send()
    in
    {
        assert(isConnected, "No socket to operate on");
    }
    body
    {
        immutable pending = writeBuffer.length;
        copy(writeBuffer[], workBuffer);
        // The slice is passed directly; duplicating it first would only add
        // an allocation per send.
        return asyncConn.send(workBuffer[0..pending]);
    }

    /**
     * Removes the connection from the event loop, closing it if necessary, and
     * cleans up the underlying resources.
     */
    private bool kill(in bool forced = false)
    in
    {
        assert(isConnected);
    }
    body
    {
        return asyncConn.kill(forced);
    }

    /**
     * Queues a request for exactly len bytes.
     *
     * The request is served while handling TCPEvent.READ, once all earlier
     * requests are served and at least len bytes are buffered.
     *
     * Params:
     *  len = number of bytes to deliver; must fit into the read buffer.
     *  onReadCb = receives the bytes; may be null, in which case the bytes
     *             are consumed without notification.
     */
    void read(in size_t len, in OnRead onReadCb)
    in
    {
        // A larger request could never be satisfied by the fixed-size buffer.
        assert(len <= size, "Read request exceeds the buffer size");
    }
    body
    {
        onReadCbs ~= OnReadInfo(len, onReadCb);
    }

    /// Serves queued read requests, oldest first, for as long as enough
    /// bytes are buffered.
    private void dispatchReads()
    {
        while (!onReadCbs.empty && readBuffer.length >= onReadCbs.front.len)
        {
            auto info = onReadCbs.front;
            auto msg = readBuffer[0..info.len].array;

            // Update all state before invoking user code so that a callback
            // issuing further read() calls sees a consistent queue.
            readBuffer.popFrontN(info.len);
            onReadCbs.popFront();

            if (info.cb !is null)
                info.cb(this, msg);
        }
    }

    // Reads until the socket returns less than was asked for. Requests are
    // dispatched between reads to make room for further data. If the buffer
    // is full and no request can consume it, reading stops and data stays in
    // the socket.
    // NOTE(review): whether the event loop re-notifies in that case is not
    // visible here - confirm.
    private void onRead()
    {
        dispatchReads();

        // A callback may have closed the connection, hence the re-check.
        while (isConnected)
        {
            immutable room = readSpace();
            if (room == 0)
                break;

            immutable cnt = recv();
            if (cnt == 0)
                break;

            dispatchReads();

            if (cnt < room)
                break;
        }
    }

    /**
     * Queues the first len elements of msg for sending and immediately tries
     * to flush the write buffer.
     *
     * Params:
     *  msg = source of the bytes; must support slicing.
     *  len = number of bytes to send; must fit into the free space of the
     *        write buffer.
     *  cb = optional callback invoked once all len bytes have been handed
     *       to the socket.
     */
    void write(R)(in R msg, in size_t len, in OnEvent cb = null)
        if (isInputRange!R)
    {
        assert(len <= writeBuffer.capacity - writeBuffer.length,
            "Write request exceeds the free space of the write buffer");

        writeBuffer.put(msg[0..len]);

        onWriteCbs ~= OnWriteInfo(len, cb);
        onWrite();
    }

    // Flushes as much of the write buffer as the socket accepts and completes
    // every request whose bytes have been sent in full.
    private void onWrite()
    {
        if (writeBuffer.length > 0)
        {
            immutable sent = send();
            writeBuffer.popFrontN(sent);
            // Accumulate instead of comparing `sent` directly: a request may
            // need more than one send before it is complete.
            sentUnclaimed += sent;
        }

        while (!onWriteCbs.empty && sentUnclaimed >= onWriteCbs.front.len)
        {
            auto info = onWriteCbs.front;

            // Update all state before invoking user code so that a callback
            // issuing further write() calls sees a consistent queue.
            onWriteCbs.popFront();
            sentUnclaimed -= info.len;

            // The callback is optional (write() defaults it to null).
            if (info.cb !is null)
                info.cb(this);
        }
    }

    // Notifies the user about an established connection.
    private void onConnect()
    {
        if (onConnectCb !is null)
            onConnectCb(this);
    }

    // Notifies the user about a connection error.
    private void onError()
    {
        if (onErrorCb !is null)
            onErrorCb(this);
    }

    /// Kills the connection and then invokes the close callback, if any.
    /// The connection must currently be connected.
    void close()
    {
        kill();
        onClose();
    }

    // Notifies the user about the connection having been closed.
    private void onClose()
    {
        if (onCloseCb !is null)
            onCloseCb(this);
    }

    /// Event entry point: routes ev to the matching private handler.
    /// Suitable as the delegate passed to run.
    void handle(TCPEvent ev)
    {
        final switch (ev)
        {
            case TCPEvent.CONNECT:
                onConnect();
                break;
            case TCPEvent.READ:
                onRead();
                break;
            case TCPEvent.WRITE:
                onWrite();
                break;
            case TCPEvent.CLOSE:
                onClose();
                break;
            case TCPEvent.ERROR:
                onError();
                break;
        }
    }

    // A queued read request: byte count and completion callback.
    private struct OnReadInfo
    {
        const size_t len;
        const OnRead cb;
    }

    // A queued write request: byte count and completion callback.
    private struct OnWriteInfo
    {
        const size_t len;
        const OnEvent cb;
    }
}