-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathblockchain_client.py
More file actions
247 lines (213 loc) · 8.38 KB
/
blockchain_client.py
File metadata and controls
247 lines (213 loc) · 8.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
"""
blockchain_client.py
====================
Python bridge to the local Hardhat blockchain node.
This module provides a `BlockchainClient` that:
- Connects to the Hardhat JSON-RPC node (http://127.0.0.1:8545)
- Loads contract addresses + ABIs from deploy_output.json
- Exposes record_* methods for each simulation action type
- Handles unavailability gracefully (returns None, never raises)
The `onlyOwner` modifier on the VulnerableVault record_* functions
means only the deployer (accounts[0]) can call them, which is exactly
what we use here.
"""
import json
import logging
import os
from pathlib import Path
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
# ── Optional web3 import (module still loads when web3 is missing) ───
try:
from web3 import Web3
from web3.exceptions import ContractLogicError
_WEB3_AVAILABLE = True
except ImportError:
_WEB3_AVAILABLE = False
logger.warning("web3 not installed. Blockchain features disabled. Run: pip install web3")
def _find_deploy_output() -> Optional[Path]:
"""Search for deploy_output.json in common locations."""
candidates = [
Path(__file__).parent / "deploy_output.json",
Path(__file__).parent / "blockchain" / "deploy_output.json",
]
for p in candidates:
if p.exists():
return p
return None
class BlockchainClient:
    """
    Thin wrapper around Web3 for the DeFi simulation lab.

    The constructor tries to connect to the JSON-RPC node, load the
    VulnerableVault address and ABI from deploy_output.json and select the
    node's first account as the transaction sender. If any step fails the
    client stays unavailable and every method degrades to a no-op.

    All record_* methods return a tx_hash string on success or None on
    failure. They never raise — failures are logged as warnings.
    """

    # Health factors are clamped to this value before fixed-point scaling so
    # that float("inf") can still be encoded as an integer.
    _MAX_HEALTH_FACTOR = 1_000_000.0

    def __init__(self, rpc_url: str = "http://127.0.0.1:8545"):
        """
        Connect to the node at `rpc_url` and bind the VulnerableVault contract.

        Never raises: on any failure a warning is logged and `available`
        remains False.
        """
        self._available = False
        self._w3: Optional[Any] = None
        self._vault = None
        self._deployer = None
        self._addresses: Dict[str, str] = {}
        if not _WEB3_AVAILABLE:
            return
        try:
            self._w3 = Web3(Web3.HTTPProvider(rpc_url, request_kwargs={"timeout": 5}))
            if not self._w3.is_connected():
                logger.warning("Hardhat node not reachable at %s", rpc_url)
                return
            deploy_path = _find_deploy_output()
            if not deploy_path:
                logger.warning(
                    "deploy_output.json not found. "
                    "Run: cd blockchain && npx hardhat run scripts/deploy.js --network localhost"
                )
                return
            with open(deploy_path, encoding="utf-8") as f:
                data = json.load(f)
            self._addresses = data["addresses"]
            vault_abi = data["abis"]["VulnerableVault"]
            vault_addr = self._addresses["VulnerableVault"]
            self._vault = self._w3.eth.contract(
                address=Web3.to_checksum_address(vault_addr),
                abi=vault_abi,
            )
            # Use account[0] (the deployer) as the signer
            self._deployer = self._w3.eth.accounts[0]
            # Do the last RPC call *before* flipping the flag, so a failure
            # here cannot leave the client marked available after the
            # "init failed" warning has been logged.
            block_number = self._w3.eth.block_number
            self._available = True
            logger.info(
                "BlockchainClient connected | node=%s | block=%s | vault=%s",
                rpc_url, block_number, vault_addr,
            )
        except Exception as exc:
            logger.warning("BlockchainClient init failed: %s", exc)

    # ── Public API ────────────────────────────────────────────────────
    @property
    def available(self) -> bool:
        """True once the constructor has connected and bound the vault."""
        return self._available

    def get_status(self) -> Dict[str, Any]:
        """Return a status dict for the /api/blockchain/status endpoint."""
        if not self._available or not self._w3:
            return {"connected": False, "addresses": {}}
        try:
            return {
                "connected": True,
                "block_number": self._w3.eth.block_number,
                "chain_id": self._w3.eth.chain_id,
                "deployer": self._deployer,
                # Copy so callers cannot mutate the client's own mapping.
                "addresses": dict(self._addresses),
            }
        except Exception as exc:
            logger.warning("get_status error: %s", exc)
            return {"connected": False, "addresses": {}}

    def get_transaction(self, tx_hash: str) -> Optional[Dict[str, Any]]:
        """Return receipt dict for a given tx hash, or None."""
        if not self._available or not self._w3:
            return None
        try:
            receipt = self._w3.eth.get_transaction_receipt(tx_hash)
            return {
                "tx_hash": tx_hash,
                "block_number": receipt.blockNumber,
                "gas_used": receipt.gasUsed,
                "status": receipt.status,  # 1 = success
            }
        except Exception as exc:
            logger.debug("get_transaction(%s) error: %s", tx_hash, exc)
            return None

    # ── Record functions — fire-and-forget setters ─────────────────
    def record_swap(
        self,
        caller: str,
        token_in: str,
        amount_in: float,
        token_out: str,
        amount_out: float,
        new_amm_price: float,
        price_impact: float,  # 0.0 → 1.0
    ) -> Optional[str]:
        """
        Record a swap through the vault's `recordSwap` function.

        Amounts and the price are sent as 6-decimal fixed-point integers and
        `price_impact` as basis points. When `token_in` is "ETH", `amount_in`
        is also attached as the transaction value, scaled by 10**18.

        Returns the tx hash, or None if unavailable or on any failure.
        """
        if not self._available:
            return None
        try:
            msg_value = self._scale(amount_in, 18) if token_in == "ETH" else 0
            return self._send(
                "recordSwap",
                self._addr(caller),
                token_in,
                self._scale(amount_in),
                token_out,
                self._scale(amount_out),
                self._scale(new_amm_price),
                self._scale(price_impact, 4),  # basis points
                value=msg_value,
            )
        except Exception as exc:
            # _send never raises, so this only fires on argument conversion.
            logger.warning("blockchain.%s skipped, invalid arguments: %s", "recordSwap", exc)
            return None

    def record_liquidation(
        self,
        liquidator: str,
        borrower: str,
        debt_repaid: float,
        collateral_seized: float,
        bonus: float,
    ) -> Optional[str]:
        """
        Record a liquidation through the vault's `recordLiquidation` function.

        All three amounts are sent as 6-decimal fixed-point integers.
        Returns the tx hash, or None if unavailable or on any failure.
        """
        if not self._available:
            return None
        try:
            return self._send(
                "recordLiquidation",
                self._addr(liquidator),
                self._addr(borrower),
                self._scale(debt_repaid),
                self._scale(collateral_seized),
                self._scale(bonus),
            )
        except Exception as exc:
            logger.warning("blockchain.%s skipped, invalid arguments: %s", "recordLiquidation", exc)
            return None

    def record_borrow(
        self,
        borrower: str,
        amount_usdc: float,
        health_factor: float,
    ) -> Optional[str]:
        """
        Record a borrow through the vault's `recordBorrow` function.

        `health_factor` is clamped to 1,000,000 (which also covers
        float("inf")) and, like `amount_usdc`, sent as a 6-decimal
        fixed-point integer. A NaN health factor is rejected with a warning.

        Returns the tx hash, or None if unavailable or on any failure.
        """
        if not self._available:
            return None
        try:
            if health_factor > self._MAX_HEALTH_FACTOR:
                health_factor = self._MAX_HEALTH_FACTOR
            return self._send(
                "recordBorrow",
                self._addr(borrower),
                self._scale(amount_usdc),
                self._scale(health_factor),
            )
        except Exception as exc:
            logger.warning("blockchain.%s skipped, invalid arguments: %s", "recordBorrow", exc)
            return None

    def record_supply(
        self,
        supplier: str,
        token_sym: str,
        amount: float,
        lp_minted: float,
    ) -> Optional[str]:
        """
        Record a supply through the vault's `recordSupply` function.

        `amount` and `lp_minted` are sent as 6-decimal fixed-point integers.
        Returns the tx hash, or None if unavailable or on any failure.
        """
        if not self._available:
            return None
        try:
            return self._send(
                "recordSupply",
                self._addr(supplier),
                token_sym,
                self._scale(amount),
                self._scale(lp_minted),
            )
        except Exception as exc:
            logger.warning("blockchain.%s skipped, invalid arguments: %s", "recordSupply", exc)
            return None

    # ── Internal helpers ──────────────────────────────────────────────
    @staticmethod
    def _scale(value: float, decimals: int = 6) -> int:
        """
        Convert `value` to a fixed-point integer with `decimals` digits.

        The conversion goes through the value's decimal string form, so
        0.29 becomes exactly 290000 instead of depending on binary-float
        rounding. Digits beyond `decimals` are truncated toward zero.

        Raises ValueError / ArithmeticError for NaN, infinities or
        non-numeric input; callers turn that into a logged None.
        """
        # Local import keeps this block self-contained.
        from decimal import Decimal

        return int(Decimal(str(value)) * 10 ** decimals)

    def _addr(self, raw: str) -> str:
        """
        Convert a string address to checksum format.

        Agent IDs like 'borrower_0001' are hashed into a deterministic
        pseudo-address for on-chain record-keeping.
        """
        if raw.startswith("0x") and len(raw) == 42:
            return Web3.to_checksum_address(raw)
        # Derive 20-byte address from the string (simulation address).
        # Taking the last 40 hex chars works whether or not .hex() adds "0x".
        h = Web3.keccak(text=raw).hex()
        return Web3.to_checksum_address("0x" + h[-40:])

    def _send(self, fn_name: str, *args, **kwargs) -> Optional[str]:
        """
        Call a write function on the VulnerableVault and return its tx hash.

        `args` are passed to the contract function; the only keyword used is
        `value`, attached to the transaction when positive. Waits up to 10
        seconds for the receipt.

        Returns the 0x-prefixed hash, or None when the client is
        unavailable, the call raises, or the receipt reports status 0.
        """
        if not self._available or not self._vault:
            return None
        try:
            tx_params = {"from": self._deployer}
            value = kwargs.get("value", 0)
            if value > 0:
                tx_params["value"] = value
            fn = getattr(self._vault.functions, fn_name)
            tx = fn(*args).transact(tx_params)
            receipt = self._w3.eth.wait_for_transaction_receipt(tx, timeout=10)
            tx_hex = tx.hex() if hasattr(tx, "hex") else str(tx)
            # HexBytes.hex() omits the prefix on newer hexbytes releases;
            # normalise so callers always get the same format.
            if not tx_hex.startswith("0x"):
                tx_hex = "0x" + tx_hex
            # A mined-but-reverted transaction did not record anything.
            if getattr(receipt, "status", 1) == 0:
                logger.warning("blockchain.%s reverted: %s", fn_name, tx_hex)
                return None
            logger.debug("blockchain.%s → %s", fn_name, tx_hex[:16] + "…")
            return tx_hex
        except Exception as exc:
            logger.warning("blockchain.%s failed: %s", fn_name, exc)
            return None