Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose stream-ordering in scalar and avro APIs #17766

Open
wants to merge 10 commits into
base: branch-25.02
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cpp/include/cudf/io/avro.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -208,13 +208,15 @@ class avro_reader_options_builder {
* @endcode
*
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate device memory of the table in the returned
* table_with_metadata
*
* @return The set of columns along with metadata
*/
table_with_metadata read_avro(
avro_reader_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/** @} */ // end of group
Expand Down
17 changes: 1 addition & 16 deletions cpp/include/cudf/scalar/scalar.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -176,11 +176,6 @@ class fixed_width_scalar : public scalar {
*/
void set_value(T value, rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Explicit conversion operator to get the value of the scalar on the host.
*/
explicit operator value_type() const;

/**
* @brief Get the value of the scalar.
*
Expand Down Expand Up @@ -402,11 +397,6 @@ class fixed_point_scalar : public scalar {
[[nodiscard]] T fixed_point_value(
rmm::cuda_stream_view stream = cudf::get_default_stream()) const;

/**
* @brief Explicit conversion operator to get the value of the scalar on the host.
*/
explicit operator value_type() const;

/**
* @brief Returns a raw pointer to the value in device memory.
* @return A raw pointer to the value in device memory
Expand Down Expand Up @@ -515,11 +505,6 @@ class string_scalar : public scalar {
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Explicit conversion operator to get the value of the scalar in a host std::string.
*/
explicit operator std::string() const;

/**
* @brief Get the value of the scalar in a host std::string.
*
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ std::vector<std::unique_ptr<data_sink>> make_datasinks(sink_info const& info)

} // namespace

table_with_metadata read_avro(avro_reader_options const& options, rmm::device_async_resource_ref mr)
table_with_metadata read_avro(avro_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
namespace avro = cudf::io::detail::avro;

Expand All @@ -199,7 +201,7 @@ table_with_metadata read_avro(avro_reader_options const& options, rmm::device_as

CUDF_EXPECTS(datasources.size() == 1, "Only a single source is currently supported.");

return avro::read_avro(std::move(datasources[0]), options, cudf::get_default_stream(), mr);
return avro::read_avro(std::move(datasources[0]), options, stream, mr);
}

table_with_metadata read_json(json_reader_options options,
Expand Down
22 changes: 2 additions & 20 deletions cpp/src/scalar/scalar.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -110,8 +110,6 @@ size_type string_scalar::size() const { return _data.size(); }

char const* string_scalar::data() const { return static_cast<char const*>(_data.data()); }

string_scalar::operator std::string() const { return this->to_string(cudf::get_default_stream()); }

std::string string_scalar::to_string(rmm::cuda_stream_view stream) const
{
std::string result(size(), '\0');
Expand Down Expand Up @@ -183,17 +181,7 @@ T fixed_point_scalar<T>::fixed_point_value(rmm::cuda_stream_view stream) const
numeric::scaled_integer<rep_type>{_data.value(stream), numeric::scale_type{type().scale()}}};
}

template <typename T>
fixed_point_scalar<T>::operator value_type() const
{
return this->fixed_point_value(cudf::get_default_stream());
}

template <typename T>
typename fixed_point_scalar<T>::rep_type* fixed_point_scalar<T>::data()
{
return _data.data();
}
typename fixed_point_scalar<T>::rep_type* fixed_point_scalar<T>::data() { return _data.data(); }

template <typename T>
typename fixed_point_scalar<T>::rep_type const* fixed_point_scalar<T>::data() const
Expand Down Expand Up @@ -266,12 +254,6 @@ T const* fixed_width_scalar<T>::data() const
return _data.data();
}

template <typename T>
fixed_width_scalar<T>::operator value_type() const
{
return this->value(cudf::get_default_stream());
}

/**
* @brief These define the valid fixed-width scalar types.
*
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_RESHAPE_TEST streams/reshape_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_ROLLING_TEST streams/rolling_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_ROUND_TEST streams/round_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SCALAR_TEST streams/scalar_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_STREAM_COMPACTION_TEST streams/stream_compaction_test.cpp STREAM_MODE testing)
Expand Down
19 changes: 16 additions & 3 deletions cpp/tests/binaryop/assert-binops.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Copyright 2018-2019 BlazingDB, Inc.
* Copyright 2018 Christian Noboa Mardini <[email protected]>
Expand Down Expand Up @@ -69,6 +69,19 @@ struct NearEqualComparator {
}
};

template <typename TypeLhs, typename ScalarType>
TypeLhs scalar_host_value(cudf::scalar const& lhs)
{
auto sclr = static_cast<ScalarType const&>(lhs);
auto stream = cudf::get_default_stream();
if constexpr (std::is_same_v<ScalarType, cudf::string_scalar>)
return sclr.to_string(stream);
else if constexpr (std::is_same_v<ScalarType, cudf::fixed_point_scalar<TypeLhs>>)
return sclr.fixed_point_value(stream);
else
return sclr.value(stream);
}

template <typename TypeOut,
typename TypeLhs,
typename TypeRhs,
Expand All @@ -81,7 +94,7 @@ void ASSERT_BINOP(cudf::column_view const& out,
TypeOp&& op,
ValueComparator const& value_comparator = ValueComparator())
{
auto lhs_h = static_cast<ScalarType const&>(lhs).operator TypeLhs();
auto lhs_h = scalar_host_value<TypeLhs, ScalarType>(lhs);
auto rhs_h = cudf::test::to_host<TypeRhs>(rhs);
auto rhs_data = rhs_h.first;
auto out_h = cudf::test::to_host<TypeOut>(out);
Expand Down Expand Up @@ -129,7 +142,7 @@ void ASSERT_BINOP(cudf::column_view const& out,
TypeOp&& op,
ValueComparator const& value_comparator = ValueComparator())
{
auto rhs_h = static_cast<ScalarType const&>(rhs).operator TypeRhs();
auto rhs_h = scalar_host_value<TypeRhs, ScalarType>(rhs);
auto lhs_h = cudf::test::to_host<TypeLhs>(lhs);
auto lhs_data = lhs_h.first;
auto out_h = cudf::test::to_host<TypeOut>(out);
Expand Down
43 changes: 43 additions & 0 deletions cpp/tests/streams/scalar_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/default_stream.hpp>
#include <cudf_test/type_lists.hpp>

#include <cudf/scalar/scalar.hpp>

template <typename T>
struct TypedScalarTest : public cudf::test::BaseFixture {};

TYPED_TEST_SUITE(TypedScalarTest, cudf::test::FixedWidthTypes);

TYPED_TEST(TypedScalarTest, DefaultValidity)
{
using Type = cudf::device_storage_type_t<TypeParam>;
Type value = static_cast<Type>(cudf::test::make_type_param_scalar<TypeParam>(7));
cudf::scalar_type_t<TypeParam> s(value, true, cudf::test::get_default_stream());
EXPECT_EQ(value, s.value(cudf::test::get_default_stream()));
}

struct StringScalarTest : public cudf::test::BaseFixture {};

TEST_F(StringScalarTest, DefaultValidity)
{
std::string value = "test string";
auto s = cudf::string_scalar(value, true, cudf::test::get_default_stream());
EXPECT_EQ(value, s.to_string(cudf::test::get_default_stream()));
}
Loading