Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: influx_tools export parquet #25253

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/influx_tools/help/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Usage: influx-tools command [arguments]
The commands are:

export reshapes existing shards to a new shard duration
export-parquet export existing shards to Parquet files
compact-shard fully compacts the specified shard
gen-init creates database and retention policy metadata
gen-exec generates data
Expand Down
6 changes: 6 additions & 0 deletions cmd/influx_tools/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
geninit "github.com/influxdata/influxdb/cmd/influx_tools/generate/init"
"github.com/influxdata/influxdb/cmd/influx_tools/help"
"github.com/influxdata/influxdb/cmd/influx_tools/importer"
"github.com/influxdata/influxdb/cmd/influx_tools/parquet"
"github.com/influxdata/influxdb/cmd/influx_tools/server"
"github.com/influxdata/influxdb/cmd/influxd/run"
"github.com/influxdata/influxdb/services/meta"
Expand Down Expand Up @@ -67,6 +68,11 @@ func (m *Main) Run(args ...string) error {
if err := c.Run(args); err != nil {
return fmt.Errorf("export failed: %s", err)
}
case "export-parquet":
c := parquet.NewCommand(&ossServer{logger: zap.NewNop()})
if err := c.Run(args); err != nil {
return fmt.Errorf("export-parquet failed: %s", err)
}
case "import":
c := importer.NewCommand(&ossServer{logger: zap.NewNop()})
if err := c.Run(args); err != nil {
Expand Down
48 changes: 48 additions & 0 deletions cmd/influx_tools/parquet/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package parquet

import (
"time"

"github.com/influxdata/influxdb/services/meta"
)

// makeShardGroupsForDuration builds a run of back-to-back shard groups, each
// d long, starting at min truncated to a multiple of d and ending at the first
// multiple of d after max's truncated value. Group IDs are assigned
// sequentially from 0 and all times are in UTC.
//
// d is used as a divisor when sizing the result, so a zero d panics.
func makeShardGroupsForDuration(min, max time.Time, d time.Duration) meta.ShardGroupInfos {
	first := min.Truncate(d).UTC()
	limit := max.Truncate(d).Add(d).UTC()

	// Size the slice up front from the number of d-wide windows in the span.
	out := make(meta.ShardGroupInfos, limit.Sub(first)/d)

	var n uint64
	for cur := first; cur.Before(limit); cur = cur.Add(d) {
		out[n] = meta.ShardGroupInfo{
			ID:        n,
			StartTime: cur,
			EndTime:   cur.Add(d),
		}
		n++
	}
	return out[:n]
}

// planShardGroups creates a new shard group set using a shard group duration
// of d for the time spanning min to max, keeping only those groups for which
// hasShardsGroupForTimeRange finds a matching entry in sourceShards.
func planShardGroups(sourceShards []meta.ShardGroupInfo, min, max time.Time, d time.Duration) meta.ShardGroupInfos {
	var planned []meta.ShardGroupInfo
	for _, candidate := range makeShardGroupsForDuration(min, max, d) {
		// NOTE: EndTime.Add(-1) matches the Contains interval of [start, end):
		// stepping back one nanosecond yields the last instant inside the group.
		lastInstant := candidate.EndTime.Add(-1)
		if !hasShardsGroupForTimeRange(sourceShards, candidate.StartTime, lastInstant) {
			continue
		}
		planned = append(planned, candidate)
	}
	return planned
}

// hasShardsGroupForTimeRange reports whether any entry in groups overlaps the
// time range described by min and max, as decided by ShardGroupInfo.Overlaps.
func hasShardsGroupForTimeRange(groups []meta.ShardGroupInfo, min, max time.Time) bool {
	for i := range groups {
		if groups[i].Overlaps(min, max) {
			return true
		}
	}
	return false
}
184 changes: 184 additions & 0 deletions cmd/influx_tools/parquet/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package parquet

import (
"errors"
"flag"
"fmt"
"io"
"math"
"os"
"strconv"
"strings"
"time"

"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/binary"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/line"
export2 "github.com/influxdata/influxdb/cmd/influx_tools/parquet/exporter"
"github.com/influxdata/influxdb/cmd/influx_tools/server"
"go.uber.org/zap"
)

// Blank declarations that reference the line and binary format packages.
// Nothing else in this file uses those packages, so these declarations are
// what keep the two imports from being reported as unused.
var (
_ line.Writer
_ binary.Writer
)

// Command represents the program execution for "influx-tools export-parquet".
// It holds the output streams, the server handle and the values parsed from
// the command-line flags.
type Command struct {
// Standard error/output streams, overridden for testing.
// NewCommand initializes them to os.Stderr and os.Stdout.
Stderr io.Writer
Stdout io.Writer
// Logger is not assigned by NewCommand.
Logger *zap.Logger
// server is opened with configPath and closed again by Run.
server server.Interface

// NOTE(review): conflicts is not referenced anywhere else in this file —
// confirm whether it is still needed.
conflicts io.WriteCloser

// The fields below are populated by parseFlags.

// configPath is the value of -config; Run passes it to server.Open.
configPath string
// database is the value of -database; it is required.
database string
// rp is the value of -rp (retention policy name).
rp string
// measurement is the value of -measurement.
measurement string
// shardDuration is the value of -duration (target shard duration).
shardDuration time.Duration
// format is the value of -format; parseFlags validates it.
format string
// r is the value of -range.
r rangeValue
// conflictPath is the value of -conflict-path; it is required unless
// ignore is set.
conflictPath string
// ignore is the value of -no-conflict-path.
ignore bool
// print is the value of -print-only; when set, Run stops after printing
// the plan.
print bool
}

// NewCommand builds a Command bound to the given server, with its output
// streams wired to the process's standard output and standard error.
func NewCommand(server server.Interface) *Command {
	cmd := new(Command)
	cmd.server = server
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	return cmd
}

// Run executes the export command using the specified args: it parses the
// flags, opens the server with the configured path, opens the exporter and
// prints its plan to Stderr. When print-only was requested it stops there;
// otherwise it installs the parquet exporter and writes the data out.
// The server and the exporter are closed when Run returns.
func (cmd *Command) Run(args []string) (err error) {
	if err = cmd.parseFlags(args); err != nil {
		return err
	}

	if err = cmd.server.Open(cmd.configPath); err != nil {
		return err
	}
	defer cmd.server.Close()

	exp, err := cmd.openExporter()
	if err != nil {
		return err
	}
	defer exp.Close()

	exp.PrintPlan(cmd.Stderr)

	if !cmd.print {
		// NOTE(review): the meaning of 100000000 is defined by export2.New — confirm.
		exp.exporter = export2.New(100000000)
		return exp.WriteTo()
	}
	return nil
}

// openExporter creates an exporter from the parsed command-line settings and
// opens it. If construction fails the returned exporter is nil; if opening
// fails the exporter is still returned alongside the error.
func (cmd *Command) openExporter() (*exporter, error) {
	settings := &exporterConfig{
		Database:      cmd.database,
		RP:            cmd.rp,
		Measurement:   cmd.measurement,
		ShardDuration: cmd.shardDuration,
		Min:           cmd.r.Min(),
		Max:           cmd.r.Max(),
	}

	exp, err := newExporter(cmd.server, settings)
	if err != nil {
		return nil, err
	}

	openErr := exp.Open()
	return exp, openErr
}

// parseFlags parses the export-parquet command-line arguments into cmd and
// validates them.
//
// It returns an error when flag parsing fails, when -database is empty, when
// -format is not one of the accepted values, or when neither -conflict-path
// nor -no-conflict-path was supplied.
func (cmd *Command) parseFlags(args []string) error {
	// Name the flag set after the actual subcommand so that usage and parse
	// error output refers to "export-parquet" rather than "export".
	fs := flag.NewFlagSet("export-parquet", flag.ContinueOnError)
	fs.StringVar(&cmd.configPath, "config", "", "Config file")
	fs.StringVar(&cmd.database, "database", "", "Database name")
	fs.StringVar(&cmd.rp, "rp", "", "Retention policy name")
	fs.StringVar(&cmd.measurement, "measurement", "", "Measurement name")
	fs.StringVar(&cmd.format, "format", "line", "Output format (line, binary)")
	fs.StringVar(&cmd.conflictPath, "conflict-path", "", "File name for writing field conflicts using line protocol and gzipped")
	fs.BoolVar(&cmd.ignore, "no-conflict-path", false, "Disable writing field conflicts to a file")
	fs.Var(&cmd.r, "range", "Range of target shards to export (default: all)")
	fs.BoolVar(&cmd.print, "print-only", false, "Print plan to stderr and exit")
	fs.DurationVar(&cmd.shardDuration, "duration", time.Hour*24*7, "Target shard duration")

	if err := fs.Parse(args); err != nil {
		return err
	}

	if cmd.database == "" {
		return errors.New("database is required")
	}

	// Only the listed format names are accepted; anything else is rejected.
	switch cmd.format {
	case "line", "binary", "series", "values", "discard":
	default:
		return fmt.Errorf("invalid format '%s'", cmd.format)
	}

	// A conflict file must be named unless the user explicitly opted out.
	if cmd.conflictPath == "" && !cmd.ignore {
		return errors.New("missing conflict-path")
	}

	return nil
}

// rangeValue holds an inclusive [min, max] range of unsigned integers parsed
// from a string by Set. Until Set has succeeded at least once, the range is
// treated as unbounded: Min reports 0 and Max reports math.MaxUint64.
type rangeValue struct {
	min, max uint64
	set      bool // true once Set has stored a valid range
}

// Min returns the lower bound of the range.
func (rv *rangeValue) Min() uint64 { return rv.min }

// Max returns the upper bound of the range, or math.MaxUint64 when no range
// has been set.
func (rv *rangeValue) Max() uint64 {
	if !rv.set {
		return math.MaxUint64
	}
	return rv.max
}

// String formats the range as a single number when both bounds are equal and
// as "[min,max]" otherwise.
func (rv *rangeValue) String() string {
	if rv.Min() == rv.Max() {
		return fmt.Sprint(rv.min)
	}
	return fmt.Sprintf("[%d,%d]", rv.Min(), rv.Max())
}

// Set parses v and stores the resulting range. Accepted forms are:
//
//	"N"    a single value (min == max == N)
//	"N-M"  the range N through M, where N <= M
//	"N-"   the range N through math.MaxUint64
//
// On any error the receiver is left unchanged, so a failed Set never corrupts
// a previously stored range.
func (rv *rangeValue) Set(v string) (err error) {
	// Parse into locals and commit only after every check has passed.
	var lo, hi uint64

	p := strings.Split(v, "-")
	switch len(p) {
	case 1:
		lo, err = strconv.ParseUint(p[0], 10, 64)
		if err != nil {
			return fmt.Errorf("range error: invalid number %s", v)
		}
		hi = lo
	case 2:
		lo, err = strconv.ParseUint(p[0], 10, 64)
		if err != nil {
			return fmt.Errorf("range error: min value %q is not a positive number", p[0])
		}
		// An empty upper bound means "open-ended".
		hi = math.MaxUint64
		if len(p[1]) > 0 {
			hi, err = strconv.ParseUint(p[1], 10, 64)
			if err != nil {
				return fmt.Errorf("range error: max value %q is not empty or a positive number", p[1])
			}
		}
	default:
		return fmt.Errorf("range error: %q is not a valid range", v)
	}

	if lo > hi {
		return errors.New("range error: min > max")
	}

	rv.min, rv.max, rv.set = lo, hi, true

	return nil
}
Loading
Loading