I am learning about multithreading in general and winapi multithreading specifically.
I have an SPSC FIFO queue implemented over a ring buffer. I have not yet added any synchronization, and am not explicitly using any atomic operations, yet the queue appears to work. By "work" I mean that my three assertions never get triggered.
I don't understand why that is. I think either locking or atomic operations on queue.ring_write_pos and queue.ring_read_pos should be necessary.
I suspect that one of these is true:

1. The queue is actually broken, and my test simply hasn't exposed the data race yet.
2. The writes to queue.ring_write_pos and queue.ring_read_pos happen to be atomic on x86_64, which is why the queue works despite the missing synchronization.

I would appreciate help in identifying what I'm missing here.
// Compiled with:
//   clang version 18.1.2
//   Target: x86_64-pc-windows-msvc
//   Thread model: posix
//   InstalledDir: C:\Program Files\LLVM\bin
// clang-cl spsc_queue.c -o "spsc_queue.exe" /MT /WX /Zi /link /DEBUG:FULL /SUBSYSTEM:CONSOLE /WX

#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define WIN32_LEAN_AND_MEAN
#include <windows.h>

struct Queue {
    uint8_t *ring_buffer;
    uint64_t ring_size;
    uint64_t ring_read_pos;   // only advanced by the reader thread
    uint64_t ring_write_pos;  // only advanced by the writer (main) thread
};

#define QUEUE_SIZE 32
static_assert((QUEUE_SIZE & (QUEUE_SIZE - 1)) == 0, "Queue size must be a power of 2");

#define MIN(a, b) (((a) < (b)) ? (a) : (b))

// Copy read_size bytes out of the ring, wrapping around the end if needed.
uint64_t ring_read(uint8_t *ring, uint64_t ring_size, uint64_t pos, void *dst, uint64_t read_size) {
    read_size = MIN(read_size, ring_size);
    uint64_t read_start_1 = pos % ring_size;
    uint64_t read_size_1 = read_size;
    uint64_t read_size_2 = 0;
    if (read_start_1 + read_size > ring_size) {
        read_size_1 = ring_size - read_start_1;
        read_size_2 = read_size - read_size_1;
    }
    if (read_size_1 != 0) {
        memcpy(dst, ring + read_start_1, read_size_1);
    }
    if (read_size_2 != 0) {
        memcpy((uint8_t *)dst + read_size_1, ring, read_size_2);
    }
    return read_size;
}

// Copy write_size bytes into the ring, wrapping around the end if needed.
uint64_t ring_write(uint8_t *ring, uint64_t ring_size, uint64_t pos, void *src, uint64_t write_size) {
    write_size = MIN(write_size, ring_size);
    uint64_t write_start_1 = pos % ring_size;
    uint64_t write_size_1 = write_size;
    uint64_t write_size_2 = 0;
    if (write_start_1 + write_size > ring_size) {
        write_size_1 = ring_size - write_start_1;
        write_size_2 = write_size - write_size_1;
    }
    if (write_size_1 != 0) {
        memcpy(ring + write_start_1, src, write_size_1);
    }
    if (write_size_2 != 0) {
        memcpy(ring, (uint8_t *)src + write_size_1, write_size_2);
    }
    return write_size;
}

void check_invariants(struct Queue *queue, int line) {
    if (!(queue->ring_write_pos >= queue->ring_read_pos)) {
        printf("Failed invariant(%d): write >= read: %" PRIu64 ", %" PRIu64 "\n", line,
               queue->ring_write_pos, queue->ring_read_pos);
        assert(false);
    }
    if (!(queue->ring_write_pos - queue->ring_read_pos <= queue->ring_size)) {
        printf("Failed invariant(%d): write - read <= size: %" PRIu64 ", %" PRIu64 ", %" PRIu64 "\n", line,
               queue->ring_write_pos, queue->ring_read_pos, queue->ring_size);
        assert(false);
    }
}

DWORD WINAPI read_thread(LPVOID lpParam) {
    struct Queue *queue = lpParam;
    bool first = true;
    uint64_t prev_value = 0;
    while (true) {
        if (queue->ring_write_pos - queue->ring_read_pos) {  // queue is non-empty
            check_invariants(queue, __LINE__);
            uint64_t val;
            queue->ring_read_pos += ring_read(queue->ring_buffer, queue->ring_size,
                                              queue->ring_read_pos, &val, sizeof(uint64_t));
            if (!first) {
                if (val != prev_value + 1) {
                    printf("Val: %" PRIu64 " Prev: %" PRIu64 "\n", val, prev_value);
                    assert(false);
                }
            }
            first = false;
            prev_value = val;
        }
    }
}

int main(void) {
    printf("Begin\n");
    struct Queue queue = {
        .ring_buffer = malloc(QUEUE_SIZE * sizeof(uint64_t)),
        .ring_size = QUEUE_SIZE * sizeof(uint64_t),
    };
    DWORD read_thread_id;
    CreateThread(NULL, 0, &read_thread, &queue, 0, &read_thread_id);

    uint64_t i = 0;
    uint64_t max_i = 1000;
    while (i < max_i) {
        // Keep writing until the ring is full or all values have been produced.
        while (queue.ring_write_pos != queue.ring_read_pos + queue.ring_size && i < max_i) {
            check_invariants(&queue, __LINE__);
            queue.ring_write_pos += ring_write(queue.ring_buffer, queue.ring_size,
                                               queue.ring_write_pos, &i, sizeof(uint64_t));
            ++i;
        }
    }
    // Wait for the reader to drain the queue before exiting.
    while (queue.ring_write_pos - queue.ring_read_pos) {
        Sleep(16);
    }
    printf("End\n");
    return 0;
}
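For reference, the kind of explicit locking I assumed would be required looks roughly like this. This is only a minimal sketch reusing the ring_read/ring_write helpers above; the LockedQueue name, the lock field, and the push/pop helpers are mine for illustration, and the CRITICAL_SECTION must be initialized with InitializeCriticalSection before use:

// Sketch: the positions are only touched while holding a CRITICAL_SECTION.
struct LockedQueue {
    uint8_t *ring_buffer;
    uint64_t ring_size;
    uint64_t ring_read_pos;
    uint64_t ring_write_pos;
    CRITICAL_SECTION lock;  // must be initialized with InitializeCriticalSection
};

// Writer side: push one value if there is room, under the lock.
static bool locked_push(struct LockedQueue *q, uint64_t value) {
    bool pushed = false;
    EnterCriticalSection(&q->lock);
    if (q->ring_write_pos - q->ring_read_pos < q->ring_size) {
        q->ring_write_pos += ring_write(q->ring_buffer, q->ring_size,
                                        q->ring_write_pos, &value, sizeof(value));
        pushed = true;
    }
    LeaveCriticalSection(&q->lock);
    return pushed;
}

// Reader side: pop one value if available, under the lock.
static bool locked_pop(struct LockedQueue *q, uint64_t *out) {
    bool popped = false;
    EnterCriticalSection(&q->lock);
    if (q->ring_write_pos != q->ring_read_pos) {
        q->ring_read_pos += ring_read(q->ring_buffer, q->ring_size,
                                      q->ring_read_pos, out, sizeof(*out));
        popped = true;
    }
    LeaveCriticalSection(&q->lock);
    return popped;
}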
I think I have proof that #2 is true. The writes to queue.ring_write_pos and queue.ring_read_pos are guaranteed by x86_64 to be atomic, and this is why the queue works.
To prove this, I tried to make the queue fail by breaking the conditions upon which the x86_64 atomicity guarantees depend, predicting that I would observe failed asserts caused by torn writes producing nonsensical ring positions.
Intel Software Developer's Manual volume 3A section 9.1.1, "Guaranteed Atomic Operations", lists the guarantees.
I made the following changes to the above program:
+#pragma pack(push, 1)
 struct Queue {
   uint8_t *ring_buffer;
   uint64_t ring_size;
   uint64_t ring_read_pos;
+  uint8_t padding;
   uint64_t ring_write_pos;
 };
+#pragma pack(pop)
Now offsetof(struct Queue, ring_write_pos) == 25, which breaks the alignment requirements.
And sure enough, I see the expected failures, like:
Failed invariant(140): write >= read: 712454830744, 182388436668416
Failed invariant(177): write - read <= size: 2344, 2304, 256
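As a side note, the alignment assumption itself can be checked at compile time. A minimal sketch of such a check (my addition, not part of the program above, and assuming the struct object itself starts at an 8-byte-aligned address):

#include <stdalign.h>
#include <stddef.h>

// Rejects the packed layout at compile time: with #pragma pack(1) and the
// extra padding byte, ring_write_pos lands at offset 25 and the second
// assertion fails.
static_assert(offsetof(struct Queue, ring_read_pos) % alignof(uint64_t) == 0,
              "ring_read_pos must be 8-byte aligned");
static_assert(offsetof(struct Queue, ring_write_pos) % alignof(uint64_t) == 0,
              "ring_write_pos must be 8-byte aligned");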
An "atomic operation" serves two purposes. The first is to guarantee that a read or write is not torn, i.e. that the value is loaded or stored fully in a single operation. The second, and more important, purpose is to specify memory ordering relative to other reads and writes.
On x86, aligned integer reads and writes are "atomic" in the first sense: they will not produce torn values, as you have already tested.
But code like this also works on x86 without atomic operations because of the architecture's strong memory model: x86 gives fairly strong guarantees about the order in which memory reads and writes become visible, which is why no explicit atomic operations are required here. On other architectures such as ARM this will fail badly, because ARM has a weak memory model. There you will need to specify proper memory ordering rules, otherwise your reader will potentially pull wrong data out of the buffer.
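For a single-producer/single-consumer queue like this one, a minimal sketch of those ordering rules using C11 atomics could look as follows. It assumes your toolchain provides <stdatomic.h> (clang does), reuses the ring_read/ring_write helpers from the question, and the names here are illustrative rather than a tested drop-in patch:

#include <stdatomic.h>

// Same layout as the question's struct Queue, but the positions are atomics.
struct AtomicQueue {
    uint8_t *ring_buffer;
    uint64_t ring_size;
    _Atomic uint64_t ring_read_pos;
    _Atomic uint64_t ring_write_pos;
};

// Producer: copy the payload into the ring first, then publish it with a
// release store, so a consumer that sees the new write_pos also sees the bytes.
static bool queue_push(struct AtomicQueue *q, uint64_t value) {
    uint64_t w = atomic_load_explicit(&q->ring_write_pos, memory_order_relaxed);
    uint64_t r = atomic_load_explicit(&q->ring_read_pos, memory_order_acquire);
    if (w - r == q->ring_size) {
        return false;  // full
    }
    uint64_t written = ring_write(q->ring_buffer, q->ring_size, w, &value, sizeof(value));
    atomic_store_explicit(&q->ring_write_pos, w + written, memory_order_release);
    return true;
}

// Consumer: acquire-load write_pos before touching the bytes, then publish the
// new read_pos with a release store so the producer may safely reuse the slot.
static bool queue_pop(struct AtomicQueue *q, uint64_t *out) {
    uint64_t r = atomic_load_explicit(&q->ring_read_pos, memory_order_relaxed);
    uint64_t w = atomic_load_explicit(&q->ring_write_pos, memory_order_acquire);
    if (w == r) {
        return false;  // empty
    }
    uint64_t read = ring_read(q->ring_buffer, q->ring_size, r, out, sizeof(*out));
    atomic_store_explicit(&q->ring_read_pos, r + read, memory_order_release);
    return true;
}

The acquire load of the opposite position pairs with the other thread's release store: the consumer only reads buffer bytes after acquiring write_pos, and the producer only reuses a slot after acquiring read_pos. On x86 these acquire/release operations compile to plain loads and stores, which is essentially the ordering you are currently getting for free; on ARM they emit the barriers that are otherwise missing.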