re-implement flushing

2 weeks ago

author
Mike Becker <universe@uap-core.de>
date
Sun, 05 Jan 2025 18:19:42 +0100 (2 weeks ago)
changeset 1110
a0e9be7ed131
parent 1109
89ec23988b88
child 1111
78eeeb950883

re-implement flushing

fixes #542 fixes #543

CHANGELOG file | annotate | diff | comparison | revisions
src/buffer.c file | annotate | diff | comparison | revisions
src/cx/buffer.h file | annotate | diff | comparison | revisions
tests/test_buffer.c file | annotate | diff | comparison | revisions
--- a/CHANGELOG	Sun Jan 05 14:03:30 2025 +0100
+++ b/CHANGELOG	Sun Jan 05 18:19:42 2025 +0100
@@ -12,6 +12,7 @@
  * adds several new array and list functions
  * adds cxBufferReset()
  * adds cxBufferAppend()
+ * adds cxBufferEnableFlushing() and cxBufferFlush()
  * adds CX_BUFFER_COPY_ON_WRITE and CX_BUFFER_COPY_ON_EXTEND flags
  * adds cx_cmp_ptr()
  * adds cx_vcmp_* family of functions
--- a/src/buffer.c	Sun Jan 05 14:03:30 2025 +0100
+++ b/src/buffer.c	Sun Jan 05 18:19:42 2025 +0100
@@ -71,12 +71,18 @@
     buffer->size = 0;
     buffer->pos = 0;
 
-    buffer->flush_func = NULL;
-    buffer->flush_target = NULL;
-    buffer->flush_blkmax = 0;
-    buffer->flush_blksize = 4096;
-    buffer->flush_threshold = SIZE_MAX;
+    buffer->flush = NULL;
+
+    return 0;
+}
 
+int cxBufferEnableFlushing(
+    CxBuffer *buffer,
+    CxBufferFlushConfig config
+) {
+    buffer->flush = malloc(sizeof(CxBufferFlushConfig));
+    if (buffer->flush == NULL) return -1; // LCOV_EXCL_LINE
+    memcpy(buffer->flush, &config, sizeof(CxBufferFlushConfig));
     return 0;
 }
 
@@ -84,6 +90,7 @@
     if (buffer->flags & CX_BUFFER_FREE_CONTENTS) {
         cxFree(buffer->allocator, buffer->bytes);
     }
+    free(buffer->flush);
     memset(buffer, 0, sizeof(CxBuffer));
 }
 
@@ -196,39 +203,52 @@
     }
 }
 
