From 547016fc9b0ac3b2baa6c2d8f75a89421041d89f Mon Sep 17 00:00:00 2001 From: lyyyuna Date: Sun, 5 Sep 2021 20:27:29 +0800 Subject: [PATCH] add store interafce --- .gitignore | 3 +- cmd/server.go | 7 +- pkg/build/agent.tpl | 130 ++++++++++++++++++++--- pkg/build/agentwatch.tpl | 30 +++--- pkg/build/build.go | 2 +- pkg/build/config.go | 16 +-- pkg/build/install.go | 2 +- pkg/server/api.go | 65 +++++++++--- pkg/server/rpcstream.go | 77 ++++++++------ pkg/server/server.go | 194 +++++++++++++++++++++++++++++++--- pkg/server/store/filestore.go | 194 ++++++++++++++++++++++++++++++++++ pkg/server/store/store.go | 14 +++ pkg/server/watchstream.go | 57 +++++++--- 13 files changed, 667 insertions(+), 124 deletions(-) create mode 100644 pkg/server/store/filestore.go create mode 100644 pkg/server/store/store.go diff --git a/.gitignore b/.gitignore index 256f76d..7350d72 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -tests/e2e/tmp/* \ No newline at end of file +tests/e2e/tmp/* +.goc.kvstore \ No newline at end of file diff --git a/cmd/server.go b/cmd/server.go index e2d0e29..95edcf3 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -28,14 +28,17 @@ var serverCmd = &cobra.Command{ } var ( - serverHost string + serverHost string + serverStore string ) func init() { serverCmd.Flags().StringVarP(&serverHost, "host", "", "127.0.0.1:7777", "specify the host of the goc server") + serverCmd.Flags().StringVarP(&serverStore, "store", "", ".goc.kvstore", "specify the host of the goc server") + rootCmd.AddCommand(serverCmd) } func serve(cmd *cobra.Command, args []string) { - server.RunGocServerUntilExit(serverHost) + server.RunGocServerUntilExit(serverHost, serverStore) } diff --git a/pkg/build/agent.tpl b/pkg/build/agent.tpl index 6109e04..09ce300 100644 --- a/pkg/build/agent.tpl +++ b/pkg/build/agent.tpl @@ -16,13 +16,17 @@ package cover import ( "fmt" "io" + "io/ioutil" "log" + "net/http" "net/rpc" "net/rpc/jsonrpc" "net/url" + "encoding/json" "os" - "strconv" "strings" + "strconv" + "sync" "sync/atomic" "time" "testing" @@ -33,10 +37,17 @@ import ( ) var ( - waitDelay time.Duration = 10 * time.Second + waitDelay time.Duration = 5 * time.Second host string = "{{.Host}}" ) +var ( + token string + id string + cond = sync.NewCond(&sync.Mutex{}) + register_extra = os.Getenv("GOC_REGISTER_EXTRA") +) + func init() { // init host host_env := os.Getenv("GOC_CUSTOM_HOST") @@ -47,26 +58,35 @@ func init() { var dialer = websocket.DefaultDialer go func() { + register(host) + + cond.L.Lock() + cond.Broadcast() + cond.L.Unlock() + // 永不退出,出错后统一操作为:延时 + conitnue for { - // 获取进程元信息用于注册 - ps, err := getRegisterInfo() - if err != nil { - time.Sleep(waitDelay) - continue - } - - // 注册,直接将元信息放在 ws 地址中 + // 直接将 token 放在 ws 地址中 v := url.Values{} - v.Set("hostname", ps.hostname) - v.Set("pid", strconv.Itoa(ps.pid)) - v.Set("cmdline", ps.cmdline) + v.Set("token", token) + v.Set("id", id) v.Encode() rpcstreamUrl := fmt.Sprintf("ws://%v/v2/internal/ws/rpcstream?%v", host, v.Encode()) - ws, _, err := dialer.Dial(rpcstreamUrl, nil) + ws, resp, err := dialer.Dial(rpcstreamUrl, nil) if err != nil { - log.Printf("[goc][Error] rpc fail to dial to goc server: %v", err) + if resp != nil { + tmp, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + log.Printf("[goc][Error] rpc fail to dial to goc server: %v, body: %v", err, string(tmp)) + + if isOffline(tmp) { + log.Printf("[goc][Error] needs re-register") + register(host) + } + } else { + log.Printf("[goc][Error] rpc fail to dial to goc server: %v", err) + } time.Sleep(waitDelay) continue } @@ -85,6 +105,86 @@ func init() { }() } +// register +func register (host string) { + for { + // 获取进程元信息用于注册 + ps, err := getRegisterInfo() + if err != nil { + time.Sleep(waitDelay) + continue + } + + // 注册,直接将元信息放在 ws 地址中 + v := url.Values{} + v.Set("hostname", ps.hostname) + v.Set("pid", strconv.Itoa(ps.pid)) + v.Set("cmdline", ps.cmdline) + v.Set("extra", register_extra) + v.Encode() + + req, err := http.NewRequest("GET", fmt.Sprintf("http://%v/v2/internal/register?%v", host, v.Encode()), nil) + if err != nil { + log.Printf("[goc][Error] register generate register http request: %v", err) + time.Sleep(waitDelay) + continue + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Printf("[goc][Error] register fail to goc server: %v", err) + time.Sleep(waitDelay) + continue + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Printf("[goc][Error] fail to get register resp from the goc server: %v", err) + time.Sleep(waitDelay) + continue + } + + if resp.StatusCode != 200 { + log.Printf("[goc][Error] wrong register http statue code: %v", resp.StatusCode) + time.Sleep(waitDelay) + continue + } + + registerResp := struct { + Token string `json:"token"` + Id string `json:"id"` + }{} + + err = json.Unmarshal(body, ®isterResp) + if err != nil { + log.Printf("[goc][Error] register response json unmarshal failed: %v", err) + time.Sleep(waitDelay) + continue + } + + // register success + token = registerResp.Token + id = registerResp.Id + break + } +} + +// check if offline failed +func isOffline(data []byte) bool { + val := struct { + Code int `json:"code"` + }{} + err := json.Unmarshal(data, &val) + if err != nil { + return true + } + if val.Code == 1 { + return true + } + return false +} + // rpc type GocAgent struct { } diff --git a/pkg/build/agentwatch.tpl b/pkg/build/agentwatch.tpl index b5a5a04..07305a1 100644 --- a/pkg/build/agentwatch.tpl +++ b/pkg/build/agentwatch.tpl @@ -17,8 +17,8 @@ import ( "fmt" "time" "os" + "io/ioutil" "log" - "strconv" "net/url" "{{.GlobalCoverVarImportPath}}/websocket" @@ -36,25 +36,27 @@ func init() { var dialer = websocket.DefaultDialer go func() { + cond.L.Lock() + cond.Wait() + cond.L.Unlock() + for { - // 获取进程元信息用于注册 - ps, err := getRegisterInfo() - if err != nil { - time.Sleep(waitDelay) - continue - } - - // 注册,直接将元信息放在 ws 地址中 + // 直接将 token 放在 ws 地址中 v := url.Values{} - v.Set("hostname", ps.hostname) - v.Set("pid", strconv.Itoa(ps.pid)) - v.Set("cmdline", ps.cmdline) + v.Set("token", token) + v.Set("id", id) v.Encode() watchstreamUrl := fmt.Sprintf("ws://%v/v2/internal/ws/watchstream?%v", host, v.Encode()) - ws, _, err := dialer.Dial(watchstreamUrl, nil) + ws, resp, err := dialer.Dial(watchstreamUrl, nil) if err != nil { - log.Printf("[goc][Error] watch fail to dial to goc server: %v", err) + if resp != nil { + tmp, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + log.Printf("[goc][Error] watch fail to dial to goc server: %v, body: %v", err, string(tmp)) + } else { + log.Printf("[goc][Error] watch fail to dial to goc server: %v", err) + } time.Sleep(waitDelay) continue } diff --git a/pkg/build/build.go b/pkg/build/build.go index bfae861..5f7a384 100644 --- a/pkg/build/build.go +++ b/pkg/build/build.go @@ -51,7 +51,7 @@ type Build struct { // NewBuild creates a Build struct // -func NewBuild(opts ...GocOption) *Build { +func NewBuild(opts ...gocOption) *Build { b := &Build{} for _, opt := range opts { diff --git a/pkg/build/config.go b/pkg/build/config.go index 15b6b65..bb052a8 100644 --- a/pkg/build/config.go +++ b/pkg/build/config.go @@ -17,45 +17,45 @@ import ( "github.com/spf13/pflag" ) -type GocOption func(*Build) +type gocOption func(*Build) -func WithHost(host string) GocOption { +func WithHost(host string) gocOption { return func(b *Build) { b.Host = host } } -func WithMode(mode string) GocOption { +func WithMode(mode string) gocOption { return func(b *Build) { b.Mode = mode } } -func WithArgs(args []string) GocOption { +func WithArgs(args []string) gocOption { return func(b *Build) { b.Args = args } } -func WithFlagSets(sets *pflag.FlagSet) GocOption { +func WithFlagSets(sets *pflag.FlagSet) gocOption { return func(b *Build) { b.FlagSets = sets } } -func WithBuild() GocOption { +func WithBuild() gocOption { return func(b *Build) { b.BuildType = 0 } } -func WithInstall() GocOption { +func WithInstall() gocOption { return func(b *Build) { b.BuildType = 1 } } -func WithDebug(enable bool) GocOption { +func WithDebug(enable bool) gocOption { return func(b *Build) { b.Debug = enable } diff --git a/pkg/build/install.go b/pkg/build/install.go index 18a95b1..4648503 100644 --- a/pkg/build/install.go +++ b/pkg/build/install.go @@ -20,7 +20,7 @@ import ( "github.com/qiniu/goc/v2/pkg/log" ) -func NewInstall(opts ...GocOption) *Build { +func NewInstall(opts ...gocOption) *Build { return NewBuild(opts...) } diff --git a/pkg/server/api.go b/pkg/server/api.go index 8d866df..6c34df3 100644 --- a/pkg/server/api.go +++ b/pkg/server/api.go @@ -29,7 +29,7 @@ import ( func (gs *gocServer) listAgents(c *gin.Context) { agents := make([]*gocCoveredAgent, 0) - gs.rpcAgents.Range(func(key, value interface{}) bool { + gs.agents.Range(func(key, value interface{}) bool { agent, ok := value.(*gocCoveredAgent) if !ok { return false @@ -52,7 +52,7 @@ func (gs *gocServer) getProfiles(c *gin.Context) { mergedProfiles := make([][]*cover.Profile, 0) - gs.rpcAgents.Range(func(key, value interface{}) bool { + gs.agents.Range(func(key, value interface{}) bool { agent, ok := value.(*gocCoveredAgent) if !ok { return false @@ -81,16 +81,12 @@ func (gs *gocServer) getProfiles(c *gin.Context) { case <-time.After(timeout): log.Warnf("rpc call timeout: %v", agent.Hostname) // 关闭链接 - agent.once.Do(func() { - close(agent.exitCh) - }) + agent.closeRpcConnOnce() case err := <-done: // 调用 rpc 发生错误 if err != nil { // 关闭链接 - agent.once.Do(func() { - close(agent.exitCh) - }) + agent.closeRpcConnOnce() } } // append profile @@ -98,9 +94,7 @@ func (gs *gocServer) getProfiles(c *gin.Context) { if err != nil { log.Errorf("fail to convert the received profile from: %v, reasson: %v. let's close the connection", agent.Id, err) // 关闭链接 - agent.once.Do(func() { - close(agent.exitCh) - }) + agent.closeRpcConnOnce() return } mu.Lock() @@ -140,7 +134,7 @@ func (gs *gocServer) getProfiles(c *gin.Context) { // // it is async, the function will return immediately func (gs *gocServer) resetProfiles(c *gin.Context) { - gs.rpcAgents.Range(func(key, value interface{}) bool { + gs.agents.Range(func(key, value interface{}) bool { agent, ok := value.(gocCoveredAgent) if !ok { return false @@ -153,9 +147,7 @@ func (gs *gocServer) resetProfiles(c *gin.Context) { if err != nil { log.Errorf("fail to reset profile from: %v, reasson: %v. let's close the connection", agent.Id, err) // 关闭链接 - agent.once.Do(func() { - close(agent.exitCh) - }) + agent.closeRpcConnOnce() } }() @@ -213,3 +205,46 @@ func (gs *gocServer) watchProfileUpdate(c *gin.Context) { <-gwc.exitCh } + +func (gs *gocServer) removeAgentById(c *gin.Context) { + id := c.Param("id") + + rawagent, ok := gs.agents.Load(id) + if !ok { + c.JSON(http.StatusNotFound, gin.H{ + "msg": "agent not found", + }) + return + } + + agent, ok := rawagent.(*gocCoveredAgent) + if !ok { + c.JSON(http.StatusNotFound, gin.H{ + "msg": "agent not found", + }) + return + } + + // 关闭相应连接 + agent.closeConnection() + // 从维护 agent 池里删除 + gs.agents.Delete(id) + // 从持久化中删除 + gs.removeAgentFromStore(id) +} + +func (gs *gocServer) removeAgents(c *gin.Context) { + gs.agents.Range(func(key, value interface{}) bool { + agent, ok := value.(*gocCoveredAgent) + if !ok { + return false + } + + agent.closeConnection() + gs.agents.Delete(key) + + return true + }) + + gs.removeAllAgentsFromStore() +} diff --git a/pkg/server/rpcstream.go b/pkg/server/rpcstream.go index 074741f..cd8a170 100644 --- a/pkg/server/rpcstream.go +++ b/pkg/server/rpcstream.go @@ -19,6 +19,7 @@ import ( "net/http" "net/rpc" "net/rpc/jsonrpc" + "sync" "time" "github.com/gin-gonic/gin" @@ -32,33 +33,41 @@ import ( // 2. 每个链接的 goc agent 作为 rpc 服务端 func (gs *gocServer) serveRpcStream(c *gin.Context) { // 检查插桩服务上报的信息 - remoteIP, _ := c.RemoteIP() - hostname := c.Query("hostname") - pid := c.Query("pid") - cmdline := c.Query("cmdline") + rpcRemoteIP, _ := c.RemoteIP() + id := c.Query("id") + token := c.Query("token") - if hostname == "" || pid == "" || cmdline == "" { + rawagent, ok := gs.agents.Load(id) + if !ok { c.JSON(http.StatusBadRequest, gin.H{ - "msg": "missing some params", - }) - return - } - // 计算插桩服务 id - agentId := gs.generateAgentId(remoteIP.String(), hostname, cmdline, pid) - // 检查 id 是否重复 - if _, ok := gs.rpcAgents.Load(agentId); ok { - c.JSON(http.StatusBadRequest, gin.H{ - "msg": "the rpc agent already exists", + "msg": "agent not registered", + "code": 1, }) return } - gocA := &gocCoveredAgent{ - RemoteIP: remoteIP.String(), - Hostname: hostname, - Pid: pid, - CmdLine: cmdline, - exitCh: make(chan int), + agent := rawagent.(*gocCoveredAgent) + if agent.Token != token { + c.JSON(http.StatusBadRequest, gin.H{ + "msg": "register token not match", + "code": 1, + }) + return + } + + // 更新 agent 信息 + agent.RpcRemoteIP = rpcRemoteIP.String() + agent.exitCh = make(chan int) + agent.Status &= ^DISCONNECT // 取消 DISCONNECT 的状态 + agent.Status |= RPCCONNECT // 设置为 RPC CONNECT 状态 + // 注册销毁函数 + var once sync.Once + agent.closeRpcConnOnce = func() { + once.Do(func() { + // 为什么只是关闭 channel?其它资源如何释放? + // close channel 后,本 goroutine 会进入到 defer + close(agent.exitCh) + }) } // upgrade to websocket @@ -74,10 +83,15 @@ func (gs *gocServer) serveRpcStream(c *gin.Context) { // 发送 close msg gs.wsclose(ws, deadline) time.Sleep(deadline) - // 从维护的 websocket 链接字典中移除 - gs.rpcAgents.Delete(agentId) + + // 取消 RPC CONNECT 状态 + agent.Status &= ^RPCCONNECT + if agent.Status == 0 { + agent.Status = DISCONNECT + } + ws.Close() - log.Infof("close rpc connection, %v", hostname) + log.Infof("close rpc connection, %v", agent.Hostname) }() // set pong handler @@ -94,17 +108,15 @@ func (gs *gocServer) serveRpcStream(c *gin.Context) { for range ticker.C { if err := gs.wsping(ws, PongWait); err != nil { - log.Errorf("rpc ping to %v failed: %v", hostname, err) + log.Errorf("rpc ping to %v failed: %v", agent.Hostname, err) break } } - gocA.once.Do(func() { - close(gocA.exitCh) - }) + agent.closeRpcConnOnce() }() - log.Infof("one rpc agent established, %v, cmdline: %v, pid: %v, hostname: %v", ws.RemoteAddr(), cmdline, pid, hostname) + log.Infof("one rpc agent established, %v, cmdline: %v, pid: %v, hostname: %v", ws.RemoteAddr(), agent.CmdLine, agent.Pid, agent.Hostname) // new rpc agent // 在这里 websocket server 作为 rpc 的客户端, // 发送 rpc 请求, @@ -112,11 +124,10 @@ func (gs *gocServer) serveRpcStream(c *gin.Context) { rwc := &ReadWriteCloser{ws: ws} codec := jsonrpc.NewClientCodec(rwc) - gocA.rpc = rpc.NewClientWithCodec(codec) - gocA.Id = string(agentId) - gs.rpcAgents.Store(agentId, gocA) + agent.rpc = rpc.NewClientWithCodec(codec) + // wait for exit - <-gocA.exitCh + <-agent.exitCh } // generateAgentId generate id based on agent's meta infomation diff --git a/pkg/server/server.go b/pkg/server/server.go index 3763532..7b9a102 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -14,40 +14,78 @@ package server import ( + "crypto/sha256" + "encoding/json" + "fmt" + "math/rand" "net/http" "net/rpc" + "strconv" "sync" + "sync/atomic" "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" + "github.com/qiniu/goc/v2/pkg/log" + "github.com/qiniu/goc/v2/pkg/server/store" ) // gocServer represents a goc server type gocServer struct { - port int - storePath string - upgrader websocket.Upgrader + port int + store store.Store + + upgrader websocket.Upgrader + + agents sync.Map - rpcAgents sync.Map - watchAgents sync.Map watchCh chan []byte watchClients sync.Map + + idCount int64 } type gocCliendId string +const ( + DISCONNECT = 1 << iota + RPCCONNECT = 1 << iota + WATCHCONNECT = 1 << iota +) + // gocCoveredAgent represents a covered client type gocCoveredAgent struct { - Id string `json:"id"` - RemoteIP string `json:"remoteip"` - Hostname string `json:"hostname"` - CmdLine string `json:"cmdline"` - Pid string `json:"pid"` - rpc *rpc.Client `json:"-"` + Id string `json:"id"` + RpcRemoteIP string `json:"rpc_remoteip"` + WatchRemoteIP string `json:"watch_remoteip"` + Hostname string `json:"hostname"` + CmdLine string `json:"cmdline"` + Pid string `json:"pid"` - exitCh chan int `json:"-"` - once sync.Once `json:"-"` // 保护 close(exitCh) 只执行一次 + // 用户可以选择上报一些定制信息 + // 比如不同 namespace 的 statefulset POD,它们的 hostname/cmdline/pid 都是一样的, + // 这时候将 extra 设置为 namespace 并上报,这个额外的信息在展示时将更友好 + Extra string `json:"extra"` + + Token string `json:"token"` + Status int `json:"status"` // 表示该 agent 是否处于 connected 状态 + + rpc *rpc.Client `json:"-"` + + exitCh chan int `json:"-"` + closeRpcConnOnce func() `json:"-"` // close rpc conn 只执行一次 + closeWatchConnOnce func() `json:"-"` // close watch conn 只执行一次 +} + +func (agent *gocCoveredAgent) closeConnection() { + if agent.closeRpcConnOnce != nil { + agent.closeRpcConnOnce() + } + + if agent.closeWatchConnOnce != nil { + agent.closeWatchConnOnce() + } } // api 客户端,不是 agent @@ -57,9 +95,14 @@ type gocWatchClient struct { once sync.Once } -func RunGocServerUntilExit(host string) { +func RunGocServerUntilExit(host string, path string) { + s, err := store.NewFileStore(path) + if err != nil { + log.Fatalf("cannot create store for goc server: %v", err) + } + gs := gocServer{ - storePath: "", + store: s, upgrader: websocket.Upgrader{ ReadBufferSize: 4096, WriteBufferSize: 4096, @@ -71,17 +114,22 @@ func RunGocServerUntilExit(host string) { watchCh: make(chan []byte, 4096), } + // 从持久化存储上恢复 agent 列表 + gs.restoreAgents() + r := gin.Default() v2 := r.Group("/v2") { v2.GET("/cover/profile", gs.getProfiles) v2.DELETE("/cover/profile", gs.resetProfiles) - v2.GET("/rpcagents", gs.listAgents) - v2.GET("/watchagents", nil) + v2.GET("/agents", gs.listAgents) + v2.DELETE("/agents/:id", gs.removeAgentById) + v2.DELETE("/agents", gs.removeAgents) v2.GET("/cover/ws/watch", gs.watchProfileUpdate) // internal use only + v2.GET("/internal/register", gs.register) v2.GET("/internal/ws/rpcstream", gs.serveRpcStream) v2.GET("/internal/ws/watchstream", gs.serveWatchInternalStream) } @@ -90,3 +138,115 @@ func RunGocServerUntilExit(host string) { r.Run(host) } + +func (gs *gocServer) register(c *gin.Context) { + // 检查插桩服务上报的信息 + hostname := c.Query("hostname") + pid := c.Query("pid") + cmdline := c.Query("cmdline") + extra := c.Query("extra") + + if hostname == "" || pid == "" || cmdline == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "msg": "missing some params", + }) + return + } + + atomic.AddInt64(&gs.idCount, 1) + + genToken := func(i int64) string { + now := time.Now().UnixNano() + random := rand.Int() + + raw := fmt.Sprintf("%v-%v-%v", i, random, now) + sum := sha256.Sum256([]byte(raw)) + h := fmt.Sprintf("%x", sum[:16]) + + return h + } + + token := genToken(gs.idCount) + id := strconv.Itoa(int(gs.idCount)) + + agent := &gocCoveredAgent{ + Id: id, + Hostname: hostname, + Pid: pid, + CmdLine: cmdline, + Token: token, + Status: DISCONNECT, + Extra: extra, + } + + // 持久化 + err := gs.saveAgentToStore(agent) + if err != nil { + log.Errorf("fail to save to store: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{ + "msg": err.Error(), + }) + } + // 维护 agent 连接 + gs.agents.Store(id, agent) + + log.Infof("one agent registered, id: %v, cmdline: %v, pid: %v, hostname: %v", id, agent.CmdLine, agent.Pid, agent.Hostname) + + c.JSON(http.StatusOK, gin.H{ + "id": strconv.Itoa(int(gs.idCount)), + "token": token, + }) +} + +func (gs *gocServer) saveAgentToStore(agent *gocCoveredAgent) error { + + value, err := json.Marshal(agent) + if err != nil { + return err + } + return gs.store.Set("/goc/agents/"+agent.Id, string(value)) +} + +func (gs *gocServer) removeAgentFromStore(id string) error { + + return gs.store.Remove("/goc/agents/" + id) +} + +func (gs *gocServer) removeAllAgentsFromStore() error { + + return gs.store.RangeRemove("/goc/agents/") +} + +func (gs *gocServer) restoreAgents() { + pattern := "/goc/agents/" + + // ignore err, 这个 err 不需要处理,直接忽略 + rawagents, _ := gs.store.Range(pattern) + + var maxId int + for _, rawagent := range rawagents { + var agent gocCoveredAgent + err := json.Unmarshal([]byte(rawagent), &agent) + if err != nil { + log.Fatalf("fail to unmarshal restore agents: %v", err) + } + + id, err := strconv.Atoi(agent.Id) + if err != nil { + log.Fatalf("fail to transfer id to number: %v", err) + } + if maxId < id { + maxId = id + } + + gs.agents.Store(agent.Id, &agent) + log.Infof("restore one agent: %v, %v from store", id, agent.RpcRemoteIP) + + agent.RpcRemoteIP = "" + agent.WatchRemoteIP = "" + agent.Status = DISCONNECT + } + + // 更新全局 id + atomic.StoreInt64(&gs.idCount, int64(maxId)) +} diff --git a/pkg/server/store/filestore.go b/pkg/server/store/filestore.go new file mode 100644 index 0000000..c8ac88b --- /dev/null +++ b/pkg/server/store/filestore.go @@ -0,0 +1,194 @@ +package store + +import ( + "bufio" + "fmt" + "os" + "strings" + "sync" +) + +type FileStore struct { + storePath string + mu sync.Mutex +} + +func NewFileStore(path string) (Store, error) { + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if err != nil { + return nil, err + } + defer f.Close() + + return &FileStore{ + storePath: path, + }, nil +} + +func (s *FileStore) Get(key string) (string, error) { + s.mu.Lock() + defer s.mu.Unlock() + + f, err := os.Open(s.storePath) + if err != nil { + return "", err + } + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + items := strings.SplitN(line, " ", 2) + + keyCandidate := items[0] + value := items[1] + + if key == keyCandidate { + return value, nil + } + } + + return "", fmt.Errorf("no key found") +} + +func (s *FileStore) Set(key string, value string) error { + s.mu.Lock() + defer s.mu.Unlock() + + f, err := os.OpenFile(s.storePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, os.ModePerm) + if err != nil { + return err + } + defer f.Close() + + var outputLines string + scanner := bufio.NewScanner(f) + isFound := false + for scanner.Scan() { + line := scanner.Text() + items := strings.SplitN(line, " ", 2) + + keyCandidate := items[0] + + if key == keyCandidate { + line = key + " " + value + isFound = true + } else { + } + outputLines += line + "\n" + } + + if !isFound { + outputLines += key + " " + value + "\n" + } + + if err := os.Truncate(s.storePath, 0); err != nil { + return err + } + + f.Seek(0, os.SEEK_SET) + f.WriteString(outputLines) + + return nil +} + +func (s *FileStore) Remove(key string) error { + s.mu.Lock() + defer s.mu.Unlock() + + f, err := os.OpenFile(s.storePath, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if err != nil { + return err + } + defer f.Close() + + var outputLines string + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + items := strings.SplitN(line, " ", 2) + + keyCandidate := items[0] + + if key == keyCandidate { + // pass + } else { + outputLines += line + "\n" + } + } + + if err := os.Truncate(s.storePath, 0); err != nil { + return err + } + + f.Seek(0, os.SEEK_SET) + f.WriteString(outputLines) + + return nil +} + +func (s *FileStore) Range(pattern string) ([]string, error) { + s.mu.Lock() + defer s.mu.Unlock() + + f, err := os.Open(s.storePath) + if err != nil { + return nil, err + } + defer f.Close() + + output := make([]string, 0) + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + items := strings.SplitN(line, " ", 2) + fmt.Println(items) + keyCandidate := items[0] + value := items[1] + + if strings.HasPrefix(keyCandidate, pattern) { + output = append(output, value) + } + } + + if len(output) == 0 { + return nil, fmt.Errorf("no key found") + } else { + return output, nil + } +} + +func (s *FileStore) RangeRemove(pattern string) error { + s.mu.Lock() + defer s.mu.Unlock() + + f, err := os.OpenFile(s.storePath, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if err != nil { + return err + } + defer f.Close() + + var outputLines string + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + items := strings.SplitN(line, " ", 2) + + keyCandidate := items[0] + + if strings.HasPrefix(keyCandidate, pattern) { + // pass + } else { + outputLines += line + "\n" + } + } + + if err := os.Truncate(s.storePath, 0); err != nil { + return err + } + + f.Seek(0, os.SEEK_SET) + f.WriteString(outputLines) + + return nil +} diff --git a/pkg/server/store/store.go b/pkg/server/store/store.go new file mode 100644 index 0000000..3490e92 --- /dev/null +++ b/pkg/server/store/store.go @@ -0,0 +1,14 @@ +package store + +type Store interface { + // 返回 key 对应的数据 + Get(key string) (string, error) + // 设置 key 对应的数据 + Set(key string, value string) error + // 删除 key 和 key 对应的数据 + Remove(key string) error + // 遍历返回 pattern 开头的所有 key 数据 + Range(pattern string) ([]string, error) + // 遍历删除 pattern 开头的所有 key 数据 + RangeRemove(pattern string) error +} diff --git a/pkg/server/watchstream.go b/pkg/server/watchstream.go index 579435c..5af9ee1 100644 --- a/pkg/server/watchstream.go +++ b/pkg/server/watchstream.go @@ -15,6 +15,7 @@ package server import ( "net/http" + "sync" "time" "github.com/gin-gonic/gin" @@ -24,26 +25,34 @@ import ( func (gs *gocServer) serveWatchInternalStream(c *gin.Context) { // 检查插桩服务上报的信息 - remoteIP, _ := c.RemoteIP() - hostname := c.Query("hostname") - pid := c.Query("pid") - cmdline := c.Query("cmdline") + watchRemoteIP, _ := c.RemoteIP() + id := c.Query("id") + token := c.Query("token") - if hostname == "" || pid == "" || cmdline == "" { + rawagent, ok := gs.agents.Load(id) + if !ok { c.JSON(http.StatusBadRequest, gin.H{ - "msg": "missing some params", + "msg": "agent not registered", + "code": 1, }) return } - // 计算插桩服务 id - agentId := gs.generateAgentId(remoteIP.String(), hostname, cmdline, pid) - // 检查 id 是否重复 - if _, ok := gs.watchAgents.Load(agentId); ok { + + agent := rawagent.(*gocCoveredAgent) + if agent.Token != token { c.JSON(http.StatusBadRequest, gin.H{ - "msg": "the watch agent already exist", + "msg": "register token not match", + "code": 1, }) return } + + // 更新 agent 信息 + agent.WatchRemoteIP = watchRemoteIP.String() + agent.Status &= ^DISCONNECT // 取消 DISCONNECT 的状态 + agent.Status |= WATCHCONNECT // 设置为 RPC CONNECT 状态 + var once sync.Once + // upgrade to websocket ws, err := gs.upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { @@ -51,11 +60,25 @@ func (gs *gocServer) serveWatchInternalStream(c *gin.Context) { c.JSON(http.StatusInternalServerError, nil) } + // 注册销毁函数 + agent.closeWatchConnOnce = func() { + once.Do(func() { + // 关闭 ws 连接后,ws.ReadMessage() 会出错退出 goroutine,进入 defer + ws.Close() + }) + } + // send close msg and close ws connection defer func() { - gs.watchAgents.Delete(agentId) - ws.Close() - log.Infof("close watch connection, %v", hostname) + // 取消 WATCH CONNECT 状态 + agent.Status &= ^WATCHCONNECT + if agent.Status == 0 { + agent.Status = DISCONNECT + } + + agent.closeWatchConnOnce() + + log.Infof("close watch connection, %v", agent.Hostname) }() // set pong handler @@ -72,18 +95,18 @@ func (gs *gocServer) serveWatchInternalStream(c *gin.Context) { for range ticker.C { if err := gs.wsping(ws, PongWait); err != nil { - log.Errorf("watch ping to %v failed: %v", hostname, err) + log.Errorf("watch ping to %v failed: %v", agent.Hostname, err) break } } }() - log.Infof("one watch agent established, %v, cmdline: %v, pid: %v, hostname: %v", ws.RemoteAddr(), cmdline, pid, hostname) + log.Infof("one watch agent established, %v, cmdline: %v, pid: %v, hostname: %v", ws.RemoteAddr(), agent.CmdLine, agent.Pid, agent.Hostname) for { mt, message, err := ws.ReadMessage() if err != nil { - log.Errorf("read from %v: %v", hostname, err) + log.Errorf("read from %v: %v", agent.Hostname, err) break } if mt == websocket.TextMessage {