Skip to content

Commit

Permalink
gstUtils: creates a GObject wrapper for GstPromise
Browse files Browse the repository at this point in the history
Since the reference management of boxed object in Node-GTK is somewhat
broken, I've decided to use GObject as a vehicle instead. This also
allow thread trampolining to happen in C instead of JS.
  • Loading branch information
peat-psuwit committed Nov 12, 2023
1 parent 57623b5 commit 43f0682
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 39 deletions.
89 changes: 89 additions & 0 deletions src-native/NgwNativePromise.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#include "NgwNativePromise.h"

struct _NgwNativePromise
{
GObject parent_instance;

GstPromise * gst_promise;
};

G_DEFINE_TYPE(NgwNativePromise, ngw_native_promise, G_TYPE_OBJECT)

enum {
SIGNAL_ON_CHANGED,
LAST_SIGNAL,
};

static guint promise_signals[LAST_SIGNAL] = { 0 };

static void ngw_native_promise_dispose (GObject *);

static void
ngw_native_promise_class_init(NgwNativePromiseClass * klass)
{
GObjectClass *object_class = G_OBJECT_CLASS (klass);

object_class->dispose = ngw_native_promise_dispose;

/**
* NgwNativePromise::on-changed:
* @object: the #NgwNativePromise
*/
promise_signals[SIGNAL_ON_CHANGED] =
g_signal_new ("on-changed", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
}

static void on_gst_promise_changed (GstPromise * promise, gpointer user_data);

static void
ngw_native_promise_init (NgwNativePromise * self)
{
// XXX: will have to make sure JS side hold this object long enough.
self->gst_promise = gst_promise_new_with_change_func(
on_gst_promise_changed, /* user_data */ self, /* GDestroyNotify */ NULL);
}

static gboolean
handle_gst_promise_changed_on_idle (gpointer user_data)
{
NgwNativePromise * self = NGW_NATIVE_PROMISE(user_data);

g_signal_emit(self, promise_signals[SIGNAL_ON_CHANGED], /* detail */ 0);

return G_SOURCE_REMOVE;
}

static void
on_gst_promise_changed (GstPromise * promise G_GNUC_UNUSED, gpointer user_data)
{
NgwNativePromise * self = NGW_NATIVE_PROMISE(user_data);

g_idle_add (handle_gst_promise_changed_on_idle, self);
}

/**
* ngw_native_promise_get_gst_promise:
* @self: the #NgwNativePromise
*
* Returns: (transfer none): the #GstPromise
*/
GstPromise *
ngw_native_promise_get_gst_promise (NgwNativePromise * self)
{
return self->gst_promise;
}

static void
ngw_native_promise_dispose (GObject * gobject)
{
NgwNativePromise * self = NGW_NATIVE_PROMISE(gobject);

g_clear_pointer(&self->gst_promise, gst_promise_unref);
}

NgwNativePromise *
ngw_native_promise_new (void)
{
return g_object_new (NGW_NATIVE_PROMISE_TYPE, NULL);
}
22 changes: 22 additions & 0 deletions src-native/NgwNativePromise.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once

#include <glib-object.h>
#include <gst/gstpromise.h>

#include "NgwNativeCommon.h"

G_BEGIN_DECLS

#define NGW_NATIVE_PROMISE_TYPE ngw_native_promise_get_type()
NGWNATIVE_PUBLIC G_DECLARE_FINAL_TYPE(
NgwNativePromise,
ngw_native_promise,
NGW_NATIVE, PROMISE, GObject)

NGWNATIVE_PUBLIC
NgwNativePromise * ngw_native_promise_new(void);

NGWNATIVE_PUBLIC
GstPromise * ngw_native_promise_get_gst_promise(NgwNativePromise * self);

G_END_DECLS
2 changes: 2 additions & 0 deletions src-native/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ project('NgwNative', 'c',
lib_args = ['-DBUILDING_NGWNATIVE']

src_files = [
'NgwNativePromise.c',
'NgwNativeRTCDataChannel.c'
]

header_files = [
'NgwNativePromise.h',
'NgwNativeRTCDataChannel.h'
]

Expand Down
80 changes: 80 additions & 0 deletions src/@types/node-ngwnative-0.0.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,72 @@ import type GModule from './node-gmodule-2.0.js';

export namespace NgwNative {

module Promise {

// Signal callback interfaces

/**
* Signal callback interface for `on-changed`
*/
interface OnChangedSignalCallback {
(): void
}


// Constructor properties interface

interface ConstructorProperties extends GObject.Object.ConstructorProperties {
}

}

interface Promise {

// Own properties of NgwNative-0.0.NgwNative.Promise

__gtype__: number

// Owm methods of NgwNative-0.0.NgwNative.Promise

getGstPromise(): Gst.Promise

// Own signals of NgwNative-0.0.NgwNative.Promise

connect(sigName: "on-changed", callback: Promise.OnChangedSignalCallback): number
on(sigName: "on-changed", callback: Promise.OnChangedSignalCallback, after?: boolean): NodeJS.EventEmitter
once(sigName: "on-changed", callback: Promise.OnChangedSignalCallback, after?: boolean): NodeJS.EventEmitter
off(sigName: "on-changed", callback: Promise.OnChangedSignalCallback): NodeJS.EventEmitter
emit(sigName: "on-changed", ...args: any[]): void

// Class property signals of NgwNative-0.0.NgwNative.Promise

connect(sigName: "notify::__gtype__", callback: (...args: any[]) => void): number
on(sigName: "notify::__gtype__", callback: (...args: any[]) => void, after?: boolean): NodeJS.EventEmitter
once(sigName: "notify::__gtype__", callback: (...args: any[]) => void, after?: boolean): NodeJS.EventEmitter
off(sigName: "notify::__gtype__", callback: (...args: any[]) => void): NodeJS.EventEmitter
emit(sigName: "notify::__gtype__", ...args: any[]): void
connect(sigName: string, callback: (...args: any[]) => void): number
on(sigName: string, callback: (...args: any[]) => void, after?: boolean): NodeJS.EventEmitter
once(sigName: string, callback: (...args: any[]) => void, after?: boolean): NodeJS.EventEmitter
off(sigName: string, callback: (...args: any[]) => void): NodeJS.EventEmitter
emit(sigName: string, ...args: any[]): void
disconnect(id: number): void
}

class Promise extends GObject.Object {

// Own properties of NgwNative-0.0.NgwNative.Promise

static name: string

// Constructors of NgwNative-0.0.NgwNative.Promise

constructor(config?: Promise.ConstructorProperties)
constructor()
static new(): Promise
_init(config?: Promise.ConstructorProperties): void
}

module RTCDataChannel {

// Signal callback interfaces
Expand Down Expand Up @@ -153,6 +219,20 @@ class RTCDataChannel extends GObject.Object {
_init(config?: RTCDataChannel.ConstructorProperties): void
}

interface PromiseClass {

// Own fields of NgwNative-0.0.NgwNative.PromiseClass

parentClass: GObject.ObjectClass
}

abstract class PromiseClass {

// Own properties of NgwNative-0.0.NgwNative.PromiseClass

static name: string
}

interface RTCDataChannelClass {

// Own fields of NgwNative-0.0.NgwNative.RTCDataChannelClass
Expand Down
68 changes: 29 additions & 39 deletions src/gstUtils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { setImmediate as resolveImmediate } from 'timers/promises';
import NgwNative from './@types/node-ngwnative-0.0';

// For convenience
import GLib from './@types/node-glib-2.0';
Expand All @@ -16,45 +16,35 @@ Gst.init(null);
// UINT64_MAX.
export const GST_CLOCK_TIME_NONE = BigInt.asUintN(/* width */ 64, -1n);

async function rejectImmediate(reason: any): Promise<any> {
await resolveImmediate();
throw reason;
}

// Node-gtk doesn't handle circular reference well, and GstPromise doesn't have
// a way to change the changeFunc, so we have to create a change function that
// doesn't capture the GstPromise itself in a closure to prevent circular reference.
function getGstPromiseChangeFuncForResolveReject(
resolve: (s: Promise<Gst.Structure | null>) => void,
reject: (reason: any) => void,
) {
return function changeFunc (gstPromise: Gst.Promise) {
switch (gstPromise.wait()) {
case Gst.PromiseResult.EXPIRED:
resolve(rejectImmediate(new Error('GstPromise is expired.')));
break;
case Gst.PromiseResult.INTERRUPTED:
resolve(rejectImmediate(new Error('GstPromise is interrupted.')));
break;
case Gst.PromiseResult.REPLIED:
let reply = gstPromise.getReply();
if (reply && reply.hasField('error')) {
let errorV = <GObject.Value>reply.getValue('error');
let error = <GLib.Error>errorV.getBoxed();
errorV.unset();

resolve(rejectImmediate(new Error(error.message || undefined)));
} else {
resolve(resolveImmediate(reply));
}
}
}
}

export function withGstPromise(f: (p: Gst.Promise) => void) {
return new Promise<Gst.Structure | null>((resolve, reject) => {
const gstPromise = Gst.Promise.newWithChangeFunc(
getGstPromiseChangeFuncForResolveReject(resolve, reject));
f(gstPromise);
let gstPromiseWrapper = new NgwNative.Promise();

gstPromiseWrapper.once("on-changed", () => {
// Pull gstPromiseWrapper itself into the closure.
let gstPromise = gstPromiseWrapper.getGstPromise();

switch (gstPromise.wait()) {
case Gst.PromiseResult.EXPIRED:
reject(new Error('GstPromise is expired.'));
break;
case Gst.PromiseResult.INTERRUPTED:
reject(new Error('GstPromise is interrupted.'));
break;
case Gst.PromiseResult.REPLIED:
let reply = gstPromise.getReply();
if (reply && reply.hasField('error')) {
let errorV = <GObject.Value>reply.getValue('error');
let error = <GLib.Error>errorV.getBoxed();
errorV.unset();

reject(new Error(error.message || undefined));
} else {
resolve(reply);
}
}
});

f(gstPromiseWrapper.getGstPromise());
});
}

0 comments on commit 43f0682

Please sign in to comment.