server.garden privileged automation agent (mirror of https://git.sequentialread.com/forest/rootsystem)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

259 lines
7.0 KiB

package objectStorage
import (
"fmt"
"strings"
errors "git.sequentialread.com/forest/pkg-errors"
"git.sequentialread.com/forest/rootsystem/configuration"
)
type RedundantObjectStorage struct {
ObjectStorages []*ObjectStoragerMeta
}
func NewRedundantObjectStorage(objectStorages []*ObjectStoragerMeta) RedundantObjectStorage {
return RedundantObjectStorage{
ObjectStorages: objectStorages,
}
}
// func (redundantObjectStorage RedundantObjectStorage) CreateIfNotExists() error {
2 years ago
// errInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} {
// return client.CreateIfNotExists()
// })
// atLeastOneIsWorking := false
// errors := ""
// for _, errInterface := range errInterfaces {
// if errInterface == nil {
// atLeastOneIsWorking = true
// } else {
// errors = fmt.Sprintf("%s\n", errInterface.(error).Error())
// }
// }
// if !atLeastOneIsWorking {
// return fmt.Errorf("RedundantObjectStorage unable to CreateIfNotExists(): %s", errors)
// }
// return nil
// }
func (redundantObjectStorage RedundantObjectStorage) CreateIfNotExists() error {
atLeastOneIsWorking := false
for _, objectStorage := range redundantObjectStorage.ObjectStorages {
if !objectStorage.Initializing && objectStorage.Err == nil {
atLeastOneIsWorking = true
}
}
if !atLeastOneIsWorking {
// TODO enhance this result with more detail
objectStorageErrors := make([]string, 0)
for _, objectStorage := range redundantObjectStorage.ObjectStorages {
objectStorageErrors = append(
objectStorageErrors,
fmt.Sprintf(" %s: %s", objectStorage.Description, objectStorage.Err.Error()),
)
}
return errors.Errorf(
"None of the configured object storages appear to be working\n%s\n",
strings.Join(objectStorageErrors, "\n"),
)
}
return nil
}
func (redundantObjectStorage RedundantObjectStorage) CreateAccessKeyIfNotExists(key ObjectStorageKey) ([]configuration.Credential, error) {
type createAccessKeyIfNotExistsResult struct {
credentials []configuration.Credential
err error
}
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} {
credentials, err := client.CreateAccessKeyIfNotExists(key)
return createAccessKeyIfNotExistsResult{
credentials: credentials,
err: err,
}
})
errors := ""
credentials := []configuration.Credential{}
for _, resultInterface := range resultInterfaces {
result := resultInterface.(createAccessKeyIfNotExistsResult)
if result.err == nil {
credentials = append(credentials, result.credentials...)
} else {
errors = fmt.Sprintf("%s\n", result.err.Error())
}
}
if len(credentials) == 0 {
return nil, fmt.Errorf("RedundantObjectStorage unable to CreateAccessKeyIfNotExists(\"%s\"): %s", key, errors)
}
return credentials, nil
}
func (redundantObjectStorage RedundantObjectStorage) List(key string) ([]ObjectStorageFileInfo, error) {
resultsMap := make(map[string]ObjectStorageFileInfo)
toReturn := make([]ObjectStorageFileInfo, 0)
type listResult struct {
value []ObjectStorageFileInfo
err error
}
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} {
value, err := client.List(key)
return listResult{
value: value,
err: err,
}
})
atLeastOneIsWorking := false
errors := ""
for _, resultInterface := range resultInterfaces {
listResult := resultInterface.(listResult)
if listResult.err == nil {
atLeastOneIsWorking = true
for _, fileInfo := range listResult.value {
if resultsMap[fileInfo.Name].Name == "" {
resultsMap[fileInfo.Name] = fileInfo
}
}
} else {
errors = fmt.Sprintf("%s\n", listResult.err.Error())
}
}
if !atLeastOneIsWorking {
return nil, fmt.Errorf("RedundantObjectStorage unable to List(\"%s\"): %s", key, errors)
}
for _, fileInfo := range resultsMap {
toReturn = append(toReturn, fileInfo)
}
return toReturn, nil
}
func (redundantObjectStorage RedundantObjectStorage) Get(key string) (ObjectStorageFile, bool, error) {
type getResult struct {
value ObjectStorageFile
notFound bool
err error
}
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} {
value, notFound, err := client.Get(key)
return getResult{
value: value,
notFound: notFound,
err: err,
}
})
errors := ""
notFoundError := false
var mostRecentlyUpdated *ObjectStorageFile
for _, resultInterface := range resultInterfaces {
result := resultInterface.(getResult)
if result.err == nil && !result.notFound {
if mostRecentlyUpdated == nil || mostRecentlyUpdated.LastModified.Before(result.value.LastModified) {
mostRecentlyUpdated = &(result.value)
}
} else {
errors = fmt.Sprintf("%s\n", result.err.Error())
if result.notFound {
notFoundError = true
}
}
}
if mostRecentlyUpdated == nil {
return ObjectStorageFile{}, notFoundError, fmt.Errorf("RedundantObjectStorage unable to Get(\"%s\"): %s", key, errors)
}
return *mostRecentlyUpdated, false, nil
}
func (redundantObjectStorage RedundantObjectStorage) Put(key string, value []byte) error {
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} {
return client.Put(key, value)
})
atLeastOneIsWorking := false
errors := ""
// TODO somehow log partial write errors so they can be replicated later?
for _, resultInterface := range resultInterfaces {
if resultInterface == nil {
atLeastOneIsWorking = true
} else {
err := resultInterface.(error)
if err != nil {
errors = fmt.Sprintf("%s\n", err.Error())
}
}
}
if !atLeastOneIsWorking {
return fmt.Errorf("RedundantObjectStorage unable to Put(\"%s\"): %s", key, errors)
}
return nil
}
func (redundantObjectStorage RedundantObjectStorage) Delete(key string) error {
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} {
return client.Delete(key)
})
atLeastOneIsWorking := false
errors := ""
// TODO somehow log partial write errors so they can be replicated later?
for _, resultInterface := range resultInterfaces {
if resultInterface == nil {
atLeastOneIsWorking = true
} else {
err := resultInterface.(error)
if err != nil {
errors = fmt.Sprintf("%s\n", err.Error())
}
}
}
if !atLeastOneIsWorking {
return fmt.Errorf("RedundantObjectStorage unable to Delete(\"%s\"): %s", key, errors)
}
return nil
}
// TODO handle timeouts more gracefully -- do not let 1 object storage being completely down slow the process to a crawl
func (redundantObjectStorage RedundantObjectStorage) doInParallel(action func(client ObjectStorager) interface{}) []interface{} {
resultsChannel := make(chan interface{})
results := make([]interface{}, 0)
for _, objectStorage := range redundantObjectStorage.ObjectStorages {
go (func() {
resultsChannel <- action(objectStorage.Client)
})()
}
for range redundantObjectStorage.ObjectStorages {
result := <-resultsChannel
results = append(results, result)
}
return results
}