Experimental Router #252

Open
aaronalbers opened this issue Apr 28, 2019 · 15 comments


@aaronalbers
Contributor

aaronalbers commented Apr 28, 2019

I built a router for channels that lets you conditionally send data down different paths. An example use case might be routing web requests based on the URL of the request. If this proves useful, perhaps something like it could be added to the library. The following example routes based on the contents of a string.

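#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <iterator>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>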

// built with stlab in same directory:
// clang++ -I. -std=c++17 main.cpp

// version 1.4.1
#include <stlab/concurrency/channel.hpp>
#include <stlab/concurrency/default_executor.hpp>

namespace stlab {
template <class T, class K>
class router {
    using channel_pair = std::pair<stlab::sender<T>, stlab::receiver<T>>;
    using route_pair = std::pair<K, channel_pair>;
    using routes = std::vector<route_pair>;

public:
    template <class E, class F>
    router(E executor, F router_func) : _self{std::make_shared<model<E, F>>(std::move(executor), std::move(router_func))} {}

    void set_ready() { _self->set_ready(); }
    stlab::receiver<T> get_route(K key) { return _self->get_route(std::move(key)); }
    stlab::receiver<T> get_default_route() { return _self->get_default_route(); }
    void operator()(T t) { _self->route(std::move(t)); }

private:
    struct concept_t {
        virtual stlab::receiver<T> get_route(K key) = 0;
        virtual stlab::receiver<T> get_default_route() = 0;
        virtual void set_ready() = 0;
        virtual void route(T t) = 0;
        virtual ~concept_t() = default;
    };

    template <class E, class F>
    struct model : concept_t {
        model(E executor, F router_func)
        : _executor{std::move(executor)}
        , _router_func{std::move(router_func)}
        , _sender_receiver{stlab::channel<T>(_executor)}
        {}

        void set_ready() override {
            std::sort(std::begin(_routes), std::end(_routes), [](const route_pair& a, const route_pair& b){
                return a.first < b.first;
            });
            for (auto& pair : _routes)
                pair.second.second.set_ready();
            _sender_receiver.second.set_ready();
        }

        stlab::receiver<T> get_default_route() override { return _sender_receiver.second; }

        stlab::receiver<T> get_route(K key) override {
            auto find_it = std::find_if(begin(_routes), std::end(_routes), [&](const route_pair& pair){
                return pair.first == key;
            });
            if (find_it != std::end(_routes)) return find_it->second.second;
            _routes.push_back(std::make_pair(std::move(key), stlab::channel<T>(_executor)));
            return _routes.back().second.second;
        }

        template<class C, typename std::enable_if_t<std::is_same<typename C::value_type, std::pair<K, bool>>::value, int> = 0>
        void route_keys(const C& key_set, T t) {
            bool did_route = false;
            auto find_it = std::begin(_routes);
            for (const auto& key : key_set) {
                if (!key.second) continue;
                find_it = std::lower_bound(find_it, std::end(_routes), key, [](const route_pair& a, const std::pair<K, bool>& k){
                    return a.first < k.first;
                });
                if (find_it == std::end(_routes)) break;
                if (find_it->first != key.first) continue;
                find_it->second.first(t);
                did_route = true;
            }
            if (!did_route) return _sender_receiver.first(std::move(t));
        }

        template<class C, typename std::enable_if_t<std::is_same<typename C::value_type, K>::value, int> = 0>
        void route_keys(const C& key_set, T t) {
            auto find_it = std::begin(_routes);
            for (const auto& key : key_set) {
                find_it = std::lower_bound(find_it, std::end(_routes), key, [](const route_pair& a, const K& k){
                    return a.first < k;
                });
                if (find_it == std::end(_routes)) return;
                if (find_it->first != key) continue;
                find_it->second.first(t);
            }
        }

        void route(T t) override {
            if (std::empty(_routes)) return _sender_receiver.first(std::move(t));
            const auto& key_set = _router_func(t);
            if (std::empty(key_set)) return _sender_receiver.first(std::move(t));
            route_keys(key_set, std::move(t)); 
        }

        E _executor;
        F _router_func;
        routes _routes;
        channel_pair _sender_receiver;
    };

    std::shared_ptr<concept_t> _self;
};
} // namespace stlab

using namespace std;
using namespace std::placeholders;
using namespace stlab;

int main() {
    sender<std::string> send;
    receiver<std::string> receive;

    tie(send, receive) = channel<std::string>(default_executor);

#if 0
    stlab::router<std::string, std::string> router{default_executor, [](const std::string& t) {
        std::set<std::string> keys;
        if (t.find("hello") != std::string::npos) keys.insert("contains hello");
        if (t.find("world") != std::string::npos) keys.insert("contains world");
        if (t == "hello world") keys.insert("hello world");
        return keys;
    }};
#else
    stlab::router<std::string, std::string> router{default_executor, [](const std::string& t) {
        return std::array<std::pair<std::string, bool>, 3>{{
            {"contains hello", t.find("hello") != std::string::npos},
            {"contains world", t.find("world") != std::string::npos},
            {"hello world",    t == "hello world"}
        }};
    }};
#endif

    std::atomic_bool done{false};

    auto hold = receive | std::bind(router, _1);

    auto default_case = router.get_default_route() | [](std::string t) -> std::string {
        return t + ": default case\n";
    };

    auto hello = router.get_route("contains hello") | [](std::string t) -> std::string {
        return t + ": contains hello\n";
    };

    auto world = router.get_route("contains world") | [](std::string t) -> std::string {
        return t + ": contains world\n";
    };

    auto hello_world = router.get_route("hello world") | [](std::string t) -> std::string {
        return t + ": is hello world\n";
    };

    auto other_hello_world = router.get_route("hello world") | [](std::string t) -> std::string {
        return t + ": is also hello world\n";
    };

    auto all_cases =
        stlab::merge_channel<unordered_t>(stlab::default_executor,
                                          [& _done = done](std::string message) {
                                              std::cout << message;
                                              _done = true;
                                          },
                                          std::move(default_case), std::move(hello), std::move(world), std::move(hello_world), std::move(other_hello_world));

    // It is necessary to mark the receiver side as ready, when all connections are
    // established
    receive.set_ready();
    router.set_ready();

    send("bob");
    send("hello");
    send("world");
    send("hello world");

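    // Waiting just for illustration. Note that done is set by the first message
    // that arrives, so the process may exit before the remaining messages print.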
    while (!done.load()) {
        this_thread::sleep_for(chrono::milliseconds(1));
    }
    return 0;
}
/*
bob: default case
hello: contains hello
hello world: contains hello
world: contains world
hello world: contains world
hello world: is hello world
hello world: is also hello world
*/
@FelixPetriconi
Member

Overall I like the idea of this router.
I would consider using a more cache-friendly data structure than an unordered map, such as a vector. What should happen if multiple receivers are attached to the same key?

@aaronalbers
Contributor Author

I think if multiple receivers are attached to the same key it should do as you would expect and send a copy to both (as it does currently). I like the idea of moving to using routes = std::vector<std::pair<K, channel_pair>> and passing a predicate to the router that takes a K and a T. That would allow for "broadcasting" or "multicasting" to several routes and not just one. I will follow up with some mods to that effect.

@aaronalbers
Contributor Author

aaronalbers commented May 13, 2019

@FelixPetriconi Mods are done. I am not sure about the use of std::partition()... it seems like it might move things around more than it needs to.

@FelixPetriconi
Member

@aaronalbers I would apply a stable_sort to all receivers once the user has called set_ready. Then all values of the same category are adjacent and no rearranging is necessary. Otherwise you also run into synchronisation issues when the router is part of a larger graph in which it could be triggered from multiple threads at the same time.
What do you think?

@aaronalbers
Contributor Author

@FelixPetriconi I was thinking about your suggestion but there is a problem. Since there is only ever 1 receiver per key and the categories of receivers are dependent on both the key and the value we will have to resort to a full linear scan for each route. That is if we want to keep the ability to "multicast" to a set of receivers depending on the input.

Even if the std::partition solution didn't have thread safety issues it would preclude any routing strategy that utilized a non-deterministic predicate (ie random routing, load balancing, etc).

Perhaps two versions (or modes) of the router would be helpful. One that doesn't multicast with a deterministic predicate and the other that is more flexible. The advantage of the first would be increased performance.
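
As an illustration of a non-deterministic routing function of the kind mentioned above, here is a minimal sketch of a round-robin selector. The class name round_robin and its interface are hypothetical and not part of stlab. It returns a one-element std::array, which is trivially sorted and free of duplicates. The counter is not synchronised, so this sketch assumes the function is only ever called from one thread at a time.

```cpp
#include <array>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical routing function object: ignores the value and hands out the
// registered keys in turn, so repeated calls give different answers.
class round_robin {
public:
    explicit round_robin(std::vector<std::string> keys) : _keys(std::move(keys)) {
        if (_keys.empty()) throw std::invalid_argument("round_robin needs at least one key");
    }

    // Returns exactly one key per call and advances to the next key.
    std::array<std::string, 1> operator()(const std::string& /*value*/) {
        const std::string& key = _keys[_next];
        _next = (_next + 1) % _keys.size();
        return std::array<std::string, 1>{{key}};
    }

private:
    std::vector<std::string> _keys;
    std::size_t _next = 0; // not thread safe: assumes serialized calls
};
```

A partition-based router that rearranges its routes according to the predicate could not support a selector like this, because the answer for a given value changes between calls.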

@aaronalbers
Contributor Author

aaronalbers commented May 16, 2019

@FelixPetriconi I think I found a way that is the best of both worlds. The provided function takes the value to route and returns a std::set<> of keys to route to. The routes are sorted at set_ready() and a std::lower_bound() can be used to find each key in the set sequentially. How do you feel about constructing a std::set<> on each call to route()?

@FelixPetriconi
Member

@aaronalbers I am not sure whether std::set<> is the correct container for such a task, because as a node-based container it is much more expensive than a std::vector<>. I have to go deeper into this, but I am a bit short of time at the moment, sorry.

@aaronalbers
Contributor Author

@FelixPetriconi The code will work with a std::vector<> as well as with a std::set<> (just change std::set<K> to auto) as long as the vector is sorted and there are no duplicates. If there are duplicates or it is not sorted, it will not route everything correctly. It could simply be a documentation thing if you want. Take your time. =)

@FelixPetriconi
Member

@aaronalbers I understand now why you are returning a container from the route function. Doing it this way makes the code nice and uncomplicated. std::set has the advantage of guaranteeing uniqueness of the elements, but it is problematic for cache friendliness, because each node is somewhere else in memory. So a vector would be better here. With a simple append_unique function one could ensure that each key appears just once.
But from a performance point of view it seems wrong to me to return a container that allocates dynamically for each value that comes from upstream, because a new inside the container involves a mutex lock in most existing allocators, and destroying it involves another mutex lock. So far I have not found a better way to handle this. Perhaps this is the price one has to pay for a number of routes that is determined at run time.
But we could provide a router with a compile time fixed number of routes, that does not have this problem, because then we could always use a std::array<key, number of routes> as return value from the route function.
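
A minimal sketch of what such an append_unique helper might look like. The signature is an assumption; nothing like it is taken from the library. Since the router needs the keys sorted as well as unique, this variant inserts at the sorted position rather than appending at the end.

```cpp
#include <algorithm>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: insert key into an already sorted vector unless it is
// present. Keeps the vector sorted and free of duplicates.
// Returns true if the key was inserted.
template <class K>
bool append_unique(std::vector<K>& keys, typename std::vector<K>::value_type key) {
    auto pos = std::lower_bound(keys.begin(), keys.end(), key);
    if (pos != keys.end() && *pos == key) return false; // already present
    keys.insert(pos, std::move(key));
    return true;
}
```

This keeps the keys contiguous in memory, but the vector still allocates, so it does not address the per-value allocation cost discussed above.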

@aaronalbers
Contributor Author

aaronalbers commented May 18, 2019

@FelixPetriconi I have updated the example with the ability to return any container type including std::array<>.

There are two options for the value_type of the container:

  1. K
  2. std::pair<bool, K> where bool indicates if the route should be used or not

As always the container returned must be sorted by K and contain no duplicates of K. That is handled automatically by std::set<K> but you could handle it implicitly like I did in the example by constructing the std::array<> in a sorted order.

Note:
I just realized that if I swap the pair to std::pair<K, bool> then a user could return a std::map<K, bool>.
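
One detail worth checking for the std::map<K, bool> idea: the value_type of std::map<K, bool> is std::pair<const K, bool>, not std::pair<K, bool>. The pair overload of route_keys in the example tests for exactly std::pair<K, bool>, so as written a map would not select it. The sketch below shows the mismatch and one possible trait that accepts both forms. The names is_flagged_keys_v and classify are hypothetical.

```cpp
#include <map>
#include <string>
#include <type_traits>
#include <utility>

// True when C's elements are (key, enabled) pairs, with or without a const key.
template <class C, class K>
constexpr bool is_flagged_keys_v =
    std::is_same<typename C::value_type, std::pair<K, bool>>::value ||
    std::is_same<typename C::value_type, std::pair<const K, bool>>::value;

// A map's value_type carries a const key, so an exact match on pair<K, bool> fails.
static_assert(!std::is_same<std::map<std::string, bool>::value_type,
                            std::pair<std::string, bool>>::value,
              "map elements are pair<const K, bool>");
static_assert(is_flagged_keys_v<std::map<std::string, bool>, std::string>,
              "the relaxed trait accepts map elements");

// Hypothetical routing function returning a map, which is sorted and unique by key.
inline std::map<std::string, bool> classify(const std::string& t) {
    return {
        {"contains hello", t.find("hello") != std::string::npos},
        {"contains world", t.find("world") != std::string::npos},
        {"hello world", t == "hello world"},
    };
}
```

Note that std::map is node based and allocates, so it gives up the allocation-free property that motivates returning a std::array<>.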

@FelixPetriconi
Member

@aaronalbers I think the router would be a valuable extension to the library.
Option 2 is interesting, but I am not sure about the general need for it.
The idea of a router contains two parts, the dynamic one, from above and a static one, where everything is done during compile time. Do you want to bring it into a PR or shall I do it?

@aaronalbers
Contributor Author

aaronalbers commented May 18, 2019

@FelixPetriconi Option 2 allows you to use std::array<> and have the ability to conditionally route to all possible keys.

Given N possible routes, if you return a std::array<K, N> it would necessarily have to contain all possible K every time (i.e. broadcast). This would cause every T to be routed to every K route. You could return a std::array<K, N-X> (where 0 <= X <= N) and conditionally route to N-X routes, but N-X unique routes MUST be provided every time (i.e. every T routes to N-X keys every time route() is called).

At least with option 2 you can maintain the same flexibility of option 1 without a container that utilizes an allocator.

Should this live in channel.hpp or router.hpp?

Edit:
After further review, I think it would be possible to dynamically route to all possible keys using std::array<> without option 2. You could use the fact that if your std::array<K, N> contains a K that doesn't have a corresponding route, it will stop all routing (essentially like a null terminator). So I agree that option 2 is interesting but not really necessary.
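
A small sketch of the terminator idea, mirroring the lookup loop of the K overload of route_keys in the example. Judging from that loop, routing stops only when std::lower_bound reaches the end of the routes, so the padding key has to sort after every registered route key; an unknown key that sorts in the middle is merely skipped. The sentinel value "~" and the function count_routed are assumptions made for illustration.

```cpp
#include <algorithm>
#include <array>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical sentinel key: must compare greater than every real route key.
const std::string no_route = "~";

// Counts how many keys would be routed. Stops at the first key whose
// lower_bound is the end of the routes, which is what the sentinel triggers.
// Precondition: sorted_route_keys and keys are sorted.
template <std::size_t N>
std::size_t count_routed(const std::vector<std::string>& sorted_route_keys,
                         const std::array<std::string, N>& keys) {
    std::size_t hits = 0;
    auto it = sorted_route_keys.begin();
    for (const auto& key : keys) {
        it = std::lower_bound(it, sorted_route_keys.end(), key);
        if (it == sorted_route_keys.end()) return hits; // terminator reached
        if (*it == key) ++hits;
    }
    return hits;
}
```

Padding the unused tail of the array with the sentinel keeps the array sorted, since the sentinel is greater than all real keys.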

@FelixPetriconi
Member

I did not make my point clear enough. Returning a vector is fine for your current dynamic setup. If one goes for a compile-time variant of the router, one could use an array<> to hold the values. In this case, I would pass the routing function a pair of iterators that span the complete array<>; the function fills it from the first element up to the last key and returns the iterator pointing beyond the last used element. So the calling function holds an array<key, N>, where N is equal to the number of routes.

It should go into router.hpp
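
The iterator-pair interface described above might look roughly like the sketch below. All names here (route_count, key_array, select_routes) are hypothetical, and std::string_view keys are an assumption chosen so that filling the array does not allocate. The caller owns the array; the routing function writes the selected keys in sorted order and returns one past the last key written.

```cpp
#include <array>
#include <cstddef>
#include <string>
#include <string_view>

// Number of routes, fixed at compile time (assumed value for this sketch).
constexpr std::size_t route_count = 3;
using key_array = std::array<std::string_view, route_count>;

// Hypothetical routing function: fills [first, last) with the keys that apply
// to value, in sorted order, and returns the end of the used range.
inline key_array::iterator select_routes(const std::string& value,
                                         key_array::iterator first,
                                         key_array::iterator last) {
    auto out = first;
    auto emit = [&](std::string_view key) {
        if (out != last) *out++ = key; // never write past the caller's array
    };
    if (value.find("hello") != std::string::npos) emit("contains hello");
    if (value.find("world") != std::string::npos) emit("contains world");
    if (value == "hello world") emit("hello world");
    return out;
}
```

The caller would then route over the range from the array's begin() to the returned iterator; an empty range would correspond to the default route.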

@aaronalbers
Contributor Author

@FelixPetriconi Ah. I think I understand now. I am curious to see what that would look like in practice. The PR is up #256

@dabrahams
Contributor

@aaronalbers thanks for the PR; we'll take a look.
