mizucp: implement scan thread
authorOlaf Wintermann <olaf.wintermann@gmail.com>
Wed, 16 Jun 2021 09:51:17 +0000 (11:51 +0200)
committerOlaf Wintermann <olaf.wintermann@gmail.com>
Wed, 16 Jun 2021 09:51:17 +0000 (11:51 +0200)
mizucp/Makefile
mizucp/main.c
mizucp/main.h
mizucp/srvctrl.c
mizucp/srvctrl.h

index 8e1e5fd..5e816f3 100644 (file)
@@ -34,6 +34,7 @@ CFLAGS += -I../ucx -I..
 SRC = main.c
 SRC += srvctrl.c
 
+
 OBJ = $(SRC:%.c=$(BUILD_ROOT)/build/mizucp/%.$(OBJ_EXT))
 
 all: $(BUILD_ROOT)/build/bin/mizucp
index d8a05d6..18aaf5a 100644 (file)
@@ -1,29 +1,23 @@
 /*
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ * Copyright 2021 Olaf Wintermann
  *
- * Copyright 2021 Olaf Wintermann. All rights reserved.
+ * Permission is hereby granted, free of charge, to any person obtaining a 
+ * copy of this software and associated documentation files (the "Software"), 
+ * to deal in the Software without restriction, including without limitation 
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense, 
+ * and/or sell copies of the Software, and to permit persons to whom the 
+ * Software is furnished to do so, subject to the following conditions:
+ * 
+ * The above copyright notice and this permission notice shall be included in 
+ * all copies or substantial portions of the Software.
  *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- *   1. Redistributions of source code must retain the above copyright
- *      notice, this list of conditions and the following disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above copyright
- *      notice, this list of conditions and the following disclaimer in the
- *      documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
+ * DEALINGS IN THE SOFTWARE.
  */
 
 #include "main.h"
 #include <sys/socket.h>
 #include <sys/un.h>
 #include <sys/fcntl.h>
-#include <pthread.h>
 #include <poll.h>
+#include <dirent.h>
 
 #include <libidav/utils.h>
 
 #include <ucx/utils.h>
 
 
-
 #define OPTSTR "hlpsuv"
 
-
-
 static char *cfgdir;
 static char *copydir;
 
