waymaker-client/go/README.md

177 lines
5.7 KiB
Markdown

# waymaker-client (Go)
Official Go client for [waymaker](https://git.awesomike.com/dev/waymaker) — v0.1.27.
```
go get git.awesomike.com/pub/waymaker-client/go@v0.1.27
```
The package is `waymaker`. Generated gRPC stubs live under `genpb/` but
callers rarely need them directly — the ergonomic wrappers cover everything.
## Surfaces
| Subsystem | Entry points on `*Client` | Notes |
|--------------|--------------------------------------------------------|-------|
| **lock** | `AcquireLock` / `AcquireReadLock` / `MultiLock` | Full re-acquire semantics (see below) |
| **stream** | `CreateStream` / `GetStream` / `GetOrCreateStream` | Push + pull consumers |
| **kv** | `CreateKV` / `GetOrCreateKV` / `KV` | Put/Get/Create/Update(CAS)/Delete/Keys/History/Watch |
| **collections** | `CreateHashStore` / `CreateSetStore` / `CreateQueue` | Redis-shape Hash/Set/Queue |
| **sketches** | `CreateBloom` / `CreateHLL` / `CreateCMS` / `CreateTopK` / `CreateTDigest` | Probabilistic data structures |
| **object** | `CreateObjectStore` / `GetOrCreateObjectStore` | Chunked Put/Get/Delete/List |
| **cache** | `CacheAttachPolicy` / `CacheDetachPolicy` / `CacheStats` | Stub — server returns Unimplemented |
## Connecting
```go
import "git.awesomike.com/pub/waymaker-client/go"
// Single node
client, err := waymaker.Connect(ctx, "localhost:8818")
// Multiple nodes (round-robin load balancing)
client, err := waymaker.ConnectMulti(ctx, []string{"node1:8818", "node2:8828", "node3:8838"})
defer client.Close()
```
## Leader election
The canonical leader-election pattern: `AcquireLock` with `MaxWait=0`
(try-and-fail), then `SpawnRenewal` in its own goroutine independent of
the work loop, and `lock.Watch()` to detect leadership loss.
```go
import (
"context"
"time"
"git.awesomike.com/pub/waymaker-client/go"
)
func runAsLeader(ctx context.Context, client *waymaker.Client) error {
lock, err := client.AcquireLock(ctx, "leader:reports", waymaker.LockConfig{
MaxWait: 0, // try-acquire — fail immediately if contended
LeaseTTL: 30 * time.Second,
Scope: waymaker.ScopeQuorum, // Raft-replicated fence token
})
if waymaker.IsServerCode(err, "expired") {
return nil // someone else is leader
}
if err != nil {
return err
}
// Renewal runs independently of the work loop — work can legitimately
// outlive a single lease window without renewing in-band.
renewal := lock.SpawnRenewal(15 * time.Second)
defer renewal.Stop()
// Watch for state changes (fence updates, loss notifications).
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-lock.Watch():
if lock.IsLost() {
return fmt.Errorf("lost leadership")
}
default:
}
// Re-read fence BEFORE every fenced side effect.
if err := doFencedWrite(ctx, lock.FenceToken()); err != nil {
return err
}
}
}
func cleanup(ctx context.Context, lock *waymaker.Lock) {
_ = lock.Unlock(ctx)
}
```
### Lock semantics
The `*Lock` handle keeps a background goroutine that holds the server event
stream open. If the stream drops — e.g. the key's primary bounces — the
goroutine transparently re-binds it by re-issuing the same `RequestID` with
`MaxWait=0`. A still-held lease on the new primary is recovered rather than
re-contended. The lease itself lives on the server's TTL + `SpawnRenewal`,
independent of the stream, so a momentary disconnect does not lose the lock.
Live state:
```go
lock.FenceToken() // current fence token — re-read before every fenced write
lock.LeaseExpiresAtMs() // lease expiry epoch ms
lock.IsLost() // true once the client gives up recovering ownership
lock.State() // full LockState snapshot
lock.Watch() // returns a channel closed on every state change
```
Dropping a `*Lock` without calling `Unlock` does NOT release the server-side
lock. The lease will expire on its own TTL. This matches the Rust client's
semantics: auto-release on drop would silently swallow errors.
## KV
```go
bucket, err := client.GetOrCreateKV(ctx, waymaker.KVConfig{Name: "my-bucket"})
rev, err := bucket.Put(ctx, "hello", []byte("world"))
val, err := bucket.Get(ctx, "hello")
rev2, err := bucket.Update(ctx, "hello", []byte("updated"), rev)
// Watch all keys
w, err := bucket.WatchAll(ctx)
for {
ev, err := w.Next()
if err != nil || ev == (waymaker.KVEvent{}) { break }
if ev.Put != nil { fmt.Println("put", ev.Put.Key) }
}
```
## Streams
```go
stream, err := client.GetOrCreateStream(ctx, waymaker.StreamConfig{
Name: "events",
Retention: waymaker.RetentionLimits,
})
ack, err := stream.Publish(ctx, "events.user.123", []byte(`{"action":"login"}`))
consumer, err := stream.GetOrCreateConsumer(ctx, waymaker.ConsumerConfig{
DurableName: "processor",
DeliverPolicy: waymaker.DeliverAll,
AckWait: 30 * time.Second,
})
msgs, err := consumer.Messages(ctx)
for {
msg, err := msgs.Next()
if err != nil || msg == nil { break }
process(msg.Payload)
_ = msg.Ack(ctx)
}
```
## Error handling
```go
_, err := client.AcquireLock(ctx, "key", waymaker.LockConfig{MaxWait: 0})
if waymaker.IsServerCode(err, "expired") {
// lock is contended, no one waiting
}
var werr *waymaker.Error
if errors.As(err, &werr) {
fmt.Println(werr.Kind, werr.Code, werr.Message)
}
```
Error kinds:
- `"rpc"` — gRPC transport / status error
- `"server"``success=false` response (with result_code)
- `"invalid"` — invalid argument to a wrapper method
## Version
v0.1.27 — matches the waymaker server release of the same version.