Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: query fewer historical messages on reconnects #5001

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
343cd70
perf: query fewer historical messages on reconnects
iProdigy Dec 4, 2023
469f41c
feat: specify before and after timestamps
iProdigy Dec 5, 2023
f2c7f71
chore: update changelog
iProdigy Dec 5, 2023
a5bd088
feat: add jitter to historical message requests
iProdigy Dec 5, 2023
9ea6c20
Merge branch 'master' into perf/recent-message-reconnect-load
iProdigy Dec 5, 2023
138efd2
docs: clarify default timestamp behavior
iProdigy Dec 5, 2023
fb6a152
refactor: ensure timestamp is stored as 64-bit
iProdigy Dec 5, 2023
077aa85
refactor: use optional time points
iProdigy Dec 6, 2023
e6e293e
fix: delay historical message query until rejoined
iProdigy Dec 6, 2023
1b5b9fb
chore: use pointer accessor
iProdigy Dec 6, 2023
45e6093
chore: use add chrono headers
iProdigy Dec 6, 2023
2d490a7
chore: add header for optional
iProdigy Dec 6, 2023
4a1cc13
Apply suggestions from code review
iProdigy Dec 7, 2023
82ecf4d
refactor: make limit a required arg
iProdigy Dec 7, 2023
855639f
fix: avoid double query when reconnect is requested before initial join
iProdigy Dec 7, 2023
ebb1446
Merge branch 'master' into perf/recent-message-reconnect-load
pajlada Dec 8, 2023
6ca8e92
nit: Remove `recentmessages::load`'s parameter default values
pajlada Dec 9, 2023
1b48f69
nit: Remove `recentmessages::detail::constructRecentMessagesUrl` para…
pajlada Dec 9, 2023
bff18bf
nit: recentmessages impl: remove unused settings include
pajlada Dec 9, 2023
12bf88f
nit: Don't respect after & before query params in env variable URL
pajlada Dec 9, 2023
dc9985d
Merge remote-tracking branch 'origin/master' into perf/recent-message…
pajlada Dec 9, 2023
3d7d65c
nit: rename `duration` to `secondsSinceDisconnect`
pajlada Dec 9, 2023
a2ee3c2
nit: Remove now-unused `Channel::connected` signal
pajlada Dec 9, 2023
3aed60c
Update changelog entry
pajlada Dec 9, 2023
41b3c74
nit: Update comment for `TwitchChannel::markDisconnectedNow`
pajlada Dec 9, 2023
94e78e3
Merge branch 'master' into perf/recent-message-reconnect-load
iProdigy Dec 9, 2023
0f8131f
docs: fix typo in method code comment
iProdigy Dec 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
- Dev: Move `clang-tidy` checker to its own CI job. (#4996)
- Dev: Refactored the Image Uploader feature. (#4971)
- Dev: Fixed deadlock and use-after-free in tests. (#4981)
- Dev: Load less message history upon reconnects. (#5001)

## 2.4.6

Expand Down
2 changes: 0 additions & 2 deletions src/common/Channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ class Channel : public std::enable_shared_from_this<Channel>
pajlada::Signals::Signal<const std::vector<MessagePtr> &> filledInMessages;
pajlada::Signals::NoArgSignal destroyed;
pajlada::Signals::NoArgSignal displayNameChanged;
/// Invoked when AbstractIrcServer::onReadConnected occurs
pajlada::Signals::NoArgSignal connected;

Type getType() const;
const QString &getName() const;
Expand Down
8 changes: 6 additions & 2 deletions src/providers/irc/AbstractIrcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "messages/LimitedQueueSnapshot.hpp"
#include "messages/Message.hpp"
#include "messages/MessageBuilder.hpp"
#include "providers/twitch/TwitchChannel.hpp"

#include <QCoreApplication>

Expand Down Expand Up @@ -331,8 +332,6 @@ void AbstractIrcServer::onReadConnected(IrcConnection *connection)
{
chan->addMessage(connectedMsg);
}

chan->connected.invoke();
}

this->falloffCounter_ = 1;
Expand Down Expand Up @@ -360,6 +359,11 @@ void AbstractIrcServer::onDisconnected()
}

chan->addMessage(disconnectedMsg);

if (auto *channel = dynamic_cast<TwitchChannel *>(chan.get()))
{
channel->markDisconnectedNow();
}
}
}

