Handmade Network»Forums
Amin Mesbah
13 posts / 1 project
Why does SPSC queue appear to work without atomics or synchronization?
Edited by Amin Mesbah on Reason: Remove duplicate title

I am learning about multithreading in general and winapi multithreading specifically.

I have an SPSC FIFO queue implemented over a ring buffer. I have not yet added any synchronization, and am not explicitly using any atomic operations, yet the queue appears to work. By "work" I mean that my three assertions never get triggered.

I don't understand why that is. I think either locking or atomic operations on queue.ring_write_pos and queue.ring_read_pos should be necessary.

I suspect that one of these is true:

  1. The SPSC queue is actually broken, and my test is simply insufficient to reveal this. In this case, what test could I devise that demonstrates the breakage?
  2. The SPSC queue happens to work because I am compiling to x86_64, and reads and writes to uint64_t on x86_64 are guaranteed to be atomic.
  3. I am fundamentally misunderstanding the operation of the SPSC queue, and in fact neither atomic operations nor locking are required.

I would appreciate help in identifying what I'm missing here.

// Compiled with:
//     clang version 18.1.2
//     Target: x86_64-pc-windows-msvc
//     Thread model: posix
//     InstalledDir: C:\Program Files\LLVM\bin
//     clang-cl spsc_queue.c -o "spsc_queue.exe" /MT /WX /Zi /link /DEBUG:FULL /SUBSYSTEM:CONSOLE /WX

#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define WIN32_LEAN_AND_MEAN
#include <windows.h>

// Byte-oriented ring-buffer queue shared between the writer loop in main and
// read_thread. The positions are running byte counts, not wrapped offsets;
// ring_read and ring_write reduce them modulo ring_size when indexing.
struct Queue
{
    // Backing storage for the ring.
    uint8_t *ring_buffer;
    // Size of ring_buffer in bytes.
    uint64_t ring_size;
    // Running count of bytes consumed; advanced by read_thread.
    uint64_t ring_read_pos;
    // Running count of bytes produced; advanced by the writer loop in main.
    uint64_t ring_write_pos;
};

// Number of uint64_t-sized slots in the ring; main multiplies this by
// sizeof(uint64_t) to get the ring size in bytes.
#define QUEUE_SIZE 32
// Compile-time check that rejects any QUEUE_SIZE with more than one bit set.
static_assert((QUEUE_SIZE & (QUEUE_SIZE - 1)) == 0, "Queue size must be a power of 2");

// Smaller of a and b. Each argument may be evaluated more than once, so do not
// pass expressions with side effects.
#define MIN(a, b) (((a) < (b)) ? (a) : (b))

// Copy bytes out of a ring buffer into dst.
//
//   ring       Start of the ring storage.
//   ring_size  Capacity of the ring in bytes.
//   pos        Running byte position; reduced modulo ring_size to find the
//              offset at which the read starts.
//   dst        Destination; must have room for min(read_size, ring_size) bytes.
//   read_size  Number of bytes requested; clamped to ring_size.
//
// Returns the number of bytes copied. A read that runs past the end of the
// storage continues from the start of the ring. Returns 0 without touching dst
// when ring_size is 0.
uint64_t ring_read(uint8_t *ring, uint64_t ring_size, uint64_t pos, void *dst, uint64_t read_size)
{
    // A zero-sized ring holds nothing, and pos % 0 below would be undefined.
    if (ring_size == 0)
    {
        return 0;
    }

    if (read_size > ring_size)
    {
        read_size = ring_size;
    }

    // Arithmetic on void * is a compiler extension, so offset through a byte pointer.
    uint8_t *dst_bytes = dst;

    uint64_t read_start_1 = pos % ring_size;
    uint64_t read_size_1 = read_size;
    uint64_t read_size_2 = 0;

    // Split the copy in two when it would run off the end of the storage.
    if (read_start_1 + read_size > ring_size)
    {
        read_size_1 = ring_size - read_start_1;
        read_size_2 = read_size - read_size_1;
    }

    if (read_size_1 != 0)
    {
        memcpy(dst_bytes, ring + read_start_1, read_size_1);
    }

    // Wrapped remainder comes from the start of the ring.
    if (read_size_2 != 0)
    {
        memcpy(dst_bytes + read_size_1, ring, read_size_2);
    }

    return read_size;
}

