waymaker-client/go/lock.go

775 lines
21 KiB
Go

package waymaker
// Locks subsystem — wraps the rwlock RPCs (Lock / ReadLock / UnLock /
// LeaseStatus / ExtendLease / MultiLock).
//
// Entry points live on *Client:
// - client.AcquireLock(ctx, key, LockConfig{…}) — exclusive (write) lock
// - client.AcquireReadLock(ctx, key, LockConfig{…}) — shared (read) lock
// - client.LeaseStatus(ctx, key, id)
// - client.MultiLock(ctx, keys, LockConfig{…})
//
// The returned *Lock keeps a background goroutine that holds the server
// event stream open and, if that stream drops (e.g. the key's primary
// bounces), transparently re-binds it and re-confirms ownership — reusing
// the original RequestID so a still-held lease is recovered rather than
// re-contended. The lock's live state is published through Lock.Watch().
import (
"context"
"fmt"
"io"
"sync"
"time"
pb "git.awesomike.com/pub/waymaker-client/go/genpb/locks"
"google.golang.org/grpc"
)
// grpc is referenced directly by ListAcquiredLocks (grpc.CallOption), so
// this declaration is not what keeps the import alive; it simply asserts
// at compile time that grpc.EmptyCallOption satisfies grpc.CallOption.
var _ grpc.CallOption = grpc.EmptyCallOption{}
// --- Scope ---

// Scope selects the durability tier of a key's fence token — the per-key
// monotonic uint64 counter handed out with each grant. It governs only how
// that counter survives failures. It does NOT decide whether the lease
// itself survives node failures (that is replication_factor), and it does
// not by itself provide mutual exclusion: side effects must still be
// fenced with the token.
type Scope int32

// Fence-token durability tiers. Numbering starts at 1, so the zero Scope
// means "unset".
const (
	// ScopeEphemeral keeps the counter in RAM on the owning node. It
	// resets on process restart or hash-ring rebalance. Fastest; no I/O.
	ScopeEphemeral Scope = iota + 1
	// ScopeLocal persists the counter to disk on the owning node. It
	// survives process restart but resets on ring rebalance.
	ScopeLocal
	// ScopeQuorum keeps a Raft-replicated per-key counter that is
	// monotonic cluster-wide and survives any single-node failure.
	ScopeQuorum
)
// --- LockConfig ---

// LockConfig is the acquire configuration. The zero value is valid;
// unset fields are filled in as follows:
//   - MaxWait: 1 hour
//   - LeaseTTL: 30 s
//   - Scope: ScopeEphemeral
//   - RequesterApplication: "waymaker-client-go"
//   - RequestID: a freshly generated UUID
//
// Durations are sent to the server in whole milliseconds (truncated,
// negative values sent as 0, clamped to the uint32 range).
type LockConfig struct {
	// MaxWait is how long the server will block waiting for the lock.
	// Zero means "use the default" (1 hour) — it does NOT mean
	// try-acquire. For a try-acquire (fail immediately if contended)
	// pass a negative value; it is sent as a 0 ms wait, the same
	// non-blocking wait the client uses for its own re-acquires. Any
	// positive value under 1 ms is also truncated to a 0 ms wait.
	MaxWait time.Duration
	// LeaseTTL is how long the lease lives once acquired.
	// Zero means the default of 30 s.
	LeaseTTL time.Duration
	// Priority class — higher values jump the wait queue.
	Priority uint32
	// Scope controls fence-token durability. Zero means ScopeEphemeral.
	Scope Scope
	// RequesterInfo is free-form metadata for server-side audit.
	RequesterInfo string
	// RequesterApplication is the application name.
	// Default: "waymaker-client-go".
	RequesterApplication string
	// RequestID is the idempotency key for retries of the same acquire;
	// the background hold goroutine also reuses it to re-bind a dropped
	// stream. Auto-filled with a UUID if empty.
	RequestID string
}
// withDefaults returns a copy of c in which every zero-valued field that
// has a default is replaced by it. c itself is not modified (value
// receiver). A RequestID is generated only when none was supplied.
func (c LockConfig) withDefaults() LockConfig {
	cfg := c
	if cfg.Scope == 0 {
		cfg.Scope = ScopeEphemeral
	}
	if cfg.RequesterApplication == "" {
		cfg.RequesterApplication = "waymaker-client-go"
	}
	if cfg.LeaseTTL == 0 {
		cfg.LeaseTTL = 30 * time.Second
	}
	if cfg.MaxWait == 0 {
		cfg.MaxWait = time.Hour
	}
	if cfg.RequestID == "" {
		cfg.RequestID = newUUID()
	}
	return cfg
}
// durationToMs converts d to whole milliseconds for the wire. Sub-
// millisecond remainders are truncated, negative durations become 0, and
// anything beyond the uint32 range saturates at the uint32 maximum.
func durationToMs(d time.Duration) uint32 {
	const ceiling = ^uint32(0)
	switch ms := d.Milliseconds(); {
	case ms < 0:
		return 0
	case ms > int64(ceiling):
		return ceiling
	default:
		return uint32(ms)
	}
}
// toLockRequest builds the wire request for acquiring key with this
// configuration. Durations are converted with durationToMs; c is expected
// to have been passed through withDefaults already.
func (c LockConfig) toLockRequest(key string) *pb.LockRequest {
	req := new(pb.LockRequest)
	req.Key = key
	req.MaxWaitPeriod = durationToMs(c.MaxWait)
	req.MaxLeasePeriod = durationToMs(c.LeaseTTL)
	req.Priority = c.Priority
	req.RequesterInfo = c.RequesterInfo
	req.RequesterApplication = c.RequesterApplication
	req.RequestId = c.RequestID
	req.FenceScope = pb.FenceScope(c.Scope)
	return req
}
// --- Lease ---

