X-Git-Url: https://develop.uap-core.de/gitweb/mizunara.git/blobdiff_plain/b0a25671970e6d8c7418ae07f5c2601ba56b413c..HEAD:/mizucp/main.c diff --git a/mizucp/main.c b/mizucp/main.c index d8a05d6..f590dc7 100644 --- a/mizucp/main.c +++ b/mizucp/main.c @@ -1,37 +1,33 @@ /* - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * Copyright 2021 Olaf Wintermann * - * Copyright 2021 Olaf Wintermann. All rights reserved. + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. */ #include "main.h" #include "srvctrl.h" +#include "atomic.h" #include #include +#include #include #include #include @@ -41,22 +37,39 @@ #include #include #include -#include #include +#include #include #include - #define OPTSTR "hlpsuv" - - static char *cfgdir; static char *copydir; + +static pthread_t scan_thread; + +static pthread_t *copy_threads; +static size_t num_copy_threads; + +static pthread_mutex_t queue_lock; +static pthread_cond_t queue_available; +static MZQueue *queue_begin; +static MZQueue *queue_end; + +static int scan_complete; + + +static uint64_t stat_num_files; +static uint64_t stat_copied_files; +static uint64_t stat_error_files; +static uint64_t stat_total_size; +static uint64_t stat_copied_size; + int main(int argc, char** argv) { int ret = 1; @@ -142,6 +155,26 @@ const char* mzcp_get_copydir(void) { return copydir; } +static int init_queue(void) { + if(pthread_mutex_init(&queue_lock, NULL)) return 1; + if(pthread_cond_init(&queue_available, NULL)) return 1; + return 0; +} + +static MZQueue* queue_root_elm_new(void) { + SrcFile *file = malloc(sizeof(SrcFile)); + if(!file) return NULL; + memset(file, 0, sizeof(SrcFile)); + + MZQueue *q = malloc(sizeof(MZQueue)); + if(!q) return NULL; + + q->file = file; + q->next = NULL; + + return q; +} + int mzcp_copy(CPSettings *settings) { int ret = 0; @@ -154,6 +187,10 @@ int mzcp_copy(CPSettings *settings) { return 3; } + if(init_queue()) { + return 4; + } + if(settings->printsocket) { printf("%s\n", mzcp_get_socketpath()); } else { @@ -170,8 +207,357 @@ int mzcp_copy(CPSettings *settings) { //close(1); //close(2); + if(mzcp_start_scan(settings)) { + return 2; + } + + if(mzcp_start_copy_threads(settings)) { + + return 3; + } + ret = mzcp_srvctrl(settings); } return ret; } + + +int mzcp_start_scan(CPSettings *settings) { + if(stat(settings->from, &settings->root_stat)) { + // TODO: error + return 1; + } + + if(!S_ISDIR(settings->root_stat.st_mode)) { + // queue single file + queue_begin = queue_root_elm_new(); + if(!queue_begin) { + return 1; + } + queue_end = queue_begin; + scan_complete = 1; + settings->num_threads = 1; + } else { + // scan src directory in a separate thread + if(pthread_create(&scan_thread, NULL, scan_run, settings)) { + // TODO: we need some clever error handling + // we are already in the forked procress and stdout/stderr are closed + // maybe wait for someone to connect to the unix domain socket + return 1; + } + } + + return 0; +} + + +void* scan_run(void *data) { + CPSettings *settings = data; + + UcxList *stack = NULL; + + char *root = strdup(""); + + SrcFile *file = calloc(1, sizeof(SrcFile)); + if(!file) { + scan_complete = 1; + // TODO: error + return NULL; + } + file->path = root; + file->isdir = 1; + file->mode = settings->root_stat.st_mode; + if(enqueue_file(file)) { + scan_complete = 1; + // TODO: error + return NULL; + } + + stack = ucx_list_prepend(NULL, file); + while(stack) { + SrcFile *elm = stack->data; + UcxList *next = stack->next; + free(stack); + stack = next; + + char *path = util_concat_path(settings->from, elm->path); + + int dir_fd = open(path, O_RDONLY); + if(dir_fd < 0) { + // TODO: error? + continue; + } + + // read directory and enqueue all children + DIR *dir = fdopendir(dir_fd); + struct dirent *ent; + while((ent = readdir(dir)) != NULL) { + char *name = ent->d_name; + if(!strcmp(name, ".") || !strcmp(name, "..")) { + continue; + } + + struct stat s; + if(fstatat(dir_fd, name, &s, 0)) { + // TODO: error? + continue; + } + + SrcFile *f = calloc(1, sizeof(SrcFile)); + f->path = util_concat_path(elm->path, name); + f->isdir = S_ISDIR(s.st_mode); + f->size = s.st_size; + f->mode = s.st_mode; + f->depends_on = elm; + + if(enqueue_file(f)) { + // TODO: error? + fprintf(stderr, "enqueue failed\n"); + break; + } else { + mz_atomic_inc64(&stat_num_files); + int64_t sz = s.st_size; + mz_atomic_add64(&stat_total_size, sz); + } + + // put dir on stack + if(f->isdir) { + stack = ucx_list_prepend(stack, f); + } + } + + closedir(dir); + } + + scan_complete = 1; + + return NULL; +} + +int mzcp_start_copy_threads(CPSettings *settings) { + if(settings->num_threads == 0) { + num_copy_threads = 1; + } else if(settings->num_threads > MAX_COPY_THREADS) { + num_copy_threads = MAX_COPY_THREADS; + } else { + num_copy_threads = settings->num_threads; + } + + copy_threads = calloc(num_copy_threads, sizeof(pthread_t)); + + int f = 0; + for(int i=0;ifile = file; + q->next = NULL; + + pthread_mutex_lock(&queue_lock); + + if(queue_end) { + queue_end->next = q; + queue_end = q; + } else { + queue_begin = q; + queue_end = q; + } + + pthread_cond_signal(&queue_available); + pthread_mutex_unlock(&queue_lock); + + return 0; +} + +static SrcFile* queue_get_file(void) { + SrcFile *file = NULL; + pthread_mutex_lock(&queue_lock); + + MZQueue *q = NULL; + while(!q) { + if(!queue_begin) { + if(scan_complete) { + break; + } + pthread_cond_wait(&queue_available, &queue_lock); + continue; + } else { + q = queue_begin; + queue_begin = queue_begin->next; + if(!queue_begin) { + queue_end = NULL; + } + } + } + + if(q) { + file = q->file; + free(q); + } + + pthread_mutex_unlock(&queue_lock); + + return file; +} + +void* copy_run(void *data) { + CPSettings *settings = data; + + char *buffer = malloc(MZ_COPY_BUFSIZE); + size_t bufsize = MZ_COPY_BUFSIZE; + + for(;;) { + SrcFile *file = queue_get_file(); + if(!file) { + break; + } + + char *from = file->path ? util_concat_path(settings->from, file->path) : settings->from; + char *to = util_concat_path(settings->to, file->path ? file->path : util_resource_name(settings->from)); + + size_t from_len = strlen(from); + size_t to_len = strlen(to); + + if(from[from_len-1] == '/') { + from[from_len-1] = 0; + } + if(to[to_len-1] == '/') { + to[to_len-1] = 0; + } + + if(file->depends_on) { + SrcFile *dep = file->depends_on; + // check first without lock + if(dep->status == 0) { + if(dep->lock) { + pthread_mutex_lock(&dep->lock->mutex); + if(file->depends_on->status == 0) { + pthread_cond_wait(&dep->lock->cond, &dep->lock->mutex); + } + pthread_mutex_unlock(&dep->lock->mutex); + } else { + // locking disabled (because we have only one thread) + // but in that case the file status can't be 0 + // therefore this case here should not happen + file->status = -1; + } + } + // check again + if(dep->status == -1) { + file->status = -1; + } + } + + int ret; + if(file->status == 0) { + if(file->isdir) { + ret = mz_copy_dir(settings, file, from, to); + } else { + ret = mz_copy_file(settings, file, from, to, buffer, bufsize); + } + } + + free(to); + if(from != settings->from) { + free(from); + } + } + + return NULL; +} + +static void file_set_status(SrcFile *file, int status) { + if(file->lock) { + pthread_mutex_lock(&file->lock->mutex); + file->status = status; + pthread_cond_broadcast(&file->lock->cond); + pthread_mutex_unlock(&file->lock->mutex); + } else { + file->status = status; + } +} + +int mz_copy_dir(CPSettings *settings, SrcFile *file, const char *from, const char *to) { + printf("mkdir %s\n", to); + int ret = mkdir(to, file->mode); + if(ret) { + if(errno == EEXIST) { + struct stat s; + if(!stat(to, &s)) { + if(S_ISDIR(s.st_mode)) { + ret = 0; + } + } + } + } + file_set_status(file, !ret ? 1 : -1); + return ret; +} + +int mz_copy_file(CPSettings *settings, SrcFile *file, const char *from, const char *to, char *buffer, size_t bufsize) { + printf("cp %s %s\n", from, to); + int fin = open(from, O_RDONLY); + if(fin < 0) { + file_set_status(file, -1); + return 1; + } + + int ret = 0; + + int fout = open(to, O_WRONLY|O_CREAT, file->mode); + if(fout < 0) { + perror("open"); + close(fin); + file_set_status(file, -1); + return 1; + } + + int64_t copied = 0; + ssize_t r; + while((r = read(fin, buffer, bufsize)) > 0) { + ssize_t w = write(fout, buffer, r); + if(w > 0) { + mz_atomic_add64(&stat_copied_size, w); + copied += w; + } + if(w != r) { + ret = 1; + break; + } + } + + close(fin); + close(fout); + + if(!ret) { + file_set_status(file, 1); + mz_atomic_inc64(&stat_copied_files); + if(copied != file->size) { + // size changed after scan -> readjust total size + int64_t filesz_diff = copied - file->size; + mz_atomic_add64(&stat_total_size, filesz_diff); + } + } else { + file_set_status(file, -1); + mz_atomic_inc64(&stat_error_files); + if(copied != file->size) { + // count the full file size as copied, although we had an error + int64_t filesz_diff = file->size - copied; + mz_atomic_add64(&stat_copied_size, filesz_diff); + } + } + + return ret; +}