mizucp: implement scan thread
[mizunara.git] / mizucp / main.c
index 9dc305a..18aaf5a 100644 (file)
@@ -1,33 +1,29 @@
 /*
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ * Copyright 2021 Olaf Wintermann
  *
- * Copyright 2021 Olaf Wintermann. All rights reserved.
+ * Permission is hereby granted, free of charge, to any person obtaining a 
+ * copy of this software and associated documentation files (the "Software"), 
+ * to deal in the Software without restriction, including without limitation 
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense, 
+ * and/or sell copies of the Software, and to permit persons to whom the 
+ * Software is furnished to do so, subject to the following conditions:
+ * 
+ * The above copyright notice and this permission notice shall be included in 
+ * all copies or substantial portions of the Software.
  *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- *   1. Redistributions of source code must retain the above copyright
- *      notice, this list of conditions and the following disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above copyright
- *      notice, this list of conditions and the following disclaimer in the
- *      documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
+ * DEALINGS IN THE SOFTWARE.
  */
 
 #include "main.h"
 
+#include "srvctrl.h"
+
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <sys/socket.h>
 #include <sys/un.h>
 #include <sys/fcntl.h>
-#include <pthread.h>
 #include <poll.h>
+#include <dirent.h>
 
 #include <libidav/utils.h>
 
 #include <ucx/utils.h>
 
-#define OPTSTR "hlpsuv"
 
-#define TIMEOUT_IDLE -1
-#define TIMEOUT_CLIENT 1000
-#define CLIENT_UPDATE_INTERVALL 1
+#define OPTSTR "hlpsuv"
 
 static char *cfgdir;
-static char *socket_path;
+static char *copydir;
+
 
-static int srvctrl;
+static pthread_t scan_thread;
 
