package waymaker // Object store subsystem — chunked Put/Get/Delete/List/Revisions backed by // the streams service's PutObject/GetObject RPCs. // // An object-store bucket maps to a stream. The wrapper hides the wire // convention behind a per-bucket *ObjectStore handle. // // Entry points on *Client: // - client.CreateObjectStore(ctx, ObjectStoreConfig{…}) // - client.GetOrCreateObjectStore(ctx, ObjectStoreConfig{…}) // - client.ObjectStore(name) — handle without creation import ( "context" pb "git.awesomike.com/pub/waymaker-client/go/genpb/streams" ) // DefaultChunkSize is the server-side default chunk size (1 MiB). const DefaultChunkSize uint64 = 1024 * 1024 // ObjectStoreConfig is the bucket creation config. type ObjectStoreConfig struct { Name string MaxBytes *uint64 Ephemeral bool } // ObjectInfo is the metadata of a stored object. type ObjectInfo struct { Name string TotalBytes uint64 ChunkCount uint64 ChunkSize uint64 SHA256 string TsMs int64 Headers [][2]string Revision uint64 // metadata sequence Deduped bool } func objectInfoFromPB(i *pb.ObjectInfo) ObjectInfo { hdrs := make([][2]string, len(i.GetHeaders())) for j, h := range i.GetHeaders() { hdrs[j] = [2]string{h.GetKey(), h.GetValue()} } return ObjectInfo{ Name: i.GetName(), TotalBytes: i.GetTotalBytes(), ChunkCount: i.GetChunkCount(), ChunkSize: i.GetChunkSize(), SHA256: i.GetSha256(), TsMs: i.GetTsMs(), Headers: hdrs, Revision: i.GetMetadataSeq(), Deduped: i.GetDeduped(), } } // ObjectEntry is one row returned by ObjectStore.List. type ObjectEntry struct { Name string TotalBytes uint64 Deleted bool } // ObjectRevision is one metadata revision returned by ObjectStore.Revisions. type ObjectRevision struct { MetadataSeq uint64 TotalBytes uint64 SHA256 string TsMs int64 Deleted bool } // PutOptions controls optional Put parameters. type PutOptions struct { ChunkSize uint64 // 0 = server default Headers [][2]string SHA256 string // optional pre-computed SHA-256 hex Dedupe bool } // ObjectStore is a reference to an object-store bucket. type ObjectStore struct { client *Client Name string } func newObjectStore(c *Client, name string) *ObjectStore { return &ObjectStore{client: c, Name: name} } // Put uploads a small-to-medium object (whole payload in one RPC). func (s *ObjectStore) Put(ctx context.Context, name string, payload []byte) (ObjectInfo, error) { return s.PutWith(ctx, name, payload, PutOptions{}) } // PutWith uploads with explicit options. func (s *ObjectStore) PutWith(ctx context.Context, name string, payload []byte, opts PutOptions) (ObjectInfo, error) { pbHeaders := make([]*pb.MessageHeader, len(opts.Headers)) for i, h := range opts.Headers { pbHeaders[i] = &pb.MessageHeader{Key: h[0], Value: h[1]} } c := s.client.streamsClient() r, err := c.PutObject(ctx, &pb.PutObjectRequest{ Bucket: s.Name, Name: name, Payload: payload, ChunkSize: opts.ChunkSize, Headers: pbHeaders, Sha256: opts.SHA256, Dedupe: opts.Dedupe, }) if err != nil { return ObjectInfo{}, rpcErr(err) } if !r.GetSuccess() { return ObjectInfo{}, serverErr(r.GetResultCode(), r.GetMessage()) } info := r.GetInfo() if info == nil { return ObjectInfo{}, serverErr("internal", "missing info in PutObject response") } return objectInfoFromPB(info), nil } // Get retrieves an object's payload and metadata. func (s *ObjectStore) Get(ctx context.Context, name string) (ObjectInfo, []byte, error) { c := s.client.streamsClient() r, err := c.GetObject(ctx, &pb.GetObjectRequest{Bucket: s.Name, Name: name}) if err != nil { return ObjectInfo{}, nil, rpcErr(err) } if !r.GetSuccess() { return ObjectInfo{}, nil, serverErr(r.GetResultCode(), r.GetMessage()) } info := r.GetInfo() if info == nil { return ObjectInfo{}, nil, serverErr("internal", "missing info in GetObject response") } return objectInfoFromPB(info), r.GetPayload(), nil } // GetRange reads a byte range of an object's payload. len=0 reads to EOF. func (s *ObjectStore) GetRange(ctx context.Context, name string, offset, length uint64) (ObjectInfo, uint64, []byte, error) { c := s.client.streamsClient() r, err := c.GetObjectRange(ctx, &pb.GetObjectRangeRequest{ Bucket: s.Name, Name: name, Offset: offset, Len: length, }) if err != nil { return ObjectInfo{}, 0, nil, rpcErr(err) } if !r.GetSuccess() { return ObjectInfo{}, 0, nil, serverErr(r.GetResultCode(), r.GetMessage()) } info := r.GetInfo() if info == nil { return ObjectInfo{}, 0, nil, serverErr("internal", "missing info in GetObjectRange response") } return objectInfoFromPB(info), r.GetActualOffset(), r.GetPayload(), nil } // Info returns object metadata without the payload. Returns (zero, nil) if // the object has been deleted. func (s *ObjectStore) Info(ctx context.Context, name string) (*ObjectInfo, error) { c := s.client.streamsClient() r, err := c.GetObjectInfo(ctx, &pb.GetObjectInfoRequest{Bucket: s.Name, Name: name}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } if r.GetDeleted() { return nil, nil } if r.GetInfo() == nil { return nil, nil } info := objectInfoFromPB(r.GetInfo()) return &info, nil } // Delete tombstones an object. Returns the tombstone sequence number. func (s *ObjectStore) Delete(ctx context.Context, name string) (uint64, error) { c := s.client.streamsClient() r, err := c.DeleteObject(ctx, &pb.DeleteObjectRequest{Bucket: s.Name, Name: name}) if err != nil { return 0, rpcErr(err) } if !r.GetSuccess() { return 0, serverErr(r.GetResultCode(), r.GetMessage()) } return r.GetTombstoneSeq(), nil } // List lists objects in the bucket matching namePrefix. Tombstoned entries // are excluded. func (s *ObjectStore) List(ctx context.Context, namePrefix string) ([]ObjectEntry, error) { return s.listInner(ctx, namePrefix, false) } // ListWithDeleted lists objects including tombstoned entries. func (s *ObjectStore) ListWithDeleted(ctx context.Context, namePrefix string) ([]ObjectEntry, error) { return s.listInner(ctx, namePrefix, true) } func (s *ObjectStore) listInner(ctx context.Context, namePrefix string, includeDeleted bool) ([]ObjectEntry, error) { c := s.client.streamsClient() r, err := c.ListObjects(ctx, &pb.ListObjectsRequest{ Bucket: s.Name, NamePrefix: namePrefix, IncludeDeleted: includeDeleted, }) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } out := make([]ObjectEntry, len(r.GetEntries())) for i, e := range r.GetEntries() { out[i] = ObjectEntry{ Name: e.GetName(), TotalBytes: e.GetTotalBytes(), Deleted: e.GetDeleted(), } } return out, nil } // Revisions lists every metadata revision of name in sequence order. func (s *ObjectStore) Revisions(ctx context.Context, name string) ([]ObjectRevision, error) { c := s.client.streamsClient() r, err := c.ListObjectRevisions(ctx, &pb.ListObjectRevisionsRequest{ Bucket: s.Name, Name: name, FromSeq: 0, Limit: 0, }) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } out := make([]ObjectRevision, len(r.GetRevisions())) for i, rev := range r.GetRevisions() { out[i] = ObjectRevision{ MetadataSeq: rev.GetMetadataSeq(), TotalBytes: rev.GetTotalBytes(), SHA256: rev.GetSha256(), TsMs: rev.GetTsMs(), Deleted: rev.GetDeleted(), } } return out, nil } // --- Client entry points --- // CreateObjectStore creates a new object-store bucket. func (c *Client) CreateObjectStore(ctx context.Context, config ObjectStoreConfig) (*ObjectStore, error) { sc := StreamConfig{ Name: config.Name, Subjects: []string{"objm.>", "objc.>"}, Retention: RetentionLimits, MaxBytes: config.MaxBytes, Ephemeral: config.Ephemeral, } _, err := c.CreateStream(ctx, sc) if err != nil { return nil, err } return newObjectStore(c, config.Name), nil } // GetOrCreateObjectStore is the idempotent create-or-get. func (c *Client) GetOrCreateObjectStore(ctx context.Context, config ObjectStoreConfig) (*ObjectStore, error) { sc := StreamConfig{ Name: config.Name, Subjects: []string{"objm.>", "objc.>"}, Retention: RetentionLimits, MaxBytes: config.MaxBytes, Ephemeral: config.Ephemeral, } _, err := c.GetOrCreateStream(ctx, sc) if err != nil { return nil, err } return newObjectStore(c, config.Name), nil } // ObjectStoreHandle returns a handle without verifying existence. func (c *Client) ObjectStoreHandle(name string) *ObjectStore { return newObjectStore(c, name) }