X-Git-Url: https://develop.uap-core.de/gitweb/mizunara.git/blobdiff_plain/6de1fe1dd8731d4d952657181f871f9723bbb8b2..2f4baaacb05f4d69dac8d2888b8b8eb07606e67c:/mizucp/main.c diff --git a/mizucp/main.c b/mizucp/main.c index 9dc305a..18aaf5a 100644 --- a/mizucp/main.c +++ b/mizucp/main.c @@ -1,33 +1,29 @@ /* - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * Copyright 2021 Olaf Wintermann * - * Copyright 2021 Olaf Wintermann. All rights reserved. + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. */ #include "main.h" +#include "srvctrl.h" + #include #include #include @@ -39,25 +35,31 @@ #include #include #include -#include #include +#include #include #include -#define OPTSTR "hlpsuv" -#define TIMEOUT_IDLE -1 -#define TIMEOUT_CLIENT 1000 -#define CLIENT_UPDATE_INTERVALL 1 +#define OPTSTR "hlpsuv" static char *cfgdir; -static char *socket_path; +static char *copydir; + -static int srvctrl; +static pthread_t scan_thread; -static int eventp[2]; +static pthread_t *copy_threads; +static size_t num_copy_threads; + +static pthread_mutex_t queue_lock; +static pthread_cond_t queue_available; +static MZQueue *queue_begin; +static MZQueue *queue_end; + +static int scan_complete; int main(int argc, char** argv) { int ret = 1; @@ -95,7 +97,7 @@ int main(int argc, char** argv) { // copy settings.from = argv[optind]; settings.to = argv[optind+1]; - ret = uwcp_copy(&settings); + ret = mzcp_copy(&settings); } else { // print usage @@ -104,81 +106,67 @@ int main(int argc, char** argv) { return ret; } -static int check_configdir(void) { - char *home = getenv(UWCP_ENV_HOME); - - cfgdir = util_concat_path(home, UWCP_CFG_DIR); - +static int check_dir(const char *path) { struct stat s; - if(stat(cfgdir, &s)) { + if(stat(path, &s)) { if(errno == ENOENT) { - if(mkdir(cfgdir, S_IRWXU)) { - fprintf(stderr, "Cannot create %s: %s", cfgdir, strerror(errno)); + if(mkdir(path, S_IRWXU)) { + fprintf(stderr, "Cannot create %s: %s", path, strerror(errno)); return 1; } } else { - fprintf(stderr, "Cannot access %s: %s", cfgdir, strerror(errno)); + fprintf(stderr, "Cannot access %s: %s", path, strerror(errno)); return 1; } } - return 0; } -static int create_control_socket(void) { - char *copydir = util_concat_path(cfgdir, UWCP_COPY_DIR); +static int check_configdir(void) { + char *home = getenv(MZCP_ENV_HOME); - struct stat s; - if(stat(copydir, &s)) { - if(errno == ENOENT) { - if(mkdir(copydir, S_IRWXU)) { - fprintf(stderr, "Cannot create %s: %s", copydir, strerror(errno)); - return 1; - } - } else { - fprintf(stderr, "Cannot access %s: %s", copydir, strerror(errno)); - return 1; - } - } + char *base = util_concat_path(home, MZCP_CFG_BASE); + if(check_dir(base)) return 1; + free(base); + + cfgdir = util_concat_path(home, MZCP_CFG_DIR); + if(check_dir(cfgdir)) return 1; - // create unix domain socket - char *random_str = util_random_str(); - sstr_t socketp = ucx_sprintf("%s/%.*s", copydir, 8, random_str); - free(random_str); - socket_path = socketp.ptr; - - struct sockaddr_un addr; - if(socketp.length > sizeof(addr.sun_path)-1) { - fprintf(stderr, - "path '%s' too long for unix domain socket", - socketp.ptr); - return 1; - } + copydir = util_concat_path(mzcp_get_cfgdir(), MZCP_COPY_DIR); + if(check_dir(copydir)) return 1; - memset(&addr, 0, sizeof(addr)); - addr.sun_family = AF_UNIX; - memcpy(addr.sun_path, socketp.ptr, socketp.length); + return 0; +} + +const char* mzcp_get_cfgdir(void) { + return cfgdir; +} + +const char* mzcp_get_copydir(void) { + return copydir; +} + +static int init_queue(void) { + if(pthread_mutex_init(&queue_lock, NULL)) return 1; + if(pthread_cond_init(&queue_available, NULL)) return 1; + return 0; +} + +static MZQueue* queue_root_elm_new(void) { + SrcFile *file = malloc(sizeof(SrcFile)); + if(!file) return NULL; + memset(file, 0, sizeof(SrcFile)); - srvctrl = socket(AF_UNIX, SOCK_STREAM, 0); - if(srvctrl == -1) { - fprintf(stderr, - "Cannot create server control socket: %s", - strerror(errno)); - return 1; - } - if(bind(srvctrl, (struct sockaddr*)&addr, sizeof(addr))) { - fprintf(stderr, - "srvctrl socket bind failed: %s", - strerror(errno)); - return 1; - } + MZQueue *q = malloc(sizeof(MZQueue)); + if(!q) return NULL; - listen(srvctrl, 4); + q->file = file; + q->next = NULL; - return 0; + return q; } -int uwcp_copy(CPSettings *settings) { +int mzcp_copy(CPSettings *settings) { int ret = 0; if(check_configdir()) { @@ -190,8 +178,12 @@ int uwcp_copy(CPSettings *settings) { return 3; } + if(init_queue()) { + return 4; + } + if(settings->printsocket) { - printf("%s\n", socket_path); + printf("%s\n", mzcp_get_socketpath()); } else { printf("copy %s to %s\n", settings->from, settings->to); if(settings->pause) { @@ -206,162 +198,225 @@ int uwcp_copy(CPSettings *settings) { //close(1); //close(2); - ret = uwcp_srvctrl(settings); + if(mzcp_start_scan(settings)) { + return 2; + } + + if(mzcp_start_copy_threads(settings)) { + + return 3; + } + + ret = mzcp_srvctrl(settings); } return ret; } -int uwcp_srvctrl(CPSettings *settings) { - if(pipe(eventp)) { - perror("Cannot create event pipe"); + +int mzcp_start_scan(CPSettings *settings) { + struct stat s; + if(stat(settings->from, &s)) { + // TODO: error return 1; } - size_t allocfds = 8; - size_t numfds = 1; - - struct pollfd *fds = calloc(allocfds, sizeof(struct pollfd)); - CtrlClient **clients = calloc(allocfds, sizeof(void*)); + if(!S_ISDIR(s.st_mode)) { + // queue single file + queue_begin = queue_root_elm_new(); + if(!queue_begin) { + return 1; + } + queue_end = queue_begin; + scan_complete = 1; + settings->num_threads = 1; + } else { + // scan src directory in a separate thread + if(pthread_create(&scan_thread, NULL, scan_run, settings)) { + // TODO: we need some clever error handling + // we are already in the forked procress and stdout/stderr are closed + // maybe wait for someone to connect to the unix domain socket + return 1; + } + } - int timeout = TIMEOUT_IDLE; + return 0; +} + + +void* scan_run(void *data) { + CPSettings *settings = data; - fds[0].fd = srvctrl; - fds[0].events = POLLIN; + UcxList *stack = NULL; - int abort = 0; + char *root = strdup(""); - time_t tbegin = time(NULL); + SrcFile *file = calloc(1, sizeof(SrcFile)); + if(!file) { + scan_complete = 1; + // TODO: error + return NULL; + } + file->path = root; + file->isdir = 1; + if(enqueue_file(file)) { + scan_complete = 1; + // TODO: error + return NULL; + } - while(poll(fds, numfds, 1000) >= 0) { - time_t tend = time(NULL); - time_t diff = tend - tbegin; - tbegin = tend; + stack = ucx_list_prepend(NULL, file); + while(stack) { + SrcFile *elm = stack->data; + UcxList *next = stack->next; + free(stack); + stack = next; - if((fds[0].revents & POLLIN) == POLLIN) { - printf("accept\n"); - int fd = accept(srvctrl, NULL, 0); - if(fd < 0) { - break; + char *path = util_concat_path(settings->from, elm->path); + + int dir_fd = open(path, O_RDONLY); + if(dir_fd < 0) { + // TODO: error? + continue; + } + + // read directory and enqueue all children + DIR *dir = fdopendir(dir_fd); + struct dirent *ent; + while((ent = readdir(dir)) != NULL) { + char *name = ent->d_name; + if(!strcmp(name, ".") || !strcmp(name, "..")) { + continue; } - //int flags = fcntl(fd, F_GETFL, 0); - //flags = flags & ~O_NONBLOCK; - //fcntl(fd, F_SETFL, flags); - - CtrlClient *client = malloc(sizeof(CtrlClient)); - memset(client, 0, sizeof(CtrlClient)); - client->fd = fd; + struct stat s; + if(fstatat(dir_fd, name, &s, 0)) { + // TODO: error? + continue; + } - printf("add client: %d\n", client->fd); + SrcFile *f = calloc(1, sizeof(SrcFile)); + f->path = util_concat_path(elm->path, name); + f->isdir = S_ISDIR(s.st_mode); + f->depends_on = elm; - fds[numfds].fd = client->fd; - fds[numfds].events = POLLIN; - fds[numfds].revents = 0; - clients[numfds] = client; - numfds++; - } - - // check clients - int remove = 0; - for(int i=1;ibuf + client->pos, CLIENT_MSG_BUFSIZE - client->pos); - if(r <= 0) { - printf("remove client: %d\n", fds[i].fd); - fds[i].events = 0; - remove = 1; - } else { - client->pos += r; - - int msgret = handle_messages(client); - if(msgret == 1) { - fds[i].events = 0; - remove = 1; - } else if(msgret == -1) { - abort = 1; - } - } - } - } - - if(remove) { - int j = 1; - for(int i=1;i= CLIENT_UPDATE_INTERVALL) { - for(int i=1;iisdir) { + stack = ucx_list_prepend(stack, f); } } - if(abort) break; - - timeout = numfds > 1 ? TIMEOUT_CLIENT : TIMEOUT_IDLE; + closedir(dir); } - unlink(socket_path); + scan_complete = 1; - return 0; + return NULL; } - -void client_free(CtrlClient *client) { - free(client); +int mzcp_start_copy_threads(CPSettings *settings) { + if(settings->num_threads == 0) { + num_copy_threads = 1; + } else if(settings->num_threads > MAX_COPY_THREADS) { + num_copy_threads = MAX_COPY_THREADS; + } else { + num_copy_threads = settings->num_threads; + } + + copy_threads = calloc(num_copy_threads, sizeof(pthread_t)); + + int f = 0; + for(int i=0;ipos == CLIENT_MSG_BUFSIZE) { +int enqueue_file(SrcFile *file) { + MZQueue *q = malloc(sizeof(MZQueue)); + if(!q) { return 1; } - int msgstart = 0; - for(int i=0;ipos;i++) { - if(client->buf[i] == '\n') { - sstr_t msg; - msg.ptr = &client->buf[msgstart]; - msg.length = i - msgstart; - msgstart = i+1; - - int msgret = handle_client_msg(client, msg); - if(msgret) return msgret; - } - } + q->file = file; + q->next = NULL; - if(msgstart < client->pos) { - // incomplete message - memmove(client->buf, client->buf + msgstart, client->pos - msgstart); - client->pos -= msgstart; + pthread_mutex_lock(&queue_lock); + + if(queue_end) { + queue_end->next = q; + queue_end = q; } else { - client->pos = 0; + queue_begin = q; + queue_end = q; } + pthread_cond_signal(&queue_available); + pthread_mutex_unlock(&queue_lock); + return 0; } -int handle_client_msg(CtrlClient *client, sstr_t msg) { - printf("msg: %.*s\n", (int)msg.length, msg.ptr); +static SrcFile* queue_get_file(void) { + SrcFile *file = NULL; + pthread_mutex_lock(&queue_lock); + + MZQueue *q = NULL; + while(!q) { + if(!queue_begin) { + if(scan_complete) { + break; + } + pthread_cond_wait(&queue_available, &queue_lock); + continue; + } else { + q = queue_begin; + queue_begin = queue_begin->next; + if(!queue_begin) { + queue_end = NULL; + } + } + } - if(!sstrcmp(msg, S("abort"))) { - return -1; + if(q) { + file = q->file; + free(q); } - return 0; + pthread_mutex_unlock(&queue_lock); + + return file; } -void client_send_status(CtrlClient *client) { - char *msg = "s 0\n"; - write(client->fd, msg, strlen(msg)); +void* copy_run(void *data) { + CPSettings *settings = data; + for(;;) { + SrcFile *file = queue_get_file(); + if(!file) { + break; + } + + char *from = file->path ? util_concat_path(settings->from, file->path) : settings->from; + printf("src: %s\n", from); + + char *to = util_concat_path(settings->to, file->path ? file->path : util_resource_name(settings->from)); + printf("dst: %s\n", to); + + free(to); + if(from != settings->from) { + free(from); + } + } + + return NULL; } +