2 * Copyright 2021 Olaf Wintermann
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
17 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
35 #include <sys/socket.h>
37 #include <sys/fcntl.h>
41 #include <libidav/utils.h>
43 #include <ucx/utils.h>
46 #define OPTSTR "hlpsuv"
52 static pthread_t scan_thread;
54 static pthread_t *copy_threads;
55 static size_t num_copy_threads;
57 static pthread_mutex_t queue_lock;
58 static pthread_cond_t queue_available;
59 static MZQueue *queue_begin;
60 static MZQueue *queue_end;
62 static int scan_complete;
64 int main(int argc, char** argv) {
68 extern int optind, opterr, optopt;
71 memset(&settings, 0, sizeof(CPSettings));
75 int list = 0; // list copying processes
78 while((c = getopt(argc, argv, OPTSTR)) != -1) {
80 case 'l': list = 1; break;
81 case 'p': settings.pause = 1; break;
82 case 's': settings.printsocket = 1; break;
83 case 'u': settings.url = 1; break;
84 case 'v': version = 1; break;
88 int ac = argc - optind;
98 settings.from = argv[optind];
99 settings.to = argv[optind+1];
100 ret = mzcp_copy(&settings);
109 static int check_dir(const char *path) {
112 if(errno == ENOENT) {
113 if(mkdir(path, S_IRWXU)) {
114 fprintf(stderr, "Cannot create %s: %s", path, strerror(errno));
118 fprintf(stderr, "Cannot access %s: %s", path, strerror(errno));
125 static int check_configdir(void) {
126 char *home = getenv(MZCP_ENV_HOME);
128 char *base = util_concat_path(home, MZCP_CFG_BASE);
129 if(check_dir(base)) return 1;
132 cfgdir = util_concat_path(home, MZCP_CFG_DIR);
133 if(check_dir(cfgdir)) return 1;
135 copydir = util_concat_path(mzcp_get_cfgdir(), MZCP_COPY_DIR);
136 if(check_dir(copydir)) return 1;
141 const char* mzcp_get_cfgdir(void) {
145 const char* mzcp_get_copydir(void) {
149 static int init_queue(void) {
150 if(pthread_mutex_init(&queue_lock, NULL)) return 1;
151 if(pthread_cond_init(&queue_available, NULL)) return 1;
155 static MZQueue* queue_root_elm_new(void) {
156 SrcFile *file = malloc(sizeof(SrcFile));
157 if(!file) return NULL;
158 memset(file, 0, sizeof(SrcFile));
160 MZQueue *q = malloc(sizeof(MZQueue));
169 int mzcp_copy(CPSettings *settings) {
172 if(check_configdir()) {
177 if(create_control_socket()) {
185 if(settings->printsocket) {
186 printf("%s\n", mzcp_get_socketpath());
188 printf("copy %s to %s\n", settings->from, settings->to);
189 if(settings->pause) {
201 if(mzcp_start_scan(settings)) {
205 if(mzcp_start_copy_threads(settings)) {
210 ret = mzcp_srvctrl(settings);
217 int mzcp_start_scan(CPSettings *settings) {
218 if(stat(settings->from, &settings->root_stat)) {
223 if(!S_ISDIR(settings->root_stat.st_mode)) {
225 queue_begin = queue_root_elm_new();
229 queue_end = queue_begin;
231 settings->num_threads = 1;
233 // scan src directory in a separate thread
234 if(pthread_create(&scan_thread, NULL, scan_run, settings)) {
235 // TODO: we need some clever error handling
236 // we are already in the forked procress and stdout/stderr are closed
237 // maybe wait for someone to connect to the unix domain socket
246 void* scan_run(void *data) {
247 CPSettings *settings = data;
249 UcxList *stack = NULL;
251 char *root = strdup("");
253 SrcFile *file = calloc(1, sizeof(SrcFile));
261 file->mode = settings->root_stat.st_mode;
262 if(enqueue_file(file)) {
268 stack = ucx_list_prepend(NULL, file);
270 SrcFile *elm = stack->data;
271 UcxList *next = stack->next;
275 char *path = util_concat_path(settings->from, elm->path);
277 int dir_fd = open(path, O_RDONLY);
283 // read directory and enqueue all children
284 DIR *dir = fdopendir(dir_fd);
286 while((ent = readdir(dir)) != NULL) {
287 char *name = ent->d_name;
288 if(!strcmp(name, ".") || !strcmp(name, "..")) {
293 if(fstatat(dir_fd, name, &s, 0)) {
298 SrcFile *f = calloc(1, sizeof(SrcFile));
299 f->path = util_concat_path(elm->path, name);
300 f->isdir = S_ISDIR(s.st_mode);
304 if(enqueue_file(f)) {
306 fprintf(stderr, "enqueue failed\n");
312 stack = ucx_list_prepend(stack, f);
324 int mzcp_start_copy_threads(CPSettings *settings) {
325 if(settings->num_threads == 0) {
326 num_copy_threads = 1;
327 } else if(settings->num_threads > MAX_COPY_THREADS) {
328 num_copy_threads = MAX_COPY_THREADS;
330 num_copy_threads = settings->num_threads;
333 copy_threads = calloc(num_copy_threads, sizeof(pthread_t));
336 for(int i=0;i<num_copy_threads;i++) {
337 if(pthread_create(©_threads[i], NULL, copy_run, settings)) {
342 return f < num_copy_threads ? 0 : 1;
345 int enqueue_file(SrcFile *file) {
346 MZQueue *q = malloc(sizeof(MZQueue));
354 pthread_mutex_lock(&queue_lock);
364 pthread_cond_signal(&queue_available);
365 pthread_mutex_unlock(&queue_lock);
370 static SrcFile* queue_get_file(void) {
371 SrcFile *file = NULL;
372 pthread_mutex_lock(&queue_lock);
380 pthread_cond_wait(&queue_available, &queue_lock);
384 queue_begin = queue_begin->next;
396 pthread_mutex_unlock(&queue_lock);
401 void* copy_run(void *data) {
402 CPSettings *settings = data;
404 char *buffer = malloc(MZ_COPY_BUFSIZE);
405 size_t bufsize = MZ_COPY_BUFSIZE;
408 SrcFile *file = queue_get_file();
413 char *from = file->path ? util_concat_path(settings->from, file->path) : settings->from;
414 char *to = util_concat_path(settings->to, file->path ? file->path : util_resource_name(settings->from));
416 size_t from_len = strlen(from);
417 size_t to_len = strlen(to);
419 if(from[from_len-1] == '/') {
420 from[from_len-1] = 0;
422 if(to[to_len-1] == '/') {
426 if(file->depends_on) {
427 SrcFile *dep = file->depends_on;
428 // check first without lock
429 if(dep->status == 0) {
431 pthread_mutex_lock(&dep->lock->mutex);
432 if(file->depends_on->status == 0) {
433 pthread_cond_wait(&dep->lock->cond, &dep->lock->mutex);
435 pthread_mutex_unlock(&dep->lock->mutex);
437 // locking disabled (because we have only one thread)
438 // but in that case the file status can't be 0
439 // therefore this case here should not happen
444 if(dep->status == -1) {
450 if(file->status == 0) {
452 ret = mz_copy_dir(settings, file, from, to);
454 ret = mz_copy_file(settings, file, from, to, buffer, bufsize);
459 if(from != settings->from) {
467 static void file_set_status(SrcFile *file, int status) {
469 pthread_mutex_lock(&file->lock->mutex);
470 file->status = status;
471 pthread_cond_broadcast(&file->lock->cond);
472 pthread_mutex_unlock(&file->lock->mutex);
474 file->status = status;
478 int mz_copy_dir(CPSettings *settings, SrcFile *file, const char *from, const char *to) {
479 printf("mkdir %s\n", to);
480 int ret = mkdir(to, file->mode);
482 if(errno == EEXIST) {
485 if(S_ISDIR(s.st_mode)) {
491 file_set_status(file, !ret ? 1 : -1);
495 int mz_copy_file(CPSettings *settings, SrcFile *file, const char *from, const char *to, char *buffer, size_t bufsize) {
496 printf("cp %s %s\n", from, to);
497 int fin = open(from, O_RDONLY);
499 file_set_status(file, -1);
505 int fout = open(to, O_WRONLY|O_CREAT, file->mode);
509 file_set_status(file, -1);
514 while((r = read(fin, buffer, bufsize)) > 0) {
515 if(write(fout, buffer, r) != r) {
524 file_set_status(file, !ret ? 1 : -1);