Skip to content

Commit

Permalink
fragment ws (#400)
Browse files Browse the repository at this point in the history
  • Loading branch information
joente authored Dec 21, 2024
1 parent b8fce68 commit 8b7b434
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 31 deletions.
2 changes: 1 addition & 1 deletion inc/ti/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* "-rc0"
* ""
*/
#define TI_VERSION_PRE_RELEASE "-alpha0"
#define TI_VERSION_PRE_RELEASE "-alpha1"

#define TI_MAINTAINER \
"Jeroen van der Heijden <[email protected]>"
Expand Down
2 changes: 0 additions & 2 deletions inc/ti/ws.t.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ struct ti_ws_s
queue_t * queue; /* ti_write_t */
ti_stream_t * stream;
struct lws * wsi;
unsigned char * wbuf;
size_t wbuf_sz;
};

#endif /* TI_WS_T_H_ */
5 changes: 5 additions & 0 deletions itest/test_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ async def test_simple_ws(self, client):
self.assertIsInstance(info, str)
res = await client.query('6 * 7;')
self.assertEqual(res, 42)
n = 10000
res = await client.query("""//ti
range(n).map(|i| `item {i}`);
""", n=n)
self.assertEqual(res, [f'item {i}' for i in range(n)])


if __name__ == '__main__':
Expand Down
3 changes: 2 additions & 1 deletion itest/ws/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
const thingsdb = new ThingsDB('ws://localhost:9270');
thingsdb.connect().then(() => {
thingsdb.auth().then(() => {
thingsdb.query('@:stuff', '"Hello World!";').then(response => {
thingsdb.query('@thingsdb', '"Hello World!";').then(response => {
console.log('Query done!');
console.log(response); // will be "Hello World!"
});
});
Expand Down
2 changes: 1 addition & 1 deletion src/ti/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ const char * ti_stream_name(ti_stream_t * stream)
return stream->name_ ? stream->name_ : "<client> "STREAM__UNRESOLVED;
case TI_STREAM_WS_IN_CLIENT:
if (stream->via.user)
sprintf(prefix, "<client:%"PRIu64"> ", stream->via.user->id);
sprintf(prefix, "<ws:%"PRIu64"> ", stream->via.user->id);
else
sprintf(prefix, "<ws:not authorized> ");
stream->name_ = ti_ws_name(prefix, (ti_ws_t *) stream->with.ws);
Expand Down
53 changes: 27 additions & 26 deletions src/ti/ws.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,44 +48,37 @@ static int ws__callback_established(struct lws * wsi, ti_ws_t * pss)

static int ws__callback_server_writable(struct lws * wsi, ti_ws_t * pss)
{
const size_t sugsz = 8192 - LWS_PRE;
const size_t mf = LWS_SS_MTU-LWS_PRE;
unsigned char mtubuff[LWS_SS_MTU];
unsigned char * out = mtubuff + LWS_PRE;
unsigned char * pt;
int flags, m;
size_t n;
size_t n, f, nf, len;
ti_pkg_t * pkg;
ti_write_t * req = queue_shift(pss->queue);
if (!req)
return 0; /* nothing to write */

pkg = req->pkg;
flags = lws_write_ws_flags(LWS_WRITE_BINARY, 1, 1);

/* notice we allowed for LWS_PRE in the payload already */
n = sizeof(ti_pkg_t) + pkg->n;
if (n > pss->wbuf_sz)
nf = (n-1)/mf+1;
pt = (unsigned char *) pkg;

for (f=1; f<=nf; ++f, pt+=mf, n-=mf)
{
size_t sz = n > sugsz ? n : sugsz;
unsigned char * tmp = malloc(LWS_PRE + sz);
if (!tmp)
flags = lws_write_ws_flags(LWS_WRITE_BINARY, f==1, f==nf);
len = mf > n ? n : mf;
memcpy(out, pt, len);
m = lws_write(wsi, out, len, flags);
if (m < (int) len)
{
log_error(EX_MEMORY_S);
req->cb_(req, EX_MEMORY);
log_error("ERROR %d; writing to WebSocket", m);
req->cb_(req, EX_WRITE_UV);
return -1;
}
pss->wbuf_sz = sz;
free(pss->wbuf);
pss->wbuf = tmp;
}

memcpy(pss->wbuf + LWS_PRE, (unsigned char *) pkg, n);

m = lws_write(wsi, pss->wbuf + LWS_PRE, n, flags);
if (m < (int) n)
{
log_error("ERROR %d; writing to WebSocket", m);
req->cb_(req, EX_WRITE_UV);
return -1;
}

lws_callback_on_writable(wsi);
req->cb_(req, 0);
return 0;
Expand Down Expand Up @@ -115,7 +108,17 @@ static int ws__callback_receive(struct lws * wsi, ti_ws_t * pss, void * in, size

if (lws_is_final_fragment(wsi))
{
ti_pkg_t * pkg = (ti_pkg_t *) stream->buf;
ti_pkg_t * pkg;
if (stream->n < sizeof(ti_pkg_t))
{
log_error(
"invalid package (header too small) from `%s`, "
"closing connection",
ti_stream_name(stream));
ti_stream_close(stream);
return -1;
}
pkg = (ti_pkg_t *) stream->buf;
stream->n = 0; /* reset buffer */
if (!ti_pkg_check(pkg))
{
Expand Down Expand Up @@ -161,7 +164,6 @@ int ws__callback(
case LWS_CALLBACK_CLOSED:
if (pss->stream)
{
free(pss->wbuf);
ti_stream_close(pss->stream);
queue_destroy(pss->queue, free);
}
Expand Down Expand Up @@ -350,7 +352,6 @@ char * ti_ws_name(const char * prefix, ti_ws_t * pss)
return NULL;
}


void ti_ws_destroy(void)
{
if (ws__context)
Expand Down

0 comments on commit 8b7b434

Please sign in to comment.