// Lease describes a currently-held lock lease as reported by the server.
// Timestamps are epoch milliseconds.
type Lease struct {
	ID               string // lease id
	Key              string // locked key
	AcquiredAtMs     int64  // when the lease was created (server created_at)
	LeaseExpiresAtMs int64  // when the lease expires unless extended
	FenceToken       uint64 // fence token issued with this grant
	Priority         uint32 // priority class the lease was requested with
}
// leaseFromPB copies the wire lease into the client-side Lease value.
// The server's created_at becomes AcquiredAtMs.
func leaseFromPB(l *pb.Lease) Lease {
	var out Lease
	out.ID = l.GetId()
	out.Key = l.GetKey()
	out.AcquiredAtMs = l.GetCreatedAt()
	out.LeaseExpiresAtMs = l.GetLeaseExpiresAt()
	out.FenceToken = l.GetFenceToken()
	out.Priority = l.GetPriority()
	return out
}
// --- LockState ---

// LockState is a point-in-time copy of a held lock's live state, as
// returned by Lock.State and signalled through Lock.Watch. ID and
// FenceToken are expected to change only when the lock was lost and then
// transparently re-won after a primary failure; Lost becomes true once
// the client gives up re-establishing ownership.
type LockState struct {
	ID               string // lease id; stable across transparent re-binds
	FenceToken       uint64 // current fence token; re-read before every fenced side effect
	LeaseExpiresAtMs int64  // lease expiry, epoch milliseconds
	Lost             bool   // ownership could not be proven; the holder MUST stop acting as the holder
}
// --- Lock ---

