Mirror of https://github.com/offen/docker-volume-backup.git

Compare commits: v2.36.0-al ... v2.36.0-al (6 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 316a7a6ab1 |  |
|  | 7f2139143c |  |
|  | 174f85fabd |  |
|  | 7af5fb41a7 |  |
|  | cfa5f073d2 |  |
|  | d0f3f41fe7 |  |
cmd/backup/docker.go (Normal file, 305 lines)
@@ -0,0 +1,305 @@
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/docker/cli/cli/command/service/progress"
	"github.com/docker/docker/api/types"
	ctr "github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/client"
)

// scaleService scales the service with the given serviceID to the given
// number of replicas, waits for the update to be applied and returns any
// warnings emitted by the Docker API.
func scaleService(cli *client.Client, serviceID string, replicas uint64) ([]string, error) {
	service, _, err := cli.ServiceInspectWithRaw(context.Background(), serviceID, types.ServiceInspectOptions{})
	if err != nil {
		return nil, fmt.Errorf("scaleService: error inspecting service %s: %w", serviceID, err)
	}
	serviceMode := &service.Spec.Mode
	switch {
	case serviceMode.Replicated != nil:
		serviceMode.Replicated.Replicas = &replicas
	default:
		return nil, fmt.Errorf("scaleService: service to be scaled %s has to be in replicated mode", service.Spec.Name)
	}

	response, err := cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
	if err != nil {
		return nil, fmt.Errorf("scaleService: error updating service: %w", err)
	}

	discardWriter := &noopWriteCloser{io.Discard}
	if err := progress.ServiceProgress(context.Background(), cli, service.ID, discardWriter); err != nil {
		return nil, err
	}
	return response.Warnings, nil
}

// awaitContainerCountForService polls the Docker API once per second until
// the number of containers belonging to the given service matches the given
// count, timing out after 5 minutes.
func awaitContainerCountForService(cli *client.Client, serviceID string, count int) error {
	poll := time.NewTicker(time.Second)
	timeout := time.NewTicker(5 * time.Minute)
	defer timeout.Stop()
	defer poll.Stop()

	for {
		select {
		case <-timeout.C:
			return fmt.Errorf(
				"awaitContainerCount: timed out after waiting 5 minutes for service %s to reach desired container count of %d",
				serviceID,
				count,
			)
		case <-poll.C:
			containers, err := cli.ContainerList(context.Background(), types.ContainerListOptions{
				Filters: filters.NewArgs(filters.KeyValuePair{
					Key:   "label",
					Value: fmt.Sprintf("com.docker.swarm.service.id=%s", serviceID),
				}),
			})
			if err != nil {
				return fmt.Errorf("awaitContainerCount: error listing containers: %w", err)
			}
			if len(containers) == count {
				return nil
			}
		}
	}
}

// stopContainersAndServices stops all Docker containers that are marked to be
// stopped during the backup and returns a function that can be called to
// restart everything that has been stopped.
func (s *script) stopContainersAndServices() (func() error, error) {
	if s.cli == nil {
		return noop, nil
	}

	dockerInfo, err := s.cli.Info(context.Background())
	if err != nil {
		return noop, fmt.Errorf("(*script).stopContainersAndServices: error getting docker info: %w", err)
	}
	isDockerSwarm := dockerInfo.Swarm.LocalNodeState != "inactive"

	filterMatchLabel := fmt.Sprintf(
		"docker-volume-backup.stop-during-backup=%s",
		s.c.BackupStopContainerLabel,
	)

	allContainers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{})
	if err != nil {
		return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for containers: %w", err)
	}
	containersToStop, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{
		Filters: filters.NewArgs(filters.KeyValuePair{
			Key:   "label",
			Value: filterMatchLabel,
		}),
	})
	if err != nil {
		return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for containers to stop: %w", err)
	}

	var allServices []swarm.Service
	var servicesToScaleDown []handledSwarmService
	if isDockerSwarm {
		allServices, err = s.cli.ServiceList(context.Background(), types.ServiceListOptions{})
		if err != nil {
			return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for services: %w", err)
		}
		matchingServices, err := s.cli.ServiceList(context.Background(), types.ServiceListOptions{
			Filters: filters.NewArgs(filters.KeyValuePair{
				Key:   "label",
				Value: filterMatchLabel,
			}),
			Status: true,
		})
		if err != nil {
			return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for services to scale down: %w", err)
		}
		for _, matchingService := range matchingServices {
			if matchingService.Spec.Mode.Replicated == nil {
				// guard against dereferencing a nil Replicas pointer for
				// labeled services that are not running in replicated mode
				return noop, fmt.Errorf("(*script).stopContainersAndServices: labeled service %s has to be in replicated mode", matchingService.Spec.Name)
			}
			servicesToScaleDown = append(servicesToScaleDown, handledSwarmService{
				serviceID:           matchingService.ID,
				initialReplicaCount: *matchingService.Spec.Mode.Replicated.Replicas,
			})
		}
	}

	if len(containersToStop) == 0 && len(servicesToScaleDown) == 0 {
		return noop, nil
	}

	if isDockerSwarm {
		for _, container := range containersToStop {
			if swarmServiceID, ok := container.Labels["com.docker.swarm.service.id"]; ok {
				parentService, _, err := s.cli.ServiceInspectWithRaw(context.Background(), swarmServiceID, types.ServiceInspectOptions{})
				if err != nil {
					return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for parent service with ID %s: %w", swarmServiceID, err)
				}
				for label := range parentService.Spec.Labels {
					if label == "docker-volume-backup.stop-during-backup" {
						return noop, fmt.Errorf(
							"(*script).stopContainersAndServices: container %s is labeled to stop but has parent service %s which is also labeled, cannot continue",
							container.Names[0],
							parentService.Spec.Name,
						)
					}
				}
			}
		}
	}

	s.logger.Info(
		fmt.Sprintf(
			"Stopping %d out of %d running container(s) and scaling down %d out of %d active service(s) as they were labeled %s.",
			len(containersToStop),
			len(allContainers),
			len(servicesToScaleDown),
			len(allServices),
			filterMatchLabel,
		),
	)

	var stoppedContainers []types.Container
	var stopErrors []error
	for _, container := range containersToStop {
		if err := s.cli.ContainerStop(context.Background(), container.ID, ctr.StopOptions{}); err != nil {
			stopErrors = append(stopErrors, err)
		} else {
			stoppedContainers = append(stoppedContainers, container)
		}
	}

	// both slices are appended to from the goroutines below, so they use the
	// mutex-protected concurrentSlice instead of a plain slice
	var scaledDownServices concurrentSlice[handledSwarmService]
	var scaleDownErrors concurrentSlice[error]
	if isDockerSwarm {
		wg := sync.WaitGroup{}
		for _, svc := range servicesToScaleDown {
			wg.Add(1)
			go func(svc handledSwarmService) {
				defer wg.Done()
				warnings, err := scaleService(s.cli, svc.serviceID, 0)
				if err != nil {
					scaleDownErrors.append(err)
				} else {
					scaledDownServices.append(svc)
				}
				for _, warning := range warnings {
					s.logger.Warn(
						fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", svc.serviceID, warning),
					)
				}
				// progress.ServiceProgress returns too early, so we need to manually check
				// whether all containers belonging to the service have actually been removed
				if err := awaitContainerCountForService(s.cli, svc.serviceID, 0); err != nil {
					scaleDownErrors.append(err)
				}
			}(svc)
		}
		wg.Wait()
	}

	s.stats.Containers = ContainersStats{
		All:        uint(len(allContainers)),
		ToStop:     uint(len(containersToStop)),
		Stopped:    uint(len(stoppedContainers)),
		StopErrors: uint(len(stopErrors)),
	}

	s.stats.Services = ServicesStats{
		All:             uint(len(allServices)),
		ToScaleDown:     uint(len(servicesToScaleDown)),
		ScaledDown:      uint(len(scaledDownServices.value())),
		ScaleDownErrors: uint(len(scaleDownErrors.value())),
	}

	var initialErr error
	allErrors := append(stopErrors, scaleDownErrors.value()...)
	if len(allErrors) != 0 {
		initialErr = fmt.Errorf(
			"(*script).stopContainersAndServices: %d error(s) stopping containers: %w",
			len(allErrors),
			errors.Join(allErrors...),
		)
	}

	return func() error {
		var restartErrors []error
		matchedServices := map[string]bool{}
		for _, container := range stoppedContainers {
			if swarmServiceID, ok := container.Labels["com.docker.swarm.service.id"]; ok && isDockerSwarm {
				if _, ok := matchedServices[swarmServiceID]; ok {
					continue
				}
				matchedServices[swarmServiceID] = true
				// if a container was part of a swarm service, the service needs to be
				// force-updated instead of restarting the container, as the container
				// would otherwise remain in a "completed" state
				service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), swarmServiceID, types.ServiceInspectOptions{})
				if err != nil {
					restartErrors = append(
						restartErrors,
						fmt.Errorf("(*script).stopContainersAndServices: error looking up parent service: %w", err),
					)
					continue
				}
				service.Spec.TaskTemplate.ForceUpdate += 1
				if _, err := s.cli.ServiceUpdate(
					context.Background(), service.ID,
					service.Version, service.Spec, types.ServiceUpdateOptions{},
				); err != nil {
					restartErrors = append(restartErrors, err)
				}
				continue
			}

			if err := s.cli.ContainerStart(context.Background(), container.ID, types.ContainerStartOptions{}); err != nil {
				restartErrors = append(restartErrors, err)
			}
		}

		var scaleUpErrors concurrentSlice[error]
		if isDockerSwarm {
			wg := &sync.WaitGroup{}
			for _, svc := range servicesToScaleDown {
				wg.Add(1)
				go func(svc handledSwarmService) {
					defer wg.Done()
					warnings, err := scaleService(s.cli, svc.serviceID, svc.initialReplicaCount)
					if err != nil {
						scaleUpErrors.append(err)
						return
					}
					for _, warning := range warnings {
						s.logger.Warn(
							fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", svc.serviceID, warning),
						)
					}
				}(svc)
			}
			wg.Wait()
		}

		allErrors := append(restartErrors, scaleUpErrors.value()...)
		if len(allErrors) != 0 {
			return fmt.Errorf(
				"(*script).stopContainersAndServices: %d error(s) restarting containers and services: %w",
				len(allErrors),
				errors.Join(allErrors...),
			)
		}
		s.logger.Info(
			fmt.Sprintf(
				"Restarted %d container(s) and %d service(s).",
				len(stoppedContainers),
				len(scaledDownServices.value()),
			),
		)
		return nil
	}, initialErr
}
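The restore callback above is returned alongside initialErr, so a caller still receives a usable restore function even when stopping was only partially successful. Below is a minimal sketch of the intended call pattern; it is illustrative only (the restoreAll name is hypothetical), assuming the script value s from this package rather than showing code from this commit:

	restoreAll, err := s.stopContainersAndServices()
	// defer the restore before inspecting err: even a partial failure may
	// have stopped containers or scaled down services that need to come back
	defer func() {
		if restoreErr := restoreAll(); restoreErr != nil {
			s.logger.Warn(fmt.Sprintf("Error restoring containers and services: %v", restoreErr))
		}
	}()
	if err != nil {
		return err
	}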
cmd/backup/script.go

@@ -5,8 +5,6 @@ package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"io/fs"
@@ -30,11 +28,6 @@ import (
	openpgp "github.com/ProtonMail/go-crypto/openpgp/v2"
	"github.com/containrrr/shoutrrr"
	"github.com/containrrr/shoutrrr/pkg/router"
	"github.com/docker/cli/cli/command/service/progress"
	"github.com/docker/docker/api/types"
	ctr "github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/client"
	"github.com/leekchan/timeutil"
	"github.com/offen/envconfig"
@@ -319,283 +312,6 @@ func newScript() (*script, error) {
	return s, nil
}

type noopWriteCloser struct {
	io.Writer
}

func (noopWriteCloser) Close() error {
	return nil
}

type handledSwarmService struct {
	serviceID           string
	initialReplicaCount uint64
}

// stopContainersAndServices stops all Docker containers that are marked as to being
// stopped during the backup and returns a function that can be called to
// restart everything that has been stopped.
func (s *script) stopContainersAndServices() (func() error, error) {
	if s.cli == nil {
		return noop, nil
	}

	dockerInfo, err := s.cli.Info(context.Background())
	if err != nil {
		return noop, fmt.Errorf("(*script).stopContainersAndServices: error getting docker info: %w", err)
	}
	isDockerSwarm := dockerInfo.Swarm.LocalNodeState != "inactive"
	discardWriter := &noopWriteCloser{io.Discard}

	filterMatchLabel := fmt.Sprintf(
		"docker-volume-backup.stop-during-backup=%s",
		s.c.BackupStopContainerLabel,
	)

	allContainers, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{})
	if err != nil {
		return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for containers: %w", err)
	}
	containersToStop, err := s.cli.ContainerList(context.Background(), types.ContainerListOptions{
		Filters: filters.NewArgs(filters.KeyValuePair{
			Key:   "label",
			Value: filterMatchLabel,
		}),
	})
	if err != nil {
		return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for containers to stop: %w", err)
	}

	var allServices []swarm.Service
	var servicesToScaleDown []handledSwarmService
	if isDockerSwarm {
		allServices, err = s.cli.ServiceList(context.Background(), types.ServiceListOptions{})
		if err != nil {
			return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for services: %w", err)
		}
		matchingServices, err := s.cli.ServiceList(context.Background(), types.ServiceListOptions{
			Filters: filters.NewArgs(filters.KeyValuePair{
				Key:   "label",
				Value: filterMatchLabel,
			}),
			Status: true,
		})
		for _, s := range matchingServices {
			servicesToScaleDown = append(servicesToScaleDown, handledSwarmService{
				serviceID:           s.ID,
				initialReplicaCount: *s.Spec.Mode.Replicated.Replicas,
			})
		}
		if err != nil {
			return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for services to scale down: %w", err)
		}
	}

	if len(containersToStop) == 0 && len(servicesToScaleDown) == 0 {
		return noop, nil
	}

	if isDockerSwarm {
		for _, container := range containersToStop {
			if swarmServiceID, ok := container.Labels["com.docker.swarm.service.id"]; ok {
				parentService, _, err := s.cli.ServiceInspectWithRaw(context.Background(), swarmServiceID, types.ServiceInspectOptions{})
				if err != nil {
					return noop, fmt.Errorf("(*script).stopContainersAndServices: error querying for parent service with ID %s: %w", swarmServiceID, err)
				}
				for label := range parentService.Spec.Labels {
					if label == "docker-volume-backup.stop-during-backup" {
						return noop, fmt.Errorf(
							"(*script).stopContainersAndServices: container %s is labeled to stop but has parent service %s which is also labeled, cannot continue",
							container.Names[0],
							parentService.Spec.Name,
						)
					}
				}
			}
		}
	}

	s.logger.Info(
		fmt.Sprintf(
			"Stopping %d out of %d running container(s) and scaling down %d out of %d active service(s) as they were labeled %s.",
			len(containersToStop),
			len(allContainers),
			len(servicesToScaleDown),
			len(allServices),
			filterMatchLabel,
		),
	)

	var stoppedContainers []types.Container
	var stopErrors []error
	for _, container := range containersToStop {
		if err := s.cli.ContainerStop(context.Background(), container.ID, ctr.StopOptions{}); err != nil {
			stopErrors = append(stopErrors, err)
		} else {
			stoppedContainers = append(stoppedContainers, container)
		}
	}

	var scaledDownServices []swarm.Service
	var scaleDownErrors []error
	if isDockerSwarm {
		for _, svc := range servicesToScaleDown {
			service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
			if err != nil {
				scaleDownErrors = append(
					scaleDownErrors,
					fmt.Errorf("(*script).stopContainersAndServices: error inspecting service %s: %w", svc.serviceID, err),
				)
				continue
			}
			var zero uint64 = 0
			serviceMode := &service.Spec.Mode
			switch {
			case serviceMode.Replicated != nil:
				serviceMode.Replicated.Replicas = &zero
			default:
				scaleDownErrors = append(
					scaleDownErrors,
					fmt.Errorf("(*script).stopContainersAndServices: labeled service %s has to be in replicated mode", service.Spec.Name),
				)
				continue
			}

			response, err := s.cli.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
			if err != nil {
				scaleDownErrors = append(scaleDownErrors, err)
				continue
			}

			for _, warning := range response.Warnings {
				s.logger.Warn(
					fmt.Sprintf("The Docker API returned a warning when scaling down service %s: %s", service.Spec.Name, warning),
				)
			}

			if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
				scaleDownErrors = append(scaleDownErrors, err)
			} else {
				scaledDownServices = append(scaledDownServices, service)
			}
		}
	}

	s.stats.Containers = ContainersStats{
		All:        uint(len(allContainers)),
		ToStop:     uint(len(containersToStop)),
		Stopped:    uint(len(stoppedContainers)),
		StopErrors: uint(len(stopErrors)),
	}

	s.stats.Services = ServicesStats{
		All:             uint(len(allServices)),
		ToScaleDown:     uint(len(servicesToScaleDown)),
		ScaledDown:      uint(len(scaledDownServices)),
		ScaleDownErrors: uint(len(scaleDownErrors)),
	}

	var initialErr error
	allErrors := append(stopErrors, scaleDownErrors...)
	if len(allErrors) != 0 {
		initialErr = fmt.Errorf(
			"(*script).stopContainersAndServices: %d error(s) stopping containers: %w",
			len(allErrors),
			errors.Join(allErrors...),
		)
	}

	return func() error {
		servicesRequiringForceUpdate := map[string]struct{}{}

		var restartErrors []error
		for _, container := range stoppedContainers {
			if swarmServiceName, ok := container.Labels["com.docker.swarm.service.name"]; ok {
				servicesRequiringForceUpdate[swarmServiceName] = struct{}{}
				continue
			}
			if err := s.cli.ContainerStart(context.Background(), container.ID, types.ContainerStartOptions{}); err != nil {
				restartErrors = append(restartErrors, err)
			}
		}

		if len(servicesRequiringForceUpdate) != 0 {
			services, _ := s.cli.ServiceList(context.Background(), types.ServiceListOptions{})
			for serviceName := range servicesRequiringForceUpdate {
				var serviceMatch swarm.Service
				for _, service := range services {
					if service.Spec.Name == serviceName {
						serviceMatch = service
						break
					}
				}
				if serviceMatch.ID == "" {
					restartErrors = append(
						restartErrors,
						fmt.Errorf("(*script).stopContainersAndServices: couldn't find service with name %s", serviceName),
					)
					continue
				}
				serviceMatch.Spec.TaskTemplate.ForceUpdate += 1
				if _, err := s.cli.ServiceUpdate(
					context.Background(), serviceMatch.ID,
					serviceMatch.Version, serviceMatch.Spec, types.ServiceUpdateOptions{},
				); err != nil {
					restartErrors = append(restartErrors, err)
				}
			}
		}

		var scaleUpErrors []error
		if isDockerSwarm {
			for _, svc := range servicesToScaleDown {
				service, _, err := s.cli.ServiceInspectWithRaw(context.Background(), svc.serviceID, types.ServiceInspectOptions{})
				if err != nil {
					scaleUpErrors = append(scaleUpErrors, err)
					continue
				}

				service.Spec.Mode.Replicated.Replicas = &svc.initialReplicaCount
				response, err := s.cli.ServiceUpdate(
					context.Background(),
					service.ID,
					service.Version, service.Spec,
					types.ServiceUpdateOptions{},
				)
				if err != nil {
					scaleUpErrors = append(scaleUpErrors, err)
					continue
				}
				for _, warning := range response.Warnings {
					s.logger.Warn(
						fmt.Sprintf("The Docker API returned a warning when scaling up service %s: %s", service.Spec.Name, warning),
					)
				}
				if err := progress.ServiceProgress(context.Background(), s.cli, service.ID, discardWriter); err != nil {
					scaleUpErrors = append(scaleUpErrors, err)
				}
			}
		}

		allErrors := append(restartErrors, scaleUpErrors...)
		if len(allErrors) != 0 {
			return fmt.Errorf(
				"stopContainers: %d error(s) restarting containers and services: %w",
				len(allErrors),
				errors.Join(allErrors...),
			)
		}
		s.logger.Info(
			fmt.Sprintf(
				"Restarted %d container(s) and %d service(s).",
				len(stoppedContainers),
				len(scaledDownServices),
			),
		)
		return nil
	}, initialErr
}

// createArchive creates a tar archive of the configured backup location and
// saves it to disk.
func (s *script) createArchive() error {
cmd/backup/util.go

@@ -8,6 +8,7 @@ import (
	"fmt"
	"io"
	"os"
	"sync"
)

var noop = func() error { return nil }
@@ -50,3 +51,31 @@ func (b *bufferingWriter) Write(p []byte) (n int, err error) {
	}
	return b.writer.Write(p)
}

type noopWriteCloser struct {
	io.Writer
}

func (noopWriteCloser) Close() error {
	return nil
}

type handledSwarmService struct {
	serviceID           string
	initialReplicaCount uint64
}

type concurrentSlice[T any] struct {
	val []T
	sync.Mutex
}

func (c *concurrentSlice[T]) append(v T) {
	c.Lock()
	defer c.Unlock()
	c.val = append(c.val, v)
}

func (c *concurrentSlice[T]) value() []T {
	return c.val
}
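The concurrentSlice type exists so the goroutines in cmd/backup/docker.go can collect errors without a data race. Here is a small, hypothetical usage example (the collectErrors function is not part of this commit), pairing it with a sync.WaitGroup the same way the scale-down code does:

	func collectErrors() []error {
		var errs concurrentSlice[error]
		wg := sync.WaitGroup{}
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				// append locks the embedded mutex, so concurrent calls are safe
				errs.append(fmt.Errorf("worker %d failed", i))
			}(i)
		}
		// value() does no locking itself, so only read it after wg.Wait()
		// guarantees all appends have finished
		wg.Wait()
		return errs.value()
	}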