Skip to content

Commit

Permalink
[CLI] Add support for running postgres in a container to the local te…
Browse files Browse the repository at this point in the history
…stnet
  • Loading branch information
banool committed Oct 1, 2023
1 parent 12597d6 commit 57b3c28
Show file tree
Hide file tree
Showing 8 changed files with 546 additions and 33 deletions.
143 changes: 133 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ diesel = { version = "2.1.0", features = [
"numeric",
"serde_json",
] }
diesel-async = { version = "0.4", features = ["postgres", "tokio"] }
diesel_migrations = { version = "2.1.0", features = ["postgres"] }
digest = "0.9.0"
dir-diff = "0.3.2"
Expand Down
4 changes: 4 additions & 0 deletions crates/aptos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ clap = { workspace = true, features = ["env", "unstable-styles"] }
clap_complete = { workspace = true }
codespan-reporting = { workspace = true }
dashmap = { workspace = true }
diesel = { workspace = true, features = [
"postgres_backend",
] }
diesel-async = { workspace = true }
dirs = { workspace = true }
futures = { workspace = true }
hex = { workspace = true }
Expand Down
35 changes: 19 additions & 16 deletions crates/aptos/src/node/local_testnet/health_checker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::common::types::{CliError, CliTypedResult};
use anyhow::Context;
use anyhow::{anyhow, Context, Result};
use aptos_protos::indexer::v1::GetTransactionsRequest;
use diesel_async::{pg::AsyncPgConnection, AsyncConnection};
use futures::StreamExt;
use reqwest::Url;
use serde::Serialize;
Expand All @@ -23,10 +23,12 @@ pub enum HealthChecker {
NodeApi(Url),
/// Check that a data service GRPC stream is up.
DataServiceGrpc(Url),
/// Check that a postgres instance is up.
Postgres(String),
}

impl HealthChecker {
pub async fn check(&self) -> CliTypedResult<()> {
pub async fn check(&self) -> Result<()> {
match self {
HealthChecker::Http(url, _) => {
reqwest::get(Url::clone(url))
Expand Down Expand Up @@ -54,19 +56,18 @@ impl HealthChecker {
client
.get_transactions(request)
.await
.map_err(|err| {
CliError::UnexpectedError(format!("GRPC connection error: {:#}", err))
})?
.context("GRPC connection error")?
.into_inner()
.next()
.await
.context("Did not receive init signal from data service GRPC stream")?
.map_err(|err| {
CliError::UnexpectedError(format!(
"Error processing first message from GRPC stream: {:#}",
err
))
})?;
.context("Error processing first message from GRPC stream")?;
Ok(())
},
HealthChecker::Postgres(connection_string) => {
AsyncPgConnection::establish(connection_string)
.await
.context("Failed to connect to postgres")?;
Ok(())
},
}
Expand All @@ -77,7 +78,7 @@ impl HealthChecker {
&self,
// The service, if any, waiting for this service to start up.
waiting_service: Option<&str>,
) -> CliTypedResult<()> {
) -> Result<()> {
let prefix = self.to_string();
wait_for_startup(|| self.check(), match waiting_service {
Some(waiting_service) => {
Expand All @@ -98,6 +99,7 @@ impl HealthChecker {
HealthChecker::Http(url, _) => url.as_str(),
HealthChecker::NodeApi(url) => url.as_str(),
HealthChecker::DataServiceGrpc(url) => url.as_str(),
HealthChecker::Postgres(url) => url.as_str(),
}
}

Expand All @@ -116,14 +118,15 @@ impl std::fmt::Display for HealthChecker {
HealthChecker::Http(_, name) => write!(f, "{}", name),
HealthChecker::NodeApi(_) => write!(f, "Node API"),
HealthChecker::DataServiceGrpc(_) => write!(f, "Transaction stream"),
HealthChecker::Postgres(_) => write!(f, "Postgres"),
}
}
}

async fn wait_for_startup<F, Fut>(check_fn: F, error_message: String) -> CliTypedResult<()>
async fn wait_for_startup<F, Fut>(check_fn: F, error_message: String) -> Result<()>
where
F: Fn() -> Fut,
Fut: futures::Future<Output = CliTypedResult<()>>,
Fut: futures::Future<Output = Result<()>>,
{
let max_wait = Duration::from_secs(MAX_WAIT_S);
let wait_interval = Duration::from_millis(WAIT_INTERVAL_MS);
Expand All @@ -150,7 +153,7 @@ where
Some(last_error_message) => format!("{}: {}", error_message, last_error_message),
None => error_message,
};
return Err(CliError::UnexpectedError(error_message));
return Err(anyhow!(error_message));
}

Ok(())
Expand Down
16 changes: 16 additions & 0 deletions crates/aptos/src/node/local_testnet/indexer_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use clap::Parser;

/// Args related to running an indexer API for the local testnet.
#[derive(Debug, Parser)]
pub struct IndexerApiArgs {
/// If set, we will run a postgres DB using Docker (unless
/// --use-host-postgres is set), run the standard set of indexer processors (see
/// --processors) and configure them to write to this DB, and run an API that lets
/// you access the data they write to storage. This is opt in because it requires
/// Docker to be installed in the host system.
#[clap(long, conflicts_with = "no_txn_stream")]
pub with_indexer_api: bool,
}

0 comments on commit 57b3c28

Please sign in to comment.