waymaker-client/go/stream.go

674 lines
18 KiB
Go

package waymaker
// Streams subsystem — wraps the stream lifecycle, publish, fetch, and
// consumer (push + pull) RPCs.
//
// Entry points on *Client:
// - client.CreateStream(ctx, StreamConfig{…})
// - client.GetStream(ctx, name)
// - client.GetOrCreateStream(ctx, StreamConfig{…})
// - client.UpdateStream(ctx, name, StreamUpdate{…})
// - client.DeleteStream(ctx, name)
// - client.GetStreamSources(ctx)
import (
"context"
"io"
"time"
pb "git.awesomike.com/pub/waymaker-client/go/genpb/streams"
)
// --- Retention policy ---

// RetentionPolicy selects how a stream prunes old messages. The zero value
// is RetentionLimits.
type RetentionPolicy int

const (
	// RetentionLimits prunes by size/age limits; it is the default.
	RetentionLimits RetentionPolicy = 0
	// RetentionWorkQueue is the exactly-once work queue policy.
	RetentionWorkQueue RetentionPolicy = 1
	// RetentionInterest retains messages while consumers exist.
	RetentionInterest RetentionPolicy = 2
)
// --- StreamSource ---

// OnDropPolicy tells a cross-stream tail how to react when the source has
// dropped messages past the last-sourced seq. The zero value is OnDropHalt.
type OnDropPolicy int

const (
	// OnDropHalt is the zero value.
	OnDropHalt OnDropPolicy = iota
	// OnDropSkipToFirstAvailable has the value 1.
	OnDropSkipToFirstAvailable
)
// SubjectTransform describes a NATS-style subject rewrite applied to a
// cross-stream source.
type SubjectTransform struct {
	// SourcePattern and Destination are copied verbatim into the request
	// when the owning StreamSource is converted for the server.
	SourcePattern, Destination string
}
// StreamSource names one stream that another stream tails from, together
// with the options for that tail.
type StreamSource struct {
	// SourceStream is the stream to tail; FilterSubject is sent with it as
	// the source's subject filter.
	SourceStream, FilterSubject string
	// StartSeq, StartTimeMs and MaxInitialBackfill are forwarded unchanged.
	// NOTE(review): StartTimeMs is presumably Unix epoch milliseconds — confirm.
	StartSeq           uint64
	StartTimeMs        int64
	MaxInitialBackfill uint64
	// SubjectTransform is optional; nil means no transform is sent.
	SubjectTransform *SubjectTransform
	// OnDrop picks the drop-handling policy. Only
	// OnDropSkipToFirstAvailable is translated explicitly; every other
	// value is sent as the protobuf zero value.
	OnDrop OnDropPolicy
	// DlqStream is forwarded unchanged.
	DlqStream string
}
// --- StreamConfig ---

