Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel conference bridge #4241

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
83 changes: 83 additions & 0 deletions pjlib/include/pj/os.h
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,89 @@ PJ_DECL(pj_status_t) pj_event_destroy(pj_event_t *event);
*/
#endif /* PJ_HAS_EVENT_OBJ */


/* **************************************************************************/
/**
* @defgroup PJ_BARRIER_SEC Barrier sections.
* @ingroup PJ_OS
* @{
* This module provides abstraction to pj_barrier_t - synchronization barrier.
* It allows threads to block until all participating threads have reached
* the barrier,ensuring synchronization at specific points in execution.
* pj_barrier_t provides a barrier mechanism for synchronizing threads in
* a multithreaded application, similar to
* the POSIX pthread_barrier_wait or Windows EnterSynchronizationBarrier.
*/

/**
* Flags that control the behavior of the barrier
* Supported on Windows platform starting from Windows 8
* Otherwize, the flags are ignored.
*/
enum pj_barrier_flags {
/* Specifies that the thread entering the barrier should block
* immediately until the last thread enters the barrier. */
PJ_BARRIER_FLAGS_BLOCK_ONLY = 1,

/* Specifies that the thread entering the barrier should spin until
* the last thread enters the barrier,
* even if the spinning thread exceeds the barrier's maximum spin count.*/
PJ_BARRIER_FLAGS_SPIN_ONLY = 2,

/* Specifies that the function can skip the work required to ensure
* that it is safe to delete the barrier, which can improve performance.
* All threads that enter this barrier must specify the flag;
* otherwise, the flag is ignored.
* This flag should be used only if the barrier will never be deleted.
* "Never" means "when some thread is waiting on this barrier".
*/
PJ_BARRIER_FLAGS_NO_DELETE = 4
};

/**
* Create a barrier object.
* pj_barrier_create() creates a barrier object that can be used to synchronize threads.
* The barrier object is initialized with a trip count that specifies the number of threads
* that must call pj_barrier_wait() before any are allowed to proceed.
*
* @param pool The pool to allocate the barrier object.
* @param trip_count The number of threads that must call pj_barrier_wait() before any are allowed to proceed.
* @param p_barrier Pointer to hold the barrier object upon return.
*
* @return PJ_SUCCESS on success, or the error code.
*/
pj_status_t pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier);

/**
* Destroy a barrier object.
* pj_barrier_destroy() destroys a barrier object and releases any resources associated with the barrier.
*
* @param barrier The barrier to destroy.
*
* @return PJ_SUCCESS on success, or the error code.
*/
pj_status_t pj_barrier_destroy(pj_barrier_t *barrier);

/**
* Wait for all threads to reach the barrier
* pj_barrier_wait() allows threads to block until all participating threads have reached the barrier,
* ensuring synchronization at specific points in execution.
* It provides a barrier mechanism for synchronizing threads in a multithreaded application,
* similar to the POSIX pthread_barrier_wait or Windows EnterSynchronizationBarrier.
*
* @param barrier The barrier to wait on
* @param flags Flags that control the behavior of the barrier (combination of pj_barrier_flags)
*
* @return Returns PJ_TRUE for a single (arbitrary) thread synchronized
* at the barrier and PJ_FALSE for each of the other threads.
* Otherwise, an error number shall be returned to indicate the error.
*/
pj_status_t pj_barrier_wait(pj_barrier_t *barrier, pj_uint32_t flags);

/**
* @}
*/

/* **************************************************************************/
/**
* @addtogroup PJ_TIME Time Data Type and Manipulation.
Expand Down
3 changes: 3 additions & 0 deletions pjlib/include/pj/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ typedef struct pj_sem_t pj_sem_t;
/** Event object. */
typedef struct pj_event_t pj_event_t;

/** Barrier object. */
typedef struct pj_barrier_t pj_barrier_t;

/** Unidirectional stream pipe object. */
typedef struct pj_pipe_t pj_pipe_t;

Expand Down
122 changes: 122 additions & 0 deletions pjlib/src/pj/os_core_unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,18 @@ struct pj_event_t
};
#endif /* PJ_HAS_EVENT_OBJ */

struct pj_barrier_t {
#if defined(_POSIX_BARRIERS) && _POSIX_BARRIERS >= 200112L
/* pthread_barrier is supported. */
pthread_barrier_t barrier;
#else
/* pthread_barrier is not supported. */
pj_mutex_t mutex;
pthread_cond_t cond;
unsigned count;
unsigned trip_count;
#endif
};

/*
* Flag and reference counter for PJLIB instance.
Expand Down Expand Up @@ -2080,6 +2092,116 @@ PJ_DEF(pj_status_t) pj_event_destroy(pj_event_t *event)

#endif /* PJ_HAS_EVENT_OBJ */

