server.garden privileged automation agent (mirror of https://git.sequentialread.com/forest/rootsystem)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

267 lines
8.3 KiB

package main
import (
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"time"
errors "git.sequentialread.com/forest/pkg-errors"
"git.sequentialread.com/forest/rootsystem/objectStorage"
)
// lockInfo is the JSON payload stored in a "rootsystem/lock_<timestamp>" file
// in object storage. The oldest non-expired lock file is treated as the
// currently held lock (see getCurrentlyHeldLock).
type lockInfo struct {
	// Who identifies the node/process that wrote this lock file.
	Who string
	// Version of the lock file format (written as 1 by writeLockFile).
	Version int
	// LastHeldMillisecondsSinceUnixEpoch is when the holder last wrote or
	// renewed the lock, adjusted by the measured clock skew relative to the
	// object storage server; locks expire when this grows too old.
	LastHeldMillisecondsSinceUnixEpoch int64
}
// preAquirePeriod is how long a candidate waits after writing its lock file
// before checking whether it actually won, guarding against two nodes writing
// lock files at nearly the same moment.
const preAquirePeriod = time.Second * time.Duration(5)

// lockRenewalIntervalMilliseconds is how often the holder re-writes (renews)
// its lock file, and also how often waiters re-poll for the current lock.
const lockRenewalIntervalMilliseconds = 10 * 1000

// lockExpiryTimeoutMilliseconds: a lock whose last-held timestamp is older
// than this is considered abandoned and may be deleted.
const lockExpiryTimeoutMilliseconds = 30 * 1000
// aquireLock acquires a distributed mutex backed by object storage, blocking
// until it succeeds or a storage error occurs.
//
// In order to hold a lock in this system, you have to record the time you
// aquired the lock, then periodically renew it (re-write the file with an
// updated timestamp). If a process crashes or disconnects, the lock simply
// expires instead of being stuck until manual intervention. Because expiry
// compares timestamps written by different machines, we measure the clock
// skew between this node and the object storage server and fold it into
// every timestamp we write or compare.
//
// who names this node inside the lock file so other nodes can log who holds
// it. The returned function releases the lock (idempotently): it stops the
// renewal goroutine and deletes the lock file.
func aquireLock(who string) (func(), error) {
	clockSkewMs, err := getClockSkewRelativeToObjectStorage()
	if err != nil {
		return nil, errors.Wrap(err, "can't aquireLock because can't getClockSkewRelativeToObjectStorage")
	}
	log.Printf("aquiring lock with %dms clock skew\n", clockSkewMs)
	currentlyHeldLock, err := getCurrentlyHeldLock(clockSkewMs)
	if err != nil {
		return nil, errors.Wrap(err, "can't aquireLock because can't getCurrentlyHeldLock")
	}
	for {
		// Someone already has the lock -- spin and wait for it to be
		// released or to expire.
		for currentlyHeldLock != nil && currentlyHeldLock.Who != who {
			log.Printf("Waiting for lock held by %s to be released...\n", currentlyHeldLock.Who)
			time.Sleep(time.Millisecond * time.Duration(lockRenewalIntervalMilliseconds))
			currentlyHeldLock, err = getCurrentlyHeldLock(clockSkewMs)
			if err != nil {
				return nil, errors.Wrap(err, "can't aquireLock because can't getCurrentlyHeldLock (wait loop)")
			}
		}

		// There is no current lock! Time to try to aquire it!
		lockTimestamp := MillisecondsSinceUnixEpoch() + clockSkewMs
		lockFilename := fmt.Sprintf("rootsystem/lock_%d", lockTimestamp)
		log.Printf("pre-aquiring lock %s\n", lockFilename)
		err = writeLockFile(lockFilename, who, clockSkewMs)
		if err != nil {
			return nil, errors.Wrapf(err, "can't aquireLock because can't writeLockFile %s", lockFilename)
		}

		// Wait a bit and see if anyone else also tried to aquire the lock at
		// the same time (race-condition guard).
		time.Sleep(preAquirePeriod)
		// BUG FIX: this was `:=`, which shadowed the outer currentlyHeldLock;
		// after losing a race and hitting `continue`, the wait loop above then
		// tested the stale outer value and was skipped entirely. `=` makes the
		// retry see the actual winner.
		currentlyHeldLock, err = getCurrentlyHeldLock(clockSkewMs)
		if err != nil {
			return nil, errors.Wrap(err, "can't aquireLock because can't getCurrentlyHeldLock (try aquire)")
		}
		if currentlyHeldLock != nil && currentlyHeldLock.Who != who {
			// Someone else grabbed the lock first; clean up our losing file.
			log.Printf("pre-aquire failed, %s grabbed the lock first\n", currentlyHeldLock.Who)
			if deleteErr := global.storage.Delete(lockFilename); deleteErr != nil {
				// Best-effort: a leftover losing file will simply expire later.
				log.Printf("error deleting pre-aquire file %s: %s\n", lockFilename, deleteErr)
			}
			continue
		}
		if currentlyHeldLock != nil && currentlyHeldLock.Who == who {
			log.Printf("aquired lock %s!\n", lockFilename)

			// Renew the lock in the background until released.
			// BUG FIX: the original used a plain bool shared between this
			// goroutine and the release closure -- a data race. A close-able
			// channel is the race-free signal.
			released := make(chan struct{})
			go func() {
				ticker := time.NewTicker(time.Millisecond * time.Duration(lockRenewalIntervalMilliseconds))
				defer ticker.Stop()
				for {
					select {
					case <-released:
						return
					case <-ticker.C:
						if renewErr := writeLockFile(lockFilename, who, clockSkewMs); renewErr != nil {
							log.Printf("error renewing lock %s: %s\n", lockFilename, renewErr)
						} else {
							log.Printf("renewed lock %s\n", lockFilename)
						}
					}
				}
			}()

			var releaseOnce sync.Once
			releaseLock := func() {
				// sync.Once makes release idempotent (closing an already-closed
				// channel would panic).
				releaseOnce.Do(func() {
					close(released)
					if deleteErr := global.storage.Delete(lockFilename); deleteErr != nil {
						log.Printf("error releasing lock %s: %s\n", lockFilename, deleteErr)
					} else {
						log.Printf("released lock %s\n", lockFilename)
					}
				})
			}
			return releaseLock, nil
		}
		// currentlyHeldLock == nil: our own lock file vanished or already
		// expired (e.g. severe clock skew) -- loop around and try again.
	}
}
func writeLockFile(lockFilename, who string, clockSkewMs int64) error {
lockTimestamp := MillisecondsSinceUnixEpoch() + clockSkewMs
lockObject := lockInfo{
Who: who,
Version: 1,
LastHeldMillisecondsSinceUnixEpoch: lockTimestamp,
}
lockBytes, err := json.Marshal(lockObject)
if err != nil {
return errors.Wrap(err, "can't writeLockFile because can't json.Marshal(lockObject)")
}
err = global.storage.Put(lockFilename, lockBytes)
if err != nil {
return errors.Wrap(err, "can't writeLockFile because can't save lock file")
}
return nil
}
// getCurrentlyHeldLock lists all "rootsystem/lock_*" files, discards expired
// ones (scheduling them for best-effort async deletion), and returns the
// oldest live lock, or (nil, nil) when no live lock exists.
//
// clockSkewMs is added to the local clock before comparing against lock
// timestamps, so expiry is judged in the object storage server's time frame.
func getCurrentlyHeldLock(clockSkewMs int64) (*lockInfo, error) {
	fileInfos, err := global.storage.List("rootsystem/")
	if err != nil {
		return nil, errors.Wrap(err, "can't getCurrentlyHeldLock because can't list files")
	}
	lockFileNames := make([]string, 0)
	for _, fileInfo := range fileInfos {
		if !fileInfo.IsDirectory && strings.HasPrefix(fileInfo.Name, "lock_") {
			lockFileNames = append(lockFileNames, fileInfo.Name)
		}
	}
	objects := make([]lockInfo, 0)
	if len(lockFileNames) > 0 {
		type getResult struct {
			Path     string
			File     objectStorage.ObjectStorageFile
			NotFound bool
			Error    error
		}
		// Fetch every lock file concurrently.
		getAllLockFiles := sync.WaitGroup{}
		getAllLockFiles.Add(len(lockFileNames))
		results := make([]getResult, len(lockFileNames))
		for i, lockfileName := range lockFileNames {
			// BUG FIX: i and lockfileName are passed as arguments. The original
			// closure captured the shared loop variables, so goroutines could
			// all observe the final iteration's values (pre-Go-1.22 loop
			// variable capture bug), fetching the same file repeatedly and
			// leaving other result slots zero.
			go func(i int, lockfileName string) {
				defer getAllLockFiles.Add(-1)
				path := fmt.Sprintf("rootsystem/%s", lockfileName)
				file, notFound, getErr := global.storage.Get(path)
				results[i] = getResult{
					Path:     path,
					File:     file,
					NotFound: notFound,
					Error:    getErr,
				}
			}(i, lockfileName)
		}
		getAllLockFiles.Wait()

		currentTimeMilliseconds := MillisecondsSinceUnixEpoch() + clockSkewMs
		expiredLockFiles := make([]string, 0)
		for _, result := range results {
			// A lock file deleted between List and Get is normal -- skip it.
			if result.NotFound {
				continue
			}
			// BUG FIX: the original wrapped the outer `err` (nil here) instead
			// of result.Error, and checked for empty content first -- so a
			// failed Get that returned an empty body was silently skipped.
			if result.Error != nil {
				return nil, errors.Wrapf(result.Error, "can't getCurrentlyHeldLock because can't HTTP get %s", result.Path)
			}
			if len(result.File.Content) == 0 {
				continue
			}
			var lockObject lockInfo
			if unmarshalErr := json.Unmarshal(result.File.Content, &lockObject); unmarshalErr != nil {
				return nil, errors.Wrapf(unmarshalErr, "can't getCurrentlyHeldLock because can't json.Unmarshal %s", result.Path)
			}
			if (currentTimeMilliseconds - lockObject.LastHeldMillisecondsSinceUnixEpoch) < lockExpiryTimeoutMilliseconds {
				objects = append(objects, lockObject)
			} else {
				expiredLockFiles = append(expiredLockFiles, result.Path)
			}
		}
		// Clean up the expired lock files asynchronously (best-effort).
		// BUG FIX: lockfilePath is passed as an argument for the same loop
		// variable capture reason as above.
		for _, lockfilePath := range expiredLockFiles {
			go func(lockfilePath string) {
				if deleteErr := global.storage.Delete(lockfilePath); deleteErr != nil {
					log.Printf("error deleting expired lock file %s: %s\n", lockfilePath, deleteErr)
				}
			}(lockfilePath)
		}
	}
	if len(objects) > 0 {
		// The oldest (smallest timestamp) live lock is the winner.
		var oldest lockInfo
		for _, lockObject := range objects {
			currentIsOlder := oldest.LastHeldMillisecondsSinceUnixEpoch > lockObject.LastHeldMillisecondsSinceUnixEpoch
			currentIsFirst := oldest.LastHeldMillisecondsSinceUnixEpoch == 0
			if currentIsFirst || currentIsOlder {
				oldest = lockObject
			}
		}
		return &oldest, nil
	}
	return nil, nil
}
// getClockSkewRelativeToObjectStorage estimates how far the object storage
// server's clock is ahead of this node's clock, in milliseconds. It uploads a
// probe file, reads back the server-assigned LastModified timestamp, and
// compares it to the local time at the midpoint of the upload round trip.
func getClockSkewRelativeToObjectStorage() (int64, error) {
	localStartMs := MillisecondsSinceUnixEpoch()
	probeFilename := fmt.Sprintf("rootsystem/locktime_%d", localStartMs)
	if err := global.storage.Put(probeFilename, []byte("lock time file")); err != nil {
		return 0, errors.Wrap(err, "can't getClockSkewRelativeToObjectStorage because can't save lock time file")
	}
	roundTripMs := MillisecondsSinceUnixEpoch() - localStartMs
	probeFile, _, err := global.storage.Get(probeFilename)
	if err != nil {
		return 0, errors.Wrap(err, "can't getClockSkewRelativeToObjectStorage because can't get lock time file")
	}
	// Best-effort cleanup; a leftover probe file is harmless.
	global.storage.Delete(probeFilename)
	serverMs := probeFile.LastModified.UnixNano() / int64(time.Millisecond)
	// The server stamped the file roughly halfway through the round trip, so
	// compare against local time advanced by half the measured latency.
	return serverMs - (localStartMs + roundTripMs/2), nil
}
// TimeFromMillisecondsSinceUnixEpoch converts a unix-epoch millisecond
// timestamp into a time.Time, preserving millisecond precision.
//
// BUG FIX: the previous implementation passed 0 nanoseconds to time.Unix,
// silently truncating up to 999ms -- significant here, where lock timestamps
// and skew measurements are expressed in milliseconds.
func TimeFromMillisecondsSinceUnixEpoch(timestamp int64) time.Time {
	seconds := timestamp / 1000
	nanos := (timestamp % 1000) * int64(time.Millisecond)
	return time.Unix(seconds, nanos)
}
// MillisecondsSinceUnixEpoch returns the current wall-clock time expressed as
// milliseconds since the unix epoch.
func MillisecondsSinceUnixEpoch() int64 {
	nowNanos := time.Now().UnixNano()
	return nowNanos / int64(time.Millisecond)
}