1 module libasync.tcp;
2 import std.traits : isPointer;
3 import libasync.types;
4 import libasync.events;
5 import std.typecons : Tuple;
6 
7 /// Wraps a TCP stream between 2 network adapters, using a custom handler to
8 /// signal related events. Many of these objects can be active concurrently 
9 /// in a thread if the event loop is running and the handlers do not block.
final class AsyncTCPConnection
{
package:
	/// Event loop every operation of this connection is delegated to.
	EventLoop m_evLoop;

private:
	/// Remote address; assigned through peer(), host() or ip().
	NetworkAddress m_peer;

nothrow:
	/// Descriptor of the running connection; equals fd_t.init while not connected.
	fd_t m_socket;
	/// Descriptor handed to the constructor; only exposed through preInitializedSocket().
	fd_t m_preInitializedSocket;
	/// Last requested state of the NODELAY option.
	bool m_noDelay;
	/// Inbound flag, written through the package-level inbound(bool) setter.
	bool m_inbound;
	/// Debug-only flag: raised by TCPEventHandler before the user delegate runs and
	/// lowered by recv() once a read returns fewer bytes than the buffer holds.
	debug bool m_dataRemaining;
public:
	/// Creates a connection bound to the event loop `evl`.
	/// Params:
	///   evl = event loop that will drive this connection (must not be null)
	///   preInitializedSocket = optional existing descriptor, stored as-is
	this(EventLoop evl, fd_t preInitializedSocket = fd_t.init)
	in { assert(evl !is null); }
	body {
		m_evLoop = evl;
		m_preInitializedSocket = preInitializedSocket;
	}

	// NOTE(review): DefStatus is declared elsewhere; presumably it supplies the
	// status/error accessors for this object - confirm against its definition.
	mixin DefStatus;

	/// Returns false if the connection has gone.
	@property bool isConnected() const {
		return m_socket != fd_t.init;
	}

	/// Returns true if this connection was accepted by an AsyncTCPListener instance.
	@property bool inbound() const {
		return m_inbound;
	}

	/// Disables(true)/enables(false) nagle's algorithm (default:enabled).
	///
	/// Before the socket exists the value is only remembered. On a connected
	/// socket the requested value is forwarded to the OS through setOption().
	@property void noDelay(bool b)
	{
		if (m_socket == fd_t.init) {
			m_noDelay = b;
			return;
		}
		// Forward the caller's value. The previous code always passed `true`,
		// so `noDelay = false` on a live socket disabled Nagle instead of
		// re-enabling it.
		if (setOption(TCPOption.NODELAY, b))
			m_noDelay = b; // keep the cached flag in sync with the applied option
	}

	/// Changes the default OS configurations for this underlying TCP Socket.
	/// Params:
	///   op = option to change
	///   val = new value, passed through to the event loop unchanged
	/// Returns: the result reported by the event loop's setOption.
	bool setOption(T)(TCPOption op, in T val)
	in { assert(isConnected, "No socket to operate on"); }
	body {
		return m_evLoop.setOption(m_socket, op, val);
	}

	/// Returns the OS-specific structure of the internet address
	/// of the remote network adapter
	@property NetworkAddress peer() const
	{
		return m_peer;
	}

	/// Returns the OS-specific structure of the internet address
	/// for the local end of the connection.
	/// Requires a connected socket and a peer address that has been set.
	@property NetworkAddress local()
	in {
		assert(isConnected && m_peer != NetworkAddress.init, "Cannot get local address from a non-connected socket");
	}
	body {
		return m_evLoop.localAddr(m_socket, m_peer.ipv6);
	}

	/// Sets the remote address as an OS-specific structure (only usable before connecting).
	@property void peer(NetworkAddress addr)
	in {
		assert(!isConnected, "Cannot change remote address on a connected socket");
		assert(addr != NetworkAddress.init);
	}
	body {
		m_peer = addr;
	}

	/// (Blocking) Resolves the specified host and resets the peer to this address.
	/// Use AsyncDNS for a non-blocking resolver. (only usable before connecting).
	/// Note: `port` is narrowed to ushort before it is handed to the resolver.
	/// Returns: this object, to allow call chaining.
	typeof(this) host(string hostname, size_t port)
	in {
		assert(!isConnected, "Cannot change remote address on a connected socket");
	}
	body {
		m_peer = m_evLoop.resolveHost(hostname, cast(ushort) port);
		return this;
	}

	/// Sets the peer to the specified IP address and port. (only usable before connecting).
	/// Note: `port` is narrowed to ushort before it is handed to the resolver.
	/// Returns: this object, to allow call chaining.
	typeof(this) ip(string ip, size_t port)
	in {
		assert(!isConnected, "Cannot change remote address on a connected socket");
	}
	body {
		m_peer = m_evLoop.resolveIP(ip, cast(ushort) port);
		return this;
	}

	/// Starts the connection by registering the associated callback handler in the
	/// underlying OS event loop.
	/// Returns: true if the event loop handed back a valid socket.
	bool run(void delegate(TCPEvent) del) {
		TCPEventHandler handler;
		handler.del = del;
		handler.conn = this;
		return run(handler);
	}

