1 module libasync.bufferedtcp;
2 
3 import std.algorithm : copy;
4 import std.array     : array, empty, front, popFront;
5 import std.range     : isInputRange, isOutputRange;
6 
7 import memutils.circularbuffer : CircularBuffer;
8 
9 import libasync.events;
10 import libasync.tcp    : AsyncTCPConnection, TCPEvent, TCPEventHandler;
11 import libasync.types  : StatusInfo;
12 
13 final class BufferedTCPConnection(size_t size = 4092)
14 {
15     alias OnEvent = void delegate(BufferedTCPConnection!size conn);
16     alias OnRead  =
17         void delegate(BufferedTCPConnection!size conn, in ubyte[] msg);
18 
19     private
20     {
21         AsyncTCPConnection asyncConn;
22 
23         OnEvent       onConnectCb;
24         OnEvent       onCloseCb;
25         OnEvent       onErrorCb;
26         OnReadInfo[]  onReadCbs;
27         OnWriteInfo[] onWriteCbs;
28 
29         CircularBuffer!(ubyte, size) readBuffer;
30         CircularBuffer!(ubyte, size) writeBuffer;
31         ubyte[] workBuffer = new ubyte[size];
32     }
33 
34     this(
35         EventLoop evl,
36         fd_t preInitializedSocket = fd_t.init,
37         in OnEvent onConnectCb = null,
38         in OnEvent onCloseCb   = null,
39         in OnEvent onErrorCb   = null)
40     in
41     {
42         assert(evl !is null);
43     }
44     body
45     {
46         asyncConn = new AsyncTCPConnection(evl, preInitializedSocket);
47 
48         this.onConnectCb = onConnectCb;
49         this.onCloseCb   = onCloseCb;
50         this.onErrorCb   = onErrorCb;
51     }
52 
53     this(
54         AsyncTCPConnection conn,
55         in OnEvent onConnectCb = null,
56         in OnEvent onCloseCb   = null,
57         in OnEvent onErrorCb   = null)
58     in
59     {
60         assert(conn !is null);
61     }
62     body
63     {
64         asyncConn = conn;
65 
66         this.onConnectCb = onConnectCb;
67         this.onCloseCb   = onCloseCb;
68         this.onErrorCb   = onErrorCb;
69     }
70 
71     @property bool hasError() const
72     {
73         return asyncConn.hasError;
74     }
75 
76     /**
77      * The status code is Status.ASYNC if the call is delayed (yield),
78      * Status.ABORT if an unrecoverable socket/fd error occurs (throw), or
79      * Status.ERROR if an internal error occured (assert).
80      */
81     @property StatusInfo status() const
82     {
83         return asyncConn.status;
84     }
85 
86     /**
87      * Returns: Human-readable error message from the underlying operating
88      *          system.
89      */
90     @property string error() const
91     {
92         return asyncConn.error;
93     }
94 
95     @property bool isConnected() const nothrow
96     {
97         return asyncConn.isConnected;
98     }
99 
100     /**
101      * Returns: true if this connection was accepted by an AsyncTCPListener
102      *          instance.
103      */
104     @property bool inbound() const
105     {
106         return asyncConn.inbound;
107     }
108 
109     /// Disables(true)/enables(false) nagle's algorithm (default:enabled).
110     @property void noDelay(bool b)
111     {
112         asyncConn.noDelay(b);
113     }
114 
115     /// Changes the default OS configurations for this underlying TCP Socket.
116     bool setOption(T)(TCPOption op, in T val)
117     {
118         return asyncConn.setOption(op, val);
119     }
120 
121     /// Returns the OS-specific structure of the internet address
122     /// of the remote network adapter
123     @property NetworkAddress peer() const
124     {
125         return asyncConn.peer;
126     }
127 
128     /// Returns the OS-specific structure of the internet address
129     /// for the local end of the connection.
130     @property NetworkAddress local()
131     {
132         return asyncConn.local;
133     }
134 
135     /// Sets the remote address as an OS-specific structure (only usable before connecting).
136     @property void peer(NetworkAddress addr)
137     {
138         asyncConn.peer = addr;
139     }
140 
141     /// (Blocking) Resolves the specified host and resets the peer to this address.
142     /// Use AsyncDNS for a non-blocking resolver. (only usable before connecting).
143     typeof(this) host(string hostname, size_t port)
144     {
145         asyncConn.host(hostname, port);
146         return this;
147     }
148 
149     /// Sets the peer to the specified IP address and port. (only usable before connecting).
150     typeof(this) ip(string ip, size_t port)
151     {
152         asyncConn.ip(ip, port);
153         return this;
154     }
155 
156     /// Starts the connection by registering the associated callback handler in the
157     /// underlying OS event loop.
158     bool run(void delegate(TCPEvent) del)
159     {
160         TCPEventHandler handler;
161         handler.conn = asyncConn;
162         handler.del  = del;
163         return run(handler);
164     }
165 
166     private bool run(TCPEventHandler del)
167     {
168         return asyncConn.run(del);
169     }
170 
171     /**
172      * Receive data from the underlying stream. To be used when TCPEvent.READ
173      * is received by the callback handler.
174      * IMPORTANT: This must be called until is returns a lower value than the
175      * buffer!
176      */
177     private uint recv()
178     in
179     {
180         assert(isConnected, "No socket to operate on");
181     }
182     body
183     {
184         uint cnt = asyncConn.recv(workBuffer);
185         if (cnt > 0)
186             copy(workBuffer[0..cnt], &readBuffer);
187         return cnt;
188     }
189 
190     /**
191      * Send data through the underlying stream by moving it into the OS buffer.
192      */
193     private uint send()
194     in
195     {
196         assert(isConnected, "No socket to operate on");
197     }
198     body
199     {
200         copy(writeBuffer[], workBuffer);
201         return asyncConn.send(workBuffer[0..writeBuffer.length].array);
202     }
203 
204     /**
205      * Removes the connection from the event loop, closing it if necessary, and
206      * cleans up the underlying resources.
207      */
208     private bool kill(in bool forced = false)
209     in
210     {
211         assert(isConnected);
212     }
213     body
214     {
215         bool ret = asyncConn.kill(forced);
216         return ret;
217     }
218 
219     void read(in size_t len, in OnRead onReadCb)
220     {
221         onReadCbs ~= OnReadInfo(len, onReadCb);
222     }
223 
224     // Note: All buffers must be empty when returning from TCPEvent.READ
225     private void onRead()
226     {
227         uint read;
228         do read = recv();
229         while (read == readBuffer.capacity);
230 
231         if (onReadCbs.empty)
232             return;
233 
234         foreach (ref info; onReadCbs) with (info)
235             if (readBuffer.length >= len)
236             {
237                 cb(this, readBuffer[0..len].array);
238                 readBuffer.popFrontN(len);
239                 onReadCbs.popFront();
240             }
241             else
242                 break;
243     }
244 
245     void write(R)(in R msg, in size_t len, in OnEvent cb = null)
246     if (isInputRange!R)
247     {
248         writeBuffer.put(msg[0..len]);
249 
250         onWriteCbs ~= OnWriteInfo(len, cb);
251         onWrite();
252     }
253 
254     private void onWrite()
255     {
256         if (writeBuffer.length == 0)
257             return;
258 
259         uint sent = send();
260         writeBuffer.popFrontN(sent);
261 
262         foreach (ref info; onWriteCbs) with (info)
263             if (sent >= len)
264             {
265                 cb(this);
266                 onWriteCbs.popFront();
267                 sent -= len;
268             }
269             else
270                 break;
271     }
272 
273     private void onConnect()
274     {
275         if (onConnectCb !is null)
276             onConnectCb(this);
277     }
278 
279     private void onError()
280     {
281         if (onErrorCb !is null)
282             onErrorCb(this);
283     }
284 
285     void close()
286     {
287         kill();
288         onClose();
289     }
290 
291     private void onClose()
292     {
293         if (onCloseCb !is null)
294             onCloseCb(this);
295     }
296 
297     void handle(TCPEvent ev)
298     {
299         final switch (ev)
300         {
301             case TCPEvent.CONNECT:
302                 onConnect();
303                 break;
304             case TCPEvent.READ:
305                 onRead();
306                 break;
307             case TCPEvent.WRITE:
308                 onWrite();
309                 break;
310             case TCPEvent.CLOSE:
311                 onClose();
312                 break;
313             case TCPEvent.ERROR:
314                 onError();
315                 break;
316         }
317     }
318 
319     private struct OnReadInfo
320     {
321         const size_t len;
322         const OnRead cb;
323     }
324 
325     private struct OnWriteInfo
326     {
327         const size_t len;
328         const OnEvent cb;
329     }
330 }