Expand Down
128 changes: 70 additions & 58 deletions src/providers/recentmessages/Api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,72 +18,84 @@ namespace chatterino::recentmessages {

using namespace recentmessages::detail;

void load(const QString &channelName, std::weak_ptr<Channel> channelPtr,
ResultCallback onLoaded, ErrorCallback onError)
void load(
const QString &channelName, std::weak_ptr<Channel> channelPtr,
iProdigy marked this conversation as resolved.
Show resolved Hide resolved
iProdigy marked this conversation as resolved.
Show resolved Hide resolved
ResultCallback onLoaded, ErrorCallback onError, const int limit,
iProdigy marked this conversation as resolved.
Show resolved Hide resolved
iProdigy marked this conversation as resolved.
Show resolved Hide resolved
iProdigy marked this conversation as resolved.
Show resolved Hide resolved
iProdigy marked this conversation as resolved.
Show resolved Hide resolved
iProdigy marked this conversation as resolved.
Show resolved Hide resolved
iProdigy marked this conversation as resolved.
Show resolved Hide resolved
const std::optional<std::chrono::time_point<std::chrono::system_clock>>
after,
const std::optional<std::chrono::time_point<std::chrono::system_clock>>
before,
const bool jitter)
{
qCDebug(LOG) << "Loading recent messages for" << channelName;

const auto url = constructRecentMessagesUrl(channelName);

NetworkRequest(url)
.onSuccess([channelPtr, onLoaded](const auto &result) {
auto shared = channelPtr.lock();
if (!shared)
{
return;
}

qCDebug(LOG) << "Successfully loaded recent messages for"
<< shared->getName();

auto root = result.parseJson();
auto parsedMessages = parseRecentMessages(root);

// build the Communi messages into chatterino messages
auto builtMessages =
buildRecentMessages(parsedMessages, shared.get());

postToThread([shared = std::move(shared), root = std::move(root),
messages = std::move(builtMessages),
onLoaded]() mutable {
// Notify user about a possible gap in logs if it returned some messages
// but isn't currently joined to a channel
const auto errorCode = root.value("error_code").toString();
if (!errorCode.isEmpty())
const auto url =
constructRecentMessagesUrl(channelName, limit, after, before);

const long delayMs = jitter ? std::rand() % 100 : 0;
iProdigy marked this conversation as resolved.
Show resolved Hide resolved
QTimer::singleShot(delayMs, [=] {
NetworkRequest(url)
.onSuccess([channelPtr, onLoaded](const auto &result) {
auto shared = channelPtr.lock();
if (!shared)
{
qCDebug(LOG)
<< QString("Got error from API: error_code=%1, "
"channel=%2")
.arg(errorCode, shared->getName());
if (errorCode == "channel_not_joined" && !messages.empty())
return;
}

qCDebug(LOG) << "Successfully loaded recent messages for"
<< shared->getName();

auto root = result.parseJson();
auto parsedMessages = parseRecentMessages(root);

// build the Communi messages into chatterino messages
auto builtMessages =
buildRecentMessages(parsedMessages, shared.get());

postToThread([shared = std::move(shared),
root = std::move(root),
messages = std::move(builtMessages),
onLoaded]() mutable {
// Notify user about a possible gap in logs if it returned some messages
// but isn't currently joined to a channel
const auto errorCode = root.value("error_code").toString();
if (!errorCode.isEmpty())
{
shared->addMessage(makeSystemMessage(
"Message history service recovering, there may "
"be gaps in the message history."));
qCDebug(LOG)
<< QString("Got error from API: error_code=%1, "
"channel=%2")
.arg(errorCode, shared->getName());
if (errorCode == "channel_not_joined" &&
!messages.empty())
{
shared->addMessage(makeSystemMessage(
"Message history service recovering, there may "
"be gaps in the message history."));
}
}

onLoaded(messages);
});
})
.onError([channelPtr, onError](const NetworkResult &result) {
auto shared = channelPtr.lock();
if (!shared)
{
return;
}

onLoaded(messages);
});
})
.onError([channelPtr, onError](const NetworkResult &result) {
auto shared = channelPtr.lock();
if (!shared)
{
return;
}

qCDebug(LOG) << "Failed to load recent messages for"
<< shared->getName();

shared->addMessage(makeSystemMessage(
QStringLiteral(
"Message history service unavailable (Error: %1)")
.arg(result.formatError())));

onError();
})
.execute();
qCDebug(LOG) << "Failed to load recent messages for"
<< shared->getName();

shared->addMessage(makeSystemMessage(
QStringLiteral(
"Message history service unavailable (Error: %1)")
.arg(result.formatError())));

onError();
})
.execute();
});
}

} // namespace chatterino::recentmessages
14 changes: 12 additions & 2 deletions src/providers/recentmessages/Api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