///////////////////////////////////////////////////////////////////////////////
#if defined(_POSIX_BARRIERS) && _POSIX_BARRIERS >= 200112L
/* pthread_barrier is supported. */

/**
* Barrier object.
*/
pj_status_t pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier) {
pj_barrier_t *barrier;
int rc;
PJ_ASSERT_RETURN(pool && p_barrier, PJ_EINVAL);
barrier = (pj_barrier_t *)pj_pool_zalloc(pool, sizeof(pj_barrier_t));
if (barrier == NULL)
return PJ_ENOMEM;
rc = pthread_barrier_init(&barrier->barrier, NULL, trip_count);
if (rc != 0)
return PJ_RETURN_OS_ERROR(rc);
*p_barrier = barrier;
return PJ_SUCCESS;
}

/**
* Wait on the barrier.
*/
pj_status_t pj_barrier_wait(pj_barrier_t *barrier, pj_uint32_t flags) {
PJ_UNUSED_ARG(flags);
int rc = pthread_barrier_wait(&barrier->barrier);
switch (rc)
{
case 0:
return PJ_FALSE;
case PTHREAD_BARRIER_SERIAL_THREAD:
return PJ_TRUE;
default:
return PJ_RETURN_OS_ERROR(rc);
}
}

/**
* Destroy the barrier.
*/
pj_status_t pj_barrier_destroy(pj_barrier_t *barrier) {
int status = pthread_barrier_destroy(&barrier->barrier);
if (status == 0)
return PJ_SUCCESS;
else
return PJ_RETURN_OS_ERROR(status);
}

#else // _POSIX_BARRIERS
/* pthread_barrier is not supported. */

/**
* Barrier object.
*/
pj_status_t pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier) {
pj_barrier_t *barrier;
pj_status_t status;

PJ_ASSERT_RETURN(pool && p_barrier, PJ_EINVAL);
barrier = (pj_barrier_t *)pj_pool_zalloc(pool, sizeof(pj_barrier_t));
if (barrier == NULL)
return PJ_ENOMEM;
status = init_mutex(&barrier->mutex, "barrier%p", PJ_MUTEX_SIMPLE);
if (status != PJ_SUCCESS)
return status;

pthread_cond_init(&barrier->cond, NULL);
barrier->count = 0;
barrier->trip_count = trip_count;
*p_barrier = barrier;
return PJ_SUCCESS;
}

/**
* Wait on the barrier.
*/
pj_status_t pj_barrier_wait(pj_barrier_t *barrier, pj_uint32_t flags) {
PJ_UNUSED_ARG(flags);

pj_bool_t is_last = PJ_FALSE;

pthread_mutex_lock(&barrier->mutex.mutex);
barrier->count++;
if (barrier->count >= barrier->trip_count)
{
barrier->count = 0;
pthread_cond_broadcast(&barrier->cond);
is_last = PJ_TRUE;
}
else
{
pthread_cond_wait(&barrier->cond, &barrier->mutex.mutex);
}
pthread_mutex_unlock(&barrier->mutex.mutex);

return is_last;
}

/**
* Destroy the barrier.
*/
pj_status_t pj_barrier_destroy(pj_barrier_t *barrier) {
pthread_cond_destroy(&barrier->cond);
return pj_mutex_destroy(&barrier->mutex);
}

#endif // _POSIX_BARRIERS


///////////////////////////////////////////////////////////////////////////////
#if defined(PJ_TERM_HAS_COLOR) && PJ_TERM_HAS_COLOR != 0
/*
Expand Down
135 changes: 135 additions & 0 deletions pjlib/src/pj/os_core_win32.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,28 @@ struct pj_atomic_t
long value;
};

/*
* Implementation of pj_barrier_t.
*/
#if PJ_WIN32_WINNT >= _WIN32_WINNT_WIN8
struct pj_barrier_t {
SYNCHRONIZATION_BARRIER sync_barrier;
};
#elif PJ_WIN32_WINNT >= _WIN32_WINNT_VISTA
struct pj_barrier_t {
CRITICAL_SECTION mutex;
CONDITION_VARIABLE cond;
unsigned count;
unsigned waiting;
};
#else
struct pj_barrier_t {
HANDLE cond; /* Semaphore */
LONG count; /* Number of threads required to pass the barrier */
LONG waiting;/* Number of threads waiting at the barrier */
};
#endif

/*
* Flag and reference counter for PJLIB instance.
*/
Expand Down Expand Up @@ -1546,6 +1568,119 @@ PJ_DEF(pj_status_t) pj_event_destroy(pj_event_t *event)

#endif /* PJ_HAS_EVENT_OBJ */

///////////////////////////////////////////////////////////////////////////////

