Skip to content

Commit

Permalink
tell the pipeline to stop when it's done
Browse files Browse the repository at this point in the history
 * the tests properly sync
  • Loading branch information
Rjected committed Jan 20, 2023
1 parent b6652a5 commit afe1e75
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 50 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ reth-stages = { path = "../stages" }
reth-db = { path = "../storage/db" }
reth-consensus = { path = "../consensus" }
reth-cli-utils = { path = "../cli/utils" }
reth-interfaces = { path = "../interfaces" }
reth-interfaces = { path = "../interfaces", features = ["test-utils"] }
reth-downloaders = { path = "../net/downloaders" }
reth-network = { path = "../net/network", features = ["test-utils", "serde"] }
reth-eth-wire = { path = "../net/eth-wire" }
Expand Down Expand Up @@ -55,3 +55,4 @@ tracing = "0.1"
# misc
hex = "0.4"
tempfile = "3.3"
serial_test = "0.10"
63 changes: 38 additions & 25 deletions crates/tests/src/clique.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ use ethers_core::{
use ethers_middleware::SignerMiddleware;
use ethers_providers::{Middleware, Provider, Ws};
use ethers_signers::{LocalWallet, Signer, Wallet};
use reth_eth_wire::Status;
use reth_network::test_utils::{enr_to_peer_id, unused_port};
use reth_primitives::PeerId;
use std::{
collections::HashMap,
io::{BufRead, BufReader},
net::SocketAddr,
time::Duration,
};
use tracing::trace;

Expand Down Expand Up @@ -198,10 +196,7 @@ impl CliqueGethBuilder {
.disable_discovery()
.insecure_unlock();

// create a compatible status
let status = Status::from(genesis.clone());

CliqueGethInstance::new(geth, signer, status, genesis).await
CliqueGethInstance::new(geth, signer, genesis).await
}
}

