Python中如何实现区块链节点同步并逐步完善转账功能


Python中如何实现区块链节点同步并逐步完善转账功能
7 回复

写区块链各大公司都在用 go 啊!楼主用 python 写怕不是慢死?


这个问题挺有意思的,搞区块链节点同步和转账,核心其实就是网络通信、共识验证和状态管理。下面我直接给个能跑的示例,从最基础的开始。

首先,节点同步最简单的就是搞个P2P网络,用socket或者aiohttp都行。这里用Flask快速搭个HTTP节点:

import hashlib
import json
import time
from typing import List, Dict, Any
from flask import Flask, request, jsonify

app = Flask(__name__)

class Block:
    def __init__(self, index: int, transactions: List[Dict], timestamp: float, previous_hash: str):
        self.index = index
        self.transactions = transactions
        self.timestamp = timestamp
        self.previous_hash = previous_hash
        self.nonce = 0
        self.hash = self.calculate_hash()

    def calculate_hash(self) -> str:
        block_string = f"{self.index}{json.dumps(self.transactions)}{self.timestamp}{self.previous_hash}{self.nonce}"
        return hashlib.sha256(block_string.encode()).hexdigest()

    def mine_block(self, difficulty: int):
        while self.hash[:difficulty] != "0" * difficulty:
            self.nonce += 1
            self.hash = self.calculate_hash()

class Blockchain:
    def __init__(self):
        self.chain: List[Block] = []
        self.pending_transactions: List[Dict] = []
        self.difficulty = 2
        self.nodes = set()
        self.create_genesis_block()

    def create_genesis_block(self):
        genesis_block = Block(0, [], time.time(), "0")
        genesis_block.mine_block(self.difficulty)
        self.chain.append(genesis_block)

    def add_transaction(self, sender: str, recipient: str, amount: float) -> int:
        self.pending_transactions.append({
            'sender': sender,
            'recipient': recipient,
            'amount': amount,
            'timestamp': time.time()
        })
        return self.get_last_block().index + 1

    def mine_pending_transactions(self, miner_address: str):
        block = Block(
            len(self.chain),
            self.pending_transactions,
            time.time(),
            self.get_last_block().hash
        )
        block.mine_block(self.difficulty)
        self.chain.append(block)
        self.pending_transactions = []
        # 给矿工奖励
        self.pending_transactions.append({
            'sender': "NETWORK",
            'recipient': miner_address,
            'amount': 1.0,
            'timestamp': time.time()
        })

    def get_last_block(self) -> Block:
        return self.chain[-1]

    def is_chain_valid(self, chain: List[Block]) -> bool:
        for i in range(1, len(chain)):
            current = chain[i]
            previous = chain[i-1]
            if current.hash != current.calculate_hash():
                return False
            if current.previous_hash != previous.hash:
                return False
        return True

    def add_node(self, address: str):
        self.nodes.add(address)

    def sync_chain(self) -> bool:
        longest_chain = None
        max_length = len(self.chain)

        for node in self.nodes:
            # 这里应该发送HTTP请求获取其他节点的链
            # 简化为直接比较
            pass

        if longest_chain:
            self.chain = longest_chain
            return True
        return False

# 初始化区块链
blockchain = Blockchain()

@app.route('/transactions/new', methods=['POST'])
def new_transaction():
    values = request.get_json()
    required = ['sender', 'recipient', 'amount']
    if not all(k in values for k in required):
        return 'Missing values', 400

    index = blockchain.add_transaction(values['sender'], values['recipient'], values['amount'])
    response = {'message': f'Transaction will be added to Block {index}'}
    return jsonify(response), 201

@app.route('/mine', methods=['GET'])
def mine():
    blockchain.mine_pending_transactions("miner_address")
    response = {
        'message': "New block mined",
        'index': blockchain.get_last_block().index,
        'transactions': [tx for tx in blockchain.get_last_block().transactions],
        'hash': blockchain.get_last_block().hash
    }
    return jsonify(response), 200

@app.route('/chain', methods=['GET'])
def full_chain():
    response = {
        'chain': [{
            'index': block.index,
            'transactions': block.transactions,
            'timestamp': block.timestamp,
            'hash': block.hash,
            'previous_hash': block.previous_hash
        } for block in blockchain.chain],
        'length': len(blockchain.chain)
    }
    return jsonify(response), 200

@app.route('/nodes/register', methods=['POST'])
def register_nodes():
    values = request.get_json()
    nodes = values.get('nodes')
    if nodes is None:
        return "Error: Please supply a valid list of nodes", 400

    for node in nodes:
        blockchain.add_node(node)

    response = {
        'message': 'New nodes have been added',
        'total_nodes': list(blockchain.nodes)
    }
    return jsonify(response), 201

@app.route('/nodes/sync', methods=['GET'])
def consensus():
    replaced = blockchain.sync_chain()
    if replaced:
        response = {'message': 'Our chain was replaced', 'new_chain': blockchain.chain}
    else:
        response = {'message': 'Our chain is authoritative', 'chain': blockchain.chain}
    return jsonify(response), 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

转账功能在上面已经实现了,就是add_transaction方法。要点:

  1. 交易需要验证(这里简化为直接添加)
  2. 交易打包进区块需要挖矿
  3. 同步时验证整个链的有效性

要完善的话得加:交易签名验证、UTXO模型、P2P网络通信、共识算法(PoW/PoS)。但上面这个代码已经能跑起来最基本的链了。

先跑通这个基础版本,再逐步加功能。

python 自然有 python 的优势

反正娱乐用啥都行

有个老外比较早就写了一个完整版的区块链 https://github.com/dvf/blockchain

能推荐一两个开源项目吗?

节点同步时验证区块链已经完成,接下来要做一些签名方面的处理和挖矿。

回到顶部