518 lines
13 KiB
Go
518 lines
13 KiB
Go
package waymaker
|
|
|
|
// Collections subsystem — Redis-shape Hash / Set / Queue backed by the
|
|
// server's WaymakerCollectionsService RPCs.
|
|
//
|
|
// All wire conventions (subject patterns, tombstone markers) live
|
|
// server-side. This client calls typed RPCs only.
|
|
//
|
|
// Entry points on *Client:
|
|
// Hash: CreateHashStore / GetOrCreateHashStore / HashStoreHandle(name) / DeleteHashStore
|
|
// Set: CreateSetStore / GetOrCreateSetStore / SetStoreHandle(name) / DeleteSetStore
|
|
// Queue: CreateQueue / GetOrCreateQueue / QueueHandle(name) / DeleteQueue
|
|
|
|
import (
|
|
"context"
|
|
|
|
pb "git.awesomike.com/pub/waymaker-client/go/genpb/collections"
|
|
)
|
|
|
|
// ============================================================
|
|
// Hash
|
|
// ============================================================
|
|
|
|
// HashStoreConfig describes the hash store that CreateHashStore and
// GetOrCreateHashStore ask the server to create.
type HashStoreConfig struct {
	// Name identifies the store; it is also the Name of the returned HashStore.
	Name string
	// MaxBytes is optional. It is passed through uint64OrZero before being
	// sent, so nil presumably reaches the server as 0 (confirm against the helper).
	MaxBytes *uint64
	// Ephemeral is forwarded unchanged in the create request.
	Ephemeral bool
}
|
|
|
|
// HashStore is a reference to a named hash store. Cheap to copy.
|
|
type HashStore struct {
|
|
client *Client
|
|
Name string
|
|
}
|
|
|
|
func newHashStore(c *Client, name string) *HashStore {
|
|
return &HashStore{client: c, Name: name}
|
|
}
|
|
|
|
// Hash returns a handle to the hash identified by hashKey within this store.
|
|
func (s *HashStore) Hash(hashKey string) *Hash {
|
|
return &Hash{client: s.client, bucket: s.Name, hashKey: hashKey}
|
|
}
|
|
|
|
// Hash is a per-key hash within a HashStore.
|
|
type Hash struct {
|
|
client *Client
|
|
bucket string
|
|
hashKey string
|
|
}
|
|
|
|
// Set stores field → value. Returns the revision.
|
|
func (h *Hash) Set(ctx context.Context, field string, value []byte) (uint64, error) {
|
|
c := h.client.collectionsClient()
|
|
r, err := c.HashSet(ctx, &pb.HashSetRequest{
|
|
Bucket: h.bucket,
|
|
HashKey: h.hashKey,
|
|
Field: field,
|
|
Value: value,
|
|
})
|
|
if err != nil {
|
|
return 0, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return 0, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return r.GetRevision(), nil
|
|
}
|
|
|
|
// Get returns the value for field. Returns (nil, nil) when absent.
|
|
func (h *Hash) Get(ctx context.Context, field string) ([]byte, error) {
|
|
c := h.client.collectionsClient()
|
|
r, err := c.HashGet(ctx, &pb.HashGetRequest{
|
|
Bucket: h.bucket,
|
|
HashKey: h.hashKey,
|
|
Field: field,
|
|
})
|
|
if err != nil {
|
|
return nil, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return nil, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return r.GetValue(), nil
|
|
}
|
|
|
|
// Exists tests whether field exists.
|
|
func (h *Hash) Exists(ctx context.Context, field string) (bool, error) {
|
|
c := h.client.collectionsClient()
|
|
r, err := c.HashExists(ctx, &pb.HashExistsRequest{
|
|
Bucket: h.bucket,
|
|
HashKey: h.hashKey,
|
|
Field: field,
|
|
})
|
|
if err != nil {
|
|
return false, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return false, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return r.GetExists(), nil
|
|
}
|
|
|
|
// DeleteField removes field from the hash.
|
|
func (h *Hash) DeleteField(ctx context.Context, field string) error {
|
|
c := h.client.collectionsClient()
|
|
r, err := c.HashDelete(ctx, &pb.HashDeleteRequest{
|
|
Bucket: h.bucket,
|
|
HashKey: h.hashKey,
|
|
Field: field,
|
|
})
|
|
if err != nil {
|
|
return rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Fields returns all field names in the hash.
|
|
func (h *Hash) Fields(ctx context.Context) ([]string, error) {
|
|
c := h.client.collectionsClient()
|
|
r, err := c.HashFields(ctx, &pb.HashFieldsRequest{
|
|
Bucket: h.bucket,
|
|
HashKey: h.hashKey,
|
|
})
|
|
if err != nil {
|
|
return nil, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return nil, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return r.GetFields(), nil
|
|
}
|
|
|
|
// Len returns the number of fields in the hash.
|
|
func (h *Hash) Len(ctx context.Context) (uint64, error) {
|
|
c := h.client.collectionsClient()
|
|
r, err := c.HashLen(ctx, &pb.HashLenRequest{
|
|
Bucket: h.bucket,
|
|
HashKey: h.hashKey,
|
|
})
|
|
if err != nil {
|
|
return 0, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return 0, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return r.GetCount(), nil
|
|
}
|
|
|
|
// GetAll returns every field → value pair in the hash.
|
|
func (h *Hash) GetAll(ctx context.Context) (map[string][]byte, error) {
|
|
c := h.client.collectionsClient()
|
|
r, err := c.HashGetAll(ctx, &pb.HashGetAllRequest{
|
|
Bucket: h.bucket,
|
|
HashKey: h.hashKey,
|
|
})
|
|
if err != nil {
|
|
return nil, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return nil, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
out := make(map[string][]byte, len(r.GetEntries()))
|
|
for _, e := range r.GetEntries() {
|
|
out[e.GetField()] = e.GetValue()
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// --- Client HashStore entry points ---
|
|
|
|
// CreateHashStore creates a new hash store.
|
|
func (c *Client) CreateHashStore(ctx context.Context, config HashStoreConfig) (*HashStore, error) {
|
|
cc := c.collectionsClient()
|
|
r, err := cc.CreateHashStore(ctx, &pb.CreateHashStoreRequest{
|
|
Name: config.Name,
|
|
MaxBytes: uint64OrZero(config.MaxBytes),
|
|
Ephemeral: config.Ephemeral,
|
|
})
|
|
if err != nil {
|
|
return nil, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return nil, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return newHashStore(c, config.Name), nil
|
|
}
|
|
|
|
// GetOrCreateHashStore is idempotent.
|
|
func (c *Client) GetOrCreateHashStore(ctx context.Context, config HashStoreConfig) (*HashStore, error) {
|
|
s, err := c.CreateHashStore(ctx, config)
|
|
if err == nil {
|
|
return s, nil
|
|
}
|
|
if IsServerCode(err, "already_exists") {
|
|
return newHashStore(c, config.Name), nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// HashStoreHandle returns a handle without verifying existence.
|
|
func (c *Client) HashStoreHandle(name string) *HashStore {
|
|
return newHashStore(c, name)
|
|
}
|
|
|
|
// DeleteHashStore deletes the hash store.
|
|
func (c *Client) DeleteHashStore(ctx context.Context, name string) error {
|
|
cc := c.collectionsClient()
|
|
r, err := cc.DeleteHashStore(ctx, &pb.DeleteHashStoreRequest{Name: name})
|
|
if err != nil {
|
|
return rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ============================================================
|
|
// Set
|
|
// ============================================================
|
|
|
|
// SetStoreConfig describes the set store that CreateSetStore and
// GetOrCreateSetStore ask the server to create.
type SetStoreConfig struct {
	// Name identifies the store; it is also the Name of the returned SetStore.
	Name string
	// MaxBytes is optional. It is passed through uint64OrZero before being
	// sent, so nil presumably reaches the server as 0 (confirm against the helper).
	MaxBytes *uint64
	// Ephemeral is forwarded unchanged in the create request.
	Ephemeral bool
}
|
|
|
|
// SetStore is a reference to a named set store. Cheap to copy.
|
|
type SetStore struct {
|
|
client *Client
|
|
Name string
|
|
}
|
|
|
|
func newSetStore(c *Client, name string) *SetStore {
|
|
return &SetStore{client: c, Name: name}
|
|
}
|
|
|
|
// Set returns a handle to the set identified by setKey.
|
|
func (s *SetStore) Set(setKey string) *Set {
|
|
return &Set{client: s.client, bucket: s.Name, setKey: setKey}
|
|
}
|
|
|
|
// Set is a per-key set within a SetStore.
|
|
type Set struct {
|
|
client *Client
|
|
bucket string
|
|
setKey string
|
|
}
|
|
|
|
// Add adds member to the set.
|
|
func (s *Set) Add(ctx context.Context, member string) error {
|
|
c := s.client.collectionsClient()
|
|
r, err := c.SetAdd(ctx, &pb.SetAddRequest{
|
|
Bucket: s.bucket,
|
|
SetKey: s.setKey,
|
|
Member: member,
|
|
})
|
|
if err != nil {
|
|
return rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Remove removes member from the set.
|
|
func (s *Set) Remove(ctx context.Context, member string) error {
|
|
c := s.client.collectionsClient()
|
|
r, err := c.SetRemove(ctx, &pb.SetRemoveRequest{
|
|
Bucket: s.bucket,
|
|
SetKey: s.setKey,
|
|
Member: member,
|
|
})
|
|
if err != nil {
|
|
return rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IsMember tests membership.
|
|
func (s *Set) IsMember(ctx context.Context, member string) (bool, error) {
|
|
c := s.client.collectionsClient()
|
|
r, err := c.SetIsMember(ctx, &pb.SetIsMemberRequest{
|
|
Bucket: s.bucket,
|
|
SetKey: s.setKey,
|
|
Member: member,
|
|
})
|
|
if err != nil {
|
|
return false, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return false, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return r.GetIsMember(), nil
|
|
}
|
|
|
|
// Members returns all members.
|
|
func (s *Set) Members(ctx context.Context) ([]string, error) {
|
|
c := s.client.collectionsClient()
|
|
r, err := c.SetMembers(ctx, &pb.SetMembersRequest{
|
|
Bucket: s.bucket,
|
|
SetKey: s.setKey,
|
|
})
|
|
if err != nil {
|
|
return nil, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return nil, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return r.GetMembers(), nil
|
|
}
|
|
|
|
// Len returns the number of members.
|
|
func (s *Set) Len(ctx context.Context) (uint64, error) {
|
|
c := s.client.collectionsClient()
|
|
r, err := c.SetLen(ctx, &pb.SetLenRequest{
|
|
Bucket: s.bucket,
|
|
SetKey: s.setKey,
|
|
})
|
|
if err != nil {
|
|
return 0, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return 0, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return r.GetCount(), nil
|
|
}
|
|
|
|
// --- Client SetStore entry points ---
|
|
|
|
// CreateSetStore creates a new set store.
|
|
func (c *Client) CreateSetStore(ctx context.Context, config SetStoreConfig) (*SetStore, error) {
|
|
cc := c.collectionsClient()
|
|
r, err := cc.CreateSetStore(ctx, &pb.CreateSetStoreRequest{
|
|
Name: config.Name,
|
|
MaxBytes: uint64OrZero(config.MaxBytes),
|
|
Ephemeral: config.Ephemeral,
|
|
})
|
|
if err != nil {
|
|
return nil, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return nil, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return newSetStore(c, config.Name), nil
|
|
}
|
|
|
|
// GetOrCreateSetStore is idempotent.
|
|
func (c *Client) GetOrCreateSetStore(ctx context.Context, config SetStoreConfig) (*SetStore, error) {
|
|
s, err := c.CreateSetStore(ctx, config)
|
|
if err == nil {
|
|
return s, nil
|
|
}
|
|
if IsServerCode(err, "already_exists") {
|
|
return newSetStore(c, config.Name), nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// SetStoreHandle returns a handle without verifying existence.
|
|
func (c *Client) SetStoreHandle(name string) *SetStore {
|
|
return newSetStore(c, name)
|
|
}
|
|
|
|
// DeleteSetStore deletes the set store.
|
|
func (c *Client) DeleteSetStore(ctx context.Context, name string) error {
|
|
cc := c.collectionsClient()
|
|
r, err := cc.DeleteSetStore(ctx, &pb.DeleteSetStoreRequest{Name: name})
|
|
if err != nil {
|
|
return rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ============================================================
|
|
// Queue
|
|
// ============================================================
|
|
|
|
// QueueConfig describes the queue that CreateQueue and GetOrCreateQueue ask
// the server to create.
type QueueConfig struct {
	// Name identifies the queue; it is also the Name of the returned Queue.
	Name string
	// MaxBytes is optional. It is passed through uint64OrZero before being
	// sent, so nil presumably reaches the server as 0 (confirm against the helper).
	MaxBytes *uint64
	// MaxMessages is optional and handled the same way as MaxBytes.
	MaxMessages *uint64
	// Ephemeral is forwarded unchanged in the create request.
	Ephemeral bool
}
|
|
|
|
// Queue is an RPUSH/LPOP-style append queue.
|
|
type Queue struct {
|
|
client *Client
|
|
Name string
|
|
}
|
|
|
|
func newQueue(c *Client, name string) *Queue {
|
|
return &Queue{client: c, Name: name}
|
|
}
|
|
|
|
// Push appends value to the queue. Returns the assigned sequence number.
|
|
func (q *Queue) Push(ctx context.Context, value []byte) (uint64, error) {
|
|
c := q.client.collectionsClient()
|
|
r, err := c.QueuePush(ctx, &pb.QueuePushRequest{
|
|
Bucket: q.Name,
|
|
Value: value,
|
|
})
|
|
if err != nil {
|
|
return 0, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return 0, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return r.GetSequence(), nil
|
|
}
|
|
|
|
// Pop removes and returns the front element. Returns (nil, nil) when empty.
|
|
func (q *Queue) Pop(ctx context.Context) ([]byte, error) {
|
|
c := q.client.collectionsClient()
|
|
r, err := c.QueuePop(ctx, &pb.QueuePopRequest{Bucket: q.Name})
|
|
if err != nil {
|
|
return nil, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return nil, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return r.GetValue(), nil
|
|
}
|
|
|
|
// Range returns up to limit items starting from fromSequence.
|
|
func (q *Queue) Range(ctx context.Context, from, limit uint64) ([][]byte, error) {
|
|
c := q.client.collectionsClient()
|
|
r, err := c.QueueRange(ctx, &pb.QueueRangeRequest{
|
|
Bucket: q.Name,
|
|
FromSequence: from,
|
|
Limit: limit,
|
|
})
|
|
if err != nil {
|
|
return nil, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return nil, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return r.GetValues(), nil
|
|
}
|
|
|
|
// Len returns the number of queued messages.
|
|
func (q *Queue) Len(ctx context.Context) (uint64, error) {
|
|
c := q.client.collectionsClient()
|
|
r, err := c.QueueLen(ctx, &pb.QueueLenRequest{Bucket: q.Name})
|
|
if err != nil {
|
|
return 0, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return 0, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return r.GetCount(), nil
|
|
}
|
|
|
|
// --- Client Queue entry points ---
|
|
|
|
// CreateQueue creates a new queue.
|
|
func (c *Client) CreateQueue(ctx context.Context, config QueueConfig) (*Queue, error) {
|
|
cc := c.collectionsClient()
|
|
r, err := cc.CreateQueue(ctx, &pb.CreateQueueRequest{
|
|
Name: config.Name,
|
|
MaxBytes: uint64OrZero(config.MaxBytes),
|
|
MaxMessages: uint64OrZero(config.MaxMessages),
|
|
Ephemeral: config.Ephemeral,
|
|
})
|
|
if err != nil {
|
|
return nil, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return nil, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return newQueue(c, config.Name), nil
|
|
}
|
|
|
|
// GetOrCreateQueue is idempotent.
|
|
func (c *Client) GetOrCreateQueue(ctx context.Context, config QueueConfig) (*Queue, error) {
|
|
q, err := c.CreateQueue(ctx, config)
|
|
if err == nil {
|
|
return q, nil
|
|
}
|
|
if IsServerCode(err, "already_exists") {
|
|
return newQueue(c, config.Name), nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// QueueHandle returns a handle without verifying existence.
|
|
func (c *Client) QueueHandle(name string) *Queue {
|
|
return newQueue(c, name)
|
|
}
|
|
|
|
// DeleteQueue deletes the queue.
|
|
func (c *Client) DeleteQueue(ctx context.Context, name string) error {
|
|
cc := c.collectionsClient()
|
|
r, err := cc.DeleteQueue(ctx, &pb.DeleteQueueRequest{Name: name})
|
|
if err != nil {
|
|
return rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return nil
|
|
}
|