Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): async adapter #419

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open

feat(core): async adapter #419

wants to merge 14 commits into from

Conversation

Totodore
Copy link
Owner

@Totodore Totodore commented Jan 1, 2025

Full rework of the async adapter.
The majority of the adapter code is now in the socketioxide-core crate. It contains a LocalAdapter with the following type def:

pub trait CoreAdapter<E: SocketEmitter>: Sized + Send + Sync + 'static {
    /// An error that can occur when using the adapter.
    type Error: StdError + Into<AdapterError> + Send + 'static;
    /// A shared state between all the namespace [`CoreAdapter`].
    /// This can be used to share a connection for example.
    type State: Send + Sync + 'static;
    /// A stream that emits the acknowledgments of multiple sockets.
    type AckStream: Stream<Item = AckStreamItem<E::AckError>> + FusedStream + Send + 'static;

    /// Creates a new adapter with the given state and local adapter.
    ///
    /// The state is used to share a common state between all your adapters. E.G. a connection to a remote system.
    /// The local adapter is used to manipulate the local sockets.
    fn new(state: &Self::State, local: CoreLocalAdapter<E>) -> Self;

    /// Initializes the adapter.
    fn init(self: Arc<Self>) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Closes the adapter.
    fn close(&self) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Returns the number of servers.
    fn server_count(&self) -> impl Future<Output = Result<u16, Self::Error>> + Send;

    /// Broadcasts the packet to the sockets that match the [`BroadcastOptions`].
    fn broadcast(&self, packet: Packet, opts: BroadcastOptions) -> impl Future<Output = Result<(), BroadcastError>> + Send;

    /// Broadcasts the packet to the sockets that match the [`BroadcastOptions`]
    /// and return a stream of ack responses.
    fn broadcast_with_ack(&self, packet: Packet, opts: BroadcastOptions, timeout: Option<Duration>) -> impl Future<Output = Result<Self::AckStream, Self::Error>> + Send;

    /// Adds the sockets that match the [`BroadcastOptions`] to the rooms.
    fn add_sockets(&self, opts: BroadcastOptions, rooms: impl RoomParam) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Removes the sockets that match the [`BroadcastOptions`] from the rooms.
    fn del_sockets(&self, opts: BroadcastOptions, rooms: impl RoomParam) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Disconnects the sockets that match the [`BroadcastOptions`].
    fn disconnect_socket(&self, opts: BroadcastOptions) -> impl Future<Output = Result<(), BroadcastError>> + Send;

    /// Fetches rooms that match the [`BroadcastOptions`]
    fn rooms(&self, opts: BroadcastOptions) -> impl Future<Output = Result<Vec<Room>, Self::Error>> + Send;

    /// Fetches remote sockets that match the [`BroadcastOptions`].
    fn fetch_sockets(&self, opts: BroadcastOptions) -> impl Future<Output = Result<Vec<RemoteSocketData>, Self::Error>> + Send;
    /// Returns the local adapter. Used to enable default behaviors.
    fn get_local(&self) -> &CoreLocalAdapter<E>;
}

The SocketEmitter is an internal interface implemented by the socketioxide crate to execute actions on the sockets:

/// A item yield by the ack stream.
pub type AckStreamItem<E> = (Sid, Result<Value, E>);
/// The [`SocketEmitter`] will be implemented by the socketioxide library.
/// It is simply used as an abstraction to allow the adapter to communicate
/// with the socket server without the need to depend on the socketioxide lib.
pub trait SocketEmitter: Send + Sync + 'static {
    /// An error that can occur when sending data an acknowledgment.
    type AckError: StdError + Send + Serialize + DeserializeOwned + 'static;
    /// A stream that emits the acknowledgments of multiple sockets.
    type AckStream: Stream<Item = AckStreamItem<Self::AckError>> + FusedStream + Send + 'static;

    /// Get all the socket ids in the namespace.
    fn get_all_sids(&self, filter: impl Fn(&Sid) -> bool) -> Vec<Sid>;
    /// Get the socket data that match the list of socket ids.
    fn get_remote_sockets(&self, sids: BroadcastIter<'_>) -> Vec<RemoteSocketData>;
    /// Send data to the list of socket ids.
    fn send_many(&self, sids: BroadcastIter<'_>, data: Value) -> Result<(), Vec<SocketError>>;
    /// Send data to the list of socket ids and get a stream of acks and the number of expected acks.
    fn send_many_with_ack(
        &self,
        sids: BroadcastIter<'_>,
        packet: Packet,
        timeout: Option<Duration>,
    ) -> (Self::AckStream, u32);
    /// Disconnect all the sockets in the list.
    fn disconnect_many(&self, sids: Vec<Sid>) -> Result<(), Vec<SocketError>>;
    /// Get the path of the namespace.
    fn path(&self) -> &Str;
    /// Get the parser of the namespace.
    fn parser(&self) -> impl Parse;
    /// Get the unique server id.
    fn server_id(&self) -> Uid;
}

The Adapter default implementation is the local adapter. Any other implementation that uses remote systems (redis, mongo, ...) can propagate requests and then execute the default behavior with the CoreLocalAdapter passed to the adapter implementation.

@Totodore Totodore added A-core Area related to socketioxide-core C-Feature-request Request for a feature labels Jan 1, 2025
@Totodore Totodore self-assigned this Jan 1, 2025
@Totodore Totodore linked an issue Jan 1, 2025 that may be closed by this pull request
* test(e2e): provide e2e testing system for adapters

* fix(e2e/adapter): remove redis deps
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-core Area related to socketioxide-core C-Feature-request Request for a feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feature: Adapter trait with optional async
1 participant