#include <QString>

#include <chrono>
#include <functional>
#include <memory>
#include <optional>
#include <vector>

namespace chatterino {
Expand All @@ -28,8 +30,16 @@ using ErrorCallback = std::function<void()>;
* @param channelPtr Weak pointer to Channel to use to build messages
* @param onLoaded Callback taking the built messages as a const std::vector<MessagePtr> &
* @param onError Callback called when the network request fails
* @param limit Maximum number of messages to query
* @param after Only return messages that were received after this timestamp; ignored if `std::nullopt`
* @param before Only return messages that were received before this timestamp; ignored if `std::nullopt`
* @param jitter Whether to delay the request by a small random duration
*/
void load(const QString &channelName, std::weak_ptr<Channel> channelPtr,
ResultCallback onLoaded, ErrorCallback onError);
void load(
const QString &channelName, std::weak_ptr<Channel> channelPtr,
ResultCallback onLoaded, ErrorCallback onError, int limit,
std::optional<std::chrono::time_point<std::chrono::system_clock>> after,
std::optional<std::chrono::time_point<std::chrono::system_clock>> before,
bool jitter);

} // namespace chatterino::recentmessages
25 changes: 22 additions & 3 deletions src/providers/recentmessages/Impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "providers/twitch/IrcMessageHandler.hpp"
#include "providers/twitch/TwitchChannel.hpp"
#include "providers/twitch/TwitchMessageBuilder.hpp"
#include "singletons/Settings.hpp"
#include "util/FormatTime.hpp"

