-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathblocktorrent.py
More file actions
457 lines (395 loc) · 20.6 KB
/
blocktorrent.py
File metadata and controls
457 lines (395 loc) · 20.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
#!/usr/bin/python
# Public Domain
import config
import lib.logs as logs
from lib.logs import debuglog, log
import socket, select, threading, urllib2, sys, binascii, StringIO, traceback, time, random
from lib import authproxy, mininode, util, bttrees
import traceback
import btnet
from btnet import BTMessage
# Enable the debug-log categories used by this module's debuglog() calls
# ('btnet') and the 'bttree' category.
logs.debuglevels.extend(['btnet', 'bttree'])

# RPC connection settings.  Defaults come from the config module and can be
# overridden on the command line with --host=, --username= and --password=.
rpchost = config.RPCHOST
rpcusername = config.RPCUSERNAME
rpcpassword = config.RPCPASSWORD
for arg in sys.argv[1:]:
    # Split once on the first '=' so that a value which itself contains the
    # flag text (or further '=' characters) is kept intact.
    if arg.startswith('--host='):
        rpchost = arg.split('=', 1)[1].strip()
    if arg.startswith('--username='):
        rpcusername = arg.split('=', 1)[1].strip()
    if arg.startswith('--password='):
        rpcpassword = arg.split('=', 1)[1].strip()
def gbt(port=8332):
    """Fetch a block template from the configured RPC server.

    Builds an authproxy.AuthServiceProxy for
    http://<rpcusername>:<rpcpassword>@<rpchost>:<port> using the module-level
    RPC settings and returns the result of its getblocktemplate() call.

    port -- TCP port of the RPC server (default 8332, the value that was
            previously hard-coded).
    """
    url = 'http://%s:%s@%s:%i' % (rpcusername, rpcpassword, rpchost, port)
    proxy = authproxy.AuthServiceProxy(url)
    return proxy.getblocktemplate()
def blockfromtemplate(template):
    """Build a mininode.CBlock from a getblocktemplate-style dict.

    Reads the 'version', 'previousblockhash', 'curtime', 'bits', 'noncerange'
    and 'transactions' keys of template.  Each transaction entry must carry
    hex-encoded 'data' and a hex 'hash'; the deserialized transaction's sha256
    is asserted to equal that hash.

    Returns the block with vtx, hashMerkleRoot and sha256 filled in.
    """
    block = mininode.CBlock()
    block.nVersion = template['version']
    block.hashPrevBlock = int(template['previousblockhash'], 16)
    block.nTime = template['curtime']
    block.nBits = int(template['bits'], 16)
    # The nonce is taken from the hex 'noncerange' field parsed as one integer.
    block.nNonce = int(template['noncerange'], 16)

    transactions = []
    for entry in template['transactions']:
        raw = binascii.unhexlify(entry['data'])
        ctx = mininode.CTransaction()
        ctx.deserialize(StringIO.StringIO(raw))
        ctx.calc_sha256()
        transactions.append(ctx)
        # Sanity check: our hash of the raw tx must match the template's.
        assert ctx.sha256 == int(entry['hash'], 16)

    block.vtx = transactions
    block.hashMerkleRoot = block.calc_merkle_root()
    block.calc_sha256()
    return block
