Skip to content

Commit

Permalink
implement stream state management
Browse files Browse the repository at this point in the history
  • Loading branch information
TheBeastLT committed Nov 4, 2023
1 parent 6c50cd8 commit 1dae6e8
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 3 deletions.
30 changes: 30 additions & 0 deletions src/models/ctx/update_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub fn update_streams<E: Env + 'static>(
meta_id: meta_id.to_owned(),
video_id: video_id.to_owned(),
};
let last_stream_state = streams
.items
.get(&key)
.and_then(|item| item.adjusted_state(stream));

let streams_item = StreamsItem {
stream: stream.to_owned(),
Expand All @@ -42,12 +46,38 @@ pub fn update_streams<E: Env + 'static>(
video_id: video_id.to_owned(),
meta_transport_url: meta_request.base.to_owned(),
stream_transport_url: stream_request.base.to_owned(),
state: last_stream_state,
mtime: E::now(),
};

streams.items.insert(key, streams_item);
Effects::msg(Msg::Internal(Internal::StreamsChanged(false)))
}
Msg::Internal(Internal::StreamStateChanged {
state,
stream_request: Some(stream_request),
meta_request: Some(meta_request),
}) => {
let meta_id = &meta_request.path.id;
let video_id = &stream_request.path.id;

let key = StreamsItemKey {
meta_id: meta_id.to_owned(),
video_id: video_id.to_owned(),
};
let steam_item = streams.items.get(&key).cloned();
match steam_item {
Some(item) => {
let new_stream_item = StreamsItem {
state: state.clone(),
..item
};
streams.items.insert(key, new_stream_item);
Effects::msg(Msg::Internal(Internal::StreamsChanged(false)))
}
None => Effects::none().unchanged(),
}
}
Msg::Internal(Internal::StreamsChanged(persisted)) if !persisted => {
Effects::one(push_streams_to_storage::<E>(streams)).unchanged()
}
Expand Down
70 changes: 67 additions & 3 deletions src/models/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::types::addon::{AggrRequest, Descriptor, ExtraExt, ResourcePath, Resou
use crate::types::library::{LibraryBucket, LibraryItem};
use crate::types::profile::Settings as ProfileSettings;
use crate::types::resource::{MetaItem, SeriesInfo, Stream, Subtitles, Video};
use crate::types::streams::{StreamItemState, StreamsBucket};

use stremio_watched_bitfield::WatchedBitField;

Expand Down Expand Up @@ -84,6 +85,7 @@ pub struct Player {
pub next_stream: Option<Stream>,
pub series_info: Option<SeriesInfo>,
pub library_item: Option<LibraryItem>,
pub stream_state: Option<StreamItemState>,
#[serde(skip_serializing)]
pub watched: Option<WatchedBitField>,
#[serde(skip_serializing)]
Expand Down Expand Up @@ -165,6 +167,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
},
_ => eq_update(&mut self.meta_item, None),
};
let stream_state_effects = eq_update(&mut self.stream_state, None);
let video_params_effects = eq_update(&mut self.video_params, None);
let subtitles_effects = subtitles_update::<E>(
&mut self.subtitles,
Expand Down Expand Up @@ -241,6 +244,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
.join(update_streams_effects)
.join(selected_effects)
.join(meta_item_effects)
.join(stream_state_effects)
.join(video_params_effects)
.join(subtitles_effects)
.join(next_video_effects)
Expand Down Expand Up @@ -272,6 +276,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
let selected_effects = eq_update(&mut self.selected, None);
let video_params_effects = eq_update(&mut self.video_params, None);
let meta_item_effects = eq_update(&mut self.meta_item, None);
let stream_state_effects = eq_update(&mut self.stream_state, None);
let subtitles_effects = eq_update(&mut self.subtitles, vec![]);
let next_video_effects = eq_update(&mut self.next_video, None);
let next_streams_effects = eq_update(&mut self.next_streams, None);
Expand All @@ -289,6 +294,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
.join(selected_effects)
.join(video_params_effects)
.join(meta_item_effects)
.join(stream_state_effects)
.join(subtitles_effects)
.join(next_video_effects)
.join(next_streams_effects)
Expand All @@ -309,6 +315,23 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
);
video_params_effects.join(subtitles_effects)
}
Msg::Action(Action::Player(ActionPlayer::StreamStateChanged { state })) => {
let stream_state_effects = eq_update(&mut self.stream_state, state.to_owned());
let state_changed_effects =
Effects::msg(Msg::Internal(Internal::StreamStateChanged {
state: state.to_owned(),
stream_request: self
.selected
.as_ref()
.and_then(|selected| selected.stream_request.to_owned()),
meta_request: self
.selected
.as_ref()
.and_then(|selected| selected.meta_request.to_owned()),
}))
.unchanged();
stream_state_effects.join(state_changed_effects)
}
Msg::Action(Action::Player(ActionPlayer::TimeChanged {
time,
duration,
Expand Down Expand Up @@ -468,6 +491,12 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
),
_ => Effects::none().unchanged(),
};
let stream_state_effects = stream_state_update(
&mut self.stream_state,
&self.selected,
&self.meta_item,
&ctx.streams,
);
let subtitles_effects = resources_update_with_vector_content::<E, _>(
&mut self.subtitles,
ResourcesAction::ResourceRequestResult { request, result },
Expand Down Expand Up @@ -532,6 +561,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
analytics_context.duration = duration;
};
meta_item_effects
.join(stream_state_effects)
.join(subtitles_effects)
.join(next_video_effects)
.join(next_streams_effects)
Expand Down Expand Up @@ -577,6 +607,42 @@ fn switch_to_next_video(
Effects::none().unchanged()
}

fn stream_state_update(
state: &mut Option<StreamItemState>,
selected: &Option<Selected>,
meta_item: &Option<ResourceLoadable<MetaItem>>,
streams: &StreamsBucket,
) -> Effects {
match (&state, selected, meta_item) {
(
None,
Some(Selected {
stream,
stream_request: Some(stream_request),
meta_request: Some(meta_request),
..
}),
Some(ResourceLoadable {
content: Some(Loadable::Ready(meta_item)),
..
}),
) => {
let video_id = &stream_request.path.id;
let next_state = streams
.last_stream_item(video_id, meta_item)
.and_then(|item| item.adjusted_state(stream));
let state_changed_effect = Effects::msg(Msg::Internal(Internal::StreamStateChanged {
state: next_state.to_owned(),
stream_request: Some(stream_request.to_owned()),
meta_request: Some(meta_request.to_owned()),
}))
.unchanged();
eq_update(state, next_state).join(state_changed_effect)
}
_ => Effects::none().unchanged(),
}
}

fn next_video_update(
video: &mut Option<Video>,
stream: &Option<Stream>,
Expand Down Expand Up @@ -716,9 +782,7 @@ fn next_stream_update(
}),
) if settings.binge_watching => streams
.iter()
.find(|Stream { behavior_hints, .. }| {
behavior_hints.binge_group == stream.behavior_hints.binge_group
})
.find(|next_stream| next_stream.is_binge_match(stream))
.cloned(),
_ => None,
};
Expand Down
4 changes: 4 additions & 0 deletions src/runtime/msg/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::ops::Range;
use serde::Deserialize;
use url::Url;

