+
+
+int mzcp_start_scan(CPSettings *settings) {
+ if(stat(settings->from, &settings->root_stat)) {
+ // TODO: error
+ return 1;
+ }
+
+ if(!S_ISDIR(settings->root_stat.st_mode)) {
+ // queue single file
+ queue_begin = queue_root_elm_new();
+ if(!queue_begin) {
+ return 1;
+ }
+ queue_end = queue_begin;
+ scan_complete = 1;
+ settings->num_threads = 1;
+ } else {
+ // scan src directory in a separate thread
+ if(pthread_create(&scan_thread, NULL, scan_run, settings)) {
+ // TODO: we need some clever error handling
+ // we are already in the forked procress and stdout/stderr are closed
+ // maybe wait for someone to connect to the unix domain socket
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+
+void* scan_run(void *data) {
+ CPSettings *settings = data;
+
+ UcxList *stack = NULL;
+
+ char *root = strdup("");
+
+ SrcFile *file = calloc(1, sizeof(SrcFile));
+ if(!file) {
+ scan_complete = 1;
+ // TODO: error
+ return NULL;
+ }
+ file->path = root;
+ file->isdir = 1;
+ file->mode = settings->root_stat.st_mode;
+ if(enqueue_file(file)) {
+ scan_complete = 1;
+ // TODO: error
+ return NULL;
+ }
+
+ stack = ucx_list_prepend(NULL, file);
+ while(stack) {
+ SrcFile *elm = stack->data;
+ UcxList *next = stack->next;
+ free(stack);
+ stack = next;
+
+ char *path = util_concat_path(settings->from, elm->path);
+
+ int dir_fd = open(path, O_RDONLY);
+ if(dir_fd < 0) {
+ // TODO: error?
+ continue;
+ }
+
+ // read directory and enqueue all children
+ DIR *dir = fdopendir(dir_fd);
+ struct dirent *ent;
+ while((ent = readdir(dir)) != NULL) {
+ char *name = ent->d_name;
+ if(!strcmp(name, ".") || !strcmp(name, "..")) {
+ continue;
+ }
+
+ struct stat s;
+ if(fstatat(dir_fd, name, &s, 0)) {
+ // TODO: error?
+ continue;
+ }
+
+ SrcFile *f = calloc(1, sizeof(SrcFile));
+ f->path = util_concat_path(elm->path, name);
+ f->isdir = S_ISDIR(s.st_mode);
+ f->size = s.st_size;
+ f->mode = s.st_mode;
+ f->depends_on = elm;
+
+ if(enqueue_file(f)) {
+ // TODO: error?
+ fprintf(stderr, "enqueue failed\n");
+ break;
+ } else {
+ mz_atomic_inc64(&stat_num_files);
+ int64_t sz = s.st_size;
+ mz_atomic_add64(&stat_total_size, sz);
+ }
+
+ // put dir on stack
+ if(f->isdir) {
+ stack = ucx_list_prepend(stack, f);
+ }
+ }
+
+ closedir(dir);
+ }
+
+ scan_complete = 1;
+
+ return NULL;
+}
+
+int mzcp_start_copy_threads(CPSettings *settings) {
+ if(settings->num_threads == 0) {
+ num_copy_threads = 1;
+ } else if(settings->num_threads > MAX_COPY_THREADS) {
+ num_copy_threads = MAX_COPY_THREADS;
+ } else {
+ num_copy_threads = settings->num_threads;
+ }
+
+ copy_threads = calloc(num_copy_threads, sizeof(pthread_t));
+
+ int f = 0;
+ for(int i=0;i<num_copy_threads;i++) {
+ if(pthread_create(©_threads[i], NULL, copy_run, settings)) {
+ f++;
+ }
+ }
+
+ return f < num_copy_threads ? 0 : 1;
+}
+
+int enqueue_file(SrcFile *file) {
+ MZQueue *q = malloc(sizeof(MZQueue));
+ if(!q) {
+ return 1;
+ }
+
+ q->file = file;
+ q->next = NULL;
+
+ pthread_mutex_lock(&queue_lock);
+
+ if(queue_end) {
+ queue_end->next = q;
+ queue_end = q;
+ } else {
+ queue_begin = q;
+ queue_end = q;
+ }
+
+ pthread_cond_signal(&queue_available);
+ pthread_mutex_unlock(&queue_lock);
+
+ return 0;
+}
+
+static SrcFile* queue_get_file(void) {
+ SrcFile *file = NULL;
+ pthread_mutex_lock(&queue_lock);
+
+ MZQueue *q = NULL;
+ while(!q) {
+ if(!queue_begin) {
+ if(scan_complete) {
+ break;
+ }
+ pthread_cond_wait(&queue_available, &queue_lock);
+ continue;
+ } else {
+ q = queue_begin;
+ queue_begin = queue_begin->next;
+ if(!queue_begin) {
+ queue_end = NULL;
+ }
+ }
+ }
+
+ if(q) {
+ file = q->file;
+ free(q);
+ }
+
+ pthread_mutex_unlock(&queue_lock);
+
+ return file;
+}
+
+void* copy_run(void *data) {
+ CPSettings *settings = data;
+
+ char *buffer = malloc(MZ_COPY_BUFSIZE);
+ size_t bufsize = MZ_COPY_BUFSIZE;
+
+ for(;;) {
+ SrcFile *file = queue_get_file();
+ if(!file) {
+ break;
+ }
+
+ char *from = file->path ? util_concat_path(settings->from, file->path) : settings->from;
+ char *to = util_concat_path(settings->to, file->path ? file->path : util_resource_name(settings->from));
+
+ size_t from_len = strlen(from);
+ size_t to_len = strlen(to);
+
+ if(from[from_len-1] == '/') {
+ from[from_len-1] = 0;
+ }
+ if(to[to_len-1] == '/') {
+ to[to_len-1] = 0;
+ }
+
+ if(file->depends_on) {
+ SrcFile *dep = file->depends_on;
+ // check first without lock
+ if(dep->status == 0) {
+ if(dep->lock) {
+ pthread_mutex_lock(&dep->lock->mutex);
+ if(file->depends_on->status == 0) {
+ pthread_cond_wait(&dep->lock->cond, &dep->lock->mutex);
+ }
+ pthread_mutex_unlock(&dep->lock->mutex);
+ } else {
+ // locking disabled (because we have only one thread)
+ // but in that case the file status can't be 0
+ // therefore this case here should not happen
+ file->status = -1;
+ }
+ }
+ // check again
+ if(dep->status == -1) {
+ file->status = -1;
+ }
+ }
+
+ int ret;
+ if(file->status == 0) {
+ if(file->isdir) {
+ ret = mz_copy_dir(settings, file, from, to);
+ } else {
+ ret = mz_copy_file(settings, file, from, to, buffer, bufsize);
+ }
+ }
+
+ free(to);
+ if(from != settings->from) {
+ free(from);
+ }
+ }
+
+ return NULL;
+}
+
+static void file_set_status(SrcFile *file, int status) {
+ if(file->lock) {
+ pthread_mutex_lock(&file->lock->mutex);
+ file->status = status;
+ pthread_cond_broadcast(&file->lock->cond);
+ pthread_mutex_unlock(&file->lock->mutex);
+ } else {
+ file->status = status;
+ }
+}
+
+int mz_copy_dir(CPSettings *settings, SrcFile *file, const char *from, const char *to) {
+ printf("mkdir %s\n", to);
+ int ret = mkdir(to, file->mode);
+ if(ret) {
+ if(errno == EEXIST) {
+ struct stat s;
+ if(!stat(to, &s)) {
+ if(S_ISDIR(s.st_mode)) {
+ ret = 0;
+ }
+ }
+ }
+ }
+ file_set_status(file, !ret ? 1 : -1);
+ return ret;
+}
+
+int mz_copy_file(CPSettings *settings, SrcFile *file, const char *from, const char *to, char *buffer, size_t bufsize) {
+ printf("cp %s %s\n", from, to);
+ int fin = open(from, O_RDONLY);
+ if(fin < 0) {
+ file_set_status(file, -1);
+ return 1;
+ }
+
+ int ret = 0;
+
+ int fout = open(to, O_WRONLY|O_CREAT, file->mode);
+ if(fout < 0) {
+ perror("open");
+ close(fin);
+ file_set_status(file, -1);
+ return 1;
+ }
+
+ int64_t copied = 0;
+ ssize_t r;
+ while((r = read(fin, buffer, bufsize)) > 0) {
+ ssize_t w = write(fout, buffer, r);
+ if(w > 0) {
+ mz_atomic_add64(&stat_copied_size, w);
+ copied += w;
+ }
+ if(w != r) {
+ ret = 1;
+ break;
+ }
+ }
+
+ close(fin);
+ close(fout);
+
+ if(!ret) {
+ file_set_status(file, 1);
+ mz_atomic_inc64(&stat_copied_files);
+ if(copied != file->size) {
+ // size changed after scan -> readjust total size
+ int64_t filesz_diff = copied - file->size;
+ mz_atomic_add64(&stat_total_size, filesz_diff);
+ }
+ } else {
+ file_set_status(file, -1);
+ mz_atomic_inc64(&stat_error_files);
+ if(copied != file->size) {
+ // count the full file size as copied, although we had an error
+ int64_t filesz_diff = file->size - copied;
+ mz_atomic_add64(&stat_copied_size, filesz_diff);
+ }
+ }
+
+ return ret;
+}