// Lock is an acquired lock handle returned by AcquireLock or
// AcquireReadLock. Letting the handle go out of scope does NOT release
// the lock. Call Unlock explicitly, or let the lease expire server-side;
// this matches the underlying RPC semantics.
//
// Each Lock runs a background goroutine, started at acquire time, that
// keeps the server event stream open. If the stream drops (typically
// because the key's primary bounced), the goroutine transparently
// re-binds it, reusing the original RequestID so that a still-held lease
// is recovered rather than re-contended.
//
// That goroutine runs until Unlock is called or ownership is declared
// lost. An abandoned, never-unlocked handle therefore keeps the goroutine
// (and the handle) alive; always call Unlock when done.
//
// The lease's lifetime is governed server-side by the requested LeaseTTL
// and by Extend / SpawnRenewal, independent of the stream.
type Lock struct {
	client *Client
	// Key is the locked key.
	Key string

	// mu guards state and stateCh.
	mu    sync.RWMutex
	state LockState
	// stateCh is closed and replaced whenever state changes, so Watch()
	// receivers can block on the current channel.
	stateCh chan struct{}

	// stopCh is closed (once, via stopOnce) to tell the hold goroutine,
	// and any SpawnRenewal goroutine, to exit.
	stopCh   chan struct{}
	stopOnce sync.Once
	// holdDone is closed by the hold goroutine when it exits.
	holdDone chan struct{}
}
// ID reports the lease id from the current live state. It may change if
// the lock is lost and transparently re-won.
func (l *Lock) ID() string {
	return l.State().ID
}
// FenceToken reports the fence token from the current live state. Read it
// afresh before every fenced side effect; it may change after a
// transparent re-acquire.
func (l *Lock) FenceToken() uint64 {
	return l.State().FenceToken
}
// LeaseExpiresAtMs reports the lease expiry (epoch ms) from the current
// live state.
func (l *Lock) LeaseExpiresAtMs() int64 {
	return l.State().LeaseExpiresAtMs
}
// IsLost reports whether the client has given up proving ownership of
// the lock. Once it returns true, the caller must stop acting as the
// holder.
func (l *Lock) IsLost() bool {
	return l.State().Lost
}
// State returns a consistent snapshot of the lock's live state, copied
// under the read lock so all fields belong to the same update.
func (l *Lock) State() LockState {
	l.mu.RLock()
	snapshot := l.state
	l.mu.RUnlock()
	return snapshot
}
// Watch returns the channel that will be closed on the NEXT change to the
// lock's state. All callers that call Watch between two changes receive
// the same channel. A fresh channel only exists after a change has been
// published, so to observe the following change you must call Watch
// again.
//
// To avoid missing a change, re-subscribe BEFORE reading the state.
// Re-subscribing after reading it leaves a window in which a change could
// slip by unnoticed.
//
// After the lock is marked Lost, the background goroutine exits and
// publishes nothing further. Unlock does not publish a change either. A
// watcher can therefore block forever; select on your own cancellation as
// well.
//
// Usage:
//
//	ch := lock.Watch()
//	for {
//		select {
//		case <-ch:
//		case <-ctx.Done():
//			return
//		}
//		ch = lock.Watch() // re-subscribe first...
//		st := lock.State() // ...then read the new state
//		if st.Lost {
//			return // stop acting as the holder
//		}
//	}
func (l *Lock) Watch() <-chan struct{} {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.stateCh
}
// publishLocked installs next as the lock's state and wakes every Watch
// subscriber. It is a no-op when next equals the current state, so
// subscribers are only woken by real changes. Otherwise it swaps in a
// fresh channel for future Watch calls and closes the previous one.
//
// The caller must hold l.mu for writing.
func (l *Lock) publishLocked(next LockState) {
	if l.state == next {
		return
	}
	l.state = next
	old := l.stateCh
	l.stateCh = make(chan struct{})
	close(old)
}
// Extend sends an ExtendLease RPC for the current lease id, carrying
// additional (in whole milliseconds) as the lease timeout. Whether the
// server treats that value as an increment or as a new TTL from now is
// server-defined. NOTE(review): confirm against the server.
//
// On success it returns the lease reported by the server and also
// publishes the new expiry into the live state, so LeaseExpiresAtMs and
// Watch subscribers see it. The live state is only updated if the lease
// id has not changed during the call and the lock is not marked lost.
//
// It returns an rpcErr for transport failures, and a serverErr when the
// server reports failure or omits the lease.
func (l *Lock) Extend(ctx context.Context, additional time.Duration) (Lease, error) {
	id := l.ID()
	c := l.client.locksClient()
	r, err := c.ExtendLease(ctx, &pb.ExtendLeaseRequest{
		Key:          l.Key,
		Id:           id,
		LeaseTimeout: durationToMs(additional),
	})
	if err != nil {
		return Lease{}, rpcErr(err)
	}
	if !r.GetSuccess() {
		return Lease{}, serverErr(r.GetResultCode(), r.GetMessage())
	}
	pbLease := r.GetLease()
	if pbLease == nil {
		return Lease{}, serverErr("internal", "missing lease in ExtendLease response")
	}
	extended := leaseFromPB(pbLease)

	// Keep the "live" accessors in step with the server. Skip the update
	// if the hold goroutine swapped in a different lease meanwhile, or the
	// lock was declared lost.
	l.mu.Lock()
	if l.state.ID == id && !l.state.Lost {
		next := l.state
		next.LeaseExpiresAtMs = extended.LeaseExpiresAtMs
		l.publishLocked(next)
	}
	l.mu.Unlock()

	return extended, nil
}
// Unlock releases the lock. After it returns nil, the server-side state
// is gone.
//
// The background hold goroutine is stopped, and waited for, BEFORE the
// UnLock RPC is sent, so it cannot re-acquire a lock the caller is
// releasing. That wait respects ctx. If ctx is done before the goroutine
// exits, Unlock returns rpcErr(ctx.Err()) without sending UnLock. In that
// case the lease stays held until it expires or Unlock is called again.
// Calling Unlock again is safe: stopping is idempotent, and holdDone
// stays closed once the goroutine has exited.
func (l *Lock) Unlock(ctx context.Context) error {
	// Signal the hold goroutine to stop FIRST so it cannot race with the
	// UnLock RPC and resurrect a just-released lock.
	l.stop()
	// The goroutine may be inside a re-acquire or LeaseStatus probe with
	// its own timeout, so don't let that wait outlive the caller's ctx.
	select {
	case <-l.holdDone:
	case <-ctx.Done():
		return rpcErr(ctx.Err())
	}

	id := l.ID()
	c := l.client.locksClient()
	r, err := c.UnLock(ctx, &pb.UnLockRequest{
		Key: l.Key,
		Id:  id,
	})
	if err != nil {
		return rpcErr(err)
	}
	if !r.GetSuccess() {
		return serverErr(r.GetResultCode(), r.GetMessage())
	}
	return nil
}
// stop closes stopCh, telling the hold goroutine (and any SpawnRenewal
// goroutine) to exit. It is safe to call any number of times; only the
// first call closes the channel.
func (l *Lock) stop() {
	closeStop := func() { close(l.stopCh) }
	l.stopOnce.Do(closeStop)
}
// RenewalHandle controls a background renewal goroutine started by
// Lock.SpawnRenewal. Call Stop to halt renewal.
//
// Simply discarding the handle does NOT stop the goroutine. Go never
// garbage-collects a running goroutine, and this one keeps the handle
// reachable. Renewal continues until Stop is called or the Lock is
// stopped (Unlock).
type RenewalHandle struct {
	// stopCh is closed (once, via stopOnce) to ask the goroutine to exit.
	stopCh   chan struct{}
	stopOnce sync.Once
	// done is closed when the goroutine has exited.
	done chan struct{}
}
// Stop signals the renewal goroutine to exit and blocks until it has.
// Once Stop returns, no further ExtendLease RPCs are sent by this handle.
// Repeated calls are safe.
func (h *RenewalHandle) Stop() {
	signal := func() { close(h.stopCh) }
	h.stopOnce.Do(signal)
	<-h.done
}
// SpawnRenewal starts a background goroutine that calls ExtendLease once
// every `every`, requesting a lease timeout of 2×every (clamped to the
// uint32-millisecond wire maximum). It returns a RenewalHandle; call Stop
// to halt renewal. Renewal also ends when the Lock is stopped (Unlock).
// Discarding the handle does NOT stop it.
//
// The first renewal happens one full `every` after this call. Choose
// `every` comfortably below the lease TTL.
//
// Each tick reads the live lease id, so a transparent re-acquire after a
// primary failure renews the correct lease. Ticks are skipped while the
// lock is marked lost.
//
// Each ExtendLease call is bounded to `every`, so a hung server cannot
// freeze the loop. Failures are deliberately ignored (best effort): the
// next tick retries, and the hold goroutine is what detects real loss.
// A successful renewal publishes the new expiry into the live state.
//
// A non-positive `every` would make time.NewTicker panic. In that case no
// goroutine is started, and the returned handle's Stop returns
// immediately.
func (l *Lock) SpawnRenewal(every time.Duration) *RenewalHandle {
	h := &RenewalHandle{
		stopCh: make(chan struct{}),
		done:   make(chan struct{}),
	}
	if every <= 0 {
		close(h.done)
		return h
	}

	// Compare before doubling so a huge `every` cannot overflow.
	const maxLeaseTTL = time.Duration(^uint32(0)) * time.Millisecond
	ttl := maxLeaseTTL
	if every < maxLeaseTTL/2 {
		ttl = 2 * every
	}

	go func() {
		defer close(h.done)
		// A time.Ticker's first tick arrives one period after creation
		// (there is no immediate tick), so every tick is a renewal.
		// Discarding the first one would delay the first extension to
		// 2×every, which can fall after the lease has already expired.
		ticker := time.NewTicker(every)
		defer ticker.Stop()
		for {
			select {
			case <-h.stopCh:
				return
			case <-l.stopCh:
				return
			case <-ticker.C:
			}

			state := l.State()
			if state.Lost {
				continue
			}
			ctx, cancel := context.WithTimeout(context.Background(), every)
			r, err := l.client.locksClient().ExtendLease(ctx, &pb.ExtendLeaseRequest{
				Key:          l.Key,
				Id:           state.ID,
				LeaseTimeout: durationToMs(ttl),
			})
			cancel()
			if err != nil || !r.GetSuccess() || r.GetLease() == nil {
				continue // best effort; retried on the next tick
			}

			// Publish the renewed expiry unless the lease changed or was
			// lost while the RPC was in flight.
			l.mu.Lock()
			if l.state.ID == state.ID && !l.state.Lost {
				next := l.state
				next.LeaseExpiresAtMs = r.GetLease().GetLeaseExpiresAt()
				l.publishLocked(next)
			}
			l.mu.Unlock()
		}
	}()
	return h
}
// --- background hold-loop internals ---

