1 module libasync.tcp;
2 import std.traits : isPointer;
3 import libasync.types;
4 import libasync.events;
5 import std.typecons : Tuple;
6 
7 /// Wraps a TCP stream between 2 network adapters, using a custom handler to
8 /// signal related events. Many of these objects can be active concurrently 
9 /// in a thread if the event loop is running and the handlers do not block.
10 final class AsyncTCPConnection
11 {
12 package:
13 	EventLoop m_evLoop;
14 
15 private:
16 	NetworkAddress m_peer;
17 
18 nothrow:
19 	fd_t m_socket;
20 	bool m_noDelay;
21 	bool m_inbound;
22 	debug bool m_dataRemaining;
23 public:
24 	this(EventLoop evl)
25 	in { assert(evl !is null); }
26 	body { m_evLoop = evl; }
27 
28 	mixin DefStatus;
29 
30 	/// Returns false if the connection has gone.
31 	@property bool isConnected() const {
32 		return m_socket != fd_t.init;
33 	}
34 
35 	/// Returns true if this connection was accepted by an AsyncTCPListener instance.
36 	@property bool inbound() const {
37 		return m_inbound;
38 	}
39 
40 	/// Disables(true)/enables(false) nagle's algorithm (default:enabled). 
41 	@property void noDelay(bool b)
42 	{
43 		if (m_socket == fd_t.init)
44 			m_noDelay = b;
45 		else
46 			setOption(TCPOption.NODELAY, true);
47 	}
48 
49 	/// Changes the default OS configurations for this underlying TCP Socket.
50 	bool setOption(T)(TCPOption op, in T val) 
51 	in { assert(isConnected, "No socket to operate on"); }
52 	body {
53 		return m_evLoop.setOption(m_socket, op, val);
54 	}
55 
56 	/// Returns the OS-specific structure of the internet address 
57 	/// of the remote network adapter
58 	@property NetworkAddress peer() const 
59 	{
60 		return m_peer;
61 	}
62 
63 	/// Returns the OS-specific structure of the internet address 
64 	/// for the local end of the connection.
65 	@property NetworkAddress local()
66 	in {
67 		assert(isConnected && m_peer != NetworkAddress.init, "Cannot get local address from a non-connected socket");
68 	}
69 	body {
70 		return m_evLoop.localAddr(m_socket, m_peer.ipv6);
71 	}
72 
73 	/// Sets the remote address as an OS-specific structure (only usable before connecting).
74 	@property void peer(NetworkAddress addr)
75 	in { 
76 		assert(!isConnected, "Cannot change remote address on a connected socket"); 
77 		assert(addr != NetworkAddress.init);
78 	}
79 	body {
80 		m_peer = addr;
81 	}
82 
83 	/// (Blocking) Resolves the specified host and resets the peer to this address.
84 	/// Use AsyncDNS for a non-blocking resolver. (only usable before connecting).
85 	typeof(this) host(string hostname, size_t port)
86 	in { 
87 		assert(!isConnected, "Cannot change remote address on a connected socket"); 
88 	}
89 	body {
90 		m_peer = m_evLoop.resolveHost(hostname, cast(ushort) port);
91 		return this;
92 	}
93 
94 	/// Sets the peer to the specified IP address and port. (only usable before connecting).
95 	typeof(this) ip(string ip, size_t port)
96 	in { 
97 		assert(!isConnected, "Cannot change remote address on a connected socket"); 
98 	}
99 	body {
100 		m_peer = m_evLoop.resolveIP(ip, cast(ushort) port);
101 		return this;
102 	}
103 
104 	/// Starts the connection by registering the associated callback handler in the
105 	/// underlying OS event loop.
106 	bool run(void delegate(TCPEvent) del) {
107 		TCPEventHandler handler;
108 		handler.del = del;
109 		handler.conn = this;
110 		return run(handler);
111 	}
112 	
113 	private bool run(TCPEventHandler del)
114 	in { assert(!isConnected); }
115 	body {
116 		m_socket = m_evLoop.run(this, del);
117 		if (m_socket == 0)
118 			return false;
119 		else
120 			return true;
121 		
122 	}
123 
124 	/// Receive data from the underlying stream. To be used when TCPEvent.READ is received by the
125 	/// callback handler. IMPORTANT: This must be called until is returns a lower value than the buffer!
126 	uint recv(ref ubyte[] ub)
127 	in { assert(isConnected, "No socket to operate on"); }
128 	body {
129 		uint cnt = m_evLoop.recv(m_socket, ub);
130 		debug {
131 			if (ub.length > cnt)
132 			m_dataRemaining = false;
133 		}
134 		return cnt;
135 	}
136 
137 	/// Send data through the underlying stream by moving it into the OS buffer.
138 	uint send(in ubyte[] ub)
139 	in { assert(isConnected, "No socket to operate on"); }
140 	body {
141 		version(Posix)
142 			scope(exit)
143 				if (m_evLoop.status.code == Status.ASYNC)
144 					this.writeBlocked = true;
145 		return m_evLoop.send(m_socket, ub);
146 	}
147 
148 	/// Removes the connection from the event loop, closing it if necessary, and
149 	/// cleans up the underlying resources.
150 	bool kill(bool forced = false)
151 	in { assert(isConnected); }
152 	body {
153 		bool ret = m_evLoop.kill(this, forced);
154 		scope(exit) m_socket = 0;
155 		return ret;
156 	}
157 
158 package:
159 	mixin TCPConnectionMixins;
160 
161 	@property void inbound(bool b) {
162 		m_inbound = b;
163 	}
164 
165 	@property bool noDelay() const
166 	{
167 		return m_noDelay;
168 	}
169 
170 	@property fd_t socket() const {
171 		return m_socket;
172 	}
173 
174 	@property void socket(fd_t sock) {
175 		m_socket = sock;
176 	}
177 
178 }
179 
180 /// Accepts connections on a single IP:PORT tuple by sending a new inbound AsyncTCPConnection 
181 /// object to the handler for every newly completed handshake.
182 /// 
183 /// Note: If multiple threads are listening to the same IP:PORT tuple, the connections will
184 /// be distributed evenly between them. However, this behavior on Windows is not implemented yet.
185 final class AsyncTCPListener
186 {
187 private:
188 nothrow:
189 	EventLoop m_evLoop;
190 	fd_t m_socket;
191 	NetworkAddress m_local;
192 	bool m_noDelay;
193 	bool m_started;
194 
195 public:
196 
197 	this(EventLoop evl, fd_t sock = fd_t.init) { m_evLoop = evl; m_socket = sock; }
198 
199 	mixin DefStatus;
200 
201 	/// Sets the default value for nagle's algorithm on new connections.
202 	@property void noDelay(bool b)
203 	in { assert(!m_started, "Cannot set noDelay on a running object."); }
204 	body {
205 		m_noDelay = b;
206 	}
207 
208 	/// Returns the local internet address as an OS-specific structure.
209 	@property NetworkAddress local() const
210 	{
211 		return m_local;
212 	}
213 
214 	/// Sets the local internet address as an OS-specific structure.
215 	@property void local(NetworkAddress addr)
216 	in { assert(!m_started, "Cannot rebind a listening socket"); }
217 	body {
218 		m_local = addr;
219 	}
220 
221 	/// Sets the local listening interface to the specified hostname/port.
222 	typeof(this) host(string hostname, size_t port)
223 	in { assert(!m_started, "Cannot rebind a listening socket"); }
224 	body {
225 		m_local = m_evLoop.resolveHost(hostname, cast(ushort) port);
226 		return this;
227 	}
228 
229 	/// Sets the local listening interface to the specified ip/port.
230 	typeof(this) ip(string ip, size_t port)
231 	in { assert(!m_started, "Cannot rebind a listening socket"); }
232 	body {
233 		m_local = m_evLoop.resolveIP(ip, cast(ushort) port);
234 		return this;
235 	}
236 
237 	/// Starts accepting connections by registering the given handler with the underlying OS event.
238 	bool run(void delegate(TCPEvent) delegate(AsyncTCPConnection) del) {
239 		TCPAcceptHandler handler;
240 		handler.ctxt = this;
241 		handler.del = del;
242 		return run(handler);
243 	}
244 
245 	private bool run(TCPAcceptHandler del)
246 	in { 
247 		assert(m_local != NetworkAddress.init, "Cannot bind without an address. Please run .host() or .ip()");
248 	}
249 	body {
250 		m_socket = m_evLoop.run(this, del);
251 		if (m_socket == fd_t.init)
252 			return false;
253 		else {
254 			m_started = true;
255 			return true;
256 		}
257 	}
258 
259 	/// Use to implement distributed servicing of connections
260 	@property fd_t socket() const {
261 		return m_socket;
262 	}
263 
264 	/// Stops accepting connections and cleans up the underlying OS resources.
265 	bool kill()
266 	in { assert(m_socket != 0); }
267 	body {
268 		bool ret = m_evLoop.kill(this);
269 		if (ret)
270 			m_started = false;
271 		return ret;
272 	}
273 
274 package:
275 	version(Posix) mixin EvInfoMixins;
276 	version(Windows) mixin TCPListenerDistMixins;
277 	@property bool noDelay() const
278 	{
279 		return m_noDelay;
280 	}
281 }
282 
283 package struct TCPEventHandler {
284 	AsyncTCPConnection conn;
285 
286 	/// Use getContext/setContext to persist the context in each activity. Using AsyncTCPConnection in args 
287 	/// allows the EventLoop implementation to create and pass a new object, which is necessary for listeners.
288 	void delegate(TCPEvent) del;
289 
290 	void opCall(TCPEvent ev){
291 		if (conn is null) return; //, "Connection was disposed before shutdown could be completed");
292 		if (!conn.isConnected)
293 			return;
294 		debug conn.m_dataRemaining = true;
295 		del(ev);
296 		//debug assert(!conn.m_dataRemaining, "You must recv the whole buffer, because TCP events are edge triggered!");
297 		return;
298 	}
299 }
300 
301 package struct TCPAcceptHandler {
302 	AsyncTCPListener ctxt;
303 	void delegate(TCPEvent) delegate(AsyncTCPConnection) del;
304 
305 	TCPEventHandler opCall(AsyncTCPConnection conn){ // conn is null = error!
306 		assert(ctxt !is null);
307 
308 		void delegate(TCPEvent) ev_handler = del(conn);
309 		TCPEventHandler handler;
310 		handler.del = ev_handler;
311 		handler.conn = conn;
312 		return handler;
313 	}
314 }
315 
316 enum TCPEvent : char {
317 	ERROR = 0, // The connection will be forcefully closed, this is debugging information
318 	CONNECT, // indicates write will not block, although recv may or may not have data
319 	READ, // called once when new bytes are in the buffer
320 	WRITE, // only called when send returned Status.ASYNC
321 	CLOSE // The connection is being shutdown
322 }
323 
324 enum TCPOption : char {
325 	NODELAY = 0,		// Don't delay send to coalesce packets
326 	REUSEADDR = 1,
327 	CORK,
328 	LINGER,
329 	BUFFER_RECV,
330 	BUFFER_SEND,
331 	TIMEOUT_RECV,
332 	TIMEOUT_SEND,
333 	TIMEOUT_HALFOPEN,
334 	KEEPALIVE_ENABLE,
335 	KEEPALIVE_DEFER,	// Start keeplives after this period
336 	KEEPALIVE_COUNT,	// Number of keepalives before death
337 	KEEPALIVE_INTERVAL,	// Interval between keepalives
338 	DEFER_ACCEPT,
339 	QUICK_ACK,			// Bock/reenable quick ACKs.
340 	CONGESTION
341 }