1 ///
2 module libasync.file;
3 import libasync.types;
4 import libasync.events;
5 import core.thread : Thread, ThreadGroup;
6 import core.sync.mutex;
7 import core.sync.condition;
8 import std.stdio;
9 import core.atomic;
10 import libasync.internals.path;
11 import libasync.threads;
12 import std.file;
13 import std.conv : to;
14 import libasync.internals.memory;
15 import libasync.internals.logging;
16 
17 /// Runs all blocking file I/O commands in a thread pool and calls the handler
18 /// upon completion.
19 shared final class AsyncFile
20 {
21 nothrow:
22 private:
23 	EventLoop m_evLoop;
24 	bool m_busy;
25 	bool m_error;
26 	FileReadyHandler m_handler;
27 	FileCmdInfo m_cmdInfo;
28 	StatusInfo m_status;
29 	ulong m_cursorOffset;
30 	Thread m_owner;
31 	File* m_file;
32 
33 public:
34 	///
35 	this(EventLoop evl) {
36 		m_evLoop = cast(shared) evl;
37 		m_cmdInfo.ready = new shared AsyncSignal(cast(EventLoop)m_evLoop);
38 		m_cmdInfo.ready.run(cast(void delegate())&handler);
39 		m_owner = cast(shared)Thread.getThis();
40 		m_file = cast(shared)new File;
41 		try m_cmdInfo.mtx = cast(shared) new Mutex; catch (Exception) {}
42 	}
43 
44 	/// Cleans up the underlying resources. todo: make this dispose?
45 	bool kill() {
46 		scope(failure) assert(false);
47 		if (file.isOpen)
48 			(cast()*m_file).close();
49 		(cast()*m_file).__dtor();
50 		m_cmdInfo.ready.kill();
51 		m_cmdInfo = typeof(m_cmdInfo).init;
52 		m_handler = typeof(m_handler).init;
53 		return true;
54 	}
55 
56 	///
57 	synchronized @property StatusInfo status() const
58 	{
59 		return cast(StatusInfo) m_status;
60 	}
61 
62 	///
63 	@property string error() const
64 	{
65 		return status.text;
66 	}
67 
68 	/// Retrieve the buffer from the last command. Must be called upon completion.
69 	shared(ubyte[]) buffer() {
70 		try synchronized(m_cmdInfo.mtx)
71 			return m_cmdInfo.buffer;
72 		catch (Exception) {}
73 		return null;
74 	}
75 
76 	/// The current offset updated after the command execution
77 	synchronized @property ulong offset() const {
78 		return m_cursorOffset;
79 	}
80 
81 	/// Sets the handler called by the owner thread's event loop after the command is completed.
82 	shared(typeof(this)) onReady(void delegate() del) {
83 		shared FileReadyHandler handler;
84 		handler.del = del;
85 		handler.ctxt = this;
86 		try synchronized(this) m_handler = handler; catch (Exception) {}
87 		return this;
88 	}
89 
90 	/// Creates a new buffer with the specified length and uses it to read the
91 	/// file data at the specified path starting at the specified offset byte.
92 	bool read(string file_path, size_t len = 128, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) {
93 		return read(file_path, new shared ubyte[len], off, create_if_not_exists, truncate_if_exists);
94 	}
95 
96 	/// Reads the file into the buffer starting at offset byte position.
97 	bool read(string file_path, shared ubyte[] buffer, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false)
98 	in {
99 		assert(!m_busy, "File is busy or closed");
100 		assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on.");
101 	}
102 	body {
103 		if (buffer.length == 0) {
104 			try m_handler(); catch (Exception) { }
105 			return true;
106 		}
107 		try {
108 			static string last_path;
109 			static string last_native_path;
110 			if (last_path == file_path)
111 				file_path = last_native_path;
112 			else {
113 				last_path = file_path;
114 				file_path = Path(file_path).toNativeString();
115 				last_native_path = file_path;
116 			}
117 
118 			bool flag;
119 			if (create_if_not_exists && !m_file && !exists(file_path))
120 				flag = true;
121 			else if (truncate_if_exists && (m_file || exists(file_path)))
122 				flag = true;
123 			if (flag) // touch
124 			{
125 				if (file.isOpen)
126 					file.close();
127 				import core.stdc.stdio;
128 				import std..string : toStringz;
129 				FILE * f = fopen(file_path.toStringz, "w\0".ptr);
130 				fclose(f);
131 			}
132 
133 			if (!file.isOpen || m_cmdInfo.command != FileCmd.READ) {
134 				auto tmp = File(file_path, "rb");
135 				file = tmp;
136 				m_cmdInfo.command = FileCmd.READ;
137 			}
138 			if (buffer.length <= 65_536) {
139 				m_cmdInfo.buffer = cast(shared(ubyte[])) buffer;
140 
141 				if (off != -1)
142 					file.seek(cast(long)off);
143 				ubyte[] res;
144 				res = file.rawRead(cast(ubyte[])buffer);
145 				if (res)
146 					m_cursorOffset = cast(shared(ulong)) (off + res.length);
147 				m_handler();
148 				return true;
149 			}
150 		} catch (Exception e) {
151 			auto status = StatusInfo.init;
152 			status.code = Status.ERROR;
153 			try status.text = "Error in read, " ~ e.toString(); catch (Exception) {}
154 			m_status = cast(shared) status;
155 			try m_handler(); catch (Exception) { }
156 			return false;
157 		}
158 		try synchronized(m_cmdInfo.mtx) {
159 			m_cmdInfo.buffer = buffer;
160 			m_cmdInfo.command = FileCmd.READ;
161 			filePath = Path(file_path);
162 		} catch (Exception) {}
163 		offset = off;
164 
165 		return doOffThread({ process(this); });
166 	}
167 
168 	/// Writes the data from the buffer into the file at the specified path starting at the
169 	/// given offset byte position.
170 	bool write(string file_path, shared const(ubyte)[] buffer, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false)
171 	in {
172 		assert(!m_busy, "File is busy or closed");
173 		assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on.");
174 	}
175 	body {
176 		if (buffer.length == 0) {
177 			try m_handler(); catch (Exception) { return false; }
178 			return true;
179 		}
180 		try {
181 
182 			static string last_path;
183 			static string last_native_path;
184 			if (last_path == file_path)
185 				file_path = last_native_path;
186 			else {
187 				last_path = file_path;
188 				file_path = Path(file_path).toNativeString();
189 				last_native_path = file_path;
190 			}
191 
192 			bool flag;
193 			if (create_if_not_exists && !m_file && !exists(file_path))
194 				flag = true;
195 			else if (truncate_if_exists && (m_file || exists(file_path)))
196 				flag = true;
197 			if (flag) // touch
198 			{
199 				if (file.isOpen)
200 					file.close();
201 				import core.stdc.stdio;
202 				import std..string : toStringz;
203 				FILE * f = fopen(file_path.toStringz, "w\0".ptr);
204 				fclose(f);
205 			}
206 
207 			if (!file.isOpen || m_cmdInfo.command != FileCmd.WRITE) {
208 				auto tmp = File(file_path, "r+b");
209 				file = tmp;
210 				m_cmdInfo.command = FileCmd.WRITE;
211 			}
212 
213 			if (buffer.length <= 65_536) {
214 				m_cmdInfo.buffer = cast(shared(ubyte[])) buffer;
215 				if (off != -1)
216 					file.seek(cast(long)off);
217 				file.rawWrite(cast(ubyte[])buffer);
218 				file.flush();
219 				m_cursorOffset = cast(shared(ulong)) (off + buffer.length);
220 				m_handler();
221 				return true;
222 			}
223 		} catch (Exception e) {
224 			auto status = StatusInfo.init;
225 			status.code = Status.ERROR;
226 			try status.text = "Error in write, " ~ e.toString(); catch (Exception) {}
227 			m_status = cast(shared) status;
228 			return false;
229 		}
230 		try synchronized(m_cmdInfo.mtx) {
231 			m_cmdInfo.buffer = cast(shared(ubyte[])) buffer;
232 			m_cmdInfo.command = FileCmd.WRITE;
233 			filePath = Path(file_path);
234 		} catch (Exception) {}
235 		offset = off;
236 
237 		return doOffThread({ process(this); });
238 	}
239 
240 	/// Appends the data from the buffer into a file at the specified path.
241 	bool append(string file_path, shared ubyte[] buffer, bool create_if_not_exists = true, bool truncate_if_exists = false)
242 	in {
243 		assert(!m_busy, "File is busy or closed");
244 		assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on.");
245 	}
246 	body {
247 		if (buffer.length == 0) {
248 			try m_handler(); catch (Exception) { return false; }
249 			return true;
250 		}
251 		try {
252 			static string last_path;
253 			static string last_native_path;
254 			if (last_path == file_path)
255 				file_path = last_native_path;
256 			else {
257 				last_path = file_path;
258 				file_path = Path(file_path).toNativeString();
259 				last_native_path = file_path;
260 			}
261 
262 			bool flag;
263 			if (create_if_not_exists && !m_file && !exists(file_path))
264 				flag = true;
265 			else if (truncate_if_exists && (m_file || exists(file_path)))
266 				flag = true;
267 			if (flag) // touch
268 			{
269 				if (file.isOpen)
270 					file.close();
271 				import core.stdc.stdio;
272 				import std..string : toStringz;
273 				FILE * f = fopen(file_path.toStringz, "w\0".ptr);
274 				fclose(f);
275 			}
276 
277 			if (!file.isOpen || m_cmdInfo.command != FileCmd.APPEND) {
278 				auto tmp = File(file_path, "a+");
279 				file = tmp;
280 				m_cmdInfo.command = FileCmd.APPEND;
281 			}
282 			if (buffer.length < 65_536) {
283 				m_cmdInfo.buffer = cast(shared(ubyte[])) buffer;
284 				file.rawWrite(cast(ubyte[]) buffer);
285 				m_cursorOffset = cast(shared(ulong)) file.size();
286 				file.flush();
287 				m_handler();
288 				return true;
289 			}
290 		} catch (Exception e) {
291 			auto status = StatusInfo.init;
292 			status.code = Status.ERROR;
293 			try status.text = "Error in append, " ~ e.toString(); catch (Exception) {}
294 			m_status = cast(shared) status;
295 			return false;
296 		}
297 		try synchronized(m_cmdInfo.mtx) {
298 			m_cmdInfo.buffer = cast(shared(ubyte[])) buffer;
299 			m_cmdInfo.command = FileCmd.APPEND;
300 			filePath = Path(file_path);
301 		} catch (Exception) {}
302 
303 		return doOffThread({ process(this); });
304 	}
305 
306 package:
307 
308 	synchronized @property FileCmdInfo cmdInfo() {
309 		return m_cmdInfo;
310 	}
311 
312 	synchronized @property Path filePath() {
313 		return cast(Path) m_cmdInfo.filePath;
314 	}
315 
316 	synchronized @property bool waiting() const {
317 		return cast(bool) m_busy;
318 	}
319 
320 	synchronized @property void filePath(Path file_path) {
321 		m_cmdInfo.filePath = cast(shared) file_path;
322 	}
323 
324 	synchronized @property File file() {
325 		scope(failure) assert(false);
326 		return cast()*m_file;
327 	}
328 
329 	synchronized @property void file(ref File f) {
330 		try (cast()*m_file).opAssign(f);
331 		catch (Exception e) {
332 			trace(e.msg);
333 		}
334 	}
335 
336 	synchronized @property void status(StatusInfo stat) {
337 		m_status = cast(shared) stat;
338 	}
339 
340 	synchronized @property void offset(ulong val) {
341 		m_cursorOffset = cast(shared) val;
342 	}
343 
344 	synchronized @property void waiting(bool b) {
345 		m_busy = cast(shared) b;
346 	}
347 
348 	void handler() {
349 		try m_handler();
350 		catch (Throwable e) {
351 			trace("Failed to send command. ", e.toString());
352 		}
353 	}
354 }
355 
356 package enum FileCmd {
357 	READ,
358 	WRITE,
359 	APPEND
360 }
361 
362 package shared struct FileCmdInfo
363 {
364 	FileCmd command;
365 	Path filePath;
366 	ubyte[] buffer;
367 	AsyncSignal ready;
368 	AsyncFile file;
369 	Mutex mtx; // for buffer writing
370 }
371 
372 package shared struct FileReadyHandler {
373 	AsyncFile ctxt;
374 	void delegate() del;
375 
376 	void opCall() {
377 		assert(ctxt !is null);
378 		ctxt.waiting = false;
379 		del();
380 		return;
381 	}
382 }
383 
384 private void process(shared AsyncFile ctxt) {
385 	auto cmdInfo = ctxt.cmdInfo;
386 	auto mutex = cmdInfo.mtx;
387 	FileCmd cmd;
388 	cmd = cmdInfo.command;
389 
390 	try final switch (cmd)
391 	{
392 		case FileCmd.READ:
393 			File file = ctxt.file;
394 			if (ctxt.offset != -1)
395 				file.seek(cast(long) ctxt.offset);
396 			ubyte[] res;
397 			synchronized(mutex) res = file.rawRead(cast(ubyte[])ctxt.buffer);
398 			if (res)
399 				ctxt.offset = cast(ulong) (ctxt.offset + res.length);
400 
401 			break;
402 
403 		case FileCmd.WRITE:
404 
405 			File file = cast(File) ctxt.file;
406 			if (ctxt.offset != -1)
407 				file.seek(cast(long) ctxt.offset);
408 			synchronized(mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer);
409 			file.flush();
410 			ctxt.offset = cast(ulong) (ctxt.offset + ctxt.buffer.length);
411 			break;
412 
413 		case FileCmd.APPEND:
414 			File file = cast(File) ctxt.file;
415 			synchronized(mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer);
416 			ctxt.offset = cast(ulong) file.size();
417 			file.flush();
418 			break;
419 	} catch (Throwable e) {
420 		auto status = StatusInfo.init;
421 		status.code = Status.ERROR;
422 		try status.text = "Error in " ~  cmd.to!string ~ ", " ~ e.toString(); catch (Exception) {}
423 		ctxt.status = status;
424 	}
425 
426 	try cmdInfo.ready.trigger(getThreadEventLoop());
427 	catch (Throwable e) {
428 		trace("AsyncFile Thread Error: ", e.toString());
429 		auto status = StatusInfo.init;
430 		status.code = Status.ERROR;
431 		try status.text = e.toString(); catch (Exception) {}
432 		ctxt.status = status;
433 	}
434 }