# waymaker-client (Go)

Official Go client for [waymaker](https://git.awesomike.com/dev/waymaker), v0.1.27.

```
go get git.awesomike.com/pub/waymaker-client/go@v0.1.27
```

The package is `waymaker`. Generated gRPC stubs live under `genpb/`, but
callers rarely need them directly; the ergonomic wrappers cover everything.

## Surfaces

| Subsystem | Entry points on `*Client` | Notes |
|-----------|---------------------------|-------|
| **lock** | `AcquireLock` / `AcquireReadLock` / `MultiLock` | Full re-acquire semantics (see below) |
| **stream** | `CreateStream` / `GetStream` / `GetOrCreateStream` | Push + pull consumers |
| **kv** | `CreateKV` / `GetOrCreateKV` / `KV` | Put/Get/Create/Update(CAS)/Delete/Keys/History/Watch |
| **collections** | `CreateHashStore` / `CreateSetStore` / `CreateQueue` | Redis-shaped Hash/Set/Queue |
| **sketches** | `CreateBloom` / `CreateHLL` / `CreateCMS` / `CreateTopK` / `CreateTDigest` | Probabilistic data structures |
| **object** | `CreateObjectStore` / `GetOrCreateObjectStore` | Chunked Put/Get/Delete/List |
| **cache** | `CacheAttachPolicy` / `CacheDetachPolicy` / `CacheStats` | Stub: server returns `Unimplemented` |

## Connecting

```go
import waymaker "git.awesomike.com/pub/waymaker-client/go"

// Single node
client, err := waymaker.Connect(ctx, "localhost:8818")

// Or multiple nodes (round-robin load balancing); use one call or the other
client, err := waymaker.ConnectMulti(ctx, []string{"node1:8818", "node2:8828", "node3:8838"})
if err != nil {
	return err
}
defer client.Close()
```

## Leader election

The canonical leader-election pattern has three parts: call `AcquireLock`
with `MaxWait=0` (try once and fail if contended), start `SpawnRenewal` so
renewal runs in its own goroutine independent of the work loop, and use
`lock.Watch()` to detect leadership loss.

```go
import (
	"context"
	"fmt"
	"time"

	waymaker "git.awesomike.com/pub/waymaker-client/go"
)

func runAsLeader(ctx context.Context, client *waymaker.Client) error {
	lock, err := client.AcquireLock(ctx, "leader:reports", waymaker.LockConfig{
		MaxWait:  0, // try-acquire: fail immediately if contended
		LeaseTTL: 30 * time.Second,
		Scope:    waymaker.ScopeQuorum, // Raft-replicated fence token
	})
	if waymaker.IsServerCode(err, "expired") {
		return nil // someone else is leader
	}
	if err != nil {
		return err
	}

	// Renewal runs independently of the work loop, so work can legitimately
	// outlive a single lease window without renewing in-band.
	renewal := lock.SpawnRenewal(15 * time.Second)
	defer renewal.Stop()

	for {
		// Poll for cancellation and state changes (fence updates, loss
		// notifications) without blocking the work loop.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-lock.Watch():
			if lock.IsLost() {
				return fmt.Errorf("lost leadership")
			}
		default:
		}

		// Re-read fence BEFORE every fenced side effect.
		// doFencedWrite is application code, not part of this package.
		if err := doFencedWrite(ctx, lock.FenceToken()); err != nil {
			return err
		}
	}
}

// cleanup releases the lock explicitly when stepping down.
func cleanup(ctx context.Context, lock *waymaker.Lock) {
	_ = lock.Unlock(ctx)
}
```

### Lock semantics

The `*Lock` handle keeps a background goroutine that holds the server event
stream open. If the stream drops (for example, when the key's primary
bounces), the goroutine transparently re-binds it by re-issuing the same
`RequestID` with `MaxWait=0`, so a still-held lease on the new primary is
recovered rather than re-contended. The lease itself is kept alive by the
server-side TTL plus `SpawnRenewal`, independent of the stream, so a
momentary disconnect does not lose the lock.

Live state:

```go
lock.FenceToken()       // current fence token; re-read before every fenced write
lock.LeaseExpiresAtMs() // lease expiry, epoch milliseconds
lock.IsLost()           // true once the client gives up recovering ownership
lock.State()            // full LockState snapshot
lock.Watch()            // returns a channel closed on every state change
```

Dropping a `*Lock` without calling `Unlock` does NOT release the server-side
lock; the lease simply expires at the end of its TTL. This matches the Rust
client's semantics: auto-release on drop would silently swallow errors.

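Describing `lock.Watch()` as "a channel closed on every state change" matches a common Go broadcast idiom: close the current channel to wake every waiter, then swap in a fresh one. The sketch below shows how such a notifier could work. `stateNotifier` and its methods are hypothetical and are not the client's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// stateNotifier is a hypothetical stand-in for the notification half of a lock handle.
type stateNotifier struct {
	mu      sync.Mutex
	changed chan struct{}
	lost    bool
}

func newStateNotifier() *stateNotifier {
	return &stateNotifier{changed: make(chan struct{})}
}

// Watch returns the channel that will be closed on the next state change.
func (n *stateNotifier) Watch() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.changed
}

// IsLost reports whether markLost has been called.
func (n *stateNotifier) IsLost() bool {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.lost
}

// markLost records the change, wakes every current watcher, and arms a fresh channel.
func (n *stateNotifier) markLost() {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.lost = true
	close(n.changed)
	n.changed = make(chan struct{})
}

func main() {
	n := newStateNotifier()
	ch := n.Watch() // grab the channel before the change happens
	go n.markLost()
	<-ch // unblocks when markLost closes the channel
	fmt.Println("lost:", n.IsLost()) // prints "lost: true"
}
```

Under this idiom a given channel fires only once, so a caller has to call `Watch()` again after each wake-up, as the leader-election loop does on every iteration.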
## KV

```go
// Error checks are omitted for brevity.
bucket, err := client.GetOrCreateKV(ctx, waymaker.KVConfig{Name: "my-bucket"})
rev, err := bucket.Put(ctx, "hello", []byte("world"))
val, err := bucket.Get(ctx, "hello")
rev2, err := bucket.Update(ctx, "hello", []byte("updated"), rev) // CAS on the revision from Put

// Watch all keys
w, err := bucket.WatchAll(ctx)
for {
	ev, err := w.Next()
	if err != nil || ev == (waymaker.KVEvent{}) {
		break
	}
	if ev.Put != nil {
		fmt.Println("put", ev.Put.Key)
	}
}
```

## Streams

```go
stream, err := client.GetOrCreateStream(ctx, waymaker.StreamConfig{
	Name:      "events",
	Retention: waymaker.RetentionLimits,
})
ack, err := stream.Publish(ctx, "events.user.123", []byte(`{"action":"login"}`))

consumer, err := stream.GetOrCreateConsumer(ctx, waymaker.ConsumerConfig{
	DurableName:   "processor",
	DeliverPolicy: waymaker.DeliverAll,
	AckWait:       30 * time.Second,
})
msgs, err := consumer.Messages(ctx)
for {
	msg, err := msgs.Next()
	if err != nil || msg == nil {
		break
	}
	process(msg.Payload)
	_ = msg.Ack(ctx)
}
```

## Error handling

```go
_, err := client.AcquireLock(ctx, "key", waymaker.LockConfig{MaxWait: 0})
if waymaker.IsServerCode(err, "expired") {
	// lock is contended and MaxWait=0, so the call failed without waiting
}

var werr *waymaker.Error
if errors.As(err, &werr) {
	fmt.Println(werr.Kind, werr.Code, werr.Message)
}
```

Error kinds:

- `"rpc"`: gRPC transport / status error
- `"server"`: `success=false` response (with `result_code`)
- `"invalid"`: invalid argument to a wrapper method

## Version

v0.1.27, matching the waymaker server release of the same version.