Skip to content

Commit

Permalink
Merge pull request #12 from breuHQ:dispatch
Browse files Browse the repository at this point in the history
chore(): updated structures and supporting structures
  • Loading branch information
debuggerpk authored Oct 4, 2024
2 parents 1dfa5d9 + e8c892c commit a5d4ed5
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 88 deletions.
68 changes: 68 additions & 0 deletions dispatch/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Package dispatch provides helper functions for configuring workflow.Context objects with default activity options.
package dispatch

import (
"time"

"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"

"go.breu.io/durex/queues"
)

// WithDefaultActivityContext returns a workflow.Context with the default activity options applied.
// The default options include a StartToCloseTimeout of 60 seconds.
//
// Example:
//
// ctx = shared.WithDefaultActivityContext(ctx)
func WithDefaultActivityContext(ctx workflow.Context) workflow.Context {
return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 60 * time.Second,
})
}

// WithIgnoredErrorsContext returns a workflow.Context with activity options configured with a
// StartToCloseTimeout of 60 seconds and a RetryPolicy that allows a single attempt and ignores
// specified error types.
//
// Example:
//
// ignored := []string{"CustomErrorType"}
// ctx = shared.WithIgnoredErrorsContext(ctx, ignored...)
func WithIgnoredErrorsContext(ctx workflow.Context, args ...string) workflow.Context {
return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 60 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 1,
NonRetryableErrorTypes: args,
},
})
}

// WithMarathonContext returns a workflow.Context with activity options configured for long-running activities.
// It sets the StartToCloseTimeout to 60 minutes and the HeartbeatTimeout to 30 seconds.
//
// Example:
//
// ctx = shared.WithMarathonContext(ctx)
func WithMarathonContext(ctx workflow.Context) workflow.Context {
return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 60 * time.Minute,
HeartbeatTimeout: 30 * time.Second,
})
}

// WithCustomQueueContext returns a workflow.Context with activity options configured with a
// StartToCloseTimeout of 60 seconds and a dedicated task queue. This allows scheduling activities
// on a different queue than the one the workflow is running on.
//
// Example:
//
// ctx = shared.WithCustomQueueContext(ctx, queues.MyTaskQueue)
func WithCustomQueueContext(ctx workflow.Context, q queues.Queue) workflow.Context {
return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 60 * time.Second,
TaskQueue: q.String(),
})
}
2 changes: 2 additions & 0 deletions queues/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ var (
// ErrClientNil is returned when the temporal client is nil.
ErrClientNil = errors.New("client is nil")

ErrWorkerNil = errors.New("worker is nil")

// ErrChildWorkflowExecutionAttempt is returned when attempting to execute a child workflow without the parent.
ErrChildWorkflowExecutionAttempt = errors.New("attempting to execute child workflow directly. use ExecuteWorkflow instead")

Expand Down
46 changes: 38 additions & 8 deletions queues/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package queues
import (
"context"
"fmt"
"sync"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
Expand All @@ -40,6 +41,8 @@ type (
// Name gets the name of the queue as string.
Name() Name

String() string

// Prefix gets the prefix of the queue as string.
Prefix() string

Expand Down Expand Up @@ -125,7 +128,7 @@ type (
SignalExternalWorkflow(ctx workflow.Context, options workflows.Options, signal WorkflowSignal, args any) (WorkflowFuture, error)

// CreateWorker creates a worker against the queue.
CreateWorker(opts ...WorkerOption) worker.Worker
CreateWorker(opts ...WorkerOption)
}

// QueueOption is the option for a queue.
Expand All @@ -141,13 +144,20 @@ type (
workflowMaxAttempts int32 // The maximum number of attempts for a workflow.

client client.Client // The temporal client.

once sync.Once // The sync.Once for the queue.
worker worker.Worker // The temporal worker.
}
)

func (q Name) String() string {
return string(q)
}

func (q *queue) String() string {
return q.name.String()
}

func (q *queue) Name() Name {
return q.name
}
Expand All @@ -157,14 +167,14 @@ func (q *queue) Prefix() string {
}

func (q *queue) WorkflowID(options workflows.Options) string {
pfix := ""
prefix := ""
if options.IsChild() {
pfix, _ = options.ParentWorkflowID()
prefix, _ = options.ParentWorkflowID()
} else {
pfix = q.Prefix()
prefix = q.Prefix()
}

return fmt.Sprintf("%s.%s", pfix, options.IDSuffix())
return fmt.Sprintf("%s.%s", prefix, options.IDSuffix())
}

func (q *queue) ExecuteWorkflow(ctx context.Context, opts workflows.Options, fn any, payload ...any) (WorkflowRun, error) {
Expand Down Expand Up @@ -263,10 +273,30 @@ func (q *queue) RetryPolicy(opts workflows.Options) *temporal.RetryPolicy {
return &temporal.RetryPolicy{MaximumAttempts: attempts, NonRetryableErrorTypes: opts.IgnoredErrors()}
}

func (q *queue) CreateWorker(opts ...WorkerOption) worker.Worker {
options := NewWorkerOptions(opts...)
func (q *queue) CreateWorker(opts ...WorkerOption) {
q.once.Do(func() {
options := NewWorkerOptions(opts...)

q.worker = worker.New(q.client, q.Name().String(), options)
})
}

func (q *queue) Start(ctx context.Context) error {
if q.worker == nil {
return ErrWorkerNil
}

return q.worker.Start()
}

func (q *queue) Stop(ctx context.Context) error {
if q.worker == nil {
return ErrWorkerNil
}

q.worker.Stop()

return worker.New(q.client, q.Name().String(), options)
return nil
}

// WithName sets the queue name and the prefix for the workflow ID.
Expand Down
Loading

0 comments on commit a5d4ed5

Please sign in to comment.