-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
dae93e6
commit 60ab26b
Showing
4 changed files
with
417 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
import { endpoint } from "@restatedev/restate-sdk"; | ||
|
||
import { limiter } from "./limiter"; | ||
import { myService } from "./service"; | ||
|
||
endpoint().bind(limiter).bind(myService).listen(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,238 @@ | ||
import { object, ObjectContext } from "@restatedev/restate-sdk"; | ||
|
||
type LimiterState = { | ||
limit: number; | ||
burst: number; | ||
tokens: number; | ||
// last is the last time the limiter's tokens field was updated, in unix millis | ||
last: number; | ||
// lastEvent is the latest time of a rate-limited event (past or future), in unix millis | ||
lastEvent: number; | ||
}; | ||
|
||
export interface Reservation { | ||
ok: boolean; | ||
tokens: number; | ||
creationDate: number; | ||
dateToAct: number; | ||
// This is the Limit at reservation time, it can change later. | ||
limit: number; | ||
} | ||
|
||
export const limiter = object({ | ||
name: "limiter", | ||
handlers: { | ||
state: async (ctx: ObjectContext<LimiterState>): Promise<LimiterState> => { | ||
return getState(ctx); | ||
}, | ||
limit: async (ctx: ObjectContext<LimiterState>): Promise<number> => { | ||
return (await ctx.get("limit")) ?? 0; | ||
}, | ||
burst: async (ctx: ObjectContext<LimiterState>): Promise<number> => { | ||
return (await ctx.get("burst")) ?? 0; | ||
}, | ||
tokens: async ( | ||
ctx: ObjectContext<LimiterState>, | ||
date?: number, | ||
): Promise<number> => { | ||
const { tokens } = advance( | ||
await getState(ctx), | ||
date ?? (await ctx.date.now()), | ||
); | ||
return tokens; | ||
}, | ||
reserve: async ( | ||
ctx: ObjectContext<LimiterState>, | ||
{ | ||
date, | ||
n, | ||
waitLimitMillis, | ||
}: { date?: number; n?: number; waitLimitMillis?: number }, | ||
): Promise<Reservation> => { | ||
return reserveN( | ||
ctx, | ||
await getState(ctx), | ||
date ?? (await ctx.date.now()), | ||
n ?? 1, | ||
waitLimitMillis ?? Infinity, | ||
); | ||
}, | ||
setRate: async ( | ||
ctx: ObjectContext<LimiterState>, | ||
{ | ||
date: _date, | ||
newLimit, | ||
newBurst, | ||
}: { date?: number; newLimit?: number; newBurst?: number }, | ||
) => { | ||
if (newLimit === undefined && newBurst === undefined) { | ||
return; | ||
} | ||
|
||
const lim = await getState(ctx); | ||
const { date, tokens } = advance(lim, _date ?? (await ctx.date.now())); | ||
|
||
lim.last = date; | ||
lim.tokens = tokens; | ||
if (newLimit !== undefined) lim.limit = newLimit; | ||
if (newBurst !== undefined) lim.burst = newBurst; | ||
|
||
setState(ctx, lim); | ||
}, | ||
cancelReservation: async ( | ||
ctx: ObjectContext<LimiterState>, | ||
{ r, date }: { r: Reservation; date?: number }, | ||
) => { | ||
const lim = await getState(ctx); | ||
return cancelReservationAt(ctx, lim, r, date ?? (await ctx.date.now())); | ||
}, | ||
}, | ||
}); | ||
|
||
function advance( | ||
lim: LimiterState, | ||
date: number, | ||
): { date: number; tokens: number } { | ||
let last = lim.last; | ||
if (date <= last) { | ||
last = date; | ||
} | ||
|
||
// Calculate the new number of tokens, due to time that passed. | ||
const elapsedMillis = date - last; | ||
const delta = tokensFromDuration(lim.limit, elapsedMillis); | ||
let tokens = lim.tokens + delta; | ||
if (tokens > lim.burst) { | ||
tokens = lim.burst; | ||
} | ||
|
||
return { date, tokens }; | ||
} | ||
|
||
function reserveN( | ||
ctx: ObjectContext<LimiterState>, | ||
lim: LimiterState, | ||
_date: number, | ||
n: number, | ||
maxFutureReserveMillis: number, | ||
): Reservation { | ||
if (lim.limit == Infinity) { | ||
return { | ||
ok: true, | ||
tokens: n, | ||
creationDate: _date, | ||
dateToAct: _date, | ||
limit: 0, | ||
}; | ||
} | ||
|
||
let { date, tokens } = advance(lim, _date); | ||
|
||
// Calculate the remaining number of tokens resulting from the request. | ||
tokens -= n; | ||
|
||
// Calculate the wait duration | ||
let waitDurationMillis = 0; | ||
if (tokens < 0) { | ||
waitDurationMillis = durationFromTokens(lim.limit, -tokens); | ||
} | ||
|
||
// Decide result | ||
const ok = n <= lim.burst && waitDurationMillis <= maxFutureReserveMillis; | ||
|
||
// Prepare reservation | ||
const r = { | ||
ok, | ||
tokens: 0, | ||
creationDate: date, | ||
dateToAct: 0, | ||
limit: lim.limit, | ||
} satisfies Reservation; | ||
|
||
if (ok) { | ||
r.tokens = n; | ||
r.dateToAct = date + waitDurationMillis; | ||
|
||
// Update state | ||
lim.last = date; | ||
lim.tokens = tokens; | ||
lim.lastEvent = r.dateToAct; | ||
setState(ctx, lim); | ||
} | ||
|
||
return r; | ||
} | ||
|
||
async function cancelReservationAt( | ||
ctx: ObjectContext<LimiterState>, | ||
lim: LimiterState, | ||
r: Reservation, | ||
_date: number, | ||
) { | ||
if (lim.limit == Infinity || r.tokens == 0 || r.dateToAct < _date) { | ||
return; | ||
} | ||
|
||
// calculate tokens to restore | ||
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved | ||
// after r was obtained. These tokens should not be restored. | ||
const restoreTokens = | ||
r.tokens - tokensFromDuration(r.limit, lim.lastEvent - r.dateToAct); | ||
if (restoreTokens <= 0) { | ||
return; | ||
} | ||
// advance time to now | ||
let { date, tokens } = advance(lim, _date); | ||
// calculate new number of tokens | ||
tokens += restoreTokens; | ||
if (tokens > lim.burst) { | ||
tokens = lim.burst; | ||
} | ||
// update state | ||
lim.last = date; | ||
lim.tokens = tokens; | ||
if (r.dateToAct == lim.lastEvent) { | ||
const prevEvent = r.dateToAct + durationFromTokens(r.limit, -r.tokens); | ||
if (prevEvent >= date) { | ||
lim.lastEvent = prevEvent; | ||
} | ||
} | ||
setState(ctx, lim); | ||
} | ||
|
||
async function getState( | ||
ctx: ObjectContext<LimiterState>, | ||
): Promise<LimiterState> { | ||
return { | ||
limit: (await ctx.get("limit")) ?? 0, | ||
burst: (await ctx.get("burst")) ?? 0, | ||
tokens: (await ctx.get("tokens")) ?? 0, | ||
last: (await ctx.get("last")) ?? 0, | ||
lastEvent: (await ctx.get("lastEvent")) ?? 0, | ||
}; | ||
} | ||
|
||
async function setState(ctx: ObjectContext<LimiterState>, lim: LimiterState) { | ||
ctx.set("limit", lim.limit); | ||
ctx.set("burst", lim.burst); | ||
ctx.set("tokens", lim.tokens); | ||
ctx.set("last", lim.last); | ||
ctx.set("lastEvent", lim.lastEvent); | ||
} | ||
|
||
function durationFromTokens(limit: number, tokens: number): number { | ||
if (limit <= 0) { | ||
return Infinity; | ||
} | ||
|
||
return (tokens / limit) * 1000; | ||
} | ||
|
||
function tokensFromDuration(limit: number, durationMillis: number): number { | ||
if (limit <= 0) { | ||
return 0; | ||
} | ||
return (durationMillis / 1000) * limit; | ||
} | ||
|
||
export type Limiter = typeof limiter; |
Oops, something went wrong.