-
Notifications
You must be signed in to change notification settings - Fork 44
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
359 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
build | ||
node_modules | ||
.wrangler |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
.PHONY: dev | ||
dev: | ||
npx wrangler dev --port 8787 | ||
|
||
.PHONY: build | ||
build: | ||
go run ../../cmd/workers-assets-gen | ||
tinygo build -o ./build/app.wasm -target wasm -no-debug ./... | ||
|
||
.PHONY: deploy | ||
deploy: | ||
npx wrangler deploy |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
module github.com/syumai/workers/_examples/queues | ||
|
||
go 1.22.8 | ||
|
||
require github.com/syumai/workers v0.0.0 | ||
|
||
replace github.com/syumai/workers => ../../ |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
package main | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"log" | ||
"net/http" | ||
|
||
"github.com/syumai/workers" | ||
"github.com/syumai/workers/cloudflare/queues" | ||
) | ||
|
||
const queueName = "QUEUE" | ||
|
||
func handleErr(w http.ResponseWriter, msg string, err error) { | ||
log.Println(err) | ||
w.WriteHeader(http.StatusInternalServerError) | ||
w.Write([]byte(msg)) | ||
} | ||
|
||
func main() { | ||
http.HandleFunc("/", handleProduce) | ||
workers.Serve(nil) | ||
} | ||
func handleProduce(w http.ResponseWriter, req *http.Request) { | ||
if req.URL.Path != "/" { | ||
w.WriteHeader(http.StatusNotFound) | ||
return | ||
} | ||
|
||
if req.Method != http.MethodPost { | ||
w.WriteHeader(http.StatusMethodNotAllowed) | ||
return | ||
} | ||
|
||
defer req.Body.Close() | ||
|
||
q, err := queues.NewProducer(queueName) | ||
if err != nil { | ||
handleErr(w, "failed to init queue", err) | ||
} | ||
|
||
contentType := req.Header.Get("Content-Type") | ||
switch contentType { | ||
case "text/plain": | ||
log.Println("Handling text content type") | ||
err = produceText(q, req) | ||
case "application/json": | ||
log.Println("Handling json content type") | ||
err = produceJson(q, req) | ||
default: | ||
log.Println("Handling bytes content type") | ||
err = produceBytes(q, req) | ||
} | ||
|
||
if err != nil { | ||
handleErr(w, "failed to handle request", err) | ||
return | ||
} | ||
|
||
w.WriteHeader(http.StatusOK) | ||
w.Write([]byte("message sent\n")) | ||
} | ||
|
||
func produceText(q *queues.Producer, req *http.Request) error { | ||
content, err := io.ReadAll(req.Body) | ||
if err != nil { | ||
return fmt.Errorf("failed to read request body: %w", err) | ||
} | ||
if len(content) == 0 { | ||
return fmt.Errorf("empty request body") | ||
} | ||
|
||
// text content type supports string and []byte messages | ||
if err := q.Send(content, queues.WithContentType(queues.QueueContentTypeText)); err != nil { | ||
return fmt.Errorf("failed to send message: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func produceJson(q *queues.Producer, req *http.Request) error { | ||
var data any | ||
if err := json.NewDecoder(req.Body).Decode(&data); err != nil { | ||
return fmt.Errorf("failed to read request body: %w", err) | ||
} | ||
|
||
// json content type is default and therefore can be omitted | ||
// json content type supports messages of types that can be serialized to json | ||
if err := q.Send(data); err != nil { | ||
return fmt.Errorf("failed to send message: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func produceBytes(q *queues.Producer, req *http.Request) error { | ||
// bytes content type support messages of type []byte, string, and io.Reader | ||
if err := q.Send(req.Body, queues.WithContentType(queues.QueueContentTypeBytes)); err != nil { | ||
return fmt.Errorf("failed to send message: %w", err) | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
name = "queues-producer" | ||
main = "./build/worker.mjs" | ||
compatibility_date = "2022-05-13" | ||
compatibility_flags = [ | ||
"streams_enable_constructors" | ||
] | ||
|
||
[[queues.producers]] | ||
queue = "my-queue" | ||
binding = "QUEUE" | ||
|
||
[build] | ||
command = "make build" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package queues | ||
|
||
import ( | ||
"fmt" | ||
"io" | ||
"syscall/js" | ||
|
||
"github.com/syumai/workers/internal/jsutil" | ||
) | ||
|
||
type QueueContentType string | ||
|
||
const ( | ||
QueueContentTypeJSON QueueContentType = "json" | ||
QueueContentTypeText QueueContentType = "text" | ||
QueueContentTypeBytes QueueContentType = "bytes" | ||
QueueContentTypeV8 QueueContentType = "v8" | ||
) | ||
|
||
func (o QueueContentType) mapValue(val any) (js.Value, error) { | ||
switch o { | ||
case QueueContentTypeText: | ||
switch v := val.(type) { | ||
case string: | ||
return js.ValueOf(v), nil | ||
case []byte: | ||
return js.ValueOf(string(v)), nil | ||
default: | ||
return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val) | ||
} | ||
|
||
case QueueContentTypeBytes: | ||
var b []byte | ||
switch v := val.(type) { | ||
case string: | ||
b = []byte(v) | ||
case []byte: | ||
b = v | ||
case io.Reader: | ||
var err error | ||
b, err = io.ReadAll(v) | ||
if err != nil { | ||
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err) | ||
} | ||
default: | ||
return js.Undefined(), fmt.Errorf("invalid value type for bytes content type: %T", val) | ||
} | ||
|
||
ua := jsutil.NewUint8Array(len(b)) | ||
js.CopyBytesToJS(ua, b) | ||
return ua.Get("buffer"), nil | ||
|
||
case QueueContentTypeJSON, QueueContentTypeV8: | ||
return js.ValueOf(val), nil | ||
} | ||
|
||
return js.Undefined(), fmt.Errorf("unknown content type: %s", o) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
package queues | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"syscall/js" | ||
|
||
"github.com/syumai/workers/cloudflare/internal/cfruntimecontext" | ||
"github.com/syumai/workers/internal/jsutil" | ||
) | ||
|
||
type BatchMessage struct { | ||
body any | ||
options *sendOptions | ||
} | ||
|
||
func NewBatchMessage(body any, opts ...SendOption) *BatchMessage { | ||
options := defaultSendOptions() | ||
for _, opt := range opts { | ||
opt(options) | ||
} | ||
return &BatchMessage{body: body, options: options} | ||
} | ||
|
||
func (m *BatchMessage) toJS() (js.Value, error) { | ||
if m == nil { | ||
return js.Undefined(), errors.New("message is nil") | ||
} | ||
|
||
jsValue, err := m.options.ContentType.mapValue(m.body) | ||
if err != nil { | ||
return js.Undefined(), err | ||
} | ||
|
||
obj := jsutil.NewObject() | ||
obj.Set("body", jsValue) | ||
obj.Set("options", m.options.toJS()) | ||
|
||
return obj, nil | ||
} | ||
|
||
type Producer struct { | ||
// queue - Objects that Queue API belongs to. Default is Global | ||
queue js.Value | ||
} | ||
|
||
func NewProducer(queueName string) (*Producer, error) { | ||
inst := cfruntimecontext.MustGetRuntimeContextEnv().Get(queueName) | ||
if inst.IsUndefined() { | ||
return nil, fmt.Errorf("%s is undefined", queueName) | ||
} | ||
return &Producer{queue: inst}, nil | ||
} | ||
|
||
func (p *Producer) Send(content any, opts ...SendOption) error { | ||
if p.queue.IsUndefined() { | ||
return errors.New("queue object not found") | ||
} | ||
|
||
options := defaultSendOptions() | ||
for _, opt := range opts { | ||
opt(options) | ||
} | ||
|
||
jsValue, err := options.ContentType.mapValue(content) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
prom := p.queue.Call("send", jsValue, options.toJS()) | ||
_, err = jsutil.AwaitPromise(prom) | ||
return err | ||
} | ||
|
||
func (p *Producer) SendBatch(messages []*BatchMessage) error { | ||
if p.queue.IsUndefined() { | ||
return errors.New("queue object not found") | ||
} | ||
|
||
if len(messages) == 0 { | ||
return nil | ||
} | ||
|
||
jsArray := jsutil.NewArray(len(messages)) | ||
for i, message := range messages { | ||
jsValue, err := message.toJS() | ||
if err != nil { | ||
return fmt.Errorf("failed to convert message %d to JS: %w", i, err) | ||
} | ||
jsArray.SetIndex(i, jsValue) | ||
} | ||
|
||
prom := p.queue.Call("sendBatch", jsArray) | ||
_, err := jsutil.AwaitPromise(prom) | ||
return err | ||
} | ||
|
||
func (p *Producer) SendJsonBatch(messages ...any) error { | ||
batch := make([]*BatchMessage, len(messages)) | ||
for i, message := range messages { | ||
batch[i] = NewBatchMessage(message) | ||
} | ||
|
||
return p.SendBatch(batch) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package queues | ||
|
||
import ( | ||
"syscall/js" | ||
"time" | ||
|
||
"github.com/syumai/workers/internal/jsutil" | ||
) | ||
|
||
type sendOptions struct { | ||
// ContentType - Content type of the message | ||
// Default is "json" | ||
ContentType QueueContentType | ||
|
||
// DelaySeconds - The number of seconds to delay the message. | ||
// Default is 0 | ||
DelaySeconds int | ||
} | ||
|
||
func defaultSendOptions() *sendOptions { | ||
return &sendOptions{ | ||
ContentType: QueueContentTypeJSON, | ||
} | ||
} | ||
|
||
func (o *sendOptions) toJS() js.Value { | ||
obj := jsutil.NewObject() | ||
obj.Set("contentType", string(o.ContentType)) | ||
|
||
if o.DelaySeconds != 0 { | ||
obj.Set("delaySeconds", o.DelaySeconds) | ||
} | ||
|
||
return obj | ||
} | ||
|
||
type SendOption func(*sendOptions) | ||
|
||
// WithContentType changes the content type of the message. | ||
func WithContentType(contentType QueueContentType) SendOption { | ||
return func(o *sendOptions) { | ||
o.ContentType = contentType | ||
} | ||
} | ||
|
||
// WithDelay changes the number of seconds to delay the message. | ||
func (q *Producer) WithDelay(d time.Duration) SendOption { | ||
return func(o *sendOptions) { | ||
o.DelaySeconds = int(d.Seconds()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters