Skip to content

Commit

Permalink
Add proxygen endpoint for expression eval
Browse files Browse the repository at this point in the history
  • Loading branch information
pramodsatya committed Jul 25, 2024
1 parent 03a2bd8 commit 418b219
Show file tree
Hide file tree
Showing 8 changed files with 580 additions and 0 deletions.
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ add_subdirectory(operators)
add_subdirectory(types)
add_subdirectory(http)
add_subdirectory(common)
add_subdirectory(eval)
add_subdirectory(thrift)

add_library(
Expand Down Expand Up @@ -48,6 +49,7 @@ target_link_libraries(
presto_exception
presto_http
presto_operators
presto_expr_eval
velox_aggregates
velox_caching
velox_common_base
Expand Down
7 changes: 7 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,13 @@ void PrestoServer::run() {
taskManager_ = std::make_unique<TaskManager>(
driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get());

// Initialize prestoExprEval_ after pool_ is initialized and the scalar
// functions are registered.
if (1 /*systemConfig->sidecar()*/) {
prestoExprEval_ = std::make_unique<eval::PrestoExprEval>(pool_);
prestoExprEval_->registerUris(*httpServer_);
}

std::string taskUri;
if (httpsPort.has_value()) {
taskUri = fmt::format(kTaskUriFormat, kHttps, address_, httpsPort.value());
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "presto_cpp/main/PeriodicHeartbeatManager.h"
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/PrestoServerOperations.h"
#include "presto_cpp/main/eval/PrestoExprEval.h"
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/memory/MemoryAllocator.h"
#if __has_include("filesystem")
Expand Down Expand Up @@ -261,6 +262,7 @@ class PrestoServer {
std::string address_;
std::string nodeLocation_;
folly::SSLContextPtr sslContext_;
std::unique_ptr<eval::PrestoExprEval> prestoExprEval_;
};

} // namespace facebook::presto
33 changes: 33 additions & 0 deletions presto-native-execution/presto_cpp/main/eval/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Library built from PrestoExprEval.cpp (the expression evaluation endpoint).
add_library(presto_expr_eval PrestoExprEval.cpp)

# Presto-side targets (type conversion, protocol, HTTP) come first, followed by
# the Velox parsing/serialization targets and finally Folly and Proxygen.
target_link_libraries(
presto_expr_eval
presto_type_converter
presto_types
presto_protocol
presto_http
velox_coverage_util
velox_parse_expression
velox_parse_parser
velox_presto_serializer
velox_serialization
velox_type_parser
${FOLLY_WITH_DEPENDENCIES}
${PROXYGEN_LIBRARIES})

# The tests subdirectory is only added when PRESTO_ENABLE_TESTING is set.
if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
198 changes: 198 additions & 0 deletions presto-native-execution/presto_cpp/main/eval/PrestoExprEval.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/eval/PrestoExprEval.h"
#include <proxygen/httpserver/ResponseBuilder.h>
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/presto_protocol/presto_protocol.h"
#include "velox/common/encode/Base64.h"
#include "velox/core/Expressions.h"
#include "velox/exec/ExchangeQueue.h"
#include "velox/expression/ConstantExpr.h"
#include "velox/expression/EvalCtx.h"
#include "velox/expression/Expr.h"
#include "velox/expression/ExprCompiler.h"
#include "velox/expression/FieldReference.h"
#include "velox/expression/LambdaExpr.h"
#include "velox/parse/Expressions.h"
#include "velox/parse/ExpressionsParser.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/vector/BaseVector.h"
#include "velox/vector/ComplexVector.h"

using namespace facebook::presto;
using namespace facebook::velox;

namespace facebook::presto::eval {

namespace {
// Number of leading bytes of a serialized page that serializeValueBlock()
// drops before Base64-encoding the remainder.
constexpr vector_size_t kValueBlockSkipBytes = 25;

const std::unique_ptr<exec::SerializedPage> toSerializedPage(
const RowVectorPtr& vector,
memory::MemoryPool* pool) {
auto numRows = vector->size();
serializer::presto::PrestoVectorSerde::PrestoOptions paramOptions;
std::unique_ptr<serializer::presto::PrestoVectorSerde> serde =
std::make_unique<serializer::presto::PrestoVectorSerde>();
const IndexRange allRows{0, numRows};
const auto arena = std::make_unique<StreamArena>(pool);
const auto serializer = serde->createIterativeSerializer(
asRowType(vector->type()), numRows, arena.get(), &paramOptions);
serializer->append(vector, folly::Range(&allRows, 1));

IOBufOutputStream stream(*pool);
serializer->flush(&stream);
return std::make_unique<exec::SerializedPage>(
stream.getIOBuf(), nullptr, numRows);
}

// ValueBlock in ConstantExpression requires only the column to be serialized
// in PrestoPage format, without the page header. The first 25 bytes,
// representing the page header and the number of rows are skipped.
const std::string serializeValueBlock(
const RowVectorPtr& vector,
memory::MemoryPool* pool) {
auto serializedPage = toSerializedPage(vector, pool);
auto resultBuf = serializedPage->getIOBuf();
auto pageLen = serializedPage->size();
resultBuf->gather(pageLen);
return velox::encoding::Base64::encode(
reinterpret_cast<const char*>(resultBuf->data() + kValueBlockSkipBytes),
resultBuf->length() - kValueBlockSkipBytes);
}

/// Builds the JSON object for a "variable" row expression from a Velox field
/// reference: its name, the string form of its type, and a placeholder
/// source location.
///
/// @param fieldReference Field reference to describe; it is dereferenced
/// unconditionally, so it must not be null.
/// @return JSON object with the keys "@type", "sourceLocation", "name" and
/// "type".
json fieldReferenceToVariableRefExpr(exec::FieldReference* fieldReference) {
  auto& field = *fieldReference;

  json variable;
  variable["@type"] = "variable";
  variable["sourceLocation"] = "sampleSource";
  variable["name"] = field.name();
  variable["type"] = field.type()->toString();
  return variable;
}
} // namespace

/// Registers the HTTP routes served by this object on 'server'.
///
/// POST /v1/expressions is forwarded to evaluateExpression(); the request
/// message itself is ignored and only the body chunks and the response
/// handler are passed through. The handler captures 'this', so it must not
/// be invoked after this object has been destroyed.
void PrestoExprEval::registerUris(http::HttpServer& server) {
  server.registerPost(
      "/v1/expressions",
      [this](
          proxygen::HTTPMessage* /*message*/,
          const std::vector<std::unique_ptr<folly::IOBuf>>& body,
          proxygen::ResponseHandler* downstream) {
        evaluateExpression(body, downstream);
      });
}

/// Converts a compiled Velox expression into the JSON form of a row
/// expression. Four shapes are produced, selected in this order:
///   - "constant": the expression is a ConstantExpr; its value is wrapped in
///     a single-column row vector and emitted as a Base64 value block.
///   - "special": the expression reports isSpecialForm(); its inputs are
///     converted recursively. The form is always emitted as "BIND".
///   - "lambda": the expression is a LambdaExpr. This branch reads the
///     isLambda_ / lambdaTypedExpr_ members that evaluateExpression() sets
///     for the expression currently being processed.
///   - "call": the expression has a vector function; its distinct fields are
///     emitted as variables, followed by its recursively converted inputs.
///
/// @param expr Compiled expression to convert; must not be null.
/// @return JSON object describing the expression.
/// @throws A Velox "not yet implemented" error if none of the shapes apply,
/// and a Velox user error if a lambda is encountered while isLambda_ is
/// false.
json PrestoExprEval::exprToRowExpression(
    std::shared_ptr<velox::exec::Expr> expr) {
  json res;

  // isConstant() alone does not prove that 'expr' is a ConstantExpr, so the
  // cast result is checked as well instead of being dereferenced blindly.
  // Expressions that fail the cast fall through to the branches below.
  const auto constantExpr =
      std::dynamic_pointer_cast<exec::ConstantExpr>(expr);

  if (expr->isConstant() && constantExpr != nullptr) {
    // constant
    res["@type"] = "constant";
    auto type = constantExpr->type();
    VectorPtr constVec = constantExpr->value();
    const size_t length = constVec->size();
    auto rowVector = std::make_shared<RowVector>(
        pool_.get(),
        ROW({type}),
        nullptr,
        length,
        std::vector<VectorPtr>({constVec}));
    res["valueBlock"] = serializeValueBlock(rowVector, pool_.get());
    res["type"] = expr->type()->toString();
  } else if (expr->isSpecialForm()) {
    // special
    res["@type"] = "special";
    res["sourceLocation"] = "sampleSource";
    res["arguments"] = json::array();
    for (const auto& input : expr->inputs()) {
      res["arguments"].push_back(exprToRowExpression(input));
    }
    res["form"] = "BIND";
    res["type"] = expr->type()->toString();
  } else if (auto lambda = std::dynamic_pointer_cast<exec::LambdaExpr>(expr)) {
    // lambda
    // NOTE(review): if exec::LambdaExpr reports isSpecialForm(), lambdas are
    // consumed by the branch above and this branch is never reached; confirm
    // the intended ordering.
    res["@type"] = "lambda";
    res["arguments"] = json::array();
    res["argumentTypes"] = json::array();
    const auto& inputs = lambda->distinctFields();
    for (size_t i = 0; i < inputs.size(); ++i) {
      res["arguments"].push_back(fieldReferenceToVariableRefExpr(inputs[i]));
      // TODO: Recheck type conversion.
      res["argumentTypes"].push_back(lambda->type()->childAt(i)->toString());
    }
    VELOX_USER_CHECK(isLambda_, "Not a lambda expression");
    res["body"] = lambdaTypedExpr_->body()->toString();
  } else if (expr->vectorFunction()) {
    // call
    res["@type"] = "call";
    res["sourceLocation"] = "sampleSource";
    res["displayName"] = expr->name();
    res["functionHandle"] = expr->toString();
    res["returnType"] = expr->type()->toString();
    // Always emit an array, so that a call without fields or inputs still
    // carries an (empty) "arguments" list like the other branches do.
    res["arguments"] = json::array();
    for (const auto& field : expr->distinctFields()) {
      // TODO: Check why static cast and dynamic cast are not working.
      res["arguments"].push_back(fieldReferenceToVariableRefExpr(field));
    }
    for (const auto& input : expr->inputs()) {
      res["arguments"].push_back(exprToRowExpression(input));
    }
  } else {
    VELOX_NYI("Unable to convert velox expr to rowexpr");
  }

  return res;
}

/// Handles one /v1/expressions request.
///
/// The body chunks are concatenated and parsed as JSON. Each element is
/// converted to a Velox typed expression, compiled, and converted back to
/// JSON with exprToRowExpression(). The resulting JSON array is sent as an
/// HTTP OK response with a JSON content type.
///
/// Nothing in this function catches exceptions: failures while parsing,
/// converting or compiling propagate to the caller and no response is sent
/// from here.
///
/// @param body Request body chunks, in order.
/// @param downstream Response handler the reply is written to.
void PrestoExprEval::evaluateExpression(
    const std::vector<std::unique_ptr<folly::IOBuf>>& body,
    proxygen::ResponseHandler* downstream) {
  // Stitch the body chunks back together into one JSON document.
  std::string payload;
  for (const auto& chunk : body) {
    payload.append(
        reinterpret_cast<const char*>(chunk->data()), chunk->length());
  }

  auto input = json::parse(payload);
  const auto exprCount = input.size();
  nlohmann::json output = json::array();

  for (size_t idx = 0; idx < exprCount; ++idx) {
    std::shared_ptr<protocol::RowExpression> inputRowExpr = input[idx];
    auto typedExpr = exprConverter_.toVeloxExpr(inputRowExpr);

    // Record whether the current top-level expression is a lambda;
    // exprToRowExpression() reads both members. lambdaTypedExpr_ is only
    // overwritten when the expression is a lambda.
    const auto lambdaExpr = core::TypedExprs::asLambda(typedExpr);
    isLambda_ = (lambdaExpr != nullptr);
    if (isLambda_) {
      lambdaTypedExpr_ = lambdaExpr;
    }

    exec::ExprSet exprSet{{typedExpr}, execCtx_.get()};
    const auto compiled =
        exec::compileExpressions({typedExpr}, execCtx_.get(), &exprSet, true);
    output.push_back(exprToRowExpression(compiled[0]));
  }

  proxygen::ResponseBuilder(downstream)
      .status(http::kHttpOk, "")
      .header(
          proxygen::HTTP_HEADER_CONTENT_TYPE, http::kMimeTypeApplicationJson)
      .body(output.dump())
      .sendWithEOM();
}
} // namespace facebook::presto::eval
53 changes: 53 additions & 0 deletions presto-native-execution/presto_cpp/main/eval/PrestoExprEval.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <proxygen/httpserver/ResponseHandler.h>
#include "presto_cpp/external/json/nlohmann/json.hpp"
#include "presto_cpp/main/http/HttpServer.h"
#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
#include "velox/core/QueryCtx.h"
#include "velox/expression/Expr.h"

namespace facebook::presto::eval {

/// Serves the /v1/expressions HTTP endpoint: converts the row expressions in
/// a request to Velox expressions, compiles them, and replies with their JSON
/// row-expression form.
class PrestoExprEval {
 public:
  /// @param pool Memory pool shared with this object; it is handed to the
  /// execution context and the expression converter created here.
  explicit PrestoExprEval(std::shared_ptr<velox::memory::MemoryPool> pool)
      : pool_(pool),
        queryCtx_(facebook::velox::core::QueryCtx::create()),
        execCtx_{std::make_unique<velox::core::ExecCtx>(
            pool.get(),
            queryCtx_.get())},
        exprConverter_(pool.get(), &typeParser_) {}

  /// Registers the /v1/expressions POST handler on 'server'.
  void registerUris(http::HttpServer& server);

  /// Evaluate expressions sent along /v1/expressions endpoint.
  void evaluateExpression(
      const std::vector<std::unique_ptr<folly::IOBuf>>& body,
      proxygen::ResponseHandler* downstream);

 protected:
  /// Converts a compiled Velox expression to its JSON row-expression form.
  json exprToRowExpression(std::shared_ptr<velox::exec::Expr> expr);

  const std::shared_ptr<velox::memory::MemoryPool> pool_;
  const std::shared_ptr<velox::core::QueryCtx> queryCtx_;
  const std::unique_ptr<velox::core::ExecCtx> execCtx_;

  // Declared before exprConverter_, which is constructed with a pointer to
  // it: members are constructed in declaration order and destroyed in
  // reverse, so the parser now outlives the converter that refers to it.
  TypeParser typeParser_;
  VeloxExprConverter exprConverter_;

  // Per-expression state written by evaluateExpression() and read by
  // exprToRowExpression().
  // NOTE(review): these are plain members with no synchronization; confirm
  // that requests are never handled concurrently on the same instance.
  bool isLambda_{false};
  std::shared_ptr<const velox::core::LambdaTypedExpr> lambdaTypedExpr_;
};
} // namespace facebook::presto::eval
29 changes: 29 additions & 0 deletions presto-native-execution/presto_cpp/main/eval/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Test executable for the presto_expr_eval library.
add_executable(presto_expr_eval_test PrestoExprEvalTest.cpp)

# Register the executable with CTest under the same name.
add_test(presto_expr_eval_test presto_expr_eval_test)

# gtest_main is linked, so the test source does not define its own main().
target_link_libraries(
presto_expr_eval_test
presto_expr_eval
presto_http
velox_exec_test_lib
velox_presto_serializer
gtest
gtest_main
${PROXYGEN_LIBRARIES})

# Link this target within the presto_link_job_pool job pool.
set_property(TARGET presto_expr_eval_test PROPERTY JOB_POOL_LINK
presto_link_job_pool)
Loading

0 comments on commit 418b219

Please sign in to comment.