-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsocket_app_proxy.go
More file actions
132 lines (105 loc) · 3.17 KB
/
socket_app_proxy.go
File metadata and controls
132 lines (105 loc) · 3.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package proxy
import (
"time"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"github.com/newdag/ledger"
"github.com/sirupsen/logrus"
)
//-------client---------------------------------------------------------------------------------------------
type APP_StateHash struct {
Hash []byte
}
type socketAppProxyClient struct {
clientAddr string
timeout time.Duration
logger *logrus.Logger
}
//call the client rpc State.CommitBlock
func (p *socketAppProxyClient) commitBlock(block ledger.Block) ([]byte, error) {
var stateHash APP_StateHash
conn, err := net.DialTimeout("tcp", p.clientAddr, p.timeout)
if err != nil {
return nil, err
}
rpcConn := jsonrpc.NewClient(conn)
err = rpcConn.Call("State.CommitBlock", block, &stateHash)
p.logger.WithFields(logrus.Fields{
"block": block.Index(),
"state_hash": stateHash.Hash,
}).Debug("AppProxyClient.commitBlock")
return stateHash.Hash, err
}
//-------server---------------------------------------------------------------------------------------------
type socketAppProxyServer struct {
netListener *net.Listener
rpcServer *rpc.Server
submitCh chan []byte
logger *logrus.Logger
}
func newSocketAppProxyServer(bindAddress string, logger *logrus.Logger) *socketAppProxyServer {
server := &socketAppProxyServer{ submitCh: make(chan []byte), logger: logger, }
rpcServer := rpc.NewServer()
rpcServer.RegisterName("NewDAG", server)
server.rpcServer = rpcServer
l, err := net.Listen("tcp", bindAddress)
if err != nil {
logger.WithField("error", err).Error("Failed to listen")
}
server.netListener = &l
return server
}
//call by rpc
func (p *socketAppProxyServer) SubmitTx(tx []byte, ack *bool) error {
p.logger.Debug("SubmitTx")
p.submitCh <- tx
*ack = true
return nil
}
func (p *socketAppProxyServer) listen() {
for {
conn, err := (*p.netListener).Accept()
if err != nil {
p.logger.WithField("error", err).Error("Failed to accept")
}
go (*p.rpcServer).ServeCodec(jsonrpc.NewServerCodec(conn))
}
}
//----------------------------------------------------------------------------------------------------
type SocketAppProxy struct {
clientAddress string
bindAddress string
client *socketAppProxyClient
server *socketAppProxyServer
logger *logrus.Logger
}
func NewSocketAppProxy(clientAddr string, bindAddr string, timeout time.Duration, logger *logrus.Logger) *SocketAppProxy {
if logger == nil {
logger = logrus.New()
logger.Level = logrus.DebugLevel
}
client := &socketAppProxyClient{ clientAddr: clientAddr, timeout: timeout, logger: logger, }
server := newSocketAppProxyServer(bindAddr, logger)
proxy := &SocketAppProxy{
clientAddress: clientAddr,
bindAddress: bindAddr,
client: client,
server: server,
logger: logger,
}
go proxy.server.listen()
return proxy
}
//++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
//Implement AppProxy Interface
type AppProxy interface {
SubmitCh() chan []byte
CommitBlock(block ledger.Block) ([]byte, error)
}
func (p *SocketAppProxy) SubmitCh() chan []byte {
return p.server.submitCh
}
func (p *SocketAppProxy) CommitBlock(block ledger.Block) ([]byte, error) {
return p.client.commitBlock(block)
}