go: implement slicing archives
This commit is contained in:
parent
dad38143b0
commit
f5c4a64ea1
|
@ -1,17 +1,17 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
"path"
|
||||
"bufio"
|
||||
"strconv"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"gopkg.in/alecthomas/kingpin.v2" // argparser
|
||||
"github.com/remeh/sizedwaitgroup" // like ThreadPoolExecutor
|
||||
"strconv"
|
||||
"time"
|
||||
"gopkg.in/alecthomas/kingpin.v2"
|
||||
"github.com/remeh/sizedwaitgroup"
|
||||
"github.com/rgeoghegan/tabulate"
|
||||
)
|
||||
|
||||
|
@ -26,7 +26,13 @@ var (
|
|||
cmd_inspect = kingpin.Command("inspect", "Enumerate the contents of archives")
|
||||
cmd_inspect_fpath = cmd_inspect.Flag("file", "log archive file to inspect").Short('f').Required().String()
|
||||
|
||||
cmd_slice = kingpin.Command("slice", "Extract potions of archives")
|
||||
cmd_slice = kingpin.Command("slice", "Extract potions of archives given a date range")
|
||||
cmd_slice_src = cmd_slice.Flag("src", "Source archive file").Short('s').Required().ExistingFile()
|
||||
cmd_slice_dest = cmd_slice.Flag("dest", "Dest archive file").Short('d').Required().String()
|
||||
cmd_slice_start = cmd_slice.Flag("start", "Start timestamp such as 2016-1-1").String()
|
||||
cmd_slice_end = cmd_slice.Flag("end", "End timestamp such as 2016-12-31").String()
|
||||
cmd_slice_raw = cmd_slice.Flag("all", "Export raw lines instead of archives").Bool()
|
||||
|
||||
cmd_split = kingpin.Command("split", "Split archives by date")
|
||||
)
|
||||
|
||||
|
@ -38,6 +44,7 @@ type LogInfo struct {
|
|||
date time.Time
|
||||
}
|
||||
|
||||
// Given a source dir, scan the files and return a Slice of LogInfo structs describing the logs
|
||||
func discover_logs(srcdir string) ([]LogInfo) {
|
||||
var logs []LogInfo;
|
||||
|
||||
|
@ -56,6 +63,7 @@ func discover_logs(srcdir string) ([]LogInfo) {
|
|||
|
||||
var re_fname = regexp.MustCompile("((?P<network>[^_]+)_)?(?P<channel>.+)_(?P<date>[0-9]+)\\.log")
|
||||
|
||||
// Given the name of a logfile, return a LogInfo object containing info parsed from the fname
|
||||
func parse_log_name(logname string) (LogInfo) {
|
||||
|
||||
matches := re_fname.FindStringSubmatch(logname)
|
||||
|
@ -72,6 +80,7 @@ func parse_log_name(logname string) (LogInfo) {
|
|||
return log_info
|
||||
}
|
||||
|
||||
// Load the contents of a logfile into a 2d byte array. Each top-level entry is a line from the log.
|
||||
func load_raw_log(fpath string) ([][]byte, int) {
|
||||
var lines [][]byte;
|
||||
totalsize := 0
|
||||
|
@ -93,6 +102,7 @@ func load_raw_log(fpath string) ([][]byte, int) {
|
|||
return lines, totalsize
|
||||
}
|
||||
|
||||
// Dump all given logs into a single archive
|
||||
func archive_log(logs []LogInfo, archive_path string) {
|
||||
archive := CombinedLogfile{
|
||||
fpath: archive_path,
|
||||
|
@ -100,7 +110,6 @@ func archive_log(logs []LogInfo, archive_path string) {
|
|||
// For each log
|
||||
for _, log := range logs {
|
||||
// Load the log into a LogPortion
|
||||
|
||||
log_data, total_size := load_raw_log(log.path)
|
||||
logportion := LogPortion{
|
||||
meta: PortionMeta{
|
||||
|
@ -123,17 +132,16 @@ func archive_log(logs []LogInfo, archive_path string) {
|
|||
}
|
||||
}
|
||||
|
||||
func cmd_import_do(srcdir string, outdir string, impall bool) {
|
||||
fmt.Printf("import %s %s %v\n", srcdir, outdir, impall)
|
||||
|
||||
// Entrypoint for the `import` command. Given an srcdir, scan it for log files. The log files will be sorted by channel
|
||||
// and combined into an archive file per channel, placed in `outdir`. The `impall` flag determines whether only channel
|
||||
// logs will be imported. If `true`, non-channel logs, such as PMs or server messages, will be archived too.
|
||||
func cmd_import_do(srcdir string, outdir string, impall bool) {
|
||||
raw_logs := discover_logs(srcdir)
|
||||
|
||||
// Sort logs by channel
|
||||
bychannel := make(map[string][]LogInfo)
|
||||
|
||||
for _, log := range raw_logs {
|
||||
// fmt.Printf("Log %s is network %s channel %s date %s\n",
|
||||
// log.file.Name(), log.network, log.channel, log.date)
|
||||
if *cmd_import_all || log.channel[0] == '#' {
|
||||
bychannel[log.channel] = append(bychannel[log.channel], log)
|
||||
}
|
||||
|
@ -150,23 +158,22 @@ func cmd_import_do(srcdir string, outdir string, impall bool) {
|
|||
// Open archive file for channel
|
||||
archive_path := filepath.Join(outdir, fmt.Sprintf("%s.log", channel))
|
||||
|
||||
// Archive the channel
|
||||
// Archive the channels in parallel
|
||||
wg.Add()
|
||||
go func(logs []LogInfo, archive_path string) {
|
||||
defer wg.Done()
|
||||
archive_log(logs, archive_path)
|
||||
|
||||
}(logs, archive_path)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Entrypint for the `inspect` command. Load an archive file and
|
||||
func cmd_inspect_do(fpath string) {
|
||||
log := &CombinedLogfile{
|
||||
fpath: fpath,
|
||||
}
|
||||
|
||||
log.Parse()
|
||||
|
||||
lmin, lmax, err := log.GetRange()
|
||||
|
@ -188,6 +195,29 @@ func cmd_inspect_do(fpath string) {
|
|||
fmt.Print(asText)
|
||||
}
|
||||
|
||||
// Extract a date range from an archive
|
||||
func cmd_slice_do(srcpath string, destpath string, starttime string, endtime string, raw bool) {
|
||||
log := &CombinedLogfile{
|
||||
fpath: srcpath,
|
||||
}
|
||||
log.Parse()
|
||||
|
||||
if starttime != "" {
|
||||
tstart := ParseDate(starttime)
|
||||
log.Limit(tstart, true)
|
||||
}
|
||||
|
||||
if endtime != "" {
|
||||
tend := ParseDate(endtime)
|
||||
log.Limit(tend, false)
|
||||
}
|
||||
|
||||
err := log.Write(destpath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
switch kingpin.Parse() {
|
||||
case "import":
|
||||
|
@ -195,6 +225,8 @@ func main() {
|
|||
case "inspect":
|
||||
cmd_inspect_do(*cmd_inspect_fpath)
|
||||
case "slice":
|
||||
cmd_slice_do(*cmd_slice_src, *cmd_slice_dest, *cmd_slice_start, *cmd_slice_end, *cmd_slice_raw)
|
||||
case "split":
|
||||
// TODO
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,11 @@ import (
|
|||
"sort"
|
||||
)
|
||||
|
||||
|
||||
const ARCHTIMEFMT string = "20060102"
|
||||
const ARCHTIMEFMT2 string = "2006-01-02"
|
||||
|
||||
|
||||
type errorString struct { // TODO "trivial implementation of error"
|
||||
s string
|
||||
}
|
||||
|
@ -19,9 +24,13 @@ func (e *errorString) Error() string {
|
|||
}
|
||||
|
||||
func ParseDate(datestr string) (time.Time) {
|
||||
thetime, err := time.Parse("20060102", datestr)
|
||||
thetime, err := time.Parse(ARCHTIMEFMT, datestr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
thetime, err := time.Parse(ARCHTIMEFMT2, datestr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return thetime
|
||||
}
|
||||
return thetime
|
||||
}
|
||||
|
@ -98,7 +107,7 @@ func (self *CombinedLogfile) Write(destpath string) (error) {
|
|||
func (self *CombinedLogfile) ConvertMetaToJson(meta PortionMeta) string {
|
||||
jmeta := JsonPortionMeta{
|
||||
Channel: meta.Channel,
|
||||
Date: meta.Date.Format("20060102"),
|
||||
Date: meta.Date.Format(ARCHTIMEFMT),
|
||||
Lines: meta.Lines,
|
||||
Name: meta.Name,
|
||||
Network: meta.Network,
|
||||
|
@ -245,10 +254,19 @@ func (self *CombinedLogfile) GetRange() (time.Time, time.Time, error) {
|
|||
return self.portions[0].meta.Date, self.portions[len(self.portions)-1].meta.Date, nil
|
||||
}
|
||||
|
||||
// Exclude portions based on before/after some date
|
||||
func (self *CombinedLogfile) Limit(when time.Time, before bool) {
|
||||
b := self.portions[:0] // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
||||
for _, x := range self.portions {
|
||||
if before && (when.Before(x.meta.Date) || when == x.meta.Date) {
|
||||
b = append(b, x)
|
||||
} else if !before && (!when.Before(x.meta.Date) || when == x.meta.Date) {
|
||||
b = append(b, x)
|
||||
}
|
||||
}
|
||||
self.portions = b
|
||||
}
|
||||
|
||||
func (self *CombinedLogfile) GetSpans() {
|
||||
// TODO return slice of (start, end) time ranges present in the archive
|
||||
}
|
||||
|
||||
func (self *CombinedLogfile) Limit(start time.Time, end time.Time) {
|
||||
// TODO drop all portions older or younger than
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue