Skip to content

Commit

Permalink
Updating driver
Browse files Browse the repository at this point in the history
  • Loading branch information
jazibjohar committed May 5, 2023
1 parent 81665ad commit 0c59ddb
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions redis.v9/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ func (d *drv) ListPush(ctx context.Context, queue string, jobJSON string) (int64
return d.client.RPush(ctx, d.nameSpace+"queue:"+queue, []byte(jobJSON)).Result()
}

func getSortedSetKey(namespace, queue string) string {
return namespace + "queue:sorted:" + queue
}

func (d *drv) ListPushDelay(ctx context.Context, t time.Time, queue string, jobJSON string) (bool, error) {
_, err := d.client.ZAdd(ctx, d.nameSpace+"queue:"+queue, redis.Z{
_, err := d.client.ZAdd(ctx, getSortedSetKey(d.nameSpace, queue), redis.Z{
Score: timeToSecondsWithNanoPrecision(t),
Member: []byte(jobJSON),
}).Result()
Expand All @@ -53,7 +57,7 @@ func (d *drv) Poll(ctx context.Context) {
now := timeToSecondsWithNanoPrecision(time.Now())
for key := range d.schedule {
jobs, _ := d.client.ZRangeArgs(ctx, redis.ZRangeArgs{
Key: key,
Key: getSortedSetKey(d.nameSpace, key),
ByScore: true,
Start: "-inf",
Stop: now,
Expand All @@ -62,8 +66,8 @@ func (d *drv) Poll(ctx context.Context) {
if len(jobs) == 0 {
continue
}
if removed, _ := d.client.ZRem(ctx, key, []byte(jobs[0])).Result(); removed > 0 {
d.client.LPush(ctx, key, []byte(jobs[0]))
if removed, _ := d.client.ZRem(ctx, getSortedSetKey(d.nameSpace, key), []byte(jobs[0])).Result(); removed > 0 {
d.client.LPush(ctx, d.nameSpace+"queue:"+key, []byte(jobs[0]))
}
}
time.Sleep(100 * time.Millisecond)
Expand Down

0 comments on commit 0c59ddb

Please sign in to comment.