+
+static pthread_t scan_thread;
+
+static pthread_t *copy_threads;
+static size_t num_copy_threads;
+
+static pthread_mutex_t queue_lock;
+static pthread_cond_t  queue_available;
+static MZQueue         *queue_begin;
+static MZQueue         *queue_end;
+
+static int scan_complete;
+
 int main(int argc, char** argv) { 
     int ret = 1;
     
@@ -142,6 +146,26 @@ const char* mzcp_get_copydir(void) {
     return copydir;
 }
 
+static int init_queue(void) {
+    if(pthread_mutex_init(&queue_lock, NULL)) return 1;
+    if(pthread_cond_init(&queue_available, NULL)) return 1;
+    return 0;
+}
+
+static MZQueue* queue_root_elm_new(void) {
+    SrcFile *file = malloc(sizeof(SrcFile));
+    if(!file) return NULL;
+    memset(file, 0, sizeof(SrcFile));
+    
+    MZQueue *q = malloc(sizeof(MZQueue));
+    if(!q) return NULL;
+    
+    q->file = file;
+    q->next = NULL;
+    
+    return q;
+}
+
 int mzcp_copy(CPSettings *settings) {
     int ret  = 0;
     
@@ -154,6 +178,10 @@ int mzcp_copy(CPSettings *settings) {
         return 3;
     }
     
+    if(init_queue()) {
+        return 4;
+    }
+    
     if(settings->printsocket) {
         printf("%s\n", mzcp_get_socketpath());
     } else {
@@ -170,8 +198,225 @@ int mzcp_copy(CPSettings *settings) {
         //close(1);
         //close(2);
         
+        if(mzcp_start_scan(settings)) {
+            return 2;
+        }
+        
+        if(mzcp_start_copy_threads(settings)) {
+            
+            return 3;
+        }
+        
         ret =  mzcp_srvctrl(settings);
     }
     
     return ret;
 }
+
+
+int mzcp_start_scan(CPSettings *settings) {
+    struct stat s;
+    if(stat(settings->from, &s)) {
+        // TODO: error
+        return 1;
+    }
+    
+    if(!S_ISDIR(s.st_mode)) {
+        // queue single file
+        queue_begin = queue_root_elm_new();
+        if(!queue_begin) {
+            return 1;
+        }
+        queue_end = queue_begin;
+        scan_complete = 1;
+        settings->num_threads = 1;
+    } else {
+        // scan src directory in a separate thread
+        if(pthread_create(&scan_thread, NULL, scan_run, settings)) {
+            // TODO: we need some clever error handling
+            // we are already in the forked procress and stdout/stderr are closed
+            // maybe wait for someone to connect to the unix domain socket
+            return 1;
+        }
+    }
+    
+    return 0;
+}
+
+
+void* scan_run(void *data) {
+    CPSettings *settings = data;
+    
+    UcxList *stack = NULL;
+    
+    char *root = strdup("");
+    
+    SrcFile *file = calloc(1, sizeof(SrcFile));
+    if(!file) {
+        scan_complete = 1;
+        // TODO: error
+        return NULL;
+    }
+    file->path = root;
+    file->isdir = 1;
+    if(enqueue_file(file)) {
+        scan_complete = 1;
+        // TODO: error
+        return NULL;
+    }
+    
+    stack = ucx_list_prepend(NULL, file);
+    while(stack) {
+        SrcFile *elm = stack->data;
+        UcxList *next = stack->next;
+        free(stack);
+        stack = next;
+        
+        char *path = util_concat_path(settings->from, elm->path);
+        
+        int dir_fd = open(path, O_RDONLY);
+        if(dir_fd < 0) {
+            // TODO: error?
+            continue;
+        }
+        
+        // read directory and enqueue all children
+        DIR *dir = fdopendir(dir_fd);
+        struct dirent *ent;
+        while((ent = readdir(dir)) != NULL) {
+            char *name = ent->d_name;
+            if(!strcmp(name, ".") || !strcmp(name, "..")) {
+                continue;
+            }
+            
+            struct stat s;
+            if(fstatat(dir_fd, name, &s, 0)) {
+                // TODO: error?
+                continue;
+            }
+            
+            SrcFile *f = calloc(1, sizeof(SrcFile));
+            f->path = util_concat_path(elm->path, name);
+            f->isdir = S_ISDIR(s.st_mode);
+            f->depends_on = elm;
+            
+            if(enqueue_file(f)) {
+                // TODO: error?
+                fprintf(stderr, "enqueue failed\n");
+                break;
+            }
+            
+            // put dir on stack
+            if(f->isdir) {
+                stack = ucx_list_prepend(stack, f);
+            }
+        }
+        
+        closedir(dir);
+    }
+    
+    scan_complete = 1;
+    
+    return NULL;
+}
+
+int mzcp_start_copy_threads(CPSettings *settings) {
+    if(settings->num_threads == 0) {
+        num_copy_threads = 1;
+    } else if(settings->num_threads > MAX_COPY_THREADS) {
+        num_copy_threads = MAX_COPY_THREADS;
+    } else {
+        num_copy_threads = settings->num_threads;
+    }
+    
+    copy_threads = calloc(num_copy_threads, sizeof(pthread_t));
+    
+    int f = 0;
+    for(int i=0;i<num_copy_threads;i++) {
+        if(pthread_create(&copy_threads[i], NULL, copy_run, settings)) {
+            f++;
+        }
+    }
+    
+    return f < num_copy_threads ? 0 : 1;
+}
+
+int enqueue_file(SrcFile *file) {
+    MZQueue *q = malloc(sizeof(MZQueue));
+    if(!q) {
+        return 1;
+    }
+    
+    q->file = file;
+    q->next = NULL;
+    
+    pthread_mutex_lock(&queue_lock);
+    
+    if(queue_end) {
+        queue_end->next = q;
+        queue_end = q;
+    } else {
+        queue_begin = q;
+        queue_end = q;
+    }
+    
+    pthread_cond_signal(&queue_available);
+    pthread_mutex_unlock(&queue_lock);
+    
+    return 0;
+}
+
+static SrcFile* queue_get_file(void) {
+    SrcFile *file = NULL;
+    pthread_mutex_lock(&queue_lock);
+    
+    MZQueue *q = NULL;
+    while(!q) {
+        if(!queue_begin) {
+            if(scan_complete) {
+                break;
+            }
+            pthread_cond_wait(&queue_available, &queue_lock);
+            continue;
+        } else {
+            q = queue_begin;
+            queue_begin = queue_begin->next;
+            if(!queue_begin) {
+                queue_end = NULL;
+            }
+        }
+    }
+    
+    if(q) {
+        file = q->file;
+        free(q);
+    }
+    
+    pthread_mutex_unlock(&queue_lock);
+    
+    return file;
+}
+
+void* copy_run(void *data) {
+    CPSettings *settings = data;
+    for(;;) {
+        SrcFile *file = queue_get_file();
+        if(!file) {
+            break;
+        }
+        
+        char *from = file->path ? util_concat_path(settings->from, file->path) : settings->from;
+        printf("src: %s\n", from);
+        
+        char *to = util_concat_path(settings->to, file->path ? file->path : util_resource_name(settings->from));
+        printf("dst: %s\n", to);
+        
+        free(to);
+        if(from != settings->from) {
+            free(from);
+        }
+    }
+    
+    return NULL;
+}
+
index f30b12a..872b06f 100644 (file)
@@ -1,29 +1,23 @@
 /*
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ * Copyright 2021 Olaf Wintermann
  *
- * Copyright 2021 Olaf Wintermann. All rights reserved.
+ * Permission is hereby granted, free of charge, to any person obtaining a 
+ * copy of this software and associated documentation files (the "Software"), 
+ * to deal in the Software without restriction, including without limitation 
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense, 
+ * and/or sell copies of the Software, and to permit persons to whom the 
+ * Software is furnished to do so, subject to the following conditions:
+ * 
+ * The above copyright notice and this permission notice shall be included in 
+ * all copies or substantial portions of the Software.
  *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- *   1. Redistributions of source code must retain the above copyright
- *      notice, this list of conditions and the following disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above copyright
- *      notice, this list of conditions and the following disclaimer in the
- *      documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
+ * DEALINGS IN THE SOFTWARE.
  */
 
 #ifndef MAIN_H
@@ -33,6 +27,8 @@
 
 #include <ucx/string.h>
 
+#include <pthread.h>
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -44,21 +40,73 @@ extern "C" {
     
 #define CLIENT_MSG_BUFSIZE 512
     
+#define MAX_COPY_THREADS 32
+    
 typedef char CPBool;
 
+typedef struct SrcFile SrcFile;
+typedef struct MZLock  MZLock;
+typedef struct MZQueue MZQueue;
+
 typedef struct {
     char   *from;
     char   *to;
+    size_t num_threads;
     CPBool url;
     CPBool pause;
     CPBool printsocket;
 } CPSettings;
 
+struct MZLock {
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+};
+
+struct SrcFile {
+    /*
+     * src file path relative to CPSettings.from
+     */
+    char *path;
+    
+    /*
+     * is the file a directory
+     */
+    CPBool isdir;
+    
+    /*
+     * file successfully copied
+     */
+    CPBool finished;
+    
+    /*
+     * processing this file depends on another file (directory)
+     */
+    SrcFile *depends_on;
+    
+    /*
+     * lock used for waiting, until this resource is successfully copied
+     */
+    MZLock *lock;
+};
+
+struct MZQueue {
+    SrcFile *file;
+    MZQueue *next;
+};
+
 const char* mzcp_get_cfgdir(void);
 const char* mzcp_get_copydir(void);
 
 int mzcp_copy(CPSettings *settings);
 
+int mzcp_start_scan(CPSettings *settings);
+void* scan_run(void *data);
+
+int enqueue_file(SrcFile *file);
+
+int mzcp_start_copy_threads(CPSettings *settings);
+void* copy_run(void *data);
+
 
     
 #ifdef __cplusplus
index 4b3bbd9..8c2d089 100644 (file)
@@ -1,31 +1,24 @@
 /*
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ * Copyright 2019 Olaf Wintermann
  *
- * Copyright 2021 Olaf Wintermann. All rights reserved.
+ * Permission is hereby granted, free of charge, to any person obtaining a 
+ * copy of this software and associated documentation files (the "Software"), 
+ * to deal in the Software without restriction, including without limitation 
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense, 
+ * and/or sell copies of the Software, and to permit persons to whom the 
+ * Software is furnished to do so, subject to the following conditions:
+ * 
+ * The above copyright notice and this permission notice shall be included in 
+ * all copies or substantial portions of the Software.
  *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- *   1. Redistributions of source code must retain the above copyright
- *      notice, this list of conditions and the following disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above copyright
- *      notice, this list of conditions and the following disclaimer in the
- *      documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
+ * DEALINGS IN THE SOFTWARE.
  */
-
 #include "srvctrl.h"
 
 #include <stdio.h>
@@ -54,7 +47,7 @@ static char *socket_path;
 static int srvctrl;
 
 
-int create_control_socket(void) {
+int create_control_socket(void) { 
     const char *copydir = mzcp_get_copydir();
     
     // create unix domain socket
index fadb830..9c78d40 100644 (file)
@@ -1,29 +1,23 @@
 /*
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ * Copyright 2019 Olaf Wintermann
  *
- * Copyright 2021 Olaf Wintermann. All rights reserved.
+ * Permission is hereby granted, free of charge, to any person obtaining a 
+ * copy of this software and associated documentation files (the "Software"), 
+ * to deal in the Software without restriction, including without limitation 
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense, 
+ * and/or sell copies of the Software, and to permit persons to whom the 
+ * Software is furnished to do so, subject to the following conditions:
+ * 
+ * The above copyright notice and this permission notice shall be included in 
+ * all copies or substantial portions of the Software.
  *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- *   1. Redistributions of source code must retain the above copyright
- *      notice, this list of conditions and the following disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above copyright
- *      notice, this list of conditions and the following disclaimer in the
- *      documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
+ * DEALINGS IN THE SOFTWARE.
  */
 
 #ifndef SRVTRL_H