update-repos/main.go
Thorsten Schubert 8549f49c17
All checks were successful
/ build (push) Successful in 44s
Use environment variable to determine service
2024-05-07 16:21:50 +02:00

576 lines
14 KiB
Go

// SPDX-License-Identifier: GPL-3.0-or-later
// Copyright © 2023 Thorsten Schubert <tschubert@bafh.org>
package main
import (
"bytes"
"context"
"flag"
"fmt"
"os"
"os/exec"
"os/signal"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/journald"
"github.com/rs/zerolog/log"
"golang.org/x/sys/unix"
"update-repos/internal/repo"
"update-repos/internal/retry"
)
// BackoffType identifies the delay strategy applied between retry attempts.
type BackoffType uint

// Supported back-off strategies, numbered from zero in declaration order.
const (
	Exponential BackoffType = iota
	Fibonacci
	Linear
	None
)

// backoffMap translates the command-line spelling of a strategy to its value.
var backoffMap = map[string]BackoffType{
	"exponential": Exponential,
	"fibonacci":   Fibonacci,
	"linear":      Linear,
	"none":        None,
}

// VERSION is the program version string. It is declared empty here; init
// substitutes "unknown" when nothing else has filled it in.
var VERSION string

// Set is the parsing half of the flag.Value implementation: it looks value up
// in backoffMap and stores the match in b. Unknown names yield an error and
// leave b untouched.
func (b *BackoffType) Set(value string) error {
	parsed, known := backoffMap[value]
	if !known {
		return fmt.Errorf("invalid backoff strategy: %s", value)
	}
	*b = parsed
	return nil
}

