server.garden privileged automation agent (mirror of https://git.sequentialread.com/forest/rootsystem)
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
258 lines
7.0 KiB
258 lines
7.0 KiB
package objectStorage |
|
|
|
import ( |
|
"fmt" |
|
"strings" |
|
|
|
errors "git.sequentialread.com/forest/pkg-errors" |
|
"git.sequentialread.com/forest/rootsystem/configuration" |
|
) |
|
|
|
type RedundantObjectStorage struct { |
|
ObjectStorages []*ObjectStoragerMeta |
|
} |
|
|
|
func NewRedundantObjectStorage(objectStorages []*ObjectStoragerMeta) RedundantObjectStorage { |
|
return RedundantObjectStorage{ |
|
ObjectStorages: objectStorages, |
|
} |
|
} |
|
|
|
// func (redundantObjectStorage RedundantObjectStorage) CreateIfNotExists() error { |
|
// errInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} { |
|
// return client.CreateIfNotExists() |
|
// }) |
|
|
|
// atLeastOneIsWorking := false |
|
// errors := "" |
|
// for _, errInterface := range errInterfaces { |
|
// if errInterface == nil { |
|
// atLeastOneIsWorking = true |
|
// } else { |
|
// errors = fmt.Sprintf("%s\n", errInterface.(error).Error()) |
|
// } |
|
// } |
|
|
|
// if !atLeastOneIsWorking { |
|
// return fmt.Errorf("RedundantObjectStorage unable to CreateIfNotExists(): %s", errors) |
|
// } |
|
|
|
// return nil |
|
// } |
|
|
|
func (redundantObjectStorage RedundantObjectStorage) CreateIfNotExists() error { |
|
atLeastOneIsWorking := false |
|
for _, objectStorage := range redundantObjectStorage.ObjectStorages { |
|
if !objectStorage.Initializing && objectStorage.Err == nil { |
|
atLeastOneIsWorking = true |
|
} |
|
} |
|
|
|
if !atLeastOneIsWorking { |
|
// TODO enhance this result with more detail |
|
objectStorageErrors := make([]string, 0) |
|
for _, objectStorage := range redundantObjectStorage.ObjectStorages { |
|
objectStorageErrors = append( |
|
objectStorageErrors, |
|
fmt.Sprintf(" %s: %s", objectStorage.Description, objectStorage.Err.Error()), |
|
) |
|
} |
|
return errors.Errorf( |
|
"None of the configured object storages appear to be working\n%s\n", |
|
strings.Join(objectStorageErrors, "\n"), |
|
) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (redundantObjectStorage RedundantObjectStorage) CreateAccessKeyIfNotExists(key ObjectStorageKey) ([]configuration.Credential, error) { |
|
type createAccessKeyIfNotExistsResult struct { |
|
credentials []configuration.Credential |
|
err error |
|
} |
|
|
|
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} { |
|
credentials, err := client.CreateAccessKeyIfNotExists(key) |
|
return createAccessKeyIfNotExistsResult{ |
|
credentials: credentials, |
|
err: err, |
|
} |
|
}) |
|
|
|
errors := "" |
|
credentials := []configuration.Credential{} |
|
for _, resultInterface := range resultInterfaces { |
|
result := resultInterface.(createAccessKeyIfNotExistsResult) |
|
if result.err == nil { |
|
credentials = append(credentials, result.credentials...) |
|
} else { |
|
errors = fmt.Sprintf("%s\n", result.err.Error()) |
|
} |
|
} |
|
|
|
if len(credentials) == 0 { |
|
return nil, fmt.Errorf("RedundantObjectStorage unable to CreateAccessKeyIfNotExists(\"%s\"): %s", key, errors) |
|
} |
|
|
|
return credentials, nil |
|
} |
|
|
|
func (redundantObjectStorage RedundantObjectStorage) List(key string) ([]ObjectStorageFileInfo, error) { |
|
resultsMap := make(map[string]ObjectStorageFileInfo) |
|
toReturn := make([]ObjectStorageFileInfo, 0) |
|
type listResult struct { |
|
value []ObjectStorageFileInfo |
|
err error |
|
} |
|
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} { |
|
value, err := client.List(key) |
|
return listResult{ |
|
value: value, |
|
err: err, |
|
} |
|
}) |
|
atLeastOneIsWorking := false |
|
errors := "" |
|
for _, resultInterface := range resultInterfaces { |
|
listResult := resultInterface.(listResult) |
|
if listResult.err == nil { |
|
atLeastOneIsWorking = true |
|
for _, fileInfo := range listResult.value { |
|
if resultsMap[fileInfo.Name].Name == "" { |
|
resultsMap[fileInfo.Name] = fileInfo |
|
} |
|
} |
|
} else { |
|
errors = fmt.Sprintf("%s\n", listResult.err.Error()) |
|
} |
|
} |
|
|
|
if !atLeastOneIsWorking { |
|
return nil, fmt.Errorf("RedundantObjectStorage unable to List(\"%s\"): %s", key, errors) |
|
} |
|
|
|
for _, fileInfo := range resultsMap { |
|
toReturn = append(toReturn, fileInfo) |
|
} |
|
|
|
return toReturn, nil |
|
} |
|
|
|
func (redundantObjectStorage RedundantObjectStorage) Get(key string) (ObjectStorageFile, bool, error) { |
|
type getResult struct { |
|
value ObjectStorageFile |
|
notFound bool |
|
err error |
|
} |
|
|
|
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} { |
|
value, notFound, err := client.Get(key) |
|
return getResult{ |
|
value: value, |
|
notFound: notFound, |
|
err: err, |
|
} |
|
}) |
|
|
|
errors := "" |
|
notFoundError := false |
|
var mostRecentlyUpdated *ObjectStorageFile |
|
for _, resultInterface := range resultInterfaces { |
|
result := resultInterface.(getResult) |
|
if result.err == nil && !result.notFound { |
|
if mostRecentlyUpdated == nil || mostRecentlyUpdated.LastModified.Before(result.value.LastModified) { |
|
mostRecentlyUpdated = &(result.value) |
|
} |
|
} else { |
|
errors = fmt.Sprintf("%s\n", result.err.Error()) |
|
|
|
if result.notFound { |
|
notFoundError = true |
|
} |
|
} |
|
} |
|
|
|
if mostRecentlyUpdated == nil { |
|
return ObjectStorageFile{}, notFoundError, fmt.Errorf("RedundantObjectStorage unable to Get(\"%s\"): %s", key, errors) |
|
} |
|
|
|
return *mostRecentlyUpdated, false, nil |
|
|
|
} |
|
|
|
func (redundantObjectStorage RedundantObjectStorage) Put(key string, value []byte) error { |
|
|
|
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} { |
|
return client.Put(key, value) |
|
}) |
|
|
|
atLeastOneIsWorking := false |
|
errors := "" |
|
// TODO somehow log partial write errors so they can be replicated later? |
|
for _, resultInterface := range resultInterfaces { |
|
|
|
if resultInterface == nil { |
|
atLeastOneIsWorking = true |
|
} else { |
|
err := resultInterface.(error) |
|
if err != nil { |
|
errors = fmt.Sprintf("%s\n", err.Error()) |
|
} |
|
} |
|
} |
|
|
|
if !atLeastOneIsWorking { |
|
return fmt.Errorf("RedundantObjectStorage unable to Put(\"%s\"): %s", key, errors) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (redundantObjectStorage RedundantObjectStorage) Delete(key string) error { |
|
|
|
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} { |
|
return client.Delete(key) |
|
}) |
|
|
|
atLeastOneIsWorking := false |
|
errors := "" |
|
// TODO somehow log partial write errors so they can be replicated later? |
|
for _, resultInterface := range resultInterfaces { |
|
if resultInterface == nil { |
|
atLeastOneIsWorking = true |
|
} else { |
|
err := resultInterface.(error) |
|
if err != nil { |
|
errors = fmt.Sprintf("%s\n", err.Error()) |
|
} |
|
} |
|
} |
|
|
|
if !atLeastOneIsWorking { |
|
return fmt.Errorf("RedundantObjectStorage unable to Delete(\"%s\"): %s", key, errors) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// TODO handle timeouts more gracefully -- do not let 1 object storage being completely down slow the process to a crawl |
|
|
|
func (redundantObjectStorage RedundantObjectStorage) doInParallel(action func(client ObjectStorager) interface{}) []interface{} { |
|
|
|
resultsChannel := make(chan interface{}) |
|
results := make([]interface{}, 0) |
|
|
|
for _, objectStorage := range redundantObjectStorage.ObjectStorages { |
|
go (func() { |
|
resultsChannel <- action(objectStorage.Client) |
|
})() |
|
} |
|
|
|
for range redundantObjectStorage.ObjectStorages { |
|
result := <-resultsChannel |
|
results = append(results, result) |
|
} |
|
|
|
return results |
|
}
|
|
|