package waymaker // Locks subsystem — wraps the rwlock RPCs (Lock / ReadLock / UnLock / // LeaseStatus / ExtendLease / MultiLock). // // Entry points live on *Client: // - client.AcquireLock(ctx, key, LockConfig{…}) — exclusive (write) lock // - client.AcquireReadLock(ctx, key, LockConfig{…}) — shared (read) lock // - client.LeaseStatus(ctx, key, id) // - client.MultiLock(ctx, keys, LockConfig{…}) // // The returned *Lock keeps a background goroutine that holds the server // event stream open and, if that stream drops (e.g. the key's primary // bounces), transparently re-binds it and re-confirms ownership — reusing // the original RequestID so a still-held lease is recovered rather than // re-contended. The lock's live state is published through Lock.Watch(). import ( "context" "fmt" "io" "sync" "time" pb "git.awesomike.com/pub/waymaker-client/go/genpb/locks" "google.golang.org/grpc" ) // Ensure grpc is used (it's imported for CallOption in ListAcquiredLocks). var _ = grpc.EmptyCallOption{} // --- Scope --- // Scope is the fence-token durability tier. It controls ONE thing: how // durable the per-key fence_token counter (a monotonic uint64) is across // failures. It does NOT control whether the lease survives node failures // (that is replication_factor), nor mutual exclusion (you must still // fence at the side-effect level). type Scope int32 const ( // ScopeEphemeral — counter in RAM on the owning node. Resets on // process restart or hash-ring rebalance. Fastest; no I/O. ScopeEphemeral Scope = 1 // ScopeLocal — counter persisted to disk on the owning node. // Survives process restart; resets on ring rebalance. ScopeLocal Scope = 2 // ScopeQuorum — Raft-replicated per-key counter, cluster-wide // monotonic. Survives any single-node failure. ScopeQuorum Scope = 3 ) // --- LockConfig --- // LockConfig is the acquire configuration. Zero value is valid: // MaxWait defaults to 1 hour, LeaseTTL to 30 s, RequesterApplication // to "waymaker-client-go". RequestID is auto-filled with a UUID if empty. type LockConfig struct { // MaxWait is how long the server will block waiting for the lock. // Zero means try-acquire (fail immediately if contended). // Default: 1 hour. MaxWait time.Duration // LeaseTTL is how long the lease lives once acquired. // Default: 30 s. LeaseTTL time.Duration // Priority class — higher values jump the wait queue. Priority uint32 // Scope controls fence-token durability. Scope Scope // RequesterInfo is free-form metadata for server-side audit. RequesterInfo string // RequesterApplication is the application name. // Default: "waymaker-client-go". RequesterApplication string // RequestID is the idempotency key for retries of the same acquire. // Auto-filled with a UUID if empty. RequestID string } func (c LockConfig) withDefaults() LockConfig { if c.MaxWait == 0 { c.MaxWait = time.Hour } if c.LeaseTTL == 0 { c.LeaseTTL = 30 * time.Second } if c.RequesterApplication == "" { c.RequesterApplication = "waymaker-client-go" } if c.RequestID == "" { c.RequestID = newUUID() } if c.Scope == 0 { c.Scope = ScopeEphemeral } return c } func durationToMs(d time.Duration) uint32 { ms := d.Milliseconds() if ms < 0 { return 0 } if ms > int64(^uint32(0)) { return ^uint32(0) } return uint32(ms) } func (c LockConfig) toLockRequest(key string) *pb.LockRequest { return &pb.LockRequest{ Key: key, MaxWaitPeriod: durationToMs(c.MaxWait), MaxLeasePeriod: durationToMs(c.LeaseTTL), Priority: c.Priority, RequesterInfo: c.RequesterInfo, RequesterApplication: c.RequesterApplication, RequestId: c.RequestID, FenceScope: pb.FenceScope(c.Scope), } } // --- Lease --- // Lease holds the details of a currently-held lock lease. type Lease struct { ID string Key string AcquiredAtMs int64 LeaseExpiresAtMs int64 FenceToken uint64 Priority uint32 } func leaseFromPB(l *pb.Lease) Lease { return Lease{ ID: l.GetId(), Key: l.GetKey(), AcquiredAtMs: l.GetCreatedAt(), LeaseExpiresAtMs: l.GetLeaseExpiresAt(), FenceToken: l.GetFenceToken(), Priority: l.GetPriority(), } } // --- LockState --- // LockState is a live snapshot of a held lock, delivered through // Lock.Watch(). FenceToken and ID change only if the lock was lost // and transparently re-won after a primary failure; Lost flips to true // once the client gives up re-establishing ownership. type LockState struct { // ID is the lease id. Stable across transparent re-binds. ID string // FenceToken is the current fence token. Re-read before every // fenced side effect. FenceToken uint64 // LeaseExpiresAtMs is the lease expiry epoch (ms). LeaseExpiresAtMs int64 // Lost is true once the client could no longer prove ownership. // A lost holder MUST stop acting as the holder. Lost bool } // --- Lock --- // Lock is an acquired lock handle. Dropping the handle does NOT // auto-release the lock — call Unlock explicitly (or let the lease // expire). This matches the underlying RPC semantics. // // A background goroutine keeps the server event stream open. If the // stream drops — typically because the key's primary bounced — the // goroutine transparently re-binds it, reusing the original RequestID // so a still-held lease is recovered rather than re-contended. The // lease itself is kept alive by the server's TTL plus SpawnRenewal, // independent of the stream. type Lock struct { client *Client Key string mu sync.RWMutex state LockState // stateCh is closed and replaced whenever state changes, so // Watch() receivers can block on the current channel. stateCh chan struct{} // stopCh is closed to signal the hold goroutine to stop. stopCh chan struct{} // stopOnce guards the single close of stopCh. stopOnce sync.Once holdDone chan struct{} } // ID returns the current lease id (live). func (l *Lock) ID() string { l.mu.RLock() defer l.mu.RUnlock() return l.state.ID } // FenceToken returns the current fence token (live). Re-read before // every fenced side effect. func (l *Lock) FenceToken() uint64 { l.mu.RLock() defer l.mu.RUnlock() return l.state.FenceToken } // LeaseExpiresAtMs returns the lease expiry epoch ms (live). func (l *Lock) LeaseExpiresAtMs() int64 { l.mu.RLock() defer l.mu.RUnlock() return l.state.LeaseExpiresAtMs } // IsLost returns true once the client has lost the lock and could not // re-win it. A lost holder must stop acting as the holder. func (l *Lock) IsLost() bool { l.mu.RLock() defer l.mu.RUnlock() return l.state.Lost } // State returns a copy of the current LockState. func (l *Lock) State() LockState { l.mu.RLock() defer l.mu.RUnlock() return l.state } // Watch returns a channel that is closed each time the lock state changes. // Callers should call State() (or the individual accessors) after receiving // from the channel to get the updated values. A new channel is returned on // each call; each caller gets independent notification. // // Usage: // // ch := lock.Watch() // for { // <-ch // if lock.IsLost() { ... } // ch = lock.Watch() // re-subscribe for next change // } func (l *Lock) Watch() <-chan struct{} { l.mu.RLock() defer l.mu.RUnlock() return l.stateCh } // publishState updates the internal state and notifies watchers. // Must be called with mu held for write. func (l *Lock) publishLocked(next LockState) { if l.state == next { return } l.state = next old := l.stateCh l.stateCh = make(chan struct{}) close(old) } // Extend the lease by additional duration. func (l *Lock) Extend(ctx context.Context, additional time.Duration) (Lease, error) { c := l.client.locksClient() r, err := c.ExtendLease(ctx, &pb.ExtendLeaseRequest{ Key: l.Key, Id: l.ID(), LeaseTimeout: durationToMs(additional), }) if err != nil { return Lease{}, rpcErr(err) } if !r.GetSuccess() { return Lease{}, serverErr(r.GetResultCode(), r.GetMessage()) } lease := r.GetLease() if lease == nil { return Lease{}, serverErr("internal", "missing lease in ExtendLease response") } return leaseFromPB(lease), nil } // Unlock releases the lock. After this returns, the server-side state // is gone. The background hold goroutine is stopped before the UnLock // RPC is sent so it cannot re-acquire a lock the caller is releasing. func (l *Lock) Unlock(ctx context.Context) error { // Signal the hold goroutine to stop FIRST so it cannot race with // the UnLock RPC and resurrect a just-released lock. l.stop() // Wait for the hold goroutine to finish before sending UnLock, so // we don't race a re-acquire with the explicit release. <-l.holdDone id := l.ID() c := l.client.locksClient() r, err := c.UnLock(ctx, &pb.UnLockRequest{ Key: l.Key, Id: id, }) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } func (l *Lock) stop() { l.stopOnce.Do(func() { close(l.stopCh) }) } // RenewalHandle keeps a background renewal task alive. Drop it (or call // Stop) to halt renewal cleanly. type RenewalHandle struct { stopCh chan struct{} stopOnce sync.Once done chan struct{} } // Stop halts the renewal task. After this returns, no further // ExtendLease RPCs will be sent. func (h *RenewalHandle) Stop() { h.stopOnce.Do(func() { close(h.stopCh) }) <-h.done } // SpawnRenewal spawns a background goroutine that periodically extends // the lease. Returns a RenewalHandle — call Stop() or let it be // garbage-collected (which aborts the goroutine) to halt renewal. // // The renewal reads the live id each tick so a transparent re-acquire // after a primary failure renews the correct lease. // // The timer fires the TTL renewal at every after the first tick (which // is skipped — the caller has the freshly-acquired lease). The extend // call is bounded to every duration so a hung server can't freeze the // renewal task. func (l *Lock) SpawnRenewal(every time.Duration) *RenewalHandle { h := &RenewalHandle{ stopCh: make(chan struct{}), done: make(chan struct{}), } ttl := every * 2 if ttl > time.Duration(^uint32(0))*time.Millisecond { ttl = time.Duration(^uint32(0)) * time.Millisecond } go func() { defer close(h.done) ticker := time.NewTicker(every) defer ticker.Stop() // Skip the first tick so we don't extend immediately after acquire. select { case <-ticker.C: // discard the first tick case <-h.stopCh: return case <-l.stopCh: return } for { select { case <-h.stopCh: return case <-l.stopCh: return case <-ticker.C: state := l.State() if state.Lost { continue } ctx, cancel := context.WithTimeout(context.Background(), every) c := l.client.locksClient() _, _ = c.ExtendLease(ctx, &pb.ExtendLeaseRequest{ Key: l.Key, Id: state.ID, LeaseTimeout: durationToMs(ttl), }) cancel() } } }() return h } // --- background hold-loop internals --- const ( holdBaseBackoff = 200 * time.Millisecond holdMaxBackoff = 10 * time.Second holdRPCTimeout = 10 * time.Second ) type drainResult int const ( drainStopped drainResult = iota // stop signal received drainDisconnected // stream ended; try to re-establish ) type reboundResult struct { stream pb.WaymakerService_LockClient id string fence uint64 exp int64 ok bool } type ownershipResult int const ( ownershipHeld ownershipResult = iota ownershipLost ownershipUnknown ) // drainStream reads events from stream until it closes, stop is signalled, // or the lease is reported gone. Heartbeat / Acquired events update lock state. func (l *Lock) drainStream(stream pb.WaymakerService_LockClient) drainResult { for { // Check stop before blocking on Recv. select { case <-l.stopCh: return drainStopped default: } // Use a channel to race Recv against stopCh. type recvResult struct { ev *pb.LockEvent err error } ch := make(chan recvResult, 1) go func() { ev, err := stream.Recv() ch <- recvResult{ev, err} }() select { case <-l.stopCh: return drainStopped case res := <-ch: if res.err != nil { // io.EOF or any other error: stream ended. return drainDisconnected } ev := res.ev switch ev.GetEventType() { case pb.LockEventType_Heartbeat: l.mu.Lock() next := l.state next.LeaseExpiresAtMs = ev.GetLeaseExpiresAt() l.publishLocked(next) l.mu.Unlock() case pb.LockEventType_Acquired: l.mu.Lock() next := LockState{ ID: ev.GetId(), FenceToken: ev.GetFenceToken(), LeaseExpiresAtMs: ev.GetLeaseExpiresAt(), Lost: false, } l.publishLocked(next) l.mu.Unlock() case pb.LockEventType_Expired, pb.LockEventType_Failed: return drainDisconnected default: // Waiting, Unknown: ignore. } } } } // reacquire tries one idempotent re-acquire (max_wait=0) using the // original request_id. Returns reboundResult.ok=true on success. func (l *Lock) reacquire(read bool, req *pb.LockRequest) reboundResult { ctx, cancel := context.WithTimeout(context.Background(), holdRPCTimeout) defer cancel() var stream pb.WaymakerService_LockClient var err error c := l.client.locksClient() if read { stream, err = c.ReadLock(ctx, req) } else { stream, err = c.Lock(ctx, req) } if err != nil { return reboundResult{} } for { ev, err := stream.Recv() if err != nil { return reboundResult{} } switch ev.GetEventType() { case pb.LockEventType_Acquired: return reboundResult{ stream: stream, id: ev.GetId(), fence: ev.GetFenceToken(), exp: ev.GetLeaseExpiresAt(), ok: true, } case pb.LockEventType_Failed, pb.LockEventType_Expired: return reboundResult{} default: // Waiting, Heartbeat — keep reading. } } } // confirmOwnership checks whether the lock's current id still owns the key. func (l *Lock) confirmOwnership(key, id string) (ownershipResult, int64) { ctx, cancel := context.WithTimeout(context.Background(), holdRPCTimeout) defer cancel() c := l.client.locksClient() r, err := c.LeaseStatus(ctx, &pb.LeaseStatusRequest{Key: key, Id: id}) if err != nil { return ownershipUnknown, 0 } if !r.GetSuccess() || r.GetLease() == nil { return ownershipLost, 0 } return ownershipHeld, r.GetLease().GetLeaseExpiresAt() } // holdLoop is the background goroutine. It drains the stream, re-binds on // drop, and marks the lock lost only when ownership cannot be confirmed. func (l *Lock) holdLoop(read bool, stream pb.WaymakerService_LockClient, reacquireReq *pb.LockRequest) { defer close(l.holdDone) for { switch l.drainStream(stream) { case drainStopped: return case drainDisconnected: // Fall through to re-establish. } backoff := holdBaseBackoff for { select { case <-l.stopCh: return default: } rb := l.reacquire(read, reacquireReq) if rb.ok { l.mu.Lock() l.publishLocked(LockState{ ID: rb.id, FenceToken: rb.fence, LeaseExpiresAtMs: rb.exp, Lost: false, }) l.mu.Unlock() stream = rb.stream break // resume draining the fresh stream } // Could not re-acquire. Check if we still hold the lease. curID := l.ID() own, exp := l.confirmOwnership(l.Key, curID) switch own { case ownershipHeld: l.mu.Lock() next := l.state next.LeaseExpiresAtMs = exp l.publishLocked(next) l.mu.Unlock() case ownershipUnknown: // Transient — back off and retry. case ownershipLost: l.mu.Lock() next := l.state next.Lost = true l.publishLocked(next) l.mu.Unlock() return } select { case <-l.stopCh: return case <-time.After(backoff): } backoff *= 2 if backoff > holdMaxBackoff { backoff = holdMaxBackoff } continue } // Broke out of inner loop — resume outer drain loop. } } // --- Client entry points --- // AcquireLock acquires an exclusive (write) lock. Blocks for at most // config.MaxWait; returns an error with Code=="expired" if the wait // elapses without a grant. func (c *Client) AcquireLock(ctx context.Context, key string, config LockConfig) (*Lock, error) { return c.acquireLockInner(ctx, key, config, false) } // AcquireReadLock acquires a shared (read) lock. func (c *Client) AcquireReadLock(ctx context.Context, key string, config LockConfig) (*Lock, error) { return c.acquireLockInner(ctx, key, config, true) } func (c *Client) acquireLockInner(ctx context.Context, key string, config LockConfig, read bool) (*Lock, error) { config = config.withDefaults() req := config.toLockRequest(key) // Template for the re-acquire: same request_id (idempotent recovery) // but max_wait = 0 so it never blocks. // Construct fresh to avoid copying a proto message (contains sync.Mutex). reacquireReq := &pb.LockRequest{ Key: req.Key, MaxWaitPeriod: 0, MaxLeasePeriod: req.MaxLeasePeriod, Priority: req.Priority, RequesterInfo: req.RequesterInfo, RequesterApplication: req.RequesterApplication, RequestId: req.RequestId, FenceScope: req.FenceScope, } lc := c.locksClient() var stream pb.WaymakerService_LockClient var err error if read { stream, err = lc.ReadLock(ctx, req) } else { stream, err = lc.Lock(ctx, req) } if err != nil { return nil, rpcErr(err) } // Consume events until Acquired / Failed / Expired. for { ev, err := stream.Recv() if err != nil { if err == io.EOF { return nil, serverErr("stream_closed", "lock stream closed before acquire") } return nil, rpcErr(err) } switch ev.GetEventType() { case pb.LockEventType_Acquired: init := LockState{ ID: ev.GetId(), FenceToken: ev.GetFenceToken(), LeaseExpiresAtMs: ev.GetLeaseExpiresAt(), Lost: false, } lock := &Lock{ client: c, Key: key, state: init, stateCh: make(chan struct{}), stopCh: make(chan struct{}), holdDone: make(chan struct{}), } go lock.holdLoop(read, stream, reacquireReq) return lock, nil case pb.LockEventType_Failed: return nil, serverErr("failed", ev.GetMessage()) case pb.LockEventType_Expired: return nil, serverErr("expired", ev.GetMessage()) default: // Waiting / Heartbeat / Unknown — keep reading. } } } // LeaseStatus queries the current state of a lock by id. func (c *Client) LeaseStatus(ctx context.Context, key, id string) (Lease, error) { lc := c.locksClient() r, err := lc.LeaseStatus(ctx, &pb.LeaseStatusRequest{Key: key, Id: id}) if err != nil { return Lease{}, rpcErr(err) } if !r.GetSuccess() { return Lease{}, serverErr(r.GetResultCode(), r.GetMessage()) } lease := r.GetLease() if lease == nil { return Lease{}, serverErr("internal", "missing lease in LeaseStatus response") } return leaseFromPB(lease), nil } // MultiLockKey is one entry in a MultiLock request. type MultiLockKey struct { Key string WriteLock bool } // MultiLock acquires N locks atomically. The server sorts keys to // guarantee deadlock-free ordering. On any failure every partial lock is // released before the call returns. // // Returns leases in the server's acquisition order (lexicographic by key). func (c *Client) MultiLock(ctx context.Context, keys []MultiLockKey, config LockConfig) ([]Lease, error) { config = config.withDefaults() pbKeys := make([]*pb.MultiLockKey, len(keys)) for i, k := range keys { pbKeys[i] = &pb.MultiLockKey{Key: k.Key, WriteLock: k.WriteLock} } lc := c.locksClient() r, err := lc.MultiLock(ctx, &pb.MultiLockRequest{ Keys: pbKeys, MaxWaitPeriod: durationToMs(config.MaxWait), MaxLeasePeriod: durationToMs(config.LeaseTTL), Priority: config.Priority, RequesterInfo: config.RequesterInfo, RequesterApplication: config.RequesterApplication, RequestId: config.RequestID, FenceScope: pb.FenceScope(config.Scope), }) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } leases := make([]Lease, len(r.GetLeases())) for i, l := range r.GetLeases() { leases[i] = leaseFromPB(l) } return leases, nil } // ListAcquiredLocks returns every lock currently held on the node // serving the request. Optionally filtered by key prefix. func (c *Client) ListAcquiredLocks(ctx context.Context, keyPrefix string, opts ...grpc.CallOption) ([]*pb.AcquiredLock, error) { lc := c.locksClient() r, err := lc.ListAcquiredLocks(ctx, &pb.ListAcquiredLocksRequest{KeyPrefix: keyPrefix}, opts...) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr("list_failed", "ListAcquiredLocks returned success=false") } return r.GetLocks(), nil } // --- UUID helper (no external dependency) --- func newUUID() string { // Simple time-based UUID v4 using crypto/rand. b := make([]byte, 16) _, _ = cryptoRandRead(b) b[6] = (b[6] & 0x0f) | 0x40 b[8] = (b[8] & 0x3f) | 0x80 return fmt.Sprintf("%08x-%04x-%04x-%04x-%012x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]) }