/*
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ * Copyright 2021 Olaf Wintermann
*
- * Copyright 2021 Olaf Wintermann. All rights reserved.
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
*/
#include "main.h"
+#include "srvctrl.h"
+
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/fcntl.h>
-#include <pthread.h>
#include <poll.h>
+#include <dirent.h>
#include <libidav/utils.h>
#include <ucx/utils.h>
-#define OPTSTR "hlpsuv"
-#define TIMEOUT_IDLE -1
-#define TIMEOUT_CLIENT 1000
-#define CLIENT_UPDATE_INTERVALL 1
+#define OPTSTR "hlpsuv"
static char *cfgdir;
-static char *socket_path;
+static char *copydir;
+
+
+static pthread_t scan_thread;
+
+static pthread_t *copy_threads;
+static size_t num_copy_threads;
-static int srvctrl;
+static pthread_mutex_t queue_lock;
+static pthread_cond_t queue_available;
+static MZQueue *queue_begin;
+static MZQueue *queue_end;
-static int eventp[2];
+static int scan_complete;
int main(int argc, char** argv) {
int ret = 1;
// copy
settings.from = argv[optind];
settings.to = argv[optind+1];
- ret = uwcp_copy(&settings);
+ ret = mzcp_copy(&settings);
} else {
// print usage
return ret;
}
-static int check_configdir(void) {
- char *home = getenv(UWCP_ENV_HOME);
-
- cfgdir = util_concat_path(home, UWCP_CFG_DIR);
-
+static int check_dir(const char *path) {
struct stat s;
- if(stat(cfgdir, &s)) {
+ if(stat(path, &s)) {
if(errno == ENOENT) {
- if(mkdir(cfgdir, S_IRWXU)) {
- fprintf(stderr, "Cannot create %s: %s", cfgdir, strerror(errno));
+ if(mkdir(path, S_IRWXU)) {
+ fprintf(stderr, "Cannot create %s: %s", path, strerror(errno));
return 1;
}
} else {
- fprintf(stderr, "Cannot access %s: %s", cfgdir, strerror(errno));
+ fprintf(stderr, "Cannot access %s: %s", path, strerror(errno));
return 1;
}
}
-
return 0;
}
-static int create_control_socket(void) {
- char *copydir = util_concat_path(cfgdir, UWCP_COPY_DIR);
+static int check_configdir(void) {
+ char *home = getenv(MZCP_ENV_HOME);
- struct stat s;
- if(stat(copydir, &s)) {
- if(errno == ENOENT) {
- if(mkdir(copydir, S_IRWXU)) {
- fprintf(stderr, "Cannot create %s: %s", copydir, strerror(errno));
- return 1;
- }
- } else {
- fprintf(stderr, "Cannot access %s: %s", copydir, strerror(errno));
- return 1;
- }
- }
+ char *base = util_concat_path(home, MZCP_CFG_BASE);
+ if(check_dir(base)) return 1;
+ free(base);
+
+ cfgdir = util_concat_path(home, MZCP_CFG_DIR);
+ if(check_dir(cfgdir)) return 1;
- // create unix domain socket
- char *random_str = util_random_str();
- sstr_t socketp = ucx_sprintf("%s/%.*s", copydir, 8, random_str);
- free(random_str);
- socket_path = socketp.ptr;
-
- struct sockaddr_un addr;
- if(socketp.length > sizeof(addr.sun_path)-1) {
- fprintf(stderr,
- "path '%s' too long for unix domain socket",
- socketp.ptr);
- return 1;
- }
+ copydir = util_concat_path(mzcp_get_cfgdir(), MZCP_COPY_DIR);
+ if(check_dir(copydir)) return 1;
- memset(&addr, 0, sizeof(addr));
- addr.sun_family = AF_UNIX;
- memcpy(addr.sun_path, socketp.ptr, socketp.length);
+ return 0;
+}
+
+const char* mzcp_get_cfgdir(void) {
+ return cfgdir;
+}
+
+const char* mzcp_get_copydir(void) {
+ return copydir;
+}
+
+static int init_queue(void) {
+ if(pthread_mutex_init(&queue_lock, NULL)) return 1;
+ if(pthread_cond_init(&queue_available, NULL)) return 1;
+ return 0;
+}
+
+static MZQueue* queue_root_elm_new(void) {
+ SrcFile *file = malloc(sizeof(SrcFile));
+ if(!file) return NULL;
+ memset(file, 0, sizeof(SrcFile));
- srvctrl = socket(AF_UNIX, SOCK_STREAM, 0);
- if(srvctrl == -1) {
- fprintf(stderr,
- "Cannot create server control socket: %s",
- strerror(errno));
- return 1;
- }
- if(bind(srvctrl, (struct sockaddr*)&addr, sizeof(addr))) {
- fprintf(stderr,
- "srvctrl socket bind failed: %s",
- strerror(errno));
- return 1;
- }
+ MZQueue *q = malloc(sizeof(MZQueue));
+ if(!q) return NULL;
- listen(srvctrl, 4);
+ q->file = file;
+ q->next = NULL;
- return 0;
+ return q;
}
-int uwcp_copy(CPSettings *settings) {
+int mzcp_copy(CPSettings *settings) {
int ret = 0;
if(check_configdir()) {
return 3;
}
+ if(init_queue()) {
+ return 4;
+ }
+
if(settings->printsocket) {
- printf("%s\n", socket_path);
+ printf("%s\n", mzcp_get_socketpath());
} else {
printf("copy %s to %s\n", settings->from, settings->to);
if(settings->pause) {
//close(1);
//close(2);
- ret = uwcp_srvctrl(settings);
+ if(mzcp_start_scan(settings)) {
+ return 2;
+ }
+
+ if(mzcp_start_copy_threads(settings)) {
+
+ return 3;
+ }
+
+ ret = mzcp_srvctrl(settings);
}
return ret;
}
-int uwcp_srvctrl(CPSettings *settings) {
- if(pipe(eventp)) {
- perror("Cannot create event pipe");
+
+int mzcp_start_scan(CPSettings *settings) {
+ if(stat(settings->from, &settings->root_stat)) {
+ // TODO: error
return 1;
}
- size_t allocfds = 8;
- size_t numfds = 1;
-
- struct pollfd *fds = calloc(allocfds, sizeof(struct pollfd));
- CtrlClient **clients = calloc(allocfds, sizeof(void*));
+ if(!S_ISDIR(settings->root_stat.st_mode)) {
+ // queue single file
+ queue_begin = queue_root_elm_new();
+ if(!queue_begin) {
+ return 1;
+ }
+ queue_end = queue_begin;
+ scan_complete = 1;
+ settings->num_threads = 1;
+ } else {
+ // scan src directory in a separate thread
+ if(pthread_create(&scan_thread, NULL, scan_run, settings)) {
+ // TODO: we need some clever error handling
+ // we are already in the forked procress and stdout/stderr are closed
+ // maybe wait for someone to connect to the unix domain socket
+ return 1;
+ }
+ }
- int timeout = TIMEOUT_IDLE;
+ return 0;
+}
+
+
+void* scan_run(void *data) {
+ CPSettings *settings = data;
- fds[0].fd = srvctrl;
- fds[0].events = POLLIN;
+ UcxList *stack = NULL;
- int abort = 0;
+ char *root = strdup("");
- time_t tbegin = time(NULL);
+ SrcFile *file = calloc(1, sizeof(SrcFile));
+ if(!file) {
+ scan_complete = 1;
+ // TODO: error
+ return NULL;
+ }
+ file->path = root;
+ file->isdir = 1;
+ file->mode = settings->root_stat.st_mode;
+ if(enqueue_file(file)) {
+ scan_complete = 1;
+ // TODO: error
+ return NULL;
+ }
- while(poll(fds, numfds, 1000) >= 0) {
- time_t tend = time(NULL);
- time_t diff = tend - tbegin;
- tbegin = tend;
+ stack = ucx_list_prepend(NULL, file);
+ while(stack) {
+ SrcFile *elm = stack->data;
+ UcxList *next = stack->next;
+ free(stack);
+ stack = next;
- if((fds[0].revents & POLLIN) == POLLIN) {
- printf("accept\n");
- int fd = accept(srvctrl, NULL, 0);
- if(fd < 0) {
- break;
+ char *path = util_concat_path(settings->from, elm->path);
+
+ int dir_fd = open(path, O_RDONLY);
+ if(dir_fd < 0) {
+ // TODO: error?
+ continue;
+ }
+
+ // read directory and enqueue all children
+ DIR *dir = fdopendir(dir_fd);
+ struct dirent *ent;
+ while((ent = readdir(dir)) != NULL) {
+ char *name = ent->d_name;
+ if(!strcmp(name, ".") || !strcmp(name, "..")) {
+ continue;
}
- //int flags = fcntl(fd, F_GETFL, 0);
- //flags = flags & ~O_NONBLOCK;
- //fcntl(fd, F_SETFL, flags);
-
- CtrlClient *client = malloc(sizeof(CtrlClient));
- memset(client, 0, sizeof(CtrlClient));
- client->fd = fd;
+ struct stat s;
+ if(fstatat(dir_fd, name, &s, 0)) {
+ // TODO: error?
+ continue;
+ }
- printf("add client: %d\n", client->fd);
+ SrcFile *f = calloc(1, sizeof(SrcFile));
+ f->path = util_concat_path(elm->path, name);
+ f->isdir = S_ISDIR(s.st_mode);
+ f->mode = s.st_mode;
+ f->depends_on = elm;
- fds[numfds].fd = client->fd;
- fds[numfds].events = POLLIN;
- fds[numfds].revents = 0;
- clients[numfds] = client;
- numfds++;
- }
-
- // check clients
- int remove = 0;
- for(int i=1;i<numfds;i++) {
- if((fds[i].revents & POLLIN) == POLLIN) {
- CtrlClient *client = clients[i];
- ssize_t r = read(fds[i].fd, client->buf + client->pos, CLIENT_MSG_BUFSIZE - client->pos);
- if(r <= 0) {
- printf("remove client: %d\n", fds[i].fd);
- fds[i].events = 0;
- remove = 1;
- } else {
- client->pos += r;
-
- int msgret = handle_messages(client);
- if(msgret == 1) {
- fds[i].events = 0;
- remove = 1;
- } else if(msgret == -1) {
- abort = 1;
- }
- }
- }
- }
-
- if(remove) {
- int j = 1;
- for(int i=1;i<numfds;i++) {
- if(fds[i].events != 0) {
- fds[j] = fds[i];
- clients[j] = clients[j];
- j++;
- } else {
- client_free(clients[i]);
- close(fds[i].fd);
- }
+ if(enqueue_file(f)) {
+ // TODO: error?
+ fprintf(stderr, "enqueue failed\n");
+ break;
}
- numfds = j;
- }
-
- if(diff >= CLIENT_UPDATE_INTERVALL) {
- for(int i=1;i<numfds;i++) {
- client_send_status(clients[i]);
+
+ // put dir on stack
+ if(f->isdir) {
+ stack = ucx_list_prepend(stack, f);
}
}
- if(abort) break;
-
- timeout = numfds > 1 ? TIMEOUT_CLIENT : TIMEOUT_IDLE;
+ closedir(dir);
}
- unlink(socket_path);
+ scan_complete = 1;
- return 0;
+ return NULL;
}
-
-void client_free(CtrlClient *client) {
- free(client);
+int mzcp_start_copy_threads(CPSettings *settings) {
+ if(settings->num_threads == 0) {
+ num_copy_threads = 1;
+ } else if(settings->num_threads > MAX_COPY_THREADS) {
+ num_copy_threads = MAX_COPY_THREADS;
+ } else {
+ num_copy_threads = settings->num_threads;
+ }
+
+ copy_threads = calloc(num_copy_threads, sizeof(pthread_t));
+
+ int f = 0;
+ for(int i=0;i<num_copy_threads;i++) {
+ if(pthread_create(©_threads[i], NULL, copy_run, settings)) {
+ f++;
+ }
+ }
+
+ return f < num_copy_threads ? 0 : 1;
}
-int handle_messages(CtrlClient *client) {
- if(client->pos == CLIENT_MSG_BUFSIZE) {
+int enqueue_file(SrcFile *file) {
+ MZQueue *q = malloc(sizeof(MZQueue));
+ if(!q) {
return 1;
}
- int msgstart = 0;
- for(int i=0;i<client->pos;i++) {
- if(client->buf[i] == '\n') {
- sstr_t msg;
- msg.ptr = &client->buf[msgstart];
- msg.length = i - msgstart;
- msgstart = i+1;
-
- int msgret = handle_client_msg(client, msg);
- if(msgret) return msgret;
- }
- }
+ q->file = file;
+ q->next = NULL;
+
+ pthread_mutex_lock(&queue_lock);
- if(msgstart < client->pos) {
- // incomplete message
- memmove(client->buf, client->buf + msgstart, client->pos - msgstart);
- client->pos -= msgstart;
+ if(queue_end) {
+ queue_end->next = q;
+ queue_end = q;
} else {
- client->pos = 0;
+ queue_begin = q;
+ queue_end = q;
}
+ pthread_cond_signal(&queue_available);
+ pthread_mutex_unlock(&queue_lock);
+
return 0;
}
-int handle_client_msg(CtrlClient *client, sstr_t msg) {
- printf("msg: %.*s\n", (int)msg.length, msg.ptr);
+static SrcFile* queue_get_file(void) {
+ SrcFile *file = NULL;
+ pthread_mutex_lock(&queue_lock);
+
+ MZQueue *q = NULL;
+ while(!q) {
+ if(!queue_begin) {
+ if(scan_complete) {
+ break;
+ }
+ pthread_cond_wait(&queue_available, &queue_lock);
+ continue;
+ } else {
+ q = queue_begin;
+ queue_begin = queue_begin->next;
+ if(!queue_begin) {
+ queue_end = NULL;
+ }
+ }
+ }
- if(!sstrcmp(msg, S("abort"))) {
- return -1;
+ if(q) {
+ file = q->file;
+ free(q);
}
- return 0;
+ pthread_mutex_unlock(&queue_lock);
+
+ return file;
+}
+
+void* copy_run(void *data) {
+ CPSettings *settings = data;
+
+ char *buffer = malloc(MZ_COPY_BUFSIZE);
+ size_t bufsize = MZ_COPY_BUFSIZE;
+
+ for(;;) {
+ SrcFile *file = queue_get_file();
+ if(!file) {
+ break;
+ }
+
+ char *from = file->path ? util_concat_path(settings->from, file->path) : settings->from;
+ char *to = util_concat_path(settings->to, file->path ? file->path : util_resource_name(settings->from));
+
+ size_t from_len = strlen(from);
+ size_t to_len = strlen(to);
+
+ if(from[from_len-1] == '/') {
+ from[from_len-1] = 0;
+ }
+ if(to[to_len-1] == '/') {
+ to[to_len-1] = 0;
+ }
+
+ int ret;
+ if(file->isdir) {
+ ret = mz_copy_dir(settings, file, from, to);
+ } else {
+ ret = mz_copy_file(settings, file, from, to, buffer, bufsize);
+ }
+
+ free(to);
+ if(from != settings->from) {
+ free(from);
+ }
+ }
+
+ return NULL;
+}
+
+int mz_copy_dir(CPSettings *settings, SrcFile *file, const char *from, const char *to) {
+ printf("mkdir %s\n", to);
+ return mkdir(to, file->mode);
}
-void client_send_status(CtrlClient *client) {
- char *msg = "s 0\n";
- write(client->fd, msg, strlen(msg));
+int mz_copy_file(CPSettings *settings, SrcFile *file, const char *from, const char *to, char *buffer, size_t bufsize) {
+ printf("cp %s %s\n", from, to);
+ int fin = open(from, O_RDONLY);
+ if(fin < 0) {
+ return 1;
+ }
+
+ int ret = 0;
+
+ int fout = open(to, O_WRONLY|O_CREAT, file->mode);
+ if(fout < 0) {
+ perror("open");
+ close(fin);
+ return 1;
+ }
+
+ ssize_t r;
+ while((r = read(fin, buffer, bufsize)) > 0) {
+ if(write(fout, buffer, r) != r) {
+ ret = 1;
+ break;
+ }
+ }
+
+ close(fin);
+ close(fout);
+
+ return ret;
}