server.garden privileged automation agent (mirror of https://git.sequentialread.com/forest/rootsystem)
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
212 lines
6.3 KiB
212 lines
6.3 KiB
package objectStorage |
|
|
|
import ( |
|
"bytes" |
|
"encoding/xml" |
|
"fmt" |
|
"io" |
|
"io/ioutil" |
|
"net/http" |
|
"path/filepath" |
|
"strings" |
|
"time" |
|
|
|
errors "git.sequentialread.com/forest/pkg-errors" |
|
"git.sequentialread.com/forest/rootsystem/configuration" |
|
|
|
"github.com/minio/minio-go/pkg/signer" |
|
) |
|
|
|
// S3Compatible is a client for a single bucket on an S3-style object storage
// service. Every request it makes is built from Domain plus a path, and is
// signed with the stored key pair and region before being sent.
type S3Compatible struct {
	// KeyId and SecretKey are the access key pair handed to the request signer.
	KeyId, SecretKey string

	// HTTPClient is the client used to send every signed request.
	HTTPClient http.Client

	// Domain is the scheme-and-host prefix that request paths are appended to.
	Domain string

	// Region is the signing region handed to the request signer.
	Region string
}
|
|
|
// s3Error is the decoded form of an XML <Error> document returned by the
// service. Only the machine-readable error code is extracted.
type s3Error struct {
	// XMLName pins the expected root element to <Error>.
	XMLName xml.Name `xml:"Error"`

	// Code is the text of the <Code> child element, e.g. "NoSuchKey".
	Code string `xml:"Code"`
}
|
|
|
// listBucketResult is the decoded form of an XML <ListBucketResult> document.
//
// Files and FilesLastModified are filled from the same <Contents> elements and
// are meant to be read as parallel slices: entry i of one describes entry i of
// the other.
type listBucketResult struct {
	// XMLName pins the expected root element to <ListBucketResult>.
	XMLName xml.Name `xml:"ListBucketResult"`

	// IsTruncated is the text of the <IsTruncated> element.
	IsTruncated bool `xml:"IsTruncated"`

	// Files holds every <Contents><Key> value, in document order.
	Files []string `xml:"Contents>Key"`

	// FilesLastModified holds every <Contents><LastModified> value, in
	// document order, still as raw text.
	FilesLastModified []string `xml:"Contents>LastModified"`

	// Directories holds every <CommonPrefixes><Prefix> value.
	Directories []string `xml:"CommonPrefixes>Prefix"`
}
|
|
|
const (
	// S3_LAST_MODIFIED_HEADER_FORMAT is the time.Parse layout used to read the
	// Last-Modified response header, e.g. "Sat, 15 Feb 2020 22:47:15 GMT".
	S3_LAST_MODIFIED_HEADER_FORMAT = "Mon, 2 Jan 2006 15:04:05 MST"
)
|
|
|
// We can't use Digital Ocean Spaces because it comes with a monthly service charge. |
|
// Press F in the chat for digital ocean |
|
// func NewDigitalOceanSpacesClient(cred credential, region, spaceName string) S3Compatible { |
|
// return S3Compatible{ |
|
// Credential: cred, |
|
// HTTPClient: http.Client{ |
|
// // TODO test this timeout for both connect timeout and read timeout. |
|
// // Do we need to include a custom Transport as well for a Dial Timeout? |
|
// Timeout: time.Second * 60, |
|
// }, |
|
// Domain: fmt.Sprintf("https://%s.%s.digitaloceanspaces.com", spaceName, region), |
|
// Region: region, |
|
// } |
|
// } |
|
|
|
func NewS3Client(keyId, secretKey, region, bucketName string) S3Compatible { |
|
|
|
subdomain := fmt.Sprintf("s3-%s", region) |
|
if region == "us-east-1" { |
|
subdomain = "s3" |
|
} |
|
|
|
return S3Compatible{ |
|
KeyId: keyId, |
|
SecretKey: secretKey, |
|
HTTPClient: http.Client{ |
|
// TODO test this timeout for both connect timeout and read timeout. |
|
// Do we need to include a custom Transport as well for a Dial Timeout? |
|
Timeout: time.Second * 60, |
|
}, |
|
Domain: fmt.Sprintf("https://%s.%s.amazonaws.com", bucketName, subdomain), |
|
Region: region, |
|
} |
|
} |
|
|
|
func (self S3Compatible) CreateIfNotExists() error { |
|
return errors.New("not implemented") |
|
// response, err := self.doReq("GET", "/?delimiter=/", nil) |
|
// if err != nil { |
|
// return err |
|
// } |
|
// bytes, err := ioutil.ReadAll(response.Body) |
|
// if err != nil { |
|
// return errors.Wrap(err, "could not read response body") |
|
// } |
|
// if strings.Contains(string(bytes), "ListBucketResult") { |
|
// return nil |
|
// } |
|
// return fmt.Errorf("recieved weird response from %s: %s", self.Domain, string(bytes)) |
|
} |
|
|
|
func (self S3Compatible) CreateAccessKeyIfNotExists(key ObjectStorageKey) ([]configuration.Credential, error) { |
|
return nil, errors.New("not implemented") |
|
} |
|
|
|
func (self S3Compatible) List(key string) ([]ObjectStorageFileInfo, error) { |
|
if !strings.HasPrefix(key, "/") { |
|
key = fmt.Sprintf("/%s", key) |
|
} |
|
response, err := self.doReq("GET", fmt.Sprintf("%s?delimiter=/", key), nil) |
|
if err != nil { |
|
return []ObjectStorageFileInfo{}, err |
|
} |
|
bytes, err := ioutil.ReadAll(response.Body) |
|
if err != nil { |
|
return nil, errors.Wrap(err, "could not read response body") |
|
} |
|
|
|
result := listBucketResult{} |
|
err = xml.Unmarshal(bytes, &result) |
|
if err != nil { |
|
s3ErrorResult := s3Error{} |
|
s3ErrorUnmarshalError := xml.Unmarshal(bytes, &s3ErrorResult) |
|
if s3ErrorUnmarshalError != nil { |
|
return []ObjectStorageFileInfo{}, err |
|
} |
|
if s3ErrorResult.Code == "NoSuchKey" { |
|
return []ObjectStorageFileInfo{}, nil |
|
} |
|
return []ObjectStorageFileInfo{}, fmt.Errorf("recieved weird List() response from %s: %s", self.Domain, string(bytes)) |
|
} |
|
|
|
toReturn := make([]ObjectStorageFileInfo, len(result.Directories)+len(result.Files)) |
|
for i, directory := range result.Directories { |
|
if strings.HasSuffix(directory, "/") { |
|
directory = directory[:len(directory)-1] |
|
} |
|
toReturn[i] = ObjectStorageFileInfo{ |
|
IsDirectory: true, |
|
Name: directory, |
|
} |
|
} |
|
for i, file := range result.Files { |
|
//2020-02-15T22:35:15.788Z |
|
lastModified, err := time.Parse(time.RFC3339, result.FilesLastModified[i]) |
|
if err != nil { |
|
return []ObjectStorageFileInfo{}, errors.Wrapf(err, "can't parse file last modified date") |
|
} |
|
toReturn[len(result.Directories)+i] = ObjectStorageFileInfo{ |
|
IsDirectory: false, |
|
LastModified: lastModified, |
|
Name: file, |
|
} |
|
} |
|
return toReturn, nil |
|
} |
|
|
|
func (self S3Compatible) Get(key string) (ObjectStorageFile, bool, error) { |
|
if !strings.HasPrefix(key, "/") { |
|
key = fmt.Sprintf("/%s", key) |
|
} |
|
response, err := self.doReq("GET", fmt.Sprintf("%s?delimiter=/", key), nil) |
|
if err != nil { |
|
return ObjectStorageFile{}, false, err |
|
} |
|
|
|
if response.StatusCode == http.StatusNotFound { |
|
return ObjectStorageFile{}, true, errors.New("404 not found") |
|
} |
|
|
|
//Sat, 15 Feb 2020 22:47:15 GMT |
|
lastModified, err := time.Parse(S3_LAST_MODIFIED_HEADER_FORMAT, response.Header.Get("Last-Modified")) |
|
if err != nil { |
|
return ObjectStorageFile{}, false, errors.Wrapf(err, "can't parse Last-Modified header") |
|
} |
|
|
|
bytes, err := ioutil.ReadAll(response.Body) |
|
if err != nil { |
|
return ObjectStorageFile{}, false, errors.Wrap(err, "could not read response body") |
|
} |
|
|
|
toReturn := ObjectStorageFile{ |
|
Name: filepath.Base(key), |
|
LastModified: lastModified, |
|
Content: bytes, |
|
} |
|
|
|
return toReturn, false, nil |
|
} |
|
|
|
func (self S3Compatible) Put(key string, value []byte) error { |
|
_, err := self.doReq("PUT", fmt.Sprintf("%s?delimiter=/", key), bytes.NewBuffer(value)) |
|
return err |
|
} |
|
|
|
func (self S3Compatible) Delete(key string) error { |
|
_, err := self.doReq("DELETE", fmt.Sprintf("%s?delimiter=/", key), nil) |
|
return err |
|
} |
|
|
|
func (self *S3Compatible) doReq(method, pathAndQuery string, body io.Reader) (*http.Response, error) { |
|
url := fmt.Sprintf("%s%s", self.Domain, pathAndQuery) |
|
request, err := http.NewRequest(method, url, body) |
|
if err != nil { |
|
return nil, errors.Wrap(err, "could not create request") |
|
} |
|
request = signer.SignV4(*request, self.KeyId, self.SecretKey, "", self.Region) |
|
|
|
response, err := self.HTTPClient.Do(request) |
|
if err != nil { |
|
return nil, errors.Wrapf(err, "could not HTTP %s %s", method, url) |
|
} |
|
if response.StatusCode == http.StatusForbidden { |
|
return nil, fmt.Errorf("invalid credential for %s, access was forbidden", url) |
|
} else if response.StatusCode == http.StatusNotFound || (response.StatusCode >= 200 && response.StatusCode < 300) { |
|
return response, nil |
|
} else { |
|
return nil, fmt.Errorf("got %d %s from %s", response.StatusCode, response.Status, url) |
|
} |
|
}
|
|
|