Added toydispatch to libobjc. This is an implementation of a tiny subset of libdispatch, requiring no kernel support beyond a standard pthread implementation. It is intended for running asynchronous book-keeping tasks for libobjc, but it might also be useful for other things, so it is compiled as a separate library.
parent 9df65418f7
commit c3a6b866bb
GNUmakefile
@@ -0,0 +1,18 @@
include $(GNUSTEP_MAKEFILES)/common.make

LIBRARY_NAME = toydispatch

toydispatch_VERSION = 1

toydispatch_C_FILES = \
	toydispatch.c

toydispatch_HEADER_FILES_INSTALL_DIR = toydispatch
toydispatch_HEADER_FILES = \
	toydispatch.h

toydispatch_LIBRARIES_DEPEND_UPON += -lpthread

toydispatch_CFLAGS += -Werror -std=c99

include $(GNUSTEP_MAKEFILES)/library.make
toydispatch.c
@@ -0,0 +1,240 @@
#include <pthread.h>
#include <sched.h>
#include <stdlib.h>
#define __TOY_DISPATCH__
#include "toydispatch.h"

/**
 * Total amount of space in the ring buffer.  Must be a power of two.
 */
#define RING_BUFFER_SIZE 32
/**
 * Mask for converting the free-running counters into ring buffer indexes.
 */
#define RING_BUFFER_MASK (RING_BUFFER_SIZE - 1)

struct dispatch_queue
{
	/**
	 * Reference count for this queue.
	 */
	int refcount;
	/**
	 * Spin lock value.  Set to 1 when the queue is locked.  This allows
	 * multiple threads to write to the queue but only one to read from it.
	 * Reading and writing can happen concurrently, but writing requires
	 * acquisition of this lock.
	 */
	volatile int spinlock;
	/**
	 * Producer free-running counter.  Incremented every time that a new
	 * item is inserted into the ring buffer.
	 */
	unsigned int producer;
	/**
	 * Consumer free-running counter.  Incremented every time that an item
	 * is removed from the buffer.
	 */
	unsigned int consumer;
	/**
	 * Mutex used to protect the condition variable.
	 */
	pthread_mutex_t mutex;
	/**
	 * Condition variable used in blocking mode.  The consumer thread will
	 * sleep on this condition variable when the queue has been empty for a
	 * little while.  The next producer thread to insert something will poke
	 * the condition variable on any empty->non-empty transition.
	 */
	pthread_cond_t conditionVariable;
	/**
	 * Ring buffer containing the functions and data to be executed by the
	 * consumer.
	 */
	struct
	{
		dispatch_function_t function;
		void *data;
	} ring_buffer[RING_BUFFER_SIZE];
};

/**
 * Check how much space is left in the queue.  The number of used elements in
 * the queue is always equal to producer - consumer.  Producer will always
 * overflow before consumer (because you can't remove items that have not
 * been inserted).  In that case, the subtraction will be something along the
 * lines of (0 - (2^32 - 14)).  The mathematical result, -(2^32 - 14), can't
 * be represented in a 32-bit unsigned integer and so wraps around to 14,
 * giving the correct count irrespective of overflow.
 */
#define SPACE(q) (RING_BUFFER_SIZE - (q->producer - q->consumer))
/**
 * The buffer is full if there is no space in it.
 */
#define ISFULL(q) (SPACE(q) == 0)
/**
 * The buffer is empty if there is no data in it.
 */
#define ISEMPTY(q) ((q->producer - q->consumer) == 0)
/**
 * Converting the free-running counters to array indexes is a masking
 * operation.  For this to work, the buffer size must be a power of two.
 * RING_BUFFER_MASK is RING_BUFFER_SIZE - 1: if RING_BUFFER_SIZE is 256, we
 * want the lowest 8 bits of the index, which we get by ANDing the counter
 * with 255.  Any power of two may be selected.  Non-power-of-two sizes could
 * be supported with a more complex mapping operation, but this one is nice
 * and cheap.
 */
#define MASK(index) ((index) & RING_BUFFER_MASK)
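/*
 * Worked example of the SPACE() arithmetic above, with concrete values: if
 * producer has just wrapped around to 0 and consumer is 2^32 - 14
 * (0xFFFFFFF2), then producer - consumer evaluates to 14 under unsigned
 * modulo-2^32 arithmetic, the true number of queued items, and SPACE()
 * reports 32 - 14 = 18 free slots.
 */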

/**
 * Lock the queue.  This uses a very lightweight, non-recursive spinlock; it
 * is expected that queue insertions will be relatively uncontended.
 */
inline static void lock_queue(dispatch_queue_t queue)
{
	// Set the spin lock value to 1 if it is 0.
	while (!__sync_bool_compare_and_swap(&queue->spinlock, 0, 1))
	{
		// If it is already 1, let another thread play with the CPU for a
		// bit, then try again.
		sched_yield();
	}
}

/**
 * Unlock the queue.  This doesn't need to be an atomic op; that would cause
 * a complete pipeline flush on this thread and not actually buy us anything,
 * because at this point only one thread (this one) will do anything that can
 * modify the variable.  The other threads will all be using atomic
 * compare-and-exchange instructions, which will fail because we already set
 * the lock to 1.
 */
inline static void unlock_queue(dispatch_queue_t queue)
{
	queue->spinlock = 0;
}

/**
 * Inserting an element into the queue involves the following steps:
 *
 * 1) Check that there is space in the buffer; spin if there isn't any.
 * 2) Store the function and its data in the next element of the ring buffer.
 * 3) Increment the producer counter.
 * 4) If the queue was previously empty, the consumer may be in blocking
 *    mode, so signal the condition variable that it will be waiting on.
 */
inline static void insert_into_queue(dispatch_queue_t queue,
                                     dispatch_function_t function,
                                     void *data)
{
	/* Wait for space in the buffer */
	lock_queue(queue);
	while (ISFULL(queue))
	{
		sched_yield();
	}
	unsigned int idx = MASK(queue->producer);
	queue->ring_buffer[idx].function = function;
	queue->ring_buffer[idx].data = data;
	// NOTE: This doesn't actually need to be atomic on a strongly-ordered
	// architecture like x86.
	__sync_fetch_and_add(&queue->producer, 1);
	unsigned int space = queue->producer - queue->consumer;
	unlock_queue(queue);
	// If we've just transitioned from empty to non-empty, wake up the
	// consumer thread.
	// Note: We do this after unlocking the queue, because it is much more
	// expensive than anything else that we do in this function and we don't
	// want to hold the spinlock for any longer than necessary.  We need to
	// calculate the occupancy first, however, because otherwise another
	// thread may increment producer while consumer stays the same (with the
	// consumer thread sleeping), preventing the wakeup.
	if (space == 1)
	{
		pthread_mutex_lock(&queue->mutex);
		pthread_cond_signal(&queue->conditionVariable);
		pthread_mutex_unlock(&queue->mutex);
	}
}
/**
 * Removing an element from the queue involves the following steps:
 *
 * 1) Wait until the queue has messages waiting.  If there are none, enter
 * blocking mode.  The additional test inside the mutex ensures that a
 * transition from blocking to non-blocking mode will not be missed, since
 * the condition variable can only be signalled while the producer thread
 * holds the mutex.
 * 2) Read the function and its data from the buffer.
 * 3) Increment the consumer counter.
 */
static inline void read_from_queue(dispatch_queue_t queue,
                                   dispatch_function_t *function, void **data)
{
	while (ISEMPTY(queue))
	{
		pthread_mutex_lock(&queue->mutex);
		if (ISEMPTY(queue))
		{
			pthread_cond_wait(&queue->conditionVariable, &queue->mutex);
		}
		pthread_mutex_unlock(&queue->mutex);
	}
	unsigned int idx = MASK(queue->consumer);
	*function = queue->ring_buffer[idx].function;
	*data = queue->ring_buffer[idx].data;
	// Exactly one element was consumed, so advance the counter by one; the
	// producer and consumer counters must advance in step or SPACE() breaks.
	__sync_fetch_and_add(&queue->consumer, 1);
}

static void *runloop(void *q)
{
	dispatch_queue_t queue = q;
	dispatch_function_t function;
	void *data;
	while (queue->refcount > 0)
	{
		read_from_queue(queue, &function, &data);
		function(data);
	}
	pthread_cond_destroy(&queue->conditionVariable);
	pthread_mutex_destroy(&queue->mutex);
	free(queue);
	return NULL;
}
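/*
 * Note on the shutdown path above: the final release is executed by the
 * worker thread itself, so once refcount reaches zero that thread destroys
 * the condition variable and the mutex, frees the queue, and exits.  No
 * other thread may touch a queue after sending it the final release.
 */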


dispatch_queue_t dispatch_queue_create(const char *label,
                                       void *attr)
{
	dispatch_queue_t queue = calloc(1, sizeof(struct dispatch_queue));
	queue->refcount = 1;
	pthread_cond_init(&queue->conditionVariable, NULL);
	pthread_mutex_init(&queue->mutex, NULL);
	pthread_t thread;
	pthread_create(&thread, NULL, runloop, queue);
	pthread_detach(thread);
	return queue;
}

void dispatch_async_f(dispatch_queue_t queue, void *context,
                      dispatch_function_t work)
{
	insert_into_queue(queue, work, context);
}

static void release(void *queue)
{
	// Runs on the queue's own thread; atomic so that a concurrent
	// dispatch_retain() on another thread cannot lose an update.
	__sync_fetch_and_sub(&((dispatch_queue_t)queue)->refcount, 1);
}

void dispatch_release(dispatch_queue_t queue)
{
	// Asynchronously release the queue, so that we don't delete it before
	// all of the work that it contains has finished.
	insert_into_queue(queue, release, queue);
}

void dispatch_retain(dispatch_queue_t queue)
{
	// Atomic for the same reason as release().
	__sync_fetch_and_add(&queue->refcount, 1);
}
toydispatch.h
@@ -0,0 +1,47 @@
/**
 * toydispatch implements a (tiny) subset of the libdispatch interfaces.  It
 * can provide FIFO work queues, but not concurrent ones (although queues are
 * concurrent with respect to each other, as with libdispatch).  Unlike
 * libdispatch, each queue runs on its own dedicated system thread.  This is
 * less efficient, so the real libdispatch should be used on platforms where
 * it is available.
 *
 * Toydispatch symbol names are prefixed with toy_ so that programs can be
 * linked to both libdispatch and toydispatch at the same time.
 */

/* If the real libdispatch exists, use that instead of the toy one. */
#if !defined(__has_include)
#define __has_include(x) 0
#endif
#if __has_include(<dispatch/dispatch.h>) && !defined(__TOY_DISPATCH__)
#	include <dispatch/dispatch.h>
#else

/**
 * Function type for functions that can be added to dispatch queues.
 */
typedef void (*dispatch_function_t)(void *);

typedef struct dispatch_queue * dispatch_queue_t;

#define dispatch_queue_create toy_dispatch_queue_create
/**
 * Create a new queue.  Both parameters are ignored by toydispatch.
 */
dispatch_queue_t dispatch_queue_create(const char *label,
                                       void *attr);

#define dispatch_async_f toy_dispatch_async_f
/**
 * Add a function to the queue.
 */
void dispatch_async_f(dispatch_queue_t queue, void *context,
                      dispatch_function_t work);

#define dispatch_release toy_dispatch_release
void dispatch_release(dispatch_queue_t queue);

#define dispatch_retain toy_dispatch_retain
void dispatch_retain(dispatch_queue_t queue);
#endif
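For reference, a minimal usage sketch of the API above. This is not part of the commit: the file name, the context string, and the one-second sleep are illustrative, and the sleep is only needed because toydispatch provides no synchronous-wait call.

/* example.c: hypothetical toydispatch client */
#include <stdio.h>
#include <unistd.h>
#include "toydispatch.h"

static void greet(void *context)
{
	printf("hello from the queue: %s\n", (const char *)context);
}

int main(void)
{
	/* Both arguments are ignored by toydispatch. */
	dispatch_queue_t q = dispatch_queue_create("example", NULL);
	/* Expands to toy_dispatch_async_f(); greet() will run on the queue's
	 * worker thread with "example" as its context argument. */
	dispatch_async_f(q, "example", greet);
	/* The release is queued behind greet(), so the queue is only freed
	 * after that work has run. */
	dispatch_release(q);
	/* Crude wait: give the detached worker thread time to drain the
	 * queue before the process exits. */
	sleep(1);
	return 0;
}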