goc/pkg/server/api.go

201 lines
4.3 KiB
Go
Raw Normal View History

2021-06-10 12:03:47 +00:00
package server
import (
2021-06-19 08:17:29 +00:00
"bytes"
2021-06-10 12:03:47 +00:00
"net/http"
2021-06-19 08:17:29 +00:00
"sync"
"time"
2021-06-10 12:03:47 +00:00
"github.com/gin-gonic/gin"
2021-06-17 11:53:00 +00:00
"github.com/qiniu/goc/v2/pkg/log"
2021-06-19 08:17:29 +00:00
"golang.org/x/tools/cover"
"k8s.io/test-infra/gopherage/pkg/cov"
2021-06-10 12:03:47 +00:00
)
2021-06-24 07:22:24 +00:00
// listAgents return all service informations
func (gs *gocServer) listAgents(c *gin.Context) {
agents := make([]*gocCoveredAgent, 0)
2021-06-10 12:03:47 +00:00
2021-06-24 07:22:24 +00:00
gs.rpcAgents.Range(func(key, value interface{}) bool {
agent, ok := value.(*gocCoveredAgent)
2021-06-10 12:03:47 +00:00
if !ok {
return false
}
2021-06-19 08:17:29 +00:00
agents = append(agents, agent)
2021-06-10 12:03:47 +00:00
return true
})
c.JSON(http.StatusOK, gin.H{
2021-06-19 08:17:29 +00:00
"items": agents,
2021-06-10 12:03:47 +00:00
})
}
2021-06-17 11:53:00 +00:00
2021-06-19 08:17:29 +00:00
// getProfiles get and merge all agents' informations
//
// it is synchronous
2021-06-17 11:53:00 +00:00
func (gs *gocServer) getProfiles(c *gin.Context) {
2021-06-19 08:17:29 +00:00
var mu sync.Mutex
var wg sync.WaitGroup
mergedProfiles := make([][]*cover.Profile, 0)
2021-06-24 07:22:24 +00:00
gs.rpcAgents.Range(func(key, value interface{}) bool {
agent, ok := value.(*gocCoveredAgent)
2021-06-17 11:53:00 +00:00
if !ok {
return false
}
2021-06-19 08:17:29 +00:00
wg.Add(1)
// 并发 rpc且每个 rpc 设超时时间 10 second
go func() {
defer wg.Done()
timeout := time.Duration(10 * time.Second)
done := make(chan error, 1)
var req ProfileReq = "getprofile"
var res ProfileRes
go func() {
err := agent.rpc.Call("GocAgent.GetProfile", req, &res)
if err != nil {
log.Errorf("fail to get profile from: %v, reasson: %v. let's close the connection", agent.Id, err)
}
done <- err
}()
select {
// rpc 超时
case <-time.After(timeout):
log.Warnf("rpc call timeout: %v", agent.Hostname)
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
case err := <-done:
// 调用 rpc 发生错误
if err != nil {
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
}
}
// append profile
profile, err := convertProfile([]byte(res))
if err != nil {
log.Errorf("fail to convert the received profile from: %v, reasson: %v. let's close the connection", agent.Id, err)
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
return
}
mu.Lock()
defer mu.Unlock()
mergedProfiles = append(mergedProfiles, profile)
}()
return true
})
// 一直等待并发的 rpc 都回应
wg.Wait()
merged, err := cov.MergeMultipleProfiles(mergedProfiles)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"msg": err.Error(),
})
return
}
var buff bytes.Buffer
err = cov.DumpProfile(merged, &buff)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"msg": err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"profile": buff.String(),
})
}
// resetProfiles reset all profiles in agent
//
// it is async, the function will return immediately
func (gs *gocServer) resetProfiles(c *gin.Context) {
2021-06-24 07:22:24 +00:00
gs.rpcAgents.Range(func(key, value interface{}) bool {
2021-06-19 08:17:29 +00:00
agent, ok := value.(gocCoveredAgent)
if !ok {
return false
2021-06-17 11:53:00 +00:00
}
2021-06-19 08:17:29 +00:00
var req ProfileReq = "resetprofile"
var res ProfileRes
go func() {
err := agent.rpc.Call("GocAgent.ResetProfile", req, &res)
if err != nil {
log.Errorf("fail to reset profile from: %v, reasson: %v. let's close the connection", agent.Id, err)
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
}
}()
2021-06-17 11:53:00 +00:00
return true
})
}
2021-06-24 07:22:24 +00:00
// watchProfileUpdate upgrades the request to a websocket and registers it as
// a watch client in gs.watchClients.
//
// The handler blocks until the client's exitCh is closed, either by the ping
// loop below when a ping fails or by whoever else holds the registered
// client. On exit the client is unregistered and the websocket is closed.
func (gs *gocServer) watchProfileUpdate(c *gin.Context) {
	// upgrade to websocket
	ws, err := gs.upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		log.Errorf("fail to establish websocket connection with watch client: %v", err)
		c.JSON(http.StatusInternalServerError, nil)
		// ws is unusable after a failed upgrade; continuing would
		// dereference it below.
		return
	}

	log.Infof("watch client connected")

	// NOTE(review): the key is a timestamp string; two clients connecting at
	// the same instant would collide. Confirm whether a stronger id is needed.
	id := time.Now().String()
	gwc := &gocWatchClient{
		ws:     ws,
		exitCh: make(chan int),
	}
	gs.watchClients.Store(id, gwc)

	// unregister the client and close the ws connection
	defer func() {
		gs.watchClients.Delete(id)
		ws.Close()
		gwc.once.Do(func() { close(gwc.exitCh) })
		log.Infof("watch client disconnected")
	}()

	// set pong handler: every pong extends the read deadline
	ws.SetReadDeadline(time.Now().Add(PongWait))
	ws.SetPongHandler(func(string) error {
		ws.SetReadDeadline(time.Now().Add(PongWait))
		return nil
	})

	// Ping the client every PingWait. The goroutine ends when a ping fails
	// or when the client is told to exit.
	go func() {
		ticker := time.NewTicker(PingWait)
		defer ticker.Stop()

		for {
			select {
			case <-gwc.exitCh:
				// handler is shutting down; stop pinging
				return
			case <-ticker.C:
				if err := gs.wsping(ws, PongWait); err != nil {
					// A failed ping must release the handler, otherwise it
					// would wait on exitCh and never run its cleanup.
					gwc.once.Do(func() { close(gwc.exitCh) })
					return
				}
			}
		}
	}()

	<-gwc.exitCh
}