add store interafce

This commit is contained in:
lyyyuna 2021-09-05 20:27:29 +08:00 committed by Li Yiyang
parent f8055dcf30
commit 547016fc9b
13 changed files with 667 additions and 124 deletions

3
.gitignore vendored
View File

@ -1 +1,2 @@
tests/e2e/tmp/*
tests/e2e/tmp/*
.goc.kvstore

View File

@ -28,14 +28,17 @@ var serverCmd = &cobra.Command{
}
var (
serverHost string
serverHost string
serverStore string
)
func init() {
serverCmd.Flags().StringVarP(&serverHost, "host", "", "127.0.0.1:7777", "specify the host of the goc server")
serverCmd.Flags().StringVarP(&serverStore, "store", "", ".goc.kvstore", "specify the host of the goc server")
rootCmd.AddCommand(serverCmd)
}
func serve(cmd *cobra.Command, args []string) {
server.RunGocServerUntilExit(serverHost)
server.RunGocServerUntilExit(serverHost, serverStore)
}

View File

@ -16,13 +16,17 @@ package cover
import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
"net/url"
"encoding/json"
"os"
"strconv"
"strings"
"strconv"
"sync"
"sync/atomic"
"time"
"testing"
@ -33,10 +37,17 @@ import (
)
var (
waitDelay time.Duration = 10 * time.Second
waitDelay time.Duration = 5 * time.Second
host string = "{{.Host}}"
)
var (
token string
id string
cond = sync.NewCond(&sync.Mutex{})
register_extra = os.Getenv("GOC_REGISTER_EXTRA")
)
func init() {
// init host
host_env := os.Getenv("GOC_CUSTOM_HOST")
@ -47,26 +58,35 @@ func init() {
var dialer = websocket.DefaultDialer
go func() {
register(host)
cond.L.Lock()
cond.Broadcast()
cond.L.Unlock()
// 永不退出,出错后统一操作为:延时 + conitnue
for {
// 获取进程元信息用于注册
ps, err := getRegisterInfo()
if err != nil {
time.Sleep(waitDelay)
continue
}
// 注册,直接将元信息放在 ws 地址中
// 直接将 token 放在 ws 地址中
v := url.Values{}
v.Set("hostname", ps.hostname)
v.Set("pid", strconv.Itoa(ps.pid))
v.Set("cmdline", ps.cmdline)
v.Set("token", token)
v.Set("id", id)
v.Encode()
rpcstreamUrl := fmt.Sprintf("ws://%v/v2/internal/ws/rpcstream?%v", host, v.Encode())
ws, _, err := dialer.Dial(rpcstreamUrl, nil)
ws, resp, err := dialer.Dial(rpcstreamUrl, nil)
if err != nil {
log.Printf("[goc][Error] rpc fail to dial to goc server: %v", err)
if resp != nil {
tmp, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
log.Printf("[goc][Error] rpc fail to dial to goc server: %v, body: %v", err, string(tmp))
if isOffline(tmp) {
log.Printf("[goc][Error] needs re-register")
register(host)
}
} else {
log.Printf("[goc][Error] rpc fail to dial to goc server: %v", err)
}
time.Sleep(waitDelay)
continue
}
@ -85,6 +105,86 @@ func init() {
}()
}
// register
func register (host string) {
for {
// 获取进程元信息用于注册
ps, err := getRegisterInfo()
if err != nil {
time.Sleep(waitDelay)
continue
}
// 注册,直接将元信息放在 ws 地址中
v := url.Values{}
v.Set("hostname", ps.hostname)
v.Set("pid", strconv.Itoa(ps.pid))
v.Set("cmdline", ps.cmdline)
v.Set("extra", register_extra)
v.Encode()
req, err := http.NewRequest("GET", fmt.Sprintf("http://%v/v2/internal/register?%v", host, v.Encode()), nil)
if err != nil {
log.Printf("[goc][Error] register generate register http request: %v", err)
time.Sleep(waitDelay)
continue
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("[goc][Error] register fail to goc server: %v", err)
time.Sleep(waitDelay)
continue
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[goc][Error] fail to get register resp from the goc server: %v", err)
time.Sleep(waitDelay)
continue
}
if resp.StatusCode != 200 {
log.Printf("[goc][Error] wrong register http statue code: %v", resp.StatusCode)
time.Sleep(waitDelay)
continue
}
registerResp := struct {
Token string `json:"token"`
Id string `json:"id"`
}{}
err = json.Unmarshal(body, &registerResp)
if err != nil {
log.Printf("[goc][Error] register response json unmarshal failed: %v", err)
time.Sleep(waitDelay)
continue
}
// register success
token = registerResp.Token
id = registerResp.Id
break
}
}
// check if offline failed
func isOffline(data []byte) bool {
val := struct {
Code int `json:"code"`
}{}
err := json.Unmarshal(data, &val)
if err != nil {
return true
}
if val.Code == 1 {
return true
}
return false
}
// rpc
type GocAgent struct {
}

View File

@ -17,8 +17,8 @@ import (
"fmt"
"time"
"os"
"io/ioutil"
"log"
"strconv"
"net/url"
"{{.GlobalCoverVarImportPath}}/websocket"
@ -36,25 +36,27 @@ func init() {
var dialer = websocket.DefaultDialer
go func() {
cond.L.Lock()
cond.Wait()
cond.L.Unlock()
for {
// 获取进程元信息用于注册
ps, err := getRegisterInfo()
if err != nil {
time.Sleep(waitDelay)
continue
}
// 注册,直接将元信息放在 ws 地址中
// 直接将 token 放在 ws 地址中
v := url.Values{}
v.Set("hostname", ps.hostname)
v.Set("pid", strconv.Itoa(ps.pid))
v.Set("cmdline", ps.cmdline)
v.Set("token", token)
v.Set("id", id)
v.Encode()
watchstreamUrl := fmt.Sprintf("ws://%v/v2/internal/ws/watchstream?%v", host, v.Encode())
ws, _, err := dialer.Dial(watchstreamUrl, nil)
ws, resp, err := dialer.Dial(watchstreamUrl, nil)
if err != nil {
log.Printf("[goc][Error] watch fail to dial to goc server: %v", err)
if resp != nil {
tmp, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
log.Printf("[goc][Error] watch fail to dial to goc server: %v, body: %v", err, string(tmp))
} else {
log.Printf("[goc][Error] watch fail to dial to goc server: %v", err)
}
time.Sleep(waitDelay)
continue
}

View File

@ -51,7 +51,7 @@ type Build struct {
// NewBuild creates a Build struct
//
func NewBuild(opts ...GocOption) *Build {
func NewBuild(opts ...gocOption) *Build {
b := &Build{}
for _, opt := range opts {

View File

@ -17,45 +17,45 @@ import (
"github.com/spf13/pflag"
)
type GocOption func(*Build)
type gocOption func(*Build)
func WithHost(host string) GocOption {
func WithHost(host string) gocOption {
return func(b *Build) {
b.Host = host
}
}
func WithMode(mode string) GocOption {
func WithMode(mode string) gocOption {
return func(b *Build) {
b.Mode = mode
}
}
func WithArgs(args []string) GocOption {
func WithArgs(args []string) gocOption {
return func(b *Build) {
b.Args = args
}
}
func WithFlagSets(sets *pflag.FlagSet) GocOption {
func WithFlagSets(sets *pflag.FlagSet) gocOption {
return func(b *Build) {
b.FlagSets = sets
}
}
func WithBuild() GocOption {
func WithBuild() gocOption {
return func(b *Build) {
b.BuildType = 0
}
}
func WithInstall() GocOption {
func WithInstall() gocOption {
return func(b *Build) {
b.BuildType = 1
}
}
func WithDebug(enable bool) GocOption {
func WithDebug(enable bool) gocOption {
return func(b *Build) {
b.Debug = enable
}

View File

@ -20,7 +20,7 @@ import (
"github.com/qiniu/goc/v2/pkg/log"
)
func NewInstall(opts ...GocOption) *Build {
func NewInstall(opts ...gocOption) *Build {
return NewBuild(opts...)
}

View File

@ -29,7 +29,7 @@ import (
func (gs *gocServer) listAgents(c *gin.Context) {
agents := make([]*gocCoveredAgent, 0)
gs.rpcAgents.Range(func(key, value interface{}) bool {
gs.agents.Range(func(key, value interface{}) bool {
agent, ok := value.(*gocCoveredAgent)
if !ok {
return false
@ -52,7 +52,7 @@ func (gs *gocServer) getProfiles(c *gin.Context) {
mergedProfiles := make([][]*cover.Profile, 0)
gs.rpcAgents.Range(func(key, value interface{}) bool {
gs.agents.Range(func(key, value interface{}) bool {
agent, ok := value.(*gocCoveredAgent)
if !ok {
return false
@ -81,16 +81,12 @@ func (gs *gocServer) getProfiles(c *gin.Context) {
case <-time.After(timeout):
log.Warnf("rpc call timeout: %v", agent.Hostname)
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
agent.closeRpcConnOnce()
case err := <-done:
// 调用 rpc 发生错误
if err != nil {
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
agent.closeRpcConnOnce()
}
}
// append profile
@ -98,9 +94,7 @@ func (gs *gocServer) getProfiles(c *gin.Context) {
if err != nil {
log.Errorf("fail to convert the received profile from: %v, reasson: %v. let's close the connection", agent.Id, err)
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
agent.closeRpcConnOnce()
return
}
mu.Lock()
@ -140,7 +134,7 @@ func (gs *gocServer) getProfiles(c *gin.Context) {
//
// it is async, the function will return immediately
func (gs *gocServer) resetProfiles(c *gin.Context) {
gs.rpcAgents.Range(func(key, value interface{}) bool {
gs.agents.Range(func(key, value interface{}) bool {
agent, ok := value.(gocCoveredAgent)
if !ok {
return false
@ -153,9 +147,7 @@ func (gs *gocServer) resetProfiles(c *gin.Context) {
if err != nil {
log.Errorf("fail to reset profile from: %v, reasson: %v. let's close the connection", agent.Id, err)
// 关闭链接
agent.once.Do(func() {
close(agent.exitCh)
})
agent.closeRpcConnOnce()
}
}()
@ -213,3 +205,46 @@ func (gs *gocServer) watchProfileUpdate(c *gin.Context) {
<-gwc.exitCh
}
func (gs *gocServer) removeAgentById(c *gin.Context) {
id := c.Param("id")
rawagent, ok := gs.agents.Load(id)
if !ok {
c.JSON(http.StatusNotFound, gin.H{
"msg": "agent not found",
})
return
}
agent, ok := rawagent.(*gocCoveredAgent)
if !ok {
c.JSON(http.StatusNotFound, gin.H{
"msg": "agent not found",
})
return
}
// 关闭相应连接
agent.closeConnection()
// 从维护 agent 池里删除
gs.agents.Delete(id)
// 从持久化中删除
gs.removeAgentFromStore(id)
}
func (gs *gocServer) removeAgents(c *gin.Context) {
gs.agents.Range(func(key, value interface{}) bool {
agent, ok := value.(*gocCoveredAgent)
if !ok {
return false
}
agent.closeConnection()
gs.agents.Delete(key)
return true
})
gs.removeAllAgentsFromStore()
}

View File

@ -19,6 +19,7 @@ import (
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
"sync"
"time"
"github.com/gin-gonic/gin"
@ -32,33 +33,41 @@ import (
// 2. 每个链接的 goc agent 作为 rpc 服务端
func (gs *gocServer) serveRpcStream(c *gin.Context) {
// 检查插桩服务上报的信息
remoteIP, _ := c.RemoteIP()
hostname := c.Query("hostname")
pid := c.Query("pid")
cmdline := c.Query("cmdline")
rpcRemoteIP, _ := c.RemoteIP()
id := c.Query("id")
token := c.Query("token")
if hostname == "" || pid == "" || cmdline == "" {
rawagent, ok := gs.agents.Load(id)
if !ok {
c.JSON(http.StatusBadRequest, gin.H{
"msg": "missing some params",
})
return
}
// 计算插桩服务 id
agentId := gs.generateAgentId(remoteIP.String(), hostname, cmdline, pid)
// 检查 id 是否重复
if _, ok := gs.rpcAgents.Load(agentId); ok {
c.JSON(http.StatusBadRequest, gin.H{
"msg": "the rpc agent already exists",
"msg": "agent not registered",
"code": 1,
})
return
}
gocA := &gocCoveredAgent{
RemoteIP: remoteIP.String(),
Hostname: hostname,
Pid: pid,
CmdLine: cmdline,
exitCh: make(chan int),
agent := rawagent.(*gocCoveredAgent)
if agent.Token != token {
c.JSON(http.StatusBadRequest, gin.H{
"msg": "register token not match",
"code": 1,
})
return
}
// 更新 agent 信息
agent.RpcRemoteIP = rpcRemoteIP.String()
agent.exitCh = make(chan int)
agent.Status &= ^DISCONNECT // 取消 DISCONNECT 的状态
agent.Status |= RPCCONNECT // 设置为 RPC CONNECT 状态
// 注册销毁函数
var once sync.Once
agent.closeRpcConnOnce = func() {
once.Do(func() {
// 为什么只是关闭 channel其它资源如何释放
// close channel 后,本 goroutine 会进入到 defer
close(agent.exitCh)
})
}
// upgrade to websocket
@ -74,10 +83,15 @@ func (gs *gocServer) serveRpcStream(c *gin.Context) {
// 发送 close msg
gs.wsclose(ws, deadline)
time.Sleep(deadline)
// 从维护的 websocket 链接字典中移除
gs.rpcAgents.Delete(agentId)
// 取消 RPC CONNECT 状态
agent.Status &= ^RPCCONNECT
if agent.Status == 0 {
agent.Status = DISCONNECT
}
ws.Close()
log.Infof("close rpc connection, %v", hostname)
log.Infof("close rpc connection, %v", agent.Hostname)
}()
// set pong handler
@ -94,17 +108,15 @@ func (gs *gocServer) serveRpcStream(c *gin.Context) {
for range ticker.C {
if err := gs.wsping(ws, PongWait); err != nil {
log.Errorf("rpc ping to %v failed: %v", hostname, err)
log.Errorf("rpc ping to %v failed: %v", agent.Hostname, err)
break
}
}
gocA.once.Do(func() {
close(gocA.exitCh)
})
agent.closeRpcConnOnce()
}()
log.Infof("one rpc agent established, %v, cmdline: %v, pid: %v, hostname: %v", ws.RemoteAddr(), cmdline, pid, hostname)
log.Infof("one rpc agent established, %v, cmdline: %v, pid: %v, hostname: %v", ws.RemoteAddr(), agent.CmdLine, agent.Pid, agent.Hostname)
// new rpc agent
// 在这里 websocket server 作为 rpc 的客户端,
// 发送 rpc 请求,
@ -112,11 +124,10 @@ func (gs *gocServer) serveRpcStream(c *gin.Context) {
rwc := &ReadWriteCloser{ws: ws}
codec := jsonrpc.NewClientCodec(rwc)
gocA.rpc = rpc.NewClientWithCodec(codec)
gocA.Id = string(agentId)
gs.rpcAgents.Store(agentId, gocA)
agent.rpc = rpc.NewClientWithCodec(codec)
// wait for exit
<-gocA.exitCh
<-agent.exitCh
}
// generateAgentId generate id based on agent's meta infomation

View File

@ -14,40 +14,78 @@
package server
import (
"crypto/sha256"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"net/rpc"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/qiniu/goc/v2/pkg/log"
"github.com/qiniu/goc/v2/pkg/server/store"
)
// gocServer represents a goc server
type gocServer struct {
port int
storePath string
upgrader websocket.Upgrader
port int
store store.Store
upgrader websocket.Upgrader
agents sync.Map
rpcAgents sync.Map
watchAgents sync.Map
watchCh chan []byte
watchClients sync.Map
idCount int64
}
type gocCliendId string
const (
DISCONNECT = 1 << iota
RPCCONNECT = 1 << iota
WATCHCONNECT = 1 << iota
)
// gocCoveredAgent represents a covered client
type gocCoveredAgent struct {
Id string `json:"id"`
RemoteIP string `json:"remoteip"`
Hostname string `json:"hostname"`
CmdLine string `json:"cmdline"`
Pid string `json:"pid"`
rpc *rpc.Client `json:"-"`
Id string `json:"id"`
RpcRemoteIP string `json:"rpc_remoteip"`
WatchRemoteIP string `json:"watch_remoteip"`
Hostname string `json:"hostname"`
CmdLine string `json:"cmdline"`
Pid string `json:"pid"`
exitCh chan int `json:"-"`
once sync.Once `json:"-"` // 保护 close(exitCh) 只执行一次
// 用户可以选择上报一些定制信息
// 比如不同 namespace 的 statefulset POD它们的 hostname/cmdline/pid 都是一样的,
// 这时候将 extra 设置为 namespace 并上报,这个额外的信息在展示时将更友好
Extra string `json:"extra"`
Token string `json:"token"`
Status int `json:"status"` // 表示该 agent 是否处于 connected 状态
rpc *rpc.Client `json:"-"`
exitCh chan int `json:"-"`
closeRpcConnOnce func() `json:"-"` // close rpc conn 只执行一次
closeWatchConnOnce func() `json:"-"` // close watch conn 只执行一次
}
func (agent *gocCoveredAgent) closeConnection() {
if agent.closeRpcConnOnce != nil {
agent.closeRpcConnOnce()
}
if agent.closeWatchConnOnce != nil {
agent.closeWatchConnOnce()
}
}
// api 客户端,不是 agent
@ -57,9 +95,14 @@ type gocWatchClient struct {
once sync.Once
}
func RunGocServerUntilExit(host string) {
func RunGocServerUntilExit(host string, path string) {
s, err := store.NewFileStore(path)
if err != nil {
log.Fatalf("cannot create store for goc server: %v", err)
}
gs := gocServer{
storePath: "",
store: s,
upgrader: websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
@ -71,17 +114,22 @@ func RunGocServerUntilExit(host string) {
watchCh: make(chan []byte, 4096),
}
// 从持久化存储上恢复 agent 列表
gs.restoreAgents()
r := gin.Default()
v2 := r.Group("/v2")
{
v2.GET("/cover/profile", gs.getProfiles)
v2.DELETE("/cover/profile", gs.resetProfiles)
v2.GET("/rpcagents", gs.listAgents)
v2.GET("/watchagents", nil)
v2.GET("/agents", gs.listAgents)
v2.DELETE("/agents/:id", gs.removeAgentById)
v2.DELETE("/agents", gs.removeAgents)
v2.GET("/cover/ws/watch", gs.watchProfileUpdate)
// internal use only
v2.GET("/internal/register", gs.register)
v2.GET("/internal/ws/rpcstream", gs.serveRpcStream)
v2.GET("/internal/ws/watchstream", gs.serveWatchInternalStream)
}
@ -90,3 +138,115 @@ func RunGocServerUntilExit(host string) {
r.Run(host)
}
func (gs *gocServer) register(c *gin.Context) {
// 检查插桩服务上报的信息
hostname := c.Query("hostname")
pid := c.Query("pid")
cmdline := c.Query("cmdline")
extra := c.Query("extra")
if hostname == "" || pid == "" || cmdline == "" {
c.JSON(http.StatusBadRequest, gin.H{
"msg": "missing some params",
})
return
}
atomic.AddInt64(&gs.idCount, 1)
genToken := func(i int64) string {
now := time.Now().UnixNano()
random := rand.Int()
raw := fmt.Sprintf("%v-%v-%v", i, random, now)
sum := sha256.Sum256([]byte(raw))
h := fmt.Sprintf("%x", sum[:16])
return h
}
token := genToken(gs.idCount)
id := strconv.Itoa(int(gs.idCount))
agent := &gocCoveredAgent{
Id: id,
Hostname: hostname,
Pid: pid,
CmdLine: cmdline,
Token: token,
Status: DISCONNECT,
Extra: extra,
}
// 持久化
err := gs.saveAgentToStore(agent)
if err != nil {
log.Errorf("fail to save to store: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{
"msg": err.Error(),
})
}
// 维护 agent 连接
gs.agents.Store(id, agent)
log.Infof("one agent registered, id: %v, cmdline: %v, pid: %v, hostname: %v", id, agent.CmdLine, agent.Pid, agent.Hostname)
c.JSON(http.StatusOK, gin.H{
"id": strconv.Itoa(int(gs.idCount)),
"token": token,
})
}
func (gs *gocServer) saveAgentToStore(agent *gocCoveredAgent) error {
value, err := json.Marshal(agent)
if err != nil {
return err
}
return gs.store.Set("/goc/agents/"+agent.Id, string(value))
}
func (gs *gocServer) removeAgentFromStore(id string) error {
return gs.store.Remove("/goc/agents/" + id)
}
func (gs *gocServer) removeAllAgentsFromStore() error {
return gs.store.RangeRemove("/goc/agents/")
}
func (gs *gocServer) restoreAgents() {
pattern := "/goc/agents/"
// ignore err, 这个 err 不需要处理,直接忽略
rawagents, _ := gs.store.Range(pattern)
var maxId int
for _, rawagent := range rawagents {
var agent gocCoveredAgent
err := json.Unmarshal([]byte(rawagent), &agent)
if err != nil {
log.Fatalf("fail to unmarshal restore agents: %v", err)
}
id, err := strconv.Atoi(agent.Id)
if err != nil {
log.Fatalf("fail to transfer id to number: %v", err)
}
if maxId < id {
maxId = id
}
gs.agents.Store(agent.Id, &agent)
log.Infof("restore one agent: %v, %v from store", id, agent.RpcRemoteIP)
agent.RpcRemoteIP = ""
agent.WatchRemoteIP = ""
agent.Status = DISCONNECT
}
// 更新全局 id
atomic.StoreInt64(&gs.idCount, int64(maxId))
}

View File

@ -0,0 +1,194 @@
package store
import (
"bufio"
"fmt"
"os"
"strings"
"sync"
)
type FileStore struct {
storePath string
mu sync.Mutex
}
func NewFileStore(path string) (Store, error) {
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return nil, err
}
defer f.Close()
return &FileStore{
storePath: path,
}, nil
}
func (s *FileStore) Get(key string) (string, error) {
s.mu.Lock()
defer s.mu.Unlock()
f, err := os.Open(s.storePath)
if err != nil {
return "", err
}
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
items := strings.SplitN(line, " ", 2)
keyCandidate := items[0]
value := items[1]
if key == keyCandidate {
return value, nil
}
}
return "", fmt.Errorf("no key found")
}
func (s *FileStore) Set(key string, value string) error {
s.mu.Lock()
defer s.mu.Unlock()
f, err := os.OpenFile(s.storePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, os.ModePerm)
if err != nil {
return err
}
defer f.Close()
var outputLines string
scanner := bufio.NewScanner(f)
isFound := false
for scanner.Scan() {
line := scanner.Text()
items := strings.SplitN(line, " ", 2)
keyCandidate := items[0]
if key == keyCandidate {
line = key + " " + value
isFound = true
} else {
}
outputLines += line + "\n"
}
if !isFound {
outputLines += key + " " + value + "\n"
}
if err := os.Truncate(s.storePath, 0); err != nil {
return err
}
f.Seek(0, os.SEEK_SET)
f.WriteString(outputLines)
return nil
}
func (s *FileStore) Remove(key string) error {
s.mu.Lock()
defer s.mu.Unlock()
f, err := os.OpenFile(s.storePath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return err
}
defer f.Close()
var outputLines string
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
items := strings.SplitN(line, " ", 2)
keyCandidate := items[0]
if key == keyCandidate {
// pass
} else {
outputLines += line + "\n"
}
}
if err := os.Truncate(s.storePath, 0); err != nil {
return err
}
f.Seek(0, os.SEEK_SET)
f.WriteString(outputLines)
return nil
}
func (s *FileStore) Range(pattern string) ([]string, error) {
s.mu.Lock()
defer s.mu.Unlock()
f, err := os.Open(s.storePath)
if err != nil {
return nil, err
}
defer f.Close()
output := make([]string, 0)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
items := strings.SplitN(line, " ", 2)
fmt.Println(items)
keyCandidate := items[0]
value := items[1]
if strings.HasPrefix(keyCandidate, pattern) {
output = append(output, value)
}
}
if len(output) == 0 {
return nil, fmt.Errorf("no key found")
} else {
return output, nil
}
}
func (s *FileStore) RangeRemove(pattern string) error {
s.mu.Lock()
defer s.mu.Unlock()
f, err := os.OpenFile(s.storePath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return err
}
defer f.Close()
var outputLines string
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
items := strings.SplitN(line, " ", 2)
keyCandidate := items[0]
if strings.HasPrefix(keyCandidate, pattern) {
// pass
} else {
outputLines += line + "\n"
}
}
if err := os.Truncate(s.storePath, 0); err != nil {
return err
}
f.Seek(0, os.SEEK_SET)
f.WriteString(outputLines)
return nil
}

14
pkg/server/store/store.go Normal file
View File

@ -0,0 +1,14 @@
package store
type Store interface {
// 返回 key 对应的数据
Get(key string) (string, error)
// 设置 key 对应的数据
Set(key string, value string) error
// 删除 key 和 key 对应的数据
Remove(key string) error
// 遍历返回 pattern 开头的所有 key 数据
Range(pattern string) ([]string, error)
// 遍历删除 pattern 开头的所有 key 数据
RangeRemove(pattern string) error
}

View File

@ -15,6 +15,7 @@ package server
import (
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
@ -24,26 +25,34 @@ import (
func (gs *gocServer) serveWatchInternalStream(c *gin.Context) {
// 检查插桩服务上报的信息
remoteIP, _ := c.RemoteIP()
hostname := c.Query("hostname")
pid := c.Query("pid")
cmdline := c.Query("cmdline")
watchRemoteIP, _ := c.RemoteIP()
id := c.Query("id")
token := c.Query("token")
if hostname == "" || pid == "" || cmdline == "" {
rawagent, ok := gs.agents.Load(id)
if !ok {
c.JSON(http.StatusBadRequest, gin.H{
"msg": "missing some params",
"msg": "agent not registered",
"code": 1,
})
return
}
// 计算插桩服务 id
agentId := gs.generateAgentId(remoteIP.String(), hostname, cmdline, pid)
// 检查 id 是否重复
if _, ok := gs.watchAgents.Load(agentId); ok {
agent := rawagent.(*gocCoveredAgent)
if agent.Token != token {
c.JSON(http.StatusBadRequest, gin.H{
"msg": "the watch agent already exist",
"msg": "register token not match",
"code": 1,
})
return
}
// 更新 agent 信息
agent.WatchRemoteIP = watchRemoteIP.String()
agent.Status &= ^DISCONNECT // 取消 DISCONNECT 的状态
agent.Status |= WATCHCONNECT // 设置为 RPC CONNECT 状态
var once sync.Once
// upgrade to websocket
ws, err := gs.upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
@ -51,11 +60,25 @@ func (gs *gocServer) serveWatchInternalStream(c *gin.Context) {
c.JSON(http.StatusInternalServerError, nil)
}
// 注册销毁函数
agent.closeWatchConnOnce = func() {
once.Do(func() {
// 关闭 ws 连接后ws.ReadMessage() 会出错退出 goroutine进入 defer
ws.Close()
})
}
// send close msg and close ws connection
defer func() {
gs.watchAgents.Delete(agentId)
ws.Close()
log.Infof("close watch connection, %v", hostname)
// 取消 WATCH CONNECT 状态
agent.Status &= ^WATCHCONNECT
if agent.Status == 0 {
agent.Status = DISCONNECT
}
agent.closeWatchConnOnce()
log.Infof("close watch connection, %v", agent.Hostname)
}()
// set pong handler
@ -72,18 +95,18 @@ func (gs *gocServer) serveWatchInternalStream(c *gin.Context) {
for range ticker.C {
if err := gs.wsping(ws, PongWait); err != nil {
log.Errorf("watch ping to %v failed: %v", hostname, err)
log.Errorf("watch ping to %v failed: %v", agent.Hostname, err)
break
}
}
}()
log.Infof("one watch agent established, %v, cmdline: %v, pid: %v, hostname: %v", ws.RemoteAddr(), cmdline, pid, hostname)
log.Infof("one watch agent established, %v, cmdline: %v, pid: %v, hostname: %v", ws.RemoteAddr(), agent.CmdLine, agent.Pid, agent.Hostname)
for {
mt, message, err := ws.ReadMessage()
if err != nil {
log.Errorf("read from %v: %v", hostname, err)
log.Errorf("read from %v: %v", agent.Hostname, err)
break
}
if mt == websocket.TextMessage {