server.garden privileged automation agent (mirror of https://git.sequentialread.com/forest/rootsystem)
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	errors "git.sequentialread.com/forest/pkg-errors"
	"git.sequentialread.com/forest/rootsystem/objectStorage"
)
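
// lockInfo is the JSON document stored in a lock file on the object storage server.
// LastHeldMillisecondsSinceUnixEpoch is refreshed by the holder on every renewal; a
// lock whose timestamp is older than lockExpiryTimeoutMilliseconds is considered expired.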
type lockInfo struct {
	Who                                string
	Version                            int
	LastHeldMillisecondsSinceUnixEpoch int64
}
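
// Timing constants for the lease protocol: a holder renews its lock every 10 seconds,
// and other nodes consider a lock expired after 30 seconds without a renewal, so a
// crashed holder blocks the lock for at most three missed renewal intervals.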
const preAquirePeriod = time.Second * time.Duration(5)
const lockRenewalIntervalMilliseconds = 10 * 1000
const lockExpiryTimeoutMilliseconds = 30 * 1000
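
// aquireLock acquires the cluster-wide lock on behalf of `who`, blocking until the
// lock is available, and returns a function that releases it. A hypothetical caller
// (not from this file) might look like:
//
//	release, err := aquireLock("node-1")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer release()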
func aquireLock(who string) (func(), error) {

	// In order to hold a lock in this system, you have to record the time you acquired the lock.
	// Then, in order to continue holding it, you have to periodically renew the lock (re-write the file with an updated timestamp).
	// This way, if a process crashes or disconnects, the lock will not be stuck until manual intervention; it will simply expire.
	// As a consequence, it's really important that the clocks on all the participating nodes show the same time
	// (at least within a few seconds of each other).
	// Therefore, as a precaution, we measure the difference between the clock on this node and the clock
	// on the object storage server and adjust our clock accordingly.

	clockSkewMs, err := getClockSkewRelativeToObjectStorage()
	if err != nil {
		return nil, errors.Wrap(err, "can't aquireLock because can't getClockSkewRelativeToObjectStorage")
	}

	log.Printf("acquiring lock with %dms clock skew\n", clockSkewMs)

	currentlyHeldLock, err := getCurrentlyHeldLock(clockSkewMs)
	if err != nil {
		return nil, errors.Wrap(err, "can't aquireLock because can't getCurrentlyHeldLock")
	}

	// Enter the main retry loop.
	for {

		// Someone already has the lock -- spin and wait for it to expire or be released.
		for currentlyHeldLock != nil && currentlyHeldLock.Who != who {
			log.Printf("Waiting for lock held by %s to be released...\n", currentlyHeldLock.Who)
			time.Sleep(time.Millisecond * time.Duration(lockRenewalIntervalMilliseconds))

			currentlyHeldLock, err = getCurrentlyHeldLock(clockSkewMs)
			if err != nil {
				return nil, errors.Wrap(err, "can't aquireLock because can't getCurrentlyHeldLock (wait loop)")
			}
		}

		// There is no current lock! Time to try to acquire it!

		lockTimestamp := MillisecondsSinceUnixEpoch() + clockSkewMs
		lockFilename := fmt.Sprintf("rootsystem/lock_%d", lockTimestamp)

		log.Printf("pre-acquiring lock %s\n", lockFilename)
		err = writeLockFile(lockFilename, who, clockSkewMs)
		if err != nil {
			return nil, errors.Wrapf(err, "can't aquireLock because can't writeLockFile %s", lockFilename)
		}

		// We tried to acquire this lock -- in order to prevent two nodes acquiring the lock at the same time (race condition),
		// we now wait for a bit and see if anyone else also tried to acquire the lock.
		time.Sleep(preAquirePeriod)

		// Plain assignment (not :=) keeps the outer currentlyHeldLock up to date, so the
		// `continue` below loops around with fresh state instead of a stale shadowed value.
		currentlyHeldLock, err = getCurrentlyHeldLock(clockSkewMs)
		if err != nil {
			return nil, errors.Wrap(err, "can't aquireLock because can't getCurrentlyHeldLock (try aquire)")
		}

		if currentlyHeldLock != nil && currentlyHeldLock.Who != who {
			// Someone else grabbed the lock first.
			log.Printf("pre-acquire failed, %s grabbed the lock first\n", currentlyHeldLock.Who)
			global.storage.Delete(lockFilename)
			continue
		}

		if currentlyHeldLock != nil && currentlyHeldLock.Who == who {
			log.Printf("acquired lock %s!\n", lockFilename)

			// lockHasBeenReleased is shared between the renewal goroutine and the
			// releaseLock closure, so it has to be accessed atomically.
			var lockHasBeenReleased atomic.Bool
			go (func() {
				for !lockHasBeenReleased.Load() {
					time.Sleep(time.Millisecond * time.Duration(lockRenewalIntervalMilliseconds))
					if !lockHasBeenReleased.Load() {
						err := writeLockFile(lockFilename, who, clockSkewMs)
						if err != nil {
							log.Printf("error renewing lock %s: %s\n", lockFilename, err)
						} else {
							log.Printf("renewed lock %s\n", lockFilename)
						}
					}
				}
			})()

			releaseLock := func() {
				lockHasBeenReleased.Store(true)
				err := global.storage.Delete(lockFilename)
				if err != nil {
					log.Printf("error releasing lock %s: %s\n", lockFilename, err)
				} else {
					log.Printf("released lock %s\n", lockFilename)
				}
			}

			return releaseLock, nil
		}
	}
}
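
// writeLockFile serializes a lockInfo for `who`, stamped with the skew-adjusted
// current time, and uploads it to the given path in object storage.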
func writeLockFile(lockFilename, who string, clockSkewMs int64) error {
	lockTimestamp := MillisecondsSinceUnixEpoch() + clockSkewMs
	lockObject := lockInfo{
		Who:                                who,
		Version:                            1,
		LastHeldMillisecondsSinceUnixEpoch: lockTimestamp,
	}

	lockBytes, err := json.Marshal(lockObject)
	if err != nil {
		return errors.Wrap(err, "can't writeLockFile because can't json.Marshal(lockObject)")
	}

	err = global.storage.Put(lockFilename, lockBytes)
	if err != nil {
		return errors.Wrap(err, "can't writeLockFile because can't save lock file")
	}

	return nil
}
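
// getCurrentlyHeldLock lists all lock_* files under rootsystem/, downloads them in
// parallel, discards (and asynchronously deletes) any that have expired, and returns
// the oldest surviving lockInfo, or nil if no live lock exists.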
func getCurrentlyHeldLock(clockSkewMs int64) (*lockInfo, error) {

	fileInfos, err := global.storage.List("rootsystem/")
	if err != nil {
		return nil, errors.Wrap(err, "can't getCurrentlyHeldLock because can't list files")
	}

	lockFileNames := make([]string, 0)

	for _, fileInfo := range fileInfos {
		if !fileInfo.IsDirectory && strings.HasPrefix(fileInfo.Name, "lock_") {
			lockFileNames = append(lockFileNames, fileInfo.Name)
		}
	}

	objects := make([]lockInfo, 0)

	if len(lockFileNames) > 0 {
		var getAllLockFiles sync.WaitGroup
		getAllLockFiles.Add(len(lockFileNames))

		type getResult struct {
			Path     string
			File     objectStorage.ObjectStorageFile
			NotFound bool
			Error    error
		}

		results := make([]getResult, len(lockFileNames))

		for i, lockfileName := range lockFileNames {
			// Pass the loop variables in as arguments so each goroutine gets its own
			// copy -- capturing them directly is a data race before Go 1.22.
			go (func(i int, lockfileName string) {
				path := fmt.Sprintf("rootsystem/%s", lockfileName)
				file, notFound, err := global.storage.Get(path)
				results[i] = getResult{
					Path:     path,
					File:     file,
					NotFound: notFound,
					Error:    err,
				}
				getAllLockFiles.Done()
			})(i, lockfileName)
		}

		getAllLockFiles.Wait()

		currentTimeMilliseconds := MillisecondsSinceUnixEpoch() + clockSkewMs
		expiredLockFiles := make([]string, 0)

		for _, result := range results {
			// A failed download is a hard error; an object that disappeared between
			// List and Get is fine to skip.
			if result.Error != nil {
				return nil, errors.Wrapf(result.Error, "can't getCurrentlyHeldLock because can't HTTP get %s", result.Path)
			}
			if result.NotFound || len(result.File.Content) == 0 {
				continue
			}
			var lockObject lockInfo
			//log.Printf("lock file contents: %s\n", string(result.File.Content))
			err := json.Unmarshal(result.File.Content, &lockObject)
			if err != nil {
				return nil, errors.Wrapf(err, "can't getCurrentlyHeldLock because can't json.Unmarshal %s", result.Path)
			}

			if (currentTimeMilliseconds - lockObject.LastHeldMillisecondsSinceUnixEpoch) < lockExpiryTimeoutMilliseconds {
				objects = append(objects, lockObject)
			} else {
				expiredLockFiles = append(expiredLockFiles, result.Path)
			}
		}

		// Clean up the expired lock files asynchronously.
		go (func() {
			for _, lockfilePath := range expiredLockFiles {
				// Again, pass the loop variable as an argument so each goroutine gets its own copy.
				go (func(path string) { global.storage.Delete(path) })(lockfilePath)
			}
		})()
	}

	if len(objects) > 0 {
		var oldest lockInfo
		for _, lockObject := range objects {
			currentIsOlder := oldest.LastHeldMillisecondsSinceUnixEpoch > lockObject.LastHeldMillisecondsSinceUnixEpoch
			currentIsFirst := oldest.LastHeldMillisecondsSinceUnixEpoch == 0
			if currentIsFirst || currentIsOlder {
				oldest = lockObject
			}
		}

		return &oldest, nil
	}

	return nil, nil
}
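
// getClockSkewRelativeToObjectStorage estimates the difference between this node's
// clock and the object storage server's clock: it writes a throwaway file, reads back
// its LastModified timestamp, and compares that against the local time at the midpoint
// of the upload (local time plus half the measured round-trip latency).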
func getClockSkewRelativeToObjectStorage() (int64, error) {
	originalTimestamp := MillisecondsSinceUnixEpoch()
	locktimeFilename := fmt.Sprintf("rootsystem/locktime_%d", originalTimestamp)

	err := global.storage.Put(locktimeFilename, []byte("lock time file"))
	if err != nil {
		return 0, errors.Wrap(err, "can't getClockSkewRelativeToObjectStorage because can't save lock time file")
	}

	latency := MillisecondsSinceUnixEpoch() - originalTimestamp

	result, _, err := global.storage.Get(locktimeFilename)
	if err != nil {
		return 0, errors.Wrap(err, "can't getClockSkewRelativeToObjectStorage because can't get lock time file")
	}

	global.storage.Delete(locktimeFilename)

	objectStorageTimestamp := (result.LastModified.UnixNano() / int64(time.Millisecond))

	return (objectStorageTimestamp - (originalTimestamp + (latency / 2))), nil
}
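
// TimeFromMillisecondsSinceUnixEpoch converts a unix-epoch-milliseconds timestamp
// back into a time.Time, preserving millisecond precision.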
func TimeFromMillisecondsSinceUnixEpoch(timestamp int64) time.Time {
	// Carry the sub-second remainder through as nanoseconds instead of truncating it.
	return time.Unix(timestamp/1000, (timestamp%1000)*int64(time.Millisecond))
}
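
// MillisecondsSinceUnixEpoch returns the current local time as unix epoch milliseconds.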
func MillisecondsSinceUnixEpoch() int64 {
	return time.Now().UnixNano() / int64(time.Millisecond)
}