572 lines
14 KiB
Go
572 lines
14 KiB
Go
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
// Copyright © 2023 Thorsten Schubert <tschubert@bafh.org>
|
|
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"os/exec"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/journald"
|
|
"github.com/rs/zerolog/log"
|
|
"golang.org/x/sys/unix"
|
|
|
|
"update-repos/internal/repo"
|
|
"update-repos/internal/retry"
|
|
)
|
|
|
|
// BackoffType identifies the delay progression used between retry attempts.
// A *BackoffType implements flag.Value so it can be set from the command line.
type BackoffType uint

// Supported back-off strategies.
const (
	Exponential BackoffType = iota
	Fibonacci
	Linear
	None
)

// backoffMap maps the command-line spelling of each strategy to its value.
var backoffMap = map[string]BackoffType{
	"exponential": Exponential,
	"fibonacci":   Fibonacci,
	"linear":      Linear,
	"none":        None,
}

// VERSION is the program version string; init replaces an empty value with
// "unknown". NOTE(review): presumably injected at build time via -ldflags.
var VERSION string

// Set implements flag.Value. It stores the strategy named by value in b and
// returns an error, leaving b untouched, when the name is not in backoffMap.
func (b *BackoffType) Set(value string) error {
	val, exists := backoffMap[value]
	if !exists {
		return fmt.Errorf("invalid backoff strategy: %s", value)
	}
	*b = val

	return nil
}

