From a2e587b1e8a4fc5ad61cf32629b508ee782c770b Mon Sep 17 00:00:00 2001 From: Tianxing Ma Date: Thu, 15 Feb 2024 11:32:28 -0600 Subject: [PATCH 1/4] pub-sub: Subscribe with filter #21 --- examples/chat-pubsub-regex.cpp | 167 +++++++++++++++++++++++++++++++++ ndn-svs/common.hpp | 1 + ndn-svs/svspubsub.cpp | 21 ++++- ndn-svs/svspubsub.hpp | 14 +++ 4 files changed, 201 insertions(+), 2 deletions(-) create mode 100644 examples/chat-pubsub-regex.cpp diff --git a/examples/chat-pubsub-regex.cpp b/examples/chat-pubsub-regex.cpp new file mode 100644 index 0000000..6013eda --- /dev/null +++ b/examples/chat-pubsub-regex.cpp @@ -0,0 +1,167 @@ +/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2012-2023 University of California, Los Angeles + * + * This file is part of ndn-svs, synchronization library for distributed realtime + * applications for NDN. + * + * ndn-svs library is free software: you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License as published by the Free Software + * Foundation, in version 2.1 of the License. + * + * ndn-svs library is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. + */ + +#include +#include +#include +#include +#include +#include + +#include + +using namespace ndn::svs; + +struct Options +{ + std::string prefix; + std::string m_id; +}; + +class Program +{ +public: + Program(const Options& options) + : m_options(options) + { + // Use HMAC signing for Sync Interests + // Note: this is not generally recommended, but is used here for simplicity + SecurityOptions secOpts(m_keyChain); + secOpts.interestSigner->signingInfo.setSigningHmacKey("dGhpcyBpcyBhIHNlY3JldCBtZXNzYWdl"); + + // Sign data packets using SHA256 (for simplicity) + secOpts.dataSigner->signingInfo.setSha256Signing(); + + // Do not fetch publications older than 10 seconds + SVSPubSubOptions opts; + opts.useTimestamp = true; + opts.maxPubAge = ndn::time::seconds(10); + + // Create the Pub/Sub instance + m_svsps = std::make_shared( + ndn::Name(m_options.prefix), + ndn::Name(m_options.m_id), + face, + std::bind(&Program::onMissingData, this, _1), + opts, + secOpts); + + std::cout << "SVS client starting: " << m_options.m_id << std::endl; + + // Subscribe to all data packets with prefix /chat (the "topic") + m_svsps->subscribeWithRegex(ndn::Regex("^"), [] (const auto& subData) + { + std::string content(reinterpret_cast(subData.data.data()), subData.data.size()); + std::cout << subData.producerPrefix << " [" << subData.seqNo << "] : " << + subData.name << " : "; + if (content.length() > 200) { + std::cout << "[LONG] " << content.length() << " bytes" + << " [" << std::hash{}(content) << "]"; + } else { + std::cout << content; + } + std::cout << std::endl; + }); + } + + void + run() + { + // Begin processing face events in a separate thread. + std::thread svsThread([this] { face.processEvents(); }); + + // Announce our presence. + // Note that the SVS-PS instance is thread-safe. + publishMsg("User " + m_options.m_id + " has joined the groupchat"); + + // Read from stdin and publish messages. + std::string userInput; + while (true) { + std::getline(std::cin, userInput); + publishMsg(userInput); + } + + // Wait for the SVS-PS thread to finish. + svsThread.join(); + } + +protected: + /** + * Callback on receving a new State Vector from another node. + * This will be called regardless of whether the missing data contains any topics + * or producers that we are subscribed to. + */ + void + onMissingData(const std::vector&) + { + // Ignore any other missing data for this example + } + + /** + * Publish a string message to the group + */ + void + publishMsg(const std::string& msg) + { + // Message to send + std::string content = msg; + + // If the message starts with "SEND " generate a new message + // with random content with length after send + if (msg.length() > 5 && msg.substr(0, 5) == "SEND ") { + auto len = std::stoi(msg.substr(5)); + + content = std::string(len, 'a'); + std::srand(std::time(nullptr)); + for (auto& c : content) + c = std::rand() % 26 + 'a'; + + std::cout << "> Sending random message with hash [" << std::hash{}(content) << "]" << std::endl; + } + + // Note that unlike SVSync, names can be arbitrary, + // and need not be prefixed with the producer prefix. + ndn::Name name("chat"); // topic of publication + name.append(m_options.m_id); // who sent this + name.appendTimestamp(); // and when + + m_svsps->publish(name, ndn::make_span(reinterpret_cast(content.data()), content.size())); + } + +private: + const Options m_options; + ndn::Face face; + std::shared_ptr m_svsps; + ndn::KeyChain m_keyChain; +}; + +int +main(int argc, char** argv) +{ + if (argc != 2) { + std::cerr << "Usage: " << argv[0] << " " << std::endl; + return 1; + } + + Options opt; + opt.prefix = "/ndn/svs"; + opt.m_id = argv[1]; + + Program program(opt); + program.run(); + + return 0; +} diff --git a/ndn-svs/common.hpp b/ndn-svs/common.hpp index b85737d..b62b71d 100644 --- a/ndn-svs/common.hpp +++ b/ndn-svs/common.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include diff --git a/ndn-svs/svspubsub.cpp b/ndn-svs/svspubsub.cpp index 723c890..09e6927 100644 --- a/ndn-svs/svspubsub.cpp +++ b/ndn-svs/svspubsub.cpp @@ -139,6 +139,15 @@ SVSPubSub::subscribe(const Name& prefix, const SubscriptionCallback& callback, b return handle; } +uint32_t +SVSPubSub::subscribeWithRegex(const Regex ®ex, const SubscriptionCallback &callback, bool packets) +{ + uint32_t handle = ++m_subscriptionCount; + Subscription sub = { handle, ndn::Name(), callback, packets, false, make_shared(regex)}; + m_regexSubscriptions.push_back(sub); + return handle; +} + uint32_t SVSPubSub::subscribeToProducer(const Name& nodePrefix, const SubscriptionCallback& callback, bool prefetch, bool packets) @@ -190,8 +199,8 @@ SVSPubSub::updateCallbackInternal(const std::vector& info) } } - // Fetch all mappings if we have prefix subscription(s) - if (!m_prefixSubscriptions.empty()) + // Fetch all mappings if we have prefix subscription(s) or regex subscription(s) + if (!m_prefixSubscriptions.empty() or !m_regexSubscriptions.empty()) { MissingDataInfo remainingInfo = stream; @@ -281,6 +290,14 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) queued = true; } } + for (const auto& sub : m_regexSubscriptions) + { + if (sub.regex->match(mapping.first)) + { + m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); + queued = true; + } + } return queued; } diff --git a/ndn-svs/svspubsub.hpp b/ndn-svs/svspubsub.hpp index 941d4da..121e7a8 100644 --- a/ndn-svs/svspubsub.hpp +++ b/ndn-svs/svspubsub.hpp @@ -128,6 +128,18 @@ class SVSPubSub : noncopyable uint32_t subscribe(const Name& prefix, const SubscriptionCallback& callback, bool packets = false); + /** + * @brief Subscribe with a regex to name. + * + * @param regex regex of the application data + * @param callback Callback when new data is received + * @param packets Subscribe to the raw Data packets instead of BLOBs + * + * @returns Handle to the subscription + */ + uint32_t + subscribeWithRegex(const Regex& regex, const SubscriptionCallback& callback, bool packets = false); + /** * @brief Subscribe to a data producer * @@ -181,6 +193,7 @@ class SVSPubSub : noncopyable SubscriptionCallback callback; bool isPacketSubscription; bool prefetch; + std::shared_ptr regex = make_shared("^<>+$"); }; void @@ -241,6 +254,7 @@ class SVSPubSub : noncopyable uint32_t m_subscriptionCount; std::vector m_producerSubscriptions; std::vector m_prefixSubscriptions; + std::vector m_regexSubscriptions; // Queue of publications to fetch std::map, std::vector> m_fetchMap; From c25ee67033fbc9679c40eec338967ea9a8a32b2c Mon Sep 17 00:00:00 2001 From: Tianxing Ma Date: Thu, 15 Feb 2024 21:48:16 -0600 Subject: [PATCH 2/4] 1) Publish names only; 2) subscribe to the names without automatically fetching data --- ndn-svs/svspubsub.cpp | 48 +++++++++++++++++++++++++++++++++++++++---- ndn-svs/svspubsub.hpp | 17 ++++++++++++++- 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/ndn-svs/svspubsub.cpp b/ndn-svs/svspubsub.cpp index 09e6927..c4e6469 100644 --- a/ndn-svs/svspubsub.cpp +++ b/ndn-svs/svspubsub.cpp @@ -90,6 +90,24 @@ SVSPubSub::publish(const Name& name, span value, } } + +SeqNo +SVSPubSub::publish(const Name& name, + const Name& nodePrefix, time::milliseconds freshnessPeriod, + std::vector mappingBlocks) +{ + // Segment the data if larger than MAX_DATA_SIZE + + NodeID nid = nodePrefix == EMPTY_NAME ? m_dataPrefix : nodePrefix; + SeqNo seqNo = m_svsync.getCore().getSeqNo(nid) + 1; + + // Insert mapping and manually update the sequence number + insertMapping(nid, seqNo, name, mappingBlocks); + m_svsync.getCore().updateSeqNo(seqNo, nid); + + return seqNo; +} + SeqNo SVSPubSub::publishPacket(const Data& data, const Name& nodePrefix, std::vector mappingBlocks) @@ -140,10 +158,10 @@ SVSPubSub::subscribe(const Name& prefix, const SubscriptionCallback& callback, b } uint32_t -SVSPubSub::subscribeWithRegex(const Regex ®ex, const SubscriptionCallback &callback, bool packets) +SVSPubSub::subscribeWithRegex(const Regex ®ex, const SubscriptionCallback &callback,bool autofetch, bool packets) { uint32_t handle = ++m_subscriptionCount; - Subscription sub = { handle, ndn::Name(), callback, packets, false, make_shared(regex)}; + Subscription sub = { handle, ndn::Name(), callback, packets, false, make_shared(regex), autofetch}; m_regexSubscriptions.push_back(sub); return handle; } @@ -284,19 +302,41 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) bool queued = false; for (const auto& sub : m_prefixSubscriptions) { - if (sub.prefix.isPrefixOf(mapping.first)) + if (sub.prefix.isPrefixOf(mapping.first) and sub.autofetch) { m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); queued = true; } + else if (sub.prefix.isPrefixOf(mapping.first) and !sub.autofetch) + { + SubscriptionData subData = { + mapping.first, + ndn::span{}, + nodeId, + seqNo, + ndn::Data() + }; + sub.callback(subData); + } } for (const auto& sub : m_regexSubscriptions) { - if (sub.regex->match(mapping.first)) + if (sub.regex->match(mapping.first) and sub.autofetch) { m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); queued = true; } + else if (sub.regex->match(mapping.first) and !sub.autofetch) + { + SubscriptionData subData = { + mapping.first, + ndn::span{}, + nodeId, + seqNo, + ndn::Data() + }; + sub.callback(subData); + } } return queued; diff --git a/ndn-svs/svspubsub.hpp b/ndn-svs/svspubsub.hpp index 121e7a8..4efecec 100644 --- a/ndn-svs/svspubsub.hpp +++ b/ndn-svs/svspubsub.hpp @@ -116,6 +116,20 @@ class SVSPubSub : noncopyable time::milliseconds freshnessPeriod = FRESH_FOREVER, std::vector mappingBlocks = {}); + /** + * @brief Publish data names only on the pub/sub group. + * + * @param name name for the publication + * @param nodePrefix Name to publish the data under + * @param freshnessPeriod freshness period for the data + * @param mappingBlocks Additional blocks to be published with the mapping (use sparingly) + */ + SeqNo + publish(const Name& name, + const Name& nodePrefix = EMPTY_NAME, + time::milliseconds freshnessPeriod = FRESH_FOREVER, + std::vector mappingBlocks = {}); + /** * @brief Subscribe to a application name prefix. * @@ -138,7 +152,7 @@ class SVSPubSub : noncopyable * @returns Handle to the subscription */ uint32_t - subscribeWithRegex(const Regex& regex, const SubscriptionCallback& callback, bool packets = false); + subscribeWithRegex(const Regex& regex, const SubscriptionCallback& callback, bool autofetch = true, bool packets = false); /** * @brief Subscribe to a data producer @@ -194,6 +208,7 @@ class SVSPubSub : noncopyable bool isPacketSubscription; bool prefetch; std::shared_ptr regex = make_shared("^<>+$"); + bool autofetch = true; }; void From d314b59e0fbdb6b7fff75204f4c92a2ee1bc8667 Mon Sep 17 00:00:00 2001 From: matianxing1992 Date: Tue, 25 Jun 2024 14:44:25 -0500 Subject: [PATCH 3/4] updated according to suggestions in the PR --- ndn-svs/common.hpp | 1 - ndn-svs/svspubsub.cpp | 79 +++++++++++++++++++++++-------------------- ndn-svs/svspubsub.hpp | 4 ++- 3 files changed, 46 insertions(+), 38 deletions(-) diff --git a/ndn-svs/common.hpp b/ndn-svs/common.hpp index b62b71d..b85737d 100644 --- a/ndn-svs/common.hpp +++ b/ndn-svs/common.hpp @@ -23,7 +23,6 @@ #include #include #include -#include #include #include diff --git a/ndn-svs/svspubsub.cpp b/ndn-svs/svspubsub.cpp index c4e6469..627c7ac 100644 --- a/ndn-svs/svspubsub.cpp +++ b/ndn-svs/svspubsub.cpp @@ -20,6 +20,8 @@ #include + + namespace ndn::svs { SVSPubSub::SVSPubSub(const Name& syncPrefix, @@ -97,15 +99,14 @@ SVSPubSub::publish(const Name& name, std::vector mappingBlocks) { // Segment the data if larger than MAX_DATA_SIZE + NodeID nid = nodePrefix == EMPTY_NAME ? m_dataPrefix : nodePrefix; + SeqNo seqNo = m_svsync.getCore().getSeqNo(nid) + 1; - NodeID nid = nodePrefix == EMPTY_NAME ? m_dataPrefix : nodePrefix; - SeqNo seqNo = m_svsync.getCore().getSeqNo(nid) + 1; - - // Insert mapping and manually update the sequence number - insertMapping(nid, seqNo, name, mappingBlocks); - m_svsync.getCore().updateSeqNo(seqNo, nid); + // Insert mapping and manually update the sequence number + insertMapping(nid, seqNo, name, mappingBlocks); + m_svsync.getCore().updateSeqNo(seqNo, nid); - return seqNo; + return seqNo; } SeqNo @@ -161,7 +162,7 @@ uint32_t SVSPubSub::subscribeWithRegex(const Regex ®ex, const SubscriptionCallback &callback,bool autofetch, bool packets) { uint32_t handle = ++m_subscriptionCount; - Subscription sub = { handle, ndn::Name(), callback, packets, false, make_shared(regex), autofetch}; + Subscription sub = { handle, ndn::Name(), callback, packets, false, autofetch, make_shared(regex)}; m_regexSubscriptions.push_back(sub); return handle; } @@ -302,40 +303,46 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) bool queued = false; for (const auto& sub : m_prefixSubscriptions) { - if (sub.prefix.isPrefixOf(mapping.first) and sub.autofetch) + if (sub.prefix.isPrefixOf(mapping.first)) { - m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); - queued = true; - } - else if (sub.prefix.isPrefixOf(mapping.first) and !sub.autofetch) - { - SubscriptionData subData = { - mapping.first, - ndn::span{}, - nodeId, - seqNo, - ndn::Data() - }; - sub.callback(subData); + if (sub.autofetch) + { + m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); + queued = true; + } + else + { + SubscriptionData subData = { + mapping.first, + ndn::span{}, + nodeId, + seqNo, + ndn::Data() + }; + sub.callback(subData); + } } } for (const auto& sub : m_regexSubscriptions) { - if (sub.regex->match(mapping.first) and sub.autofetch) - { - m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); - queued = true; - } - else if (sub.regex->match(mapping.first) and !sub.autofetch) + if (sub.regex->match(mapping.first)) { - SubscriptionData subData = { - mapping.first, - ndn::span{}, - nodeId, - seqNo, - ndn::Data() - }; - sub.callback(subData); + if (sub.autofetch) + { + m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); + queued = true; + } + else + { + SubscriptionData subData = { + mapping.first, + ndn::span{}, + nodeId, + seqNo, + ndn::Data() + }; + sub.callback(subData); + } } } diff --git a/ndn-svs/svspubsub.hpp b/ndn-svs/svspubsub.hpp index 4efecec..480749c 100644 --- a/ndn-svs/svspubsub.hpp +++ b/ndn-svs/svspubsub.hpp @@ -24,6 +24,7 @@ #include "svsync.hpp" #include +#include namespace ndn::svs { @@ -207,8 +208,9 @@ class SVSPubSub : noncopyable SubscriptionCallback callback; bool isPacketSubscription; bool prefetch; - std::shared_ptr regex = make_shared("^<>+$"); bool autofetch = true; + std::shared_ptr regex = make_shared("^<>+$"); + }; void From 2ac6180ab56b566506c03554b8f2ba1b6cea25fd Mon Sep 17 00:00:00 2001 From: matianxing1992 Date: Wed, 11 Dec 2024 15:28:45 -0600 Subject: [PATCH 4/4] Piggybacking small data to sync interest --- ndn-svs/svspubsub.cpp | 117 ++++++++++++++++++++++++++++++++++-------- ndn-svs/svspubsub.hpp | 12 ++++- 2 files changed, 107 insertions(+), 22 deletions(-) diff --git a/ndn-svs/svspubsub.cpp b/ndn-svs/svspubsub.cpp index 627c7ac..a3eaadf 100644 --- a/ndn-svs/svspubsub.cpp +++ b/ndn-svs/svspubsub.cpp @@ -15,6 +15,7 @@ */ #include "svspubsub.hpp" +#include "tlv.hpp" #include @@ -22,6 +23,7 @@ + namespace ndn::svs { SVSPubSub::SVSPubSub(const Name& syncPrefix, @@ -88,6 +90,9 @@ SVSPubSub::publish(const Name& name, span value, data.setContent(value); data.setFreshnessPeriod(freshnessPeriod); m_securityOptions.dataSigner->sign(data); + // if the data size is smaller than MAX_SIZE_OF_PIGGYDATA, add it to the piggyback queue + if (data.wireEncode().size() <= MAX_SIZE_OF_PIGGYDATA) + m_piggyDataQueue.push(data); return publishPacket(data, nodePrefix); } } @@ -131,7 +136,7 @@ SVSPubSub::insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name, unsigned long now = std::chrono::duration_cast (std::chrono::system_clock::now().time_since_epoch()).count(); - auto timestamp = Name::Component::fromNumber(now, tlv::TimestampNameComponent); + auto timestamp = Name::Component::fromNumber(now, ndn::tlv::TimestampNameComponent); additional.push_back(timestamp); } @@ -162,7 +167,7 @@ uint32_t SVSPubSub::subscribeWithRegex(const Regex ®ex, const SubscriptionCallback &callback,bool autofetch, bool packets) { uint32_t handle = ++m_subscriptionCount; - Subscription sub = { handle, ndn::Name(), callback, packets, false, autofetch, make_shared(regex)}; + Subscription sub = { handle, ndn::Name(), callback, packets, false, autofetch, std::make_shared(regex)}; m_regexSubscriptions.push_back(sub); return handle; } @@ -284,7 +289,7 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) // if no timestamp block is present, we just skip this step for (const auto& block : mapping.second) { - if (block.type() != tlv::TimestampNameComponent) + if (block.type() != ndn::tlv::TimestampNameComponent) continue; unsigned long now = @@ -307,8 +312,24 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) { if (sub.autofetch) { - m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); - queued = true; + // try to find in the piggyDataCache + auto data = m_piggyDataCache.find(mapping.first); + if(data != nullptr){ + // return data to subscription + SubscriptionData subData = { + mapping.first, + data->getContent().value_bytes(), + nodeId, + seqNo, + ndn::Data()}; + sub.callback(subData); + } + else + { + // try to fetch from network + m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); + queued = true; + } } else { @@ -323,26 +344,42 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) } } } - for (const auto& sub : m_regexSubscriptions) + for (auto &sub : m_regexSubscriptions) { if (sub.regex->match(mapping.first)) { - if (sub.autofetch) + if (sub.autofetch) + { + // try to fetch from network + m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); + queued = true; + } + else + { + // try to find in the piggyDataCache + auto data = m_piggyDataCache.find(mapping.first); + if (data != nullptr) { - m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); - queued = true; + // return data to subscription + SubscriptionData subData = { + mapping.first, + data->getContent().value_bytes(), + nodeId, + seqNo, + ndn::Data()}; + sub.callback(subData); } else { - SubscriptionData subData = { - mapping.first, - ndn::span{}, - nodeId, - seqNo, - ndn::Data() - }; - sub.callback(subData); + SubscriptionData subData = { + mapping.first, + ndn::span{}, + nodeId, + seqNo, + ndn::Data()}; + sub.callback(subData); } + } } } @@ -522,9 +559,32 @@ SVSPubSub::cleanUpFetch(const std::pair& publication) Block SVSPubSub::onGetExtraData(const VersionVector&) { + // Create a block and it's type is tlv::ApplicationParameters + // This block will be sent to the other node as extra data in the Sync Interest + // It contains the notification mapping list and one or a list of piggybacked data packets + ndn::Block block(ndn::tlv::Content); MappingList copy = m_notificationMappingList; + auto mappingBlock = copy.encode(); + block.push_back(mappingBlock); + + size_t size = mappingBlock.size(); + + while (!m_piggyDataQueue.empty()) + { + const auto &data = m_piggyDataQueue.front(); // Access the front element + // If the size of the block is greater than the maximum size of the application parameters, then do not add any more data packets + auto dataBlock = data.wireEncode(); + size = size + dataBlock.size(); + if (size > MAX_SIZE_OF_APPLICATION_PARAMETERS) + break; + block.push_back(dataBlock); + m_piggyDataQueue.pop(); // Remove the front element + } + block.encode(); + m_notificationMappingList = MappingList(); - return copy.encode(); + + return block; } void @@ -532,10 +592,25 @@ SVSPubSub::onRecvExtraData(const Block& block) { try { - MappingList list(block); - for (const auto& p : list.pairs) + block.parse(); + for (const auto &childBlock : block.elements()) { - m_mappingProvider.insertMapping(list.nodeId, p.first, p.second); + // if block is tlv::MappingData, then it's mapping data + if (childBlock.type() == ndn::svs::tlv::MappingData) + { + MappingList list(childBlock); + for (const auto &p : list.pairs) + { + m_mappingProvider.insertMapping(list.nodeId, p.first, p.second); + } + } + // if block is ndn::svs::tlv::PiggybackData, then it's a piggybacked data packet + if (childBlock.type() == ndn::tlv::Data) + { + // Add it to the piggyback data cache + auto dataPtr = std::make_shared(ndn::Data(childBlock)); + m_piggyDataCache.insert(*dataPtr); + } } } catch (const std::exception&) {} diff --git a/ndn-svs/svspubsub.hpp b/ndn-svs/svspubsub.hpp index 480749c..0cfcb38 100644 --- a/ndn-svs/svspubsub.hpp +++ b/ndn-svs/svspubsub.hpp @@ -25,6 +25,7 @@ #include #include +#include namespace ndn::svs { @@ -209,7 +210,7 @@ class SVSPubSub : noncopyable bool isPacketSubscription; bool prefetch; bool autofetch = true; - std::shared_ptr regex = make_shared("^<>+$"); + std::shared_ptr regex; }; @@ -276,6 +277,15 @@ class SVSPubSub : noncopyable // Queue of publications to fetch std::map, std::vector> m_fetchMap; std::map, bool> m_fetchingMap; + + + size_t MAX_SIZE_OF_APPLICATION_PARAMETERS = 1024; + size_t MAX_SIZE_OF_PIGGYDATA = 800; + bool Enable_PiggyData = true; + // Queue of Pending Piggy Data (to be sent in the next update with sync interest) : First in first out + std::queue m_piggyDataQueue; + // A cache for received piggy data + ndn::InMemoryStorageLru m_piggyDataCache; }; } // namespace ndn::svs