From c73cd173e144d0423466b4a0e4f628792ab588f0 Mon Sep 17 00:00:00 2001 From: "pengfei.ji" Date: Mon, 21 Nov 2022 19:07:48 +0800 Subject: [PATCH] feat: add tx support and rename package to mou --- mongu/tx.go | 53 ------------------- {mongu => mou}/aggregate.go | 2 +- {mongu => mou}/aggregate_pipelines_builder.go | 2 +- {mongu => mou}/count.go | 2 +- {mongu => mou}/delete.go | 2 +- {mongu => mou}/distinct.go | 2 +- {mongu => mou}/find.go | 2 +- {mongu => mou}/index.go | 2 +- {mongu => mou}/insert.go | 4 +- mongu/mongu.go => mou/mou.go | 9 +++- {mongu => mou}/replace.go | 2 +- mou/tx.go | 44 +++++++++++++++ {mongu => mou}/update.go | 2 +- {mongu => mou}/utils.go | 4 +- {mongu => mou}/watch.go | 2 +- 15 files changed, 66 insertions(+), 68 deletions(-) delete mode 100644 mongu/tx.go rename {mongu => mou}/aggregate.go (99%) rename {mongu => mou}/aggregate_pipelines_builder.go (99%) rename {mongu => mou}/count.go (98%) rename {mongu => mou}/delete.go (98%) rename {mongu => mou}/distinct.go (98%) rename {mongu => mou}/find.go (99%) rename {mongu => mou}/index.go (98%) rename {mongu => mou}/insert.go (90%) rename mongu/mongu.go => mou/mou.go (96%) rename {mongu => mou}/replace.go (98%) create mode 100644 mou/tx.go rename {mongu => mou}/update.go (99%) rename {mongu => mou}/utils.go (82%) rename {mongu => mou}/watch.go (99%) diff --git a/mongu/tx.go b/mongu/tx.go deleted file mode 100644 index 299c914..0000000 --- a/mongu/tx.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - * @Descripttion: - * @version: - * @Author: pengfei - * @Date: 2021-07-01 08:31:06 - */ -package mongu - -import ( - "log" - - "go.mongodb.org/mongo-driver/mongo" -) - -func CommitWithRetry(sctx mongo.SessionContext) error { - for { - err := sctx.CommitTransaction(sctx) - switch e := err.(type) { - case nil: - log.Println("Transaction committed.") - return nil - case mongo.CommandError: - // Can retry commit - if e.HasErrorLabel("UnknownTransactionCommitResult") { - log.Println("UnknownTransactionCommitResult, retrying commit operation...") - continue - } - log.Println("Error during commit...") - return e - default: - log.Println("Error during commit...") - return e - } - } -} - -func RunTransactionWithRetry(sctx mongo.SessionContext, txnFn func(mongo.SessionContext) error) error { - for { - err := txnFn(sctx) // Performs transaction. - if err == nil { - return nil - } - - log.Println("Transaction aborted. Caught exception during transaction.") - - // If transient error, retry the whole transaction - if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") { - log.Println("TransientTransactionError, retrying transaction...") - continue - } - return err - } -} diff --git a/mongu/aggregate.go b/mou/aggregate.go similarity index 99% rename from mongu/aggregate.go rename to mou/aggregate.go index 0dd80a7..caece69 100644 --- a/mongu/aggregate.go +++ b/mou/aggregate.go @@ -5,7 +5,7 @@ * @Date: 2021-07-01 08:31:06 */ -package mongu +package mou import ( "context" diff --git a/mongu/aggregate_pipelines_builder.go b/mou/aggregate_pipelines_builder.go similarity index 99% rename from mongu/aggregate_pipelines_builder.go rename to mou/aggregate_pipelines_builder.go index 8687732..c174983 100644 --- a/mongu/aggregate_pipelines_builder.go +++ b/mou/aggregate_pipelines_builder.go @@ -5,7 +5,7 @@ * @Date: 2021-10-13 08:31:06 */ -package mongu +package mou import ( "fmt" diff --git a/mongu/count.go b/mou/count.go similarity index 98% rename from mongu/count.go rename to mou/count.go index 5f39e47..509b3e4 100644 --- a/mongu/count.go +++ b/mou/count.go @@ -5,7 +5,7 @@ * @Date: 2021-07-01 08:31:06 */ -package mongu +package mou import ( "context" diff --git a/mongu/delete.go b/mou/delete.go similarity index 98% rename from mongu/delete.go rename to mou/delete.go index 9523249..4bbc2de 100644 --- a/mongu/delete.go +++ b/mou/delete.go @@ -4,7 +4,7 @@ * @Author: pengfei * @Date: 2021-07-01 08:31:06 */ -package mongu +package mou import ( "context" diff --git a/mongu/distinct.go b/mou/distinct.go similarity index 98% rename from mongu/distinct.go rename to mou/distinct.go index b68f638..f878953 100644 --- a/mongu/distinct.go +++ b/mou/distinct.go @@ -5,7 +5,7 @@ * @Date: 2021-07-01 08:31:06 */ -package mongu +package mou import ( "context" diff --git a/mongu/find.go b/mou/find.go similarity index 99% rename from mongu/find.go rename to mou/find.go index 420d692..06dad1a 100644 --- a/mongu/find.go +++ b/mou/find.go @@ -5,7 +5,7 @@ * @Date: 2021-07-01 08:31:06 */ -package mongu +package mou import ( "context" diff --git a/mongu/index.go b/mou/index.go similarity index 98% rename from mongu/index.go rename to mou/index.go index 1cf3765..dc33b7e 100644 --- a/mongu/index.go +++ b/mou/index.go @@ -5,7 +5,7 @@ * @Date: 2021-07-01 08:31:06 */ -package mongu +package mou import ( "context" diff --git a/mongu/insert.go b/mou/insert.go similarity index 90% rename from mongu/insert.go rename to mou/insert.go index 7540793..6c4fc61 100644 --- a/mongu/insert.go +++ b/mou/insert.go @@ -4,7 +4,7 @@ * @Author: pengfei * @Date: 2021-07-01 08:31:06 */ -package mongu +package mou import ( "context" @@ -36,7 +36,7 @@ type InsertManyObject struct { opts *options.InsertManyOptions } -//If true, no writes will be executed after one fails. The default value is true. +// If true, no writes will be executed after one fails. The default value is true. func (o *InsertManyObject) Ordered(ordered bool) *InsertManyObject { o.opts.SetOrdered(!ordered) return o diff --git a/mongu/mongu.go b/mou/mou.go similarity index 96% rename from mongu/mongu.go rename to mou/mou.go index 32f8e21..dbe77a5 100644 --- a/mongu/mongu.go +++ b/mou/mou.go @@ -41,7 +41,7 @@ err := coll.FindOneByProjectID(ctx).Where(bson.M{"_id": ID}).Do(&wfInfo) */ -package mongu +package mou import ( "context" @@ -203,3 +203,10 @@ func (o Coll) WatchChangeStream(ctx context.Context) *ChangeStreamWatchObject { opts: options.ChangeStream(), } } + +func (o Coll) Tx(ctx context.Context) *TxObject { + return &TxObject{ + ctx: ctx, + client: o.coll.Database().Client(), + } +} diff --git a/mongu/replace.go b/mou/replace.go similarity index 98% rename from mongu/replace.go rename to mou/replace.go index 872228f..9d69fc8 100644 --- a/mongu/replace.go +++ b/mou/replace.go @@ -5,7 +5,7 @@ * @Date: 2021-07-01 08:31:06 */ -package mongu +package mou import ( "context" diff --git a/mou/tx.go b/mou/tx.go new file mode 100644 index 0000000..3d526ff --- /dev/null +++ b/mou/tx.go @@ -0,0 +1,44 @@ +/* + * @Descripttion: + * @version: + * @Author: pengfei + * @Date: 2021-07-01 08:31:06 + */ +package mou + +import ( + "context" + + "go.mongodb.org/mongo-driver/mongo" +) + +type TxFn = func(sessCtx mongo.SessionContext) (interface{}, error) +type TxObject struct { + ctx context.Context + client *mongo.Client +} + +func (o *TxObject) Run(fn TxFn) error { + return RunInCtxTransaction(o.ctx, o.client, fn) +} + +func RunInCtxTransaction(ctx context.Context, client *mongo.Client, callback TxFn) error { + session, err := client.StartSession() + if err != nil { + return err + } + defer session.EndSession(ctx) + _, err = session.WithTransaction(ctx, callback) + return err +} + +func RunInTransaction(client *mongo.Client, callback TxFn) error { + session, err := client.StartSession() + if err != nil { + return err + } + ctx := context.Background() + defer session.EndSession(ctx) + _, err = session.WithTransaction(ctx, callback) + return err +} diff --git a/mongu/update.go b/mou/update.go similarity index 99% rename from mongu/update.go rename to mou/update.go index 506d710..f5db512 100644 --- a/mongu/update.go +++ b/mou/update.go @@ -5,7 +5,7 @@ * @Date: 2021-07-01 08:31:06 */ -package mongu +package mou import ( "context" diff --git a/mongu/utils.go b/mou/utils.go similarity index 82% rename from mongu/utils.go rename to mou/utils.go index 2715638..960b7e1 100644 --- a/mongu/utils.go +++ b/mou/utils.go @@ -1,8 +1,8 @@ -package mongu +package mou import "go.mongodb.org/mongo-driver/bson" -//MergeBsonM ignores non bson.M type +// MergeBsonM ignores non bson.M type func MergeBsonM(ms ...interface{}) bson.M { var result = bson.M{} for _, one := range ms { diff --git a/mongu/watch.go b/mou/watch.go similarity index 99% rename from mongu/watch.go rename to mou/watch.go index a47be9d..663324c 100644 --- a/mongu/watch.go +++ b/mou/watch.go @@ -1,4 +1,4 @@ -package mongu +package mou import ( "context"