// String implements flag.Value. It returns the command-line name of b, or the
// empty string when b is nil or holds a value without a registered name.
func (b *BackoffType) String() string {
	// The flag package documents that String may be called on a nil pointer
	if b == nil {
		return ""
	}
	for k, v := range backoffMap {
		if v == *b {
			return k
		}
	}

	return ""
}
|
|
|
|
func init() {
|
|
isSystemd := len(os.Getenv("JOURNAL_STREAM")) > 0
|
|
|
|
if isSystemd {
|
|
log.Logger = log.Output(zerolog.New(journald.NewJournalDWriter()))
|
|
} else {
|
|
log.Logger = log.Output(zerolog.ConsoleWriter{
|
|
Out: os.Stderr,
|
|
TimeFormat: time.RFC3339,
|
|
NoColor: !func(fd int) bool {
|
|
_, err := unix.IoctlGetTermios(fd, unix.TCGETS)
|
|
return err == nil
|
|
}(int(os.Stdout.Fd())),
|
|
})
|
|
}
|
|
zerolog.SetGlobalLevel(zerolog.InfoLevel)
|
|
|
|
if VERSION == "" {
|
|
VERSION = "unknown"
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
c := make(chan os.Signal, 1)
|
|
signal.Notify(c, unix.SIGINT)
|
|
|
|
const maxDelay time.Duration = 60 * time.Second
|
|
|
|
var (
|
|
aliveCheckOnlyFlag bool
|
|
atomicFetchFlag bool
|
|
verboseFlag bool
|
|
versionFlag bool
|
|
forceFlag bool
|
|
maxDepthFlag uint
|
|
maxJobsFlag uint
|
|
maxRetriesFlag uint
|
|
logLevelFlag string
|
|
initialDelayFlag time.Duration
|
|
backoffStrategyFlag = Linear
|
|
)
|
|
|
|
flag.UintVar(&maxDepthFlag, "max-depth", 1, "maximum search depth for git repositories")
|
|
flag.UintVar(&maxJobsFlag, "max-jobs", uint(runtime.NumCPU()), "maximum number of parallel jobs")
|
|
flag.BoolVar(&aliveCheckOnlyFlag, "alive-check", false, "Check if remote is alive, don't update or alter any state")
|
|
flag.BoolVar(&atomicFetchFlag, "atomic", false, "Use an atomic transaction to update local refs")
|
|
flag.UintVar(&maxRetriesFlag, "max-retries", 5, "maximum number of retries on failure")
|
|
flag.DurationVar(&initialDelayFlag, "initial-delay", 500*time.Millisecond, "initial delay for retries")
|
|
flag.Var(&backoffStrategyFlag, "backoff-strategy", "back-off strategy for retries (none, linear, fibonacci, exponential)")
|
|
flag.BoolVar(&forceFlag, "force", false, "force update on diverging history")
|
|
flag.BoolVar(&verboseFlag, "verbose", false, "be verbose")
|
|
flag.BoolVar(&verboseFlag, "v", false, "be verbose")
|
|
flag.BoolVar(&versionFlag, "version", false, "program version")
|
|
flag.StringVar(&logLevelFlag, "log-level", "info", "set log level (debug, info, warn, error, fatal, panic)")
|
|
|
|
flag.Usage = func() {
|
|
fmt.Fprintf(os.Stderr, "Usage: %s [options] [<repo-dir>]\n", os.Args[0])
|
|
flag.PrintDefaults()
|
|
}
|
|
flag.Parse()
|
|
nonFlagArgs := flag.Args()
|
|
|
|
if versionFlag {
|
|
fmt.Fprintf(os.Stderr, "Version: %s\n", VERSION)
|
|
return
|
|
}
|
|
|
|
if maxJobsFlag == 0 {
|
|
maxJobsFlag++
|
|
}
|
|
|
|
// A depth of 1 means, we descent one subdirectory deep, so offset by +1
|
|
// since we expect a base directory and need to descend at least 2 levels
|
|
maxDepthFlag++
|
|
|
|
parsedLevel, err := zerolog.ParseLevel(logLevelFlag)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Invalid log level")
|
|
}
|
|
zerolog.SetGlobalLevel(parsedLevel)
|
|
|
|
var repoBase string
|
|
if len(nonFlagArgs) != 0 {
|
|
repoBase = nonFlagArgs[0]
|
|
} else {
|
|
var cwd string
|
|
cwd, err = os.Getwd()
|
|
if err != nil {
|
|
log.Fatal().Str("error", cwd).Msg("Could not get current working directory")
|
|
}
|
|
repoBase = cwd
|
|
log.Debug().Str("cwd", cwd).Msg("Using current directory")
|
|
}
|
|
repoBase, err = filepath.Abs(repoBase)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Invalid directory path")
|
|
}
|
|
|
|
if err := isValidDirectory(repoBase); err != nil {
|
|
log.Fatal().Err(err).Msg("Invalid directory path")
|
|
}
|
|
|
|
// One instance of the program for identical paths
|
|
lock := acquireFileLock(repoBase)
|
|
defer releaseFileLock(lock)
|
|
|
|
envVars := map[string]string{
|
|
"GIT_CONFIG_GLOBAL": "/dev/null",
|
|
"GIT_TERMINAL_PROMPT": "0",
|
|
"GIT_ASKPASS": "/bin/false",
|
|
"GIT_HTTP_MAX_REQUESTS": "2",
|
|
"GIT_HTTP_LOW_SPEED_LIMIT": "8192",
|
|
"GIT_HTTP_LOW_SPEED_TIME": "5",
|
|
"GIT_SSH_COMMAND": "ssh -o BatchMode=yes",
|
|
}
|
|
// Allow env vars to be overriden by the caller, i.e. the calling shell
|
|
setAndExportUnsetEnv(envVars)
|
|
|
|
var (
|
|
failedRepos []*repo.Gitrepo
|
|
changedRepos []*repo.Gitrepo
|
|
wgRepo sync.WaitGroup
|
|
wgResult sync.WaitGroup
|
|
once sync.Once
|
|
)
|
|
cRepoPaths := make(chan string)
|
|
cFailed := make(chan *repo.Gitrepo)
|
|
cPassed := make(chan *repo.Gitrepo)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
// Handle SIGINT (CTRL-C)
|
|
go func() {
|
|
<-c
|
|
once.Do(func() {
|
|
log.Warn().Msg("SIGINT received - shutting down gracefully")
|
|
cancel()
|
|
})
|
|
}()
|
|
|
|
wgResult.Add(1)
|
|
go func() {
|
|
defer wgResult.Done()
|
|
for {
|
|
select {
|
|
case repo, ok := <-cFailed:
|
|
if !ok {
|
|
cFailed = nil
|
|
} else {
|
|
log.Debug().Str("repo", repo.Name).Msg("Appending failed repo")
|
|
failedRepos = append(failedRepos, repo)
|
|
}
|
|
case repo, ok := <-cPassed:
|
|
if !ok {
|
|
cPassed = nil
|
|
} else {
|
|
log.Debug().Str("repo", repo.Name).Msg("Appending passed repo")
|
|
changedRepos = append(changedRepos, repo)
|
|
}
|
|
}
|
|
if cFailed == nil && cPassed == nil {
|
|
break // exit the loop if both channels are closed
|
|
}
|
|
}
|
|
}()
|
|
|
|
// No WaitGroup necessary, since we will block on the channel below until this returns
|
|
go func(ctx context.Context) {
|
|
findGitrepos(ctx, repoBase, maxDepthFlag, cRepoPaths)
|
|
close(cRepoPaths)
|
|
}(ctx)
|
|
|
|
jobSem := make(chan struct{}, maxJobsFlag)
|
|
var repoCount atomic.Uint32
|
|
|
|
done:
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Trace().Ctx(ctx).Msg("Updates cancelled")
|
|
break done
|
|
|
|
case discoveredPath, ok := <-cRepoPaths:
|
|
if !ok {
|
|
break done
|
|
}
|
|
|
|
wgRepo.Add(1)
|
|
jobSem <- struct{}{}
|
|
go func(cFail, cDone chan<- *repo.Gitrepo) {
|
|
defer func() {
|
|
wgRepo.Done()
|
|
<-jobSem
|
|
}()
|
|
|
|
repoName := filepath.Base(discoveredPath)
|
|
git, err := repo.NewGitRepo(filepath.Clean(discoveredPath))
|
|
if err != nil {
|
|
if repo.IsMissingRemoteError(err) {
|
|
log.Warn().Str("path", discoveredPath).Err(err).Msg("Ignoring repo")
|
|
} else {
|
|
log.Error().Str("path", discoveredPath).Err(err).Send()
|
|
}
|
|
return
|
|
}
|
|
|
|
repoCount.Add(1)
|
|
r := git.(*repo.Gitrepo)
|
|
log.Info().Str("repo", repoName).Msg("Processing")
|
|
|
|
success := true
|
|
defer func() {
|
|
msg := "Failure"
|
|
if success {
|
|
msg = "Success"
|
|
}
|
|
if !success {
|
|
cFail <- r
|
|
}
|
|
log.Debug().Str("repo", repoName).Str("result", msg).Send()
|
|
}()
|
|
|
|
strategy := selectBackoffStrategy(backoffStrategyFlag, initialDelayFlag, maxDelay, maxRetriesFlag+1)
|
|
|
|
// Alive Check only, do not alter any discovered repositories and return early
|
|
if aliveCheckOnlyFlag {
|
|
alive := false
|
|
if alive, err = isRemoteAliveWithRetry(ctx, r, strategy); alive {
|
|
cDone <- r
|
|
} else {
|
|
// Don't log or alter success state on context cancel
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
success = false
|
|
log.Error().Str("repo", repoName).Err(err).Send()
|
|
}
|
|
return
|
|
}
|
|
|
|
// Get objects before fetching anything for bare repositories
|
|
// since we do not have merge output to determine changed refs
|
|
var preFetchState []byte
|
|
if r.IsBare {
|
|
if preFetchState, err = r.GetObjects(); err != nil {
|
|
log.Error().Str("repo", repoName).Err(err).Send()
|
|
return
|
|
}
|
|
}
|
|
|
|
// The actual update
|
|
if err = fetchOrUpdateWithRetry(ctx, r, atomicFetchFlag, strategy); err != nil {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
success = false
|
|
log.Error().Str("repo", repoName).Err(err).Send()
|
|
}
|
|
|
|
// Post-processing bare repositories
|
|
if r.IsBare {
|
|
// Get new refs
|
|
var postFetchState []byte
|
|
|
|
if postFetchState, err = r.GetObjects(); err != nil {
|
|
log.Warn().Str("repo", repoName).Err(err).Msg("Cannot determine state")
|
|
return
|
|
}
|
|
|
|
// Check for new refs
|
|
if !bytes.Equal(preFetchState, postFetchState) {
|
|
cDone <- r
|
|
}
|
|
|
|
log.Debug().Str("repo", repoName).Msg("Success")
|
|
|
|
return
|
|
}
|
|
|
|
// Post-processing non-bare repositories, treat repos with diverging history differently
|
|
diverged := git.IsHistoryDiverged()
|
|
|
|
// Standard merge
|
|
if !diverged {
|
|
stdout, stderr, err := git.Merge()
|
|
|
|
if err != nil {
|
|
success = false
|
|
log.Info().Str("repo", repoName).Msg(string(stderr))
|
|
return
|
|
}
|
|
|
|
if !strings.HasPrefix(string(stdout), "Already up to date") {
|
|
cDone <- r
|
|
|
|
if verboseFlag {
|
|
log.Info().Str("repo", repoName).Msg(string(stdout))
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
// Reset required
|
|
if diverged && forceFlag {
|
|
// Reset to remote history
|
|
log.Warn().Str("repo", repoName).Msg("Repo history diverged, reset to remote")
|
|
|
|
stdout, stderr, err := git.Reset( /*Hard reset*/ true)
|
|
|
|
if err != nil {
|
|
success = false
|
|
log.Info().Str("repo", repoName).Msg(string(stderr))
|
|
return
|
|
}
|
|
|
|
cDone <- r
|
|
|
|
if verboseFlag {
|
|
log.Info().Str("repo", repoName).Msg(string(stdout))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
log.Debug().Str("repo", repoName).Msg("Diverged history")
|
|
success = false
|
|
}(cFailed, cPassed)
|
|
}
|
|
}
|
|
|
|
wgRepo.Wait()
|
|
close(cFailed)
|
|
close(cPassed)
|
|
wgResult.Wait()
|
|
|
|
var successMsg string
|
|
if aliveCheckOnlyFlag {
|
|
successMsg = "Alive"
|
|
} else {
|
|
successMsg = "Updated"
|
|
}
|
|
|
|
for _, repo := range failedRepos {
|
|
relative, _ := filepath.Rel(repoBase, repo.Path)
|
|
log.Warn().Str("path", relative).Msg("Failed")
|
|
}
|
|
for _, repo := range changedRepos {
|
|
relative, _ := filepath.Rel(repoBase, repo.Path)
|
|
log.Info().Str("path", relative).Msg(successMsg)
|
|
}
|
|
|
|
log.Info().Uint32("Processed", repoCount.Load()).Int(successMsg, len(changedRepos)).Int("Failed", len(failedRepos)).Msg("Statistics")
|
|
}
|
|
|
|
func selectBackoffStrategy(backoffType BackoffType, delay time.Duration, maxDelay time.Duration, retries uint) retry.RetryStrategy {
|
|
var backoffStrategy retry.RetryStrategy
|
|
switch backoffType {
|
|
case None:
|
|
backoffStrategy = retry.NewNoneBackoff(delay, retries)
|
|
case Linear:
|
|
backoffStrategy = retry.NewLinearBackoff(delay, retries)
|
|
case Fibonacci:
|
|
backoffStrategy = retry.NewFibonacciBackoff(delay, retries)
|
|
case Exponential:
|
|
backoffStrategy = retry.NewExponentialBackoffWithCap(delay, maxDelay, retries)
|
|
default:
|
|
panic("No back-off strategy")
|
|
}
|
|
log.Trace().Str("back-off", backoffStrategy.Name()).Send()
|
|
|
|
return backoffStrategy
|
|
}
|
|
|
|
// isValidDirectory reports whether path names an existing directory that can
// be opened for reading. It returns nil on success and otherwise an error
// describing which of the checks (stat, directory test, open) failed.
func isValidDirectory(path string) error {
	info, err := os.Stat(path)
	if err != nil {
		return fmt.Errorf("failed to stat directory: %w", err)
	}
	if !info.IsDir() {
		// err is nil at this point, so there is no cause to wrap; name the path instead
		return fmt.Errorf("path is not a directory: %q", path)
	}

	dir, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("no permission to read directory: %w", err)
	}
	// The handle was only opened to probe access; a close error carries no information here
	_ = dir.Close()

	return nil
}
|
|
|
|
func acquireFileLock(path string) *os.File {
|
|
lockFile, err := os.OpenFile(path, os.O_RDONLY, 0o755)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Cannot open directory")
|
|
}
|
|
log.Trace().Int("lock", int(lockFile.Fd())).Msg("Lock acquired")
|
|
|
|
err = unix.Flock(int(lockFile.Fd()), unix.LOCK_EX|unix.LOCK_NB)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Lock already acquired")
|
|
}
|
|
|
|
return lockFile
|
|
}
|
|
|
|
func releaseFileLock(fileLock *os.File) {
|
|
unix.Flock(int(fileLock.Fd()), unix.LOCK_UN)
|
|
log.Trace().Int("lock", int(fileLock.Fd())).Msg("Lock released")
|
|
fileLock.Close()
|
|
}
|
|
|
|
// isGitRepo reports whether running
// "git -C <path> rev-parse --is-inside-work-tree" succeeds. The command's
// output is discarded; only its exit status is evaluated.
func isGitRepo(path string) bool {
	// Stdout and Stderr of a fresh Cmd are nil, i.e. the output is dropped
	probe := exec.Command("git", "-C", path, "rev-parse", "--is-inside-work-tree")

	// A zero exit status means git repo, even if git prints "false" (bare)
	err := probe.Run()

	return err == nil
}
|
|
|
|
func findGitrepos(ctx context.Context, path string, maxDepth uint, ch chan<- string) {
|
|
if maxDepth == 0 {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Trace().Ctx(ctx).Msg("Repo discovery recursive call cancelled")
|
|
return
|
|
|
|
default:
|
|
if isGitRepo(path) {
|
|
ch <- path
|
|
return
|
|
}
|
|
|
|
entries, _ := os.ReadDir(path)
|
|
for _, entry := range entries {
|
|
if entry.IsDir() && entry.Name() != ".git" {
|
|
newPath := filepath.Join(path, entry.Name())
|
|
findGitrepos(ctx, newPath, maxDepth-1, ch)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func setAndExportUnsetEnv(env map[string]string) {
|
|
for k, v := range env {
|
|
if _, exists := os.LookupEnv(k); !exists {
|
|
os.Setenv(k, v)
|
|
log.Trace().Str("var", k).Str("value", v).Msg("Environment variable set")
|
|
} else {
|
|
log.Trace().Str("var", k).Str("value", os.Getenv(k)).Msg("Environment variable overriden")
|
|
}
|
|
}
|
|
}
|
|
|
|
func fetchOrUpdateWithRetry(ctx context.Context, repo repo.Repo, atomic bool, strategy retry.RetryStrategy) error {
|
|
var currentDelay time.Duration
|
|
var err error
|
|
|
|
for strategy.Next() {
|
|
if err = repo.FetchOrUpdateWithCtx(ctx, atomic); err == nil {
|
|
return nil
|
|
}
|
|
|
|
name := repo.GetName()
|
|
current, limit := strategy.Retries()
|
|
currentDelay = strategy.NextDelay()
|
|
|
|
select {
|
|
case <-time.After(currentDelay):
|
|
log.Debug().Str("repo", name).Err(err).Uint("tries", current).Uint("limit", limit).
|
|
Dur("delay", currentDelay).Msg("Retrying")
|
|
case <-ctx.Done():
|
|
log.Debug().Str("repo", name).Msg("Remote sync canceled")
|
|
ctx.Err()
|
|
}
|
|
}
|
|
|
|
return fmt.Errorf("all retry attempts failed trying to fetch refs: %w", err)
|
|
}
|
|
|
|
func isRemoteAliveWithRetry(ctx context.Context, repo repo.Repo, strategy retry.RetryStrategy) (bool, error) {
|
|
var currentDelay time.Duration
|
|
var err error
|
|
|
|
for strategy.Next() {
|
|
isAlive := false
|
|
if isAlive, err = repo.IsRemoteAliveWithCtx(ctx); isAlive && err == nil {
|
|
return true, nil
|
|
}
|
|
|
|
name := repo.GetName()
|
|
current, limit := strategy.Retries()
|
|
currentDelay = strategy.NextDelay()
|
|
|
|
select {
|
|
case <-time.After(currentDelay):
|
|
log.Debug().Str("repo", name).Err(err).Uint("tries", current).Uint("limit", limit).
|
|
Dur("delay", currentDelay).Msg("Retrying")
|
|
case <-ctx.Done():
|
|
log.Debug().Str("repo", name).Msg("Alive check canceled")
|
|
return false, ctx.Err()
|
|
}
|
|
}
|
|
|
|
return false, fmt.Errorf("all retry attempts failed for checking if remote is alive: %w", err)
|
|
}
|