Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Recover state after RpcConnection reconnection #47

Closed
evdokimovs opened this issue Aug 28, 2019 · 6 comments
Closed

Recover state after RpcConnection reconnection #47

evdokimovs opened this issue Aug 28, 2019 · 6 comments
Assignees
Labels
feature New feature or request k::api Related to API (application interface)
Milestone

Comments

@evdokimovs
Copy link
Contributor

evdokimovs commented Aug 28, 2019

Part of #27

Background

При попытке отправить событие пользователю в закрытый RpcConnection,
сообщения не будут отправлены или сохранены. В текущей реализации ошибки отправки обрабатываются закрытием комнаты.

Problem to solve

Проблема возникнет при следующем сценарии:

  1. Подключился пользователь A.
  2. Подключился пользователь B.
  3. Начинается сигналинг между пользователями, происходит отправка PeerCreated пользователю A.
  4. Тереяется соединение с пользователем B (RpcConnectionClosed(ClosedReason::Lost))
  5. Пользователь A отвечает MakeSdpOffer.
  6. При поптыке отправить пользователю B событие SdpOfferMade происходит ошибка, приводящая к закрытию комнаты.

В текущей реализации, ClosedReason::Lost будет отправлен в случае отсутствия ping'ов от клиента в течении Rpc.idle_timeout.

В случае поетри соединения будет отправленClosedReason::Closed независимо от статуса.

Possible solutions

Изменения в отправке сообщений клиентам находящимся в состоянии Lost (могут вернутся):

Предлагается сделать буфферизацию сообщений в случае отсутствия подключения
с клиентом.

В случае перепоключения клиента будет совершена попытка зафлашить буфер в новое подкючение.

В случае ClosedReason::Closed сообщения будут дискардится.

Предлагаемая реализация

Для начала, предлагается создать обертку над Box<dyn RpcConnection> с возможностью
буфферизации EventMessage. Выглядеть это будет примерно так:

struct BufferedRpcConnection {
    connection: Box<dyn RpcConnection>,
    buffer: VecDeque<EventMessage>,
}

impl BufferedRpcConnection {
    /// Swap [`RpcConnection`] in this [`BufferedRpcConnection`] 
    /// with another connection.
    pub fn swap_connection(
        &mut self,
        another_conn: &mut Box<dyn RpcConnection>,
    ) { /* ... */ }

    /// Send all buffered [`EventMessage`]s to [`RpcConnection`].
    pub fn flush(
        &mut self
    ) -> impl Future<Item = (), Error = ()> { /* ... */ }

    /// Swap [`RpcConnection`] and send all buffered messages to
    /// new [`RpcConnection`].
    pub fn swap_connection_and_flush(
        &mut self,
        another_conn: &mut Box<dyn RpcConnection>,
    ) -> impl Future<Item = (), Error = ()> { /* ... */ }
}

impl RpcConnection for BufferedRpcConnection {
    /// Sends [`EventMessage`] to remote [`RpcConnection`] and store
    /// this [`EventMessage`] in buffer if [`RpcConnection::send_event`] fails.
    fn send_event(
        &self,
        msg: EventMessage,
    ) -> Box<dyn Future<Item = (), Error = ()>> { /* ... */ }

    /// Close [`RpcConnection`].
    fn close(&mut self) -> Box<dyn Future<Item = (), Error = ()>> {
        self.connection.close()
    }
}

В BufferedRpcConnection::send_event мы считаем, что RpcConnection оборван, если
RpcConnection::send_event возвращает ошибку (на данный момент это (), но при надобности можно будет добавить enum). На данный момент для Addr<WsSession>::send_event это будет MailboxError::Closed.
В документации это не описано, но опытным путем определено, что это так.

BufferedRpcConnection::swap_connection_and_flush будет вызываться в
ParticipantService::connection_established, если будет найден
BufferedRpcConnection для подключившегося пользователя.

Ну и соответственно ParticipantService теперь будет вместо Box<dyn RpcConnection хранить BufferedRpcConnection в поле connections.

Переподключение пользователя по истечению reconnect_timeout (ClosedReason::Closed)

В этом случае сервер будет воспринимать пользователя как нового и будет пытатся соединить его с другими пользователями заново.

Тут основной проблемой является то, что сервер ничего не знает про его текущее состояние, клиент еще может держать обьекты, ассоциированные с более не существующими обьектами сервера, удаленными по причине отключения пользователя.

Варианты решения:

  1. Производить сверку состояний обьектов клиента с обьектами сервера. На первый взгляд механизм кажется весьма сложным и ненадежным.
  2. Ресетить состояние клиента и поднимать его заново.

