support latest result retrieval in the storage

pull/179/head
Vit Listik 3 years ago
parent 9198239825
commit 12f106dcb7
No known key found for this signature in database
GPG Key ID: DDDF5138DA46D76B
  1. 4
      cmd/cli/main.go
  2. 4
      cmd/control/cmd/version.go
  3. 2
      cmd/daemon/run-server.go
  4. 4
      cmd/daemon/session-init.go
  5. 36
      internal/histcli/histcli.go
  6. 1
      internal/recordint/searchapp.go
  7. 2
      internal/searchapp/test.go
  8. 20
      internal/syncconnector/reader.go
  9. 2
      internal/syncconnector/syncconnector.go

@ -6,7 +6,7 @@ import (
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
"io/ioutil" "io"
"net/http" "net/http"
"os" "os"
"sort" "sort"
@ -589,7 +589,7 @@ func SendCliMsg(out *output.Output, m msg.CliMsg, port string) msg.CliResponse {
} }
defer resp.Body.Close() defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
out.Fatal("Failed read response", err) out.Fatal("Failed read response", err)
} }

@ -3,7 +3,7 @@ package cmd
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io"
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
@ -69,7 +69,7 @@ func getDaemonStatus(port int) (msg.StatusResponse, error) {
return mess, err return mess, err
} }
defer resp.Body.Close() defer resp.Body.Close()
jsn, err := ioutil.ReadAll(resp.Body) jsn, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
out.Fatal("Error while reading 'daemon /status' response", err) out.Fatal("Error while reading 'daemon /status' response", err)
} }