class BTPeer:
    """Per-peer state kept on top of a lower-level peer object.

    Tracks which block headers have been exchanged with the peer and forwards
    outgoing messages to the wrapped low_level_peer.
    """

    def __init__(self, low_level_peer, incoming_magic):
        """Wrap low_level_peer; incoming_magic is stored for later lookup."""
        self.low_level_peer = low_level_peer
        self.incoming_magic = incoming_magic
        self.inflight = {}  # sha256 -> bttrees.BTMerkleTree, see log_header()
        self.blocks = {}  # sha256 -> entry; makes has_header() return 'block'
        self.lastupdates = {}  # sha256: (runcount, timestamp)
        self.MTU = 1472  # fixme: do MTU path discovery for this
        #self.txinv = set() # we'll probably want to do this in a more efficient fashion than a set

    def __str__(self):
        """Return the string form of the wrapped low-level peer."""
        return str(self.low_level_peer)

    def close(self):
        """Reset the unacknowledged-message map.

        NOTE(review): nothing else in this class reads self.unacknowledged,
        and inflight/blocks/lastupdates are left untouched -- confirm that
        this is the intended cleanup.
        """
        self.unacknowledged = {}

    def has_header(self, sha256):
        """Report what is known about block sha256 for this peer.

        Returns 'header' if the hash is in self.inflight, 'block' if it is in
        self.blocks, and None otherwise.  sha256 must be an integer (int or,
        on Python 2, long); anything else trips the assertion.
        """
        import numbers  # local import: keeps this edit self-contained
        assert isinstance(sha256, numbers.Integral)
        if sha256 in self.inflight:
            return 'header'
        if sha256 in self.blocks:
            return 'block'
        return None

    def log_header(self, sha256):
        """Start tracking sha256 as in flight unless it is already known."""
        if not self.has_header(sha256):
            self.inflight[sha256] = bttrees.BTMerkleTree(sha256)

    def send_message(self, t):
        """Send message t through the low-level peer."""
        self.low_level_peer.send_message(t)

    def send_message_acknowledged(self, t, error_callback=None, *args, **kwargs):
        """Send message t through the low-level peer's acknowledged path.

        args and kwargs are handed over as a tuple and a dict (not unpacked).
        NOTE(review): presumably the low-level method expects them in that
        form for the error callback -- verify against btnet.
        """
        self.low_level_peer.send_message_acknowledged(t, error_callback, args, kwargs)
