diff --git a/CMakeLists.txt b/CMakeLists.txt index fa3bd84591c9..aceeaccb7526 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1151,6 +1151,12 @@ function(ADD_YB_TEST REL_TEST_NAME) endif() endfunction() +function(ADD_YB_TESTS TESTS) + foreach(TEST ${TESTS}) + ADD_YB_TEST(${TEST}) + endforeach(TEST) +endfunction() + # A wrapper for add_dependencies() that is compatible with NO_TESTS. function(ADD_YB_TEST_DEPENDENCIES REL_TEST_NAME) if(NO_TESTS) diff --git a/src/yb/rpc/CMakeLists.txt b/src/yb/rpc/CMakeLists.txt index 5c8eb44639ee..aeff3ac55084 100644 --- a/src/yb/rpc/CMakeLists.txt +++ b/src/yb/rpc/CMakeLists.txt @@ -30,6 +30,8 @@ # under the License. # +YB_INCLUDE_EXTENSIONS() + #### Global header protobufs PROTOBUF_GENERATE_CPP( RPC_HEADER_PROTO_SRCS RPC_HEADER_PROTO_HDRS RPC_HEADER_PROTO_TGTS @@ -80,8 +82,10 @@ set(YRPC_SRCS serialization.cc service_if.cc service_pool.cc + tcp_stream.cc thread_pool.cc - yb_rpc.cc) + yb_rpc.cc + ${RPC_SRCS_EXTENSIONS}) set(YRPC_LIBS redis_protocol_proto @@ -89,7 +93,8 @@ set(YRPC_LIBS rpc_introspection_proto yb_util gutil - libev) + libev + ${RPC_LIBS_EXTENSIONS}) ADD_YB_LIBRARY(yrpc SRCS ${YRPC_SRCS} @@ -136,3 +141,4 @@ ADD_YB_TEST(rpc-test) ADD_YB_TEST(rpc_stub-test RUN_SERIAL true) ADD_YB_TEST(scheduler-test) ADD_YB_TEST(thread_pool-test) +ADD_YB_TESTS(${RPC_ADDITIONAL_TESTS}) diff --git a/src/yb/rpc/connection.cc b/src/yb/rpc/connection.cc index 97e2d200f1d1..9335348efe64 100644 --- a/src/yb/rpc/connection.cc +++ b/src/yb/rpc/connection.cc @@ -70,117 +70,48 @@ namespace rpc { /// Connection /// Connection::Connection(Reactor* reactor, - const Endpoint& remote, - int socket, + std::unique_ptr stream, Direction direction, std::unique_ptr context) : reactor_(reactor), - socket_(socket), - remote_(remote), + stream_(std::move(stream)), direction_(direction), last_activity_time_(CoarseMonoClock::Now()), - read_buffer_(&context->Allocator(), context->BufferLimit()), context_(std::move(context)) { const auto metric_entity = reactor->messenger()->metric_entity(); handler_latency_outbound_transfer_ = metric_entity ? METRIC_handler_latency_outbound_transfer.Instantiate(metric_entity) : nullptr; } -Connection::~Connection() { - // Must clear the outbound_transfers_ list before deleting. - CHECK(sending_.empty()); +Connection::~Connection() {} - // It's crucial that the connection is Shutdown first -- otherwise - // our destructor will end up calling read_io_.stop() and write_io_.stop() - // from a possibly non-reactor thread context. This can then make all - // hell break loose with libev. - CHECK(!is_epoll_registered_); -} - -void Connection::EpollRegister(ev::loop_ref& loop) { // NOLINT - DCHECK(reactor_->IsCurrentThread()); - - DVLOG(4) << "Registering connection for epoll: " << ToString(); - io_.set(loop); - io_.set(this); - int events; - if (connected_) { - events = ev::READ; - if (direction_ == Direction::CLIENT) { - events |= ev::WRITE; - } - } else { - events = ev::WRITE; +void UpdateIdleReason(const char* message, bool* result, std::string* reason) { + *result = false; + if (reason) { + AppendWithSeparator(message, reason); } - io_.start(socket_.GetFd(), events); - - timer_.set(loop); - timer_.set(this); // NOLINT - - is_epoll_registered_ = true; } -bool Connection::Idle() const { +bool Connection::Idle(std::string* reason_not_idle) const { DCHECK(reactor_->IsCurrentThread()); - // Check if we're in the middle of receiving something. - if (!read_buffer_.empty()) { - return false; - } - // Check if we still need to send something. - if (!sending_.empty()) { - return false; - } + bool result = stream_->Idle(reason_not_idle); - // Can't kill a connection if calls are waiting for a response. TODO: why? + // Connection is not idle if calls are waiting for a response. if (!awaiting_response_.empty()) { - return false; + UpdateIdleReason("awaiting response", &result, reason_not_idle); } // Check upstream logic (i.e. processing calls, etc.) - if (!context_->Idle()) { - return false; - } - - return true; + return context_->Idle(reason_not_idle) && result; } -string Connection::ReasonNotIdle() const { - DCHECK(reactor_->IsCurrentThread()); - string reason; - // Check if we're in the middle of receiving something. - if (!read_buffer_.empty()) { - reason += "read buffer not empty"; - } - - // Check if we still need to send something. - if (!sending_.empty()) { - AppendWithSeparator("still sending", &reason); - } - - // Can't kill a connection if calls are waiting for a response. TODO: why? - if (!awaiting_response_.empty()) { - AppendWithSeparator("awaiting response", &reason); - } - - // Check upstream logic (i.e. processing calls, etc.) - if (!context_->Idle()) { - AppendWithSeparator("context not idle: " + context_->ReasonNotIdle(), &reason); - } +std::string Connection::ReasonNotIdle() const { + std::string reason; + Idle(&reason); return reason; } -void Connection::ClearSending(const Status& status) { - // Clear any outbound transfers. - for (auto& call : sending_outbound_datas_) { - if (call) { - call->Transferred(status, this); - } - } - sending_outbound_datas_.clear(); - sending_.clear(); -} - void Connection::Shutdown(const Status& status) { DCHECK(reactor_->IsCurrentThread()); @@ -190,14 +121,6 @@ void Connection::Shutdown(const Status& status) { shutdown_status_ = status; } - if (!read_buffer_.empty()) { - double secs_since_active = ToSeconds(reactor_->cur_time() - last_activity_time_); - LOG(WARNING) << "Shutting down connection " << ToString() << " with pending inbound data (" - << read_buffer_ << ", last active " - << HumanReadableElapsedTime::ToShortString(secs_since_active) - << " ago, status=" << status.ToString() << ")"; - } - // Clear any calls which have been sent and were awaiting a response. for (auto& v : awaiting_response_) { if (v.second) { @@ -206,8 +129,6 @@ void Connection::Shutdown(const Status& status) { } awaiting_response_.clear(); - ClearSending(status); - for (auto& call : outbound_data_being_processed_) { call->Transferred(status, this); } @@ -215,27 +136,20 @@ void Connection::Shutdown(const Status& status) { context_->Shutdown(status); - io_.stop(); + stream_->Shutdown(status); timer_.stop(); - - is_epoll_registered_ = false; - WARN_NOT_OK(socket_.Close(), "Error closing socket"); } void Connection::OutboundQueued() { DCHECK(reactor_->IsCurrentThread()); - if (connected_ && !waiting_write_ready_) { - // If we weren't waiting write to be ready, we could try to write data to socket. - auto status = DoWrite(); - if (!status.ok()) { - reactor_->ScheduleReactorTask( - MakeFunctorReactorTask(std::bind(&Reactor::DestroyConnection, - reactor_, - this, - status), - shared_from_this())); - } + auto status = stream_->TryWrite(); + if (!status.ok()) { + VLOG_WITH_PREFIX(1) << "Write failed: " << status; + reactor_->ScheduleReactorTask( + MakeFunctorReactorTask( + std::bind(&Reactor::DestroyConnection, reactor_, this, status), + shared_from_this())); } } @@ -247,13 +161,13 @@ void Connection::HandleTimeout(ev::timer& watcher, int revents) { // NOLINT DCHECK(reactor_->IsCurrentThread()); if (EV_ERROR & revents) { - LOG(WARNING) << "Connection " << ToString() << " got an error in handle timeout"; + LOG_WITH_PREFIX(WARNING) << "Got an error in handle timeout"; return; } auto now = CoarseMonoClock::Now(); - if (!connected_) { + if (!stream_->IsConnected()) { auto deadline = last_activity_time_ + FLAGS_rpc_connection_timeout_ms * 1ms; if (now > deadline) { auto passed = reactor_->cur_time() - last_activity_time_; @@ -288,155 +202,47 @@ void Connection::HandleTimeout(ev::timer& watcher, int revents) { // NOLINT void Connection::QueueOutboundCall(const OutboundCallPtr& call) { DCHECK(call); DCHECK_EQ(direction_, Direction::CLIENT); - DCHECK(reactor_->IsCurrentThread()); - - if (PREDICT_FALSE(!shutdown_status_.ok())) { - // Already shutdown - call->SetFailed(shutdown_status_); - return; - } - // Serialize the actual bytes to be put on the wire. - call->Serialize(&sending_); + DoQueueOutboundData(call, true); - sending_outbound_datas_.resize(sending_.size()); - sending_outbound_datas_.back() = call; call->SetQueued(); } -void Connection::Handler(ev::io& watcher, int revents) { // NOLINT +void Connection::DoQueueOutboundData(OutboundDataPtr outbound_data, bool batch) { DCHECK(reactor_->IsCurrentThread()); - DVLOG(3) << ToString() << " Handler(revents=" << revents << ")"; - auto status = Status::OK(); - if (revents & EV_ERROR) { - status = STATUS(NetworkError, ToString() + ": Handler encountered an error"); - } - - if (status.ok() && (revents & EV_READ)) { - status = ReadHandler(); - } - - if (status.ok() && (revents & EV_WRITE)) { - bool just_connected = !connected_; - if (just_connected) { - connected_ = true; - context_->Connected(shared_from_this()); - } - status = WriteHandler(just_connected); - } - - if (status.ok()) { - UpdateEvents(); - } else { - reactor_->DestroyConnection(this, status); - } -} - -void Connection::UpdateEvents() { - int events = 0; - if (!read_buffer_full_) { - events |= ev::READ; - } - waiting_write_ready_ = !sending_.empty(); - if (waiting_write_ready_) { - events |= ev::WRITE; - } - if (events) { - io_.set(events); + if (!shutdown_status_.ok()) { + outbound_data->Transferred(shutdown_status_, this); + return; } -} -Status Connection::ReadHandler() { - DCHECK(reactor_->IsCurrentThread()); - last_activity_time_ = reactor_->cur_time(); - - for (;;) { - auto received = Receive(); - if (PREDICT_FALSE(!received.ok())) { - if (received.status().error_code() == ESHUTDOWN) { - VLOG(1) << ToString() << " shut down by remote end."; - } else { - if (direction_ == Direction::CLIENT) { - LOG(INFO) << ToString() << " recv error: " << received; - } else { - YB_LOG_EVERY_N(INFO, 50) << ToString() << " recv error: " << received; - } - } - return received.status(); - } - // Exit the loop if we did not receive anything. - if (!received.get()) { - return Status::OK(); - } - // If we were not able to process next call exit loop. - // If status is ok, it means that we just do not have enough data to process yet. - auto continue_receiving = TryProcessCalls(); - if (!continue_receiving.ok()) { - return continue_receiving.status(); - } - if (!continue_receiving.get()) { - return Status::OK(); - } - } -} + // If the connection is torn down, then the QueueOutbound() call that + // eventually runs in the reactor thread will take care of calling + // ResponseTransferCallbacks::NotifyTransferAborted. -Result Connection::Receive() { - auto iov = read_buffer_.PrepareAppend(); - if (!iov.ok()) { - if (iov.status().IsBusy()) { - read_buffer_full_ = true; - return false; - } - return iov.status(); - } - read_buffer_full_ = false; + stream_->Send(std::move(outbound_data)); - auto nread = socket_.Recvv(iov.get_ptr()); - if (!nread.ok()) { - if (Socket::IsTemporarySocketError(nread.status())) { - return false; - } - return nread.status(); + if (!batch) { + OutboundQueued(); } - - read_buffer_.DataAppended(*nread); - return *nread != 0; } void Connection::ParseReceived() { - auto result = TryProcessCalls(); - if (!result.ok()) { - reactor_->DestroyConnection(this, result.status()); - } - if (read_buffer_full_) { - read_buffer_full_ = false; - UpdateEvents(); - } + stream_->ParseReceived(); } -Result Connection::TryProcessCalls() { - DCHECK(reactor_->IsCurrentThread()); - - if (read_buffer_.empty()) { - return false; - } - - rpc::ReadBufferFull read_buffer_full(read_buffer_.full()); - auto consumed = context_->ProcessCalls( - shared_from_this(), read_buffer_.AppendedVecs(), read_buffer_full); +Result Connection::ProcessReceived(const IoVecs& data, ReadBufferFull read_buffer_full) { + auto consumed = context_->ProcessCalls(shared_from_this(), data, read_buffer_full); if (PREDICT_FALSE(!consumed.ok())) { - LOG(WARNING) << ToString() << " command sequence failure: " << consumed; + LOG_WITH_PREFIX(WARNING) << "Command sequence failure: " << consumed; return consumed.status(); } if (!*consumed && read_buffer_full && context_->Idle()) { - return STATUS_FORMAT( - InvalidArgument, "Command is greater than read buffer: $0", read_buffer_.size()); + return STATUS(InvalidArgument, "Command is greater than read buffer"); } - read_buffer_.Consume(*consumed); - return true; + return *consumed; } Status Connection::HandleCallResponse(std::vector* call_data) { @@ -447,8 +253,8 @@ Status Connection::HandleCallResponse(std::vector* call_data) { ++responded_call_count_; auto awaiting = awaiting_response_.find(resp.call_id()); if (awaiting == awaiting_response_.end()) { - LOG(ERROR) << ToString() << ": Got a response for call id " << resp.call_id() << " which " - << "was not pending! Ignoring."; + LOG_WITH_PREFIX(ERROR) << "Got a response for call id " << resp.call_id() << " which " + << "was not pending! Ignoring."; DCHECK(awaiting != awaiting_response_.end()); return Status::OK(); } @@ -466,67 +272,6 @@ Status Connection::HandleCallResponse(std::vector* call_data) { return Status::OK(); } -Status Connection::WriteHandler(bool just_connected) { - DCHECK(reactor_->IsCurrentThread()); - - if (sending_.empty()) { - LOG_IF(WARNING, !just_connected) << ToString() << " got a ready-to-write callback, " - << "but there is nothing to write."; - return Status::OK(); - } - - return DoWrite(); -} - -Status Connection::DoWrite() { - if (!is_epoll_registered_) { - return Status::OK(); - } - while (!sending_.empty()) { - const size_t kMaxIov = 16; - iovec iov[kMaxIov]; - const int iov_len = static_cast(std::min(kMaxIov, sending_.size())); - size_t offset = send_position_; - for (auto i = 0; i != iov_len; ++i) { - iov[i].iov_base = sending_[i].data() + offset; - iov[i].iov_len = sending_[i].size() - offset; - offset = 0; - } - - last_activity_time_ = reactor_->cur_time(); - int32_t written = 0; - - auto status = socket_.Writev(iov, iov_len, &written); - if (PREDICT_FALSE(!status.ok())) { - if (!Socket::IsTemporarySocketError(status)) { - if (direction_ == Direction::CLIENT) { - LOG(WARNING) << ToString() << " send error: " << status.ToString(); - } else { - YB_LOG_EVERY_N(WARNING, 50) << ToString() << " send error: " << status.ToString(); - } - return status; - } else { - waiting_write_ready_ = true; - io_.set(ev::READ|ev::WRITE); - return Status::OK(); - } - } - - send_position_ += written; - while (!sending_.empty() && send_position_ >= sending_.front().size()) { - auto call = sending_outbound_datas_.front(); - send_position_ -= sending_.front().size(); - sending_.pop_front(); - sending_outbound_datas_.pop_front(); - if (call) { - call->Transferred(Status::OK(), this); - } - } - } - - return Status::OK(); -} - void Connection::CallSent(OutboundCallPtr call) { DCHECK(reactor_->IsCurrentThread()); @@ -549,17 +294,18 @@ std::string Connection::ToString() const { // include anything in the output about the current state, // which might concurrently change from another thread. static const char* format = "Connection ($0) $1 $2 => $3"; + const void* self = this; if (direction_ == Direction::SERVER) { - return strings::Substitute(format, this, "server", yb::ToString(remote_), yb::ToString(local_)); + return Format(format, self, "server", remote(), local()); } else { - return strings::Substitute(format, this, "client", yb::ToString(local_), yb::ToString(remote_)); + return Format(format, self, "client", local(), remote()); } } Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) { DCHECK(reactor_->IsCurrentThread()); - resp->set_remote_ip(yb::ToString(remote_)); + resp->set_remote_ip(yb::ToString(remote())); resp->set_state(context_->State()); const uint64_t processed_call_count = @@ -578,12 +324,8 @@ Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req, call_in_flight = resp->add_calls_in_flight(); } } - for (auto& call : sending_outbound_datas_) { - if (call && call->DumpPB(req, call_in_flight)) { - call_in_flight = resp->add_calls_in_flight(); - } - } resp->mutable_calls_in_flight()->DeleteSubrange(resp->calls_in_flight_size() - 1, 1); + stream_->DumpPB(req, resp); } else if (direction_ != Direction::SERVER) { LOG(FATAL) << "Invalid direction: " << to_underlying(direction_); } @@ -607,7 +349,7 @@ void Connection::QueueOutboundData(OutboundDataPtr outbound_data) { return; } - bool was_empty = false; + bool was_empty; { std::unique_lock lock(outbound_data_queue_lock_); if (!shutdown_status_.ok()) { @@ -652,58 +394,58 @@ void Connection::ProcessResponseQueue() { } } -void Connection::DoQueueOutboundData(OutboundDataPtr outbound_data, bool batch) { +Status Connection::Start(ev::loop_ref* loop) { DCHECK(reactor_->IsCurrentThread()); - if (!shutdown_status_.ok()) { - outbound_data->Transferred(shutdown_status_, this); - return; + RETURN_NOT_OK(stream_->Start(direction_ == Direction::CLIENT, loop, this)); + + timer_.set(*loop); + timer_.set(this); // NOLINT + + if (!stream_->IsConnected()) { + StartTimer(FLAGS_rpc_connection_timeout_ms * 1ms, &timer_); } - // If the connection is torn down, then the QueueOutbound() call that - // eventually runs in the reactor thread will take care of calling - // ResponseTransferCallbacks::NotifyTransferAborted. + auto self = shared_from_this(); + context_->AssignConnection(self); - outbound_data->Serialize(&sending_); + return Status::OK(); +} - sending_outbound_datas_.resize(sending_.size()); - sending_outbound_datas_.back() = outbound_data; +void Connection::Connected() { + context_->Connected(shared_from_this()); +} - if (!batch) { - OutboundQueued(); - } +const Endpoint& Connection::remote() const { + return stream_->Remote(); } -Status Connection::Start(ev::loop_ref* loop) { - DCHECK(reactor_->IsCurrentThread()); +const Protocol* Connection::protocol() const { + return stream_->GetProtocol(); +} - RETURN_NOT_OK(socket_.SetNoDelay(true)); - RETURN_NOT_OK(socket_.SetSendTimeout(FLAGS_rpc_connection_timeout_ms * 1ms)); - RETURN_NOT_OK(socket_.SetRecvTimeout(FLAGS_rpc_connection_timeout_ms * 1ms)); +const Endpoint& Connection::local() const { + return stream_->Local(); +} - if (direction_ == Direction::CLIENT) { - auto status = socket_.Connect(remote_); - if (!status.ok() && !Socket::IsTemporarySocketError(status)) { - LOG(WARNING) << "Failed to create connection " << ToString() - << " because connect failed: " << status; - return status; - } - } - RETURN_NOT_OK(socket_.GetSocketAddress(&local_)); +void Connection::Close() { + stream_->Close(); +} - EpollRegister(*loop); +void Connection::UpdateLastActivity() { + last_activity_time_ = reactor_->cur_time(); +} - if (!connected_) { - StartTimer(FLAGS_rpc_connection_timeout_ms * 1ms, &timer_); - } +void Connection::Transferred(const OutboundDataPtr& data, const Status& status) { + data->Transferred(status, this); +} - auto self = shared_from_this(); - context_->AssignConnection(self); - if (direction_ == Direction::SERVER) { - context_->Connected(self); - } +void Connection::Destroy(const Status& status) { + reactor_->DestroyConnection(this, status); +} - return Status::OK(); +std::string Connection::LogPrefix() const { + return ToString() + ": "; } } // namespace rpc diff --git a/src/yb/rpc/connection.h b/src/yb/rpc/connection.h index 499352fe46cf..049307ec075a 100644 --- a/src/yb/rpc/connection.h +++ b/src/yb/rpc/connection.h @@ -54,6 +54,7 @@ #include "yb/rpc/outbound_call.h" #include "yb/rpc/inbound_call.h" #include "yb/rpc/server_event.h" +#include "yb/rpc/stream.h" #include "yb/util/enums.h" #include "yb/util/monotime.h" @@ -67,12 +68,6 @@ namespace yb { namespace rpc { -class Connection; -class DumpRunningRpcsRequestPB; -class GrowableBuffer; -class Reactor; -class ReactorTask; - YB_DEFINE_ENUM(ConnectionDirection, (CLIENT)(SERVER)); typedef boost::container::small_vector_base OutboundDataBatch; @@ -94,7 +89,7 @@ typedef boost::container::small_vector_base OutboundDataBatch; // This class is not fully thread-safe. It is accessed only from the context of a // single Reactor except where otherwise specified. // -class Connection final : public std::enable_shared_from_this { +class Connection final : public StreamContext, public std::enable_shared_from_this { public: typedef ConnectionDirection Direction; @@ -106,26 +101,22 @@ class Connection final : public std::enable_shared_from_this { // context: context for this connection. Context is used by connection to handle // protocol specific actions, such as parsing of incoming data into calls. Connection(Reactor* reactor, - const Endpoint& remote, - int socket, + std::unique_ptr stream, Direction direction, std::unique_ptr context); ~Connection(); - // Register our socket with an epoll loop. We will only ever be registered in - // one epoll loop at a time. - void EpollRegister(ev::loop_ref& loop); // NOLINT - CoarseMonoClock::TimePoint last_activity_time() const { return last_activity_time_; } // Returns true if we are not in the process of receiving or sending a // message, and we have no outstanding calls. - bool Idle() const; + // When reason_not_idle is specified it contains reason why this connection is not idle. + bool Idle(std::string* reason_not_idle = nullptr) const; - // A human-readable reason why the connection is not idle. + // A human-readable reason why the connection is not idle. Empty string if connection is idle. std::string ReasonNotIdle() const; // Fail any calls which are currently queued or awaiting response. @@ -140,30 +131,20 @@ class Connection final : public std::enable_shared_from_this { void QueueOutboundCall(const OutboundCallPtr& call); // The address of the remote end of the connection. - const Endpoint& remote() const { return remote_; } + const Endpoint& remote() const; - // The address of the local end of the connection. - const Endpoint& local() const { return local_; } + const Protocol* protocol() const; - // libev callback when there are some events in socket. - void Handler(ev::io& watcher, int revents); // NOLINT + // The address of the local end of the connection. + const Endpoint& local() const; void HandleTimeout(ev::timer& watcher, int revents); // NOLINT - // Invoked when we have something to read. - CHECKED_STATUS ReadHandler(); - - // Invoked when socket is ready for writing. - // `just_connected` is used to avoid flooding log on each connect. - CHECKED_STATUS WriteHandler(bool just_connected); - // Safe to be called from other threads. std::string ToString() const; Direction direction() const { return direction_; } - Socket* socket() { return &socket_; } - // Queue a call response back to the client on the server side. // // This is usually called by the IPC worker thread when the response is set, but in some @@ -195,6 +176,8 @@ class Connection final : public std::enable_shared_from_this { // Try to parse already received data. void ParseReceived(); + void Close(); + private: CHECKED_STATUS DoWrite(); @@ -203,46 +186,25 @@ class Connection final : public std::enable_shared_from_this { void ProcessResponseQueue(); - void ClearSending(const Status& status); - - Result Receive(); + void UpdateLastActivity() override; + void Transferred(const OutboundDataPtr& data, const Status& status) override; + void Destroy(const Status& status) override; + Result ProcessReceived(const IoVecs& data, ReadBufferFull read_buffer_full) override; + void Connected() override; - // Try to parse received data into calls and process them. - Result TryProcessCalls(); - - // Updates listening events. - void UpdateEvents(); + std::string LogPrefix() const; // The reactor thread that created this connection. Reactor* const reactor_; - // The socket we're communicating on. - Socket socket_; - - // The remote address we're talking from. - Endpoint local_; - - // The remote address we're talking to. - const Endpoint remote_; + std::unique_ptr stream_; // whether we are client or server Direction direction_; - bool connected_ = false; - - bool read_buffer_full_ = false; - // The last time we read or wrote from the socket. CoarseMonoClock::TimePoint last_activity_time_; - // Notifies us when our socket is readable or writable. - ev::io io_; - - // Set to true when the connection is registered on a loop. - // This is used for a sanity check in the destructor that we are properly - // un-registered before shutting down. - bool is_epoll_registered_ = false; - // Calls which have been sent and are now waiting for a response. std::unordered_map awaiting_response_; @@ -272,15 +234,6 @@ class Connection final : public std::enable_shared_from_this { CompareExpiration> expiration_queue_; ev::timer timer_; - // Data received on this connection that has not been processed yet. - GrowableBuffer read_buffer_; - - // sending_* contain bytes and calls we are currently sending to socket - std::deque sending_; - std::deque sending_outbound_datas_; - size_t send_position_ = 0; - bool waiting_write_ready_ = false; - simple_spinlock outbound_data_queue_lock_; // Responses we are going to process. diff --git a/src/yb/rpc/connection_context.h b/src/yb/rpc/connection_context.h index bca4d71d858d..b3c67acf9df3 100644 --- a/src/yb/rpc/connection_context.h +++ b/src/yb/rpc/connection_context.h @@ -29,7 +29,6 @@ class MemTracker; namespace rpc { typedef std::function IdleListener; -YB_STRONGLY_TYPED_BOOL(ReadBufferFull); class GrowableBufferAllocator; @@ -49,10 +48,9 @@ class ConnectionContext { virtual void DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) = 0; // Checks whether this connection context is idle. - virtual bool Idle() = 0; - - // Returns a human-readable description of why the context is not idle. - virtual std::string ReasonNotIdle() = 0; + // If reason is supplied, then human-readable description of why the context is not idle is + // appended to it. + virtual bool Idle(std::string* reason_not_idle = nullptr) = 0; // Listen for when context becomes idle. virtual void ListenIdle(IdleListener listener) = 0; diff --git a/src/yb/rpc/messenger.cc b/src/yb/rpc/messenger.cc index b049709a1cfb..327efe82ef77 100644 --- a/src/yb/rpc/messenger.cc +++ b/src/yb/rpc/messenger.cc @@ -59,6 +59,7 @@ #include "yb/rpc/proxy.h" #include "yb/rpc/rpc_header.pb.h" #include "yb/rpc/rpc_service.h" +#include "yb/rpc/tcp_stream.h" #include "yb/rpc/yb_rpc.h" #include "yb/util/errno.h" @@ -108,7 +109,9 @@ MessengerBuilder::MessengerBuilder(std::string name) coarse_timer_granularity_(100ms), connection_context_factory_( rpc::CreateConnectionContextFactory( - FLAGS_outbound_rpc_block_size, FLAGS_outbound_rpc_memory_limit)) { + FLAGS_outbound_rpc_block_size, FLAGS_outbound_rpc_memory_limit)), + listen_protocol_(TcpStream::StaticProtocol()) { + AddStreamFactory(TcpStream::StaticProtocol(), TcpStream::Factory()); } MessengerBuilder& MessengerBuilder::set_connection_keepalive_time( @@ -143,6 +146,13 @@ Result> MessengerBuilder::Build() { messenger.release(), std::mem_fun(&Messenger::AllExternalReferencesDropped)); } +MessengerBuilder &MessengerBuilder::AddStreamFactory( + const Protocol* protocol, StreamFactoryPtr factory) { + auto p = stream_factories_.emplace(protocol, std::move(factory)); + LOG_IF(DFATAL, !p.second) << "Duplicate stream factory: " << protocol->ToString(); + return *this; +} + // ------------------------------------------------------------------------------------------------ // Messenger // ------------------------------------------------------------------------------------------------ @@ -383,6 +393,8 @@ void Messenger::RegisterInboundSocket( Messenger::Messenger(const MessengerBuilder &bld) : name_(bld.name_), connection_context_factory_(bld.connection_context_factory_), + stream_factories_(bld.stream_factories_), + listen_protocol_(bld.listen_protocol_), metric_entity_(bld.metric_entity_), retain_self_(this), io_thread_pool_(FLAGS_io_thread_pool_size), diff --git a/src/yb/rpc/messenger.h b/src/yb/rpc/messenger.h index 102dfe1f43b9..f7eedeb7f2ef 100644 --- a/src/yb/rpc/messenger.h +++ b/src/yb/rpc/messenger.h @@ -73,6 +73,8 @@ namespace rpc { template class ConnectionContextFactoryImpl; +typedef std::unordered_map StreamFactories; + // Used to construct a Messenger. class MessengerBuilder { public: @@ -98,6 +100,13 @@ class MessengerBuilder { return *this; } + MessengerBuilder &AddStreamFactory(const Protocol* protocol, StreamFactoryPtr factory); + + MessengerBuilder &SetListenProtocol(const Protocol* protocol) { + listen_protocol_ = protocol; + return *this; + } + template MessengerBuilder &CreateConnectionContextFactory( size_t block_size, size_t memory_limit, @@ -129,6 +138,8 @@ class MessengerBuilder { CoarseMonoClock::Duration coarse_timer_granularity_; scoped_refptr metric_entity_; ConnectionContextFactoryPtr connection_context_factory_; + StreamFactories stream_factories_; + const Protocol* listen_protocol_; }; // A Messenger is a container for the reactor threads which run event loops for the RPC services. @@ -175,13 +186,15 @@ class Messenger : public ProxyContext { // Queue a call for transmission. This will pick the appropriate reactor, and enqueue a task on // that reactor to assign and send the call. - void QueueOutboundCall(OutboundCallPtr call); + void QueueOutboundCall(OutboundCallPtr call) override; // Enqueue a call for processing on the server. - void QueueInboundCall(InboundCallPtr call); + void QueueInboundCall(InboundCallPtr call) override; // Invoke the RpcService to handle a call directly. - void Handle(InboundCallPtr call); + void Handle(InboundCallPtr call) override; + + const Protocol* DefaultProtocol() override { return listen_protocol_; } CHECKED_STATUS QueueEventOnAllReactors(ServerEventListPtr server_event); @@ -207,7 +220,7 @@ class Messenger : public ProxyContext { return name_; } - scoped_refptr metric_entity() const { return metric_entity_; } + scoped_refptr metric_entity() const override { return metric_entity_; } scoped_refptr rpc_service(const std::string& service_name) const; @@ -223,7 +236,7 @@ class Messenger : public ProxyContext { return scheduler_; } - IoService& io_service() { + IoService& io_service() override { return io_thread_pool_.io_service(); } @@ -251,6 +264,10 @@ class Messenger : public ProxyContext { ConnectionContextFactoryPtr connection_context_factory_; + const StreamFactories stream_factories_; + + const Protocol* const listen_protocol_; + // Protects closing_, acceptor_pools_, rpc_services_. mutable percpu_rwlock lock_; diff --git a/src/yb/rpc/outbound_call.cc b/src/yb/rpc/outbound_call.cc index f9b1877a4ce5..ccd42f80f4b1 100644 --- a/src/yb/rpc/outbound_call.cc +++ b/src/yb/rpc/outbound_call.cc @@ -167,8 +167,7 @@ OutboundCall::OutboundCall( const std::shared_ptr& outbound_call_metrics, google::protobuf::Message* response_storage, RpcController* controller, ResponseCallback callback) - : conn_id_(Endpoint(), 0), - start_(MonoTime::Now()), + : start_(MonoTime::Now()), controller_(DCHECK_NOTNULL(controller)), response_(DCHECK_NOTNULL(response_storage)), call_id_(NextCallId()), @@ -496,13 +495,14 @@ void OutboundCall::InitHeader(RequestHeader* header) { /// string ConnectionId::ToString() const { - return Format("{ remote: $0 idx: $1 }", remote_, idx_); + return Format("{ remote: $0 idx: $1 protocol: $2 }", remote_, idx_, protocol_); } size_t ConnectionId::HashCode() const { size_t seed = 0; boost::hash_combine(seed, hash_value(remote_)); boost::hash_combine(seed, idx_); + boost::hash_combine(seed, protocol_); return seed; } diff --git a/src/yb/rpc/outbound_call.h b/src/yb/rpc/outbound_call.h index 8a14e09a5b07..e6e8e6d1a0de 100644 --- a/src/yb/rpc/outbound_call.h +++ b/src/yb/rpc/outbound_call.h @@ -78,12 +78,16 @@ class RpcController; // This class is copyable for STL compatibility, but not assignable (use CopyFrom() for that). class ConnectionId { public: + ConnectionId() {} + // Convenience constructor. - ConnectionId(const Endpoint& remote, size_t idx) : remote_(remote), idx_(idx) {} + ConnectionId(const Endpoint& remote, size_t idx, const Protocol* protocol) + : remote_(remote), idx_(idx), protocol_(protocol) {} // The remote address. const Endpoint& remote() const { return remote_; } uint8_t idx() const { return idx_; } + const Protocol* protocol() const { return protocol_; } // Returns a string representation of the object, not including the password field. std::string ToString() const; @@ -94,6 +98,7 @@ class ConnectionId { // Remember to update HashCode() and Equals() when new fields are added. Endpoint remote_; uint8_t idx_ = 0; // Connection index, used to support multiple connections to the same server. + const Protocol* protocol_; }; class ConnectionIdHash { @@ -102,7 +107,7 @@ class ConnectionIdHash { }; inline bool operator==(const ConnectionId& lhs, const ConnectionId& rhs) { - return lhs.remote() == rhs.remote() && lhs.idx() == rhs.idx(); + return lhs.remote() == rhs.remote() && lhs.idx() == rhs.idx() && lhs.protocol() == rhs.protocol(); } // Container for OutboundCall metrics @@ -292,7 +297,7 @@ class OutboundCall : public RpcCall { static std::string StateName(State state); - virtual void NotifyTransferred(const Status& status, Connection* conn) override; + void NotifyTransferred(const Status& status, Connection* conn) override; void set_state(State new_state); State state() const; diff --git a/src/yb/rpc/outbound_data.h b/src/yb/rpc/outbound_data.h index 50e4c1ebb64e..ebe41747fa2e 100644 --- a/src/yb/rpc/outbound_data.h +++ b/src/yb/rpc/outbound_data.h @@ -37,8 +37,6 @@ class OutboundData : public std::enable_shared_from_this { public: virtual void Transferred(const Status& status, Connection* conn) = 0; - virtual ~OutboundData() {} - // Serializes the data to be sent out via the RPC framework. virtual void Serialize(std::deque *output) const = 0; @@ -47,6 +45,8 @@ class OutboundData : public std::enable_shared_from_this { } virtual bool DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp) = 0; + + virtual ~OutboundData() {} }; typedef std::shared_ptr OutboundDataPtr; diff --git a/src/yb/rpc/protoc-gen-yrpc.cc b/src/yb/rpc/protoc-gen-yrpc.cc index 950c7616fa11..e655056f00e9 100644 --- a/src/yb/rpc/protoc-gen-yrpc.cc +++ b/src/yb/rpc/protoc-gen-yrpc.cc @@ -606,7 +606,8 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator { " public:\n" " $service_name$Proxy(\n" " ::yb::rpc::ProxyCache* cache,\n" - " const ::yb::HostPort &endpoint);\n" + " const ::yb::HostPort &endpoint,\n" + " const ::yb::rpc::Protocol* protocol = nullptr);\n" " ~$service_name$Proxy();\n" "\n" ); @@ -665,8 +666,9 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator { Print(printer, *subs, "$service_name$Proxy::$service_name$Proxy(\n" " ::yb::rpc::ProxyCache* cache,\n" - " const ::yb::HostPort &remote)\n" - " : proxy_(cache->Get(remote)) {\n" + " const ::yb::HostPort &remote,\n" + " const ::yb::rpc::Protocol* protocol)\n" + " : proxy_(cache->Get(remote, protocol)) {\n" "}\n" "\n" "$service_name$Proxy::~$service_name$Proxy() {\n" diff --git a/src/yb/rpc/proxy.cc b/src/yb/rpc/proxy.cc index 55fbb56a803d..3c1e30000fcb 100644 --- a/src/yb/rpc/proxy.cc +++ b/src/yb/rpc/proxy.cc @@ -70,9 +70,11 @@ using std::shared_ptr; namespace yb { namespace rpc { -Proxy::Proxy(std::shared_ptr context, const HostPort& remote) +Proxy::Proxy(std::shared_ptr context, const HostPort& remote, + const Protocol* protocol) : context_(std::move(context)), remote_(remote), + protocol_(protocol ? protocol : context_->DefaultProtocol()), outbound_call_metrics_(context_->metric_entity() ? std::make_shared(context_->metric_entity()) : nullptr), call_local_service_(remote == HostPort()), @@ -244,7 +246,7 @@ void Proxy::ResolveDone( void Proxy::QueueCall(RpcController* controller, const Endpoint& endpoint) { uint8_t idx = num_calls_.fetch_add(1) % FLAGS_num_connections_to_server; - ConnectionId conn_id(endpoint, idx); + ConnectionId conn_id(endpoint, idx, protocol_); controller->call_->SetConnectionId(conn_id); context_->QueueOutboundCall(controller->call_); } @@ -266,11 +268,12 @@ Status Proxy::SyncRequest(const RemoteMethod* method, return controller->status(); } -std::shared_ptr ProxyCache::Get(const HostPort& remote) { +std::shared_ptr ProxyCache::Get(const HostPort& remote, const Protocol* protocol) { + ProxyKey key(remote, protocol); std::lock_guard lock(mutex_); - auto it = proxies_.find(remote); + auto it = proxies_.find(key); if (it == proxies_.end()) { - it = proxies_.emplace(remote, std::make_unique(context_, remote)).first; + it = proxies_.emplace(key, std::make_unique(context_, remote, protocol)).first; } return it->second; } diff --git a/src/yb/rpc/proxy.h b/src/yb/rpc/proxy.h index 86d3eaae4e1a..f9cf8c79ad7a 100644 --- a/src/yb/rpc/proxy.h +++ b/src/yb/rpc/proxy.h @@ -77,6 +77,8 @@ class ProxyContext { // Invoke the RpcService to handle a call directly. virtual void Handle(InboundCallPtr call) = 0; + virtual const Protocol* DefaultProtocol() = 0; + virtual IoService& io_service() = 0; virtual ~ProxyContext() {} @@ -101,7 +103,8 @@ class ProxyContext { class Proxy { public: Proxy(std::shared_ptr context, - const HostPort& remote); + const HostPort& remote, + const Protocol* protocol = nullptr); ~Proxy(); Proxy(const Proxy&) = delete; @@ -161,6 +164,7 @@ class Proxy { std::shared_ptr context_; HostPort remote_; + const Protocol* const protocol_; mutable std::atomic is_started_{false}; mutable std::atomic num_calls_{0}; std::shared_ptr outbound_call_metrics_; @@ -178,12 +182,23 @@ class ProxyCache { explicit ProxyCache(const std::shared_ptr& context) : context_(context) {} - std::shared_ptr Get(const HostPort& remote); + std::shared_ptr Get(const HostPort& remote, const Protocol* protocol); private: + typedef std::pair ProxyKey; + + struct ProxyKeyHash { + size_t operator()(const ProxyKey& key) const { + size_t result = 0; + boost::hash_combine(result, HostPortHash()(key.first)); + boost::hash_combine(result, key.second); + return result; + } + }; + std::shared_ptr context_; std::mutex mutex_; - std::unordered_map, HostPortHash> proxies_; + std::unordered_map, ProxyKeyHash> proxies_; }; } // namespace rpc diff --git a/src/yb/rpc/reactor.cc b/src/yb/rpc/reactor.cc index 3110e93dada7..d61514a33e95 100644 --- a/src/yb/rpc/reactor.cc +++ b/src/yb/rpc/reactor.cc @@ -482,6 +482,16 @@ Result CreateClientSocket(const Endpoint& remote) { return std::move(socket); } +template +Result> CreateStream( + const StreamFactories& factories, const Protocol* protocol, Args&&... args) { + auto it = factories.find(protocol); + if (it == factories.end()) { + return STATUS_FORMAT(NotFound, "Unknown protocol: $0", protocol); + } + return it->second->Create(std::forward(args)...); +} + } // namespace Status Reactor::FindOrStartConnection(const ConnectionId &conn_id, @@ -516,17 +526,22 @@ Status Reactor::FindOrStartConnection(const ConnectionId &conn_id, } } + auto context = messenger_->connection_context_factory_->Create(); + auto stream = VERIFY_RESULT(CreateStream( + messenger_->stream_factories_, conn_id.protocol(), conn_id.remote(), std::move(*sock), + &context->Allocator(), context->BufferLimit())); + // Register the new connection in our map. - auto connection = std::make_shared(this, - conn_id.remote(), - sock->Release(), - ConnectionDirection::CLIENT, - messenger_->connection_context_factory_->Create()); + auto connection = std::make_shared( + this, + std::move(stream), + ConnectionDirection::CLIENT, + std::move(context)); RETURN_NOT_OK(connection->Start(&loop_)); // Insert into the client connection map to avoid duplicate connection requests. - client_conns_.emplace(conn_id, connection); + CHECK(client_conns_.emplace(conn_id, connection).second); conn->swap(connection); return Status::OK(); @@ -535,23 +550,13 @@ Status Reactor::FindOrStartConnection(const ConnectionId &conn_id, namespace { void ShutdownIfRemoteAddressIs(const ConnectionPtr& conn, const IpAddress& address) { - auto socket = conn->socket(); - Endpoint peer; - auto status = socket->GetPeerAddress(&peer); - if (!status.ok()) { - LOG(WARNING) << "Failed to get peer address" << socket->GetFd() << ": " << status.ToString(); - return; - } + Endpoint peer = conn->remote(); if (peer.address() != address) { return; } - status = socket->Shutdown(/* shut_read */ true, /* shut_write */ true); - if (!status.ok()) { - LOG(WARNING) << "Failed to shutdown " << socket->GetFd() << ": " << status.ToString(); - return; - } + conn->Close(); LOG(INFO) << "Dropped connection: " << conn->ToString(); } @@ -581,7 +586,7 @@ void Reactor::DestroyConnection(Connection *conn, const Status &conn_status) { if (conn->direction() == ConnectionDirection::CLIENT) { bool erased = false; for (int idx = 0; idx < FLAGS_num_connections_to_server; idx++) { - auto it = client_conns_.find(ConnectionId(conn->remote(), idx)); + auto it = client_conns_.find(ConnectionId(conn->remote(), idx, conn->protocol())); if (it != client_conns_.end() && it->second.get() == conn) { client_conns_.erase(it); erased = true; @@ -790,9 +795,17 @@ void DelayedTask::TimerHandler(ev::timer& watcher, int revents) { void Reactor::RegisterInboundSocket(Socket *socket, const Endpoint& remote, std::unique_ptr connection_context) { VLOG(3) << name_ << ": new inbound connection to " << remote; + + auto stream = CreateStream( + messenger_->stream_factories_, messenger_->listen_protocol_, + remote, std::move(*socket), &connection_context->Allocator(), + connection_context->BufferLimit()); + if (!stream.ok()) { + LOG(DFATAL) << "Failed to create stream for " << remote << ": " << stream.status(); + return; + } auto conn = std::make_shared(this, - remote, - socket->Release(), + std::move(*stream), ConnectionDirection::SERVER, std::move(connection_context)); ScheduleReactorFunctor([conn = std::move(conn)](Reactor* reactor) { diff --git a/src/yb/rpc/rpc-test-base.cc b/src/yb/rpc/rpc-test-base.cc index 0128f3e5e176..5c09d3d0b652 100644 --- a/src/yb/rpc/rpc-test-base.cc +++ b/src/yb/rpc/rpc-test-base.cc @@ -62,9 +62,9 @@ Slice GetSidecarPointer(const RpcController& controller, int idx, int expected_s return Slice(sidecar.data(), expected_size); } -std::shared_ptr CreateMessenger(const std::string& name, - const scoped_refptr& metric_entity, - const MessengerOptions& options) { +MessengerBuilder CreateMessengerBuilder(const std::string& name, + const scoped_refptr& metric_entity, + const MessengerOptions& options) { MessengerBuilder bld(name); bld.set_num_reactors(options.n_reactors); static constexpr std::chrono::milliseconds kMinCoarseTimeGranularity(1); @@ -81,9 +81,13 @@ std::shared_ptr CreateMessenger(const std::string& name, bld.CreateConnectionContextFactory( FLAGS_outbound_rpc_block_size, FLAGS_outbound_rpc_memory_limit, MemTracker::FindOrCreateTracker(name)); - auto messenger = bld.Build(); - CHECK_OK(messenger); - return *messenger; + return bld; +} + +std::shared_ptr CreateMessenger(const std::string& name, + const scoped_refptr& metric_entity, + const MessengerOptions& options) { + return EXPECT_RESULT(CreateMessengerBuilder(name, metric_entity, options).Build()); } #ifdef THREAD_SANITIZER @@ -300,12 +304,10 @@ std::unique_ptr CreateCalculatorService( } TestServer::TestServer(std::unique_ptr service, - const scoped_refptr& metric_entity, + const std::shared_ptr& messenger, const TestServerOptions& options) : service_name_(service->service_name()), - messenger_(CreateMessenger("TestServer", - metric_entity, - options.messenger_options)), + messenger_(messenger), thread_pool_("rpc-test", kQueueLength, options.n_worker_threads) { // If it is CalculatorService then we should set messenger for it. @@ -429,7 +431,8 @@ void RpcTestBase::DoTestExpectTimeout(Proxy* proxy, const MonoDelta& timeout) { void RpcTestBase::StartTestServer(Endpoint* server_endpoint, const TestServerOptions& options) { std::unique_ptr service(new GenericCalculatorService(metric_entity_)); - server_.reset(new TestServer(std::move(service), metric_entity_, options)); + server_.reset(new TestServer( + std::move(service), CreateMessenger("TestServer", options.messenger_options), options)); *server_endpoint = server_->bound_endpoint(); } @@ -439,9 +442,18 @@ void RpcTestBase::StartTestServer(HostPort* server_hostport, const TestServerOpt *server_hostport = HostPort::FromBoundEndpoint(endpoint); } +TestServer RpcTestBase::StartTestServer(const std::string& name, const IpAddress& address) { + std::unique_ptr service(CreateCalculatorService(metric_entity(), name)); + TestServerOptions options; + options.endpoint = Endpoint(address, 0); + return TestServer(std::move(service), CreateMessenger("TestServer"), options); +} + void RpcTestBase::StartTestServerWithGeneratedCode(HostPort* server_hostport, const TestServerOptions& options) { - server_.reset(new TestServer(CreateCalculatorService(metric_entity_), metric_entity_, options)); + auto messenger = options.messenger ? options.messenger + : CreateMessenger("TestServer", options.messenger_options); + server_.reset(new TestServer(CreateCalculatorService(metric_entity_), messenger, options)); *server_hostport = HostPort::FromBoundEndpoint(server_->bound_endpoint()); } @@ -460,5 +472,10 @@ std::shared_ptr RpcTestBase::CreateMessenger(const string &name, return yb::rpc::CreateMessenger(name, metric_entity_, options); } +MessengerBuilder RpcTestBase::CreateMessengerBuilder(const string &name, + const MessengerOptions& options) { + return yb::rpc::CreateMessengerBuilder(name, metric_entity_, options); +} + } // namespace rpc } // namespace yb diff --git a/src/yb/rpc/rpc-test-base.h b/src/yb/rpc/rpc-test-base.h index bbd98ddefdc3..1837b3d8c718 100644 --- a/src/yb/rpc/rpc-test-base.h +++ b/src/yb/rpc/rpc-test-base.h @@ -76,7 +76,7 @@ class GenericCalculatorService : public ServiceIf { // this test doesn't generate metrics, so we ignore the argument. } - virtual void Handle(InboundCallPtr incoming) override; + void Handle(InboundCallPtr incoming) override; std::string service_name() const override { return kFullServiceName; } static std::string static_service_name() { return kFullServiceName; } @@ -122,6 +122,7 @@ extern const MessengerOptions kDefaultServerMessengerOptions; struct TestServerOptions { MessengerOptions messenger_options = kDefaultServerMessengerOptions; + std::shared_ptr messenger; size_t n_worker_threads = 3; Endpoint endpoint; }; @@ -129,7 +130,7 @@ struct TestServerOptions { class TestServer { public: TestServer(std::unique_ptr service, - const scoped_refptr& metric_entity, + const std::shared_ptr& messenger, const TestServerOptions& options = TestServerOptions()); TestServer(TestServer&& rhs) @@ -165,6 +166,10 @@ class RpcTestBase : public YBTest { const string &name, const MessengerOptions& options = kDefaultClientMessengerOptions); + MessengerBuilder CreateMessengerBuilder( + const string &name, + const MessengerOptions& options = kDefaultClientMessengerOptions); + CHECKED_STATUS DoTestSyncCall(Proxy* proxy, const RemoteMethod *method); void DoTestSidecar(Proxy* proxy, @@ -176,6 +181,7 @@ class RpcTestBase : public YBTest { const TestServerOptions& options = TestServerOptions()); void StartTestServer(Endpoint* server_endpoint, const TestServerOptions& options = TestServerOptions()); + TestServer StartTestServer(const std::string& name, const IpAddress& address); void StartTestServerWithGeneratedCode(HostPort* server_hostport, const TestServerOptions& options = TestServerOptions()); diff --git a/src/yb/rpc/rpc_context.cc b/src/yb/rpc/rpc_context.cc index a012c383fe64..73bcf9ca8dc4 100644 --- a/src/yb/rpc/rpc_context.cc +++ b/src/yb/rpc/rpc_context.cc @@ -217,12 +217,7 @@ void RpcContext::Panic(const char* filepath, int line_number, const string& mess void RpcContext::CloseConnection() { auto connection = call_->connection(); connection->reactor()->ScheduleReactorFunctor([connection](Reactor*) { - if (connection->socket()->GetFd() >= 0) { - auto status = connection->socket()->Shutdown(true, true); - if (!status.ok()) { - LOG(INFO) << "Failed to shutdown socket: " << status.ToString(); - } - } + connection->Close(); }); } diff --git a/src/yb/rpc/rpc_fwd.h b/src/yb/rpc/rpc_fwd.h index 73f3d56a1d57..feefedcdf152 100644 --- a/src/yb/rpc/rpc_fwd.h +++ b/src/yb/rpc/rpc_fwd.h @@ -23,6 +23,10 @@ #include "yb/gutil/ref_counted.h" +#include "yb/rpc/rpc_introspection.pb.h" + +#include "yb/util/strongly_typed_bool.h" + namespace boost { namespace asio { @@ -46,13 +50,16 @@ class GrowableBufferAllocator; class Messenger; class Proxy; class ProxyCache; +class Reactor; class ReactorTask; +class RpcConnectionPB; class RpcContext; class RpcController; class RpcService; class Rpcs; -class Scheduler; +class Protocol; class ServicePoolImpl; +class Stream; class ThreadPool; struct RpcMethodMetrics; @@ -69,6 +76,9 @@ typedef std::shared_ptr InboundCallPtr; class OutboundCall; typedef std::shared_ptr OutboundCallPtr; +class OutboundData; +typedef std::shared_ptr OutboundDataPtr; + class ServerEventList; typedef std::shared_ptr ServerEventListPtr; @@ -86,6 +96,11 @@ typedef std::chrono::steady_clock::time_point SteadyTimePoint; class ConnectionContextFactory; typedef std::shared_ptr ConnectionContextFactoryPtr; +class StreamFactory; +typedef std::shared_ptr StreamFactoryPtr; + +YB_STRONGLY_TYPED_BOOL(ReadBufferFull); + } // namespace rpc } // namespace yb diff --git a/src/yb/rpc/rpc_stub-test.cc b/src/yb/rpc/rpc_stub-test.cc index 28f5ec186ac0..6b0df7865924 100644 --- a/src/yb/rpc/rpc_stub-test.cc +++ b/src/yb/rpc/rpc_stub-test.cc @@ -110,13 +110,6 @@ class RpcStubTest : public RpcTestBase { ASSERT_EQ(30, resp.result()); } - TestServer StartTestServer(const std::string& name, const IpAddress& address) { - std::unique_ptr service(CreateCalculatorService(metric_entity(), name)); - TestServerOptions options; - options.endpoint = Endpoint(address, 0); - return TestServer(std::move(service), metric_entity(), options); - } - std::unique_ptr CreateCalculatorProxy(const Endpoint& remote) { auto messenger = CreateMessenger("Client"); IpAddress local_address = remote.address().is_v6() @@ -215,7 +208,7 @@ TEST_F(RpcStubTest, TestIncoherence) { // IO threads can deal with read/write calls that don't succeed // in sending the entire data in one go. TEST_F(RpcStubTest, TestBigCallData) { - constexpr int kNumSentAtOnce = 20; + constexpr int kNumSentAtOnce = 1; constexpr size_t kMessageSize = NonTsanVsTsan(32_MB, 4_MB); CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); diff --git a/src/yb/rpc/rpc_with_call_id.cc b/src/yb/rpc/rpc_with_call_id.cc index 01a8f1caf91f..0f922447ea6f 100644 --- a/src/yb/rpc/rpc_with_call_id.cc +++ b/src/yb/rpc/rpc_with_call_id.cc @@ -19,6 +19,8 @@ #include "yb/rpc/reactor.h" #include "yb/rpc/rpc_introspection.pb.h" +#include "yb/util/string_util.h" + namespace yb { namespace rpc { @@ -33,12 +35,18 @@ void ConnectionContextWithCallId::DumpPB(const DumpRunningRpcsRequestPB& req, } } -bool ConnectionContextWithCallId::Idle() { - return calls_being_handled_.empty(); -} +bool ConnectionContextWithCallId::Idle(std::string* reason_not_idle) { + if (calls_being_handled_.empty()) { + return true; + } + + if (reason_not_idle) { + AppendWithSeparator( + Format("$0 calls being handled: $1", calls_being_handled_.size(), calls_being_handled_), + reason_not_idle); + } -std::string ConnectionContextWithCallId::ReasonNotIdle() { - return Format("$0 calls being handled: $1", calls_being_handled_.size(), calls_being_handled_); + return false; } Status ConnectionContextWithCallId::Store(InboundCall* call) { diff --git a/src/yb/rpc/rpc_with_call_id.h b/src/yb/rpc/rpc_with_call_id.h index cdfe4d0b9c58..7dbe91e78fc2 100644 --- a/src/yb/rpc/rpc_with_call_id.h +++ b/src/yb/rpc/rpc_with_call_id.h @@ -45,8 +45,7 @@ class ConnectionContextWithCallId : public ConnectionContextBase { void ListenIdle(IdleListener listener) override { idle_listener_ = std::move(listener); } void Shutdown(const Status& status) override; - bool Idle() override; - std::string ReasonNotIdle() override; + bool Idle(std::string* reason_not_idle = nullptr) override; void CallProcessed(InboundCall* call); void QueueResponse(const ConnectionPtr& conn, InboundCallPtr call) override; diff --git a/src/yb/rpc/rpc_with_queue.cc b/src/yb/rpc/rpc_with_queue.cc index 2cfa413ce61b..b3a89cb10952 100644 --- a/src/yb/rpc/rpc_with_queue.cc +++ b/src/yb/rpc/rpc_with_queue.cc @@ -20,6 +20,8 @@ #include "yb/rpc/reactor.h" #include "yb/rpc/rpc_introspection.pb.h" +#include "yb/util/string_util.h" + namespace yb { namespace rpc { @@ -41,12 +43,16 @@ void ConnectionContextWithQueue::DumpPB(const DumpRunningRpcsRequestPB& req, } } -bool ConnectionContextWithQueue::Idle() { - return calls_queue_.empty(); -} +bool ConnectionContextWithQueue::Idle(std::string* reason_not_idle) { + if (calls_queue_.empty()) { + return true; + } + + if (reason_not_idle) { + AppendWithSeparator(Format("$0 calls", calls_queue_.size()), reason_not_idle); + } -string ConnectionContextWithQueue::ReasonNotIdle() { - return Format("$0 calls", calls_queue_.size()); + return false; } void ConnectionContextWithQueue::Enqueue(std::shared_ptr call) { diff --git a/src/yb/rpc/rpc_with_queue.h b/src/yb/rpc/rpc_with_queue.h index 7bb5c02baa81..5e8929792f56 100644 --- a/src/yb/rpc/rpc_with_queue.h +++ b/src/yb/rpc/rpc_with_queue.h @@ -85,8 +85,7 @@ class ConnectionContextWithQueue : public ConnectionContextBase { private: void AssignConnection(const ConnectionPtr& conn) override; void DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) override; - bool Idle() override; - std::string ReasonNotIdle() override; + bool Idle(std::string* reason_not_idle = nullptr) override; void QueueResponse(const ConnectionPtr& conn, InboundCallPtr call) override; void ListenIdle(IdleListener listener) override { idle_listener_ = std::move(listener); } void Shutdown(const Status& status) override; diff --git a/src/yb/rpc/stream.h b/src/yb/rpc/stream.h new file mode 100644 index 000000000000..b1143b0d6da3 --- /dev/null +++ b/src/yb/rpc/stream.h @@ -0,0 +1,95 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#ifndef YB_RPC_STREAM_H +#define YB_RPC_STREAM_H + +#include "yb/rpc/rpc_fwd.h" + +#include "yb/util/net/net_fwd.h" +#include "yb/util/net/socket.h" +#include "yb/util/result.h" +#include "yb/util/status.h" + +namespace ev { + +struct loop_ref; + +} + +namespace yb { +namespace rpc { + +class StreamContext { + public: + virtual void UpdateLastActivity() = 0; + virtual void Transferred(const OutboundDataPtr& data, const Status& status) = 0; + virtual void Destroy(const Status& status) = 0; + virtual void Connected() = 0; + virtual Result ProcessReceived(const IoVecs& data, ReadBufferFull read_buffer_full) = 0; + + protected: + ~StreamContext() {} +}; + +class Stream { + public: + virtual CHECKED_STATUS Start(bool connect, ev::loop_ref* loop, StreamContext* context) = 0; + virtual void Close() = 0; + virtual void Shutdown(const Status& status) = 0; + virtual void Send(OutboundDataPtr data) = 0; + virtual CHECKED_STATUS TryWrite() = 0; + virtual void ParseReceived() = 0; + + virtual bool Idle(std::string* reason_not_idle) = 0; + virtual bool IsConnected() = 0; + virtual void DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) = 0; + + // The address of the remote end of the connection. + virtual const Endpoint& Remote() = 0; + + // The address of the local end of the connection. + virtual const Endpoint& Local() = 0; + + virtual const Protocol* GetProtocol() = 0; + + virtual ~Stream() {} +}; + +class StreamFactory { + public: + virtual std::unique_ptr Create( + const Endpoint& remote, Socket socket, GrowableBufferAllocator* allocator, size_t limit) = 0; + + virtual ~StreamFactory() {} +}; + +class Protocol { + public: + explicit Protocol(const std::string& id) : id_(id) {} + + Protocol(const Protocol& schema) = delete; + void operator=(const Protocol& schema) = delete; + + const std::string& ToString() const { return id_; } + + const std::string& id() const { return id_; } + + private: + std::string id_; +}; + +} // namespace rpc +} // namespace yb + +#endif // YB_RPC_STREAM_H diff --git a/src/yb/rpc/tcp_stream.cc b/src/yb/rpc/tcp_stream.cc new file mode 100644 index 000000000000..0ef798818cee --- /dev/null +++ b/src/yb/rpc/tcp_stream.cc @@ -0,0 +1,361 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/rpc/tcp_stream.h" + +#include "yb/rpc/outbound_data.h" + +#include "yb/util/logging.h" +#include "yb/util/string_util.h" + +using namespace std::literals; + +DECLARE_uint64(rpc_connection_timeout_ms); + +namespace yb { +namespace rpc { + +TcpStream::TcpStream( + const Endpoint& remote, Socket socket, GrowableBufferAllocator* allocator, size_t limit) + : socket_(std::move(socket)), + remote_(remote), + read_buffer_(allocator, limit) { +} + +TcpStream::~TcpStream() { + // Must clear the outbound_transfers_ list before deleting. + CHECK(sending_.empty()) << ToString(); + + // It's crucial that the stream is Shutdown first -- otherwise + // our destructor will end up calling io_.stop() + // from a possibly non-reactor thread context. This can then make all + // hell break loose with libev. + CHECK(!is_epoll_registered_) << ToString(); +} + +Status TcpStream::Start(bool connect, ev::loop_ref* loop, StreamContext* context) { + context_ = context; + connected_ = !connect; + + RETURN_NOT_OK(socket_.SetNoDelay(true)); + RETURN_NOT_OK(socket_.SetSendTimeout(FLAGS_rpc_connection_timeout_ms * 1ms)); + RETURN_NOT_OK(socket_.SetRecvTimeout(FLAGS_rpc_connection_timeout_ms * 1ms)); + + if (connect) { + auto status = socket_.Connect(remote_); + if (!status.ok() && !Socket::IsTemporarySocketError(status)) { + LOG_WITH_PREFIX(WARNING) << "Connect failed: " << status; + return status; + } + } + RETURN_NOT_OK(socket_.GetSocketAddress(&local_)); + log_prefix_.clear(); + + io_.set(*loop); + io_.set(this); + int events = ev::READ | (!connected_ ? ev::WRITE : 0); + io_.start(socket_.GetFd(), events); + + DVLOG_WITH_PREFIX(4) << "Starting, listen events: " << events << ", fd: " << socket_.GetFd(); + + is_epoll_registered_ = true; + + if (connected_) { + context_->Connected(); + } + + return Status::OK(); +} + +void TcpStream::Close() { + if (socket_.GetFd() >= 0) { + auto status = socket_.Shutdown(true, true); + LOG_IF(INFO, !status.ok()) << "Failed to shutdown socket: " << status; + } +} + +void TcpStream::Shutdown(const Status& status) { + ClearSending(status); + + if (!read_buffer_.empty()) { + LOG_WITH_PREFIX(WARNING) << "Shutting down with pending inbound data (" + << read_buffer_ << ", status = " << status << ")"; + } + + io_.stop(); + is_epoll_registered_ = false; + WARN_NOT_OK(socket_.Close(), "Error closing socket"); +} + +Status TcpStream::TryWrite() { + auto result = DoWrite(); + if (result.ok()) { + UpdateEvents(); + } + return result; +} + +Status TcpStream::DoWrite() { + if (!connected_ || waiting_write_ready_ || !is_epoll_registered_) { + return Status::OK(); + } + + // If we weren't waiting write to be ready, we could try to write data to socket. + while (!sending_.empty()) { + const size_t kMaxIov = 16; + iovec iov[kMaxIov]; + const int iov_len = static_cast(std::min(kMaxIov, sending_.size())); + size_t offset = send_position_; + for (auto i = 0; i != iov_len; ++i) { + iov[i].iov_base = sending_[i].data() + offset; + iov[i].iov_len = sending_[i].size() - offset; + offset = 0; + } + + context_->UpdateLastActivity(); + int32_t written = 0; + + auto status = socket_.Writev(iov, iov_len, &written); + if (PREDICT_FALSE(!status.ok())) { + if (!Socket::IsTemporarySocketError(status)) { + YB_LOG_WITH_PREFIX_EVERY_N(WARNING, 50) << "Send failed: " << status; + return status; + } else { + return Status::OK(); + } + } + + send_position_ += written; + while (!sending_.empty() && send_position_ >= sending_.front().size()) { + auto data = sending_outbound_datas_.front(); + send_position_ -= sending_.front().size(); + sending_.pop_front(); + sending_outbound_datas_.pop_front(); + if (data) { + context_->Transferred(data, Status::OK()); + } + } + } + + return Status::OK(); +} + +void TcpStream::Handler(ev::io& watcher, int revents) { // NOLINT + DVLOG_WITH_PREFIX(3) << "Handler(revents=" << revents << ")"; + auto status = Status::OK(); + if (revents & ev::ERROR) { + status = STATUS(NetworkError, ToString() + ": Handler encountered an error"); + } + + if (status.ok() && (revents & ev::READ)) { + status = ReadHandler(); + } + + if (status.ok() && (revents & ev::WRITE)) { + bool just_connected = !connected_; + if (just_connected) { + connected_ = true; + context_->Connected(); + } + status = WriteHandler(just_connected); + } + + if (status.ok()) { + UpdateEvents(); + } else { + context_->Destroy(status); + } +} + +void TcpStream::UpdateEvents() { + int events = 0; + if (!read_buffer_full_) { + events |= ev::READ; + } + waiting_write_ready_ = !sending_.empty() || !connected_; + if (waiting_write_ready_) { + events |= ev::WRITE; + } + if (events) { + io_.set(events); + } +} + +Status TcpStream::ReadHandler() { + context_->UpdateLastActivity(); + + for (;;) { + auto received = Receive(); + if (PREDICT_FALSE(!received.ok())) { + if (received.status().error_code() == ESHUTDOWN) { + VLOG_WITH_PREFIX(1) << "Shut down by remote end."; + } else { + YB_LOG_WITH_PREFIX_EVERY_N(INFO, 50) << " Recv failed: " << received; + } + return received.status(); + } + // Exit the loop if we did not receive anything. + if (!received.get()) { + return Status::OK(); + } + // If we were not able to process next call exit loop. + // If status is ok, it means that we just do not have enough data to process yet. + auto continue_receiving = TryProcessReceived(); + if (!continue_receiving.ok()) { + return continue_receiving.status(); + } + if (!continue_receiving.get()) { + return Status::OK(); + } + } +} + +Result TcpStream::Receive() { + auto iov = read_buffer_.PrepareAppend(); + if (!iov.ok()) { + if (iov.status().IsBusy()) { + read_buffer_full_ = true; + return false; + } + return iov.status(); + } + read_buffer_full_ = false; + + auto nread = socket_.Recvv(iov.get_ptr()); + if (!nread.ok()) { + if (Socket::IsTemporarySocketError(nread.status())) { + return false; + } + return nread.status(); + } + + read_buffer_.DataAppended(*nread); + return *nread != 0; +} + +void TcpStream::ParseReceived() { + auto result = TryProcessReceived(); + if (!result.ok()) { + context_->Destroy(result.status()); + return; + } + if (read_buffer_full_) { + read_buffer_full_ = false; + UpdateEvents(); + } +} + +Result TcpStream::TryProcessReceived() { + if (read_buffer_.empty()) { + return false; + } + + auto consumed = VERIFY_RESULT(context_->ProcessReceived( + read_buffer_.AppendedVecs(), ReadBufferFull(read_buffer_.full()))); + + read_buffer_.Consume(consumed); + return true; +} + +Status TcpStream::WriteHandler(bool just_connected) { + waiting_write_ready_ = false; + if (sending_.empty()) { + LOG_IF_WITH_PREFIX(WARNING, !just_connected) << + "Got a ready-to-write callback, but there is nothing to write."; + return Status::OK(); + } + + return DoWrite(); +} + +std::string TcpStream::ToString() const { + return Format("{ local: $0 remote: $1 }", local_, remote_); +} + +const std::string& TcpStream::LogPrefix() const { + if (log_prefix_.empty()) { + log_prefix_ = ToString() + ": "; + } + return log_prefix_; +} + +bool TcpStream::Idle(std::string* reason_not_idle) { + bool result = true; + // Check if we're in the middle of receiving something. + if (!read_buffer_.empty()) { + if (reason_not_idle) { + AppendWithSeparator("read buffer not empty", reason_not_idle); + } + result = false; + } + + // Check if we still need to send something. + if (!sending_.empty()) { + if (reason_not_idle) { + AppendWithSeparator("still sending", reason_not_idle); + } + result = false; + } + + return result; +} + +void TcpStream::ClearSending(const Status& status) { + // Clear any outbound transfers. + for (auto& data : sending_outbound_datas_) { + if (data) { + context_->Transferred(data, status); + } + } + sending_outbound_datas_.clear(); + sending_.clear(); +} + +void TcpStream::Send(OutboundDataPtr data) { + // Serialize the actual bytes to be put on the wire. + data->Serialize(&sending_); + + sending_outbound_datas_.resize(sending_.size()); + sending_outbound_datas_.back() = std::move(data); +} + +void TcpStream::DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) { + auto call_in_flight = resp->add_calls_in_flight(); + for (auto& data : sending_outbound_datas_) { + if (data && data->DumpPB(req, call_in_flight)) { + call_in_flight = resp->add_calls_in_flight(); + } + } + resp->mutable_calls_in_flight()->DeleteSubrange(resp->calls_in_flight_size() - 1, 1); +} + +const Protocol* TcpStream::StaticProtocol() { + static Protocol result("tcp"); + return &result; +} + +StreamFactoryPtr TcpStream::Factory() { + class TcpStreamFactory : public StreamFactory { + private: + std::unique_ptr Create( + const Endpoint& remote, Socket socket, GrowableBufferAllocator* allocator, size_t limit) + override { + return std::make_unique(remote, std::move(socket), allocator, limit); + } + }; + + return std::make_shared(); +} + +} // namespace rpc +} // namespace yb diff --git a/src/yb/rpc/tcp_stream.h b/src/yb/rpc/tcp_stream.h new file mode 100644 index 000000000000..9fd8e6fc46fc --- /dev/null +++ b/src/yb/rpc/tcp_stream.h @@ -0,0 +1,115 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#ifndef YB_RPC_TCP_STREAM_H +#define YB_RPC_TCP_STREAM_H + +#include + +#include "yb/rpc/growable_buffer.h" +#include "yb/rpc/stream.h" + +#include "yb/util/net/socket.h" +#include "yb/util/ref_cnt_buffer.h" + +namespace yb { +namespace rpc { + +class TcpStream : public Stream { + public: + TcpStream( + const Endpoint& remote, Socket socket, GrowableBufferAllocator* allocator, size_t limit); + ~TcpStream(); + + Socket* socket() { return &socket_; } + + std::string ToString() const; + + static const rpc::Protocol* StaticProtocol(); + static StreamFactoryPtr Factory(); + + private: + CHECKED_STATUS Start(bool connect, ev::loop_ref* loop, StreamContext* context) override; + void Close() override; + void Shutdown(const Status& status) override; + void Send(OutboundDataPtr data) override; + CHECKED_STATUS TryWrite() override; + + bool Idle(std::string* reason_not_idle) override; + bool IsConnected() override { return connected_; } + void DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) override; + + const Endpoint& Remote() override { return remote_; } + const Endpoint& Local() override { return local_; } + + const Protocol* GetProtocol() override { + return StaticProtocol(); + } + + void ParseReceived() override; + + CHECKED_STATUS DoWrite(); + void HandleOutcome(const Status& status, bool enqueue); + void ClearSending(const Status& status); + + void Handler(ev::io& watcher, int revents); // NOLINT + CHECKED_STATUS ReadHandler(); + CHECKED_STATUS WriteHandler(bool just_connected); + + Result Receive(); + // Try to parse received data and process it. + Result TryProcessReceived(); + + // Updates listening events. + void UpdateEvents(); + + const std::string& LogPrefix() const; + + // The socket we're communicating on. + Socket socket_; + + // The remote address we're talking from. + Endpoint local_; + + // The remote address we're talking to. + const Endpoint remote_; + + StreamContext* context_; + + mutable std::string log_prefix_; + + // Notifies us when our socket is readable or writable. + ev::io io_; + + // Set to true when the connection is registered on a loop. + // This is used for a sanity check in the destructor that we are properly + // un-registered before shutting down. + bool is_epoll_registered_ = false; + + bool connected_ = false; + + // Data received on this connection that has not been processed yet. + GrowableBuffer read_buffer_; + bool read_buffer_full_ = false; + + // sending_* contain bytes and calls we are currently sending to socket + std::deque sending_; + std::deque sending_outbound_datas_; + size_t send_position_ = 0; + bool waiting_write_ready_ = false; +}; + +} // namespace rpc +} // namespace yb + +#endif // YB_RPC_TCP_STREAM_H diff --git a/src/yb/util/logging.h b/src/yb/util/logging.h index 2c9b9f82cfc1..1d9884ee4935 100644 --- a/src/yb/util/logging.h +++ b/src/yb/util/logging.h @@ -77,6 +77,9 @@ __FILE__, __LINE__, google::GLOG_ ## severity, num_suppressed, \ &google::LogMessage::SendToLog).stream() +#define YB_LOG_WITH_PREFIX_EVERY_N_SECS(severity, n_secs) \ + YB_LOG_EVERY_N_SECS(severify, n, secs) << LogPrefix() + namespace yb { enum PRIVATE_ThrottleMsg {THROTTLE_MSG}; } // namespace yb @@ -135,6 +138,8 @@ enum PRIVATE_ThrottleMsg {THROTTLE_MSG}; INVALID_REQUESTED_LOG_SEVERITY); \ YB_SOME_KIND_OF_LOG_EVERY_N(severity, (n), google::LogMessage::SendToLog) +#define YB_LOG_WITH_PREFIX_EVERY_N(severity, n) YB_LOG_EVERY_N(severity, n) << LogPrefix() + #define YB_SYSLOG_EVERY_N(severity, n) \ YB_SOME_KIND_OF_LOG_EVERY_N(severity, (n), google::LogMessage::SendToSyslogAndLog) @@ -252,7 +257,8 @@ std::ostream& operator<<(std::ostream &os, const PRIVATE_ThrottleMsg&); // Same as the above, but obtain the lock. #define LOG_WITH_PREFIX(severity) LOG(severity) << LogPrefix() -#define VLOG_WITH_PREFIX(verboselevel) LOG_IF(INFO, VLOG_IS_ON(verboselevel)) << LogPrefix() +#define VLOG_WITH_PREFIX(verboselevel) VLOG(verboselevel) << LogPrefix() +#define DVLOG_WITH_PREFIX(verboselevel) DVLOG(verboselevel) << LogPrefix() #define LOG_IF_WITH_PREFIX(severity, condition) LOG_IF(severity, condition) << LogPrefix() // DCHECK_ONLY_NOTNULL is like DCHECK_NOTNULL, but does not result in an unused expression in diff --git a/src/yb/util/result.h b/src/yb/util/result.h index be8c776b01a5..157791ded820 100644 --- a/src/yb/util/result.h +++ b/src/yb/util/result.h @@ -363,6 +363,10 @@ ResultToStatusAdaptor ResultToStatus(const Functor& functor) { #define ASSERT_RESULT(expr) \ __extension__ ({ auto&& __result = (expr); ASSERT_OK(__result); std::move(*__result); }) +// Asserts that result is ok, extracts result value is case of success. +#define EXPECT_RESULT(expr) \ + __extension__ ({ auto&& __result = (expr); EXPECT_OK(__result); std::move(*__result); }) + // Asserts that result is ok, extracts result value is case of success. #define ASSERT_RESULT_FAST(expr) \ __extension__ ({ auto&& __result = (expr); ASSERT_OK_FAST(__result); std::move(*__result); }) diff --git a/src/yb/util/strongly_typed_bool.h b/src/yb/util/strongly_typed_bool.h index 96f97f349866..1443d1faf440 100644 --- a/src/yb/util/strongly_typed_bool.h +++ b/src/yb/util/strongly_typed_bool.h @@ -16,6 +16,8 @@ #include +#include + // A "strongly-typed boolean" tool. This is needed to prevent passing the wrong boolean as a // function parameter, and to make callsites more readable by enforcing that MyBooleanType::kTrue or // MyBooleanType::kFalse is specified instead of kTrue, kFalse. Conversion from strongly-typed bools @@ -40,7 +42,8 @@ class StronglyTypedBool { // These operators return regular bools so that it is easy to use strongly-typed bools in logical // expressions. operator bool() const { return value_; } - bool operator!() const { return StronglyTypedBool(!value_); } + StronglyTypedBool operator!() const { return StronglyTypedBool(!value_); } + bool get() const { return value_; } private: bool value_; diff --git a/src/yb/util/tostring.h b/src/yb/util/tostring.h index 66d76e91f3e8..2e9a3badbf38 100644 --- a/src/yb/util/tostring.h +++ b/src/yb/util/tostring.h @@ -64,7 +64,8 @@ namespace yb { typedef int Yes; \ typedef struct { Yes array[2]; } No; \ typedef typename std::remove_reference::type StrippedT; \ - template static Yes Test(decltype(static_cast(nullptr)->function())*); \ + template static Yes Test(typename std::remove_reference< \ + decltype(static_cast(nullptr)->function())>::type*); \ template static No Test(...); \ static const bool value = sizeof(Yes) == sizeof(Test(nullptr)); \ }; @@ -135,7 +136,7 @@ class PointerToString { }; template <> -class PointerToString { +class PointerToString { public: static std::string Apply(const void* ptr) { if (ptr) { @@ -150,6 +151,14 @@ class PointerToString { } }; +template <> +class PointerToString { + public: + static std::string Apply(const void* ptr) { + return PointerToString::Apply(ptr); + } +}; + // This class is used to determine whether T is similar to pointer. // We suppose that if class provides * and -> operators so it is pointer. template