rpc: getprofiles rpc client finished

This commit is contained in:
lyyyuna 2021-06-19 16:17:29 +08:00
parent 4ae7fa3d31
commit 1e2b8485b4
8 changed files with 1477 additions and 73 deletions

13
go.mod
View File

@ -4,9 +4,7 @@ go 1.16
require ( require (
github.com/gin-gonic/gin v1.7.2 github.com/gin-gonic/gin v1.7.2
github.com/golang/protobuf v1.5.2 // indirect
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/json-iterator/go v1.1.11 // indirect
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213 github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213
github.com/mattn/go-colorable v0.1.8 // indirect github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-isatty v0.0.13 // indirect github.com/mattn/go-isatty v0.0.13 // indirect
@ -14,11 +12,12 @@ require (
github.com/spf13/cobra v1.1.3 github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
github.com/tongjingran/copy v1.4.2 github.com/tongjingran/copy v1.4.2
go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0
go.uber.org/zap v1.16.0
golang.org/x/mod v0.4.2 golang.org/x/mod v0.4.2
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 // indirect
golang.org/x/sys v0.0.0-20210608053332-aa57babbf139 // indirect golang.org/x/sys v0.0.0-20210608053332-aa57babbf139 // indirect
golang.org/x/text v0.3.6 // indirect golang.org/x/tools v0.1.3
k8s.io/kubectl v0.21.1 k8s.io/kubectl v0.21.2
k8s.io/test-infra v0.0.0-20210618100605-34aa2f2aa75b
) )
replace k8s.io/client-go => k8s.io/client-go v0.21.1

1314
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -162,7 +162,7 @@ func injectCoverHandler(where string, covers []*config.PackageCover) {
} }
} }
// injectGlobalCoverVarFile 写入所有插桩变量的全局定义至单独一个文件 // injectGlobalCoverVarFile 写入所有插桩变量的全局定义至一个单独的文件
func injectGlobalCoverVarFile(decl string) { func injectGlobalCoverVarFile(decl string) {
globalCoverVarPackage := path.Base(config.GocConfig.GlobalCoverVarImportPath) globalCoverVarPackage := path.Base(config.GocConfig.GlobalCoverVarImportPath)
globalCoverDef := filepath.Join(config.GocConfig.TmpModProjectDir, globalCoverVarPackage) globalCoverDef := filepath.Join(config.GocConfig.TmpModProjectDir, globalCoverVarPackage)

View File

@ -1,46 +1,151 @@
package server package server
import ( import (
"bytes"
"net/http" "net/http"
"sync"
"time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/qiniu/goc/v2/pkg/log" "github.com/qiniu/goc/v2/pkg/log"
"golang.org/x/tools/cover"
"k8s.io/test-infra/gopherage/pkg/cov"
) )
// listServices return all service informations // listServices return all service informations
func (gs *gocServer) listServices(c *gin.Context) { func (gs *gocServer) listServices(c *gin.Context) {
services := make([]gocCoveredClient, 0) agents := make([]gocCoveredAgent, 0)
gs.rpcClients.Range(func(key, value interface{}) bool { gs.rpcClients.Range(func(key, value interface{}) bool {
service, ok := value.(gocCoveredClient) agent, ok := value.(gocCoveredAgent)
if !ok { if !ok {
return false return false
} }
services = append(services, service) agents = append(agents, agent)
return true return true
}) })
c.JSON(http.StatusOK, gin.H{ c.JSON(http.StatusOK, gin.H{
"items": services, "items": agents,
}) })
} }
// getProfiles get and merge all services' informations // getProfiles get and merge all agents' informations
//
// it is synchronous
func (gs *gocServer) getProfiles(c *gin.Context) { func (gs *gocServer) getProfiles(c *gin.Context) {
var mu sync.Mutex
var wg sync.WaitGroup
mergedProfiles := make([][]*cover.Profile, 0)
gs.rpcClients.Range(func(key, value interface{}) bool { gs.rpcClients.Range(func(key, value interface{}) bool {
service, ok := value.(gocCoveredClient) agent, ok := value.(gocCoveredAgent)
if !ok { if !ok {
return false return false
} }
var req GetProfileReq = "getprofile" wg.Add(1)
var res GetProfileRes // 并发 rpc且每个 rpc 设超时时间 10 second
err := service.rpc.Call("GocAgent.GetProfile", req, &res) go func() {
defer wg.Done()
timeout := time.Duration(10 * time.Second)
done := make(chan error, 1)
var req ProfileReq = "getprofile"
var res ProfileRes
go func() {
err := agent.rpc.Call("GocAgent.GetProfile", req, &res)
if err != nil { if err != nil {
log.Errorf("fail to get profile from: %v, reasson: %v", service.Id, err) log.Errorf("fail to get profile from: %v, reasson: %v. let's close the connection", agent.Id, err)
return true
} }
log.Infof("res: %v", res) done <- err
}()
select {
// rpc 超时
case <-time.After(timeout):
log.Warnf("rpc call timeout: %v", agent.Hostname)
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
case err := <-done:
// 调用 rpc 发生错误
if err != nil {
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
}
}
// append profile
profile, err := convertProfile([]byte(res))
if err != nil {
log.Errorf("fail to convert the received profile from: %v, reasson: %v. let's close the connection", agent.Id, err)
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
return
}
mu.Lock()
defer mu.Unlock()
mergedProfiles = append(mergedProfiles, profile)
}()
return true
})
// 一直等待并发的 rpc 都回应
wg.Wait()
merged, err := cov.MergeMultipleProfiles(mergedProfiles)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"msg": err.Error(),
})
return
}
var buff bytes.Buffer
err = cov.DumpProfile(merged, &buff)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"msg": err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"profile": buff.String(),
})
}
// resetProfiles reset all profiles in agent
//
// it is async, the function will return immediately
func (gs *gocServer) resetProfiles(c *gin.Context) {
gs.rpcClients.Range(func(key, value interface{}) bool {
agent, ok := value.(gocCoveredAgent)
if !ok {
return false
}
var req ProfileReq = "resetprofile"
var res ProfileRes
go func() {
err := agent.rpc.Call("GocAgent.ResetProfile", req, &res)
if err != nil {
log.Errorf("fail to reset profile from: %v, reasson: %v. let's close the connection", agent.Id, err)
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
}
}()
return true return true
}) })

