Skip to content

Commit

Permalink
[CLI] Adjust Diesel deps, improve messages, improve Docker daemon det…
Browse files Browse the repository at this point in the history
…ection
  • Loading branch information
banool committed Oct 6, 2023
1 parent 870acbe commit 6149aee
Show file tree
Hide file tree
Showing 16 changed files with 106 additions and 74 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -727,3 +727,5 @@ debug = true
serde-reflection = { git = "https://github.com/aptos-labs/serde-reflection", rev = "839aed62a20ddccf043c08961cfe74875741ccba" }
merlin = { git = "https://github.com/aptos-labs/merlin" }
x25519-dalek = { git = "https://github.com/aptos-labs/x25519-dalek", branch = "zeroize_v1" }
# More context here: https://github.com/weiznich/diesel_async/pull/121.
diesel-async = { git = "https://github.com/banool/diesel_async", rev = "4438e23c206b0ed0df4114e43e156825c2b4ab66" }
2 changes: 1 addition & 1 deletion crates/aptos/src/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn prompt_yes(prompt: &str) -> bool {
result.unwrap()
}

/// Convert any successful response to Success. If there is an error, show as JSON
/// Convert any successful response to Success. If there is an error, show it as JSON
/// unless `jsonify_error` is false.
pub async fn to_common_success_result<T>(
command: &str,
Expand Down
2 changes: 1 addition & 1 deletion crates/aptos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fn main() {
// Shutdown the runtime with a timeout. We do this to make sure that we don't sit
// here waiting forever for tasks that sometimes don't want to exit on
// their own (e.g. telemetry, containers spawned by the local testnet, etc).
runtime.shutdown_timeout(Duration::from_millis(100));
runtime.shutdown_timeout(Duration::from_millis(50));

match result {
Ok(inner) => println!("{}", inner),
Expand Down
2 changes: 1 addition & 1 deletion crates/aptos/src/node/local_testnet/faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ServiceManager for FaucetManager {
"Faucet".to_string()
}

fn get_healthchecks(&self) -> HashSet<HealthChecker> {
fn get_health_checkers(&self) -> HashSet<HealthChecker> {
hashset! {HealthChecker::http_checker_from_port(
self.config.server_config.listen_port,
self.get_name(),
Expand Down
2 changes: 1 addition & 1 deletion crates/aptos/src/node/local_testnet/health_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl HealthChecker {
url.clone(),
Some(Duration::from_secs(5)),
)
.await;
.await?;
let request = tonic::Request::new(GetTransactionsRequest {
starting_version: Some(0),
..Default::default()
Expand Down
72 changes: 45 additions & 27 deletions crates/aptos/src/node/local_testnet/indexer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tracing::{info, warn};
const INDEXER_API_CONTAINER_NAME: &str = "indexer-api";
const HASURA_IMAGE: &str = "hasura/graphql-engine:v2.33.0";

/// This Hasura metadata origintes from the aptos-indexer-processors repo.
/// This Hasura metadata originates from the aptos-indexer-processors repo.
///
/// This metadata is from revision: 1b8e14d9669258f797403e2b38da9ea5aea29e35.
///
Expand All @@ -42,9 +42,9 @@ const HASURA_METADATA: &str = include_str!("hasura_metadata.json");
pub struct IndexerApiArgs {
/// If set, we will run a postgres DB using Docker (unless
/// --use-host-postgres is set), run the standard set of indexer processors (see
/// --processors) and configure them to write to this DB, and run an API that lets
/// --processors), and configure them to write to this DB, and run an API that lets
/// you access the data they write to storage. This is opt-in because it requires
/// Docker to be installed in the host system.
/// Docker to be installed on the host system.
#[clap(long, conflicts_with = "no_txn_stream")]
pub with_indexer_api: bool,

Expand Down Expand Up @@ -106,7 +106,7 @@ impl ServiceManager for IndexerApiManager {
Ok(())
}

fn get_healthchecks(&self) -> HashSet<HealthChecker> {
fn get_health_checkers(&self) -> HashSet<HealthChecker> {
hashset! {HealthChecker::Http(
Url::parse(&format!("http://127.0.0.1:{}", self.indexer_api_port)).unwrap(),
self.get_name(),
Expand All @@ -120,24 +120,6 @@ impl ServiceManager for IndexerApiManager {
async fn run_service(self: Box<Self>) -> Result<()> {
setup_docker_logging(&self.test_dir, "indexer-api", INDEXER_API_CONTAINER_NAME)?;

// Unconditionally use host.docker.internal instead of 127.0.0.1 to access the
// host system. This currently works out of the box on Docker for Desktop on
// Mac and Windows. On Linux, this requires that you bind the name to the host
// gateway, which we do below.
let postgres_connection_string = self
.postgres_connection_string
.replace("127.0.0.1", "host.docker.internal");

info!(
"Using postgres connection string: {}",
postgres_connection_string
);

let options = Some(CreateContainerOptions {
name: INDEXER_API_CONTAINER_NAME,
..Default::default()
});

let exposed_ports = Some(hashmap! {self.indexer_api_port.to_string() => hashmap!{}});
let mut host_config = HostConfig {
port_bindings: Some(hashmap! {
Expand All @@ -149,9 +131,42 @@ impl ServiceManager for IndexerApiManager {
..Default::default()
};

if cfg!(target_os = "linux") {
host_config.extra_hosts = Some(vec!["host.docker.internal:host-gateway".to_string()]);
}
let docker = get_docker()?;

// When using Docker Desktop you can and indeed must use the magic hostname
// host.docker.internal in order to access localhost on the host system from
// within the container. This also theoretically works without Docker Desktop,
// but you have to manually add the name to /etc/hosts in the container, and in
// my experience even that doesn't work sometimes. So when in a Docker Desktop
// environment we replace 127.0.0.1 with host.docker.internal, whereas in other
// environments we still use 127.0.0.1 and use host networking mode.
//
// In practice, this means we do the replacement when on macOS or Windows, both
// standard (NT) and WSL, and we don't do it on Linux / when running from within
// a container. But checking for OS is not accurate, since for example we must
// do the replacement when running in WSL configured to use the host Docker
// daemon but not when running in WSL configured to use Docker from within the
// WSL environment. So instead of checking for OS we check the name of the
// Docker daemon.
let info = docker
.info()
.await
.context("Failed to get info about Docker daemon")?;
let is_docker_desktop = info.name == Some("docker-desktop".to_string());
let postgres_connection_string = if is_docker_desktop {
info!("Running with Docker Desktop, using host.docker.internal");
self.postgres_connection_string
.replace("127.0.0.1", "host.docker.internal")
} else {
info!("Not running with Docker Desktop, using host networking mode");
host_config.network_mode = Some("host".to_string());
self.postgres_connection_string
};

info!(
"Using postgres connection string: {}",
postgres_connection_string
);

let config = Config {
image: Some(HASURA_IMAGE.to_string()),
Expand All @@ -173,9 +188,12 @@ impl ServiceManager for IndexerApiManager {
..Default::default()
};

info!("Starting indexer API with this config: {:#?}", config);
let options = Some(CreateContainerOptions {
name: INDEXER_API_CONTAINER_NAME,
..Default::default()
});

let docker = get_docker()?;
info!("Starting indexer API with this config: {:?}", config);

let id = docker.create_container(options, config).await?.id;

Expand Down
31 changes: 17 additions & 14 deletions crates/aptos/src/node/local_testnet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ impl CliCommand<()> for RunLocalTestnet {
info!("Created test directory: {:?}", test_dir);
}

// Set up logging for anything that uses tracing. These logs will go to
// different directories based on the name of the runtime.
if self.log_to_stdout {
setup_logging(None);
} else {
// Set up logging for anything that uses tracing. These logs will go to
// different directories based on the name of the runtime.
let td = test_dir.clone();
let make_writer = move || {
ThreadNameMakeWriter::new(td.clone()).make_writer() as Box<dyn std::io::Write>
Expand All @@ -208,7 +208,7 @@ impl CliCommand<()> for RunLocalTestnet {
// Build the node manager. We do this unconditionally.
let node_manager = NodeManager::new(&self, test_dir.clone())
.context("Failed to build node service manager")?;
let node_health_checkers = node_manager.get_healthchecks();
let node_health_checkers = node_manager.get_health_checkers();

// If configured to do so, build the faucet manager.
if !self.faucet_args.no_faucet {
Expand All @@ -225,7 +225,7 @@ impl CliCommand<()> for RunLocalTestnet {
if self.indexer_api_args.with_indexer_api {
let postgres_manager = postgres::PostgresManager::new(&self, test_dir.clone())
.context("Failed to build postgres service manager")?;
let postgres_health_checkers = postgres_manager.get_healthchecks();
let postgres_health_checkers = postgres_manager.get_health_checkers();
managers.push(Box::new(postgres_manager));

let processor_preqrequisite_healthcheckers =
Expand All @@ -241,9 +241,12 @@ impl CliCommand<()> for RunLocalTestnet {
)
.context("Failed to build processor service managers")?;

// We have already ensured that at least one processor is used when
// building the processor managers with `many_new`.
let processor_health_checkers = processor_managers[0].get_healthchecks();
// All processors return the same health checkers so we only need to call
// `get_health_checkers` for one of them. This is a bit of a leaky abstraction
// but it works well enough for now. Note: We have already ensured that at
// least one processor is used when building the processor managers with
// `many_new`.
let processor_health_checkers = processor_managers[0].get_health_checkers();

let mut processor_managers = processor_managers
.into_iter()
Expand All @@ -267,8 +270,10 @@ impl CliCommand<()> for RunLocalTestnet {

// Get the health checkers from all the managers. We'll pass these to
// `wait_for_startup`.
let health_checkers: HashSet<HealthChecker> =
managers.iter().flat_map(|m| m.get_healthchecks()).collect();
let health_checkers: HashSet<HealthChecker> = managers
.iter()
.flat_map(|m| m.get_health_checkers())
.collect();

// The final manager we add is the ready server. This must happen last since
// it uses the health checkers from all the other services.
Expand All @@ -277,7 +282,8 @@ impl CliCommand<()> for RunLocalTestnet {
health_checkers.clone(),
)?));

// Collect steps to run on shutdown. We run these in reverse.
// Collect steps to run on shutdown. We run these in reverse. This is somewhat
// arbitrary; each shutdown step should work no matter the order it is run in.
let shutdown_steps: Vec<Box<dyn ShutdownStep>> = managers
.iter()
.flat_map(|m| m.get_shutdown_steps())
Expand Down Expand Up @@ -351,17 +357,14 @@ impl CliCommand<()> for RunLocalTestnet {
Err(err) => err.id(),
};

// Because we added the ctrl-c task last, we can figure out if that was the one
// that ended based on `finished_future_index`. We modify our messaging and the
// return value based on this.
let was_ctrl_c = finished_task_id == ctrl_c_task_id;
if was_ctrl_c {
eprintln!("\nReceived ctrl-c, running shutdown steps...");
} else {
eprintln!("\nOne of the futures exited unexpectedly, running shutdown steps...");
}

// At this point replace the ctrl-c handler so the user can kill the CLI
// At this point register another ctrl-c handler so the user can kill the CLI
// instantly if they send the signal twice.
tokio::spawn(async move {
tokio::signal::ctrl_c()
Expand Down
2 changes: 1 addition & 1 deletion crates/aptos/src/node/local_testnet/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl ServiceManager for NodeManager {
/// We return health checkers for both the Node API and the txn stream (if enabled).
/// As it is now, it is fine to make downstream services wait for both but if that
/// changes we can refactor.
fn get_healthchecks(&self) -> HashSet<HealthChecker> {
fn get_health_checkers(&self) -> HashSet<HealthChecker> {
let node_api_url = self.get_node_api_url();
let mut checkers = HashSet::new();
checkers.insert(HealthChecker::NodeApi(node_api_url));
Expand Down
15 changes: 8 additions & 7 deletions crates/aptos/src/node/local_testnet/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const POSTGRES_DEFAULT_PORT: u16 = 5432;
#[derive(Clone, Debug, Parser)]
pub struct PostgresArgs {
/// This is the database to connect to, both when --use-host-postgres is set
/// and when it is not (i.e. when postgres is running in a container).
/// and when it is not (when postgres is running in a container).
#[clap(long, default_value = "local_testnet")]
pub postgres_database: String,

Expand All @@ -46,12 +46,12 @@ pub struct PostgresArgs {
pub postgres_port: u16,

/// If set, connect to the postgres instance specified by the rest of the
/// `postgres_args` (e.g. --host-postgres-host, --host-postgres-user, etc) rather
/// than running a new one with Docker. This can be used to connect to an existing
/// postgres instance running on the host system. Do not include the database.
/// `postgres_args` (e.g. --host-postgres-port) rather than running an instance
/// with Docker. This can be used to connect to an existing postgres instance
/// running on the host system.
///
/// WARNING: Any existing database it finds (based on --postgres-database) will be
/// dropped.
/// dropped and recreated.
#[clap(long, requires = "with_indexer_api")]
pub use_host_postgres: bool,

Expand All @@ -73,7 +73,7 @@ impl PostgresArgs {
}

/// Get the connection string for the postgres database. If `database` is specified
/// we will use that rather than `postgres_database`.
/// we will use that rather than `self.postgres_database`.
pub fn get_connection_string(&self, database: Option<&str>) -> String {
let password = match self.use_host_postgres {
true => match &self.host_postgres_password {
Expand Down Expand Up @@ -171,7 +171,7 @@ impl ServiceManager for PostgresManager {
Ok(())
}

fn get_healthchecks(&self) -> HashSet<HealthChecker> {
fn get_health_checkers(&self) -> HashSet<HealthChecker> {
hashset! {HealthChecker::Postgres(
self.args.get_connection_string(None),
)}
Expand Down Expand Up @@ -213,6 +213,7 @@ impl ServiceManager for PostgresManager {
exposed_ports,
host_config,
env: Some(vec![
// We run postgres without any auth + no password.
"POSTGRES_HOST_AUTH_METHOD=trust".to_string(),
format!("POSTGRES_USER={}", self.args.postgres_user),
format!("POSTGRES_DB={}", self.args.postgres_database),
Expand Down
13 changes: 7 additions & 6 deletions crates/aptos/src/node/local_testnet/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ static RUN_MIGRATIONS_ONCE: OnceCell<bool> = OnceCell::const_new();
pub struct ProcessorArgs {
/// The value of this flag determines which processors we will run if
/// --with-indexer-api is set. Note that some processors are not supported in the
/// local testnet (e.g. ANS). If you try to set those, an error will be thrown
/// local testnet (e.g. ANS). If you try to set those an error will be thrown
/// immediately.
#[clap(
long,
Expand Down Expand Up @@ -77,7 +77,7 @@ impl ProcessorManager {
ProcessorName::StakeProcessor => ProcessorConfig::StakeProcessor,
ProcessorName::TokenProcessor => {
ProcessorConfig::TokenProcessor(TokenProcessorConfig {
// This NFT points thing doesn't exist on local testnets.
// This NFT points contract doesn't exist on local testnets.
nft_points_contract: None,
})
},
Expand Down Expand Up @@ -150,7 +150,7 @@ impl ServiceManager for ProcessorManager {
)
}

fn get_healthchecks(&self) -> HashSet<HealthChecker> {
fn get_health_checkers(&self) -> HashSet<HealthChecker> {
hashset![HealthChecker::Http(
Url::parse(&format!(
"http://127.0.0.1:{}",
Expand All @@ -173,9 +173,10 @@ impl ServiceManager for ProcessorManager {
// https://stackoverflow.com/q/54351783/3846032
//
// To fix this, we run the migrations ourselves here in the CLI first. We use
// std::sync::Once to make sure we only run the migration once even though we
// call all the processor service managers get to this point. This is safer
// than relying on coordiation outside of this manager.
// OnceCell to make sure we only run the migration once. When all the processor
// ServiceManagers reach this point, one of them will run the code and the rest
// will wait. Doing it at this point in the code is safer than relying on
// coordination outside of this manager.
RUN_MIGRATIONS_ONCE
.get_or_init(|| async {
info!("Running DB migrations for the indexer processors");
Expand Down
2 changes: 1 addition & 1 deletion crates/aptos/src/node/local_testnet/ready_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl ServiceManager for ReadyServerManager {
"Ready Server".to_string()
}

fn get_healthchecks(&self) -> HashSet<HealthChecker> {
fn get_health_checkers(&self) -> HashSet<HealthChecker> {
// We don't health check the service that exposes health checks.
hashset! {}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/aptos/src/node/local_testnet/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ pub trait ServiceManager: Debug + Send + Sync + 'static {
/// can use to make sure prerequisite services have started. These are also used
/// by the "ready server", a server that exposes a unified endpoint for checking
/// if all services are ready.
fn get_healthchecks(&self) -> HashSet<HealthChecker>;
fn get_health_checkers(&self) -> HashSet<HealthChecker>;

/// Whereas get_healthchecks returns healthchecks that other downstream services can
/// use, this should return health checkers for services that this service is
/// Whereas get_health_checkers returns healthchecks that other downstream services
/// can use, this should return health checkers for services that this service is
/// waiting to start.
//
// Note: If we were using an object oriented language, we'd just make the
Expand Down

0 comments on commit 6149aee

Please sign in to comment.