mizucp: implement scan thread
[mizunara.git] / mizucp / main.c
1 /*
2  * Copyright 2021 Olaf Wintermann
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a 
5  * copy of this software and associated documentation files (the "Software"), 
6  * to deal in the Software without restriction, including without limitation 
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense, 
8  * and/or sell copies of the Software, and to permit persons to whom the 
9  * Software is furnished to do so, subject to the following conditions:
10  * 
11  * The above copyright notice and this permission notice shall be included in 
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
17  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
20  * DEALINGS IN THE SOFTWARE.
21  */
22
23 #include "main.h"
24
25 #include "srvctrl.h"
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <unistd.h>
31 #include <signal.h>
32 #include <errno.h>
33 #include <time.h>
34 #include <sys/stat.h>
35 #include <sys/socket.h>
36 #include <sys/un.h>
37 #include <sys/fcntl.h>
38 #include <poll.h>
39 #include <dirent.h>
40
41 #include <libidav/utils.h>
42
43 #include <ucx/utils.h>
44
45
46 #define OPTSTR "hlpsuv"
47
48 static char *cfgdir;
49 static char *copydir;
50
51
52 static pthread_t scan_thread;
53
54 static pthread_t *copy_threads;
55 static size_t num_copy_threads;
56
57 static pthread_mutex_t queue_lock;
58 static pthread_cond_t  queue_available;
59 static MZQueue         *queue_begin;
60 static MZQueue         *queue_end;
61
62 static int scan_complete;
63
64 int main(int argc, char** argv) { 
65     int ret = 1;
66     
67     extern char *optarg;
68     extern int optind, opterr, optopt;
69     
70     CPSettings settings;
71     memset(&settings, 0, sizeof(CPSettings));
72     
73     int help = 0;
74     int version = 0;
75     int list = 0;    // list copying processes
76     
77     int c;
78     while((c = getopt(argc, argv, OPTSTR)) != -1) {
79         switch(c) {
80             case 'l': list = 1; break;
81             case 'p': settings.pause = 1; break;
82             case 's': settings.printsocket = 1; break;
83             case 'u': settings.url = 1; break;
84             case 'v': version = 1; break;
85         }
86     }
87     
88     int ac = argc - optind;
89     
90     if(list) {
91         // list command
92     } else if(help) {
93         // print help
94     } else if(version) {
95         // print version
96     } else if(ac == 2) {
97         // copy
98         settings.from = argv[optind];
99         settings.to   = argv[optind+1];
100         ret = mzcp_copy(&settings);
101     } else {
102         
103         // print usage
104     }
105     
106     return ret;
107 }
108
109 static int check_dir(const char *path) {
110     struct stat s;
111     if(stat(path, &s)) {
112         if(errno == ENOENT) {
113             if(mkdir(path, S_IRWXU)) {
114                 fprintf(stderr, "Cannot create %s: %s", path, strerror(errno));
115                 return 1;
116             }
117         } else {
118             fprintf(stderr, "Cannot access %s: %s", path, strerror(errno));
119             return 1;
120         }
121     }
122     return 0;
123 }
124
125 static int check_configdir(void) {
126     char *home = getenv(MZCP_ENV_HOME);
127     
128     char *base = util_concat_path(home, MZCP_CFG_BASE);
129     if(check_dir(base)) return 1;
130     free(base);
131
132     cfgdir = util_concat_path(home, MZCP_CFG_DIR);
133     if(check_dir(cfgdir)) return 1;
134     
135     copydir = util_concat_path(mzcp_get_cfgdir(), MZCP_COPY_DIR);
136     if(check_dir(copydir)) return 1;
137     
138     return 0;
139 }
140
141 const char* mzcp_get_cfgdir(void) {
142     return cfgdir;
143 }
144
145 const char* mzcp_get_copydir(void) {
146     return copydir;
147 }
148
149 static int init_queue(void) {
150     if(pthread_mutex_init(&queue_lock, NULL)) return 1;
151     if(pthread_cond_init(&queue_available, NULL)) return 1;
152     return 0;
153 }
154
155 static MZQueue* queue_root_elm_new(void) {
156     SrcFile *file = malloc(sizeof(SrcFile));
157     if(!file) return NULL;
158     memset(file, 0, sizeof(SrcFile));
159     
160     MZQueue *q = malloc(sizeof(MZQueue));
161     if(!q) return NULL;
162     
163     q->file = file;
164     q->next = NULL;
165     
166     return q;
167 }
168
169 int mzcp_copy(CPSettings *settings) {
170     int ret  = 0;
171     
172     if(check_configdir()) {
173         return 2;
174     }
175     
176     
177     if(create_control_socket()) {
178         return 3;
179     }
180     
181     if(init_queue()) {
182         return 4;
183     }
184     
185     if(settings->printsocket) {
186         printf("%s\n", mzcp_get_socketpath());
187     } else {
188         printf("copy %s to %s\n", settings->from, settings->to);
189         if(settings->pause) {
190             printf("pause\n");
191         }
192     }
193     
194     //pid_t p = fork();
195     pid_t p = 0;
196     if(p == 0) {
197         //close(0);
198         //close(1);
199         //close(2);
200         
201         if(mzcp_start_scan(settings)) {
202             return 2;
203         }
204         
205         if(mzcp_start_copy_threads(settings)) {
206             
207             return 3;
208         }
209         
210         ret =  mzcp_srvctrl(settings);
211     }
212     
213     return ret;
214 }
215
216
217 int mzcp_start_scan(CPSettings *settings) {
218     struct stat s;
219     if(stat(settings->from, &s)) {
220         // TODO: error
221         return 1;
222     }
223     
224     if(!S_ISDIR(s.st_mode)) {
225         // queue single file
226         queue_begin = queue_root_elm_new();
227         if(!queue_begin) {
228             return 1;
229         }
230         queue_end = queue_begin;
231         scan_complete = 1;
232         settings->num_threads = 1;
233     } else {
234         // scan src directory in a separate thread
235         if(pthread_create(&scan_thread, NULL, scan_run, settings)) {
236             // TODO: we need some clever error handling
237             // we are already in the forked procress and stdout/stderr are closed
238             // maybe wait for someone to connect to the unix domain socket
239             return 1;
240         }
241     }
242     
243     return 0;
244 }
245
246
247 void* scan_run(void *data) {
248     CPSettings *settings = data;
249     
250     UcxList *stack = NULL;
251     
252     char *root = strdup("");
253     
254     SrcFile *file = calloc(1, sizeof(SrcFile));
255     if(!file) {
256         scan_complete = 1;
257         // TODO: error
258         return NULL;
259     }
260     file->path = root;
261     file->isdir = 1;
262     if(enqueue_file(file)) {
263         scan_complete = 1;
264         // TODO: error
265         return NULL;
266     }
267     
268     stack = ucx_list_prepend(NULL, file);
269     while(stack) {
270         SrcFile *elm = stack->data;
271         UcxList *next = stack->next;
272         free(stack);
273         stack = next;
274         
275         char *path = util_concat_path(settings->from, elm->path);
276         
277         int dir_fd = open(path, O_RDONLY);
278         if(dir_fd < 0) {
279             // TODO: error?
280             continue;
281         }
282         
283         // read directory and enqueue all children
284         DIR *dir = fdopendir(dir_fd);
285         struct dirent *ent;
286         while((ent = readdir(dir)) != NULL) {
287             char *name = ent->d_name;
288             if(!strcmp(name, ".") || !strcmp(name, "..")) {
289                 continue;
290             }
291             
292             struct stat s;
293             if(fstatat(dir_fd, name, &s, 0)) {
294                 // TODO: error?
295                 continue;
296             }
297             
298             SrcFile *f = calloc(1, sizeof(SrcFile));
299             f->path = util_concat_path(elm->path, name);
300             f->isdir = S_ISDIR(s.st_mode);
301             f->depends_on = elm;
302             
303             if(enqueue_file(f)) {
304                 // TODO: error?
305                 fprintf(stderr, "enqueue failed\n");
306                 break;
307             }
308             
309             // put dir on stack
310             if(f->isdir) {
311                 stack = ucx_list_prepend(stack, f);
312             }
313         }
314         
315         closedir(dir);
316     }
317     
318     scan_complete = 1;
319     
320     return NULL;
321 }
322
323 int mzcp_start_copy_threads(CPSettings *settings) {
324     if(settings->num_threads == 0) {
325         num_copy_threads = 1;
326     } else if(settings->num_threads > MAX_COPY_THREADS) {
327         num_copy_threads = MAX_COPY_THREADS;
328     } else {
329         num_copy_threads = settings->num_threads;
330     }
331     
332     copy_threads = calloc(num_copy_threads, sizeof(pthread_t));
333     
334     int f = 0;
335     for(int i=0;i<num_copy_threads;i++) {
336         if(pthread_create(&copy_threads[i], NULL, copy_run, settings)) {
337             f++;
338         }
339     }
340     
341     return f < num_copy_threads ? 0 : 1;
342 }
343
344 int enqueue_file(SrcFile *file) {
345     MZQueue *q = malloc(sizeof(MZQueue));
346     if(!q) {
347         return 1;
348     }
349     
350     q->file = file;
351     q->next = NULL;
352     
353     pthread_mutex_lock(&queue_lock);
354     
355     if(queue_end) {
356         queue_end->next = q;
357         queue_end = q;
358     } else {
359         queue_begin = q;
360         queue_end = q;
361     }
362     
363     pthread_cond_signal(&queue_available);
364     pthread_mutex_unlock(&queue_lock);
365     
366     return 0;
367 }
368
369 static SrcFile* queue_get_file(void) {
370     SrcFile *file = NULL;
371     pthread_mutex_lock(&queue_lock);
372     
373     MZQueue *q = NULL;
374     while(!q) {
375         if(!queue_begin) {
376             if(scan_complete) {
377                 break;
378             }
379             pthread_cond_wait(&queue_available, &queue_lock);
380             continue;
381         } else {
382             q = queue_begin;
383             queue_begin = queue_begin->next;
384             if(!queue_begin) {
385                 queue_end = NULL;
386             }
387         }
388     }
389     
390     if(q) {
391         file = q->file;
392         free(q);
393     }
394     
395     pthread_mutex_unlock(&queue_lock);
396     
397     return file;
398 }
399
400 void* copy_run(void *data) {
401     CPSettings *settings = data;
402     for(;;) {
403         SrcFile *file = queue_get_file();
404         if(!file) {
405             break;
406         }
407         
408         char *from = file->path ? util_concat_path(settings->from, file->path) : settings->from;
409         printf("src: %s\n", from);
410         
411         char *to = util_concat_path(settings->to, file->path ? file->path : util_resource_name(settings->from));
412         printf("dst: %s\n", to);
413         
414         free(to);
415         if(from != settings->from) {
416             free(from);
417         }
418     }
419     
420     return NULL;
421 }
422