Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/uplion/WokerNode
Browse files Browse the repository at this point in the history
  • Loading branch information
DrinkLessMilkTea committed Jul 18, 2024
2 parents 160b27a + 7452c76 commit 6b2d891
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions WorkerNodeGo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
)

const MODEL_NAME = "static"
var MODEL_NAME = "static"

var PULSAR_URL string
var MAX_PROCESS_NUM int
Expand All @@ -24,6 +24,10 @@ var LOCK sync.Mutex
var LOCK_CHAN = make(chan int, 1)

func init() {
if v, ok := os.LookupEnv("MODEL_NAME"); ok {
MODEL_NAME = v
}

if v, ok := os.LookupEnv("PULSAR_URL"); ok {
PULSAR_URL = v
} else {
Expand Down Expand Up @@ -80,22 +84,27 @@ type Response struct {
}

func handle(msg pulsar.Message, consumer pulsar.Consumer) {
defer consumer.Ack(msg)

defer func() {
if LIMIT {
LOCK.Lock()
PROCESSING--
LOCK.Unlock()
log.Printf("Processing slot freed: %d/%d", PROCESSING, MAX_PROCESS_NUM)
select {
case LOCK_CHAN <- 1:
default:
}
}
}()
task := Task{}
if LIMIT {
log.Printf("Recv message: %s", string(msg.Payload()))
}
err := json.Unmarshal(msg.Payload(), &task)
if err != nil {
log.Fatalf("Could not unmarshal message: %v", err)
consumer.Ack(msg)
return
}

Expand All @@ -108,7 +117,6 @@ func handle(msg pulsar.Message, consumer pulsar.Consumer) {

if err != nil {
log.Printf("Could not marshal response: %v", err)
consumer.Ack(msg)
return
}

Expand All @@ -120,19 +128,15 @@ func handle(msg pulsar.Message, consumer pulsar.Consumer) {

if err != nil {
log.Printf("Could not send response: %v", err)
consumer.Ack(msg)
return
}

defer res.Body.Close()

if res.StatusCode != http.StatusOK {
log.Printf("Could not send response: %v, %s", res.Status, endpoint)
consumer.Ack(msg)
return
}

consumer.Ack(msg)
}

func main() {
Expand Down Expand Up @@ -164,8 +168,11 @@ func main() {
break
}
LOCK.Unlock()
log.Printf("Processing limit reached, waiting for free slot: %d/%d", PROCESSING, MAX_PROCESS_NUM)
<-LOCK_CHAN
log.Printf("Free slot may be available: %d/%d", PROCESSING, MAX_PROCESS_NUM)
}
log.Printf("Processing slot acquired: %d/%d, now fetching message", PROCESSING, MAX_PROCESS_NUM)
}
msg, err := consumer.Receive(context.Background())
if err != nil {
Expand Down

0 comments on commit 6b2d891

Please sign in to comment.