From 414a42f767509ccfb9657911b630463f9798ad3a Mon Sep 17 00:00:00 2001 From: Vit Listik Date: Mon, 21 Nov 2022 04:05:40 +0100 Subject: [PATCH] implement synchronization --- cmd/daemon/run-server.go | 2 +- internal/cfg/cfg.go | 11 ++- internal/histcli/histcli.go | 13 ++- internal/histfile/histfile.go | 16 ++-- internal/syncconnector/reader.go | 64 +++++++++++-- internal/syncconnector/syncconnector.go | 20 +++-- internal/syncconnector/writer.go | 114 +++++++++++++++--------- 7 files changed, 173 insertions(+), 67 deletions(-) diff --git a/cmd/daemon/run-server.go b/cmd/daemon/run-server.go index af2abae..eeb080f 100644 --- a/cmd/daemon/run-server.go +++ b/cmd/daemon/run-server.go @@ -84,7 +84,7 @@ func (s *Server) Run() { s.sugar.Infow("", "sync_addr", s.config.SyncConnectorAddress) if s.config.SyncConnectorAddress != nil { - sc, err := syncconnector.New(s.sugar, *s.config.SyncConnectorAddress, s.config.SyncConnectorAuthToken, s.config.SyncConnectorPullPeriodSeconds, history) + sc, err := syncconnector.New(s.sugar, *s.config.SyncConnectorAddress, s.config.SyncConnectorAuthToken, s.config.SyncConnectorPullPeriodSeconds, s.config.SyncConnectorSendPeriodSeconds, history) if err != nil { s.sugar.Errorw("Sync Connector init failed", "error", err) } else { diff --git a/internal/cfg/cfg.go b/internal/cfg/cfg.go index 6193320..25a0a8f 100644 --- a/internal/cfg/cfg.go +++ b/internal/cfg/cfg.go @@ -33,6 +33,7 @@ type configFile struct { SyncConnectorAddress *string SyncConnectorAuthToken *string SyncConnectorPullPeriodSeconds *int + SyncConnectorSendPeriodSeconds *int } // Config returned by this package to be used in the rest of the project @@ -73,6 +74,9 @@ type Config struct { // SyncConnectorPullPeriodSeconds how often should Resh daemon download history from Sync Connector SyncConnectorPullPeriodSeconds int + + // SyncConnectorSendPeriodSeconds how often should Resh daemon send history to the Sync Connector + SyncConnectorSendPeriodSeconds int } // defaults for config @@ -85,7 +89,8 @@ var defaults = Config{ SessionWatchPeriodSeconds: 600, ReshHistoryMinSize: 1000, - SyncConnectorPullPeriodSeconds: 60, + SyncConnectorPullPeriodSeconds: 30, + SyncConnectorSendPeriodSeconds: 30, } const headerComment = `## @@ -199,6 +204,10 @@ func processAndFillDefaults(configF *configFile) (Config, error) { config.SyncConnectorPullPeriodSeconds = *configF.SyncConnectorPullPeriodSeconds } + if configF.SyncConnectorSendPeriodSeconds != nil { + config.SyncConnectorSendPeriodSeconds = *configF.SyncConnectorSendPeriodSeconds + } + return config, err } diff --git a/internal/histcli/histcli.go b/internal/histcli/histcli.go index 175eadc..6f889a8 100644 --- a/internal/histcli/histcli.go +++ b/internal/histcli/histcli.go @@ -2,6 +2,7 @@ package histcli import ( "github.com/curusarn/resh/internal/recordint" + "github.com/curusarn/resh/record" "go.uber.org/zap" "sync" ) @@ -9,7 +10,9 @@ import ( // Histcli is a dump of history preprocessed for resh cli purposes type Histcli struct { // list of records - list []recordint.SearchApp + list []recordint.SearchApp + // TODO It is not optimal to keep both raw and list but is necessary for syncConnector now + raw []record.V1 knownIds map[string]struct{} lock sync.RWMutex sugar *zap.SugaredLogger @@ -34,6 +37,7 @@ func (h *Histcli) AddRecord(rec *recordint.Indexed) { if _, ok := h.knownIds[rec.Rec.RecordID]; !ok { h.knownIds[rec.Rec.RecordID] = struct{}{} h.list = append(h.list, cli) + h.raw = append(h.raw, rec.Rec) h.updateLatestPerDevice(cli) } else { h.sugar.Debugw("Record is already present", "id", rec.Rec.RecordID) @@ -56,6 +60,13 @@ func (h *Histcli) Dump() []recordint.SearchApp { return h.list } +func (h *Histcli) DumpRaw() []record.V1 { + h.lock.RLock() + defer h.lock.RUnlock() + + return h.raw +} + // updateLatestPerDevice should be called only with write lock because it does not lock on its own. func (h *Histcli) updateLatestPerDevice(rec recordint.SearchApp) { if l, ok := h.latest[rec.DeviceID]; ok { diff --git a/internal/histfile/histfile.go b/internal/histfile/histfile.go index 9b44fb3..032a398 100644 --- a/internal/histfile/histfile.go +++ b/internal/histfile/histfile.go @@ -218,15 +218,13 @@ func (h *Histfile) mergeAndWriteRecord(sugar *zap.SugaredLogger, part1 recordint return } - func() { - cmdLine := rec.CmdLine - h.bashCmdLines.AddCmdLine(cmdLine) - h.zshCmdLines.AddCmdLine(cmdLine) - h.cliRecords.AddRecord(&recordint.Indexed{ - // TODO: is this what we want? - Rec: rec, - }) - }() + cmdLine := rec.CmdLine + h.bashCmdLines.AddCmdLine(cmdLine) + h.zshCmdLines.AddCmdLine(cmdLine) + h.cliRecords.AddRecord(&recordint.Indexed{ + // TODO: is this what we want? + Rec: rec, + }) h.rio.AppendToFile(h.historyPath, []record.V1{rec}) } diff --git a/internal/syncconnector/reader.go b/internal/syncconnector/reader.go index a724d35..729a6d7 100644 --- a/internal/syncconnector/reader.go +++ b/internal/syncconnector/reader.go @@ -6,8 +6,8 @@ import ( "fmt" "github.com/curusarn/resh/record" "io" - "log" "net/http" + "strconv" "time" ) @@ -33,10 +33,10 @@ func (sc SyncConnector) downloadRecords(lastRecords map[string]float64) ([]recor sc.sugar.Errorw("converting latest to JSON failed", "err", err) return nil, err } - responseBody := bytes.NewBuffer(latestJson) + reqBody := bytes.NewBuffer(latestJson) address := sc.getAddressWithPath(historyEndpoint) - resp, err := client.Post(address, "application/json", responseBody) + resp, err := client.Post(address, "application/json", reqBody) if err != nil { sc.sugar.Errorw("history request failed", "address", address, "err", err) return nil, err @@ -50,7 +50,7 @@ func (sc SyncConnector) downloadRecords(lastRecords map[string]float64) ([]recor }(resp.Body) body, err := io.ReadAll(resp.Body) if err != nil { - log.Fatalln(err) + sc.sugar.Warnw("reading response body failed", "err", err) } err = json.Unmarshal(body, &records) @@ -62,7 +62,57 @@ func (sc SyncConnector) downloadRecords(lastRecords map[string]float64) ([]recor return records, nil } -func latest() { - //curl localhost:8080/latest -X POST -d '[]' - //curl localhost:8080/latest -X POST -d '["one"]' +func (sc SyncConnector) latest() (map[string]float64, error) { + var knownDevices []string + for deviceId, _ := range sc.history.LatestRecordsPerDevice() { + knownDevices = append(knownDevices, deviceId) + } + + client := http.Client{ + Timeout: 3 * time.Second, + } + + knownJson, err := json.Marshal(knownDevices) + if err != nil { + sc.sugar.Errorw("converting latest to JSON failed", "err", err) + return nil, err + } + reqBody := bytes.NewBuffer(knownJson) + + address := sc.getAddressWithPath(latestEndpoint) + resp, err := client.Post(address, "application/json", reqBody) + if err != nil { + sc.sugar.Errorw("latest request failed", "address", address, "err", err) + return nil, err + } + + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + sc.sugar.Errorw("reader close failed", "err", err) + } + }(resp.Body) + body, err := io.ReadAll(resp.Body) + if err != nil { + sc.sugar.Warnw("reading response body failed", "err", err) + } + + latest := map[string]string{} + + err = json.Unmarshal(body, &latest) + if err != nil { + sc.sugar.Errorw("Unmarshalling failed", "err", err) + return nil, err + } + + l := make(map[string]float64, len(latest)) + for deviceId, ts := range latest { + t, err := strconv.ParseFloat(ts, 64) + if err != nil { + return nil, err + } + l[deviceId] = t + } + + return l, nil } diff --git a/internal/syncconnector/syncconnector.go b/internal/syncconnector/syncconnector.go index 917f66e..f1df3e0 100644 --- a/internal/syncconnector/syncconnector.go +++ b/internal/syncconnector/syncconnector.go @@ -20,12 +20,9 @@ type SyncConnector struct { authToken string history *histcli.Histcli - - // TODO periodic push (or from the write channel) - // TODO push period } -func New(sugar *zap.SugaredLogger, address string, authToken string, pullPeriodSeconds int, history *histcli.Histcli) (*SyncConnector, error) { +func New(sugar *zap.SugaredLogger, address string, authToken string, pullPeriodSeconds int, sendPeriodSeconds int, history *histcli.Histcli) (*SyncConnector, error) { parsedAddress, err := url.Parse(address) if err != nil { return nil, err @@ -41,7 +38,7 @@ func New(sugar *zap.SugaredLogger, address string, authToken string, pullPeriodS // TODO: propagate signals go func(sc *SyncConnector) { for _ = range time.Tick(time.Second * time.Duration(pullPeriodSeconds)) { - sc.sugar.Debug("checking remote") + sc.sugar.Debug("checking remote for new records") recs, err := sc.downloadRecords(sc.history.LatestRecordsPerDevice()) if err != nil { @@ -59,6 +56,19 @@ func New(sugar *zap.SugaredLogger, address string, authToken string, pullPeriodS } }(sc) + go func(sc *SyncConnector) { + // wait to properly load all the records + time.Sleep(time.Second * time.Duration(sendPeriodSeconds)) + for _ = range time.Tick(time.Second * time.Duration(sendPeriodSeconds)) { + sc.sugar.Debug("syncing local records to the remote") + + err := sc.write() + if err != nil { + sc.sugar.Warnw("sending records to the remote failed", "err", err) + } + } + }(sc) + return sc, nil } diff --git a/internal/syncconnector/writer.go b/internal/syncconnector/writer.go index a341b94..cd935cc 100644 --- a/internal/syncconnector/writer.go +++ b/internal/syncconnector/writer.go @@ -1,49 +1,77 @@ package syncconnector import ( - "github.com/curusarn/resh/internal/recordint" + "bytes" + "encoding/json" + "github.com/curusarn/resh/record" + "io" + "net/http" + "strconv" + "time" ) -func (sc SyncConnector) write(collect chan recordint.Collect) { - //for { - // func() { - // select { - // case rec := <-collect: - // part := "2" - // if rec.Rec.PartOne { - // part = "1" - // } - // sugar := h.sugar.With( - // "recordCmdLine", rec.Rec.CmdLine, - // "recordPart", part, - // "recordShell", rec.Shell, - // ) - // sc.sugar.Debugw("Got record") - // h.sessionsMutex.Lock() - // defer h.sessionsMutex.Unlock() - // - // // allows nested sessions to merge records properly - // mergeID := rec.SessionID + "_" + strconv.Itoa(rec.Shlvl) - // sugar = sc.sugar.With("mergeID", mergeID) - // if rec.Rec.PartOne { - // if _, found := h.sessions[mergeID]; found { - // msg := "Got another first part of the records before merging the previous one - overwriting!" - // if rec.Shell == "zsh" { - // sc.sugar.Warnw(msg) - // } else { - // sc.sugar.Infow(msg + " Unfortunately this is normal in bash, it can't be prevented.") - // } - // } - // h.sessions[mergeID] = rec - // } else { - // if part1, found := h.sessions[mergeID]; found == false { - // sc.sugar.Warnw("Got second part of record and nothing to merge it with - ignoring!") - // } else { - // delete(h.sessions, mergeID) - // go h.mergeAndWriteRecord(sugar, part1, rec) - // } - // } - // } - // }() - //} +func (sc SyncConnector) write() error { + latestRemote, err := sc.latest() + if err != nil { + return err + } + latestLocal := sc.history.LatestRecordsPerDevice() + remoteIsOlder := false + for deviceId, lastLocal := range latestLocal { + if lastRemote, ok := latestRemote[deviceId]; !ok { + // Unknown deviceId on the remote - add records have to be sent + remoteIsOlder = true + break + } else if lastLocal > lastRemote { + remoteIsOlder = true + break + } + } + if !remoteIsOlder { + sc.sugar.Debug("No need to sync remote, there are no newer local records") + return nil + } + var toSend []record.V1 + for _, r := range sc.history.DumpRaw() { + t, err := strconv.ParseFloat(r.Time, 64) + if err != nil { + sc.sugar.Warnw("Invalid time for record - skipping", "time", r.Time) + continue + } + l, ok := latestRemote[r.DeviceID] + if ok && l >= t { + continue + } + sc.sugar.Infow("record is newer", "new", t, "old", l, "id", r.RecordID, "deviceid", r.DeviceID) + toSend = append(toSend, r) + } + + client := http.Client{ + Timeout: 3 * time.Second, + } + + toSendJson, err := json.Marshal(toSend) + if err != nil { + sc.sugar.Errorw("converting toSend to JSON failed", "err", err) + return err + } + reqBody := bytes.NewBuffer(toSendJson) + + address := sc.getAddressWithPath(storeEndpoint) + resp, err := client.Post(address, "application/json", reqBody) + if err != nil { + sc.sugar.Errorw("store request failed", "address", address, "err", err) + return err + } + + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + sc.sugar.Errorw("reader close failed", "err", err) + } + }(resp.Body) + + sc.sugar.Debugw("store call", "status", resp.Status) + + return nil }