implement synchronization

pull/179/head
Vit Listik 3 years ago
parent 12f106dcb7
commit 414a42f767
No known key found for this signature in database
GPG Key ID: DDDF5138DA46D76B
  1. 2
      cmd/daemon/run-server.go
  2. 11
      internal/cfg/cfg.go
  3. 11
      internal/histcli/histcli.go
  4. 2
      internal/histfile/histfile.go
  5. 64
      internal/syncconnector/reader.go
  6. 20
      internal/syncconnector/syncconnector.go
  7. 114
      internal/syncconnector/writer.go

@ -84,7 +84,7 @@ func (s *Server) Run() {
s.sugar.Infow("", "sync_addr", s.config.SyncConnectorAddress) s.sugar.Infow("", "sync_addr", s.config.SyncConnectorAddress)
if s.config.SyncConnectorAddress != nil { if s.config.SyncConnectorAddress != nil {
sc, err := syncconnector.New(s.sugar, *s.config.SyncConnectorAddress, s.config.SyncConnectorAuthToken, s.config.SyncConnectorPullPeriodSeconds, history) sc, err := syncconnector.New(s.sugar, *s.config.SyncConnectorAddress, s.config.SyncConnectorAuthToken, s.config.SyncConnectorPullPeriodSeconds, s.config.SyncConnectorSendPeriodSeconds, history)
if err != nil { if err != nil {
s.sugar.Errorw("Sync Connector init failed", "error", err) s.sugar.Errorw("Sync Connector init failed", "error", err)
} else { } else {

@ -33,6 +33,7 @@ type configFile struct {
SyncConnectorAddress *string SyncConnectorAddress *string
SyncConnectorAuthToken *string SyncConnectorAuthToken *string
SyncConnectorPullPeriodSeconds *int SyncConnectorPullPeriodSeconds *int
SyncConnectorSendPeriodSeconds *int
} }
// Config returned by this package to be used in the rest of the project // Config returned by this package to be used in the rest of the project
@ -73,6 +74,9 @@ type Config struct {
// SyncConnectorPullPeriodSeconds how often should Resh daemon download history from Sync Connector // SyncConnectorPullPeriodSeconds how often should Resh daemon download history from Sync Connector
SyncConnectorPullPeriodSeconds int SyncConnectorPullPeriodSeconds int
// SyncConnectorSendPeriodSeconds how often should Resh daemon send history to the Sync Connector
SyncConnectorSendPeriodSeconds int
} }
// defaults for config // defaults for config
@ -85,7 +89,8 @@ var defaults = Config{
SessionWatchPeriodSeconds: 600, SessionWatchPeriodSeconds: 600,
ReshHistoryMinSize: 1000, ReshHistoryMinSize: 1000,
SyncConnectorPullPeriodSeconds: 60, SyncConnectorPullPeriodSeconds: 30,
SyncConnectorSendPeriodSeconds: 30,
} }
const headerComment = `## const headerComment = `##
@ -199,6 +204,10 @@ func processAndFillDefaults(configF *configFile) (Config, error) {
config.SyncConnectorPullPeriodSeconds = *configF.SyncConnectorPullPeriodSeconds config.SyncConnectorPullPeriodSeconds = *configF.SyncConnectorPullPeriodSeconds
} }
if configF.SyncConnectorSendPeriodSeconds != nil {
config.SyncConnectorSendPeriodSeconds = *configF.SyncConnectorSendPeriodSeconds
}
return config, err return config, err
} }

