Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Break up Client trait #7387

Merged
merged 1 commit into from Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
239 changes: 123 additions & 116 deletions crates/turborepo-api-client/src/lib.rs
Expand Up @@ -37,14 +37,29 @@ pub trait Client {
async fn get_teams(&self, token: &str) -> Result<TeamsResponse>;
async fn get_team(&self, token: &str, team_id: &str) -> Result<Option<Team>>;
fn add_ci_header(request_builder: RequestBuilder) -> RequestBuilder;
async fn get_caching_status(
async fn get_spaces(&self, token: &str, team_id: Option<&str>) -> Result<SpacesResponse>;
async fn verify_sso_token(&self, token: &str, token_name: &str) -> Result<VerifiedSsoUser>;
async fn handle_403(response: Response) -> Error;
fn make_url(&self, endpoint: &str) -> Result<Url>;
}

#[async_trait]
pub trait CacheClient {
async fn get_artifact(
&self,
hash: &str,
token: &str,
team_id: Option<&str>,
team_slug: Option<&str>,
) -> Result<CachingStatusResponse>;
async fn get_spaces(&self, token: &str, team_id: Option<&str>) -> Result<SpacesResponse>;
async fn verify_sso_token(&self, token: &str, token_name: &str) -> Result<VerifiedSsoUser>;
method: Method,
) -> Result<Option<Response>>;
async fn fetch_artifact(
&self,
hash: &str,
token: &str,
team_id: Option<&str>,
team_slug: Option<&str>,
) -> Result<Option<Response>>;
#[allow(clippy::too_many_arguments)]
async fn put_artifact(
&self,
Expand All @@ -56,30 +71,19 @@ pub trait Client {
team_id: Option<&str>,
team_slug: Option<&str>,
) -> Result<()>;
async fn handle_403(response: Response) -> Error;
async fn fetch_artifact(
&self,
hash: &str,
token: &str,
team_id: Option<&str>,
team_slug: Option<&str>,
) -> Result<Option<Response>>;
async fn artifact_exists(
&self,
hash: &str,
token: &str,
team_id: Option<&str>,
team_slug: Option<&str>,
) -> Result<Option<Response>>;
async fn get_artifact(
async fn get_caching_status(
&self,
hash: &str,
token: &str,
team_id: Option<&str>,
team_slug: Option<&str>,
method: Method,
) -> Result<Option<Response>>;
fn make_url(&self, endpoint: &str) -> Result<Url>;
) -> Result<CachingStatusResponse>;
}

#[async_trait]
Expand Down Expand Up @@ -164,28 +168,6 @@ impl Client for APIClient {
request_builder
}

async fn get_caching_status(
&self,
token: &str,
team_id: Option<&str>,
team_slug: Option<&str>,
) -> Result<CachingStatusResponse> {
let request_builder = self
.client
.get(self.make_url("/v8/artifacts/status")?)
.header("User-Agent", self.user_agent.clone())
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", token));

let request_builder = Self::add_team_params(request_builder, team_id, team_slug);

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;

Ok(response.json().await?)
}

async fn get_spaces(&self, token: &str, team_id: Option<&str>) -> Result<SpacesResponse> {
// create url with teamId if provided
let endpoint = match team_id {
Expand Down Expand Up @@ -226,64 +208,6 @@ impl Client for APIClient {
})
}

#[tracing::instrument(skip_all)]
async fn put_artifact(
&self,
hash: &str,
artifact_body: &[u8],
duration: u64,
tag: Option<&str>,
token: &str,
team_id: Option<&str>,
team_slug: Option<&str>,
) -> Result<()> {
let mut request_url = self.make_url(&format!("/v8/artifacts/{}", hash))?;
let mut allow_auth = true;

if self.use_preflight {
let preflight_response = self
.do_preflight(
token,
request_url.clone(),
"PUT",
"Authorization, Content-Type, User-Agent, x-artifact-duration, x-artifact-tag",
)
.await?;

allow_auth = preflight_response.allow_authorization_header;
request_url = preflight_response.location.clone();
}

let mut request_builder = self
.client
.put(request_url)
.header("Content-Type", "application/octet-stream")
.header("x-artifact-duration", duration.to_string())
.header("User-Agent", self.user_agent.clone())
.body(artifact_body.to_vec());

if allow_auth {
request_builder = request_builder.header("Authorization", format!("Bearer {}", token));
}

request_builder = Self::add_team_params(request_builder, team_id, team_slug);

request_builder = Self::add_ci_header(request_builder);

if let Some(tag) = tag {
request_builder = request_builder.header("x-artifact-tag", tag);
}

let response = retry::make_retryable_request(request_builder).await?;

if response.status() == StatusCode::FORBIDDEN {
return Err(Self::handle_403(response).await);
}

response.error_for_status()?;
Ok(())
}

async fn handle_403(response: Response) -> Error {
#[derive(Deserialize)]
struct WrappedAPIError {
Expand Down Expand Up @@ -331,16 +255,57 @@ impl Client for APIClient {
}
}

#[tracing::instrument(skip_all)]
async fn fetch_artifact(
fn make_url(&self, endpoint: &str) -> Result<Url> {
let url = format!("{}{}", self.base_url, endpoint);
Url::parse(&url).map_err(|err| Error::InvalidUrl { url, err })
}
}

#[async_trait]
impl CacheClient for APIClient {
async fn get_artifact(
&self,
hash: &str,
token: &str,
team_id: Option<&str>,
team_slug: Option<&str>,
method: Method,
) -> Result<Option<Response>> {
self.get_artifact(hash, token, team_id, team_slug, Method::GET)
.await
let mut request_url = self.make_url(&format!("/v8/artifacts/{}", hash))?;
let mut allow_auth = true;

if self.use_preflight {
let preflight_response = self
.do_preflight(
token,
request_url.clone(),
"GET",
"Authorization, User-Agent",
)
.await?;

allow_auth = preflight_response.allow_authorization_header;
request_url = preflight_response.location;
};

let mut request_builder = self
.client
.request(method, request_url)
.header("User-Agent", self.user_agent.clone());

if allow_auth {
request_builder = request_builder.header("Authorization", format!("Bearer {}", token));
}

request_builder = Self::add_team_params(request_builder, team_id, team_slug);

let response = retry::make_retryable_request(request_builder).await?;

match response.status() {
StatusCode::FORBIDDEN => Err(Self::handle_403(response).await),
StatusCode::NOT_FOUND => Ok(None),
_ => Ok(Some(response.error_for_status()?)),
}
}

#[tracing::instrument(skip_all)]
Expand All @@ -355,14 +320,29 @@ impl Client for APIClient {
.await
}

async fn get_artifact(
#[tracing::instrument(skip_all)]
async fn fetch_artifact(
&self,
hash: &str,
token: &str,
team_id: Option<&str>,
team_slug: Option<&str>,
method: Method,
) -> Result<Option<Response>> {
self.get_artifact(hash, token, team_id, team_slug, Method::GET)
.await
}

#[tracing::instrument(skip_all)]
async fn put_artifact(
&self,
hash: &str,
artifact_body: &[u8],
duration: u64,
tag: Option<&str>,
token: &str,
team_id: Option<&str>,
team_slug: Option<&str>,
) -> Result<()> {
let mut request_url = self.make_url(&format!("/v8/artifacts/{}", hash))?;
let mut allow_auth = true;

Expand All @@ -371,38 +351,65 @@ impl Client for APIClient {
.do_preflight(
token,
request_url.clone(),
"GET",
"Authorization, User-Agent",
"PUT",
"Authorization, Content-Type, User-Agent, x-artifact-duration, x-artifact-tag",
)
.await?;

allow_auth = preflight_response.allow_authorization_header;
request_url = preflight_response.location;
};
request_url = preflight_response.location.clone();
}

let mut request_builder = self
.client
.request(method, request_url)
.header("User-Agent", self.user_agent.clone());
.put(request_url)
.header("Content-Type", "application/octet-stream")
.header("x-artifact-duration", duration.to_string())
.header("User-Agent", self.user_agent.clone())
.body(artifact_body.to_vec());

if allow_auth {
request_builder = request_builder.header("Authorization", format!("Bearer {}", token));
}

request_builder = Self::add_team_params(request_builder, team_id, team_slug);

request_builder = Self::add_ci_header(request_builder);

if let Some(tag) = tag {
request_builder = request_builder.header("x-artifact-tag", tag);
}

let response = retry::make_retryable_request(request_builder).await?;

match response.status() {
StatusCode::FORBIDDEN => Err(Self::handle_403(response).await),
StatusCode::NOT_FOUND => Ok(None),
_ => Ok(Some(response.error_for_status()?)),
if response.status() == StatusCode::FORBIDDEN {
return Err(Self::handle_403(response).await);
}

response.error_for_status()?;
Ok(())
}

fn make_url(&self, endpoint: &str) -> Result<Url> {
let url = format!("{}{}", self.base_url, endpoint);
Url::parse(&url).map_err(|err| Error::InvalidUrl { url, err })
async fn get_caching_status(
&self,
token: &str,
team_id: Option<&str>,
team_slug: Option<&str>,
) -> Result<CachingStatusResponse> {
let request_builder = self
.client
.get(self.make_url("/v8/artifacts/status")?)
.header("User-Agent", self.user_agent.clone())
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", token));

let request_builder = Self::add_team_params(request_builder, team_id, team_slug);

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;

Ok(response.json().await?)
}
}

Expand Down