Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pmtiles cluster command #207

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var cli struct {

Cluster struct {
Input string `arg:"" help:"Input archive" type:"existingfile"`
NoDeduplication bool `help:"Don't attempt to deduplicate tiles"`
} `cmd:"" help:"Cluster an unclustered local archive" hidden:""`

Edit struct {
Expand Down Expand Up @@ -178,6 +179,11 @@ func main() {
if err != nil {
logger.Fatalf("Failed to extract, %v", err)
}
case "cluster <input>":
err := pmtiles.Cluster(logger, cli.Cluster.Input, !cli.Cluster.NoDeduplication)
if err != nil {
logger.Fatalf("Failed to cluster, %v", err)
}
case "convert <input> <output>":
path := cli.Convert.Input
output := cli.Convert.Output
Expand Down
75 changes: 75 additions & 0 deletions pmtiles/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package pmtiles

import (
"fmt"
"bytes"
"encoding/json"
"compress/gzip"
"github.com/schollz/progressbar/v3"
"io"
"log"
"os"
)

func Cluster(logger *log.Logger, InputPMTiles string, deduplicate bool) error {
file, _ := os.OpenFile(InputPMTiles, os.O_RDONLY, 0666)
defer file.Close()

buf := make([]byte, 127)
_, _ = file.Read(buf)

header, _ := deserializeHeader(buf)

if (header.Clustered) {
return fmt.Errorf("Archive is already clustered.")
}

fmt.Println("total directory size", header.RootLength + header.LeafDirectoryLength)

metadataReader := io.NewSectionReader(file, int64(header.MetadataOffset), int64(header.MetadataLength))
var metadataBytes []byte
if header.InternalCompression == Gzip {
r, _ := gzip.NewReader(metadataReader)
metadataBytes, _ = io.ReadAll(r)
} else {
metadataBytes, _ = io.ReadAll(metadataReader)
}
var parsedMetadata map[string]interface{}
_ = json.Unmarshal(metadataBytes, &parsedMetadata)

var CollectEntries func(uint64, uint64, func(EntryV3))

CollectEntries = func(dir_offset uint64, dir_length uint64, f func(EntryV3)) {
data, _ := io.ReadAll(io.NewSectionReader(file, int64(dir_offset), int64(dir_length)))

directory := deserializeEntries(bytes.NewBuffer(data))
for _, entry := range directory {
if entry.RunLength > 0 {
f(entry)
} else {
CollectEntries(header.LeafDirectoryOffset+entry.Offset, uint64(entry.Length), f)
}
}
}

resolver := newResolver(deduplicate, false)
tmpfile, _ := os.CreateTemp("", "pmtiles")

bar := progressbar.Default(int64(header.TileEntriesCount))

CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) {
data, _ := io.ReadAll(io.NewSectionReader(file, int64(header.TileDataOffset + e.Offset), int64(e.Length)))
if isNew, newData := resolver.AddTileIsNew(e.TileID, data, e.RunLength); isNew {
tmpfile.Write(newData)
}
bar.Add(1)
});

header.Clustered = true
newHeader, err := finalize(logger, resolver, header, tmpfile, "output.pmtiles", parsedMetadata)
if err != nil {
return err
}
fmt.Printf("total directory size %d (%f%% of original)\n", newHeader.RootLength + newHeader.LeafDirectoryLength, float64(newHeader.RootLength + newHeader.LeafDirectoryLength)/float64(header.RootLength + header.LeafDirectoryLength)*100)
return nil
}
1 change: 1 addition & 0 deletions pmtiles/cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package pmtiles
38 changes: 19 additions & 19 deletions pmtiles/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (r *resolver) NumContents() uint64 {
}

// must be called in increasing tile_id order, uniquely
func (r *resolver) AddTileIsNew(tileID uint64, data []byte) (bool, []byte) {
func (r *resolver) AddTileIsNew(tileID uint64, data []byte, runLength uint32) (bool, []byte) {
r.AddressedTiles++
var found offsetLen
var ok bool
Expand All @@ -64,12 +64,12 @@ func (r *resolver) AddTileIsNew(tileID uint64, data []byte) (bool, []byte) {
lastEntry := r.Entries[len(r.Entries)-1]
if tileID == lastEntry.TileID+uint64(lastEntry.RunLength) && lastEntry.Offset == found.Offset {
// RLE
if lastEntry.RunLength+1 > math.MaxUint32 {
if lastEntry.RunLength+runLength > math.MaxUint32 {
panic("Maximum 32-bit run length exceeded")
}
r.Entries[len(r.Entries)-1].RunLength++
r.Entries[len(r.Entries)-1].RunLength+=runLength
} else {
r.Entries = append(r.Entries, EntryV3{tileID, found.Offset, found.Length, 1})
r.Entries = append(r.Entries, EntryV3{tileID, found.Offset, found.Length, runLength})
}

return false, nil
Expand All @@ -89,7 +89,7 @@ func (r *resolver) AddTileIsNew(tileID uint64, data []byte) (bool, []byte) {
if r.deduplicate {
r.OffsetMap[sumString] = offsetLen{r.Offset, uint32(len(newData))}
}
r.Entries = append(r.Entries, EntryV3{tileID, r.Offset, uint32(len(newData)), 1})
r.Entries = append(r.Entries, EntryV3{tileID, r.Offset, uint32(len(newData)), runLength})
r.Offset += uint64(len(newData))
return true, newData
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func convertPmtilesV2(logger *log.Logger, input string, output string, deduplica
}
}
// TODO: enforce sorted order
if isNew, newData := resolve.AddTileIsNew(entry.TileID, buf); isNew {
if isNew, newData := resolve.AddTileIsNew(entry.TileID, buf, 1); isNew {
_, err = tmpfile.Write(newData)
if err != nil {
return fmt.Errorf("Failed to write to tempfile, %w", err)
Expand All @@ -214,7 +214,7 @@ func convertPmtilesV2(logger *log.Logger, input string, output string, deduplica
bar.Add(1)
}

err = finalize(logger, resolve, header, tmpfile, output, jsonMetadata)
_, err = finalize(logger, resolve, header, tmpfile, output, jsonMetadata)
if err != nil {
return err
}
Expand Down Expand Up @@ -325,7 +325,7 @@ func convertMbtiles(logger *log.Logger, input string, output string, deduplicate
data := rawTileTmp.Bytes()

if len(data) > 0 {
if isNew, newData := resolve.AddTileIsNew(id, data); isNew {
if isNew, newData := resolve.AddTileIsNew(id, data, 1); isNew {
_, err := tmpfile.Write(newData)
if err != nil {
return fmt.Errorf("Failed to write to tempfile: %s", err)
Expand All @@ -338,15 +338,15 @@ func convertMbtiles(logger *log.Logger, input string, output string, deduplicate
bar.Add(1)
}
}
err = finalize(logger, resolve, header, tmpfile, output, jsonMetadata)
_, err = finalize(logger, resolve, header, tmpfile, output, jsonMetadata)
if err != nil {
return err
}
logger.Println("Finished in ", time.Since(start))
return nil
}

func finalize(logger *log.Logger, resolve *resolver, header HeaderV3, tmpfile *os.File, output string, jsonMetadata map[string]interface{}) error {
func finalize(logger *log.Logger, resolve *resolver, header HeaderV3, tmpfile *os.File, output string, jsonMetadata map[string]interface{}) (HeaderV3, error) {
logger.Println("# of addressed tiles: ", resolve.AddressedTiles)
logger.Println("# of tile entries (after RLE): ", len(resolve.Entries))
logger.Println("# of tile contents: ", resolve.NumContents())
Expand All @@ -358,7 +358,7 @@ func finalize(logger *log.Logger, resolve *resolver, header HeaderV3, tmpfile *o
// assemble the final file
outfile, err := os.Create(output)
if err != nil {
return fmt.Errorf("Failed to create %s, %w", output, err)
return header, fmt.Errorf("Failed to create %s, %w", output, err)
}

rootBytes, leavesBytes, numLeaves := optimizeDirectories(resolve.Entries, 16384-HeaderV3LenBytes)
Expand All @@ -379,7 +379,7 @@ func finalize(logger *log.Logger, resolve *resolver, header HeaderV3, tmpfile *o
{
metadataBytesUncompressed, err := json.Marshal(jsonMetadata)
if err != nil {
return fmt.Errorf("Failed to marshal metadata, %w", err)
return header, fmt.Errorf("Failed to marshal metadata, %w", err)
}
var b bytes.Buffer
w, _ := gzip.NewWriterLevel(&b, gzip.BestCompression)
Expand Down Expand Up @@ -409,30 +409,30 @@ func finalize(logger *log.Logger, resolve *resolver, header HeaderV3, tmpfile *o

_, err = outfile.Write(headerBytes)
if err != nil {
return fmt.Errorf("Failed to write header to outfile, %w", err)
return header, fmt.Errorf("Failed to write header to outfile, %w", err)
}
_, err = outfile.Write(rootBytes)
if err != nil {
return fmt.Errorf("Failed to write header to outfile, %w", err)
return header, fmt.Errorf("Failed to write header to outfile, %w", err)
}
_, err = outfile.Write(metadataBytes)
if err != nil {
return fmt.Errorf("Failed to write header to outfile, %w", err)
return header, fmt.Errorf("Failed to write header to outfile, %w", err)
}
_, err = outfile.Write(leavesBytes)
if err != nil {
return fmt.Errorf("Failed to write header to outfile, %w", err)
return header, fmt.Errorf("Failed to write header to outfile, %w", err)
}
_, err = tmpfile.Seek(0, 0)
if err != nil {
return fmt.Errorf("Failed to seek to start of tempfile, %w", err)
return header, fmt.Errorf("Failed to seek to start of tempfile, %w", err)
}
_, err = io.Copy(outfile, tmpfile)
if err != nil {
return fmt.Errorf("Failed to copy data to outfile, %w", err)
return header, fmt.Errorf("Failed to copy data to outfile, %w", err)
}

return nil
return header, nil
}

func v2ToHeaderJSON(v2JsonMetadata map[string]interface{}, first4 []byte) (HeaderV3, map[string]interface{}, error) {
Expand Down
Loading