Skip to content

Commit 3342fd2

Browse files
authored
Merge pull request #16 from flashbots/block-processor-signing
New block processor with signed protocol.
2 parents dca2a6c + 2b4937b commit 3342fd2

File tree

7 files changed

+452
-67
lines changed

7 files changed

+452
-67
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,17 @@ prost = "0.11"
4141
tokio-stream = { version = "0.1", features = ["net"] }
4242
futures = "0.3"
4343
tower = "0.4"
44-
45-
46-
44+
reqwest = { version = "0.11.20", features = ["blocking"] }
45+
secp256k1 = { version = "0.29" }
46+
alloy-signer-local = { version = "0.3.0" }
47+
alloy-signer = { version = "0.3.0" }
48+
alloy-transport-http = { version = "0.3.0" }
49+
alloy-transport = { version = "0.3.0" }
50+
alloy-rpc-client = { version = "0.3.0" }
51+
url = "2.4.1"
52+
http = "0.2.9"
53+
hyper = "0.14"
54+
futures-util = "0.3"
4755

4856
metrics_macros = { git = "https://github.com/flashbots/rbuilder.git", rev = "d96e7215483bac0ab145459f4ddaa811d99459d6"}
4957

config-live-example.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ dry_run = true
2222
dry_run_validation_url = "http://localhost:8545"
2323

2424
blocks_processor_url = "http://block_processor.internal"
25+
key_registration_url = "http://127.0.0.1:8090"
2526
ignore_cancellable_orders = true
2627

2728
sbundle_mergeabe_signers = []

src/blocks_processor.rs

Lines changed: 101 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use alloy_json_rpc::RpcError;
21
use alloy_primitives::{BlockHash, U256};
3-
use alloy_provider::Provider;
2+
use jsonrpsee::{
3+
core::{client::ClientT, traits::ToRpcParams},
4+
http_client::{HttpClient, HttpClientBuilder},
5+
};
46
use rbuilder::{
57
building::BuiltBlockTrace,
68
live_builder::block_output::bid_observer::BidObserver,
@@ -9,10 +11,11 @@ use rbuilder::{
911
serialize::{RawBundle, RawShareBundle},
1012
Order,
1113
},
12-
utils::{error_storage::store_error_event, http_provider, BoxedProvider},
14+
utils::error_storage::store_error_event,
1315
};
1416
use reth::primitives::SealedBlock;
1517
use serde::{Deserialize, Serialize};
18+
use serde_json::value::RawValue;
1619
use serde_with::{serde_as, DisplayFromStr};
1720
use std::sync::Arc;
1821
use time::format_description::well_known;
@@ -21,11 +24,8 @@ use tracing::{debug, error, warn, Span};
2124
use crate::metrics::inc_blocks_api_errors;
2225

2326
const BLOCK_PROCESSOR_ERROR_CATEGORY: &str = "block_processor";
24-
25-
#[derive(Debug, Clone)]
26-
pub struct BlocksProcessorClient {
27-
client: Arc<BoxedProvider>,
28-
}
27+
const DEFAULT_BLOCK_CONSUME_BUILT_BLOCK_METHOD: &str = "block_consumeBuiltBlockV2";
28+
pub const SIGNED_BLOCK_CONSUME_BUILT_BLOCK_METHOD: &str = "flashbots_consumeBuiltBlockV2";
2929

3030
#[derive(Debug, Serialize, Deserialize)]
3131
#[serde(rename_all = "camelCase")]
@@ -64,13 +64,68 @@ struct BlocksProcessorHeader {
6464
pub number: Option<U256>,
6565
}
6666

