Skip to content

Commit

Permalink
Merge pull request #2005 from bitshares/release
Browse files Browse the repository at this point in the history
Release -> master
  • Loading branch information
abitmore authored Sep 23, 2019
2 parents ca8e352 + 5f228ea commit 1976d20
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 45 deletions.
5 changes: 5 additions & 0 deletions libraries/net/include/graphene/net/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ namespace graphene { namespace net {
{
boost::endian::little_uint32_buf_t size; // number of bytes in message, capped at MAX_MESSAGE_SIZE
boost::endian::little_uint32_buf_t msg_type; // every channel gets a 16 bit message type specifier
message_header()
{
size = 0;
msg_type = 0;
}
};

typedef fc::uint160_t message_hash_type;
Expand Down
43 changes: 21 additions & 22 deletions libraries/net/include/graphene/net/peer_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,25 +164,25 @@ namespace graphene { namespace net
};


size_t _total_queued_messages_size;
size_t _total_queued_messages_size = 0;
std::queue<std::unique_ptr<queued_message>, std::list<std::unique_ptr<queued_message> > > _queued_messages;
fc::future<void> _send_queued_messages_done;
public:
fc::time_point connection_initiation_time;
fc::time_point connection_closed_time;
fc::time_point connection_terminated_time;
peer_connection_direction direction;
peer_connection_direction direction = peer_connection_direction::unknown;
//connection_state state;
firewalled_state is_firewalled;
firewalled_state is_firewalled = firewalled_state::unknown;
fc::microseconds clock_offset;
fc::microseconds round_trip_delay;

our_connection_state our_state;
bool they_have_requested_close;
their_connection_state their_state;
bool we_have_requested_close;
our_connection_state our_state = our_connection_state::disconnected;
bool they_have_requested_close = false;
their_connection_state their_state = their_connection_state::disconnected;
bool we_have_requested_close = false;

connection_negotiation_status negotiation_status;
connection_negotiation_status negotiation_status = connection_negotiation_status::disconnected;
fc::oexception connection_closed_error;

fc::time_point get_connection_time()const { return _message_connection.get_connection_time(); }
Expand All @@ -197,7 +197,7 @@ namespace graphene { namespace net
* from the user_data field of the hello, or if none is present it will be filled with a
* copy of node_public_key */
node_id_t node_id;
uint32_t core_protocol_version;
uint32_t core_protocol_version = 0;
std::string user_agent;
fc::optional<std::string> graphene_git_revision_sha;
fc::optional<fc::time_point_sec> graphene_git_revision_unix_timestamp;
Expand All @@ -210,8 +210,8 @@ namespace graphene { namespace net
// its hello message. For outbound, they record what we sent the peer
// in our hello message
fc::ip::address inbound_address;
uint16_t inbound_port;
uint16_t outbound_port;
uint16_t inbound_port = 0;
uint16_t outbound_port = 0;
/// @}

typedef std::unordered_map<item_id, fc::time_point> item_to_time_map_type;
Expand All @@ -220,15 +220,15 @@ namespace graphene { namespace net
/// @{
boost::container::deque<item_hash_t> ids_of_items_to_get; /// id of items in the blockchain that this peer has told us about
std::set<item_hash_t> ids_of_items_being_processed; /// list of all items this peer has offered use that we've already handed to the client but the client hasn't finished processing
uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids
bool peer_needs_sync_items_from_us;
bool we_need_sync_items_from_peer;
uint32_t number_of_unfetched_item_ids = 0; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids
bool peer_needs_sync_items_from_us = false;
bool we_need_sync_items_from_peer = false;
fc::optional<boost::tuple<std::vector<item_hash_t>, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy()
fc::time_point last_sync_item_received_time; /// the time we received the last sync item or the time we sent the last batch of sync item requests to this peer
std::set<item_hash_t> sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects
item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows
fc::time_point_sec last_block_time_delegate_has_seen;
bool inhibit_fetching_sync_blocks;
bool inhibit_fetching_sync_blocks = false;
/// @}

/// non-synchronization state data
Expand Down Expand Up @@ -258,18 +258,17 @@ namespace graphene { namespace net
// blockchain catch up
fc::time_point transaction_fetching_inhibited_until;

uint32_t last_known_fork_block_number;
uint32_t last_known_fork_block_number = 0;

fc::future<void> accept_or_connect_task_done;

firewall_check_state_data *firewall_check_state;
#ifndef NDEBUG
firewall_check_state_data *firewall_check_state = nullptr;
private:
fc::thread* _thread;
unsigned _send_message_queue_tasks_running; // temporary debugging
#ifndef NDEBUG
fc::thread* _thread = nullptr;
unsigned _send_message_queue_tasks_running = 0; // temporary debugging
#endif
bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system
private:
bool _currently_handling_message = false; // true while we're in the middle of handling a message from the remote system
peer_connection(peer_connection_delegate* delegate);
void destroy();
public:
Expand Down
2 changes: 0 additions & 2 deletions libraries/net/include/graphene/net/stcp_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
#include <fc/crypto/aes.hpp>
#include <fc/crypto/elliptic.hpp>

#include <array>

namespace graphene { namespace net {

/**
Expand Down
46 changes: 32 additions & 14 deletions libraries/net/message_oriented_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <graphene/net/stcp_socket.hpp>
#include <graphene/net/config.hpp>

#include <atomic>

#ifdef DEFAULT_LOGGER
# undef DEFAULT_LOGGER
#endif
Expand All @@ -52,6 +54,7 @@ namespace graphene { namespace net {
message_oriented_connection* _self;
message_oriented_connection_delegate *_delegate;
stcp_socket _sock;
fc::promise<void>::ptr _ready_for_sending;
fc::future<void> _read_loop_done;
uint64_t _bytes_received;
uint64_t _bytes_sent;
Expand All @@ -60,8 +63,8 @@ namespace graphene { namespace net {
fc::time_point _last_message_received_time;
fc::time_point _last_message_sent_time;

bool _send_message_in_progress;

std::atomic_bool _send_message_in_progress;
std::atomic_bool _read_loop_in_progress;
#ifndef NDEBUG
fc::thread* _thread;
#endif
Expand Down Expand Up @@ -95,9 +98,11 @@ namespace graphene { namespace net {
message_oriented_connection_delegate* delegate)
: _self(self),
_delegate(delegate),
_ready_for_sending(fc::promise<void>::create()),
_bytes_received(0),
_bytes_sent(0),
_send_message_in_progress(false)
_send_message_in_progress(false),
_read_loop_in_progress(false)
#ifndef NDEBUG
,_thread(&fc::thread::current())
#endif
Expand All @@ -121,6 +126,7 @@ namespace graphene { namespace net {
_sock.accept();
assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops
_read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop");
_ready_for_sending->set_value();
}

void message_oriented_connection_impl::connect_to(const fc::ip::endpoint& remote_endpoint)
Expand All @@ -129,6 +135,7 @@ namespace graphene { namespace net {
_sock.connect_to(remote_endpoint);
assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops
_read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop");
_ready_for_sending->set_value();
}

void message_oriented_connection_impl::bind(const fc::ip::endpoint& local_endpoint)
Expand All @@ -137,6 +144,20 @@ namespace graphene { namespace net {
_sock.bind(local_endpoint);
}

class no_parallel_execution_guard final
{
std::atomic_bool* _flag;
public:
explicit no_parallel_execution_guard(std::atomic_bool* flag) : _flag(flag)
{
bool expected = false;
FC_ASSERT( flag->compare_exchange_strong( expected, true ), "Only one thread at time can visit it");
}
~no_parallel_execution_guard()
{
*_flag = false;
}
};

void message_oriented_connection_impl::read_loop()
{
Expand All @@ -145,6 +166,8 @@ namespace graphene { namespace net {
const int LEFTOVER = BUFFER_SIZE - sizeof(message_header);
static_assert(BUFFER_SIZE >= sizeof(message_header), "insufficient buffer");

no_parallel_execution_guard guard( &_read_loop_in_progress );

_connected_time = fc::time_point::now();

fc::oexception exception_to_rethrow;
Expand Down Expand Up @@ -240,17 +263,8 @@ namespace graphene { namespace net {
} send_message_scope_logger(remote_endpoint);
#endif
#endif
struct verify_no_send_in_progress {
bool& var;
verify_no_send_in_progress(bool& var) : var(var)
{
if (var)
elog("Error: two tasks are calling message_oriented_connection::send_message() at the same time");
assert(!var);
var = true;
}
~verify_no_send_in_progress() { var = false; }
} _verify_no_send_in_progress(_send_message_in_progress);
no_parallel_execution_guard guard( &_send_message_in_progress );
_ready_for_sending->wait();

try
{
Expand All @@ -260,8 +274,11 @@ namespace graphene { namespace net {
//pad the message we send to a multiple of 16 bytes
size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16);
std::unique_ptr<char[]> padded_message(new char[size_with_padding]);

memcpy(padded_message.get(), (char*)&message_to_send, sizeof(message_header));
memcpy(padded_message.get() + sizeof(message_header), message_to_send.data.data(), message_to_send.size.value() );
char* padding_space = padded_message.get() + sizeof(message_header) + message_to_send.size.value();
memset(padding_space, 0, size_with_padding - size_of_message_and_header);
_sock.write(padded_message.get(), size_with_padding);
_sock.flush();
_bytes_sent += size_with_padding;
Expand Down Expand Up @@ -301,6 +318,7 @@ namespace graphene { namespace net {
{
wlog( "Exception thrown while canceling message_oriented_connection's read_loop, ignoring" );
}
_ready_for_sending->set_exception( std::make_shared<fc::canceled_exception>() );
}

uint64_t message_oriented_connection_impl::get_total_bytes_sent() const
Expand Down
10 changes: 5 additions & 5 deletions libraries/protocol/address.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ namespace graphene { namespace protocol {

address::operator std::string()const
{
std::array<char,24> bin_addr;
static_assert( bin_addr.size() >= sizeof(addr) + 4, "address size mismatch" );
memcpy( bin_addr.data(), addr.data(), sizeof(addr) );
char bin_addr[24];
static_assert( sizeof(bin_addr) >= sizeof(addr) + 4, "address size mismatch" );
memcpy( bin_addr, addr.data(), sizeof(addr) );
auto checksum = fc::ripemd160::hash( addr.data(), sizeof(addr) );
memcpy( bin_addr.data() + 20, (char*)&checksum._hash[0], 4 );
return GRAPHENE_ADDRESS_PREFIX + fc::to_base58( bin_addr.data(), bin_addr.size() );
memcpy( bin_addr + sizeof(addr), (char*)&checksum._hash[0], 4 );
return GRAPHENE_ADDRESS_PREFIX + fc::to_base58( bin_addr, sizeof(bin_addr) );
}

} } // namespace graphene::protocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace graphene { namespace protocol {

operator std::string()const; ///< converts to base58 + checksum

std::array<char,25> addr; ///< binary representation of address
std::array<char,25> addr{}; ///< binary representation of address, 0-initialized
};

inline bool operator == ( const pts_address& a, const pts_address& b ) { return a.addr == b.addr; }
Expand Down

0 comments on commit 1976d20

Please sign in to comment.