В дальнейшем будут обсуждатся способы реализации второго варианта.

Первый вариант

Можно добавить поле closed_members в ParticipantService в которое мы
будем добавлять пользователей, которые были отключены по таймауту. Когда
такой пользователь будет повторно подключаться, мы будем отправлять ему
EventMessage::ResetState. По-идее, на такое сообщение клиент должен будет
сообщать об этом приложению и пересоздавать Room. Пытаться восстанавливать
в таком случае состояние - бесполезно, поскольку остальные пользователи
уже будут считать переподключившегося пользователя "отвалившемся" и удалят
его у себя.

Второй вариант

Также можно сообщать свежеподключившемуся пользователю о том, что его подключение новое. Таким образом на такие сообщения, если клиент считает, что он переподключался, он должен будет уничтожить обьекты ассоциированые с этим подключением и ожидать дальнейших комманд.

Такой вариант выглядит предпочтительнее из-за отсутствия необходимости
в сохранении "отвалившихся" клиентов.

Переподключение пользователя до наступления таймаута на переподключение (ClosedReason::Lost)

Тут все сложнее, поскольку на стороне клиента, в момент потерянного RpcConnection
тоже могут происходить Commandы, которые он будет пытаться отправить. Соответсвенно их он должен будет буфферизировать таким же образом, как и сервер, но тут уже вылазят проблемы синхронизации сервера и клиента. Можно проставлять UNIX timestamp в буффере для каждого Event и Command. Но останутся проблемы с конфликтами.

Для примера можно взять ситуацию с получением сервером Command::RemovePeers, но при этом на сервере эти пиры уже удалены. В таком случае сервер должен будет проигнорировать такой невалидный Event. Поскольку клиенту в тот же момент должен будет придти буфферизированный Event::PeersRemoved с этими удаленными пирами. Для большей надежности можно добавить в Command и Event флаг, сообщающий о том, что данное сообщение было буфферизированно, и может иметь конфликты, которые нужно игнорировать.

На данный момент, у нас нет как таковых конфликтующий Command/Event, которые могли бы произойти без соединения с сервером. Но с первого взгляда, предложенный мною подход должен быть рабочим во всех случаях.

Все что описано в данном разделе - это лишь размышления о решении будущих проблем с синхронизацией клиента и сервера. На данный момент у нас мало ситуаций в которых могут происходить конфликты, и поэтому в будущем описанный вариант решения проблемы может оказаться не рабочим, но может послужить отправной точкой.

Изменения в логике отправки RpcConnectionClosed:

  1. ClosedReason::Closed если ws-коннект был закрыт со статусом 1000.
  2. ClosedReason::Lost по истечению idle_timeout либо при закрытии ws-коннекта с любым статусом отличным от 1000 либо при отсутствии статуса.
@evdokimovs evdokimovs added feature New feature or request k::api Related to API (application interface) labels Aug 28, 2019
@evdokimovs evdokimovs added this to the 0.2.0 milestone Aug 28, 2019
@evdokimovs evdokimovs self-assigned this Aug 28, 2019
@evdokimovs
Copy link
Contributor Author

@alexlapa , cc

@alexlapa
Copy link
Collaborator

@evdokimovs ,

Немного отредактировал текст в шапке дабы лучше обьяснить ситуацию.

В целом, с предложенной реализацией согласен. По мелочам:

  1. Не думаю, что есть смысл делать swap_connection(), flush() публичными.
  2. Не до конца понял целесообразность в impl RpcConnection for BufferedRpcConnection. Тут идея в том, что рума будет и дальше работать с dyn RpcConnection, а конкретный тип будет известен ParticipantService'у?
  3. BufferedRpcConnection => BufferingRpcConnection?

@tyranron ,

Будут замечания/предложения по предлоежнному решению?

@evdokimovs
Copy link
Contributor Author

evdokimovs commented Aug 28, 2019

@alexlapa ,

Не думаю, что есть смысл делать swap_connection(), flush() публичными.

Согласен, но есть одно но. Я еще раз посмотрел на все это уже в коде, и есть ощущение, что swap_connection_and_flush() вообще не имеет смысла, потому-что нам в любом случае перед flush() нужно будет закрывать старый RpcConnection на всякий случай. Следовательно, действия flush() и swap_connection() придется разделять, и то на то и выйдет по колличеству строк, а по читаемости станет даже чуть хуже, как по мне.

Не до конца понял целесообразность в impl RpcConnection for BufferedRpcConnection.

