goc/pkg/server/api.go

152 lines
3.2 KiB
Go
Raw Normal View History

2021-06-10 12:03:47 +00:00
package server
import (
2021-06-19 08:17:29 +00:00
"bytes"
2021-06-10 12:03:47 +00:00
"net/http"
2021-06-19 08:17:29 +00:00
"sync"
"time"
2021-06-10 12:03:47 +00:00
"github.com/gin-gonic/gin"
2021-06-17 11:53:00 +00:00
"github.com/qiniu/goc/v2/pkg/log"
2021-06-19 08:17:29 +00:00
"golang.org/x/tools/cover"
"k8s.io/test-infra/gopherage/pkg/cov"
2021-06-10 12:03:47 +00:00
)
// listServices return all service informations
func (gs *gocServer) listServices(c *gin.Context) {
agents := make([]*gocCoveredAgent, 0)
2021-06-10 12:03:47 +00:00
gs.rpcClients.Range(func(key, value interface{}) bool {
agent, ok := value.(*gocCoveredAgent)
2021-06-10 12:03:47 +00:00
if !ok {
return false
}
2021-06-19 08:17:29 +00:00
agents = append(agents, agent)
2021-06-10 12:03:47 +00:00
return true
})
c.JSON(http.StatusOK, gin.H{
2021-06-19 08:17:29 +00:00
"items": agents,
2021-06-10 12:03:47 +00:00
})
}
2021-06-17 11:53:00 +00:00
2021-06-19 08:17:29 +00:00
// getProfiles get and merge all agents' informations
//
// it is synchronous
2021-06-17 11:53:00 +00:00
func (gs *gocServer) getProfiles(c *gin.Context) {
2021-06-19 08:17:29 +00:00
var mu sync.Mutex
var wg sync.WaitGroup
mergedProfiles := make([][]*cover.Profile, 0)
2021-06-17 11:53:00 +00:00
gs.rpcClients.Range(func(key, value interface{}) bool {
agent, ok := value.(*gocCoveredAgent)
2021-06-17 11:53:00 +00:00
if !ok {
return false
}
2021-06-19 08:17:29 +00:00
wg.Add(1)
// 并发 rpc且每个 rpc 设超时时间 10 second
go func() {
defer wg.Done()
timeout := time.Duration(10 * time.Second)
done := make(chan error, 1)
var req ProfileReq = "getprofile"
var res ProfileRes
go func() {
err := agent.rpc.Call("GocAgent.GetProfile", req, &res)
if err != nil {
log.Errorf("fail to get profile from: %v, reasson: %v. let's close the connection", agent.Id, err)
}
done <- err
}()
select {
// rpc 超时
case <-time.After(timeout):
log.Warnf("rpc call timeout: %v", agent.Hostname)
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
case err := <-done:
// 调用 rpc 发生错误
if err != nil {
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
}
}
// append profile
profile, err := convertProfile([]byte(res))
if err != nil {
log.Errorf("fail to convert the received profile from: %v, reasson: %v. let's close the connection", agent.Id, err)
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
return
}
mu.Lock()
defer mu.Unlock()
mergedProfiles = append(mergedProfiles, profile)
}()
return true
})
// 一直等待并发的 rpc 都回应
wg.Wait()
merged, err := cov.MergeMultipleProfiles(mergedProfiles)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"msg": err.Error(),
})
return
}
var buff bytes.Buffer
err = cov.DumpProfile(merged, &buff)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"msg": err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"profile": buff.String(),
})
}
// resetProfiles reset all profiles in agent
//
// it is async, the function will return immediately
func (gs *gocServer) resetProfiles(c *gin.Context) {
gs.rpcClients.Range(func(key, value interface{}) bool {
agent, ok := value.(gocCoveredAgent)
if !ok {
return false
2021-06-17 11:53:00 +00:00
}
2021-06-19 08:17:29 +00:00
var req ProfileReq = "resetprofile"
var res ProfileRes
go func() {
err := agent.rpc.Call("GocAgent.ResetProfile", req, &res)
if err != nil {
log.Errorf("fail to reset profile from: %v, reasson: %v. let's close the connection", agent.Id, err)
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
}
}()
2021-06-17 11:53:00 +00:00
return true
})
}