// String is the printing half of the flag.Value implementation: it returns the
// backoffMap key whose value equals *b, or the empty string when none matches.
func (b *BackoffType) String() string {
	for name, candidate := range backoffMap {
		if candidate != *b {
			continue
		}
		return name
	}
	return ""
}
// init configures the global zerolog logger and the VERSION fallback before
// main runs.
//
// When the SYSTEMD_SERVICE environment variable is "yes", "true" or "1", log
// output is sent to journald. Otherwise a console writer on stderr is used,
// with colours enabled only when stderr itself is a terminal. The global log
// level starts at info; main may change it later.
func init() {
	switch os.Getenv("SYSTEMD_SERVICE") {
	case "yes", "true", "1":
		log.Logger = log.Output(zerolog.New(journald.NewJournalDWriter()))
	default:
		// Probe the stream that is actually written to: a successful termios
		// query means stderr is a terminal and can render colours.
		_, ttyErr := unix.IoctlGetTermios(int(os.Stderr.Fd()), unix.TCGETS)
		log.Logger = log.Output(zerolog.ConsoleWriter{
			Out:        os.Stderr,
			TimeFormat: time.RFC3339,
			NoColor:    ttyErr != nil,
		})
	}

	zerolog.SetGlobalLevel(zerolog.InfoLevel)

	if VERSION == "" {
		VERSION = "unknown"
	}
}
// main parses the command line, locks the base directory, discovers git
// repositories below it and processes them with at most -max-jobs workers in
// parallel. Each repository is either probed (-alive-check) or fetched and then
// merged/reset; failed and changed repositories are collected and summarised
// in the log at the end. SIGINT cancels outstanding work.
func main() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, unix.SIGINT)

	// Upper bound handed to the exponential back-off strategy.
	const maxDelay time.Duration = 60 * time.Second

	var (
		aliveCheckOnlyFlag  bool
		atomicFetchFlag     bool
		verboseFlag         bool
		versionFlag         bool
		forceFlag           bool
		maxDepthFlag        uint
		maxJobsFlag         uint
		maxRetriesFlag      uint
		logLevelFlag        string
		initialDelayFlag    time.Duration
		backoffStrategyFlag = Linear
	)

	flag.UintVar(&maxDepthFlag, "max-depth", 1, "maximum search depth for git repositories")
	flag.UintVar(&maxJobsFlag, "max-jobs", uint(runtime.NumCPU()), "maximum number of parallel jobs")
	flag.BoolVar(&aliveCheckOnlyFlag, "alive-check", false, "Check if remote is alive, don't update or alter any state")
	flag.BoolVar(&atomicFetchFlag, "atomic", false, "Use an atomic transaction to update local refs")
	flag.UintVar(&maxRetriesFlag, "max-retries", 5, "maximum number of retries on failure")
	flag.DurationVar(&initialDelayFlag, "initial-delay", 500*time.Millisecond, "initial delay for retries")
	flag.Var(&backoffStrategyFlag, "backoff-strategy", "back-off strategy for retries (none, linear, fibonacci, exponential)")
	flag.BoolVar(&forceFlag, "force", false, "force update on diverging history")
	flag.BoolVar(&verboseFlag, "verbose", false, "be verbose")
	flag.BoolVar(&verboseFlag, "v", false, "be verbose")
	flag.BoolVar(&versionFlag, "version", false, "program version")
	flag.StringVar(&logLevelFlag, "log-level", "info", "set log level (debug, info, warn, error, fatal, panic)")
	flag.Usage = func() {
		fmt.Fprintf(os.Stderr, "Usage: %s [options] [<repo-dir>]\n", os.Args[0])
		flag.PrintDefaults()
	}
	flag.Parse()
	nonFlagArgs := flag.Args()

	if versionFlag {
		fmt.Fprintf(os.Stderr, "Version: %s\n", VERSION)
		return
	}

	// The job semaphore needs a capacity of at least one.
	if maxJobsFlag == 0 {
		maxJobsFlag++
	}

	// A depth of 1 means we descend one subdirectory deep, so offset by +1
	// since we expect a base directory and need to descend at least 2 levels.
	maxDepthFlag++

	parsedLevel, err := zerolog.ParseLevel(logLevelFlag)
	if err != nil {
		log.Fatal().Err(err).Msg("Invalid log level")
	}
	zerolog.SetGlobalLevel(parsedLevel)

	var repoBase string
	if len(nonFlagArgs) != 0 {
		repoBase = nonFlagArgs[0]
	} else {
		var cwd string
		cwd, err = os.Getwd()
		if err != nil {
			// Report the actual error; cwd is empty when Getwd fails.
			log.Fatal().Err(err).Msg("Could not get current working directory")
		}
		repoBase = cwd
		log.Debug().Str("cwd", cwd).Msg("Using current directory")
	}

	repoBase, err = filepath.Abs(repoBase)
	if err != nil {
		log.Fatal().Err(err).Msg("Invalid directory path")
	}
	if err := isValidDirectory(repoBase); err != nil {
		log.Fatal().Err(err).Msg("Invalid directory path")
	}

	// One instance of the program for identical paths
	lock := acquireFileLock(repoBase)
	defer releaseFileLock(lock)

	envVars := map[string]string{
		"GIT_CONFIG_GLOBAL":        "/dev/null",
		"GIT_TERMINAL_PROMPT":      "0",
		"GIT_ASKPASS":              "/bin/false",
		"GIT_HTTP_MAX_REQUESTS":    "2",
		"GIT_HTTP_LOW_SPEED_LIMIT": "8192",
		"GIT_HTTP_LOW_SPEED_TIME":  "5",
		"GIT_SSH_COMMAND":          "ssh -o BatchMode=yes",
	}
	// Allow env vars to be overridden by the caller, i.e. the calling shell
	setAndExportUnsetEnv(envVars)

	var (
		failedRepos  []*repo.Gitrepo
		changedRepos []*repo.Gitrepo
		wgRepo       sync.WaitGroup
		wgResult     sync.WaitGroup
		once         sync.Once
	)

	cRepoPaths := make(chan string)
	cFailed := make(chan *repo.Gitrepo)
	cPassed := make(chan *repo.Gitrepo)

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context on the regular exit path as well.
	defer cancel()

	// Handle SIGINT (CTRL-C)
	go func() {
		<-c
		once.Do(func() {
			log.Warn().Msg("SIGINT received - shutting down gracefully")
			cancel()
		})
	}()

	// Result collector: the only goroutine that appends to failedRepos and
	// changedRepos. It works on its own copies of the channel values so that
	// disabling a drained channel (setting it to nil) never touches the
	// variables main closes further down.
	wgResult.Add(1)
	go func(failedCh, passedCh <-chan *repo.Gitrepo) {
		defer wgResult.Done()
		// Runs until both channels have been closed.
		for failedCh != nil || passedCh != nil {
			select {
			case r, ok := <-failedCh:
				if !ok {
					failedCh = nil
					continue
				}
				log.Debug().Str("repo", r.Name).Msg("Appending failed repo")
				failedRepos = append(failedRepos, r)
			case r, ok := <-passedCh:
				if !ok {
					passedCh = nil
					continue
				}
				log.Debug().Str("repo", r.Name).Msg("Appending passed repo")
				changedRepos = append(changedRepos, r)
			}
		}
	}(cFailed, cPassed)

	// Discovery producer. No WaitGroup is used: the loop below reads
	// cRepoPaths until it is closed or the context is cancelled.
	go func(ctx context.Context) {
		findGitrepos(ctx, repoBase, maxDepthFlag, cRepoPaths)
		close(cRepoPaths)
	}(ctx)

	// Counting semaphore limiting the number of concurrently running workers.
	jobSem := make(chan struct{}, maxJobsFlag)
	var repoCount atomic.Uint32

