waymaker-client/go/object.go

303 lines
8.7 KiB
Go

package waymaker
// Object store subsystem — chunked Put/Get/Delete/List/Revisions backed by
// the streams service's PutObject/GetObject RPCs.
//
// An object-store bucket maps to a stream. The wrapper hides the wire
// convention behind a per-bucket *ObjectStore handle.
//
// Entry points on *Client:
// - client.CreateObjectStore(ctx, ObjectStoreConfig{…})
// - client.GetOrCreateObjectStore(ctx, ObjectStoreConfig{…})
// - client.ObjectStore(name) — handle without creation
import (
"context"
pb "git.awesomike.com/pub/waymaker-client/go/genpb/streams"
)
// DefaultChunkSize is the server-side default chunk size (1 MiB).
const DefaultChunkSize uint64 = 1024 * 1024
// ObjectStoreConfig is the bucket creation config.
type ObjectStoreConfig struct {
Name string
MaxBytes *uint64
Ephemeral bool
}
// ObjectInfo is the metadata of a stored object.
type ObjectInfo struct {
Name string
TotalBytes uint64
ChunkCount uint64
ChunkSize uint64
SHA256 string
TsMs int64
Headers [][2]string
Revision uint64 // metadata sequence
Deduped bool
}
func objectInfoFromPB(i *pb.ObjectInfo) ObjectInfo {
hdrs := make([][2]string, len(i.GetHeaders()))
for j, h := range i.GetHeaders() {
hdrs[j] = [2]string{h.GetKey(), h.GetValue()}
}
return ObjectInfo{
Name: i.GetName(),
TotalBytes: i.GetTotalBytes(),
ChunkCount: i.GetChunkCount(),
ChunkSize: i.GetChunkSize(),
SHA256: i.GetSha256(),
TsMs: i.GetTsMs(),
Headers: hdrs,
Revision: i.GetMetadataSeq(),
Deduped: i.GetDeduped(),
}
}
// ObjectEntry is one row returned by ObjectStore.List.
type ObjectEntry struct {
Name string
TotalBytes uint64
Deleted bool
}
// ObjectRevision is one metadata revision returned by ObjectStore.Revisions.
type ObjectRevision struct {
MetadataSeq uint64
TotalBytes uint64
SHA256 string
TsMs int64
Deleted bool
}
// PutOptions controls optional Put parameters.
type PutOptions struct {
ChunkSize uint64 // 0 = server default
Headers [][2]string
SHA256 string // optional pre-computed SHA-256 hex
Dedupe bool
}
// ObjectStore is a reference to an object-store bucket.
type ObjectStore struct {
client *Client
Name string
}
func newObjectStore(c *Client, name string) *ObjectStore {
return &ObjectStore{client: c, Name: name}
}
// Put uploads a small-to-medium object (whole payload in one RPC).
func (s *ObjectStore) Put(ctx context.Context, name string, payload []byte) (ObjectInfo, error) {
return s.PutWith(ctx, name, payload, PutOptions{})
}
// PutWith uploads with explicit options.
func (s *ObjectStore) PutWith(ctx context.Context, name string, payload []byte, opts PutOptions) (ObjectInfo, error) {
pbHeaders := make([]*pb.MessageHeader, len(opts.Headers))
for i, h := range opts.Headers {
pbHeaders[i] = &pb.MessageHeader{Key: h[0], Value: h[1]}
}
c := s.client.streamsClient()
r, err := c.PutObject(ctx, &pb.PutObjectRequest{
Bucket: s.Name,
Name: name,
Payload: payload,
ChunkSize: opts.ChunkSize,
Headers: pbHeaders,
Sha256: opts.SHA256,
Dedupe: opts.Dedupe,
})
if err != nil {
return ObjectInfo{}, rpcErr(err)
}
if !r.GetSuccess() {
return ObjectInfo{}, serverErr(r.GetResultCode(), r.GetMessage())
}
info := r.GetInfo()
if info == nil {
return ObjectInfo{}, serverErr("internal", "missing info in PutObject response")
}
return objectInfoFromPB(info), nil
}
// Get retrieves an object's payload and metadata.
func (s *ObjectStore) Get(ctx context.Context, name string) (ObjectInfo, []byte, error) {
c := s.client.streamsClient()
r, err := c.GetObject(ctx, &pb.GetObjectRequest{Bucket: s.Name, Name: name})
if err != nil {
return ObjectInfo{}, nil, rpcErr(err)
}
if !r.GetSuccess() {
return ObjectInfo{}, nil, serverErr(r.GetResultCode(), r.GetMessage())
}
info := r.GetInfo()
if info == nil {
return ObjectInfo{}, nil, serverErr("internal", "missing info in GetObject response")
}
return objectInfoFromPB(info), r.GetPayload(), nil
}
// GetRange reads a byte range of an object's payload. len=0 reads to EOF.
func (s *ObjectStore) GetRange(ctx context.Context, name string, offset, length uint64) (ObjectInfo, uint64, []byte, error) {
c := s.client.streamsClient()
r, err := c.GetObjectRange(ctx, &pb.GetObjectRangeRequest{
Bucket: s.Name,
Name: name,
Offset: offset,
Len: length,
})
if err != nil {
return ObjectInfo{}, 0, nil, rpcErr(err)
}
if !r.GetSuccess() {
return ObjectInfo{}, 0, nil, serverErr(r.GetResultCode(), r.GetMessage())
}
info := r.GetInfo()
if info == nil {
return ObjectInfo{}, 0, nil, serverErr("internal", "missing info in GetObjectRange response")
}
return objectInfoFromPB(info), r.GetActualOffset(), r.GetPayload(), nil
}
// Info returns object metadata without the payload. Returns (zero, nil) if
// the object has been deleted.
func (s *ObjectStore) Info(ctx context.Context, name string) (*ObjectInfo, error) {
c := s.client.streamsClient()
r, err := c.GetObjectInfo(ctx, &pb.GetObjectInfoRequest{Bucket: s.Name, Name: name})
if err != nil {
return nil, rpcErr(err)
}
if !r.GetSuccess() {
return nil, serverErr(r.GetResultCode(), r.GetMessage())
}
if r.GetDeleted() {
return nil, nil
}
if r.GetInfo() == nil {
return nil, nil
}
info := objectInfoFromPB(r.GetInfo())
return &info, nil
}
// Delete tombstones an object. Returns the tombstone sequence number.
func (s *ObjectStore) Delete(ctx context.Context, name string) (uint64, error) {
c := s.client.streamsClient()
r, err := c.DeleteObject(ctx, &pb.DeleteObjectRequest{Bucket: s.Name, Name: name})
if err != nil {
return 0, rpcErr(err)
}
if !r.GetSuccess() {
return 0, serverErr(r.GetResultCode(), r.GetMessage())
}
return r.GetTombstoneSeq(), nil
}
// List lists objects in the bucket matching namePrefix. Tombstoned entries
// are excluded.
func (s *ObjectStore) List(ctx context.Context, namePrefix string) ([]ObjectEntry, error) {
return s.listInner(ctx, namePrefix, false)
}
// ListWithDeleted lists objects including tombstoned entries.
func (s *ObjectStore) ListWithDeleted(ctx context.Context, namePrefix string) ([]ObjectEntry, error) {
return s.listInner(ctx, namePrefix, true)
}
func (s *ObjectStore) listInner(ctx context.Context, namePrefix string, includeDeleted bool) ([]ObjectEntry, error) {
c := s.client.streamsClient()
r, err := c.ListObjects(ctx, &pb.ListObjectsRequest{
Bucket: s.Name,
NamePrefix: namePrefix,
IncludeDeleted: includeDeleted,
})
if err != nil {
return nil, rpcErr(err)
}
if !r.GetSuccess() {
return nil, serverErr(r.GetResultCode(), r.GetMessage())
}
out := make([]ObjectEntry, len(r.GetEntries()))
for i, e := range r.GetEntries() {
out[i] = ObjectEntry{
Name: e.GetName(),
TotalBytes: e.GetTotalBytes(),
Deleted: e.GetDeleted(),
}
}
return out, nil
}
// Revisions lists every metadata revision of name in sequence order.
func (s *ObjectStore) Revisions(ctx context.Context, name string) ([]ObjectRevision, error) {
c := s.client.streamsClient()
r, err := c.ListObjectRevisions(ctx, &pb.ListObjectRevisionsRequest{
Bucket: s.Name,
Name: name,
FromSeq: 0,
Limit: 0,
})
if err != nil {
return nil, rpcErr(err)
}
if !r.GetSuccess() {
return nil, serverErr(r.GetResultCode(), r.GetMessage())
}
out := make([]ObjectRevision, len(r.GetRevisions()))
for i, rev := range r.GetRevisions() {
out[i] = ObjectRevision{
MetadataSeq: rev.GetMetadataSeq(),
TotalBytes: rev.GetTotalBytes(),
SHA256: rev.GetSha256(),
TsMs: rev.GetTsMs(),
Deleted: rev.GetDeleted(),
}
}
return out, nil
}
// --- Client entry points ---
// CreateObjectStore creates a new object-store bucket.
func (c *Client) CreateObjectStore(ctx context.Context, config ObjectStoreConfig) (*ObjectStore, error) {
sc := StreamConfig{
Name: config.Name,
Subjects: []string{"objm.>", "objc.>"},
Retention: RetentionLimits,
MaxBytes: config.MaxBytes,
Ephemeral: config.Ephemeral,
}
_, err := c.CreateStream(ctx, sc)
if err != nil {
return nil, err
}
return newObjectStore(c, config.Name), nil
}
// GetOrCreateObjectStore is the idempotent create-or-get.
func (c *Client) GetOrCreateObjectStore(ctx context.Context, config ObjectStoreConfig) (*ObjectStore, error) {
sc := StreamConfig{
Name: config.Name,
Subjects: []string{"objm.>", "objc.>"},
Retention: RetentionLimits,
MaxBytes: config.MaxBytes,
Ephemeral: config.Ephemeral,
}
_, err := c.GetOrCreateStream(ctx, sc)
if err != nil {
return nil, err
}
return newObjectStore(c, config.Name), nil
}
// ObjectStoreHandle returns a handle without verifying existence.
func (c *Client) ObjectStoreHandle(name string) *ObjectStore {
return newObjectStore(c, name)
}