minimal copy
[mizunara.git] / mizucp / main.c
1 /*
2  * Copyright 2021 Olaf Wintermann
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a 
5  * copy of this software and associated documentation files (the "Software"), 
6  * to deal in the Software without restriction, including without limitation 
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense, 
8  * and/or sell copies of the Software, and to permit persons to whom the 
9  * Software is furnished to do so, subject to the following conditions:
10  * 
11  * The above copyright notice and this permission notice shall be included in 
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
17  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
20  * DEALINGS IN THE SOFTWARE.
21  */
22
23 #include "main.h"
24
25 #include "srvctrl.h"
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <unistd.h>
31 #include <signal.h>
32 #include <errno.h>
33 #include <time.h>
34 #include <sys/stat.h>
35 #include <sys/socket.h>
36 #include <sys/un.h>
37 #include <sys/fcntl.h>
38 #include <poll.h>
39 #include <dirent.h>
40
41 #include <libidav/utils.h>
42
43 #include <ucx/utils.h>
44
45
46 #define OPTSTR "hlpsuv"
47
48 static char *cfgdir;
49 static char *copydir;
50
51
52 static pthread_t scan_thread;
53
54 static pthread_t *copy_threads;
55 static size_t num_copy_threads;
56
57 static pthread_mutex_t queue_lock;
58 static pthread_cond_t  queue_available;
59 static MZQueue         *queue_begin;
60 static MZQueue         *queue_end;
61
62 static int scan_complete;
63
64 int main(int argc, char** argv) { 
65     int ret = 1;
66     
67     extern char *optarg;
68     extern int optind, opterr, optopt;
69     
70     CPSettings settings;
71     memset(&settings, 0, sizeof(CPSettings));
72     
73     int help = 0;
74     int version = 0;
75     int list = 0;    // list copying processes
76     
77     int c;
78     while((c = getopt(argc, argv, OPTSTR)) != -1) {
79         switch(c) {
80             case 'l': list = 1; break;
81             case 'p': settings.pause = 1; break;
82             case 's': settings.printsocket = 1; break;
83             case 'u': settings.url = 1; break;
84             case 'v': version = 1; break;
85         }
86     }
87     
88     int ac = argc - optind;
89     
90     if(list) {
91         // list command
92     } else if(help) {
93         // print help
94     } else if(version) {
95         // print version
96     } else if(ac == 2) {
97         // copy
98         settings.from = argv[optind];
99         settings.to   = argv[optind+1];
100         ret = mzcp_copy(&settings);
101     } else {
102         
103         // print usage
104     }
105     
106     return ret;
107 }
108
109 static int check_dir(const char *path) {
110     struct stat s;
111     if(stat(path, &s)) {
112         if(errno == ENOENT) {
113             if(mkdir(path, S_IRWXU)) {
114                 fprintf(stderr, "Cannot create %s: %s", path, strerror(errno));
115                 return 1;
116             }
117         } else {
118             fprintf(stderr, "Cannot access %s: %s", path, strerror(errno));
119             return 1;
120         }
121     }
122     return 0;
123 }
124
125 static int check_configdir(void) {
126     char *home = getenv(MZCP_ENV_HOME);
127     
128     char *base = util_concat_path(home, MZCP_CFG_BASE);
129     if(check_dir(base)) return 1;
130     free(base);
131
132     cfgdir = util_concat_path(home, MZCP_CFG_DIR);
133     if(check_dir(cfgdir)) return 1;
134     
135     copydir = util_concat_path(mzcp_get_cfgdir(), MZCP_COPY_DIR);
136     if(check_dir(copydir)) return 1;
137     
138     return 0;
139 }
140
141 const char* mzcp_get_cfgdir(void) {
142     return cfgdir;
143 }
144
145 const char* mzcp_get_copydir(void) {
146     return copydir;
147 }
148
149 static int init_queue(void) {
150     if(pthread_mutex_init(&queue_lock, NULL)) return 1;
151     if(pthread_cond_init(&queue_available, NULL)) return 1;
152     return 0;
153 }
154
155 static MZQueue* queue_root_elm_new(void) {
156     SrcFile *file = malloc(sizeof(SrcFile));
157     if(!file) return NULL;
158     memset(file, 0, sizeof(SrcFile));
159     
160     MZQueue *q = malloc(sizeof(MZQueue));
161     if(!q) return NULL;
162     
163     q->file = file;
164     q->next = NULL;
165     
166     return q;
167 }
168
169 int mzcp_copy(CPSettings *settings) {
170     int ret  = 0;
171     
172     if(check_configdir()) {
173         return 2;
174     }
175     
176     
177     if(create_control_socket()) {
178         return 3;
179     }
180     
181     if(init_queue()) {
182         return 4;
183     }
184     
185     if(settings->printsocket) {
186         printf("%s\n", mzcp_get_socketpath());
187     } else {
188         printf("copy %s to %s\n", settings->from, settings->to);
189         if(settings->pause) {
190             printf("pause\n");
191         }
192     }
193     
194     //pid_t p = fork();
195     pid_t p = 0;
196     if(p == 0) {
197         //close(0);
198         //close(1);
199         //close(2);
200         
201         if(mzcp_start_scan(settings)) {
202             return 2;
203         }
204         
205         if(mzcp_start_copy_threads(settings)) {
206             
207             return 3;
208         }
209         
210         ret =  mzcp_srvctrl(settings);
211     }
212     
213     return ret;
214 }
215
216
217 int mzcp_start_scan(CPSettings *settings) {
218     if(stat(settings->from, &settings->root_stat)) {
219         // TODO: error
220         return 1;
221     }
222     
223     if(!S_ISDIR(settings->root_stat.st_mode)) {
224         // queue single file
225         queue_begin = queue_root_elm_new();
226         if(!queue_begin) {
227             return 1;
228         }
229         queue_end = queue_begin;
230         scan_complete = 1;
231         settings->num_threads = 1;
232     } else {
233         // scan src directory in a separate thread
234         if(pthread_create(&scan_thread, NULL, scan_run, settings)) {
235             // TODO: we need some clever error handling
236             // we are already in the forked procress and stdout/stderr are closed
237             // maybe wait for someone to connect to the unix domain socket
238             return 1;
239         }
240     }
241     
242     return 0;
243 }
244
245
246 void* scan_run(void *data) {
247     CPSettings *settings = data;
248     
249     UcxList *stack = NULL;
250     
251     char *root = strdup("");
252     
253     SrcFile *file = calloc(1, sizeof(SrcFile));
254     if(!file) {
255         scan_complete = 1;
256         // TODO: error
257         return NULL;
258     }
259     file->path = root;
260     file->isdir = 1;
261     file->mode = settings->root_stat.st_mode;
262     if(enqueue_file(file)) {
263         scan_complete = 1;
264         // TODO: error
265         return NULL;
266     }
267     
268     stack = ucx_list_prepend(NULL, file);
269     while(stack) {
270         SrcFile *elm = stack->data;
271         UcxList *next = stack->next;
272         free(stack);
273         stack = next;
274         
275         char *path = util_concat_path(settings->from, elm->path);
276         
277         int dir_fd = open(path, O_RDONLY);
278         if(dir_fd < 0) {
279             // TODO: error?
280             continue;
281         }
282         
283         // read directory and enqueue all children
284         DIR *dir = fdopendir(dir_fd);
285         struct dirent *ent;
286         while((ent = readdir(dir)) != NULL) {
287             char *name = ent->d_name;
288             if(!strcmp(name, ".") || !strcmp(name, "..")) {
289                 continue;
290             }
291             
292             struct stat s;
293             if(fstatat(dir_fd, name, &s, 0)) {
294                 // TODO: error?
295                 continue;
296             }
297             
298             SrcFile *f = calloc(1, sizeof(SrcFile));
299             f->path = util_concat_path(elm->path, name);
300             f->isdir = S_ISDIR(s.st_mode);
301             f->mode = s.st_mode;
302             f->depends_on = elm;
303             
304             if(enqueue_file(f)) {
305                 // TODO: error?
306                 fprintf(stderr, "enqueue failed\n");
307                 break;
308             }
309             
310             // put dir on stack
311             if(f->isdir) {
312                 stack = ucx_list_prepend(stack, f);
313             }
314         }
315         
316         closedir(dir);
317     }
318     
319     scan_complete = 1;
320     
321     return NULL;
322 }
323
324 int mzcp_start_copy_threads(CPSettings *settings) {
325     if(settings->num_threads == 0) {
326         num_copy_threads = 1;
327     } else if(settings->num_threads > MAX_COPY_THREADS) {
328         num_copy_threads = MAX_COPY_THREADS;
329     } else {
330         num_copy_threads = settings->num_threads;
331     }
332     
333     copy_threads = calloc(num_copy_threads, sizeof(pthread_t));
334     
335     int f = 0;
336     for(int i=0;i<num_copy_threads;i++) {
337         if(pthread_create(&copy_threads[i], NULL, copy_run, settings)) {
338             f++;
339         }
340     }
341     
342     return f < num_copy_threads ? 0 : 1;
343 }
344
345 int enqueue_file(SrcFile *file) {
346     MZQueue *q = malloc(sizeof(MZQueue));
347     if(!q) {
348         return 1;
349     }
350     
351     q->file = file;
352     q->next = NULL;
353     
354     pthread_mutex_lock(&queue_lock);
355     
356     if(queue_end) {
357         queue_end->next = q;
358         queue_end = q;
359     } else {
360         queue_begin = q;
361         queue_end = q;
362     }
363     
364     pthread_cond_signal(&queue_available);
365     pthread_mutex_unlock(&queue_lock);
366     
367     return 0;
368 }
369
370 static SrcFile* queue_get_file(void) {
371     SrcFile *file = NULL;
372     pthread_mutex_lock(&queue_lock);
373     
374     MZQueue *q = NULL;
375     while(!q) {
376         if(!queue_begin) {
377             if(scan_complete) {
378                 break;
379             }
380             pthread_cond_wait(&queue_available, &queue_lock);
381             continue;
382         } else {
383             q = queue_begin;
384             queue_begin = queue_begin->next;
385             if(!queue_begin) {
386                 queue_end = NULL;
387             }
388         }
389     }
390     
391     if(q) {
392         file = q->file;
393         free(q);
394     }
395     
396     pthread_mutex_unlock(&queue_lock);
397     
398     return file;
399 }
400
401 void* copy_run(void *data) {
402     CPSettings *settings = data;
403     
404     char *buffer = malloc(MZ_COPY_BUFSIZE);
405     size_t bufsize = MZ_COPY_BUFSIZE;
406     
407     for(;;) {
408         SrcFile *file = queue_get_file();
409         if(!file) {
410             break;
411         }
412         
413         char *from = file->path ? util_concat_path(settings->from, file->path) : settings->from;  
414         char *to = util_concat_path(settings->to, file->path ? file->path : util_resource_name(settings->from));
415         
416         size_t from_len = strlen(from);
417         size_t to_len = strlen(to);
418         
419         if(from[from_len-1] == '/') {
420             from[from_len-1] = 0;
421         }
422         if(to[to_len-1] == '/') {
423             to[to_len-1] = 0;
424         }
425         
426         int ret;
427         if(file->isdir) {
428             ret = mz_copy_dir(settings, file, from, to);
429         } else {
430             ret = mz_copy_file(settings, file, from, to, buffer, bufsize);
431         }
432         
433         free(to);
434         if(from != settings->from) {
435             free(from);
436         }
437     }
438     
439     return NULL;
440 }
441
442 int mz_copy_dir(CPSettings *settings, SrcFile *file, const char *from, const char *to) {
443     printf("mkdir %s\n", to);
444     return mkdir(to, file->mode);
445 }
446
447 int mz_copy_file(CPSettings *settings, SrcFile *file, const char *from, const char *to, char *buffer, size_t bufsize) {
448     printf("cp %s %s\n", from, to);
449     int fin = open(from, O_RDONLY);
450     if(fin < 0) {
451         return 1;
452     }
453     
454     int ret = 0;
455     
456     int fout = open(to, O_WRONLY|O_CREAT, file->mode);
457     if(fout < 0) {
458         perror("open");
459         close(fin);
460         return 1;
461     }
462     
463     ssize_t r;
464     while((r = read(fin, buffer, bufsize)) > 0) {
465         if(write(fout, buffer, r) != r) {
466             ret = 1;
467             break;
468         }
469     }
470     
471     close(fin);
472     close(fout);
473     
474     return ret;
475 }