class BTUDPClient(threading.Thread):
    """Thread that speaks the blocktorrent protocol over UDP.

    A btnet.BTEventLoop is given handle_read() and handle_close() as
    callbacks; a btnet.BTPeerManager handles messages from addresses that are
    not yet connected and sends/receives ACKs.  process_message() dispatches
    every incoming message to a recv_*/send_*/handle_* method according to the
    message-type prefix of its payload.
    """

    def __init__(self, udp_listen=config.BT_PORT_UDP):
        """Create the client; udp_listen is passed to the event loop's run()."""
        threading.Thread.__init__(self)
        self.udp_listen = udp_listen
        self.blocks = {}  # block sha256 -> block object, filled by add_header()
        self.merkles = {}  # block sha256 -> bttrees.BTMerkleTree
        self.event_loop = btnet.BTEventLoop(self.handle_read, self.handle_close)
        self.peer_manager = btnet.BTPeerManager(self.event_loop, self)
        self.peers = {}  # currently connected peers, key = (IP, port)
        self.magic_map = {}  # currently connected peers, key = (magic, (IP, port))
        self.txmempool = {}  # store txs: tx hash -> serialized tx

    # ------------------------------------------------------------------
    # Thread / event-loop plumbing
    # ------------------------------------------------------------------
    def run(self):
        """Thread body: run the event loop, then release the shared log file."""
        if not logs.logfile:
            logs.logfile = open('debug.log', 'a', 1)  # line buffered
        self.event_loop.run(self.udp_listen)
        if logs.logfile:
            # The log file is shared; only close it when the user count that
            # the logs module keeps drops to zero.
            logs.logusers -= 1
            if logs.logusers == 0:
                try:
                    logs.logfile.close()
                    logs.logfile = None
                except Exception:
                    traceback.print_exc()

    def stop(self):
        """Ask the event loop to stop."""
        self.event_loop.stop()

    def handle_close(self):
        """Disconnect every peer and close the socket (event-loop callback)."""
        # Iterate over a copy: remnode() deletes entries from self.peers.
        for peer in list(self.peers.values()):
            self.remnode(peer)
        if self.event_loop.state != "closed":
            debuglog('btnet', "close")
            self.event_loop.state = "closed"
            try:
                time.sleep(1)  # wait for MSG_DISCONNECT to be sent
                self.event_loop.socket.close()
            except Exception:
                # Closing is best effort, but record why it failed instead of
                # discarding the error silently.
                debuglog('btnet', traceback.format_exc())

    def handle_read(self):
        """Read one datagram from the socket and process it (event-loop callback)."""
        packet, addr = self.event_loop.socket.recvfrom(65535)
        self.process_message(packet, addr)

    # ------------------------------------------------------------------
    # Peer bookkeeping
    # ------------------------------------------------------------------
    def addnode(self, low_level_peer, magic):
        """Register a BTPeer for low_level_peer unless its address is known."""
        if not low_level_peer.addr in self.peers:
            peer = BTPeer(low_level_peer, magic)
            self.peers[low_level_peer.addr] = peer
            self.magic_map[(magic, low_level_peer.addr)] = peer
            debuglog('btnet', "Adding peer %s" % str(peer))
        else:
            debuglog('btnet', "Peer %s:%i already exists" % low_level_peer.addr)

    def remnode(self, peer):
        """Forget peer, send it MSG_DISCONNECT and close it.  No-op for None."""
        if peer:
            addr = peer.low_level_peer.addr
            if addr in self.peers:
                debuglog('btnet', "Removing peer %s" % (str(peer)))
                del self.peers[addr]
                # pop() rather than del: a missing magic_map entry must not
                # prevent the disconnect message from being sent.
                self.magic_map.pop((peer.incoming_magic, addr), None)
                peer.send_message(BTMessage.MSG_DISCONNECT)
                peer.close()
            else:
                debuglog('btnet', "Peer %s:%i doesn't exist" % addr)

    # ------------------------------------------------------------------
    # Message dispatch
    # ------------------------------------------------------------------
    def process_message(self, packet, addr):
        """Deserialize one raw message from addr and dispatch it.

        Messages from an unknown (magic, addr) pair go to the peer manager.
        Messages from a connected peer are dispatched on the payload prefix.
        Any exception, including a deserialization failure, is logged and
        swallowed so that one bad datagram cannot take down the reader.
        """
        try:
            # Deserialization happens inside the try block so that malformed
            # packets reach the "Malformed UDP message" handler below.
            m = btnet.BTMessage.deserialize(packet)
            peer = self.magic_map.get((m.magic, addr))
            debuglog('btnet', "Received from %s: %s" % (':'.join(map(str, addr)), str(m)))
            if not peer:
                # Not connected yet
                self.peer_manager.process_message(m, addr)
            else:
                # connected
                payload = m.payload
                if payload.startswith(BTMessage.MSG_DISCONNECT):
                    self.remnode(peer)
                if payload.startswith(BTMessage.MSG_HEADER):
                    self.recv_header(payload, peer)
                if payload.startswith(BTMessage.MSG_MULTIPLE):
                    self.recv_multiple(payload, addr)
                if payload.startswith(BTMessage.MSG_BLOCKSTATE):
                    self.recv_blockstate(payload, peer)
                if payload.startswith(BTMessage.MSG_ACK):
                    self.recv_ack(m, peer)
                if payload.startswith(BTMessage.MSG_TX):
                    self.recv_tx(payload, peer)  # args?
                if payload.startswith(BTMessage.MSG_REQUEST_TX):
                    self.send_tx(payload, peer)  # args?
                # Design notes for tx requests:
                # need request_tx func. receive tx req and tx msg
                # asking for specific tx? by txhash, or by blockhash and tx index,
                # or multiple tx by list of tx indices (offsets). 3 tx in row, 3tx [0,0,0]. would be bandwidth efficient
                # need mempool for txs. dict of tx hashes to tx obj? tx obj from mininode, or other class we write on top
                # would want to add salted short hashes -- eventually
                # receive req: check mempool.
                # how would you check with req by offset or index?
                # go into your block db, find that block, find hash that goes at that index, use that to get tx out of mempool
                # Test: fill mempool with data from getblocktemplate, other nodes can req tx from it, they can fill their mempools, get complete blocks
                # although don't have logic for which parts of merkle tree to req....
                # write hardcoded thing that sends tx from one to another, check if its received at 2nd peer
                if payload.startswith(BTMessage.MSG_REQUEST_NODES):
                    self.recv_node_request(payload, peer)
                if payload.startswith(BTMessage.MSG_RUN):
                    self.recv_nodes(payload, peer)
                if payload.startswith(BTMessage.MSG_REQ_TXCOUNT):
                    self.handle_txcount_req(payload, peer)
                if payload.startswith(BTMessage.MSG_TXCOUNT_PROOF):
                    self.recv_txcount_proof(payload, peer)
                if payload.startswith(BTMessage.MSG_MISSING_BLOCK):
                    debuglog('btnet', "MSG_MISSING_BLOCK received, but we can't parse it yet. Payload: %s" % payload)
                if payload.startswith(BTMessage.MSG_MISSING_NODES):
                    debuglog('btnet', "MSG_MISSING_NODES received, but we can't parse it yet. Payload: %s" % payload)
            # NOTE(review): the ACK step is placed after the connected /
            # not-connected branch because it looks the peer up again, which
            # only matters if the handling above may have added or removed it.
            # Confirm against the original indentation.
            if m.sequence:
                peer = self.magic_map.get((m.magic, addr))
                if peer is not None:
                    self.peer_manager.send_ack(m, peer.low_level_peer)
        except Exception:
            debuglog('btnet', 'Malformed UDP message or parsing error')
            debuglog('btnet', traceback.format_exc())
            traceback.print_exc()

    def recv_multiple(self, data, addr):
        """Unpack a MSG_MULTIPLE payload and process each contained message.

        After the prefix the payload holds a varint count followed by that
        many (varint length, message bytes) records.
        """
        s = StringIO.StringIO(data.split(BTMessage.MSG_MULTIPLE, 1)[1])
        remaining = util.deser_varint(s)
        # Count down instead of range(count): the count comes off the wire
        # and must not be used to allocate a list.
        while remaining > 0:
            msg_length = util.deser_varint(s)
            self.process_message(s.read(msg_length), addr)
            remaining -= 1

    # ------------------------------------------------------------------
    # Headers
    # ------------------------------------------------------------------
    def add_header(self, cblock):
        """Store cblock and a BTMerkleTree for its merkle root, if new."""
        if not cblock.sha256 in self.blocks:
            self.blocks[cblock.sha256] = cblock
            self.merkles[cblock.sha256] = bttrees.BTMerkleTree(cblock.hashMerkleRoot)

    def send_header(self, cblock, peer):
        """Send cblock's serialized header to peer and record it on both sides."""
        peer.log_header(cblock.sha256)
        self.add_header(cblock)
        header = mininode.CBlockHeader.serialize(cblock)
        msg = BTMessage.MSG_HEADER + header
        peer.send_message(msg)

    def recv_header(self, data, peer):
        """Handle MSG_HEADER: store the header, relay it, request a txcount proof.

        NOTE(review): relay and txcount request are performed for duplicate
        headers as well -- confirm against the original indentation.
        """
        blk = mininode.CBlock()
        f = StringIO.StringIO(data.split(BTMessage.MSG_HEADER, 1)[1])
        mininode.CBlockHeader.deserialize(blk, f)
        blk.calc_sha256()
        self.add_header(blk)
        if not peer.has_header(blk.sha256):
            debuglog('btnet', "Received header from %s: %s" % (peer, repr(blk)))
        else:
            # '%x' rather than hex()[2:], which leaves a trailing 'L' on
            # Python 2 longs.
            debuglog('btnet', "Received duplicate header from %s: %s" % (peer, '%x' % blk.sha256))
        peer.log_header(blk.sha256)
        self.broadcast_header(blk)
        self.req_txcount_proof(peer, blk.sha256)
        # fixme: figure out how to deal with txcount proofs sent in the same packet in a multipass message

    def broadcast_header(self, cblock):
        """Send cblock's header to every connected peer that lacks it."""
        sha = cblock.sha256
        for peer in self.peers.values():
            if not peer.has_header(sha):
                self.send_header(cblock, peer)

    def recv_ack(self, m, peer):
        """Hand an ACK message to the peer manager."""
        self.peer_manager.recv_ack(m, peer.low_level_peer)

    # ------------------------------------------------------------------
    # Transactions
    # ------------------------------------------------------------------
    # todo: in long run will have blockhash and index or indices, ie level in block ( 5th and 7th tx in block X)
    # two ways node can learn about tx, complete block from file/source or from over network. add to mempool
    def send_tx_req(self, txhash, peer):
        """Ask peer for the transaction identified by txhash."""
        assert peer in self.peers.values()
        msg = BTMessage.MSG_REQUEST_TX + txhash
        # todo: make node stop sending requests after receiving requested tx from peer
        print("Sending tx request for  %s" % (txhash,))
        peer.send_message(msg)

    def send_tx(self, data, peer):
        """Handle MSG_REQUEST_TX: send the requested tx if it is in the mempool."""
        txhash = data.split(BTMessage.MSG_REQUEST_TX, 1)[1]
        # Direct dict lookup instead of scanning every key for equality.
        if txhash in self.txmempool:
            print("Found requested txhash in mempool, sending tx to peer:  %s" % (txhash,))
            msg = BTMessage.MSG_TX + self.txmempool[txhash]
            peer.send_message(msg)

    # Receive txs from peers, check mempool for hash, add to block if not (identify block?)
    # TXs come through as binary blobs, use mininode CTransaction to deserialize, calc hash
    def recv_tx(self, data, peer):
        """Handle MSG_TX: deserialize the tx and store it in the mempool if new."""
        ctx = mininode.CTransaction()
        tx = StringIO.StringIO(data.split(BTMessage.MSG_TX, 1)[1])
        mininode.CTransaction.deserialize(ctx, tx)
        ctx.calc_sha256()
        print('Storing tx received over the network for txhash:  %s' % (ctx.hash,))
        if ctx.hash not in self.txmempool:
            # Store binary blob in mempool... why does output not look the same as test mempool tx blob?
            self.txmempool[ctx.hash] = ctx.serialize()
            print("Tx serialized and stored in mempool: %s" % (self.txmempool[ctx.hash],))

    # ------------------------------------------------------------------
    # Block state
    # ------------------------------------------------------------------
    def send_blockstate(self, state, sha256, peer, level=0, index=0):
        """Send state (serialized from level/index) for block sha256 to peer.

        Also records the current run count and time in peer.lastupdates so
        that maybe_update_peers() can rate-limit later updates.
        """
        assert peer in self.peers.values()
        peer.lastupdates[sha256] = (self.merkles[sha256].runs, time.time())
        msg = BTMessage.MSG_BLOCKSTATE + util.ser_uint256(sha256) + state.serialize(level, index)
        peer.send_message(msg)

    def recv_blockstate(self, data, peer):
        """Handle MSG_BLOCKSTATE for a block whose header is in flight with peer."""
        s = StringIO.StringIO(data.split(BTMessage.MSG_BLOCKSTATE, 1)[1])
        sha256 = util.deser_uint256(s)
        if peer.has_header(sha256) == 'header':
            peer.inflight[sha256].state.deserialize(s)
            # One pre-formatted message, matching every other debuglog() call
            # in this file (the original passed the tree as a third argument).
            debuglog('btnet', "New block state for %i: \n%s" % (sha256, peer.inflight[sha256]))
            self.maybe_download_nodes(peer, sha256)

    # ------------------------------------------------------------------
    # Transaction-count proofs
    # ------------------------------------------------------------------
    def req_txcount_proof(self, peer, sha256):
        """Ask peer for a txcount proof for block sha256."""
        msg = BTMessage.MSG_REQ_TXCOUNT + util.ser_uint256(sha256)
        print("sending txcount proof req to  %s" % (peer,))
        peer.send_message(msg)

    def handle_txcount_req(self, data, peer):
        """Handle MSG_REQ_TXCOUNT by answering with a txcount proof."""
        print("received proof req from %s" % (peer,))
        s = StringIO.StringIO(data.split(BTMessage.MSG_REQ_TXCOUNT, 1)[1])
        sha256 = util.deser_uint256(s)
        self.send_txcount_proof(peer, sha256)

    def send_txcount_proof(self, peer, sha256):
        """Send a txcount proof for block sha256 to peer, if one can be built.

        Nothing is sent (a message is printed instead) when the block is
        unknown or the merkle tree cannot produce a proof.
        """
        tree = self.merkles.get(sha256)
        if tree is not None:
            txcount, hashes = tree.maketxcountproof()
        else:
            # A peer may ask about a block we have never heard of.
            txcount, hashes = None, None
        if txcount and hashes:
            msg = BTMessage.MSG_TXCOUNT_PROOF + util.ser_uint256(sha256) + util.ser_varint(txcount) + ''.join(hashes)
            print("sending %i byte txcount proof to %s" % (len(msg), repr(peer)))
            peer.send_message(msg)
        else:
            # The original had this as a bare expression, so nothing was printed.
            print("couldn't send txcount proof to  %s" % (peer,))

    def recv_txcount_proof(self, data, peer):
        """Handle MSG_TXCOUNT_PROOF: parse the proof and give it to the merkle tree.

        The payload is the block hash, a varint txcount and a list of 32-byte
        hashes whose length is derived from txcount.
        """
        print("received txcount proof from %s" % repr(peer))
        # maxsplit=1: the binary body may itself contain the prefix bytes.
        s = StringIO.StringIO(data.split(BTMessage.MSG_TXCOUNT_PROOF, 1)[1])
        sha256 = util.deser_uint256(s)
        txcount = util.deser_varint(s)
        if txcount < 1:
            # txcount == 0 would make path == -1, and -1 >> 1 stays -1, so the
            # loop below would never terminate.
            debuglog('btnet', "Ignoring txcount proof from %s with invalid txcount %i" % (str(peer), txcount))
            return
        # One hash to start with, plus one for every set bit in the index of
        # the last transaction; levels counts the bits of that index.
        levels, hashcount, path = 0, 1, txcount - 1
        while path:
            hashcount += path & 1
            levels += 1
            path = path >> 1
        print("txcount proof received: %i, %i, %i " % (levels, hashcount, txcount))
        hashes = [s.read(32) for i in range(hashcount)]
        self.merkles[sha256].checktxcountproof(txcount, hashes)

    # ------------------------------------------------------------------
    # Merkle-node runs
    # ------------------------------------------------------------------
    def send_node_request(self, peer, sha256, level, index, generations, complete=0):
        """Ask peer for the descendants of node (level, index) of block sha256.

        generations is how many levels below the node are requested.  A truthy
        complete sets bit 0 of the flags field.
        """
        assert level < 253 and generations < 253 and index < 2**level and index < 2**30
        flags = 0
        if complete:
            flags |= 1
        msg = BTMessage.MSG_REQUEST_NODES + util.ser_uint256(sha256) + chr(level) + util.ser_varint(index) + chr(generations) + util.ser_varint(flags)
        #print "sending message %s to peer %s" % (msg.encode('hex'), str(peer))
        peer.send_message(msg)

    def recv_node_request(self, data, peer):
        """Handle MSG_REQUEST_NODES: parse the request and answer via send_nodes()."""
        # maxsplit=1: the binary body may itself contain the prefix bytes.
        s = StringIO.StringIO(data.split(BTMessage.MSG_REQUEST_NODES, 1)[1])
        sha256 = util.deser_uint256(s)
        level = ord(s.read(1))
        index = util.deser_varint(s)
        generations = ord(s.read(1))
        flags = util.deser_varint(s)
        debuglog('btnet', "peer %s wants h=%s l=%i i=%i g=%i f=%i" % (str(peer), util.ser_uint256(sha256)[::-1].encode('hex'), level, index, generations, flags))
        # fixme: maybe add choke/throttle checks here?
        self.send_nodes(peer, sha256, level, index, generations, flags)

    def send_nodes(self, peer, sha256, level, index, generations, flags):
        """Send a MSG_RUN with the requested merkle nodes, or a MSG_MISSING_* reply.

        Replies MSG_MISSING_BLOCK when sha256 is unknown and MSG_MISSING_NODES
        when the tree lacks the requested descendants.  The requested flags
        are currently ignored: the reply is always sent with flags 0.
        """
        if not sha256 in self.merkles:
            debuglog('btnet', 'peer %s wants a block that we don\'t know about: %s' % (str(peer), util.ser_uint256(sha256)[::-1].encode('hex')))
            peer.send_message(BTMessage.MSG_MISSING_BLOCK + util.ser_uint256(sha256) + chr(level) + util.ser_varint(index) + chr(generations))
            return
        if not self.merkles[sha256].state.hasdescendants(level, index, generations):
            debuglog('btnet', 'peer %s wants nodes that we don\'t know about: l=%i i=%i g=%i h=%s' % (str(peer), level, index, generations, util.ser_uint256(sha256)[::-1].encode('hex')))
            peer.send_message(BTMessage.MSG_MISSING_NODES + util.ser_uint256(sha256) + chr(level) + util.ser_varint(index) + chr(generations))
            return
        run = self.merkles[sha256].getrun(level, index, generations)
        assert type(run[0]) == str and len(run[0]) == 32  # Just checking to make sure that merkles stores the serialized str version of the hash, since I forgot
        # No flag is implemented on the sending side yet, so the reply always
        # carries 0 regardless of what was requested (recv_nodes() rejects
        # anything else).
        reply_flags = 0
        msg = BTMessage.MSG_RUN + util.ser_uint256(sha256) + chr(level) + util.ser_varint(index) + chr(generations) + util.ser_varint(len(run)) + util.ser_varint(reply_flags) + ''.join(run)
        if len(msg) > peer.MTU:
            # Only logged; the oversized message is still sent.
            debuglog('btnet', 'MSG_RUN has length %i which exceeds peer %s\'s max MTU of %i' % (len(msg), str(peer), peer.MTU))
        peer.send_message(msg)

    def recv_nodes(self, data, peer):
        """Handle MSG_RUN: verify and add the received hashes to the merkle tree.

        Raises NotImplementedError for a non-zero flags field.
        """
        # maxsplit=1: the binary body may itself contain the prefix bytes.
        s = StringIO.StringIO(data.split(BTMessage.MSG_RUN, 1)[1])
        sha256 = util.deser_uint256(s)
        level = ord(s.read(1))
        index = util.deser_varint(s)
        generations = ord(s.read(1))
        length = util.deser_varint(s)
        flags = util.deser_varint(s)
        if flags:
            raise NotImplementedError
        run = [s.read(32) for i in range(length)]
        result = self.merkles[sha256].checkaddrun(level, index, generations, length, run)
        if not result:
            failure = "Failed to add from peer=%s: l=%i i=%i g=%i h=%s" % (str(peer), level, index, generations, util.ser_uint256(sha256)[::-1].encode('hex'))
            print(failure)
            debuglog('btnet', failure)
        else:
            self.maybe_update_peers(sha256)

    # ------------------------------------------------------------------
    # Peer updates / download scheduling
    # ------------------------------------------------------------------
    def maybe_update_peers(self, sha256):
        """Tell peers about our progress on block sha256 when it is due.

        If the tree has no txcount yet, request a txcount proof from a random
        peer that knows the block and return.  Otherwise send our block state
        to each peer that has never received one, or for which enough runs
        and/or time have accumulated since the last update.
        """
        if not self.merkles[sha256].txcount:
            # Iterate over the BTPeer objects: the keys of self.peers are
            # address tuples and have no .blocks / .inflight attributes.
            peers = [peer for peer in self.peers.values() if sha256 in peer.blocks or sha256 in peer.inflight]
            if not peers:
                # Nobody to ask; random.choice() would raise on an empty list.
                return
            peer = random.choice(peers)
            self.req_txcount_proof(peer, sha256)
            print("requesting txcount proof from peer %s" % (peer,))
            return
        max_runs = config.UPDATE_PEERS_EVERY_N_RUNS
        max_ms = config.UPDATE_PEERS_EVERY_N_MS
        for peer in self.peers.values():
            if not sha256 in peer.lastupdates:
                self.send_blockstate(self.merkles[sha256].state, sha256, peer)
                continue
            oldruns, oldtime = peer.lastupdates[sha256]
            ms = (time.time() - oldtime) * 1000.
            runs = float(self.merkles[sha256].runs - oldruns)
            # Each term is the fraction of its threshold that has been used
            # up; an update is due once the fractions add up to more than 1.
            if (runs / max_runs) + (ms / max_ms) > 1:
                self.send_blockstate(self.merkles[sha256].state, sha256, peer)
            else:
                pass  # fixme: queue a delayed call to this to make sure that peers eventually hear about the updates

    def maybe_download_nodes(self, peer, sha256):
        """Work out which merkle nodes of block sha256 could be requested from peer.

        TODO: the requestable set is computed but no request is sent yet; the
        commented-out sketch at the end shows the intended request loop.
        """
        lastlevel = self.merkles[sha256].levels
        stepsize = 5  # fixme: based on MTU and account for shorthashes
        levels = []
        # We want to avoid downloading a ton of 1-generation runs at the 10th level,
        # so we start at the last level and jump stepsize generations back. That ensures that
        # any short runs we have to download occur at levels with fewer elements
        level = lastlevel
        while level - stepsize > stepsize:
            levels.append(level)
            level -= stepsize
        # NOTE(review): `level = stepsize` is placed inside this `if` so that a
        # tree shallower than stepsize is not given a level beyond its last
        # one -- confirm against the original indentation.
        if level > stepsize:
            levels.append(level)
            level = stepsize
        levels.append(level)
        levels.append(0)
        levels.reverse()
        #print levels
        mebmp = self.merkles[sha256].state.tobitmap(levels, txlev=lastlevel)
        prbmp = peer.inflight[sha256].state.tobitmap(levels, txlev=lastlevel)
        need, req, pipe = self.merkles[sha256].state.getrequestables(mebmp, prbmp)
        #print need
        #print req
        #print pipe
        #n1b = nodes[1].merkles[blk.sha256].state.tobitmap([0, 5, 6, nodes[1].merkles[blk.sha256].levels], txlev=nodes[1].merkles[blk.sha256].levels)
        #n0b = nodes[0].merkles[blk.sha256].state.tobitmap([0, 5, 6, nodes[0].merkles[blk.sha256].levels], txlev=nodes[0].merkles[blk.sha256].levels)
        #need, req, pipe = nodes[1].merkles[blk.sha256].state.getrequestables(n1b, n0b)
        #levels = req.keys()
        #levels.append(nodes[1].merkles[blk.sha256].levels)
        #levels.sort()
        #for l, nxt in zip(levels[:-1], levels[1:]):
        #    for i in range(len(req[l])):
        #        if req[l][i]:
        #            nodes[1].send_node_request(nodes[1].peers.values()[0], blk.sha256, l, i, nxt-l)
        #            requests += 1