-static int eventp[2];
+static pthread_t *copy_threads;
+static size_t num_copy_threads;
+
+static pthread_mutex_t queue_lock;
+static pthread_cond_t  queue_available;
+static MZQueue         *queue_begin;
+static MZQueue         *queue_end;
+
+static int scan_complete;
 
 int main(int argc, char** argv) { 
     int ret = 1;
@@ -95,7 +97,7 @@ int main(int argc, char** argv) {
         // copy
         settings.from = argv[optind];
         settings.to   = argv[optind+1];
-        ret = uwcp_copy(&settings);
+        ret = mzcp_copy(&settings);
     } else {
         
         // print usage
@@ -104,81 +106,67 @@ int main(int argc, char** argv) {
     return ret;
 }
 
-static int check_configdir(void) {
-    char *home = getenv(UWCP_ENV_HOME);
-    
-    cfgdir = util_concat_path(home, UWCP_CFG_DIR);
-    
+static int check_dir(const char *path) {
     struct stat s;
-    if(stat(cfgdir, &s)) {
+    if(stat(path, &s)) {
         if(errno == ENOENT) {
-            if(mkdir(cfgdir, S_IRWXU)) {
-                fprintf(stderr, "Cannot create %s: %s", cfgdir, strerror(errno));
+            if(mkdir(path, S_IRWXU)) {
+                fprintf(stderr, "Cannot create %s: %s", path, strerror(errno));
                 return 1;
             }
         } else {
-            fprintf(stderr, "Cannot access %s: %s", cfgdir, strerror(errno));
+            fprintf(stderr, "Cannot access %s: %s", path, strerror(errno));
             return 1;
         }
     }
-    
     return 0;
 }
 
-static int create_control_socket(void) {
-    char *copydir = util_concat_path(cfgdir, UWCP_COPY_DIR);
+static int check_configdir(void) {
+    char *home = getenv(MZCP_ENV_HOME);
     
-    struct stat s;
-    if(stat(copydir, &s)) {
-        if(errno == ENOENT) {
-            if(mkdir(copydir, S_IRWXU)) {
-                fprintf(stderr, "Cannot create %s: %s", copydir, strerror(errno));
-                return 1;
-            }
-        } else {
-            fprintf(stderr, "Cannot access %s: %s", copydir, strerror(errno));
-            return 1;
-        }
-    }
+    char *base = util_concat_path(home, MZCP_CFG_BASE);
+    if(check_dir(base)) return 1;
+    free(base);
+
+    cfgdir = util_concat_path(home, MZCP_CFG_DIR);
+    if(check_dir(cfgdir)) return 1;
     
-    // create unix domain socket
-    char *random_str = util_random_str();
-    sstr_t socketp = ucx_sprintf("%s/%.*s", copydir, 8, random_str);
-    free(random_str);
-    socket_path = socketp.ptr;
-    
-    struct sockaddr_un addr;
-    if(socketp.length > sizeof(addr.sun_path)-1) {
-        fprintf(stderr,
-                "path '%s' too long for unix domain socket",
-                socketp.ptr);
-        return 1;
-    }
+    copydir = util_concat_path(mzcp_get_cfgdir(), MZCP_COPY_DIR);
+    if(check_dir(copydir)) return 1;
     
-    memset(&addr, 0, sizeof(addr));
-    addr.sun_family = AF_UNIX;
-    memcpy(addr.sun_path, socketp.ptr, socketp.length);
+    return 0;
+}
+
+const char* mzcp_get_cfgdir(void) {
+    return cfgdir;
+}
+
+const char* mzcp_get_copydir(void) {
+    return copydir;
+}
+
+static int init_queue(void) {
+    if(pthread_mutex_init(&queue_lock, NULL)) return 1;
+    if(pthread_cond_init(&queue_available, NULL)) return 1;
+    return 0;
+}
+
+static MZQueue* queue_root_elm_new(void) {
+    SrcFile *file = malloc(sizeof(SrcFile));
+    if(!file) return NULL;
+    memset(file, 0, sizeof(SrcFile));
     
-    srvctrl = socket(AF_UNIX, SOCK_STREAM, 0);
-    if(srvctrl == -1) {
-        fprintf(stderr,
-                "Cannot create server control socket: %s",
-                strerror(errno));
-        return 1;
-    }
-    if(bind(srvctrl, (struct sockaddr*)&addr, sizeof(addr))) {
-        fprintf(stderr,
-                "srvctrl socket bind failed: %s",
-                strerror(errno));
-        return 1;
-    }
+    MZQueue *q = malloc(sizeof(MZQueue));
+    if(!q) return NULL;
     
-    listen(srvctrl, 4);
+    q->file = file;
+    q->next = NULL;
     
-    return 0;
+    return q;
 }
 
-int uwcp_copy(CPSettings *settings) {
+int mzcp_copy(CPSettings *settings) {
     int ret  = 0;
     
     if(check_configdir()) {
@@ -190,8 +178,12 @@ int uwcp_copy(CPSettings *settings) {
         return 3;
     }
     
+    if(init_queue()) {
+        return 4;
+    }
+    
     if(settings->printsocket) {
-        printf("%s\n", socket_path);
+        printf("%s\n", mzcp_get_socketpath());
     } else {
         printf("copy %s to %s\n", settings->from, settings->to);
         if(settings->pause) {
@@ -206,162 +198,225 @@ int uwcp_copy(CPSettings *settings) {
         //close(1);
         //close(2);
         
-        ret =  uwcp_srvctrl(settings);
+        if(mzcp_start_scan(settings)) {
+            return 2;
+        }
+        
+        if(mzcp_start_copy_threads(settings)) {
+            
+            return 3;
+        }
+        
+        ret =  mzcp_srvctrl(settings);
     }
     
     return ret;
 }
 
-int uwcp_srvctrl(CPSettings *settings) {
-    if(pipe(eventp)) {
-        perror("Cannot create event pipe");
+
+int mzcp_start_scan(CPSettings *settings) {
+    struct stat s;
+    if(stat(settings->from, &s)) {
+        // TODO: error
         return 1;
     }
     
-    size_t allocfds = 8;
-    size_t numfds = 1;
-    
-    struct pollfd *fds = calloc(allocfds, sizeof(struct pollfd));
-    CtrlClient **clients = calloc(allocfds, sizeof(void*));
+    if(!S_ISDIR(s.st_mode)) {
+        // queue single file
+        queue_begin = queue_root_elm_new();
+        if(!queue_begin) {
+            return 1;
+        }
+        queue_end = queue_begin;
+        scan_complete = 1;
+        settings->num_threads = 1;
+    } else {
+        // scan src directory in a separate thread
+        if(pthread_create(&scan_thread, NULL, scan_run, settings)) {
+            // TODO: we need some clever error handling
+            // we are already in the forked procress and stdout/stderr are closed
+            // maybe wait for someone to connect to the unix domain socket
+            return 1;
+        }
+    }
     
-    int timeout = TIMEOUT_IDLE;
+    return 0;
+}
+
+
+void* scan_run(void *data) {
+    CPSettings *settings = data;
     
-    fds[0].fd = srvctrl;
-    fds[0].events = POLLIN;
+    UcxList *stack = NULL;
     
-    int abort = 0;
+    char *root = strdup("");
     
-    time_t tbegin = time(NULL);
+    SrcFile *file = calloc(1, sizeof(SrcFile));
+    if(!file) {
+        scan_complete = 1;
+        // TODO: error
+        return NULL;
+    }
+    file->path = root;
+    file->isdir = 1;
+    if(enqueue_file(file)) {
+        scan_complete = 1;
+        // TODO: error
+        return NULL;
+    }
     
-    while(poll(fds, numfds, 1000) >= 0) {
-        time_t tend = time(NULL);
-        time_t diff = tend - tbegin;
-        tbegin = tend;
+    stack = ucx_list_prepend(NULL, file);
+    while(stack) {
+        SrcFile *elm = stack->data;
+        UcxList *next = stack->next;
+        free(stack);
+        stack = next;
         
-        if((fds[0].revents & POLLIN) == POLLIN) {
-            printf("accept\n");
-            int fd = accept(srvctrl, NULL, 0);
-            if(fd < 0) {
-                break;
+        char *path = util_concat_path(settings->from, elm->path);
+        
+        int dir_fd = open(path, O_RDONLY);
+        if(dir_fd < 0) {
+            // TODO: error?
+            continue;
+        }
+        
+        // read directory and enqueue all children
+        DIR *dir = fdopendir(dir_fd);
+        struct dirent *ent;
+        while((ent = readdir(dir)) != NULL) {
+            char *name = ent->d_name;
+            if(!strcmp(name, ".") || !strcmp(name, "..")) {
+                continue;
             }
             
-            //int flags = fcntl(fd, F_GETFL, 0);
-            //flags = flags & ~O_NONBLOCK;
-            //fcntl(fd, F_SETFL, flags);
-            
-            CtrlClient *client = malloc(sizeof(CtrlClient));
-            memset(client, 0, sizeof(CtrlClient));
-            client->fd = fd;
+            struct stat s;
+            if(fstatat(dir_fd, name, &s, 0)) {
+                // TODO: error?
+                continue;
+            }
             
-            printf("add client: %d\n", client->fd);
+            SrcFile *f = calloc(1, sizeof(SrcFile));
+            f->path = util_concat_path(elm->path, name);
+            f->isdir = S_ISDIR(s.st_mode);
+            f->depends_on = elm;
             
-            fds[numfds].fd = client->fd;
-            fds[numfds].events = POLLIN;
-            fds[numfds].revents = 0;
-            clients[numfds] = client;
-            numfds++;
-        }
-        
-        // check clients
-        int remove = 0;
-        for(int i=1;i<numfds;i++) {
-            if((fds[i].revents & POLLIN) == POLLIN) {
-                CtrlClient *client = clients[i];
-                ssize_t r = read(fds[i].fd, client->buf + client->pos, CLIENT_MSG_BUFSIZE - client->pos);
-                if(r <= 0) {
-                    printf("remove client: %d\n", fds[i].fd);
-                    fds[i].events = 0;
-                    remove = 1;
-                } else {
-                    client->pos += r;
-                    
-                    int msgret = handle_messages(client);
-                    if(msgret == 1) {
-                        fds[i].events = 0;
-                        remove = 1;
-                    } else if(msgret == -1) {
-                        abort = 1;
-                    }
-                }
-            }
-        }
-        
-        if(remove) {
-            int j = 1;
-            for(int i=1;i<numfds;i++) {
-                if(fds[i].events != 0) {
-                    fds[j] = fds[i];
-                    clients[j] = clients[j];
-                    j++;
-                } else {
-                    client_free(clients[i]);
-                    close(fds[i].fd);
-                }
+            if(enqueue_file(f)) {
+                // TODO: error?
+                fprintf(stderr, "enqueue failed\n");
+                break;
             }
-            numfds = j;
-        }
-        
-        if(diff >= CLIENT_UPDATE_INTERVALL) {
-            for(int i=1;i<numfds;i++) {
-                client_send_status(clients[i]);
+            
+            // put dir on stack
+            if(f->isdir) {
+                stack = ucx_list_prepend(stack, f);
             }
         }
         
-        if(abort) break;
-        
-        timeout = numfds > 1 ? TIMEOUT_CLIENT : TIMEOUT_IDLE;
+        closedir(dir);
     }
     
-    unlink(socket_path);
+    scan_complete = 1;
     
-    return 0;
+    return NULL;
 }
 
-
-void client_free(CtrlClient *client) {
-    free(client);
+int mzcp_start_copy_threads(CPSettings *settings) {
+    if(settings->num_threads == 0) {
+        num_copy_threads = 1;
+    } else if(settings->num_threads > MAX_COPY_THREADS) {
+        num_copy_threads = MAX_COPY_THREADS;
+    } else {
+        num_copy_threads = settings->num_threads;
+    }
+    
+    copy_threads = calloc(num_copy_threads, sizeof(pthread_t));
+    
+    int f = 0;
+    for(int i=0;i<num_copy_threads;i++) {
+        if(pthread_create(&copy_threads[i], NULL, copy_run, settings)) {
+            f++;
+        }
+    }
+    
+    return f < num_copy_threads ? 0 : 1;
 }
 
-int handle_messages(CtrlClient *client) {
-    if(client->pos == CLIENT_MSG_BUFSIZE) {
+int enqueue_file(SrcFile *file) {
+    MZQueue *q = malloc(sizeof(MZQueue));
+    if(!q) {
         return 1;
     }
     
-    int msgstart = 0;
-    for(int i=0;i<client->pos;i++) {
-        if(client->buf[i] == '\n') {
-            sstr_t msg;
-            msg.ptr = &client->buf[msgstart];
-            msg.length = i - msgstart;
-            msgstart = i+1;
-            
-            int msgret = handle_client_msg(client, msg);
-            if(msgret) return msgret;
-        }
-    }
+    q->file = file;
+    q->next = NULL;
     
-    if(msgstart < client->pos) {
-        // incomplete message
-        memmove(client->buf, client->buf + msgstart, client->pos - msgstart);
-        client->pos -= msgstart;
+    pthread_mutex_lock(&queue_lock);
+    
+    if(queue_end) {
+        queue_end->next = q;
+        queue_end = q;
     } else {
-        client->pos = 0;
+        queue_begin = q;
+        queue_end = q;
     }
     
+    pthread_cond_signal(&queue_available);
+    pthread_mutex_unlock(&queue_lock);
+    
     return 0;
 }
 
-int handle_client_msg(CtrlClient *client, sstr_t msg) {
-    printf("msg: %.*s\n", (int)msg.length, msg.ptr);
+static SrcFile* queue_get_file(void) {
+    SrcFile *file = NULL;
+    pthread_mutex_lock(&queue_lock);
+    
+    MZQueue *q = NULL;
+    while(!q) {
+        if(!queue_begin) {
+            if(scan_complete) {
+                break;
+            }
+            pthread_cond_wait(&queue_available, &queue_lock);
+            continue;
+        } else {
+            q = queue_begin;
+            queue_begin = queue_begin->next;
+            if(!queue_begin) {
+                queue_end = NULL;
+            }
+        }
+    }
     
-    if(!sstrcmp(msg, S("abort"))) {
-        return -1;
+    if(q) {
+        file = q->file;
+        free(q);
     }
     
-    return 0;
+    pthread_mutex_unlock(&queue_lock);
+    
+    return file;
 }
 
-void client_send_status(CtrlClient *client) {
-    char *msg = "s 0\n";
-    write(client->fd, msg, strlen(msg));
+void* copy_run(void *data) {
+    CPSettings *settings = data;
+    for(;;) {
+        SrcFile *file = queue_get_file();
+        if(!file) {
+            break;
+        }
+        
+        char *from = file->path ? util_concat_path(settings->from, file->path) : settings->from;
+        printf("src: %s\n", from);
+        
+        char *to = util_concat_path(settings->to, file->path ? file->path : util_resource_name(settings->from));
+        printf("dst: %s\n", to);
+        
+        free(to);
+        if(from != settings->from) {
+            free(from);
+        }
+    }
+    
+    return NULL;
 }
+