diff --git a/cmd/daemon/dump.go b/cmd/daemon/dump.go index b3a154f..c488996 100644 --- a/cmd/daemon/dump.go +++ b/cmd/daemon/dump.go @@ -2,17 +2,17 @@ package main import ( "encoding/json" + "github.com/curusarn/resh/internal/histcli" "io/ioutil" "net/http" - "github.com/curusarn/resh/internal/histfile" "github.com/curusarn/resh/internal/msg" "go.uber.org/zap" ) type dumpHandler struct { - sugar *zap.SugaredLogger - histfileBox *histfile.Histfile + sugar *zap.SugaredLogger + history *histcli.Histcli } func (h *dumpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -35,12 +35,8 @@ func (h *dumpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } sugar.Debugw("Getting records to send ...") - fullRecords := h.histfileBox.DumpCliRecords() - if err != nil { - sugar.Errorw("Error when getting records", "error", err) - } - resp := msg.CliResponse{Records: fullRecords.List} + resp := msg.CliResponse{Records: h.history.Dump()} jsn, err = json.Marshal(&resp) if err != nil { sugar.Errorw("Error when marshaling", "error", err) diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go index 6a6feff..fec397f 100644 --- a/cmd/daemon/main.go +++ b/cmd/daemon/main.go @@ -126,7 +126,7 @@ func (d *daemon) killDaemon(pidfile string) error { "PIDFile", pidfile, "error", err) } - d.sugar.Infow("Succesfully read PID file", "contents", string(dat)) + d.sugar.Infow("Successfully read PID file", "contents", string(dat)) pid, err := strconv.Atoi(strings.TrimSuffix(string(dat), "\n")) if err != nil { return fmt.Errorf("could not parse PID file contents: %w", err) diff --git a/cmd/daemon/run-server.go b/cmd/daemon/run-server.go index afe77a1..fcf1828 100644 --- a/cmd/daemon/run-server.go +++ b/cmd/daemon/run-server.go @@ -1,6 +1,8 @@ package main import ( + "github.com/curusarn/resh/internal/histcli" + "github.com/curusarn/resh/internal/syncconnector" "net/http" "os" "strconv" @@ -33,6 +35,8 @@ func (s *Server) Run() { shutdown := make(chan string) + history := histcli.New() + // histfile histfileRecords := make(chan recordint.Collect) recordSubscribers = append(recordSubscribers, histfileRecords) @@ -42,13 +46,14 @@ func (s *Server) Run() { signalSubscribers = append(signalSubscribers, histfileSignals) maxHistSize := 10000 // lines minHistSizeKB := 2000 // roughly lines - histfileBox := histfile.New(s.sugar, histfileRecords, histfileSessionsToDrop, + histfile.New(s.sugar, histfileRecords, histfileSessionsToDrop, s.reshHistoryPath, s.bashHistoryPath, s.zshHistoryPath, maxHistSize, minHistSizeKB, - histfileSignals, shutdown) + histfileSignals, shutdown, history) // sesswatch sesswatchRecords := make(chan recordint.Collect) + // TODO: add sync connector subscriber recordSubscribers = append(recordSubscribers, sesswatchRecords) sesswatchSessionsToWatch := make(chan recordint.SessionInit) sessionInitSubscribers = append(sessionInitSubscribers, sesswatchSessionsToWatch) @@ -65,7 +70,7 @@ func (s *Server) Run() { mux.Handle("/status", &statusHandler{sugar: s.sugar}) mux.Handle("/record", &recordHandler{sugar: s.sugar, subscribers: recordSubscribers}) mux.Handle("/session_init", &sessionInitHandler{sugar: s.sugar, subscribers: sessionInitSubscribers}) - mux.Handle("/dump", &dumpHandler{sugar: s.sugar, histfileBox: histfileBox}) + mux.Handle("/dump", &dumpHandler{sugar: s.sugar, history: history}) server := &http.Server{ Addr: "localhost:" + strconv.Itoa(s.config.Port), @@ -77,6 +82,19 @@ func (s *Server) Run() { } go server.ListenAndServe() + s.sugar.Infow("", "sync_addr", s.config.SyncConnectorAddress) + if s.config.SyncConnectorAddress != nil { + sc, err := syncconnector.New(s.sugar, *s.config.SyncConnectorAddress, s.config.SyncConnectorAuthToken, s.config.SyncConnectorPullPeriodSeconds, history) + if err != nil { + s.sugar.Errorw("Sync Connector init failed", "error", err) + } else { + s.sugar.Infow("Initialized Sync Connector", "Sync Connector", sc) + } + // TODO: load sync connector data + // TODO: load sync connector data + // TODO: send connector data periodically (record by record / or batch) + } + // signalhandler - takes over the main goroutine so when signal handler exists the whole program exits signalhandler.Run(s.sugar, signalSubscribers, shutdown, server) } diff --git a/internal/cfg/cfg.go b/internal/cfg/cfg.go index 01ea46e..e9a47cb 100644 --- a/internal/cfg/cfg.go +++ b/internal/cfg/cfg.go @@ -30,7 +30,9 @@ type configFile struct { BindArrowKeysBash *bool BindArrowKeysZsh *bool - SyncConnectorAddress *string + SyncConnectorAddress *string + SyncConnectorAuthToken *string + SyncConnectorPullPeriodSeconds *int } // Config returned by this package to be used in the rest of the project @@ -65,6 +67,12 @@ type Config struct { // - https://domain.tld // - https://domain.tld/resh SyncConnectorAddress *string + + // SyncConnectorAuthToken used by the daemon to authenticate with the Sync Connector + SyncConnectorAuthToken string + + // SyncConnectorPullPeriodSeconds how often should Resh daemon download history from Sync Connector + SyncConnectorPullPeriodSeconds int } // defaults for config @@ -76,6 +84,8 @@ var defaults = Config{ Debug: false, SessionWatchPeriodSeconds: 600, ReshHistoryMinSize: 1000, + + SyncConnectorPullPeriodSeconds: 60, } const headerComment = `## @@ -179,6 +189,14 @@ func processAndFillDefaults(configF *configFile) (Config, error) { config.SyncConnectorAddress = configF.SyncConnectorAddress + if configF.SyncConnectorAuthToken != nil { + config.SyncConnectorAuthToken = *configF.SyncConnectorAuthToken + } + + if configF.SyncConnectorPullPeriodSeconds != nil { + config.SyncConnectorPullPeriodSeconds = *configF.SyncConnectorPullPeriodSeconds + } + return config, err } diff --git a/internal/histcli/histcli.go b/internal/histcli/histcli.go index 8069990..b032be5 100644 --- a/internal/histcli/histcli.go +++ b/internal/histcli/histcli.go @@ -2,29 +2,42 @@ package histcli import ( "github.com/curusarn/resh/internal/recordint" + "sync" ) // Histcli is a dump of history preprocessed for resh cli purposes type Histcli struct { // list of records - List []recordint.SearchApp + list []recordint.SearchApp + lock sync.RWMutex } // New Histcli -func New() Histcli { - return Histcli{} +func New() *Histcli { + return &Histcli{} } // AddRecord to the histcli func (h *Histcli) AddRecord(rec *recordint.Indexed) { cli := recordint.NewSearchApp(rec) + h.lock.Lock() + defer h.lock.Unlock() - h.List = append(h.List, cli) + h.list = append(h.list, cli) } // AddCmdLine to the histcli func (h *Histcli) AddCmdLine(cmdline string) { cli := recordint.NewSearchAppFromCmdLine(cmdline) + h.lock.Lock() + defer h.lock.Unlock() - h.List = append(h.List, cli) + h.list = append(h.list, cli) +} + +func (h *Histcli) Dump() []recordint.SearchApp { + h.lock.RLock() + defer h.lock.RUnlock() + + return h.list } diff --git a/internal/histfile/histfile.go b/internal/histfile/histfile.go index 33a2279..9b44fb3 100644 --- a/internal/histfile/histfile.go +++ b/internal/histfile/histfile.go @@ -30,7 +30,7 @@ type Histfile struct { bashCmdLines histlist.Histlist zshCmdLines histlist.Histlist - cliRecords histcli.Histcli + cliRecords *histcli.Histcli rio *recio.RecIO } @@ -39,7 +39,7 @@ type Histfile struct { func New(sugar *zap.SugaredLogger, input chan recordint.Collect, sessionsToDrop chan string, reshHistoryPath string, bashHistoryPath string, zshHistoryPath string, maxInitHistSize int, minInitHistSizeKB int, - signals chan os.Signal, shutdownDone chan string) *Histfile { + signals chan os.Signal, shutdownDone chan string, histCli *histcli.Histcli) *Histfile { rio := recio.New(sugar.With("module", "histfile")) hf := Histfile{ @@ -48,7 +48,7 @@ func New(sugar *zap.SugaredLogger, input chan recordint.Collect, sessionsToDrop historyPath: reshHistoryPath, bashCmdLines: histlist.New(sugar), zshCmdLines: histlist.New(sugar), - cliRecords: histcli.New(), + cliRecords: histCli, rio: &rio, } go hf.loadHistory(bashHistoryPath, zshHistoryPath, maxInitHistSize, minInitHistSizeKB) @@ -70,7 +70,7 @@ func (h *Histfile) loadCliRecords(recs []recordint.Indexed) { h.cliRecords.AddRecord(&rec) } h.sugar.Infow("Resh history loaded", - "historyRecordsCount", len(h.cliRecords.List), + "historyRecordsCount", len(h.cliRecords.Dump()), ) } @@ -255,12 +255,6 @@ func (h *Histfile) mergeAndWriteRecord(sugar *zap.SugaredLogger, part1 recordint // } // } -// DumpCliRecords returns enriched records -func (h *Histfile) DumpCliRecords() histcli.Histcli { - // don't forget locks in the future - return h.cliRecords -} - func loadCmdLines(sugar *zap.SugaredLogger, recs []recordint.Indexed) histlist.Histlist { hl := histlist.New(sugar) // go from bottom and deduplicate diff --git a/internal/histio/histio.go b/internal/histio/histio.go index d541f9d..b8486de 100644 --- a/internal/histio/histio.go +++ b/internal/histio/histio.go @@ -25,7 +25,7 @@ func New(sugar *zap.SugaredLogger, dataDir, deviceID string) *Histio { sugarHistio := sugar.With(zap.String("component", "histio")) histDir := path.Join(dataDir, "history") currPath := path.Join(histDir, deviceID) - // TODO: file extenstion for the history, yes or no? (.reshjson vs. ) + // TODO: file extension for the history, yes or no? (.reshjson vs. ) // TODO: discover other history files, exclude current diff --git a/internal/histlist/histlist.go b/internal/histlist/histlist.go index 7c80c7f..94376f8 100644 --- a/internal/histlist/histlist.go +++ b/internal/histlist/histlist.go @@ -35,7 +35,7 @@ func Copy(hl Histlist) Histlist { // AddCmdLine to the histlist func (h *Histlist) AddCmdLine(cmdLine string) { - // lenBefore := len(h.List) + // lenBefore := len(h.list) // lookup idx, found := h.LastIndex[cmdLine] if found { diff --git a/internal/searchapp/test.go b/internal/searchapp/test.go index 9a2d284..4ff2d02 100644 --- a/internal/searchapp/test.go +++ b/internal/searchapp/test.go @@ -22,5 +22,5 @@ func LoadHistoryFromFile(sugar *zap.SugaredLogger, historyPath string, numLines rec := recs[i] cliRecords.AddRecord(&rec) } - return msg.CliResponse{Records: cliRecords.List} + return msg.CliResponse{Records: cliRecords.Dump()} } diff --git a/internal/syncconnector/reader.go b/internal/syncconnector/reader.go new file mode 100644 index 0000000..6c0cc9f --- /dev/null +++ b/internal/syncconnector/reader.go @@ -0,0 +1,12 @@ +package syncconnector + +import "github.com/curusarn/resh/internal/record" + +func (sc SyncConnector) getLatestRecord(machineId *string) (map[string]string, error) { + return map[string]string{}, nil +} + +func (sc SyncConnector) downloadRecords(lastRecords map[string]string) ([]record.V1, error) { + var records []record.V1 + return records, nil +} diff --git a/internal/syncconnector/syncconnector.go b/internal/syncconnector/syncconnector.go new file mode 100644 index 0000000..f3ae344 --- /dev/null +++ b/internal/syncconnector/syncconnector.go @@ -0,0 +1,54 @@ +package syncconnector + +import ( + "github.com/curusarn/resh/internal/histcli" + "github.com/curusarn/resh/internal/record" + "github.com/curusarn/resh/internal/recordint" + "go.uber.org/zap" + "net/url" + "time" +) + +type SyncConnector struct { + sugar *zap.SugaredLogger + + address *url.URL + authToken string + + history *histcli.Histcli + + // TODO periodic push (or from the write channel) + // TODO push period +} + +func New(sugar *zap.SugaredLogger, address string, authToken string, pullPeriodSeconds int, history *histcli.Histcli) (*SyncConnector, error) { + parsedAddress, err := url.Parse(address) + if err != nil { + return nil, err + } + + sc := &SyncConnector{ + sugar: sugar.With(zap.String("component", "syncConnector")), + authToken: authToken, + address: parsedAddress, + history: history, + } + + // TODO: propagate signals + go func(sc *SyncConnector) { + for _ = range time.Tick(time.Second * time.Duration(pullPeriodSeconds)) { + sc.sugar.Infow("checking remote") + + // Add fake record (this will be produced by the sync connector) + sc.history.AddRecord(&recordint.Indexed{ + Rec: record.V1{ + CmdLine: "__fake_test__", + DeviceID: "__test__", + }, + }) + + } + }(sc) + + return sc, nil +} diff --git a/internal/syncconnector/writer.go b/internal/syncconnector/writer.go new file mode 100644 index 0000000..a341b94 --- /dev/null +++ b/internal/syncconnector/writer.go @@ -0,0 +1,49 @@ +package syncconnector + +import ( + "github.com/curusarn/resh/internal/recordint" +) + +func (sc SyncConnector) write(collect chan recordint.Collect) { + //for { + // func() { + // select { + // case rec := <-collect: + // part := "2" + // if rec.Rec.PartOne { + // part = "1" + // } + // sugar := h.sugar.With( + // "recordCmdLine", rec.Rec.CmdLine, + // "recordPart", part, + // "recordShell", rec.Shell, + // ) + // sc.sugar.Debugw("Got record") + // h.sessionsMutex.Lock() + // defer h.sessionsMutex.Unlock() + // + // // allows nested sessions to merge records properly + // mergeID := rec.SessionID + "_" + strconv.Itoa(rec.Shlvl) + // sugar = sc.sugar.With("mergeID", mergeID) + // if rec.Rec.PartOne { + // if _, found := h.sessions[mergeID]; found { + // msg := "Got another first part of the records before merging the previous one - overwriting!" + // if rec.Shell == "zsh" { + // sc.sugar.Warnw(msg) + // } else { + // sc.sugar.Infow(msg + " Unfortunately this is normal in bash, it can't be prevented.") + // } + // } + // h.sessions[mergeID] = rec + // } else { + // if part1, found := h.sessions[mergeID]; found == false { + // sc.sugar.Warnw("Got second part of record and nothing to merge it with - ignoring!") + // } else { + // delete(h.sessions, mergeID) + // go h.mergeAndWriteRecord(sugar, part1, rec) + // } + // } + // } + // }() + //} +}