// StreamConfig holds the settings used to create a stream.
type StreamConfig struct {
	// Name is the stream name; it is also used for the returned handle.
	Name string
	// Subjects is sent as the stream's subjects filter.
	Subjects []string
	// Retention selects the retention policy. Unknown values are treated
	// like RetentionLimits.
	Retention RetentionPolicy
	// MaxAge is sent (in whole milliseconds) only under RetentionLimits and
	// only when it is positive.
	MaxAge time.Duration
	// MaxMessages and MaxBytes are sent only under RetentionLimits; nil
	// leaves the corresponding limit unset.
	MaxMessages, MaxBytes *uint64
	// MaxMessageSize is sent as MaxMsgBytes; nil is sent as 0.
	MaxMessageSize *uint64
	// BlockSize is forwarded unchanged.
	BlockSize uint64
	// StrictLimits is sent only under RetentionLimits.
	StrictLimits bool
	// Ephemeral is forwarded unchanged.
	Ephemeral bool
	// MaxMsgsPerSubject is forwarded unchanged.
	MaxMsgsPerSubject uint64
	// Sources lists the streams this stream tails from.
	Sources []StreamSource
}
// toPB converts the config into its protobuf request form.
//
// The limit fields (MaxAge, MaxMessages, MaxBytes, StrictLimits) are only
// carried by the limits retention message; the work-queue and interest
// policies are sent as empty messages. Any Retention value other than
// RetentionWorkQueue or RetentionInterest is encoded as limits retention.
func (c StreamConfig) toPB() *pb.StreamConfigPb {
	out := &pb.StreamConfigPb{
		Name:              c.Name,
		SubjectsFilter:    c.Subjects,
		BlockSize:         c.BlockSize,
		MaxMsgBytes:       uint64OrZero(c.MaxMessageSize),
		Ephemeral:         c.Ephemeral,
		MaxMsgsPerSubject: c.MaxMsgsPerSubject,
		Sources:           make([]*pb.StreamSourceConfigPb, 0, len(c.Sources)),
	}

	switch c.Retention {
	case RetentionWorkQueue:
		out.Retention = &pb.Retention{Policy: &pb.Retention_WorkQueue{WorkQueue: &pb.WorkQueueRetention{}}}
	case RetentionInterest:
		out.Retention = &pb.Retention{Policy: &pb.Retention_Interest{Interest: &pb.InterestRetention{}}}
	default: // RetentionLimits
		limits := &pb.LimitsRetention{
			MaxMsgs:      c.MaxMessages,
			MaxBytes:     c.MaxBytes,
			StrictLimits: c.StrictLimits,
		}
		// A non-positive MaxAge leaves MaxAgeMs unset.
		if c.MaxAge > 0 {
			ageMs := uint64(c.MaxAge.Milliseconds())
			limits.MaxAgeMs = &ageMs
		}
		out.Retention = &pb.Retention{Policy: &pb.Retention_Limits{Limits: limits}}
	}

	for i := range c.Sources {
		in := &c.Sources[i]
		conv := &pb.StreamSourceConfigPb{
			SourceStream:       in.SourceStream,
			FilterSubject:      in.FilterSubject,
			StartSeq:           in.StartSeq,
			StartTimeMs:        in.StartTimeMs,
			MaxInitialBackfill: in.MaxInitialBackfill,
			DlqStream:          in.DlqStream,
		}
		// Every policy other than skip-to-first-available keeps the
		// protobuf zero value.
		if in.OnDrop == OnDropSkipToFirstAvailable {
			conv.OnDrop = pb.OnDropPolicy_ON_DROP_SKIP_TO_FIRST_AVAILABLE
		}
		if st := in.SubjectTransform; st != nil {
			conv.SubjectTransform = &pb.SubjectTransformPb{
				SourcePattern: st.SourcePattern,
				Destination:   st.Destination,
			}
		}
		out.Sources = append(out.Sources, conv)
	}
	return out
}
// uint64OrZero dereferences p, yielding 0 when p is nil.
func uint64OrZero(p *uint64) uint64 {
	var v uint64
	if p != nil {
		v = *p
	}
	return v
}
// --- StreamUpdate ---

// StreamUpdate is the set of stream settings that can be changed after
// creation. Every field is a pointer: a nil field is left untouched
// server-side.
type StreamUpdate struct {
	// MaxAge is sent to the server in whole milliseconds.
	MaxAge *time.Duration
	// MaxMessages, MaxBytes and MaxMsgBytes are forwarded unchanged.
	MaxMessages, MaxBytes, MaxMsgBytes *uint64
	// StrictLimits is forwarded unchanged.
	StrictLimits *bool
}
// --- SourceStatus ---

