Skip to content

Commit

Permalink
DELIA-66306 : Fix idle connection
Browse files Browse the repository at this point in the history
Reason for change: When connection was idle but not
shut down RPC fails with Connection reset by peer.
Wait until the channel is ready before making the call.
Also, log uri, and get security token on startup only.
Test Procedure: None
Risks: None
Signed-off-by: Nikita Poltorapavlo <[email protected]>
  • Loading branch information
npoltorapavlo committed Oct 15, 2024
1 parent 491028c commit d660e6f
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 41 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/BuildThunder.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ set -e
############################
# 1. Install Dependencies

sudo apt install -y build-essential cmake ninja-build libusb-1.0-0-dev zlib1g-dev libssl-dev
sudo apt install -y build-essential pkg-config cmake ninja-build libusb-1.0-0-dev zlib1g-dev libssl-dev

pip install jsonref
pip install --break-system-packages jsonref

############################
# 2. Build Thunder Tools
Expand Down
20 changes: 20 additions & 0 deletions CloudStore/CloudStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,26 @@ namespace Plugin {

Core::SystemInfo::SetEnvironment(URI_ENV, uri);

SYSLOG(Logging::Startup, (_T("grpc endpoint is %s"), uri.c_str()));

string token;
auto security = _service->QueryInterfaceByCallsign<
PluginHost::IAuthenticate>("SecurityAgent");
if (security != nullptr) {
string payload = "http://localhost";
auto ret = security->CreateToken(
static_cast<uint16_t>(payload.length()),
reinterpret_cast<const uint8_t*>(payload.c_str()),
token);
if (ret != Core::ERROR_NONE) {
SYSLOG(Logging::Startup,
(_T("Couldn't create token: %d"), ret));
}
security->Release();
}

Core::SystemInfo::SetEnvironment(TOKEN_ENV, token);

_service->Register(&_notification);

_store2 = _service->Root<Exchange::IStore2>(_connectionId, RPC::CommunicationTimeOut, _T("CloudStoreImplementation"));
Expand Down
3 changes: 1 addition & 2 deletions CloudStore/Module.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@
#endif

#define URI_ENV "CLOUDSTORE_URI"
#define TOKEN_ENV "CLOUDSTORE_TOKEN"
#define IARM_INIT_NAME "Thunder_Plugins"
#define URI_RFC "Device.DeviceInfo.X_RDKCENTRAL-COM_RFC.CloudStore.Uri"
#define PARTNER_ID_FILENAME "/opt/www/authService/partnerId3.dat"
#define ACCOUNT_ID_FILENAME "/opt/www/authService/said.dat"
#define DEVICE_ID_FILENAME "/opt/www/authService/xdeviceid.dat"
#define SECURITY_AGENT_FILENAME "/tmp/SecurityAgent/token"
#define IARM_TIMEOUT 1000
#define COM_RPC_TIMEOUT 1000
#define JSON_RPC_TIMEOUT 2000
#define GRPC_TIMEOUT 3000

Expand Down
45 changes: 9 additions & 36 deletions CloudStore/grpc/Store2.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ namespace Plugin {

public:
Store2()
: Store2(getenv(URI_ENV))
: Store2(getenv(URI_ENV), getenv(TOKEN_ENV))
{
}
Store2(const string& uri)
Store2(const string& uri, const string& token)
: IStore2()
, _uri(uri)
, _token(token)
, _authorization((_uri.find("localhost") == string::npos) && (_uri.find("0.0.0.0") == string::npos))
{
Open();
Expand Down Expand Up @@ -115,47 +116,14 @@ namespace Plugin {
#endif
return true;
}
string GetSecurityToken() const
{
// Get actual token, as it may change at any time...
string result;

const char* endpoint = ::getenv(_T("SECURITYAGENT_PATH"));
if (endpoint == nullptr) {
endpoint = SECURITY_AGENT_FILENAME;
}
auto engine = Core::ProxyType<RPC::InvokeServerType<1, 0, 4>>::Create();
auto client = Core::ProxyType<RPC::CommunicatorClient>::Create(
Core::NodeId(endpoint),
Core::ProxyType<Core::IIPCServer>(engine));

auto interface = client->Open<PluginHost::IAuthenticate>(
_T("SecurityAgent"),
static_cast<uint32_t>(~0),
COM_RPC_TIMEOUT); // Timeout
if (interface != nullptr) {
string payload = _T("http://localhost");
// If main process is out of threads, this can time out, and IPC will mess up...
auto error = interface->CreateToken(
static_cast<uint16_t>(payload.length()),
reinterpret_cast<const uint8_t*>(payload.c_str()),
result);
if (error != Core::ERROR_NONE) {
TRACE(Trace::Error, (_T("security token error %d"), error));
}
interface->Release();
}

return result;
}
string GetToken() const
{
// Get actual token, as it may change at any time...
string result;

Core::SystemInfo::SetEnvironment(_T("THUNDER_ACCESS"), (_T("127.0.0.1:9998")));
auto link = Core::ProxyType<JSONRPC::LinkType<Core::JSON::IElement>>::Create(
_T("org.rdk.AuthService"), _T(""), false, "token=" + GetSecurityToken());
_T("org.rdk.AuthService"), _T(""), false, "token=" + _token);

JsonObject json;
auto status = link->Invoke<JsonObject, JsonObject>(
Expand Down Expand Up @@ -233,6 +201,7 @@ namespace Plugin {
if (_authorization) {
context.AddMetadata("authorization", "Bearer " + GetToken());
}
context.set_wait_for_ready(true); // Wait until the channel is ready
context.set_deadline(std::chrono::system_clock::now() + std::chrono::milliseconds(GRPC_TIMEOUT)); // Timeout
::distp::gateway::secure_storage::v1::UpdateValueRequest request;
request.set_partner_id(GetPartnerId());
Expand Down Expand Up @@ -282,6 +251,7 @@ namespace Plugin {
if (_authorization) {
context.AddMetadata("authorization", "Bearer " + GetToken());
}
context.set_wait_for_ready(true); // Wait until the channel is ready
context.set_deadline(std::chrono::system_clock::now() + std::chrono::milliseconds(GRPC_TIMEOUT)); // Timeout
::distp::gateway::secure_storage::v1::GetValueRequest request;
request.set_partner_id(GetPartnerId());
Expand Down Expand Up @@ -343,6 +313,7 @@ namespace Plugin {
if (_authorization) {
context.AddMetadata("authorization", "Bearer " + GetToken());
}
context.set_wait_for_ready(true); // Wait until the channel is ready
context.set_deadline(std::chrono::system_clock::now() + std::chrono::milliseconds(GRPC_TIMEOUT)); // Timeout
::distp::gateway::secure_storage::v1::DeleteValueRequest request;
request.set_partner_id(GetPartnerId());
Expand Down Expand Up @@ -381,6 +352,7 @@ namespace Plugin {
if (_authorization) {
context.AddMetadata("authorization", "Bearer " + GetToken());
}
context.set_wait_for_ready(true); // Wait until the channel is ready
context.set_deadline(std::chrono::system_clock::now() + std::chrono::milliseconds(GRPC_TIMEOUT)); // Timeout
::distp::gateway::secure_storage::v1::DeleteAllValuesRequest request;
request.set_partner_id(GetPartnerId());
Expand Down Expand Up @@ -430,6 +402,7 @@ namespace Plugin {

private:
const string _uri;
const string _token;
const bool _authorization;
std::unique_ptr<::distp::gateway::secure_storage::v1::SecureStorageService::Stub> _stub;
std::list<INotification*> _clients;
Expand Down
2 changes: 1 addition & 1 deletion CloudStore/grpc/l0test/Store2Test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class AStore2 : public Test {
: workerPool(WPEFramework::Core::ProxyType<WorkerPoolImplementation>::Create(
WPEFramework::Core::Thread::DefaultStackSize()))
, server(kUri, &service)
, store2(WPEFramework::Core::ProxyType<Store2>::Create(kUri))
, store2(WPEFramework::Core::ProxyType<Store2>::Create(kUri, ""))
{
WPEFramework::Core::IWorkerPool::Assign(&(*workerPool));
}
Expand Down

0 comments on commit d660e6f

Please sign in to comment.