View File

@ -12,9 +12,9 @@ const (
PingWait = 10 * time.Second PingWait = 10 * time.Second
) )
type GetProfileReq string type ProfileReq string
type GetProfileRes string type ProfileRes string
type ReadWriteCloser struct { type ReadWriteCloser struct {
ws *websocket.Conn ws *websocket.Conn

View File

@ -13,19 +13,11 @@ import (
"github.com/qiniu/goc/v2/pkg/log" "github.com/qiniu/goc/v2/pkg/log"
) )
// GocCommandArg defines server -> client arg // serveRpcStream holds connection between goc server and agent.
type GocCommandArg struct { //
Type string // 1. goc server 作为 rpc 客户端
Content string //
} // 2. 每个链接的 goc agent 作为 rpc 服务端
// GocCommandReply defines server -> client reply
type GocCommandReply struct {
Type string
Code string
Content string
}
func (gs *gocServer) serveRpcStream(c *gin.Context) { func (gs *gocServer) serveRpcStream(c *gin.Context) {
// 检查插桩服务上报的信息 // 检查插桩服务上报的信息
remoteIP, _ := c.RemoteIP() remoteIP, _ := c.RemoteIP()
@ -49,7 +41,7 @@ func (gs *gocServer) serveRpcStream(c *gin.Context) {
return return
} }
gocC := gocCoveredClient{ gocA := gocCoveredAgent{
RemoteIP: remoteIP.String(), RemoteIP: remoteIP.String(),
Hostname: hostname, Hostname: hostname,
Pid: pid, Pid: pid,
@ -67,9 +59,13 @@ func (gs *gocServer) serveRpcStream(c *gin.Context) {
// send close msg and close ws connection // send close msg and close ws connection
defer func() { defer func() {
deadline := 1 * time.Second deadline := 1 * time.Second
// 发送 close msg
gs.wsclose(ws, deadline) gs.wsclose(ws, deadline)
time.Sleep(deadline) time.Sleep(deadline)
// 从维护的 websocket 链接字典中移除
gs.rpcClients.Delete(clientId)
ws.Close() ws.Close()
log.Infof("close connection, %v", hostname)
}() }()
// set pong handler // set pong handler
@ -81,17 +77,19 @@ func (gs *gocServer) serveRpcStream(c *gin.Context) {
// set ping goroutine to ping every PingWait time // set ping goroutine to ping every PingWait time
go func() { go func() {
ticker := time.Tick(PingWait) ticker := time.NewTicker(PingWait)
for range ticker { defer ticker.Stop()
for range ticker.C {
if err := gs.wsping(ws, PongWait); err != nil { if err := gs.wsping(ws, PongWait); err != nil {
log.Errorf("ping to %v failed: %v", ws.RemoteAddr(), err) log.Errorf("ping to %v failed: %v", hostname, err)
break break
} }
} }
// 从维护的 websocket 链接中移除 gocA.once.Do(func() {
gs.rpcClients.Delete(clientId) close(gocA.exitCh)
gs.wsclose(ws, 1) })
}() }()
log.Infof("one client established, %v, cmdline: %v, pid: %v, hostname: %v", ws.RemoteAddr(), cmdline, pid, hostname) log.Infof("one client established, %v, cmdline: %v, pid: %v, hostname: %v", ws.RemoteAddr(), cmdline, pid, hostname)
@ -102,12 +100,12 @@ func (gs *gocServer) serveRpcStream(c *gin.Context) {
rwc := &ReadWriteCloser{ws: ws} rwc := &ReadWriteCloser{ws: ws}
codec := jsonrpc.NewClientCodec(rwc) codec := jsonrpc.NewClientCodec(rwc)
gocC.rpc = rpc.NewClientWithCodec(codec) gocA.rpc = rpc.NewClientWithCodec(codec)
gocC.Id = string(clientId) gocA.Id = string(clientId)
gs.rpcClients.Store(clientId, gocC) gs.rpcClients.Store(clientId, gocA)
// wait for exit // wait for exit
for { for {
<-gocC.exitCh <-gocA.exitCh
} }
} }

View File

@ -17,20 +17,22 @@ type gocServer struct {
upgrader websocket.Upgrader upgrader websocket.Upgrader
rpcClients sync.Map rpcClients sync.Map
// mu sync.Mutex // used to protect rpcClients access // mu sync.Mutex // used to protect concurrent rpc call to agent
} }
type gocCliendId string type gocCliendId string
// gocCoveredClient represents a covered client // gocCoveredAgent represents a covered client
type gocCoveredClient struct { type gocCoveredAgent struct {
Id string `json:"id"` Id string `json:"id"`
RemoteIP string `json:"remoteip"` RemoteIP string `json:"remoteip"`
Hostname string `json:"hostname"` Hostname string `json:"hostname"`
CmdLine string `json:"cmdline"` CmdLine string `json:"cmdline"`
Pid string `json:"pid"` Pid string `json:"pid"`
rpc *rpc.Client `json:"-"` rpc *rpc.Client `json:"-"`
exitCh chan int `json:"-"` exitCh chan int `json:"-"`
once sync.Once `json:"-"` // 保护 close(exitCh) 只执行一次
} }
func RunGocServerUntilExit(port int) { func RunGocServerUntilExit(port int) {
@ -48,7 +50,7 @@ func RunGocServerUntilExit(port int) {
v2 := r.Group("/v2") v2 := r.Group("/v2")
{ {
v2.GET("/cover/profile", gs.getProfiles) v2.GET("/cover/profile", gs.getProfiles)
v2.DELETE("/cover/profile", nil) v2.DELETE("/cover/profile", gs.resetProfiles)
v2.GET("/services", gs.listServices) v2.GET("/services", gs.listServices)
v2.GET("/cover/ws/watch", nil) v2.GET("/cover/ws/watch", nil)

28
pkg/server/util.go Normal file
View File

@ -0,0 +1,28 @@
package server
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"golang.org/x/tools/cover"
)
func convertProfile(p []byte) ([]*cover.Profile, error) {
// Annoyingly, ParseProfiles only accepts a filename, so we have to write the bytes to disk
// so it can read them back.
// We could probably also just give it /dev/stdin, but that'll break on Windows.
tf, err := ioutil.TempFile("", "")
if err != nil {
return nil, fmt.Errorf("failed to create temp file, err: %v", err)
}
defer tf.Close()
defer os.Remove(tf.Name())
if _, err := io.Copy(tf, bytes.NewReader(p)); err != nil {
return nil, fmt.Errorf("failed to copy data to temp file, err: %v", err)
}
return cover.ParseProfiles(tf.Name())
}