From 821d49541eb63c50eb28e00dcf41c376aef08d3f Mon Sep 17 00:00:00 2001 From: James Pozdena Date: Sat, 25 May 2024 13:40:56 -0600 Subject: [PATCH] clean up --- Makefile | 10 +----- _dev/dev.pb.go | 4 +-- _examples/basic/jobtypes.pb.go | 23 ++++++------ _examples/basic/jobtypes.proto | 4 +-- _examples/basic/work.go | 64 ++++++++++++++++++++++++++++++---- fixtures/basic.pb.go | 4 +-- fixtures/nested.pb.go | 4 +-- wire/jobs.pb.go | 4 +-- 8 files changed, 81 insertions(+), 36 deletions(-) diff --git a/Makefile b/Makefile index 0ec24a0..3318de3 100644 --- a/Makefile +++ b/Makefile @@ -74,7 +74,7 @@ gen: ## Generate protobuf models go generate ./... .PHONY: gen_protos -gen_protos: gen_go_protos gen_python_protos ## Generate protobuf models +gen_protos: gen_go_protos ## Generate protobuf models .PHONY: gen_go_protos @@ -84,14 +84,6 @@ gen_go_protos: --go-grpc_out=. --go-grpc_opt=paths=source_relative \ ${PROTO_FILES} -gen_python_protos: - python -m grpc_tools.protoc -I wire --python_out=./libs/py/conveyor/wire --grpc_python_out=./libs/py/conveyor/wire wire/jobs.proto - protol \ - --create-package \ - --in-place \ - --python-out ./libs/py/conveyor/wire \ - protoc --proto-path=./wire wire/jobs.proto - ## Test .PHONY: test test: ## Run tests diff --git a/_dev/dev.pb.go b/_dev/dev.pb.go index 5f32042..e9839f9 100644 --- a/_dev/dev.pb.go +++ b/_dev/dev.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.3 +// protoc-gen-go v1.34.1 +// protoc v5.26.1 // source: _dev/dev.proto package main diff --git a/_examples/basic/jobtypes.pb.go b/_examples/basic/jobtypes.pb.go index 176c3cf..f2ff0f0 100644 --- a/_examples/basic/jobtypes.pb.go +++ b/_examples/basic/jobtypes.pb.go @@ -1,10 +1,10 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.3 +// protoc-gen-go v1.34.1 +// protoc v5.26.1 // source: _examples/basic/jobtypes.proto -package basic +package main import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" @@ -80,13 +80,14 @@ var File___examples_basic_jobtypes_proto protoreflect.FileDescriptor var file___examples_basic_jobtypes_proto_rawDesc = []byte{ 0x0a, 0x1e, 0x5f, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x2f, 0x62, 0x61, 0x73, 0x69, 0x63, 0x2f, 0x6a, 0x6f, 0x62, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x12, 0x05, 0x62, 0x61, 0x73, 0x69, 0x63, 0x22, 0x34, 0x0a, 0x08, 0x42, 0x61, 0x73, 0x69, 0x63, - 0x4a, 0x6f, 0x62, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x42, 0x29, 0x5a, - 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6a, 0x70, 0x6f, 0x7a, - 0x2f, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x79, 0x6f, 0x72, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, - 0x65, 0x73, 0x2f, 0x62, 0x61, 0x73, 0x69, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x04, 0x6d, 0x61, 0x69, 0x6e, 0x22, 0x34, 0x0a, 0x08, 0x42, 0x61, 0x73, 0x69, 0x63, 0x4a, + 0x6f, 0x62, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x42, 0x2f, 0x5a, 0x2d, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6a, 0x70, 0x6f, 0x7a, 0x2f, + 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x79, 0x6f, 0x72, 0x2f, 0x5f, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, + 0x65, 0x73, 0x2f, 0x62, 0x61, 0x73, 0x69, 0x63, 0x2f, 0x6d, 0x61, 0x69, 0x6e, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -103,7 +104,7 @@ func file___examples_basic_jobtypes_proto_rawDescGZIP() []byte { var file___examples_basic_jobtypes_proto_msgTypes = make([]protoimpl.MessageInfo, 1) var file___examples_basic_jobtypes_proto_goTypes = []interface{}{ - (*BasicJob)(nil), // 0: basic.BasicJob + (*BasicJob)(nil), // 0: main.BasicJob } var file___examples_basic_jobtypes_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type diff --git a/_examples/basic/jobtypes.proto b/_examples/basic/jobtypes.proto index 95bbe5e..6d5c07a 100644 --- a/_examples/basic/jobtypes.proto +++ b/_examples/basic/jobtypes.proto @@ -1,7 +1,7 @@ syntax = "proto3"; -package basic; +package main; -option go_package = "github.com/jpoz/conveyor/examples/basic"; +option go_package = "github.com/jpoz/conveyor/_examples/basic/main"; message BasicJob { string name = 1; diff --git a/_examples/basic/work.go b/_examples/basic/work.go index 4e73b3c..363f807 100644 --- a/_examples/basic/work.go +++ b/_examples/basic/work.go @@ -1,16 +1,68 @@ -package basic +package main import ( "context" + "fmt" "log/slog" + "os" + sync "sync" + + "github.com/jpoz/conveyor" + "github.com/jpoz/conveyor/config" ) -func RunBasicJob(ctx context.Context, arg *BasicJob) error { - slog.Info("starting job", slog.String("name", arg.Name), slog.Int("count", int(arg.Count))) +func main() { + cfg := &config.Project{ + RedisURL: os.Getenv("REDIS_URL"), + Namespace: "TestBasic", + Logger: slog.Default(), + } + + ctx := context.Background() + + worker, err := conveyor.NewWorker(cfg) + if err != nil { + panic(err) + } + + var wg sync.WaitGroup + RunBasicJob := func(ctx context.Context, arg *BasicJob) error { + slog.Info("starting job", slog.String("name", arg.Name), slog.Int("count", int(arg.Count))) + + for i := int32(0); i < arg.Count; i++ { + slog.Info("running job", slog.String("name", arg.Name), slog.Int("count", int(i+1))) + } - for i := int32(0); i < arg.Count; i++ { - slog.Info("running job", slog.String("name", arg.Name), slog.Int("count", int(i))) + wg.Done() + return nil } - return nil + worker.RegisterJobs(RunBasicJob) + + go func() { + fmt.Println("running worker") + err = worker.Run(ctx) + if err != nil { + panic(err) + } + }() + + client, err := conveyor.NewClient(cfg) + if err != nil { + return + } + + wg.Add(1) + result, err := client.Enqueue(ctx, &BasicJob{ + Name: "test", + Count: 5, + }) + if err != nil { + panic(err) + } + + fmt.Println("wait") + wg.Wait() + + fmt.Printf("result id: %v\n", result.Uuid) } diff --git a/fixtures/basic.pb.go b/fixtures/basic.pb.go index 786b95b..185bac7 100644 --- a/fixtures/basic.pb.go +++ b/fixtures/basic.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.3 +// protoc-gen-go v1.34.1 +// protoc v5.26.1 // source: fixtures/basic.proto package fixtures diff --git a/fixtures/nested.pb.go b/fixtures/nested.pb.go index 4d9c101..c8c812a 100644 --- a/fixtures/nested.pb.go +++ b/fixtures/nested.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.3 +// protoc-gen-go v1.34.1 +// protoc v5.26.1 // source: fixtures/nested.proto package fixtures diff --git a/wire/jobs.pb.go b/wire/jobs.pb.go index ffc7054..a5baa60 100644 --- a/wire/jobs.pb.go +++ b/wire/jobs.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.3 +// protoc-gen-go v1.34.1 +// protoc v5.26.1 // source: wire/jobs.proto package wire