1 ///
2 module libasync.tcp;
3 import std.traits : isPointer;
4 import libasync.types;
5 import libasync.events;
6 import std.typecons : Tuple;
7 
8 /// Wraps a TCP stream between 2 network adapters, using a custom handler to
9 /// signal related events. Many of these objects can be active concurrently
10 /// in a thread if the event loop is running and the handlers do not block.
11 final class AsyncTCPConnection
12 {
13 package:
14 	EventLoop m_evLoop;
15 
16 private:
17 	NetworkAddress m_peer;
18 
19 nothrow:
20 	fd_t m_socket;
21 	fd_t m_preInitializedSocket;
22 	bool m_noDelay;
23 	bool m_inbound;
24 public:
25 	///
26 	this(EventLoop evl, fd_t preInitializedSocket = fd_t.init)
27 	in { assert(evl !is null); }
28 	body {
29 		m_evLoop = evl;
30 		m_preInitializedSocket = preInitializedSocket;
31 	}
32 
33 	mixin DefStatus;
34 
35 	/// Returns false if the connection has gone.
36 	@property bool isConnected() const {
37 		return m_socket != fd_t.init;
38 	}
39 
40 	/// Returns true if this connection was accepted by an AsyncTCPListener instance.
41 	@property bool inbound() const {
42 		return m_inbound;
43 	}
44 
45 	/// Disables(true)/enables(false) nagle's algorithm (default:enabled).
46 	@property void noDelay(bool b)
47 	{
48 		if (m_socket == fd_t.init)
49 			m_noDelay = b;
50 		else
51 			setOption(TCPOption.NODELAY, true);
52 	}
53 
54 	/// Changes the default OS configurations for this underlying TCP Socket.
55 	bool setOption(T)(TCPOption op, in T val)
56 	in { assert(isConnected, "No socket to operate on"); }
57 	body {
58 		return m_evLoop.setOption(m_socket, op, val);
59 	}
60 
61 	/// Returns the OS-specific structure of the internet address
62 	/// of the remote network adapter
63 	@property NetworkAddress peer() const
64 	{
65 		return m_peer;
66 	}
67 
68 	/// Returns the OS-specific structure of the internet address
69 	/// for the local end of the connection.
70 	@property NetworkAddress local()
71 	in {
72 		assert(isConnected && m_peer != NetworkAddress.init, "Cannot get local address from a non-connected socket");
73 	}
74 	body {
75 		return m_evLoop.localAddr(m_socket, m_peer.ipv6);
76 	}
77 
78 	/// Sets the remote address as an OS-specific structure (only usable before connecting).
79 	@property void peer(NetworkAddress addr)
80 	in {
81 		assert(!isConnected, "Cannot change remote address on a connected socket");
82 		assert(addr != NetworkAddress.init);
83 	}
84 	body {
85 		m_peer = addr;
86 	}
87 
88 	/// (Blocking) Resolves the specified host and resets the peer to this address.
89 	/// Use AsyncDNS for a non-blocking resolver. (only usable before connecting).
90 	typeof(this) host(string hostname, size_t port)
91 	in {
92 		assert(!isConnected, "Cannot change remote address on a connected socket");
93 	}
94 	body {
95 		m_peer = m_evLoop.resolveHost(hostname, cast(ushort) port);
96 		return this;
97 	}
98 
99 	/// Sets the peer to the specified IP address and port. (only usable before connecting).
100 	typeof(this) ip(string ip, size_t port)
101 	in {
102 		assert(!isConnected, "Cannot change remote address on a connected socket");
103 	}
104 	body {
105 		m_peer = m_evLoop.resolveIP(ip, cast(ushort) port);
106 		return this;
107 	}
108 
109 	/// Starts the connection by registering the associated callback handler in the
110 	/// underlying OS event loop.
111 	bool run(void delegate(TCPEvent) del) {
112 		TCPEventHandler handler;
113 		handler.del = del;
114 		handler.conn = this;
115 		return run(handler);
116 	}
117 
118 	///
119 	bool run(TCPEventHandler del)
120 	in { assert(!isConnected); }
121 	body {
122 		m_socket = m_evLoop.run(this, del);
123 		if (m_socket == 0)
124 			return false;
125 		else
126 			return true;
127 	}
128 
129 	/// Receive data from the underlying stream. To be used when TCPEvent.READ is received by the
130 	/// callback handler. IMPORTANT: This must be called until is returns a lower value than the buffer!
131 	final pragma(inline, true)
132 	uint recv(ref ubyte[] ub)
133 	//in { assert(isConnected, "No socket to operate on"); }
134 	//body
135 	{
136 		return m_evLoop.recv(m_socket, ub);
137 	}
138 
139 	/// Send data through the underlying stream by moving it into the OS buffer.
140 	final pragma(inline, true)
141 	uint send(in ubyte[] ub)
142 	//in { assert(isConnected, "No socket to operate on"); }
143 	//body
144 	{
145 		uint ret = m_evLoop.send(m_socket, ub);
146 		version(Posix)
147 			if (m_evLoop.status.code == Status.ASYNC)
148 				this.writeBlocked = true;
149 		return ret;
150 	}
151 
152 	/// Removes the connection from the event loop, closing it if necessary, and
153 	/// cleans up the underlying resources.
154 	bool kill(bool forced = false)
155 	in { assert(isConnected); }
156 	body {
157 		bool ret = m_evLoop.kill(this, forced);
158 		scope(exit) m_socket = 0;
159 		return ret;
160 	}
161 
162 	@property fd_t socket() const {
163 		return m_socket;
164 	}
165 
166 package:
167 	mixin COSocketMixins;
168 
169 	@property void inbound(bool b) {
170 		m_inbound = b;
171 	}
172 
173 	@property bool noDelay() const
174 	{
175 		return m_noDelay;
176 	}
177 
178 	@property void socket(fd_t sock) {
179 		m_socket = sock;
180 	}
181 
182 	@property fd_t preInitializedSocket() const {
183 		return m_preInitializedSocket;
184 	}
185 }
186 
187 /// Accepts connections on a single IP:PORT tuple by sending a new inbound AsyncTCPConnection
188 /// object to the handler for every newly completed handshake.
189 ///
190 /// Note: If multiple threads are listening to the same IP:PORT tuple, the connections will
191 /// be distributed evenly between them. However, this behavior on Windows is not implemented yet.
192 final class AsyncTCPListener
193 {
194 private:
195 nothrow:
196 	EventLoop m_evLoop;
197 	fd_t m_socket;
198 	NetworkAddress m_local;
199 	bool m_noDelay;
200 	bool m_started;
201 
202 public:
203 
204 	///
205 	this(EventLoop evl, fd_t sock = fd_t.init) { m_evLoop = evl; m_socket = sock; }
206 
207 	mixin DefStatus;
208 
209 	/// Sets the default value for nagle's algorithm on new connections.
210 	@property void noDelay(bool b)
211 	in { assert(!m_started, "Cannot set noDelay on a running object."); }
212 	body {
213 		m_noDelay = b;
214 	}
215 
216 	/// Returns the local internet address as an OS-specific structure.
217 	@property NetworkAddress local() const
218 	{
219 		return m_local;
220 	}
221 
222 	/// Sets the local internet address as an OS-specific structure.
223 	@property void local(NetworkAddress addr)
224 	in { assert(!m_started, "Cannot rebind a listening socket"); }
225 	body {
226 		m_local = addr;
227 	}
228 
229 	/// Sets the local listening interface to the specified hostname/port.
230 	typeof(this) host(string hostname, size_t port)
231 	in { assert(!m_started, "Cannot rebind a listening socket"); }
232 	body {
233 		m_local = m_evLoop.resolveHost(hostname, cast(ushort) port);
234 		return this;
235 	}
236 
237 	/// Sets the local listening interface to the specified ip/port.
238 	typeof(this) ip(string ip, size_t port)
239 	in { assert(!m_started, "Cannot rebind a listening socket"); }
240 	body {
241 		m_local = m_evLoop.resolveIP(ip, cast(ushort) port);
242 		return this;
243 	}
244 
245 	/// Starts accepting connections by registering the given handler with the underlying OS event.
246 	bool run(void delegate(TCPEvent) delegate(AsyncTCPConnection) del) {
247 		TCPAcceptHandler handler;
248 		handler.ctxt = this;
249 		handler.del = del;
250 		return run(handler);
251 	}
252 
253 	private bool run(TCPAcceptHandler del)
254 	in {
255 		assert(m_local != NetworkAddress.init, "Cannot bind without an address. Please run .host() or .ip()");
256 	}
257 	body {
258 		m_socket = m_evLoop.run(this, del);
259 		if (m_socket == fd_t.init)
260 			return false;
261 		else {
262 			if (m_local.port == 0)
263 				m_local = m_evLoop.localAddr(m_socket, m_local.ipv6);
264 			m_started = true;
265 			return true;
266 		}
267 	}
268 
269 	/// Use to implement distributed servicing of connections
270 	@property fd_t socket() const {
271 		return m_socket;
272 	}
273 
274 	/// Stops accepting connections and cleans up the underlying OS resources.
275 	bool kill()
276 	in { assert(m_socket != 0); }
277 	body {
278 		bool ret = m_evLoop.kill(this);
279 		if (ret)
280 			m_started = false;
281 		return ret;
282 	}
283 
284 package:
285 	version(Posix) mixin EvInfoMixins;
286 	version(Distributed) version(Windows) mixin TCPListenerDistMixins;
287 	@property bool noDelay() const
288 	{
289 		return m_noDelay;
290 	}
291 }
292 
293 package struct TCPEventHandler {
294 	AsyncTCPConnection conn;
295 
296 	/// Use getContext/setContext to persist the context in each activity. Using AsyncTCPConnection in args
297 	/// allows the EventLoop implementation to create and pass a new object, which is necessary for listeners.
298 	void delegate(TCPEvent) del;
299 
300 	void opCall(TCPEvent ev){
301 		if (conn is null || !conn.isConnected) return; //, "Connection was disposed before shutdown could be completed");
302 		del(ev);
303 		return;
304 	}
305 }
306 
307 package struct TCPAcceptHandler {
308 	AsyncTCPListener ctxt;
309 	void delegate(TCPEvent) delegate(AsyncTCPConnection) del;
310 
311 	TCPEventHandler opCall(AsyncTCPConnection conn){ // conn is null = error!
312 		assert(ctxt !is null);
313 
314 		void delegate(TCPEvent) ev_handler = del(conn);
315 		TCPEventHandler handler;
316 		handler.del = ev_handler;
317 		handler.conn = conn;
318 		return handler;
319 	}
320 }
321 
322 ///
323 enum TCPEvent : char {
324 	ERROR = 0, /// The connection will be forcefully closed, this is debugging information
325 	CONNECT, /// indicates write will not block, although recv may or may not have data
326 	READ, /// called once when new bytes are in the buffer
327 	WRITE, /// only called when send returned Status.ASYNC
328 	CLOSE /// The connection is being shutdown
329 }
330 
331 ///
332 enum TCPOption : char {
333 	NODELAY = 0,		/// Don't delay send to coalesce packets
334 	REUSEADDR = 1, ///
335 	REUSEPORT, ///
336 	CORK, ///
337 	LINGER, ///
338 	BUFFER_RECV, ///
339 	BUFFER_SEND, ///
340 	TIMEOUT_RECV, ///
341 	TIMEOUT_SEND, ///
342 	TIMEOUT_HALFOPEN, ///
343 	KEEPALIVE_ENABLE, ///
344 	KEEPALIVE_DEFER,	/// Start keeplives after this period
345 	KEEPALIVE_COUNT,	/// Number of keepalives before death
346 	KEEPALIVE_INTERVAL,	/// Interval between keepalives
347 	DEFER_ACCEPT, ///
348 	QUICK_ACK,			/// Bock/reenable quick ACKs.
349 	CONGESTION ///
350 }