// SourceStatus is a single status row for one (sourcing, source) stream
// pair.
type SourceStatus struct {
	// SourcingStream is the stream doing the tailing; SourceStream is the
	// stream being tailed.
	SourcingStream, SourceStream string
	// LastSourcedSeq and PulledTotal are copied from the server's status
	// entry unchanged.
	LastSourcedSeq, PulledTotal uint64
	// LastError is the last error text reported by the server, if any.
	LastError string
	// LastErrorTsMs is the timestamp reported with LastError.
	// NOTE(review): presumably Unix epoch milliseconds — confirm.
	LastErrorTsMs int64
}
// --- PublishAck ---
// PublishAck is returned by Stream.Publish and Stream.PublishWithHeaders
// after the server reports success.
type PublishAck struct {
// Sequence is the seq value carried by the server's publish response.
Sequence uint64
}
// --- Stream handle ---
// Stream is a reference to a stream on the server. Cheap to copy: it holds
// only a client pointer and the stream name.
type Stream struct {
// client supplies the streams RPC client used by every method.
client *Client
// Name is the stream name sent with each request made through this handle.
Name string
}
// newStream builds a Stream handle bound to c and name. It performs no RPC.
func newStream(c *Client, name string) *Stream {
	handle := new(Stream)
	handle.client = c
	handle.Name = name
	return handle
}
// Publish sends payload into this stream under subject, without headers.
// It is shorthand for PublishWithHeaders with nil headers.
func (s *Stream) Publish(ctx context.Context, subject string, payload []byte) (PublishAck, error) {
	var noHeaders [][2]string
	ack, err := s.PublishWithHeaders(ctx, subject, noHeaders, payload)
	return ack, err
}
// PublishWithHeaders publishes payload under subject with explicit headers.
// Each header is a {key, value} pair; order is preserved.
//
// It returns an rpcErr-wrapped error when the RPC itself fails and a
// serverErr when the server answers without success. On success the ack
// carries the seq value from the server's response.
func (s *Stream) PublishWithHeaders(ctx context.Context, subject string, headers [][2]string, payload []byte) (PublishAck, error) {
	req := &pb.PublishRequest{
		Stream:  s.Name,
		Subject: subject,
		Payload: payload,
		Headers: make([]*pb.MessageHeader, 0, len(headers)),
	}
	for _, kv := range headers {
		req.Headers = append(req.Headers, &pb.MessageHeader{Key: kv[0], Value: kv[1]})
	}

	resp, err := s.client.streamsClient().Publish(ctx, req)
	switch {
	case err != nil:
		return PublishAck{}, rpcErr(err)
	case !resp.GetSuccess():
		return PublishAck{}, serverErr(resp.GetResultCode(), resp.GetMessage())
	}
	return PublishAck{Sequence: resp.GetSeq()}, nil
}
// SourcesStatus reports the per-source tail status of this stream, read
// from the stream-info RPC. SourcingStream of every returned row is this
// stream's Name.
func (s *Stream) SourcesStatus(ctx context.Context) ([]SourceStatus, error) {
	req := &pb.GetStreamInfoRequest{Name: s.Name}
	info, err := s.client.streamsClient().GetStreamInfo(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !info.GetSuccess():
		return nil, serverErr(info.GetResultCode(), info.GetMessage())
	}

	rows := info.GetSourcesStatus()
	statuses := make([]SourceStatus, 0, len(rows))
	for _, row := range rows {
		statuses = append(statuses, SourceStatus{
			SourcingStream: s.Name,
			SourceStream:   row.GetSourceStream(),
			LastSourcedSeq: row.GetLastSourcedSeq(),
			PulledTotal:    row.GetPulledTotal(),
			LastError:      row.GetLastError(),
			LastErrorTsMs:  row.GetLastErrorTsMs(),
		})
	}
	return statuses, nil
}
// CreateConsumer creates a new consumer on this stream and returns a handle
// to it. config.DurableName must be non-empty; otherwise an invalidErr is
// returned without contacting the server.
func (s *Stream) CreateConsumer(ctx context.Context, config ConsumerConfig) (*Consumer, error) {
	durable := config.DurableName
	if len(durable) == 0 {
		return nil, invalidErr("ConsumerConfig.DurableName is required")
	}

	api := s.client.streamsClient()
	req := &pb.CreateConsumerRequest{Stream: s.Name, Config: config.toPB()}
	resp, err := api.CreateConsumer(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	}
	return newConsumer(s.client, s.Name, durable), nil
}
// GetOrCreateConsumer returns the consumer named config.DurableName,
// creating it when the lookup fails with server code "no_such_consumer".
// Any other lookup error is returned unchanged.
//
// Lookup and creation are two separate RPCs; when creation is attempted its
// result, including any error, is returned as-is.
func (s *Stream) GetOrCreateConsumer(ctx context.Context, config ConsumerConfig) (*Consumer, error) {
	if config.DurableName == "" {
		return nil, invalidErr("ConsumerConfig.DurableName is required")
	}

	existing, err := s.GetConsumer(ctx, config.DurableName)
	switch {
	case err == nil:
		return existing, nil
	case IsServerCode(err, "no_such_consumer"):
		return s.CreateConsumer(ctx, config)
	default:
		return nil, err
	}
}
// GetConsumer returns a handle to the existing consumer called name. The
// consumer-info RPC is issued first and must report success.
func (s *Stream) GetConsumer(ctx context.Context, name string) (*Consumer, error) {
	req := &pb.GetConsumerInfoRequest{Stream: s.Name, Consumer: name}
	info, err := s.client.streamsClient().GetConsumerInfo(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !info.GetSuccess():
		return nil, serverErr(info.GetResultCode(), info.GetMessage())
	}
	return newConsumer(s.client, s.Name, name), nil
}
// DeleteConsumer deletes the consumer called name from this stream.
func (s *Stream) DeleteConsumer(ctx context.Context, name string) error {
	req := &pb.DeleteConsumerRequest{Stream: s.Name, Consumer: name}
	resp, err := s.client.streamsClient().DeleteConsumer(ctx, req)
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	}
	return nil
}
// --- Consumer ---