// Tunables for the background hold goroutine.
const (
	// holdBaseBackoff is the first retry delay after a failed re-bind.
	holdBaseBackoff = time.Second / 5
	// holdMaxBackoff caps the doubling retry delay.
	holdMaxBackoff = 10 * time.Second
	// holdRPCTimeout bounds each recovery RPC (re-acquire, LeaseStatus).
	holdRPCTimeout = 10 * time.Second
)
// drainResult says why drainStream returned.
type drainResult int

const (
	// drainStopped: stopCh was closed; the hold goroutine should exit.
	drainStopped drainResult = 0
	// drainDisconnected: the stream ended or reported the lease gone;
	// the hold goroutine should try to re-establish it.
	drainDisconnected drainResult = 1
)
// reboundResult is the outcome of one reacquire attempt. When ok is
// false, every other field is its zero value.
type reboundResult struct {
	stream pb.WaymakerService_LockClient // fresh event stream (ok only)
	id     string                        // granted lease id
	fence  uint64                        // granted fence token
	exp    int64                         // lease expiry, epoch ms
	ok     bool                          // whether the re-acquire succeeded
}
// ownershipResult is the verdict of confirmOwnership.
type ownershipResult int

const (
	ownershipHeld    ownershipResult = 0 // the lease id still owns the key
	ownershipLost    ownershipResult = 1 // the server says it no longer does
	ownershipUnknown ownershipResult = 2 // the probe itself failed; retry later
)
// drainStream applies events from stream to the lock's published state.
// It returns drainStopped as soon as stopCh is closed. It returns
// drainDisconnected when Recv fails (io.EOF included) or the server
// sends Expired or Failed.
//
// Event handling:
//   - Heartbeat refreshes the expiry.
//   - Acquired replaces the whole state (id, fence, expiry) and clears
//     Lost.
//   - Other event types are ignored.
//
// Each Recv runs on its own goroutine so the wait can be raced against
// stopCh. The result channel is buffered, so that goroutine never blocks
// on the send after drainStream has returned. It does, however, remain
// parked inside Recv until the stream itself ends.
func (l *Lock) drainStream(stream pb.WaymakerService_LockClient) drainResult {
	type received struct {
		event *pb.LockEvent
		err   error
	}
	for {
		select {
		case <-l.stopCh:
			return drainStopped
		default:
		}

		results := make(chan received, 1)
		go func() {
			ev, err := stream.Recv()
			results <- received{event: ev, err: err}
		}()

		var got received
		select {
		case <-l.stopCh:
			return drainStopped
		case got = <-results:
		}
		if got.err != nil {
			return drainDisconnected
		}

		switch got.event.GetEventType() {
		case pb.LockEventType_Expired, pb.LockEventType_Failed:
			return drainDisconnected
		case pb.LockEventType_Heartbeat:
			l.mu.Lock()
			refreshed := l.state
			refreshed.LeaseExpiresAtMs = got.event.GetLeaseExpiresAt()
			l.publishLocked(refreshed)
			l.mu.Unlock()
		case pb.LockEventType_Acquired:
			l.mu.Lock()
			l.publishLocked(LockState{
				ID:               got.event.GetId(),
				FenceToken:       got.event.GetFenceToken(),
				LeaseExpiresAtMs: got.event.GetLeaseExpiresAt(),
				Lost:             false,
			})
			l.mu.Unlock()
		}
	}
}
// reacquire tries one idempotent re-acquire (max_wait=0) using the
// original request_id. Returns reboundResult.ok=true on success.
func (l *Lock) reacquire(read bool, req *pb.LockRequest) reboundResult {
ctx, cancel := context.WithTimeout(context.Background(), holdRPCTimeout)
defer cancel()
var stream pb.WaymakerService_LockClient
var err error
c := l.client.locksClient()
if read {
stream, err = c.ReadLock(ctx, req)
} else {
stream, err = c.Lock(ctx, req)
}
if err != nil {
return reboundResult{}
}
for {
ev, err := stream.Recv()
if err != nil {
return reboundResult{}
}
switch ev.GetEventType() {
case pb.LockEventType_Acquired:
return reboundResult{
stream: stream,
id: ev.GetId(),
fence: ev.GetFenceToken(),
exp: ev.GetLeaseExpiresAt(),
ok: true,
}
case pb.LockEventType_Failed, pb.LockEventType_Expired:
return reboundResult{}
default:
// Waiting, Heartbeat — keep reading.
}
}
}
// confirmOwnership asks the server (LeaseStatus) whether lease id still
// owns key.
//
// Return values:
//   - ownershipHeld plus the lease's expiry (epoch ms) on success.
//   - ownershipLost when the server answers success=false or omits the
//     lease.
//   - ownershipUnknown when the RPC itself fails, including when it is
//     aborted because stopCh closed.
//
// The probe is bounded by holdRPCTimeout. It is also cancelled as soon
// as the lock is stopped, so Unlock is not held up waiting for it.
func (l *Lock) confirmOwnership(key, id string) (ownershipResult, int64) {
	ctx, cancel := context.WithTimeout(context.Background(), holdRPCTimeout)
	defer cancel()
	go func() {
		select {
		case <-l.stopCh:
			cancel()
		case <-ctx.Done():
		}
	}()

	r, err := l.client.locksClient().LeaseStatus(ctx, &pb.LeaseStatusRequest{Key: key, Id: id})
	if err != nil {
		return ownershipUnknown, 0
	}
	lease := r.GetLease()
	if !r.GetSuccess() || lease == nil {
		// NOTE(review): every success=false is treated as definitive
		// loss. If the server can answer success=false for transient
		// conditions (e.g. mid-failover), those should map to
		// ownershipUnknown instead — confirm the server's result codes.
		return ownershipLost, 0
	}
	return ownershipHeld, lease.GetLeaseExpiresAt()
}
// holdLoop is the Lock's background goroutine. It drains the current
// event stream and, each time that stream ends, asks recoverStream for a
// replacement.
//
// It exits, closing holdDone, in two cases:
//   - stopCh is closed;
//   - ownership is definitively lost, in which case the state has already
//     been published with Lost=true.
func (l *Lock) holdLoop(read bool, stream pb.WaymakerService_LockClient, reacquireReq *pb.LockRequest) {
	defer close(l.holdDone)
	for l.drainStream(stream) != drainStopped {
		next, ok := l.recoverStream(read, reacquireReq)
		if !ok {
			return
		}
		stream = next
	}
}

// recoverStream re-establishes a dropped lock stream. It retries an
// idempotent re-acquire with exponential backoff (holdBaseBackoff doubling
// up to holdMaxBackoff).
//
// After each failed attempt it checks LeaseStatus:
//   - held: refresh the published expiry and keep retrying;
//   - unknown: keep retrying;
//   - lost: publish Lost=true and give up.
//
// It returns the fresh stream and true on success. It returns false when
// the loop should exit: the lock was stopped or ownership was lost.
func (l *Lock) recoverStream(read bool, req *pb.LockRequest) (pb.WaymakerService_LockClient, bool) {
	delay := holdBaseBackoff
	for {
		select {
		case <-l.stopCh:
			return nil, false
		default:
		}

		if rb := l.reacquire(read, req); rb.ok {
			l.mu.Lock()
			l.publishLocked(LockState{
				ID:               rb.id,
				FenceToken:       rb.fence,
				LeaseExpiresAtMs: rb.exp,
				Lost:             false,
			})
			l.mu.Unlock()
			return rb.stream, true
		}

		// Re-acquire failed; find out whether we still hold the lease.
		switch verdict, exp := l.confirmOwnership(l.Key, l.ID()); verdict {
		case ownershipHeld:
			l.mu.Lock()
			refreshed := l.state
			refreshed.LeaseExpiresAtMs = exp
			l.publishLocked(refreshed)
			l.mu.Unlock()
		case ownershipLost:
			l.mu.Lock()
			lost := l.state
			lost.Lost = true
			l.publishLocked(lost)
			l.mu.Unlock()
			return nil, false
		case ownershipUnknown:
			// Transient — back off and retry.
		}

		select {
		case <-l.stopCh:
			return nil, false
		case <-time.After(delay):
		}
		delay *= 2
		if delay > holdMaxBackoff {
			delay = holdMaxBackoff
		}
	}
}
// --- Client entry points ---

