Skip to content

Commit

Permalink
chore: configure fluvio-run build for armv7 (#3444)
Browse files Browse the repository at this point in the history
Co-authored-by: Alan Chen <[email protected]>
Co-authored-by: morenol <[email protected]>
  • Loading branch information
3 people committed Aug 8, 2023
1 parent b7a2765 commit 61fa444
Show file tree
Hide file tree
Showing 16 changed files with 251 additions and 49 deletions.
2 changes: 0 additions & 2 deletions .cargo/config.toml

This file was deleted.

24 changes: 15 additions & 9 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ permissions: read-all
#permissions:
# contents: read

concurrency:
concurrency:
group: ci-${{ github.ref }}
cancel-in-progress: true

Expand Down Expand Up @@ -119,7 +119,7 @@ jobs:
- name: Build smdk
if: matrix.binary == 'smdk'
run: make build-smdk

- name: Build cdk
if: matrix.binary == 'cdk'
run: make build-cdk
Expand Down Expand Up @@ -154,6 +154,10 @@ jobs:
rust: stable
rust-target: armv7-unknown-linux-gnueabihf
binary: fluvio
- os: ubuntu-latest
rust: stable
rust-target: armv7-unknown-linux-gnueabihf
binary: fluvio-run
- os: ubuntu-latest
rust: stable
rust-target: x86_64-pc-windows-gnu
Expand Down Expand Up @@ -208,7 +212,7 @@ jobs:
rust: stable
rust-target: aarch64-apple-darwin
binary: smdk

# cdk
- os: macos-12
rust: stable
Expand Down Expand Up @@ -288,7 +292,7 @@ jobs:
if: matrix.rust-target == 'x86_64-pc-windows-gnu'
run: |
sudo apt-get update -o="APT::Acquire::Retries=3"
sudo apt-get install -y -V -o="APT::Acquire::Retries=3" gcc-mingw-w64-x86-64
sudo apt-get install -y -V -o="APT::Acquire::Retries=3" gcc-mingw-w64-x86-64
- name: Build fluvio.exe
timeout-minutes: 40
Expand Down Expand Up @@ -319,7 +323,7 @@ jobs:
timeout-minutes: 40
if: matrix.binary == 'smdk.exe'
run: make build-smdk

- name: Build cdk
timeout-minutes: 40
if: matrix.binary == 'cdk'
Expand Down Expand Up @@ -553,7 +557,7 @@ jobs:
with:
toolchain: ${{ matrix.rust }}
profile: minimal
- name: Rust version
- name: Rust version
run: rustc --version
- uses: Swatinem/rust-cache@v2
timeout-minutes: 10
Expand Down Expand Up @@ -943,7 +947,7 @@ jobs:
exclude:
- cluster_version: stable
cli_version: stable

steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ matrix.rust }}
Expand All @@ -970,7 +974,7 @@ jobs:
run: |
curl -fsS https://hub.infinyon.cloud/install/install.sh | bash
echo "~/.fluvio/bin" >> $GITHUB_PATH
# Download artifacts from development build
- name: Download artifact - fluvio
uses: actions/download-artifact@v3
Expand Down Expand Up @@ -1056,7 +1060,7 @@ jobs:
if: matrix.test == 'smdk'
timeout-minutes: 20
run: make SMDK_BIN=~/bin/smdk cli-smdk-smoke

# test cdk
- name: Download artifact - cdk
if: matrix.test == 'cdk'
Expand Down Expand Up @@ -1215,6 +1219,8 @@ jobs:
artifact: fluvio-run
- rust-target: aarch64-apple-darwin
artifact: fluvio-run
- rust-target: armv7-unknown-linux-gnueabihf
artifact: fluvio-run
- rust-target: x86_64-pc-windows-gnu
artifact: fluvio.exe
- rust-target: x86_64-pc-windows-gnu
Expand Down
9 changes: 5 additions & 4 deletions crates/fluvio-run/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ path = "src/bin/main.rs"
doc = false

[features]
default = ["k8"]
default = ["k8", "spu_smartengine"]
k8 = [ "fluvio-sc/k8"]
spu_smartengine = ["fluvio-spu/smartengine"]
rustls = ["fluvio-future/rust_tls"]

[dependencies]
clap = { workspace = true, features = ["std", "derive", "help", "usage", "error-context"]}
Expand All @@ -31,7 +33,6 @@ tracing-subscriber = { workspace = true }

# regardless of TLS, sc and spu always use openssl_tls for now because we need cert API
fluvio-future = { workspace = true, features = ["subscriber"] }
fluvio-sc = { path = "../fluvio-sc", default-features = false }
fluvio-spu = { path = "../fluvio-spu" }
fluvio-extension-common = { workspace = true }

fluvio-sc = { path = "../fluvio-sc", default-features = false }
fluvio-spu = { path = "../fluvio-spu", default-features = false }
9 changes: 5 additions & 4 deletions crates/fluvio-sc/src/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use k8_client::new_shared;
use tracing::info;