@ -2,6 +2,7 @@ package histcli
import ( import (
"github.com/curusarn/resh/internal/recordint" "github.com/curusarn/resh/internal/recordint"
"github.com/curusarn/resh/record"
"go.uber.org/zap" "go.uber.org/zap"
"sync" "sync"
) )
@ -10,6 +11,8 @@ import (
type Histcli struct { type Histcli struct {
// list of records // list of records
list []recordint.SearchApp list []recordint.SearchApp
// TODO It is not optimal to keep both raw and list but is necessary for syncConnector now
raw []record.V1
knownIds map[string]struct{} knownIds map[string]struct{}
lock sync.RWMutex lock sync.RWMutex
sugar *zap.SugaredLogger sugar *zap.SugaredLogger
@ -34,6 +37,7 @@ func (h *Histcli) AddRecord(rec *recordint.Indexed) {
if _, ok := h.knownIds[rec.Rec.RecordID]; !ok { if _, ok := h.knownIds[rec.Rec.RecordID]; !ok {
h.knownIds[rec.Rec.RecordID] = struct{}{} h.knownIds[rec.Rec.RecordID] = struct{}{}
h.list = append(h.list, cli) h.list = append(h.list, cli)
h.raw = append(h.raw, rec.Rec)
h.updateLatestPerDevice(cli) h.updateLatestPerDevice(cli)
} else { } else {
h.sugar.Debugw("Record is already present", "id", rec.Rec.RecordID) h.sugar.Debugw("Record is already present", "id", rec.Rec.RecordID)
@ -56,6 +60,13 @@ func (h *Histcli) Dump() []recordint.SearchApp {
return h.list return h.list
} }
func (h *Histcli) DumpRaw() []record.V1 {
h.lock.RLock()
defer h.lock.RUnlock()
return h.raw
}
// updateLatestPerDevice should be called only with write lock because it does not lock on its own. // updateLatestPerDevice should be called only with write lock because it does not lock on its own.
func (h *Histcli) updateLatestPerDevice(rec recordint.SearchApp) { func (h *Histcli) updateLatestPerDevice(rec recordint.SearchApp) {
if l, ok := h.latest[rec.DeviceID]; ok { if l, ok := h.latest[rec.DeviceID]; ok {

@ -218,7 +218,6 @@ func (h *Histfile) mergeAndWriteRecord(sugar *zap.SugaredLogger, part1 recordint
return return
} }
func() {
cmdLine := rec.CmdLine cmdLine := rec.CmdLine
h.bashCmdLines.AddCmdLine(cmdLine) h.bashCmdLines.AddCmdLine(cmdLine)
h.zshCmdLines.AddCmdLine(cmdLine) h.zshCmdLines.AddCmdLine(cmdLine)
@ -226,7 +225,6 @@ func (h *Histfile) mergeAndWriteRecord(sugar *zap.SugaredLogger, part1 recordint
// TODO: is this what we want? // TODO: is this what we want?
Rec: rec, Rec: rec,
}) })
}()
h.rio.AppendToFile(h.historyPath, []record.V1{rec}) h.rio.AppendToFile(h.historyPath, []record.V1{rec})
} }

@ -6,8 +6,8 @@ import (
"fmt" "fmt"
"github.com/curusarn/resh/record" "github.com/curusarn/resh/record"
"io" "io"
"log"
"net/http" "net/http"
"strconv"
"time" "time"
) )
@ -33,10 +33,10 @@ func (sc SyncConnector) downloadRecords(lastRecords map[string]float64) ([]recor
sc.sugar.Errorw("converting latest to JSON failed", "err", err) sc.sugar.Errorw("converting latest to JSON failed", "err", err)
return nil, err return nil, err
} }
responseBody := bytes.NewBuffer(latestJson) reqBody := bytes.NewBuffer(latestJson)
address := sc.getAddressWithPath(historyEndpoint) address := sc.getAddressWithPath(historyEndpoint)
resp, err := client.Post(address, "application/json", responseBody) resp, err := client.Post(address, "application/json", reqBody)
if err != nil { if err != nil {
sc.sugar.Errorw("history request failed", "address", address, "err", err) sc.sugar.Errorw("history request failed", "address", address, "err", err)
return nil, err return nil, err
@ -50,7 +50,7 @@ func (sc SyncConnector) downloadRecords(lastRecords map[string]float64) ([]recor
}(resp.Body) }(resp.Body)
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
log.Fatalln(err) sc.sugar.Warnw("reading response body failed", "err", err)
} }
err = json.Unmarshal(body, &records) err = json.Unmarshal(body, &records)
@ -62,7 +62,57 @@ func (sc SyncConnector) downloadRecords(lastRecords map[string]float64) ([]recor
return records, nil return records, nil
} }
func latest() { func (sc SyncConnector) latest() (map[string]float64, error) {
//curl localhost:8080/latest -X POST -d '[]' var knownDevices []string
//curl localhost:8080/latest -X POST -d '["one"]' for deviceId, _ := range sc.history.LatestRecordsPerDevice() {
knownDevices = append(knownDevices, deviceId)
}
client := http.Client{
Timeout: 3 * time.Second,
}
knownJson, err := json.Marshal(knownDevices)
if err != nil {
sc.sugar.Errorw("converting latest to JSON failed", "err", err)
return nil, err
}
reqBody := bytes.NewBuffer(knownJson)
address := sc.getAddressWithPath(latestEndpoint)
resp, err := client.Post(address, "application/json", reqBody)
if err != nil {
sc.sugar.Errorw("latest request failed", "address", address, "err", err)
return nil, err
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
sc.sugar.Errorw("reader close failed", "err", err)
}
}(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
sc.sugar.Warnw("reading response body failed", "err", err)
}
latest := map[string]string{}
err = json.Unmarshal(body, &latest)
if err != nil {
sc.sugar.Errorw("Unmarshalling failed", "err", err)
return nil, err
}
l := make(map[string]float64, len(latest))
for deviceId, ts := range latest {
t, err := strconv.ParseFloat(ts, 64)
if err != nil {
return nil, err
}
l[deviceId] = t
}
return l, nil
} }

