From d065e354b18eb1b641cd8da238f87039886d3c9c Mon Sep 17 00:00:00 2001 From: toga4 <81744248+toga4@users.noreply.github.com> Date: Sun, 8 Jan 2023 14:04:29 +0900 Subject: [PATCH 1/3] Make option type public --- subscriber.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/subscriber.go b/subscriber.go index e55f838..2d4066f 100644 --- a/subscriber.go +++ b/subscriber.go @@ -46,7 +46,7 @@ type config struct { spannerRequestPriority spannerpb.RequestOptions_Priority } -type option interface { +type Option interface { Apply(*config) } @@ -60,7 +60,7 @@ func (o withStartTimestamp) Apply(c *config) { // // The value must be within the retention period of the change stream and before the current time. // Default value is current timestamp. -func WithStartTimestamp(startTimestamp time.Time) option { +func WithStartTimestamp(startTimestamp time.Time) Option { return withStartTimestamp(startTimestamp) } @@ -74,7 +74,7 @@ func (o withEndTimestamp) Apply(c *config) { // // The value must be within the retention period of the change stream and must be after the start timestamp. // If not set, read latest changes until canceled. -func WithEndTimestamp(endTimestamp time.Time) option { +func WithEndTimestamp(endTimestamp time.Time) Option { return withEndTimestamp(endTimestamp) } @@ -87,7 +87,7 @@ func (o withHeartbeatInterval) Apply(c *config) { // WithHeartbeatInterval set the heartbeat interval for read change streams. // // Default value is 10 seconds. -func WithHeartbeatInterval(heartbeatInterval time.Duration) option { +func WithHeartbeatInterval(heartbeatInterval time.Duration) Option { return withHeartbeatInterval(heartbeatInterval) } @@ -100,7 +100,7 @@ func (o withSpannerRequestPriotiry) Apply(c *config) { // WithSpannerRequestPriotiry set the request priority option for read change streams. // // Default value is unspecified, equivalent to high. 
-func WithSpannerRequestPriotiry(priority spannerpb.RequestOptions_Priority) option { +func WithSpannerRequestPriotiry(priority spannerpb.RequestOptions_Priority) Option { return withSpannerRequestPriotiry(priority) } @@ -116,7 +116,7 @@ func NewSubscriber( client *spanner.Client, streamName string, partitionStorage PartitionStorage, - options ...option, + options ...Option, ) *Subscriber { c := &config{ startTimestamp: nowFunc(), From 3d926bdc4c3495938eb842dffc23206b0b2fe442 Mon Sep 17 00:00:00 2001 From: toga4 <81744248+toga4@users.noreply.github.com> Date: Sun, 8 Jan 2023 14:18:39 +0900 Subject: [PATCH 2/3] Add CLI command --- cmd/spream/main.go | 188 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 cmd/spream/main.go diff --git a/cmd/spream/main.go b/cmd/spream/main.go new file mode 100644 index 0000000..58299f6 --- /dev/null +++ b/cmd/spream/main.go @@ -0,0 +1,188 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "os" + "os/signal" + "sync" + "time" + + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/apiv1/spannerpb" + "github.com/toga4/spream" + "github.com/toga4/spream/partitionstorage" +) + +type flags struct { + database string + streamName string + startTimestamp time.Time + endTimestamp time.Time + heartbeatInterval time.Duration + priority spannerpb.RequestOptions_Priority + metadataTableName string + metadataDatabase string +} + +const ( + priorityHigh = "high" + priorityMedium = "medium" + priorityLow = "low" +) + +func parseFlags(cmd string, args []string) (*flags, error) { + var flags flags + + fs := flag.NewFlagSet(cmd, flag.ExitOnError) + fs.Usage = func() { + fmt.Fprintf(os.Stderr, `Usage: %s [OPTIONS...] + +Options: + -d, --database (required) Database name of change stream with the form 'projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID'. 
+ -s, --stream (required) Change stream name + -t, --metadata-table Table name for partition metadata (default: store partition metadata on memory, not Cloud Spanner) + --start Start timestamp with RFC3339 format (default: current timestamp) + --end End timestamp with RFC3339 format (default: indefinite) + --heartbeat-interval Heartbeat interval with time.Duration format (default: 10s) + --priority [high|medium|low] Request priority for Cloud Spanner (default: high) + --metadata-database Database name of partition metadata table (default: same as database option) + -h, --help Print this message + +`, cmd) + } + + fs.StringVar(&flags.database, "d", "", "") + fs.StringVar(&flags.streamName, "s", "", "") + fs.StringVar(&flags.metadataTableName, "t", "", "") + + fs.StringVar(&flags.database, "database", "", "") + fs.StringVar(&flags.streamName, "stream", "", "") + fs.StringVar(&flags.metadataTableName, "metadata-table", "", "") + fs.StringVar(&flags.metadataDatabase, "metadata-database", "", "") + fs.DurationVar(&flags.heartbeatInterval, "heartbeat-interval", 10*time.Second, "") + + var start, end, priority string + fs.StringVar(&start, "start", "", "") + fs.StringVar(&end, "end", "", "") + fs.StringVar(&priority, "priority", "", "") + + if err := fs.Parse(args); err != nil { + return nil, err + } + + if flags.database == "" || flags.streamName == "" { + fs.Usage() + return nil, errors.New("database and stream is required") + } + + // Default the metadata database to the change stream database, which is only known after parsing. 
+ if flags.metadataDatabase == "" { + flags.metadataDatabase = flags.database + } + + if start != "" { + t, err := time.Parse(time.RFC3339, start) + if err != nil { + fs.Usage() + return nil, fmt.Errorf("invalid start timestamp: %w", err) + } + flags.startTimestamp = t + } + if end != "" { + t, err := time.Parse(time.RFC3339, end) + if err != nil { + fs.Usage() + return nil, fmt.Errorf("invalid end timestamp: %w", err) + } + flags.endTimestamp = t + } + if priority != "" { + switch priority { + case priorityHigh: + flags.priority = spannerpb.RequestOptions_PRIORITY_HIGH + case priorityMedium: +
flags.priority = spannerpb.RequestOptions_PRIORITY_MEDIUM + case priorityLow: + flags.priority = spannerpb.RequestOptions_PRIORITY_LOW + default: + fs.Usage() + return nil, fmt.Errorf("invalid priority: %v", priority) + } + } + + return &flags, nil +} + +type jsonOutputConsumer struct { + out io.Writer + mu sync.Mutex +} + +func (l *jsonOutputConsumer) Consume(change *spream.DataChangeRecord) error { + l.mu.Lock() + defer l.mu.Unlock() + return json.NewEncoder(l.out).Encode(change) +} + +func main() { + flags, err := parseFlags(os.Args[0], os.Args[1:]) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + defer stop() + + spannerClient, err := spanner.NewClient(ctx, flags.database) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + defer spannerClient.Close() + + var partitionStorage spream.PartitionStorage + if flags.metadataTableName == "" { + partitionStorage = partitionstorage.NewInmemory() + } else { + metadataSpannerClient, err := spanner.NewClient(ctx, flags.metadataDatabase) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + ps := partitionstorage.NewSpanner(metadataSpannerClient, flags.metadataTableName) + if err := ps.CreateTableIfNotExists(ctx); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + partitionStorage = ps + } + + options := []spream.Option{} + if !flags.startTimestamp.IsZero() { + options = append(options, spream.WithStartTimestamp(flags.startTimestamp)) + } + if !flags.endTimestamp.IsZero() { + options = append(options, spream.WithEndTimestamp(flags.endTimestamp)) + } + if flags.heartbeatInterval != 0 { + options = append(options, spream.WithHeartbeatInterval(flags.heartbeatInterval)) + } + if flags.priority != spannerpb.RequestOptions_PRIORITY_UNSPECIFIED { + options = append(options, spream.WithSpannerRequestPriotiry(flags.priority)) + } + + subscriber := spream.NewSubscriber(spannerClient, 
flags.streamName, partitionStorage, options...) + consumer := &jsonOutputConsumer{out: os.Stdout} + + fmt.Fprintln(os.Stderr, "Waiting changes...") + if err := subscriber.Subscribe(ctx, consumer); err != nil && !errors.Is(ctx.Err(), context.Canceled) { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} From 26534933f6eef57202b1971933a3adccaa28c310 Mon Sep 17 00:00:00 2001 From: toga4 <81744248+toga4@users.noreply.github.com> Date: Sun, 8 Jan 2023 15:12:25 +0900 Subject: [PATCH 3/3] Update README --- README.md | 48 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9ac5202..7e2b771 100644 --- a/README.md +++ b/README.md @@ -7,14 +7,14 @@ Cloud Spanner Change Streams Subscriber for Go ### Sypnosis -This library is an implementation to subscribe a change stream's records of Google Spanner in Go. +This library is an implementation to subscribe a change stream's records of Google Cloud Spanner in Go. It is heavily inspired by the SpannerIO connector of the [Apache Beam SDK](https://github.com/apache/beam) and is compatible with the PartitionMetadata data model. ### Motivation To read a change streams, Google Cloud offers [Dataflow connector](https://cloud.google.com/spanner/docs/change-streams/use-dataflow) as a scalable and reliable solution, but in some cases the abstraction and capabilities of Dataflow pipelines can be too much (or is simply too expensive). For more flexibility, use the change stream API directly, but it is a bit complex. -This library aims to make reading change streams more flexible and casual to use. +This library aims to make reading change streams more flexible and casual, while maintaining an easy transition to the use of Dataflow connectors as needed. 
## Example Usage @@ -74,3 +74,47 @@ func (l *Logger) Consume(change *spream.DataChangeRecord) error { return json.NewEncoder(l.out).Encode(change) } ``` + +## CLI + +Use the CLI as a tool for tracking change streams or as a more detailed implementation example. + +### Installation + +```console +$ go install github.com/toga4/spream/cmd/spream@latest +``` + +### Usage + +``` +Usage: spream [OPTIONS...] + +Options: + -d, --database (required) Database name of change stream with the form 'projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID'. + -s, --stream (required) Change stream name + -t, --metadata-table Table name for partition metadata (default: store partition metadata on memory, not Cloud Spanner) + --start Start timestamp with RFC3339 format (default: current timestamp) + --end End timestamp with RFC3339 format (default: indefinite) + --heartbeat-interval Heartbeat interval with time.Duration format (default: 10s) + --priority [high|medium|low] Request priority for Cloud Spanner (default: high) + --metadata-database Database name of partition metadata table (default: same as database option) + -h, --help Print this message +``` + +### Example + +``` +$ spream -d projects/my-project/instances/my-instance/databases/my-database -s SingerStream +Waiting changes... 
+{"commit_timestamp":"2023-01-08T05:47:57.998479Z","record_sequence":"00000000","server_transaction_id":"ODIzNDU0OTc2NzUxOTc0NTU1OQ==","is_last_record_in_transaction_in_partition":true,"table_name":"Singers","column_types":[{"name":"SingerId","type":{"code":"INT64"},"is_primary_key":true,"ordinal_position":1},{"name":"Name","type":{"code":"STRING"},"ordinal_position":2}],"mods":[{"keys":{"SingerId":"1"},"new_values":{"Name":"foo"}}],"mod_type":"INSERT","value_capture_type":"OLD_AND_NEW_VALUES","number_of_records_in_transaction":1,"number_of_partitions_in_transaction":1,"transaction_tag":"","is_system_transaction":false} +{"commit_timestamp":"2023-01-08T05:47:58.766575Z","record_sequence":"00000000","server_transaction_id":"MjQ3ODQzMDcxOTMwNjcyODg4Nw==","is_last_record_in_transaction_in_partition":true,"table_name":"Singers","column_types":[{"name":"SingerId","type":{"code":"INT64"},"is_primary_key":true,"ordinal_position":1},{"name":"Name","type":{"code":"STRING"},"ordinal_position":2}],"mods":[{"keys":{"SingerId":"1"},"new_values":{"Name":"bar"},"old_values":{"Name":"foo"}}],"mod_type":"UPDATE","value_capture_type":"OLD_AND_NEW_VALUES","number_of_records_in_transaction":1,"number_of_partitions_in_transaction":1,"transaction_tag":"","is_system_transaction":false} +{"commit_timestamp":"2023-01-08T05:47:59.117807Z","record_sequence":"00000000","server_transaction_id":"ODkwNDMzNDgxMDU2NzAwMDM2MA==","is_last_record_in_transaction_in_partition":true,"table_name":"Singers","column_types":[{"name":"SingerId","type":{"code":"INT64"},"is_primary_key":true,"ordinal_position":1},{"name":"Name","type":{"code":"STRING"},"ordinal_position":2}],"mods":[{"keys":{"SingerId":"1"},"old_values":{"Name":"bar"}}],"mod_type":"DELETE","value_capture_type":"OLD_AND_NEW_VALUES","number_of_records_in_transaction":1,"number_of_partitions_in_transaction":1,"transaction_tag":"","is_system_transaction":false} +``` + +## Credits + +Heavily inspired by below projects. 
+ +- The SpannerIO connector of the Apache Beam SDK. (https://github.com/apache/beam) +- spanner-change-streams-tail (https://github.com/cloudspannerecosystem/spanner-change-streams-tail)