2 * Copyright 2021 Olaf Wintermann
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
17 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
37 #include <sys/socket.h>
39 #include <sys/fcntl.h>
43 #include <libidav/utils.h>
45 #include <ucx/utils.h>
48 #define OPTSTR "hlpsuv"
54 static pthread_t scan_thread;
56 static pthread_t *copy_threads;
57 static size_t num_copy_threads;
59 static pthread_mutex_t queue_lock;
60 static pthread_cond_t queue_available;
61 static MZQueue *queue_begin;
62 static MZQueue *queue_end;
64 static int scan_complete;
67 static uint64_t stat_num_files;
68 static uint64_t stat_copied_files;
69 static uint64_t stat_error_files;
70 static uint64_t stat_total_size;
71 static uint64_t stat_copied_size;
73 int main(int argc, char** argv) {
77 extern int optind, opterr, optopt;
80 memset(&settings, 0, sizeof(CPSettings));
84 int list = 0; // list copying processes
87 while((c = getopt(argc, argv, OPTSTR)) != -1) {
89 case 'l': list = 1; break;
90 case 'p': settings.pause = 1; break;
91 case 's': settings.printsocket = 1; break;
92 case 'u': settings.url = 1; break;
93 case 'v': version = 1; break;
97 int ac = argc - optind;
107 settings.from = argv[optind];
108 settings.to = argv[optind+1];
109 ret = mzcp_copy(&settings);
118 static int check_dir(const char *path) {
121 if(errno == ENOENT) {
122 if(mkdir(path, S_IRWXU)) {
123 fprintf(stderr, "Cannot create %s: %s", path, strerror(errno));
127 fprintf(stderr, "Cannot access %s: %s", path, strerror(errno));
134 static int check_configdir(void) {
135 char *home = getenv(MZCP_ENV_HOME);
137 char *base = util_concat_path(home, MZCP_CFG_BASE);
138 if(check_dir(base)) return 1;
141 cfgdir = util_concat_path(home, MZCP_CFG_DIR);
142 if(check_dir(cfgdir)) return 1;
144 copydir = util_concat_path(mzcp_get_cfgdir(), MZCP_COPY_DIR);
145 if(check_dir(copydir)) return 1;
150 const char* mzcp_get_cfgdir(void) {
154 const char* mzcp_get_copydir(void) {
158 static int init_queue(void) {
159 if(pthread_mutex_init(&queue_lock, NULL)) return 1;
160 if(pthread_cond_init(&queue_available, NULL)) return 1;
164 static MZQueue* queue_root_elm_new(void) {
165 SrcFile *file = malloc(sizeof(SrcFile));
166 if(!file) return NULL;
167 memset(file, 0, sizeof(SrcFile));
169 MZQueue *q = malloc(sizeof(MZQueue));
178 int mzcp_copy(CPSettings *settings) {
181 if(check_configdir()) {
186 if(create_control_socket()) {
194 if(settings->printsocket) {
195 printf("%s\n", mzcp_get_socketpath());
197 printf("copy %s to %s\n", settings->from, settings->to);
198 if(settings->pause) {
210 if(mzcp_start_scan(settings)) {
214 if(mzcp_start_copy_threads(settings)) {
219 ret = mzcp_srvctrl(settings);
226 int mzcp_start_scan(CPSettings *settings) {
227 if(stat(settings->from, &settings->root_stat)) {
232 if(!S_ISDIR(settings->root_stat.st_mode)) {
234 queue_begin = queue_root_elm_new();
238 queue_end = queue_begin;
240 settings->num_threads = 1;
242 // scan src directory in a separate thread
243 if(pthread_create(&scan_thread, NULL, scan_run, settings)) {
244 // TODO: we need some clever error handling
245 // we are already in the forked procress and stdout/stderr are closed
246 // maybe wait for someone to connect to the unix domain socket
255 void* scan_run(void *data) {
256 CPSettings *settings = data;
258 UcxList *stack = NULL;
260 char *root = strdup("");
262 SrcFile *file = calloc(1, sizeof(SrcFile));
270 file->mode = settings->root_stat.st_mode;
271 if(enqueue_file(file)) {
277 stack = ucx_list_prepend(NULL, file);
279 SrcFile *elm = stack->data;
280 UcxList *next = stack->next;
284 char *path = util_concat_path(settings->from, elm->path);
286 int dir_fd = open(path, O_RDONLY);
292 // read directory and enqueue all children
293 DIR *dir = fdopendir(dir_fd);
295 while((ent = readdir(dir)) != NULL) {
296 char *name = ent->d_name;
297 if(!strcmp(name, ".") || !strcmp(name, "..")) {
302 if(fstatat(dir_fd, name, &s, 0)) {
307 SrcFile *f = calloc(1, sizeof(SrcFile));
308 f->path = util_concat_path(elm->path, name);
309 f->isdir = S_ISDIR(s.st_mode);
314 if(enqueue_file(f)) {
316 fprintf(stderr, "enqueue failed\n");
319 mz_atomic_inc64(&stat_num_files);
320 int64_t sz = s.st_size;
321 mz_atomic_add64(&stat_total_size, sz);
326 stack = ucx_list_prepend(stack, f);
338 int mzcp_start_copy_threads(CPSettings *settings) {
339 if(settings->num_threads == 0) {
340 num_copy_threads = 1;
341 } else if(settings->num_threads > MAX_COPY_THREADS) {
342 num_copy_threads = MAX_COPY_THREADS;
344 num_copy_threads = settings->num_threads;
347 copy_threads = calloc(num_copy_threads, sizeof(pthread_t));
350 for(int i=0;i<num_copy_threads;i++) {
351 if(pthread_create(©_threads[i], NULL, copy_run, settings)) {
356 return f < num_copy_threads ? 0 : 1;
359 int enqueue_file(SrcFile *file) {
360 MZQueue *q = malloc(sizeof(MZQueue));
368 pthread_mutex_lock(&queue_lock);
378 pthread_cond_signal(&queue_available);
379 pthread_mutex_unlock(&queue_lock);
384 static SrcFile* queue_get_file(void) {
385 SrcFile *file = NULL;
386 pthread_mutex_lock(&queue_lock);
394 pthread_cond_wait(&queue_available, &queue_lock);
398 queue_begin = queue_begin->next;
410 pthread_mutex_unlock(&queue_lock);
415 void* copy_run(void *data) {
416 CPSettings *settings = data;
418 char *buffer = malloc(MZ_COPY_BUFSIZE);
419 size_t bufsize = MZ_COPY_BUFSIZE;
422 SrcFile *file = queue_get_file();
427 char *from = file->path ? util_concat_path(settings->from, file->path) : settings->from;
428 char *to = util_concat_path(settings->to, file->path ? file->path : util_resource_name(settings->from));
430 size_t from_len = strlen(from);
431 size_t to_len = strlen(to);
433 if(from[from_len-1] == '/') {
434 from[from_len-1] = 0;
436 if(to[to_len-1] == '/') {
440 if(file->depends_on) {
441 SrcFile *dep = file->depends_on;
442 // check first without lock
443 if(dep->status == 0) {
445 pthread_mutex_lock(&dep->lock->mutex);
446 if(file->depends_on->status == 0) {
447 pthread_cond_wait(&dep->lock->cond, &dep->lock->mutex);
449 pthread_mutex_unlock(&dep->lock->mutex);
451 // locking disabled (because we have only one thread)
452 // but in that case the file status can't be 0
453 // therefore this case here should not happen
458 if(dep->status == -1) {
464 if(file->status == 0) {
466 ret = mz_copy_dir(settings, file, from, to);
468 ret = mz_copy_file(settings, file, from, to, buffer, bufsize);
473 if(from != settings->from) {
481 static void file_set_status(SrcFile *file, int status) {
483 pthread_mutex_lock(&file->lock->mutex);
484 file->status = status;
485 pthread_cond_broadcast(&file->lock->cond);
486 pthread_mutex_unlock(&file->lock->mutex);
488 file->status = status;
492 int mz_copy_dir(CPSettings *settings, SrcFile *file, const char *from, const char *to) {
493 printf("mkdir %s\n", to);
494 int ret = mkdir(to, file->mode);
496 if(errno == EEXIST) {
499 if(S_ISDIR(s.st_mode)) {
505 file_set_status(file, !ret ? 1 : -1);
509 int mz_copy_file(CPSettings *settings, SrcFile *file, const char *from, const char *to, char *buffer, size_t bufsize) {
510 printf("cp %s %s\n", from, to);
511 int fin = open(from, O_RDONLY);
513 file_set_status(file, -1);
519 int fout = open(to, O_WRONLY|O_CREAT, file->mode);
523 file_set_status(file, -1);
529 while((r = read(fin, buffer, bufsize)) > 0) {
530 ssize_t w = write(fout, buffer, r);
532 mz_atomic_add64(&stat_copied_size, w);
545 file_set_status(file, 1);
546 mz_atomic_inc64(&stat_copied_files);
547 if(copied != file->size) {
548 // size changed after scan -> readjust total size
549 int64_t filesz_diff = copied - file->size;
550 mz_atomic_add64(&stat_total_size, filesz_diff);
553 file_set_status(file, -1);
554 mz_atomic_inc64(&stat_error_files);
555 if(copied != file->size) {
556 // count the full file size as copied, although we had an error
557 int64_t filesz_diff = file->size - copied;
558 mz_atomic_add64(&stat_copied_size, filesz_diff);