Replace Gzip with PGzip (#266)

* Replace Gzip with optimized PGzip. Add concurrency option.

* Add shortened timeout for 'dc down' too.

* Add NaturalNumberZero to allow zero.

* Add test for concurrency=0

* Rename to GZIP_PARALLELISM

* Fix block size. Fix compression level. Fix CI.

* Refactor compression writer fetching. Renamed WholeNumber
This commit is contained in:
MaxJa4
2023-09-03 16:49:52 +02:00
committed by GitHub
parent 1e39ac41f4
commit 336c5bed71
8 changed files with 114 additions and 17 deletions

View File

@@ -8,18 +8,20 @@ package main
import (
"archive/tar"
"compress/gzip"
"fmt"
"io"
"os"
"path"
"path/filepath"
"runtime"
"strings"
"github.com/klauspost/pgzip"
"github.com/klauspost/compress/zstd"
)
func createArchive(files []string, inputFilePath, outputFilePath string, compression string) error {
func createArchive(files []string, inputFilePath, outputFilePath string, compression string, compressionConcurrency int) error {
inputFilePath = stripTrailingSlashes(inputFilePath)
inputFilePath, outputFilePath, err := makeAbsolute(inputFilePath, outputFilePath)
if err != nil {
@@ -29,7 +31,7 @@ func createArchive(files []string, inputFilePath, outputFilePath string, compres
return fmt.Errorf("createArchive: error creating output file path: %w", err)
}
if err := compress(files, outputFilePath, filepath.Dir(inputFilePath), compression); err != nil {
if err := compress(files, outputFilePath, filepath.Dir(inputFilePath), compression, compressionConcurrency); err != nil {
return fmt.Errorf("createArchive: error creating archive: %w", err)
}
@@ -53,26 +55,17 @@ func makeAbsolute(inputFilePath, outputFilePath string) (string, string, error)
return inputFilePath, outputFilePath, err
}
func compress(paths []string, outFilePath, subPath string, algo string) error {
func compress(paths []string, outFilePath, subPath string, algo string, concurrency int) error {
file, err := os.Create(outFilePath)
var compressWriter io.WriteCloser
if err != nil {
return fmt.Errorf("compress: error creating out file: %w", err)
}
prefix := path.Dir(outFilePath)
switch algo {
case "gz":
compressWriter = gzip.NewWriter(file)
case "zst":
compressWriter, err = zstd.NewWriter(file)
if err != nil {
return fmt.Errorf("compress: zstd error: %w", err)
}
default:
return fmt.Errorf("compress: unsupported compression algorithm: %s", algo)
compressWriter, err := getCompressionWriter(file, algo, concurrency)
if err != nil {
return fmt.Errorf("compress: error getting compression writer: %w", err)
}
tarWriter := tar.NewWriter(compressWriter)
for _, p := range paths {
@@ -99,6 +92,34 @@ func compress(paths []string, outFilePath, subPath string, algo string) error {
return nil
}
func getCompressionWriter(file *os.File, algo string, concurrency int) (io.WriteCloser, error) {
switch algo {
case "gz":
w, err := pgzip.NewWriterLevel(file, 5)
if err != nil {
return nil, fmt.Errorf("getCompressionWriter: gzip error: %w", err)
}
if concurrency == 0 {
concurrency = runtime.GOMAXPROCS(0)
}
if err := w.SetConcurrency(1<<20, concurrency); err != nil {
return nil, fmt.Errorf("getCompressionWriter: error setting concurrency: %w", err)
}
return w, nil
case "zst":
compressWriter, err := zstd.NewWriter(file)
if err != nil {
return nil, fmt.Errorf("getCompressionWriter: zstd error: %w", err)
}
return compressWriter, nil
default:
return nil, fmt.Errorf("getCompressionWriter: unsupported compression algorithm: %s", algo)
}
}
func writeTarball(path string, tarWriter *tar.Writer, prefix string) error {
fileInfo, err := os.Lstat(path)
if err != nil {

View File

@@ -28,6 +28,7 @@ type Config struct {
AwsIamRoleEndpoint string `split_words:"true"`
AwsPartSize int64 `split_words:"true"`
BackupCompression CompressionType `split_words:"true" default:"gz"`
GzipParallelism WholeNumber `split_words:"true" default:"1"`
BackupSources string `split_words:"true" default:"/backup"`
BackupFilename string `split_words:"true" default:"backup-%Y-%m-%dT%H-%M-%S.{{ .Extension }}"`
BackupFilenameExpand bool `split_words:"true"`
@@ -131,6 +132,7 @@ func (r *RegexpDecoder) Decode(v string) error {
return nil
}
// NaturalNumber is a type that can be used to decode a positive, non-zero natural number
type NaturalNumber int
func (n *NaturalNumber) Decode(v string) error {
@@ -148,3 +150,22 @@ func (n *NaturalNumber) Decode(v string) error {
func (n *NaturalNumber) Int() int {
return int(*n)
}
// WholeNumber is a type that can be used to decode a positive whole number, including zero
type WholeNumber int
func (n *WholeNumber) Decode(v string) error {
asInt, err := strconv.Atoi(v)
if err != nil {
return fmt.Errorf("config: error converting %s to int", v)
}
if asInt < 0 {
return fmt.Errorf("config: expected a whole, positive number, including zero. Got %d", asInt)
}
*n = WholeNumber(asInt)
return nil
}
func (n *WholeNumber) Int() int {
return int(*n)
}

View File

@@ -503,7 +503,7 @@ func (s *script) createArchive() error {
return fmt.Errorf("createArchive: error walking filesystem tree: %w", err)
}
if err := createArchive(filesEligibleForBackup, backupSources, tarFile, s.c.BackupCompression.String()); err != nil {
if err := createArchive(filesEligibleForBackup, backupSources, tarFile, s.c.BackupCompression.String(), s.c.GzipParallelism.Int()); err != nil {
return fmt.Errorf("createArchive: error compressing backup folder: %w", err)
}