/* * Copyright 2021 Olaf Wintermann * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), * to deal in the Software without restriction, including without limitation * the rights to use, copy, modify, merge, publish, distribute, sublicense, * and/or sell copies of the Software, and to permit persons to whom the * Software is furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * DEALINGS IN THE SOFTWARE. */ #include "main.h" #include "srvctrl.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define OPTSTR "hlpsuv" static char *cfgdir; static char *copydir; static pthread_t scan_thread; static pthread_t *copy_threads; static size_t num_copy_threads; static pthread_mutex_t queue_lock; static pthread_cond_t queue_available; static MZQueue *queue_begin; static MZQueue *queue_end; static int scan_complete; int main(int argc, char** argv) { int ret = 1; extern char *optarg; extern int optind, opterr, optopt; CPSettings settings; memset(&settings, 0, sizeof(CPSettings)); int help = 0; int version = 0; int list = 0; // list copying processes int c; while((c = getopt(argc, argv, OPTSTR)) != -1) { switch(c) { case 'l': list = 1; break; case 'p': settings.pause = 1; break; case 's': settings.printsocket = 1; break; case 'u': settings.url = 1; break; case 'v': version = 1; break; } } int ac = argc - optind; if(list) { // list command } else if(help) { // print help } else if(version) { // print version } else if(ac == 2) { // copy settings.from = argv[optind]; settings.to = argv[optind+1]; ret = mzcp_copy(&settings); } else { // print usage } return ret; } static int check_dir(const char *path) { struct stat s; if(stat(path, &s)) { if(errno == ENOENT) { if(mkdir(path, S_IRWXU)) { fprintf(stderr, "Cannot create %s: %s", path, strerror(errno)); return 1; } } else { fprintf(stderr, "Cannot access %s: %s", path, strerror(errno)); return 1; } } return 0; } static int check_configdir(void) { char *home = getenv(MZCP_ENV_HOME); char *base = util_concat_path(home, MZCP_CFG_BASE); if(check_dir(base)) return 1; free(base); cfgdir = util_concat_path(home, MZCP_CFG_DIR); if(check_dir(cfgdir)) return 1; copydir = util_concat_path(mzcp_get_cfgdir(), MZCP_COPY_DIR); if(check_dir(copydir)) return 1; return 0; } const char* mzcp_get_cfgdir(void) { return cfgdir; } const char* mzcp_get_copydir(void) { return copydir; } static int init_queue(void) { if(pthread_mutex_init(&queue_lock, NULL)) return 1; if(pthread_cond_init(&queue_available, NULL)) return 1; return 0; } static MZQueue* queue_root_elm_new(void) { SrcFile *file = malloc(sizeof(SrcFile)); if(!file) return NULL; memset(file, 0, sizeof(SrcFile)); MZQueue *q = malloc(sizeof(MZQueue)); if(!q) return NULL; q->file = file; q->next = NULL; return q; } int mzcp_copy(CPSettings *settings) { int ret = 0; if(check_configdir()) { return 2; } if(create_control_socket()) { return 3; } if(init_queue()) { return 4; } if(settings->printsocket) { printf("%s\n", mzcp_get_socketpath()); } else { printf("copy %s to %s\n", settings->from, settings->to); if(settings->pause) { printf("pause\n"); } } //pid_t p = fork(); pid_t p = 0; if(p == 0) { //close(0); //close(1); //close(2); if(mzcp_start_scan(settings)) { return 2; } if(mzcp_start_copy_threads(settings)) { return 3; } ret = mzcp_srvctrl(settings); } return ret; } int mzcp_start_scan(CPSettings *settings) { struct stat s; if(stat(settings->from, &s)) { // TODO: error return 1; } if(!S_ISDIR(s.st_mode)) { // queue single file queue_begin = queue_root_elm_new(); if(!queue_begin) { return 1; } queue_end = queue_begin; scan_complete = 1; settings->num_threads = 1; } else { // scan src directory in a separate thread if(pthread_create(&scan_thread, NULL, scan_run, settings)) { // TODO: we need some clever error handling // we are already in the forked procress and stdout/stderr are closed // maybe wait for someone to connect to the unix domain socket return 1; } } return 0; } void* scan_run(void *data) { CPSettings *settings = data; UcxList *stack = NULL; char *root = strdup(""); SrcFile *file = calloc(1, sizeof(SrcFile)); if(!file) { scan_complete = 1; // TODO: error return NULL; } file->path = root; file->isdir = 1; if(enqueue_file(file)) { scan_complete = 1; // TODO: error return NULL; } stack = ucx_list_prepend(NULL, file); while(stack) { SrcFile *elm = stack->data; UcxList *next = stack->next; free(stack); stack = next; char *path = util_concat_path(settings->from, elm->path); int dir_fd = open(path, O_RDONLY); if(dir_fd < 0) { // TODO: error? continue; } // read directory and enqueue all children DIR *dir = fdopendir(dir_fd); struct dirent *ent; while((ent = readdir(dir)) != NULL) { char *name = ent->d_name; if(!strcmp(name, ".") || !strcmp(name, "..")) { continue; } struct stat s; if(fstatat(dir_fd, name, &s, 0)) { // TODO: error? continue; } SrcFile *f = calloc(1, sizeof(SrcFile)); f->path = util_concat_path(elm->path, name); f->isdir = S_ISDIR(s.st_mode); f->depends_on = elm; if(enqueue_file(f)) { // TODO: error? fprintf(stderr, "enqueue failed\n"); break; } // put dir on stack if(f->isdir) { stack = ucx_list_prepend(stack, f); } } closedir(dir); } scan_complete = 1; return NULL; } int mzcp_start_copy_threads(CPSettings *settings) { if(settings->num_threads == 0) { num_copy_threads = 1; } else if(settings->num_threads > MAX_COPY_THREADS) { num_copy_threads = MAX_COPY_THREADS; } else { num_copy_threads = settings->num_threads; } copy_threads = calloc(num_copy_threads, sizeof(pthread_t)); int f = 0; for(int i=0;ifile = file; q->next = NULL; pthread_mutex_lock(&queue_lock); if(queue_end) { queue_end->next = q; queue_end = q; } else { queue_begin = q; queue_end = q; } pthread_cond_signal(&queue_available); pthread_mutex_unlock(&queue_lock); return 0; } static SrcFile* queue_get_file(void) { SrcFile *file = NULL; pthread_mutex_lock(&queue_lock); MZQueue *q = NULL; while(!q) { if(!queue_begin) { if(scan_complete) { break; } pthread_cond_wait(&queue_available, &queue_lock); continue; } else { q = queue_begin; queue_begin = queue_begin->next; if(!queue_begin) { queue_end = NULL; } } } if(q) { file = q->file; free(q); } pthread_mutex_unlock(&queue_lock); return file; } void* copy_run(void *data) { CPSettings *settings = data; for(;;) { SrcFile *file = queue_get_file(); if(!file) { break; } char *from = file->path ? util_concat_path(settings->from, file->path) : settings->from; printf("src: %s\n", from); char *to = util_concat_path(settings->to, file->path ? file->path : util_resource_name(settings->from)); printf("dst: %s\n", to); free(to); if(from != settings->from) { free(from); } } return NULL; }