I am learning about multithreading in general and winapi multithreading specifically.
I have an SPSC FIFO queue implemented over a ring buffer. I have not yet added any synchronization, and am not explicitly using any atomic operations, yet the queue appears to work. By "work" I mean that my three assertions never get triggered.
I don't understand why that is. I think either locking or atomic operations on `queue.ring_write_pos` and `queue.ring_read_pos` should be necessary.
I suspect that one of these is true:
- The SPSC queue is actually broken, and my test is simply insufficient to reveal this. In this case, what test could I devise that demonstrates the breakage?
- The SPSC queue happens to work because I am compiling to x86_64, and reads and writes to uint64_t on x86_64 are guaranteed to be atomic.
- I am fundamentally misunderstanding the operation of the SPSC queue, and in fact neither atomic operations nor locking are required.
I would appreciate help in identifying what I'm missing here.
// Compiled with: // clang version 18.1.2 // Target: x86_64-pc-windows-msvc // Thread model: posix // InstalledDir: C:\Program Files\LLVM\bin // clang-cl spsc_queue.c -o "spsc_queue.exe" /MT /WX /Zi /link /DEBUG:FULL /SUBSYSTEM:CONSOLE /WX #include <assert.h> #include <inttypes.h> #include <stdbool.h> #include <stdint.h> #include <stdio.h> #include <stdlib.h> #define WIN32_LEAN_AND_MEAN #include <windows.h> struct Queue { uint8_t *ring_buffer; uint64_t ring_size; uint64_t ring_read_pos; uint64_t ring_write_pos; }; #define QUEUE_SIZE 32 static_assert((QUEUE_SIZE & (QUEUE_SIZE - 1)) == 0, "Queue size must be a power of 2"); #define MIN(a, b) (((a) < (b)) ? (a) : (b)) uint64_t ring_read(uint8_t *ring, uint64_t ring_size, uint64_t pos, void *dst, uint64_t read_size) { read_size = MIN(read_size, ring_size); uint64_t read_start_1 = pos % ring_size; uint64_t read_size_1 = read_size; uint64_t read_size_2 = 0; if (read_start_1 + read_size > ring_size) { read_size_1 = ring_size - read_start_1; read_size_2 = read_size - read_size_1; } if (read_size_1 != 0) { memcpy(dst, ring + read_start_1, read_size_1); } if (read_size_2 != 0) { memcpy(dst + read_size_1, ring, read_size_2); } return read_size; } uint64_t ring_write(uint8_t *ring, uint64_t ring_size, uint64_t pos, void *src, uint64_t write_size) { write_size = MIN(write_size, ring_size); uint64_t write_start_1 = pos % ring_size; uint64_t write_size_1 = write_size; uint64_t write_size_2 = 0; if (write_start_1 + write_size > ring_size) { write_size_1 = ring_size - write_start_1; write_size_2 = write_size - write_size_1; } if (write_size_1 != 0) { memcpy(ring + write_start_1, src, write_size_1); } if (write_size_2 != 0) { memcpy(ring, src + write_size_1, write_size_2); } return write_size; } void check_invariants(struct Queue *queue, int line) { if (!(queue->ring_write_pos >= queue->ring_read_pos)) { printf("Failed invariant(%d): write >= read: %" PRIu64 ", %" PRIu64 "\n", line, queue->ring_write_pos, queue->ring_read_pos); 
assert(false); } if (!(queue->ring_write_pos - queue->ring_read_pos <= queue->ring_size)) { printf("Failed invariant(%d): write - read <= size: %" PRIu64 ", %" PRIu64 ", %" PRIu64 "\n", line, queue->ring_write_pos, queue->ring_read_pos, queue->ring_size); assert(false); } } DWORD WINAPI read_thread(LPVOID lpParam) { struct Queue *queue = lpParam; bool first = true; uint64_t prev_value = 0; while (true) { if (queue->ring_write_pos - queue->ring_read_pos) { check_invariants(queue, __LINE__); uint64_t val; queue->ring_read_pos += ring_read(queue->ring_buffer, queue->ring_size, queue->ring_read_pos, &val, sizeof(uint64_t)); if (!first) { if (val != prev_value + 1) { printf("Val: %" PRIu64 " Prev: %" PRIu64 "\n", val, prev_value); assert(false); } } first = false; prev_value = val; } } } int main(void) { printf("Begin\n"); struct Queue queue = { .ring_buffer = malloc(QUEUE_SIZE * sizeof(uint64_t)), .ring_size = QUEUE_SIZE * sizeof(uint64_t), }; DWORD read_thread_id; CreateThread(NULL, 0, &read_thread, &queue, 0, &read_thread_id); uint64_t i = 0; uint64_t max_i = 1000; while (i < max_i) { while (queue.ring_write_pos != queue.ring_read_pos + queue.ring_size && i < max_i) { check_invariants(&queue, __LINE__); queue.ring_write_pos += ring_write(queue.ring_buffer, queue.ring_size, queue.ring_write_pos, &i, sizeof(uint64_t)); ++i; } } while (queue.ring_write_pos - queue.ring_read_pos) { Sleep(16); } printf("End\n"); return 0; }