Ilya Baldin edited this page Feb 6, 2025 · 10 revisions

E2SAR C++ APIs

Overview

E2SAR is implemented as a C++ library and consists of several major classes:

  • EjfatURI - which parses and generates EJFAT-compliant URI formats (see EJFAT URI Formats)
  • LBManager - which implements the gRPC communications functionality (see Integration Overview)
  • Segmenter - which implements
    • Segmentation of the event buffers
    • Background communication with the control plane using Sync UDP packets
  • Reassembler - which implements
    • Reassembly of the event buffers
    • Via an embedded LBManager object, explicit Register/Deregister of worker nodes and background SendState calls that inform the control plane of the receive queue state

In addition the library defines Sync, Load Balancer (LB) and Reassembly (RE) headers.

What follows is a discussion of these classes and of best practices for instantiating them and invoking their methods. It does not cover all available methods; for that, see the Doxygen documentation.

Integrating with the APIs

When integrating with the API, it is best to include e2sar.hpp and link against the library with -le2sar. All E2SAR classes, standalone functions and constants are defined inside the e2sar namespace.

Note that the API implementation uses the following convention w.r.t. exceptions and return codes:

  • Constructors throw exceptions, generally of type E2SARException which contain an error message.
  • All public methods, except the most trivial ones, return a result<T> type, which holds either a value of type T or an E2SARErrorInfo. E2SARErrorInfo has two methods: code() for the error code and message() for the error message.

The way to determine if a constructor succeeded is to surround its invocation with try/catch:

try
{
    auto e2sarObject = E2SARObjectConstructor();
} catch (E2SARException &ee) {
    std::cout << "Exception encountered: " << static_cast<std::string>(ee) << std::endl;
}

The way to determine if a method call succeeded is to check if has_error() is true on the result:

auto returnObject = AnyE2SARObject.anyMethod();
if (returnObject.has_error()) 
{
    // the call did not succeed
    // returnObject.error().message() contains the error message string
    // returnObject.error().code() contains one of the error codes defined in e2sarError.hpp
} else {
    // the call succeeded
    auto returnValue = returnObject.value(); // now we have an object of type T with the return value
}
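The convention can be tried out without the library using the self-contained sketch below. Result and ErrorInfo are hypothetical stand-ins built on std::variant for illustration only; E2SAR's own result<T> and E2SARErrorInfo come from its headers and are not reproduced here. parsePort() is likewise a made-up method that follows the same no-throw convention.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <variant>

// Hypothetical stand-ins for illustration only; the real E2SAR types
// are defined in the library headers.
struct ErrorInfo {
    int errc;
    std::string msg;
    int code() const { return errc; }
    const std::string &message() const { return msg; }
};

template <typename T>
class Result {
    std::variant<T, ErrorInfo> v;
public:
    Result(T val) : v(std::move(val)) {}
    Result(ErrorInfo err) : v(std::move(err)) {}
    bool has_error() const { return std::holds_alternative<ErrorInfo>(v); }
    const T &value() const { return std::get<T>(v); }
    const ErrorInfo &error() const { return std::get<ErrorInfo>(v); }
};

// Hypothetical method following the convention: it never throws and
// reports failure through the returned object instead.
Result<int> parsePort(const std::string &s) noexcept {
    try {
        int p = std::stoi(s);
        if (p < 1 || p > 65535)
            return ErrorInfo{1, "port out of range: " + s};
        return p;
    } catch (...) {
        return ErrorInfo{2, "not a number: " + s};
    }
}
```

A caller checks has_error() first and only then touches value(), exactly as in the pattern above.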

EjfatURI

EjfatURI has a single constructor:

EjfatURI(const std::string &uri, TokenType tt=TokenType::admin, bool preferV6=false);

Where:

  • uri is a string formatted according to the rules in EJFAT URI Formats
  • tt is an indicator of the token type provided in the URI string (TokenType::[admin,instance,session])
  • preferV6 is a flag telling the object how to convert the hostname in the URI into an address. The URI supports providing the control plane address as an IPv4 address, an IPv6 address or a hostname. A hostname (e.g. ejfats://token@somehostname:12345/) may resolve to both IPv4 and IPv6 addresses, and this flag indicates a preference for the IPv6 address. It has no effect if the control plane host is specified by IP address (e.g. ejfats://token@<IP address>:12345/)
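The preferV6 rule can be sketched as follows. pickControlPlaneAddress() is a hypothetical helper, not an EjfatURI method: it takes already-resolved addresses as strings, treats any address containing ':' as IPv6, and falls back to the other address family when the preferred one is absent. That fallback is an assumption, not a description of EjfatURI's actual behavior.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Hypothetical helper illustrating preferV6; not part of E2SAR.
// Addresses are plain strings; a ':' marks an IPv6 address.
std::optional<std::string> pickControlPlaneAddress(
    const std::vector<std::string> &resolved, bool preferV6)
{
    std::optional<std::string> firstV4, firstV6;
    for (const auto &a : resolved) {
        bool isV6 = a.find(':') != std::string::npos;
        if (isV6 && !firstV6) firstV6 = a;
        if (!isV6 && !firstV4) firstV4 = a;
    }
    // Use the preferred family if present, otherwise fall back (assumption).
    if (preferV6)
        return firstV6 ? firstV6 : firstV4;
    return firstV4 ? firstV4 : firstV6;
}
```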

It offers a variety of getter/setter methods which can set or override information provided in the URI string (see documentation).

It also offers factory-like static methods to create instances of this class:

  • static getFromEnv(variableName, tokenType, preferV6) - create EjfatURI object from a string contained in an environment variable (EJFAT_URI by default), with other parameters mimicking the constructor
  • static getFromString(uriString, tokenType, preferV6) - similarly create EjfatURI object from a string, with parameters mimicking the constructor.
  • static getFromFile(fileName) - read the URI string from a named file, defaulting to /tmp/ejfat_uri
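A minimal sketch of where getFromEnv() and getFromFile() look for the URI string, using only the standard library. uriFromEnv() and uriFromFile() are hypothetical helpers: they fetch the raw string but do not parse it, and reading only the first line of the file is an assumption.

```cpp
#include <cassert>
#include <cstdlib>
#include <fstream>
#include <optional>
#include <string>

// Hypothetical: fetch the raw URI string from an environment variable
// (EJFAT_URI by default, as documented for getFromEnv()).
std::optional<std::string> uriFromEnv(const std::string &var = "EJFAT_URI")
{
    const char *val = std::getenv(var.c_str());
    if (val == nullptr || *val == '\0')
        return std::nullopt;
    return std::string(val);
}

// Hypothetical: fetch the raw URI string from a file
// (/tmp/ejfat_uri by default, as documented for getFromFile()).
std::optional<std::string> uriFromFile(const std::string &fileName = "/tmp/ejfat_uri")
{
    std::ifstream in(fileName);
    std::string line;
    // Assumption: the URI is on the first line of the file.
    if (!in || !std::getline(in, line) || line.empty())
        return std::nullopt;
    return line;
}
```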

Finally it offers a helper method to resolve a hostname to a list of IP(v4 or v6) addresses:

  • static resolveHost(hostname) - resolve a hostname string to a vector of boost::asio::ip::address objects representing IPv4 and/or IPv6 addresses
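resolveHost() itself returns boost::asio::ip::address objects. The sketch below shows the same idea with POSIX getaddrinfo() instead, returning printable strings. resolveToStrings() is a hypothetical name, and this is not E2SAR's implementation.

```cpp
#include <algorithm>
#include <arpa/inet.h>
#include <cassert>
#include <netdb.h>
#include <netinet/in.h>
#include <string>
#include <sys/socket.h>
#include <vector>

// Resolve a hostname (or numeric address) to printable IPv4/IPv6 strings,
// de-duplicated and in resolver order. Returns an empty vector on failure.
std::vector<std::string> resolveToStrings(const std::string &host)
{
    std::vector<std::string> out;
    addrinfo hints{};
    hints.ai_family = AF_UNSPEC;      // ask for both IPv4 and IPv6
    hints.ai_socktype = SOCK_STREAM;  // one entry per address, not per socket type
    addrinfo *res = nullptr;
    if (getaddrinfo(host.c_str(), nullptr, &hints, &res) != 0)
        return out;
    for (addrinfo *p = res; p != nullptr; p = p->ai_next) {
        char buf[INET6_ADDRSTRLEN];
        const void *src = (p->ai_family == AF_INET)
            ? static_cast<const void *>(&reinterpret_cast<sockaddr_in *>(p->ai_addr)->sin_addr)
            : static_cast<const void *>(&reinterpret_cast<sockaddr_in6 *>(p->ai_addr)->sin6_addr);
        if (inet_ntop(p->ai_family, src, buf, sizeof buf) != nullptr) {
            std::string s(buf);
            if (std::find(out.begin(), out.end(), s) == out.end())
                out.push_back(s);
        }
    }
    freeaddrinfo(res);
    return out;
}
```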

LBManager

This class has a single constructor:

LBManager(const EjfatURI &cpuri, bool validateServer = true,
                  grpc::SslCredentialsOptions opts = grpc::SslCredentialsOptions());

Where:

  • cpuri is the EjfatURI object
  • validateServer is a boolean flag, indicating whether server certificate should be validated when talking to the control plane over gRPC. Generally expected to be set to true in production.
  • opts is an object containing SSL Credentials options. This is used if the EjfatURI object has ejfats:// schema (as opposed to ejfat://), indicating the required use of TLS/SSL. Depending on the origin of the server certificate used by the control plane different options are supported in this case:
    • Server certificate is issued by a trusted Certificate Authority whose cert is among well known trust roots contained in the SSL library. In this case validateServer should be set to true and opts should be left as default.
    • Server certificate is issued by a private Certificate Authority and the CA certificate is available (this also applies to self-signed server certs if they are available). In this case the CA or self-signed server certificate can be loaded using the makeSslOptionsFromFiles(pem_root_certs) function, which reads the PEM-encoded certificate and returns a grpc::SslCredentialsOptions-compatible object to be passed as opts to the constructor. validateServer should be set to true, so that the server certificate is validated against this root certificate.
    • Server certificate is self-signed and not available, or is issued by a private CA whose CA certificate is not available. In this case it is recommended to set validateServer to false and use the default opts. This causes the code to skip certificate validation altogether (not recommended in production).

LBManager object implements the full range of gRPC calls (1-8, X, Y, Z) shown in the diagram on this page.

In the case of the getLBStatus() and overview() calls, the returned data structures are deeply nested and specific to the protobuf library. To simplify retrieving information from them, helper methods are provided that convert these into simple struct-like types (LBStatus and OverviewEntry):

    auto res = lbman.getLBStatus(lbid);
    if (!res.has_error())
    {
        // asLBStatus() returns a pointer to an LBStatus, hence the -> accesses below
        auto lbstatus = LBManager::asLBStatus(res.value());
        // inspect LBStatus

        std::cout << "Registered sender addresses: ";
        for (auto a : lbstatus->senderAddresses)
            std::cout << a << " "s;
        std::cout << std::endl;

        std::cout << "Registered workers: ";
        for (auto w : lbstatus->workers)
        {
            std::cout << "[ name="s << w.name() << ", controlsignal="s << w.controlsignal() << ", fillpercent="s << w.fillpercent() << ", slotsassigned="s << w.slotsassigned() << ", lastupdated=" << *w.mutable_lastupdated() << "] "s << std::endl;
        }
        std::cout << std::endl;

        std::cout << "LB details: expiresat=" << lbstatus->expiresAt << ", currentepoch=" << lbstatus->currentEpoch << ", predictedeventnum=" << lbstatus->currentPredictedEventNumber << std::endl;
    }

Similarly for overview() method:

    auto res = lbman.overview();
    if (!res.has_error())
    {
        // this returns a std::vector<OverviewEntry>
        auto overview = LBManager::asOverviewMessage(res.value());
        for (auto r: overview) 
        {
            std::cout << "LB " << r.name << " ID: " << r.lbid << " FPGA LBID: " << r.fpgaLBId << std::endl;
            std::cout << "  Registered sender addresses: ";
            for (auto a : r.status.senderAddresses)
                std::cout << a << " "s;
            std::cout << std::endl;

            std::cout << "  Registered workers: " << std::endl;
            for (auto w : r.status.workers)
            {
                std::cout << "  [ name="s << w.name() << ", controlsignal="s << w.controlsignal() << 
                    ", fillpercent="s << w.fillpercent() << ", slotsassigned="s << w.slotsassigned() << 
                    ", lastupdated=" << *w.mutable_lastupdated() << "] "s << std::endl;
            }
            std::cout << std::endl;

            std::cout << "  LB details: expiresat=" << r.status.expiresAt << ", currentepoch=" << 
                r.status.currentEpoch << ", predictedeventnum=" << 
                r.status.currentPredictedEventNumber << std::endl;
        }
    }

Principle of Operation

This object can be used standalone, e.g. as part of an admin tool like lbadm, or integrated into the WMS. Note that prior to sending data to the Load Balancer, one must call one of the reserveLB() methods using an admin token.

Using the returned URI, Segmenter and Reassembler objects can be instantiated. Reassembler has a simplified version of registerWorker() method that must be invoked prior to receiving any data (some of the parameters to this call are supplied internally by the Reassembler object). There is also an option to call deregisterWorker() from Reassembler. Alternatively these calls can also be made by the central WMS.

Reassembler will automatically call sendState() method with required frequency to update the control plane regarding the worker queue state.

Consult the diagram in the Integration overview document for details.

Note that Segmenter does not offer addSenders() and removeSenders() calls. These can be invoked from the sender node where Segmenter is instantiated or from the central WMS; this is a deployment choice, and E2SAR does not dictate a specific approach. What matters is that these calls are made, so that sender IP addresses are registered with the Load Balancer control plane and it accepts event segments from those nodes. Otherwise the equivalent of firewall rules prevents the Load Balancer from receiving segments from unregistered addresses.
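The firewall-like effect of addSenders()/removeSenders() can be modeled as an allow-list lookup on the source address. SenderAllowList is a hypothetical illustration of the behavior, not an E2SAR or Load Balancer class.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical model: segments are accepted only from registered sender IPs.
class SenderAllowList {
    std::set<std::string> senders;
public:
    void add(const std::string &ip) { senders.insert(ip); }      // ~ addSenders()
    void remove(const std::string &ip) { senders.erase(ip); }    // ~ removeSenders()
    bool accepts(const std::string &srcIp) const { return senders.count(srcIp) > 0; }
};
```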

All other gRPC calls can be used as part of an admin tool or integrated into the WMS; again, this is a deployment choice.

Segmenter

The Segmenter class has a single constructor:

Segmenter (const EjfatURI &uri, u_int16_t dataId, u_int32_t eventSrcId, const SegmenterFlags &sflags=SegmenterFlags())

Where:

  • uri is an EjfatURI object initialized for sender with sync address and data address(es)
  • dataId can e.g. be a channel id of the detector. This is the value applied to all event buffers sent using this Segmenter, unless overridden in send calls (more discussion below)
  • eventSrcId is a unique identifier of an individual host/DAQ transmitting LB packets; it is 32 bits wide to more easily accommodate IP addresses and is carried in the Sync header
  • sflags - a struct SegmenterFlags discussed below

Principle of Operation

Segmenter performs two functions - it:

  • breaks up event buffers provided by the user into segments that, together with the additional EJFAT headers (LBHeader and REHeader), fit into the MTU
  • periodically sends UDP Sync messages to the control plane to help inform it of the rate with which event numbers/ticks are changing.
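To get a feel for how many segments an event produces, the sketch below does the MTU arithmetic. planSegments() is hypothetical, and its default header sizes (IPv4 20 bytes, UDP 8, LBHeader 16, REHeader 20) are assumptions for illustration; consult the E2SAR headers for the actual LB and RE header sizes and for how Segmenter really computes its segment size. Over IPv6 the IP header is 40 bytes instead.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>

struct SegmentPlan {
    std::size_t payloadPerSegment;
    std::size_t numSegments;
};

// Hypothetical helper: how many segments an event buffer of eventBytes
// needs for a given MTU. Header sizes are assumed defaults.
SegmentPlan planSegments(std::size_t eventBytes, std::size_t mtu,
                         std::size_t ipHdr = 20, std::size_t udpHdr = 8,
                         std::size_t lbHdr = 16, std::size_t reHdr = 20)
{
    std::size_t overhead = ipHdr + udpHdr + lbHdr + reHdr;
    if (mtu <= overhead)
        throw std::invalid_argument("MTU too small for headers");
    std::size_t payload = mtu - overhead;
    // Ceiling division: the last segment may be only partially filled.
    return {payload, (eventBytes + payload - 1) / payload};
}
```

With a standard 1500-byte MTU a 10,000-byte event needs 7 segments under these assumptions, while a 9000-byte jumbo MTU brings it down to 2.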

Two threads run in the background: a send thread and a sync thread. The send thread picks up event buffers from a queue, segments them and sends out the segments, while the sync thread sends Sync messages with a predetermined frequency.

There is a lock-free queue on which a user can deposit event buffers to be segmented and sent out.

Each event buffer is uniquely identified by a tuple <event number, dataId>, where dataId denotes e.g. a DAQ channel. This identification scheme supports both single-channel and multi-channel DAQs.

  • For single-channel DAQs, you can set a fixed value for dataId in the constructor, ensuring that all messages sent using this Segmenter will share the same dataId.
  • For multi-channel DAQs, you can specify the dataId for each event buffer individually when calling the sendEvent() or addToSendQueue() methods (see Sending Data below).

The third piece of data that can be optionally supplied for each event is entropy, which is inserted into the Load Balancer header (LBHeader) of every segment. Its role is to control which destination UDP port the event segments will be sent to. If the user omits this parameter in sendEvent() and addToSendQueue() calls, Segmenter will generate a random value for each event buffer, thus randomizing the destination UDP port and providing some load spreading to the receiver (the Reassembler object described below can listen on multiple UDP ports). If control over destination UDP port is desired, the user can provide their own entropy value and e.g. use a consistent mapping between dataId values and entropy (e.g. setting entropy == dataId) thus guaranteeing that segments with the same dataId go to the same destination UDP port.

It is worth noting that the load balancer determines the destination worker node purely based on the event number, while the destination UDP port on that node is determined only by the entropy value. Event buffers with different event numbers, but the same entropy value may go to different nodes, but will use the same UDP port. In general it is recommended to allow Segmenter to set random entropy values to allow for better load spreading in the Reassembler.
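The entropy-to-port relationship can be sketched as follows, under the assumption that the Load Balancer adds the low portRange bits of the entropy to the worker's base UDP port (portRange being the power of 2 that sets how many ports a Reassembler listens on). destPort() is a hypothetical helper, and the base port value in the usage is arbitrary.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical: destination port = basePort + (entropy mod 2^portRange).
std::uint16_t destPort(std::uint16_t basePort, unsigned portRange, std::uint16_t entropy)
{
    std::uint16_t mask = static_cast<std::uint16_t>((1u << portRange) - 1u);
    return static_cast<std::uint16_t>(basePort + (entropy & mask));
}
```

Under this model, setting entropy == dataId pins each dataId to one port, while random entropy spreads events across all 2^portRange ports of whichever node the event number selects.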

Regardless of these settings, however, E2SAR guarantees that all segments of the same event buffer identified by <event number, dataId> are sent by the Load Balancer to the same worker node and the same Reassembler instance so they can be properly reassembled.

Sending Data

Before sending data the user must invoke openAndStart() method of the Segmenter object to start the threads and open the sockets.

Event buffers can be segmented and sent out using one of two calls: A blocking 'immediate send' call:

result<int> sendEvent (u_int8_t *event, size_t bytes, EventNum_t _eventNumber=0LL, u_int16_t _dataId=0, u_int16_t _entropy=0) noexcept

or non-blocking 'delayed send' call:

result<int> addToSendQueue (u_int8_t *event, size_t bytes, EventNum_t _eventNum=0LL, u_int16_t _dataId=0, u_int16_t entropy=0, void(*callback)(boost::any)=nullptr, boost::any cbArg=nullptr) noexcept

The parameters are the event buffer and its length, optional event number, data id and entropy. If event number is not provided, Segmenter uses an internal monotonically increasing 64-bit counter.

The non-blocking call has additional callback and callback parameter options - the provided callback function will be called with the given parameter when the event buffer has been sent. Note that the callback is called from the context of the sending thread and must be non-blocking.
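The 'delayed send' pattern can be modeled with a queue drained by one sending thread that invokes the per-event callback after the event is handled. DelayedSender is hypothetical: it uses a mutex-protected std::deque and std::any where E2SAR uses a lock-free queue and boost::any, it elides the actual segmentation and transmission, and its internal event counter starting at 1 is an assumption.

```cpp
#include <any>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical model of addToSendQueue(): event number 0 means
// "use the internal monotonically increasing counter".
class DelayedSender {
    struct Item { std::uint64_t eventNum; void (*cb)(std::any); std::any arg; };
    std::deque<Item> q;
    std::mutex m;
    std::condition_variable cv;
    bool stopping = false;
    std::uint64_t counter = 0;
    std::thread sender{[this] { run(); }};  // declared last: starts after other members

    void run() {
        std::unique_lock<std::mutex> lk(m);
        while (true) {
            cv.wait(lk, [this] { return stopping || !q.empty(); });
            if (q.empty()) return;            // stopping and fully drained
            Item it = std::move(q.front());
            q.pop_front();
            lk.unlock();
            // ... segment and transmit the buffer for it.eventNum here ...
            if (it.cb) it.cb(it.arg);         // runs on the sending thread: keep it short
            lk.lock();
        }
    }
public:
    std::uint64_t enqueue(std::uint64_t eventNum, void (*cb)(std::any) = nullptr,
                          std::any arg = {}) {
        std::lock_guard<std::mutex> lk(m);
        std::uint64_t n = eventNum ? eventNum : ++counter;
        q.push_back({n, cb, std::move(arg)});
        cv.notify_one();
        return n;
    }
    ~DelayedSender() {
        { std::lock_guard<std::mutex> lk(m); stopping = true; }
        cv.notify_one();
        sender.join();
    }
};
```

Because the callback runs on the sending thread, any slow work inside it delays every event queued behind it, which is why the callback must be non-blocking.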

Inspecting Statistics and Errors

The Segmenter object maintains two similar sets of internal statistics counters, which can be retrieved using the getSyncStats() and getSendStats() methods. Each method returns a 3-tuple indicating the number of segments or frames sent, the number of segment or frame send errors, and the errno associated with the latest error.

Thus for example for Sync frames you can do (the approach is identical for event segments and getSendStats()):

    auto syncStats = seg.getSyncStats();
    auto sendStats = seg.getSendStats();

    // syncStats.get<0>() - number of sync frames sent
    // syncStats.get<1>() - number of sync send errors
    // syncStats.get<2>() - last errno

    if (syncStats.get<1>() != 0) 
    {
        std::cout << "Error encountered sending sync frames: " << strerror(syncStats.get<2>()) << std::endl;
    }

Customizing Segmenter Behavior

Changing the fields of SegmenterFlags prior to invoking the constructor is the main way to modify Segmenter behavior. See descriptions of individual fields for further details.

Reassembler

The Reassembler class has two constructors:

Reassembler (const EjfatURI &uri, size_t numRecvThreads=1, const ReassemblerFlags &rflags=ReassemblerFlags())

and

Reassembler (const EjfatURI &uri, std::vector< int > cpuCoreList, const ReassemblerFlags &rflags=ReassemblerFlags())

The former allows the user to specify the number of receive threads, while the latter allows the user to specify the IDs of the cores to bind each thread to (the number of threads will equal the length of the vector). The latter approach is useful for NUMA optimization: the user should determine which NUMA domain the receive NIC is attached to, then determine which cores belong to this NUMA domain and provide a subset of those core IDs in this call.

You can use the lstopo or lscpu commands to determine the NUMA configuration and the binding between devices, NUMA domains and cores. The NUMA node of a network device is also normally reported in the /sys/class/net/<device name>/device/numa_node file. If using the first constructor, the taskset and numactl commands can be used to set process affinity (the latter sets both CPU and memory affinity to a NUMA node).
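On Linux the NUMA node of a NIC can also be read programmatically from sysfs; the cores of node N are then listed in /sys/devices/system/node/nodeN/cpulist. nicNumaNode() is a hypothetical helper. The kernel itself writes -1 to numa_node when it has no affinity information, and the sketch also returns -1 when the file is missing (e.g. for virtual interfaces such as lo).

```cpp
#include <cassert>
#include <fstream>
#include <string>

// Hypothetical: NUMA node of a network device, or -1 if unknown.
int nicNumaNode(const std::string &dev)
{
    std::ifstream in("/sys/class/net/" + dev + "/device/numa_node");
    int node = -1;
    if (!(in >> node))
        return -1;
    return node;
}
```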

The number of threads indirectly determines the number of UDP ports that will be opened for listening for incoming event segments. The Load Balancer always operates on a power-of-2 number of ports (a hardware limitation), so if e.g. a user selects 7 threads (or, equivalently, cores), the number of open ports will be 8 (2^3): 6 threads will listen on 1 port each, while 1 thread will listen on 2.

It is possible to override this computation and set the number of listening ports directly (more precisely, the power of 2 that determines it) by setting ReassemblerFlags.portRange to a value between 0 and 14. In this case the ports will be spread evenly among the available threads. If the resulting number of ports is lower than the number of threads, some threads will remain idle.
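The thread/port arithmetic from the two paragraphs above can be sketched as follows. assignPorts() is hypothetical, and dealing ports out round-robin (port i to thread i mod numThreads) is an assumption; E2SAR's internal mapping may differ. A negative portRangeOverride stands for "portRange not set".

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical: returns, for each thread, the port offsets (from the base
// port) that it listens on.
std::vector<std::vector<unsigned>> assignPorts(std::size_t numThreads,
                                               int portRangeOverride = -1)
{
    if (numThreads == 0)
        return {};
    unsigned portRange = 0;
    if (portRangeOverride >= 0)
        portRange = static_cast<unsigned>(portRangeOverride);   // 0..14
    else
        while ((std::size_t{1} << portRange) < numThreads)
            ++portRange;                                        // smallest 2^n >= threads
    std::size_t numPorts = std::size_t{1} << portRange;
    std::vector<std::vector<unsigned>> perThread(numThreads);
    for (std::size_t i = 0; i < numPorts; ++i)
        perThread[i % numThreads].push_back(static_cast<unsigned>(i));
    return perThread;
}
```

For 7 threads this yields 8 ports with exactly one thread holding two, and forcing portRange to 1 with 4 threads leaves two threads without a port.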

Principle of Operation

As already mentioned, the Reassembler starts some number of threads and listens on some number of UDP ports for segments of event buffers. Per the Load Balancer specification, segments of the same event (identified by a tuple <event number, dataId>) are guaranteed to land on the same worker node and the same destination UDP port. They may arrive out of order due to differences in network paths, and some segments may be lost, although that is considered infrequent. Segments are reassembled into event buffers as they arrive. Completed event buffers are placed onto a receive queue, from which the user may pick them up using the getEvent() (non-blocking) or recvEvent() (blocking) methods. The event number and data id identifying each reassembled event buffer are returned through the parameters of these calls.
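Out-of-order reassembly keyed by <event number, dataId> can be sketched as follows. Segment and MiniReassembler are hypothetical: the sketch assumes each segment carries its byte offset and the total event length, and it ignores duplicates, timeouts and lost segments, all of which a real Reassembler has to deal with.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical segment layout for illustration.
struct Segment {
    std::uint64_t eventNum;
    std::uint16_t dataId;
    std::uint32_t offset;      // where this payload goes in the event buffer
    std::uint32_t totalLen;    // full event length
    std::vector<std::uint8_t> payload;
};

// Copies each segment into place; returns the event once every byte arrived.
class MiniReassembler {
    struct Partial { std::vector<std::uint8_t> buf; std::uint32_t received = 0; };
    std::map<std::pair<std::uint64_t, std::uint16_t>, Partial> inFlight;
public:
    std::optional<std::vector<std::uint8_t>> add(const Segment &s) {
        if (std::size_t(s.offset) + s.payload.size() > s.totalLen)
            return std::nullopt;                       // malformed: drop it
        auto key = std::make_pair(s.eventNum, s.dataId);
        Partial &p = inFlight[key];
        if (p.buf.empty()) p.buf.resize(s.totalLen);
        std::memcpy(p.buf.data() + s.offset, s.payload.data(), s.payload.size());
        p.received += static_cast<std::uint32_t>(s.payload.size());
        if (p.received < s.totalLen) return std::nullopt;
        auto done = std::move(p.buf);
        inFlight.erase(key);
        return done;
    }
};
```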

In the background the Reassembler object also periodically calls the control plane using LBManager.SendState() method to inform the control plane of the state of the receive queue. As the queue starts getting fuller, the Load Balancer will start sending fewer event buffer segments to this worker. Vice versa, if the queue stays nearly empty, the Load Balancer will send more event buffer segments if they are available. This happens transparently.

As indicated in the Segmenter section above, the Segmenter object has some control over which destination UDP ports segments are sent to; however, it does not know a priori how many ports a Reassembler will open. This information is only exchanged between the Reassembler and the Load Balancer control plane. The Segmenter must learn this information out-of-band if it needs it, and this is outside the scope of this document.

In order to communicate with the control plane, the Reassembler object provides two public methods that allow the user to register (registerWorker()) a receiving worker node or deregister it (deregisterWorker()). Calling registerWorker() is required for the worker node on which the Reassembler is running to start receiving event buffer segments. It is recommended that, upon exit or when detecting a fault, the user invoke deregisterWorker() in order to remove this node from the list of available worker nodes known to the control plane. This minimizes data loss associated with a crash.
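Pairing registration with deregistration in a scope guard is one way to follow this recommendation. WorkerRegistration is a hypothetical class; in practice the two callables would wrap the Reassembler's registerWorker() and deregisterWorker() calls (check their exact signatures in the Doxygen documentation). A destructor runs on normal exit and on exceptions, but not if the process is killed outright, so it covers orderly shutdowns rather than hard crashes.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical RAII guard: deregisters on scope exit if registration succeeded.
class WorkerRegistration {
    std::function<void()> dereg;
public:
    WorkerRegistration(const std::function<bool()> &reg, std::function<void()> deregFn) {
        if (reg()) dereg = std::move(deregFn);   // only undo what succeeded
    }
    bool active() const { return static_cast<bool>(dereg); }
    ~WorkerRegistration() { if (dereg) dereg(); }
    WorkerRegistration(const WorkerRegistration &) = delete;
    WorkerRegistration &operator=(const WorkerRegistration &) = delete;
};
```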

Receiving Data

Before receiving data the user must invoke openAndStart() method of the Reassembler object, which starts the threads and opens the sockets.

Two calls allow user to receive reassembled event buffers:

result<int> recvEvent (uint8_t **event, size_t *bytes, EventNum_t *eventNum, uint16_t *dataId) noexcept

(blocking) and

result<int> getEvent (uint8_t **event, size_t *bytes, EventNum_t *eventNum, uint16_t *dataId) noexcept

(non-blocking; returns a value() of -1 if the queue was empty).
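The difference between the two receive styles can be illustrated with a stand-in queue. RecvQueue is hypothetical: its getEvent() returns -1 when empty as documented, returning 0 on success is an assumption, and its recvEvent() takes a timeout so the sketch cannot hang, whereas the documented recvEvent() signature has none.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical receive queue mimicking getEvent()/recvEvent() semantics.
class RecvQueue {
    std::deque<std::vector<unsigned char>> q;
    std::mutex m;
    std::condition_variable cv;
public:
    void push(std::vector<unsigned char> ev) {
        { std::lock_guard<std::mutex> lk(m); q.push_back(std::move(ev)); }
        cv.notify_one();
    }
    // Non-blocking: -1 immediately if nothing is ready.
    int getEvent(std::vector<unsigned char> &out) {
        std::lock_guard<std::mutex> lk(m);
        if (q.empty()) return -1;
        out = std::move(q.front());
        q.pop_front();
        return 0;
    }
    // Blocking (bounded by a timeout in this sketch).
    int recvEvent(std::vector<unsigned char> &out, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lk(m);
        if (!cv.wait_for(lk, timeout, [this] { return !q.empty(); })) return -1;
        out = std::move(q.front());
        q.pop_front();
        return 0;
    }
};
```

A polling loop around getEvent() suits an application that has other work to interleave, while a thread dedicated to consuming events is simpler with the blocking call.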

Inspecting Statistics and Errors

Similar to Segmenter there is a block of statistics counters that can be queried:

const boost::tuple<EventNum_t, EventNum_t, int, int, int, E2SARErrorc> getStats () const noexcept

The order of counters in the tuple is as follows:

  • enqueueLoss - number of events received and lost due to incompleteness on enqueue
  • eventSuccess - events successfully processed
  • lastErrno - last errno associated with the receive sockets/ports
  • grpcErrCnt - number of errors invoking gRPC SendState calls
  • dataErrCnt - number of errors on receive sockets
  • lastE2SARError - last E2SARError object associated with gRPC calls or other conditions

Customizing Reassembler Behavior

Changing the fields of ReassemblerFlags prior to invoking the constructor is the main way to modify Reassembler behavior. See descriptions of individual fields for further details.

A Note on Debugging

If debugging one-to-one Segmenter to Reassembler without a load balancer on the path, it is useful to set the useCP flag to false in both SegmenterFlags and ReassemblerFlags, and to set withLBHeader to true in ReassemblerFlags. This completely disables control plane interactions on both sides and tells the Reassembler to ignore the LB headers attached to every segment (they are normally stripped off by the Load Balancer).