// hooks.ts
import { skipToken } from '@tanstack/react-query';
import { trpc } from '~/lib/trpc';
import * as React from 'react';
/**
 * Build a throttled setter for the channel's "is typing" flag.
 *
 * - A call whose value differs from the previously reported one is sent
 *   right away (and cancels any pending delayed send).
 * - A call repeating the current value is coalesced: at most one delayed send
 *   is queued, and it fires 1s after it was scheduled.
 *
 * @param channelId - Channel included in every `isTyping` mutation.
 * @returns A function accepting the next typing state.
 */
export function useThrottledIsTypingMutation(channelId: string) {
  const isTyping = trpc.channel.isTyping.useMutation();

  return React.useMemo(() => {
    // Most recent value handed to the setter; this is what gets sent.
    let latestTyping = false;
    // Handle of the queued delayed send, if there is one.
    let pendingTimer: ReturnType<typeof setTimeout> | null = null;

    const send = () => {
      if (pendingTimer) {
        clearTimeout(pendingTimer);
      }
      pendingTimer = null;
      isTyping.mutate({ typing: latestTyping, channelId });
    };

    return (nextState: boolean) => {
      const changed = latestTyping !== nextState;
      latestTyping = nextState;

      if (changed) {
        send();
        return;
      }
      if (pendingTimer) {
        // A delayed send is already queued; it will report the latest value.
        return;
      }
      pendingTimer = setTimeout(send, 1000);
    };
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, [channelId]);
}
/**
 * Merge two lists of posts into a single list that is de-duplicated by `id`
 * and sorted by ascending `createdAt`.
 *
 * When an `id` occurs more than once the last occurrence wins, so entries in
 * `incoming` replace entries in `current`.
 */
function mergePosts<
  T extends { id: string; createdAt: { getTime(): number } },
>(current: readonly T[], incoming: readonly T[]): T[] {
  const byId: Record<string, T> = {};
  for (const post of current) {
    byId[post.id] = post;
  }
  for (const post of incoming) {
    byId[post.id] = post;
  }
  return Object.values(byId).sort(
    (a, b) => a.createdAt.getTime() - b.createdAt.getTime(),
  );
}

/**
 * Keep a live, ordered list of posts for a channel.
 *
 * Local state is seeded from the `post.infinite` query, every change to the
 * query's pages is merged into it, and posts delivered by the `post.onAdd`
 * subscription are merged in as they arrive.
 *
 * @param channelId - Channel passed to both the infinite query and the
 *   subscription.
 * @returns An object with:
 *   - `query`: the infinite query result,
 *   - `messages`: posts de-duplicated by `id` and sorted by ascending
 *     `createdAt` (`null` only if no query data was available when the state
 *     was initialised and nothing has been merged yet),
 *   - `subscription`: the value returned by `useSubscription`.
 */
export function useLivePosts(channelId: string) {
  const [, query] = trpc.post.infinite.useSuspenseInfiniteQuery(
    { channelId },
    {
      getNextPageParam: (d) => d.nextCursor,
      // No need to refetch as we have a subscription
      refetchOnReconnect: false,
      refetchOnWindowFocus: false,
      refetchOnMount: false,
    },
  );

  const [messages, setMessages] = React.useState(() => {
    const msgs = query.data?.pages.flatMap((page) => page.items);
    // Normalise the seed exactly like later updates so `messages` is sorted
    // and de-duplicated from the very first render; `lastEventId` below is
    // derived from the last element and relies on that ordering.
    return msgs ? mergePosts([], msgs) : null;
  });
  type Post = NonNullable<typeof messages>[number];

  /**
   * fn to add and dedupe new messages onto state
   */
  const addMessages = React.useCallback((incoming?: Post[]) => {
    setMessages((current) => mergePosts(current ?? [], incoming ?? []));
  }, []);

  /**
   * when new data from `useInfiniteQuery`, merge with current state
   */
  React.useEffect(() => {
    const msgs = query.data?.pages.flatMap((page) => page.items);
    addMessages(msgs);
  }, [query.data?.pages, addMessages]);

  const [lastEventId, setLastEventId] = React.useState<
    // Query has not been run yet
    | false
    // Empty list
    | null
    // Event id
    | string
  >(false);
  if (messages && lastEventId === false) {
    // We should only set the lastEventId once, if the SSE-connection is lost, it will automatically reconnect and continue from the last event id
    // Changing this value will trigger a new subscription
    setLastEventId(messages.at(-1)?.id ?? null);
  }

  const subscription = trpc.post.onAdd.useSubscription(
    // Skip subscribing until the starting event id has been determined.
    lastEventId === false ? skipToken : { channelId, lastEventId },
    {
      onData(event) {
        addMessages([event.data]);
      },
      onError(err) {
        console.error('Subscription error:', err);

        const lastMessageEventId = messages?.at(-1)?.id;
        if (lastMessageEventId) {
          // We've lost the connection, let's resubscribe from the last message
          setLastEventId(lastMessageEventId);
        }
      },
    },
  );

  return {
    query,
    messages,
    subscription,
  };
}