From 74da143066ba659aff80fecbd79f7061308fa3cf Mon Sep 17 00:00:00 2001 From: Anand Kandasamy <37086488+anand-ky@users.noreply.github.com> Date: Wed, 4 Sep 2024 12:14:53 -0400 Subject: [PATCH 1/3] Merge pull request #5647 from npoltorapavlo/DELIA-66118 DELIA-66118: CloudStore improvements --- .github/workflows/L2-CloudStore.yml | 2 +- CloudStore/Module.h | 8 + CloudStore/grpc/Store2.h | 181 +++++++++++++----- CloudStore/grpc/l0test/Store2Test.cpp | 11 +- .../grpc/l0test/WorkerPoolImplementation.h | 49 +++++ CloudStore/l0test/ServiceMock.h | 1 + 6 files changed, 206 insertions(+), 46 deletions(-) create mode 100644 CloudStore/grpc/l0test/WorkerPoolImplementation.h diff --git a/.github/workflows/L2-CloudStore.yml b/.github/workflows/L2-CloudStore.yml index 4f76c4b7fd..33ab72540e 100644 --- a/.github/workflows/L2-CloudStore.yml +++ b/.github/workflows/L2-CloudStore.yml @@ -37,4 +37,4 @@ jobs: # Usage: # PATH=${PWD}/install/bin:${PATH} LD_LIBRARY_PATH=${PWD}/install/lib:${LD_LIBRARY_PATH} valgrind --tool=memcheck --log-file=valgrind_log --leak-check=yes --show-reachable=yes --track-fds=yes --fair-sched=try Thunder -f -c ${PWD}/install/etc/Thunder/config.json # (to stop press q & enter) -# curl -d '{"jsonrpc":"2.0","id":0,"method":"org.rdk.CloudStore.setValue","params":{"namespace":"test","key":"key1","value":"1","ttl":100}}' http://localhost:55555/jsonrpc +# curl -d '{"jsonrpc":"2.0","id":0,"method":"org.rdk.CloudStore.setValue","params":{"scope":"account","namespace":"test","key":"key1","value":"1","ttl":100}}' http://localhost:55555/jsonrpc diff --git a/CloudStore/Module.h b/CloudStore/Module.h index 4901b08640..7892d8a47b 100644 --- a/CloudStore/Module.h +++ b/CloudStore/Module.h @@ -32,6 +32,14 @@ #define URI_ENV "CLOUDSTORE_URI" #define IARM_INIT_NAME "Thunder_Plugins" #define URI_RFC "Device.DeviceInfo.X_RDKCENTRAL-COM_RFC.CloudStore.Uri" +#define PARTNER_ID_FILENAME "/opt/www/authService/partnerId3.dat" +#define ACCOUNT_ID_FILENAME "/opt/www/authService/said.dat" +#define DEVICE_ID_FILENAME "/opt/www/authService/xdeviceid.dat" +#define SECURITY_AGENT_FILENAME "/tmp/SecurityAgent/token" +#define IARM_TIMEOUT 1000 +#define COM_RPC_TIMEOUT 1000 +#define JSON_RPC_TIMEOUT 2000 +#define GRPC_TIMEOUT 3000 #undef EXTERNAL #define EXTERNAL diff --git a/CloudStore/grpc/Store2.h b/CloudStore/grpc/Store2.h index bf8d862e23..e007bd9226 100644 --- a/CloudStore/grpc/Store2.h +++ b/CloudStore/grpc/Store2.h @@ -21,6 +21,7 @@ #include "../Module.h" #include "secure_storage.grpc.pb.h" +#include #include #include #ifdef WITH_SYSMGR @@ -33,6 +34,35 @@ namespace Plugin { namespace Grpc { class Store2 : public Exchange::IStore2 { + private: + class Job : public Core::IDispatch { + public: + Job(Store2* parent, const ScopeType scope, const string& ns, const string& key, const string& value) + : _parent(parent) + , _scope(scope) + , _ns(ns) + , _key(key) + , _value(value) + { + _parent->AddRef(); + } + ~Job() override + { + _parent->Release(); + } + void Dispatch() override + { + _parent->OnValueChanged(_scope, _ns, _key, _value); + } + + private: + Store2* _parent; + const ScopeType _scope; + const string _ns; + const string _key; + const string _value; + }; + private: Store2(const Store2&) = delete; Store2& operator=(const Store2&) = delete; @@ -45,6 +75,7 @@ namespace Plugin { Store2(const string& uri) : IStore2() , _uri(uri) + , _authorization((_uri.find("localhost") == string::npos) && (_uri.find("0.0.0.0") == string::npos)) { Open(); } @@ -54,7 +85,7 @@ namespace Plugin { void Open() { std::shared_ptr creds; - if ((_uri.find("localhost") == string::npos) && (_uri.find("0.0.0.0") == string::npos)) { + if (_authorization) { creds = grpc::SslCredentials(grpc::SslCredentialsOptions()); } else { creds = grpc::InsecureChannelCredentials(); @@ -62,61 +93,107 @@ namespace Plugin { _stub = ::distp::gateway::secure_storage::v1::SecureStorageService::NewStub( grpc::CreateChannel(_uri, creds)); } - static bool IsTimeSynced() + + private: + bool IsTimeSynced() const { #ifdef WITH_SYSMGR + // Get actual state, as it may change at any time... IARM_Bus_Init(IARM_INIT_NAME); IARM_Bus_Connect(); IARM_Bus_SYSMgr_GetSystemStates_Param_t param; - if ((IARM_Bus_Call(IARM_BUS_SYSMGR_NAME, IARM_BUS_SYSMGR_API_GetSystemStates, ¶m, sizeof(param)) != IARM_RESULT_SUCCESS) + if ((IARM_Bus_Call_with_IPCTimeout( + IARM_BUS_SYSMGR_NAME, + IARM_BUS_SYSMGR_API_GetSystemStates, + ¶m, + sizeof(param), + IARM_TIMEOUT) // Timeout + != IARM_RESULT_SUCCESS) || !param.time_source.state) { return false; } #endif return true; } - static string ExecuteCmd(const char* cmd) + string GetSecurityToken() const { + // Get actual token, as it may change at any time... string result; - auto pipe = popen(cmd, "r"); - if (pipe != nullptr) { - char buffer[128]; - while (fgets(buffer, sizeof buffer, pipe) != nullptr) { - result += buffer; + + const char* endpoint = ::getenv(_T("SECURITYAGENT_PATH")); + if (endpoint == nullptr) { + endpoint = SECURITY_AGENT_FILENAME; + } + auto engine = Core::ProxyType>::Create(); + auto client = Core::ProxyType::Create( + Core::NodeId(endpoint), + Core::ProxyType(engine)); + + auto interface = client->Open( + _T("SecurityAgent"), + static_cast(~0), + COM_RPC_TIMEOUT); // Timeout + if (interface != nullptr) { + string payload = _T("http://localhost"); + // If main process is out of threads, this can time out, and IPC will mess up... + auto error = interface->CreateToken( + static_cast(payload.length()), + reinterpret_cast(payload.c_str()), + result); + if (error != Core::ERROR_NONE) { + TRACE(Trace::Error, (_T("security token error %d"), error)); } - pclose(pipe); - result.erase(result.find_last_not_of(" \n\r\t") + 1); + interface->Release(); } + return result; } string GetToken() const { - // TODO remove this - return ExecuteCmd("curl -H \"Authorization: Bearer `WPEFrameworkSecurityUtility 2>/dev/null | cut -d '\"' -f 4`\" -s -d '{\"jsonrpc\":\"2.0\",\"id\":0,\"method\":\"org.rdk.AuthService.getServiceAccessToken\"}' http://127.0.0.1:9998/jsonrpc | cut -d '\"' -f 14"); - } - static string ReadFromFile(const char* filename) - { + // Get actual token, as it may change at any time... string result; - Core::File file(filename); - if (file.Open(true)) { - uint8_t buffer[1024]; - auto size = file.Read(buffer, 1024); - result.assign(reinterpret_cast(buffer), size); - result.erase(result.find_last_not_of(" \n\r\t") + 1); + + Core::SystemInfo::SetEnvironment(_T("THUNDER_ACCESS"), (_T("127.0.0.1:9998"))); + auto link = Core::ProxyType>::Create( + _T("org.rdk.AuthService"), _T(""), false, "token=" + GetSecurityToken()); + + JsonObject json; + auto status = link->Invoke( + JSON_RPC_TIMEOUT, // Timeout + _T("getServiceAccessToken"), + JsonObject(), + json); + if (status == Core::ERROR_NONE) { + result = json[_T("token")].String(); + } else { + TRACE(Trace::Error, (_T("sat status %d"), status)); } + return result; } string GetPartnerId() const { - return ReadFromFile("/opt/www/authService/partnerId3.dat"); + // Get actual id, as it may change at any time... + std::ifstream input(PARTNER_ID_FILENAME); + string line; + getline(input, line); + return line; } string GetAccountId() const { - return ReadFromFile("/opt/www/authService/said.dat"); + // Get actual id, as it may change at any time... + std::ifstream input(ACCOUNT_ID_FILENAME); + string line; + getline(input, line); + return line; } string GetDeviceId() const { - return ReadFromFile("/opt/www/authService/xdeviceid.dat"); + // Get actual id, as it may change at any time... + std::ifstream input(DEVICE_ID_FILENAME); + string line; + getline(input, line); + return line; } public: @@ -150,14 +227,13 @@ namespace Plugin { uint32_t SetValue(const ScopeType scope, const string& ns, const string& key, const string& value, const uint32_t ttl) override { - ASSERT(scope == ScopeType::ACCOUNT); - uint32_t result; grpc::ClientContext context; - if ((_uri.find("localhost") == string::npos) && (_uri.find("0.0.0.0") == string::npos)) { + if (_authorization) { context.AddMetadata("authorization", "Bearer " + GetToken()); } + context.set_deadline(std::chrono::system_clock::now() + std::chrono::milliseconds(GRPC_TIMEOUT)); // Timeout ::distp::gateway::secure_storage::v1::UpdateValueRequest request; request.set_partner_id(GetPartnerId()); request.set_account_id(GetAccountId()); @@ -172,14 +248,20 @@ namespace Plugin { auto k = new ::distp::gateway::secure_storage::v1::Key(); k->set_app_id(ns); k->set_key(key); - k->set_scope(::distp::gateway::secure_storage::v1::Scope::SCOPE_ACCOUNT); + k->set_scope(scope == ScopeType::ACCOUNT + ? ::distp::gateway::secure_storage::v1::Scope::SCOPE_ACCOUNT + : (scope == ScopeType::DEVICE + ? ::distp::gateway::secure_storage::v1::Scope::SCOPE_DEVICE + : ::distp::gateway::secure_storage::v1::Scope::SCOPE_UNSPECIFIED)); v->set_allocated_key(k); request.set_allocated_value(v); ::distp::gateway::secure_storage::v1::UpdateValueResponse response; auto status = _stub->UpdateValue(&context, request, &response); if (status.ok()) { - OnValueChanged(ns, key, value); + Core::IWorkerPool::Instance().Submit(Core::ProxyType( + Core::ProxyType::Create(this, scope, ns, key, value))); // Decouple notification + result = Core::ERROR_NONE; } else { OnError(__FUNCTION__, status); @@ -194,14 +276,13 @@ namespace Plugin { } uint32_t GetValue(const ScopeType scope, const string& ns, const string& key, string& value, uint32_t& ttl) override { - ASSERT(scope == ScopeType::ACCOUNT); - uint32_t result; grpc::ClientContext context; - if ((_uri.find("localhost") == string::npos) && (_uri.find("0.0.0.0") == string::npos)) { + if (_authorization) { context.AddMetadata("authorization", "Bearer " + GetToken()); } + context.set_deadline(std::chrono::system_clock::now() + std::chrono::milliseconds(GRPC_TIMEOUT)); // Timeout ::distp::gateway::secure_storage::v1::GetValueRequest request; request.set_partner_id(GetPartnerId()); request.set_account_id(GetAccountId()); @@ -209,7 +290,11 @@ namespace Plugin { auto k = new ::distp::gateway::secure_storage::v1::Key(); k->set_app_id(ns); k->set_key(key); - k->set_scope(::distp::gateway::secure_storage::v1::Scope::SCOPE_ACCOUNT); + k->set_scope(scope == ScopeType::ACCOUNT + ? ::distp::gateway::secure_storage::v1::Scope::SCOPE_ACCOUNT + : (scope == ScopeType::DEVICE + ? ::distp::gateway::secure_storage::v1::Scope::SCOPE_DEVICE + : ::distp::gateway::secure_storage::v1::Scope::SCOPE_UNSPECIFIED)); request.set_allocated_key(k); ::distp::gateway::secure_storage::v1::GetValueResponse response; auto status = _stub->GetValue(&context, request, &response); @@ -252,14 +337,13 @@ namespace Plugin { } uint32_t DeleteKey(const ScopeType scope, const string& ns, const string& key) override { - ASSERT(scope == ScopeType::ACCOUNT); - uint32_t result; grpc::ClientContext context; - if ((_uri.find("localhost") == string::npos) && (_uri.find("0.0.0.0") == string::npos)) { + if (_authorization) { context.AddMetadata("authorization", "Bearer " + GetToken()); } + context.set_deadline(std::chrono::system_clock::now() + std::chrono::milliseconds(GRPC_TIMEOUT)); // Timeout ::distp::gateway::secure_storage::v1::DeleteValueRequest request; request.set_partner_id(GetPartnerId()); request.set_account_id(GetAccountId()); @@ -267,7 +351,11 @@ namespace Plugin { auto k = new ::distp::gateway::secure_storage::v1::Key(); k->set_app_id(ns); k->set_key(key); - k->set_scope(::distp::gateway::secure_storage::v1::Scope::SCOPE_ACCOUNT); + k->set_scope(scope == ScopeType::ACCOUNT + ? ::distp::gateway::secure_storage::v1::Scope::SCOPE_ACCOUNT + : (scope == ScopeType::DEVICE + ? ::distp::gateway::secure_storage::v1::Scope::SCOPE_DEVICE + : ::distp::gateway::secure_storage::v1::Scope::SCOPE_UNSPECIFIED)); request.set_allocated_key(k); ::distp::gateway::secure_storage::v1::DeleteValueResponse response; auto status = _stub->DeleteValue(&context, request, &response); @@ -287,20 +375,23 @@ namespace Plugin { } uint32_t DeleteNamespace(const ScopeType scope, const string& ns) override { - ASSERT(scope == ScopeType::ACCOUNT); - uint32_t result; grpc::ClientContext context; - if ((_uri.find("localhost") == string::npos) && (_uri.find("0.0.0.0") == string::npos)) { + if (_authorization) { context.AddMetadata("authorization", "Bearer " + GetToken()); } + context.set_deadline(std::chrono::system_clock::now() + std::chrono::milliseconds(GRPC_TIMEOUT)); // Timeout ::distp::gateway::secure_storage::v1::DeleteAllValuesRequest request; request.set_partner_id(GetPartnerId()); request.set_account_id(GetAccountId()); request.set_device_id(GetDeviceId()); request.set_app_id(ns); - request.set_scope(::distp::gateway::secure_storage::v1::Scope::SCOPE_ACCOUNT); + request.set_scope(scope == ScopeType::ACCOUNT + ? ::distp::gateway::secure_storage::v1::Scope::SCOPE_ACCOUNT + : (scope == ScopeType::DEVICE + ? ::distp::gateway::secure_storage::v1::Scope::SCOPE_DEVICE + : ::distp::gateway::secure_storage::v1::Scope::SCOPE_UNSPECIFIED)); ::distp::gateway::secure_storage::v1::DeleteAllValuesResponse response; auto status = _stub->DeleteAllValues(&context, request, &response); @@ -319,7 +410,7 @@ namespace Plugin { END_INTERFACE_MAP private: - void OnValueChanged(const string& ns, const string& key, const string& value) + void OnValueChanged(const ScopeType scope, const string& ns, const string& key, const string& value) { Core::SafeSyncType lock(_clientLock); @@ -327,7 +418,8 @@ namespace Plugin { index(_clients.begin()); while (index != _clients.end()) { - (*index)->ValueChanged(ScopeType::ACCOUNT, ns, key, value); + // If main process is out of threads, this can time out, and IPC will mess up... + (*index)->ValueChanged(scope, ns, key, value); index++; } } @@ -338,6 +430,7 @@ namespace Plugin { private: const string _uri; + const bool _authorization; std::unique_ptr<::distp::gateway::secure_storage::v1::SecureStorageService::Stub> _stub; std::list _clients; Core::CriticalSection _clientLock; diff --git a/CloudStore/grpc/l0test/Store2Test.cpp b/CloudStore/grpc/l0test/Store2Test.cpp index 7e4a216807..8f23703639 100644 --- a/CloudStore/grpc/l0test/Store2Test.cpp +++ b/CloudStore/grpc/l0test/Store2Test.cpp @@ -4,6 +4,7 @@ #include "../Store2.h" #include "SecureStorageServiceMock.h" #include "Server.h" +#include "WorkerPoolImplementation.h" using ::distp::gateway::secure_storage::v1::DeleteAllValuesRequest; using ::distp::gateway::secure_storage::v1::DeleteAllValuesResponse; @@ -39,13 +40,21 @@ const auto kScope = Scope::SCOPE_ACCOUNT; class AStore2 : public Test { protected: + WPEFramework::Core::ProxyType workerPool; NiceMock service; Server server; WPEFramework::Core::ProxyType store2; AStore2() - : server(kUri, &service) + : workerPool(WPEFramework::Core::ProxyType::Create( + WPEFramework::Core::Thread::DefaultStackSize())) + , server(kUri, &service) , store2(WPEFramework::Core::ProxyType::Create(kUri)) { + WPEFramework::Core::IWorkerPool::Assign(&(*workerPool)); + } + ~AStore2() override + { + WPEFramework::Core::IWorkerPool::Assign(nullptr); } }; diff --git a/CloudStore/grpc/l0test/WorkerPoolImplementation.h b/CloudStore/grpc/l0test/WorkerPoolImplementation.h new file mode 100644 index 0000000000..f782fa5489 --- /dev/null +++ b/CloudStore/grpc/l0test/WorkerPoolImplementation.h @@ -0,0 +1,49 @@ +#pragma once + +#include "../../Module.h" + +class WorkerPoolImplementation + : public WPEFramework::Core::WorkerPool, + public WPEFramework::Core::ThreadPool::ICallback { +private: + class Dispatcher : public WPEFramework::Core::ThreadPool::IDispatcher { + public: + Dispatcher(const Dispatcher&) = delete; + Dispatcher& operator=(const Dispatcher&) = delete; + Dispatcher() = default; + ~Dispatcher() override = default; + + private: + void Initialize() override + { + } + void Deinitialize() override + { + } + void Dispatch(WPEFramework::Core::IDispatch* job) override + { + job->Dispatch(); + } + }; + +public: + WorkerPoolImplementation() = delete; + WorkerPoolImplementation(const WorkerPoolImplementation&) = delete; + WorkerPoolImplementation& operator=(const WorkerPoolImplementation&) = delete; + WorkerPoolImplementation(const uint32_t stackSize) + : WPEFramework::Core::WorkerPool(4 /*threadCount*/, stackSize, 32 /*queueSize*/, &_dispatch, this) + , _dispatch() + { + Run(); + } + ~WorkerPoolImplementation() override + { + Stop(); + } + void Idle() override + { + } + +private: + Dispatcher _dispatch; +}; diff --git a/CloudStore/l0test/ServiceMock.h b/CloudStore/l0test/ServiceMock.h index 63fa33379f..c9efdb8da5 100644 --- a/CloudStore/l0test/ServiceMock.h +++ b/CloudStore/l0test/ServiceMock.h @@ -53,6 +53,7 @@ class ServiceMock : public WPEFramework::PluginHost::IShell, MOCK_METHOD(void, Unregister, (const IShell::ICOMLink::INotification*), (override)); MOCK_METHOD(WPEFramework::RPC::IRemoteConnection*, RemoteConnection, (const uint32_t), (override)); MOCK_METHOD(void*, Instantiate, (const WPEFramework::RPC::Object&, const uint32_t, uint32_t&), (override)); + MOCK_METHOD(WPEFramework::RPC::IStringIterator*, GetLibrarySearchPaths, (const string&), (const, override)); BEGIN_INTERFACE_MAP(ServiceMock) INTERFACE_ENTRY(IShell) INTERFACE_ENTRY(IShell::ICOMLink) From 8aa66c1c3f74fccb8f358a373f3df2b7a6f9ae6b Mon Sep 17 00:00:00 2001 From: Anand Kandasamy <37086488+anand-ky@users.noreply.github.com> Date: Tue, 10 Sep 2024 15:40:34 -0400 Subject: [PATCH 2/3] Merge pull request #5688 from npoltorapavlo/DELIA-66118-persistentstore DELIA-66118: PersistentStore improvements --- PersistentStore/Module.h | 2 + PersistentStore/l0test/ServiceMock.h | 1 + PersistentStore/l1test/ServiceMock.h | 1 + PersistentStore/sqlite/Store2.h | 112 +++++++++++++++--- PersistentStore/sqlite/l1test/Store2Test.cpp | 11 +- .../sqlite/l1test/WorkerPoolImplementation.h | 49 ++++++++ 6 files changed, 157 insertions(+), 19 deletions(-) create mode 100644 PersistentStore/sqlite/l1test/WorkerPoolImplementation.h diff --git a/PersistentStore/Module.h b/PersistentStore/Module.h index 342536a741..88ac1288c7 100644 --- a/PersistentStore/Module.h +++ b/PersistentStore/Module.h @@ -34,6 +34,8 @@ #define MAXVALUE_ENV "PERSISTENTSTORE_MAXVALUE" #define LIMIT_ENV "PERSISTENTSTORE_LIMIT" #define IARM_INIT_NAME "Thunder_Plugins" +#define IARM_TIMEOUT 1000 +#define SQLITE_TIMEOUT 1000 #undef EXTERNAL #define EXTERNAL diff --git a/PersistentStore/l0test/ServiceMock.h b/PersistentStore/l0test/ServiceMock.h index 63fa33379f..c9efdb8da5 100644 --- a/PersistentStore/l0test/ServiceMock.h +++ b/PersistentStore/l0test/ServiceMock.h @@ -53,6 +53,7 @@ class ServiceMock : public WPEFramework::PluginHost::IShell, MOCK_METHOD(void, Unregister, (const IShell::ICOMLink::INotification*), (override)); MOCK_METHOD(WPEFramework::RPC::IRemoteConnection*, RemoteConnection, (const uint32_t), (override)); MOCK_METHOD(void*, Instantiate, (const WPEFramework::RPC::Object&, const uint32_t, uint32_t&), (override)); + MOCK_METHOD(WPEFramework::RPC::IStringIterator*, GetLibrarySearchPaths, (const string&), (const, override)); BEGIN_INTERFACE_MAP(ServiceMock) INTERFACE_ENTRY(IShell) INTERFACE_ENTRY(IShell::ICOMLink) diff --git a/PersistentStore/l1test/ServiceMock.h b/PersistentStore/l1test/ServiceMock.h index 63fa33379f..c9efdb8da5 100644 --- a/PersistentStore/l1test/ServiceMock.h +++ b/PersistentStore/l1test/ServiceMock.h @@ -53,6 +53,7 @@ class ServiceMock : public WPEFramework::PluginHost::IShell, MOCK_METHOD(void, Unregister, (const IShell::ICOMLink::INotification*), (override)); MOCK_METHOD(WPEFramework::RPC::IRemoteConnection*, RemoteConnection, (const uint32_t), (override)); MOCK_METHOD(void*, Instantiate, (const WPEFramework::RPC::Object&, const uint32_t, uint32_t&), (override)); + MOCK_METHOD(WPEFramework::RPC::IStringIterator*, GetLibrarySearchPaths, (const string&), (const, override)); BEGIN_INTERFACE_MAP(ServiceMock) INTERFACE_ENTRY(IShell) INTERFACE_ENTRY(IShell::ICOMLink) diff --git a/PersistentStore/sqlite/Store2.h b/PersistentStore/sqlite/Store2.h index 1b9b9bbb13..4091f48661 100644 --- a/PersistentStore/sqlite/Store2.h +++ b/PersistentStore/sqlite/Store2.h @@ -36,6 +36,35 @@ namespace Plugin { public Exchange::IStoreCache, public Exchange::IStoreInspector, public Exchange::IStoreLimit { + private: + class Job : public Core::IDispatch { + public: + Job(Store2* parent, const IStore2::ScopeType scope, const string& ns, const string& key, const string& value) + : _parent(parent) + , _scope(scope) + , _ns(ns) + , _key(key) + , _value(value) + { + _parent->AddRef(); + } + ~Job() override + { + _parent->Release(); + } + void Dispatch() override + { + _parent->OnValueChanged(_scope, _ns, _key, _value); + } + + private: + Store2* _parent; + const IStore2::ScopeType _scope; + const string _ns; + const string _key; + const string _value; + }; + private: Store2(const Store2&) = delete; Store2& operator=(const Store2&) = delete; @@ -75,22 +104,59 @@ namespace Plugin { if (rc != SQLITE_OK) { OnError(__FUNCTION__, rc); } + rc = sqlite3_busy_timeout(_data, SQLITE_TIMEOUT); // Timeout + if (rc != SQLITE_OK) { + OnError(__FUNCTION__, rc); + } const std::vector statements = { "pragma foreign_keys = on;", - "pragma busy_timeout = 1000000;", - "create table if not exists namespace (id integer primary key,name text unique);", - "create table if not exists item (ns integer,key text,value text,foreign key(ns) references namespace(id) on delete cascade on update no action,unique(ns,key) on conflict replace);", - "create table if not exists limits (n integer,size integer,foreign key(n) references namespace(id) on delete cascade on update no action,unique(n) on conflict replace);", + "create table if not exists namespace" + " (id integer primary key,name text unique);", + "create table if not exists item" + " (ns integer,key text,value text," + "foreign key(ns) references namespace(id) on delete cascade on update no action," + "unique(ns,key) on conflict replace);", + "create table if not exists limits" + " (n integer,size integer," + "foreign key(n) references namespace(id) on delete cascade on update no action," + "unique(n) on conflict replace);", "alter table item add column ttl integer;", - "create temporary trigger if not exists ns_empty insert on namespace begin select case when length(new.name) = 0 then raise (fail, 'empty') end; end;", - "create temporary trigger if not exists key_empty insert on item begin select case when length(new.key) = 0 then raise (fail, 'empty') end; end;", - "create temporary trigger if not exists ns_maxvalue insert on namespace begin select case when length(new.name) > " + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;", - "create temporary trigger if not exists key_maxvalue insert on item begin select case when length(new.key) > " + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;", - "create temporary trigger if not exists value_maxvalue insert on item begin select case when length(new.value) > " + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;", - "create temporary trigger if not exists ns_maxsize insert on namespace begin select case when (select sum(s) from (select sum(length(key)+length(value)) s from item union all select sum(length(name)) s from namespace union all select length(new.name) s)) > " + std::to_string(_maxSize) + " then raise (fail, 'max size') end; end;", - "create temporary trigger if not exists item_maxsize insert on item begin select case when (select sum(s) from (select sum(length(key)+length(value)) s from item union all select sum(length(name)) s from namespace union all select length(new.key)+length(new.value) s)) > " + std::to_string(_maxSize) + " then raise (fail, 'max size') end; end;", - "create temporary trigger if not exists item_limit_default insert on item begin select case when (select length(new.key)+length(new.value)+sum(length(key)+length(value)) from item where ns = new.ns) > " + std::to_string(_limit) + " then raise (fail, 'limit') end; end;", - "create temporary trigger if not exists item_limit insert on item begin select case when (select size-length(new.key)-length(new.value)-sum(length(key)+length(value)) from limits inner join item on limits.n = item.ns where n = new.ns) < 0 then raise (fail, 'limit') end; end;" + "create temporary trigger if not exists ns_empty insert on namespace" + " begin select case when length(new.name) = 0" + " then raise (fail, 'empty') end; end;", + "create temporary trigger if not exists key_empty insert on item" + " begin select case when length(new.key) = 0" + " then raise (fail, 'empty') end; end;", + "create temporary trigger if not exists ns_maxvalue insert on namespace" + " begin select case when length(new.name) > " + + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;", + "create temporary trigger if not exists key_maxvalue insert on item" + " begin select case when length(new.key) > " + + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;", + "create temporary trigger if not exists value_maxvalue insert on item" + " begin select case when length(new.value) > " + + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;", + "create temporary trigger if not exists ns_maxsize insert on namespace" + " begin select case when" + " (select sum(s) from (select sum(length(key)+length(value)) s from item" + " union all select sum(length(name)) s from namespace" + " union all select length(new.name) s)) > " + + std::to_string(_maxSize) + " then raise (fail, 'max size') end; end;", + "create temporary trigger if not exists item_maxsize insert on item" + " begin select case when" + " (select sum(s) from (select sum(length(key)+length(value)) s from item" + " union all select sum(length(name)) s from namespace" + " union all select length(new.key)+length(new.value) s)) > " + + std::to_string(_maxSize) + " then raise (fail, 'max size') end; end;", + "create temporary trigger if not exists item_limit_default insert on item" + " begin select case when" + " (select length(new.key)+length(new.value)+sum(length(key)+length(value)) from item where ns = new.ns) > " + + std::to_string(_limit) + " then raise (fail, 'limit') end; end;", + "create temporary trigger if not exists item_limit insert on item" + " begin select case when" + " (select size-length(new.key)-length(new.value)-sum(length(key)+length(value)) from limits" + " inner join item on limits.n = item.ns where n = new.ns) < 0" + " then raise (fail, 'limit') end; end;" }; for (auto& sql : statements) { auto rc = sqlite3_exec(_data, sql.c_str(), nullptr, nullptr, nullptr); @@ -106,13 +172,20 @@ namespace Plugin { OnError(__FUNCTION__, rc); } } - static bool IsTimeSynced() + bool IsTimeSynced() const { #ifdef WITH_SYSMGR + // Get actual state, as it may change at any time... IARM_Bus_Init(IARM_INIT_NAME); IARM_Bus_Connect(); IARM_Bus_SYSMgr_GetSystemStates_Param_t param; - if ((IARM_Bus_Call(IARM_BUS_SYSMGR_NAME, IARM_BUS_SYSMGR_API_GetSystemStates, ¶m, sizeof(param)) != IARM_RESULT_SUCCESS) + if ((IARM_Bus_Call_with_IPCTimeout( + IARM_BUS_SYSMGR_NAME, + IARM_BUS_SYSMGR_API_GetSystemStates, + ¶m, + sizeof(param), + IARM_TIMEOUT) // Timeout + != IARM_RESULT_SUCCESS) || !param.time_source.state) { return false; } @@ -185,7 +258,9 @@ namespace Plugin { } if (rc == SQLITE_DONE) { - OnValueChanged(ns, key, value); + Core::IWorkerPool::Instance().Submit(Core::ProxyType( + Core::ProxyType::Create(this, scope, ns, key, value))); // Decouple notification + result = Core::ERROR_NONE; } else { OnError(__FUNCTION__, rc); @@ -489,7 +564,7 @@ namespace Plugin { END_INTERFACE_MAP private: - void OnValueChanged(const string& ns, const string& key, const string& value) + void OnValueChanged(const IStore2::ScopeType scope, const string& ns, const string& key, const string& value) { Core::SafeSyncType lock(_clientLock); @@ -497,7 +572,8 @@ namespace Plugin { index(_clients.begin()); while (index != _clients.end()) { - (*index)->ValueChanged(IStore2::ScopeType::DEVICE, ns, key, value); + // If main process is out of threads, this can time out, and IPC will mess up... + (*index)->ValueChanged(scope, ns, key, value); index++; } } diff --git a/PersistentStore/sqlite/l1test/Store2Test.cpp b/PersistentStore/sqlite/l1test/Store2Test.cpp index 2ee78da6a4..04454ed49e 100644 --- a/PersistentStore/sqlite/l1test/Store2Test.cpp +++ b/PersistentStore/sqlite/l1test/Store2Test.cpp @@ -3,6 +3,7 @@ #include "../Store2.h" #include "Store2NotificationMock.h" +#include "WorkerPoolImplementation.h" using ::testing::_; using ::testing::Eq; @@ -39,10 +40,18 @@ const auto kLimit40 = 40; class AStore2 : public Test { protected: + WPEFramework::Core::ProxyType workerPool; WPEFramework::Core::ProxyType store2; AStore2() - : store2(WPEFramework::Core::ProxyType::Create(kPath, kMaxSize, kMaxValue, kLimit)) + : workerPool(WPEFramework::Core::ProxyType::Create( + WPEFramework::Core::Thread::DefaultStackSize())) + , store2(WPEFramework::Core::ProxyType::Create(kPath, kMaxSize, kMaxValue, kLimit)) { + WPEFramework::Core::IWorkerPool::Assign(&(*workerPool)); + } + ~AStore2() override + { + WPEFramework::Core::IWorkerPool::Assign(nullptr); } }; diff --git a/PersistentStore/sqlite/l1test/WorkerPoolImplementation.h b/PersistentStore/sqlite/l1test/WorkerPoolImplementation.h new file mode 100644 index 0000000000..f782fa5489 --- /dev/null +++ b/PersistentStore/sqlite/l1test/WorkerPoolImplementation.h @@ -0,0 +1,49 @@ +#pragma once + +#include "../../Module.h" + +class WorkerPoolImplementation + : public WPEFramework::Core::WorkerPool, + public WPEFramework::Core::ThreadPool::ICallback { +private: + class Dispatcher : public WPEFramework::Core::ThreadPool::IDispatcher { + public: + Dispatcher(const Dispatcher&) = delete; + Dispatcher& operator=(const Dispatcher&) = delete; + Dispatcher() = default; + ~Dispatcher() override = default; + + private: + void Initialize() override + { + } + void Deinitialize() override + { + } + void Dispatch(WPEFramework::Core::IDispatch* job) override + { + job->Dispatch(); + } + }; + +public: + WorkerPoolImplementation() = delete; + WorkerPoolImplementation(const WorkerPoolImplementation&) = delete; + WorkerPoolImplementation& operator=(const WorkerPoolImplementation&) = delete; + WorkerPoolImplementation(const uint32_t stackSize) + : WPEFramework::Core::WorkerPool(4 /*threadCount*/, stackSize, 32 /*queueSize*/, &_dispatch, this) + , _dispatch() + { + Run(); + } + ~WorkerPoolImplementation() override + { + Stop(); + } + void Idle() override + { + } + +private: + Dispatcher _dispatch; +}; From 0576e7b928cb84cce55fd96f4f1117c79d3cf95b Mon Sep 17 00:00:00 2001 From: Nikita Poltorapavlo Date: Tue, 17 Sep 2024 13:53:22 +0300 Subject: [PATCH 3/3] update changelog and api version --- CloudStore/CHANGELOG.md | 4 ++++ CloudStore/CloudStore.cpp | 2 +- PersistentStore/CHANGELOG.md | 4 ++++ PersistentStore/PersistentStore.cpp | 2 +- 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CloudStore/CHANGELOG.md b/CloudStore/CHANGELOG.md index 7864154892..286ecd7b3f 100644 --- a/CloudStore/CHANGELOG.md +++ b/CloudStore/CHANGELOG.md @@ -16,6 +16,10 @@ All notable changes to this RDK Service will be documented in this file. * For more details, refer to [versioning](https://github.com/rdkcentral/rdkservices#versioning) section under Main README. +## [1.0.1] - 2024-09-17 +### Fixed +- Decouple notification, add timeouts + ## [1.0.0] - 2024-07-15 ### Added - Add CHANGELOG diff --git a/CloudStore/CloudStore.cpp b/CloudStore/CloudStore.cpp index af842ca504..41d344c066 100644 --- a/CloudStore/CloudStore.cpp +++ b/CloudStore/CloudStore.cpp @@ -24,7 +24,7 @@ #define API_VERSION_NUMBER_MAJOR 1 #define API_VERSION_NUMBER_MINOR 0 -#define API_VERSION_NUMBER_PATCH 0 +#define API_VERSION_NUMBER_PATCH 1 namespace WPEFramework { diff --git a/PersistentStore/CHANGELOG.md b/PersistentStore/CHANGELOG.md index 536ac8a1a3..6fe74934e9 100644 --- a/PersistentStore/CHANGELOG.md +++ b/PersistentStore/CHANGELOG.md @@ -16,6 +16,10 @@ All notable changes to this RDK Service will be documented in this file. * For more details, refer to [versioning](https://github.com/rdkcentral/rdkservices#versioning) section under Main README. +## [2.0.1] - 2024-09-17 +### Fixed +- Decouple notification, add timeouts + ## [2.0.0] - 2024-08-06 ### Removed - Move secure store into a separate plugin diff --git a/PersistentStore/PersistentStore.cpp b/PersistentStore/PersistentStore.cpp index 2182979421..99ddcadf71 100644 --- a/PersistentStore/PersistentStore.cpp +++ b/PersistentStore/PersistentStore.cpp @@ -22,7 +22,7 @@ #define API_VERSION_NUMBER_MAJOR 2 #define API_VERSION_NUMBER_MINOR 0 -#define API_VERSION_NUMBER_PATCH 0 +#define API_VERSION_NUMBER_PATCH 1 namespace WPEFramework {