// DeliverPolicy selects the point in the stream at which a consumer starts
// delivering. The zero value is DeliverAll.
type DeliverPolicy int

const (
	// DeliverAll starts from the first message.
	DeliverAll DeliverPolicy = 0
	// DeliverNew starts from messages published after subscription.
	DeliverNew DeliverPolicy = 1
	// DeliverLast starts from the last message.
	DeliverLast DeliverPolicy = 2
	// DeliverByStartSeq starts from a given sequence.
	DeliverByStartSeq DeliverPolicy = 3
	// DeliverByStartTime starts from a given timestamp.
	DeliverByStartTime DeliverPolicy = 4
)
// ConsumerConfig holds the settings used to create a consumer.
type ConsumerConfig struct {
	// DurableName is sent as the consumer name and is required by
	// CreateConsumer. FilterSubject is forwarded unchanged.
	DurableName, FilterSubject string
	// DeliverPolicy selects the start position; unknown values behave like
	// DeliverAll.
	DeliverPolicy DeliverPolicy
	// StartSeq is read only for DeliverByStartSeq.
	StartSeq uint64
	// StartTimeMs is read only for DeliverByStartTime.
	StartTimeMs int64
	// AckWait is sent in whole milliseconds; a non-positive value is
	// replaced by 30 seconds.
	AckWait time.Duration
	// MaxDeliver of 0 is replaced by 5.
	MaxDeliver uint32
	// DeliverGroup and DeadLetterSubject are forwarded unchanged.
	DeliverGroup, DeadLetterSubject string
}
// toPB converts the config into its protobuf request form, filling in
// defaults: AckWait <= 0 becomes 30 000 ms and MaxDeliver == 0 becomes 5.
//
// DeliverNew is encoded as a by-start-time policy anchored at the caller's
// current wall clock (time.Now), not as a dedicated policy type.
func (c ConsumerConfig) toPB() *pb.ConsumerConfigPb {
	const (
		defaultAckWaitMs  = 30_000
		defaultMaxDeliver = 5
	)

	out := &pb.ConsumerConfigPb{
		Name:              c.DurableName,
		FilterSubject:     c.FilterSubject,
		AckWaitMs:         defaultAckWaitMs,
		MaxDeliver:        defaultMaxDeliver,
		DeliverGroup:      c.DeliverGroup,
		DeadLetterSubject: c.DeadLetterSubject,
	}
	if c.AckWait > 0 {
		out.AckWaitMs = uint64(c.AckWait.Milliseconds())
	}
	if c.MaxDeliver != 0 {
		out.MaxDeliver = c.MaxDeliver
	}

	start := new(pb.DeliveryPolicyPb)
	switch c.DeliverPolicy {
	case DeliverNew:
		start.Type = pb.DeliveryPolicyType_DELIVERY_BY_START_TIME
		start.StartTimeMs = time.Now().UnixMilli()
	case DeliverLast:
		start.Type = pb.DeliveryPolicyType_DELIVERY_LAST
	case DeliverByStartSeq:
		start.Type = pb.DeliveryPolicyType_DELIVERY_BY_START_SEQ
		start.StartSeq = c.StartSeq
	case DeliverByStartTime:
		start.Type = pb.DeliveryPolicyType_DELIVERY_BY_START_TIME
		start.StartTimeMs = c.StartTimeMs
	default: // DeliverAll
		start.Type = pb.DeliveryPolicyType_DELIVERY_ALL
	}
	out.DeliveryPolicy = start
	return out
}
// Consumer is a handle to a consumer on a stream. Cheap to copy: it holds
// only a client pointer and two names.
type Consumer struct {
	// client supplies the streams RPC client used by every method.
	client *Client
	// StreamName is the owning stream and Name is the consumer's own name;
	// both are sent with each request made through this handle.
	StreamName, Name string
}
// newConsumer builds a Consumer handle for the named consumer on stream.
// It performs no RPC.
func newConsumer(c *Client, stream, name string) *Consumer {
	handle := new(Consumer)
	handle.client = c
	handle.StreamName = stream
	handle.Name = name
	return handle
}
// --- Message ---

