Write to temp file & move / gofmting
This commit is contained in:
parent
2463dab958
commit
1a4b008db3
224
src/archive.go
224
src/archive.go
|
@ -10,86 +10,87 @@ import (
|
|||
"regexp"
|
||||
"strconv"
|
||||
"time"
|
||||
"gopkg.in/alecthomas/kingpin.v2"
|
||||
|
||||
"github.com/remeh/sizedwaitgroup"
|
||||
"github.com/rgeoghegan/tabulate"
|
||||
"gopkg.in/alecthomas/kingpin.v2"
|
||||
)
|
||||
|
||||
|
||||
var (
|
||||
cmd_import = kingpin.Command("import", "Import raw logs into archives")
|
||||
cmdImport = kingpin.Command("import", "Import raw logs into archives")
|
||||
|
||||
cmd_import_dir = cmd_import.Flag("dir", "dir containing raw znc log files").Short('d').Required().String()
|
||||
cmd_import_output = cmd_import.Flag("output", "dir to place created archives").Short('o').Required().String()
|
||||
cmd_import_all = cmd_import.Flag("all", "Import all log files, not only channels").Bool()
|
||||
cmd_import_parallel = cmd_import.Flag("parallel", "How many importers can run in parallel").Short('p').Default("4").Int()
|
||||
cmdImportDir = cmdImport.Flag("dir", "dir containing raw znc log files").Short('d').Required().String()
|
||||
cmdImportOutput = cmdImport.Flag("output", "dir to place created archives").Short('o').Required().String()
|
||||
cmdImportAll = cmdImport.Flag("all", "Import all log files, not only channels").Bool()
|
||||
cmdImportParallel = cmdImport.Flag("parallel", "How many importers can run in parallel").Short('p').Default("4").Int()
|
||||
|
||||
cmd_inspect = kingpin.Command("inspect", "Enumerate the contents of archives")
|
||||
cmd_inspect_fpath = cmd_inspect.Flag("file", "log archive file to inspect").Short('f').Required().String()
|
||||
cmd_inspect_detail = cmd_inspect.Flag("detail", "show detailed portion information").Bool()
|
||||
cmdInspect = kingpin.Command("inspect", "Enumerate the contents of archives")
|
||||
cmdInspectFpath = cmdInspect.Flag("file", "log archive file to inspect").Short('f').Required().String()
|
||||
cmdInspectDetail = cmdInspect.Flag("detail", "show detailed portion information").Bool()
|
||||
|
||||
cmd_slice = kingpin.Command("slice", "Extract potions of archives given a date range")
|
||||
cmd_slice_src = cmd_slice.Flag("src", "Source archive file").Short('s').Required().ExistingFile()
|
||||
cmd_slice_dest = cmd_slice.Flag("dest", "Dest archive file").Short('d').Required().String()
|
||||
cmd_slice_start = cmd_slice.Flag("start", "Start timestamp such as 2016-1-1").String()
|
||||
cmd_slice_end = cmd_slice.Flag("end", "End timestamp such as 2016-12-31").String()
|
||||
cmd_slice_raw = cmd_slice.Flag("all", "Export raw lines instead of archives").Bool()
|
||||
cmdSlice = kingpin.Command("slice", "Extract potions of archives given a date range")
|
||||
cmdSliceSrc = cmdSlice.Flag("src", "Source archive file").Short('s').Required().ExistingFile()
|
||||
cmdSliceDest = cmdSlice.Flag("dest", "Dest archive file").Short('d').Required().String()
|
||||
cmdSliceStart = cmdSlice.Flag("start", "Start timestamp such as 2016-1-1").String()
|
||||
cmdSliceEnd = cmdSlice.Flag("end", "End timestamp such as 2016-12-31").String()
|
||||
cmdSliceRaw = cmdSlice.Flag("all", "Export raw lines instead of archives").Bool()
|
||||
|
||||
cmd_split = kingpin.Command("split", "Split archives by date")
|
||||
cmd_split_src = cmd_split.Flag("src", "Source archive file").Short('s').Required().ExistingFile()
|
||||
cmd_split_dest = cmd_split.Flag("dest", "Dir to dump logs into").Short('d').Required().String()
|
||||
cmdSplit = kingpin.Command("split", "Split archives by date")
|
||||
cmdSplitSrc = cmdSplit.Flag("src", "Source archive file").Short('s').Required().ExistingFile()
|
||||
cmdSplitDest = cmdSplit.Flag("dest", "Dir to dump logs into").Short('d').Required().String()
|
||||
|
||||
cmd_gap = kingpin.Command("gaps", "Find time gaps in archives")
|
||||
cmd_gap_src = cmd_gap.Flag("file", "Source archive file").Short('f').Required().ExistingFile()
|
||||
cmdGap = kingpin.Command("gaps", "Find time gaps in archives")
|
||||
cmdGapSrc = cmdGap.Flag("file", "Source archive file").Short('f').Required().ExistingFile()
|
||||
)
|
||||
|
||||
// LogInfo holds info about a log we may import
|
||||
type LogInfo struct {
|
||||
file os.FileInfo
|
||||
path string
|
||||
file os.FileInfo
|
||||
path string
|
||||
network string
|
||||
channel string
|
||||
date time.Time
|
||||
date time.Time
|
||||
}
|
||||
|
||||
// Given a source dir, scan the files and return a Slice of LogInfo structs describing the logs
|
||||
func discover_logs(srcdir string) ([]LogInfo) {
|
||||
var logs []LogInfo;
|
||||
// DiscoverLogs scans the files in the given source dir and returns a slice of LogInfo structs describing the logs
|
||||
func DiscoverLogs(srcdir string) []LogInfo {
|
||||
var logs []LogInfo
|
||||
|
||||
files, err := ioutil.ReadDir(srcdir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
for _, file := range files { // TODO parallelize log parsing?
|
||||
_log_info := parse_log_name(file.Name())
|
||||
_log_info.file = file
|
||||
_log_info.path = filepath.Join(srcdir, file.Name()) // TODO normalize srcdir
|
||||
logs = append(logs, _log_info)
|
||||
for _, file := range files { // TODO parallelize log parsing?
|
||||
tmpLogInfo := ParseLogName(file.Name())
|
||||
tmpLogInfo.file = file
|
||||
tmpLogInfo.path = filepath.Join(srcdir, file.Name()) // TODO normalize srcdir
|
||||
logs = append(logs, tmpLogInfo)
|
||||
}
|
||||
return logs
|
||||
}
|
||||
|
||||
var re_fname = regexp.MustCompile("((?P<network>[^_]+)_)?(?P<channel>.+)_(?P<date>[0-9]+)\\.log")
|
||||
var reFname = regexp.MustCompile("((?P<network>[^_]+)_)?(?P<channel>.+)_(?P<date>[0-9]+)\\.log")
|
||||
|
||||
// Given the name of a logfile, return a LogInfo object containing info parsed from the fname
|
||||
func parse_log_name(logname string) (LogInfo) {
|
||||
// ParseLogName parses the name of a logfile and returns a LogInfo object containing the info extracted from the fname
|
||||
func ParseLogName(logname string) LogInfo {
|
||||
|
||||
matches := re_fname.FindStringSubmatch(logname)
|
||||
if len(matches) != 5 { // re should match [garbage, garbage, network, channel, date]
|
||||
matches := reFname.FindStringSubmatch(logname)
|
||||
if len(matches) != 5 { // re should match [garbage, garbage, network, channel, date]
|
||||
panic(fmt.Sprintf("Wrong number of matched fields matched for %v: %+v", logname, matches))
|
||||
}
|
||||
|
||||
log_info := LogInfo{
|
||||
logInfo := LogInfo{
|
||||
network: matches[2],
|
||||
channel: matches[3],
|
||||
date: ParseDate(matches[4]),
|
||||
date: ParseDate(matches[4]),
|
||||
}
|
||||
|
||||
return log_info
|
||||
return logInfo
|
||||
}
|
||||
|
||||
// Load the contents of a logfile into a 2d byte array. Each top-level entry is a line from the log.
|
||||
func load_raw_log(fpath string) ([][]byte, int) {
|
||||
var lines [][]byte;
|
||||
// LoadRawLog loads the contents of a logfile into a 2d byte array. Each top-level entry is a line from the log.
|
||||
func LoadRawLog(fpath string) ([][]byte, int) {
|
||||
var lines [][]byte
|
||||
totalsize := 0
|
||||
|
||||
f, err := os.Open(fpath)
|
||||
|
@ -109,53 +110,52 @@ func load_raw_log(fpath string) ([][]byte, int) {
|
|||
return lines, totalsize
|
||||
}
|
||||
|
||||
// Dump all given logs into a single archive
|
||||
func archive_log(logs []LogInfo, archive_path string) {
|
||||
// ArchiveLog dumps all given logs into a single archive
|
||||
func ArchiveLog(logs []LogInfo, archivePath string) {
|
||||
archive := CombinedLogfile{
|
||||
fpath: archive_path,
|
||||
fpath: archivePath,
|
||||
}
|
||||
archive.Parse()
|
||||
// For each log
|
||||
for _, log := range logs {
|
||||
// Load the log into a LogPortion
|
||||
log_data, total_size := load_raw_log(log.path)
|
||||
logData, totalSize := LoadRawLog(log.path)
|
||||
logportion := LogPortion{
|
||||
meta: PortionMeta{
|
||||
Channel: log.channel,
|
||||
Date: log.date,
|
||||
Lines: len(log_data),
|
||||
Name: log.file.Name(),
|
||||
Date: log.date,
|
||||
Lines: len(logData),
|
||||
Name: log.file.Name(),
|
||||
Network: log.network,
|
||||
Size: total_size,
|
||||
Size: totalSize,
|
||||
},
|
||||
lines: log_data,
|
||||
lines: logData,
|
||||
}
|
||||
// Add portion to archive
|
||||
archive.AddPortion(logportion)
|
||||
}
|
||||
// Write archive
|
||||
err := archive.Write(archive_path)
|
||||
err := archive.Write(archivePath)
|
||||
if err != nil {
|
||||
fmt.Printf("Could not write %s - %v\n", archive_path, err)
|
||||
fmt.Printf("Could not write %s - %v\n", archivePath, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Entrypoint for the `import` command. Given an srcdir, scan it for log files. The log files will be sorted by channel
|
||||
// and combined into an archive file per channel, placed in `outdir`. The `impall` flag determines whether only channel
|
||||
// logs will be imported. If `true`, non-channel logs, such as PMs or server messages, will be archived too.
|
||||
func cmd_import_do(srcdir string, outdir string, impall bool, parallel int) {
|
||||
raw_logs := discover_logs(srcdir)
|
||||
func cmdImportDo(srcdir string, outdir string, impall bool, parallel int) {
|
||||
rawLogs := DiscoverLogs(srcdir)
|
||||
|
||||
// Sort logs by channel
|
||||
bychannel := make(map[string][]LogInfo)
|
||||
for _, log := range raw_logs {
|
||||
if *cmd_import_all || log.channel[0] == '#' {
|
||||
for _, log := range rawLogs {
|
||||
if *cmdImportAll || log.channel[0] == '#' {
|
||||
bychannel[log.channel] = append(bychannel[log.channel], log)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("Discovered %v raw logs\n\n", len(raw_logs))
|
||||
fmt.Printf("Discovered %v raw logs\n\n", len(rawLogs))
|
||||
|
||||
// For each channel
|
||||
wg := sizedwaitgroup.New(parallel)
|
||||
|
@ -164,21 +164,21 @@ func cmd_import_do(srcdir string, outdir string, impall bool, parallel int) {
|
|||
fmt.Printf("Reading %v portions for %s\n", len(logs), channel)
|
||||
|
||||
// Open archive file for channel
|
||||
archive_path := filepath.Join(outdir, fmt.Sprintf("%s.log", channel))
|
||||
archivePath := filepath.Join(outdir, fmt.Sprintf("%s.log", channel))
|
||||
|
||||
// Archive the channels in parallel
|
||||
wg.Add()
|
||||
go func(logs []LogInfo, archive_path string) {
|
||||
go func(logs []LogInfo, archivePath string) {
|
||||
defer wg.Done()
|
||||
archive_log(logs, archive_path)
|
||||
}(logs, archive_path)
|
||||
ArchiveLog(logs, archivePath)
|
||||
}(logs, archivePath)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Entrypoint for the `inspect` command. Load an archive file and print a summary table of its properties.
|
||||
func cmd_inspect_do(fpath string, detail bool) {
|
||||
func cmdInspectDo(fpath string, detail bool) {
|
||||
log := &CombinedLogfile{
|
||||
fpath: fpath,
|
||||
}
|
||||
|
@ -190,15 +190,15 @@ func cmd_inspect_do(fpath string, detail bool) {
|
|||
}
|
||||
|
||||
table := [][]string{
|
||||
[]string{"file", path.Base(fpath)},
|
||||
[]string{"channel", log.Channel},
|
||||
[]string{"network", log.Network},
|
||||
[]string{"portions", strconv.Itoa(len(log.portions))},
|
||||
[]string{"lines", strconv.Itoa(log.TotalLines())},
|
||||
[]string{"start", lmin.Format(ARCHTIMEFMT2)},
|
||||
[]string{"end", lmax.Format(ARCHTIMEFMT2)},
|
||||
{"file", path.Base(fpath)},
|
||||
{"channel", log.Channel},
|
||||
{"network", log.Network},
|
||||
{"portions", strconv.Itoa(len(log.portions))},
|
||||
{"lines", strconv.Itoa(log.TotalLines())},
|
||||
{"start", lmin.Format(ARCHTIMEFMT2)},
|
||||
{"end", lmax.Format(ARCHTIMEFMT2)},
|
||||
}
|
||||
layout := &tabulate.Layout{Headers:[]string{"property", "value"}, Format:tabulate.SimpleFormat}
|
||||
layout := &tabulate.Layout{Headers: []string{"property", "value"}, Format: tabulate.SimpleFormat}
|
||||
asText, _ := tabulate.Tabulate(table, layout)
|
||||
fmt.Print(asText)
|
||||
|
||||
|
@ -206,37 +206,37 @@ func cmd_inspect_do(fpath string, detail bool) {
|
|||
// Print a table showing line and byte counts for each portion
|
||||
table := [][]string{}
|
||||
|
||||
total_bytes := 0
|
||||
total_lines := 0
|
||||
totalBytes := 0
|
||||
totalLines := 0
|
||||
|
||||
for _, portion := range log.portions {
|
||||
|
||||
row_bytes := 0
|
||||
rowBytes := 0
|
||||
for _, line := range portion.lines {
|
||||
row_bytes += len(line)
|
||||
rowBytes += len(line)
|
||||
}
|
||||
total_bytes += row_bytes
|
||||
total_lines += len(portion.lines)
|
||||
totalBytes += rowBytes
|
||||
totalLines += len(portion.lines)
|
||||
|
||||
table = append(table, []string{portion.meta.Name,
|
||||
portion.meta.Network,
|
||||
portion.meta.Channel,
|
||||
portion.meta.Date.Format(ARCHTIMEFMT2),
|
||||
fmt.Sprintf("%v", len(portion.lines)),
|
||||
fmt.Sprintf("%v", row_bytes)})
|
||||
portion.meta.Network,
|
||||
portion.meta.Channel,
|
||||
portion.meta.Date.Format(ARCHTIMEFMT2),
|
||||
fmt.Sprintf("%v", len(portion.lines)),
|
||||
fmt.Sprintf("%v", rowBytes)})
|
||||
}
|
||||
|
||||
table = append(table, []string{"", "", "", "", "", ""})
|
||||
table = append(table, []string{"", "", "", "total:", fmt.Sprintf("%v", total_lines), fmt.Sprintf("%v", total_bytes)})
|
||||
table = append(table, []string{"", "", "", "total:", fmt.Sprintf("%v", totalLines), fmt.Sprintf("%v", totalBytes)})
|
||||
|
||||
layout := &tabulate.Layout{Headers:[]string{"portion file", "network", "channel", "date", "lines", "mbytes"}, Format:tabulate.SimpleFormat}
|
||||
layout := &tabulate.Layout{Headers: []string{"portion file", "network", "channel", "date", "lines", "mbytes"}, Format: tabulate.SimpleFormat}
|
||||
asText, _ := tabulate.Tabulate(table, layout)
|
||||
fmt.Print(asText)
|
||||
}
|
||||
}
|
||||
|
||||
// Extract a date range from an archive
|
||||
func cmd_slice_do(srcpath string, destpath string, starttime string, endtime string, raw bool) {
|
||||
func cmdSliceDo(srcpath string, destpath string, starttime string, endtime string, raw bool) {
|
||||
log := &CombinedLogfile{
|
||||
fpath: srcpath,
|
||||
}
|
||||
|
@ -259,26 +259,27 @@ func cmd_slice_do(srcpath string, destpath string, starttime string, endtime str
|
|||
}
|
||||
|
||||
// Split an archive back into original log files
|
||||
func cmd_split_do(srcpath string, destdir string) {
|
||||
func cmdSplitDo(srcpath string, destdir string) {
|
||||
log := &CombinedLogfile{
|
||||
fpath: srcpath,
|
||||
}
|
||||
log.Parse()
|
||||
logs_written, err := log.WriteOriginals(destdir)
|
||||
logsWritten, err := log.WriteOriginals(destdir)
|
||||
check(err)
|
||||
|
||||
fmt.Printf("Wrote %v logs\n", logs_written)
|
||||
fmt.Printf("Wrote %v logs\n", logsWritten)
|
||||
}
|
||||
|
||||
// Gap represents a time window where logs are missing
|
||||
type Gap struct {
|
||||
start time.Time
|
||||
end time.Time
|
||||
days int
|
||||
end time.Time
|
||||
days int
|
||||
}
|
||||
|
||||
// Find time windows with no logs in the archive
|
||||
func cmd_gaps_do(srcpath string) {
|
||||
var gaps []Gap;
|
||||
func cmdGapsDo(srcpath string) {
|
||||
var gaps []Gap
|
||||
|
||||
log := &CombinedLogfile{
|
||||
fpath: srcpath,
|
||||
|
@ -286,34 +287,33 @@ func cmd_gaps_do(srcpath string) {
|
|||
log.Parse()
|
||||
log.Sort()
|
||||
|
||||
var lastPortion LogPortion;
|
||||
var lastPortion LogPortion
|
||||
first := true
|
||||
|
||||
for _, portion := range log.portions {
|
||||
if first {
|
||||
first = false
|
||||
} else {
|
||||
lastShouldEqual := portion.meta.Date.AddDate(0, 0, -1) // Subtract 1 day
|
||||
lastShouldEqual := portion.meta.Date.AddDate(0, 0, -1) // Subtract 1 day
|
||||
if lastShouldEqual != lastPortion.meta.Date {
|
||||
breakstart := lastPortion.meta.Date.AddDate(0, 0, 1)
|
||||
breakend := portion.meta.Date.AddDate(0, 0, -1)
|
||||
gaps = append(gaps, Gap{start: breakstart,
|
||||
end: breakend,
|
||||
days: int(portion.meta.Date.Sub(breakstart).Hours()) / 24})
|
||||
end: breakend,
|
||||
days: int(portion.meta.Date.Sub(breakstart).Hours()) / 24})
|
||||
}
|
||||
}
|
||||
lastPortion = portion
|
||||
}
|
||||
|
||||
|
||||
table := [][]string{}
|
||||
for _, gap := range gaps {
|
||||
table = append(table, []string{gap.start.Format(ARCHTIMEFMT2),
|
||||
gap.end.Format(ARCHTIMEFMT2),
|
||||
strconv.Itoa(gap.days)})
|
||||
gap.end.Format(ARCHTIMEFMT2),
|
||||
strconv.Itoa(gap.days)})
|
||||
}
|
||||
|
||||
layout := &tabulate.Layout{Headers:[]string{"start", "end", "days"}, Format:tabulate.SimpleFormat}
|
||||
layout := &tabulate.Layout{Headers: []string{"start", "end", "days"}, Format: tabulate.SimpleFormat}
|
||||
asText, _ := tabulate.Tabulate(table, layout)
|
||||
fmt.Println("Missing log segments:\n")
|
||||
fmt.Print(asText)
|
||||
|
@ -321,15 +321,15 @@ func cmd_gaps_do(srcpath string) {
|
|||
|
||||
func main() {
|
||||
switch kingpin.Parse() {
|
||||
case "import":
|
||||
cmd_import_do(*cmd_import_dir, *cmd_import_output, *cmd_import_all, *cmd_import_parallel)
|
||||
case "inspect":
|
||||
cmd_inspect_do(*cmd_inspect_fpath, *cmd_inspect_detail)
|
||||
case "slice":
|
||||
cmd_slice_do(*cmd_slice_src, *cmd_slice_dest, *cmd_slice_start, *cmd_slice_end, *cmd_slice_raw)
|
||||
case "split":
|
||||
cmd_split_do(*cmd_split_src, *cmd_split_dest)
|
||||
case "gaps":
|
||||
cmd_gaps_do(*cmd_gap_src)
|
||||
case "import":
|
||||
cmdImportDo(*cmdImportDir, *cmdImportOutput, *cmdImportAll, *cmdImportParallel)
|
||||
case "inspect":
|
||||
cmdInspectDo(*cmdInspectFpath, *cmdInspectDetail)
|
||||
case "slice":
|
||||
cmdSliceDo(*cmdSliceSrc, *cmdSliceDest, *cmdSliceStart, *cmdSliceEnd, *cmdSliceRaw)
|
||||
case "split":
|
||||
cmdSplitDo(*cmdSplitSrc, *cmdSplitDest)
|
||||
case "gaps":
|
||||
cmdGapsDo(*cmdGapSrc)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,30 +1,33 @@
|
|||
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"fmt"
|
||||
"bufio"
|
||||
"bytes"
|
||||
"path/filepath"
|
||||
"encoding/json"
|
||||
"time"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
||||
// ARCHTIMEFMT is the compact date layout (YYYYMMDD); ParseDate tries it first and portion metadata dates are written with it
|
||||
const ARCHTIMEFMT string = "20060102"
|
||||
|
||||
// ARCHTIMEFMT2 is the dashed date layout (YYYY-MM-DD); ParseDate falls back to it and it is used when printing dates
|
||||
const ARCHTIMEFMT2 string = "2006-01-02"
|
||||
|
||||
|
||||
type errorString struct { // TODO "trivial implementation of error"
|
||||
type errorString struct { // TODO "trivial implementation of error"
|
||||
s string
|
||||
}
|
||||
|
||||
func (e *errorString) Error() string {
|
||||
return e.s
|
||||
}
|
||||
|
||||
func ParseDate(datestr string) (time.Time) {
|
||||
// ParseDate parses a log date string
|
||||
func ParseDate(datestr string) time.Time {
|
||||
thetime, err := time.Parse(ARCHTIMEFMT, datestr)
|
||||
if err != nil {
|
||||
thetime, err := time.Parse(ARCHTIMEFMT2, datestr)
|
||||
|
@ -42,58 +45,77 @@ func check(e error) {
|
|||
}
|
||||
}
|
||||
|
||||
type JsonPortionMeta struct {
|
||||
Channel string `json:"channel"`
|
||||
Date string `json:"date"`
|
||||
Lines int `json:"lines"`
|
||||
Name string `json:"name"`
|
||||
Network string `json:"network"`
|
||||
Size int `json:"size"`
|
||||
// JSONPortionMeta represents json encoded metadata about one log portion
|
||||
type JSONPortionMeta struct {
|
||||
Channel string `json:"channel"`
|
||||
Date string `json:"date"`
|
||||
Lines int `json:"lines"`
|
||||
Name string `json:"name"`
|
||||
Network string `json:"network"`
|
||||
Size int `json:"size"`
|
||||
}
|
||||
|
||||
// PortionMeta holds metadata about one log portion
|
||||
type PortionMeta struct {
|
||||
Channel string
|
||||
Date time.Time
|
||||
Lines int
|
||||
Name string
|
||||
Network string
|
||||
Size int
|
||||
Size int
|
||||
}
|
||||
|
||||
// LogPortion holds meta + line data for one log portion
|
||||
type LogPortion struct {
|
||||
meta PortionMeta
|
||||
meta PortionMeta
|
||||
lines [][]byte
|
||||
}
|
||||
|
||||
// CombinedLogfile holds multiple portions and some more metadata
|
||||
type CombinedLogfile struct {
|
||||
fpath string
|
||||
fpath string
|
||||
portions []LogPortion
|
||||
Channel string
|
||||
Network string
|
||||
Channel string
|
||||
Network string
|
||||
}
|
||||
|
||||
func (self *CombinedLogfile) Write(destpath string) (error) {
|
||||
if len(self.portions) == 0 {
|
||||
// Write the archive's contents to a tempfile then move it to the passed destpath. The tempfile will be in the same dir
|
||||
// as the file named by destpath. If destpath is blank, this log's current path is used. If there are no portions in the
|
||||
// log an error is raised (there's nothing to write)
|
||||
func (cl *CombinedLogfile) Write(destpath string) error {
|
||||
if len(cl.portions) == 0 {
|
||||
return &errorString{"no portions"}
|
||||
}
|
||||
if destpath == "" {
|
||||
destpath = self.fpath
|
||||
destpath = cl.fpath
|
||||
}
|
||||
fmt.Printf("Writing %v portions for %s\n", len(self.portions), self.Channel)
|
||||
self.Sort()
|
||||
|
||||
f, err := os.OpenFile(destpath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
var writeDir = filepath.Dir(destpath)
|
||||
tmpfile, err := ioutil.TempFile(writeDir, ".ilogtmp-")
|
||||
check(err)
|
||||
defer f.Close()
|
||||
w := bufio.NewWriter(f)
|
||||
defer tmpfile.Close()
|
||||
defer os.Remove(tmpfile.Name())
|
||||
|
||||
fmt.Printf("Writing %v portions for %s\n", len(cl.portions), cl.Channel)
|
||||
cl.Sort()
|
||||
|
||||
w := bufio.NewWriter(tmpfile)
|
||||
cl.WriteReal(*w)
|
||||
|
||||
os.Rename(tmpfile.Name(), destpath)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteReal performs the actual serialization writing to the given writer
|
||||
func (cl *CombinedLogfile) WriteReal(w bufio.Writer) error {
|
||||
// Write magic header
|
||||
w.WriteString(fmt.Sprintf("#$$$COMBINEDLOG '%s'\n", self.Channel))
|
||||
w.WriteString(fmt.Sprintf("#$$$COMBINEDLOG '%s'\n", cl.Channel))
|
||||
|
||||
// Write every portion
|
||||
for _, portion := range self.portions {
|
||||
w.WriteString(fmt.Sprintf("#$$$BEGINPORTION %s\n", self.ConvertMetaToJson(portion.meta)))
|
||||
for _, line := range portion.lines {
|
||||
for _, portion := range cl.portions {
|
||||
w.WriteString(fmt.Sprintf("#$$$BEGINPORTION %s\n", cl.ConvertMetaToJSON(portion.meta)))
|
||||
for _, line := range portion.lines {
|
||||
for _, b := range line {
|
||||
w.WriteByte(b)
|
||||
}
|
||||
|
@ -105,9 +127,10 @@ func (self *CombinedLogfile) Write(destpath string) (error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (self *CombinedLogfile) WriteOriginals(destdir string) (int, error) {
|
||||
// WriteOriginals recreates the original logfiles from this archive
|
||||
func (cl *CombinedLogfile) WriteOriginals(destdir string) (int, error) {
|
||||
written := 0
|
||||
for _, portion := range self.portions {
|
||||
for _, portion := range cl.portions {
|
||||
f, err := os.OpenFile(filepath.Join(destdir, portion.meta.Name), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
check(err)
|
||||
w := bufio.NewWriter(f)
|
||||
|
@ -119,100 +142,103 @@ func (self *CombinedLogfile) WriteOriginals(destdir string) (int, error) {
|
|||
}
|
||||
check(w.Flush())
|
||||
f.Close()
|
||||
written += 1
|
||||
written++
|
||||
}
|
||||
return written, nil
|
||||
}
|
||||
|
||||
func (self *CombinedLogfile) ConvertMetaToJson(meta PortionMeta) string {
|
||||
jmeta := JsonPortionMeta{
|
||||
// ConvertMetaToJSON marshals the metadata struct to JSON. TODO research reflection
|
||||
func (cl *CombinedLogfile) ConvertMetaToJSON(meta PortionMeta) string {
|
||||
jmeta := JSONPortionMeta{
|
||||
Channel: meta.Channel,
|
||||
Date: meta.Date.Format(ARCHTIMEFMT),
|
||||
Lines: meta.Lines,
|
||||
Name: meta.Name,
|
||||
Date: meta.Date.Format(ARCHTIMEFMT),
|
||||
Lines: meta.Lines,
|
||||
Name: meta.Name,
|
||||
Network: meta.Network,
|
||||
Size: meta.Size,
|
||||
Size: meta.Size,
|
||||
}
|
||||
|
||||
jmeta_enc, err := json.Marshal(jmeta)
|
||||
jmetaEnc, err := json.Marshal(jmeta)
|
||||
check(err)
|
||||
|
||||
return string(jmeta_enc)
|
||||
return string(jmetaEnc)
|
||||
}
|
||||
|
||||
func (self *CombinedLogfile) Sort() {
|
||||
sort.Slice(self.portions,
|
||||
func(i, j int) bool { return self.portions[i].meta.Date.Before(self.portions[j].meta.Date) })
|
||||
// Sort the portions
|
||||
func (cl *CombinedLogfile) Sort() {
|
||||
sort.Slice(cl.portions,
|
||||
func(i, j int) bool { return cl.portions[i].meta.Date.Before(cl.portions[j].meta.Date) })
|
||||
}
|
||||
|
||||
func (self *CombinedLogfile) Parse() {
|
||||
// Parse the log file and populate this struct
|
||||
func (cl *CombinedLogfile) Parse() {
|
||||
HEADER := []byte("#$$$COMBINEDLOG")
|
||||
PORTIONHEADER := []byte("#$$$BEGINPORTION")
|
||||
ENDPORTIONHEADER := []byte("#$$$ENDPORTION")
|
||||
|
||||
if _, err := os.Stat(self.fpath); os.IsNotExist(err) {
|
||||
if _, err := os.Stat(cl.fpath); os.IsNotExist(err) {
|
||||
return
|
||||
}
|
||||
|
||||
f, err := os.Open(self.fpath)
|
||||
f, err := os.Open(cl.fpath)
|
||||
check(err)
|
||||
defer f.Close()
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
scanner.Scan()
|
||||
var first_line []byte = scanner.Bytes()
|
||||
if !bytes.HasPrefix(first_line, HEADER) {
|
||||
var firstLine = scanner.Bytes()
|
||||
if !bytes.HasPrefix(firstLine, HEADER) {
|
||||
panic("Missing magic header")
|
||||
}
|
||||
|
||||
lines := 1
|
||||
meta := PortionMeta{}
|
||||
var sectiondata [][]byte
|
||||
var in_portion bool = false
|
||||
var inPortion = false
|
||||
|
||||
for scanner.Scan() {
|
||||
lines++
|
||||
var lineb []byte = scanner.Bytes()
|
||||
var lineb = scanner.Bytes()
|
||||
if bytes.HasPrefix(lineb, PORTIONHEADER) {
|
||||
if in_portion {
|
||||
if inPortion {
|
||||
panic("Found portion start while in portion")
|
||||
}
|
||||
in_portion = true
|
||||
inPortion = true
|
||||
sectiondata = [][]byte{}
|
||||
line := string(lineb)
|
||||
var meta_blob string = line[len(PORTIONHEADER) + 1:]
|
||||
parsedmeta := JsonPortionMeta{}
|
||||
err = json.Unmarshal([]byte(meta_blob), &parsedmeta)
|
||||
var metaBlob = line[len(PORTIONHEADER)+1:]
|
||||
parsedmeta := JSONPortionMeta{}
|
||||
err = json.Unmarshal([]byte(metaBlob), &parsedmeta)
|
||||
if err != nil {
|
||||
panic(err) // Could not parse portion metadata json
|
||||
panic(err) // Could not parse portion metadata json
|
||||
}
|
||||
// Find channel
|
||||
if self.Channel == "" && parsedmeta.Channel != "" {
|
||||
self.Channel = parsedmeta.Channel
|
||||
if cl.Channel == "" && parsedmeta.Channel != "" {
|
||||
cl.Channel = parsedmeta.Channel
|
||||
}
|
||||
if self.Channel != "" && parsedmeta.Channel != "" && parsedmeta.Channel != self.Channel {
|
||||
if cl.Channel != "" && parsedmeta.Channel != "" && parsedmeta.Channel != cl.Channel {
|
||||
panic(fmt.Sprintf("Originally parsed channel %s but now found %s at line %v",
|
||||
self.Channel, parsedmeta.Channel, lines))
|
||||
cl.Channel, parsedmeta.Channel, lines))
|
||||
}
|
||||
// Find network
|
||||
if self.Network == "" && parsedmeta.Network != "" {
|
||||
self.Network = parsedmeta.Network
|
||||
if cl.Network == "" && parsedmeta.Network != "" {
|
||||
cl.Network = parsedmeta.Network
|
||||
}
|
||||
if self.Network != "" && parsedmeta.Network != "" && parsedmeta.Network != self.Network {
|
||||
if cl.Network != "" && parsedmeta.Network != "" && parsedmeta.Network != cl.Network {
|
||||
panic(fmt.Sprintf("Originally parsed network %s but now found %s at line %v",
|
||||
self.Network, parsedmeta.Network, lines))
|
||||
cl.Network, parsedmeta.Network, lines))
|
||||
}
|
||||
meta = PortionMeta{
|
||||
Channel: parsedmeta.Channel,
|
||||
Date: ParseDate(parsedmeta.Date),
|
||||
Lines: parsedmeta.Lines,
|
||||
Name: parsedmeta.Name,
|
||||
Date: ParseDate(parsedmeta.Date),
|
||||
Lines: parsedmeta.Lines,
|
||||
Name: parsedmeta.Name,
|
||||
Network: parsedmeta.Network,
|
||||
Size: parsedmeta.Size,
|
||||
Size: parsedmeta.Size,
|
||||
}
|
||||
continue
|
||||
} else if bytes.HasPrefix(lineb, ENDPORTIONHEADER) {
|
||||
if !in_portion {
|
||||
if !inPortion {
|
||||
fmt.Println(string(lineb))
|
||||
panic(fmt.Sprintf("Found portion end while not in portion at line %v", lines))
|
||||
}
|
||||
|
@ -220,12 +246,12 @@ func (self *CombinedLogfile) Parse() {
|
|||
// lol why does this trigger
|
||||
// panic(fmt.Sprintf("Meta indicated %v lines, but parsed %v", meta.Lines, len(sectiondata)))
|
||||
}
|
||||
in_portion = false
|
||||
inPortion = false
|
||||
logportion := LogPortion{
|
||||
meta: meta,
|
||||
meta: meta,
|
||||
lines: sectiondata,
|
||||
}
|
||||
self.AddPortion(logportion)
|
||||
cl.AddPortion(logportion)
|
||||
} else {
|
||||
// Just data
|
||||
b := make([]byte, len(lineb))
|
||||
|
@ -233,64 +259,68 @@ func (self *CombinedLogfile) Parse() {
|
|||
sectiondata = append(sectiondata, b)
|
||||
}
|
||||
}
|
||||
if in_portion {
|
||||
if inPortion {
|
||||
panic("EOF while still in portion?")
|
||||
}
|
||||
}
|
||||
|
||||
func (self *CombinedLogfile) TotalLines() int {
|
||||
// TotalLines returns the total number of log content lines
|
||||
func (cl *CombinedLogfile) TotalLines() int {
|
||||
total := 0
|
||||
for _, portion := range self.portions {
|
||||
for _, portion := range cl.portions {
|
||||
total += len(portion.lines)
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
func (self *CombinedLogfile) AddPortion(newportion LogPortion) {
|
||||
// CHECK self and new channels/networks match
|
||||
if self.Channel == "" {
|
||||
self.Channel = newportion.meta.Channel // TODO set attr on all children
|
||||
} else if newportion.meta.Channel != "" && self.Channel != newportion.meta.Channel {
|
||||
// AddPortion validates and adds a portion to the log
|
||||
func (cl *CombinedLogfile) AddPortion(newportion LogPortion) {
|
||||
// CHECK cl and new channels/networks match
|
||||
if cl.Channel == "" {
|
||||
cl.Channel = newportion.meta.Channel // TODO set attr on all children
|
||||
} else if newportion.meta.Channel != "" && cl.Channel != newportion.meta.Channel {
|
||||
panic(fmt.Sprintf("Attempted to add portion with channel '%s' to archive with channel '%s'. Log: %s",
|
||||
newportion.meta.Channel, self.Channel, newportion.meta.Name))
|
||||
newportion.meta.Channel, cl.Channel, newportion.meta.Name))
|
||||
}
|
||||
if self.Network == "" {
|
||||
self.Network = newportion.meta.Network // TODO set attr on all children
|
||||
} else if newportion.meta.Network != "" && self.Network != newportion.meta.Network {
|
||||
if cl.Network == "" {
|
||||
cl.Network = newportion.meta.Network // TODO set attr on all children
|
||||
} else if newportion.meta.Network != "" && cl.Network != newportion.meta.Network {
|
||||
panic(fmt.Sprintf("Attempted to add portion with network '%s' to archive with network '%s'. Log: %s",
|
||||
newportion.meta.Network, self.Network, newportion.meta.Name))
|
||||
newportion.meta.Network, cl.Network, newportion.meta.Name))
|
||||
}
|
||||
// Remove any portions with identical date
|
||||
for i, portion := range self.portions {
|
||||
for i, portion := range cl.portions {
|
||||
if portion.meta.Date == newportion.meta.Date {
|
||||
self.portions[i] = self.portions[len(self.portions)-1]
|
||||
self.portions = self.portions[:len(self.portions)-1]
|
||||
cl.portions[i] = cl.portions[len(cl.portions)-1]
|
||||
cl.portions = cl.portions[:len(cl.portions)-1]
|
||||
}
|
||||
}
|
||||
self.portions = append(self.portions, newportion)
|
||||
cl.portions = append(cl.portions, newportion)
|
||||
}
|
||||
|
||||
func (self *CombinedLogfile) GetRange() (time.Time, time.Time, error) {
|
||||
if len(self.portions) == 0 {
|
||||
panic("no portions") // todo
|
||||
// GetRange returns the dates of the first and last portions
|
||||
func (cl *CombinedLogfile) GetRange() (time.Time, time.Time, error) {
|
||||
if len(cl.portions) == 0 {
|
||||
panic("no portions") // todo
|
||||
}
|
||||
self.Sort()
|
||||
return self.portions[0].meta.Date, self.portions[len(self.portions)-1].meta.Date, nil
|
||||
cl.Sort()
|
||||
return cl.portions[0].meta.Date, cl.portions[len(cl.portions)-1].meta.Date, nil
|
||||
}
|
||||
|
||||
// Exclude portions based on before/after some date
|
||||
func (self *CombinedLogfile) Limit(when time.Time, before bool) {
|
||||
b := self.portions[:0] // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
||||
for _, x := range self.portions {
|
||||
// Limit excludes portions based on before/after some date
|
||||
func (cl *CombinedLogfile) Limit(when time.Time, before bool) {
|
||||
b := cl.portions[:0] // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
||||
for _, x := range cl.portions {
|
||||
if before && (when.Before(x.meta.Date) || when == x.meta.Date) {
|
||||
b = append(b, x)
|
||||
} else if !before && (!when.Before(x.meta.Date) || when == x.meta.Date) {
|
||||
b = append(b, x)
|
||||
}
|
||||
}
|
||||
self.portions = b
|
||||
cl.portions = b
|
||||
}
|
||||
|
||||
func (self *CombinedLogfile) GetSpans() {
|
||||
// GetSpans does nothing yet
|
||||
func (cl *CombinedLogfile) GetSpans() {
|
||||
// TODO return slice of (start, end) time ranges present in the archive
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue