diff --git a/libraries/fc b/libraries/fc index 7a4758671f..6d8d0307a2 160000 --- a/libraries/fc +++ b/libraries/fc @@ -1 +1 @@ -Subproject commit 7a4758671ff3635a6493ea5a6206b570beda976a +Subproject commit 6d8d0307a2058665fea242bb425f9dc014cbeaa3 diff --git a/libraries/net/include/graphene/net/message.hpp b/libraries/net/include/graphene/net/message.hpp index cfff380f9b..6d254b6120 100644 --- a/libraries/net/include/graphene/net/message.hpp +++ b/libraries/net/include/graphene/net/message.hpp @@ -44,6 +44,11 @@ namespace graphene { namespace net { { boost::endian::little_uint32_buf_t size; // number of bytes in message, capped at MAX_MESSAGE_SIZE boost::endian::little_uint32_buf_t msg_type; // every channel gets a 16 bit message type specifier + message_header() + { + size = 0; + msg_type = 0; + } }; typedef fc::uint160_t message_hash_type; diff --git a/libraries/net/include/graphene/net/peer_connection.hpp b/libraries/net/include/graphene/net/peer_connection.hpp index dd9d6eb774..a00e43dcbf 100644 --- a/libraries/net/include/graphene/net/peer_connection.hpp +++ b/libraries/net/include/graphene/net/peer_connection.hpp @@ -164,25 +164,25 @@ namespace graphene { namespace net }; - size_t _total_queued_messages_size; + size_t _total_queued_messages_size = 0; std::queue, std::list > > _queued_messages; fc::future _send_queued_messages_done; public: fc::time_point connection_initiation_time; fc::time_point connection_closed_time; fc::time_point connection_terminated_time; - peer_connection_direction direction; + peer_connection_direction direction = peer_connection_direction::unknown; //connection_state state; - firewalled_state is_firewalled; + firewalled_state is_firewalled = firewalled_state::unknown; fc::microseconds clock_offset; fc::microseconds round_trip_delay; - our_connection_state our_state; - bool they_have_requested_close; - their_connection_state their_state; - bool we_have_requested_close; + our_connection_state our_state = our_connection_state::disconnected; + bool they_have_requested_close = false; + their_connection_state their_state = their_connection_state::disconnected; + bool we_have_requested_close = false; - connection_negotiation_status negotiation_status; + connection_negotiation_status negotiation_status = connection_negotiation_status::disconnected; fc::oexception connection_closed_error; fc::time_point get_connection_time()const { return _message_connection.get_connection_time(); } @@ -197,7 +197,7 @@ namespace graphene { namespace net * from the user_data field of the hello, or if none is present it will be filled with a * copy of node_public_key */ node_id_t node_id; - uint32_t core_protocol_version; + uint32_t core_protocol_version = 0; std::string user_agent; fc::optional graphene_git_revision_sha; fc::optional graphene_git_revision_unix_timestamp; @@ -210,8 +210,8 @@ namespace graphene { namespace net // its hello message. For outbound, they record what we sent the peer // in our hello message fc::ip::address inbound_address; - uint16_t inbound_port; - uint16_t outbound_port; + uint16_t inbound_port = 0; + uint16_t outbound_port = 0; /// @} typedef std::unordered_map item_to_time_map_type; @@ -220,15 +220,15 @@ namespace graphene { namespace net /// @{ boost::container::deque ids_of_items_to_get; /// id of items in the blockchain that this peer has told us about std::set ids_of_items_being_processed; /// list of all items this peer has offered use that we've already handed to the client but the client hasn't finished processing - uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids - bool peer_needs_sync_items_from_us; - bool we_need_sync_items_from_peer; + uint32_t number_of_unfetched_item_ids = 0; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids + bool peer_needs_sync_items_from_us = false; + bool we_need_sync_items_from_peer = false; fc::optional, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy() fc::time_point last_sync_item_received_time; /// the time we received the last sync item or the time we sent the last batch of sync item requests to this peer std::set sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows fc::time_point_sec last_block_time_delegate_has_seen; - bool inhibit_fetching_sync_blocks; + bool inhibit_fetching_sync_blocks = false; /// @} /// non-synchronization state data @@ -258,18 +258,17 @@ namespace graphene { namespace net // blockchain catch up fc::time_point transaction_fetching_inhibited_until; - uint32_t last_known_fork_block_number; + uint32_t last_known_fork_block_number = 0; fc::future accept_or_connect_task_done; - firewall_check_state_data *firewall_check_state; -#ifndef NDEBUG + firewall_check_state_data *firewall_check_state = nullptr; private: - fc::thread* _thread; - unsigned _send_message_queue_tasks_running; // temporary debugging +#ifndef NDEBUG + fc::thread* _thread = nullptr; + unsigned _send_message_queue_tasks_running = 0; // temporary debugging #endif - bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system - private: + bool _currently_handling_message = false; // true while we're in the middle of handling a message from the remote system peer_connection(peer_connection_delegate* delegate); void destroy(); public: diff --git a/libraries/net/include/graphene/net/stcp_socket.hpp b/libraries/net/include/graphene/net/stcp_socket.hpp index fb4bfbc72b..ff0ea337ed 100644 --- a/libraries/net/include/graphene/net/stcp_socket.hpp +++ b/libraries/net/include/graphene/net/stcp_socket.hpp @@ -26,8 +26,6 @@ #include #include -#include - namespace graphene { namespace net { /** diff --git a/libraries/net/message_oriented_connection.cpp b/libraries/net/message_oriented_connection.cpp index de3cf0875b..b62651fa76 100644 --- a/libraries/net/message_oriented_connection.cpp +++ b/libraries/net/message_oriented_connection.cpp @@ -32,6 +32,8 @@ #include #include +#include + #ifdef DEFAULT_LOGGER # undef DEFAULT_LOGGER #endif @@ -52,6 +54,7 @@ namespace graphene { namespace net { message_oriented_connection* _self; message_oriented_connection_delegate *_delegate; stcp_socket _sock; + fc::promise::ptr _ready_for_sending; fc::future _read_loop_done; uint64_t _bytes_received; uint64_t _bytes_sent; @@ -60,8 +63,8 @@ namespace graphene { namespace net { fc::time_point _last_message_received_time; fc::time_point _last_message_sent_time; - bool _send_message_in_progress; - + std::atomic_bool _send_message_in_progress; + std::atomic_bool _read_loop_in_progress; #ifndef NDEBUG fc::thread* _thread; #endif @@ -95,9 +98,11 @@ namespace graphene { namespace net { message_oriented_connection_delegate* delegate) : _self(self), _delegate(delegate), + _ready_for_sending(fc::promise::create()), _bytes_received(0), _bytes_sent(0), - _send_message_in_progress(false) + _send_message_in_progress(false), + _read_loop_in_progress(false) #ifndef NDEBUG ,_thread(&fc::thread::current()) #endif @@ -121,6 +126,7 @@ namespace graphene { namespace net { _sock.accept(); assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops _read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop"); + _ready_for_sending->set_value(); } void message_oriented_connection_impl::connect_to(const fc::ip::endpoint& remote_endpoint) @@ -129,6 +135,7 @@ namespace graphene { namespace net { _sock.connect_to(remote_endpoint); assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops _read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop"); + _ready_for_sending->set_value(); } void message_oriented_connection_impl::bind(const fc::ip::endpoint& local_endpoint) @@ -137,6 +144,20 @@ namespace graphene { namespace net { _sock.bind(local_endpoint); } + class no_parallel_execution_guard final + { + std::atomic_bool* _flag; + public: + explicit no_parallel_execution_guard(std::atomic_bool* flag) : _flag(flag) + { + bool expected = false; + FC_ASSERT( flag->compare_exchange_strong( expected, true ), "Only one thread at time can visit it"); + } + ~no_parallel_execution_guard() + { + *_flag = false; + } + }; void message_oriented_connection_impl::read_loop() { @@ -145,6 +166,8 @@ namespace graphene { namespace net { const int LEFTOVER = BUFFER_SIZE - sizeof(message_header); static_assert(BUFFER_SIZE >= sizeof(message_header), "insufficient buffer"); + no_parallel_execution_guard guard( &_read_loop_in_progress ); + _connected_time = fc::time_point::now(); fc::oexception exception_to_rethrow; @@ -240,17 +263,8 @@ namespace graphene { namespace net { } send_message_scope_logger(remote_endpoint); #endif #endif - struct verify_no_send_in_progress { - bool& var; - verify_no_send_in_progress(bool& var) : var(var) - { - if (var) - elog("Error: two tasks are calling message_oriented_connection::send_message() at the same time"); - assert(!var); - var = true; - } - ~verify_no_send_in_progress() { var = false; } - } _verify_no_send_in_progress(_send_message_in_progress); + no_parallel_execution_guard guard( &_send_message_in_progress ); + _ready_for_sending->wait(); try { @@ -260,8 +274,11 @@ namespace graphene { namespace net { //pad the message we send to a multiple of 16 bytes size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16); std::unique_ptr padded_message(new char[size_with_padding]); + memcpy(padded_message.get(), (char*)&message_to_send, sizeof(message_header)); memcpy(padded_message.get() + sizeof(message_header), message_to_send.data.data(), message_to_send.size.value() ); + char* padding_space = padded_message.get() + sizeof(message_header) + message_to_send.size.value(); + memset(padding_space, 0, size_with_padding - size_of_message_and_header); _sock.write(padded_message.get(), size_with_padding); _sock.flush(); _bytes_sent += size_with_padding; @@ -301,6 +318,7 @@ namespace graphene { namespace net { { wlog( "Exception thrown while canceling message_oriented_connection's read_loop, ignoring" ); } + _ready_for_sending->set_exception( std::make_shared() ); } uint64_t message_oriented_connection_impl::get_total_bytes_sent() const diff --git a/libraries/protocol/address.cpp b/libraries/protocol/address.cpp index a826289a1e..06e1946d1b 100644 --- a/libraries/protocol/address.cpp +++ b/libraries/protocol/address.cpp @@ -89,12 +89,12 @@ namespace graphene { namespace protocol { address::operator std::string()const { - std::array bin_addr; - static_assert( bin_addr.size() >= sizeof(addr) + 4, "address size mismatch" ); - memcpy( bin_addr.data(), addr.data(), sizeof(addr) ); + char bin_addr[24]; + static_assert( sizeof(bin_addr) >= sizeof(addr) + 4, "address size mismatch" ); + memcpy( bin_addr, addr.data(), sizeof(addr) ); auto checksum = fc::ripemd160::hash( addr.data(), sizeof(addr) ); - memcpy( bin_addr.data() + 20, (char*)&checksum._hash[0], 4 ); - return GRAPHENE_ADDRESS_PREFIX + fc::to_base58( bin_addr.data(), bin_addr.size() ); + memcpy( bin_addr + sizeof(addr), (char*)&checksum._hash[0], 4 ); + return GRAPHENE_ADDRESS_PREFIX + fc::to_base58( bin_addr, sizeof(bin_addr) ); } } } // namespace graphene::protocol diff --git a/libraries/protocol/include/graphene/protocol/pts_address.hpp b/libraries/protocol/include/graphene/protocol/pts_address.hpp index 8a30a44bfa..0a6e4cccea 100644 --- a/libraries/protocol/include/graphene/protocol/pts_address.hpp +++ b/libraries/protocol/include/graphene/protocol/pts_address.hpp @@ -49,7 +49,7 @@ namespace graphene { namespace protocol { operator std::string()const; ///< converts to base58 + checksum - std::array addr; ///< binary representation of address + std::array addr{}; ///< binary representation of address, 0-initialized }; inline bool operator == ( const pts_address& a, const pts_address& b ) { return a.addr == b.addr; }