Net¶
(coming soon…)
File directory: (../ibraries/net/xxx.cpp)
File directory: (../ibraries/net/include/graphene/net/xxx.hpp)
Table of Contents
config¶
#pragma once
#define GRAPHENE_NET_PROTOCOL_VERSION 106
/**
* Define this to enable debugging code in the p2p network interface.
* This is code that would never be executed in normal operation, but is
* used for automated testing (creating artificial net splits,
* tracking where messages came from and when)
*/
#define ENABLE_P2P_DEBUGGING_API 1
/**
* 2MiB
*/
#define MAX_MESSAGE_SIZE 1024*1024*2
#define GRAPHENE_NET_DEFAULT_PEER_CONNECTION_RETRY_TIME 30 // seconds
/**
* AFter trying all peers, how long to wait before we check to
* see if there are peers we can try again.
*/
#define GRAPHENE_PEER_DATABASE_RETRY_DELAY 15 // seconds
#define GRAPHENE_NET_PEER_HANDSHAKE_INACTIVITY_TIMEOUT 5
#define GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT 20
/* uncomment next line to use testnet seed ip and port */
//#define GRAPHENE_TEST_NETWORK 1
#define GRAPHENE_NET_TEST_SEED_IP "104.236.44.210" // autogenerated
#define GRAPHENE_NET_TEST_P2P_PORT 1700
#define GRAPHENE_NET_DEFAULT_P2P_PORT 1776
#define GRAPHENE_NET_DEFAULT_DESIRED_CONNECTIONS 20
#define GRAPHENE_NET_DEFAULT_MAX_CONNECTIONS 200
#define GRAPHENE_NET_MAXIMUM_QUEUED_MESSAGES_IN_BYTES (1024 * 1024)
/**
* When we receive a message from the network, we advertise it to
* our peers and save a copy in a cache were we will find it if
* a peer requests it. We expire out old items out of the cache
* after this number of blocks go by.
*
* Recently lowered from 30 to match the default expiration time
* the web wallet imposes on transactions.
*/
#define GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS 5
/**
* We prevent a peer from offering us a list of blocks which, if we fetched them
* all, would result in a blockchain that extended into the future.
* This parameter gives us some wiggle room, allowing a peer to give us blocks
* that would put our blockchain up to an hour in the future, just in case
* our clock is a bit off.
*/
#define GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC (60 * 60)
#define GRAPHENE_NET_MAX_INVENTORY_SIZE_IN_MINUTES 2
#define GRAPHENE_NET_MAX_BLOCKS_PER_PEER_DURING_SYNCING 200
/**
* During normal operation, how many items will be fetched from each
* peer at a time. This will only come into play when the network
* is being flooded -- typically transactions will be fetched as soon
* as we find out about them, so only one item will be requested
* at a time.
*
* No tests have been done to find the optimal value for this
* parameter, so consider increasing or decreasing it if performance
* during flooding is lacking.
*/
#define GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION 1
/**
* Instead of fetching all item IDs from a peer, then fetching all blocks
* from a peer, we will interleave them. Fetch at least this many block IDs,
* then switch into block-fetching mode until the number of blocks we know about
* but haven't yet fetched drops below this
*/
#define GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH 10000
#define GRAPHENE_NET_MAX_TRX_PER_SECOND 1000
#define GRAPHENE_NET_MAX_NESTED_OBJECTS (250)
#define MAXIMUM_PEERDB_SIZE 1000
core_messages¶
using graphene::chain::signed_transaction;
using graphene::chain::block_id_type;
using graphene::chain::transaction_id_type;
using graphene::chain::signed_block;
typedef fc::ecc::public_key_data node_id_t;
typedef fc::ripemd160 item_hash_t;
struct item_id
{
uint32_t item_type;
item_hash_t item_hash;
item_id() {}
item_id(uint32_t type, const item_hash_t& hash) :
item_type(type),
item_hash(hash)
{}
bool operator==(const item_id& other) const
{
return item_type == other.item_type &&
item_hash == other.item_hash;
}
};
enum core_message_type_enum
{
trx_message_type = 1000,
block_message_type = 1001,
core_message_type_first = 5000,
item_ids_inventory_message_type = 5001,
blockchain_item_ids_inventory_message_type = 5002,
fetch_blockchain_item_ids_message_type = 5003,
fetch_items_message_type = 5004,
item_not_available_message_type = 5005,
hello_message_type = 5006,
connection_accepted_message_type = 5007,
connection_rejected_message_type = 5008,
address_request_message_type = 5009,
address_message_type = 5010,
closing_connection_message_type = 5011,
current_time_request_message_type = 5012,
current_time_reply_message_type = 5013,
check_firewall_message_type = 5014,
check_firewall_reply_message_type = 5015,
get_current_connections_request_message_type = 5016,
get_current_connections_reply_message_type = 5017,
core_message_type_last = 5099
};
const uint32_t core_protocol_version = GRAPHENE_NET_PROTOCOL_VERSION;
trx_message¶
struct trx_message
{
static const core_message_type_enum type;
signed_transaction trx;
trx_message() {}
trx_message(signed_transaction transaction) :
trx(std::move(transaction))
{}
};
block_message¶
struct block_message
{
static const core_message_type_enum type;
block_message(){}
block_message(const signed_block& blk )
:block(blk),block_id(blk.id()){}
signed_block block;
block_id_type block_id;
};
item_ids_inventory_message¶
struct item_ids_inventory_message
{
static const core_message_type_enum type;
uint32_t item_type;
std::vector<item_hash_t> item_hashes_available;
item_ids_inventory_message() {}
item_ids_inventory_message(uint32_t item_type, const std::vector<item_hash_t>& item_hashes_available) :
item_type(item_type),
item_hashes_available(item_hashes_available)
{}
};
blockchain_item_ids_inventory_message¶
struct blockchain_item_ids_inventory_message
{
static const core_message_type_enum type;
uint32_t total_remaining_item_count;
uint32_t item_type;
std::vector<item_hash_t> item_hashes_available;
blockchain_item_ids_inventory_message() {}
blockchain_item_ids_inventory_message(uint32_t total_remaining_item_count,
uint32_t item_type,
const std::vector<item_hash_t>& item_hashes_available) :
total_remaining_item_count(total_remaining_item_count),
item_type(item_type),
item_hashes_available(item_hashes_available)
{}
};
fetch_blockchain_item_ids_message¶
struct fetch_blockchain_item_ids_message
{
static const core_message_type_enum type;
uint32_t item_type;
std::vector<item_hash_t> blockchain_synopsis;
fetch_blockchain_item_ids_message() {}
fetch_blockchain_item_ids_message(uint32_t item_type, const std::vector<item_hash_t>& blockchain_synopsis) :
item_type(item_type),
blockchain_synopsis(blockchain_synopsis)
{}
};
fetch_items_message¶
struct fetch_items_message
{
static const core_message_type_enum type;
uint32_t item_type;
std::vector<item_hash_t> items_to_fetch;
fetch_items_message() {}
fetch_items_message(uint32_t item_type, const std::vector<item_hash_t>& items_to_fetch) :
item_type(item_type),
items_to_fetch(items_to_fetch)
{}
};
item_not_available_message¶
struct item_not_available_message
{
static const core_message_type_enum type;
item_id requested_item;
item_not_available_message() {}
item_not_available_message(const item_id& requested_item) :
requested_item(requested_item)
{}
};
hello_message¶
struct hello_message
{
static const core_message_type_enum type;
std::string user_agent;
uint32_t core_protocol_version;
fc::ip::address inbound_address;
uint16_t inbound_port;
uint16_t outbound_port;
node_id_t node_public_key;
fc::ecc::compact_signature signed_shared_secret;
fc::sha256 chain_id;
fc::variant_object user_data;
hello_message() {}
hello_message(const std::string& user_agent,
uint32_t core_protocol_version,
const fc::ip::address& inbound_address,
uint16_t inbound_port,
uint16_t outbound_port,
const node_id_t& node_public_key,
const fc::ecc::compact_signature& signed_shared_secret,
const fc::sha256& chain_id_arg,
const fc::variant_object& user_data ) :
user_agent(user_agent),
core_protocol_version(core_protocol_version),
inbound_address(inbound_address),
inbound_port(inbound_port),
outbound_port(outbound_port),
node_public_key(node_public_key),
signed_shared_secret(signed_shared_secret),
chain_id(chain_id_arg),
user_data(user_data)
{}
};
connection_accepted_message¶
struct connection_accepted_message
{
static const core_message_type_enum type;
connection_accepted_message() {}
};
enum class rejection_reason_code { unspecified,
different_chain,
already_connected,
connected_to_self,
not_accepting_connections,
blocked,
invalid_hello_message,
client_too_old };
connection_rejected_message¶
struct connection_rejected_message
{
static const core_message_type_enum type;
std::string user_agent;
uint32_t core_protocol_version;
fc::ip::endpoint remote_endpoint;
std::string reason_string;
fc::enum_type<uint8_t, rejection_reason_code> reason_code;
connection_rejected_message() {}
connection_rejected_message(const std::string& user_agent, uint32_t core_protocol_version,
const fc::ip::endpoint& remote_endpoint, rejection_reason_code reason_code,
const std::string& reason_string) :
user_agent(user_agent),
core_protocol_version(core_protocol_version),
remote_endpoint(remote_endpoint),
reason_string(reason_string),
reason_code(reason_code)
{}
};
address_request_message¶
struct address_request_message
{
static const core_message_type_enum type;
address_request_message() {}
};
enum class peer_connection_direction { unknown, inbound, outbound };
enum class firewalled_state { unknown, firewalled, not_firewalled };
address_info¶
struct address_info
{
fc::ip::endpoint remote_endpoint;
fc::time_point_sec last_seen_time;
fc::microseconds latency;
node_id_t node_id;
fc::enum_type<uint8_t, peer_connection_direction> direction;
fc::enum_type<uint8_t, firewalled_state> firewalled;
address_info() {}
address_info(const fc::ip::endpoint& remote_endpoint,
const fc::time_point_sec last_seen_time,
const fc::microseconds latency,
const node_id_t& node_id,
peer_connection_direction direction,
firewalled_state firewalled) :
remote_endpoint(remote_endpoint),
last_seen_time(last_seen_time),
latency(latency),
node_id(node_id),
direction(direction),
firewalled(firewalled)
{}
};
address_message¶
struct address_message
{
static const core_message_type_enum type;
std::vector<address_info> addresses;
};
closing_connection_message¶
struct closing_connection_message
{
static const core_message_type_enum type;
std::string reason_for_closing;
bool closing_due_to_error;
fc::oexception error;
closing_connection_message() : closing_due_to_error(false) {}
closing_connection_message(const std::string& reason_for_closing,
bool closing_due_to_error = false,
const fc::oexception& error = fc::oexception()) :
reason_for_closing(reason_for_closing),
closing_due_to_error(closing_due_to_error),
error(error)
{}
};
current_time_request_message¶
struct current_time_request_message
{
static const core_message_type_enum type;
fc::time_point request_sent_time;
current_time_request_message(){}
current_time_request_message(const fc::time_point request_sent_time) :
request_sent_time(request_sent_time)
{}
};
current_time_reply_message¶
struct current_time_reply_message
{
static const core_message_type_enum type;
fc::time_point request_sent_time;
fc::time_point request_received_time;
fc::time_point reply_transmitted_time;
current_time_reply_message(){}
current_time_reply_message(const fc::time_point request_sent_time,
const fc::time_point request_received_time,
const fc::time_point reply_transmitted_time = fc::time_point()) :
request_sent_time(request_sent_time),
request_received_time(request_received_time),
reply_transmitted_time(reply_transmitted_time)
{}
};
check_firewall_message¶
struct check_firewall_message
{
static const core_message_type_enum type;
node_id_t node_id;
fc::ip::endpoint endpoint_to_check;
};
enum class firewall_check_result
{
unable_to_check,
unable_to_connect,
connection_successful
};
check_firewall_reply_message¶
struct check_firewall_reply_message
{
static const core_message_type_enum type;
node_id_t node_id;
fc::ip::endpoint endpoint_checked;
fc::enum_type<uint8_t, firewall_check_result> result;
};
get_current_connections_request_message¶
struct get_current_connections_request_message
{
static const core_message_type_enum type;
};
current_connection_data¶
struct current_connection_data
{
uint32_t connection_duration; // in seconds
fc::ip::endpoint remote_endpoint;
node_id_t node_id;
fc::microseconds clock_offset;
fc::microseconds round_trip_delay;
fc::enum_type<uint8_t, peer_connection_direction> connection_direction;
fc::enum_type<uint8_t, firewalled_state> firewalled;
fc::variant_object user_data;
};
get_current_connections_reply_message¶
struct get_current_connections_reply_message
{
static const core_message_type_enum type;
uint32_t upload_rate_one_minute;
uint32_t download_rate_one_minute;
uint32_t upload_rate_fifteen_minutes;
uint32_t download_rate_fifteen_minutes;
uint32_t upload_rate_one_hour;
uint32_t download_rate_one_hour;
std::vector<current_connection_data> current_connections;
};
exceptions¶
// registered in node.cpp
FC_DECLARE_EXCEPTION( net_exception, 90000, "P2P Networking Exception" );
FC_DECLARE_DERIVED_EXCEPTION( send_queue_overflow, graphene::net::net_exception, 90001, "send queue for this peer exceeded maximum size" );
FC_DECLARE_DERIVED_EXCEPTION( insufficient_relay_fee, graphene::net::net_exception, 90002, "insufficient relay fee" );
FC_DECLARE_DERIVED_EXCEPTION( already_connected_to_requested_peer, graphene::net::net_exception, 90003, "already connected to requested peer" );
FC_DECLARE_DERIVED_EXCEPTION( block_older_than_undo_history, graphene::net::net_exception, 90004, "block is older than our undo history allows us to process" );
FC_DECLARE_DERIVED_EXCEPTION( peer_is_on_an_unreachable_fork, graphene::net::net_exception, 90005, "peer is on another fork" );
FC_DECLARE_DERIVED_EXCEPTION( unlinkable_block_exception, graphene::net::net_exception, 90006, "unlinkable block" )
message¶
message_header¶
Defines an 8 byte header that is always present because the minimum encrypted packet size is 8 bytes (blowfish). The maximum message size is defined in config.hpp. The channel, and message type is also included because almost every channel will have a message type field and we might as well include it in the 8 byte header to save space.
struct message_header
{
uint32_t size; // number of bytes in message, capped at MAX_MESSAGE_SIZE
uint32_t msg_type; // every channel gets a 16 bit message type specifier
};
typedef fc::uint160_t message_hash_type;
message¶
Abstracts the process of packing/unpacking a message for a particular channel.
struct message : public message_header
{
std::vector<char> data;
message(){}
message( message&& m )
:message_header(m),data( std::move(m.data) ){}
message( const message& m )
:message_header(m),data( m.data ){}
/**
* Assumes that T::type specifies the message type
*/
template<typename T>
message( const T& m )
{
msg_type = T::type;
data = fc::raw::pack(m);
size = (uint32_t)data.size();
}
fc::uint160_t id()const
{
return fc::ripemd160::hash( data.data(), (uint32_t)data.size() );
}
/**
* Automatically checks the type and deserializes T in the
* opposite process from the constructor.
*/
template<typename T>
T as()const
{
try {
FC_ASSERT( msg_type == T::type );
T tmp;
if( data.size() )
{
fc::datastream<const char*> ds( data.data(), data.size() );
fc::raw::unpack( ds, tmp );
}
else
{
// just to make sure that tmp shouldn't have any data
fc::datastream<const char*> ds( nullptr, 0 );
fc::raw::unpack( ds, tmp );
}
return tmp;
} FC_RETHROW_EXCEPTIONS( warn,
"error unpacking network message as a '${type}' ${x} !=? ${msg_type}",
("type", fc::get_typename<T>::name() )
("x", T::type)
("msg_type", msg_type)
);
}
};
message_oriented_connection¶
message_oriented_connection_delegate¶
receives incoming messages from a message_oriented_connection object
class message_oriented_connection_delegate
{
public:
virtual void on_message(message_oriented_connection* originating_connection, const message& received_message) = 0;
virtual void on_connection_closed(message_oriented_connection* originating_connection) = 0;
};
message_oriented_connection¶
uses a secure socket to create a connection that reads and writes a stream of
fc::net::message
objects
class message_oriented_connection
{
public:
message_oriented_connection(message_oriented_connection_delegate* delegate = nullptr);
~message_oriented_connection();
fc::tcp_socket& get_socket();
void accept();
void bind(const fc::ip::endpoint& local_endpoint);
void connect_to(const fc::ip::endpoint& remote_endpoint);
void send_message(const message& message_to_send);
void close_connection();
void destroy_connection();
uint64_t get_total_bytes_sent() const;
uint64_t get_total_bytes_received() const;
fc::time_point get_last_message_sent_time() const;
fc::time_point get_last_message_received_time() const;
fc::time_point get_connection_time() const;
fc::sha512 get_shared_secret() const;
private:
std::unique_ptr<detail::message_oriented_connection_impl> my;
};
typedef std::shared_ptr<message_oriented_connection> message_oriented_connection_ptr;
node¶
message_propagation_data¶
during network development, we need to track message propagation across the network using a structure like this:
struct message_propagation_data
{
fc::time_point received_time;
fc::time_point validated_time;
node_id_t originating_peer;
};
node_delegate¶
used by node reports status to client or fetch data from client
class node_delegate
{
public:
virtual ~node_delegate(){}
/**
* If delegate has the item, the network has no need to fetch it.
*/
virtual bool has_item( const net::item_id& id ) = 0;
/**
* @brief Called when a new block comes in from the network
*
* @param sync_mode true if the message was fetched through the sync process, false during normal operation
* @returns true if this message caused the blockchain to switch forks, false if it did not
*
* @throws exception if error validating the item, otherwise the item is
* safe to broadcast on.
*/
virtual bool handle_block( const graphene::net::block_message& blk_msg, bool sync_mode,
std::vector<fc::uint160_t>& contained_transaction_message_ids ) = 0;
/**
* @brief Called when a new transaction comes in from the network
*
* @throws exception if error validating the item, otherwise the item is
* safe to broadcast on.
*/
virtual void handle_transaction( const graphene::net::trx_message& trx_msg ) = 0;
/**
* @brief Called when a new message comes in from the network other than a
* block or a transaction. Currently there are no other possible
* messages, so this should never be called.
*
* @throws exception if error validating the item, otherwise the item is
* safe to broadcast on.
*/
virtual void handle_message( const message& message_to_process ) = 0;
/**
* Assuming all data elements are ordered in some way, this method should
* return up to limit ids that occur *after* from_id.
* On return, remaining_item_count will be set to the number of items
* in our blockchain after the last item returned in the result,
* or 0 if the result contains the last item in the blockchain
*/
virtual std::vector<item_hash_t> get_block_ids(const std::vector<item_hash_t>& blockchain_synopsis,
uint32_t& remaining_item_count,
uint32_t limit = 2000) = 0;
/**
* Given the hash of the requested data, fetch the body.
*/
virtual message get_item( const item_id& id ) = 0;
virtual chain_id_type get_chain_id()const = 0;
/**
* Returns a synopsis of the blockchain used for syncing.
* This consists of a list of selected item hashes from our current preferred
* blockchain, exponentially falling off into the past. Horrible explanation.
*
* If the blockchain is empty, it will return the empty list.
* If the blockchain has one block, it will return a list containing just that block.
* If it contains more than one block:
* the first element in the list will be the hash of the highest numbered block that
* we cannot undo
* the second element will be the hash of an item at the half way point in the undoable
* segment of the blockchain
* the third will be ~3/4 of the way through the undoable segment of the block chain
* the fourth will be at ~7/8...
* &c.
* the last item in the list will be the hash of the most recent block on our preferred chain
*/
virtual std::vector<item_hash_t> get_blockchain_synopsis(const item_hash_t& reference_point,
uint32_t number_of_blocks_after_reference_point) = 0;
/**
* Call this after the call to handle_message succeeds.
*
* @param item_type the type of the item we're synchronizing, will be the same as item passed to the sync_from() call
* @param item_count the number of items known to the node that haven't been sent to handle_item() yet.
* After `item_count` more calls to handle_item(), the node will be in sync
*/
virtual void sync_status( uint32_t item_type, uint32_t item_count ) = 0;
/**
* Call any time the number of connected peers changes.
*/
virtual void connection_count_changed( uint32_t c ) = 0;
virtual uint32_t get_block_number(const item_hash_t& block_id) = 0;
/**
* Returns the time a block was produced (if block_id = 0, returns genesis time).
* If we don't know about the block, returns time_point_sec::min()
*/
virtual fc::time_point_sec get_block_time(const item_hash_t& block_id) = 0;
virtual item_hash_t get_head_block_id() const = 0;
virtual uint32_t estimate_last_known_fork_from_git_revision_timestamp(uint32_t unix_timestamp) const = 0;
virtual void error_encountered(const std::string& message, const fc::oexception& error) = 0;
virtual uint8_t get_current_block_interval_in_seconds() const = 0;
};
peer_status¶
Information about connected peers that the client may want to make available to the user.
struct peer_status
{
uint32_t version;
fc::ip::endpoint host;
/** info contains the fields required by bitcoin-rpc's getpeerinfo call, we will likely
extend it with our own fields. */
fc::variant_object info;
};
node¶
provides application independent P2P broadcast and data synchronization
class node : public std::enable_shared_from_this<node>
{
public:
node(const std::string& user_agent);
~node();
void close();
void set_node_delegate( node_delegate* del );
void load_configuration( const fc::path& configuration_directory );
virtual void listen_to_p2p_network();
virtual void connect_to_p2p_network();
/**
* Add endpoint to internal level_map database of potential nodes
* to attempt to connect to. This database is consulted any time
* the number connected peers falls below the target.
*/
void add_node( const fc::ip::endpoint& ep );
/**
* Attempt to connect to the specified endpoint immediately.
*/
virtual void connect_to_endpoint( const fc::ip::endpoint& ep );
/**
* Specifies the network interface and port upon which incoming
* connections should be accepted.
*/
void listen_on_endpoint( const fc::ip::endpoint& ep, bool wait_if_not_available );
/**
* Call with true to enable listening for incoming connections
*/
void accept_incoming_connections(bool accept);
/**
* Specifies the port upon which incoming connections should be accepted.
* @param port the port to listen on
* @param wait_if_not_available if true and the port is not available, enter a
* sleep and retry loop to wait for it to become
* available. If false and the port is not available,
* just choose a random available port
*/
void listen_on_port(uint16_t port, bool wait_if_not_available);
/**
* Returns the endpoint the node is listening on. This is usually the same
* as the value previously passed in to listen_on_endpoint, unless we
* were unable to bind to that port.
*/
virtual fc::ip::endpoint get_actual_listening_endpoint() const;
/**
* @return a list of peers that are currently connected.
*/
std::vector<peer_status> get_connected_peers() const;
/** return the number of peers we're actively connected to */
virtual uint32_t get_connection_count() const;
/**
* Add message to outgoing inventory list, notify peers that
* I have a message ready.
*/
virtual void broadcast( const message& item_to_broadcast );
virtual void broadcast_transaction( const signed_transaction& trx )
{
broadcast( trx_message(trx) );
}
/**
* Node starts the process of fetching all items after item_id of the
* given item_type. During this process messages are not broadcast.
*/
virtual void sync_from(const item_id& current_head_block, const std::vector<uint32_t>& hard_fork_block_numbers);
bool is_connected() const;
void set_advanced_node_parameters(const fc::variant_object& params);
fc::variant_object get_advanced_node_parameters();
message_propagation_data get_transaction_propagation_data(const graphene::chain::transaction_id_type& transaction_id);
message_propagation_data get_block_propagation_data(const graphene::chain::block_id_type& block_id);
node_id_t get_node_id() const;
void set_allowed_peers(const std::vector<node_id_t>& allowed_peers);
/**
* Instructs the node to forget everything in its peer database, mostly for debugging
* problems where nodes are failing to connect to the network
*/
void clear_peer_database();
void set_total_bandwidth_limit(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second);
fc::variant_object network_get_info() const;
fc::variant_object network_get_usage_stats() const;
std::vector<potential_peer_record> get_potential_peers() const;
void disable_peer_advertising();
fc::variant_object get_call_statistics() const;
private:
std::unique_ptr<detail::node_impl, detail::node_impl_deleter> my;
};
simulated_network¶
class simulated_network : public node
{
public:
~simulated_network();
simulated_network(const std::string& user_agent) : node(user_agent) {}
void listen_to_p2p_network() override {}
void connect_to_p2p_network() override {}
void connect_to_endpoint(const fc::ip::endpoint& ep) override {}
fc::ip::endpoint get_actual_listening_endpoint() const override { return fc::ip::endpoint(); }
void sync_from(const item_id& current_head_block, const std::vector<uint32_t>& hard_fork_block_numbers) override {}
void broadcast(const message& item_to_broadcast) override;
void add_node_delegate(node_delegate* node_delegate_to_add);
virtual uint32_t get_connection_count() const override { return 8; }
private:
struct node_info;
void message_sender(node_info* destination_node);
std::list<node_info*> network_nodes;
};
typedef std::shared_ptr<node> node_ptr;
typedef std::shared_ptr<simulated_network> simulated_network_ptr;
peer_connection¶
firewall_check_state_data¶
struct firewall_check_state_data
{
node_id_t expected_node_id;
fc::ip::endpoint endpoint_to_test;
// if we're coordinating a firewall check for another node, these are the helper
// nodes we've already had do the test (if this structure is still relevant, that
// that means they have all had indeterminate results
std::set<node_id_t> nodes_already_tested;
// If we're a just a helper node, this is the node we report back to
// when we have a result
node_id_t requesting_peer;
};
peer_connection¶
class peer_connection;
class peer_connection_delegate
{
public:
virtual void on_message(peer_connection* originating_peer,
const message& received_message) = 0;
virtual void on_connection_closed(peer_connection* originating_peer) = 0;
virtual message get_message_for_item(const item_id& item) = 0;
};
class peer_connection;
typedef std::shared_ptr<peer_connection> peer_connection_ptr;
class peer_connection : public message_oriented_connection_delegate,
public std::enable_shared_from_this<peer_connection>
{
public:
enum class our_connection_state
{
disconnected,
just_connected, // if in this state, we have sent a hello_message
connection_accepted, // remote side has sent us a connection_accepted, we're operating normally with them
connection_rejected // remote side has sent us a connection_rejected, we may be exchanging address with them or may just be waiting for them to close
};
enum class their_connection_state
{
disconnected,
just_connected, // we have not yet received a hello_message
connection_accepted, // we have sent them a connection_accepted
connection_rejected // we have sent them a connection_rejected
};
enum class connection_negotiation_status
{
disconnected,
connecting,
connected,
accepting,
accepted,
hello_sent,
peer_connection_accepted,
peer_connection_rejected,
negotiation_complete,
closing,
closed
};
private:
peer_connection_delegate* _node;
fc::optional<fc::ip::endpoint> _remote_endpoint;
message_oriented_connection _message_connection;
/* a base class for messages on the queue, to hide the fact that some
* messages are complete messages and some are only hashes of messages.
*/
struct queued_message
{
fc::time_point enqueue_time;
fc::time_point transmission_start_time;
fc::time_point transmission_finish_time;
queued_message(fc::time_point enqueue_time = fc::time_point::now()) :
enqueue_time(enqueue_time)
{}
virtual message get_message(peer_connection_delegate* node) = 0;
/** returns roughly the number of bytes of memory the message is consuming while
* it is sitting on the queue
*/
virtual size_t get_size_in_queue() = 0;
virtual ~queued_message() {}
};
/* when you queue up a 'real_queued_message', a full copy of the message is
* stored on the heap until it is sent
*/
struct real_queued_message : queued_message
{
message message_to_send;
size_t message_send_time_field_offset;
real_queued_message(message message_to_send,
size_t message_send_time_field_offset = (size_t)-1) :
message_to_send(std::move(message_to_send)),
message_send_time_field_offset(message_send_time_field_offset)
{}
message get_message(peer_connection_delegate* node) override;
size_t get_size_in_queue() override;
};
/* when you queue up a 'virtual_queued_message', we just queue up the hash of the
* item we want to send. When it reaches the top of the queue, we make a callback
* to the node to generate the message.
*/
struct virtual_queued_message : queued_message
{
item_id item_to_send;
virtual_queued_message(item_id item_to_send) :
item_to_send(std::move(item_to_send))
{}
message get_message(peer_connection_delegate* node) override;
size_t get_size_in_queue() override;
};
size_t _total_queued_messages_size;
std::queue<std::unique_ptr<queued_message>, std::list<std::unique_ptr<queued_message> > > _queued_messages;
fc::future<void> _send_queued_messages_done;
public:
fc::time_point connection_initiation_time;
fc::time_point connection_closed_time;
fc::time_point connection_terminated_time;
peer_connection_direction direction;
//connection_state state;
firewalled_state is_firewalled;
fc::microseconds clock_offset;
fc::microseconds round_trip_delay;
our_connection_state our_state;
bool they_have_requested_close;
their_connection_state their_state;
bool we_have_requested_close;
connection_negotiation_status negotiation_status;
fc::oexception connection_closed_error;
fc::time_point get_connection_time()const { return _message_connection.get_connection_time(); }
fc::time_point get_connection_terminated_time()const { return connection_terminated_time; }
/// data about the peer node
/// @{
/** node_public_key from the hello message, zero-initialized before we get the hello */
node_id_t node_public_key;
/** the unique identifier we'll use to refer to the node with. zero-initialized before
* we receive the hello message, at which time it will be filled with either the "node_id"
* from the user_data field of the hello, or if none is present it will be filled with a
* copy of node_public_key */
node_id_t node_id;
uint32_t core_protocol_version;
std::string user_agent;
fc::optional<std::string> graphene_git_revision_sha;
fc::optional<fc::time_point_sec> graphene_git_revision_unix_timestamp;
fc::optional<std::string> fc_git_revision_sha;
fc::optional<fc::time_point_sec> fc_git_revision_unix_timestamp;
fc::optional<std::string> platform;
fc::optional<uint32_t> bitness;
// for inbound connections, these fields record what the peer sent us in
// its hello message. For outbound, they record what we sent the peer
// in our hello message
fc::ip::address inbound_address;
uint16_t inbound_port;
uint16_t outbound_port;
/// @}
typedef std::unordered_map<item_id, fc::time_point> item_to_time_map_type;
/// blockchain synchronization state data
/// @{
boost::container::deque<item_hash_t> ids_of_items_to_get; /// id of items in the blockchain that this peer has told us about
std::set<item_hash_t> ids_of_items_being_processed; /// list of all items this peer has offered use that we've already handed to the client but the client hasn't finished processing
uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids
bool peer_needs_sync_items_from_us;
bool we_need_sync_items_from_peer;
fc::optional<boost::tuple<std::vector<item_hash_t>, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy()
fc::time_point last_sync_item_received_time; /// the time we received the last sync item or the time we sent the last batch of sync item requests to this peer
std::set<item_hash_t> sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects
item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows
fc::time_point_sec last_block_time_delegate_has_seen;
bool inhibit_fetching_sync_blocks;
/// @}
/// non-synchronization state data
/// @{
struct timestamped_item_id
{
item_id item;
fc::time_point_sec timestamp;
timestamped_item_id(const item_id& item, const fc::time_point_sec timestamp) :
item(item),
timestamp(timestamp)
{}
};
struct timestamp_index{};
typedef boost::multi_index_container<timestamped_item_id,
boost::multi_index::indexed_by<boost::multi_index::hashed_unique<boost::multi_index::member<timestamped_item_id, item_id, ×tamped_item_id::item>,
std::hash<item_id> >,
boost::multi_index::ordered_non_unique<boost::multi_index::tag<timestamp_index>,
boost::multi_index::member<timestamped_item_id,
fc::time_point_sec,
×tamped_item_id::timestamp> > > > timestamped_items_set_type;
timestamped_items_set_type inventory_peer_advertised_to_us;
timestamped_items_set_type inventory_advertised_to_peer;
item_to_time_map_type items_requested_from_peer; /// items we've requested from this peer during normal operation. fetch from another peer if this peer disconnects
/// @}
// if they're flooding us with transactions, we set this to avoid fetching for a few seconds to let the
// blockchain catch up
fc::time_point transaction_fetching_inhibited_until;
uint32_t last_known_fork_block_number;
fc::future<void> accept_or_connect_task_done;
firewall_check_state_data *firewall_check_state;
#ifndef NDEBUG
private:
fc::thread* _thread;
unsigned _send_message_queue_tasks_running; // temporary debugging
#endif
bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system
private:
peer_connection(peer_connection_delegate* delegate);
void destroy();
public:
static peer_connection_ptr make_shared(peer_connection_delegate* delegate); // use this instead of the constructor
virtual ~peer_connection();
fc::tcp_socket& get_socket();
void accept_connection();
void connect_to(const fc::ip::endpoint& remote_endpoint, fc::optional<fc::ip::endpoint> local_endpoint = fc::optional<fc::ip::endpoint>());
void on_message(message_oriented_connection* originating_connection, const message& received_message) override;
void on_connection_closed(message_oriented_connection* originating_connection) override;
void send_queueable_message(std::unique_ptr<queued_message>&& message_to_send);
void send_message(const message& message_to_send, size_t message_send_time_field_offset = (size_t)-1);
void send_item(const item_id& item_to_send);
void close_connection();
void destroy_connection();
uint64_t get_total_bytes_sent() const;
uint64_t get_total_bytes_received() const;
fc::time_point get_last_message_sent_time() const;
fc::time_point get_last_message_received_time() const;
fc::optional<fc::ip::endpoint> get_remote_endpoint();
fc::ip::endpoint get_local_endpoint();
void set_remote_endpoint(fc::optional<fc::ip::endpoint> new_remote_endpoint);
bool busy() const;
bool idle() const;
bool is_currently_handling_message() const;
bool is_transaction_fetching_inhibited() const;
fc::sha512 get_shared_secret() const;
void clear_old_inventory();
bool is_inventory_advertised_to_us_list_full_for_transactions() const;
bool is_inventory_advertised_to_us_list_full() const;
bool performing_firewall_check() const;
fc::optional<fc::ip::endpoint> get_endpoint_for_connecting() const;
private:
void send_queued_messages_task();
void accept_connection_task();
void connect_to_task(const fc::ip::endpoint& remote_endpoint);
};
typedef std::shared_ptr<peer_connection> peer_connection_ptr;
peer_database¶
potential_peer_last_connection_disposition¶
enum potential_peer_last_connection_disposition
{
never_attempted_to_connect,
last_connection_failed,
last_connection_rejected,
last_connection_handshaking_failed,
last_connection_succeeded
};
potential_peer_record¶
struct potential_peer_record
{
fc::ip::endpoint endpoint;
fc::time_point_sec last_seen_time;
fc::enum_type<uint8_t,potential_peer_last_connection_disposition> last_connection_disposition;
fc::time_point_sec last_connection_attempt_time;
uint32_t number_of_successful_connection_attempts;
uint32_t number_of_failed_connection_attempts;
fc::optional<fc::exception> last_error;
potential_peer_record() :
number_of_successful_connection_attempts(0),
number_of_failed_connection_attempts(0){}
potential_peer_record(fc::ip::endpoint endpoint,
fc::time_point_sec last_seen_time = fc::time_point_sec(),
potential_peer_last_connection_disposition last_connection_disposition = never_attempted_to_connect) :
endpoint(endpoint),
last_seen_time(last_seen_time),
last_connection_disposition(last_connection_disposition),
number_of_successful_connection_attempts(0),
number_of_failed_connection_attempts(0)
{}
};
namespace detail
{
class peer_database_impl;
class peer_database_iterator_impl;
class peer_database_iterator : public boost::iterator_facade<peer_database_iterator, const potential_peer_record, boost::forward_traversal_tag>
{
public:
peer_database_iterator();
~peer_database_iterator();
explicit peer_database_iterator(peer_database_iterator_impl* impl);
peer_database_iterator( const peer_database_iterator& c );
private:
friend class boost::iterator_core_access;
void increment();
bool equal(const peer_database_iterator& other) const;
const potential_peer_record& dereference() const;
private:
std::unique_ptr<peer_database_iterator_impl> my;
};
}
peer_database¶
class peer_database
{
public:
peer_database();
~peer_database();
void open(const fc::path& databaseFilename);
void close();
void clear();
void erase(const fc::ip::endpoint& endpointToErase);
void update_entry(const potential_peer_record& updatedRecord);
potential_peer_record lookup_or_create_entry_for_endpoint(const fc::ip::endpoint& endpointToLookup);
fc::optional<potential_peer_record> lookup_entry_for_endpoint(const fc::ip::endpoint& endpointToLookup);
typedef detail::peer_database_iterator iterator;
iterator begin() const;
iterator end() const;
size_t size() const;
private:
std::unique_ptr<detail::peer_database_impl> my;
};
stcp_socket¶
Uses ECDH to negotiate a aes key for communicating with other nodes on the network.
stcp_socket¶
class stcp_socket : public virtual fc::iostream
{
public:
stcp_socket();
~stcp_socket();
fc::tcp_socket& get_socket() { return _sock; }
void accept();
void connect_to( const fc::ip::endpoint& remote_endpoint );
void bind( const fc::ip::endpoint& local_endpoint );
virtual size_t readsome( char* buffer, size_t max );
virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset );
virtual bool eof()const;
virtual size_t writesome( const char* buffer, size_t len );
virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset );
virtual void flush();
virtual void close();
using istream::get;
void get( char& c ) { read( &c, 1 ); }
fc::sha512 get_shared_secret() const { return _shared_secret; }
private:
void do_key_exchange();
fc::sha512 _shared_secret;
fc::ecc::private_key _priv_key;
fc::array<char,8> _buf;
//uint32_t _buf_len;
fc::tcp_socket _sock;
fc::aes_encoder _send_aes;
fc::aes_decoder _recv_aes;
std::shared_ptr<char> _read_buffer;
std::shared_ptr<char> _write_buffer;
#ifndef NDEBUG
bool _read_buffer_in_use;
bool _write_buffer_in_use;
#endif
};
typedef std::shared_ptr<stcp_socket> stcp_socket_ptr;