// Copy bytes from src into a ring buffer.
//
//   ring        Start of the ring storage.
//   ring_size   Capacity of the ring in bytes.
//   pos         Running byte position; reduced modulo ring_size to find the
//               offset at which the write starts.
//   src         Source; must hold at least min(write_size, ring_size) bytes.
//   write_size  Number of bytes offered; clamped to ring_size.
//
// Returns the number of bytes copied. A write that runs past the end of the
// storage continues at the start of the ring. This function does not check
// whether the overwritten bytes have been consumed; that is the caller's job.
// Returns 0 without touching the ring when ring_size is 0.
uint64_t ring_write(uint8_t *ring, uint64_t ring_size, uint64_t pos, void *src, uint64_t write_size)
{
    // A zero-sized ring cannot accept data, and pos % 0 below would be undefined.
    if (ring_size == 0)
    {
        return 0;
    }

    if (write_size > ring_size)
    {
        write_size = ring_size;
    }

    // Arithmetic on void * is a compiler extension, so offset through a byte pointer.
    const uint8_t *src_bytes = src;

    uint64_t write_start_1 = pos % ring_size;
    uint64_t write_size_1 = write_size;
    uint64_t write_size_2 = 0;

    // Split the copy in two when it would run off the end of the storage.
    if (write_start_1 + write_size > ring_size)
    {
        write_size_1 = ring_size - write_start_1;
        write_size_2 = write_size - write_size_1;
    }

    if (write_size_1 != 0)
    {
        memcpy(ring + write_start_1, src_bytes, write_size_1);
    }

    // Wrapped remainder goes to the start of the ring.
    if (write_size_2 != 0)
    {
        memcpy(ring, src_bytes + write_size_1, write_size_2);
    }

    return write_size;
}

// Assert the two queue invariants: the write position never falls behind the
// read position, and the distance between them never exceeds the ring size.
//
//   queue  Queue to inspect.
//   line   Source line of the call site (callers pass __LINE__); printed in
//          the failure message.
//
// On a violation, prints the offending values and then hits assert(false).
// Note that each field is loaded again for the printf after the comparison, so
// when another thread is modifying the queue the printed values can differ
// from the ones that actually failed the check.
void check_invariants(struct Queue *queue, int line)
{
    // Invariant 1: write >= read.
    if (!(queue->ring_write_pos >= queue->ring_read_pos))
    {
        printf("Failed invariant(%d): write >= read: %" PRIu64 ", %" PRIu64 "\n", line, queue->ring_write_pos, queue->ring_read_pos);
        assert(false);
    }
    // Invariant 2: no more than ring_size bytes are outstanding.
    if (!(queue->ring_write_pos - queue->ring_read_pos <= queue->ring_size))
    {
        printf("Failed invariant(%d): write - read <= size: %" PRIu64 ", %" PRIu64 ", %" PRIu64 "\n", line, queue->ring_write_pos, queue->ring_read_pos, queue->ring_size);
        assert(false);
    }
}

// Consumer thread entry point.
//
//   lpParam  Pointer to the struct Queue to consume from.
//
// Loops forever: whenever the queue is non-empty it reads one uint64_t,
// advances ring_read_pos by the number of bytes read, and asserts that the
// value is exactly one greater than the previous value read. There is no
// return statement; the loop has no exit.
//
// The shared position fields are read and written here as plain uint64_t
// members, with no locking or atomic operations.
DWORD WINAPI read_thread(LPVOID lpParam)
{
    struct Queue *queue = lpParam;
    // The first value has no predecessor to compare against.
    bool first = true;
    uint64_t prev_value = 0;
    while (true)
    {
        // Non-zero difference means there are unread bytes.
        if (queue->ring_write_pos - queue->ring_read_pos)
        {
            check_invariants(queue, __LINE__);
            uint64_t val;
            // Copy one record out, then advance the read position by the bytes consumed.
            queue->ring_read_pos += ring_read(queue->ring_buffer, queue->ring_size, queue->ring_read_pos, &val, sizeof(uint64_t));

            if (!first)
            {
                // Values must arrive as a strictly consecutive sequence.
                if (val != prev_value + 1)
                {
                    printf("Val: %" PRIu64 " Prev: %" PRIu64 "\n", val, prev_value);
                    assert(false);
                }
            }
            first = false;
            prev_value = val;
        }
    }
}