// Message is one delivered message. Besides the message data it remembers
// the client, stream and consumer it came from, which is what lets Ack,
// Nak, Term and InProgress be called directly on it.
type Message struct {
	// Subject is the subject the message was delivered under.
	Subject string
	// Payload is the message body.
	Payload []byte
	// Headers holds the message headers as {key, value} pairs.
	Headers [][2]string
	// Sequence is the message's seq; it identifies the message in
	// ack-style requests.
	Sequence uint64
	// TsMs is the message timestamp as reported by the server.
	// NOTE(review): presumably Unix epoch milliseconds — confirm.
	TsMs int64
	// DeliverCount is the delivery count reported by the server.
	DeliverCount uint32

	// client, stream and consumer address the ack-style RPCs.
	client           *Client
	stream, consumer string
}
// msgFromPB converts a protobuf message into a *Message bound to the given
// client, stream and consumer. Headers is always non-nil, even when the
// message has none.
func msgFromPB(c *Client, stream, consumer string, m *pb.MessagePb) *Message {
	src := m.GetHeaders()
	pairs := make([][2]string, 0, len(src))
	for _, hdr := range src {
		pairs = append(pairs, [2]string{hdr.GetKey(), hdr.GetValue()})
	}

	out := &Message{client: c, stream: stream, consumer: consumer}
	out.Subject = m.GetSubject()
	out.Payload = m.GetPayload()
	out.Headers = pairs
	out.Sequence = m.GetSeq()
	out.TsMs = m.GetTsMs()
	out.DeliverCount = m.GetDeliverCount()
	return out
}
// Ack positively acknowledges the message, identified by its stream,
// consumer and sequence.
func (m *Message) Ack(ctx context.Context) error {
	req := &pb.AckRequest{
		Stream:   m.stream,
		Consumer: m.consumer,
		Seq:      m.Sequence,
	}
	resp, err := m.client.streamsClient().Ack(ctx, req)
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	}
	return nil
}
// Nak negatively acknowledges the message, requesting immediate
// redelivery. It is shorthand for NakWithDelay with a zero delay.
func (m *Message) Nak(ctx context.Context) error {
	const immediately time.Duration = 0
	return m.NakWithDelay(ctx, immediately)
}
// NakWithDelay negatively acknowledges the message and asks the server to
// wait delay before redelivering it. The delay is sent in whole
// milliseconds; a zero or negative delay is sent as 0.
//
// It returns an rpcErr-wrapped error when the RPC itself fails and a
// serverErr when the server answers without success.
func (m *Message) NakWithDelay(ctx context.Context, delay time.Duration) error {
	// Converting a negative Duration straight to uint64 wraps around to an
	// enormous value, which would postpone redelivery almost indefinitely.
	// Clamp it to "no delay" instead.
	var delayMs uint64
	if delay > 0 {
		delayMs = uint64(delay.Milliseconds())
	}

	c := m.client.streamsClient()
	r, err := c.Nak(ctx, &pb.NakRequest{
		Stream:   m.stream,
		Consumer: m.consumer,
		Seq:      m.Sequence,
		DelayMs:  delayMs,
	})
	if err != nil {
		return rpcErr(err)
	}
	if !r.GetSuccess() {
		return serverErr(r.GetResultCode(), r.GetMessage())
	}
	return nil
}
// Term terminates delivery of the message permanently, regardless of
// max_deliver.
func (m *Message) Term(ctx context.Context) error {
	req := &pb.TermRequest{
		Stream:   m.stream,
		Consumer: m.consumer,
		Seq:      m.Sequence,
	}
	resp, err := m.client.streamsClient().Term(ctx, req)
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	}
	return nil
}
// InProgress extends the message's ack_wait window without acknowledging
// it.
func (m *Message) InProgress(ctx context.Context) error {
	req := &pb.InProgressRequest{
		Stream:   m.stream,
		Consumer: m.consumer,
		Seq:      m.Sequence,
	}
	resp, err := m.client.streamsClient().InProgress(ctx, req)
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	}
	return nil
}
// --- Consumer methods ---

