2020-03-30 20:26:26 +00:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
2020-09-20 05:53:01 +00:00
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
errors "git.sequentialread.com/forest/pkg-errors"
|
|
|
|
"git.sequentialread.com/forest/rootsystem/objectStorage"
|
2020-03-30 20:26:26 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// lockInfo is the JSON payload stored in an object-storage lock file
// (rootsystem/lock_<timestamp>). A lock is "held" as long as its
// LastHeldMillisecondsSinceUnixEpoch is within lockExpiryTimeoutMilliseconds
// of the (skew-adjusted) current time.
type lockInfo struct {

	// Who identifies the node/process that wrote this lock file.
	Who string

	// Version of the lock file format. writeLockFile always writes 1.
	Version int

	// LastHeldMillisecondsSinceUnixEpoch is the skew-adjusted time at which
	// the lock was last written or renewed.
	LastHeldMillisecondsSinceUnixEpoch int64
}
|
|
|
|
|
|
|
|
const preAquirePeriod = time.Second * time.Duration(5)
|
|
|
|
const lockRenewalIntervalMilliseconds = 10 * 1000
|
|
|
|
const lockExpiryTimeoutMilliseconds = 30 * 1000
|
|
|
|
|
|
|
|
func aquireLock(who string) (func(), error) {
|
|
|
|
|
2020-09-20 05:53:01 +00:00
|
|
|
// In order to hold a lock in this system, you have to record the time you aquired the lock.
|
|
|
|
// Then, in order to continue holding it, you have to periodically renew the lock (re-write the file with an updated timestamp)
|
|
|
|
// This way if a process crashes or disconnects, the lock will not be stuck until manual intervention. It will simply expire.
|
|
|
|
// As a consequence, it's really important that the clocks on all the participating nodes are showing the same time
|
|
|
|
// (at least within a few seconds of each-other)
|
|
|
|
// Therefore, as a precaution, we will measure the difference between the clock on this node and the clock
|
|
|
|
// on the object storage server and adjust our clock accordingly.
|
|
|
|
|
|
|
|
clockSkewMs, err := getClockSkewRelativeToObjectStorage()
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "can't aquireLock because can't getClockSkewRelativeToObjectStorage")
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Printf("aquiring lock with %dms clock skew\n", clockSkewMs)
|
|
|
|
|
|
|
|
currentlyHeldLock, err := getCurrentlyHeldLock(clockSkewMs)
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "can't aquireLock because can't getCurrentlyHeldLock")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Enter the main retry loop
|
|
|
|
for true {
|
|
|
|
|
|
|
|
// Someone already has a lock -- now we just spin and wait for that lock to expire or be released.
|
|
|
|
for currentlyHeldLock != nil && currentlyHeldLock.Who != who {
|
|
|
|
log.Printf("Waiting for lock held by %s to be released...\n", currentlyHeldLock.Who)
|
|
|
|
time.Sleep(time.Millisecond * time.Duration(lockRenewalIntervalMilliseconds))
|
|
|
|
|
|
|
|
currentlyHeldLock, err = getCurrentlyHeldLock(clockSkewMs)
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "can't aquireLock because can't getCurrentlyHeldLock (wait loop)")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// There is no current lock! Time to try to aquire it!
|
|
|
|
|
|
|
|
lockTimestamp := MillisecondsSinceUnixEpoch() + clockSkewMs
|
|
|
|
lockFilename := fmt.Sprintf("rootsystem/lock_%d", lockTimestamp)
|
|
|
|
|
|
|
|
log.Printf("pre-aquiring lock %s\n", lockFilename)
|
|
|
|
err = writeLockFile(lockFilename, who, clockSkewMs)
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrapf(err, "can't aquireLock because can't writeLockFile %s", lockFilename)
|
|
|
|
}
|
|
|
|
|
|
|
|
// We tried to aquire this lock -- in order to prevent two nodes aquiring the lock at the same time (race condition)
|
|
|
|
// We will now wait for a bit and see if anyone else also tried to aquire the lock
|
|
|
|
time.Sleep(preAquirePeriod)
|
|
|
|
|
|
|
|
currentlyHeldLock, err := getCurrentlyHeldLock(clockSkewMs)
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "can't aquireLock because can't getCurrentlyHeldLock (try aquire)")
|
|
|
|
}
|
|
|
|
|
|
|
|
if currentlyHeldLock != nil && currentlyHeldLock.Who != who {
|
|
|
|
// someone else grabbed the lock first
|
|
|
|
log.Printf("pre-aquire failed, %s grabbed the lock first\n", currentlyHeldLock.Who)
|
|
|
|
global.storage.Delete(lockFilename)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if currentlyHeldLock != nil && currentlyHeldLock.Who == who {
|
|
|
|
log.Printf("aquired lock %s!\n", lockFilename)
|
|
|
|
|
|
|
|
lockHasBeenReleased := false
|
|
|
|
go (func() {
|
|
|
|
for !lockHasBeenReleased {
|
|
|
|
time.Sleep(time.Millisecond * time.Duration(lockRenewalIntervalMilliseconds))
|
|
|
|
if !lockHasBeenReleased {
|
|
|
|
err := writeLockFile(lockFilename, who, clockSkewMs)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("error renewing lock %s: %s\n", lockFilename, err)
|
|
|
|
} else {
|
|
|
|
log.Printf("renewed lock %s\n", lockFilename)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})()
|
|
|
|
|
|
|
|
releaseLock := func() {
|
|
|
|
lockHasBeenReleased = true
|
|
|
|
err := global.storage.Delete(lockFilename)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("error releasing lock %s: %s\n", lockFilename, err)
|
|
|
|
} else {
|
|
|
|
log.Printf("released lock %s\n", lockFilename)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return releaseLock, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, errors.New("this code was supposed to be unreachable :/")
|
2020-03-30 20:26:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func writeLockFile(lockFilename, who string, clockSkewMs int64) error {
|
2020-09-20 05:53:01 +00:00
|
|
|
lockTimestamp := MillisecondsSinceUnixEpoch() + clockSkewMs
|
|
|
|
lockObject := lockInfo{
|
|
|
|
Who: who,
|
|
|
|
Version: 1,
|
|
|
|
LastHeldMillisecondsSinceUnixEpoch: lockTimestamp,
|
|
|
|
}
|
|
|
|
|
|
|
|
lockBytes, err := json.Marshal(lockObject)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "can't writeLockFile because can't json.Marshal(lockObject)")
|
|
|
|
}
|
|
|
|
|
|
|
|
err = global.storage.Put(lockFilename, lockBytes)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "can't writeLockFile because can't save lock file")
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2020-03-30 20:26:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func getCurrentlyHeldLock(clockSkewMs int64) (*lockInfo, error) {
|
|
|
|
|
2020-09-20 05:53:01 +00:00
|
|
|
fileInfos, err := global.storage.List("rootsystem/")
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "can't getCurrentlyHeldLock because can't list files")
|
|
|
|
}
|
|
|
|
|
|
|
|
lockFileNames := make([]string, 0)
|
|
|
|
|
|
|
|
for _, fileInfo := range fileInfos {
|
|
|
|
if !fileInfo.IsDirectory && strings.HasPrefix(fileInfo.Name, "lock_") {
|
|
|
|
lockFileNames = append(lockFileNames, fileInfo.Name)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
objects := make([]lockInfo, 0)
|
|
|
|
|
|
|
|
if len(lockFileNames) > 0 {
|
|
|
|
getAllLockFiles := sync.WaitGroup{}
|
|
|
|
getAllLockFiles.Add(len(lockFileNames))
|
|
|
|
|
|
|
|
type getResult struct {
|
|
|
|
Path string
|
|
|
|
File objectStorage.ObjectStorageFile
|
|
|
|
NotFound bool
|
|
|
|
Error error
|
|
|
|
}
|
|
|
|
|
|
|
|
results := make([]getResult, len(lockFileNames))
|
|
|
|
|
|
|
|
for i, lockfileName := range lockFileNames {
|
|
|
|
go (func() {
|
|
|
|
path := fmt.Sprintf("rootsystem/%s", lockfileName)
|
|
|
|
file, notFound, err := global.storage.Get(path)
|
|
|
|
results[i] = getResult{
|
|
|
|
Path: path,
|
|
|
|
File: file,
|
|
|
|
NotFound: notFound,
|
|
|
|
Error: err,
|
|
|
|
}
|
|
|
|
getAllLockFiles.Add(-1)
|
|
|
|
})()
|
|
|
|
}
|
|
|
|
|
|
|
|
getAllLockFiles.Wait()
|
|
|
|
|
|
|
|
currentTimeMilliseconds := MillisecondsSinceUnixEpoch() + clockSkewMs
|
|
|
|
expiredLockFiles := make([]string, 0)
|
|
|
|
|
|
|
|
for _, result := range results {
|
|
|
|
if result.NotFound || len(result.File.Content) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if result.Error != nil {
|
|
|
|
return nil, errors.Wrapf(err, "can't getCurrentlyHeldLock because can't HTTP get %s", result.Path)
|
|
|
|
}
|
|
|
|
var lockObject lockInfo
|
|
|
|
//log.Printf("lock file contents: %s\n", string(result.File.Content))
|
|
|
|
err := json.Unmarshal(result.File.Content, &lockObject)
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrapf(err, "can't getCurrentlyHeldLock because can't json.Unmarshal %s", result.Path)
|
|
|
|
}
|
|
|
|
|
|
|
|
if (currentTimeMilliseconds - lockObject.LastHeldMillisecondsSinceUnixEpoch) < lockExpiryTimeoutMilliseconds {
|
|
|
|
objects = append(objects, lockObject)
|
|
|
|
} else {
|
|
|
|
expiredLockFiles = append(expiredLockFiles, result.Path)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Clean up the expired lock files asynchronously
|
|
|
|
go (func() {
|
|
|
|
for _, lockfilePath := range expiredLockFiles {
|
|
|
|
go (func() { global.storage.Delete(lockfilePath) })()
|
|
|
|
}
|
|
|
|
})()
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(objects) > 0 {
|
|
|
|
var oldest lockInfo
|
|
|
|
for _, lockObject := range objects {
|
|
|
|
currentIsOlder := oldest.LastHeldMillisecondsSinceUnixEpoch > lockObject.LastHeldMillisecondsSinceUnixEpoch
|
|
|
|
currentIsFirst := oldest.LastHeldMillisecondsSinceUnixEpoch == 0
|
|
|
|
if currentIsFirst || currentIsOlder {
|
|
|
|
oldest = lockObject
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return &oldest, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, nil
|
2020-03-30 20:26:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func getClockSkewRelativeToObjectStorage() (int64, error) {
|
2020-09-20 05:53:01 +00:00
|
|
|
originalTimestamp := MillisecondsSinceUnixEpoch()
|
|
|
|
locktimeFilename := fmt.Sprintf("rootsystem/locktime_%d", originalTimestamp)
|
2020-03-30 20:26:26 +00:00
|
|
|
|
2020-09-20 05:53:01 +00:00
|
|
|
err := global.storage.Put(locktimeFilename, []byte("lock time file"))
|
|
|
|
if err != nil {
|
|
|
|
return 0, errors.Wrap(err, "can't getClockSkewRelativeToObjectStorage because can't save lock time file")
|
|
|
|
}
|
2020-03-30 20:26:26 +00:00
|
|
|
|
2020-09-20 05:53:01 +00:00
|
|
|
latency := MillisecondsSinceUnixEpoch() - originalTimestamp
|
2020-04-10 12:49:18 +00:00
|
|
|
|
2020-09-20 05:53:01 +00:00
|
|
|
result, _, err := global.storage.Get(locktimeFilename)
|
|
|
|
if err != nil {
|
|
|
|
return 0, errors.Wrap(err, "can't getClockSkewRelativeToObjectStorage because can't get lock time file")
|
|
|
|
}
|
2020-03-30 20:26:26 +00:00
|
|
|
|
2020-09-20 05:53:01 +00:00
|
|
|
global.storage.Delete(locktimeFilename)
|
2020-03-30 20:26:26 +00:00
|
|
|
|
2020-09-20 05:53:01 +00:00
|
|
|
objectStorageTimestamp := (result.LastModified.UnixNano() / int64(time.Millisecond))
|
2020-03-30 20:26:26 +00:00
|
|
|
|
2020-09-20 05:53:01 +00:00
|
|
|
return (objectStorageTimestamp - (originalTimestamp + (latency / 2))), nil
|
2020-03-30 20:26:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func TimeFromMillisecondsSinceUnixEpoch(timestamp int64) time.Time {
|
2020-09-20 05:53:01 +00:00
|
|
|
return time.Unix(timestamp/int64(1000), 0)
|
2020-03-30 20:26:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// MillisecondsSinceUnixEpoch returns the current wall-clock time expressed as
// milliseconds since the Unix epoch.
func MillisecondsSinceUnixEpoch() int64 {
	nanos := time.Now().UnixNano()
	return nanos / int64(time.Millisecond)
}
|