use crate::types::streams::StreamItemState;
use crate::{
models::{
addon_details::Selected as AddonDetailsSelected,
Expand Down Expand Up @@ -130,6 +131,9 @@ pub enum ActionPlayer {
VideoParamsChanged {
video_params: Option<VideoParams>,
},
StreamStateChanged {
state: Option<StreamItemState>,
},
TimeChanged {
time: u64,
duration: u64,
Expand Down
7 changes: 7 additions & 0 deletions src/runtime/msg/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::types::library::{LibraryBucket, LibraryItem, LibraryItemId};
use crate::types::profile::{Auth, AuthKey, Profile, User};
use crate::types::resource::Stream;
use crate::types::streaming_server::Statistics;
use crate::types::streams::StreamItemState;

pub type CtxStorageResponse = (
Option<Profile>,
Expand Down Expand Up @@ -58,6 +59,12 @@ pub enum Internal {
stream_request: Option<ResourceRequest>,
meta_request: Option<ResourceRequest>,
},
/// Dispatched when stream item's state has changed
StreamStateChanged {
state: Option<StreamItemState>,
stream_request: Option<ResourceRequest>,
meta_request: Option<ResourceRequest>,
},
/// Dispatched when library item needs to be updated in the memory, storage and API.
UpdateLibraryItem(LibraryItem),
/// Dispatched when some of auth, addons or settings changed.
Expand Down
11 changes: 11 additions & 0 deletions src/types/resource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ impl Stream {
_ => None,
}
}

#[inline]
pub fn is_binge_match(&self, other_stream: &Stream) -> bool {
match (
&self.behavior_hints.binge_group,
&other_stream.behavior_hints.binge_group,
) {
(Some(a), Some(b)) => a == b,
_ => false,
}
}
}

#[serde_as]
Expand Down
23 changes: 23 additions & 0 deletions src/types/streams/streams_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use serde_with::serde_as;

use crate::types::profile::UID;
use crate::types::resource::MetaItem;
use crate::types::streams::StreamsItem;

#[derive(Default, Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
Expand All @@ -28,4 +29,26 @@ impl StreamsBucket {
items: HashMap::new(),
}
}

pub fn last_stream_item(
&self,
video_id: &String,
meta_item: &MetaItem,
) -> Option<&StreamsItem> {
meta_item
.videos
.iter()
.position(|video| video.id == *video_id)
.and_then(|max_index| {
meta_item.videos[max_index.saturating_sub(30)..=max_index]
.iter()
.rev()
.find_map(|video| {
self.items.get(&StreamsItemKey {
meta_id: meta_item.preview.id.to_string(),
video_id: video.id.to_owned(),
})
})
})
}
}
53 changes: 53 additions & 0 deletions src/types/streams/streams_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,60 @@ pub struct StreamsItem {
pub video_id: String,
pub meta_transport_url: Url,
pub stream_transport_url: Url,
pub state: Option<StreamItemState>,
/// Modification time
#[serde(rename = "_mtime")]
pub mtime: DateTime<Utc>,
}

#[serde_as]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamItemState {
pub subtitle_track_id: Option<String>,
pub subtitle_language: Option<String>,
pub subtitle_delay_ms: Option<i64>,
pub audio_track_id: Option<String>,
pub audio_language: Option<String>,
pub audio_delay_ms: Option<i64>,
pub playback_speed: Option<f32>,
pub player_type: Option<String>,
}