/*
* pj_barrier_create()
*/
pj_status_t pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier) {
pj_barrier_t *barrier;
PJ_ASSERT_RETURN(pool && p_barrier, PJ_EINVAL);
barrier = (pj_barrier_t *)pj_pool_zalloc(pool, sizeof(pj_barrier_t));
if (barrier == NULL)
return PJ_ENOMEM;
#if PJ_WIN32_WINNT >= _WIN32_WINNT_WIN8
if (InitializeSynchronizationBarrier(&barrier->sync_barrier, trip_count, -1))
{
*p_barrier = barrier;
return PJ_SUCCESS;
}
else
return pj_get_os_error();
#elif PJ_WIN32_WINNT >= _WIN32_WINNT_VISTA
InitializeCriticalSection(&barrier->mutex);
InitializeConditionVariable(&barrier->cond);
barrier->count = trip_count;
barrier->waiting = 0;
*p_barrier = barrier;
return PJ_SUCCESS;
#else
barrier->cond = CreateSemaphore(NULL,
0, /* initial count */
trip_count, /* max count */
NULL);
if (!barrier->cond)
return pj_get_os_error();
barrier->count = trip_count;
barrier->waiting = 0;
*p_barrier = barrier;
return PJ_SUCCESS;
#endif
}

/*
* pj_barrier_destroy()
*/
pj_status_t pj_barrier_destroy(pj_barrier_t *barrier) {
PJ_ASSERT_RETURN(barrier, PJ_EINVAL);
#if PJ_WIN32_WINNT >= _WIN32_WINNT_WIN8
DeleteSynchronizationBarrier(&barrier->sync_barrier);
return PJ_SUCCESS;
#elif PJ_WIN32_WINNT >= _WIN32_WINNT_VISTA
DeleteCriticalSection(&barrier->mutex);
return PJ_SUCCESS;
#else
if (CloseHandle(barrier->cond))
return PJ_SUCCESS;
else
return pj_get_os_error();
#endif
}

/*
* pj_barrier_wait()
*/
pj_status_t pj_barrier_wait(pj_barrier_t *barrier, pj_uint32_t flags) {
PJ_ASSERT_RETURN(barrier, PJ_EINVAL);
#if PJ_WIN32_WINNT >= _WIN32_WINNT_WIN8
DWORD dwFlags = ((flags & PJ_BARRIER_FLAGS_BLOCK_ONLY) ? SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY : 0) |
((flags & PJ_BARRIER_FLAGS_SPIN_ONLY) ? SYNCHRONIZATION_BARRIER_FLAGS_SPIN_ONLY : 0) |
((flags & PJ_BARRIER_FLAGS_NO_DELETE) ? SYNCHRONIZATION_BARRIER_FLAGS_NO_DELETE : 0);
return EnterSynchronizationBarrier(&barrier->sync_barrier, dwFlags);
#elif PJ_WIN32_WINNT >= _WIN32_WINNT_VISTA
PJ_UNUSED_ARG(flags);
EnterCriticalSection(&barrier->mutex);
if (++barrier->waiting == barrier->count)
{
barrier->waiting = 0;
LeaveCriticalSection(&barrier->mutex);
WakeAllConditionVariable(&barrier->cond);
return PJ_TRUE;
}
else
{
BOOL rc = SleepConditionVariableCS(&barrier->cond, &barrier->mutex, INFINITE);
LeaveCriticalSection(&barrier->mutex);
if (rc)
return PJ_FALSE;
else
return pj_get_os_error();
}
#else
PJ_UNUSED_ARG(flags);

if (InterlockedIncrement(&barrier->waiting) == barrier->count)
{
LONG previousCount = 0;
barrier->waiting = 0;
/* Release all threads waiting on the semaphore */
if (barrier->count == 1 || ReleaseSemaphore(barrier->cond, barrier->count - 1, &previousCount))
{
PJ_ASSERT_RETURN(previousCount == 0, PJ_EBUG);
return PJ_TRUE;
}
else
return pj_get_os_error();
}

DWORD rc = WaitForSingleObject(barrier->cond, INFINITE);
if (rc == WAIT_OBJECT_0)
return PJ_FALSE;
else
return pj_get_os_error();
#endif
}

///////////////////////////////////////////////////////////////////////////////
#if defined(PJ_TERM_HAS_COLOR) && PJ_TERM_HAS_COLOR != 0
/*
Expand Down
1 change: 1 addition & 0 deletions pjmedia/build/pjmedia.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@
<ClCompile Include="..\src\pjmedia\clock_thread.c" />
<ClCompile Include="..\src\pjmedia\codec.c" />
<ClCompile Include="..\src\pjmedia\conference.c" />
<ClCompile Include="..\src\pjmedia\conf_openmp.c" />
<ClCompile Include="..\src\pjmedia\conf_switch.c" />
<ClCompile Include="..\src\pjmedia\converter.c" />
<ClCompile Include="..\src\pjmedia\converter_libswscale.c" />
Expand Down
Loading
Loading