// Producer side of the test. Allocates the ring, starts read_thread as the
// consumer, pushes the values 0..999 through the queue as 8-byte records, and
// then waits until the consumer has drained the queue.
//
// Returns 0 on success, or EXIT_FAILURE if the ring cannot be allocated or the
// reader thread cannot be started.
int main(void)
{
    printf("Begin\n");

    // malloc can fail; ring_write would otherwise copy into a null pointer.
    uint8_t *ring_storage = malloc(QUEUE_SIZE * sizeof(uint64_t));
    if (ring_storage == NULL)
    {
        fprintf(stderr, "Failed to allocate ring buffer\n");
        return EXIT_FAILURE;
    }

    // Both positions start at zero through the designated initializer.
    struct Queue queue =
    {
        .ring_buffer = ring_storage,
        .ring_size = QUEUE_SIZE * sizeof(uint64_t),
    };

    // Without a reader nothing advances ring_read_pos, so the loops below
    // would spin forever once the ring fills up.
    DWORD read_thread_id;
    if (!CreateThread(NULL, 0, &read_thread, &queue, 0, &read_thread_id))
    {
        fprintf(stderr, "Failed to create reader thread\n");
        free(ring_storage);
        return EXIT_FAILURE;
    }

    uint64_t i = 0;
    uint64_t max_i = 1000;
    while (i < max_i)
    {
        // Write while the ring is not full; when it is, fall out and retry.
        while (queue.ring_write_pos != queue.ring_read_pos + queue.ring_size && i < max_i)
        {
            check_invariants(&queue, __LINE__);
            // Copy the record in, then advance the write position by the bytes produced.
            queue.ring_write_pos += ring_write(queue.ring_buffer, queue.ring_size, queue.ring_write_pos, &i, sizeof(uint64_t));
            ++i;
        }
    }

    // Wait for the consumer to catch up before exiting the process.
    while (queue.ring_write_pos - queue.ring_read_pos)
    {
        Sleep(16);
    }

    printf("End\n");
    return 0;
}
Amin Mesbah
13 posts / 1 project
Why does SPSC queue appear to work without atomics or synchronization?
Edited by Amin Mesbah on Reason: Fix failure example that was cut off during copy paste

I think I have proof that #2 is true. The writes to the queue.ring_write_pos and queue.ring_read_pos are guaranteed by x86_64 to be atomic, and this is why the queue works.

To prove this, I tried to make the queue fail by breaking the conditions upon which the x86_64 atomicity guarantees depend, predicting that I would observe failed asserts caused by nonsensical ring_buffer offsets resulting from torn writes.

Intel Software Developer's Manual volume 3A section 9.1.1, "Guaranteed Atomic Operations", lists the guarantees.

I made the following changes to the above program:

+#pragma pack(push, 1)
 struct Queue
 {
     uint8_t *ring_buffer;
     uint64_t ring_size;
     uint64_t ring_read_pos;
+    uint8_t padding;
     uint64_t ring_write_pos;
 };
+#pragma pack(pop)

Now offsetof(struct Queue, ring_write_pos) == 25, which breaks the alignment requirements.

And sure enough, I see the expected failures, like:

Failed invariant(140): write >= read: 712454830744, 182388436668416

Failed invariant(177): write - read <= size: 2344, 2304, 256

Mārtiņš Možeiko
2583 posts / 2 projects
Why does SPSC queue appear to work without atomics or synchronization?

The first purpose of an "atomic operation" is to guarantee that a read or write is not torn (the value is loaded or stored in full, in a single operation). The second, more important, purpose is to specify memory ordering relative to other reads and writes.

On x86, aligned integer reads and writes are "atomic" in the sense that they never produce torn values, as you have already tested.

In addition, code like this works on x86 without atomic ops because of the architecture's strong memory model: x86 gives fairly strong guarantees about the order in which memory reads and writes become visible. That's why no atomic operations are required. On other architectures, such as ARM, this will fail terribly, because ARM has a weak memory model. There you need to specify proper memory ordering rules; otherwise your readers may get wrong information out of the buffer.