Added abstract helper interface for all storage backends (#135)

* Added abstract helper interface and implemented it for all storage backends

* Moved storage client initializations also to helper classes

* Fixed ssh init issue

* Moved script parameter to helper struct to simplify script init.

* Created sub modules. Enhanced abstract implementation.

* Fixed config issue

* Fixed declaration issues. Added config to interface.

* Added StorageProviders to unify all backends.

* Cleanup, optimizations, comments.

* Applied discussed changes. See description.

Moved modules to internal packages.
Replaced StoragePool with slice.
Moved conditional for init of storage backends back to script.

* Fix docker build issue

* Fixed accidentally removed local copy condition.

* Delete .gitignore

* Renaming/changes according to review

Renamed Init functions and interface.
Replaced config object with specific config values.
Init func returns interface instead of struct.
Removed custom import names where possible.

* Fixed auto-complete error.

* Combined copy instructions into one layer.

* Added logging func for storages.

* Introduced logging func for errors too.

* Missed an error message

* Moved config back to main. Optimized prune stats handling.

* Move stats back to main package

* Code doc stuff

* Apply changes from #136

* Replace name field with function.

* Changed receiver names from stg to b.

* Renamed LogFuncDef to Log

* Removed redundant package name.

* Renamed storagePool to storages.

* Simplified creation of new storage backend.

* Added initialization for storage stats map.

* Invert .dockerignore patterns.

* Fix package typo
This commit is contained in:
MaxJa4
2022-08-18 08:52:09 +02:00
committed by Frederik Ring
parent 4ec88d14dd
commit 279844ccfb
14 changed files with 740 additions and 438 deletions

View File

@@ -5,19 +5,22 @@ package main
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"io/ioutil"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"text/template"
"time"
"github.com/offen/docker-volume-backup/internal/storage"
"github.com/offen/docker-volume-backup/internal/storage/local"
"github.com/offen/docker-volume-backup/internal/storage/s3"
"github.com/offen/docker-volume-backup/internal/storage/ssh"
"github.com/offen/docker-volume-backup/internal/storage/webdav"
"github.com/offen/docker-volume-backup/internal/utilities"
"github.com/containrrr/shoutrrr"
"github.com/containrrr/shoutrrr/pkg/router"
"github.com/docker/docker/api/types"
@@ -26,29 +29,21 @@ import (
"github.com/docker/docker/client"
"github.com/kelseyhightower/envconfig"
"github.com/leekchan/timeutil"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/otiai10/copy"
"github.com/pkg/sftp"
"github.com/sirupsen/logrus"
"github.com/studio-b12/gowebdav"
"golang.org/x/crypto/openpgp"
"golang.org/x/crypto/ssh"
)
// script holds all the stateful information required to orchestrate a
// single backup run.
type script struct {
cli *client.Client
minioClient *minio.Client
webdavClient *gowebdav.Client
sshClient *ssh.Client
sftpClient *sftp.Client
logger *logrus.Logger
sender *router.ServiceRouter
template *template.Template
hooks []hook
hookLevel hookLevel
cli *client.Client
storages []storage.Backend
logger *logrus.Logger
sender *router.ServiceRouter
template *template.Template
hooks []hook
hookLevel hookLevel
file string
stats *Stats
@@ -63,7 +58,7 @@ type script struct {
// reading from env vars or other configuration sources is expected to happen
// in this method.
func newScript() (*script, error) {
stdOut, logBuffer := buffer(os.Stdout)
stdOut, logBuffer := utilities.Buffer(os.Stdout)
s := &script{
c: &Config{},
logger: &logrus.Logger{
@@ -75,7 +70,12 @@ func newScript() (*script, error) {
stats: &Stats{
StartTime: time.Now(),
LogOutput: logBuffer,
Storages: StoragesStats{},
Storages: map[string]StorageStats{
"S3": {},
"WebDav": {},
"SSH": {},
"Local": {},
},
},
}
@@ -107,114 +107,56 @@ func newScript() (*script, error) {
s.cli = cli
}
logFunc := func(logType storage.LogType, context string, msg string, params ...interface{}) error {
var allParams []interface{}
allParams = append(allParams, context)
allParams = append(allParams, params...)
switch logType {
case storage.INFO:
s.logger.Infof("[%s] "+msg, allParams...)
return nil
case storage.WARNING:
s.logger.Warnf("[%s] "+msg, allParams...)
return nil
case storage.ERROR:
return fmt.Errorf("[%s] "+msg, allParams...)
default:
s.logger.Warnf("[%s] "+msg, allParams...)
return nil
}
}
if s.c.AwsS3BucketName != "" {
var creds *credentials.Credentials
if s.c.AwsAccessKeyID != "" && s.c.AwsSecretAccessKey != "" {
creds = credentials.NewStaticV4(
s.c.AwsAccessKeyID,
s.c.AwsSecretAccessKey,
"",
)
} else if s.c.AwsIamRoleEndpoint != "" {
creds = credentials.NewIAM(s.c.AwsIamRoleEndpoint)
if s3Backend, err := s3.NewStorageBackend(s.c.AwsEndpoint, s.c.AwsAccessKeyID, s.c.AwsSecretAccessKey, s.c.AwsIamRoleEndpoint,
s.c.AwsEndpointProto, s.c.AwsEndpointInsecure, s.c.AwsS3Path, s.c.AwsS3BucketName, s.c.AwsStorageClass, logFunc); err != nil {
return nil, err
} else {
return nil, errors.New("newScript: AWS_S3_BUCKET_NAME is defined, but no credentials were provided")
s.storages = append(s.storages, s3Backend)
}
options := minio.Options{
Creds: creds,
Secure: s.c.AwsEndpointProto == "https",
}
if s.c.AwsEndpointInsecure {
if !options.Secure {
return nil, errors.New("newScript: AWS_ENDPOINT_INSECURE = true is only meaningful for https")
}
transport, err := minio.DefaultTransport(true)
if err != nil {
return nil, fmt.Errorf("newScript: failed to create default minio transport")
}
transport.TLSClientConfig.InsecureSkipVerify = true
options.Transport = transport
}
mc, err := minio.New(s.c.AwsEndpoint, &options)
if err != nil {
return nil, fmt.Errorf("newScript: error setting up minio client: %w", err)
}
s.minioClient = mc
}
if s.c.WebdavUrl != "" {
if s.c.WebdavUsername == "" || s.c.WebdavPassword == "" {
return nil, errors.New("newScript: WEBDAV_URL is defined, but no credentials were provided")
if webdavBackend, err := webdav.NewStorageBackend(s.c.WebdavUrl, s.c.WebdavPath, s.c.WebdavUsername, s.c.WebdavPassword,
s.c.WebdavUrlInsecure, logFunc); err != nil {
return nil, err
} else {
webdavClient := gowebdav.NewClient(s.c.WebdavUrl, s.c.WebdavUsername, s.c.WebdavPassword)
s.webdavClient = webdavClient
if s.c.WebdavUrlInsecure {
defaultTransport, ok := http.DefaultTransport.(*http.Transport)
if !ok {
return nil, errors.New("newScript: unexpected error when asserting type for http.DefaultTransport")
}
webdavTransport := defaultTransport.Clone()
webdavTransport.TLSClientConfig.InsecureSkipVerify = s.c.WebdavUrlInsecure
s.webdavClient.SetTransport(webdavTransport)
}
s.storages = append(s.storages, webdavBackend)
}
}
if s.c.SSHHostName != "" {
var authMethods []ssh.AuthMethod
if s.c.SSHPassword != "" {
authMethods = append(authMethods, ssh.Password(s.c.SSHPassword))
}
if _, err := os.Stat(s.c.SSHIdentityFile); err == nil {
key, err := ioutil.ReadFile(s.c.SSHIdentityFile)
if err != nil {
return nil, errors.New("newScript: error reading the private key")
}
var signer ssh.Signer
if s.c.SSHIdentityPassphrase != "" {
signer, err = ssh.ParsePrivateKeyWithPassphrase(key, []byte(s.c.SSHIdentityPassphrase))
if err != nil {
return nil, errors.New("newScript: error parsing the encrypted private key")
}
authMethods = append(authMethods, ssh.PublicKeys(signer))
} else {
signer, err = ssh.ParsePrivateKey(key)
if err != nil {
return nil, errors.New("newScript: error parsing the private key")
}
authMethods = append(authMethods, ssh.PublicKeys(signer))
}
}
sshClientConfig := &ssh.ClientConfig{
User: s.c.SSHUser,
Auth: authMethods,
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}
sshClient, err := ssh.Dial("tcp", fmt.Sprintf("%s:%s", s.c.SSHHostName, s.c.SSHPort), sshClientConfig)
s.sshClient = sshClient
if err != nil {
return nil, fmt.Errorf("newScript: error creating ssh client: %w", err)
}
_, _, err = s.sshClient.SendRequest("keepalive", false, nil)
if err != nil {
if sshBackend, err := ssh.NewStorageBackend(s.c.SSHHostName, s.c.SSHPort, s.c.SSHUser, s.c.SSHPassword, s.c.SSHIdentityFile,
s.c.SSHIdentityPassphrase, s.c.SSHRemotePath, logFunc); err != nil {
return nil, err
}
sftpClient, err := sftp.NewClient(sshClient)
s.sftpClient = sftpClient
if err != nil {
return nil, fmt.Errorf("newScript: error creating sftp client: %w", err)
} else {
s.storages = append(s.storages, sshBackend)
}
}
localBackend := local.NewStorageBackend(s.c.BackupArchive, s.c.BackupLatestSymlink, logFunc)
s.storages = append(s.storages, localBackend)
if s.c.EmailNotificationRecipient != "" {
emailURL := fmt.Sprintf(
"smtp://%s:%s@%s:%d/?from=%s&to=%s",
@@ -286,14 +228,14 @@ func newScript() (*script, error) {
// restart everything that has been stopped.
func (s *script) stopContainers() (func() error, error) {
if s.cli == nil {
return noop, nil
return utilities.Noop, nil
}
allContainers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{
Quiet: true,
})
if err != nil {
return noop, fmt.Errorf("stopContainersAndRun: error querying for containers: %w", err)
return utilities.Noop, fmt.Errorf("stopContainersAndRun: error querying for containers: %w", err)
}
containerLabel := fmt.Sprintf(
@@ -309,11 +251,11 @@ func (s *script) stopContainers() (func() error, error) {
})
if err != nil {
return noop, fmt.Errorf("stopContainersAndRun: error querying for containers to stop: %w", err)
return utilities.Noop, fmt.Errorf("stopContainersAndRun: error querying for containers to stop: %w", err)
}
if len(containersToStop) == 0 {
return noop, nil
return utilities.Noop, nil
}
s.logger.Infof(
@@ -338,7 +280,7 @@ func (s *script) stopContainers() (func() error, error) {
stopError = fmt.Errorf(
"stopContainersAndRun: %d error(s) stopping containers: %w",
len(stopErrors),
join(stopErrors...),
utilities.Join(stopErrors...),
)
}
@@ -389,7 +331,7 @@ func (s *script) stopContainers() (func() error, error) {
return fmt.Errorf(
"stopContainersAndRun: %d error(s) restarting containers and services: %w",
len(restartErrors),
join(restartErrors...),
utilities.Join(restartErrors...),
)
}
s.logger.Infof(
@@ -415,7 +357,7 @@ func (s *script) createArchive() error {
backupSources = filepath.Join("/tmp", s.c.BackupSources)
// copy before compressing guard against a situation where backup folder's content are still growing.
s.registerHook(hookLevelPlumbing, func(error) error {
if err := remove(backupSources); err != nil {
if err := utilities.Remove(backupSources); err != nil {
return fmt.Errorf("takeBackup: error removing snapshot: %w", err)
}
s.logger.Infof("Removed snapshot `%s`.", backupSources)
@@ -432,7 +374,7 @@ func (s *script) createArchive() error {
tarFile := s.file
s.registerHook(hookLevelPlumbing, func(error) error {
if err := remove(tarFile); err != nil {
if err := utilities.Remove(tarFile); err != nil {
return fmt.Errorf("takeBackup: error removing tar file: %w", err)
}
s.logger.Infof("Removed tar file `%s`.", tarFile)
@@ -477,7 +419,7 @@ func (s *script) encryptArchive() error {
gpgFile := fmt.Sprintf("%s.gpg", s.file)
s.registerHook(hookLevelPlumbing, func(error) error {
if err := remove(gpgFile); err != nil {
if err := utilities.Remove(gpgFile); err != nil {
return fmt.Errorf("encryptBackup: error removing gpg file: %w", err)
}
s.logger.Infof("Removed GPG file `%s`.", gpgFile)
@@ -485,20 +427,20 @@ func (s *script) encryptArchive() error {
})
outFile, err := os.Create(gpgFile)
defer outFile.Close()
if err != nil {
return fmt.Errorf("encryptBackup: error opening out file: %w", err)
}
defer outFile.Close()
_, name := path.Split(s.file)
dst, err := openpgp.SymmetricallyEncrypt(outFile, []byte(s.c.GpgPassphrase), &openpgp.FileHints{
IsBinary: true,
FileName: name,
}, nil)
defer dst.Close()
if err != nil {
return fmt.Errorf("encryptBackup: error encrypting backup file: %w", err)
}
defer dst.Close()
src, err := os.Open(s.file)
if err != nil {
@@ -529,93 +471,12 @@ func (s *script) copyArchive() error {
}
}
if s.minioClient != nil {
if _, err := s.minioClient.FPutObject(context.Background(), s.c.AwsS3BucketName, filepath.Join(s.c.AwsS3Path, name), s.file, minio.PutObjectOptions{
ContentType: "application/tar+gzip",
StorageClass: s.c.AwsStorageClass,
}); err != nil {
errResp := minio.ToErrorResponse(err)
return fmt.Errorf("copyBackup: error uploading backup to remote storage: [Message]: '%s', [Code]: %s, [StatusCode]: %d", errResp.Message, errResp.Code, errResp.StatusCode)
}
s.logger.Infof("Uploaded a copy of backup `%s` to bucket `%s`.", s.file, s.c.AwsS3BucketName)
}
if s.webdavClient != nil {
bytes, err := os.ReadFile(s.file)
if err != nil {
return fmt.Errorf("copyBackup: error reading the file to be uploaded: %w", err)
}
if err := s.webdavClient.MkdirAll(s.c.WebdavPath, 0644); err != nil {
return fmt.Errorf("copyBackup: error creating directory '%s' on WebDAV server: %w", s.c.WebdavPath, err)
}
if err := s.webdavClient.Write(filepath.Join(s.c.WebdavPath, name), bytes, 0644); err != nil {
return fmt.Errorf("copyBackup: error uploading the file to WebDAV server: %w", err)
}
s.logger.Infof("Uploaded a copy of backup `%s` to WebDAV-URL '%s' at path '%s'.", s.file, s.c.WebdavUrl, s.c.WebdavPath)
}
if s.sshClient != nil {
source, err := os.Open(s.file)
if err != nil {
return fmt.Errorf("copyBackup: error reading the file to be uploaded: %w", err)
}
defer source.Close()
destination, err := s.sftpClient.Create(filepath.Join(s.c.SSHRemotePath, name))
if err != nil {
return fmt.Errorf("copyBackup: error creating file on SSH storage: %w", err)
}
defer destination.Close()
chunk := make([]byte, 1000000)
for {
num, err := source.Read(chunk)
if err == io.EOF {
tot, err := destination.Write(chunk[:num])
if err != nil {
return fmt.Errorf("copyBackup: error uploading the file to SSH storage: %w", err)
}
if tot != len(chunk[:num]) {
return fmt.Errorf("sshClient: failed to write stream")
}
break
}
if err != nil {
return fmt.Errorf("copyBackup: error uploading the file to SSH storage: %w", err)
}
tot, err := destination.Write(chunk[:num])
if err != nil {
return fmt.Errorf("copyBackup: error uploading the file to SSH storage: %w", err)
}
if tot != len(chunk[:num]) {
return fmt.Errorf("sshClient: failed to write stream")
}
}
s.logger.Infof("Uploaded a copy of backup `%s` to SSH storage '%s' at path '%s'.", s.file, s.c.SSHHostName, s.c.SSHRemotePath)
}
if _, err := os.Stat(s.c.BackupArchive); !os.IsNotExist(err) {
if err := copyFile(s.file, path.Join(s.c.BackupArchive, name)); err != nil {
return fmt.Errorf("copyBackup: error copying file to local archive: %w", err)
}
s.logger.Infof("Stored copy of backup `%s` in local archive `%s`.", s.file, s.c.BackupArchive)
if s.c.BackupLatestSymlink != "" {
symlink := path.Join(s.c.BackupArchive, s.c.BackupLatestSymlink)
if _, err := os.Lstat(symlink); err == nil {
os.Remove(symlink)
}
if err := os.Symlink(name, symlink); err != nil {
return fmt.Errorf("copyBackup: error creating latest symlink: %w", err)
}
s.logger.Infof("Created/Updated symlink `%s` for latest backup.", s.c.BackupLatestSymlink)
for _, backend := range s.storages {
if err := backend.Copy(s.file); err != nil {
return err
}
}
return nil
}
@@ -629,208 +490,18 @@ func (s *script) pruneBackups() error {
deadline := time.Now().AddDate(0, 0, -int(s.c.BackupRetentionDays)).Add(s.c.BackupPruningLeeway)
// doPrune holds general control flow that applies to any kind of storage.
// Callers can pass in a thunk that performs the actual deletion of files.
var doPrune = func(lenMatches, lenCandidates int, description string, doRemoveFiles func() error) error {
if lenMatches != 0 && lenMatches != lenCandidates {
if err := doRemoveFiles(); err != nil {
return err
for _, backend := range s.storages {
if stats, err := backend.Prune(deadline, s.c.BackupPruningPrefix); err == nil {
s.stats.Storages[backend.Name()] = StorageStats{
Total: stats.Total,
Pruned: stats.Pruned,
}
s.logger.Infof(
"Pruned %d out of %d %s as their age exceeded the configured retention period of %d days.",
lenMatches,
lenCandidates,
description,
s.c.BackupRetentionDays,
)
} else if lenMatches != 0 && lenMatches == lenCandidates {
s.logger.Warnf("The current configuration would delete all %d existing %s.", lenMatches, description)
s.logger.Warn("Refusing to do so, please check your configuration.")
} else {
s.logger.Infof("None of %d existing %s were pruned.", lenCandidates, description)
return err
}
return nil
}
if s.minioClient != nil {
candidates := s.minioClient.ListObjects(context.Background(), s.c.AwsS3BucketName, minio.ListObjectsOptions{
WithMetadata: true,
Prefix: filepath.Join(s.c.AwsS3Path, s.c.BackupPruningPrefix),
Recursive: true,
})
var matches []minio.ObjectInfo
var lenCandidates int
for candidate := range candidates {
lenCandidates++
if candidate.Err != nil {
return fmt.Errorf(
"pruneBackups: error looking up candidates from remote storage: %w",
candidate.Err,
)
}
if candidate.LastModified.Before(deadline) {
matches = append(matches, candidate)
}
}
s.stats.Storages.S3 = StorageStats{
Total: uint(lenCandidates),
Pruned: uint(len(matches)),
}
doPrune(len(matches), lenCandidates, "remote backup(s)", func() error {
objectsCh := make(chan minio.ObjectInfo)
go func() {
for _, match := range matches {
objectsCh <- match
}
close(objectsCh)
}()
errChan := s.minioClient.RemoveObjects(context.Background(), s.c.AwsS3BucketName, objectsCh, minio.RemoveObjectsOptions{})
var removeErrors []error
for result := range errChan {
if result.Err != nil {
removeErrors = append(removeErrors, result.Err)
}
}
if len(removeErrors) != 0 {
return join(removeErrors...)
}
return nil
})
}
if s.webdavClient != nil {
candidates, err := s.webdavClient.ReadDir(s.c.WebdavPath)
if err != nil {
return fmt.Errorf("pruneBackups: error looking up candidates from remote storage: %w", err)
}
var matches []fs.FileInfo
var lenCandidates int
for _, candidate := range candidates {
if !strings.HasPrefix(candidate.Name(), s.c.BackupPruningPrefix) {
continue
}
lenCandidates++
if candidate.ModTime().Before(deadline) {
matches = append(matches, candidate)
}
}
s.stats.Storages.WebDAV = StorageStats{
Total: uint(lenCandidates),
Pruned: uint(len(matches)),
}
doPrune(len(matches), lenCandidates, "WebDAV backup(s)", func() error {
for _, match := range matches {
if err := s.webdavClient.Remove(filepath.Join(s.c.WebdavPath, match.Name())); err != nil {
return fmt.Errorf("pruneBackups: error removing file from WebDAV storage: %w", err)
}
}
return nil
})
}
if s.sshClient != nil {
candidates, err := s.sftpClient.ReadDir(s.c.SSHRemotePath)
if err != nil {
return fmt.Errorf("pruneBackups: error reading directory from SSH storage: %w", err)
}
var matches []string
for _, candidate := range candidates {
if !strings.HasPrefix(candidate.Name(), s.c.BackupPruningPrefix) {
continue
}
if candidate.ModTime().Before(deadline) {
matches = append(matches, candidate.Name())
}
}
s.stats.Storages.SSH = StorageStats{
Total: uint(len(candidates)),
Pruned: uint(len(matches)),
}
doPrune(len(matches), len(candidates), "SSH backup(s)", func() error {
for _, match := range matches {
if err := s.sftpClient.Remove(filepath.Join(s.c.SSHRemotePath, match)); err != nil {
return fmt.Errorf("pruneBackups: error removing file from SSH storage: %w", err)
}
}
return nil
})
}
if _, err := os.Stat(s.c.BackupArchive); !os.IsNotExist(err) {
globPattern := path.Join(
s.c.BackupArchive,
fmt.Sprintf("%s*", s.c.BackupPruningPrefix),
)
globMatches, err := filepath.Glob(globPattern)
if err != nil {
return fmt.Errorf(
"pruneBackups: error looking up matching files using pattern %s: %w",
globPattern,
err,
)
}
var candidates []string
for _, candidate := range globMatches {
fi, err := os.Lstat(candidate)
if err != nil {
return fmt.Errorf(
"pruneBackups: error calling Lstat on file %s: %w",
candidate,
err,
)
}
if fi.Mode()&os.ModeSymlink != os.ModeSymlink {
candidates = append(candidates, candidate)
}
}
var matches []string
for _, candidate := range candidates {
fi, err := os.Stat(candidate)
if err != nil {
return fmt.Errorf(
"pruneBackups: error calling stat on file %s: %w",
candidate,
err,
)
}
if fi.ModTime().Before(deadline) {
matches = append(matches, candidate)
}
}
s.stats.Storages.Local = StorageStats{
Total: uint(len(candidates)),
Pruned: uint(len(matches)),
}
doPrune(len(matches), len(candidates), "local backup(s)", func() error {
var removeErrors []error
for _, match := range matches {
if err := os.Remove(match); err != nil {
removeErrors = append(removeErrors, err)
}
}
if len(removeErrors) != 0 {
return fmt.Errorf(
"pruneBackups: %d error(s) deleting local files, starting with: %w",
len(removeErrors),
join(removeErrors...),
)
}
return nil
})
}
return nil
}