diff --git a/aa-core/src/signer.rs b/aa-core/src/signer.rs index cc15b30..4b86b68 100644 --- a/aa-core/src/signer.rs +++ b/aa-core/src/signer.rs @@ -85,7 +85,7 @@ impl SmartAccountSignerBuilder { .to_determined_smart_account() .await .map_err(|e| EngineError::ValidationError { - message: format!("Failed to determine smart account: {}", e), + message: format!("Failed to determine smart account: {e}"), })?, }; diff --git a/core/src/chain.rs b/core/src/chain.rs index bba79f6..0813314 100644 --- a/core/src/chain.rs +++ b/core/src/chain.rs @@ -131,7 +131,7 @@ impl ThirdwebChainConfig<'_> { // For local anvil, use localhost URLs let local_rpc_url = "http://127.0.0.1:8545"; let rpc_url = Url::parse(local_rpc_url).map_err(|e| EngineError::RpcConfigError { - message: format!("Failed to parse local anvil RPC URL: {}", e), + message: format!("Failed to parse local anvil RPC URL: {e}"), })?; // For bundler and paymaster, use the same local RPC URL @@ -149,7 +149,7 @@ impl ThirdwebChainConfig<'_> { client_id = self.client_id, )) .map_err(|e| EngineError::RpcConfigError { - message: format!("Failed to parse RPC URL: {}", e), + message: format!("Failed to parse RPC URL: {e}"), })?; let bundler_url = Url::parse(&format!( @@ -158,7 +158,7 @@ impl ThirdwebChainConfig<'_> { base_url = self.bundler_base_url, )) .map_err(|e| EngineError::RpcConfigError { - message: format!("Failed to parse Bundler URL: {}", e), + message: format!("Failed to parse Bundler URL: {e}"), })?; let paymaster_url = Url::parse(&format!( @@ -167,7 +167,7 @@ impl ThirdwebChainConfig<'_> { base_url = self.paymaster_base_url, )) .map_err(|e| EngineError::RpcConfigError { - message: format!("Failed to parse Paymaster URL: {}", e), + message: format!("Failed to parse Paymaster URL: {e}"), })?; (rpc_url, bundler_url, paymaster_url) diff --git a/core/src/error.rs b/core/src/error.rs index ae808f0..99e2255 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -331,22 +331,22 @@ impl From> for SerialisableAwsSdkError 
{ fn from(err: SdkError) -> Self { match err { SdkError::ConstructionFailure(err) => SerialisableAwsSdkError::ConstructionFailure { - message: format!("{:?}", err), + message: format!("{err:?}"), }, SdkError::TimeoutError(err) => SerialisableAwsSdkError::TimeoutError { - message: format!("{:?}", err), + message: format!("{err:?}"), }, SdkError::DispatchFailure(err) => SerialisableAwsSdkError::DispatchFailure { - message: format!("{:?}", err), + message: format!("{err:?}"), }, SdkError::ResponseError(err) => SerialisableAwsSdkError::ResponseError { - message: format!("{:?}", err), + message: format!("{err:?}"), }, SdkError::ServiceError(err) => SerialisableAwsSdkError::ServiceError { - message: format!("{:?}", err), + message: format!("{err:?}"), }, _ => SerialisableAwsSdkError::Other { - message: format!("{:?}", err), + message: format!("{err:?}"), }, } } @@ -398,11 +398,8 @@ impl From for EngineError { message, details, } => match details { - Some(details) => format!( - "Enclave error: {} - {} - details: {}", - code, message, details - ), - None => format!("Enclave error: {} - {}", code, message), + Some(details) => format!("Enclave error: {code} - {message} - details: {details}"), + None => format!("Enclave error: {code} - {message}"), }, _ => err.to_string(), }; @@ -536,15 +533,15 @@ impl ContractErrorToEngineError for alloy::contract::Error { fn to_engine_error(self, chain_id: u64, contract_address: Option
) -> EngineError { let (message, kind) = match self { alloy::contract::Error::UnknownFunction(name) => ( - format!("Unknown function: {}", name), + format!("Unknown function: {name}"), ContractInteractionErrorKind::UnknownFunction { function_name: name, }, ), alloy::contract::Error::UnknownSelector(selector) => ( - format!("Unknown selector: {:?}", selector), + format!("Unknown selector: {selector:?}"), ContractInteractionErrorKind::UnknownSelector { - function_selector: format!("{:?}", selector), + function_selector: format!("{selector:?}"), }, ), alloy::contract::Error::NotADeploymentTransaction => ( @@ -556,26 +553,26 @@ impl ContractErrorToEngineError for alloy::contract::Error { ContractInteractionErrorKind::ContractNotDeployed, ), alloy::contract::Error::ZeroData(function, err) => ( - format!("Zero data returned from contract call to {}", function), + format!("Zero data returned from contract call to {function}"), ContractInteractionErrorKind::ZeroData { function, message: err.to_string(), }, ), alloy::contract::Error::AbiError(err) => ( - format!("ABI error: {}", err), + format!("ABI error: {err}"), ContractInteractionErrorKind::AbiError { message: err.to_string(), }, ), alloy::contract::Error::TransportError(err) => ( - format!("Transport error: {}", err), + format!("Transport error: {err}"), ContractInteractionErrorKind::TransportError { message: err.to_string(), }, ), alloy::contract::Error::PendingTransactionError(err) => ( - format!("Pending transaction error: {}", err), + format!("Pending transaction error: {err}"), ContractInteractionErrorKind::PendingTransactionError { message: err.to_string(), }, diff --git a/core/src/execution_options/aa.rs b/core/src/execution_options/aa.rs index 7820d52..2917b35 100644 --- a/core/src/execution_options/aa.rs +++ b/core/src/execution_options/aa.rs @@ -91,12 +91,12 @@ impl Erc4337ExecutionOptions { pub fn get_salt_data(&self) -> Result { if self.account_salt.starts_with("0x") { 
Bytes::from_hex(&self.account_salt).map_err(|e| EngineError::ValidationError { - message: format!("Failed to parse hex salt: {}", e), + message: format!("Failed to parse hex salt: {e}"), }) } else { let hex_string = alloy::hex::encode(&self.account_salt); Bytes::from_hex(hex_string).map_err(|e| EngineError::ValidationError { - message: format!("Failed to encode salt as hex: {}", e), + message: format!("Failed to encode salt as hex: {e}"), }) } } diff --git a/core/src/execution_options/mod.rs b/core/src/execution_options/mod.rs index 7aaeaac..671327f 100644 --- a/core/src/execution_options/mod.rs +++ b/core/src/execution_options/mod.rs @@ -211,9 +211,8 @@ mod tests { r#"{{ "url": "https://example.com/webhook", "secret": "test_secret", - "userMetadata": "{}" - }}"#, - large_metadata + "userMetadata": "{large_metadata}" + }}"# ); let webhook_options: Result = serde_json::from_str(&invalid_json); diff --git a/core/src/rpc_clients/bundler.rs b/core/src/rpc_clients/bundler.rs index 7a1a3a2..004db70 100644 --- a/core/src/rpc_clients/bundler.rs +++ b/core/src/rpc_clients/bundler.rs @@ -71,11 +71,18 @@ pub struct TwExecuteResponse { } /// Response from tw_getTransactionHash bundler method +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase", tag = "status")] +pub enum TwGetTransactionHashResponse { + Pending, + Success { transaction_hash: String }, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct TwGetTransactionHashResponse { - /// The transaction hash - pub transaction_hash: Option, +pub enum TwGetTransactionHashStatus { + Pending, + Success, } impl BundlerClient { @@ -152,12 +159,12 @@ impl BundlerClient { pub async fn tw_get_transaction_hash( &self, transaction_id: &str, - ) -> TransportResult> { + ) -> TransportResult { let params = serde_json::json!([transaction_id]); let response: TwGetTransactionHashResponse = self.inner.request("tw_getTransactionHash", 
params).await?; - Ok(response.transaction_hash) + Ok(response) } } diff --git a/core/src/rpc_clients/transport.rs b/core/src/rpc_clients/transport.rs index ec38eb5..508adba 100644 --- a/core/src/rpc_clients/transport.rs +++ b/core/src/rpc_clients/transport.rs @@ -64,12 +64,12 @@ impl HeaderInjectingTransport { .map_err(TransportErrorKind::custom)?; let status = resp.status(); - debug!(%status, "received response from server"); + debug!(?status, "received response from server"); // Get response body let body = resp.bytes().await.map_err(TransportErrorKind::custom)?; debug!(bytes = body.len(), "retrieved response body"); - trace!(body = %String::from_utf8_lossy(&body), "response body"); + trace!(body = ?String::from_utf8_lossy(&body), "response body"); // Check for HTTP errors if !status.is_success() { @@ -100,7 +100,7 @@ impl Service for HeaderInjectingTransport { #[inline] fn call(&mut self, req: RequestPacket) -> Self::Future { let this = self.clone(); // Clone is cheap - just clones the Arc inside Client - let span = debug_span!("HeaderInjectingTransport", url = %this.url); + let span = debug_span!("HeaderInjectingTransport", url = ?this.url); Box::pin(this.do_request(req).instrument(span)) } } diff --git a/core/src/signer.rs b/core/src/signer.rs index a8948ca..48e75a7 100644 --- a/core/src/signer.rs +++ b/core/src/signer.rs @@ -102,12 +102,12 @@ impl Erc4337SigningOptions { pub fn get_salt_data(&self) -> Result { if self.account_salt.starts_with("0x") { Bytes::from_hex(&self.account_salt).map_err(|e| EngineError::ValidationError { - message: format!("Failed to parse hex salt: {}", e), + message: format!("Failed to parse hex salt: {e}"), }) } else { let hex_string = alloy::hex::encode(&self.account_salt); Bytes::from_hex(hex_string).map_err(|e| EngineError::ValidationError { - message: format!("Failed to encode salt as hex: {}", e), + message: format!("Failed to encode salt as hex: {e}"), }) } } @@ -308,7 +308,7 @@ impl AccountSigner for EoaSigner { let signature 
= signer.sign_message(&message_bytes).await.map_err(|e| { tracing::error!("Error signing message with EOA (PrivateKey): {:?}", e); EngineError::ValidationError { - message: format!("Failed to sign message: {}", e), + message: format!("Failed to sign message: {e}"), } })?; Ok(signature.to_string()) @@ -377,7 +377,7 @@ impl AccountSigner for EoaSigner { .map_err(|e| { tracing::error!("Error signing typed data with EOA (PrivateKey): {:?}", e); EngineError::ValidationError { - message: format!("Failed to sign typed data: {}", e), + message: format!("Failed to sign typed data: {e}"), } })?; Ok(signature.to_string()) @@ -447,7 +447,7 @@ impl AccountSigner for EoaSigner { .map_err(|e| { tracing::error!("Error signing transaction with EOA (PrivateKey): {:?}", e); EngineError::ValidationError { - message: format!("Failed to sign transaction: {}", e), + message: format!("Failed to sign transaction: {e}"), } })?; Ok(signature.to_string()) @@ -521,7 +521,7 @@ impl AccountSigner for EoaSigner { let signature = signer.sign_hash_sync(&authorization_hash).map_err(|e| { tracing::error!("Error signing authorization with EOA (PrivateKey): {:?}", e); EngineError::ValidationError { - message: format!("Failed to sign authorization: {}", e), + message: format!("Failed to sign authorization: {e}"), } })?; diff --git a/core/src/userop.rs b/core/src/userop.rs index e30a6ba..88365e1 100644 --- a/core/src/userop.rs +++ b/core/src/userop.rs @@ -115,7 +115,7 @@ impl UserOpSigner { ) .await .map_err(|e| EngineError::ValidationError { - message: format!("Failed to sign userop: {}", e), + message: format!("Failed to sign userop: {e}"), })?; Ok(Bytes::from_hex(&result.signature).map_err(|_| { @@ -128,7 +128,7 @@ impl UserOpSigner { let signer = creds.get_signer(Some(params.chain_id)).await?; let userophash = params.userop.hash(params.chain_id).map_err(|e| { EngineError::ValidationError { - message: format!("Failed to hash userop: {}", e), + message: format!("Failed to hash userop: {e}"), } })?; @@ 
-147,13 +147,13 @@ impl UserOpSigner { SigningCredential::PrivateKey(signer) => { let userophash = params.userop.hash(params.chain_id).map_err(|e| { EngineError::ValidationError { - message: format!("Failed to hash userop: {}", e), + message: format!("Failed to hash userop: {e}"), } })?; let signature = signer.sign_hash(&userophash).await.map_err(|e| { EngineError::ValidationError { - message: format!("Failed to sign userop: {}", e), + message: format!("Failed to sign userop: {e}"), } })?; diff --git a/eip7702-core/src/transaction.rs b/eip7702-core/src/transaction.rs index e493136..6e2e80a 100644 --- a/eip7702-core/src/transaction.rs +++ b/eip7702-core/src/transaction.rs @@ -214,7 +214,7 @@ impl MinimalAccountTransaction { // Serialize wrapped calls to JSON let wrapped_calls_json = serde_json::to_value(&self.wrapped_calls).map_err(|e| { EngineError::ValidationError { - message: format!("Failed to serialize wrapped calls: {}", e), + message: format!("Failed to serialize wrapped calls: {e}"), } })?; diff --git a/eip7702-core/tests/integration_tests.rs b/eip7702-core/tests/integration_tests.rs index 60f5d97..9dba63a 100644 --- a/eip7702-core/tests/integration_tests.rs +++ b/eip7702-core/tests/integration_tests.rs @@ -2,11 +2,9 @@ use std::time::Duration; use alloy::{ consensus::{SignableTransaction, TypedTransaction}, - eips::{BlockNumberOrTag, eip7702::SignedAuthorization}, - hex, + eips::eip7702::SignedAuthorization, network::{EthereumWallet, TransactionBuilder, TransactionBuilder7702, TxSigner}, - node_bindings::{Anvil, AnvilInstance}, - primitives::{Address, BlockNumber, Bytes, TxHash, U256}, + primitives::{Address, Bytes, U256}, providers::{ DynProvider, Identity, Provider, ProviderBuilder, RootProvider, ext::AnvilApi, @@ -25,7 +23,7 @@ use engine_core::{ chain::Chain, credentials::SigningCredential, error::EngineError, - signer::{AccountSigner, EoaSigner, EoaSigningOptions}, + signer::{AccountSigner, EoaSigningOptions}, transaction::InnerTransaction, }; use 
engine_eip7702_core::{ @@ -36,8 +34,6 @@ use engine_eip7702_core::{ use serde_json::Value; use tokio::time::sleep; -use crate::MockERC20::{MockERC20Calls, MockERC20Instance}; - // Mock ERC20 contract sol! { #[allow(missing_docs)] @@ -185,7 +181,7 @@ impl AccountSigner for MockEoaSigner { let message_bytes = _message.as_bytes(); let signature = signer.sign_message(message_bytes).await.map_err(|e| { EngineError::ValidationError { - message: format!("Failed to sign message: {}", e), + message: format!("Failed to sign message: {e}"), } })?; Ok(signature.to_string()) @@ -208,7 +204,7 @@ impl AccountSigner for MockEoaSigner { .sign_dynamic_typed_data(typed_data) .await .map_err(|e| EngineError::ValidationError { - message: format!("Failed to sign typed data: {}", e), + message: format!("Failed to sign typed data: {e}"), })?; Ok(signature.to_string()) } @@ -229,7 +225,7 @@ impl AccountSigner for MockEoaSigner { let mut tx = transaction.clone(); let signature = signer.sign_transaction(&mut tx).await.map_err(|e| { EngineError::ValidationError { - message: format!("Failed to sign transaction: {}", e), + message: format!("Failed to sign transaction: {e}"), } })?; Ok(signature.to_string()) @@ -258,7 +254,7 @@ impl AccountSigner for MockEoaSigner { let authorization_hash = authorization.signature_hash(); let signature = signer.sign_hash(&authorization_hash).await.map_err(|e| { EngineError::ValidationError { - message: format!("Failed to sign authorization: {}", e), + message: format!("Failed to sign authorization: {e}"), } })?; Ok(authorization.into_signed(signature)) @@ -374,7 +370,7 @@ impl TestSetup { let _: () = chain .provider() .client() - .request("anvil_setBalance", (address, format!("0x{:x}", balance))) + .request("anvil_setBalance", (address, format!("0x{balance:x}"))) .await?; Ok(()) @@ -395,8 +391,7 @@ impl TestSetup { .await?; println!( - "Set bytecode for minimal account implementation at {}", - MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS + "Set bytecode for minimal 
account implementation at {MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS}" ); Ok(()) diff --git a/executors/src/eip7702_executor/confirm.rs b/executors/src/eip7702_executor/confirm.rs index aaeebf7..928d157 100644 --- a/executors/src/eip7702_executor/confirm.rs +++ b/executors/src/eip7702_executor/confirm.rs @@ -1,7 +1,8 @@ -use alloy::primitives::{Address, TxHash}; +use alloy::primitives::TxHash; use alloy::providers::Provider; use alloy::rpc::types::TransactionReceipt; use engine_core::error::{AlloyRpcErrorToEngineError, EngineError}; +use engine_core::rpc_clients::TwGetTransactionHashResponse; use engine_core::{ chain::{Chain, ChainService, RpcCredentials}, execution_options::WebhookOptions, @@ -31,11 +32,7 @@ pub struct Eip7702ConfirmationJobData { pub transaction_id: String, pub chain_id: u64, pub bundler_transaction_id: String, - /// ! Deprecated todo: remove this field after all jobs are processed - pub eoa_address: Option
, - - // TODO: make non-optional after all jobs are processed - pub sender_details: Option, + pub sender_details: Eip7702Sender, pub rpc_credentials: RpcCredentials, #[serde(default)] @@ -110,7 +107,7 @@ pub enum Eip7702ConfirmationError { impl From for Eip7702ConfirmationError { fn from(error: TwmqError) -> Self { Eip7702ConfirmationError::InternalError { - message: format!("Deserialization error for job data: {}", error), + message: format!("Deserialization error for job data: {error}"), } } } @@ -174,7 +171,7 @@ where .get_chain(job_data.chain_id) .map_err(|e| Eip7702ConfirmationError::ChainServiceError { chain_id: job_data.chain_id, - message: format!("Failed to get chain instance: {}", e), + message: format!("Failed to get chain instance: {e}"), }) .map_err_fail()?; @@ -189,7 +186,7 @@ where let chain = chain.with_new_default_headers(chain_auth_headers); // 2. Get transaction hash from bundler - let transaction_hash_str = chain + let transaction_hash_res = chain .bundler_client() .tw_get_transaction_hash(&job_data.bundler_transaction_id) .await @@ -198,16 +195,19 @@ where }) .map_err_fail()?; - let transaction_hash = match transaction_hash_str { - Some(hash) => hash.parse::().map_err(|e| { - Eip7702ConfirmationError::TransactionHashError { - message: format!("Invalid transaction hash format: {}", e), - } - .fail() - })?, - None => { + let transaction_hash = match transaction_hash_res { + TwGetTransactionHashResponse::Success { transaction_hash } => { + transaction_hash.parse::().map_err(|e| { + Eip7702ConfirmationError::TransactionHashError { + message: format!("Invalid transaction hash format: {e}"), + } + .fail() + })? 
+ } + + TwGetTransactionHashResponse::Pending => { return Err(Eip7702ConfirmationError::TransactionHashError { - message: "Transaction not found".to_string(), + message: "Transaction not yet confirmed".to_string(), }) .map_err_nack(Some(Duration::from_secs(2)), RequeuePosition::Last); } @@ -227,7 +227,7 @@ where .map_err(|e| { // If transaction not found, nack and retry Eip7702ConfirmationError::ConfirmationError { - message: format!("Failed to get transaction receipt: {}", e), + message: format!("Failed to get transaction receipt: {e}"), inner_error: Some(e.to_engine_error(&chain)), } .nack(Some(Duration::from_secs(5)), RequeuePosition::Last) @@ -262,25 +262,11 @@ where "Transaction confirmed successfully" ); - // todo: remove this after all jobs are processed - let sender_details = job_data - .sender_details - .clone() - .or_else(|| { - job_data - .eoa_address - .map(|eoa_address| Eip7702Sender::Owner { eoa_address }) - }) - .ok_or_else(|| Eip7702ConfirmationError::InternalError { - message: "No sender details found".to_string(), - }) - .map_err_fail()?; - Ok(Eip7702ConfirmationResult { transaction_id: job_data.transaction_id.clone(), transaction_hash, receipt, - sender_details, + sender_details: job_data.sender_details.clone(), }) } diff --git a/executors/src/eip7702_executor/send.rs b/executors/src/eip7702_executor/send.rs index c7f5283..302cc64 100644 --- a/executors/src/eip7702_executor/send.rs +++ b/executors/src/eip7702_executor/send.rs @@ -38,19 +38,7 @@ pub struct Eip7702SendJobData { pub transaction_id: String, pub chain_id: u64, pub transactions: Vec, - - // !IMPORTANT TODO - // To preserve backwards compatibility with pre-existing queued jobs, we continue keeping the eoa_address field until the next release - // However, we make it optional now, and rely on the Eip7702ExecutionOptions instead - pub eoa_address: Option
, - - // We must also keep the execution_options as optional to prevent deserialization errors - // when we remove the eoa_address field, we can make execution_options required - // at runtime we resolve from both, with preference to execution_options - // if both are none, we return an error - #[serde(skip_serializing_if = "Option::is_none")] - pub execution_options: Option, - + pub execution_options: Eip7702ExecutionOptions, pub signing_credential: SigningCredential, #[serde(default)] pub webhook_options: Vec, @@ -128,7 +116,7 @@ pub enum Eip7702SendError { impl From for Eip7702SendError { fn from(error: TwmqError) -> Self { Eip7702SendError::InternalError { - message: format!("Deserialization error for job data: {}", error), + message: format!("Deserialization error for job data: {error}"), } } } @@ -194,7 +182,7 @@ where .get_chain(job_data.chain_id) .map_err(|e| Eip7702SendError::ChainServiceError { chain_id: job_data.chain_id, - message: format!("Failed to get chain instance: {}", e), + message: format!("Failed to get chain instance: {e}"), }) .map_err_fail()?; @@ -208,24 +196,17 @@ where let chain = chain.with_new_default_headers(chain_auth_headers); - let owner_address = job_data - .eoa_address - .or(job_data.execution_options.as_ref().map(|e| match e { - Eip7702ExecutionOptions::Owner(o) => o.from, - Eip7702ExecutionOptions::SessionKey(s) => s.session_key_address, - })) - .ok_or(Eip7702SendError::InternalError { - message: "No owner address found".to_string(), - }) - .map_err_fail()?; + let owner_address = match &job_data.execution_options { + Eip7702ExecutionOptions::Owner(o) => o.from, + Eip7702ExecutionOptions::SessionKey(s) => s.session_key_address, + }; let account = DelegatedAccount::new(owner_address, chain); - let session_key_target_address = - job_data.execution_options.as_ref().and_then(|e| match e { - Eip7702ExecutionOptions::Owner(_) => None, - Eip7702ExecutionOptions::SessionKey(s) => Some(s.account_address), - }); + let 
session_key_target_address = match &job_data.execution_options { + Eip7702ExecutionOptions::Owner(_) => None, + Eip7702ExecutionOptions::SessionKey(s) => Some(s.account_address), + }; let transactions = match session_key_target_address { Some(target_address) => { @@ -314,7 +295,7 @@ where transaction_id: transaction_id.clone(), wrapped_calls: serde_json::to_value(&wrapped_calls) .map_err(|e| Eip7702SendError::InternalError { - message: format!("Failed to serialize wrapped calls: {}", e), + message: format!("Failed to serialize wrapped calls: {e}"), }) .map_err_fail()?, signature, @@ -343,8 +324,7 @@ where transaction_id: job.job.data.transaction_id.clone(), chain_id: job.job.data.chain_id, bundler_transaction_id: success_data.result.transaction_id.clone(), - eoa_address: None, - sender_details: Some(success_data.result.sender_details.clone()), + sender_details: success_data.result.sender_details.clone(), rpc_credentials: job.job.data.rpc_credentials.clone(), webhook_options: job.job.data.webhook_options.clone(), }) diff --git a/executors/src/eoa/error_classifier.rs b/executors/src/eoa/error_classifier.rs index e20f76b..5919d1d 100644 --- a/executors/src/eoa/error_classifier.rs +++ b/executors/src/eoa/error_classifier.rs @@ -1,7 +1,7 @@ use alloy::transports::{RpcError, TransportErrorKind}; use engine_core::{ chain::Chain, - error::{AlloyRpcErrorToEngineError, EngineError, RpcErrorKind}, + error::{AlloyRpcErrorToEngineError, EngineError}, }; use std::time::Duration; use twmq::job::RequeuePosition; @@ -128,7 +128,7 @@ impl EoaErrorMapper { _ => { // Not an actionable error code EoaExecutionError::RpcError { - message: format!("RPC error code {}: {}", code, message), + message: format!("RPC error code {code}: {message}"), inner_error: Some(EngineError::InternalError { message: message.to_string(), }), diff --git a/executors/src/eoa/store/atomic.rs b/executors/src/eoa/store/atomic.rs index e054be6..22a4094 100644 --- a/executors/src/eoa/store/atomic.rs +++ 
b/executors/src/eoa/store/atomic.rs @@ -105,9 +105,9 @@ impl AtomicEoaExecutorStore { { Ok(()) => { tracing::debug!( - eoa = %self.eoa(), - chain_id = %self.chain_id(), - worker_id = %self.worker_id(), + eoa = ?self.eoa(), + chain_id = self.chain_id(), + worker_id = self.worker_id(), "Successfully released EOA lock" ); Ok(self.store) @@ -115,9 +115,9 @@ impl AtomicEoaExecutorStore { Err(TransactionStoreError::LockLost { .. }) => { // Lock was already taken over, which is fine for release tracing::debug!( - eoa = %self.eoa(), - chain_id = %self.chain_id(), - worker_id = %self.worker_id(), + eoa = ?self.eoa(), + chain_id = self.chain_id(), + worker_id = self.worker_id(), "Lock already released or taken over by another worker" ); Ok(self.store) @@ -125,10 +125,10 @@ impl AtomicEoaExecutorStore { Err(e) => { // Other errors shouldn't fail the worker, just log tracing::warn!( - eoa = %self.eoa(), - chain_id = %self.chain_id(), - worker_id = %self.worker_id(), - error = %e, + eoa = ?self.eoa(), + chain_id = self.chain_id(), + worker_id = self.worker_id(), + error = ?e, "Failed to release EOA lock" ); Ok(self.store) @@ -197,7 +197,7 @@ impl AtomicEoaExecutorStore { tracing::debug!( retry_count = retry_count, delay_ms = delay_ms, - eoa = %self.eoa(), + eoa = ?self.eoa(), chain_id = self.chain_id(), "Retrying lock check operation" ); @@ -312,7 +312,7 @@ impl AtomicEoaExecutorStore { tracing::debug!( retry_count = retry_count, delay_ms = delay_ms, - eoa = %self.eoa, + eoa = ?self.eoa, chain_id = self.chain_id, operation = safe_tx.name(), "Retrying atomic operation" @@ -401,6 +401,7 @@ impl AtomicEoaExecutorStore { // First, read current health data let current_health = self.get_eoa_health().await?; + let optimistic_nonce = self.get_optimistic_transaction_count().await?; // Prepare health update if health data exists let health_update = if let Some(mut health) = current_health { @@ -417,6 +418,18 @@ impl AtomicEoaExecutorStore { // Update cached transaction count 
pipeline.set(&tx_count_key, current_chain_tx_count); + if current_chain_tx_count > optimistic_nonce { + tracing::warn!( + current_chain_tx_count = current_chain_tx_count, + optimistic_nonce = optimistic_nonce, + "Optimistic nonce was behind fresh chain transaction count, updating to match" + ); + pipeline.set( + self.optimistic_transaction_count_key_name(), + current_chain_tx_count, + ); + } + // Update health data only if it exists if let Some(ref health_json) = health_update { let health_key = self.eoa_health_key_name(); diff --git a/executors/src/eoa/store/borrowed.rs b/executors/src/eoa/store/borrowed.rs index 7408ec7..0f9e90e 100644 --- a/executors/src/eoa/store/borrowed.rs +++ b/executors/src/eoa/store/borrowed.rs @@ -81,8 +81,8 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { valid_results.push(result.clone()); } else { tracing::warn!( - transaction_id = %transaction_id, - nonce = %result.transaction.nonce, + transaction_id = ?transaction_id, + nonce = result.transaction.nonce, "Submission result not found in borrowed state, ignoring" ); } @@ -174,6 +174,9 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { // Update transaction data status let tx_data_key = self.keys.transaction_data_key_name(transaction_id); pipeline.hset(&tx_data_key, "status", "pending"); + + // ask for this nonce to be recycled because we did not consume the nonce + pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce); // Queue webhook event using user_request from SubmissionResult let event = EoaExecutorEvent { @@ -207,6 +210,9 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { pipeline.hset(&tx_data_key, "completed_at", now); pipeline.hset(&tx_data_key, "failure_reason", err.to_string()); + // ask for this nonce to be recycled because we did not consume the nonce + pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce); + // Queue webhook event using user_request from SubmissionResult let event = 
EoaExecutorEvent { transaction_id: transaction_id.to_string(), diff --git a/executors/src/eoa/store/hydrate.rs b/executors/src/eoa/store/hydrate.rs index ded813e..97a118e 100644 --- a/executors/src/eoa/store/hydrate.rs +++ b/executors/src/eoa/store/hydrate.rs @@ -20,7 +20,7 @@ pub trait Dehydrated { #[derive(Debug, Clone)] pub enum SubmittedTransactionHydrator { Noop, - Real(EoaTransactionRequest), + Real(Box), } impl Dehydrated for SubmittedTransactionDehydrated { @@ -41,7 +41,7 @@ impl Dehydrated for SubmittedTransactionDehydrated SubmittedTransactionHydrator::Real(request) => { SubmittedTransactionHydrated::Real(SubmittedTransaction { data: self, - user_request: request, + user_request: *request, }) } } @@ -149,7 +149,9 @@ impl EoaExecutorStore { transaction_id: id.to_string(), })?; - hydrated.push(d.hydrate(SubmittedTransactionHydrator::Real(request.clone()))); + hydrated.push(d.hydrate(SubmittedTransactionHydrator::Real(Box::new( + request.clone(), + )))); } Ok(hydrated) diff --git a/executors/src/eoa/store/mod.rs b/executors/src/eoa/store/mod.rs index c17da80..5957404 100644 --- a/executors/src/eoa/store/mod.rs +++ b/executors/src/eoa/store/mod.rs @@ -137,7 +137,7 @@ impl EoaExecutorStoreKeys { pub fn transaction_data_key_name(&self, transaction_id: &str) -> String { match &self.namespace { Some(ns) => format!("{ns}:eoa_executor:tx_data:{transaction_id}"), - None => format!("eoa_executor:_tx_data:{transaction_id}"), + None => format!("eoa_executor:tx_data:{transaction_id}"), } } @@ -404,9 +404,9 @@ impl EoaExecutorStore { } // Lock exists, forcefully take it over tracing::warn!( - eoa = %self.eoa, - chain_id = %self.chain_id, - worker_id = %worker_id, + eoa = ?self.eoa, + chain_id = self.chain_id, + worker_id = worker_id, "Forcefully taking over EOA lock from stalled worker" ); // Force set - no expiry, only released by explicit takeover @@ -504,13 +504,32 @@ impl EoaExecutorStore { &self, limit: u64, ) -> Result, TransactionStoreError> { + 
self.peek_pending_transactions_paginated(0, limit).await + } + + /// Peek at pending transactions with pagination support + pub async fn peek_pending_transactions_paginated( + &self, + offset: u64, + limit: u64, + ) -> Result, TransactionStoreError> { + if limit == 0 { + return Ok(Vec::new()); + } + let pending_key = self.pending_transactions_zset_name(); let mut conn = self.redis.clone(); - // Use ZRANGE to peek without removing - let transaction_ids: Vec = conn - .zrange_withscores(&pending_key, 0, (limit - 1) as isize) - .await?; + // Use ZRANGE to peek without removing, with offset support + let start = offset as isize; + let stop = (offset + limit - 1) as isize; + + let transaction_ids: Vec = + conn.zrange_withscores(&pending_key, start, stop).await?; + + if transaction_ids.is_empty() { + return Ok(Vec::new()); + } let mut pipe = twmq::redis::pipe(); @@ -731,6 +750,88 @@ impl EoaExecutorStore { Ok(submitted_txs) } + + /// Get the current time in milliseconds + /// + /// Used as the canonical time representation for this store + pub fn now() -> u64 { + chrono::Utc::now().timestamp_millis().max(0) as u64 + } + + /// Get count of pending transactions + pub async fn get_pending_transactions_count(&self) -> Result { + let pending_key = self.pending_transactions_zset_name(); + let mut conn = self.redis.clone(); + + let count: u64 = conn.zcard(&pending_key).await?; + Ok(count) + } + + /// Get count of borrowed transactions + pub async fn get_borrowed_transactions_count(&self) -> Result { + let borrowed_key = self.borrowed_transactions_hashmap_name(); + let mut conn = self.redis.clone(); + + let count: u64 = conn.hlen(&borrowed_key).await?; + Ok(count) + } + + /// Get all recycled nonces + pub async fn get_recycled_nonces(&self) -> Result, TransactionStoreError> { + let recycled_key = self.recycled_nonces_zset_name(); + let mut conn = self.redis.clone(); + + let nonces: Vec = conn.zrange(&recycled_key, 0, -1).await?; + Ok(nonces) + } + + /// Get count of recycled 
nonces + pub async fn get_recycled_nonces_count(&self) -> Result { + let recycled_key = self.recycled_nonces_zset_name(); + let mut conn = self.redis.clone(); + + let count: u64 = conn.zcard(&recycled_key).await?; + Ok(count) + } + + /// Get all submitted transactions (raw data) + pub async fn get_all_submitted_transactions(&self) -> Result, TransactionStoreError> { + let submitted_key = self.submitted_transactions_zset_name(); + let mut conn = self.redis.clone(); + + let submitted_data: Vec = + conn.zrange_withscores(&submitted_key, 0, -1).await?; + + let submitted_txs: Vec = + SubmittedTransactionDehydrated::from_redis_strings(&submitted_data); + + Ok(submitted_txs) + } + + /// Get attempts count for a specific transaction + pub async fn get_transaction_attempts_count(&self, transaction_id: &str) -> Result { + let attempts_key = self.transaction_attempts_list_name(transaction_id); + let mut conn = self.redis.clone(); + + let count: u64 = conn.llen(&attempts_key).await?; + Ok(count) + } + + /// Get all transaction attempts for a specific transaction + pub async fn get_transaction_attempts(&self, transaction_id: &str) -> Result, TransactionStoreError> { + let attempts_key = self.transaction_attempts_list_name(transaction_id); + let mut conn = self.redis.clone(); + + let attempts_data: Vec = conn.lrange(&attempts_key, 0, -1).await?; + + let mut attempts = Vec::new(); + for attempt_json in attempts_data { + let attempt: TransactionAttempt = serde_json::from_str(&attempt_json)?; + attempts.push(attempt); + } + + Ok(attempts) + } } // Additional error types diff --git a/executors/src/eoa/store/pending.rs b/executors/src/eoa/store/pending.rs index c1c5f67..fe09f2a 100644 --- a/executors/src/eoa/store/pending.rs +++ b/executors/src/eoa/store/pending.rs @@ -59,7 +59,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> { let current_optimistic: Option = conn .get(self.keys.optimistic_transaction_count_key_name()) .await?; - let current_nonce = 
current_optimistic.ok_or(TransactionStoreError::NonceSyncRequired { + let current_optimistic_nonce = current_optimistic.ok_or(TransactionStoreError::NonceSyncRequired { eoa: self.eoa, chain_id: self.chain_id, })?; @@ -74,12 +74,11 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> { // Check that nonces are sequential with no gaps for (i, &nonce) in nonces.iter().enumerate() { - let expected_nonce = current_nonce + i as u64; + let expected_nonce = current_optimistic_nonce + i as u64; if nonce != expected_nonce { return Err(TransactionStoreError::InternalError { message: format!( - "Non-sequential nonces detected: expected {}, found {} at position {}", - expected_nonce, nonce, i + "Non-sequential nonces detected: expected {expected_nonce}, found {nonce} at position {i}" ), }); } @@ -110,7 +109,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> { for tx in self.transactions { let borrowed_json = serde_json::to_string(tx).map_err(|e| TransactionStoreError::InternalError { - message: format!("Failed to serialize borrowed transaction: {}", e), + message: format!("Failed to serialize borrowed transaction: {e}"), })?; serialized_transactions.push(borrowed_json); } @@ -224,7 +223,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithRecycledNonces<'_> { for tx in self.transactions { let borrowed_json = serde_json::to_string(tx).map_err(|e| TransactionStoreError::InternalError { - message: format!("Failed to serialize borrowed transaction: {}", e), + message: format!("Failed to serialize borrowed transaction: {e}"), })?; serialized_transactions.push(borrowed_json); } diff --git a/executors/src/eoa/store/submitted.rs b/executors/src/eoa/store/submitted.rs index 3f3229e..92514ab 100644 --- a/executors/src/eoa/store/submitted.rs +++ b/executors/src/eoa/store/submitted.rs @@ -475,6 +475,8 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> { .zrange(self.keys.recycled_nonces_zset_name(), 0, -1) 
.await?; + // filter out nonces that are higher than the highest submitted nonce + // these don't need to be recycled, they'll be used up by incrementing the nonce let recycled_nonces = recycled_nonces .into_iter() .filter(|nonce| *nonce < highest_submitted_nonce) diff --git a/executors/src/eoa/worker/confirm.rs b/executors/src/eoa/worker/confirm.rs index e21a002..cc0d57b 100644 --- a/executors/src/eoa/worker/confirm.rs +++ b/executors/src/eoa/worker/confirm.rs @@ -8,12 +8,11 @@ use crate::eoa::{ TransactionData, TransactionStoreError, }, worker::{ - EoaExecutorWorker, - error::{EoaExecutorWorkerError, should_update_balance_threshold}, - }, + error::{should_update_balance_threshold, EoaExecutorWorkerError}, EoaExecutorWorker + }, EoaExecutorStore, }; -const NONCE_STALL_TIMEOUT: u64 = 300_000; // 5 minutes in milliseconds - after this time, attempt gas bump +const NONCE_STALL_LIMIT_MS: u64 = 300_000; // 5 minutes in milliseconds - after this time, attempt gas bump #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -37,7 +36,7 @@ impl EoaExecutorWorker { .map_err(|e| { let engine_error = e.to_engine_error(&self.chain); EoaExecutorWorkerError::RpcError { - message: format!("Failed to get transaction count: {}", engine_error), + message: format!("Failed to get transaction count: {engine_error}"), inner_error: engine_error, } })?; @@ -46,7 +45,7 @@ impl EoaExecutorWorker { Err(e) => match e { TransactionStoreError::NonceSyncRequired { .. 
} => { self.store - .reset_nonces(current_chain_transaction_count) + .update_cached_transaction_count(current_chain_transaction_count) .await?; current_chain_transaction_count } @@ -60,15 +59,15 @@ impl EoaExecutorWorker { // no nonce progress if current_chain_transaction_count <= cached_transaction_count { let current_health = self.get_eoa_health().await?; - let now = chrono::Utc::now().timestamp_millis().max(0) as u64; + let now = EoaExecutorStore::now(); // No nonce progress - check if we should attempt gas bumping for stalled nonce let time_since_movement = now.saturating_sub(current_health.last_nonce_movement_at); // if there are waiting transactions, we can attempt a gas bump - if time_since_movement > NONCE_STALL_TIMEOUT && submitted_count > 0 { + if time_since_movement > NONCE_STALL_LIMIT_MS && submitted_count > 0 { tracing::info!( time_since_movement = time_since_movement, - stall_timeout = NONCE_STALL_TIMEOUT, + stall_timeout = NONCE_STALL_LIMIT_MS, current_chain_nonce = current_chain_transaction_count, cached_transaction_count = cached_transaction_count, "Nonce has been stalled, attempting gas bump" @@ -80,7 +79,7 @@ impl EoaExecutorWorker { .await { tracing::warn!( - error = %e, + error = ?e, "Failed to attempt gas bump for stalled nonce" ); } @@ -122,9 +121,9 @@ impl EoaExecutorWorker { Ok(receipt_json) => receipt_json, Err(e) => { tracing::warn!( - transaction_id = %tx.transaction_id, - hash = %tx.transaction_hash, - error = %e, + transaction_id = ?tx.transaction_id, + hash = tx.transaction_hash, + error = ?e, "Failed to serialize receipt as JSON, using debug format" ); format!("{:?}", tx.receipt) @@ -132,9 +131,9 @@ impl EoaExecutorWorker { }; tracing::info!( - transaction_id = %tx.transaction_id, + transaction_id = ?tx.transaction_id, nonce = tx.nonce, - hash = %tx.transaction_hash, + hash = tx.transaction_hash, "Transaction confirmed" ); @@ -156,6 +155,23 @@ impl EoaExecutorWorker { ) .await?; + if current_chain_transaction_count != 
cached_transaction_count { + if current_chain_transaction_count < cached_transaction_count { + tracing::error!( + current_chain_transaction_count = current_chain_transaction_count, + cached_transaction_count = cached_transaction_count, + "Fresh fetched chain transaction count is lower than cached transaction count. \ + This indicates a re-org or RPC block lag. Engine will use the newest fetched transaction count from now (assuming re-org).\ + Transactions already confirmed will not be attempted again, even if their nonce was higher than the new chain transaction count. + In case this is RPC misbehaviour not reflective of actual chain state, Engine's nonce management might be affected." + ); + } + + self.store + .update_cached_transaction_count(current_chain_transaction_count) + .await?; + } + Ok(report) } @@ -262,7 +278,7 @@ impl EoaExecutorWorker { if let Some((transaction_id, tx_data)) = newest_transaction { tracing::info!( - transaction_id = %transaction_id, + transaction_id = ?transaction_id, nonce = expected_nonce, "Found newest transaction for gas bump" ); @@ -294,9 +310,9 @@ impl EoaExecutorWorker { } tracing::warn!( - transaction_id = %transaction_id, + transaction_id = ?transaction_id, nonce = expected_nonce, - error = %e, + error = ?e, "Failed to build typed transaction for gas bump" ); return Ok(false); @@ -310,9 +326,9 @@ impl EoaExecutorWorker { Ok(tx) => tx, Err(e) => { tracing::warn!( - transaction_id = %transaction_id, + transaction_id = ?transaction_id, nonce = expected_nonce, - error = %e, + error = ?e, "Failed to sign transaction for gas bump" ); return Ok(false); @@ -337,25 +353,32 @@ impl EoaExecutorWorker { match self.chain.provider().send_tx_envelope(tx_envelope).await { Ok(_) => { tracing::info!( - transaction_id = %transaction_id, + transaction_id = ?transaction_id, nonce = expected_nonce, "Successfully sent gas bumped transaction" ); - return Ok(true); + Ok(true) } Err(e) => { tracing::warn!( - transaction_id = %transaction_id, + 
transaction_id = ?transaction_id, nonce = expected_nonce, - error = %e, + error = ?e, "Failed to send gas bumped transaction" ); // Don't fail the worker, just log the error - return Ok(false); + Ok(false) } } - } + } else { + tracing::debug!( + nonce = expected_nonce, + "Successfully retrieved all transactions for this nonce, but failed to find newest transaction for gas bump, sending noop" + ); - Ok(false) + let noop_tx = self.send_noop_transaction(expected_nonce).await?; + self.store.process_noop_transactions(&[noop_tx]).await?; + Ok(true) + } } } diff --git a/executors/src/eoa/worker/error.rs b/executors/src/eoa/worker/error.rs index cc54363..7805b83 100644 --- a/executors/src/eoa/worker/error.rs +++ b/executors/src/eoa/worker/error.rs @@ -117,7 +117,7 @@ impl EoaExecutorWorkerError { impl From for EoaExecutorWorkerError { fn from(error: TwmqError) -> Self { EoaExecutorWorkerError::InternalError { - message: format!("Queue error: {}", error), + message: format!("Queue error: {error}"), } } } @@ -150,7 +150,7 @@ pub enum SendContext { InitialBroadcast, } -#[tracing::instrument(skip_all, fields(error = %error, context = ?context))] +#[tracing::instrument(skip_all, fields(error = ?error, context = ?context))] pub fn classify_send_error( error: &RpcError, context: SendContext, @@ -296,7 +296,7 @@ impl SubmissionResult { // Transaction failed, should be retried let engine_error = rpc_error.to_engine_error(chain); let error = EoaExecutorWorkerError::TransactionSendError { - message: format!("Transaction send failed: {}", rpc_error), + message: format!("Transaction send failed: {rpc_error}"), inner_error: engine_error, }; SubmissionResult { diff --git a/executors/src/eoa/worker/mod.rs b/executors/src/eoa/worker/mod.rs index 68de7ce..f3ff3b2 100644 --- a/executors/src/eoa/worker/mod.rs +++ b/executors/src/eoa/worker/mod.rs @@ -127,7 +127,7 @@ where type ErrorData = EoaExecutorWorkerError; type JobData = EoaExecutorWorkerJobData; - #[tracing::instrument(skip_all, 
fields(eoa = %job.job.data.eoa_address, chain_id = job.job.data.chain_id))] + #[tracing::instrument(name = "eoa_executor_worker", skip_all, fields(eoa = ?job.job.data.eoa_address, chain_id = job.job.data.chain_id))] async fn process( &self, job: &BorrowedJob, @@ -140,7 +140,7 @@ where .get_chain(data.chain_id) .map_err(|e| EoaExecutorWorkerError::ChainServiceError { chain_id: data.chain_id, - message: format!("Failed to get chain: {}", e), + message: format!("Failed to get chain: {e}"), }) .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?; @@ -170,7 +170,7 @@ where let result = worker.execute_main_workflow().await?; if let Err(e) = worker.release_eoa_lock().await { - tracing::error!("Error releasing EOA lock: {}", e); + tracing::error!(error = ?e, "Error releasing EOA lock"); } if result.is_work_remaining() { @@ -232,9 +232,9 @@ where let mut conn = self.redis.clone(); if let Err(e) = conn.del::<&str, ()>(&lock_key).await { tracing::error!( - eoa = %job_data.eoa_address, - chain_id = %job_data.chain_id, - error = %e, + eoa = ?job_data.eoa_address, + chain_id = job_data.chain_id, + error = ?e, "Failed to release EOA lock" ); } @@ -276,7 +276,7 @@ impl EoaExecutorWorker { .confirm_flow() .await .map_err(|e| { - tracing::error!("Error in confirm flow: {}", e); + tracing::error!(error = ?e, "Error in confirm flow"); e }) .map_err(|e| e.handle())?; @@ -286,7 +286,7 @@ impl EoaExecutorWorker { .send_flow() .await .map_err(|e| { - tracing::error!("Error in send_flow: {}", e); + tracing::error!(error = ?e, "Error in send_flow"); e }) .map_err(|e| e.handle())?; @@ -294,47 +294,55 @@ impl EoaExecutorWorker { // 4. CHECK FOR REMAINING WORK let pending_count = self .store - .peek_pending_transactions(1000) + .get_pending_transactions_count() .await .map_err(|e| { - tracing::error!("Error in peek_pending_transactions: {}", e); + tracing::error!(error = ?e, "Error in peek_pending_transactions"); e }) - .map_err(|e| Into::::into(e).handle())? 
- .len(); + .map_err(|e| Into::::into(e).handle())?; let borrowed_count = self .store - .peek_borrowed_transactions() + .get_borrowed_transactions_count() .await .map_err(|e| { - tracing::error!("Error in peek_borrowed_transactions: {}", e); + tracing::error!(error = ?e, "Error in peek_borrowed_transactions"); e }) - .map_err(|e| Into::::into(e).handle())? - .len(); + .map_err(|e| Into::::into(e).handle())?; - let recycled_count = self + let recycled_nonces_count = self .store - .peek_recycled_nonces() + .get_recycled_nonces_count() .await .map_err(|e| { - tracing::error!("Error in peek_recycled_nonces: {}", e); + tracing::error!(error = ?e, "Error in peek_recycled_nonces"); e }) - .map_err(|e| Into::::into(e).handle())? - .len(); + .map_err(|e| Into::::into(e).handle())?; let submitted_count = self .store .get_submitted_transactions_count() .await .map_err(|e| { - tracing::error!("Error in get_submitted_transactions_count: {}", e); + tracing::error!(error = ?e, "Error in get_submitted_transactions_count"); e }) .map_err(|e| Into::::into(e).handle())?; + tracing::info!( + recovered = recovered, + confirmed = confirmations_report.moved_to_success, + temp_failed = confirmations_report.moved_to_pending, + replacements = confirmations_report.moved_to_pending, + currently_submitted = submitted_count, + currently_pending = pending_count, + currently_borrowed = borrowed_count, + currently_recycled = recycled_nonces_count, + ); + Ok(EoaExecutorWorkerResult { recovered_transactions: recovered, confirmed_transactions: confirmations_report.moved_to_success as u32, @@ -345,7 +353,7 @@ impl EoaExecutorWorker { submitted_transactions: submitted_count as u32, pending_transactions: pending_count as u32, borrowed_transactions: borrowed_count as u32, - recycled_nonces: recycled_count as u32, + recycled_nonces: recycled_nonces_count as u32, }) } @@ -376,7 +384,7 @@ impl EoaExecutorWorker { let transaction_id = borrowed.transaction_id.clone(); tracing::info!( - transaction_id = 
%transaction_id, + transaction_id = ?transaction_id, nonce = nonce, "Recovering borrowed transaction" ); @@ -452,8 +460,7 @@ impl EoaExecutorWorker { let engine_error = e.to_engine_error(&self.chain); EoaExecutorWorkerError::RpcError { message: format!( - "Failed to get balance during initialization: {}", - engine_error + "Failed to get balance during initialization: {engine_error}" ), inner_error: engine_error, } @@ -475,7 +482,7 @@ impl EoaExecutorWorker { } } - #[tracing::instrument(skip_all, fields(eoa = %self.eoa, chain_id = %self.chain.chain_id()))] + #[tracing::instrument(skip_all, fields(eoa = ?self.eoa, chain_id = self.chain.chain_id()))] async fn update_balance_threshold(&self) -> Result<(), EoaExecutorWorkerError> { let mut health = self.get_eoa_health().await?; @@ -488,7 +495,7 @@ impl EoaExecutorWorker { .map_err(|e| { let engine_error = e.to_engine_error(&self.chain); EoaExecutorWorkerError::RpcError { - message: format!("Failed to get balance: {}", engine_error), + message: format!("Failed to get balance: {engine_error}"), inner_error: engine_error, } })?; diff --git a/executors/src/eoa/worker/send.rs b/executors/src/eoa/worker/send.rs index 19b7b88..ea32023 100644 --- a/executors/src/eoa/worker/send.rs +++ b/executors/src/eoa/worker/send.rs @@ -34,7 +34,7 @@ impl EoaExecutorWorker { .map_err(|e| { let engine_error = e.to_engine_error(&self.chain); EoaExecutorWorkerError::RpcError { - message: format!("Failed to get balance: {}", engine_error), + message: format!("Failed to get balance: {engine_error}"), inner_error: engine_error, } })?; @@ -118,11 +118,12 @@ impl EoaExecutorWorker { let prepared_results = futures::future::join_all(build_tasks).await; let prepared_results_with_pending = pending_txs .iter() + .take(prepared_results.len()) .zip(prepared_results.into_iter()) .collect::>(); let cleaned_results = self - .clean_prepration_results(prepared_results_with_pending) + .clean_prepration_results(prepared_results_with_pending, false) .await?; if 
cleaned_results.is_empty() { @@ -217,23 +218,40 @@ impl EoaExecutorWorker { Ok(total_sent as u32) } + /// Clean preparation results to only contain successful transactions. + /// + /// If `should_break_on_failure` is true, the function will break on the first failure. + /// + /// Otherwise, it will continue to process all transactions. + /// + /// `should_break_on_failure` is used to handle incremented nonce processing + /// where we want to break on the first failure to maintain nonce continuity. + /// + /// Regardless of break condition, all errors are still processed for non-retryable errors, and cleaned up async fn clean_prepration_results( &self, results: Vec<( &PendingTransaction, Result, )>, + should_break_on_failure: bool, ) -> Result, EoaExecutorWorkerError> { let mut cleaned_results = Vec::new(); let mut balance_threshold_update_needed = false; + let mut failure_occurred = false; for (pending, result) in results.into_iter() { - match result { - Ok(borrowed_data) => { + match (failure_occurred, result) { + (false, Ok(borrowed_data)) => { cleaned_results.push(borrowed_data); } - Err(e) => { + (_, Err(e)) => { // Track balance threshold issues + + if should_break_on_failure { + failure_occurred = true; + } + if let EoaExecutorWorkerError::TransactionSimulationFailed { inner_error, .. 
} = &e @@ -249,17 +267,23 @@ impl EoaExecutorWorker { // For deterministic build failures, fail the transaction immediately if !is_retryable_preparation_error(&e) { + tracing::error!( + error = ?e, + transaction_id = pending.transaction_id, + "Transaction permanently failed due to non-retryable preparation error", + ); self.store .fail_pending_transaction(pending, e, self.webhook_queue.clone()) .await?; } } + (true, Ok(_)) => continue, } } if balance_threshold_update_needed { if let Err(e) = self.update_balance_threshold().await { - tracing::error!("Failed to update balance threshold: {}", e); + tracing::error!(error = ?e, "Failed to update balance threshold"); } } @@ -269,6 +293,7 @@ impl EoaExecutorWorker { /// Process new transactions with fixed iterations and simple sequential nonces async fn process_new_transactions(&self, budget: u64) -> Result { if budget == 0 { + tracing::warn!("No budget to process new transactions"); return Ok(0); } @@ -322,7 +347,7 @@ impl EoaExecutorWorker { // Clean preparation results (handles failures and removes bad transactions) let cleaned_results = self - .clean_prepration_results(prepared_results_with_pending) + .clean_prepration_results(prepared_results_with_pending, true) .await?; if cleaned_results.is_empty() { @@ -392,12 +417,10 @@ impl EoaExecutorWorker { ); total_sent += processing_report.moved_to_submitted; - remaining_budget = remaining_budget.saturating_sub(moved_count as u64); - // If we didn't use all our budget in this iteration, we're likely done - if moved_count < batch_size { - break; - } + // Update remaining budget by actual nonce consumption + remaining_budget = + remaining_budget.saturating_sub(processing_report.moved_to_submitted as u64); } Ok(total_sent as u32) diff --git a/executors/src/eoa/worker/transaction.rs b/executors/src/eoa/worker/transaction.rs index cac5313..9ff269a 100644 --- a/executors/src/eoa/worker/transaction.rs +++ b/executors/src/eoa/worker/transaction.rs @@ -50,7 +50,7 @@ impl 
EoaExecutorWorker { tokio::time::sleep(Duration::from_millis(delay)).await; tracing::debug!( - transaction_id = %pending_transaction.transaction_id, + transaction_id = ?pending_transaction.transaction_id, attempt = attempt, "Retrying transaction preparation" ); @@ -64,9 +64,9 @@ impl EoaExecutorWorker { Err(error) => { if is_retryable_preparation_error(&error) && attempt < MAX_PREPARATION_RETRIES { tracing::warn!( - transaction_id = %pending_transaction.transaction_id, + transaction_id = ?pending_transaction.transaction_id, attempt = attempt, - error = %error, + error = ?error, "Retryable error during transaction preparation, will retry" ); last_error = Some(error); @@ -211,8 +211,7 @@ impl EoaExecutorWorker { } Err(legacy_error) => Err(EoaExecutorWorkerError::RpcError { message: format!( - "Failed to get legacy gas price: {}", - legacy_error + "Failed to get legacy gas price: {legacy_error}" ), inner_error: legacy_error.to_engine_error(&self.chain), }), @@ -225,7 +224,7 @@ impl EoaExecutorWorker { } else { // Other EIP-1559 error Err(EoaExecutorWorkerError::RpcError { - message: format!("Failed to estimate EIP-1559 fees: {}", eip1559_error), + message: format!("Failed to estimate EIP-1559 fees: {eip1559_error}"), inner_error: eip1559_error.to_engine_error(&self.chain), }) } @@ -333,7 +332,7 @@ impl EoaExecutorWorker { // Not a revert - could be RPC issue, this should nack the worker let engine_error = e.to_engine_error(&self.chain); return Err(EoaExecutorWorkerError::RpcError { - message: format!("Gas estimation failed: {}", engine_error), + message: format!("Gas estimation failed: {engine_error}"), inner_error: engine_error, }); } @@ -344,7 +343,7 @@ impl EoaExecutorWorker { tx_request .build_typed_tx() .map_err(|e| EoaExecutorWorkerError::TransactionBuildFailed { - message: format!("Failed to build typed transaction: {:?}", e), + message: format!("Failed to build typed transaction: {e:?}"), }) } @@ -363,13 +362,13 @@ impl EoaExecutorWorker { 
.sign_transaction(signing_options, &typed_tx, credential) .await .map_err(|engine_error| EoaExecutorWorkerError::SigningError { - message: format!("Failed to sign transaction: {}", engine_error), + message: format!("Failed to sign transaction: {engine_error}"), inner_error: engine_error, })?; let signature = signature.parse::().map_err(|e| { EoaExecutorWorkerError::SignatureParsingFailed { - message: format!("Failed to parse signature: {}", e), + message: format!("Failed to parse signature: {e}"), } })?; diff --git a/executors/src/external_bundler/confirm.rs b/executors/src/external_bundler/confirm.rs index 273eb79..385286d 100644 --- a/executors/src/external_bundler/confirm.rs +++ b/executors/src/external_bundler/confirm.rs @@ -81,7 +81,7 @@ pub enum UserOpConfirmationError { impl From for UserOpConfirmationError { fn from(error: TwmqError) -> Self { UserOpConfirmationError::InternalError { - message: format!("Deserialization error for job data: {}", error), + message: format!("Deserialization error for job data: {error}"), } } } @@ -153,7 +153,7 @@ where .get_chain(job_data.chain_id) .map_err(|e| UserOpConfirmationError::ChainServiceError { chain_id: job_data.chain_id, - message: format!("Failed to get chain instance: {}", e), + message: format!("Failed to get chain instance: {e}"), }) .map_err_fail()?; @@ -204,10 +204,10 @@ where // 3. We got a receipt - that's confirmation success! 
// Whether it reverted or not is just information in the receipt tracing::info!( - transaction_id = %job_data.transaction_id, + transaction_id = job_data.transaction_id, user_op_hash = ?job_data.user_op_hash, transaction_hash = ?receipt.receipt.transaction_hash, - success = %receipt.success, + success = ?receipt.success, "User operation confirmed on-chain" ); @@ -240,7 +240,7 @@ where ); tracing::info!( - transaction_id = %job.job.data.transaction_id, + transaction_id = job.job.data.transaction_id, account_address = ?job.job.data.account_address, "Added atomic lock release and cache update to transaction pipeline" ); @@ -249,8 +249,8 @@ where // Queue success webhook if let Err(e) = self.queue_success_webhook(job, success_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue success webhook" ); } @@ -266,15 +266,15 @@ where // Just queue webhook with current status if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue nack webhook" ); } tracing::debug!( - transaction_id = %job.job.data.transaction_id, - attempt = %job.job.attempts, + transaction_id = job.job.data.transaction_id, + attempt = job.job.attempts, "Confirmation job NACKed, retaining lock for retry" ); } @@ -305,9 +305,9 @@ where }; tracing::error!( - transaction_id = %job.job.data.transaction_id, + transaction_id = job.job.data.transaction_id, account_address = ?job.job.data.account_address, - reason = %failure_reason, + reason = failure_reason, "Added lock release to transaction pipeline due to permanent failure" ); } @@ -315,8 +315,8 @@ where // Queue failure webhook if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id 
= job.job.data.transaction_id, + error = ?e, "Failed to queue fail webhook" ); } diff --git a/executors/src/external_bundler/deployment.rs b/executors/src/external_bundler/deployment.rs index 440be62..364e64f 100644 --- a/executors/src/external_bundler/deployment.rs +++ b/executors/src/external_bundler/deployment.rs @@ -42,7 +42,7 @@ impl RedisDeploymentCache { } fn cache_key(&self, chain_id: u64, account_address: &Address) -> String { - format!("{}:{}:{}", CACHE_PREFIX, chain_id, account_address) + format!("{CACHE_PREFIX}:{chain_id}:{account_address}") } } @@ -71,11 +71,11 @@ impl RedisDeploymentLock { } fn lock_key(&self, chain_id: u64, account_address: &Address) -> String { - format!("{}:{}:{}", LOCK_PREFIX, chain_id, account_address) + format!("{LOCK_PREFIX}:{chain_id}:{account_address}") } fn cache_key(&self, chain_id: u64, account_address: &Address) -> String { - format!("{}:{}:{}", CACHE_PREFIX, chain_id, account_address) + format!("{CACHE_PREFIX}:{chain_id}:{account_address}") } /// Release a deployment lock using the provided pipeline @@ -150,7 +150,7 @@ impl DeploymentLock for RedisDeploymentLock { let lock_id = Uuid::new_v4().to_string(); let now = SystemTime::now() .duration_since(UNIX_EPOCH) - .map_err(|e| EngineError::InternalError { message: format!("System time error: {}", e) })? + .map_err(|e| EngineError::InternalError { message: format!("System time error: {e}") })? 
.as_secs(); let lock_data = LockData { @@ -159,20 +159,20 @@ impl DeploymentLock for RedisDeploymentLock { }; let lock_data_str = serde_json::to_string(&lock_data) - .map_err(|e| EngineError::InternalError { message: format!("Serialization failed: {}", e) })?; + .map_err(|e| EngineError::InternalError { message: format!("Serialization failed: {e}") })?; // Use SET NX EX for atomic acquire let result: Option = conn .set_nx(&key, &lock_data_str) .await - .map_err(|e| EngineError::InternalError { message: format!("Lock acquire failed: {}", e) })?; + .map_err(|e| EngineError::InternalError { message: format!("Lock acquire failed: {e}") })?; match result { Some(_) => Ok(AcquireLockResult::Acquired), None => { // Lock already exists, get the lock_id let existing_data: Option = conn.get(&key).await.map_err(|e| { - EngineError::InternalError { message: format!("Failed to read existing lock: {}", e) } + EngineError::InternalError { message: format!("Failed to read existing lock: {e}") } })?; let existing_lock_id = existing_data @@ -196,8 +196,7 @@ impl DeploymentLock for RedisDeploymentLock { let deleted = conn.del::<&str, usize>(&key).await.map_err(|e| { EngineError::InternalError { message: format!( - "Failed to delete lock for account {}: {}", - account_address, e + "Failed to delete lock for account {account_address}: {e}" ) } })?; diff --git a/executors/src/external_bundler/send.rs b/executors/src/external_bundler/send.rs index 427d53b..3eb5b32 100644 --- a/executors/src/external_bundler/send.rs +++ b/executors/src/external_bundler/send.rs @@ -145,7 +145,7 @@ pub enum ExternalBundlerSendError { impl From for ExternalBundlerSendError { fn from(error: TwmqError) -> Self { ExternalBundlerSendError::InternalError { - message: format!("Deserialization error for job data: {}", error), + message: format!("Deserialization error for job data: {error}"), } } } @@ -260,7 +260,7 @@ where .get_chain(job_data.chain_id) .map_err(|e| ExternalBundlerSendError::ChainServiceError { 
chain_id: job_data.chain_id, - message: format!("Failed to get chain instance: {}", e), + message: format!("Failed to get chain instance: {e}"), }) .map_err_fail()?; @@ -329,7 +329,7 @@ where EngineError::RpcError { kind: k, .. } => { let mapped_error = ExternalBundlerSendError::ChainServiceError { chain_id: chain.chain_id(), - message: format!("Deployment manager error: {}", e), + message: format!("Deployment manager error: {e}"), }; if is_retryable_rpc_error(k) { mapped_error.nack(Some(Duration::from_secs(10)), RequeuePosition::Last) @@ -338,7 +338,7 @@ where } } _ => ExternalBundlerSendError::InternalError { - message: format!("Deployment manager error: {}", e), + message: format!("Deployment manager error: {e}"), } .nack(Some(Duration::from_secs(10)), RequeuePosition::Last), } @@ -350,8 +350,7 @@ where return Err(ExternalBundlerSendError::DeploymentLocked { account_address: smart_account.address, message: format!( - "Deployment in progress (stale: {}, lock_id: {})", - stale, lock_id + "Deployment in progress (stale: {stale}, lock_id: {lock_id})" ), }) .map_err_nack( @@ -366,7 +365,7 @@ where .await .map_err(|e| ExternalBundlerSendError::DeploymentLocked { account_address: smart_account.address, - message: format!("Failed to acquire deployment lock: {}", e), + message: format!("Failed to acquire deployment lock: {e}"), }) .map_err_nack(Some(Duration::from_secs(15)), RequeuePosition::Last)? 
{ @@ -378,7 +377,7 @@ where // Someone else has the lock, NACK and retry later return Err(ExternalBundlerSendError::DeploymentLocked { account_address: smart_account.address, - message: format!("Lock held by another process: {}", lock_id), + message: format!("Lock held by another process: {lock_id}"), }) .map_err_nack(Some(Duration::from_secs(15)), RequeuePosition::Last); } @@ -547,16 +546,16 @@ where if let Err(e) = tx.queue_job(confirmation_job) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue confirmation job" ); } if let Err(e) = self.queue_success_webhook(job, success_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue success webhook" ); } @@ -578,8 +577,8 @@ where if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue nack webhook" ); } @@ -605,8 +604,8 @@ where if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue fail webhook" ); } diff --git a/executors/src/transaction_registry.rs b/executors/src/transaction_registry.rs index e1510d4..db72613 100644 --- a/executors/src/transaction_registry.rs +++ b/executors/src/transaction_registry.rs @@ -29,7 +29,7 @@ impl TransactionRegistry { fn registry_key(&self) -> String { match &self.namespace { - Some(ns) => format!("{}:tx_registry", ns), + Some(ns) => format!("{ns}:tx_registry"), None => "tx_registry".to_string(), } } diff --git a/executors/src/webhook/envelope.rs b/executors/src/webhook/envelope.rs index 0422f16..b043173 100644 --- 
a/executors/src/webhook/envelope.rs +++ b/executors/src/webhook/envelope.rs @@ -270,11 +270,11 @@ pub trait WebhookCapable: DurableExecution + ExecutorStage { tx.queue_job(webhook_job)?; tracing::info!( - transaction_id = %job.job.transaction_id(), - executor = %Self::executor_name(), - stage = %Self::stage_name(), + transaction_id = job.job.transaction_id(), + executor = Self::executor_name(), + stage = Self::stage_name(), event = ?envelope.event_type, - notification_id = %envelope.notification_id, + notification_id = envelope.notification_id, "Queued webhook notification" ); diff --git a/executors/src/webhook/mod.rs b/executors/src/webhook/mod.rs index e2cfc1b..d4fc375 100644 --- a/executors/src/webhook/mod.rs +++ b/executors/src/webhook/mod.rs @@ -5,8 +5,8 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use engine_core::execution_options::WebhookOptions; use hex; use hmac::{Hmac, Mac}; -use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use reqwest::StatusCode; +use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use serde::{Deserialize, Serialize}; use twmq::error::TwmqError; use twmq::hooks::TransactionContext; @@ -141,15 +141,13 @@ impl DurableExecution for WebhookJobHandler { for (key, value) in custom_headers { let header_name = HeaderName::from_bytes(key.as_bytes()).map_err(|e| { JobError::Fail(WebhookError::RequestConstruction(format!( - "Invalid header name '{}': {}", - key, e + "Invalid header name '{key}': {e}" ))) })?; let header_value = HeaderValue::from_str(value).map_err(|e| { JobError::Fail(WebhookError::RequestConstruction(format!( - "Invalid header value for '{}': {}", - key, e + "Invalid header value for '{key}': {e}" ))) })?; @@ -169,8 +167,7 @@ impl DurableExecution for WebhookJobHandler { .duration_since(UNIX_EPOCH) .map_err(|e| { JobError::Fail(WebhookError::RequestConstruction(format!( - "Failed to get system time for timestamp: {}", - e + "Failed to get system time for timestamp: {e}" ))) })? 
.as_secs(); @@ -184,7 +181,7 @@ impl DurableExecution for WebhookJobHandler { type HmacSha256 = Hmac; let mut mac = HmacSha256::new_from_slice(secret.as_bytes()) .map_err(|e| { - WebhookError::HmacGeneration(format!("Failed to initialize HMAC: {}", e)) + WebhookError::HmacGeneration(format!("Failed to initialize HMAC: {e}")) }) .map_err_fail()?; @@ -195,8 +192,7 @@ impl DurableExecution for WebhookJobHandler { let signature_header_value = HeaderValue::from_str(&signature_hex) .map_err(|e| { WebhookError::RequestConstruction(format!( - "Invalid header value for '{}': {}", - SIGNATURE_HEADER_NAME, e + "Invalid header value for '{SIGNATURE_HEADER_NAME}': {e}" )) }) .map_err_fail()?; @@ -204,8 +200,7 @@ impl DurableExecution for WebhookJobHandler { let timestamp_header_value = HeaderValue::from_str(×tamp_str) .map_err(|e| { WebhookError::RequestConstruction(format!( - "Invalid header value for '{}': {}", - TIMESTAMP_HEADER_NAME, e + "Invalid header value for '{TIMESTAMP_HEADER_NAME}': {e}" )) }) .map_err_fail()?; @@ -237,10 +232,10 @@ impl DurableExecution for WebhookJobHandler { .body(payload.body.clone()); tracing::debug!( - job_id = %job.job.id, - url = %payload.url, - method = %http_method_str, - attempt = %job.job.attempts, + job_id = job.job.id, + url = payload.url, + method = http_method_str, + attempt = job.job.attempts, "Sending webhook request" ); @@ -254,18 +249,22 @@ impl DurableExecution for WebhookJobHandler { Err(e) => { if status.is_success() { let err = WebhookError::ResponseReadError(format!( - "Failed to read response body: {}", - e + "Failed to read response body: {e}" )); return Err(err).map_err_fail(); } - tracing::warn!(job_id = %job.job.id, "Failed to read response body for error status {}: {}", status, e); + tracing::warn!( + job_id = job.job.id, + "Failed to read response body for error status {}: {}", + status, + e + ); None } }; if status.is_success() { - tracing::info!(job_id = %job.job.id, status = %status, "Webhook delivered 
successfully"); + tracing::info!(job_id = job.job.id, status = ?status, "Webhook delivered successfully"); Ok(WebhookJobOutput { status_code: status.as_u16(), response_body: response_body_text, @@ -298,11 +297,11 @@ impl DurableExecution for WebhookJobHandler { let delay = Duration::from_millis(delay_ms); tracing::warn!( - job_id = %job.job.id, - status = %status, - attempt = %job.job.attempts, - max_attempts = %self.retry_config.max_attempts, - delay_ms = %delay.as_millis(), + job_id = job.job.id, + status = ?status, + attempt = job.job.attempts, + max_attempts = self.retry_config.max_attempts, + delay_ms = delay.as_millis(), "Webhook failed with retryable status, NACKing." ); Err(JobError::Nack { @@ -312,17 +311,17 @@ impl DurableExecution for WebhookJobHandler { }) } else { tracing::error!( - job_id = %job.job.id, - status = %status, - attempt = %job.job.attempts, + job_id = job.job.id, + status = ?status, + attempt = job.job.attempts, "Webhook failed after max attempts, FAILING." ); Err(JobError::Fail(webhook_error)) } } else { tracing::error!( - job_id = %job.job.id, - status = %status, + job_id = job.job.id, + status = ?status, "Webhook failed with non-retryable client error, FAILING." 
); Err(JobError::Fail(webhook_error)) @@ -342,7 +341,7 @@ impl DurableExecution for WebhookJobHandler { && !e.is_connect() && !e.is_timeout() { - tracing::error!(job_id = %job.job.id, error = %webhook_error, "Webhook construction error, FAILING."); + tracing::error!(job_id = job.job.id, error = ?webhook_error, "Webhook construction error, FAILING."); return Err(JobError::Fail(webhook_error)); } @@ -356,11 +355,11 @@ impl DurableExecution for WebhookJobHandler { let delay = Duration::from_millis(delay_ms); tracing::warn!( - job_id = %job.job.id, - error = %webhook_error, - attempt = %job.job.attempts, - max_attempts = %self.retry_config.max_attempts, - delay_ms = %delay.as_millis(), + job_id = job.job.id, + error = ?webhook_error, + attempt = job.job.attempts, + max_attempts = self.retry_config.max_attempts, + delay_ms = delay.as_millis(), "Webhook request failed, NACKing." ); @@ -371,9 +370,9 @@ impl DurableExecution for WebhookJobHandler { }) } else { tracing::error!( - job_id = %job.job.id, - error = %webhook_error, - attempt = %job.job.attempts, + job_id = job.job.id, + error = ?webhook_error, + attempt = job.job.attempts, "Webhook request failed after max attempts, FAILING." ); Err(JobError::Fail(webhook_error)) @@ -390,9 +389,9 @@ impl DurableExecution for WebhookJobHandler { _tx: &mut TransactionContext<'_>, ) { tracing::info!( - job_id = %job.job.id, - url = %job.job.data.url, - status = %d.result.status_code, + job_id = job.job.id, + url = job.job.data.url, + status = d.result.status_code, "Webhook successfully processed (on_success hook)." 
); } @@ -405,11 +404,11 @@ impl DurableExecution for WebhookJobHandler { _tx: &mut TransactionContext<'_>, ) { tracing::warn!( - job_id = %job.job.id, - url = %job.job.data.url, - attempt = %job.job.attempts, + job_id = job.job.id, + url = job.job.data.url, + attempt = job.job.attempts, error = ?d.error, - delay_ms = %d.delay.map_or(0, |dur| dur.as_millis()), + delay_ms = d.delay.map_or(0, |dur| dur.as_millis()), "Webhook NACKed (on_nack hook)." ); } @@ -422,9 +421,9 @@ impl DurableExecution for WebhookJobHandler { _tx: &mut TransactionContext<'_>, ) { tracing::error!( - job_id = %job.job.id, - url = %job.job.data.url, - attempt = %job.job.attempts, + job_id = job.job.id, + url = job.job.data.url, + attempt = job.job.attempts, error = ?d.error, "Webhook FAILED permanently (on_fail hook)." ); @@ -442,9 +441,12 @@ pub fn queue_webhook_envelopes( webhook_options .iter() .map(|webhook_option| { - let webhook_notification_envelope = envelope - .clone() - .into_webhook_notification_envelope(now, webhook_option.url.clone(), webhook_option.user_metadata.clone()); + let webhook_notification_envelope = + envelope.clone().into_webhook_notification_envelope( + now, + webhook_option.url.clone(), + webhook_option.user_metadata.clone(), + ); let serialised_envelope = serde_json::to_string(&webhook_notification_envelope)?; Ok(( serialised_envelope, @@ -493,11 +495,11 @@ pub fn queue_webhook_envelopes( tx.queue_job(webhook_job)?; tracing::info!( - transaction_id = %webhook_notification_envelope.transaction_id, - executor = %webhook_notification_envelope.executor_name, - stage = %webhook_notification_envelope.stage_name, + transaction_id = webhook_notification_envelope.transaction_id, + executor = webhook_notification_envelope.executor_name, + stage = webhook_notification_envelope.stage_name, event = ?webhook_notification_envelope.event_type, - notification_id = %webhook_notification_envelope.notification_id, + notification_id = webhook_notification_envelope.notification_id, "Queued 
webhook notification" ); } diff --git a/server/src/config.rs b/server/src/config.rs index d248764..33a9446 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -37,6 +37,7 @@ pub struct ServerConfig { pub host: String, pub port: u16, pub log_format: LogFormat, + pub diagnostic_access_password: Option, } #[derive(Debug, Clone, Deserialize)] @@ -69,10 +70,12 @@ impl Default for ServerConfig { port: 3000, host: "0.0.0.0".into(), log_format: LogFormat::Pretty, + diagnostic_access_password: None, } } } +/// EngineConfig is cached, it only loads once pub fn get_config() -> EngineConfig { let base_path = env::current_dir().expect("Failed to determine the current directory"); let configuration_directory = base_path.join("configuration"); @@ -94,14 +97,14 @@ pub fn get_config() -> EngineConfig { .add_source(config::Environment::with_prefix("app").separator("__")) .build() .unwrap_or_else(|e| { - eprintln!("Configuration error: {}", e); + eprintln!("Configuration error: {e}"); panic!("Failed to build configuration"); }); // Deserialize the configuration config.try_deserialize::() .unwrap_or_else(|e| { - eprintln!("Configuration error: {}", e); + eprintln!("Configuration error: {e}"); eprintln!("Make sure all required fields are set correctly in your configuration files or environment variables."); panic!("Failed to deserialize configuration"); }) @@ -133,8 +136,7 @@ impl TryFrom for Environment { "development" => Ok(Self::Development), "production" => Ok(Self::Production), other => Err(format!( - "{} is not a supported environment. Use either `local`, `development`, or `production`.", - other + "{other} is not a supported environment. Use either `local`, `development`, or `production`." 
)), } } diff --git a/server/src/execution_router/mod.rs b/server/src/execution_router/mod.rs index 7325d4a..90b89ae 100644 --- a/server/src/execution_router/mod.rs +++ b/server/src/execution_router/mod.rs @@ -97,7 +97,7 @@ impl ExecutionRouter { // Get chain and encode calldata properly let chain = self.chains.get_chain(base_options.chain_id).map_err(|e| { EngineError::InternalError { - message: format!("Failed to get chain: {}", e), + message: format!("Failed to get chain: {e}"), } })?; @@ -128,10 +128,10 @@ impl ExecutionRouter { // Create rules for UserOp restrictions let nonce_rule = Rule::Regex(RegexRule { - pattern: format!("^{}$", preallocated_nonce), + pattern: format!("^{preallocated_nonce}$"), }); let calldata_rule = Rule::Regex(RegexRule { - pattern: format!("(?i)^{}$", encoded_calldata), + pattern: format!("(?i)^{encoded_calldata}$"), }); let userop_v06_rules = UserOperationV06Rules { @@ -187,7 +187,7 @@ impl ExecutionRouter { .vault_client .create_signed_access_token(access_token.clone(), additional_policies, expiry_timestamp) .map_err(|e| EngineError::VaultError { - message: format!("Failed to create signed access token: {}", e), + message: format!("Failed to create signed access token: {e}"), })?; let converted_credential = SigningCredential::Vault(Auth::AccessToken { @@ -300,7 +300,7 @@ impl ExecutionRouter { &self, base_execution_options: &BaseExecutionOptions, erc4337_execution_options: &Erc4337ExecutionOptions, - webhook_options: &Vec, + webhook_options: &[WebhookOptions], transactions: &[InnerTransaction], rpc_credentials: RpcCredentials, signing_credential: SigningCredential, @@ -312,7 +312,7 @@ impl ExecutionRouter { transactions: transactions.to_vec(), execution_options: erc4337_execution_options.clone(), signing_credential, - webhook_options: webhook_options.clone(), + webhook_options: webhook_options.to_owned(), rpc_credentials, pregenerated_nonce, }; @@ -325,7 +325,7 @@ impl ExecutionRouter { ) .await .map_err(|e| TwmqError::Runtime { - 
message: format!("Failed to register transaction: {}", e), + message: format!("Failed to register transaction: {e}"), })?; // Create job with transaction ID as the job ID for idempotency @@ -337,7 +337,7 @@ impl ExecutionRouter { .await?; tracing::debug!( - transaction_id = %base_execution_options.idempotency_key, + transaction_id = base_execution_options.idempotency_key, queue = "external_bundler_send", "Job queued successfully" ); @@ -358,8 +358,7 @@ impl ExecutionRouter { transaction_id: base_execution_options.idempotency_key.clone(), chain_id: base_execution_options.chain_id, transactions: transactions.to_vec(), - eoa_address: None, - execution_options: Some(eip7702_execution_options.clone()), + execution_options: eip7702_execution_options.clone(), signing_credential, webhook_options, rpc_credentials, @@ -371,7 +370,7 @@ impl ExecutionRouter { .set_transaction_queue(&base_execution_options.idempotency_key, "eip7702_send") .await .map_err(|e| TwmqError::Runtime { - message: format!("Failed to register transaction: {}", e), + message: format!("Failed to register transaction: {e}"), })?; // Create job with transaction ID as the job ID for idempotency @@ -383,7 +382,7 @@ impl ExecutionRouter { .await?; tracing::debug!( - transaction_id = %base_execution_options.idempotency_key, + transaction_id = base_execution_options.idempotency_key, queue = "eip7702_send", "Job queued successfully" ); @@ -404,7 +403,7 @@ impl ExecutionRouter { .chains .get_chain(base_execution_options.chain_id) .map_err(|e| EngineError::InternalError { - message: format!("Failed to get chain: {}", e), + message: format!("Failed to get chain: {e}"), })?; let transaction = if transactions.len() > 1 { @@ -422,7 +421,7 @@ impl ExecutionRouter { let is_minimal_account = is_minimal_account.map_err(|e| EngineError::InternalError { - message: format!("Failed to check 7702 delegation: {:?}", e), + message: format!("Failed to check 7702 delegation: {e:?}"), })?; if !is_minimal_account { @@ -472,7 +471,7 
@@ impl ExecutionRouter { .add_transaction(eoa_transaction_request) .await .map_err(|e| TwmqError::Runtime { - message: format!("Failed to add transaction to EOA store: {}", e), + message: format!("Failed to add transaction to EOA store: {e}"), })?; // Register transaction in registry @@ -480,7 +479,7 @@ impl ExecutionRouter { .set_transaction_queue(&base_execution_options.idempotency_key, "eoa_executor") .await .map_err(|e| TwmqError::Runtime { - message: format!("Failed to register transaction: {}", e), + message: format!("Failed to register transaction: {e}"), })?; // Ensure an idempotent job exists for this EOA:chain combination @@ -504,9 +503,9 @@ impl ExecutionRouter { .await?; tracing::debug!( - transaction_id = %base_execution_options.idempotency_key, - eoa = %eoa_execution_options.from, - chain_id = %base_execution_options.chain_id, + transaction_id = base_execution_options.idempotency_key, + eoa = ?eoa_execution_options.from, + chain_id = base_execution_options.chain_id, queue = "eoa_executor", "EOA transaction added to store and worker job ensured" ); diff --git a/server/src/http/dyn_contract.rs b/server/src/http/dyn_contract.rs index a9c8da7..63437e0 100644 --- a/server/src/http/dyn_contract.rs +++ b/server/src/http/dyn_contract.rs @@ -117,7 +117,7 @@ impl ContractCall { return Err(EngineError::contract_preparation_error( Some(self.contract_address), chain_id, - format!("Function '{}' not found in contract ABI", function_name), + format!("Function '{function_name}' not found in contract ABI"), )); } @@ -210,7 +210,7 @@ impl ContractCall { } Err(EngineError::ValidationError { - message: format!("Invalid method format: {}", method), + message: format!("Invalid method format: {method}"), }) } @@ -245,7 +245,7 @@ impl ContractCall { let parsed_value: DynSolValue = sol_type .coerce_json(json_value) - .map_err(|e| format!("Failed to parse parameter as DynSolValue: {}", e))?; + .map_err(|e| format!("Failed to parse parameter as DynSolValue: {e}"))?; if 
!parsed_value.matches(&sol_type) { return Err(format!( @@ -288,7 +288,7 @@ impl ContractCall { EngineError::contract_preparation_error( Some(self.contract_address), chain_id, - format!("Failed to encode function call: {}", e), + format!("Failed to encode function call: {e}"), ) }) } @@ -342,7 +342,7 @@ impl ContractCall { /// Converts DynSolValue back to JSON for response pub fn dyn_sol_value_to_json(value: &DynSolValue) -> JsonValue { match value { - DynSolValue::Address(addr) => JsonValue::String(format!("{:#x}", addr)), + DynSolValue::Address(addr) => JsonValue::String(format!("{addr:#x}")), DynSolValue::Uint(val, _) => JsonValue::String(val.to_string()), DynSolValue::Int(val, _) => JsonValue::String(val.to_string()), DynSolValue::Bool(b) => JsonValue::Bool(*b), diff --git a/server/src/http/extractors.rs b/server/src/http/extractors.rs index 928c8f7..4a1c7d1 100644 --- a/server/src/http/extractors.rs +++ b/server/src/http/extractors.rs @@ -13,7 +13,7 @@ use engine_core::{ use thirdweb_core::auth::ThirdwebAuth; use vault_types::enclave::auth::Auth; -use crate::http::error::ApiEngineError; +use crate::http::{error::ApiEngineError, server::EngineServerState}; // Header name constants const HEADER_THIRDWEB_SECRET_KEY: &str = "x-thirdweb-secret-key"; @@ -24,6 +24,7 @@ const HEADER_VAULT_ACCESS_TOKEN: &str = "x-vault-access-token"; const HEADER_AWS_KMS_ARN: &str = "x-aws-kms-arn"; const HEADER_AWS_ACCESS_KEY_ID: &str = "x-aws-access-key-id"; const HEADER_AWS_SECRET_ACCESS_KEY: &str = "x-aws-secret-access-key"; +const HEADER_DIAGNOSTIC_ACCESS_PASSWORD: &str = "x-diagnostic-access-password"; /// Extractor for RPC credentials from headers #[derive(OperationIo)] @@ -168,7 +169,7 @@ impl SigningCredentialsExtractor { fn parse_kms_arn(arn: &str) -> Result<(String, String), ApiEngineError> { let parsed_arn: aws_arn::ResourceName = arn.parse().map_err(|e| { ApiEngineError(EngineError::ValidationError { - message: format!("Invalid AWS ARN format: {}", e), + message: 
format!("Invalid AWS ARN format: {e}"), }) })?; @@ -274,14 +275,14 @@ where Ok(Json(data)) => Ok(EngineJson(data)), Err(rejection) => { let message = match rejection { - JsonRejection::JsonDataError(err) => format!("Invalid JSON data: {}", err), - JsonRejection::JsonSyntaxError(err) => format!("JSON syntax error: {}", err), + JsonRejection::JsonDataError(err) => format!("Invalid JSON data: {err}"), + JsonRejection::JsonSyntaxError(err) => format!("JSON syntax error: {err}"), JsonRejection::MissingJsonContentType(_) => { "Missing or invalid Content-Type header. Expected application/json" .to_string() } JsonRejection::BytesRejection(err) => { - format!("Failed to read request body: {}", err) + format!("Failed to read request body: {err}") } _ => "Invalid JSON request".to_string(), }; @@ -291,3 +292,48 @@ where } } } + +/// Extractor for diagnostic access authentication +#[derive(OperationIo)] +pub struct DiagnosticAuthExtractor; + +impl FromRequestParts for DiagnosticAuthExtractor { + type Rejection = ApiEngineError; + + async fn from_request_parts( + parts: &mut Parts, + state: &EngineServerState, + ) -> Result { + // Get the configured diagnostic password from environment + let config_password = state.diagnostic_access_password.as_ref(); + + let config_password = match config_password { + Some(pass) if !pass.is_empty() => pass, + _ => { + return Err(ApiEngineError(EngineError::ValidationError { + message: "Diagnostic access is not configured. 
Set DIAGNOSTIC_ACCESS_PASSWORD environment variable.".to_string(), + })); + } + }; + + // Get the password from the request header + let provided_password = parts + .headers + .get(HEADER_DIAGNOSTIC_ACCESS_PASSWORD) + .and_then(|v| v.to_str().ok()) + .ok_or_else(|| { + ApiEngineError(EngineError::ValidationError { + message: "Missing x-diagnostic-access-password header".to_string(), + }) + })?; + + // Verify the password matches + if provided_password != config_password { + return Err(ApiEngineError(EngineError::ValidationError { + message: "Invalid diagnostic access password".to_string(), + })); + } + + Ok(DiagnosticAuthExtractor) + } +} diff --git a/server/src/http/routes/admin/eoa_diagnostics.rs b/server/src/http/routes/admin/eoa_diagnostics.rs new file mode 100644 index 0000000..9f75e2f --- /dev/null +++ b/server/src/http/routes/admin/eoa_diagnostics.rs @@ -0,0 +1,418 @@ +use alloy::{consensus::Transaction, primitives::Address}; +use axum::{ + Router, debug_handler, + extract::{Path, Query, State}, + http::StatusCode, + response::{IntoResponse, Json}, +}; +use engine_executors::eoa::store::{EoaExecutorStore, EoaHealth, TransactionData}; +use serde::{Deserialize, Serialize}; + +use crate::http::{ + error::ApiEngineError, extractors::DiagnosticAuthExtractor, server::EngineServerState, + types::SuccessResponse, +}; + +// ===== TYPES ===== + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct EoaStateResponse { + pub eoa: String, + pub chain_id: u64, + pub cached_nonce: Option, + pub optimistic_nonce: Option, + pub pending_count: u64, + pub submitted_count: u64, + pub borrowed_count: u64, + pub recycled_nonces_count: u64, + pub recycled_nonces: Vec, + pub health: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionDetailResponse { + pub transaction_data: Option, + pub attempts_count: u64, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct 
PendingTransactionResponse { + pub transaction_id: String, + pub queued_at: u64, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct PendingTransactionsResponse { + pub transactions: Vec, + pub total_count: u64, + pub has_more: bool, + pub offset: u64, + pub limit: u64, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SubmittedTransactionResponse { + pub nonce: u64, + pub transaction_hash: String, + pub transaction_id: String, + pub queued_at: u64, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BorrowedTransactionResponse { + pub transaction_id: String, + pub nonce: u64, + pub queued_at: u64, + pub borrowed_at: u64, +} + +#[derive(Debug, Deserialize)] +pub struct PaginationQuery { + pub offset: Option, + pub limit: Option, +} + +// ===== ROUTE HANDLERS ===== + +/// Get EOA State +/// +/// Get comprehensive state information for an EOA on a specific chain, including +/// all transaction counts, nonces, and recycled nonces information. 
+#[debug_handler] +pub async fn get_eoa_state( + _auth: DiagnosticAuthExtractor, + State(state): State, + Path(eoa_chain): Path, +) -> Result { + let (eoa, chain_id) = parse_eoa_chain(&eoa_chain)?; + let eoa_address: Address = eoa.parse().map_err(|_| { + ApiEngineError(engine_core::error::EngineError::ValidationError { + message: "Invalid EOA address format".to_string(), + }) + })?; + + // Get Redis connection from the EOA executor queue + let eoa_queue = &state.queue_manager.eoa_executor_queue; + let redis_conn = eoa_queue.redis.clone(); + + // Get namespace from the config + let namespace = state + .queue_manager + .eoa_executor_queue + .handler + .namespace + .clone(); + let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id); + + // Get all the state information using store methods + let cached_nonce = store.get_cached_transaction_count().await.ok(); + let optimistic_nonce = store.get_optimistic_transaction_count().await.ok(); + let pending_count = store.get_pending_transactions_count().await.map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to get pending count: {e}"), + }) + })?; + let submitted_count = store + .get_submitted_transactions_count() + .await + .map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to get submitted count: {e}"), + }) + })?; + let borrowed_count = store.get_borrowed_transactions_count().await.map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to get borrowed count: {e}"), + }) + })?; + + // Get recycled nonces using store methods + let recycled_nonces = store.get_recycled_nonces().await.map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to get recycled nonces: {e}"), + }) + })?; + let recycled_nonces_count = recycled_nonces.len() as u64; + + let health = 
store.get_eoa_health().await.map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to get EOA health: {e}"), + }) + })?; + + let response = EoaStateResponse { + eoa: eoa_address.to_string(), + chain_id, + cached_nonce, + optimistic_nonce, + pending_count, + submitted_count, + borrowed_count, + recycled_nonces_count, + recycled_nonces, + health, + }; + + Ok((StatusCode::OK, Json(SuccessResponse::new(response)))) +} + +/// Get Transaction Detail +/// +/// Get fully hydrated transaction details including all attempts and user request data. +/// Note: This endpoint requires the transaction to exist for the given EOA:chain combination. +#[debug_handler] +pub async fn get_transaction_detail( + _auth: DiagnosticAuthExtractor, + State(state): State, + Path((eoa_chain, transaction_id)): Path<(String, String)>, +) -> Result { + let (eoa, chain_id) = parse_eoa_chain(&eoa_chain)?; + let eoa_address: Address = eoa.parse().map_err(|_| { + ApiEngineError(engine_core::error::EngineError::ValidationError { + message: "Invalid EOA address format".to_string(), + }) + })?; + + // Get Redis connection from the EOA executor queue + let eoa_queue = &state.queue_manager.eoa_executor_queue; + let redis_conn = eoa_queue.handler.redis.clone(); + + // Get namespace from the config + let namespace = eoa_queue.handler.namespace.clone(); + let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id); + + // Get transaction data using store method + let transaction_data = store + .get_transaction_data(&transaction_id) + .await + .map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to get transaction data: {e}"), + }) + })?; + + // Get attempts count using store method + let attempts_count = store + .get_transaction_attempts_count(&transaction_id) + .await + .map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to get 
attempts count: {e}"), + }) + })?; + + let response = TransactionDetailResponse { + transaction_data, + attempts_count, + }; + + Ok((StatusCode::OK, Json(SuccessResponse::new(response)))) + } + +/// Get Pending Transactions +/// +/// Get paginated list of pending transactions for an EOA (raw data from Redis). +#[debug_handler] +pub async fn get_pending_transactions( + _auth: DiagnosticAuthExtractor, + State(state): State, + Path(eoa_chain): Path, + Query(pagination): Query, +) -> Result { + let (eoa, chain_id) = parse_eoa_chain(&eoa_chain)?; + let eoa_address: Address = eoa.parse().map_err(|_| { + ApiEngineError(engine_core::error::EngineError::ValidationError { + message: "Invalid EOA address format".to_string(), + }) + })?; + + // Get Redis connection from the EOA executor queue + let eoa_queue = &state.queue_manager.eoa_executor_queue; + let redis_conn = eoa_queue.handler.redis.clone(); + + // Get namespace from the config + let namespace = eoa_queue.handler.namespace.clone(); + + let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id); + let offset = pagination.offset.unwrap_or(0); + let limit = pagination.limit.unwrap_or(1000).min(1000); // Cap at 1000 + + // Use store methods to get pending transactions with pagination + let pending_txs = store + .peek_pending_transactions_paginated(offset, limit) + .await + .map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to get pending transactions: {e}"), + }) + })?; + + let total_count = store.get_pending_transactions_count().await.map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to get pending count: {e}"), + }) + })?; + + let transactions: Vec = pending_txs + .into_iter() + .map(|tx| PendingTransactionResponse { + transaction_id: tx.transaction_id, + queued_at: tx.queued_at, + }) + .collect(); + let has_more = (offset + (transactions.len() as u64)) < total_count; + + let response = 
PendingTransactionsResponse { + transactions, + total_count, + has_more, + offset, + limit, + }; + + Ok((StatusCode::OK, Json(SuccessResponse::new(response)))) +} + +/// Get Submitted Transactions +/// +/// Get all submitted transactions for an EOA (raw data from Redis). +#[debug_handler] +pub async fn get_submitted_transactions( + _auth: DiagnosticAuthExtractor, + State(state): State, + Path(eoa_chain): Path, +) -> Result { + let (eoa, chain_id) = parse_eoa_chain(&eoa_chain)?; + let eoa_address: Address = eoa.parse().map_err(|_| { + ApiEngineError(engine_core::error::EngineError::ValidationError { + message: "Invalid EOA address format".to_string(), + }) + })?; + + // Get Redis connection from the EOA executor queue + let eoa_queue = &state.queue_manager.eoa_executor_queue; + let redis_conn = eoa_queue.handler.redis.clone(); + + // Get namespace from the config + let namespace = eoa_queue.handler.namespace.clone(); + + let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id); + + // Use store method to get submitted transactions + let submitted_txs = store.get_all_submitted_transactions().await.map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to get submitted transactions: {e}"), + }) + })?; + + let transactions: Vec = submitted_txs + .into_iter() + .map(|tx| SubmittedTransactionResponse { + nonce: tx.nonce, + transaction_hash: tx.transaction_hash, + transaction_id: tx.transaction_id, + queued_at: tx.queued_at, + }) + .collect(); + + Ok((StatusCode::OK, Json(SuccessResponse::new(transactions)))) +} + +/// Get Borrowed Transactions +/// +/// Get all borrowed transactions for an EOA (raw data from Redis). 
+#[debug_handler] +pub async fn get_borrowed_transactions( + _auth: DiagnosticAuthExtractor, + State(state): State, + Path(eoa_chain): Path, +) -> Result { + let (eoa, chain_id) = parse_eoa_chain(&eoa_chain)?; + let eoa_address: Address = eoa.parse().map_err(|_| { + ApiEngineError(engine_core::error::EngineError::ValidationError { + message: "Invalid EOA address format".to_string(), + }) + })?; + + // Get Redis connection from the EOA executor queue + let eoa_queue = &state.queue_manager.eoa_executor_queue; + let redis_conn = eoa_queue.handler.redis.clone(); + + // Get namespace from the config + let namespace = eoa_queue.handler.namespace.clone(); + + let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id); + + // Use store method to get borrowed transactions + let borrowed_txs = store.peek_borrowed_transactions().await.map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to get borrowed transactions: {e}"), + }) + })?; + + let transactions: Vec = borrowed_txs + .into_iter() + .map(|tx| BorrowedTransactionResponse { + transaction_id: tx.transaction_id, + nonce: tx.signed_transaction.nonce(), + queued_at: tx.queued_at, + borrowed_at: tx.borrowed_at, + }) + .collect(); + + Ok((StatusCode::OK, Json(SuccessResponse::new(transactions)))) +} + +// ===== HELPER FUNCTIONS ===== + +/// Parse eoa:chain_id format +fn parse_eoa_chain(eoa_chain: &str) -> Result<(String, u64), ApiEngineError> { + let parts: Vec<&str> = eoa_chain.split(':').collect(); + if parts.len() != 2 { + return Err(ApiEngineError( + engine_core::error::EngineError::ValidationError { + message: "Invalid format. 
Expected 'address:chain_id'".to_string(), + }, + )); + } + + let eoa = parts[0].to_string(); + let chain_id = parts[1].parse::().map_err(|_| { + ApiEngineError(engine_core::error::EngineError::ValidationError { + message: "Invalid chain_id format".to_string(), + }) + })?; + + Ok((eoa, chain_id)) +} + +pub fn eoa_diagnostics_router() -> Router { + // Add hidden admin diagnostic routes (not included in OpenAPI) + Router::new() + .route( + "/admin/executors/eoa/{eoa_chain}/state", + axum::routing::get(get_eoa_state), + ) + .route( + "/admin/executors/eoa/{eoa_chain}/transaction/{transaction_id}", + axum::routing::get(get_transaction_detail), + ) + .route( + "/admin/executors/eoa/{eoa_chain}/pending", + axum::routing::get(get_pending_transactions), + ) + .route( + "/admin/executors/eoa/{eoa_chain}/submitted", + axum::routing::get(get_submitted_transactions), + ) + .route( + "/admin/executors/eoa/{eoa_chain}/borrowed", + axum::routing::get(get_borrowed_transactions), + ) +} diff --git a/server/src/http/routes/admin/mod.rs b/server/src/http/routes/admin/mod.rs index bb7845a..0f4257b 100644 --- a/server/src/http/routes/admin/mod.rs +++ b/server/src/http/routes/admin/mod.rs @@ -1 +1,2 @@ -pub mod queue; \ No newline at end of file +pub mod queue; +pub mod eoa_diagnostics; \ No newline at end of file diff --git a/server/src/http/routes/admin/queue.rs b/server/src/http/routes/admin/queue.rs index 0026dfc..4b02458 100644 --- a/server/src/http/routes/admin/queue.rs +++ b/server/src/http/routes/admin/queue.rs @@ -42,7 +42,7 @@ pub async fn empty_queue_idempotency_set( Path(queue_name): Path, ) -> Result { tracing::info!( - queue_name = %queue_name, + queue_name = queue_name, "Processing empty idempotency set request" ); @@ -88,8 +88,7 @@ pub async fn empty_queue_idempotency_set( return Err(ApiEngineError( engine_core::error::EngineError::ValidationError { message: format!( - "Invalid queue name '{}'. 
Valid options are: webhook, external_bundler_send, userop_confirm, eoa_executor, eip7702_send, eip7702_confirm", - queue_name + "Invalid queue name '{queue_name}'. Valid options are: webhook, external_bundler_send, userop_confirm, eoa_executor, eip7702_send, eip7702_confirm" ), }, )); @@ -100,7 +99,7 @@ pub async fn empty_queue_idempotency_set( match result { Ok(()) => { tracing::info!( - queue_name = %queue_name, + queue_name = queue_name, "Successfully emptied idempotency set" ); @@ -109,24 +108,22 @@ pub async fn empty_queue_idempotency_set( Json(SuccessResponse::new(EmptyIdempotencySetResponse { queue_name: queue_name.clone(), message: format!( - "Successfully emptied idempotency set for queue '{}'", - queue_name + "Successfully emptied idempotency set for queue '{queue_name}'" ), })), )) } Err(e) => { tracing::error!( - queue_name = %queue_name, - error = %e, + queue_name = queue_name, + error = ?e, "Failed to empty idempotency set" ); Err(ApiEngineError( engine_core::error::EngineError::InternalError { message: format!( - "Failed to empty idempotency set for queue '{}': {}", - queue_name, e + "Failed to empty idempotency set for queue '{queue_name}': {e}" ), }, )) diff --git a/server/src/http/routes/contract_read.rs b/server/src/http/routes/contract_read.rs index ef6a974..ea66f03 100644 --- a/server/src/http/routes/contract_read.rs +++ b/server/src/http/routes/contract_read.rs @@ -98,8 +98,7 @@ impl<'de> Deserialize<'de> for MulticallConfig { match addr_str.parse::
() { Ok(address) => Ok(MulticallConfig::CustomAddress(address)), Err(_) => Err(D::Error::custom(format!( - "Invalid address format: {}. Expected valid Ethereum address or boolean value", - addr_str + "Invalid address format: {addr_str}. Expected valid Ethereum address or boolean value" ))), } } @@ -475,7 +474,7 @@ async fn execute_direct_contract_calls( EngineError::contract_decoding_error( Some(prepared_call.target), chain.chain_id(), - format!("Failed to decode result: {}", e), + format!("Failed to decode result: {e}"), ), )); } @@ -513,13 +512,13 @@ async fn execute_multicall( .input(multicall_call.abi_encode().into()); let result = provider.call(call_request).await.map_err(|e| { - EngineError::contract_multicall_error(chain_id, format!("Multicall failed: {}", e)) + EngineError::contract_multicall_error(chain_id, format!("Multicall failed: {e}")) })?; let decoded = aggregate3Call::abi_decode_returns(&result).map_err(|e| { EngineError::contract_multicall_error( chain_id, - format!("Failed to decode multicall result: {}", e), + format!("Failed to decode multicall result: {e}"), ) })?; @@ -589,7 +588,7 @@ fn process_multicall_result( BatchResultItem::failure(EngineError::contract_decoding_error( Some(prepared_call.target), 0, // Chain ID not available here - format!("Failed to decode result: {}", e), + format!("Failed to decode result: {e}"), )) } } diff --git a/server/src/http/routes/contract_write.rs b/server/src/http/routes/contract_write.rs index acd4c41..309d25d 100644 --- a/server/src/http/routes/contract_write.rs +++ b/server/src/http/routes/contract_write.rs @@ -113,9 +113,9 @@ pub async fn write_contract( let executor_type = request.execution_options.executor_type(); tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, executor_type = ?executor_type, - chain_id = %chain_id, + chain_id = chain_id, "Processing contract write request" ); @@ -154,7 +154,7 @@ pub async fn write_contract( if !preparation_errors.is_empty() { let 
error_details: Vec = preparation_errors .iter() - .map(|(index, error)| format!("Parameter {}: {}", index, error)) + .map(|(index, error)| format!("Parameter {index}: {error}")) .collect(); return Err(ApiEngineError(EngineError::ValidationError { @@ -186,7 +186,7 @@ pub async fn write_contract( .map_err(ApiEngineError)?; tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, executor_type = ?executor_type, "Contract write transaction queued successfully" ); diff --git a/server/src/http/routes/sign_typed_data.rs b/server/src/http/routes/sign_typed_data.rs index a6f8a97..cf19080 100644 --- a/server/src/http/routes/sign_typed_data.rs +++ b/server/src/http/routes/sign_typed_data.rs @@ -182,7 +182,7 @@ async fn sign_single_typed_data( signed_data, }), Err(e) => BatchResultItem::failure(EngineError::ValidationError { - message: format!("Failed to serialize typed data: {}", e), + message: format!("Failed to serialize typed data: {e}"), }), } } diff --git a/server/src/http/routes/transaction.rs b/server/src/http/routes/transaction.rs index a14e75b..f5cd430 100644 --- a/server/src/http/routes/transaction.rs +++ b/server/src/http/routes/transaction.rs @@ -53,7 +53,7 @@ pub async fn cancel_transaction( Path(transaction_id): Path, ) -> Result { tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, "Processing transaction cancellation request" ); @@ -85,7 +85,7 @@ pub async fn cancel_transaction( .map_err(|e| ApiEngineError(e.into()))?; tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, "Transaction cancelled immediately" ); @@ -93,7 +93,7 @@ pub async fn cancel_transaction( } TwmqCancelResult::CancellationPending => { tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, "Transaction cancellation pending" ); @@ -101,7 +101,7 @@ pub async fn cancel_transaction( } TwmqCancelResult::NotFound => { tracing::warn!( - transaction_id = %transaction_id, 
+ transaction_id = transaction_id, "Transaction not found in send queue" ); @@ -111,7 +111,7 @@ pub async fn cancel_transaction( } Some("userop_confirm") => { tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, "Cannot cancel transaction - already sent and waiting for mine" ); @@ -121,18 +121,18 @@ pub async fn cancel_transaction( } Some(other_queue) => { tracing::warn!( - transaction_id = %transaction_id, - queue = %other_queue, + transaction_id = transaction_id, + queue = other_queue, "Transaction in unsupported queue for cancellation" ); CancelResult::CannotCancel { - reason: format!("Transaction in unsupported queue: {}", other_queue), + reason: format!("Transaction in unsupported queue: {other_queue}"), } } None => { tracing::warn!( - transaction_id = %transaction_id, + transaction_id = transaction_id, "Transaction not found in registry" ); diff --git a/server/src/http/routes/transaction_write.rs b/server/src/http/routes/transaction_write.rs index 0977eb6..6a40034 100644 --- a/server/src/http/routes/transaction_write.rs +++ b/server/src/http/routes/transaction_write.rs @@ -47,9 +47,9 @@ pub async fn write_transaction( let executor_type = request.execution_options.executor_type(); tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, executor_type = ?executor_type, - chain_id = %request.execution_options.chain_id(), + chain_id = request.execution_options.chain_id(), "Processing transaction request" ); @@ -60,7 +60,7 @@ pub async fn write_transaction( .map_err(ApiEngineError)?; tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, executor_type = ?executor_type, "Transaction queued successfully" ); diff --git a/server/src/http/server.rs b/server/src/http/server.rs index 18cd704..d3601c2 100644 --- a/server/src/http/server.rs +++ b/server/src/http/server.rs @@ -10,7 +10,8 @@ use utoipa_scalar::{Scalar, Servable}; use vault_sdk::VaultClient; use crate::{ - 
chains::ThirdwebChainService, execution_router::ExecutionRouter, queue::manager::QueueManager, + chains::ThirdwebChainService, execution_router::ExecutionRouter, + http::routes::admin::eoa_diagnostics::eoa_diagnostics_router, queue::manager::QueueManager, }; use tower_http::{ cors::{Any, CorsLayer}, @@ -27,6 +28,8 @@ pub struct EngineServerState { pub execution_router: Arc, pub queue_manager: Arc, + + pub diagnostic_access_password: Option, } pub struct EngineServer { @@ -69,12 +72,17 @@ impl EngineServer { )) .layer(cors) .layer(TraceLayer::new_for_http()) - .with_state(state); + .with_state(state.clone()); + + let eoa_diagnostics_router = eoa_diagnostics_router().with_state(state); let (router, api) = OpenApiRouter::with_openapi(ApiDoc::openapi()) .nest("/v1", v1_router) .split_for_parts(); + // Merge the hidden diagnostic routes after OpenAPI split + let router = router.merge(eoa_diagnostics_router); + let api_clone = api.clone(); let router = router .merge(Scalar::with_url("/reference", api).custom_html(SCALAR_HTML)) @@ -168,9 +176,8 @@ impl EngineServer { } Err(e) => { tracing::error!("Failed to join HTTP server task: {}", e); - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Task join error: {}", e), + return Err(std::io::Error::other( + format!("Task join error: {e}"), )); } } diff --git a/server/src/main.rs b/server/src/main.rs index b43f0b1..51d2af6 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -101,6 +101,7 @@ async fn main() -> anyhow::Result<()> { chains, execution_router: Arc::new(execution_router), queue_manager: Arc::new(queue_manager), + diagnostic_access_password: config.server.diagnostic_access_password, }) .await; diff --git a/server/src/queue/manager.rs b/server/src/queue/manager.rs index 272ee9b..84b027d 100644 --- a/server/src/queue/manager.rs +++ b/server/src/queue/manager.rs @@ -30,7 +30,7 @@ pub struct QueueManager { fn get_queue_name_for_namespace(namespace: &Option, name: &str) -> String { match 
namespace { - Some(namespace) => format!("{}_{}", namespace, name), + Some(namespace) => format!("{namespace}_{name}"), None => name.to_owned(), } } diff --git a/thirdweb-core/src/iaw/mod.rs b/thirdweb-core/src/iaw/mod.rs index d12bddf..ac7489e 100644 --- a/thirdweb-core/src/iaw/mod.rs +++ b/thirdweb-core/src/iaw/mod.rs @@ -126,22 +126,22 @@ impl SignAuthorizationApiResponse { .chain_id .parse() .map_err(|e| IAWError::SerializationError { - message: format!("Invalid chainId: {}", e), + message: format!("Invalid chainId: {e}"), })?; let nonce: u64 = self .nonce .parse() .map_err(|e| IAWError::SerializationError { - message: format!("Invalid nonce: {}", e), + message: format!("Invalid nonce: {e}"), })?; let r: U256 = self.r.parse().map_err(|e| IAWError::SerializationError { - message: format!("Invalid r value: {}", e), + message: format!("Invalid r value: {e}"), })?; let s: U256 = self.s.parse().map_err(|e| IAWError::SerializationError { - message: format!("Invalid s value: {}", e), + message: format!("Invalid s value: {e}"), })?; let y_parity: bool = match self.y_parity.as_str() { @@ -225,8 +225,7 @@ impl IAWClient { headers.insert( "Authorization", reqwest::header::HeaderValue::from_str(&format!( - "Bearer embedded-wallet-token:{}", - auth_token + "Bearer embedded-wallet-token:{auth_token}" )) .map_err(|_| IAWError::AuthError("Invalid auth token format".to_string()))?, ); @@ -305,8 +304,7 @@ impl IAWClient { headers.insert( "Authorization", reqwest::header::HeaderValue::from_str(&format!( - "Bearer embedded-wallet-token:{}", - auth_token + "Bearer embedded-wallet-token:{auth_token}" )) .map_err(|_| IAWError::AuthError("Invalid auth token format".to_string()))?, ); @@ -373,8 +371,7 @@ impl IAWClient { headers.insert( "Authorization", reqwest::header::HeaderValue::from_str(&format!( - "Bearer embedded-wallet-token:{}", - auth_token + "Bearer embedded-wallet-token:{auth_token}" )) .map_err(|_| IAWError::AuthError("Invalid auth token format".to_string()))?, ); @@ 
-444,8 +441,7 @@ impl IAWClient { headers.insert( "Authorization", reqwest::header::HeaderValue::from_str(&format!( - "Bearer embedded-wallet-token:{}", - auth_token + "Bearer embedded-wallet-token:{auth_token}" )) .map_err(|_| IAWError::AuthError("Invalid auth token format".to_string()))?, ); @@ -492,7 +488,7 @@ impl IAWClient { // Parse the API response into our custom type let api_response: SignAuthorizationApiResponse = serde_json::from_value(signed_response) .map_err(|e| IAWError::SerializationError { - message: format!("Failed to parse sign authorization response: {}", e), + message: format!("Failed to parse sign authorization response: {e}"), })?; // Convert to SignedAuthorization @@ -528,8 +524,7 @@ impl IAWClient { headers.insert( "Authorization", reqwest::header::HeaderValue::from_str(&format!( - "Bearer embedded-wallet-token:{}", - auth_token + "Bearer embedded-wallet-token:{auth_token}" )) .map_err(|_| IAWError::AuthError("Invalid auth token format".to_string()))?, ); diff --git a/twmq/benches/throughput.rs b/twmq/benches/throughput.rs index 6921f7b..6e27def 100644 --- a/twmq/benches/throughput.rs +++ b/twmq/benches/throughput.rs @@ -168,7 +168,7 @@ async fn load_test_throughput( nack_percentage: f64, ) -> (u64, f64, f64, bool) { let test_id = nanoid::nanoid!(8); - let queue_name = format!("bench_queue_{}", test_id); + let queue_name = format!("bench_queue_{test_id}"); let metrics = BenchmarkMetrics::new(); @@ -201,7 +201,7 @@ async fn load_test_throughput( // Clean up any existing data let mut redis_conn = queue.redis.clone(); let keys: Vec = redis::cmd("KEYS") - .arg(format!("{}:*", queue_name)) + .arg(format!("{queue_name}:*")) .query_async(&mut redis_conn) .await .unwrap_or_default(); @@ -241,7 +241,7 @@ async fn load_test_throughput( if queue .clone() .job(job) - .with_id(format!("job_{}", job_counter)) + .with_id(format!("job_{job_counter}")) .push() .await .is_ok() @@ -289,21 +289,20 @@ async fn load_test_throughput( let avg_processing_time = 
metrics.avg_processing_time_ms(); println!( - "Load Test Results - {}jobs/s for {}s:", - jobs_per_second, duration_seconds + "Load Test Results - {jobs_per_second}jobs/s for {duration_seconds}s:" ); - println!(" Jobs pushed: {}", jobs_pushed); - println!(" Jobs processed: {}", total_processed); + println!(" Jobs pushed: {jobs_pushed}"); + println!(" Jobs processed: {total_processed}"); println!(" Simulated success rate: {:.1}%", success_rate * 100.0); - println!(" Avg processing time: {:.2}ms", avg_processing_time); - println!(" Max queue depth: {}", max_depth); + println!(" Avg processing time: {avg_processing_time:.2}ms"); + println!(" Max queue depth: {max_depth}"); println!(" Final backlog: {}", final_pending + final_active); - println!(" Sustainable: {}", is_sustainable); + println!(" Sustainable: {is_sustainable}"); // Cleanup worker_handle.shutdown().await.unwrap(); let _: () = redis::cmd("DEL") - .arg(format!("{}:*", queue_name)) + .arg(format!("{queue_name}:*")) .query_async(&mut redis_conn) .await .unwrap_or(()); diff --git a/twmq/src/lib.rs b/twmq/src/lib.rs index 96582bb..f2c8dda 100644 --- a/twmq/src/lib.rs +++ b/twmq/src/lib.rs @@ -122,10 +122,10 @@ where H: DurableExecution, { pub redis: ConnectionManager, - handler: Arc, - options: QueueOptions, + pub handler: Arc, + pub options: QueueOptions, // concurrency: usize, - name: String, + pub name: String, } impl Queue { @@ -446,7 +446,7 @@ impl Queue { // Process the cancellation through hook system if let Err(e) = self.process_cancelled_job(job_id).await { tracing::error!( - job_id = %job_id, + job_id = job_id, error = ?e, "Failed to process immediately cancelled job" ); @@ -456,7 +456,7 @@ impl Queue { "cancellation_pending" => Ok(CancelResult::CancellationPending), "not_found" => Ok(CancelResult::NotFound), _ => Err(TwmqError::Runtime { - message: format!("Unexpected cancel result: {}", result), + message: format!("Unexpected cancel result: {result}"), }), } } @@ -552,7 +552,7 @@ impl Queue { 
.into_iter() .collect::, _>>() .map_err(|e| TwmqError::Runtime { - message: format!("Failed to acquire permits during shutdown: {}", e), + message: format!("Failed to acquire permits during shutdown: {e}"), })?; tracing::info!( @@ -757,10 +757,10 @@ impl Queue { // Log individual lease timeouts and cancellations for job_id in &timed_out_jobs { - tracing::warn!(job_id = %job_id, "Job lease expired, moved back to pending"); + tracing::warn!(job_id = job_id, "Job lease expired, moved back to pending"); } for job_id in &cancelled_jobs { - tracing::info!(job_id = %job_id, "Job cancelled by user request"); + tracing::info!(job_id = job_id, "Job cancelled by user request"); } let mut jobs = Vec::new(); @@ -841,7 +841,7 @@ impl Queue { tokio::spawn(async move { if let Err(e) = queue_clone.process_cancelled_job(&job_id).await { tracing::error!( - job_id = %job_id, + job_id = job_id, error = ?e, "Failed to process cancelled job" ); @@ -884,7 +884,7 @@ impl Queue { pipeline.query_async::<()>(&mut self.redis.clone()).await?; tracing::info!( - job_id = %job_id, + job_id = job_id, "Successfully processed job cancellation hooks" ); @@ -892,7 +892,7 @@ impl Queue { } None => { tracing::warn!( - job_id = %job_id, + job_id = job_id, "Cancelled job not found when trying to process hooks" ); Ok(()) @@ -1190,7 +1190,7 @@ impl Queue { let lease_exists: bool = conn.exists(&lease_key).await?; if !lease_exists { redis::cmd("UNWATCH").query_async::<()>(&mut conn).await?; - tracing::warn!(job_id = %job.job.id, "Lease no longer exists, job was cancelled or timed out"); + tracing::warn!(job_id = job.job.id, "Lease no longer exists, job was cancelled or timed out"); return Ok(()); } @@ -1211,12 +1211,12 @@ impl Queue { Err(JobError::Fail(_)) => self.post_fail_completion().await?, } - tracing::debug!(job_id = %job.job.id, "Job completion successful"); + tracing::debug!(job_id = job.job.id, "Job completion successful"); return Ok(()); } Err(_) => { // WATCH failed (lease key changed), retry - 
tracing::debug!(job_id = %job.job.id, "WATCH failed during completion, retrying"); + tracing::debug!(job_id = job.job.id, "WATCH failed during completion, retrying"); continue; } } @@ -1290,7 +1290,7 @@ impl Queue { let lease_exists: bool = conn.exists(&lease_key).await?; if !lease_exists { redis::cmd("UNWATCH").query_async::<()>(&mut conn).await?; - tracing::warn!(job_id = %job.id, "Lease no longer exists, job was cancelled or timed out"); + tracing::warn!(job_id = job.id, "Lease no longer exists, job was cancelled or timed out"); return Ok(()); } @@ -1306,12 +1306,12 @@ impl Queue { Ok(_) => { // Success! Run post-completion self.post_fail_completion().await?; - tracing::debug!(job_id = %job.id, "Queue error job completion successful"); + tracing::debug!(job_id = job.id, "Queue error job completion successful"); return Ok(()); } Err(_) => { // WATCH failed (lease key changed), retry - tracing::debug!(job_id = %job.id, "WATCH failed during queue error completion, retrying"); + tracing::debug!(job_id = job.id, "WATCH failed during queue error completion, retrying"); continue; } } diff --git a/twmq/src/multilane.rs b/twmq/src/multilane.rs index bb9b359..e935619 100644 --- a/twmq/src/multilane.rs +++ b/twmq/src/multilane.rs @@ -424,7 +424,7 @@ impl MultilaneQueue { "cancelled_immediately" => { if let Err(e) = self.process_cancelled_job(job_id).await { tracing::error!( - job_id = %job_id, + job_id = job_id, error = ?e, "Failed to process immediately cancelled job" ); @@ -434,7 +434,7 @@ impl MultilaneQueue { "cancellation_pending" => Ok(CancelResult::CancellationPending), "not_found" => Ok(CancelResult::NotFound), _ => Err(TwmqError::Runtime { - message: format!("Unexpected cancel result: {}", result), + message: format!("Unexpected cancel result: {result}"), }), } } @@ -522,7 +522,7 @@ impl MultilaneQueue { .into_iter() .collect::, _>>() .map_err(|e| TwmqError::Runtime { - message: format!("Failed to acquire permits during shutdown: {}", e), + message: 
format!("Failed to acquire permits during shutdown: {e}"), })?; tracing::info!( @@ -771,10 +771,18 @@ impl MultilaneQueue { // Log lease timeouts and cancellations with lane context for (lane_id, job_id) in &timed_out_jobs { - tracing::warn!(job_id = %job_id, lane_id = %lane_id, "Job lease expired, moved back to pending"); + tracing::warn!( + job_id = job_id, + lane_id = lane_id, + "Job lease expired, moved back to pending" + ); } for (lane_id, job_id) in &cancelled_jobs { - tracing::info!(job_id = %job_id, lane_id = %lane_id, "Job cancelled by user request"); + tracing::info!( + job_id = job_id, + lane_id = lane_id, + "Job cancelled by user request" + ); } let mut jobs = Vec::new(); @@ -856,8 +864,8 @@ impl MultilaneQueue { tokio::spawn(async move { if let Err(e) = queue_clone.process_cancelled_job(&job_id).await { tracing::error!( - job_id = %job_id, - lane_id = %lane_id, + job_id = job_id, + lane_id = lane_id, error = ?e, "Failed to process cancelled job" ); @@ -892,7 +900,7 @@ impl MultilaneQueue { pipeline.query_async::<()>(&mut self.redis.clone()).await?; tracing::info!( - job_id = %job_id, + job_id = job_id, "Successfully processed job cancellation hooks" ); @@ -900,7 +908,7 @@ impl MultilaneQueue { } None => { tracing::warn!( - job_id = %job_id, + job_id = job_id, "Cancelled job not found when trying to process hooks" ); Ok(()) @@ -1216,7 +1224,10 @@ impl MultilaneQueue { let lease_exists: bool = conn.exists(&lease_key).await?; if !lease_exists { redis::cmd("UNWATCH").query_async::<()>(&mut conn).await?; - tracing::warn!(job_id = %job.job.id, "Lease no longer exists, job was cancelled or timed out"); + tracing::warn!( + job_id = job.job.id, + "Lease no longer exists, job was cancelled or timed out" + ); return Ok(()); } @@ -1234,11 +1245,18 @@ impl MultilaneQueue { Err(JobError::Fail(_)) => self.post_fail_completion().await?, } - tracing::debug!(job_id = %job.job.id, lane_id = %lane_id, "Job completion successful"); + tracing::debug!( + job_id = job.job.id, 
+ lane_id = lane_id, + "Job completion successful" + ); return Ok(()); } Err(_) => { - tracing::debug!(job_id = %job.job.id, "WATCH failed during completion, retrying"); + tracing::debug!( + job_id = job.job.id, + "WATCH failed during completion, retrying" + ); continue; } } @@ -1313,7 +1331,10 @@ impl MultilaneQueue { let lease_exists: bool = conn.exists(&lease_key).await?; if !lease_exists { redis::cmd("UNWATCH").query_async::<()>(&mut conn).await?; - tracing::warn!(job_id = %job.id, "Lease no longer exists, job was cancelled or timed out"); + tracing::warn!( + job_id = job.id, + "Lease no longer exists, job was cancelled or timed out" + ); return Ok(()); } @@ -1326,11 +1347,18 @@ impl MultilaneQueue { { Ok(_) => { self.post_fail_completion().await?; - tracing::debug!(job_id = %job.id, lane_id = %lane_id, "Queue error job completion successful"); + tracing::debug!( + job_id = job.id, + lane_id = lane_id, + "Queue error job completion successful" + ); return Ok(()); } Err(_) => { - tracing::debug!(job_id = %job.id, "WATCH failed during queue error completion, retrying"); + tracing::debug!( + job_id = job.id, + "WATCH failed during queue error completion, retrying" + ); continue; } } diff --git a/twmq/src/shutdown.rs b/twmq/src/shutdown.rs index 04341c3..a0ce94b 100644 --- a/twmq/src/shutdown.rs +++ b/twmq/src/shutdown.rs @@ -63,7 +63,7 @@ impl WorkerHandle { self.queue.queue_name(), e ); - Err(TwmqError::Runtime { message: format!("Worker panic: {}", e) }) + Err(TwmqError::Runtime { message: format!("Worker panic: {e}") }) } } } @@ -117,7 +117,7 @@ impl ShutdownHandle { errors.push(e); } Err(e) => { - let runtime_error = TwmqError::Runtime { message: format!("Worker {} panic: {}", i, e) }; + let runtime_error = TwmqError::Runtime { message: format!("Worker {i} panic: {e}") }; tracing::error!("Worker {} task panicked during shutdown: {:?}", i, e); errors.push(runtime_error); } diff --git a/twmq/tests/basic.rs b/twmq/tests/basic.rs index 8349e06..0c5d2f6 100644 --- 
a/twmq/tests/basic.rs +++ b/twmq/tests/basic.rs @@ -18,7 +18,7 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys for a given queue name pattern async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:*", queue_name); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -32,7 +32,7 @@ async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) .await .unwrap_or_default(); } - println!("Cleaned up keys for pattern: {}", keys_pattern); + println!("Cleaned up keys for pattern: {keys_pattern}"); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -51,7 +51,7 @@ async fn test_queue_push_and_process_job() { // Reset the flag before each test run TEST_JOB_PROCESSED_SUCCESSFULLY.store(false, Ordering::SeqCst); - println!("Creating queue: {}", queue_name); + println!("Creating queue: {queue_name}"); let basic_handler = TestJobHandler; @@ -74,7 +74,7 @@ async fn test_queue_push_and_process_job() { id_to_check: test_job_id.clone(), }; - println!("Pushing job with ID: {}", test_job_id); + println!("Pushing job with ID: {test_job_id}"); let job_options = JobOptions { data: job_payload, id: test_job_id.clone(), @@ -93,7 +93,7 @@ async fn test_queue_push_and_process_job() { "There should be 1 job in the pending list" ); - println!("Starting worker for queue: {}", queue_name); + println!("Starting worker for queue: {queue_name}"); let worker_queue_ref = Arc::clone(&queue); let worker_handle = worker_queue_ref.work(); diff --git a/twmq/tests/basic_hook.rs b/twmq/tests/basic_hook.rs index eed4536..b1f43b2 100644 --- a/twmq/tests/basic_hook.rs +++ b/twmq/tests/basic_hook.rs @@ -23,7 +23,7 @@ use twmq::{ // Helper to clean up Redis keys for a given queue name pattern async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn 
= conn_manager.clone(); - let keys_pattern = format!("twmq:{}:*", queue_name); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -37,7 +37,7 @@ async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) .await .unwrap_or_default(); } - println!("Cleaned up keys for pattern: {}", keys_pattern); + println!("Cleaned up keys for pattern: {keys_pattern}"); } // Define webhook job types @@ -174,10 +174,10 @@ async fn test_cross_queue_job_scheduling() { MAIN_JOB_PROCESSED.store(false, Ordering::SeqCst); WEBHOOK_JOB_PROCESSED.store(false, Ordering::SeqCst); - println!("Creating main queue: {}", main_queue_name); - println!("Creating webhook queue: {}", webhook_queue_name); + println!("Creating main queue: {main_queue_name}"); + println!("Creating webhook queue: {webhook_queue_name}"); - let mut queue_options = QueueOptions { + let queue_options = QueueOptions { local_concurrency: 1, ..Default::default() }; @@ -257,8 +257,7 @@ async fn test_cross_queue_job_scheduling() { let webhook_success = webhook_queue.count(JobStatus::Success).await.unwrap(); println!( - "Webhook queue - Pending: {}, Success: {}", - webhook_pending, webhook_success + "Webhook queue - Pending: {webhook_pending}, Success: {webhook_success}" ); // Either the webhook job is still pending or already succeeded diff --git a/twmq/tests/delay.rs b/twmq/tests/delay.rs index c8d12f4..ef42509 100644 --- a/twmq/tests/delay.rs +++ b/twmq/tests/delay.rs @@ -23,7 +23,7 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:*", queue_name); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) @@ -109,7 +109,7 @@ async fn test_job_delay_basic() { .init(); let test_id = 
nanoid::nanoid!(); - let queue_name = format!("test_delay_{}", test_id); + let queue_name = format!("test_delay_{test_id}"); let job_id = "delay_job_001"; let delay_duration = Duration::from_secs(2); @@ -269,9 +269,7 @@ async fn test_job_delay_basic() { assert!( actual_delay >= expected_delay && actual_delay <= expected_delay + 2, - "Actual delay ({}) should be close to expected delay ({})", - actual_delay, - expected_delay + "Actual delay ({actual_delay}) should be close to expected delay ({expected_delay})" ); tracing::info!("✅ Basic delay mechanism works correctly!"); @@ -281,7 +279,7 @@ async fn test_job_delay_basic() { // Clean up test-specific keys let _: () = redis_conn_direct - .del(format!("test:{}:processing_order", test_id)) + .del(format!("test:{test_id}:processing_order")) .await .unwrap_or(()); } @@ -291,7 +289,7 @@ async fn test_delay_position_ordering() { // Test that delayed jobs respect RequeuePosition when they expire let test_id = nanoid::nanoid!(); - let queue_name = format!("test_delay_order_{}", test_id); + let queue_name = format!("test_delay_order_{test_id}"); tracing::info!( "\n=== Testing delay position ordering (test_id: {}) ===", @@ -410,7 +408,7 @@ async fn test_delay_position_ordering() { tokio::time::sleep(Duration::from_millis(200)).await; // Check processing order from Redis - let order_key = format!("test:{}:processing_order", test_id); + let order_key = format!("test:{test_id}:processing_order"); let mut redis_conn_direct = redis_conn.as_ref().clone(); let processing_order: Vec = redis_conn_direct .lrange(&order_key, 0, -1) diff --git a/twmq/tests/idempotency_modes.rs b/twmq/tests/idempotency_modes.rs index 663920b..ce1d0f3 100644 --- a/twmq/tests/idempotency_modes.rs +++ b/twmq/tests/idempotency_modes.rs @@ -68,7 +68,7 @@ impl DurableExecution for TestJobHandler { // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let 
keys_pattern = format!("twmq:{}:*", queue_name); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -89,9 +89,11 @@ async fn test_permanent_idempotency_mode() { let queue_name = format!("test_perm_{}", nanoid::nanoid!(6)); let processed_count = Arc::new(AtomicUsize::new(0)); - let mut queue_options = QueueOptions::default(); - queue_options.idempotency_mode = IdempotencyMode::Permanent; - queue_options.local_concurrency = 1; + let queue_options = QueueOptions { + idempotency_mode: IdempotencyMode::Permanent, + local_concurrency: 1, + ..Default::default() + }; let handler = TestJobHandler { processed_count: processed_count.clone(), @@ -174,9 +176,11 @@ async fn test_active_idempotency_mode() { let queue_name = format!("test_active_{}", nanoid::nanoid!(6)); let processed_count = Arc::new(AtomicUsize::new(0)); - let mut queue_options = QueueOptions::default(); - queue_options.idempotency_mode = IdempotencyMode::Active; - queue_options.local_concurrency = 1; + let queue_options = QueueOptions { + idempotency_mode: IdempotencyMode::Active, + local_concurrency: 1, + ..Default::default() + }; let handler = TestJobHandler { processed_count: processed_count.clone(), diff --git a/twmq/tests/lease_expiry.rs b/twmq/tests/lease_expiry.rs index 4c11378..773c100 100644 --- a/twmq/tests/lease_expiry.rs +++ b/twmq/tests/lease_expiry.rs @@ -23,7 +23,7 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:*", queue_name); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) @@ -329,7 +329,7 @@ async fn test_multiple_job_lease_expiry() { for job_id in job_ids { let sleep_job = SleepForeverJobData { id_to_check: job_id.to_string(), - message: format!("Multi-job 
test: {}", job_id), + message: format!("Multi-job test: {job_id}"), }; queue diff --git a/twmq/tests/multilane_batch_pop.rs b/twmq/tests/multilane_batch_pop.rs index bc9e111..450c54b 100644 --- a/twmq/tests/multilane_batch_pop.rs +++ b/twmq/tests/multilane_batch_pop.rs @@ -165,7 +165,7 @@ impl Drop for MultilaneTestHarness { tokio::spawn(async move { let mut conn = redis; - let keys_pattern = format!("twmq_multilane:{}:*", queue_id); + let keys_pattern = format!("twmq_multilane:{queue_id}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) @@ -198,7 +198,7 @@ async fn multilane_test_batch_pop_single_lane_with_100k_empty_lanes() { for i in 0..100 { jobs.push(TestJob { id: i, - data: format!("job_{}", i), + data: format!("job_{i}"), }); } jobs_per_lane.insert(active_lane.clone(), jobs); @@ -207,7 +207,7 @@ async fn multilane_test_batch_pop_single_lane_with_100k_empty_lanes() { // We do this by adding empty lanes to the zset directly let mut conn = harness.queue.redis.clone(); for i in 0..99_999 { - let lane_id = format!("empty_lane_{}", i); + let lane_id = format!("empty_lane_{i}"); // Add lane to lanes zset with score 0 (never processed) redis::cmd("ZADD") .arg(harness.queue.lanes_zset_name()) @@ -245,7 +245,7 @@ async fn multilane_test_batch_pop_single_lane_with_100k_empty_lanes() { .expect("Batch pop should complete within 10 seconds"); let duration = start.elapsed(); - println!("✅ Batch pop completed in {:?}", duration); + println!("✅ Batch pop completed in {duration:?}"); // Verify results assert_eq!(result.len(), 1, "Should get jobs from exactly 1 lane"); @@ -289,10 +289,10 @@ async fn multilane_test_batch_pop_distributed_jobs_across_100k_lanes() { // Create 200 jobs distributed across 200 different lanes (1 job per lane) let mut jobs_per_lane = HashMap::new(); for i in 0..200 { - let lane_id = format!("lane_{}", i); + let lane_id = format!("lane_{i}"); let job = TestJob { id: i, - data: format!("job_{}", i), + data: 
format!("job_{i}"), }; jobs_per_lane.insert(lane_id, vec![job]); } @@ -300,7 +300,7 @@ async fn multilane_test_batch_pop_distributed_jobs_across_100k_lanes() { // Add 99,800 empty lanes to reach 100,000 total let mut conn = harness.queue.redis.clone(); for i in 200..100_000 { - let lane_id = format!("empty_lane_{}", i); + let lane_id = format!("empty_lane_{i}"); redis::cmd("ZADD") .arg(harness.queue.lanes_zset_name()) .arg("NX") @@ -328,14 +328,12 @@ async fn multilane_test_batch_pop_distributed_jobs_across_100k_lanes() { .expect("First batch pop should complete within 10 seconds"); let duration1 = start.elapsed(); println!( - "[200 jobs - 200/100k lanes] ✅ First batch pop completed in {:?}", - duration1 + "[200 jobs - 200/100k lanes] ✅ First batch pop completed in {duration1:?}" ); let new_lanes_count = harness.queue.lanes_count().await.unwrap(); println!( - "[200 jobs - 200/100k lanes] New lanes count after initial batch pop: {}", - new_lanes_count + "[200 jobs - 200/100k lanes] New lanes count after initial batch pop: {new_lanes_count}" ); let total_jobs_1: usize = result1.values().map(|jobs| jobs.len()).sum(); @@ -358,8 +356,7 @@ async fn multilane_test_batch_pop_distributed_jobs_across_100k_lanes() { .expect("Second batch pop should complete within 10 seconds"); let duration2 = start.elapsed(); println!( - "[200 jobs - 200/100k lanes] ✅ Second batch pop completed in {:?}", - duration2 + "[200 jobs - 200/100k lanes] ✅ Second batch pop completed in {duration2:?}" ); let total_jobs_2: usize = result2.values().map(|jobs| jobs.len()).sum(); @@ -381,7 +378,7 @@ async fn multilane_test_batch_pop_distributed_jobs_across_100k_lanes() { .await .expect("Third batch pop should complete within 10 seconds"); let duration3 = start.elapsed(); - println!("✅ Third batch pop completed in {:?}", duration3); + println!("✅ Third batch pop completed in {duration3:?}"); let total_jobs_3: usize = result3.values().map(|jobs| jobs.len()).sum(); assert_eq!(total_jobs_3, 0, "Third batch 
should return 0 jobs"); @@ -429,12 +426,12 @@ async fn multilane_test_batch_pop_fairness_across_lanes() { // Create 10 lanes, each with 10 jobs (100 total) let mut jobs_per_lane = HashMap::new(); for lane_num in 0..10 { - let lane_id = format!("lane_{}", lane_num); + let lane_id = format!("lane_{lane_num}"); let mut jobs = Vec::new(); for job_num in 0..10 { jobs.push(TestJob { id: lane_num * 10 + job_num, - data: format!("job_{}_{}", lane_num, job_num), + data: format!("job_{lane_num}_{job_num}"), }); } jobs_per_lane.insert(lane_id, jobs); @@ -448,17 +445,15 @@ async fn multilane_test_batch_pop_fairness_across_lanes() { assert_eq!(result.len(), 10, "Should get jobs from all 10 lanes"); for lane_num in 0..10 { - let lane_id = format!("lane_{}", lane_num); + let lane_id = format!("lane_{lane_num}"); assert!( result.contains_key(&lane_id), - "Should have job from lane {}", - lane_num + "Should have job from lane {lane_num}" ); assert_eq!( result[&lane_id].len(), 1, - "Should get exactly 1 job from lane {}", - lane_num + "Should get exactly 1 job from lane {lane_num}" ); } diff --git a/twmq/tests/nack.rs b/twmq/tests/nack.rs index f75dc72..f66cd9f 100644 --- a/twmq/tests/nack.rs +++ b/twmq/tests/nack.rs @@ -26,7 +26,7 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:*", queue_name); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) @@ -107,7 +107,7 @@ impl DurableExecution for RetryJobHandler { Ok(RetryJobOutput { final_attempt: current_attempt, - message: format!("Succeeded after {} attempts", current_attempt), + message: format!("Succeeded after {current_attempt} attempts"), }) } } @@ -271,8 +271,7 @@ async fn test_job_retry_attempts() { assert_eq!( job_output.final_attempt, desired_attempts, - 
"Job result should show final attempt as {}", - desired_attempts + "Job result should show final attempt as {desired_attempts}" ); tracing::info!("✅ Retry mechanism works correctly!"); @@ -296,7 +295,7 @@ async fn test_different_retry_counts() { tracing::info!("\n=== Testing {} attempts ===", desired_attempts); let queue_name = format!("test_retry_{}_{}", desired_attempts, nanoid::nanoid!(4)); - let job_id = format!("retry_job_{}", desired_attempts); + let job_id = format!("retry_job_{desired_attempts}"); // Reset counters RETRY_JOB_FINAL_SUCCESS.store(false, Ordering::SeqCst);