From 24fe9d65a37cb33d07a65fd88cd24ff3b79569ca Mon Sep 17 00:00:00 2001 From: Doug Bell Date: Wed, 29 Jul 2020 21:25:24 -0500 Subject: [PATCH] add feed action to controller The feed action first sends the same response as the list action, only over a websocket. Then, it polls the database periodically and sends any changes that happen. Look for some examples soon! --- lib/Yancy/Controller/Yancy.pm | 360 +++++++++++++++++++++++++++------- t/controller/yancy.t | 54 +++++ t/fixtures/basic/dbic.pl | 2 +- t/fixtures/basic/mysql.sql | 2 +- t/fixtures/basic/pg.sql | 2 +- t/fixtures/basic/sqlite.sql | 2 +- 6 files changed, 347 insertions(+), 75 deletions(-) diff --git a/lib/Yancy/Controller/Yancy.pm b/lib/Yancy/Controller/Yancy.pm index dee83777..176768f4 100644 --- a/lib/Yancy/Controller/Yancy.pm +++ b/lib/Yancy/Controller/Yancy.pm @@ -354,75 +354,7 @@ number and setting the C<$offset> query parameter. sub list { my ( $c ) = @_; - if ( $c->stash( 'collection' ) ) { - derp '"collection" stash key is now "schema" in controller configuration'; - } - my $schema_name = $c->stash( 'schema' ) || $c->stash( 'collection' ) - || die "Schema name not defined in stash"; - my $limit = $c->param( '$limit' ) // $c->stash->{ limit } // 10; - my $offset = $c->param( '$page' ) ? ( $c->param( '$page' ) - 1 ) * $limit - : $c->param( '$offset' ) ? $c->param( '$offset' ) - : ( ( $c->stash->{page} // 1 ) - 1 ) * $limit; - $c->stash( page => int( $offset / $limit ) + 1 ); - my $opt = { - limit => $limit, - offset => $offset, - }; - - if ( my $order_by = $c->param( '$order_by' ) ) { - $opt->{order_by} = [ - map +{ "-" . ( $_->[1] ? $_->[0] : 'asc' ) => $_->[1] // $_->[0] }, - map +[ split /:/ ], - split /,/, $order_by - ]; - } - elsif ( $order_by = $c->stash( 'order_by' ) ) { - $opt->{order_by} = $order_by; - } - - my $schema = $c->yancy->schema( $schema_name ) ; - my $props = $schema->{properties}; - my %param_filter = (); - for my $key ( @{ $c->req->params->names } ) { - next unless exists $props->{ $key }; - my $type = $props->{$key}{type} || 'string'; - my $value = $c->param( $key ); - if ( is_type( $type, 'string' ) ) { - if ( ( $value =~ tr/*/%/ ) <= 0 ) { - $value = "\%$value\%"; - } - $param_filter{ $key } = { -like => $value }; - } - elsif ( grep is_type( $type, $_ ), qw(number integer) ) { - $param_filter{ $key } = $value ; - } - elsif ( is_type( $type, 'boolean' ) ) { - $param_filter{ ($value && $value ne 'false')? '-bool' : '-not_bool' } = $key; - } - elsif ( is_type($type, 'array') ) { - $param_filter{ $key } = { '-has' => $value }; - } - else { - die "Sorry type '" . - to_json( $type ) . - "' is not handled yet, only string|number|integer|boolean|array is supported." - } - } - my $filter = { - %param_filter, - # Stash filter always overrides param filter, for security - %{ $c->_resolve_filter }, - }; - if ( $c->param( '$match' ) && $c->param( '$match' ) eq 'any' ) { - $filter = [ - map +{ $_ => $filter->{ $_ } }, keys %$filter - ]; - } - - #; use Data::Dumper; - #; $c->app->log->info( Dumper $filter ); - #; $c->app->log->info( Dumper $opt ); - + my ( $schema_name, $filter, $opt ) = $c->_get_list_args; my $result = $c->yancy->backend->list( $schema_name, $filter, $opt ); for my $helper ( @{ $c->stash( 'before_render' ) // [] } ) { $c->$helper( $_ ) for @{ $result->{items} }; @@ -432,7 +364,7 @@ sub list { my $format = $c->stash( 'format' ); return $c->respond_to( json => sub { - $c->stash( json => { %$result, offset => $offset } ); + $c->stash( json => { %$result, offset => $opt->{offset} } ); }, any => sub { if ( !$c->stash( 'template' ) ) { @@ -441,7 +373,7 @@ sub list { $c->stash( ( format => $format )x!!$format, %$result, - total_pages => ceil( $result->{total} / $limit ), + total_pages => ceil( $result->{total} / $opt->{limit} ), ); }, ); @@ -983,6 +915,292 @@ sub delete { ); } +=method feed + + $routes->websocket( '/' )->to( + 'yancy#feed', + schema => $schema_name, + ); + +Subscribe to a feed of changes to the given schema. This first sends a list result +(like L would). Then it sends change messages. Change messages are JSON objects +with different fields based on the method of change: + + # An item in the list was changed + { + method => "set", + # The position of the changed item in the list, 0-based + index => 2, + item => { + # These are the fields that changed + name => 'Lars Fillmore', + }, + } + + # An item was added to the list + { + method => "create", + # The position of the new item in the list, 0-based + index => 0, + item => { + # The entire, newly-created item + # ... + }, + } + + # An item was removed from the list. This does not necessarily mean + # the item was removed from the database. + { + method => "delete", + # The position of the item removed from the list, 0-based + index => 0, + } + +B Allow the client to send change messages to the server. + +=head4 Input Stash + +This method uses the following stash values for configuration: + +=over + +=item schema + +The schema to use. Required. + +=item limit + +The number of items to show on the page. Defaults to C<10>. + +=item page + +The page number to show. Defaults to C<1>. The page number will +be used to calculate the C parameter to L. + +=item filter + +A hash reference of field/value pairs to filter the contents of the list +or a subref that generates this hash reference. The subref will be passed +the current controller object (C<$c>). + +This overrides any query filters and so can be used to enforce +authorization / security. + +=item order_by + +Set the default order for the items. Supports any L +C structure. + +=item before_render + +An array reference of hooks to call once for each item in the C list +before they are sent as messages. See L for usage. + +=back + +=head4 Query Params + +The following URL query parameters are allowed for this method: + +=over + +=item $page + +Instead of using the C stash value, you can use the C<$page> query +parameter to set the page. + +=item $offset + +Instead of using the C stash value, you can use the C<$offset> +query parameter to set the page offset. This is overridden by the +C<$page> query parameter. + +=item $limit + +Instead of using the C stash value, you can use the C<$limit> +query parameter to allow users to specify their own page size. + +=item $order_by + +One or more fields to order by. Can be specified as C<< >> or +C<< asc: >> to sort in ascending order or C<< desc: >> +to sort in descending order. + +=item $match + +How to match multiple field filters. Can be C or C (default +C). C means all fields must match for a row to be returned. +C means at least one field must match for a row to be returned. + +=item Additional Field Filters + +Any named query parameter that matches a field in the schema will be +used to further filter the results. The stash C will override +this filter, so that the stash C can be used for security. + +=back + +=cut + +sub feed { + my ( $c ) = @_; + $c->inactivity_timeout( 3600 ); + + # First, send the message for the initial page + my ( $schema_name, $filter, $opt ) = $c->_get_list_args; + my $result = $c->yancy->backend->list( $schema_name, $filter, $opt ); + for my $helper ( @{ $c->stash( 'before_render' ) // [] } ) { + $c->$helper( $_ ) for @{ $result->{items} }; + } + my $x_id_field = $c->yancy->schema( $schema_name )->{'x-id-field'} // 'id'; + my @id_fields = ref $x_id_field eq 'ARRAY' ? @$x_id_field : ( $x_id_field ); + #; $c->log->debug( 'Original result: ' . $c->dumper( $result ) ); + $c->send({ json => { %$result, method => 'list' } }); + + # Now, poll the database for updates every few seconds. + # XXX: Create Yancy::Plugin::PubSub to do push messaging instead of + # ugly polling... + my $id = Mojo::IOLoop->recurring( $c->stash( 'interval' ) // 10, sub { + my $new_result = $c->yancy->backend->list( $schema_name, $filter, $opt ); + #; $c->log->debug( 'New result: ' . $c->dumper( $new_result ) ); + my %seen_items; + my @created_items; + NEW_ITEM: for my $new_i ( 0..$#{ $new_result->{items} } ) { + my $new_item = $new_result->{items}[$new_i]; + # Loop through the old result to find the existing items by + # their ID fields + for my $old_i ( 0..$#{ $result->{items} } ) { + my $old_item = $result->{items}[$old_i]; + if ( @id_fields == grep { $new_item->{ $_ } eq $old_item->{ $_ } } @id_fields ) { + # Found it! + $seen_items{ $old_i }++; + my %diff = + map { $_ => $new_item->{ $_ } } + grep {; no warnings 'uninitialized'; $new_item->{ $_ } ne $old_item->{ $_ } } + keys %$new_item, keys %$old_item + ; + if ( keys %diff ) { + my $message = { + method => 'set', + index => $old_i, + item => \%diff, + }; + #$c->log->debug( $c->dumper( $message ) ); + $c->send({ json => $message }); + } + next NEW_ITEM; + } + } + # If we can't find the new item, it must have been added. + # Queue it up to send after deletes to maintain indexes. + push @created_items, { + method => 'create', + index => $new_i, + item => $new_item, + }; + } + # Any items we did not see must have been removed from the list, + # or pushed out by newly-created items. Send these in reverse to + # maintain indexes. + for my $old_i ( reverse grep { !$seen_items{ $_ } } 0..$#{ $result->{items} } ) { + my $message = { + method => 'delete', + index => $old_i, + }; + #$c->log->debug( $c->dumper( $message ) ); + $c->send({ json => $message }); + } + # Now we can send the created items, from lowest index to + # highest index + for my $item ( @created_items ) { + #$c->log->debug( $c->dumper( $item ) ); + $c->send({ json => $item }); + } + + $result = $new_result; + } ); + $c->on( finish => sub { Mojo::IOLoop->remove( $id ) } ); + # XXX: Allow client to send "list" message to change the parameters + # of the list. Respond with an entirely new result (not a diff). + # XXX: Allow client to send "create", "set", and "delete" messages + # to create, set, and delete items +} + +sub _get_list_args { + my ( $c ) = @_; + + if ( $c->stash( 'collection' ) ) { + derp '"collection" stash key is now "schema" in controller configuration'; + } + my $schema_name = $c->stash( 'schema' ) || $c->stash( 'collection' ) + || die "Schema name not defined in stash"; + my $limit = $c->param( '$limit' ) // $c->stash->{ limit } // 10; + my $offset = $c->param( '$page' ) ? ( $c->param( '$page' ) - 1 ) * $limit + : $c->param( '$offset' ) ? $c->param( '$offset' ) + : ( ( $c->stash->{page} // 1 ) - 1 ) * $limit; + $c->stash( page => int( $offset / $limit ) + 1 ); + my $opt = { + limit => $limit, + offset => $offset, + }; + + if ( my $order_by = $c->param( '$order_by' ) ) { + $opt->{order_by} = [ + map +{ "-" . ( $_->[1] ? $_->[0] : 'asc' ) => $_->[1] // $_->[0] }, + map +[ split /:/ ], + split /,/, $order_by + ]; + } + elsif ( $order_by = $c->stash( 'order_by' ) ) { + $opt->{order_by} = $order_by; + } + + my $schema = $c->yancy->schema( $schema_name ) ; + my $props = $schema->{properties}; + my %param_filter = (); + for my $key ( @{ $c->req->params->names } ) { + next unless exists $props->{ $key }; + my $type = $props->{$key}{type} || 'string'; + my $value = $c->param( $key ); + if ( is_type( $type, 'string' ) ) { + if ( ( $value =~ tr/*/%/ ) <= 0 ) { + $value = "\%$value\%"; + } + $param_filter{ $key } = { -like => $value }; + } + elsif ( grep is_type( $type, $_ ), qw(number integer) ) { + $param_filter{ $key } = $value ; + } + elsif ( is_type( $type, 'boolean' ) ) { + $param_filter{ ($value && $value ne 'false')? '-bool' : '-not_bool' } = $key; + } + elsif ( is_type($type, 'array') ) { + $param_filter{ $key } = { '-has' => $value }; + } + else { + die "Sorry type '" . + to_json( $type ) . + "' is not handled yet, only string|number|integer|boolean|array is supported." + } + } + my $filter = { + %param_filter, + # Stash filter always overrides param filter, for security + %{ $c->_resolve_filter }, + }; + if ( $c->param( '$match' ) && $c->param( '$match' ) eq 'any' ) { + $filter = [ + map +{ $_ => $filter->{ $_ } }, keys %$filter + ]; + } + + #; use Data::Dumper; + #; $c->app->log->info( Dumper $filter ); + #; $c->app->log->info( Dumper $opt ); + + return ( $schema_name, $filter, $opt ); +} + sub _resolve_filter { my ( $c ) = @_; my $filter = $c->stash( 'filter' ); diff --git a/t/controller/yancy.t b/t/controller/yancy.t index c0117e5d..4103ce66 100644 --- a/t/controller/yancy.t +++ b/t/controller/yancy.t @@ -1180,6 +1180,60 @@ subtest 'composite keys' => sub { }; }; +subtest 'feed' => sub { + my $t = build_app(); + + my $r = $t->app->routes; + $r->websocket( '/employees' )->to( + controller => 'yancy', + action => 'feed', + schema => 'employees', + interval => 1, + ); + + $t->websocket_ok( '/employees' ) + ->message_ok + ->json_message_is( '/method', 'list' ) + ->json_message_is( '/total', '5' ) + ->json_message_has( '/items' ) + ->json_message_has( '/items/0' ) + ->json_message_has( '/items/1' ) + ->json_message_has( '/items/2' ) + ->json_message_has( '/items/3' ) + ->json_message_has( '/items/4' ) + ->json_message_hasnt( '/items/5' ) + ; + + # Create an employee + my $id = $t->app->yancy->backend->create( employees => { + name => 'Philip J. Fry', + email => 'phil-2@example.com', + department => 'support', + } ); + $t->message_ok + ->json_message_is( '/method', 'create' ) + ->json_message_is( '/index', 5 ) + ->json_message_is( '/item/name', 'Philip J. Fry' ) + ; + + # Update the employee + $t->app->yancy->backend->set( employees => $id, { name => 'Lars Fillmore' } ); + $t->message_ok + ->json_message_is( '/method', 'set' ) + ->json_message_is( '/index', 5 ) + ->json_message_is( '/item', { name => 'Lars Fillmore' } ) + ; + + # Delete the employee + $t->app->yancy->backend->delete( employees => $id ); + $t->message_ok + ->json_message_is( '/method', 'delete' ) + ->json_message_is( '/index', 5 ) + ; + + $t->finish_ok; +}; + done_testing; sub build_app { diff --git a/t/fixtures/basic/dbic.pl b/t/fixtures/basic/dbic.pl index e0ca71b5..e2d060da 100644 --- a/t/fixtures/basic/dbic.pl +++ b/t/fixtures/basic/dbic.pl @@ -15,7 +15,7 @@ package Local::Schema::Result::employees { ssn => { data_type => 'varchar', size => 11, - is_nullable => 0, + is_nullable => 1, }, department => { data_type => 'enum', diff --git a/t/fixtures/basic/mysql.sql b/t/fixtures/basic/mysql.sql index 1739bede..13147e2b 100644 --- a/t/fixtures/basic/mysql.sql +++ b/t/fixtures/basic/mysql.sql @@ -2,7 +2,7 @@ DROP TABLE IF EXISTS employees; CREATE TABLE employees ( employee_id INTEGER PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255) NOT NULL, - ssn VARCHAR(11) NOT NULL, + ssn VARCHAR(11) DEFAULT NULL, department ENUM( 'unknown', 'admin', 'support', 'development', 'sales' ) NOT NULL DEFAULT 'unknown', salary INTEGER NOT NULL DEFAULT 0, email VARCHAR(255) NOT NULL, diff --git a/t/fixtures/basic/pg.sql b/t/fixtures/basic/pg.sql index 2bfc5109..722dce34 100644 --- a/t/fixtures/basic/pg.sql +++ b/t/fixtures/basic/pg.sql @@ -6,7 +6,7 @@ CREATE TYPE department_type AS ENUM ( 'unknown', 'admin', 'support', 'developmen CREATE TABLE employees ( employee_id SERIAL PRIMARY KEY, name VARCHAR(255) NOT NULL, - ssn VARCHAR(11) NOT NULL, + ssn VARCHAR(11) DEFAULT NULL, department department_type NOT NULL DEFAULT 'unknown', salary INTEGER NOT NULL DEFAULT 0, email VARCHAR(255) NOT NULL, diff --git a/t/fixtures/basic/sqlite.sql b/t/fixtures/basic/sqlite.sql index bed446fb..3a322e72 100644 --- a/t/fixtures/basic/sqlite.sql +++ b/t/fixtures/basic/sqlite.sql @@ -2,7 +2,7 @@ DROP TABLE IF EXISTS employees; CREATE TABLE employees ( employee_id INTEGER PRIMARY KEY AUTOINCREMENT, name VARCHAR(255) NOT NULL, - ssn VARCHAR(11) NOT NULL, + ssn VARCHAR(11) DEFAULT NULL, department VARCHAR(50) NOT NULL CHECK( department IN ( 'unknown', 'admin', 'support', 'development', 'sales' ) ) DEFAULT 'unknown', salary INTEGER NOT NULL DEFAULT 0, email VARCHAR(255) NOT NULL,