#include <QJsonArray>
Expand Down Expand Up @@ -94,14 +93,34 @@ std::vector<MessagePtr> buildRecentMessages(

// Returns the URL to be used for querying the Recent Messages API for the
// given channel.
QUrl constructRecentMessagesUrl(const QString &name)
QUrl constructRecentMessagesUrl(
const QString &name, const int limit,
const std::optional<std::chrono::time_point<std::chrono::system_clock>>
after,
const std::optional<std::chrono::time_point<std::chrono::system_clock>>
before)
{
QUrl url(Env::get().recentMessagesApiUrl.arg(name));
QUrlQuery urlQuery(url);
if (!urlQuery.hasQueryItem("limit"))
{
urlQuery.addQueryItem("limit", QString::number(limit));
}
if (after.has_value())
{
urlQuery.addQueryItem(
"after", QString::number(
std::chrono::duration_cast<std::chrono::milliseconds>(
after->time_since_epoch())
.count()));
}
if (before.has_value())
{
urlQuery.addQueryItem(
"limit", QString::number(getSettings()->twitchMessageHistoryLimit));
"before", QString::number(
std::chrono::duration_cast<std::chrono::milliseconds>(
before->time_since_epoch())
.count()));
}
url.setQuery(urlQuery);
return url;
Expand Down
7 changes: 6 additions & 1 deletion src/providers/recentmessages/Impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
#include <QString>
#include <QUrl>

#include <chrono>
#include <memory>
#include <optional>
#include <vector>

namespace chatterino::recentmessages::detail {
Expand All @@ -24,6 +26,9 @@ std::vector<MessagePtr> buildRecentMessages(

// Returns the URL to be used for querying the Recent Messages API for the
// given channel.
QUrl constructRecentMessagesUrl(const QString &name);
QUrl constructRecentMessagesUrl(
const QString &name, int limit,
std::optional<std::chrono::time_point<std::chrono::system_clock>> after,
std::optional<std::chrono::time_point<std::chrono::system_clock>> before);

} // namespace chatterino::recentmessages::detail
1 change: 1 addition & 0 deletions src/providers/twitch/IrcMessageHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,7 @@ void IrcMessageHandler::handleJoinMessage(Communi::IrcMessage *message)
getApp()->accounts->twitch.getCurrent()->getUserName())
{
twitchChannel->addMessage(makeSystemMessage("joined channel"));
twitchChannel->joined.invoke();
}
else if (getSettings()->showJoins.getValue())
{
Expand Down
53 changes: 42 additions & 11 deletions src/providers/twitch/TwitchChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,12 @@ TwitchChannel::TwitchChannel(const QString &name)

// We can safely ignore this signal connection this has no external dependencies - once the signal
// is destroyed, it will no longer be able to fire
std::ignore = this->connected.connect([this]() {
if (this->roomId().isEmpty())
std::ignore = this->joined.connect([this]() {
if (this->disconnectedAt_.has_value())
{
// If we get a reconnected event when the room id is not set, we
// just connected for the first time. After receiving the first
// message from a channel, setRoomId is called and further
// invocations of this event will load recent messages.
return;
this->loadRecentMessagesReconnect();
this->disconnectedAt_ = std::nullopt;
}

this->loadRecentMessagesReconnect();
});

// timers
Expand Down Expand Up @@ -1111,6 +1106,24 @@ bool TwitchChannel::setLive(bool newLiveStatus)
return true;
}

void TwitchChannel::markDisconnectedNow()
{
if (this->roomId().isEmpty())
{
// we were never joined in the first place
return;
}

if (this->disconnectedAt_.has_value())
{
// don't overwrite prior timestamp since
// a reconnection hasn't happened yet
return;
}

this->disconnectedAt_ = std::chrono::system_clock::now();
}

void TwitchChannel::loadRecentMessages()
{
if (!getSettings()->loadTwitchMessageHistoryOnConnect)
Expand Down Expand Up @@ -1163,7 +1176,9 @@ void TwitchChannel::loadRecentMessages()
return;

tc->loadingRecentMessages_.clear();
});
},
getSettings()->twitchMessageHistoryLimit.getValue(), std::nullopt,
std::nullopt, false);
}

void TwitchChannel::loadRecentMessagesReconnect()
Expand All @@ -1178,6 +1193,21 @@ void TwitchChannel::loadRecentMessagesReconnect()
return; // already loading
}

const auto now = std::chrono::system_clock::now();
int limit = getSettings()->twitchMessageHistoryLimit.getValue();
if (this->disconnectedAt_.has_value())
{
// calculate how many messages could have occured
// while we were not connected to the channel
// assuming a maximum of 10 messages per second
const auto secondsSinceDisconnect =
std::chrono::duration_cast<std::chrono::seconds>(
now - this->disconnectedAt_.value())
.count();
limit =
std::min(static_cast<int>(secondsSinceDisconnect + 1) * 10, limit);
}

auto weak = weakOf<Channel>(this);
recentmessages::load(
this->getName(), weak,
Expand All @@ -1203,7 +1233,8 @@ void TwitchChannel::loadRecentMessagesReconnect()
return;

tc->loadingRecentMessages_.clear();
});
},
limit, this->disconnectedAt_, now, true);
pajlada marked this conversation as resolved.
Show resolved Hide resolved
}

void TwitchChannel::refreshPubSub()
Expand Down
Loading
Loading