// Messages opens a push-mode subscription and returns a *Messages iterator.
// It is shorthand for MessagesWithBatchSize with batch size 0 (server
// default).
func (con *Consumer) Messages(ctx context.Context) (*Messages, error) {
	const serverDefault uint32 = 0
	return con.MessagesWithBatchSize(ctx, serverDefault)
}
// MessagesWithBatchSize opens a push-mode subscription, passing batchSize
// to the server as a batch hint (0 = server default). The subscription is
// requested with StopWhenEmpty disabled. ctx is handed to the Subscribe
// call that opens the stream.
func (con *Consumer) MessagesWithBatchSize(ctx context.Context, batchSize uint32) (*Messages, error) {
	req := &pb.SubscribeRequest{
		Stream:        con.StreamName,
		Consumer:      con.Name,
		BatchSize:     batchSize,
		StopWhenEmpty: false,
	}
	sub, err := con.client.streamsClient().Subscribe(ctx, req)
	if err != nil {
		return nil, rpcErr(err)
	}

	it := new(Messages)
	it.inner = sub
	it.client = con.client
	it.stream = con.StreamName
	it.consumer = con.Name
	return it, nil
}
// Fetch is the pull-style read: a single request for up to batchSize
// messages. The returned slice is non-nil on success, even when empty.
func (con *Consumer) Fetch(ctx context.Context, batchSize uint32) ([]*Message, error) {
	req := &pb.FetchRequest{
		Stream:    con.StreamName,
		Consumer:  con.Name,
		BatchSize: batchSize,
	}
	resp, err := con.client.streamsClient().Fetch(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	}

	raw := resp.GetMessages()
	batch := make([]*Message, 0, len(raw))
	for _, item := range raw {
		batch = append(batch, msgFromPB(con.client, con.StreamName, con.Name, item))
	}
	return batch, nil
}
// --- Messages iterator ---

// Messages iterates over a push-mode subscription. Call Next repeatedly to
// receive each delivered message. Next blocks; cancel the ctx that was
// passed when the subscription was opened to stop.
type Messages struct {
	// inner is the underlying server-streaming Subscribe call.
	inner pb.WaymakerStreamsService_SubscribeClient
	// client, stream and consumer are stamped onto every Message produced
	// so that it can be acknowledged later.
	client           *Client
	stream, consumer string
}
// Next blocks until the next message arrives or the subscription ends.
//
// It returns (nil, nil) on normal end-of-stream: either the stream reports
// io.EOF or the server sends a Stopped event with an empty reason. A
// Stopped event with a non-empty reason becomes a "subscribe_stopped"
// server error. Events of any other kind are skipped.
func (m *Messages) Next() (*Message, error) {
	for {
		ev, err := m.inner.Recv()
		if err == io.EOF {
			return nil, nil
		}
		if err != nil {
			return nil, rpcErr(err)
		}

		event := ev.GetEvent()
		if delivered, ok := event.(*pb.SubscribeEvent_Message); ok {
			return msgFromPB(m.client, m.stream, m.consumer, delivered.Message), nil
		}
		stopped, ok := event.(*pb.SubscribeEvent_Stopped)
		if !ok {
			continue
		}
		if reason := stopped.Stopped.GetReason(); reason != "" {
			return nil, serverErr("subscribe_stopped", reason)
		}
		return nil, nil
	}
}
// --- Client entry points for streams ---