	/// Same as above, for a handler that has already been assembled.
	bool run(TCPEventHandler del)
	in { assert(!isConnected); }
	body {
		m_socket = m_evLoop.run(this, del);
		// Same sentinel as isConnected, so both always agree.
		return m_socket != fd_t.init;
	}

	/// Receive data from the underlying stream. To be used when TCPEvent.READ is received by the
	/// callback handler. IMPORTANT: This must be called until it returns a lower value than the buffer!
	/// Returns: the byte count reported by the event loop.
	uint recv(ref ubyte[] ub)
	in { assert(isConnected, "No socket to operate on"); }
	body {
		uint cnt = m_evLoop.recv(m_socket, ub);
		debug {
			// A short read means the pending data was drained.
			if (ub.length > cnt)
				m_dataRemaining = false;
		}
		return cnt;
	}

	/// Send data through the underlying stream by moving it into the OS buffer.
	/// Returns: the byte count reported by the event loop.
	uint send(in ubyte[] ub)
	in { assert(isConnected, "No socket to operate on"); }
	body {
		// On Posix, once the send has returned, record that the write side is
		// blocked when the loop reports Status.ASYNC.
		// NOTE(review): writeBlocked is not declared in this class body; it
		// presumably comes from TCPConnectionMixins - confirm.
		version(Posix)
			scope(exit)
				if (m_evLoop.status.code == Status.ASYNC)
					this.writeBlocked = true;
		return m_evLoop.send(m_socket, ub);
	}

	/// Removes the connection from the event loop, closing it if necessary, and
	/// cleans up the underlying resources.
	/// The socket is forgotten afterwards, whatever the event loop reports.
	/// Returns: the result reported by the event loop's kill.
	bool kill(bool forced = false)
	in { assert(isConnected); }
	body {
		immutable bool killed = m_evLoop.kill(this, forced);
		// Reset to the same sentinel isConnected tests against.
		m_socket = fd_t.init;
		return killed;
	}

package:
	mixin TCPConnectionMixins;

	/// Sets the inbound flag.
	@property void inbound(bool b) {
		m_inbound = b;
	}

	/// Last stored NODELAY request.
	@property bool noDelay() const
	{
		return m_noDelay;
	}

	/// Current socket descriptor (fd_t.init when not connected).
	@property fd_t socket() const {
		return m_socket;
	}

	/// Replaces the socket descriptor.
	@property void socket(fd_t sock) {
		m_socket = sock;
	}

	/// Descriptor that was passed to the constructor.
	@property fd_t preInitializedSocket() const {
		return m_preInitializedSocket;
	}
}
186 
187 /// Accepts connections on a single IP:PORT tuple by sending a new inbound AsyncTCPConnection 
188 /// object to the handler for every newly completed handshake.
189 /// 
190 /// Note: If multiple threads are listening to the same IP:PORT tuple, the connections will
191 /// be distributed evenly between them. However, this behavior on Windows is not implemented yet.
final class AsyncTCPListener
{
private:
nothrow:
	/// Event loop every operation of this listener is delegated to.
	EventLoop m_evLoop;
	/// Listening descriptor: taken from the constructor, replaced by run().
	fd_t m_socket;
	/// Local address to bind to; assigned through local(), host() or ip().
	NetworkAddress m_local;
	/// Stored NODELAY default, exposed through the package-level getter.
	bool m_noDelay;
	/// True between a successful run() and a successful kill().
	bool m_started;

public:

	/// Creates a listener bound to the event loop `evl`.
	/// Params:
	///   evl = event loop that will drive this listener (must not be null,
	///         every later operation dereferences it)
	///   sock = optional existing descriptor, stored as-is
	this(EventLoop evl, fd_t sock = fd_t.init)
	in { assert(evl !is null); }
	body {
		m_evLoop = evl;
		m_socket = sock;
	}

	// NOTE(review): DefStatus is declared elsewhere; presumably it supplies the
	// status/error accessors for this object - confirm against its definition.
	mixin DefStatus;

	/// Sets the default value for nagle's algorithm on new connections.
	@property void noDelay(bool b)
	in { assert(!m_started, "Cannot set noDelay on a running object."); }
	body {
		m_noDelay = b;
	}

	/// Returns the local internet address as an OS-specific structure.
	@property NetworkAddress local() const
	{
		return m_local;
	}

	/// Sets the local internet address as an OS-specific structure.
	@property void local(NetworkAddress addr)
	in { assert(!m_started, "Cannot rebind a listening socket"); }
	body {
		m_local = addr;
	}

	/// Sets the local listening interface to the specified hostname/port.
	/// Note: `port` is narrowed to ushort before it is handed to the resolver.
	/// Returns: this object, to allow call chaining.
	typeof(this) host(string hostname, size_t port)
	in { assert(!m_started, "Cannot rebind a listening socket"); }
	body {
		m_local = m_evLoop.resolveHost(hostname, cast(ushort) port);
		return this;
	}