@ -20,12 +20,9 @@ type SyncConnector struct {
authToken string authToken string
history *histcli.Histcli history *histcli.Histcli
// TODO periodic push (or from the write channel)
// TODO push period
} }
func New(sugar *zap.SugaredLogger, address string, authToken string, pullPeriodSeconds int, history *histcli.Histcli) (*SyncConnector, error) { func New(sugar *zap.SugaredLogger, address string, authToken string, pullPeriodSeconds int, sendPeriodSeconds int, history *histcli.Histcli) (*SyncConnector, error) {
parsedAddress, err := url.Parse(address) parsedAddress, err := url.Parse(address)
if err != nil { if err != nil {
return nil, err return nil, err
@ -41,7 +38,7 @@ func New(sugar *zap.SugaredLogger, address string, authToken string, pullPeriodS
// TODO: propagate signals // TODO: propagate signals
go func(sc *SyncConnector) { go func(sc *SyncConnector) {
for _ = range time.Tick(time.Second * time.Duration(pullPeriodSeconds)) { for _ = range time.Tick(time.Second * time.Duration(pullPeriodSeconds)) {
sc.sugar.Debug("checking remote") sc.sugar.Debug("checking remote for new records")
recs, err := sc.downloadRecords(sc.history.LatestRecordsPerDevice()) recs, err := sc.downloadRecords(sc.history.LatestRecordsPerDevice())
if err != nil { if err != nil {
@ -59,6 +56,19 @@ func New(sugar *zap.SugaredLogger, address string, authToken string, pullPeriodS
} }
}(sc) }(sc)
go func(sc *SyncConnector) {
// wait to properly load all the records
time.Sleep(time.Second * time.Duration(sendPeriodSeconds))
for _ = range time.Tick(time.Second * time.Duration(sendPeriodSeconds)) {
sc.sugar.Debug("syncing local records to the remote")
err := sc.write()
if err != nil {
sc.sugar.Warnw("sending records to the remote failed", "err", err)
}
}
}(sc)
return sc, nil return sc, nil
} }

@ -1,49 +1,77 @@
package syncconnector package syncconnector
import ( import (
"github.com/curusarn/resh/internal/recordint" "bytes"
"encoding/json"
"github.com/curusarn/resh/record"
"io"
"net/http"
"strconv"
"time"
) )
func (sc SyncConnector) write(collect chan recordint.Collect) { func (sc SyncConnector) write() error {
//for { latestRemote, err := sc.latest()
// func() { if err != nil {
// select { return err
// case rec := <-collect: }
// part := "2" latestLocal := sc.history.LatestRecordsPerDevice()
// if rec.Rec.PartOne { remoteIsOlder := false
// part = "1" for deviceId, lastLocal := range latestLocal {
// } if lastRemote, ok := latestRemote[deviceId]; !ok {
// sugar := h.sugar.With( // Unknown deviceId on the remote - add records have to be sent
// "recordCmdLine", rec.Rec.CmdLine, remoteIsOlder = true
// "recordPart", part, break
// "recordShell", rec.Shell, } else if lastLocal > lastRemote {
// ) remoteIsOlder = true
// sc.sugar.Debugw("Got record") break
// h.sessionsMutex.Lock() }
// defer h.sessionsMutex.Unlock() }
// if !remoteIsOlder {
// // allows nested sessions to merge records properly sc.sugar.Debug("No need to sync remote, there are no newer local records")
// mergeID := rec.SessionID + "_" + strconv.Itoa(rec.Shlvl) return nil
// sugar = sc.sugar.With("mergeID", mergeID) }
// if rec.Rec.PartOne { var toSend []record.V1
// if _, found := h.sessions[mergeID]; found { for _, r := range sc.history.DumpRaw() {
// msg := "Got another first part of the records before merging the previous one - overwriting!" t, err := strconv.ParseFloat(r.Time, 64)
// if rec.Shell == "zsh" { if err != nil {
// sc.sugar.Warnw(msg) sc.sugar.Warnw("Invalid time for record - skipping", "time", r.Time)
// } else { continue
// sc.sugar.Infow(msg + " Unfortunately this is normal in bash, it can't be prevented.") }
// } l, ok := latestRemote[r.DeviceID]
// } if ok && l >= t {
// h.sessions[mergeID] = rec continue
// } else { }
// if part1, found := h.sessions[mergeID]; found == false { sc.sugar.Infow("record is newer", "new", t, "old", l, "id", r.RecordID, "deviceid", r.DeviceID)
// sc.sugar.Warnw("Got second part of record and nothing to merge it with - ignoring!") toSend = append(toSend, r)
// } else { }
// delete(h.sessions, mergeID)
// go h.mergeAndWriteRecord(sugar, part1, rec) client := http.Client{
// } Timeout: 3 * time.Second,
// } }
// }
// }() toSendJson, err := json.Marshal(toSend)
//} if err != nil {
sc.sugar.Errorw("converting toSend to JSON failed", "err", err)
return err
}
reqBody := bytes.NewBuffer(toSendJson)
address := sc.getAddressWithPath(storeEndpoint)
resp, err := client.Post(address, "application/json", reqBody)
if err != nil {
sc.sugar.Errorw("store request failed", "address", address, "err", err)
return err
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
sc.sugar.Errorw("reader close failed", "err", err)
}
}(resp.Body)
sc.sugar.Debugw("store call", "status", resp.Status)
return nil
} }

Loading…
Cancel
Save