// AcquireLock acquires an exclusive (write) lock on key. It blocks for at
// most config.MaxWait and returns an error with Code=="expired" if that
// wait elapses without a grant.
//
// ctx bounds the wait. It is also the context of the event stream that
// the returned Lock's background goroutine keeps open. Cancelling ctx
// after acquisition does not release the lock, but it ends that stream
// and makes the goroutine re-bind it.
func (c *Client) AcquireLock(ctx context.Context, key string, config LockConfig) (*Lock, error) {
	const exclusive = false // read=false selects the write-lock RPC
	return c.acquireLockInner(ctx, key, config, exclusive)
}
// AcquireReadLock acquires a shared (read) lock on key. It behaves like
// AcquireLock but uses the ReadLock RPC.
func (c *Client) AcquireReadLock(ctx context.Context, key string, config LockConfig) (*Lock, error) {
	const shared = true // read=true selects the read-lock RPC
	return c.acquireLockInner(ctx, key, config, shared)
}
// acquireLockInner implements AcquireLock and AcquireReadLock. It opens
// the Lock or ReadLock stream (chosen by read) and reads events until the
// server grants, fails, or expires the request. On a grant it hands the
// still-open stream to a new Lock's background hold goroutine.
//
// Errors:
//   - Failed and Expired events map to serverErr codes "failed" and
//     "expired".
//   - A clean io.EOF before any grant becomes "stream_closed".
//   - Other stream errors go through rpcErr.
func (c *Client) acquireLockInner(ctx context.Context, key string, config LockConfig, read bool) (*Lock, error) {
	cfg := config.withDefaults()
	req := cfg.toLockRequest(key)
	recoverReq := reacquireRequest(req)

	lc := c.locksClient()
	var (
		stream pb.WaymakerService_LockClient
		err    error
	)
	if read {
		stream, err = lc.ReadLock(ctx, req)
	} else {
		stream, err = lc.Lock(ctx, req)
	}
	if err != nil {
		return nil, rpcErr(err)
	}

	for {
		ev, recvErr := stream.Recv()
		if recvErr == io.EOF {
			return nil, serverErr("stream_closed", "lock stream closed before acquire")
		}
		if recvErr != nil {
			return nil, rpcErr(recvErr)
		}
		switch ev.GetEventType() {
		case pb.LockEventType_Failed:
			return nil, serverErr("failed", ev.GetMessage())
		case pb.LockEventType_Expired:
			return nil, serverErr("expired", ev.GetMessage())
		case pb.LockEventType_Acquired:
			lock := &Lock{
				client: c,
				Key:    key,
				state: LockState{
					ID:               ev.GetId(),
					FenceToken:       ev.GetFenceToken(),
					LeaseExpiresAtMs: ev.GetLeaseExpiresAt(),
					Lost:             false,
				},
				stateCh:  make(chan struct{}),
				stopCh:   make(chan struct{}),
				holdDone: make(chan struct{}),
			}
			go lock.holdLoop(read, stream, recoverReq)
			return lock, nil
		}
		// Waiting / Heartbeat / Unknown — keep reading.
	}
}

