775 lines
21 KiB
Go
775 lines
21 KiB
Go
package waymaker
|
|
|
|
// Locks subsystem — wraps the rwlock RPCs (Lock / ReadLock / UnLock /
|
|
// LeaseStatus / ExtendLease / MultiLock).
|
|
//
|
|
// Entry points live on *Client:
|
|
// - client.AcquireLock(ctx, key, LockConfig{…}) — exclusive (write) lock
|
|
// - client.AcquireReadLock(ctx, key, LockConfig{…}) — shared (read) lock
|
|
// - client.LeaseStatus(ctx, key, id)
|
|
// - client.MultiLock(ctx, keys, LockConfig{…})
|
|
//
|
|
// The returned *Lock keeps a background goroutine that holds the server
|
|
// event stream open and, if that stream drops (e.g. the key's primary
|
|
// bounces), transparently re-binds it and re-confirms ownership — reusing
|
|
// the original RequestID so a still-held lease is recovered rather than
|
|
// re-contended. The lock's live state is published through Lock.Watch().
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
pb "git.awesomike.com/pub/waymaker-client/go/genpb/locks"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// Compile-time reference that keeps the grpc import in use.
// ListAcquiredLocks already names grpc.CallOption in its signature, so this
// guard is redundant for as long as that method exists.
var _ = grpc.EmptyCallOption{}
|
|
|
|
// --- Scope ---
|
|
|
|
// Scope selects the durability tier of the per-key fence_token counter (a
// monotonic uint64). That is the only thing it governs: whether the lease
// survives node failures is decided by replication_factor, and mutual
// exclusion still has to be enforced by fencing at the side-effect level.
type Scope int32