impl StreamsItem {
/// Retrieve adjusted stream state based on given stream:
/// If stream source is the same we want to retain the same state;
/// If stream binge group matches we want to retain same state except
/// for subtitle and audio delay, as these are not relevant when playing
/// a stream for next binge video as audio/subtitle sync might be different,
/// but we want to retain track ids as next binge group might have
/// the same embedded tracks with same ids;
/// Otherwise retain only playback speed and player type as these should not change
/// regardless of the video, since you usually want to maintain these throughout
/// the whole series;
#[inline]
pub fn adjusted_state(&self, new_stream: &Stream) -> Option<StreamItemState> {
self.state.clone().map(|state| {
let is_exact_match = self.stream.source == new_stream.source;
let is_binge_match = self.stream.is_binge_match(new_stream);
if is_exact_match {
return state;
} else if is_binge_match {
return StreamItemState {
subtitle_delay_ms: None,
audio_delay_ms: None,
..state
};
}
StreamItemState {
subtitle_track_id: None,
subtitle_language: None,
subtitle_delay_ms: None,
audio_track_id: None,
audio_language: None,
audio_delay_ms: None,
..state
}
})
}
}
1 change: 1 addition & 0 deletions src/unit_tests/ctx/uninstall_addon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ fn create_addon_streams_item(addon: &Descriptor) -> StreamsItem {
video_id: "tt123456:1:0".to_owned(),
meta_transport_url: addon.transport_url.clone(),
stream_transport_url: addon.transport_url.clone(),
state: None,
mtime: TestEnv::now(),
}
}
Expand Down
1 change: 1 addition & 0 deletions src/unit_tests/deep_links/library_item_deep_links.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const TORRENT_STREAMS_ITEM: Lazy<StreamsItem> = Lazy::new(|| {
stream_transport_url: "https://torrentio.strem.fun/qualityfilter=1080p,720p/manifest.json"
.parse()
.unwrap(),
state: None,
mtime: Utc::now(),
}
});
Expand Down

0 comments on commit 1dae6e8

Please sign in to comment.