67-
impl BlocksProcessorClient {
67+
type ConsumeBuiltBlockRequest = (
68+
BlocksProcessorHeader,
69+
String,
70+
String,
71+
Vec<UsedBundle>,
72+
Vec<UsedBundle>,
73+
Vec<UsedSbundle>,
74+
reth::rpc::types::beacon::relay::BidTrace,
75+
String,
76+
U256,
77+
U256,
78+
);
79+
80+
/// Struct to avoid copying ConsumeBuiltBlockRequest since HttpClient::request eats the parameter.
81+
#[derive(Clone)]
82+
struct ConsumeBuiltBlockRequestArc {
83+
inner: Arc<ConsumeBuiltBlockRequest>,
84+
}
85+
86+
impl ConsumeBuiltBlockRequestArc {
87+
fn new(request: ConsumeBuiltBlockRequest) -> Self {
88+
Self {
89+
inner: Arc::new(request),
90+
}
91+
}
92+
fn as_ref(&self) -> &ConsumeBuiltBlockRequest {
93+
self.inner.as_ref()
94+
}
95+
}
96+
97+
impl ToRpcParams for ConsumeBuiltBlockRequestArc {
98+
fn to_rpc_params(self) -> Result<Option<Box<RawValue>>, jsonrpsee::core::Error> {
99+
let json = serde_json::to_string(self.inner.as_ref())
100+
.map_err(jsonrpsee::core::Error::ParseError)?;
101+
RawValue::from_string(json)
102+
.map(Some)
103+
.map_err(jsonrpsee::core::Error::ParseError)
104+
}
105+
}
106+
107+
#[derive(Debug, Clone)]
108+
pub struct BlocksProcessorClient<HttpClientType> {
109+
client: HttpClientType,
110+
consume_built_block_method: &'static str,
111+
}
112+
113+
impl BlocksProcessorClient<HttpClient> {
68114
pub fn try_from(url: &str) -> eyre::Result<Self> {
69115
Ok(Self {
70-
client: Arc::new(http_provider(url.parse()?)),
116+
client: HttpClientBuilder::default().build(url)?,
117+
consume_built_block_method: DEFAULT_BLOCK_CONSUME_BUILT_BLOCK_METHOD,
71118
})
72119
}
120+
}
73121

122+
impl<HttpClientType: ClientT> BlocksProcessorClient<HttpClientType> {
123+
pub fn new(client: HttpClientType, consume_built_block_method: &'static str) -> Self {
124+
Self {
125+
client,
126+
consume_built_block_method,
127+
}
128+
}
74129
pub async fn submit_built_block(
75130
&self,
76131
sealed_block: &SealedBlock,
@@ -115,7 +170,7 @@ impl BlocksProcessorClient {
115170

116171
let used_share_bundles = Self::get_used_sbundles(built_block_trace);
117172

118-
let params = (
173+
let params: ConsumeBuiltBlockRequest = (
119174
header,
120175
closed_at,
121176
sealed_at,
@@ -127,58 +182,47 @@ impl BlocksProcessorClient {
127182
built_block_trace.true_bid_value,
128183
best_bid_value,
129184
);
130-
185+
let request = ConsumeBuiltBlockRequestArc::new(params);
131186
match self
132187
.client
133-
.raw_request("block_consumeBuiltBlockV2".into(), &params)
188+
.request(self.consume_built_block_method, request.clone())
134189
.await
135190
{
136191
Ok(()) => {}
137192
Err(err) => {
138-
match &err {
139-
RpcError::ErrorResp(err) => {
140-
error!(err = ?err, "Block processor returned error");
141-
store_error_event(
142-
BLOCK_PROCESSOR_ERROR_CATEGORY,
143-
&err.to_string(),
144-
&params,
145-
);
146-
}
147-
RpcError::SerError(err) => {
148-
error!(err = ?err, "Failed to serialize block processor request");
149-
}
150-
RpcError::DeserError { err, text } => {
151-
if !(text.contains("504 Gateway Time-out")
152-
|| text.contains("502 Bad Gateway"))
153-
{
154-
error!(err = ?err, "Failed to deserialize block processor response");
155-
store_error_event(
156-
BLOCK_PROCESSOR_ERROR_CATEGORY,
157-
&err.to_string(),
158-
&params,
159-
);
160-
}
161-
}
162-
RpcError::Transport(err) => {
163-
debug!(err = ?err, "Failed to send block processor request");
164-
}
165-
RpcError::NullResp => {
166-
error!("Block processor returned null response");
167-
}
168-
RpcError::UnsupportedFeature(err) => {
169-
error!(err = ?err, "Unsupported feature");
170-
}
171-
RpcError::LocalUsageError(err) => {
172-
error!(err = ?err, "Local usage error");
173-
}
174-
}
193+
Self::handle_rpc_error(&err, request.as_ref());
175194
return Err(err.into());
176195
}
177196
}
178197

179198
Ok(())
180199
}
181200

201+
/// T is parametric just because it too BIG to type
202+
fn handle_rpc_error(err: &jsonrpsee::core::Error, request: &ConsumeBuiltBlockRequest) {
203+
match err {
204+
jsonrpsee::core::Error::Call(error_object) => {
205+
error!(err = ?error_object, "Block processor returned error");
206+
store_error_event(BLOCK_PROCESSOR_ERROR_CATEGORY, &err.to_string(), request);
207+
}
208+
jsonrpsee::core::Error::Transport(_) => {
209+
debug!(err = ?err, "Failed to send block processor request");
210+
}
211+
jsonrpsee::core::Error::ParseError(error) => {
212+
error!(err = ?err, "Failed to deserialize block processor response");
213+
let error_txt = error.to_string();
214+
if !(error_txt.contains("504 Gateway Time-out")
215+
|| error_txt.contains("502 Bad Gateway"))
216+
{
217+
store_error_event(BLOCK_PROCESSOR_ERROR_CATEGORY, &err.to_string(), request);
218+
}
219+
}
220+
_ => {
221+
error!(err = ?err, "Block processor error");
222+
}
223+
}
224+
}
225+
182226
/// Gets the UsedSbundle carefully considering virtual orders formed by other original orders.
183227
fn get_used_sbundles(built_block_trace: &BuiltBlockTrace) -> Vec<UsedSbundle> {
184228
built_block_trace
@@ -229,17 +273,19 @@ impl BlocksProcessorClient {
229273

230274
/// BidObserver sending all data to a BlocksProcessorClient
231275
#[derive(Debug)]
232-
pub struct BlocksProcessorClientBidObserver {
233-
client: BlocksProcessorClient,
276+
pub struct BlocksProcessorClientBidObserver<HttpClientType> {
277+
client: BlocksProcessorClient<HttpClientType>,
234278
}
235279

236-
impl BlocksProcessorClientBidObserver {
237-
pub fn new(client: BlocksProcessorClient) -> Self {
280+
impl<HttpClientType> BlocksProcessorClientBidObserver<HttpClientType> {
281+
pub fn new(client: BlocksProcessorClient<HttpClientType>) -> Self {
238282
Self { client }
239283
}
240284
}
241285

242-
impl BidObserver for BlocksProcessorClientBidObserver {
286+
impl<HttpClientType: ClientT + Clone + Send + Sync + std::fmt::Debug + 'static> BidObserver
287+
for BlocksProcessorClientBidObserver<HttpClientType>
288+
{
243289
fn block_submitted(
244290
&self,
245291
sealed_block: SealedBlock,

0 commit comments

Comments
 (0)