initial configurable Sync Connector stub, shared history object

pull/179/head
Vit Listik 3 years ago
parent e8072597c8
commit ad4784de7d
No known key found for this signature in database
GPG Key ID: DDDF5138DA46D76B
  1. 12
      cmd/daemon/dump.go
  2. 2
      cmd/daemon/main.go
  3. 24
      cmd/daemon/run-server.go
  4. 20
      internal/cfg/cfg.go
  5. 23
      internal/histcli/histcli.go
  6. 14
      internal/histfile/histfile.go
  7. 2
      internal/histio/histio.go
  8. 2
      internal/histlist/histlist.go
  9. 2
      internal/searchapp/test.go
  10. 12
      internal/syncconnector/reader.go
  11. 54
      internal/syncconnector/syncconnector.go
  12. 49
      internal/syncconnector/writer.go

@ -2,17 +2,17 @@ package main
import ( import (
"encoding/json" "encoding/json"
"github.com/curusarn/resh/internal/histcli"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"github.com/curusarn/resh/internal/histfile"
"github.com/curusarn/resh/internal/msg" "github.com/curusarn/resh/internal/msg"
"go.uber.org/zap" "go.uber.org/zap"
) )
type dumpHandler struct { type dumpHandler struct {
sugar *zap.SugaredLogger sugar *zap.SugaredLogger
histfileBox *histfile.Histfile history *histcli.Histcli
} }
func (h *dumpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *dumpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -35,12 +35,8 @@ func (h *dumpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
sugar.Debugw("Getting records to send ...") sugar.Debugw("Getting records to send ...")
fullRecords := h.histfileBox.DumpCliRecords()
if err != nil {
sugar.Errorw("Error when getting records", "error", err)
}
resp := msg.CliResponse{Records: fullRecords.List} resp := msg.CliResponse{Records: h.history.Dump()}
jsn, err = json.Marshal(&resp) jsn, err = json.Marshal(&resp)
if err != nil { if err != nil {
sugar.Errorw("Error when marshaling", "error", err) sugar.Errorw("Error when marshaling", "error", err)

@ -126,7 +126,7 @@ func (d *daemon) killDaemon(pidfile string) error {
"PIDFile", pidfile, "PIDFile", pidfile,
"error", err) "error", err)
} }
d.sugar.Infow("Succesfully read PID file", "contents", string(dat)) d.sugar.Infow("Successfully read PID file", "contents", string(dat))
pid, err := strconv.Atoi(strings.TrimSuffix(string(dat), "\n")) pid, err := strconv.Atoi(strings.TrimSuffix(string(dat), "\n"))
if err != nil { if err != nil {
return fmt.Errorf("could not parse PID file contents: %w", err) return fmt.Errorf("could not parse PID file contents: %w", err)

@ -1,6 +1,8 @@
package main package main
import ( import (
"github.com/curusarn/resh/internal/histcli"
"github.com/curusarn/resh/internal/syncconnector"
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
@ -33,6 +35,8 @@ func (s *Server) Run() {
shutdown := make(chan string) shutdown := make(chan string)
history := histcli.New()
// histfile // histfile
histfileRecords := make(chan recordint.Collect) histfileRecords := make(chan recordint.Collect)
recordSubscribers = append(recordSubscribers, histfileRecords) recordSubscribers = append(recordSubscribers, histfileRecords)
@ -42,13 +46,14 @@ func (s *Server) Run() {
signalSubscribers = append(signalSubscribers, histfileSignals) signalSubscribers = append(signalSubscribers, histfileSignals)
maxHistSize := 10000 // lines maxHistSize := 10000 // lines
minHistSizeKB := 2000 // roughly lines minHistSizeKB := 2000 // roughly lines
histfileBox := histfile.New(s.sugar, histfileRecords, histfileSessionsToDrop, histfile.New(s.sugar, histfileRecords, histfileSessionsToDrop,
s.reshHistoryPath, s.bashHistoryPath, s.zshHistoryPath, s.reshHistoryPath, s.bashHistoryPath, s.zshHistoryPath,
maxHistSize, minHistSizeKB, maxHistSize, minHistSizeKB,
histfileSignals, shutdown) histfileSignals, shutdown, history)
// sesswatch // sesswatch
sesswatchRecords := make(chan recordint.Collect) sesswatchRecords := make(chan recordint.Collect)
// TODO: add sync connector subscriber
recordSubscribers = append(recordSubscribers, sesswatchRecords) recordSubscribers = append(recordSubscribers, sesswatchRecords)
sesswatchSessionsToWatch := make(chan recordint.SessionInit) sesswatchSessionsToWatch := make(chan recordint.SessionInit)
sessionInitSubscribers = append(sessionInitSubscribers, sesswatchSessionsToWatch) sessionInitSubscribers = append(sessionInitSubscribers, sesswatchSessionsToWatch)
@ -65,7 +70,7 @@ func (s *Server) Run() {
mux.Handle("/status", &statusHandler{sugar: s.sugar}) mux.Handle("/status", &statusHandler{sugar: s.sugar})
mux.Handle("/record", &recordHandler{sugar: s.sugar, subscribers: recordSubscribers}) mux.Handle("/record", &recordHandler{sugar: s.sugar, subscribers: recordSubscribers})
mux.Handle("/session_init", &sessionInitHandler{sugar: s.sugar, subscribers: sessionInitSubscribers}) mux.Handle("/session_init", &sessionInitHandler{sugar: s.sugar, subscribers: sessionInitSubscribers})
mux.Handle("/dump", &dumpHandler{sugar: s.sugar, histfileBox: histfileBox}) mux.Handle("/dump", &dumpHandler{sugar: s.sugar, history: history})
server := &http.Server{ server := &http.Server{
Addr: "localhost:" + strconv.Itoa(s.config.Port), Addr: "localhost:" + strconv.Itoa(s.config.Port),
@ -77,6 +82,19 @@ func (s *Server) Run() {
} }
go server.ListenAndServe() go server.ListenAndServe()
s.sugar.Infow("", "sync_addr", s.config.SyncConnectorAddress)
if s.config.SyncConnectorAddress != nil {
sc, err := syncconnector.New(s.sugar, *s.config.SyncConnectorAddress, s.config.SyncConnectorAuthToken, s.config.SyncConnectorPullPeriodSeconds, history)
if err != nil {
s.sugar.Errorw("Sync Connector init failed", "error", err)
} else {
s.sugar.Infow("Initialized Sync Connector", "Sync Connector", sc)
}
// TODO: load sync connector data
// TODO: load sync connector data
// TODO: send connector data periodically (record by record / or batch)
}
// signalhandler - takes over the main goroutine so when signal handler exists the whole program exits // signalhandler - takes over the main goroutine so when signal handler exists the whole program exits
signalhandler.Run(s.sugar, signalSubscribers, shutdown, server) signalhandler.Run(s.sugar, signalSubscribers, shutdown, server)
} }

@ -30,7 +30,9 @@ type configFile struct {
BindArrowKeysBash *bool BindArrowKeysBash *bool
BindArrowKeysZsh *bool BindArrowKeysZsh *bool
SyncConnectorAddress *string SyncConnectorAddress *string
SyncConnectorAuthToken *string
SyncConnectorPullPeriodSeconds *int
} }
// Config returned by this package to be used in the rest of the project // Config returned by this package to be used in the rest of the project
@ -65,6 +67,12 @@ type Config struct {
// - https://domain.tld // - https://domain.tld
// - https://domain.tld/resh // - https://domain.tld/resh
SyncConnectorAddress *string SyncConnectorAddress *string
// SyncConnectorAuthToken used by the daemon to authenticate with the Sync Connector
SyncConnectorAuthToken string
// SyncConnectorPullPeriodSeconds how often should Resh daemon download history from Sync Connector
SyncConnectorPullPeriodSeconds int
} }
// defaults for config // defaults for config
@ -76,6 +84,8 @@ var defaults = Config{
Debug: false, Debug: false,
SessionWatchPeriodSeconds: 600, SessionWatchPeriodSeconds: 600,
ReshHistoryMinSize: 1000, ReshHistoryMinSize: 1000,
SyncConnectorPullPeriodSeconds: 60,
} }
const headerComment = `## const headerComment = `##
@ -179,6 +189,14 @@ func processAndFillDefaults(configF *configFile) (Config, error) {
config.SyncConnectorAddress = configF.SyncConnectorAddress config.SyncConnectorAddress = configF.SyncConnectorAddress
if configF.SyncConnectorAuthToken != nil {
config.SyncConnectorAuthToken = *configF.SyncConnectorAuthToken
}
if configF.SyncConnectorPullPeriodSeconds != nil {
config.SyncConnectorPullPeriodSeconds = *configF.SyncConnectorPullPeriodSeconds
}
return config, err return config, err
} }

@ -2,29 +2,42 @@ package histcli
import ( import (
"github.com/curusarn/resh/internal/recordint" "github.com/curusarn/resh/internal/recordint"
"sync"
) )
// Histcli is a dump of history preprocessed for resh cli purposes // Histcli is a dump of history preprocessed for resh cli purposes
type Histcli struct { type Histcli struct {
// list of records // list of records
List []recordint.SearchApp list []recordint.SearchApp
lock sync.RWMutex
} }
// New Histcli // New Histcli
func New() Histcli { func New() *Histcli {
return Histcli{} return &Histcli{}
} }
// AddRecord to the histcli // AddRecord to the histcli
func (h *Histcli) AddRecord(rec *recordint.Indexed) { func (h *Histcli) AddRecord(rec *recordint.Indexed) {
cli := recordint.NewSearchApp(rec) cli := recordint.NewSearchApp(rec)
h.lock.Lock()
defer h.lock.Unlock()
h.List = append(h.List, cli) h.list = append(h.list, cli)
} }
// AddCmdLine to the histcli // AddCmdLine to the histcli
func (h *Histcli) AddCmdLine(cmdline string) { func (h *Histcli) AddCmdLine(cmdline string) {
cli := recordint.NewSearchAppFromCmdLine(cmdline) cli := recordint.NewSearchAppFromCmdLine(cmdline)
h.lock.Lock()
defer h.lock.Unlock()
h.List = append(h.List, cli) h.list = append(h.list, cli)
}
func (h *Histcli) Dump() []recordint.SearchApp {
h.lock.RLock()
defer h.lock.RUnlock()
return h.list
} }

@ -30,7 +30,7 @@ type Histfile struct {
bashCmdLines histlist.Histlist bashCmdLines histlist.Histlist
zshCmdLines histlist.Histlist zshCmdLines histlist.Histlist
cliRecords histcli.Histcli cliRecords *histcli.Histcli
rio *recio.RecIO rio *recio.RecIO
} }
@ -39,7 +39,7 @@ type Histfile struct {
func New(sugar *zap.SugaredLogger, input chan recordint.Collect, sessionsToDrop chan string, func New(sugar *zap.SugaredLogger, input chan recordint.Collect, sessionsToDrop chan string,
reshHistoryPath string, bashHistoryPath string, zshHistoryPath string, reshHistoryPath string, bashHistoryPath string, zshHistoryPath string,
maxInitHistSize int, minInitHistSizeKB int, maxInitHistSize int, minInitHistSizeKB int,
signals chan os.Signal, shutdownDone chan string) *Histfile { signals chan os.Signal, shutdownDone chan string, histCli *histcli.Histcli) *Histfile {
rio := recio.New(sugar.With("module", "histfile")) rio := recio.New(sugar.With("module", "histfile"))
hf := Histfile{ hf := Histfile{
@ -48,7 +48,7 @@ func New(sugar *zap.SugaredLogger, input chan recordint.Collect, sessionsToDrop
historyPath: reshHistoryPath, historyPath: reshHistoryPath,
bashCmdLines: histlist.New(sugar), bashCmdLines: histlist.New(sugar),
zshCmdLines: histlist.New(sugar), zshCmdLines: histlist.New(sugar),
cliRecords: histcli.New(), cliRecords: histCli,
rio: &rio, rio: &rio,
} }
go hf.loadHistory(bashHistoryPath, zshHistoryPath, maxInitHistSize, minInitHistSizeKB) go hf.loadHistory(bashHistoryPath, zshHistoryPath, maxInitHistSize, minInitHistSizeKB)
@ -70,7 +70,7 @@ func (h *Histfile) loadCliRecords(recs []recordint.Indexed) {
h.cliRecords.AddRecord(&rec) h.cliRecords.AddRecord(&rec)
} }
h.sugar.Infow("Resh history loaded", h.sugar.Infow("Resh history loaded",
"historyRecordsCount", len(h.cliRecords.List), "historyRecordsCount", len(h.cliRecords.Dump()),
) )
} }
@ -255,12 +255,6 @@ func (h *Histfile) mergeAndWriteRecord(sugar *zap.SugaredLogger, part1 recordint
// } // }
// } // }
// DumpCliRecords returns enriched records
func (h *Histfile) DumpCliRecords() histcli.Histcli {
// don't forget locks in the future
return h.cliRecords
}
func loadCmdLines(sugar *zap.SugaredLogger, recs []recordint.Indexed) histlist.Histlist { func loadCmdLines(sugar *zap.SugaredLogger, recs []recordint.Indexed) histlist.Histlist {
hl := histlist.New(sugar) hl := histlist.New(sugar)
// go from bottom and deduplicate // go from bottom and deduplicate

@ -25,7 +25,7 @@ func New(sugar *zap.SugaredLogger, dataDir, deviceID string) *Histio {
sugarHistio := sugar.With(zap.String("component", "histio")) sugarHistio := sugar.With(zap.String("component", "histio"))
histDir := path.Join(dataDir, "history") histDir := path.Join(dataDir, "history")
currPath := path.Join(histDir, deviceID) currPath := path.Join(histDir, deviceID)
// TODO: file extenstion for the history, yes or no? (<id>.reshjson vs. <id>) // TODO: file extension for the history, yes or no? (<id>.reshjson vs. <id>)
// TODO: discover other history files, exclude current // TODO: discover other history files, exclude current

@ -35,7 +35,7 @@ func Copy(hl Histlist) Histlist {
// AddCmdLine to the histlist // AddCmdLine to the histlist
func (h *Histlist) AddCmdLine(cmdLine string) { func (h *Histlist) AddCmdLine(cmdLine string) {
// lenBefore := len(h.List) // lenBefore := len(h.list)
// lookup // lookup
idx, found := h.LastIndex[cmdLine] idx, found := h.LastIndex[cmdLine]
if found { if found {

@ -22,5 +22,5 @@ func LoadHistoryFromFile(sugar *zap.SugaredLogger, historyPath string, numLines
rec := recs[i] rec := recs[i]
cliRecords.AddRecord(&rec) cliRecords.AddRecord(&rec)
} }
return msg.CliResponse{Records: cliRecords.List} return msg.CliResponse{Records: cliRecords.Dump()}
} }

@ -0,0 +1,12 @@
package syncconnector
import "github.com/curusarn/resh/internal/record"
func (sc SyncConnector) getLatestRecord(machineId *string) (map[string]string, error) {
return map[string]string{}, nil
}
func (sc SyncConnector) downloadRecords(lastRecords map[string]string) ([]record.V1, error) {
var records []record.V1
return records, nil
}

@ -0,0 +1,54 @@
package syncconnector
import (
"github.com/curusarn/resh/internal/histcli"
"github.com/curusarn/resh/internal/record"
"github.com/curusarn/resh/internal/recordint"
"go.uber.org/zap"
"net/url"
"time"
)
type SyncConnector struct {
sugar *zap.SugaredLogger
address *url.URL
authToken string
history *histcli.Histcli
// TODO periodic push (or from the write channel)
// TODO push period
}
func New(sugar *zap.SugaredLogger, address string, authToken string, pullPeriodSeconds int, history *histcli.Histcli) (*SyncConnector, error) {
parsedAddress, err := url.Parse(address)
if err != nil {
return nil, err
}
sc := &SyncConnector{
sugar: sugar.With(zap.String("component", "syncConnector")),
authToken: authToken,
address: parsedAddress,
history: history,
}
// TODO: propagate signals
go func(sc *SyncConnector) {
for _ = range time.Tick(time.Second * time.Duration(pullPeriodSeconds)) {
sc.sugar.Infow("checking remote")
// Add fake record (this will be produced by the sync connector)
sc.history.AddRecord(&recordint.Indexed{
Rec: record.V1{
CmdLine: "__fake_test__",
DeviceID: "__test__",
},
})
}
}(sc)
return sc, nil
}

@ -0,0 +1,49 @@
package syncconnector
import (
"github.com/curusarn/resh/internal/recordint"
)
func (sc SyncConnector) write(collect chan recordint.Collect) {
//for {
// func() {
// select {
// case rec := <-collect:
// part := "2"
// if rec.Rec.PartOne {
// part = "1"
// }
// sugar := h.sugar.With(
// "recordCmdLine", rec.Rec.CmdLine,
// "recordPart", part,
// "recordShell", rec.Shell,
// )
// sc.sugar.Debugw("Got record")
// h.sessionsMutex.Lock()
// defer h.sessionsMutex.Unlock()
//
// // allows nested sessions to merge records properly
// mergeID := rec.SessionID + "_" + strconv.Itoa(rec.Shlvl)
// sugar = sc.sugar.With("mergeID", mergeID)
// if rec.Rec.PartOne {
// if _, found := h.sessions[mergeID]; found {
// msg := "Got another first part of the records before merging the previous one - overwriting!"
// if rec.Shell == "zsh" {
// sc.sugar.Warnw(msg)
// } else {
// sc.sugar.Infow(msg + " Unfortunately this is normal in bash, it can't be prevented.")
// }
// }
// h.sessions[mergeID] = rec
// } else {
// if part1, found := h.sessions[mergeID]; found == false {
// sc.sugar.Warnw("Got second part of record and nothing to merge it with - ignoring!")
// } else {
// delete(h.sessions, mergeID)
// go h.mergeAndWriteRecord(sugar, part1, rec)
// }
// }
// }
// }()
//}
}
Loading…
Cancel
Save