-/**
- * Helps flushing data to the flush target of a buffer.
- *
- * @param buffer the buffer containing the config
- * @param space the data to flush
- * @param size the element size
- * @param nitems the number of items
- * @return the number of items flushed
- */
-static size_t cx_buffer_write_flush_helper(
-        CxBuffer *buffer,
-        const unsigned char *space,
+static size_t cx_buffer_flush_helper(
+        const CxBuffer *buffer,
         size_t size,
+        const unsigned char *src,
         size_t nitems
 ) {
-    size_t pos = 0;
-    size_t remaining = nitems;
-    size_t max_items = buffer->flush_blksize / size;
-    while (remaining > 0) {
-        size_t items = remaining > max_items ? max_items : remaining;
-        size_t flushed = buffer->flush_func(
-                space + pos,
-                size, items,
-                buffer->flush_target);
+    // flush data from an arbitrary source
+    // does not need to be the buffer's contents
+    size_t max_items = buffer->flush->blksize / size;
+    size_t fblocks = 0;
+    size_t flushed_total = 0;
+    while (nitems > 0 && fblocks < buffer->flush->blkmax) {
+        fblocks++;
+        size_t items = nitems > max_items ? max_items : nitems;
+        size_t flushed = buffer->flush->wfunc(
+            src, size, items, buffer->flush->target);
         if (flushed > 0) {
-            pos += (flushed * size);
-            remaining -= flushed;
+            flushed_total += flushed;
+            src += flushed * size;
+            nitems -= flushed;
         } else {
             // if no bytes can be flushed out anymore, we give up
             break;
         }
     }
-    return nitems - remaining;
+    return flushed_total;
+}
+
+static size_t cx_buffer_flush_impl(CxBuffer *buffer, size_t size) {
+    // flush the current contents of the buffer
+    unsigned char *space = buffer->bytes;
+    size_t remaining = buffer->pos / size;
+    size_t flushed_total = cx_buffer_flush_helper(
+        buffer, size, space, remaining);
+
+    // shift the buffer left after flushing
+    // IMPORTANT: up to this point, copy on write must have been
+    // performed already, because we can't do error handling here
+    cxBufferShiftLeft(buffer, flushed_total*size);
+
+    return flushed_total;
+}
+
+size_t cxBufferFlush(CxBuffer *buffer) {
+    if (buffer_copy_on_write(buffer)) return 0;
+    return cx_buffer_flush_impl(buffer, 1);
 }
 
 size_t cxBufferWrite(
@@ -249,20 +269,20 @@
     }
 
     size_t len;
-    size_t nitems_out = nitems;
     if (cx_szmul(size, nitems, &len)) {
         errno = EOVERFLOW;
         return 0;
     }
+    if (buffer->pos > SIZE_MAX - len) {
+        errno = EOVERFLOW;
+        return 0;
+    }
     size_t required = buffer->pos + len;
-    if (buffer->pos > required) {
-        return 0;
-    }
 
     bool perform_flush = false;
     if (required > buffer->capacity) {
         if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
-            if (buffer->flush_blkmax > 0 && required > buffer->flush_threshold) {
+            if (buffer->flush != NULL && required > buffer->flush->threshold) {
                 perform_flush = true;
             } else {
                 if (cxBufferMinimumCapacity(buffer, required)) {
@@ -270,71 +290,57 @@
                 }
             }
         } else {
-            if (buffer->flush_blkmax > 0) {
+            if (buffer->flush != NULL) {
                 perform_flush = true;
             } else {
-                // truncate data to be written, if we can neither extend nor flush
+                // truncate data, if we can neither extend nor flush
                 len = buffer->capacity - buffer->pos;
                 if (size > 1) {
                     len -= len % size;
                 }
-                nitems_out = len / size;
+                nitems = len / size;
             }
         }
     }
 
+    // check here and not above because of possible truncation
     if (len == 0) {
-        return len;
+        return 0;
     }
 
-    if (perform_flush) {
-        size_t flush_max;
-        if (cx_szmul(buffer->flush_blkmax, buffer->flush_blksize, &flush_max)) {
-            errno = EOVERFLOW;
-            return 0;
-        }
-        size_t flush_pos = buffer->flush_func == NULL || buffer->flush_target == NULL
-                           ? buffer->pos
-                           : cx_buffer_write_flush_helper(buffer, buffer->bytes, 1, buffer->pos);
-        if (flush_pos == buffer->pos) {
-            // entire buffer has been flushed, we can reset
-            buffer->size = buffer->pos = 0;
-
-            size_t items_flush; // how many items can also be directly flushed
-            size_t items_keep; // how many items have to be written to the buffer
+    // check if we need to copy
+    if (buffer_copy_on_write(buffer)) return 0;
 
-            items_flush = flush_max >= required ? nitems : (flush_max - flush_pos) / size;
-            if (items_flush > 0) {
-                items_flush = cx_buffer_write_flush_helper(buffer, ptr, size, items_flush / size);
-                // in case we could not flush everything, keep the rest
+    // perform the operation
+    if (perform_flush) {
+        size_t items_flush;
+        if (buffer->pos == 0) {
+            // if we don't have data in the buffer, but are instructed
+            // to flush, it means that we are supposed to relay the data
+            items_flush = cx_buffer_flush_helper(buffer, size, ptr, nitems);
+            if (items_flush == 0) {
+                // we needed to flush, but could not flush anything
+                // give up and avoid endless trying
+                return 0;
             }
-            items_keep = nitems - items_flush;
-            if (items_keep > 0) {
-                // try again with the remaining stuff
-                const unsigned char *new_ptr = ptr;
-                new_ptr += items_flush * size;
-                // report the directly flushed items as written plus the remaining stuff
-                return items_flush + cxBufferWrite(new_ptr, size, items_keep, buffer);
-            } else {
-                // all items have been flushed - report them as written
-                return nitems;
+            size_t ritems = nitems - items_flush;
+            const unsigned char *rest = ptr;
+            rest += items_flush * size;
+            return items_flush + cxBufferWrite(rest, size, ritems, buffer);
+        } else {
+            items_flush = cx_buffer_flush_impl(buffer, size);
+            if (items_flush == 0) {
+                return 0;
             }
-        } else if (flush_pos == 0) {
-            // nothing could be flushed at all, we immediately give up without writing any data
-            return 0;
-        } else {
-            // we were partially successful, we shift left and try again
-            cxBufferShiftLeft(buffer, flush_pos);
             return cxBufferWrite(ptr, size, nitems, buffer);
         }
     } else {
-        if (buffer_copy_on_write(buffer)) return 0;
         memcpy(buffer->bytes + buffer->pos, ptr, len);
         buffer->pos += len;
         if (buffer->pos > buffer->size) {
             buffer->size = buffer->pos;
         }
-        return nitems_out;
+        return nitems;
     }
 
 }
--- a/src/cx/buffer.h	Sun Jan 05 14:03:30 2025 +0100
+++ b/src/cx/buffer.h	Sun Jan 05 18:19:42 2025 +0100
@@ -88,6 +88,60 @@
  */
 #define CX_BUFFER_COPY_ON_EXTEND 0x08
 
+/**
+ * Configuration for automatic flushing.
+ */
+struct cx_buffer_flush_config_s {
+    /**
+     * The buffer may not extend beyond this threshold before starting to flush.
+     *
+     * Only used when the buffer uses #CX_BUFFER_AUTO_EXTEND.
+     * The threshold will be the maximum capacity the buffer is extended to
+     * before flushing.
+     */
+    size_t threshold;
+    /**
+     * The block size for the elements to flush.
+     */
+    size_t blksize;
+    /**
+     * The maximum number of blocks to flush in one cycle.
+     *
+     * @attention while it is guaranteed that cxBufferFlush() will not flush
+     * more blocks, this is not necessarily the case for cxBufferWrite().
+     * After performing a flush cycle, cxBufferWrite() will retry the write
+     * operation and potentially trigger another flush cycle, until the
+     * flush target accepts no more data.
+     */
+    size_t blkmax;
+
+    /**
+     * The target for write function.
+     */
+    void *target;
+
+    /**
+     * The write-function used for flushing.
+     * If NULL, the flushed content gets discarded.
+     */
+    cx_write_func wfunc;
+};
+
+/**
+ * Type alais for the flush configuration struct.
+ *
+ * @code
+ * struct cx_buffer_flush_config_s {
+ *     size_t threshold;
+ *     size_t blksize;
+ *     size_t blkmax;
+ *     void *target;
+ *     cx_write_func wfunc;
+ * };
+ * @endcode
+ */
+typedef struct cx_buffer_flush_config_s CxBufferFlushConfig;
+
 /** Structure for the UCX buffer data. */
 struct cx_buffer_s {
     /** A pointer to the buffer contents. */
@@ -103,6 +157,12 @@
     };
     /** The allocator to use for automatic memory management. */
     const CxAllocator *allocator;
+    /**
+     * Optional flush configuration
+     *
+     * @see cxBufferEnableFlushing()
+     */
+    CxBufferFlushConfig* flush;
     /** Current position of the buffer. */
     size_t pos;
     /** Current capacity (i.e. maximum size) of the buffer. */
@@ -110,40 +170,6 @@
     /** Current size of the buffer content. */
     size_t size;
     /**
-     * The buffer may not extend beyond this threshold before starting to flush.
-     * Default is @c SIZE_MAX (flushing disabled when auto extension is enabled).
-     */
-    size_t flush_threshold;
-    /**
-     * The block size for the elements to flush.
-     * Default is 4096 bytes.
-     */
-    size_t flush_blksize;
-    /**
-     * The maximum number of blocks to flush in one cycle.
-     * Zero disables flushing entirely (this is the default).
-     * Set this to @c SIZE_MAX to flush the entire buffer.
-     *
-     * @attention if the maximum number of blocks multiplied with the block size
-     * is smaller than the expected contents written to this buffer within one write
-     * operation, multiple flush cycles are performed after that write.
-     * That means the total number of blocks flushed after one write to this buffer may
-     * be larger than @c flush_blkmax.
-     */
-    size_t flush_blkmax;
-
-    /**
-     * The write function used for flushing.
-     * If NULL, the flushed content gets discarded.
-     */
-    cx_write_func flush_func;
-
-    /**
-     * The target for @c flush_func.
-     */
-    void *flush_target;
-
-    /**
      * Flag register for buffer features.
      * @see #CX_BUFFER_DEFAULT
      * @see #CX_BUFFER_FREE_CONTENTS
@@ -199,6 +225,26 @@
 );
 
 /**
+ * Configures the buffer for flushing.
+ *
+ * Flushing can happen automatically when data is written
+ * to the buffer (see cxBufferWrite()) or manually when
+ * cxBufferFlush() is called.
+ *
+ * @param buffer the buffer
+ * @param config the flush configuration
+ * @retval zero success
+ * @retval non-zero failure
+ * @see cxBufferFlush()
+ * @see cxBufferWrite()
+ */
+cx_attr_nonnull
+int cxBufferEnableFlushing(
+    CxBuffer *buffer,
+    CxBufferFlushConfig config
+);
+
+/**
  * Destroys the buffer contents.
  *
  * Has no effect if the #CX_BUFFER_FREE_CONTENTS feature is not enabled.
@@ -419,6 +465,10 @@
 /**
  * Writes data to a CxBuffer.
  *
+ * If automatic flushing is not enabled, the data is simply written into the
+ * buffer at the current position and the position of the buffer is increased
+ * by the number of bytes written.
+ *
  * If flushing is enabled and the buffer needs to flush, the data is flushed to
  * the target until the target signals that it cannot take more data by
  * returning zero via the respective write function. In that case, the remaining
@@ -426,16 +476,11 @@
  * newly available space can be used to append as much data as possible. This
  * function only stops writing more elements, when the flush target and this
  * buffer are both incapable of taking more data or all data has been written.
- * The number returned by this function is the total number of elements that
- * could be written during the process. It does not necessarily mean that those
- * elements are still in this buffer, because some of them could have also been
- * flushed already.
- *
- * If automatic flushing is not enabled, the data is simply written into the
- * buffer at the current position and the position of the buffer is increased
- * by the number of bytes written.
- * Use cxBufferAppend() if you want to add data to this buffer regardless of
- * the position.
+ * If number of items that shall be written is larger than the buffer can hold,
+ * the first items from @c ptr are directly relayed to the flush target, if
+ * possible.
+ * The number returned by this function is only the number of elements from
+ * @c ptr that could be written to either the flush target or the buffer.
  *
  * @note The signature is compatible with the fwrite() family of functions.
  *
@@ -483,6 +528,62 @@
 );
 
 /**
+ * Performs a single flush-run on the specified buffer.
+ *
+ * Does nothing when the position in the buffer is zero.
+ * Otherwise, the data until the current position minus
+ * one is considered for flushing.
+ * Note carefully that flushing will never exceed the
+ * current @em position, even when the size of the
+ * buffer is larger than the current position.
+ *
+ * One flush run will try to flush @c blkmax many
+ * blocks of size @c blksize until either the @p buffer
+ * has no more data to flush or the write function
+ * used for flushing returns zero.
+ *
+ * The buffer is shifted left for that many bytes
+ * the flush operation has successfully flushed.
+ *
+ * @par Example 1
+ * Assume you have a buffer with size 340 and you are
+ * at position 200. The flush configuration is
+ * @c blkmax=4 and @c blksize=64 .
+ * Assume that the entire flush operation is successful.
+ * All 200 bytes on the left hand-side from the current
+ * position are written.
+ * That means, the size of the buffer is now 140 and the
+ * position is zero.
+ *
+ * @par Example 2
+ * Same as Example 1, but now the @c blkmax is 1.
+ * The size of the buffer is now 276 and the position is 136.
+ *
+ * @par Example 3
+ * Same as Example 1, but now assume the flush target
+ * only accepts 100 bytes before returning zero.
+ * That means, the flush operations manages to flush
+ * one complete block and one partial block, ending
+ * up with a buffer with size 240 and position 100.
+ *
+ * @remark Just returns zero when flushing was not enabled with
+ * cxBufferEnableFlushing().
+ *
+ * @remark When the buffer uses copy-on-write, the memory
+ * is copied first, before attempting any flush.
+ * This is, however, considered an erroneous use of the
+ * buffer, because it does not make much sense to put
+ * readonly data into an UCX buffer for flushing, instead
+ * of writing it directly to the target.
+ *
+ * @param buffer the buffer
+ * @return the number of successfully flushed bytes
+ * @see cxBufferEnableFlushing()
+ */
+cx_attr_nonnull
+size_t cxBufferFlush(CxBuffer *buffer);
+
+/**
  * Reads data from a CxBuffer.
  *
  * The position of the buffer is increased by the number of bytes read.
--- a/tests/test_buffer.c	Sun Jan 05 14:03:30 2025 +0100
+++ b/tests/test_buffer.c	Sun Jan 05 18:19:42 2025 +0100
@@ -31,14 +31,6 @@
 
 #include "cx/buffer.h"
 
-static CX_TEST_SUBROUTINE(expect_default_flush_config, CxBuffer *buf) {
-    CX_TEST_ASSERT(buf->flush_blkmax == 0);
-    CX_TEST_ASSERT(buf->flush_blksize == 4096);
-    CX_TEST_ASSERT(buf->flush_threshold == SIZE_MAX);
-    CX_TEST_ASSERT(buf->flush_func == NULL);
-    CX_TEST_ASSERT(buf->flush_target == NULL);
-}
-
 CX_TEST(test_buffer_init_wrap_space) {
     CxTestingAllocator talloc;
     cx_testing_allocator_init(&talloc);
@@ -47,7 +39,7 @@
         CxBuffer buf;
         void *space = cxMalloc(alloc, 16);
         cxBufferInit(&buf, space, 16, alloc, CX_BUFFER_DEFAULT);
-        CX_TEST_CALL_SUBROUTINE(expect_default_flush_config, &buf);
+        CX_TEST_ASSERT(buf.flush == NULL);
         CX_TEST_ASSERT(buf.space == space);
         CX_TEST_ASSERT((buf.flags & CX_BUFFER_AUTO_EXTEND) == 0);
         CX_TEST_ASSERT((buf.flags & CX_BUFFER_FREE_CONTENTS) == 0);
@@ -71,7 +63,7 @@
         CxBuffer buf;
         void *space = cxMalloc(alloc, 16);
         cxBufferInit(&buf, space, 16, alloc, CX_BUFFER_AUTO_EXTEND);
-        CX_TEST_CALL_SUBROUTINE(expect_default_flush_config, &buf);
+        CX_TEST_ASSERT(buf.flush == NULL);
         CX_TEST_ASSERT(buf.space == space);
         CX_TEST_ASSERT((buf.flags & CX_BUFFER_AUTO_EXTEND) == CX_BUFFER_AUTO_EXTEND);
         CX_TEST_ASSERT((buf.flags & CX_BUFFER_FREE_CONTENTS) == 0);
@@ -95,7 +87,7 @@
         CxBuffer buf;
         void *space = cxMalloc(alloc, 16);
         cxBufferInit(&buf, space, 16, alloc, CX_BUFFER_FREE_CONTENTS);
-        CX_TEST_CALL_SUBROUTINE(expect_default_flush_config, &buf);
+        CX_TEST_ASSERT(buf.flush == NULL);
         CX_TEST_ASSERT(buf.space == space);
         CX_TEST_ASSERT((buf.flags & CX_BUFFER_AUTO_EXTEND) == 0);
         CX_TEST_ASSERT((buf.flags & CX_BUFFER_FREE_CONTENTS) == CX_BUFFER_FREE_CONTENTS);
@@ -117,7 +109,7 @@
     CX_TEST_DO {
         CxBuffer buf;
         cxBufferInit(&buf, NULL, 8, alloc, CX_BUFFER_DEFAULT);
-        CX_TEST_CALL_SUBROUTINE(expect_default_flush_config, &buf);
+        CX_TEST_ASSERT(buf.flush == NULL);
         CX_TEST_ASSERT(buf.space != NULL);
         CX_TEST_ASSERT((buf.flags & CX_BUFFER_AUTO_EXTEND) == 0);
         CX_TEST_ASSERT((buf.flags & CX_BUFFER_FREE_CONTENTS) == CX_BUFFER_FREE_CONTENTS);
@@ -141,7 +133,7 @@
         void *space = cxMalloc(alloc, 16);
         buf = cxBufferCreate(space, 16, alloc, CX_BUFFER_FREE_CONTENTS);
         CX_TEST_ASSERT(buf != NULL);
-        CX_TEST_CALL_SUBROUTINE(expect_default_flush_config, buf);
+        CX_TEST_ASSERT(buf->flush == NULL);
         CX_TEST_ASSERT(buf->space == space);
         CX_TEST_ASSERT((buf->flags & CX_BUFFER_AUTO_EXTEND) == 0);
         CX_TEST_ASSERT((buf->flags & CX_BUFFER_FREE_CONTENTS) == CX_BUFFER_FREE_CONTENTS);
@@ -642,15 +634,7 @@
         cx_attr_unused size_t nitems,
         CxBuffer *buffer
 ) {
-    // simulate limited target drain capacity
-    static bool full = false;
-    if (full) {
-        full = false;
-        return 0;
-    } else {
-        full = true;
-        return cxBufferWrite(ptr, size, nitems > 2 ? 2 : nitems, buffer);
-    }
+    return cxBufferWrite(ptr, size, nitems > 2 ? 2 : nitems, buffer);
 }
 
 CX_TEST(test_buffer_write_size_one_fit) {
@@ -1090,15 +1074,18 @@
 
 CX_TEST(test_buffer_write_flush_at_capacity) {
     CxBuffer buf, target;
-    cxBufferInit(&target, NULL, 16, cxDefaultAllocator, CX_BUFFER_AUTO_EXTEND);
-    cxBufferInit(&buf, NULL, 16, cxDefaultAllocator, CX_BUFFER_DEFAULT);
-    memcpy(buf.space, "prep\0\0\0\0\0\0\0\0\0\0\0\0", 16);
-    buf.capacity = 8;
-    buf.size = buf.pos = 4;
-    buf.flush_target = &target;
-    buf.flush_func = (cx_write_func)cxBufferWrite;
-    buf.flush_blkmax = 1;
+    cxBufferInit(&target, NULL, 8, cxDefaultAllocator, CX_BUFFER_AUTO_EXTEND);
+    cxBufferInit(&buf, NULL, 8, cxDefaultAllocator, CX_BUFFER_DEFAULT);
+    memset(buf.space, 0, 8);
+    cxBufferPutString(&buf, "prep");
     CX_TEST_DO {
+        CxBufferFlushConfig flush;
+        flush.threshold = 0;
+        flush.blksize = 32;
+        flush.blkmax = 1;
+        flush.target = &target;
+        flush.wfunc = (cx_write_func)cxBufferWrite;
+        CX_TEST_ASSERT(0 == cxBufferEnableFlushing(&buf, flush));
         size_t written = cxBufferWrite("foo", 1, 3, &buf);
         CX_TEST_ASSERT(written == 3);
         CX_TEST_ASSERT(buf.pos == 7);
@@ -1107,12 +1094,13 @@
         CX_TEST_ASSERT(target.size == 0);
         written = cxBufferWrite("hello", 1, 5, &buf);
         CX_TEST_ASSERT(written == 5);
-        CX_TEST_ASSERT(buf.pos == 0);
-        CX_TEST_ASSERT(buf.size == 0);
+        CX_TEST_ASSERT(buf.pos == 5);
+        CX_TEST_ASSERT(buf.size == 5);
         CX_TEST_ASSERT(buf.capacity == 8);
-        CX_TEST_ASSERT(target.pos == 12);
-        CX_TEST_ASSERT(target.size == 12);
-        CX_TEST_ASSERT(0 == memcmp(target.space, "prepfoohello", 12));
+        CX_TEST_ASSERT(target.pos == 7);
+        CX_TEST_ASSERT(target.size == 7);
+        CX_TEST_ASSERT(0 == memcmp(buf.space, "hello", 5));
+        CX_TEST_ASSERT(0 == memcmp(target.space, "prepfoo", 7));
     }
     cxBufferDestroy(&buf);
     cxBufferDestroy(&target);
@@ -1120,17 +1108,17 @@
 
 CX_TEST(test_buffer_write_flush_at_threshold) {
     CxBuffer buf, target;
-    cxBufferInit(&target, NULL, 16, cxDefaultAllocator, CX_BUFFER_AUTO_EXTEND);
-    cxBufferInit(&buf, NULL, 16, cxDefaultAllocator, CX_BUFFER_DEFAULT);
-    memcpy(buf.space, "prep\0\0\0\0\0\0\0\0\0\0\0\0", 16);
-    buf.capacity = 8;
-    buf.size = buf.pos = 4;
-    buf.flush_target = &target;
-    buf.flush_func = (cx_write_func)cxBufferWrite;
-    buf.flush_blkmax = 1;
-    buf.flush_threshold = 12;
-    buf.flags |= CX_BUFFER_AUTO_EXTEND;
+    cxBufferInit(&target, NULL, 8, cxDefaultAllocator, CX_BUFFER_AUTO_EXTEND);
+    cxBufferInit(&buf, NULL, 8, cxDefaultAllocator, CX_BUFFER_AUTO_EXTEND);
+    cxBufferPutString(&buf, "prep");
     CX_TEST_DO {
+        CxBufferFlushConfig flush;
+        flush.threshold = 12;
+        flush.blksize = 32;
+        flush.blkmax = 1;
+        flush.target = &target;
+        flush.wfunc = (cx_write_func)cxBufferWrite;
+        CX_TEST_ASSERT(0 == cxBufferEnableFlushing(&buf, flush));
         size_t written = cxBufferWrite("foobar", 1, 6, &buf);
         CX_TEST_ASSERT(written == 6);
         CX_TEST_ASSERT(buf.pos == 10);
@@ -1141,31 +1129,35 @@
         CX_TEST_ASSERT(target.size == 0);
         written = cxBufferWrite("hello", 1, 5, &buf);
         CX_TEST_ASSERT(written == 5);
-        CX_TEST_ASSERT(buf.pos == 0);
-        CX_TEST_ASSERT(buf.size == 0);
+        CX_TEST_ASSERT(buf.pos == 5);
+        CX_TEST_ASSERT(buf.size == 5);
         CX_TEST_ASSERT(buf.capacity <= 12);
-        CX_TEST_ASSERT(target.pos == 15);
-        CX_TEST_ASSERT(target.size == 15);
-        CX_TEST_ASSERT(0 == memcmp(target.space, "prepfoobarhello", 15));
+        CX_TEST_ASSERT(target.pos == 10);
+        CX_TEST_ASSERT(target.size == 10);
+        CX_TEST_ASSERT(0 == memcmp(buf.space, "hello", 5));
+        CX_TEST_ASSERT(0 == memcmp(target.space, "prepfoobar", 10));
     }
     cxBufferDestroy(&buf);
     cxBufferDestroy(&target);
 }
 
-CX_TEST(test_buffer_write_flush_rate_limited) {
+CX_TEST(test_buffer_write_flush_rate_limited_and_buffer_too_small) {
+    // the idea is that the target only accepts two bytes and
+    // then gives up... accepts another two bytes, gives up, etc.
+    // and at the same time, the written string is too large for
+    // the buffer (buffer can take 8, we want to write 13)
     CxBuffer buf, target;
-    cxBufferInit(&target, NULL, 16, cxDefaultAllocator, CX_BUFFER_AUTO_EXTEND);
-    cxBufferInit(&buf, NULL, 16, cxDefaultAllocator, CX_BUFFER_DEFAULT);
-    memcpy(buf.space, "prep\0\0\0\0\0\0\0\0\0\0\0\0", 16);
-    buf.capacity = 8;
-    buf.size = buf.pos = 4;
-    buf.flush_target = &target;
-    buf.flush_blkmax = 1;
-    // limit the rate of the flush function and the capacity of the target
-    buf.flush_func = (cx_write_func) mock_write_limited_rate;
-    target.capacity = 16;
-    target.flags &= ~CX_BUFFER_AUTO_EXTEND;
+    cxBufferInit(&target, NULL, 8, cxDefaultAllocator, CX_BUFFER_AUTO_EXTEND);
+    cxBufferInit(&buf, NULL, 8, cxDefaultAllocator, CX_BUFFER_DEFAULT);
+    cxBufferPutString(&buf, "prep");
     CX_TEST_DO {
+        CxBufferFlushConfig flush;
+        flush.threshold = 0;
+        flush.blksize = 32;
+        flush.blkmax = 1;
+        flush.target = &target;
+        flush.wfunc = (cx_write_func)mock_write_limited_rate;
+        CX_TEST_ASSERT(0 == cxBufferEnableFlushing(&buf, flush));
         size_t written = cxBufferWrite("foo", 1, 3, &buf);
         CX_TEST_ASSERT(written == 3);
         CX_TEST_ASSERT(buf.pos == 7);
@@ -1181,13 +1173,49 @@
         CX_TEST_ASSERT(0 == memcmp(buf.space, " world!", 7));
         CX_TEST_ASSERT(target.pos == 13);
         CX_TEST_ASSERT(target.size == 13);
-        CX_TEST_ASSERT(target.capacity == 16);
+        CX_TEST_ASSERT(target.capacity >= 13);
         CX_TEST_ASSERT(0 == memcmp(target.space, "prepfoohello,", 13));
     }
     cxBufferDestroy(&buf);
     cxBufferDestroy(&target);
 }
 
+CX_TEST(test_buffer_flush) {
+    CxBuffer buf, target;
+    cxBufferInit(&target, NULL, 8, cxDefaultAllocator, CX_BUFFER_AUTO_EXTEND);
+    cxBufferInit(&buf, NULL, 8, cxDefaultAllocator, CX_BUFFER_DEFAULT);
+    cxBufferPutString(&buf, "prepare");
+    CX_TEST_DO {
+        CxBufferFlushConfig flush;
+        flush.threshold = 0;
+        flush.blksize = 2;
+        flush.blkmax = 2;
+        flush.target = &target;
+        flush.wfunc = (cx_write_func)cxBufferWrite;
+        CX_TEST_ASSERT(0 == cxBufferEnableFlushing(&buf, flush));
+        CX_TEST_ASSERT(buf.size == 7);
+        buf.pos = 5;
+        size_t flushed = cxBufferFlush(&buf);
+        CX_TEST_ASSERT(flushed == flush.blkmax * flush.blksize);
+        CX_TEST_ASSERT(buf.pos == 1);
+        CX_TEST_ASSERT(buf.size == 3);
+        CX_TEST_ASSERT(target.pos == 4);
+        CX_TEST_ASSERT(target.size == 4);
+        CX_TEST_ASSERT(0 == memcmp(buf.space, "are", 3));
+        CX_TEST_ASSERT(0 == memcmp(target.space, "prep", 4));
+        flushed = cxBufferFlush(&buf);
+        CX_TEST_ASSERT(flushed == 1);
+        CX_TEST_ASSERT(buf.pos == 0);
+        CX_TEST_ASSERT(buf.size == 2);
+        CX_TEST_ASSERT(target.pos == 5);
+        CX_TEST_ASSERT(target.size == 5);
+        CX_TEST_ASSERT(0 == memcmp(buf.space, "re", 2));
+        CX_TEST_ASSERT(0 == memcmp(target.space, "prepa", 5));
+    }
+    cxBufferDestroy(&buf);
+    cxBufferDestroy(&target);
+}
+
 CX_TEST(test_buffer_get) {
     CxBuffer buf;
     cxBufferInit(&buf, NULL, 16, cxDefaultAllocator, CX_BUFFER_DEFAULT);
@@ -1348,7 +1376,8 @@
     cx_test_register(suite, test_buffer_write_only_overwrite);
     cx_test_register(suite, test_buffer_write_flush_at_capacity);
     cx_test_register(suite, test_buffer_write_flush_at_threshold);
-    cx_test_register(suite, test_buffer_write_flush_rate_limited);
+    cx_test_register(suite, test_buffer_write_flush_rate_limited_and_buffer_too_small);
+    cx_test_register(suite, test_buffer_flush);
     cx_test_register(suite, test_buffer_get);
     cx_test_register(suite, test_buffer_get_eof);
     cx_test_register(suite, test_buffer_read);

mercurial