// Fence-token durability tiers, from least to most durable. The numeric
// values start at 1; LockConfig.withDefaults treats 0 as "unset".
const (
	// ScopeEphemeral keeps the counter in RAM on the owning node. It resets
	// on process restart or hash-ring rebalance. Fastest; no I/O.
	ScopeEphemeral Scope = iota + 1
	// ScopeLocal persists the counter to disk on the owning node. It
	// survives a process restart but resets on ring rebalance.
	ScopeLocal
	// ScopeQuorum is a Raft-replicated per-key counter that is monotonic
	// cluster-wide and survives any single-node failure.
	ScopeQuorum
)
|
|
|
|
// --- LockConfig ---
|
|
|
|
// LockConfig is the acquire configuration. The zero value is valid:
// withDefaults replaces a zero MaxWait with 1 hour, a zero LeaseTTL with
// 30 s, an empty RequesterApplication with "waymaker-client-go", an empty
// RequestID with a freshly generated UUID, and a zero Scope with
// ScopeEphemeral.
type LockConfig struct {
	// MaxWait is how long the server will block waiting for the lock.
	// A zero value is replaced by the 1-hour default, so it can NOT be used
	// to request try-acquire. A negative value is clamped to 0 ms by
	// durationToMs before it is sent; the re-acquire path relies on a
	// 0 ms wait never blocking, so that is presumably how to ask for
	// try-acquire — NOTE(review): confirm against the server contract.
	// Default: 1 hour.
	MaxWait time.Duration
	// LeaseTTL is how long the lease lives once acquired. It is sent in
	// whole milliseconds.
	// Default: 30 s.
	LeaseTTL time.Duration
	// Priority class — higher values jump the wait queue.
	Priority uint32
	// Scope controls fence-token durability.
	// Default: ScopeEphemeral.
	Scope Scope
	// RequesterInfo is free-form metadata for server-side audit.
	RequesterInfo string
	// RequesterApplication is the application name.
	// Default: "waymaker-client-go".
	RequesterApplication string
	// RequestID is the idempotency key for retries of the same acquire.
	// Auto-filled with a UUID if empty.
	RequestID string
}
|
|
|
|
func (c LockConfig) withDefaults() LockConfig {
|
|
if c.MaxWait == 0 {
|
|
c.MaxWait = time.Hour
|
|
}
|
|
if c.LeaseTTL == 0 {
|
|
c.LeaseTTL = 30 * time.Second
|
|
}
|
|
if c.RequesterApplication == "" {
|
|
c.RequesterApplication = "waymaker-client-go"
|
|
}
|
|
if c.RequestID == "" {
|
|
c.RequestID = newUUID()
|
|
}
|
|
if c.Scope == 0 {
|
|
c.Scope = ScopeEphemeral
|
|
}
|
|
return c
|
|
}
|
|
|
|
// durationToMs converts d to whole milliseconds for the wire, saturating at
// both ends of the uint32 range: negative durations become 0 and anything
// above the largest uint32 becomes that maximum. Sub-millisecond remainders
// are truncated.
func durationToMs(d time.Duration) uint32 {
	const limit = int64(^uint32(0))

	switch ms := d.Milliseconds(); {
	case ms < 0:
		return 0
	case ms > limit:
		return uint32(limit)
	default:
		return uint32(ms)
	}
}
|
|
|
|
func (c LockConfig) toLockRequest(key string) *pb.LockRequest {
|
|
return &pb.LockRequest{
|
|
Key: key,
|
|
MaxWaitPeriod: durationToMs(c.MaxWait),
|
|
MaxLeasePeriod: durationToMs(c.LeaseTTL),
|
|
Priority: c.Priority,
|
|
RequesterInfo: c.RequesterInfo,
|
|
RequesterApplication: c.RequesterApplication,
|
|
RequestId: c.RequestID,
|
|
FenceScope: pb.FenceScope(c.Scope),
|
|
}
|
|
}
|
|
|
|
// --- Lease ---
|
|
|
|
// Lease is a plain-value snapshot of a lock lease as reported by the
// server. The two *Ms fields are carried over unchanged from the server
// response (presumably epoch milliseconds, as the names suggest).
type Lease struct {
	// ID is the lease id; Key is the locked key.
	ID, Key string
	// AcquiredAtMs is filled from the server's created_at field;
	// LeaseExpiresAtMs from its lease_expires_at field.
	AcquiredAtMs, LeaseExpiresAtMs int64
	// FenceToken is the fence token associated with this lease.
	FenceToken uint64
	// Priority is the priority class the lease was granted with.
	Priority uint32
}
|
|
|
|
func leaseFromPB(l *pb.Lease) Lease {
|
|
return Lease{
|
|
ID: l.GetId(),
|
|
Key: l.GetKey(),
|
|
AcquiredAtMs: l.GetCreatedAt(),
|
|
LeaseExpiresAtMs: l.GetLeaseExpiresAt(),
|
|
FenceToken: l.GetFenceToken(),
|
|
Priority: l.GetPriority(),
|
|
}
|
|
}
|
|
|
|
// --- LockState ---
|
|
|
|
// LockState is a live snapshot of a held lock, delivered through
// Lock.Watch(). FenceToken and ID change only if the lock was lost
// and transparently re-won after a primary failure; Lost flips to true
// once the client gives up re-establishing ownership.
//
// The struct contains only comparable fields on purpose: the lock compares
// the previous and next snapshot with == to decide whether watchers need to
// be woken.
type LockState struct {
	// ID is the lease id. Stable across transparent re-binds of a lease
	// that is still held; it changes only when the lock had to be re-won.
	ID string
	// FenceToken is the current fence token. Re-read before every
	// fenced side effect.
	FenceToken uint64
	// LeaseExpiresAtMs is the lease expiry epoch (ms), as last reported by
	// the server.
	LeaseExpiresAtMs int64
	// Lost is true once the client could no longer prove ownership.
	// A lost holder MUST stop acting as the holder.
	Lost bool
}
|
|
|
|
// --- Lock ---
|
|
|
|
// Lock is an acquired lock handle. Dropping the handle does NOT
// auto-release the lock — call Unlock explicitly (or let the lease
// expire). This matches the underlying RPC semantics.
//
// A background goroutine keeps the server event stream open. If the
// stream drops — typically because the key's primary bounced — the
// goroutine transparently re-binds it, reusing the original RequestID
// so a still-held lease is recovered rather than re-contended. The
// lease itself is kept alive by the server's TTL plus SpawnRenewal,
// independent of the stream.
//
// A Lock contains a mutex and a sync.Once and must not be copied; always
// use it through the *Lock returned by the acquire calls.
type Lock struct {
	// client is the Client this lock was acquired through; every follow-up
	// RPC (extend, unlock, re-acquire, status) goes through it.
	client *Client
	// Key is the lock key this handle was acquired for.
	Key string

	// mu guards state and stateCh.
	mu sync.RWMutex
	// state is the most recently published snapshot.
	state LockState

	// stateCh is closed and replaced whenever state changes, so
	// Watch() receivers can block on the current channel.
	stateCh chan struct{}

	// stopCh is closed to signal the hold goroutine to stop.
	stopCh chan struct{}
	// stopOnce guards the single close of stopCh.
	stopOnce sync.Once

	// holdDone is closed when the hold goroutine has exited, either because
	// it was stopped or because it marked the lock lost.
	holdDone chan struct{}
}
|
|
|
|
// ID returns the current lease id (live).
|
|
func (l *Lock) ID() string {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.state.ID
|
|
}
|
|
|
|
// FenceToken returns the current fence token (live). Re-read before
|
|
// every fenced side effect.
|
|
func (l *Lock) FenceToken() uint64 {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.state.FenceToken
|
|
}
|
|
|
|
// LeaseExpiresAtMs returns the lease expiry epoch ms (live).
|
|
func (l *Lock) LeaseExpiresAtMs() int64 {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.state.LeaseExpiresAtMs
|
|
}
|
|
|
|
// IsLost returns true once the client has lost the lock and could not
|
|
// re-win it. A lost holder must stop acting as the holder.
|
|
func (l *Lock) IsLost() bool {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.state.Lost
|
|
}
|
|
|
|
// State returns a copy of the current LockState.
|
|
func (l *Lock) State() LockState {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.state
|
|
}
|
|
|
|
// Watch returns a channel that is closed each time the lock state changes.
|
|
// Callers should call State() (or the individual accessors) after receiving
|
|
// from the channel to get the updated values. A new channel is returned on
|
|
// each call; each caller gets independent notification.
|
|
//
|
|
// Usage:
|
|
//
|
|
// ch := lock.Watch()
|
|
// for {
|
|
// <-ch
|
|
// if lock.IsLost() { ... }
|
|
// ch = lock.Watch() // re-subscribe for next change
|
|
// }
|
|
func (l *Lock) Watch() <-chan struct{} {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.stateCh
|
|
}
|
|
|
|
// publishState updates the internal state and notifies watchers.
|
|
// Must be called with mu held for write.
|
|
func (l *Lock) publishLocked(next LockState) {
|
|
if l.state == next {
|
|
return
|
|
}
|
|
l.state = next
|
|
old := l.stateCh
|
|
l.stateCh = make(chan struct{})
|
|
close(old)
|
|
}
|
|
|
|
// Extend the lease by additional duration.
|
|
func (l *Lock) Extend(ctx context.Context, additional time.Duration) (Lease, error) {
|
|
c := l.client.locksClient()
|
|
r, err := c.ExtendLease(ctx, &pb.ExtendLeaseRequest{
|
|
Key: l.Key,
|
|
Id: l.ID(),
|
|
LeaseTimeout: durationToMs(additional),
|
|
})
|
|
if err != nil {
|
|
return Lease{}, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return Lease{}, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
lease := r.GetLease()
|
|
if lease == nil {
|
|
return Lease{}, serverErr("internal", "missing lease in ExtendLease response")
|
|
}
|
|
return leaseFromPB(lease), nil
|
|
}
|
|
|
|
// Unlock releases the lock. After this returns, the server-side state
|
|
// is gone. The background hold goroutine is stopped before the UnLock
|
|
// RPC is sent so it cannot re-acquire a lock the caller is releasing.
|
|
func (l *Lock) Unlock(ctx context.Context) error {
|
|
// Signal the hold goroutine to stop FIRST so it cannot race with
|
|
// the UnLock RPC and resurrect a just-released lock.
|
|
l.stop()
|
|
// Wait for the hold goroutine to finish before sending UnLock, so
|
|
// we don't race a re-acquire with the explicit release.
|
|
<-l.holdDone
|
|
|
|
id := l.ID()
|
|
c := l.client.locksClient()
|
|
r, err := c.UnLock(ctx, &pb.UnLockRequest{
|
|
Key: l.Key,
|
|
Id: id,
|
|
})
|
|
if err != nil {
|
|
return rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (l *Lock) stop() {
|
|
l.stopOnce.Do(func() {
|
|
close(l.stopCh)
|
|
})
|
|
}
|
|
|
|
// RenewalHandle controls a background renewal goroutine. Call Stop to halt
// renewal; merely dropping the handle does not stop the goroutine.
type RenewalHandle struct {
	// stopCh is closed (once, guarded by stopOnce) to ask the goroutine to exit.
	stopCh   chan struct{}
	stopOnce sync.Once
	// done is closed by the goroutine when it has exited.
	done chan struct{}
}

// Stop asks the renewal goroutine to exit and blocks until it has done so.
// After Stop returns, no further ExtendLease RPCs will be sent. Stop may be
// called multiple times and from multiple goroutines.
func (h *RenewalHandle) Stop() {
	signal := func() { close(h.stopCh) }
	h.stopOnce.Do(signal)
	<-h.done
}
|
|
|
|
// SpawnRenewal spawns a background goroutine that periodically extends
|
|
// the lease. Returns a RenewalHandle — call Stop() or let it be
|
|
// garbage-collected (which aborts the goroutine) to halt renewal.
|
|
//
|
|
// The renewal reads the live id each tick so a transparent re-acquire
|
|
// after a primary failure renews the correct lease.
|
|
//
|
|
// The timer fires the TTL renewal at every after the first tick (which
|
|
// is skipped — the caller has the freshly-acquired lease). The extend
|
|
// call is bounded to every duration so a hung server can't freeze the
|
|
// renewal task.
|
|
func (l *Lock) SpawnRenewal(every time.Duration) *RenewalHandle {
|
|
h := &RenewalHandle{
|
|
stopCh: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
}
|
|
ttl := every * 2
|
|
if ttl > time.Duration(^uint32(0))*time.Millisecond {
|
|
ttl = time.Duration(^uint32(0)) * time.Millisecond
|
|
}
|
|
|
|
go func() {
|
|
defer close(h.done)
|
|
ticker := time.NewTicker(every)
|
|
defer ticker.Stop()
|
|
// Skip the first tick so we don't extend immediately after acquire.
|
|
select {
|
|
case <-ticker.C:
|
|
// discard the first tick
|
|
case <-h.stopCh:
|
|
return
|
|
case <-l.stopCh:
|
|
return
|
|
}
|
|
for {
|
|
select {
|
|
case <-h.stopCh:
|
|
return
|
|
case <-l.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
state := l.State()
|
|
if state.Lost {
|
|
continue
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), every)
|
|
c := l.client.locksClient()
|
|
_, _ = c.ExtendLease(ctx, &pb.ExtendLeaseRequest{
|
|
Key: l.Key,
|
|
Id: state.ID,
|
|
LeaseTimeout: durationToMs(ttl),
|
|
})
|
|
cancel()
|
|
}
|
|
}
|
|
}()
|
|
return h
|
|
}
|
|
|
|
// --- background hold-loop internals ---
|
|
|
|
// Tuning constants for the background hold loop.
const (
	// holdRPCTimeout bounds each re-acquire and ownership-check RPC.
	holdRPCTimeout = 10 * time.Second
	// holdBaseBackoff is the first delay between failed re-bind attempts;
	// the delay doubles after every failure.
	holdBaseBackoff = 200 * time.Millisecond
	// holdMaxBackoff is the ceiling for that doubling delay.
	holdMaxBackoff = 10 * time.Second
)
|
|
|
|
// drainResult tells the hold loop why drainStream returned.
type drainResult int

const (
	// drainStopped: the stop signal was received; the hold loop should exit.
	drainStopped drainResult = 0
	// drainDisconnected: the stream ended; the hold loop should try to
	// re-establish it.
	drainDisconnected drainResult = 1
)
|
|
|
|
// reboundResult is the outcome of one re-acquire attempt. The zero value
// means the attempt failed; the other fields are only meaningful when ok is
// true.
type reboundResult struct {
	// stream is the freshly opened event stream to keep draining.
	stream pb.WaymakerService_LockClient
	// id, fence and exp are the lease id, fence token and lease expiry taken
	// from the Acquired event.
	id    string
	fence uint64
	exp   int64
	// ok is true when an Acquired event was received.
	ok bool
}
|
|
|
|
// ownershipResult is the verdict of an ownership check against the server.
type ownershipResult int

const (
	// ownershipHeld: the server confirmed the lease is still held.
	ownershipHeld ownershipResult = 0
	// ownershipLost: the server answered and did not confirm the lease.
	ownershipLost ownershipResult = 1
	// ownershipUnknown: the check itself failed, so nothing can be concluded.
	ownershipUnknown ownershipResult = 2
)
|
|
|
|
// drainStream reads events from stream until it closes, stop is signalled,
// or the lease is reported gone. Heartbeat / Acquired events update lock state.
//
// It returns drainStopped when stopCh was closed, and drainDisconnected when
// Recv returned any error (including io.EOF) or the server sent an Expired or
// Failed event. Other event types are ignored.
func (l *Lock) drainStream(stream pb.WaymakerService_LockClient) drainResult {
	for {
		// Check stop before blocking on Recv.
		select {
		case <-l.stopCh:
			return drainStopped
		default:
		}

		// Use a channel to race Recv against stopCh.
		type recvResult struct {
			ev  *pb.LockEvent
			err error
		}
		// Buffered so the receiving goroutine can always deliver its result
		// and exit, even if this function has already returned on stop.
		ch := make(chan recvResult, 1)
		go func() {
			ev, err := stream.Recv()
			ch <- recvResult{ev, err}
		}()

		select {
		case <-l.stopCh:
			// NOTE(review): the goroutine above stays parked in Recv until
			// the stream produces an event or an error; nothing in this
			// function ends the stream on stop.
			return drainStopped
		case res := <-ch:
			if res.err != nil {
				// io.EOF or any other error: stream ended.
				return drainDisconnected
			}
			ev := res.ev
			switch ev.GetEventType() {
			case pb.LockEventType_Heartbeat:
				// A heartbeat only refreshes the expiry; id, fence token and
				// Lost are carried over from the current state.
				l.mu.Lock()
				next := l.state
				next.LeaseExpiresAtMs = ev.GetLeaseExpiresAt()
				l.publishLocked(next)
				l.mu.Unlock()

			case pb.LockEventType_Acquired:
				// An Acquired event replaces the whole snapshot and clears Lost.
				l.mu.Lock()
				next := LockState{
					ID:               ev.GetId(),
					FenceToken:       ev.GetFenceToken(),
					LeaseExpiresAtMs: ev.GetLeaseExpiresAt(),
					Lost:             false,
				}
				l.publishLocked(next)
				l.mu.Unlock()

			case pb.LockEventType_Expired, pb.LockEventType_Failed:
				// Treated like a dropped stream: the caller decides whether
				// the lock can be re-won or is lost.
				return drainDisconnected

			default:
				// Waiting, Unknown: ignore.
			}
		}
	}
}
|
|
|
|
// reacquire tries one idempotent re-acquire (max_wait=0) using the
|
|
// original request_id. Returns reboundResult.ok=true on success.
|
|
func (l *Lock) reacquire(read bool, req *pb.LockRequest) reboundResult {
|
|
ctx, cancel := context.WithTimeout(context.Background(), holdRPCTimeout)
|
|
defer cancel()
|
|
|
|
var stream pb.WaymakerService_LockClient
|
|
var err error
|
|
c := l.client.locksClient()
|
|
if read {
|
|
stream, err = c.ReadLock(ctx, req)
|
|
} else {
|
|
stream, err = c.Lock(ctx, req)
|
|
}
|
|
if err != nil {
|
|
return reboundResult{}
|
|
}
|
|
|
|
for {
|
|
ev, err := stream.Recv()
|
|
if err != nil {
|
|
return reboundResult{}
|
|
}
|
|
switch ev.GetEventType() {
|
|
case pb.LockEventType_Acquired:
|
|
return reboundResult{
|
|
stream: stream,
|
|
id: ev.GetId(),
|
|
fence: ev.GetFenceToken(),
|
|
exp: ev.GetLeaseExpiresAt(),
|
|
ok: true,
|
|
}
|
|
case pb.LockEventType_Failed, pb.LockEventType_Expired:
|
|
return reboundResult{}
|
|
default:
|
|
// Waiting, Heartbeat — keep reading.
|
|
}
|
|
}
|
|
}
|
|
|
|
// confirmOwnership checks whether the lock's current id still owns the key.
|
|
func (l *Lock) confirmOwnership(key, id string) (ownershipResult, int64) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), holdRPCTimeout)
|
|
defer cancel()
|
|
c := l.client.locksClient()
|
|
r, err := c.LeaseStatus(ctx, &pb.LeaseStatusRequest{Key: key, Id: id})
|
|
if err != nil {
|
|
return ownershipUnknown, 0
|
|
}
|
|
if !r.GetSuccess() || r.GetLease() == nil {
|
|
return ownershipLost, 0
|
|
}
|
|
return ownershipHeld, r.GetLease().GetLeaseExpiresAt()
|
|
}
|
|
|
|
// holdLoop is the background goroutine. It drains the stream, re-binds on
// drop, and marks the lock lost only when ownership cannot be confirmed.
//
// read selects ReadLock versus Lock for re-binds, stream is the stream that
// delivered the initial Acquired event, and reacquireReq is the request used
// for every re-bind attempt. holdDone is closed when the loop exits, which
// happens when stopCh is closed or after the lock has been marked lost.
func (l *Lock) holdLoop(read bool, stream pb.WaymakerService_LockClient, reacquireReq *pb.LockRequest) {
	defer close(l.holdDone)

	for {
		switch l.drainStream(stream) {
		case drainStopped:
			return
		case drainDisconnected:
			// Fall through to re-establish.
		}

		// Every disconnect starts again from the base delay.
		backoff := holdBaseBackoff
		for {
			// Non-blocking stop check before each attempt.
			select {
			case <-l.stopCh:
				return
			default:
			}

			rb := l.reacquire(read, reacquireReq)
			if rb.ok {
				// Publish whatever the server granted; id and fence token
				// may or may not differ from the previous snapshot.
				l.mu.Lock()
				l.publishLocked(LockState{
					ID:               rb.id,
					FenceToken:       rb.fence,
					LeaseExpiresAtMs: rb.exp,
					Lost:             false,
				})
				l.mu.Unlock()
				stream = rb.stream
				break // resume draining the fresh stream
			}

			// Could not re-acquire. Check if we still hold the lease.
			curID := l.ID()
			own, exp := l.confirmOwnership(l.Key, curID)
			switch own {
			case ownershipHeld:
				// Still the holder but without a stream: refresh the expiry
				// and keep trying to re-bind.
				l.mu.Lock()
				next := l.state
				next.LeaseExpiresAtMs = exp
				l.publishLocked(next)
				l.mu.Unlock()
			case ownershipUnknown:
				// Transient — back off and retry.
			case ownershipLost:
				// The server no longer confirms the lease: mark the lock
				// lost and end the hold loop for good.
				l.mu.Lock()
				next := l.state
				next.Lost = true
				l.publishLocked(next)
				l.mu.Unlock()
				return
			}

			// Sleep for the current delay unless stopped first, then double
			// the delay up to holdMaxBackoff.
			select {
			case <-l.stopCh:
				return
			case <-time.After(backoff):
			}
			backoff *= 2
			if backoff > holdMaxBackoff {
				backoff = holdMaxBackoff
			}
			continue
		}
		// Broke out of inner loop — resume outer drain loop.
	}
}
|
|
|
|
// --- Client entry points ---
|
|
|
|
// AcquireLock acquires an exclusive (write) lock on key. Defaults are
// applied to config before the request is sent. Blocks for at most
// config.MaxWait; returns an error with Code=="expired" if the wait
// elapses without a grant.
//
// On success the returned *Lock already has its background hold goroutine
// running; call Unlock on it to release the lock.
func (c *Client) AcquireLock(ctx context.Context, key string, config LockConfig) (*Lock, error) {
	return c.acquireLockInner(ctx, key, config, false)
}
|
|
|
|
// AcquireReadLock acquires a shared (read) lock on key. It behaves like
// AcquireLock in every other respect: the same defaults are applied to
// config and the returned *Lock has its hold goroutine running.
func (c *Client) AcquireReadLock(ctx context.Context, key string, config LockConfig) (*Lock, error) {
	return c.acquireLockInner(ctx, key, config, true)
}
|
|
|
|
func (c *Client) acquireLockInner(ctx context.Context, key string, config LockConfig, read bool) (*Lock, error) {
|
|
config = config.withDefaults()
|
|
req := config.toLockRequest(key)
|
|
|
|
// Template for the re-acquire: same request_id (idempotent recovery)
|
|
// but max_wait = 0 so it never blocks.
|
|
// Construct fresh to avoid copying a proto message (contains sync.Mutex).
|
|
reacquireReq := &pb.LockRequest{
|
|
Key: req.Key,
|
|
MaxWaitPeriod: 0,
|
|
MaxLeasePeriod: req.MaxLeasePeriod,
|
|
Priority: req.Priority,
|
|
RequesterInfo: req.RequesterInfo,
|
|
RequesterApplication: req.RequesterApplication,
|
|
RequestId: req.RequestId,
|
|
FenceScope: req.FenceScope,
|
|
}
|
|
|
|
lc := c.locksClient()
|
|
var stream pb.WaymakerService_LockClient
|
|
var err error
|
|
if read {
|
|
stream, err = lc.ReadLock(ctx, req)
|
|
} else {
|
|
stream, err = lc.Lock(ctx, req)
|
|
}
|
|
if err != nil {
|
|
return nil, rpcErr(err)
|
|
}
|
|
|
|
// Consume events until Acquired / Failed / Expired.
|
|
for {
|
|
ev, err := stream.Recv()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
return nil, serverErr("stream_closed", "lock stream closed before acquire")
|
|
}
|
|
return nil, rpcErr(err)
|
|
}
|
|
switch ev.GetEventType() {
|
|
case pb.LockEventType_Acquired:
|
|
init := LockState{
|
|
ID: ev.GetId(),
|
|
FenceToken: ev.GetFenceToken(),
|
|
LeaseExpiresAtMs: ev.GetLeaseExpiresAt(),
|
|
Lost: false,
|
|
}
|
|
lock := &Lock{
|
|
client: c,
|
|
Key: key,
|
|
state: init,
|
|
stateCh: make(chan struct{}),
|
|
stopCh: make(chan struct{}),
|
|
holdDone: make(chan struct{}),
|
|
}
|
|
go lock.holdLoop(read, stream, reacquireReq)
|
|
return lock, nil
|
|
|
|
case pb.LockEventType_Failed:
|
|
return nil, serverErr("failed", ev.GetMessage())
|
|
case pb.LockEventType_Expired:
|
|
return nil, serverErr("expired", ev.GetMessage())
|
|
default:
|
|
// Waiting / Heartbeat / Unknown — keep reading.
|
|
}
|
|
}
|
|
}
|
|
|
|
// LeaseStatus queries the current state of a lock by id.
|
|
func (c *Client) LeaseStatus(ctx context.Context, key, id string) (Lease, error) {
|
|
lc := c.locksClient()
|
|
r, err := lc.LeaseStatus(ctx, &pb.LeaseStatusRequest{Key: key, Id: id})
|
|
if err != nil {
|
|
return Lease{}, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return Lease{}, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
lease := r.GetLease()
|
|
if lease == nil {
|
|
return Lease{}, serverErr("internal", "missing lease in LeaseStatus response")
|
|
}
|
|
return leaseFromPB(lease), nil
|
|
}
|
|
|
|
// MultiLockKey is one entry in a MultiLock request.
type MultiLockKey struct {
	// Key is the key to lock.
	Key string
	// WriteLock is forwarded unchanged to the server; by its name it selects
	// an exclusive (write) lock when true and a shared (read) lock when false.
	WriteLock bool
}
|
|
|
|
// MultiLock acquires N locks atomically. The server sorts keys to
|
|
// guarantee deadlock-free ordering. On any failure every partial lock is
|
|
// released before the call returns.
|
|
//
|
|
// Returns leases in the server's acquisition order (lexicographic by key).
|
|
func (c *Client) MultiLock(ctx context.Context, keys []MultiLockKey, config LockConfig) ([]Lease, error) {
|
|
config = config.withDefaults()
|
|
pbKeys := make([]*pb.MultiLockKey, len(keys))
|
|
for i, k := range keys {
|
|
pbKeys[i] = &pb.MultiLockKey{Key: k.Key, WriteLock: k.WriteLock}
|
|
}
|
|
lc := c.locksClient()
|
|
r, err := lc.MultiLock(ctx, &pb.MultiLockRequest{
|
|
Keys: pbKeys,
|
|
MaxWaitPeriod: durationToMs(config.MaxWait),
|
|
MaxLeasePeriod: durationToMs(config.LeaseTTL),
|
|
Priority: config.Priority,
|
|
RequesterInfo: config.RequesterInfo,
|
|
RequesterApplication: config.RequesterApplication,
|
|
RequestId: config.RequestID,
|
|
FenceScope: pb.FenceScope(config.Scope),
|
|
})
|
|
if err != nil {
|
|
return nil, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return nil, serverErr(r.GetResultCode(), r.GetMessage())
|
|
}
|
|
leases := make([]Lease, len(r.GetLeases()))
|
|
for i, l := range r.GetLeases() {
|
|
leases[i] = leaseFromPB(l)
|
|
}
|
|
return leases, nil
|
|
}
|
|
|
|
// ListAcquiredLocks returns every lock currently held on the node
|
|
// serving the request. Optionally filtered by key prefix.
|
|
func (c *Client) ListAcquiredLocks(ctx context.Context, keyPrefix string, opts ...grpc.CallOption) ([]*pb.AcquiredLock, error) {
|
|
lc := c.locksClient()
|
|
r, err := lc.ListAcquiredLocks(ctx, &pb.ListAcquiredLocksRequest{KeyPrefix: keyPrefix}, opts...)
|
|
if err != nil {
|
|
return nil, rpcErr(err)
|
|
}
|
|
if !r.GetSuccess() {
|
|
return nil, serverErr("list_failed", "ListAcquiredLocks returned success=false")
|
|
}
|
|
return r.GetLocks(), nil
|
|
}
|
|
|
|
// --- UUID helper (no external dependency) ---
|
|
|
|
func newUUID() string {
|
|
// Simple time-based UUID v4 using crypto/rand.
|
|
b := make([]byte, 16)
|
|
_, _ = cryptoRandRead(b)
|
|
b[6] = (b[6] & 0x0f) | 0x40
|
|
b[8] = (b[8] & 0x3f) | 0x80
|
|
return fmt.Sprintf("%08x-%04x-%04x-%04x-%012x",
|
|
b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
|
|
}
|