Skip to content

Commit

Permalink
Merge pull request #2002 from pmconrad/1999_padding
Browse files Browse the repository at this point in the history
Fixes for most of valgrind errors related to unintialized values
  • Loading branch information
pmconrad authored Sep 23, 2019
2 parents 44da6da + 91c4b4e commit 5f228ea
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 36 deletions.
5 changes: 5 additions & 0 deletions libraries/net/include/graphene/net/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ namespace graphene { namespace net {
{
boost::endian::little_uint32_buf_t size; // number of bytes in message, capped at MAX_MESSAGE_SIZE
boost::endian::little_uint32_buf_t msg_type; // every channel gets a 16 bit message type specifier
message_header()
{
size = 0;
msg_type = 0;
}
};

typedef fc::uint160_t message_hash_type;
Expand Down
43 changes: 21 additions & 22 deletions libraries/net/include/graphene/net/peer_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,25 +164,25 @@ namespace graphene { namespace net
};


size_t _total_queued_messages_size;
size_t _total_queued_messages_size = 0;
std::queue<std::unique_ptr<queued_message>, std::list<std::unique_ptr<queued_message> > > _queued_messages;
fc::future<void> _send_queued_messages_done;
public:
fc::time_point connection_initiation_time;
fc::time_point connection_closed_time;
fc::time_point connection_terminated_time;
peer_connection_direction direction;
peer_connection_direction direction = peer_connection_direction::unknown;
//connection_state state;
firewalled_state is_firewalled;
firewalled_state is_firewalled = firewalled_state::unknown;
fc::microseconds clock_offset;
fc::microseconds round_trip_delay;

our_connection_state our_state;
bool they_have_requested_close;
their_connection_state their_state;
bool we_have_requested_close;
our_connection_state our_state = our_connection_state::disconnected;
bool they_have_requested_close = false;
their_connection_state their_state = their_connection_state::disconnected;
bool we_have_requested_close = false;

connection_negotiation_status negotiation_status;
connection_negotiation_status negotiation_status = connection_negotiation_status::disconnected;
fc::oexception connection_closed_error;

fc::time_point get_connection_time()const { return _message_connection.get_connection_time(); }
Expand All @@ -197,7 +197,7 @@ namespace graphene { namespace net
* from the user_data field of the hello, or if none is present it will be filled with a
* copy of node_public_key */
node_id_t node_id;
uint32_t core_protocol_version;
uint32_t core_protocol_version = 0;
std::string user_agent;
fc::optional<std::string> graphene_git_revision_sha;
fc::optional<fc::time_point_sec> graphene_git_revision_unix_timestamp;
Expand All @@ -210,8 +210,8 @@ namespace graphene { namespace net
// its hello message. For outbound, they record what we sent the peer
// in our hello message
fc::ip::address inbound_address;
uint16_t inbound_port;
uint16_t outbound_port;
uint16_t inbound_port = 0;
uint16_t outbound_port = 0;
/// @}

typedef std::unordered_map<item_id, fc::time_point> item_to_time_map_type;
Expand All @@ -220,15 +220,15 @@ namespace graphene { namespace net
/// @{
boost::container::deque<item_hash_t> ids_of_items_to_get; /// id of items in the blockchain that this peer has told us about
std::set<item_hash_t> ids_of_items_being_processed; /// list of all items this peer has offered use that we've already handed to the client but the client hasn't finished processing
uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids
bool peer_needs_sync_items_from_us;
bool we_need_sync_items_from_peer;
uint32_t number_of_unfetched_item_ids = 0; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids
bool peer_needs_sync_items_from_us = false;
bool we_need_sync_items_from_peer = false;
fc::optional<boost::tuple<std::vector<item_hash_t>, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy()
fc::time_point last_sync_item_received_time; /// the time we received the last sync item or the time we sent the last batch of sync item requests to this peer
std::set<item_hash_t> sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects
item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows
fc::time_point_sec last_block_time_delegate_has_seen;
bool inhibit_fetching_sync_blocks;
bool inhibit_fetching_sync_blocks = false;
/// @}

/// non-synchronization state data
Expand Down Expand Up @@ -258,18 +258,17 @@ namespace graphene { namespace net
// blockchain catch up
fc::time_point transaction_fetching_inhibited_until;

uint32_t last_known_fork_block_number;
uint32_t last_known_fork_block_number = 0;

fc::future<void> accept_or_connect_task_done;

firewall_check_state_data *firewall_check_state;
#ifndef NDEBUG
firewall_check_state_data *firewall_check_state = nullptr;
private:
fc::thread* _thread;
unsigned _send_message_queue_tasks_running; // temporary debugging
#ifndef NDEBUG
fc::thread* _thread = nullptr;
unsigned _send_message_queue_tasks_running = 0; // temporary debugging
#endif
bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system
private:
bool _currently_handling_message = false; // true while we're in the middle of handling a message from the remote system
peer_connection(peer_connection_delegate* delegate);
void destroy();
public:
Expand Down
40 changes: 26 additions & 14 deletions libraries/net/message_oriented_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <graphene/net/stcp_socket.hpp>
#include <graphene/net/config.hpp>

#include <atomic>

#ifdef DEFAULT_LOGGER
# undef DEFAULT_LOGGER
#endif
Expand Down Expand Up @@ -61,8 +63,8 @@ namespace graphene { namespace net {
fc::time_point _last_message_received_time;
fc::time_point _last_message_sent_time;

bool _send_message_in_progress;

std::atomic_bool _send_message_in_progress;
std::atomic_bool _read_loop_in_progress;
#ifndef NDEBUG
fc::thread* _thread;
#endif
Expand Down Expand Up @@ -99,7 +101,8 @@ namespace graphene { namespace net {
_ready_for_sending(fc::promise<void>::create()),
_bytes_received(0),
_bytes_sent(0),
_send_message_in_progress(false)
_send_message_in_progress(false),
_read_loop_in_progress(false)
#ifndef NDEBUG
,_thread(&fc::thread::current())
#endif
Expand Down Expand Up @@ -141,6 +144,20 @@ namespace graphene { namespace net {
_sock.bind(local_endpoint);
}

class no_parallel_execution_guard final
{
std::atomic_bool* _flag;
public:
explicit no_parallel_execution_guard(std::atomic_bool* flag) : _flag(flag)
{
bool expected = false;
FC_ASSERT( flag->compare_exchange_strong( expected, true ), "Only one thread at time can visit it");
}
~no_parallel_execution_guard()
{
*_flag = false;
}
};

void message_oriented_connection_impl::read_loop()
{
Expand All @@ -149,6 +166,8 @@ namespace graphene { namespace net {
const int LEFTOVER = BUFFER_SIZE - sizeof(message_header);
static_assert(BUFFER_SIZE >= sizeof(message_header), "insufficient buffer");

no_parallel_execution_guard guard( &_read_loop_in_progress );

_connected_time = fc::time_point::now();

fc::oexception exception_to_rethrow;
Expand Down Expand Up @@ -244,17 +263,7 @@ namespace graphene { namespace net {
} send_message_scope_logger(remote_endpoint);
#endif
#endif
struct verify_no_send_in_progress {
bool& var;
verify_no_send_in_progress(bool& var) : var(var)
{
if (var)
elog("Error: two tasks are calling message_oriented_connection::send_message() at the same time");
assert(!var);
var = true;
}
~verify_no_send_in_progress() { var = false; }
} _verify_no_send_in_progress(_send_message_in_progress);
no_parallel_execution_guard guard( &_send_message_in_progress );
_ready_for_sending->wait();

try
Expand All @@ -265,8 +274,11 @@ namespace graphene { namespace net {
//pad the message we send to a multiple of 16 bytes
size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16);
std::unique_ptr<char[]> padded_message(new char[size_with_padding]);

memcpy(padded_message.get(), (char*)&message_to_send, sizeof(message_header));
memcpy(padded_message.get() + sizeof(message_header), message_to_send.data.data(), message_to_send.size.value() );
char* padding_space = padded_message.get() + sizeof(message_header) + message_to_send.size.value();
memset(padding_space, 0, size_with_padding - size_of_message_and_header);
_sock.write(padded_message.get(), size_with_padding);
_sock.flush();
_bytes_sent += size_with_padding;
Expand Down

0 comments on commit 5f228ea

Please sign in to comment.