use crate::{cli::ScOpt, core::K8SharedContext};
use crate::cli::ScOpt;

pub fn main_loop(opt: ScOpt) {
// parse configuration (program exits on error)
Expand Down Expand Up @@ -46,15 +46,16 @@ pub fn main_loop(opt: ScOpt) {
info!("starting main loop");

#[cfg(feature = "k8")]
let ctx: K8SharedContext = crate::init::start_main_loop_with_k8(
let ctx: crate::core::K8SharedContext = crate::init::start_main_loop_with_k8(
(sc_config.clone(), auth_policy),
k8_client.clone(),
)
.await;

#[cfg(not(feature = "k8"))]
let _ctx: std::sync::Arc<crate::core::Context> = {
let ctx: std::sync::Arc<crate::core::Context> =
// TODO: don't use K8SharedContext
let _ctx: crate::core::K8SharedContext = {
let ctx: crate::core::K8SharedContext =
crate::core::Context::shared_metadata(sc_config.clone());

crate::init::start_main_loop(ctx, auth_policy).await
Expand Down
9 changes: 6 additions & 3 deletions crates/fluvio-spu/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ name = "fluvio-spu"
path = "src/main.rs"
doc = false

[features]
default = ["smartengine"]
smartengine = ["dep:fluvio-smartengine", "fluvio/smartengine"]

[dependencies]
cfg-if = { workspace = true }
anyhow = { workspace = true }
Expand Down Expand Up @@ -45,7 +49,7 @@ chrono = { workspace = true }


# Fluvio dependencies
fluvio = { workspace = true, features = ["smartengine"]}
fluvio = { workspace = true }
fluvio-types = { workspace = true, features = ["events"] }
fluvio-storage = { workspace = true }
fluvio-compression = { workspace = true }
Expand All @@ -62,7 +66,7 @@ fluvio-future = { workspace = true,features = [
"openssl_tls",
"zero_copy",
] }
fluvio-smartengine = { workspace = true }
fluvio-smartengine = { workspace = true, optional = true }
fluvio-smartmodule = { workspace = true}

[dev-dependencies]
Expand All @@ -75,4 +79,3 @@ portpicker = { workspace = true }
flv-util = { workspace = true, features = ["fixture"] }
fluvio-future = { workspace = true,features = ["fixture", "subscriber"] }
fluvio-protocol = { workspace = true, features = ["fixture"] }

2 changes: 1 addition & 1 deletion crates/fluvio-spu/src/core/global_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::fmt::Debug;

use tracing::{debug, error, instrument};

use fluvio_smartengine::SmartEngine;
use fluvio_types::SpuId;
use fluvio_storage::ReplicaStorage;

Expand All @@ -20,6 +19,7 @@ use crate::replication::leader::{
};
use crate::control_plane::{StatusMessageSink, SharedStatusUpdate};
use crate::core::metrics::SpuMetrics;
use crate::smartengine::SmartEngine;

use super::leader_client::LeaderConnections;
use super::smartmodule::SmartModuleLocalStore;
Expand Down
3 changes: 2 additions & 1 deletion crates/fluvio-spu/src/core/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::{
};

use fluvio_protocol::record::Batch;
use fluvio_smartengine::metrics::SmartModuleChainMetrics;
use fluvio_spu_schema::fetch::FilePartitionResponse;
use serde::Serialize;

use crate::smartengine::SmartModuleChainMetrics;

#[derive(Default, Debug, Serialize)]
pub(crate) struct SpuMetrics {
inbound: Activity,
Expand Down
7 changes: 5 additions & 2 deletions crates/fluvio-spu/src/services/public/produce_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use tracing::{debug, trace, error};
use tracing::instrument;
use anyhow::{anyhow, Result};

use fluvio_smartengine::{SmartModuleChainInstance, EngineError};
use fluvio_protocol::api::{RequestKind, RequestHeader};
use fluvio_spu_schema::Isolation;
use fluvio_protocol::record::{BatchRecords, Offset, Batch, RawRecords};
Expand All @@ -28,10 +27,13 @@ use fluvio_future::timer::sleep;

use crate::core::DefaultSharedGlobalContext;
use crate::replication::leader::SharedFileLeaderState;
use crate::smartengine::batch::process_batch;
use crate::smartengine::context::SmartModuleContext;
use crate::smartengine::EngineError;
use crate::smartengine::map_engine_error;
use crate::smartengine::produce_batch::ProduceBatchIterator;
use crate::smartengine::batch::process_batch;
use crate::smartengine::SmartModuleChainInstance;

use crate::traffic::TrafficType;

struct TopicWriteResult {
Expand Down Expand Up @@ -223,6 +225,7 @@ async fn smartmodule_ctx(
})
}

// #[cfg(feature = "smartengine")]
fn apply_smartmodules_for_partition_request(
partition_request: &mut PartitionProduceData<RecordSet<RawRecords>>,
sm_chain_instance: &mut SmartModuleChainInstance,
Expand Down
3 changes: 1 addition & 2 deletions crates/fluvio-spu/src/smartengine/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ use fluvio_protocol::{
record::{Batch, MemoryRecords, Offset},
link::smartmodule::SmartModuleTransformRuntimeError,
};
use fluvio_smartengine::metrics::SmartModuleChainMetrics;
use fluvio_smartengine::SmartModuleChainInstance;
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleInput;

use crate::smartengine::produce_batch::ProduceBatchIterator;
use crate::smartengine::{SmartModuleChainInstance, SmartModuleChainMetrics};

pub(crate) trait SmartModuleInputBatch {
fn records(&self) -> &Vec<u8>;
Expand Down
31 changes: 24 additions & 7 deletions crates/fluvio-spu/src/smartengine/chain.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
#[cfg(feature = "smartengine")]
use tracing::{debug, error};
use fluvio_protocol::link::ErrorCode;
use fluvio_smartengine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance, SmartModuleConfig,
SmartModuleInitialData, EngineError,
};
use fluvio_spu_schema::server::smartmodule::{
SmartModuleContextData, SmartModuleInvocation, SmartModuleKind,
};
use fluvio_spu_schema::server::smartmodule::SmartModuleInvocation;

#[cfg(feature = "smartengine")]
use fluvio_smartengine::{EngineError, SmartModuleConfig, SmartModuleInitialData};

#[cfg(feature = "smartengine")]
use fluvio_spu_schema::server::smartmodule::{SmartModuleContextData, SmartModuleKind};

use crate::smartengine::SmartModuleChainBuilder;
use crate::smartengine::SmartEngine;
use crate::smartengine::SmartModuleChainInstance;

#[cfg(not(feature = "smartengine"))]
pub(crate) fn build_chain(
mut _chain_builder: SmartModuleChainBuilder,
_invocations: Vec<SmartModuleInvocation>,
_version: i16,
_engine: SmartEngine,
) -> Result<SmartModuleChainInstance, ErrorCode> {
let smci = SmartModuleChainInstance {};
Ok(smci)
}

#[cfg(feature = "smartengine")]
pub(crate) fn build_chain(
mut chain_builder: SmartModuleChainBuilder,
invocations: Vec<SmartModuleInvocation>,
Expand Down
13 changes: 7 additions & 6 deletions crates/fluvio-spu/src/smartengine/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use std::time::Duration;

use async_rwlock::RwLock;
use chrono::Utc;
use fluvio::SmartModuleChainBuilder;
use fluvio_smartengine::{SmartModuleChainInstance, Version, Lookback};
use fluvio_protocol::link::ErrorCode;
use fluvio_smartmodule::Record;
use fluvio_spu_schema::server::smartmodule::{SmartModuleInvocation, SmartModuleInvocationWasm};
Expand All @@ -15,10 +13,13 @@ use tracing::{debug, trace, error};
use crate::core::GlobalContext;
use crate::core::metrics::SpuMetrics;
use crate::replication::leader::LeaderReplicaState;
use crate::smartengine::{
file_batch::{FileRecordIterator, FileBatchIterator},
chain,
};

use crate::smartengine::chain;
use crate::smartengine::file_batch::{FileRecordIterator, FileBatchIterator};
use crate::smartengine::Lookback;
use crate::smartengine::SmartModuleChainBuilder;
use crate::smartengine::SmartModuleChainInstance;
use crate::smartengine::Version;

use super::file_batch::{FileBatch, RecordItem};

Expand Down
18 changes: 16 additions & 2 deletions crates/fluvio-spu/src/smartengine/file_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,15 @@ impl Iterator for FileBatchIterator {
return None;
}

let offset = self.offset;

// ugly hack for armv7 pread offset = i32
// needed for gnu but not zig musl
#[cfg(all(target_pointer_width = "32", target_env = "gnu"))]
let offset: i32 = offset.try_into().unwrap();

let mut header = vec![0u8; BATCH_FILE_HEADER_SIZE];
let bytes_read = match pread(self.fd, &mut header, self.offset)
let bytes_read = match pread(self.fd, &mut header, offset)
.map_err(|err| IoError::new(ErrorKind::Other, format!("pread error {err}")))
{
Ok(bytes) => bytes,
Expand Down Expand Up @@ -118,7 +125,14 @@ impl Iterator for FileBatchIterator {

self.offset += BATCH_FILE_HEADER_SIZE as i64;

let bytes_read = match pread(self.fd, &mut raw_records, self.offset)
let offset = self.offset;

// ugly hack for armv7 pread offset = i32
// needed for gnu but not zig musl
#[cfg(all(target_pointer_width = "32", target_env = "gnu"))]
let offset: i32 = offset.try_into().unwrap();

let bytes_read = match pread(self.fd, &mut raw_records, offset)
.map_err(|err| IoError::new(ErrorKind::Other, format!("pread error {err}")))
{
Ok(bytes) => bytes,
Expand Down
Loading

0 comments on commit 61fa444

Please sign in to comment.