Tutorial/Multithreading

I highly encourage you to watch Handmade Hero Day 122, where Casey explains multithreading and then proceeds in subsequent videos to implement a job system much like the one here. If you're still having trouble, hopefully this article will help :)

We're interested in having multiple threads execute code. We will start by having one thread execute code and then move on to having multiple threads execute any code.

Creating a thread

Win32 provides a call that's something like

1
HANDLE CreateThread( void (*thread_code)(void *), void *thread_args);
CreateThread() actually takes in quite a bit of parameters but you end up setting most of them to zero anyways. A couple that are interesting are dwStackSize (to set stack size), lpThreadId (Win32 id to reference that thread), and dwCreationFlags that can spawn the thread asleep. For simplicity in this tutorial, the thread will start running as soon as we create it.

It takes a function pointer to the code the thread will execute and a pointer to arguments for that code. If we wanted the thread to just print out some string, we can do something like this.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
void thread_code(void *args){
   char **ptr_to_string = (char **)args;
   for(;;){ //we loop to always check for a new string, otherwise the thread will just "end"
      char *string = *ptr_to_string;
      if(string){
         printf(string);
         *ptr_to_string = 0; //print only once
      }
   }
}

int main(int argc, char **argv){
   char *string_to_print = 0;
   HANDLE handle = CreateThread(&thread_code, &string_to_print);
   string_to_print = "Hello, World!";
   CloseHandle(handle); //we won't be using this
);

If we ever only wanted our thread to do one single thing, then CreateThread() is a very convenient function. We can check that the string has printed by seeing if "string to print" has been set to zero by the other thread. We can't pass arguments to the thread after we created it, so to print again we just set "string to print" to something else.

However, we would like to have our thread run whatever code we want.

Just like how we "passed an argument" after it was created by setting "string to print" to something, we could also pass in a function pointer and change it to run something else. Because it's easier to read, I will use a typedef.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
typedef void (*ThreadCallback)(void *arguments);

struct ThreadArguments{
   ThreadCallback *callback;
   void *callback_args; 
};

//thread_code will then look like
void thread_code(void *args){
   ThraedArguments *args = (ThreadArguments *)args;
   for(;;){
      if(args->callback){
         args->callback(args->callback_args);
         args->callback = 0; 
         args->callback_args = 0;
      }
   }
}

//then we define the callback to print the string
void print_string(void *string){   printf( (char *)string );  }

int main(int argc, char **argv){
   ThreadArguments thread_arguments;
   HANDLE handle = CreateThread(&thread_code, &thread_arguments);
   thread_arguments.callback = &print_string;
   thread_arguments.callback_args = "Hello, World";
   CloseHandle(handle); //we won't be using this
}

Work Queue

A problem we have with this approach is that the main thread has to check if work it had issued has finished completing before issuing more work. We don't want the main thread to worry about when it can issue work so what we can do is buffer up "units" of work. That's really an easy problem. Let's first rename struct ThreadArguments to struct WorkQueueEntry and make a queue for the work entries like so.

1
2
3
4
5
struct WorkQueue{
   u32 next_entry_to_do;
   u32 entry_count;
   WorkQueueEntry entries[256];
};

Every time we add an entry or "job" we increment "entry count". We will pass the work queue to the thread as the argument and "thread code()" will do the next job pointed to by "next entry to do". "thread code()" will look something like this:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
void thread_code(void *args){
   WorkQueue *queue = (WorkQueue *)args;
   for(;;){
      if(queue->next_entry_to_do < queue->entry_count){
         WorkEntry *entry = queue->entries + queue->next_entry_to_do++;
         entry->callback(entry->callback_args); 

         if(queue->next_entry_to_do == queue->entry_count) //reset the queue{
            queue->entry_count = 0;
            queue->next_entry_to_do = 0;
         }
      }
   }
}

This is pretty neat. But now let's do something more interesting and have multiple thread execute code.

Multiple Threads

There are a couple of ways of going about this. One thing we can do is have one queue for each thread we create. When we add work, we iterate through all the queues we have and add it to the more "empty" queue. This is pretty reasonable but it has a major flaw.

Although adding an entry to the more "empty" queue seems like it would distribute work evenly, it actually won't. If one entry takes a really long time to complete and it still has more entries to do, then other threads will effectively sit around while this thread does "more work."

A more common approach that solves this problem is just to have a single queue and threads that aren't doing anything can just keep pulling work off of it until everything is completed.

However, this brings us the biggest problem in multithreading, which is sharing memory (work queue). Imagine if two threads attempt to increment a value(such as "next entry to do") at the same time… what would happen? We have to talk a bit about hardware to understand this…

Simplified Model of the CPU

We know a CPU needs a register file to execute code. Data gets loaded from memory into a register, it gets transformed, and then stored back out into memory. Even machine code gets loaded into a register and a special register serves as an instruction pointer that indicates the next machine instruction to execute. All the state needed to run machine code is stored in the register file.

Therefore, in order for a CPU to support running another execution path, we need another register file to hold that state.

This is called a thread and CPU's support a specific number of them. Threads need hardware to execute instructions (stuff like the ALU, FPU, instruction decoder, memory device, etc...). We call this unit of a register file and dedicated hardware a core.

