waymaker-client (Go)

Official Go client for waymaker — v0.1.27.

go get git.awesomike.com/pub/waymaker-client/go@v0.1.27

The package is waymaker. Generated gRPC stubs live under genpb/ but callers rarely need them directly — the ergonomic wrappers cover everything.

Surfaces

| Subsystem | Entry points on *Client | Notes |
|---|---|---|
| lock | AcquireLock / AcquireReadLock / MultiLock | Full re-acquire semantics (see below) |
| stream | CreateStream / GetStream / GetOrCreateStream | Push + pull consumers |
| kv | CreateKV / GetOrCreateKV / KV | Put/Get/Create/Update(CAS)/Delete/Keys/History/Watch |
| collections | CreateHashStore / CreateSetStore / CreateQueue | Redis-shape Hash/Set/Queue |
| sketches | CreateBloom / CreateHLL / CreateCMS / CreateTopK / CreateTDigest | Probabilistic data structures |
| object | CreateObjectStore / GetOrCreateObjectStore | Chunked Put/Get/Delete/List |
| cache | CacheAttachPolicy / CacheDetachPolicy / CacheStats | Stub: server returns Unimplemented |

Connecting

import "git.awesomike.com/pub/waymaker-client/go"

// Single node
client, err := waymaker.Connect(ctx, "localhost:8818")

// Multiple nodes (round-robin load balancing); use this instead of Connect, not in addition to it
client, err := waymaker.ConnectMulti(ctx, []string{"node1:8818", "node2:8828", "node3:8838"})
if err != nil {
    return err
}
defer client.Close()

Leader election

The canonical leader-election pattern has three parts: call AcquireLock with MaxWait=0 so the attempt fails immediately when the lock is contended, call SpawnRenewal so the lease is renewed in its own goroutine independently of the work loop, and use lock.Watch() to detect leadership loss.

import (
    "context"
    "fmt"
    "time"

    "git.awesomike.com/pub/waymaker-client/go"
)

func runAsLeader(ctx context.Context, client *waymaker.Client) error {
    lock, err := client.AcquireLock(ctx, "leader:reports", waymaker.LockConfig{
        MaxWait:  0,                          // try-acquire — fail immediately if contended
        LeaseTTL: 30 * time.Second,
        Scope:    waymaker.ScopeQuorum,       // Raft-replicated fence token
    })
    if waymaker.IsServerCode(err, "expired") {
        return nil // someone else is leader
    }
    if err != nil {
        return err
    }
    // Renewal runs independently of the work loop — work can legitimately
    // outlive a single lease window without renewing in-band.
    renewal := lock.SpawnRenewal(15 * time.Second)
    defer renewal.Stop()

    // Watch for state changes (fence updates, loss notifications).
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-lock.Watch():
            if lock.IsLost() {
                return fmt.Errorf("lost leadership")
            }
        default:
        }

        // Re-read fence BEFORE every fenced side effect.
        if err := doFencedWrite(ctx, lock.FenceToken()); err != nil {
            return err
        }
    }
}

func cleanup(ctx context.Context, lock *waymaker.Lock) {
    _ = lock.Unlock(ctx)
}
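
The example above leaves doFencedWrite undefined. A fence token only protects a resource that checks it, so here is a minimal sketch of what the receiving side might do. FencedStore, its fields, and the uint64 token type are hypothetical stand-ins, not part of waymaker; the idea is simply that the resource remembers the newest token it has accepted and rejects anything older.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrStaleFence is returned when a write carries a fence token older than
// one the store has already accepted.
var ErrStaleFence = errors.New("stale fence token")

// FencedStore is a hypothetical downstream resource (not a waymaker type).
// It tracks the highest fence token seen so far.
type FencedStore struct {
	mu        sync.Mutex
	highFence uint64 // assumption: fence tokens are monotonically increasing integers
	value     string
}

// Write applies value only if fence is at least as new as the newest token seen.
func (s *FencedStore) Write(fence uint64, value string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if fence < s.highFence {
		return fmt.Errorf("%w: got %d, already saw %d", ErrStaleFence, fence, s.highFence)
	}
	s.highFence = fence
	s.value = value
	return nil
}

// Value returns the last accepted value.
func (s *FencedStore) Value() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.value
}

func main() {
	store := &FencedStore{}
	fmt.Println("leader A, fence 7:", store.Write(7, "from A"))
	fmt.Println("leader B, fence 8:", store.Write(8, "from B"))

	// A paused ex-leader wakes up and writes with its old token.
	err := store.Write(7, "late write from A")
	fmt.Println("stale write rejected:", errors.Is(err, ErrStaleFence))
	fmt.Println("stored value:", store.Value())
}
```

This is why the loop re-reads lock.FenceToken() before every side effect: a token cached from before a re-acquire could be the stale one.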

Lock semantics

The *Lock handle keeps a background goroutine that holds the server event stream open. If the stream drops (for example, because the key's primary bounces), the goroutine transparently re-binds it by re-issuing the same RequestID with MaxWait=0, so a still-held lease on the new primary is recovered rather than re-contended. The lease itself is kept alive by the server-side TTL and SpawnRenewal, independent of the stream, so a momentary disconnect does not lose the lock.

Live state:

lock.FenceToken()        // current fence token — re-read before every fenced write
lock.LeaseExpiresAtMs()  // lease expiry epoch ms
lock.IsLost()            // true once the client gives up recovering ownership
lock.State()             // full LockState snapshot
lock.Watch()             // returns a channel closed on every state change
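
A channel that is "closed on every state change" can only fire once, so the usual way to build such an API is to close the current channel and install a fresh one. The sketch below shows that pattern with a hypothetical notifier type. It is an illustration of the general technique, not waymaker's implementation, and the names notifier, setFence, and changed are made up for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for a handle whose Watch channel is
// closed on every state change. It is not a waymaker type.
type notifier struct {
	mu    sync.Mutex
	ch    chan struct{}
	fence uint64
}

func newNotifier() *notifier { return &notifier{ch: make(chan struct{})} }

// Watch returns the channel that the next state change will close.
func (n *notifier) Watch() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.ch
}

// Fence returns the current state value.
func (n *notifier) Fence() uint64 {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.fence
}

// setFence updates the state, wakes every watcher by closing the current
// channel, and installs a fresh channel for the next change.
func (n *notifier) setFence(f uint64) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.fence = f
	close(n.ch)
	n.ch = make(chan struct{})
}

// changed reports, without blocking, whether ch has been closed.
func changed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	n := newNotifier()
	w := n.Watch()
	fmt.Println("changed before update:", changed(w))

	n.setFence(42)
	fmt.Println("changed after update:", changed(w))

	// The old channel stays closed forever; fetch a new one to wait again.
	w = n.Watch()
	fmt.Println("fresh channel changed:", changed(w), "fence:", n.Fence())
}
```

Under this assumption, callers should call Watch() again on each loop iteration, as the leader-election example does, rather than holding on to one channel.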

Dropping a *Lock without calling Unlock does NOT release the server-side lock. The lease will expire on its own TTL. This matches the Rust client's semantics: auto-release on drop would silently swallow errors.

KV

bucket, err := client.GetOrCreateKV(ctx, waymaker.KVConfig{Name: "my-bucket"})
rev, err := bucket.Put(ctx, "hello", []byte("world"))
val, err := bucket.Get(ctx, "hello")
rev2, err := bucket.Update(ctx, "hello", []byte("updated"), rev)

// Watch all keys
w, err := bucket.WatchAll(ctx)
for {
    ev, err := w.Next()
    if err != nil || ev == (waymaker.KVEvent{}) { break }
    if ev.Put != nil { fmt.Println("put", ev.Put.Key) }
}
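
Update is a compare-and-swap: it succeeds only if the key is still at the revision you pass in. A read-modify-write on top of it therefore needs a retry loop. The sketch below shows that loop against a hypothetical in-memory bucket (memBucket), because the real client's Get return type and mismatch error are not shown in this README; the method shapes and errRevisionMismatch are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"sync"
)

var errRevisionMismatch = errors.New("revision mismatch")

type entry struct {
	value []byte
	rev   uint64
}

// memBucket is a hypothetical in-memory stand-in for a KV bucket.
type memBucket struct {
	mu   sync.Mutex
	data map[string]entry
}

func newMemBucket() *memBucket { return &memBucket{data: map[string]entry{}} }

// Put writes unconditionally and returns the new revision.
func (b *memBucket) Put(key string, value []byte) uint64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	e := entry{value: value, rev: b.data[key].rev + 1}
	b.data[key] = e
	return e.rev
}

// Get returns the value and the revision it was written at.
func (b *memBucket) Get(key string) ([]byte, uint64, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	e, ok := b.data[key]
	return e.value, e.rev, ok
}

// Update writes only if the stored revision still equals expected (CAS).
func (b *memBucket) Update(key string, value []byte, expected uint64) (uint64, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	e, ok := b.data[key]
	if !ok || e.rev != expected {
		return 0, errRevisionMismatch
	}
	e = entry{value: value, rev: expected + 1}
	b.data[key] = e
	return e.rev, nil
}

// increment performs a read-modify-write, retrying when another writer wins.
func increment(b *memBucket, key string, maxAttempts int) (int, error) {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		raw, rev, ok := b.Get(key)
		if !ok {
			return 0, fmt.Errorf("key %q not found", key)
		}
		n, err := strconv.Atoi(string(raw))
		if err != nil {
			return 0, err
		}
		n++
		if _, err = b.Update(key, []byte(strconv.Itoa(n)), rev); err == nil {
			return n, nil
		}
		if !errors.Is(err, errRevisionMismatch) {
			return 0, err
		}
		// Lost the race: loop to re-read the latest value and revision.
	}
	return 0, fmt.Errorf("gave up after %d attempts", maxAttempts)
}

func main() {
	b := newMemBucket()
	b.Put("counter", []byte("0"))
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := increment(b, "counter", 100); err != nil {
				fmt.Println("increment failed:", err)
			}
		}()
	}
	wg.Wait()
	raw, rev, _ := b.Get("counter")
	fmt.Println("final value:", string(raw), "revision:", rev)
}
```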

Streams

stream, err := client.GetOrCreateStream(ctx, waymaker.StreamConfig{
    Name:      "events",
    Retention: waymaker.RetentionLimits,
})
ack, err := stream.Publish(ctx, "events.user.123", []byte(`{"action":"login"}`))

consumer, err := stream.GetOrCreateConsumer(ctx, waymaker.ConsumerConfig{
    DurableName:   "processor",
    DeliverPolicy: waymaker.DeliverAll,
    AckWait:       30 * time.Second,
})
msgs, err := consumer.Messages(ctx)
for {
    msg, err := msgs.Next()
    if err != nil || msg == nil { break }
    process(msg.Payload)
    _ = msg.Ack(ctx)
}

Error handling

_, err := client.AcquireLock(ctx, "key", waymaker.LockConfig{MaxWait: 0})
if waymaker.IsServerCode(err, "expired") {
    // lock is held by someone else; with MaxWait=0 the attempt does not wait
}

var werr *waymaker.Error
if errors.As(err, &werr) {
    fmt.Println(werr.Kind, werr.Code, werr.Message)
}

Error kinds:

  • "rpc": gRPC transport / status error
  • "server": success=false response (with result_code)
  • "invalid": invalid argument to a wrapper method
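
The three kinds lend themselves to a small dispatch helper. The sketch below uses a local Error type that mirrors only the fields printed above (Kind, Code, Message); the real waymaker.Error may carry more. isServerCode and classify are hypothetical helpers written for this example, and the retry-on-"rpc" policy is one reasonable choice rather than a documented recommendation.

```go
package main

import (
	"errors"
	"fmt"
)

// Error is a local stand-in mirroring the fields shown in this README.
// It is not the real waymaker.Error.
type Error struct {
	Kind    string // "rpc", "server", or "invalid"
	Code    string
	Message string
}

func (e *Error) Error() string {
	return fmt.Sprintf("%s error [%s]: %s", e.Kind, e.Code, e.Message)
}

// isServerCode is a hypothetical equivalent of an IsServerCode-style check:
// it unwraps err and matches a "server" error with the given code.
func isServerCode(err error, code string) bool {
	var werr *Error
	return errors.As(err, &werr) && werr.Kind == "server" && werr.Code == code
}

// classify maps an error to a coarse action for the caller.
func classify(err error) string {
	var werr *Error
	switch {
	case err == nil:
		return "ok"
	case !errors.As(err, &werr):
		return "unknown"
	case werr.Kind == "rpc":
		return "retry" // assumption: transport failures are worth retrying
	case werr.Kind == "server":
		return "handle:" + werr.Code
	default:
		return "bug" // "invalid": the caller passed a bad argument
	}
}

func main() {
	base := &Error{Kind: "server", Code: "expired", Message: "lock wait expired"}
	wrapped := fmt.Errorf("acquire leader lock: %w", base)

	// errors.As sees through the wrapping added by %w.
	fmt.Println(isServerCode(wrapped, "expired"))
	fmt.Println(classify(wrapped))
	fmt.Println(classify(&Error{Kind: "rpc", Code: "unavailable", Message: "connection refused"}))
	fmt.Println(classify(errors.New("something else")))
}
```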

Version

v0.1.27 — matches the waymaker server release of the same version.