Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new http apis #107

Merged
merged 6 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions application/xiu/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased] - ReleaseDate

## [0.12.5]
- Support querying more detailed statistic data by adding two new HTTP APIs.
- Fix publishing RTSP stream error caused by network problem. by @bailb
- Fix the bug that stopping the playback of RTSP stream leads to push(publish) failure.
- Upgrade failure library.

## [0.12.4]
- Fix the failure in generating Docker images.

Expand Down
2 changes: 1 addition & 1 deletion application/xiu/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ serde_derive = "1.0"
serde = { version = "1.0.101", optional = true, features = ["derive"] }
anyhow = "^1.0"
log = "0.4.0"
failure = "0.1.1"
failure = "0.1.8"
clap = "4.1.4"
libc = "0.2.139"
serde_json = { version = "1", default-features = false, features = [
Expand Down
149 changes: 107 additions & 42 deletions application/xiu/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,45 @@
use {
anyhow::Result,
axum::{
extract::Query,
routing::{get, post},
Json, Router,
},
serde::Deserialize,
serde_json::Value,
std::sync::Arc,
streamhub::{define, define::StreamHubEventSender, utils::Uuid},
{
tokio,
tokio::sync::{mpsc, oneshot},
streamhub::{
define::{self, StreamHubEventSender},
stream::StreamIdentifier,
utils::Uuid,
},
tokio::{self, sync::oneshot},
};

#[derive(serde::Serialize)]
struct ApiResponse<T> {
error_code: i32,
desp: String,
data: T,
}

// the input to our `KickOffClient` handler
#[derive(Deserialize)]
struct KickOffClient {
id: String,
uuid: String,
}

#[derive(Deserialize, Debug)]
struct QueryWholeStreamsParams {
// query top N by subscriber's count.
top: Option<usize>,
}

#[derive(Deserialize)]
struct QueryStream {
identifier: StreamIdentifier,
// if specify uuid, then query the stream by uuid and filter no used data.
uuid: Option<String>,
}

#[derive(Clone)]
Expand All @@ -28,50 +51,89 @@ impl ApiService {
async fn root(&self) -> String {
String::from(
"Usage of xiu http api:
./get_stream_status(get) get audio and video stream statistic information.
./kick_off_client(post) kick off client by publish/subscribe id.\n",
./api/query_whole_streams(get) query whole streams' information or top streams' information.
./api/query_stream(post) query stream information by identifier and uuid.
./api/kick_off_client(post) kick off client by publish/subscribe id.\n",
)
}

async fn get_stream_status(&self) -> Result<String> {
let (data_sender, mut data_receiver) = mpsc::unbounded_channel();
let (size_sender, size_receiver) = oneshot::channel();
async fn query_whole_streams(
&self,
params: QueryWholeStreamsParams,
) -> Json<ApiResponse<Value>> {
log::info!("query_whole_streams: {:?}", params);
let (result_sender, result_receiver) = oneshot::channel();
let hub_event = define::StreamHubEvent::ApiStatistic {
data_sender,
size_sender,
top_n: params.top,
identifier: None,
uuid: None,
result_sender,
};
if let Err(err) = self.channel_event_producer.send(hub_event) {
log::error!("send api event error: {}", err);
}
let mut data = Vec::new();
match size_receiver.await {
Ok(size) => {
if size == 0 {
return Ok(String::from("no stream data"));
}
loop {
if let Some(stream_statistics) = data_receiver.recv().await {
data.push(stream_statistics);
}
if data.len() == size {
break;
}
}

match result_receiver.await {
Ok(dat_val) => {
let api_response = ApiResponse {
error_code: 0,
desp: String::from("succ"),
data: dat_val,
};
Json(api_response)
}
Err(err) => {
log::error!("start_api_service recv size error: {}", err);
let api_response = ApiResponse {
error_code: -1,
desp: String::from("failed"),
data: serde_json::json!(err.to_string()),
};
Json(api_response)
}
}
}

async fn query_stream(&self, stream: QueryStream) -> Json<ApiResponse<Value>> {
let uuid = if let Some(uid) = stream.uuid {
Uuid::from_str2(&uid)
} else {
None
};

if let Ok(data) = serde_json::to_string(&data) {
return Ok(data);
let (result_sender, result_receiver) = oneshot::channel();
let hub_event = define::StreamHubEvent::ApiStatistic {
top_n: None,
identifier: Some(stream.identifier),
uuid,
result_sender,
};

if let Err(err) = self.channel_event_producer.send(hub_event) {
log::error!("send api event error: {}", err);
}

Ok(String::from(""))
match result_receiver.await {
Ok(dat_val) => {
let api_response = ApiResponse {
error_code: 0,
desp: String::from("succ"),
data: dat_val,
};
Json(api_response)
}
Err(err) => {
let api_response = ApiResponse {
error_code: -1,
desp: String::from("failed"),
data: serde_json::json!(err.to_string()),
};
Json(api_response)
}
}
}

async fn kick_off_client(&self, id: KickOffClient) -> Result<String> {
let id_result = Uuid::from_str2(&id.id);
let id_result = Uuid::from_str2(&id.uuid);

if let Some(id) = id_result {
let hub_event = define::StreamHubEvent::ApiKickClient { id };
Expand All @@ -93,26 +155,29 @@ pub async fn run(producer: StreamHubEventSender, port: usize) {
let api_root = api.clone();
let root = move || async move { api_root.root().await };

let get_status = api.clone();
let status = move || async move {
match get_status.get_stream_status().await {
Ok(response) => response,
Err(_) => "error".to_owned(),
}
let api_query_streams = api.clone();
let query_streams = move |Query(params): Query<QueryWholeStreamsParams>| async move {
api_query_streams.query_whole_streams(params).await
};

let api_query_stream = api.clone();
let query_stream = move |Json(stream): Json<QueryStream>| async move {
api_query_stream.query_stream(stream).await
};

let kick_off = api.clone();
let kick = move |Json(id): Json<KickOffClient>| async move {
match kick_off.kick_off_client(id).await {
let api_kick_off = api.clone();
let kick_off = move |Json(id): Json<KickOffClient>| async move {
match api_kick_off.kick_off_client(id).await {
Ok(response) => response,
Err(_) => "error".to_owned(),
}
};

let app = Router::new()
.route("/", get(root))
.route("/get_stream_status", get(status))
.route("/kick_off_client", post(kick));
.route("/api/query_whole_streams", get(query_streams))
.route("/api/query_stream", post(query_stream))
.route("/api/kick_off_client", post(kick_off));

log::info!("Http api server listening on http://0.0.0.0:{}", port);
axum::Server::bind(&([0, 0, 0, 0], port as u16).into())
Expand Down
2 changes: 1 addition & 1 deletion confs/local/flv.Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ edition = "2018"
[dependencies]
byteorder = "1.4.2"
bytes = "1.0.0"
failure = "0.1.1"
failure = "0.1.8"
serde = { version = "1.0", features = ["derive", "rc"] }
log = "0.4"

Expand Down
Loading
Loading