diff --git a/GNUmakefile b/GNUmakefile
index 10ba52d..58b4b19 100644
--- a/GNUmakefile
+++ b/GNUmakefile
@@ -2,6 +2,8 @@ include $(GNUSTEP_MAKEFILES)/common.make
 
 LIBRARY_NAME = libobjc
 
+SUBPROJECTS = toydispatch
+
 libobjc_VERSION = 4
 
 libobjc_OBJC_FILES = \
@@ -77,6 +79,7 @@ libobjc_C_FILES += libobjc_entry.c
 endif
 
 include $(GNUSTEP_MAKEFILES)/library.make
+include $(GNUSTEP_MAKEFILES)/aggregate.make
 
 ifeq ($(findstring no, $(debug)),)
 before-all::
@@ -88,3 +91,4 @@ before-all::
 	@echo
 	@echo
 endif
+
diff --git a/toydispatch/GNUmakefile b/toydispatch/GNUmakefile
new file mode 100644
index 0000000..2f5fcc4
--- /dev/null
+++ b/toydispatch/GNUmakefile
@@ -0,0 +1,18 @@
+include $(GNUSTEP_MAKEFILES)/common.make
+
+LIBRARY_NAME = toydispatch
+
+toydispatch_VERSION = 1
+
+toydispatch_C_FILES = \
+	toydispatch.c
+
+toydispatch_HEADER_FILES_INSTALL_DIR = toydispatch
+toydispatch_HEADER_FILES = \
+	toydispatch.h
+
+toydispatch_LIBRARIES_DEPEND_UPON += -lpthread
+
+toydispatch_CFLAGS += -Werror -std=c99
+
+include $(GNUSTEP_MAKEFILES)/library.make
diff --git a/toydispatch/toydispatch.c b/toydispatch/toydispatch.c
new file mode 100644
index 0000000..ae70a42
--- /dev/null
+++ b/toydispatch/toydispatch.c
@@ -0,0 +1,240 @@
+#include <stdlib.h>
+#include <pthread.h>
+#define __TOY_DISPATCH__
+#include "toydispatch.h"
+
+/**
+ * Amount of total space in the ring buffer. Must be a power of two.
+ */
+#define RING_BUFFER_SIZE 32
+/**
+ * Mask for converting free-running counters into ring buffer indexes.
+ */
+#define RING_BUFFER_MASK (RING_BUFFER_SIZE - 1)
+
+struct dispatch_queue
+{
+	/**
+	 * Reference count for this queue.
+	 */
+	int refcount;
+	/**
+	 * Spin lock value. Set to 1 when the queue is locked. This allows
+	 * multiple threads to write to the queue but only one to read from it.
+	 * Reading and writing can happen concurrently, but writing requires
+	 * acquisition of this lock.
+	 */
+	volatile int spinlock;
+	/**
+	 * Producer free-running counter. Incremented every time that a new item
+	 * is inserted into the ring buffer.
+	 */
+	unsigned int producer;
+	/**
+	 * Consumer free-running counter. Incremented every time that an item is
+	 * removed from the buffer.
+	 */
+	unsigned int consumer;
+	/**
+	 * Mutex used to protect the condition variable.
+	 */
+	pthread_mutex_t mutex;
+	/**
+	 * Condition variable used in blocking mode. The consumer thread will
+	 * sleep on this condition variable when the queue has been empty for a
+	 * little while. The next producer thread to insert something will poke
+	 * the condition variable on any empty->non-empty transition.
+	 */
+	pthread_cond_t conditionVariable;
+	/**
+	 * Ring buffer containing functions and data to be executed by the
+	 * consumer.
+	 */
+	struct
+	{
+		dispatch_function_t function;
+		void *data;
+	} ring_buffer[RING_BUFFER_SIZE];
+};
+
+/**
+ * Check how much space is in the queue. The number of used elements in the
+ * queue is always equal to producer - consumer. Producer will always
+ * overflow before consumer (because you can't remove objects that have not
+ * been inserted). In that case, the subtraction will be something along the
+ * lines of (0 - (2^32 - 14)). Mathematically this is -(2^32 - 14), a value
+ * that can't be represented in a 32-bit unsigned integer, so it wraps
+ * around to 14, giving the correct result irrespective of overflow.
+ */
+#define SPACE(q) (RING_BUFFER_SIZE - (q->producer - q->consumer))
+/**
+ * The buffer is full if there is no space in it.
+ */
+#define ISFULL(q) (SPACE(q) == 0)
+/**
+ * The buffer is empty if there is no data in it.
+ */
+#define ISEMPTY(q) ((q->producer - q->consumer) == 0)
+/**
+ * Converting the free-running counters to array indexes is a masking
+ * operation. For this to work, the buffer size must be a power of two.
+ * RING_BUFFER_MASK = RING_BUFFER_SIZE - 1. If RING_BUFFER_SIZE is 256, we
+ * want the lowest 8 bits of the index, which is obtained by ANDing the
+ * value with 255. Any power of two may be selected. Non-power-of-two sizes
+ * could be used if a more complex mapping operation were chosen, but this
+ * one is nice and cheap.
+ */
+#define MASK(index) ((index) & RING_BUFFER_MASK)
+
+/**
+ * Lock the queue. This uses a very lightweight, nonrecursive spinlock. It
+ * is expected that queue insertions will be relatively uncontended.
+ */
+inline static void lock_queue(dispatch_queue_t queue)
+{
+	// Set the spin lock value to 1 if it is 0.
+	while (!__sync_bool_compare_and_swap(&queue->spinlock, 0, 1))
+	{
+		// If it is already 1, let another thread play with the CPU for a
+		// bit, then try again.
+		sched_yield();
+	}
+}
+
+/**
+ * Unlock the queue. This doesn't need to be an atomic op; that would cause
+ * a complete pipeline flush on this thread and not actually buy us
+ * anything, because at this point only one thread (this one) will do
+ * anything that modifies the variable. The other threads will all be using
+ * atomic compare-and-exchange instructions, which will fail because we
+ * already set the value to 1.
+ */
+inline static void unlock_queue(dispatch_queue_t queue)
+{
+	queue->spinlock = 0;
+}
+
+/**
+ * Inserting an element into the queue involves the following steps:
+ *
+ * 1) Check that there is space in the buffer. Spin if there isn't any.
+ * 2) Add the function and its data to the next element in the ring buffer.
+ * 3) Increment the producer counter.
+ * 4) If the queue was previously empty, the consumer may have dropped into
+ *    blocking mode; signal the condition variable that it will be waiting
+ *    on so that it transitions back to lockless operation.
+ */
+inline static void insert_into_queue(dispatch_queue_t queue,
+                                     dispatch_function_t function,
+                                     void *data)
+{
+	/* Wait for space in the buffer */
+	lock_queue(queue);
+	while (ISFULL(queue))
+	{
+		sched_yield();
+	}
+	unsigned int idx = MASK(queue->producer);
+	queue->ring_buffer[idx].function = function;
+	queue->ring_buffer[idx].data = data;
+	// NOTE: This doesn't actually need to be atomic on a strongly-ordered
+	// architecture like x86.
+	__sync_fetch_and_add(&queue->producer, 1);
+	unsigned int space = queue->producer - queue->consumer;
+	unlock_queue(queue);
+	// If we've just transitioned from empty to non-empty, wake up the
+	// consumer thread. Note: We do this after unlocking the queue, because
+	// it is much more expensive than anything else that we do in this
+	// function and we don't want to hold the spinlock for longer than
+	// necessary. We must calculate the fill level before unlocking,
+	// however, because otherwise another producer could increment producer
+	// while consumer stays the same (with the consumer thread sleeping),
+	// and the wakeup would be skipped.
+	if (space == 1)
+	{
+		pthread_mutex_lock(&queue->mutex);
+		pthread_cond_signal(&queue->conditionVariable);
+		pthread_mutex_unlock(&queue->mutex);
+	}
+}
+/**
+ * Removing an element from the queue involves the following steps:
+ *
+ * 1) Wait until the queue has messages waiting. If there are none, enter
+ *    blocking mode.
+ *    The additional test inside the mutex ensures that a transition from
+ *    blocking to non-blocking mode will not be missed, since the condition
+ *    variable can only be signalled while the producer thread holds the
+ *    mutex.
+ * 2) Read the function and its data from the buffer.
+ * 3) Increment the consumer counter.
+ */
+static inline void read_from_queue(dispatch_queue_t queue,
+                                   dispatch_function_t *function, void **data)
+{
+	while (ISEMPTY(queue))
+	{
+		pthread_mutex_lock(&queue->mutex);
+		if (ISEMPTY(queue))
+		{
+			pthread_cond_wait(&queue->conditionVariable, &queue->mutex);
+		}
+		pthread_mutex_unlock(&queue->mutex);
+	}
+	unsigned int idx = MASK(queue->consumer);
+	*function = queue->ring_buffer[idx].function;
+	*data = queue->ring_buffer[idx].data;
+	__sync_fetch_and_add(&queue->consumer, 1);
+}
+
+static void *runloop(void *q)
+{
+	dispatch_queue_t queue = q;
+	dispatch_function_t function;
+	void *data;
+	while (queue->refcount > 0)
+	{
+		read_from_queue(queue, &function, &data);
+		function(data);
+	}
+	pthread_cond_destroy(&queue->conditionVariable);
+	pthread_mutex_destroy(&queue->mutex);
+	free(queue);
+	return NULL;
+}
+
+
+dispatch_queue_t dispatch_queue_create(const char *label,
+                                       void *attr)
+{
+	dispatch_queue_t queue = calloc(1, sizeof(struct dispatch_queue));
+	queue->refcount = 1;
+	pthread_cond_init(&queue->conditionVariable, NULL);
+	pthread_mutex_init(&queue->mutex, NULL);
+	pthread_t thread;
+	pthread_create(&thread, NULL, runloop, queue);
+	pthread_detach(thread);
+	return queue;
+}
+
+void dispatch_async_f(dispatch_queue_t queue, void *context,
+                      dispatch_function_t work)
+{
+	insert_into_queue(queue, work, context);
+}
+
+static void release(void *queue)
+{
+	((dispatch_queue_t)queue)->refcount--;
+}
+
+void dispatch_release(dispatch_queue_t queue)
+{
+	// Asynchronously release the queue, so that we don't delete it before
+	// all of the work is finished.
+	insert_into_queue(queue, release, queue);
+}
+
+void dispatch_retain(dispatch_queue_t queue)
+{
+	queue->refcount++;
+}
diff --git a/toydispatch/toydispatch.h b/toydispatch/toydispatch.h
new file mode 100644
index 0000000..e89b9ad
--- /dev/null
+++ b/toydispatch/toydispatch.h
@@ -0,0 +1,47 @@
+/**
+ * toydispatch implements a (tiny) subset of the libdispatch interfaces. It
+ * can produce FIFO work queues, but not concurrent ones (although queues
+ * are concurrent with respect to each other, as with libdispatch). Unlike
+ * libdispatch, each queue runs on its own dedicated thread rather than on
+ * a shared thread pool. This is less efficient, so the real libdispatch
+ * should be used on platforms where it is available.
+ *
+ * Toydispatch symbol names are prefixed with toy_ so programs can be
+ * linked to both libdispatch and toydispatch.
+ */
+
+/* If the real libdispatch exists, use that instead of the toy one. */
+#if !defined(__has_include)
+#define __has_include(x) 0
+#endif
+#if __has_include(<dispatch/dispatch.h>) && !defined(__TOY_DISPATCH__)
+#	include <dispatch/dispatch.h>
+#else
+
+/**
+ * Function type for functions that can be added to dispatch queues.
+ */
+typedef void (*dispatch_function_t)(void *);
+
+typedef struct dispatch_queue * dispatch_queue_t;
+
+#define dispatch_queue_create toy_dispatch_queue_create
+/**
+ * Create a new queue. Both parameters are ignored by toydispatch.
+ */
+dispatch_queue_t dispatch_queue_create(const char *label,
+                                       void *attr);
+
+#define dispatch_async_f toy_dispatch_async_f
+/**
+ * Add a function to the queue.
+ */
+void dispatch_async_f(dispatch_queue_t queue, void *context,
+                      dispatch_function_t work);
+
+#define dispatch_release toy_dispatch_release
+void dispatch_release(dispatch_queue_t queue);
+
+#define dispatch_retain toy_dispatch_retain
+void dispatch_retain(dispatch_queue_t queue);
+#endif
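
As a usage note for reviewers, here is a minimal sketch of how client code would drive the new library. It assumes the installed header is reachable as <toydispatch/toydispatch.h> (per toydispatch_HEADER_FILES_INSTALL_DIR above) and that the program links with -ltoydispatch; the queue label and message strings are invented for the example, and the trailing sleep is only a crude way to let the detached queue thread drain before main returns.

#include <stdio.h>
#include <unistd.h>
#include <toydispatch/toydispatch.h>

/* Matches dispatch_function_t: a work item taking one context pointer. */
static void print_message(void *context)
{
	printf("%s\n", (const char *)context);
}

int main(void)
{
	/* Both arguments are ignored by toydispatch (see the header above). */
	dispatch_queue_t queue = dispatch_queue_create("org.example.demo", NULL);

	/* Work runs in FIFO order on the queue's own thread. */
	dispatch_async_f(queue, (void *)"first", print_message);
	dispatch_async_f(queue, (void *)"second", print_message);

	/* The release is itself enqueued, so it only takes effect after the
	 * two work items above have run; the queue thread then frees the
	 * queue and exits. */
	dispatch_release(queue);

	/* Crude: give the detached queue thread time to finish before main
	 * returns and the process exits. */
	sleep(1);
	return 0;
}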
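
Finally, a self-contained sketch of the free-running counter arithmetic that the SPACE comment in toydispatch.c relies on: unsigned subtraction is modulo 2^N, so producer - consumer yields the correct element count even after the producer counter wraps. It assumes a 32-bit unsigned int, as that comment does, and the counter values below are invented to reproduce the "14" from it.

#include <assert.h>
#include <stdio.h>

#define RING_BUFFER_SIZE 32

int main(void)
{
	/* Producer has wrapped past UINT_MAX back to 6 (2^32 + 6 insertions);
	 * consumer has not wrapped yet (2^32 - 8 removals). */
	unsigned int producer = 6;
	unsigned int consumer = 4294967295u - 7u;

	/* (2^32 + 6) - (2^32 - 8) = 14 used slots; the wrap cancels out. */
	unsigned int used = producer - consumer;
	assert(used == 14);
	printf("used = %u, space = %u\n", used, RING_BUFFER_SIZE - used);
	return 0;
}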