1 module libasync.file;
2 import libasync.types;
3 import libasync.events;
4 import core.thread : Thread, ThreadGroup;
5 import core.sync.mutex;
6 import core.sync.condition;
7 import std.stdio;
8 import core.atomic;
9 import libasync.internals.path;
10 import libasync.threads;
11 import std.file;
12 import libasync.internals.memory;
13 
14 /// Runs all blocking file I/O commands in a thread pool and calls the handler
15 /// upon completion.
16 shared final class AsyncFile
17 {
18 nothrow:
19 private:
20 	EventLoop m_evLoop;
21 	bool m_busy;
22 	bool m_error;
23 	FileReadyHandler m_handler;
24 	FileCmdInfo m_cmdInfo;
25 	StatusInfo m_status;
26 	ulong m_cursorOffset;
27 	Thread m_owner;
28 	File* m_file;
29 
30 public:
31 	this(EventLoop evl) {
32 		m_evLoop = cast(shared) evl;
33 		m_cmdInfo.ready = new shared AsyncSignal(cast(EventLoop)m_evLoop);
34 		m_cmdInfo.ready.run(cast(void delegate())&handler);
35 		m_owner = cast(shared)Thread.getThis();
36 		m_file = cast(shared)new File;
37 		try m_cmdInfo.mtx = cast(shared) new Mutex; catch {}
38 	}
39 
40 	/// Cleans up the underlying resources. todo: make this dispose?
41 	bool kill() {
42 		scope(failure) assert(false);
43 		if (file.isOpen)
44 			(cast()*m_file).close();
45 		(cast()*m_file).__dtor();
46 		m_cmdInfo.ready.kill();
47 		m_cmdInfo = typeof(m_cmdInfo).init;
48 		m_handler = typeof(m_handler).init;
49 		return true;
50 	}
51 
52 	synchronized @property StatusInfo status() const
53 	{
54 		return cast(StatusInfo) m_status;
55 	}
56 
57 	@property string error() const
58 	{
59 		return status.text;
60 	}
61 
62 	/// Retrieve the buffer from the last command. Must be called upon completion.
63 	shared(ubyte[]) buffer() {
64 		try synchronized(m_cmdInfo.mtx)
65 			return m_cmdInfo.buffer;
66 		catch {}
67 		return null;
68 	}
69 
70 	/// The current offset updated after the command execution
71 	synchronized @property ulong offset() const {
72 		return m_cursorOffset;
73 	}
74 
75 	/// Sets the handler called by the owner thread's event loop after the command is completed.
76 	shared(typeof(this)) onReady(void delegate() del) {
77 		shared FileReadyHandler handler;
78 		handler.del = del;
79 		handler.ctxt = this;
80 		try synchronized(this) m_handler = handler; catch {}
81 		return this;
82 	}
83 
84 	/// Creates a new buffer with the specified length and uses it to read the
85 	/// file data at the specified path starting at the specified offset byte.
86 	bool read(string file_path, size_t len = 128, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) {
87 		return read(file_path, new shared ubyte[len], off, create_if_not_exists, truncate_if_exists);
88 	}
89 
90 	/// Reads the file into the buffer starting at offset byte position.
91 	bool read(string file_path, shared ubyte[] buffer, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) 
92 	in { 
93 		assert(!m_busy, "File is busy or closed");
94 		assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on.");
95 	}
96 	body {
97 		if (buffer.length == 0) {
98 			try m_handler(); catch { }
99 			return true;
100 		}
101 		try {
102 			static string last_path;
103 			static string last_native_path;
104 			if (last_path == file_path)
105 				file_path = last_native_path;
106 			else {
107 				last_path = file_path;
108 				file_path = Path(file_path).toNativeString();
109 				last_native_path = file_path;
110 			}
111 
112 			bool flag;
113 			if (create_if_not_exists && !m_file && !exists(file_path))
114 				flag = true;
115 			else if (truncate_if_exists && (m_file || exists(file_path)))
116 				flag = true;
117 			if (flag) // touch
118 			{	
119 				if (file.isOpen) 
120 					file.close();
121 				import std.c.stdio;
122 				import std.string : toStringz;
123 				FILE * f = fopen(file_path.toStringz, "w\0".ptr);
124 				fclose(f);
125 			}
126 			
127 			if (!file.isOpen || m_cmdInfo.command != FileCmd.READ) {
128 				auto tmp = File(file_path, "rb");
129 				file = tmp;
130 				m_cmdInfo.command = FileCmd.READ;
131 			}
132 			if (buffer.length <= 65_536) {
133 				m_cmdInfo.buffer = cast(shared(ubyte[])) buffer;
134 
135 				if (off != -1)
136 					file.seek(cast(long)off);
137 				ubyte[] res;
138 				res = file.rawRead(cast(ubyte[])buffer);
139 				if (res)
140 					m_cursorOffset = cast(shared(ulong)) (off + res.length);
141 				m_handler();
142 				return true;
143 			}
144 		} catch (Exception e) {
145 			auto status = StatusInfo.init;
146 			status.code = Status.ERROR;
147 			try status.text = "Error in read, " ~ e.toString(); catch {}
148 			m_status = cast(shared) status;
149 			try m_handler(); catch { }
150 			return false;
151 		}
152 		try synchronized(m_cmdInfo.mtx) { 
153 			m_cmdInfo.buffer = buffer;
154 			m_cmdInfo.command = FileCmd.READ;
155 			filePath = Path(file_path);
156 		} catch {}
157 		offset = off;
158 		return sendCommand();
159 	}
160 
161 	/// Writes the data from the buffer into the file at the specified path starting at the
162 	/// given offset byte position.
163 	bool write(string file_path, shared const(ubyte)[] buffer, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) 
164 	in { 
165 		assert(!m_busy, "File is busy or closed"); 
166 		assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on.");
167 	}
168 	body {
169 		if (buffer.length == 0) {
170 			try m_handler(); catch { return false; }
171 			return true;
172 		}
173 		try {
174 
175 			static string last_path;
176 			static string last_native_path;
177 			if (last_path == file_path)
178 				file_path = last_native_path;
179 			else {
180 				last_path = file_path;
181 				file_path = Path(file_path).toNativeString();
182 				last_native_path = file_path;
183 			}
184 
185 			bool flag;
186 			if (create_if_not_exists && !m_file && !exists(file_path))
187 				flag = true;
188 			else if (truncate_if_exists && (m_file || exists(file_path)))
189 				flag = true;
190 			if (flag) // touch
191 			{	
192 				if (file.isOpen) 
193 					file.close();
194 				import std.c.stdio;
195 				import std.string : toStringz;
196 				FILE * f = fopen(file_path.toStringz, "w\0".ptr);
197 				fclose(f);
198 			}
199 			
200 			if (!file.isOpen || m_cmdInfo.command != FileCmd.WRITE) {
201 				auto tmp = File(file_path, "r+b");
202 				file = tmp;
203 				m_cmdInfo.command = FileCmd.WRITE;
204 			}
205 
206 			if (buffer.length <= 65_536) {
207 				m_cmdInfo.buffer = cast(shared(ubyte[])) buffer;
208 				if (off != -1)
209 					file.seek(cast(long)off);
210 				file.rawWrite(cast(ubyte[])buffer);
211 				file.flush();
212 				m_cursorOffset = cast(shared(ulong)) (off + buffer.length);
213 				m_handler();
214 				return true;
215 			}
216 		} catch (Exception e) {
217 			auto status = StatusInfo.init;
218 			status.code = Status.ERROR;
219 			try status.text = "Error in write, " ~ e.toString(); catch {}
220 			m_status = cast(shared) status;
221 			return false;
222 		}
223 		try synchronized(m_cmdInfo.mtx) { 
224 			m_cmdInfo.buffer = cast(shared(ubyte[])) buffer;
225 			m_cmdInfo.command = FileCmd.WRITE;
226 			filePath = Path(file_path);
227 		} catch {}
228 		offset = off;
229 		return sendCommand();
230 
231 	}
232 
233 	/// Appends the data from the buffer into a file at the specified path.
234 	bool append(string file_path, shared ubyte[] buffer, bool create_if_not_exists = true, bool truncate_if_exists = false)
235 	in {
236 		assert(!m_busy, "File is busy or closed");
237 		assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on.");
238 	}
239 	body {
240 		if (buffer.length == 0) {
241 			try m_handler(); catch { return false; }
242 			return true;
243 		}
244 		try {
245 			static string last_path;
246 			static string last_native_path;
247 			if (last_path == file_path)
248 				file_path = last_native_path;
249 			else {
250 				last_path = file_path;
251 				file_path = Path(file_path).toNativeString();
252 				last_native_path = file_path;
253 			}
254 
255 			bool flag;
256 			if (create_if_not_exists && !m_file && !exists(file_path))
257 				flag = true;
258 			else if (truncate_if_exists && (m_file || exists(file_path)))
259 				flag = true;
260 			if (flag) // touch
261 			{	
262 				if (file.isOpen) 
263 					file.close();
264 				import std.c.stdio;
265 				import std.string : toStringz;
266 				FILE * f = fopen(file_path.toStringz, "w\0".ptr);
267 				fclose(f);
268 			}
269 			
270 			if (!file.isOpen || m_cmdInfo.command != FileCmd.APPEND) {
271 				auto tmp = File(file_path, "a+");
272 				file = tmp;
273 				m_cmdInfo.command = FileCmd.APPEND;
274 			}
275 			if (buffer.length < 65_536) {
276 				m_cmdInfo.buffer = cast(shared(ubyte[])) buffer;
277 				file.rawWrite(cast(ubyte[]) buffer);
278 				m_cursorOffset = cast(shared(ulong)) file.size();
279 				file.flush();
280 				m_handler();
281 				return true;
282 			}
283 		} catch (Exception e) {
284 			auto status = StatusInfo.init;
285 			status.code = Status.ERROR;
286 			try status.text = "Error in append, " ~ e.toString(); catch {}
287 			m_status = cast(shared) status;
288 			return false;
289 		}
290 		try synchronized(m_cmdInfo.mtx) { 
291 			m_cmdInfo.buffer = cast(shared(ubyte[])) buffer;
292 			m_cmdInfo.command = FileCmd.APPEND;
293 			filePath = Path(file_path);
294 		} catch {}
295 		return sendCommand();
296 	}
297 
298 	private bool sendCommand() 
299 	in { assert(!waiting, "File is busy or closed"); }
300 	body {
301 		waiting = true;
302 		m_error = false;
303 		status = StatusInfo.init;
304 
305 		Waiter cmd_handler;
306 
307 		try {
308 			cmd_handler = popWaiter();
309 		
310 		} catch (Throwable e) {
311 			import std.stdio;
312 			try {
313 				status = StatusInfo(Status.ERROR, e.toString());
314 				m_error = true;
315 			} catch {}
316 			
317 			return false;
318 		}
319 
320 		assert(cmd_handler.cond, "Could not lock a thread for async operations. Note: Async file I/O in static constructors is unsupported.");
321 		
322 		m_cmdInfo.waiter = cast(shared) cmd_handler;
323 		try {
324 			synchronized(gs_wlock)
325 				gs_jobs.insert(CommandInfo(CmdInfoType.FILE, cast(void*) this));
326 			cmd_handler.cond.notifyAll();
327 		}
328 		catch (Exception e){
329 			static if (DEBUG) {
330 				import std.stdio;
331 				try writeln("Exception occured notifying foreign thread: ", e); catch {}
332 			}
333 		}
334 		return true;
335 	}
336 package:
337 
338 	synchronized @property FileCmdInfo cmdInfo() {
339 		return m_cmdInfo;
340 	}
341 
342 	synchronized @property Path filePath() {
343 		return cast(Path) m_cmdInfo.filePath;
344 	}
345 
346 	synchronized @property bool waiting() const {
347 		return cast(bool) m_busy;
348 	}
349 
350 	synchronized @property void filePath(Path file_path) {
351 		m_cmdInfo.filePath = cast(shared) file_path;
352 	}
353 
354 	synchronized @property File file() {
355 		scope(failure) assert(false);
356 		return cast()*m_file;
357 	}
358 
359 	synchronized @property void file(ref File f) {
360 		try (cast()*m_file).opAssign(f);
361 		catch (Exception e) {
362 			static if (DEBUG) {
363 				import std.stdio : writeln;
364 				try writeln(e.msg); catch {}
365 			}
366 		}
367 	}
368 	
369 	synchronized @property void status(StatusInfo stat) {
370 		m_status = cast(shared) stat;
371 	}
372 	
373 	synchronized @property void offset(ulong val) {
374 		m_cursorOffset = cast(shared) val;
375 	}
376 
377 	synchronized @property void waiting(bool b) {
378 		m_busy = cast(shared) b;
379 	}
380 
381 	void handler() {
382 		try m_handler();
383 		catch (Throwable e) {
384 			static if (DEBUG) {
385 				import std.stdio : writeln;
386 				try writeln("Failed to send command. ", e.toString()); catch {}
387 			}
388 		}
389 	}
390 }
391 
392 package enum FileCmd {
393 	READ,
394 	WRITE,
395 	APPEND
396 }
397 
398 package shared struct FileCmdInfo
399 {
400 	FileCmd command;
401 	Path filePath;
402 	ubyte[] buffer;
403 	Waiter waiter;
404 	AsyncSignal ready;
405 	AsyncFile file;
406 	Mutex mtx; // for buffer writing
407 }
408 
409 package shared struct FileReadyHandler {
410 	AsyncFile ctxt;
411 	void delegate() del;
412 	
413 	void opCall() {
414 		assert(ctxt !is null);
415 		ctxt.waiting = false;
416 		del();
417 		return;
418 	}
419 }