Skip to content

Commit

Permalink
feat: update tonic version for grpc (#655)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Aug 5, 2024
1 parent 5647215 commit 87460ee
Show file tree
Hide file tree
Showing 13 changed files with 131 additions and 135 deletions.
185 changes: 82 additions & 103 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.94"
version = "0.1.95"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,15 +22,15 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.94" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.94" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.94" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.94" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.94" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.94" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.94" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.95" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.95" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.95" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.95" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.95" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.95" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.95" }
thiserror = "1.0"
dragonfly-api = "2.0.142"
dragonfly-api = "2.0.143"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
rcgen = { version = "0.12.1", features = ["x509-parser"] }
hyper = { version = "1.4", features = ["full"] }
Expand Down Expand Up @@ -59,7 +59,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9"
serde_json = "1.0"
http = "1"
tonic = { version = "0.9.2", features = ["gzip"] }
tonic = { version = "0.12.1", features = ["gzip"] }
tokio = { version = "1.39.2", features = ["full"] }
tokio-util = { version = "0.7.11", features = ["full"] }
tokio-stream = "0.1.15"
Expand All @@ -68,7 +68,7 @@ warp = "0.3.5"
headers = "0.4.0"
regex = "1.10.5"
humantime = "2.1.0"
prost-wkt-types = "0.4"
prost-wkt-types = "0.6"
chrono = { version = "0.4.35", features = ["serde", "clock"] }
openssl = { version = "0.10", features = ["vendored"] }
opendal = { version = "0.47.3", features = [
Expand Down
4 changes: 2 additions & 2 deletions dragonfly-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] }
pprof = { version = "0.13", features = ["flamegraph", "protobuf-codec"] }
lazy_static = "1.5"
prometheus = { version = "0.13", features = ["process"] }
tonic-health = "0.9.2"
tonic-reflection = "0.9.2"
tonic-health = "0.12.1"
tonic-reflection = "0.12.1"
bytes = "1.6"
sysinfo = "0.30.13"
tower = "0.4.13"
Expand Down
13 changes: 9 additions & 4 deletions dragonfly-client/src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use dragonfly_client_util::{
digest::{calculate_file_hash, Algorithm},
http::{get_range, hashmap_to_reqwest_headermap, reqwest_headermap_to_hashmap},
};
use hyper_util::rt::TokioIo;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -133,7 +134,6 @@ impl DfdaemonDownloadServer {
Server::builder()
.max_frame_size(super::MAX_FRAME_SIZE)
.concurrency_limit_per_connection(super::CONCURRENCY_LIMIT_PER_CONNECTION)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.add_service(reflection.clone())
.add_service(health_service)
.add_service(self.service.clone())
Expand Down Expand Up @@ -369,7 +369,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
.as_str(),
download_clone.priority.to_string().as_str(),
task_clone.content_length().unwrap_or_default(),
download_clone.range.clone(),
download_clone.range,
start_time.elapsed(),
);

