Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add binary variant to PUBSUB subscribe commands #420

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -521,9 +521,9 @@ function getRequestErrorClass(
}

export type PubSubMsg = {
message: string;
channel: string;
pattern?: string | null;
message: GlideString;
channel: GlideString;
pattern?: GlideString | null;
};

export type WritePromiseOptions = {
Expand Down Expand Up @@ -951,19 +951,21 @@ export class BaseClient {
let msg: PubSubMsg | null = null;
const responsePointer = pushNotification.respPointer;
let nextPushNotificationValue: Record<string, unknown> = {};
const isStringDecoder =
(decoder ?? this.defaultDecoder) === Decoder.String;

if (responsePointer) {
if (typeof responsePointer !== "number") {
nextPushNotificationValue = valueFromSplitPointer(
responsePointer.high,
responsePointer.low,
decoder === Decoder.String,
isStringDecoder,
) as Record<string, unknown>;
} else {
nextPushNotificationValue = valueFromSplitPointer(
0,
responsePointer,
decoder === Decoder.String,
isStringDecoder,
) as Record<string, unknown>;
}

Expand All @@ -980,7 +982,9 @@ export class BaseClient {
messageKind === "PMessage" ||
messageKind === "SMessage"
) {
const values = nextPushNotificationValue["values"] as string[];
const values = nextPushNotificationValue[
"values"
] as GlideString[];

if (messageKind === "PMessage") {
msg = {
Expand Down Expand Up @@ -6815,8 +6819,10 @@ export class BaseClient {
*
* @see {@link https://valkey.io/commands/pubsub-channels/|valkey.io} for more details.
*
* @param pattern - A glob-style pattern to match active channels.
* If not provided, all active channels are returned.
* @param options - (Optional) Additional parameters:
* - (Optional) `pattern`: A glob-style pattern to match active channels.
* If not provided, all active channels are returned.
* - (Optional) `decoder`: see {@link DecoderOption}.
* @returns A list of currently active channels matching the given pattern.
* If no pattern is specified, all active channels are returned.
*
Expand All @@ -6829,8 +6835,12 @@ export class BaseClient {
* console.log(newsChannels); // Output: ["news.sports", "news.weather"]
* ```
*/
public async pubsubChannels(pattern?: string): Promise<string[]> {
return this.createWritePromise(createPubSubChannels(pattern));
public async pubsubChannels(
options?: { pattern?: GlideString } & DecoderOption,
): Promise<GlideString[]> {
return this.createWritePromise(createPubSubChannels(options?.pattern), {
decoder: options?.decoder,
});
}

/**
Expand Down
8 changes: 4 additions & 4 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2207,8 +2207,8 @@ export function createTime(): command_request.Command {
* @internal
*/
export function createPublish(
message: string,
channel: string,
message: GlideString,
channel: GlideString,
sharded: boolean = false,
): command_request.Command {
const request = sharded ? RequestType.SPublish : RequestType.Publish;
Expand Down Expand Up @@ -3842,7 +3842,7 @@ export function createBLMPop(
* @internal
*/
export function createPubSubChannels(
pattern?: string,
pattern?: GlideString,
): command_request.Command {
return createCommand(RequestType.PubSubChannels, pattern ? [pattern] : []);
}
Expand All @@ -3867,7 +3867,7 @@ export function createPubSubNumSub(
* @internal
*/
export function createPubsubShardChannels(
pattern?: string,
pattern?: GlideString,
): command_request.Command {
return createCommand(RequestType.PubSubSChannels, pattern ? [pattern] : []);
}
Expand Down
5 changes: 4 additions & 1 deletion node/src/GlideClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,10 @@ export class GlideClient extends BaseClient {
* console.log(result); // Output: 1 - This message was posted to 1 subscription which is configured on primary node
* ```
*/
public async publish(message: string, channel: string): Promise<number> {
public async publish(
message: GlideString,
channel: GlideString,
): Promise<number> {
return this.createWritePromise(createPublish(message, channel));
}

Expand Down
21 changes: 15 additions & 6 deletions node/src/GlideClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1222,8 +1222,8 @@ export class GlideClusterClient extends BaseClient {
* ```
*/
public async publish(
message: string,
channel: string,
message: GlideString,
channel: GlideString,
sharded: boolean = false,
): Promise<number> {
return this.createWritePromise(
Expand All @@ -1237,8 +1237,10 @@ export class GlideClusterClient extends BaseClient {
*
* @see {@link https://valkey.io/commands/pubsub-shardchannels/|valkey.io} for details.
*
* @param pattern - A glob-style pattern to match active shard channels.
* If not provided, all active shard channels are returned.
* @param options - (Optional) Additional parameters:
* - (Optional) `pattern`: A glob-style pattern to match active shard channels.
* If not provided, all active shard channels are returned.
* - (Optional) `decoder`: see {@link DecoderOption}.
* @returns A list of currently active shard channels matching the given pattern.
* If no pattern is specified, all active shard channels are returned.
*
Expand All @@ -1251,8 +1253,15 @@ export class GlideClusterClient extends BaseClient {
* console.log(filteredChannels); // Output: ["channel1", "channel2"]
* ```
*/
public async pubsubShardChannels(pattern?: string): Promise<string[]> {
return this.createWritePromise(createPubsubShardChannels(pattern));
public async pubsubShardChannels(
options?: {
pattern?: GlideString;
} & DecoderOption,
): Promise<GlideString[]> {
return this.createWritePromise(
createPubsubShardChannels(options?.pattern),
{ decoder: options?.decoder },
);
}

/**
Expand Down
10 changes: 5 additions & 5 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3841,7 +3841,7 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
* Command Response - A list of currently active channels matching the given pattern.
* If no pattern is specified, all active channels are returned.
*/
public pubsubChannels(pattern?: string): T {
public pubsubChannels(pattern?: GlideString): T {
return this.addAndReturn(createPubSubChannels(pattern));
}

Expand Down Expand Up @@ -4024,7 +4024,7 @@ export class Transaction extends BaseTransaction<Transaction> {
* Command Response - Number of subscriptions in primary node that received the message.
* Note that this value does not include subscriptions that configured on replicas.
*/
public publish(message: string, channel: string): Transaction {
public publish(message: GlideString, channel: GlideString): Transaction {
return this.addAndReturn(createPublish(message, channel));
}
}
Expand Down Expand Up @@ -4151,8 +4151,8 @@ export class ClusterTransaction extends BaseTransaction<ClusterTransaction> {
* Command Response - Number of subscriptions in primary node that received the message.
*/
public publish(
message: string,
channel: string,
message: GlideString,
channel: GlideString,
sharded: boolean = false,
): ClusterTransaction {
return this.addAndReturn(createPublish(message, channel, sharded));
Expand All @@ -4170,7 +4170,7 @@ export class ClusterTransaction extends BaseTransaction<ClusterTransaction> {
* Command Response - A list of currently active shard channels matching the given pattern.
* If no pattern is specified, all active shard channels are returned.
*/
public pubsubShardChannels(pattern?: string): ClusterTransaction {
public pubsubShardChannels(pattern?: GlideString): ClusterTransaction {
return this.addAndReturn(createPubsubShardChannels(pattern));
}

Expand Down
Loading
Loading