Improve Swarm support (#333)

* Query for labeled services as well

* Try scaling down services

* Scale services back up

* Use progress tool from Docker CLI

* In test, label both services

* Clean up error and log messages

* Document scale-up/down approach in docs

* Downgrade Docker CLI to match client

* Document services stats

* Do not rely on PreviousSpec for storing desired replica count

* Log warnings from Docker when updating services

* Check whether container and service labels collide

* Document script behavior on label collision

* Add additional check if all containers have been removed

* Scale services concurrently

* Move docker interaction code into own file

* Factor out code for service updating

* Time out after five minutes of not reaching desired container count

* Inline handling of in-swarm container level restart

* Timer is more suitable for timeout race

* Timeout when scaling down services should be configurable

* Choose better filename

* Reflect changes in naming

* Rename and deprecate BACKUP_STOP_CONTAINER_LABEL

* Improve logging

* Further simplify logging
This commit is contained in:
Frederik Ring
2024-01-31 12:17:41 +01:00
committed by GitHub
parent 2065fb2815
commit c3daeacecb
18 changed files with 640 additions and 145 deletions

View File

@@ -37,7 +37,9 @@ type Config struct {
BackupRetentionDays int32 `split_words:"true" default:"-1"`
BackupPruningLeeway time.Duration `split_words:"true" default:"1m"`
BackupPruningPrefix string `split_words:"true"`
BackupStopContainerLabel string `split_words:"true" default:"true"`
BackupStopContainerLabel string `split_words:"true"`
BackupStopDuringBackupLabel string `split_words:"true" default:"true"`
BackupStopServiceTimeout time.Duration `split_words:"true" default:"5m"`
BackupFromSnapshot bool `split_words:"true"`
BackupExcludeRegexp RegexpDecoder `split_words:"true"`
BackupSkipBackendsFromPrune []string `split_words:"true"`

View File

@@ -47,12 +47,12 @@ func main() {
}()
s.must(s.withLabeledCommands(lifecyclePhaseArchive, func() error {
restartContainers, err := s.stopContainers()
restartContainersAndServices, err := s.stopContainersAndServices()
// The mechanism for restarting containers is not using hooks as it
// should happen as soon as possible (i.e. before uploading backups or
// similar).
defer func() {
s.must(restartContainers())
s.must(restartContainersAndServices())
}()
if err != nil {
return err

View File

@@ -5,8 +5,6 @@ package main
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/fs"
@@ -30,10 +28,6 @@ import (
openpgp "github.com/ProtonMail/go-crypto/openpgp/v2"
"github.com/containrrr/shoutrrr"
"github.com/containrrr/shoutrrr/pkg/router"
"github.com/docker/docker/api/types"
ctr "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
"github.com/leekchan/timeutil"
"github.com/offen/envconfig"
@@ -318,126 +312,6 @@ func newScript() (*script, error) {
return s, nil
}
// stopContainers stops all Docker containers that are marked as to being
// stopped during the backup and returns a function that can be called to
// restart everything that has been stopped.
func (s *script) stopContainers() (func() error, error) {
if s.cli == nil {
return noop, nil
}
allContainers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{})
if err != nil {
return noop, fmt.Errorf("stopContainers: error querying for containers: %w", err)
}
containerLabel := fmt.Sprintf(
"docker-volume-backup.stop-during-backup=%s",
s.c.BackupStopContainerLabel,
)
containersToStop, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{
Filters: filters.NewArgs(filters.KeyValuePair{
Key: "label",
Value: containerLabel,
}),
})
if err != nil {
return noop, fmt.Errorf("stopContainers: error querying for containers to stop: %w", err)
}
if len(containersToStop) == 0 {
return noop, nil
}
s.logger.Info(
fmt.Sprintf(
"Stopping %d container(s) labeled `%s` out of %d running container(s).",
len(containersToStop),
containerLabel,
len(allContainers),
),
)
var stoppedContainers []types.Container
var stopErrors []error
for _, container := range containersToStop {
if err := s.cli.ContainerStop(context.Background(), container.ID, ctr.StopOptions{}); err != nil {
stopErrors = append(stopErrors, err)
} else {
stoppedContainers = append(stoppedContainers, container)
}
}
var stopError error
if len(stopErrors) != 0 {
stopError = fmt.Errorf(
"stopContainers: %d error(s) stopping containers: %w",
len(stopErrors),
errors.Join(stopErrors...),
)
}
s.stats.Containers = ContainersStats{
All: uint(len(allContainers)),
ToStop: uint(len(containersToStop)),
Stopped: uint(len(stoppedContainers)),
}
return func() error {
servicesRequiringUpdate := map[string]struct{}{}
var restartErrors []error
for _, container := range stoppedContainers {
if swarmServiceName, ok := container.Labels["com.docker.swarm.service.name"]; ok {
servicesRequiringUpdate[swarmServiceName] = struct{}{}
continue
}
if err := s.cli.ContainerStart(context.Background(), container.ID, types.ContainerStartOptions{}); err != nil {
restartErrors = append(restartErrors, err)
}
}
if len(servicesRequiringUpdate) != 0 {
services, _ := s.cli.ServiceList(context.Background(), types.ServiceListOptions{})
for serviceName := range servicesRequiringUpdate {
var serviceMatch swarm.Service
for _, service := range services {
if service.Spec.Name == serviceName {
serviceMatch = service
break
}
}
if serviceMatch.ID == "" {
return fmt.Errorf("stopContainers: couldn't find service with name %s", serviceName)
}
serviceMatch.Spec.TaskTemplate.ForceUpdate += 1
if _, err := s.cli.ServiceUpdate(
context.Background(), serviceMatch.ID,
serviceMatch.Version, serviceMatch.Spec, types.ServiceUpdateOptions{},
); err != nil {
restartErrors = append(restartErrors, err)
}
}
}
if len(restartErrors) != 0 {
return fmt.Errorf(
"stopContainers: %d error(s) restarting containers and services: %w",
len(restartErrors),
errors.Join(restartErrors...),
)
}
s.logger.Info(
fmt.Sprintf(
"Restarted %d container(s) and the matching service(s).",
len(stoppedContainers),
),
)
return nil
}, stopError
}
// createArchive creates a tar archive of the configured backup location and
// saves it to disk.
func (s *script) createArchive() error {
@@ -448,7 +322,7 @@ func (s *script) createArchive() error {
"Using BACKUP_FROM_SNAPSHOT has been deprecated and will be removed in the next major version.",
)
s.logger.Warn(
"Please use `archive-pre` and `archive-post` commands to prepare your backup sources. Refer to the README for an upgrade guide.",
"Please use `archive-pre` and `archive-post` commands to prepare your backup sources. Refer to the documentation for an upgrade guide.",
)
backupSources = filepath.Join("/tmp", s.c.BackupSources)
// copy before compressing guard against a situation where backup folder's content are still growing.

View File

@@ -17,6 +17,15 @@ type ContainersStats struct {
StopErrors uint
}
// ServicesStats contains info about Swarm services that have been
// operated upon
type ServicesStats struct {
All uint
ToScaleDown uint
ScaledDown uint
ScaleDownErrors uint
}
// BackupFileStats stats about the created backup file
type BackupFileStats struct {
Name string
@@ -40,6 +49,7 @@ type Stats struct {
LockedTime time.Duration
LogOutput *bytes.Buffer
Containers ContainersStats
Services ServicesStats
BackupFile BackupFileStats
Storages map[string]StorageStats
}

338
cmd/backup/stop_restart.go Normal file
View File

@@ -0,0 +1,338 @@
package main
import (
"context"
"errors"
"fmt"
"io"
"os"
"sync"
"time"
"github.com/docker/cli/cli/command/service/progress"
"github.com/docker/docker/api/types"
ctr "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
)
func scaleService(cli *client.Client, serviceID string, replicas uint64) ([]string, error) {
service, _, err := cli.ServiceInspectWithRaw(context.Background(), serviceID, types.ServiceInspectOptions{})
if err != nil {
return nil, fmt.Errorf("scaleService: error inspecting service %s: %w", serviceID, err)
}
serviceMode := &service.Spec.Mode
switch {
case serviceMode.Replicated != nil:
serviceMode.Replicated.Replicas = &replicas
default:
return nil, fmt.Errorf("scaleService: service to be scaled %s has to be in replicated mode", service.Spec.Name)
}
response, err := cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
if err != nil {
return nil, fmt.Errorf("scaleService: error updating service: %w", err)
}
discardWriter := &noopWriteCloser{io.Discard}
if err := progress.ServiceProgress(context.Background(), cli, service.ID, discardWriter); err != nil {
return nil, err
}
return response.Warnings, nil
}
func awaitContainerCountForService(cli *client.Client, serviceID string, count int, timeoutAfter time.Duration) error {
poll := time.NewTicker(time.Second)
timeout := time.NewTimer(timeoutAfter)
defer timeout.Stop()
defer poll.Stop()
for {
select {
case <-timeout.C:
return fmt.Errorf(
"awaitContainerCount: timed out after waiting %s for service %s to reach desired container count of %d",
timeoutAfter,
serviceID,
count,
)
case <-poll.C:
containers, err := cli.ContainerList(context.Background(), types.ContainerListOptions{
Filters: filters.NewArgs(filters.KeyValuePair{
Key: "label",
Value: fmt.Sprintf("com.docker.swarm.service.id=%s", serviceID),
}),
})
if err != nil {
return fmt.Errorf("awaitContainerCount: error listing containers: %w", err)
}
if len(containers) == count {
return nil
}
}
}
}
// stopContainersAndServices stops all Docker containers that are marked as to being
// stopped during the backup and returns a function that can be called to
// restart everything that has been stopped.
func (s *script) stopContainersAndServices() (func() error, error) {
if s.cli == nil {
return noop, nil
}
dockerInfo, err := s.cli.Info(context.Background())
if err != nil {
return noop, fmt.Errorf("(*script).stopContainersAndServices: error getting docker info: %w", err)
}
isDockerSwarm := dockerInfo.Swarm.LocalNodeState != "inactive"
labelValue := s.c.BackupStopDuringBackupLabel
if s.c.BackupStopContainerLabel != "" {
s.logger.Warn(
"Using BACKUP_STOP_CONTAINER_LABEL has been deprecated and will be removed in the next major version.",
)
s.logger.Warn(
"Please use BACKUP_STOP_DURING_BACKUP_LABEL instead. Refer to the docs for an upgrade guide.",
)
if _, ok := os.LookupEnv("BACKUP_STOP_DURING_BACKUP_LABEL"); ok {
return noop, errors.New("(*script).stopContainersAndServices: both BACKUP_STOP_DURING_BACKUP_LABEL and BACKUP_STOP_CONTAINER_LABEL have been set, cannot continue")
}
labelValue = s.c.BackupStopContainerLabel
}
filterMatchLabel := fmt.Sprintf(
"docker-volume-backup.stop-during-backup=%s",
labelValue,
)
allContainers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{})
if err != nil {
return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for containers: %w", err)
}
containersToStop, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{
Filters: filters.NewArgs(filters.KeyValuePair{
Key: "label",
Value: filterMatchLabel,
}),
})
if err != nil {
return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for containers to stop: %w", err)
}
var allServices []swarm.Service
var servicesToScaleDown []handledSwarmService
if isDockerSwarm {
allServices, err = s.cli.ServiceList(context.Background(), types.ServiceListOptions{})
if err != nil {
return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for services: %w", err)
}
matchingServices, err := s.cli.ServiceList(context.Background(), types.ServiceListOptions{
Filters: filters.NewArgs(filters.KeyValuePair{
Key: "label",
Value: filterMatchLabel,
}),
Status: true,
})
for _, s := range matchingServices {
servicesToScaleDown = append(servicesToScaleDown, handledSwarmService{
serviceID: s.ID,
initialReplicaCount: *s.Spec.Mode.Replicated.Replicas,
})
}
if err != nil {
return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for services to scale down: %w", err)
}
}
if len(containersToStop) == 0 && len(servicesToScaleDown) == 0 {
return noop, nil
}
if isDockerSwarm {
for _, container := range containersToStop {
if swarmServiceID, ok := container.Labels["com.docker.swarm.service.id"]; ok {
parentService, _, err := s.cli.ServiceInspectWithRaw(context.Background(), swarmServiceID, types.ServiceInspectOptions{})
if err != nil {
return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for parent service with ID %s: %w", swarmServiceID, err)
}
for label := range parentService.Spec.Labels {
if label == "docker-volume-backup.stop-during-backup" {
return noop, fmt.Errorf(
"(*script).stopContainersAndServices: container %s is labeled to stop but has parent service %s which is also labeled, cannot continue",
container.Names[0],
parentService.Spec.Name,
)
}
}
}
}
}
s.logger.Info(
fmt.Sprintf(
"Stopping %d out of %d running container(s) as they were labeled %s.",
len(containersToStop),
len(allContainers),
filterMatchLabel,
),
)
if isDockerSwarm {
s.logger.Info(
fmt.Sprintf(
"Scaling down %d out of %d active service(s) as they were labeled %s.",
len(servicesToScaleDown),
len(allServices),
filterMatchLabel,
),
)
}
var stoppedContainers []types.Container
var stopErrors []error
for _, container := range containersToStop {
if err := s.cli.ContainerStop(context.Background(), container.ID, ctr.StopOptions{}); err != nil {
stopErrors = append(stopErrors, err)
} else {
stoppedContainers = append(stoppedContainers, container)
}
}
var scaledDownServices []handledSwarmService
var scaleDownErrors concurrentSlice[error]
if isDockerSwarm {
wg := sync.WaitGroup{}
for _, svc := range servicesToScaleDown {
wg.Add(1)
go func(svc handledSwarmService) {
defer wg.Done()
warnings, err := scaleService(s.cli, svc.serviceID, 0)
if err != nil {
scaleDownErrors.append(err)
} else {
scaledDownServices = append(scaledDownServices, svc)
}
for _, warning := range warnings {
s.logger.Warn(
fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", svc.serviceID, warning),
)
}
// progress.ServiceProgress returns too early, so we need to manually check
// whether all containers belonging to the service have actually been removed
if err := awaitContainerCountForService(s.cli, svc.serviceID, 0, s.c.BackupStopServiceTimeout); err != nil {
scaleDownErrors.append(err)
}
}(svc)
}
wg.Wait()
}
s.stats.Containers = ContainersStats{
All: uint(len(allContainers)),
ToStop: uint(len(containersToStop)),
Stopped: uint(len(stoppedContainers)),
StopErrors: uint(len(stopErrors)),
}
s.stats.Services = ServicesStats{
All: uint(len(allServices)),
ToScaleDown: uint(len(servicesToScaleDown)),
ScaledDown: uint(len(scaledDownServices)),
ScaleDownErrors: uint(len(scaleDownErrors.value())),
}
var initialErr error
allErrors := append(stopErrors, scaleDownErrors.value()...)
if len(allErrors) != 0 {
initialErr = fmt.Errorf(
"(*script).stopContainersAndServices: %d error(s) stopping containers: %w",
len(allErrors),
errors.Join(allErrors...),
)
}
return func() error {
var restartErrors []error
matchedServices := map[string]bool{}
for _, container := range stoppedContainers {
if swarmServiceID, ok := container.Labels["com.docker.swarm.service.id"]; ok && isDockerSwarm {
if _, ok := matchedServices[swarmServiceID]; ok {
continue
}
matchedServices[swarmServiceID] = true
// in case a container was part of a swarm service, the service requires to
// be force updated instead of restarting the container as it would otherwise
// remain in a "completed" state
service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), swarmServiceID, types.ServiceInspectOptions{})
if err != nil {
restartErrors = append(
restartErrors,
fmt.Errorf("(*script).stopContainersAndServices: error looking up parent service: %w", err),
)
continue
}
service.Spec.TaskTemplate.ForceUpdate += 1
if _, err := s.cli.ServiceUpdate(
context.Background(), service.ID,
service.Version, service.Spec, types.ServiceUpdateOptions{},
); err != nil {
restartErrors = append(restartErrors, err)
}
continue
}
if err := s.cli.ContainerStart(context.Background(), container.ID, types.ContainerStartOptions{}); err != nil {
restartErrors = append(restartErrors, err)
}
}
var scaleUpErrors concurrentSlice[error]
if isDockerSwarm {
wg := &sync.WaitGroup{}
for _, svc := range servicesToScaleDown {
wg.Add(1)
go func(svc handledSwarmService) {
defer wg.Done()
warnings, err := scaleService(s.cli, svc.serviceID, svc.initialReplicaCount)
if err != nil {
scaleDownErrors.append(err)
return
}
for _, warning := range warnings {
s.logger.Warn(
fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", svc.serviceID, warning),
)
}
}(svc)
}
wg.Wait()
}
allErrors := append(restartErrors, scaleUpErrors.value()...)
if len(allErrors) != 0 {
return fmt.Errorf(
"(*script).stopContainersAndServices: %d error(s) restarting containers and services: %w",
len(allErrors),
errors.Join(allErrors...),
)
}
s.logger.Info(
fmt.Sprintf(
"Restarted %d container(s).",
len(stoppedContainers),
),
)
if isDockerSwarm {
s.logger.Info(
fmt.Sprintf(
"Scaled %d service(s) back up.",
len(scaledDownServices),
),
)
}
return nil
}, initialErr
}

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"os"
"sync"
)
var noop = func() error { return nil }
@@ -50,3 +51,31 @@ func (b *bufferingWriter) Write(p []byte) (n int, err error) {
}
return b.writer.Write(p)
}
type noopWriteCloser struct {
io.Writer
}
func (noopWriteCloser) Close() error {
return nil
}
type handledSwarmService struct {
serviceID string
initialReplicaCount uint64
}
type concurrentSlice[T any] struct {
val []T
sync.Mutex
}
func (c *concurrentSlice[T]) append(v T) {
c.Lock()
defer c.Unlock()
c.val = append(c.val, v)
}
func (c *concurrentSlice[T]) value() []T {
return c.val
}