goc/pkg/server/rpcstream.go
2021-06-24 15:22:24 +08:00

120 lines
2.8 KiB
Go

package server
import (
"crypto/sha256"
"fmt"
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
"time"
"github.com/gin-gonic/gin"
"github.com/qiniu/goc/v2/pkg/log"
)
// serveRpcStream holds connection between goc server and agent.
//
// 1. goc server 作为 rpc 客户端
//
// 2. 每个链接的 goc agent 作为 rpc 服务端
func (gs *gocServer) serveRpcStream(c *gin.Context) {
// 检查插桩服务上报的信息
remoteIP, _ := c.RemoteIP()
hostname := c.Query("hostname")
pid := c.Query("pid")
cmdline := c.Query("cmdline")
if hostname == "" || pid == "" || cmdline == "" {
c.JSON(http.StatusBadRequest, gin.H{
"msg": "missing some params",
})
return
}
// 计算插桩服务 id
agentId := gs.generateAgentId(remoteIP.String(), hostname, cmdline, pid)
// 检查 id 是否重复
if _, ok := gs.rpcAgents.Load(agentId); ok {
c.JSON(http.StatusBadRequest, gin.H{
"msg": "the rpc agent already exists",
})
return
}
gocA := &gocCoveredAgent{
RemoteIP: remoteIP.String(),
Hostname: hostname,
Pid: pid,
CmdLine: cmdline,
exitCh: make(chan int),
}
// upgrade to websocket
ws, err := gs.upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Errorf("fail to establish websocket connection with rpc agent: %v", err)
c.JSON(http.StatusInternalServerError, nil)
}
// send close msg and close ws connection
defer func() {
deadline := 1 * time.Second
// 发送 close msg
gs.wsclose(ws, deadline)
time.Sleep(deadline)
// 从维护的 websocket 链接字典中移除
gs.rpcAgents.Delete(agentId)
ws.Close()
log.Infof("close rpc connection, %v", hostname)
}()
// set pong handler
ws.SetReadDeadline(time.Now().Add(PongWait))
ws.SetPongHandler(func(string) error {
ws.SetReadDeadline(time.Now().Add(PongWait))
return nil
})
// set ping goroutine to ping every PingWait time
go func() {
ticker := time.NewTicker(PingWait)
defer ticker.Stop()
for range ticker.C {
if err := gs.wsping(ws, PongWait); err != nil {
log.Errorf("rpc ping to %v failed: %v", hostname, err)
break
}
}
gocA.once.Do(func() {
close(gocA.exitCh)
})
}()
log.Infof("one rpc agent established, %v, cmdline: %v, pid: %v, hostname: %v", ws.RemoteAddr(), cmdline, pid, hostname)
// new rpc agent
// 在这里 websocket server 作为 rpc 的客户端,
// 发送 rpc 请求,
// 由被插桩服务返回 rpc 应答
rwc := &ReadWriteCloser{ws: ws}
codec := jsonrpc.NewClientCodec(rwc)
gocA.rpc = rpc.NewClientWithCodec(codec)
gocA.Id = string(agentId)
gs.rpcAgents.Store(agentId, gocA)
// wait for exit
<-gocA.exitCh
}
// generateAgentId generate id based on agent's meta infomation
func (gs *gocServer) generateAgentId(args ...string) gocCliendId {
var path string
for _, arg := range args {
path += arg
}
sum := sha256.Sum256([]byte(path))
h := fmt.Sprintf("%x", sum[:6])
return gocCliendId(h)
}