	/// Sets the local listening interface to the specified ip/port.
	/// Note: `port` is narrowed to ushort before it is handed to the resolver.
	/// Returns: this object, to allow call chaining.
	typeof(this) ip(string ip, size_t port)
	in { assert(!m_started, "Cannot rebind a listening socket"); }
	body {
		m_local = m_evLoop.resolveIP(ip, cast(ushort) port);
		return this;
	}

	/// Starts accepting connections by registering the given handler with the underlying OS event.
	/// `del` is called with each new connection and returns that connection's event delegate.
	/// Returns: true if the event loop handed back a valid socket.
	bool run(void delegate(TCPEvent) delegate(AsyncTCPConnection) del) {
		TCPAcceptHandler handler;
		handler.ctxt = this;
		handler.del = del;
		return run(handler);
	}

	/// Registers an assembled accept handler; requires a local address to be set.
	private bool run(TCPAcceptHandler del)
	in {
		assert(m_local != NetworkAddress.init, "Cannot bind without an address. Please run .host() or .ip()");
	}
	body {
		m_socket = m_evLoop.run(this, del);
		immutable bool listening = m_socket != fd_t.init;
		if (listening)
			m_started = true;
		return listening;
	}

	/// Use to implement distributed servicing of connections
	@property fd_t socket() const {
		return m_socket;
	}

	/// Stops accepting connections and cleans up the underlying OS resources.
	/// Returns: the result reported by the event loop's kill; the started flag
	/// is only cleared when that result is true.
	bool kill()
	in { assert(m_socket != fd_t.init); } // same sentinel run() checks against
	body {
		// NOTE(review): m_socket is not reset here - confirm whether it is
		// meant to stay valid after a successful kill.
		immutable bool stopped = m_evLoop.kill(this);
		if (stopped)
			m_started = false;
		return stopped;
	}

package:
	version(Posix) mixin EvInfoMixins;
	version(Windows) mixin TCPListenerDistMixins;

	/// Stored NODELAY default.
	@property bool noDelay() const
	{
		return m_noDelay;
	}
}
289 
package struct TCPEventHandler {
	/// Connection the events belong to. Events are dropped while this is null
	/// or while the connection reports itself as not connected.
	AsyncTCPConnection conn;

	/// User callback invoked for every delivered event.
	/// Use getContext/setContext to persist the context in each activity. Using AsyncTCPConnection in args
	/// allows the EventLoop implementation to create and pass a new object, which is necessary for listeners.
	void delegate(TCPEvent) del;

	/// Forwards `ev` to the user callback when the connection is still usable.
	void opCall(TCPEvent ev){
		// A null connection means it was disposed before shutdown completed.
		immutable bool deliverable = conn !is null && conn.isConnected;
		if (deliverable) {
			// Debug builds flag pending data before the callback runs;
			// AsyncTCPConnection.recv lowers the flag again on a short read.
			debug conn.m_dataRemaining = true;
			del(ev);
			// A debug assertion that the callback drained the buffer
			// ("You must recv the whole buffer, because TCP events are edge triggered!")
			// is intentionally not active here.
		}
	}
}
307 
package struct TCPAcceptHandler {
	/// Listener this handler was created for; must be set before opCall is used.
	AsyncTCPListener ctxt;
	/// User factory: receives the accepted connection and returns its event callback.
	void delegate(TCPEvent) delegate(AsyncTCPConnection) del;

	/// Builds the per-connection event handler for `conn` by asking the user
	/// factory for a callback. A null `conn` signals an error.
	TCPEventHandler opCall(AsyncTCPConnection conn){
		assert(ctxt !is null);

		// Fields are assigned one by one: TCPEventHandler declares opCall,
		// so it is not built with struct-literal syntax here.
		TCPEventHandler result;
		result.del = del(conn);
		result.conn = conn;
		return result;
	}
}
322 
/// Events delivered to a TCP connection's callback.
enum TCPEvent : char {
	/// The connection will be forcefully closed, this is debugging information
	ERROR = 0,
	/// Indicates write will not block, although recv may or may not have data
	CONNECT = 1,
	/// Called once when new bytes are in the buffer
	READ = 2,
	/// Only called when send returned Status.ASYNC
	WRITE = 3,
	/// The connection is being shutdown
	CLOSE = 4
}
330 
/// Socket options accepted by setOption. Members without a description are
/// interpreted by the event loop implementation, which is not part of this module.
enum TCPOption : char {
	/// Don't delay send to coalesce packets
	NODELAY = 0,
	REUSEADDR = 1,
	CORK = 2,
	LINGER = 3,
	BUFFER_RECV = 4,
	BUFFER_SEND = 5,
	TIMEOUT_RECV = 6,
	TIMEOUT_SEND = 7,
	TIMEOUT_HALFOPEN = 8,
	KEEPALIVE_ENABLE = 9,
	/// Start keepalives after this period
	KEEPALIVE_DEFER = 10,
	/// Number of keepalives before death
	KEEPALIVE_COUNT = 11,
	/// Interval between keepalives
	KEEPALIVE_INTERVAL = 12,
	DEFER_ACCEPT = 13,
	/// Block/re-enable quick ACKs.
	QUICK_ACK = 14,
	CONGESTION = 15
}