mizucp: count number of copied bytes/files
[mizunara.git] / mizucp / main.c
1 /*
2  * Copyright 2021 Olaf Wintermann
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a 
5  * copy of this software and associated documentation files (the "Software"), 
6  * to deal in the Software without restriction, including without limitation 
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense, 
8  * and/or sell copies of the Software, and to permit persons to whom the 
9  * Software is furnished to do so, subject to the following conditions:
10  * 
11  * The above copyright notice and this permission notice shall be included in 
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
17  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
20  * DEALINGS IN THE SOFTWARE.
21  */
22
23 #include "main.h"
24
25 #include "srvctrl.h"
26 #include "atomic.h"
27
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <inttypes.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <signal.h>
34 #include <errno.h>
35 #include <time.h>
36 #include <sys/stat.h>
37 #include <sys/socket.h>
38 #include <sys/un.h>
39 #include <sys/fcntl.h>
40 #include <poll.h>
41 #include <dirent.h>
42
43 #include <libidav/utils.h>
44
45 #include <ucx/utils.h>
46
47
48 #define OPTSTR "hlpsuv"
49
50 static char *cfgdir;
51 static char *copydir;
52
53
54 static pthread_t scan_thread;
55
56 static pthread_t *copy_threads;
57 static size_t num_copy_threads;
58
59 static pthread_mutex_t queue_lock;
60 static pthread_cond_t  queue_available;
61 static MZQueue         *queue_begin;
62 static MZQueue         *queue_end;
63
64 static int scan_complete;
65
66
67 static uint64_t stat_num_files;
68 static uint64_t stat_copied_files;
69 static uint64_t stat_error_files;
70 static uint64_t stat_total_size;
71 static uint64_t stat_copied_size;
72
73 int main(int argc, char** argv) { 
74     int ret = 1;
75     
76     extern char *optarg;
77     extern int optind, opterr, optopt;
78     
79     CPSettings settings;
80     memset(&settings, 0, sizeof(CPSettings));
81     
82     int help = 0;
83     int version = 0;
84     int list = 0;    // list copying processes
85     
86     int c;
87     while((c = getopt(argc, argv, OPTSTR)) != -1) {
88         switch(c) {
89             case 'l': list = 1; break;
90             case 'p': settings.pause = 1; break;
91             case 's': settings.printsocket = 1; break;
92             case 'u': settings.url = 1; break;
93             case 'v': version = 1; break;
94         }
95     }
96     
97     int ac = argc - optind;
98     
99     if(list) {
100         // list command
101     } else if(help) {
102         // print help
103     } else if(version) {
104         // print version
105     } else if(ac == 2) {
106         // copy
107         settings.from = argv[optind];
108         settings.to   = argv[optind+1];
109         ret = mzcp_copy(&settings);
110     } else {
111         
112         // print usage
113     }
114     
115     return ret;
116 }
117
118 static int check_dir(const char *path) {
119     struct stat s;
120     if(stat(path, &s)) {
121         if(errno == ENOENT) {
122             if(mkdir(path, S_IRWXU)) {
123                 fprintf(stderr, "Cannot create %s: %s", path, strerror(errno));
124                 return 1;
125             }
126         } else {
127             fprintf(stderr, "Cannot access %s: %s", path, strerror(errno));
128             return 1;
129         }
130     }
131     return 0;
132 }
133
134 static int check_configdir(void) {
135     char *home = getenv(MZCP_ENV_HOME);
136     
137     char *base = util_concat_path(home, MZCP_CFG_BASE);
138     if(check_dir(base)) return 1;
139     free(base);
140
141     cfgdir = util_concat_path(home, MZCP_CFG_DIR);
142     if(check_dir(cfgdir)) return 1;
143     
144     copydir = util_concat_path(mzcp_get_cfgdir(), MZCP_COPY_DIR);
145     if(check_dir(copydir)) return 1;
146     
147     return 0;
148 }
149
150 const char* mzcp_get_cfgdir(void) {
151     return cfgdir;
152 }
153
154 const char* mzcp_get_copydir(void) {
155     return copydir;
156 }
157
158 static int init_queue(void) {
159     if(pthread_mutex_init(&queue_lock, NULL)) return 1;
160     if(pthread_cond_init(&queue_available, NULL)) return 1;
161     return 0;
162 }
163
164 static MZQueue* queue_root_elm_new(void) {
165     SrcFile *file = malloc(sizeof(SrcFile));
166     if(!file) return NULL;
167     memset(file, 0, sizeof(SrcFile));
168     
169     MZQueue *q = malloc(sizeof(MZQueue));
170     if(!q) return NULL;
171     
172     q->file = file;
173     q->next = NULL;
174     
175     return q;
176 }
177
178 int mzcp_copy(CPSettings *settings) {
179     int ret  = 0;
180     
181     if(check_configdir()) {
182         return 2;
183     }
184     
185     
186     if(create_control_socket()) {
187         return 3;
188     }
189     
190     if(init_queue()) {
191         return 4;
192     }
193     
194     if(settings->printsocket) {
195         printf("%s\n", mzcp_get_socketpath());
196     } else {
197         printf("copy %s to %s\n", settings->from, settings->to);
198         if(settings->pause) {
199             printf("pause\n");
200         }
201     }
202     
203     //pid_t p = fork();
204     pid_t p = 0;
205     if(p == 0) {
206         //close(0);
207         //close(1);
208         //close(2);
209         
210         if(mzcp_start_scan(settings)) {
211             return 2;
212         }
213         
214         if(mzcp_start_copy_threads(settings)) {
215             
216             return 3;
217         }
218         
219         ret =  mzcp_srvctrl(settings);
220     }
221     
222     return ret;
223 }
224
225
226 int mzcp_start_scan(CPSettings *settings) {
227     if(stat(settings->from, &settings->root_stat)) {
228         // TODO: error
229         return 1;
230     }
231     
232     if(!S_ISDIR(settings->root_stat.st_mode)) {
233         // queue single file
234         queue_begin = queue_root_elm_new();
235         if(!queue_begin) {
236             return 1;
237         }
238         queue_end = queue_begin;
239         scan_complete = 1;
240         settings->num_threads = 1;
241     } else {
242         // scan src directory in a separate thread
243         if(pthread_create(&scan_thread, NULL, scan_run, settings)) {
244             // TODO: we need some clever error handling
245             // we are already in the forked procress and stdout/stderr are closed
246             // maybe wait for someone to connect to the unix domain socket
247             return 1;
248         }
249     }
250     
251     return 0;
252 }
253
254
255 void* scan_run(void *data) {
256     CPSettings *settings = data;
257     
258     UcxList *stack = NULL;
259     
260     char *root = strdup("");
261     
262     SrcFile *file = calloc(1, sizeof(SrcFile));
263     if(!file) {
264         scan_complete = 1;
265         // TODO: error
266         return NULL;
267     }
268     file->path = root;
269     file->isdir = 1;
270     file->mode = settings->root_stat.st_mode;
271     if(enqueue_file(file)) {
272         scan_complete = 1;
273         // TODO: error
274         return NULL;
275     }
276     
277     stack = ucx_list_prepend(NULL, file);
278     while(stack) {
279         SrcFile *elm = stack->data;
280         UcxList *next = stack->next;
281         free(stack);
282         stack = next;
283         
284         char *path = util_concat_path(settings->from, elm->path);
285         
286         int dir_fd = open(path, O_RDONLY);
287         if(dir_fd < 0) {
288             // TODO: error?
289             continue;
290         }
291         
292         // read directory and enqueue all children
293         DIR *dir = fdopendir(dir_fd);
294         struct dirent *ent;
295         while((ent = readdir(dir)) != NULL) {
296             char *name = ent->d_name;
297             if(!strcmp(name, ".") || !strcmp(name, "..")) {
298                 continue;
299             }
300             
301             struct stat s;
302             if(fstatat(dir_fd, name, &s, 0)) {
303                 // TODO: error?
304                 continue;
305             }
306             
307             SrcFile *f = calloc(1, sizeof(SrcFile));
308             f->path = util_concat_path(elm->path, name);
309             f->isdir = S_ISDIR(s.st_mode);
310             f->size = s.st_size;
311             f->mode = s.st_mode;
312             f->depends_on = elm;
313             
314             if(enqueue_file(f)) {
315                 // TODO: error?
316                 fprintf(stderr, "enqueue failed\n");
317                 break;
318             } else {
319                 mz_atomic_inc64(&stat_num_files);
320                 int64_t sz = s.st_size;
321                 mz_atomic_add64(&stat_total_size, sz);
322             }
323             
324             // put dir on stack
325             if(f->isdir) {
326                 stack = ucx_list_prepend(stack, f);
327             }
328         }
329         
330         closedir(dir);
331     }
332     
333     scan_complete = 1;
334     
335     return NULL;
336 }
337
338 int mzcp_start_copy_threads(CPSettings *settings) {
339     if(settings->num_threads == 0) {
340         num_copy_threads = 1;
341     } else if(settings->num_threads > MAX_COPY_THREADS) {
342         num_copy_threads = MAX_COPY_THREADS;
343     } else {
344         num_copy_threads = settings->num_threads;
345     }
346     
347     copy_threads = calloc(num_copy_threads, sizeof(pthread_t));
348     
349     int f = 0;
350     for(int i=0;i<num_copy_threads;i++) {
351         if(pthread_create(&copy_threads[i], NULL, copy_run, settings)) {
352             f++;
353         }
354     }
355     
356     return f < num_copy_threads ? 0 : 1;
357 }
358
359 int enqueue_file(SrcFile *file) {
360     MZQueue *q = malloc(sizeof(MZQueue));
361     if(!q) {
362         return 1;
363     }
364     
365     q->file = file;
366     q->next = NULL;
367     
368     pthread_mutex_lock(&queue_lock);
369     
370     if(queue_end) {
371         queue_end->next = q;
372         queue_end = q;
373     } else {
374         queue_begin = q;
375         queue_end = q;
376     }
377     
378     pthread_cond_signal(&queue_available);
379     pthread_mutex_unlock(&queue_lock);
380     
381     return 0;
382 }
383
384 static SrcFile* queue_get_file(void) {
385     SrcFile *file = NULL;
386     pthread_mutex_lock(&queue_lock);
387     
388     MZQueue *q = NULL;
389     while(!q) {
390         if(!queue_begin) {
391             if(scan_complete) {
392                 break;
393             }
394             pthread_cond_wait(&queue_available, &queue_lock);
395             continue;
396         } else {
397             q = queue_begin;
398             queue_begin = queue_begin->next;
399             if(!queue_begin) {
400                 queue_end = NULL;
401             }
402         }
403     }
404     
405     if(q) {
406         file = q->file;
407         free(q);
408     }
409     
410     pthread_mutex_unlock(&queue_lock);
411     
412     return file;
413 }
414
415 void* copy_run(void *data) {
416     CPSettings *settings = data;
417     
418     char *buffer = malloc(MZ_COPY_BUFSIZE);
419     size_t bufsize = MZ_COPY_BUFSIZE;
420     
421     for(;;) {
422         SrcFile *file = queue_get_file();
423         if(!file) {
424             break;
425         }
426         
427         char *from = file->path ? util_concat_path(settings->from, file->path) : settings->from;  
428         char *to = util_concat_path(settings->to, file->path ? file->path : util_resource_name(settings->from));
429         
430         size_t from_len = strlen(from);
431         size_t to_len = strlen(to);
432         
433         if(from[from_len-1] == '/') {
434             from[from_len-1] = 0;
435         }
436         if(to[to_len-1] == '/') {
437             to[to_len-1] = 0;
438         }
439         
440         if(file->depends_on) {
441             SrcFile *dep = file->depends_on;
442             // check first without lock
443             if(dep->status == 0) {
444                 if(dep->lock) {
445                     pthread_mutex_lock(&dep->lock->mutex);
446                     if(file->depends_on->status == 0) {
447                         pthread_cond_wait(&dep->lock->cond, &dep->lock->mutex);
448                     }
449                     pthread_mutex_unlock(&dep->lock->mutex);
450                 } else {
451                     // locking disabled (because we have only one thread)
452                     // but in that case the file status can't be 0
453                     // therefore this case here should not happen
454                     file->status = -1;
455                 }
456             }
457             // check again
458             if(dep->status == -1) {
459                 file->status = -1;
460             }
461         }
462         
463         int ret;
464         if(file->status == 0) {
465             if(file->isdir) {
466             ret = mz_copy_dir(settings, file, from, to);
467             } else {
468                 ret = mz_copy_file(settings, file, from, to, buffer, bufsize);
469             }
470         }
471         
472         free(to);
473         if(from != settings->from) {
474             free(from);
475         }
476     }
477     
478     return NULL;
479 }
480
481 static void file_set_status(SrcFile *file, int status) {
482     if(file->lock) {
483         pthread_mutex_lock(&file->lock->mutex);
484         file->status = status;
485         pthread_cond_broadcast(&file->lock->cond);
486         pthread_mutex_unlock(&file->lock->mutex);
487     } else {
488         file->status = status;
489     }
490 }
491
492 int mz_copy_dir(CPSettings *settings, SrcFile *file, const char *from, const char *to) {
493     printf("mkdir %s\n", to);
494     int ret = mkdir(to, file->mode);
495     if(ret) {
496         if(errno == EEXIST) {
497             struct stat s;
498             if(!stat(to, &s)) {
499                 if(S_ISDIR(s.st_mode)) {
500                     ret = 0;
501                 }
502             }
503         }
504     }
505     file_set_status(file, !ret ? 1 : -1);
506     return ret;
507 }
508
509 int mz_copy_file(CPSettings *settings, SrcFile *file, const char *from, const char *to, char *buffer, size_t bufsize) {
510     printf("cp %s %s\n", from, to);
511     int fin = open(from, O_RDONLY);
512     if(fin < 0) {
513         file_set_status(file, -1);
514         return 1;
515     }
516     
517     int ret = 0;
518     
519     int fout = open(to, O_WRONLY|O_CREAT, file->mode);
520     if(fout < 0) {
521         perror("open");
522         close(fin);
523         file_set_status(file, -1);
524         return 1;
525     }
526     
527     int64_t copied = 0;
528     ssize_t r;
529     while((r = read(fin, buffer, bufsize)) > 0) {
530         ssize_t w = write(fout, buffer, r);
531         if(w > 0) {
532             mz_atomic_add64(&stat_copied_size, w);
533             copied += w;
534         }
535         if(w != r) {
536             ret = 1;
537             break;
538         }
539     }
540     
541     close(fin);
542     close(fout);
543     
544     if(!ret) {
545         file_set_status(file, 1);
546         mz_atomic_inc64(&stat_copied_files);
547         if(copied != file->size) {
548             // size changed after scan -> readjust total size
549             int64_t filesz_diff = copied - file->size;
550             mz_atomic_add64(&stat_total_size, filesz_diff);
551         }
552     } else {
553         file_set_status(file, -1);
554         mz_atomic_inc64(&stat_error_files);
555         if(copied != file->size) {
556             // count the full file size as copied, although we had an error
557             int64_t filesz_diff = file->size - copied;
558             mz_atomic_add64(&stat_copied_size, filesz_diff);
559         }
560     }
561     
562     return ret;
563 }