// reacquireRequest derives the template the hold goroutine uses to
// re-bind a dropped stream. It keeps the same request_id, so a still-held
// lease is recovered idempotently, and uses max_wait=0 so a re-bind never
// blocks in the wait queue. It is built field by field rather than by
// copying the proto message, because generated messages must not be
// copied by value.
func reacquireRequest(req *pb.LockRequest) *pb.LockRequest {
	return &pb.LockRequest{
		Key:                  req.Key,
		MaxWaitPeriod:        0,
		MaxLeasePeriod:       req.MaxLeasePeriod,
		Priority:             req.Priority,
		RequesterInfo:        req.RequesterInfo,
		RequesterApplication: req.RequesterApplication,
		RequestId:            req.RequestId,
		FenceScope:           req.FenceScope,
	}
}
// LeaseStatus asks the server for the current state of lease id on key.
// It returns an rpcErr for transport failures, and a serverErr when the
// server reports failure or omits the lease.
func (c *Client) LeaseStatus(ctx context.Context, key, id string) (Lease, error) {
	req := &pb.LeaseStatusRequest{Key: key, Id: id}
	resp, err := c.locksClient().LeaseStatus(ctx, req)
	switch {
	case err != nil:
		return Lease{}, rpcErr(err)
	case !resp.GetSuccess():
		return Lease{}, serverErr(resp.GetResultCode(), resp.GetMessage())
	case resp.GetLease() == nil:
		return Lease{}, serverErr("internal", "missing lease in LeaseStatus response")
	}
	return leaseFromPB(resp.GetLease()), nil
}
// MultiLockKey names one key in a MultiLock request, together with the
// lock mode to take on it.
type MultiLockKey struct {
	Key       string // key to lock
	WriteLock bool   // true for exclusive (write), false for shared (read)
}
// MultiLock acquires several locks atomically in one RPC. Per the RPC's
// contract, the server sorts the keys so acquisition is deadlock-free,
// and on any failure it releases every partially acquired lock before
// the call returns.
//
// The leases come back in the server's acquisition order (lexicographic
// by key). They are plain values: no background hold goroutine or
// renewal is attached to them.
func (c *Client) MultiLock(ctx context.Context, keys []MultiLockKey, config LockConfig) ([]Lease, error) {
	cfg := config.withDefaults()
	req := &pb.MultiLockRequest{
		Keys:                 make([]*pb.MultiLockKey, 0, len(keys)),
		MaxWaitPeriod:        durationToMs(cfg.MaxWait),
		MaxLeasePeriod:       durationToMs(cfg.LeaseTTL),
		Priority:             cfg.Priority,
		RequesterInfo:        cfg.RequesterInfo,
		RequesterApplication: cfg.RequesterApplication,
		RequestId:            cfg.RequestID,
		FenceScope:           pb.FenceScope(cfg.Scope),
	}
	for _, k := range keys {
		req.Keys = append(req.Keys, &pb.MultiLockKey{Key: k.Key, WriteLock: k.WriteLock})
	}

	resp, err := c.locksClient().MultiLock(ctx, req)
	if err != nil {
		return nil, rpcErr(err)
	}
	if !resp.GetSuccess() {
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	}
	granted := resp.GetLeases()
	out := make([]Lease, 0, len(granted))
	for _, pl := range granted {
		out = append(out, leaseFromPB(pl))
	}
	return out, nil
}
// ListAcquiredLocks returns every lock currently held on the node that
// serves the request, optionally restricted to keys starting with
// keyPrefix (an empty prefix presumably matches all keys; this is
// server-defined). Extra gRPC call options are forwarded to the RPC.
func (c *Client) ListAcquiredLocks(ctx context.Context, keyPrefix string, opts ...grpc.CallOption) ([]*pb.AcquiredLock, error) {
	req := &pb.ListAcquiredLocksRequest{KeyPrefix: keyPrefix}
	resp, err := c.locksClient().ListAcquiredLocks(ctx, req, opts...)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr("list_failed", "ListAcquiredLocks returned success=false")
	default:
		return resp.GetLocks(), nil
	}
}
// --- UUID helper (no external dependency) ---

// newUUID returns a random UUID (version 4, RFC 4122 variant) in the
// canonical 8-4-4-4-12 lowercase-hex form. It is the default RequestID:
// the idempotency key the server uses to recognize retries of the same
// acquire.
//
// It panics if the random source fails. Ignoring that error would leave
// the buffer zeroed and produce the same UUID on every call. Unrelated
// acquires would then share one idempotency key and could be treated by
// the server as retries of each other. (Go 1.24+ crypto/rand.Read itself
// crashes the program on failure, for the same reason.)
// cryptoRandRead is presumably crypto/rand.Read or a test seam for it.
func newUUID() string {
	var b [16]byte
	if _, err := cryptoRandRead(b[:]); err != nil {
		panic(fmt.Sprintf("waymaker: generating request id: %v", err))
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%08x-%04x-%04x-%04x-%012x",
		b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}