|
|
|
package objectStorage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
errors "git.sequentialread.com/forest/pkg-errors"
|
|
|
|
"git.sequentialread.com/forest/rootsystem/configuration"
|
|
|
|
)
|
|
|
|
|
|
|
|
type RedundantObjectStorage struct {
|
|
|
|
ObjectStorages []*ObjectStoragerMeta
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewRedundantObjectStorage(objectStorages []*ObjectStoragerMeta) RedundantObjectStorage {
|
|
|
|
return RedundantObjectStorage{
|
|
|
|
ObjectStorages: objectStorages,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// func (redundantObjectStorage RedundantObjectStorage) CreateIfNotExists() error {
|
|
|
|
// errInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} {
|
|
|
|
// return client.CreateIfNotExists()
|
|
|
|
// })
|
|
|
|
|
|
|
|
// atLeastOneIsWorking := false
|
|
|
|
// errors := ""
|
|
|
|
// for _, errInterface := range errInterfaces {
|
|
|
|
// if errInterface == nil {
|
|
|
|
// atLeastOneIsWorking = true
|
|
|
|
// } else {
|
|
|
|
// errors = fmt.Sprintf("%s\n", errInterface.(error).Error())
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
|
|
|
|
// if !atLeastOneIsWorking {
|
|
|
|
// return fmt.Errorf("RedundantObjectStorage unable to CreateIfNotExists(): %s", errors)
|
|
|
|
// }
|
|
|
|
|
|
|
|
// return nil
|
|
|
|
// }
|
|
|
|
|
|
|
|
func (redundantObjectStorage RedundantObjectStorage) CreateIfNotExists() error {
|
|
|
|
atLeastOneIsWorking := false
|
|
|
|
for _, objectStorage := range redundantObjectStorage.ObjectStorages {
|
|
|
|
if !objectStorage.Initializing && objectStorage.Err == nil {
|
|
|
|
atLeastOneIsWorking = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !atLeastOneIsWorking {
|
|
|
|
// TODO enhance this result with more detail
|
|
|
|
objectStorageErrors := make([]string, 0)
|
|
|
|
for _, objectStorage := range redundantObjectStorage.ObjectStorages {
|
|
|
|
objectStorageErrors = append(
|
|
|
|
objectStorageErrors,
|
|
|
|
fmt.Sprintf(" %s: %s", objectStorage.Description, objectStorage.Err.Error()),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
return errors.Errorf(
|
|
|
|
"None of the configured object storages appear to be working\n%s\n",
|
|
|
|
strings.Join(objectStorageErrors, "\n"),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (redundantObjectStorage RedundantObjectStorage) CreateAccessKeyIfNotExists(key ObjectStorageKey) ([]configuration.Credential, error) {
|
|
|
|
type createAccessKeyIfNotExistsResult struct {
|
|
|
|
credentials []configuration.Credential
|
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
|
|
|
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} {
|
|
|
|
credentials, err := client.CreateAccessKeyIfNotExists(key)
|
|
|
|
return createAccessKeyIfNotExistsResult{
|
|
|
|
credentials: credentials,
|
|
|
|
err: err,
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
errors := ""
|
|
|
|
credentials := []configuration.Credential{}
|
|
|
|
for _, resultInterface := range resultInterfaces {
|
|
|
|
result := resultInterface.(createAccessKeyIfNotExistsResult)
|
|
|
|
if result.err == nil {
|
|
|
|
credentials = append(credentials, result.credentials...)
|
|
|
|
} else {
|
|
|
|
errors = fmt.Sprintf("%s\n", result.err.Error())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(credentials) == 0 {
|
|
|
|
return nil, fmt.Errorf("RedundantObjectStorage unable to CreateAccessKeyIfNotExists(\"%s\"): %s", key, errors)
|
|
|
|
}
|
|
|
|
|
|
|
|
return credentials, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (redundantObjectStorage RedundantObjectStorage) List(key string) ([]ObjectStorageFileInfo, error) {
|
|
|
|
resultsMap := make(map[string]ObjectStorageFileInfo)
|
|
|
|
toReturn := make([]ObjectStorageFileInfo, 0)
|
|
|
|
type listResult struct {
|
|
|
|
value []ObjectStorageFileInfo
|
|
|
|
err error
|
|
|
|
}
|
|
|
|
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} {
|
|
|
|
value, err := client.List(key)
|
|
|
|
return listResult{
|
|
|
|
value: value,
|
|
|
|
err: err,
|
|
|
|
}
|
|
|
|
})
|
|
|
|
atLeastOneIsWorking := false
|
|
|
|
errors := ""
|
|
|
|
for _, resultInterface := range resultInterfaces {
|
|
|
|
listResult := resultInterface.(listResult)
|
|
|
|
if listResult.err == nil {
|
|
|
|
atLeastOneIsWorking = true
|
|
|
|
for _, fileInfo := range listResult.value {
|
|
|
|
if resultsMap[fileInfo.Name].Name == "" {
|
|
|
|
resultsMap[fileInfo.Name] = fileInfo
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
errors = fmt.Sprintf("%s\n", listResult.err.Error())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !atLeastOneIsWorking {
|
|
|
|
return nil, fmt.Errorf("RedundantObjectStorage unable to List(\"%s\"): %s", key, errors)
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, fileInfo := range resultsMap {
|
|
|
|
toReturn = append(toReturn, fileInfo)
|
|
|
|
}
|
|
|
|
|
|
|
|
return toReturn, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (redundantObjectStorage RedundantObjectStorage) Get(key string) (ObjectStorageFile, bool, error) {
|
|
|
|
type getResult struct {
|
|
|
|
value ObjectStorageFile
|
|
|
|
notFound bool
|
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
|
|
|
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} {
|
|
|
|
value, notFound, err := client.Get(key)
|
|
|
|
return getResult{
|
|
|
|
value: value,
|
|
|
|
notFound: notFound,
|
|
|
|
err: err,
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
errors := ""
|
|
|
|
notFoundError := false
|
|
|
|
var mostRecentlyUpdated *ObjectStorageFile
|
|
|
|
for _, resultInterface := range resultInterfaces {
|
|
|
|
result := resultInterface.(getResult)
|
|
|
|
if result.err == nil && !result.notFound {
|
|
|
|
if mostRecentlyUpdated == nil || mostRecentlyUpdated.LastModified.Before(result.value.LastModified) {
|
|
|
|
mostRecentlyUpdated = &(result.value)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
errors = fmt.Sprintf("%s\n", result.err.Error())
|
|
|
|
|
|
|
|
if result.notFound {
|
|
|
|
notFoundError = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if mostRecentlyUpdated == nil {
|
|
|
|
return ObjectStorageFile{}, notFoundError, fmt.Errorf("RedundantObjectStorage unable to Get(\"%s\"): %s", key, errors)
|
|
|
|
}
|
|
|
|
|
|
|
|
return *mostRecentlyUpdated, false, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
func (redundantObjectStorage RedundantObjectStorage) Put(key string, value []byte) error {
|
|
|
|
|
|
|
|
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} {
|
|
|
|
return client.Put(key, value)
|
|
|
|
})
|
|
|
|
|
|
|
|
atLeastOneIsWorking := false
|
|
|
|
errors := ""
|
|
|
|
// TODO somehow log partial write errors so they can be replicated later?
|
|
|
|
for _, resultInterface := range resultInterfaces {
|
|
|
|
|
|
|
|
if resultInterface == nil {
|
|
|
|
atLeastOneIsWorking = true
|
|
|
|
} else {
|
|
|
|
err := resultInterface.(error)
|
|
|
|
if err != nil {
|
|
|
|
errors = fmt.Sprintf("%s\n", err.Error())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !atLeastOneIsWorking {
|
|
|
|
return fmt.Errorf("RedundantObjectStorage unable to Put(\"%s\"): %s", key, errors)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (redundantObjectStorage RedundantObjectStorage) Delete(key string) error {
|
|
|
|
|
|
|
|
resultInterfaces := redundantObjectStorage.doInParallel(func(client ObjectStorager) interface{} {
|
|
|
|
return client.Delete(key)
|
|
|
|
})
|
|
|
|
|
|
|
|
atLeastOneIsWorking := false
|
|
|
|
errors := ""
|
|
|
|
// TODO somehow log partial write errors so they can be replicated later?
|
|
|
|
for _, resultInterface := range resultInterfaces {
|
|
|
|
if resultInterface == nil {
|
|
|
|
atLeastOneIsWorking = true
|
|
|
|
} else {
|
|
|
|
err := resultInterface.(error)
|
|
|
|
if err != nil {
|
|
|
|
errors = fmt.Sprintf("%s\n", err.Error())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !atLeastOneIsWorking {
|
|
|
|
return fmt.Errorf("RedundantObjectStorage unable to Delete(\"%s\"): %s", key, errors)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO handle timeouts more gracefully -- do not let 1 object storage being completely down slow the process to a crawl
|
|
|
|
|
|
|
|
func (redundantObjectStorage RedundantObjectStorage) doInParallel(action func(client ObjectStorager) interface{}) []interface{} {
|
|
|
|
|
|
|
|
resultsChannel := make(chan interface{})
|
|
|
|
results := make([]interface{}, 0)
|
|
|
|
|
|
|
|
for _, objectStorage := range redundantObjectStorage.ObjectStorages {
|
|
|
|
go (func() {
|
|
|
|
resultsChannel <- action(objectStorage.Client)
|
|
|
|
})()
|
|
|
|
}
|
|
|
|
|
|
|
|
for range redundantObjectStorage.ObjectStorages {
|
|
|
|
result := <-resultsChannel
|
|
|
|
results = append(results, result)
|
|
|
|
}
|
|
|
|
|
|
|
|
return results
|
|
|
|
}
|