Добавит гибкости, и выглядит чуть более логично. Но из неприятных сторон, на практике из-за impl RpcConnection придется использовать Arc<Mutex<...>> в качестве буффера.

Тут идея в том, что рума будет и дальше работать с dyn RpcConnection, а конкретный тип будет известен ParticipantService'у?

Да, выглядит неплохо.

BufferedRpcConnection => BufferingRpcConnection?

На мой взгляд Buffered звучит корректнее. Потому-что "Buffered" - это прилагательное, а "Buffering" - глагол, а глаголы обычно в названиях структур не используют.

@evdokimovs
Copy link
Contributor Author

Сценарий 1

  1. Подключился пользователь Alice.

  2. Подключился пользователь Bob.

  3. Начинается сигналинг между пользователями, происходит отправка PeerCreated пользователю Alice.

  4. Тереяется соединение с пользователем Bob (RpcConnectionClosed(ClosedReason::Lost))

  5. Пользователь Alice отвечает MakeSdpOffer.

  6. Попытка отправить PeerCreated пользователю Bob.

  7. Пользователь Bob переподключился:

Восстановление соединения пользователя Bob в случае буфферизации

  1. Отправка Bob'у сообщения о том, что он переподключился.
  2. Отправка буфферизированного PeerCreated пользователю Bob.
  3. Дальше все идет в обычном режиме.

Восстановление соединения пользователя Bob в случае snapshoting'а

  1. Отправка пользователю Bob сообщения со следующим состоянием:
alice:
  peers:
    - id: 1
      state: WaitRemoteSdp
      sdp_offer: "..."
      sdp_answer: null
      receivers:
        - ...
      senders:
        - ...
bob:
  peers:
    - id: 2
      state: WaitLocalHaveRemote
      sdp_offer: "..."
      sdp_answer: null
      receivers:
        - ...
      senders:
        - ...
  1. Bob сверяет свое состояние с состоянием сервера и создает недостающий пир:
struct Room {
  peers: PeerRepository,
  /* ... */
}

impl Room {
  fn on_handshake(&mut self, snapshot: Snaphost) {
    let me = snapshot.members.get("bob");
    for peer in me.peers {
      if let Some(my_peer) = self.peers.get(&peer.id) {
        if peer.state > my_peer.state {
          /* ... */
        } else if peer.state < my_peer.state {
          /* ... */
        }
      } else {
        self.peers.create(peer);
      }
    }
  }

  fn create_peer(&mut self, new_peer: PeerStateMachine) {
    match new_peer {
      PeerStateMachine::WaitLocalHaveRemote(peer: Peer<WaitLocalHaveRemote>) => {
        let peer_connection = peer.create_connection();
        self.peers.push(peer_connection);
        self.rpc.send_command(Command::MakeSdpAnswer {
          peer_id: peer_connection.id,
          sdp_answer: peer_connection.sdp_answer(),
        });
      },
      /* ... */
    }  
  }
}

impl Peer<WaitLocalHaveRemote> {
  pub fn create_connection(self, room: &Room) -> PeerConnection<WaitLocalHaveRemote> {
    let connection = PeerConnection<WaitLocalHaveRemote>::new();

    room.create_connection_from_tracks(self.context.tracks);
    connection.proccess_offer(self.sdp_offer);
    connection.create_and_set_answer();

    connection
  }
}

Сценарий 2

  1. Во время разговора у пользователя Bob отваливается подключение.
  2. Пользователь Alice сбрасывает звонок и отключается.
  3. Сервер пытается отправить Event::PeersRemoved пользователю Bob.
  4. Пользователь Bob переподключился:

Восстановление соединения пользователя Bob в случае буфферизации

  1. Отправка Bob'у сообщения о том, что он переподключился.
  2. Отправка буфферизированного Event::PeersRemoved Bob'у.
  3. Bob удаляет пиры.

Восстановление соединения пользователя Bob в случае snapshoting'а

  1. Bob'у приходит пустой снапшот состояния.
  2. Bob сверяет есть-ли его пиры в снапшоте и удаляет те, которых нет:
impl Room {
  fn on_handshake(&mut self, snapshot: Snapshot) {
    /* ... */
    in (id, peer) in self.peers {
      if let None = snapshot.peers.get(peer) {
        self.peers.remove(id);
      }
    }
  }
}

Сценарий 3

  1. Подключился пользователь Alice.
  2. Подключился пользователь Bob.
  3. Начинается сигналинг между пользователями, происходит отправка PeerCreated пользователю Alice.
  4. Тереяется соединение с пользователем Alice (RpcConnectionClosed(ClosedReason::Lost)).
  5. Пользователь Alice пытается ответить Command::MakeSdpOffer.

