# HG changeset patch # User Mike Becker # Date 1651403671 -7200 # Node ID 2e73456e5f84f9a9e3008c2acd3ca102e125c209 # Parent 7b9114030ca43c7bdcb602064d249261dd62865f #184 untested implementation of the flush feature diff -r 7b9114030ca4 -r 2e73456e5f84 src/buffer.c --- a/src/buffer.c Sun May 01 11:54:10 2022 +0200 +++ b/src/buffer.c Sun May 01 13:14:31 2022 +0200 @@ -134,6 +134,41 @@ } } +/** + * Helps flushing data to the flush target of a buffer. + * + * @param buffer the buffer containing the config + * @param space the data to flush + * @param size the element size + * @param nitems the number of items + * @return the number of items flushed + */ +static size_t cx_buffer_write_flush_helper( + CxBuffer *buffer, + unsigned char const *space, + size_t size, + size_t nitems +) { + size_t pos = 0; + size_t remaining = nitems; + size_t max_items = buffer->flush_blksize / size; + while (remaining > 0) { + size_t items = remaining > max_items ? max_items : remaining; + size_t flushed = buffer->flush_func( + space + pos, + size, items, + buffer->flush_target); + if (flushed > 0) { + pos += (flushed * size); + remaining -= flushed; + } else { + // if no bytes can be flushed out anymore, we give up + break; + } + } + return nitems - remaining; +} + size_t cxBufferWrite( void const *ptr, size_t size, @@ -151,6 +186,7 @@ } size_t len; + size_t nitems_out = nitems; if (cx_szmul(size, nitems, &len)) { return 0; } @@ -178,7 +214,7 @@ if (size > 1) { len -= len % size; } - nitems = len / size; + nitems_out = len / size; } } } @@ -188,24 +224,52 @@ } if (perform_flush) { - // TODO: implement flushing - // (1) determine how many bytes to flush (use flushmax = blkmax * blksize) - // (2) if len is larger than the number computed in (1) we need more flush cycles, compute how many - // (3) determine how many bytes from the buffer shall be flushed - // (4) if something remains in the buffer, shift the buffer to the left - // (4a) if buffer was shifted, append the new data to the buffer - // (4b) if the buffer was flushed entirely AND the new data also fits into flushmax, - // directly write the new data to the flush sink - return 0; // remove this after implementation + size_t flush_max; + if (cx_szmul(buffer->flush_blkmax, buffer->flush_blksize, &flush_max)) { + return 0; + } + size_t flush_pos = buffer->flush_func == NULL || buffer->flush_target == NULL + ? buffer->pos + : cx_buffer_write_flush_helper(buffer, buffer->bytes, 1, buffer->pos); + if (flush_pos == buffer->pos) { + // entire buffer has been flushed, we can reset + buffer->size = buffer->pos = 0; + + size_t items_flush; // how many items can also be directly flushed + size_t items_keep; // how many items have to be written to the buffer + + items_flush = flush_max >= required ? nitems : (flush_max - flush_pos) / size; + if (items_flush > 0) { + items_flush = cx_buffer_write_flush_helper(buffer, ptr, size, items_flush / size); + // in case we could not flush everything, keep the rest + } + items_keep = nitems - items_flush; + if (items_keep > 0) { + // try again with the remaining stuff + unsigned char const *new_ptr = ptr; + new_ptr += items_flush * size; + return cxBufferWrite(new_ptr, size, items_keep, buffer); + } else { + // all items have been flushed - report them as written + return nitems; + } + } else if (flush_pos == 0) { + // nothing could be flushed at all, we immediately give up without writing any data + return 0; + } else { + // we were partially successful, we have shift left and try again + cxBufferShiftLeft(buffer, flush_pos); + return cxBufferWrite(ptr, size, nitems, buffer); + } } else { memcpy(buffer->bytes + buffer->pos, ptr, len); buffer->pos += len; if (buffer->pos > buffer->size) { buffer->size = buffer->pos; } + return nitems_out; } - return nitems; } int cxBufferPut(