done:
	for {
		select {
		case <-ctx.Done():
			log.Trace().Ctx(ctx).Msg("Updates cancelled")
			break done
		case discoveredPath, ok := <-cRepoPaths:
			if !ok {
				break done
			}

			wgRepo.Add(1)
			jobSem <- struct{}{}

			go func(cFail, cDone chan<- *repo.Gitrepo) {
				defer func() {
					wgRepo.Done()
					<-jobSem
				}()

				repoName := filepath.Base(discoveredPath)
				git, err := repo.NewGitRepo(filepath.Clean(discoveredPath))
				if err != nil {
					if repo.IsMissingRemoteError(err) {
						log.Warn().Str("path", discoveredPath).Err(err).Msg("Ignoring repo")
					} else {
						log.Error().Str("path", discoveredPath).Err(err).Send()
					}
					return
				}

				repoCount.Add(1)
				r := git.(*repo.Gitrepo)
				log.Info().Str("repo", repoName).Msg("Processing")

				// Registered after the semaphore/WaitGroup defer, so it runs
				// first and reports the outcome before the slot is released.
				success := true
				defer func() {
					msg := "Failure"
					if success {
						msg = "Success"
					}
					if !success {
						cFail <- r
					}
					log.Debug().Str("repo", repoName).Str("result", msg).Send()
				}()

				strategy := selectBackoffStrategy(backoffStrategyFlag, initialDelayFlag, maxDelay, maxRetriesFlag+1)

				// Alive Check only, do not alter any discovered repositories and return early
				if aliveCheckOnlyFlag {
					alive := false
					if alive, err = isRemoteAliveWithRetry(ctx, r, strategy); alive {
						cDone <- r
					} else {
						// Don't log or alter success state on context cancel
						if ctx.Err() != nil {
							return
						}
						success = false
						log.Error().Str("repo", repoName).Err(err).Send()
					}
					return
				}

				// Get objects before fetching anything for bare repositories
				// since we do not have merge output to determine changed refs
				var preFetchState []byte
				if r.IsBare {
					if preFetchState, err = r.GetObjects(); err != nil {
						log.Error().Str("repo", repoName).Err(err).Send()
						return
					}
				}

				// The actual update
				if err = fetchOrUpdateWithRetry(ctx, r, atomicFetchFlag, strategy); err != nil {
					if ctx.Err() != nil {
						return
					}
					success = false
					log.Error().Str("repo", repoName).Err(err).Send()
					// NOTE(review): there is no return here, so post-processing
					// below still runs after a failed fetch - confirm intended.
				}

				// Post-processing bare repositories
				if r.IsBare {
					// Get new refs
					var postFetchState []byte
					if postFetchState, err = r.GetObjects(); err != nil {
						log.Warn().Str("repo", repoName).Err(err).Msg("Cannot determine state")
						return
					}
					// Check for new refs
					if !bytes.Equal(preFetchState, postFetchState) {
						cDone <- r
					}
					log.Debug().Str("repo", repoName).Msg("Success")
					return
				}

				// Post-processing non-bare repositories, treat repos with diverging history differently
				diverged := git.IsHistoryDiverged()

				// Standard merge
				if !diverged {
					stdout, stderr, err := git.Merge()
					if err != nil {
						success = false
						log.Info().Str("repo", repoName).Msg(string(stderr))
						return
					}
					if !strings.HasPrefix(string(stdout), "Already up to date") {
						cDone <- r
						if verboseFlag {
							log.Info().Str("repo", repoName).Msg(string(stdout))
						}
					}
					return
				}

				// History diverged from here on; a reset is only done on request.
				if forceFlag {
					// Reset to remote history
					log.Warn().Str("repo", repoName).Msg("Repo history diverged, reset to remote")
					stdout, stderr, err := git.Reset( /*Hard reset*/ true)
					if err != nil {
						success = false
						log.Info().Str("repo", repoName).Msg(string(stderr))
						return
					}
					cDone <- r
					if verboseFlag {
						log.Info().Str("repo", repoName).Msg(string(stdout))
					}
					return
				}

				log.Debug().Str("repo", repoName).Msg("Diverged history")
				success = false
			}(cFailed, cPassed)
		}
	}

	// All workers are done once wgRepo is released; closing the result
	// channels lets the collector finish before its slices are read.
	wgRepo.Wait()
	close(cFailed)
	close(cPassed)
	wgResult.Wait()

	var successMsg string
	if aliveCheckOnlyFlag {
		successMsg = "Alive"
	} else {
		successMsg = "Updated"
	}

	for _, failed := range failedRepos {
		relative, _ := filepath.Rel(repoBase, failed.Path)
		log.Warn().Str("path", relative).Msg("Failed")
	}
	for _, changed := range changedRepos {
		relative, _ := filepath.Rel(repoBase, changed.Path)
		log.Info().Str("path", relative).Msg(successMsg)
	}

	log.Info().Uint32("Processed", repoCount.Load()).Int(successMsg, len(changedRepos)).Int("Failed", len(failedRepos)).Msg("Statistics")
}
// selectBackoffStrategy builds the retry strategy matching backoffType.
//
// delay and retries are passed to every constructor; maxDelay is only used by
// the exponential strategy. The name of the chosen strategy is logged at trace
// level. An unrecognised backoffType panics.
func selectBackoffStrategy(backoffType BackoffType, delay time.Duration, maxDelay time.Duration, retries uint) retry.RetryStrategy {
	var chosen retry.RetryStrategy

	switch backoffType {
	case Exponential:
		chosen = retry.NewExponentialBackoffWithCap(delay, maxDelay, retries)
	case Fibonacci:
		chosen = retry.NewFibonacciBackoff(delay, retries)
	case Linear:
		chosen = retry.NewLinearBackoff(delay, retries)
	case None:
		chosen = retry.NewNoneBackoff(delay, retries)
	default:
		panic("No back-off strategy")
	}

	log.Trace().Str("back-off", chosen.Name()).Send()

	return chosen
}
// isValidDirectory checks that path names an existing directory which the
// current process can open for reading.
//
// It returns nil when all checks pass, otherwise an error naming the check
// that failed; errors from os.Stat and os.Open are wrapped.
func isValidDirectory(path string) error {
	info, err := os.Stat(path)
	if err != nil {
		return fmt.Errorf("failed to stat directory: %w", err)
	}

	if !info.IsDir() {
		// err is nil here, so there is nothing to wrap; name the path instead.
		return fmt.Errorf("path is not a directory: %s", path)
	}

	dir, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("no permission to read directory: %w", err)
	}
	// The handle was only opened as a readability probe; a close error on it
	// says nothing about the directory, so it is deliberately discarded.
	_ = dir.Close()

	return nil
}
// acquireFileLock opens path read-only and places an exclusive, non-blocking
// flock on the resulting descriptor.
//
// The returned file carries the lock; pass it to releaseFileLock when done.
// Failing to open the path or to obtain the lock terminates the program via
// log.Fatal.
func acquireFileLock(path string) *os.File {
	lockFile, err := os.OpenFile(path, os.O_RDONLY, 0o755)
	if err != nil {
		log.Fatal().Err(err).Msg("Cannot open directory")
	}

	fd := int(lockFile.Fd())
	if err = unix.Flock(fd, unix.LOCK_EX|unix.LOCK_NB); err != nil {
		log.Fatal().Err(err).Msg("Lock already acquired")
	}
	// Only report success once the lock is actually held.
	log.Trace().Int("lock", fd).Msg("Lock acquired")

	return lockFile
}
// releaseFileLock removes the flock held on fileLock and closes the file.
//
// Failures are logged as warnings rather than treated as fatal; the file is
// closed even when unlocking reports an error.
func releaseFileLock(fileLock *os.File) {
	// Read the descriptor before Close invalidates it.
	fd := int(fileLock.Fd())

	if err := unix.Flock(fd, unix.LOCK_UN); err != nil {
		log.Warn().Err(err).Int("lock", fd).Msg("Could not release lock")
	} else {
		log.Trace().Int("lock", fd).Msg("Lock released")
	}

	if err := fileLock.Close(); err != nil {
		log.Warn().Err(err).Int("lock", fd).Msg("Could not close lock file")
	}
}
// isGitRepo reports whether `git -C path rev-parse --is-inside-work-tree`
// exits without error. Only the exit status is evaluated, never the printed
// answer, so a repository for which git prints "false" (bare) still counts.
func isGitRepo(path string) bool {
	probe := exec.Command("git", "-C", path, "rev-parse", "--is-inside-work-tree")
	// Output is irrelevant here, leave both streams unset.
	probe.Stdout = nil
	probe.Stderr = nil

	if err := probe.Run(); err != nil {
		return false
	}
	return true
}
// findGitrepos walks the directory tree rooted at path and sends the path of
// every git repository it finds on ch.
//
// maxDepth limits the recursion: each level consumes one unit and a value of 0
// stops immediately. A directory recognised as a repository is reported and
// not descended into; directories named ".git" are skipped. The walk stops as
// soon as ctx is cancelled, including while waiting for a receiver on ch.
// The function does not close ch.
func findGitrepos(ctx context.Context, path string, maxDepth uint, ch chan<- string) {
	if maxDepth == 0 {
		return
	}

	if ctx.Err() != nil {
		log.Trace().Ctx(ctx).Msg("Repo discovery recursive call cancelled")
		return
	}

	if isGitRepo(path) {
		// Never block on the send once the context is cancelled: the receiver
		// may already have stopped reading.
		select {
		case ch <- path:
		case <-ctx.Done():
			log.Trace().Ctx(ctx).Msg("Repo discovery recursive call cancelled")
		}
		return
	}

	// os.ReadDir may return the entries read so far together with an error;
	// keep walking what is available and only note the problem.
	entries, err := os.ReadDir(path)
	if err != nil {
		log.Debug().Str("path", path).Err(err).Msg("Cannot read directory")
	}

	for _, entry := range entries {
		if entry.IsDir() && entry.Name() != ".git" {
			newPath := filepath.Join(path, entry.Name())
			findGitrepos(ctx, newPath, maxDepth-1, ch)
		}
	}
}
// setAndExportUnsetEnv applies env as a set of defaults to the process
// environment: a variable that is not present is set to the given value, a
// variable that is already present (even if empty) keeps its current value.
// Either outcome is logged at trace level.
func setAndExportUnsetEnv(env map[string]string) {
	for name, fallback := range env {
		current, present := os.LookupEnv(name)
		if present {
			log.Trace().Str("var", name).Str("value", current).Msg("Environment variable overriden")
			continue
		}

		os.Setenv(name, fallback)
		log.Trace().Str("var", name).Str("value", fallback).Msg("Environment variable set")
	}
}
// fetchOrUpdateWithRetry calls repo.FetchOrUpdateWithCtx until it succeeds,
// the strategy reports no further attempts, or ctx is cancelled.
//
// After each failed attempt it waits for the delay supplied by strategy.
// It returns nil on success, ctx.Err() when cancelled during a wait, and
// otherwise an error wrapping the last failure.
func fetchOrUpdateWithRetry(ctx context.Context, repo repo.Repo, atomic bool, strategy retry.RetryStrategy) error {
	var currentDelay time.Duration
	var err error

	for strategy.Next() {
		if err = repo.FetchOrUpdateWithCtx(ctx, atomic); err == nil {
			return nil
		}

		name := repo.GetName()
		current, limit := strategy.Retries()
		currentDelay = strategy.NextDelay()

		select {
		case <-time.After(currentDelay):
			log.Debug().Str("repo", name).Err(err).Uint("tries", current).Uint("limit", limit).
				Dur("delay", currentDelay).Msg("Retrying")
		case <-ctx.Done():
			log.Debug().Str("repo", name).Msg("Remote sync canceled")
			// Stop retrying and hand the cancellation reason to the caller.
			return ctx.Err()
		}
	}

	return fmt.Errorf("all retry attempts failed trying to fetch refs: %w", err)
}
// isRemoteAliveWithRetry probes the remote of repo via IsRemoteAliveWithCtx,
// repeating the probe for as long as strategy grants another attempt.
//
// It returns (true, nil) as soon as a probe reports alive without error.
// Between attempts it waits for the delay given by strategy; if ctx is
// cancelled during that wait it returns (false, ctx.Err()). When the attempts
// are used up it returns false and an error wrapping the last probe error.
func isRemoteAliveWithRetry(ctx context.Context, repo repo.Repo, strategy retry.RetryStrategy) (bool, error) {
	var (
		lastErr error
		alive   bool
	)

	for strategy.Next() {
		alive, lastErr = repo.IsRemoteAliveWithCtx(ctx)
		if alive && lastErr == nil {
			return true, nil
		}

		repoName := repo.GetName()
		attempt, maxAttempts := strategy.Retries()
		pause := strategy.NextDelay()

		select {
		case <-ctx.Done():
			log.Debug().Str("repo", repoName).Msg("Alive check canceled")
			return false, ctx.Err()
		case <-time.After(pause):
			log.Debug().Str("repo", repoName).Err(lastErr).Uint("tries", attempt).Uint("limit", maxAttempts).
				Dur("delay", pause).Msg("Retrying")
		}
	}

	return false, fmt.Errorf("all retry attempts failed for checking if remote is alive: %w", lastErr)
}