Восстановление соединения пользователя Alice в случае буфферизации

  1. Alice получает сообщение о том, она переподключилась.
  2. На сервер отправляется буфферизированный Command::MakeSdpOffer.
  3. Дальше все идет как обычно.

Восстановление соединения пользователя Alice в случае snapshoting'а

  1. Alice получает состояние с сервера.
alice:
  peers:
    - id: 1
      state: New
      sdp_offer: null
      sdp_answer: null
  1. Видит, что на сервере Peer state New, а у нее состояние WaitRemoteSdp и отправляет Commandы нужные, чтобы апгрейднуть состояние сервера на сервер:
struct Room {
  peers: PeerRepository,
  /* ... */
}

impl Room {
  fn on_handshake(&mut self, snapshot: Snaphost) {
    let me = snapshot.members.get("bob");
    for peer in me.peers {
      if let Some(my_peer) = self.peers.get(&peer.id) {
        if peer.state > my_peer.state {
          my_peer.upgrade(peer); // Here it would be something similar to the logic of the server upgrade.
        } else if peer.state < my_peer.state {
          my_peer.upgrade_server_peer(peer);
        }
      } else {
        self.peers.create(peer);
      }
    }
  }
}

impl PeerConnection<WaitLocalHaveRemote> {
  pub fn upgrade_server_peer(&self, server_peer: PeerStateMachine, room: &Room) {
    match server_peer {
      PeerStateMachine::New(server_peer: Peer<New>) => {
        room.rpc.send_command(Command::MakeSdpAnswer {
          /* ... */
        })
      }
    }
  }
}

@evdokimovs
Copy link
Contributor Author

@alexlapa , @tyranron , cc

@evdokimovs evdokimovs changed the title Buffer messages if RpcConnection is disconnected atm Recover state after RpcConnection reconnection Sep 9, 2019
evdokimovs added a commit that referenced this issue Jan 16, 2020
- impl RpcClient reconnection
- add ServerMsg::RpcSettings and its sending/processing
- add Room.on_connection_loss callback which fires when RpcClient loses connection
- add ReconnectHandle for JS side to start reconnection
- impl reconnection with backoff and simple reconnection with a constant retry delay
- add connection loss notification with button for manual reconnection to 'e2e-demo' app
- add WebSocket connection state indicator to 'demo' app
- remove RpcTransport::on_close and use RpcTransport::on_state_change instead

Additionally:
- rewrite 'satisfies_by_device_id!' and 'console_error!' macros as functions
- remove 'weak_map!' macro and add 'upgrade_or_detached!' instead
- add 'new_js_error!' macro for creating JasonError with 'tracerr' information for JS side
- reverse ping/pong mechanism: server sends Ping and expects Pong from client
- add 'rpc.ping_interval' configuration option for Medea
- use 'fakerator' instead of 'faker' for generating random male usernames in demos
evdokimovs added a commit that referenced this issue May 29, 2020
- pause mute/unmute state transition timers when RPC connection is lost
evdokimovs added a commit that referenced this issue Dec 8, 2020
…159, #27, #47)

- add progressable collections: Vec, HashMap, HashSet
- add Progressable cell
evdokimovs added a commit that referenced this issue Jan 8, 2021
- impl reactivity Component structure and its helper abstractions in 'medea-jason' crate
evdokimovs added a commit that referenced this issue Jan 13, 2021
…, #47)

- add Component and State for Sender, Receiver, Peer and PeerRepository
- rewrite EventHandler implementation to update Component's States instead of real objects

Additionally:
- upgrade Firefox to 84.0.2 version for E2E tests
- fix 'opt-level' being ignored when compiling 'medea-jason' crate in Cargo manifest
- globally set 'codegen-units = 1' in Cargo manifest
evdokimovs added a commit that referenced this issue Jan 27, 2021
- implement MediaStates updating before negotiation
- fix renegotiation after force push
- send MediaStates update intentions basing on reactive state

Additionally:
- upgrade Chrome to 88.0 version for E2E tests
evdokimovs added a commit that referenced this issue Jan 29, 2021
- add states for client and sever synchronization to 'medea-client-api-proto' crate
- add Event::StateSynchronized and Command::SynchronizeMe to 'medea-client-api-proto` crate
- implement state synchronization on reconnection in 'medea-jason' and 'medea' crates

Additionally:
- upgrade Firefox to 85.0 version for E2E tests
@alexlapa
Copy link
Collaborator

Resolved in #167

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
feature New feature or request k::api Related to API (application interface)
Projects
None yet
2 participants