Expand All @@ -217,10 +212,6 @@ pub(crate) struct CliqueGethInstance {
/// The private key used for signing clique blocks and transactions.
pub(crate) signer: SigningKey,

/// The [`Status`](reth_eth_wire::Status) extracted from the configured geth
/// [`Genesis`](ethers_core::utils::Genesis).
pub(crate) status: Status,

/// The local [`Genesis`](ethers_core::utils::Genesis) used to configure geth.
pub(crate) genesis: Genesis,

Expand All @@ -238,12 +229,7 @@ impl CliqueGethInstance {
/// block production.
///
/// This also spawns the geth instance.
pub(crate) async fn new(
geth: Geth,
signer: SigningKey,
status: Status,
genesis: Genesis,
) -> Self {
pub(crate) async fn new(geth: Geth, signer: SigningKey, genesis: Genesis) -> Self {
// spawn the geth instance
let instance = geth.spawn();

Expand All @@ -256,7 +242,7 @@ impl CliqueGethInstance {
let provider =
SignerMiddleware::new_with_provider_chain(provider, wallet.clone()).await.unwrap();

Self { instance, signer, status, genesis, provider }
Self { instance, signer, genesis, provider }
}

/// Enable mining on the clique geth instance by importing and unlocking the signer account
Expand Down Expand Up @@ -289,22 +275,48 @@ impl CliqueGethInstance {

/// Prints the logs of the [`Geth`](ethers_core::utils::Geth) instance in a new
/// [`task`](tokio::task).
pub(crate) fn print_logs(&mut self) {
pub(crate) async fn print_logs(&mut self) {
// take the stderr of the geth instance and print it
let stderr = self.instance.stderr().unwrap();

// print logs in a new task
tokio::task::spawn(async move {
let mut err_reader = BufReader::new(stderr);
let mut err_reader = BufReader::new(stderr);

tokio::spawn(async move {
loop {
let mut buf = String::new();
if let Ok(line) = err_reader.read_line(&mut buf) {
if let (Ok(line), line_str) = {
let mut buf = String::new();
(err_reader.read_line(&mut buf), buf.clone())
} {
if line == 0 {
tokio::time::sleep(Duration::from_nanos(1)).await;
continue
break
}
if !line_str.is_empty() {
dbg!(line_str);
}
}
}
});
}

/// Prevents the [`Geth`](ethers_core::utils::Geth) instance from blocking due to the `stderr`
/// filling up.
pub(crate) async fn prevent_blocking(&mut self) {
// take the stderr of the geth instance and print it
let stderr = self.instance.stderr().unwrap();

// print logs in a new task
let mut err_reader = BufReader::new(stderr);

tokio::spawn(async move {
loop {
if let (Ok(line), _line_str) = {
let mut buf = String::new();
(err_reader.read_line(&mut buf), buf.clone())
} {
if line == 0 {
break
}
dbg!(buf);
}
}
});
Expand Down Expand Up @@ -351,6 +363,7 @@ impl CliqueGethInstance {

/// Returns the chain tip hash of the [`Geth`](ethers_core::utils::Geth) instance by calling
/// from geth's `eth_getBlock`.
#[allow(dead_code)]
pub(crate) async fn tip_hash(&self) -> reth_primitives::H256 {
self.tip().await.hash.unwrap().0.into()
}
Expand Down
33 changes: 16 additions & 17 deletions crates/tests/src/reth_builder.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
//! Builder for a reth test instance.

use reth_cli_utils::init::init_genesis;
use reth_consensus::BeaconConsensus;
use reth_db::database::Database;
use reth_downloaders::{bodies, headers};
use reth_interfaces::consensus::ForkchoiceState;
use reth_interfaces::test_utils::TestConsensus;
use reth_network::NetworkHandle;
use reth_primitives::{ChainSpec, H256};
use reth_stages::{
Expand All @@ -17,13 +16,12 @@ use reth_stages::{
};
use reth_tracing::tracing::{debug, info};
use std::sync::Arc;
use tokio::sync::watch::error::SendError;

use crate::stage_config::StageConfig;

/// Reth test instance
pub(crate) struct RethTestInstance<DB> {
pub(crate) consensus: Arc<BeaconConsensus>,
pub(crate) consensus: Arc<TestConsensus>,
pub(crate) network: NetworkHandle,
pub(crate) db: Arc<DB>,
pub(crate) chain_spec: ChainSpec,
Expand All @@ -38,7 +36,10 @@ where
/// Start the reth sync pipeline
pub(crate) async fn start(&self) -> Result<(), RethTestInstanceError> {
// make sure to init genesis if not done already
let _genesis_hash = init_genesis(self.db.clone(), self.chain_spec.genesis().clone())?;
let genesis_hash = init_genesis(self.db.clone(), self.chain_spec.clone())?;
if genesis_hash != self.chain_spec.genesis_hash() {
return Err(RethTestInstanceError::GenesisMismatch)
}

// start pipeline
let fetch_client = Arc::new(self.network.fetch_client().await.unwrap());
Expand Down Expand Up @@ -78,20 +79,18 @@ where
.push(ExecutionStage {
chain_spec: self.chain_spec.clone(),
commit_threshold: self.config.execution.commit_threshold,
});
})
.with_max_block(Some(0));

if let Some(tip) = self.tip {
debug!("Tip manually set: {}", tip);
self.consensus.notify_fork_choice_state(ForkchoiceState {
head_block_hash: tip,
safe_block_hash: tip,
finalized_block_hash: tip,
})?;
self.consensus.update_tip(tip);
}

// Run pipeline
info!("Starting pipeline");
pipeline.run(self.db.clone()).await?;
info!("Pipeline finished");
Ok(())
}
}
Expand All @@ -103,20 +102,20 @@ pub(crate) enum RethTestInstanceError {
#[error("Error while initializing the genesis block: {0}")]
GenesisInitError(#[from] reth_db::Error),

/// Error while notifying consensus listeners of a fork choice state update.
#[error("Error while notifying consensus listeners of a fork choice state update: {0}")]
ForkChoiceStateUpdateError(#[from] SendError<ForkchoiceState>),

/// Error while running the reth pipeline.
#[error("Error while running the reth pipeline: {0}")]
PipelineError(#[from] reth_stages::PipelineError),

/// The genesis hash of the written genesis block does not match the chain spec genesis hash.
#[error("Written genesis hash does not match chain spec genesis hash")]
GenesisMismatch,
}

// TODO: config
/// Builder for a reth test instance.
pub(crate) struct RethBuilder<DB> {
network: Option<NetworkHandle>,
consensus: Option<Arc<BeaconConsensus>>,
consensus: Option<Arc<TestConsensus>>,
db: Option<Arc<DB>>,
chain_spec: Option<ChainSpec>,
tip: Option<H256>,
Expand Down Expand Up @@ -145,7 +144,7 @@ impl<DB> RethBuilder<DB> {

/// Sets the consensus handle.
#[must_use]
pub(crate) fn consensus(mut self, consensus: Arc<BeaconConsensus>) -> Self {
pub(crate) fn consensus(mut self, consensus: Arc<TestConsensus>) -> Self {
self.consensus = Some(consensus);
self
}
Expand Down
32 changes: 25 additions & 7 deletions crates/tests/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use ethers_core::types::{
};
use ethers_providers::Middleware;
use reth_cli_utils::init::init_db;
use reth_consensus::BeaconConsensus;
use reth_db::mdbx::{Env, WriteMap};
use reth_interfaces::test_utils::TestConsensus;
use reth_network::{
test_utils::{unused_tcp_udp, NetworkEventStream, GETH_TIMEOUT},
NetworkConfig, NetworkManager,
Expand All @@ -25,6 +25,7 @@ use tokio::fs;
///
/// Tests that are run against a real `geth` node use geth's Clique functionality to create blocks.
#[tokio::test(flavor = "multi_thread")]
#[serial_test::serial]
async fn sync_from_clique_geth() {
reth_tracing::init_test_tracing();
tokio::time::timeout(GETH_TIMEOUT, async move {
Expand All @@ -40,18 +41,21 @@ async fn sync_from_clique_geth() {

// build the funded geth
let mut clique_instance = clique_geth.build().await;
tracing::info!("clique instance built");

// print the logs in a new task
clique_instance.print_logs();
clique_instance.print_logs().await;

// get geth to start producing blocks - use a blank password
clique_instance.enable_mining("".into()).await;
tracing::info!("enabled mining");

// === check that we have the same genesis hash ===

// get the chainspec from the genesis we configured for geth
let mut chainspec: ChainSpec = clique_instance.genesis.clone().into();
let remote_genesis = SealedHeader::from(clique_instance.genesis().await);
tracing::info!("got remote genesis");

let mut local_genesis_header = Header::from(chainspec.genesis().clone());

Expand Down Expand Up @@ -79,16 +83,23 @@ async fn sync_from_clique_geth() {
.value(1u64)
.nonce(nonce))
});
tracing::info!("generated tranactions");

// finally send the txs to geth
clique_instance.send_requests(txs).await;
tracing::info!("sent requests");

// wait for a certain number of blocks to be mined
let block = clique_instance.provider.get_block_number().await.unwrap();
assert!(block > U64::zero());

// get the current tip hash for pipeline configuration
let tip_hash = clique_instance.tip_hash().await;
let tip = clique_instance.tip().await;
let tip_hash = tip.hash.unwrap().0.into();

tracing::info!(genesis_hash = ?chainspec.genesis_hash, "genesis hash");
tracing::info!(tip_hash = ?tip_hash, "tip hash");
tracing::info!(tip_number = ?tip.number, "tip number");

// === initialize reth networking stack ===

Expand All @@ -114,7 +125,7 @@ async fn sync_from_clique_geth() {
let db = Arc::new(init_db(reth_temp_dir.path()).unwrap());

// initialize consensus
let consensus = Arc::new(BeaconConsensus::new(chainspec.clone()));
let consensus = Arc::new(TestConsensus::default());

// build reth and start the pipeline
let reth: RethTestInstance<Env<WriteMap>> = RethBuilder::new()
Expand All @@ -126,7 +137,10 @@ async fn sync_from_clique_geth() {
.build();

// start reth then manually connect geth
let pipeline_handle = tokio::task::spawn(async move { reth.start().await });
let pipeline_handle = tokio::task::spawn(async move {
reth.start().await.unwrap();
});

tokio::task::spawn(network);

// create networkeventstream to get the next session established event easily
Expand All @@ -145,7 +159,10 @@ async fn sync_from_clique_geth() {
// wait for the session to be established
let _peer_id = events.peer_added_and_established().await.unwrap();

pipeline_handle.await.unwrap().unwrap();
tracing::info!("waiting for pipeline to finish");
pipeline_handle.await.unwrap();

drop(clique_instance);

// cleanup (delete the data_dir at dir_path)
fs::remove_dir_all(dir_path).await.unwrap();
Expand All @@ -155,6 +172,7 @@ async fn sync_from_clique_geth() {
}

#[tokio::test(flavor = "multi_thread")]
#[serial_test::serial]
async fn geth_clique_keepalive() {
reth_tracing::init_test_tracing();
tokio::time::timeout(GETH_TIMEOUT, async move {
Expand All @@ -172,7 +190,7 @@ async fn geth_clique_keepalive() {
let mut clique_instance = clique_geth.build().await;

// print the logs in a new task
clique_instance.print_logs();
clique_instance.prevent_blocking().await;

// get geth to start producing blocks - use a blank password
clique_instance.enable_mining("".into()).await;
Expand Down

0 comments on commit afe1e75

Please sign in to comment.