// CreateStream creates a new stream from config and returns a handle named
// config.Name.
func (c *Client) CreateStream(ctx context.Context, config StreamConfig) (*Stream, error) {
	api := c.streamsClient()
	req := &pb.CreateStreamRequest{Config: config.toPB()}
	resp, err := api.CreateStream(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	}
	return newStream(c, config.Name), nil
}
// GetStream returns a handle to the existing stream called name. Existence
// is confirmed with a GetStreamInfo call, which must report success.
func (c *Client) GetStream(ctx context.Context, name string) (*Stream, error) {
	req := &pb.GetStreamInfoRequest{Name: name}
	info, err := c.streamsClient().GetStreamInfo(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !info.GetSuccess():
		return nil, serverErr(info.GetResultCode(), info.GetMessage())
	}
	return newStream(c, name), nil
}
// GetOrCreateStream returns the stream named config.Name, creating it from
// config when the lookup fails with server code "no_such_stream". Any other
// lookup error is returned unchanged.
//
// Lookup and creation are two separate RPCs; when creation is attempted its
// result, including any error, is returned as-is.
func (c *Client) GetOrCreateStream(ctx context.Context, config StreamConfig) (*Stream, error) {
	existing, err := c.GetStream(ctx, config.Name)
	switch {
	case err == nil:
		return existing, nil
	case IsServerCode(err, "no_such_stream"):
		return c.CreateStream(ctx, config)
	default:
		return nil, err
	}
}
// UpdateStream applies the mutable subset of a stream's config to the
// stream called name. Nil fields of update are sent as unset.
//
// update.MaxAge is sent in whole milliseconds. A negative MaxAge is sent as
// 0 rather than being allowed to wrap around to a huge unsigned value.
//
// It returns an rpcErr-wrapped error when the RPC itself fails and a
// serverErr when the server answers without success.
func (c *Client) UpdateStream(ctx context.Context, name string, update StreamUpdate) error {
	req := &pb.UpdateStreamRequest{
		Name:         name,
		MaxMsgs:      update.MaxMessages,
		MaxBytes:     update.MaxBytes,
		MaxMsgBytes:  update.MaxMsgBytes,
		StrictLimits: update.StrictLimits,
	}
	if update.MaxAge != nil {
		// uint64(negative) wraps in Go, so clamp before converting.
		var ageMs uint64
		if d := *update.MaxAge; d > 0 {
			ageMs = uint64(d.Milliseconds())
		}
		req.MaxAgeMs = &ageMs
	}

	sc := c.streamsClient()
	r, err := sc.UpdateStream(ctx, req)
	if err != nil {
		return rpcErr(err)
	}
	if !r.GetSuccess() {
		return serverErr(r.GetResultCode(), r.GetMessage())
	}
	return nil
}
// DeleteStream deletes the stream called name and all its consumers.
func (c *Client) DeleteStream(ctx context.Context, name string) error {
	req := &pb.DeleteStreamRequest{Name: name}
	resp, err := c.streamsClient().DeleteStream(ctx, req)
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	}
	return nil
}
// GetStreamSources lists every (sourcing, source) tail running on the node
// that serves the request. In a multi-node cluster each node has to be
// queried separately.
func (c *Client) GetStreamSources(ctx context.Context) ([]SourceStatus, error) {
	resp, err := c.streamsClient().GetStreamSources(ctx, &pb.GetStreamSourcesRequest{})
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	}

	entries := resp.GetEntries()
	statuses := make([]SourceStatus, 0, len(entries))
	for _, entry := range entries {
		statuses = append(statuses, SourceStatus{
			SourcingStream: entry.GetSourcingStream(),
			SourceStream:   entry.GetSourceStream(),
			LastSourcedSeq: entry.GetLastSourcedSeq(),
			PulledTotal:    entry.GetPulledTotal(),
			LastError:      entry.GetLastError(),
			LastErrorTsMs:  entry.GetLastErrorTsMs(),
		})
	}
	return statuses, nil
}