声明:链码开发语言是golang,源码分析是基于fabric 1.4.0版本
用户链码与peer的关系
用户链码是一个独立的进程,使用docker封装(非dev模式下)。
链码容器由peer创建,在启动容器时指定了peer的地址,所以链码容器启动后能够找到peer,并建立tcp长连接,其中peer为服务端,协议是:grpc->http2->tcp。
switch ccType {
case pb.ChaincodeSpec_GOLANG.String(), pb.ChaincodeSpec_CAR.String():
lc.Args = []string{"chaincode", fmt.Sprintf("-peer.address=%s", c.PeerAddress)}
case pb.ChaincodeSpec_JAVA.String():
lc.Args = []string{"/root/chaincode-java/start", "--peerAddress", c.PeerAddress}
case pb.ChaincodeSpec_NODE.String():
lc.Args = []string{"/bin/sh", "-c", fmt.Sprintf("cd /usr/local/src; npm start -- --peer.address %s", c.PeerAddress)}
default:
return nil, errors.Errorf("unknown chaincodeType: %s", ccType)
}
链码进程在和peer建立长连接之后,会有一个注册的动作,注册完成之后,链码处于ready状态,开始接收背书请求。 注:由于链码容器只会和一个peer建立长连接,所以一个链码容器不可能被多个peer使用,但可以被同一个peer的多个通道共用。
背书过程分析
peer在同时收到多个client的交易提案时(接收动作是并行,因为是不同的tcp长连接),
而把提案信息发送给链码,这个发送动作必然是串行的,因为只有一个tcp长连接!不过这些都不影响我们的链码编写。
接下来看链码侧逻辑。
链码进程启动之后,核心代码:
负责与peer交互的
函数:func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode) error {}
位置:core/chaincode/shim/chaincode.go: 321
函数:func (handler *Handler) handleReady(msg *pb.ChaincodeMessage, errc chan error) error {}
位置:core/chaincode/shim/handler.go: 774
函数:func (handler *Handler) handleTransaction(msg *pb.ChaincodeMessage, errc chan error) {}
位置:core/chaincode/shim/handler.go: 238
在chatWithPeer函数里面,使用一个无限for循环,不停的接收peer发送的消息。
如果peer是ready状态,调用handleTransaction函数,这个函数里启动了一个goroutine(关键点),在这个goroutine函数里调用了我们的Invoke函数,并把invoke的结果返回给peer!
如果极短的时间内,链码收到了多个背书请求,就会创建多个goroutine,那么这些goroutine就可能在同一时刻被执行。更多细节可以了解goroutine调度相关资料。 现在确认了多次调用Invoke是并行的(至少开始执行是并行的),不管是否是同一个接口!
在我们的业务逻辑中,不可避免的会读写世界状态,两个goroutine中同时调用GetState/PutState,是否会互相影响,最终导致串行呢? 分析源码,以GetState为例,调用堆栈如下:
func (stub *ChaincodeStub) GetState(key string) ([]byte, error) {}
func (handler *Handler) handleGetState(collection string, key string, channelId string, txid string) ([]byte, error) {}
func (handler *Handler) callPeerWithChaincodeMsg(msg *pb.ChaincodeMessage, channelID, txid string) (pb.ChaincodeMessage, error) {}
func (handler *Handler) createChannel(channelID, txid string) (chan pb.ChaincodeMessage, error) {}
func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) {}
func (handler *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan error) {}
callPeerWithChaincodeMsg源码如下:
func (handler *Handler) callPeerWithChaincodeMsg(msg *pb.ChaincodeMessage, channelID, txid string) (pb.ChaincodeMessage, error) {
// Create the channel on which to communicate the response from the peer
var respChan chan pb.ChaincodeMessage
var err error
if respChan, err = handler.createChannel(channelID, txid); err != nil { // 创建管道
return pb.ChaincodeMessage{}, err
}
defer handler.deleteChannel(channelID, txid) // 删除管道
return handler.sendReceive(msg, respChan)
}
createChannel会创建一个管道,并以channelID+txid为key,缓存到Handler对象中,在收到peer返回的数据后,根据channelID+txid找到对应的管道,将数据放入管道中。
注意:创建管道时,如果channelID+txid对应的管道已存在,就返回失败,这意味着Invoke中无法通过创建goroutine来调用GetState等涉及与peer交互的接口。
sendReceive的源码如下:
func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) {
errc := make(chan error, 1)
handler.serialSendAsync(msg, errc) // 异步发送,发送动作的结果通过errc管道返回
for { // 为了处理serialSendAsync发送失败的情况,才使用for
select {
case err :=
关注
打赏
- DevOps实践教程 华为云 系列教程2021 合集
- ❤️Python Django网站开发 2021年最新版教程 合集❤️
- ❤️java多线程并发编程入门 教程合集❤️
- ❤️区块链Hyperledger Fabric 老版本 1.1.0 快速部署安装 教程合集❤️
- ❤️Docker教程小白实操入门 教程合集❤️
- ❤️微信小程序 云开发 教程合集(视频+图文)免费❤️
- C++ boost::asio::io_service创建线程池thread_group简单实例
- C++ error: ‘shared_ptr’ was not declared in this scope
- git 代码回滚回退到指定版本 并 提交
- C++ 得到map中最后一个元素