Expand All @@ -391,7 +391,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
.hard_link_or_copy(
task_clone,
Path::new(output_path.as_str()),
download_clone.range.clone(),
download_clone.range,
)
.await
{
Expand Down Expand Up @@ -986,7 +986,12 @@ impl DfdaemonDownloadClient {
let channel = Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(service_fn(move |_: Uri| {
UnixStream::connect(socket_path.clone())
let socket_path = socket_path.clone();
async move {
Ok::<_, std::io::Error>(TokioIo::new(
UnixStream::connect(socket_path.clone()).await?,
))
}
}))
.await
.map_err(|err| {
Expand Down
6 changes: 2 additions & 4 deletions dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ impl DfdaemonUploadServer {
Server::builder()
.max_frame_size(super::MAX_FRAME_SIZE)
.concurrency_limit_per_connection(super::CONCURRENCY_LIMIT_PER_CONNECTION)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.add_service(reflection.clone())
.add_service(health_service)
.add_service(self.service.clone())
Expand Down Expand Up @@ -354,7 +353,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
.as_str(),
download_clone.priority.to_string().as_str(),
task_clone.content_length().unwrap_or_default(),
download_clone.range.clone(),
download_clone.range,
start_time.elapsed(),
);

Expand All @@ -376,7 +375,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
.hard_link_or_copy(
task_clone,
Path::new(output_path.as_str()),
download_clone.range.clone(),
download_clone.range,
)
.await
{
Expand Down Expand Up @@ -998,7 +997,6 @@ impl DfdaemonUploadClient {
let channel = Channel::from_static(Box::leak(addr.clone().into_boxed_str()))
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.connect()
.await
.map_err(|err| {
Expand Down
10 changes: 9 additions & 1 deletion dragonfly-client/src/grpc/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use dragonfly_client_core::{
error::{ErrorType, OrErr},
Error, Result,
};
use hyper_util::rt::TokioIo;
use std::path::PathBuf;
use tokio::net::UnixStream;
use tonic::transport::{Channel, Endpoint, Uri};
Expand All @@ -43,6 +44,8 @@ impl HealthClient {
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
.keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
.connect()
.await
.map_err(|err| {
Expand All @@ -63,7 +66,12 @@ impl HealthClient {
let channel = Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(service_fn(move |_: Uri| {
UnixStream::connect(socket_path.clone())
let socket_path = socket_path.clone();
async move {
Ok::<_, std::io::Error>(TokioIo::new(
UnixStream::connect(socket_path.clone()).await?,
))
}
}))
.await
.map_err(|err| {
Expand Down
2 changes: 2 additions & 0 deletions dragonfly-client/src/grpc/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ impl ManagerClient {
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
.keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
.connect()
.await
.map_err(|err| {
Expand Down
8 changes: 7 additions & 1 deletion dragonfly-client/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
pub const CONCURRENCY_LIMIT_PER_CONNECTION: usize = 8192;

// TCP_KEEPALIVE is the keepalive duration for TCP connection.
pub const TCP_KEEPALIVE: Duration = Duration::from_secs(60);
pub const TCP_KEEPALIVE: Duration = Duration::from_secs(3600);

// HTTP2_KEEP_ALIVE_INTERVAL is the interval for HTTP2 keep alive.
pub const HTTP2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(60);

// HTTP2_KEEP_ALIVE_TIMEOUT is the timeout for HTTP2 keep alive.
pub const HTTP2_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(5);

// MAX_FRAME_SIZE is the max frame size for GRPC, default is 12MB.
pub const MAX_FRAME_SIZE: u32 = 12 * 1024 * 1024;
Expand Down
5 changes: 2 additions & 3 deletions dragonfly-client/src/grpc/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ impl SchedulerClient {
.map_err(|_| Error::InvalidURI(addr.to_string()))?
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.connect()
.await
.map_err(|err| {
Expand Down Expand Up @@ -228,7 +227,6 @@ impl SchedulerClient {
.map_err(|_| Error::InvalidURI(addr.to_string()))?
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.connect()
.await
.map_err(|err| {
Expand Down Expand Up @@ -287,7 +285,6 @@ impl SchedulerClient {
.map_err(|_| Error::InvalidURI(addr.to_string()))?
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.connect()
.await
.map_err(|err| {
Expand Down Expand Up @@ -453,6 +450,8 @@ impl SchedulerClient {
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
.keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
.connect()
.await
{
Expand Down
1 change: 0 additions & 1 deletion dragonfly-client/src/grpc/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ impl CertificateClient {
let channel = Channel::from_static(Box::leak(addr.into_boxed_str()))
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.connect()
.await
.or_err(ErrorType::ConnectError)?;
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ async fn proxy_by_dfdaemon(
message.task_id.as_str(),
piece.number,
piece.length,
download_task_started_response.range.clone(),
download_task_started_response.range,
true,
)
.await
Expand Down
4 changes: 2 additions & 2 deletions dragonfly-client/src/resource/cache_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl CacheTask {
tag: request.tag.clone(),
application: request.application.clone(),
piece_length: request.piece_length,
ttl: request.ttl.clone(),
ttl: request.ttl,
timeout: request.timeout,
})
.await
Expand All @@ -129,7 +129,7 @@ impl CacheTask {
})?;

// Convert prost_wkt_types::Duration to std::time::Duration.
let ttl = Duration::try_from(request.ttl.clone().ok_or(Error::UnexpectedResponse)?)
let ttl = Duration::try_from(request.ttl.ok_or(Error::UnexpectedResponse)?)
.or_err(ErrorType::ParseError)?;

// Create the persistent cache task.
Expand Down
4 changes: 2 additions & 2 deletions dragonfly-client/src/resource/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl Task {
let interested_pieces = match self.piece.calculate_interested(
request.piece_length,
content_length,
request.range.clone(),
request.range,
) {
Ok(interested_pieces) => interested_pieces,
Err(err) => {
Expand Down Expand Up @@ -260,7 +260,7 @@ impl Task {
download_task_response::Response::DownloadTaskStartedResponse(
dfdaemon::v2::DownloadTaskStartedResponse {
content_length,
range: request.range.clone(),
range: request.range,
response_header: task.response_header.clone(),
pieces,
},
Expand Down

0 comments on commit 87460ee

Please sign in to comment.