Ethereum Source Code Analysis (Part 2: Blocks)
2019-01-26 16:26
The previous article introduced the overall structure of the Ethereum project. This article analyzes the source code through two questions:
- How is the genesis block generated?
- How does Ethereum validate blocks?
How the genesis block is generated
Every Ethereum node must process the genesis block at startup; a node that fails to handle it correctly cannot join the network. The genesis block is predefined, and its content differs by network type (mainnet vs. the test networks), reflecting what the majority of nodes agreed on when the network was founded.
Since handling the genesis block is mandatory, its implementation should be part of the work done at system startup. Two classes are worth analyzing: Initializer and WorldManager.
Initializer
This class, under the config package, implements the BeanPostProcessor interface, whose hooks allow custom logic to run before and after each bean is initialized. The core code:
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    if (bean instanceof SystemProperties) {
        initConfig((SystemProperties) bean);
    }
    return bean;
}
As shown, Initializer's hook runs initConfig((SystemProperties) bean) before each bean finishes initializing. A fragment of that method:
private void initConfig(SystemProperties config) {
    // ... omitted
    // forcing loading blockchain config
    config.getBlockchainConfig();
    // forcing loading genesis to fail fast in case of error
    config.getGenesis();
    // ... omitted
}
- The getBlockchainConfig method calls loadGenesisJson, which loads the frontier.json file from the classpath. Its content looks like this:
{
    "alloc": {
        "3282791d6fd713f1e94f4bfd565eaa78b3a0599d": {
            "balance": "1337000000000000000000"
        },
        ... (other accounts omitted)
    },
    "nonce": "0x0000000000000042",
    "difficulty": "0x0400000000",
    "mixhash": "0x0000000000000000000000000000000000000000000000000000000000000000",
    "coinbase": "0x0000000000000000000000000000000000000000",
    "timestamp": "0x00",
    "parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
    "extraData": "0x11bbe8db4e347b4e8c937c1c8370e4b5ed33adb3db69cbdb7a38e1e50b1b82fa",
    "gasLimit": "0x1388"
}
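As a quick sanity check on those hex fields (my own arithmetic, not from the original snippet), the genesis gas limit is 5000 and the difficulty is 2^34:

import java.math.BigInteger;

// Decode the hex-encoded genesis fields shown above.
public class GenesisFieldCheck {
    public static void main(String[] args) {
        System.out.println(new BigInteger("1388", 16));       // gasLimit   -> 5000
        System.out.println(new BigInteger("0400000000", 16)); // difficulty -> 17179869184 (2^34)
        System.out.println(new BigInteger("42", 16));         // nonce      -> 66
    }
}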
Since the default network in the configuration is the mainnet, MainNetConfig gets loaded (not analyzed in detail here). At this point the genesis JSON has been loaded from the local file, but it is not yet stored on the chain, and it cannot simply be stored as-is, because it has not been validated. The next step, then, is validating the in-memory genesis block header; note, however, that headerValidators is not initialized here, so no validation actually takes place:
if (genesisJson.getConfig() != null && genesisJson.getConfig().headerValidators != null) {
    for (GenesisConfig.HashValidator validator : genesisJson.getConfig().headerValidators) {
        BlockHeaderValidator headerValidator = new BlockHeaderValidator(
                new BlockCustomHashRule(ByteUtil.hexStringToBytes(validator.hash)));
        blockchainConfig.getConfigForBlock(validator.number).headerValidators().add(
                Pair.of(validator.number, headerValidator));
    }
}
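For reference, the check such a validator enforces is just a pinned-hash comparison: the header at a configured block number must hash to a configured value. A minimal sketch of the idea (names illustrative, not the actual BlockCustomHashRule source):

import java.util.Arrays;

// Illustrative pinned-hash rule: a header is valid only if its hash equals the configured hash.
class PinnedHashRule {
    private final byte[] expectedHash;

    PinnedHashRule(byte[] expectedHash) {
        this.expectedHash = expectedHash;
    }

    boolean validate(byte[] headerHash) {
        return Arrays.equals(expectedHash, headerHash);
    }
}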
- The other method, getGenesis, turns the in-memory JSON into the genesis block:
public static Genesis parseGenesis(BlockchainNetConfig blockchainNetConfig, GenesisJson genesisJson) throws RuntimeException {
    try {
        // construct the genesis block
        Genesis genesis = createBlockForJson(genesisJson);
        // set the pre-allocated accounts (the premine)
        genesis.setPremine(generatePreMine(blockchainNetConfig, genesisJson.getAlloc()));
        // compute the state root hash, i.e. build a state MPT from the account states
        byte[] rootHash = generateRootHash(genesis.getPremine());
        genesis.setStateRoot(rootHash);
        return genesis;
    } catch (Exception e) {
        e.printStackTrace();
        Utils.showErrorAndExit("Problem parsing genesis", e.getMessage());
    }
    return null;
}
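The generateRootHash step deserves a closer look: it builds a state Merkle Patricia Trie keyed by account address and takes its root hash. A sketch of what it amounts to, assuming ethereumj's SecureTrie and PremineAccount types (package names follow ethereumj's layout and may differ across versions; treat this as indicative, not verbatim source):

import java.util.Map;

import org.ethereum.core.Genesis.PremineAccount;
import org.ethereum.db.ByteArrayWrapper;
import org.ethereum.trie.SecureTrie;
import org.ethereum.trie.Trie;

class StateRootSketch {
    // Insert each premine account's RLP-encoded state into an empty secure trie,
    // then take the trie's root hash; this becomes the genesis stateRoot.
    static byte[] generateRootHash(Map<ByteArrayWrapper, PremineAccount> premine) {
        Trie<byte[]> state = new SecureTrie((byte[]) null);
        for (ByteArrayWrapper key : premine.keySet()) {
            state.put(key.getData(), premine.get(key).accountState.getEncoded());
        }
        return state.getRootHash();
    }
}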
With that, the genesis block exists in memory. The second step is storing it into the blockchain, and thereby into the local database.
WorldManager
The constructor of this class calls the method that loads the blockchain; a fragment:
if (blockStore.getBestBlock() == null) {
    logger.info("DB is empty - adding Genesis");
    Genesis genesis = Genesis.getInstance(config);
    Genesis.populateRepository(repository, genesis);
    // repository.commitBlock(genesis.getHeader());
    repository.commit();
    blockStore.saveBlock(Genesis.getInstance(config), Genesis.getInstance(config).getCumulativeDifficulty(), true);
    blockchain.setBestBlock(Genesis.getInstance(config));
    blockchain.setTotalDifficulty(Genesis.getInstance(config).getCumulativeDifficulty());
    listener.onBlock(new BlockSummary(Genesis.getInstance(config), new HashMap<byte[], BigInteger>(),
            new ArrayList<TransactionReceipt>(), new ArrayList<TransactionExecutionSummary>()));
    // repository.dumpState(Genesis.getInstance(config), 0, 0, null);
    logger.info("Genesis block loaded");
}
populateRepository writes the genesis block's account information (addresses, balances, and so on) into the account repository's cache; the following commit flushes that cache to storage, and blockStore.saveBlock persists the block itself to the local database. Finally the best block (think of it as the highest block on the chain) is set to the genesis block, and the total difficulty to the difficulty derived from it. Loading of the genesis block is now complete.
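To make the first of those steps concrete, here is a sketch of populateRepository. It is close to the ethereumj source but reproduced from memory, so treat it as indicative:

import org.ethereum.core.AccountState;
import org.ethereum.core.Genesis.PremineAccount;
import org.ethereum.core.Repository;
import org.ethereum.db.ByteArrayWrapper;

class GenesisRepositorySketch {
    static void populateRepository(Repository repository, org.ethereum.core.Genesis genesis) {
        for (ByteArrayWrapper key : genesis.getPremine().keySet()) {
            PremineAccount premineAccount = genesis.getPremine().get(key);
            AccountState accountState = premineAccount.accountState;
            // create each premine account and copy over its nonce, balance and (if any) code
            repository.createAccount(key.getData());
            repository.setNonce(key.getData(), accountState.getNonce());
            repository.addBalance(key.getData(), accountState.getBalance());
            if (premineAccount.code != null) {
                repository.saveCode(key.getData(), premineAccount.code);
            }
        }
    }
}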
How Ethereum validates blocks
When we start an Ethereum node connected to mainnet or a testnet, it acts as a peer in the P2P network: it discovers available peers and syncs block data from them (covered in detail in another article). Block validation, then, should happen when a node receives blocks. So we can start from the Eth62 class (version 62 of the eth protocol; Eth63 extends it):
public void channelRead0(final ChannelHandlerContext ctx, EthMessage msg) throws InterruptedException {
    super.channelRead0(ctx, msg);
    switch (msg.getCommand()) {
        case STATUS:
            processStatus((StatusMessage) msg, ctx);
            break;
        case NEW_BLOCK_HASHES:
            // block hashes announced by other peers
            processNewBlockHashes((NewBlockHashesMessage) msg);
            break;
        case TRANSACTIONS:
            // transactions propagated by other peers
            processTransactions((TransactionsMessage) msg);
            break;
        case GET_BLOCK_HEADERS:
            // headers requested by other peers
            processGetBlockHeaders((GetBlockHeadersMessage) msg);
            break;
        case BLOCK_HEADERS:
            // headers this node actively requested
            processBlockHeaders((BlockHeadersMessage) msg);
            break;
        case GET_BLOCK_BODIES:
            // blocks requested by other peers
            processGetBlockBodies((GetBlockBodiesMessage) msg);
            break;
        case BLOCK_BODIES:
            // blocks this node actively requested
            processBlockBodies((BlockBodiesMessage) msg);
            break;
        case NEW_BLOCK:
            // a new block propagated by another peer
            processNewBlock((NewBlockMessage) msg);
            break;
        default:
            break;
    }
}
Among these message types we mainly care about BLOCK_HEADERS, BLOCK_BODIES, and NEW_BLOCK. The first two answer requests this node actively made after startup, while the last is a new block propagated by another peer; all of them must be validated before being added to the chain (for NEW_BLOCK, depending on circumstances). Before digging into those handlers we need to understand the tasks started at node boot. SyncManager extends BlockDownloader; once all Spring beans are loaded, the @PostConstruct-annotated init method configured in WorldManager runs:
@PostConstruct
private void init() {
    syncManager.init(channelManager, pool);
}
After syncManager.init executes, since fast sync is not enabled, it goes on to call initRegularSync(EthereumListener.SyncState.COMPLETE);
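Sketched with simplified names (this is not the verbatim ethereumj source, just the shape of the control flow), initRegularSync wires up the following:

// Simplified sketch: field and thread names here are illustrative.
private void initRegularSync(EthereumListener.SyncState syncDoneType) {
    this.syncDoneType = syncDoneType;

    // task 1: a consumer thread that drains the import queue and connects blocks
    new Thread(this::produceQueue, "SyncQueueThread").start();

    // task 2: delegate to BlockDownloader.init, which starts the header and
    // body retrieval loops shown later in this article
    super.init(syncQueue, pool);
}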
It does two main things. The first is starting the task that consumes the designated blocking queue and processes the blocks in it; a fragment:
private void produceQueue() {
    // ... omitted
    while (!Thread.currentThread().isInterrupted()) {
        // ... omitted
        synchronized (blockchain) {
            sl = System.nanoTime();
            importResult = blockchain.tryToConnect(wrapper.getBlock());
        }
        // ... omitted
    }
}
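The importResult returned by tryToConnect tells the sync loop what happened to the block. In ethereumj this is the ImportResult enum, reproduced here from memory (so treat the exact shape as indicative):

public enum ImportResult {
    IMPORTED_BEST,      // extended the main chain
    IMPORTED_NOT_BEST,  // stored on a side (fork) chain
    EXISTS,             // block already known
    NO_PARENT,          // parent missing, cannot connect yet
    INVALID_BLOCK,      // failed validation
    CONSENSUS_BREAK;    // executed, but the resulting state contradicts the header

    public boolean isSuccessful() {
        return equals(IMPORTED_BEST) || equals(IMPORTED_NOT_BEST);
    }
}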
tryToConnect calls into tryConnectAndFork, which calls add, finally reaching addImpl, and that method invokes the block validation logic. Part of the code:
public synchronized BlockSummary addImpl(Repository repo, final Block block) {
    // ... omitted
    if (!isValid(repo, block)) {
        logger.warn("Invalid block with number: {}", block.getNumber());
        return null;
    }
    // ... storage follows, omitted
}
private boolean isValid(Repository repo, Block block) {
    boolean isValid = true;
    if (!block.isGenesis()) {
        isValid = isValid(block.getHeader());
        // Sanity checks
        String trieHash = Hex.toHexString(block.getTxTrieRoot());
        String trieListHash = Hex.toHexString(calcTxTrie(block.getTransactionsList()));
        if (!trieHash.equals(trieListHash)) {
            logger.warn("Block's given Trie Hash doesn't match: {} != {}", trieHash, trieListHash);
            return false;
        }
        // if (!validateUncles(block)) return false;
        List<Transaction> txs = block.getTransactionsList();
        if (!txs.isEmpty()) {
            // Repository parentRepo = repository;
            // if (!Arrays.equals(bestBlock.getHash(), block.getParentHash())) {
            //     parentRepo = repository.getSnapshotTo(getBlockByHash(block.getParentHash()).getStateRoot());
            // }
            Map<ByteArrayWrapper, BigInteger> curNonce = new HashMap<>();
            for (Transaction tx : txs) {
                byte[] txSender = tx.getSender();
                ByteArrayWrapper key = new ByteArrayWrapper(txSender);
                BigInteger expectedNonce = curNonce.get(key);
                if (expectedNonce == null) {
                    expectedNonce = repo.getNonce(txSender);
                }
                curNonce.put(key, expectedNonce.add(ONE));
                BigInteger txNonce = new BigInteger(1, tx.getNonce());
                if (!expectedNonce.equals(txNonce)) {
                    logger.warn("Invalid transaction: Tx nonce {} != expected nonce {} (parent nonce: {}): {}",
                            txNonce, expectedNonce, repo.getNonce(txSender), tx);
                    return false;
                }
            }
        }
    }
    return isValid;
}
This implementation follows the [block validation](https://github.com/ethereum/wiki/blob/master/Block-Protocol-2.0.md) protocol, so it is not walked through line by line here.
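The header-level check isValid(block.getHeader()) delegates to a chain of BlockHeaderValidator rules. Restated as a self-contained sketch of the Yellow-Paper-style conditions (illustrative; these are not the actual ethereumj rule classes, and the difficulty/PoW checks are omitted):

import java.math.BigInteger;

// Illustrative header sanity rules in the spirit of the Yellow Paper.
class HeaderRulesSketch {
    static final BigInteger GAS_LIMIT_BOUND_DIVISOR = BigInteger.valueOf(1024);
    static final BigInteger MIN_GAS_LIMIT = BigInteger.valueOf(5000);

    static boolean isValid(long number, long parentNumber,
                           long timestamp, long parentTimestamp,
                           BigInteger gasLimit, BigInteger parentGasLimit,
                           BigInteger gasUsed, int extraDataLength) {
        if (number != parentNumber + 1) return false;              // consecutive block numbers
        if (timestamp <= parentTimestamp) return false;            // strictly increasing timestamps
        if (gasUsed.compareTo(gasLimit) > 0) return false;         // gas used within the limit
        if (gasLimit.compareTo(MIN_GAS_LIMIT) < 0) return false;   // absolute gas-limit floor
        BigInteger delta = gasLimit.subtract(parentGasLimit).abs();
        if (delta.compareTo(parentGasLimit.divide(GAS_LIMIT_BOUND_DIVISOR)) >= 0)
            return false;                                          // bounded gas-limit drift
        if (extraDataLength > 32) return false;                    // extraData size cap
        return true;
    }
}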
The other thing initRegularSync does is call the parent class's init method, which starts two tasks:
public void init(SyncQueueIfc syncQueue, final SyncPool pool) {
    this.syncQueue = syncQueue;
    this.pool = pool;
    logger.info("Initializing BlockDownloader.");
    if (headersDownload) {
        getHeadersThread = new Thread(new Runnable() {
            @Override
            public void run() {
                headerRetrieveLoop();
            }
        }, "SyncThreadHeaders");
        getHeadersThread.start();
    }
    if (blockBodiesDownload) {
        getBodiesThread = new Thread(new Runnable() {
            @Override
            public void run() {
                blockRetrieveLoop();
            }
        }, "SyncThreadBlocks");
        getBodiesThread.start();
    }
}
headerRetrieveLoop repeatedly fetches headers from other peers. The requests are sent asynchronously and handled via a callback, which runs validateAndAddHeaders to validate each header:
private boolean validateAndAddHeaders(List<BlockHeader> headers, byte[] nodeId) {
    if (headers.isEmpty()) return true;
    List<BlockHeaderWrapper> wrappers = new ArrayList<>(headers.size());
    for (BlockHeader header : headers) {
        if (!isValid(header)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Invalid header RLP: {}", Hex.toHexString(header.getEncoded()));
            }
            return false;
        }
        wrappers.add(new BlockHeaderWrapper(header, nodeId));
    }
    synchronized (this) {
        List<BlockHeaderWrapper> headersReady = syncQueue.addHeaders(wrappers);
        if (headersReady != null && !headersReady.isEmpty()) {
            pushHeaders(headersReady);
        }
    }
    receivedHeadersLatch.countDown();
    logger.debug("{} headers added", headers.size());
    return true;
}
The future backing those callbacks, futureHeaders, is completed in the Eth62 handler mentioned earlier, specifically in processBlockHeaders:
protected synchronized void processBlockHeaders(BlockHeadersMessage msg) {
    if (logger.isTraceEnabled()) logger.trace(
            "Peer {}: processing BlockHeaders, size [{}]",
            channel.getPeerIdShort(),
            msg.getBlockHeaders().size()
    );
    GetBlockHeadersMessageWrapper request = headerRequest;
    headerRequest = null;
    if (!isValid(msg, request)) {
        dropConnection();
        return;
    }
    List<BlockHeader> received = msg.getBlockHeaders();
    if (ethState == EthState.STATUS_SENT || ethState == EthState.HASH_CONSTRAINTS_CHECK)
        processInitHeaders(received);
    else {
        syncStats.addHeaders(received.size());
        // setting the future here fires the callback registered for the header request
        request.getFutureHeaders().set(received);
    }
    processingTime += lastReqSentTime > 0 ? (System.currentTimeMillis() - lastReqSentTime) : 0;
    lastReqSentTime = 0;
    peerState = IDLE;
}
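The request/callback plumbing rests on a settable future: the downloader registers a callback when it sends the request, and the handler above completes the future with the peer's answer. A minimal sketch of that pattern using Guava's SettableFuture (the class ethereumj uses for this; everything else below is illustrative):

import java.util.List;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

class HeaderRequestSketch {
    final SettableFuture<List<String>> futureHeaders = SettableFuture.create();

    // requester side: register what to do once the peer answers
    void sendRequest() {
        Futures.addCallback(futureHeaders, new FutureCallback<List<String>>() {
            @Override
            public void onSuccess(List<String> headers) { /* validateAndAddHeaders(...) */ }

            @Override
            public void onFailure(Throwable t) { /* drop the peer, retry elsewhere */ }
        }, MoreExecutors.directExecutor());
    }

    // handler side: completing the future fires the callback above
    void onBlockHeadersMessage(List<String> received) {
        futureHeaders.set(received);
    }
}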
The other task, blockRetrieveLoop, repeatedly fetches blocks from other peers; its callback pushes the received blocks into the blocking queue consumed earlier:
private void addBlocks(List<Block> blocks, byte[] nodeId) {
    if (blocks.isEmpty()) {
        return;
    }
    synchronized (this) {
        logger.debug("Adding new " + blocks.size() + " blocks to sync queue: " +
                blocks.get(0).getShortDescr() + " ... " + blocks.get(blocks.size() - 1).getShortDescr());
        // push the blocks into the blocking queue
        List<Block> newBlocks = syncQueue.addBlocks(blocks);
        List<BlockWrapper> wrappers = new ArrayList<>();
        for (Block b : newBlocks) {
            wrappers.add(new BlockWrapper(b, nodeId));
        }
        logger.debug("Pushing " + wrappers.size() + " blocks to import queue: " + (wrappers.isEmpty() ? "" :
                wrappers.get(0).getBlock().getShortDescr() + " ... " + wrappers.get(wrappers.size() - 1).getBlock().getShortDescr()));
        pushBlocks(wrappers);
    }
    receivedBlocksLatch.countDown();
    if (logger.isDebugEnabled()) logger.debug(
            "Blocks waiting to be proceed: lastBlock.number: [{}]",
            blocks.get(blocks.size() - 1).getNumber()
    );
}
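So the download side and the import side meet at a blocking queue: addBlocks/pushBlocks produce, and produceQueue consumes. The relationship, sketched with a plain java.util.concurrent BlockingQueue (ethereumj's actual queue type differs; this only shows the shape):

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ImportQueueSketch {
    final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<>(1024);

    // producer side, corresponding to addBlocks/pushBlocks
    void pushBlocks(List<String> wrappers) throws InterruptedException {
        for (String w : wrappers) blockQueue.put(w);
    }

    // consumer side, corresponding to produceQueue
    void consumeLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            String wrapper = blockQueue.take();
            // blockchain.tryToConnect(...) would run here
        }
    }
}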
Like the header path, this is implemented asynchronously:
protected synchronized void processBlockBodies(BlockBodiesMessage msg) {
    if (logger.isTraceEnabled()) logger.trace(
            "Peer {}: process BlockBodies, size [{}]",
            channel.getPeerIdShort(),
            msg.getBlockBodies().size()
    );
    if (!isValid(msg)) {
        dropConnection();
        return;
    }
    syncStats.addBlocks(msg.getBlockBodies().size());
    List<Block> blocks = null;
    try {
        blocks = validateAndMerge(msg);
    } catch (Exception e) {
        logger.info("Fatal validation error while processing block bodies from peer {}", channel.getPeerIdShort());
    }
    if (blocks == null) {
        // headers will be returned by #onShutdown()
        dropConnection();
        return;
    }
    // setting the value here fires the callback registered for the body request
    futureBlocks.set(blocks);
    futureBlocks = null;
    processingTime += (System.currentTimeMillis() - lastReqSentTime);
    lastReqSentTime = 0;
}
Finally, NEW_BLOCK messages are handled the same way, by pushing the block into the blocking queue, so they need no separate analysis here.