We make the distinction between a thread and core because a core can support more than one thread. If we just add another register file to a core, we essentially have two threads sharing the dedicated hardware of the core to execute code. These threads are further classified as hyperthreads but the operating system just treats them as regular threads.

The main point of all this is to understand that each thread has its own state(the register file)!

Okay. So now let's consider what happens when two separate threads attempt to increment a value at the same time. Each thread will load a copy of that value into its register. The value in the register is incremented and stored back into memory.

This results in the value being only incremented once instead of twice because when the threads load the value at the same time, they are loading the same value! They both increment it by the same amount and write out the same value into memory!

What we'd like is for one thread to do the increment while the other thread stalls and waits for its turn to do the increment. This is where the processor interlocked instructions come in.

In "thread code()" we can do this.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
if(queue->next_entry_to_do < queue->entry_count){
   u32 to_do = InterlockedIncrement(&queue->next_entry_to_do);
   to_do = to_do - 1; //InterlockedIncrement does ++var, we expected var++ behavior
   WorkQueueEntry *entry = queue->entries + to_do;
   entry->callback(entry->callback_args);
   if(queue->next_entry_to_do == queue->entry_count){
      queue->entry_count = 0;
      queue->next_entry_to_do = 0;
   }
}

This solves the problem of threads doing the same work but now we have a different problem. Threads may do invalid work! Imagine if "next entry to do" was 0 and "entry count" was 1. Multiple threads may enter the if statement and although they will each increment the value individually and get a unique value for "to do", they will have incremented more times than desired.

We could do the "if check" again after we do the increment to ensure we got a valid value for "to do." If we got an invalid value then we know we incremented the variable too many times so we could essentially undo this by calling InterlockedDecrement. We end up with this

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
if(queue->next_entry_to_do < queue->entry_count){
   u32 to_do = InterlockedIncrement(&queue->next_entry_to_do);
   to_do = to_do - 1;
   if(to_do < queue->entry_count){
      WorkQueueEntry *entry = queue->entries + to_do;
      entry->callback(entry->callback_args);
      if(queue->next_entry_to_do == queue->entry_count){
         queue->entry_count = 0;
         queue->next_entry_to_do = 0;
      }
   }
   else{
      InterlockedDecrement(&queue->next_entry_to_do);
   }
}

You can see we had to restructure our code just to make it thread-safe. It'd be nice if we didn't have to. It'd be nice if there were a single interlocked function that did an interlocked increment but only if "next entry to do < entry count." There is exactly that and it's called InterlockedCompareExchange.

InterlockedCompareExchange takes in a pointer to the variable to modify, the new value to exchange it with, and a comparand value. The function will set the variable to the value only if the variable is equal to the comparand value.

How will we use this? Well, the problem with incrementing "next entry to do" was multiple threads entered the if statement and did the increment. If, every thread that did the increment saved a copy of "next entry to do" before the increment, we could pass this old copy as the comparand value and so only one the threads that entered the if statement will exchange the value. The other threads will have an incremented value for "next entry to do" and so it won't equal the comparand (the initial value) and won't do the exchange.

InterlockedCompareExchange returns the initial value of "next entry to do." All other threads will get "next entry to do" + 1. For the single thread that did the exchange to do the work, we just need to do a further check to see if the value returned was the old value. Something like:

1
2
3
4
5
6
7
u32 initial_value = queue->next_entry_to_do;
if(initial_value < queue->entry_count) {
   u32 value_returned = InterlockedCompareExchange(&queue->next_entry_to_do, initial_value + 1, initial_value);
   if(value_returned == initial_value){
      // do work
   }
}

It may seem like InterlockedCompareExchange didn't do a whole lot for us in terms of simplifying the code but that's because there's a big bug in the original code. On the queue reset, we set "next entry to do" to zero but we don't do it interlocked so that reset may not actually occur. But even if we did do an interlocked operation, we have a bigger problem which is we don't know if the zero happens before the decrement. If we zeroed the value before the decrement, we have a big problem. Writing the code without InterlockedCompareExchange means we have to ensure the zero happens after all the other threads that tried to enter the if-statement have decremented.

One way to attempt to solve this is to put all the threads that failed to pull entries of queue to sleep after they have decremented. This stops the threads from constantly doing an invalid increment. However, we would still need a way to stall before the reset in case not all threads have gone to sleep by then. We would have to keep track of a count of all the awake threads and wait until that number becomes zero to know we can safely reset the queue.

The problem has gotten quite complicated which is why InterlockedCompareExchange is such a nice way to get multithreading code to work. But even so, there is still a problem with the new code but it can be easily fixed (also a problem with the old one as well). We pull entries off the queue sequentially, since we do an interlocked increment. However, those entries may not get done in that same order. Some may take longer than others. The way we have the code working now, the thread the pulls the very last entry and finishes, resets the queue. This is a bug because what if we reset the queue and some of the other entries are still being worked on? All we have to have is some notion of a "completion count" to keep track of the total number of entries completed. And when that number matches the "entry count" we know that thread was the last one left working and the one to reset the queue.

Putting threads to sleep

TODO

volatile and memory fences

TODO


Edited by Jesse Coyle on Reason: another typo

Thanks for this - exactly what I was looking for! (I love how adding "handmade" to the beginning of a search in google often lands me the right answer)