@ -35,7 +35,7 @@ func (s *Server) Run() {
shutdown := make(chan string) shutdown := make(chan string)
history := histcli.New() history := histcli.New(s.sugar)
// histfile // histfile
histfileRecords := make(chan recordint.Collect) histfileRecords := make(chan recordint.Collect)

@ -2,7 +2,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"io/ioutil" "io"
"net/http" "net/http"
"github.com/curusarn/resh/internal/recordint" "github.com/curusarn/resh/internal/recordint"
@ -19,7 +19,7 @@ func (h *sessionInitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
sugar.Debugw("Handling request, sending response, reading body ...") sugar.Debugw("Handling request, sending response, reading body ...")
w.Write([]byte("OK\n")) w.Write([]byte("OK\n"))
// TODO: should we somehow check for errors here? // TODO: should we somehow check for errors here?
jsn, err := ioutil.ReadAll(r.Body) jsn, err := io.ReadAll(r.Body)
// run rest of the handler as goroutine to prevent any hangups // run rest of the handler as goroutine to prevent any hangups
go func() { go func() {
if err != nil { if err != nil {

@ -2,6 +2,7 @@ package histcli
import ( import (
"github.com/curusarn/resh/internal/recordint" "github.com/curusarn/resh/internal/recordint"
"go.uber.org/zap"
"sync" "sync"
) )
@ -9,12 +10,19 @@ import (
type Histcli struct { type Histcli struct {
// list of records // list of records
list []recordint.SearchApp list []recordint.SearchApp
knownIds map[string]struct{}
lock sync.RWMutex lock sync.RWMutex
sugar *zap.SugaredLogger
latest map[string]float64
} }
// New Histcli // New Histcli
func New() *Histcli { func New(sugar *zap.SugaredLogger) *Histcli {
return &Histcli{} return &Histcli{
sugar: sugar.With(zap.String("component", "histCli")),
knownIds: map[string]struct{}{},
latest: map[string]float64{},
}
} }
// AddRecord to the histcli // AddRecord to the histcli
@ -23,7 +31,13 @@ func (h *Histcli) AddRecord(rec *recordint.Indexed) {
h.lock.Lock() h.lock.Lock()
defer h.lock.Unlock() defer h.lock.Unlock()
if _, ok := h.knownIds[rec.Rec.RecordID]; !ok {
h.knownIds[rec.Rec.RecordID] = struct{}{}
h.list = append(h.list, cli) h.list = append(h.list, cli)
h.updateLatestPerDevice(cli)
} else {
h.sugar.Debugw("Record is already present", "id", rec.Rec.RecordID)
}
} }
// AddCmdLine to the histcli // AddCmdLine to the histcli
@ -41,3 +55,21 @@ func (h *Histcli) Dump() []recordint.SearchApp {
return h.list return h.list
} }
// updateLatestPerDevice should be called only with write lock because it does not lock on its own.
func (h *Histcli) updateLatestPerDevice(rec recordint.SearchApp) {
if l, ok := h.latest[rec.DeviceID]; ok {
if rec.Time > l {
h.latest[rec.DeviceID] = rec.Time
}
} else {
h.latest[rec.DeviceID] = rec.Time
}
}
func (h *Histcli) LatestRecordsPerDevice() map[string]float64 {
h.lock.RLock()
defer h.lock.RUnlock()
return h.latest
}

@ -43,6 +43,7 @@ func NewSearchApp(r *Indexed) SearchApp {
return SearchApp{ return SearchApp{
IsRaw: false, IsRaw: false,
SessionID: r.Rec.SessionID, SessionID: r.Rec.SessionID,
DeviceID: r.Rec.DeviceID,
CmdLine: r.Rec.CmdLine, CmdLine: r.Rec.CmdLine,
Host: r.Rec.Device, Host: r.Rec.Device,
Pwd: r.Rec.Pwd, Pwd: r.Rec.Pwd,

@ -17,7 +17,7 @@ func LoadHistoryFromFile(sugar *zap.SugaredLogger, historyPath string, numLines
if numLines != 0 && numLines < len(recs) { if numLines != 0 && numLines < len(recs) {
recs = recs[:numLines] recs = recs[:numLines]
} }
cliRecords := histcli.New() cliRecords := histcli.New(sugar)
for i := len(recs) - 1; i >= 0; i-- { for i := len(recs) - 1; i >= 0; i-- {
rec := recs[i] rec := recs[i]
cliRecords.AddRecord(&rec) cliRecords.AddRecord(&rec)

@ -3,9 +3,9 @@ package syncconnector
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt"
"github.com/curusarn/resh/record" "github.com/curusarn/resh/record"
"io" "io"
"io/ioutil"
"log" "log"
"net/http" "net/http"
"time" "time"
@ -15,15 +15,25 @@ func (sc SyncConnector) getLatestRecord(machineId *string) (map[string]string, e
return map[string]string{}, nil return map[string]string{}, nil
} }
func (sc SyncConnector) downloadRecords(lastRecords map[string]string) ([]record.V1, error) { func (sc SyncConnector) downloadRecords(lastRecords map[string]float64) ([]record.V1, error) {
var records []record.V1 var records []record.V1
client := http.Client{ client := http.Client{
Timeout: 3 * time.Second, Timeout: 3 * time.Second,
} }
// TODO: create request based on the local last records latestRes := map[string]string{}
responseBody := bytes.NewBuffer([]byte("{}")) for device, t := range lastRecords {
sc.sugar.Debugf("Latest for %s is %f", device, t)
latestRes[device] = fmt.Sprintf("%.4f", t)
}
latestJson, err := json.Marshal(latestRes)
if err != nil {
sc.sugar.Errorw("converting latest to JSON failed", "err", err)
return nil, err
}
responseBody := bytes.NewBuffer(latestJson)
address := sc.getAddressWithPath(historyEndpoint) address := sc.getAddressWithPath(historyEndpoint)
resp, err := client.Post(address, "application/json", responseBody) resp, err := client.Post(address, "application/json", responseBody)
@ -38,7 +48,7 @@ func (sc SyncConnector) downloadRecords(lastRecords map[string]string) ([]record
sc.sugar.Errorw("reader close failed", "err", err) sc.sugar.Errorw("reader close failed", "err", err)
} }
}(resp.Body) }(resp.Body)
body, err := ioutil.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }

@ -43,7 +43,7 @@ func New(sugar *zap.SugaredLogger, address string, authToken string, pullPeriodS
for _ = range time.Tick(time.Second * time.Duration(pullPeriodSeconds)) { for _ = range time.Tick(time.Second * time.Duration(pullPeriodSeconds)) {
sc.sugar.Debug("checking remote") sc.sugar.Debug("checking remote")
recs, err := sc.downloadRecords(map[string]string{}) recs, err := sc.downloadRecords(sc.history.LatestRecordsPerDevice())
if err != nil { if err != nil {
continue continue
} }

Loading…
Cancel
Save