@ -9,31 +9,49 @@ import (
"github.com/curusarn/resh/pkg/records"
"github.com/curusarn/resh/pkg/records"
)
)
type histfile struct {
// Histfile writes records to histfile
mutex sync . Mutex
type Histfile struct {
sessionsMutex sync . Mutex
sessions map [ string ] records . Record
sessions map [ string ] records . Record
outputPath string
historyPath string
recentMutex sync . Mutex
recentRecords [ ] records . Record
recentCmdLines [ ] string // deduplicated
cmdLinesLastIndex map [ string ] int
}
}
// Go creates histfile and runs two gorutines on it
// New creates new histfile and runs two gorutines on it
func Go ( input chan records . Record , outputPath string , sessionsToDrop chan string ) {
func New ( input chan records . Record , historyPath string , initHistSize int , sessionsToDrop chan string ) * Histfile {
hf := histfile { sessions : map [ string ] records . Record { } , outputPath : outputPath }
hf := Histfile {
sessions : map [ string ] records . Record { } ,
historyPath : historyPath ,
cmdLinesLastIndex : map [ string ] int { } ,
}
go hf . loadHistory ( initHistSize )
go hf . writer ( input )
go hf . writer ( input )
go hf . sessionGC ( sessionsToDrop )
go hf . sessionGC ( sessionsToDrop )
return & hf
}
func ( h * Histfile ) loadHistory ( initHistSize int ) {
h . recentMutex . Lock ( )
defer h . recentMutex . Unlock ( )
h . recentCmdLines = records . LoadCmdLinesFromFile ( h . historyPath , initHistSize )
}
}
// sessionGC reads sessionIDs from channel and deletes them from histfile struct
// sessionGC reads sessionIDs from channel and deletes them from histfile struct
func ( h * histfile ) sessionGC ( sessionsToDrop chan string ) {
func ( h * H istfile) sessionGC ( sessionsToDrop chan string ) {
for {
for {
func ( ) {
func ( ) {
session := <- sessionsToDrop
session := <- sessionsToDrop
log . Println ( "histfile: got session to drop" , session )
log . Println ( "histfile: got session to drop" , session )
h . m utex. Lock ( )
h . sessionsM utex. Lock ( )
defer h . m utex. Unlock ( )
defer h . sessionsM utex. Unlock ( )
if part1 , found := h . sessions [ session ] ; found == true {
if part1 , found := h . sessions [ session ] ; found == true {
log . Println ( "histfile: Dropping session:" , session )
log . Println ( "histfile: Dropping session:" , session )
delete ( h . sessions , session )
delete ( h . sessions , session )
go writeRecord ( part1 , h . output Path)
go writeRecord ( part1 , h . history Path)
} else {
} else {
log . Println ( "histfile: No hanging parts for session:" , session )
log . Println ( "histfile: No hanging parts for session:" , session )
}
}
@ -42,38 +60,52 @@ func (h *histfile) sessionGC(sessionsToDrop chan string) {
}
}
// writer reads records from channel, merges them and writes them to file
// writer reads records from channel, merges them and writes them to file
func ( h * h istfile) writer ( input chan records . Record ) {
func ( h * H istfile) writer ( input chan records . Record ) {
for {
for {
func ( ) {
func ( ) {
record := <- input
record := <- input
h . m utex. Lock ( )
h . sessionsM utex. Lock ( )
defer h . m utex. Unlock ( )
defer h . sessionsM utex. Unlock ( )
if record . PartOne {
if record . PartOne {
if _ , found := h . sessions [ record . SessionID ] ; found {
if _ , found := h . sessions [ record . SessionID ] ; found {
log . Println ( "histfile ERROR: Got another first part of the records before merging the previous one - overwriting!" )
log . Println ( "histfile WARN: Got another first part of the records before merging the previous one - overwriting! " +
"(this happens in bash because bash-preexec runs when it's not supposed to)" )
}
}
h . sessions [ record . SessionID ] = record
h . sessions [ record . SessionID ] = record
} else {
} else {
part1 , found := h . sessions [ record . SessionID ]
if part1 , found := h . sessions [ record . SessionID ] ; found == false {
if found == false {
log . Println ( "histfile ERROR: Got second part of records and nothing to merge it with - ignoring!" )
log . Println ( "histfile ERROR: Got second part of records and nothing to merge it with - ignoring!" )
} else {
} else {
delete ( h . sessions , record . SessionID )
delete ( h . sessions , record . SessionID )
go mergeAndWriteRecord ( part1 , record , h . outputPath )
go h . mergeAndWriteRecord ( part1 , record )
}
}
}
}
} ( )
} ( )
}
}
}
}
func mergeAndWriteRecord ( part1 , part2 records . Record , outputPath string ) {
func ( h * Histfile ) mergeAndWriteRecord ( part1 , part2 records . Record ) {
err := part1 . Merge ( part2 )
err := part1 . Merge ( part2 )
if err != nil {
if err != nil {
log . Println ( "Error while merging" , err )
log . Println ( "Error while merging" , err )
return
return
}
}
writeRecord ( part1 , outputPath )
func ( ) {
h . recentMutex . Lock ( )
defer h . recentMutex . Unlock ( )
h . recentRecords = append ( h . recentRecords , part1 )
cmdLine := part1 . CmdLine
idx , found := h . cmdLinesLastIndex [ cmdLine ]
if found {
h . recentCmdLines = append ( h . recentCmdLines [ : idx ] , h . recentCmdLines [ idx + 1 : ] ... )
}
h . cmdLinesLastIndex [ cmdLine ] = len ( h . recentCmdLines )
h . recentCmdLines = append ( h . recentCmdLines , cmdLine )
} ( )
writeRecord ( part1 , h . historyPath )
}
}
func writeRecord ( rec records . Record , outputPath string ) {
func writeRecord ( rec records . Record , outputPath string ) {
@ -95,3 +127,8 @@ func writeRecord(rec records.Record, outputPath string) {
return
return
}
}
}
}
// GetRecentCmdLines returns recent cmdLines
func ( h * Histfile ) GetRecentCmdLines ( limit int ) [ ] string {
return h . recentCmdLines
}