package waymaker // Streams subsystem — wraps the stream lifecycle, publish, fetch, and // consumer (push + pull) RPCs. // // Entry points on *Client: // - client.CreateStream(ctx, StreamConfig{…}) // - client.GetStream(ctx, name) // - client.GetOrCreateStream(ctx, StreamConfig{…}) // - client.UpdateStream(ctx, name, StreamUpdate{…}) // - client.DeleteStream(ctx, name) import ( "context" "io" "time" pb "git.awesomike.com/pub/waymaker-client/go/genpb/streams" ) // --- Retention policy --- // RetentionPolicy controls how the stream prunes old messages. type RetentionPolicy int const ( RetentionLimits RetentionPolicy = iota // size/age limits; default RetentionWorkQueue // exactly-once work queue RetentionInterest // retain while consumers exist ) // --- StreamSource --- // OnDropPolicy controls what a cross-stream tail does when the source // drops messages past the last-sourced seq. type OnDropPolicy int const ( OnDropHalt OnDropPolicy = 0 OnDropSkipToFirstAvailable OnDropPolicy = 1 ) // SubjectTransform is a NATS-style subject rewrite for cross-stream sources. type SubjectTransform struct { SourcePattern string Destination string } // StreamSource identifies a stream to tail from. type StreamSource struct { SourceStream string FilterSubject string StartSeq uint64 StartTimeMs int64 MaxInitialBackfill uint64 SubjectTransform *SubjectTransform OnDrop OnDropPolicy DlqStream string } // --- StreamConfig --- // StreamConfig is the creation config for a stream. type StreamConfig struct { Name string Subjects []string Retention RetentionPolicy MaxAge time.Duration MaxMessages *uint64 MaxBytes *uint64 MaxMessageSize *uint64 BlockSize uint64 StrictLimits bool Ephemeral bool MaxMsgsPerSubject uint64 Sources []StreamSource } func (c StreamConfig) toPB() *pb.StreamConfigPb { var retention *pb.Retention switch c.Retention { case RetentionWorkQueue: retention = &pb.Retention{Policy: &pb.Retention_WorkQueue{WorkQueue: &pb.WorkQueueRetention{}}} case RetentionInterest: retention = &pb.Retention{Policy: &pb.Retention_Interest{Interest: &pb.InterestRetention{}}} default: // RetentionLimits var maxAgeMs *uint64 if c.MaxAge > 0 { v := uint64(c.MaxAge.Milliseconds()) maxAgeMs = &v } retention = &pb.Retention{ Policy: &pb.Retention_Limits{ Limits: &pb.LimitsRetention{ MaxAgeMs: maxAgeMs, MaxMsgs: c.MaxMessages, MaxBytes: c.MaxBytes, StrictLimits: c.StrictLimits, }, }, } } sources := make([]*pb.StreamSourceConfigPb, len(c.Sources)) for i, s := range c.Sources { var onDrop pb.OnDropPolicy if s.OnDrop == OnDropSkipToFirstAvailable { onDrop = pb.OnDropPolicy_ON_DROP_SKIP_TO_FIRST_AVAILABLE } src := &pb.StreamSourceConfigPb{ SourceStream: s.SourceStream, FilterSubject: s.FilterSubject, StartSeq: s.StartSeq, StartTimeMs: s.StartTimeMs, MaxInitialBackfill: s.MaxInitialBackfill, OnDrop: onDrop, DlqStream: s.DlqStream, } if s.SubjectTransform != nil { src.SubjectTransform = &pb.SubjectTransformPb{ SourcePattern: s.SubjectTransform.SourcePattern, Destination: s.SubjectTransform.Destination, } } sources[i] = src } return &pb.StreamConfigPb{ Name: c.Name, SubjectsFilter: c.Subjects, Retention: retention, BlockSize: c.BlockSize, MaxMsgBytes: uint64OrZero(c.MaxMessageSize), Ephemeral: c.Ephemeral, MaxMsgsPerSubject: c.MaxMsgsPerSubject, Sources: sources, } } func uint64OrZero(p *uint64) uint64 { if p == nil { return 0 } return *p } // --- StreamUpdate --- // StreamUpdate carries the mutable subset of a stream's config. // Nil fields are not touched server-side. type StreamUpdate struct { MaxAge *time.Duration MaxMessages *uint64 MaxBytes *uint64 MaxMsgBytes *uint64 StrictLimits *bool } // --- SourceStatus --- // SourceStatus is one row of per-(sourcing, source) status. type SourceStatus struct { SourcingStream string SourceStream string LastSourcedSeq uint64 PulledTotal uint64 LastError string LastErrorTsMs int64 } // --- PublishAck --- // PublishAck is returned by Stream.Publish. type PublishAck struct { Sequence uint64 } // --- Stream handle --- // Stream is a reference to a stream on the server. Cheap to copy. type Stream struct { client *Client Name string } func newStream(c *Client, name string) *Stream { return &Stream{client: c, Name: name} } // Publish sends a message into this stream. func (s *Stream) Publish(ctx context.Context, subject string, payload []byte) (PublishAck, error) { return s.PublishWithHeaders(ctx, subject, nil, payload) } // PublishWithHeaders publishes with explicit headers. func (s *Stream) PublishWithHeaders(ctx context.Context, subject string, headers [][2]string, payload []byte) (PublishAck, error) { pbHeaders := make([]*pb.MessageHeader, len(headers)) for i, h := range headers { pbHeaders[i] = &pb.MessageHeader{Key: h[0], Value: h[1]} } c := s.client.streamsClient() r, err := c.Publish(ctx, &pb.PublishRequest{ Stream: s.Name, Subject: subject, Payload: payload, Headers: pbHeaders, }) if err != nil { return PublishAck{}, rpcErr(err) } if !r.GetSuccess() { return PublishAck{}, serverErr(r.GetResultCode(), r.GetMessage()) } return PublishAck{Sequence: r.GetSeq()}, nil } // SourcesStatus returns the per-source tail status for this stream. func (s *Stream) SourcesStatus(ctx context.Context) ([]SourceStatus, error) { c := s.client.streamsClient() r, err := c.GetStreamInfo(ctx, &pb.GetStreamInfoRequest{Name: s.Name}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } out := make([]SourceStatus, len(r.GetSourcesStatus())) for i, ss := range r.GetSourcesStatus() { out[i] = SourceStatus{ SourcingStream: s.Name, SourceStream: ss.GetSourceStream(), LastSourcedSeq: ss.GetLastSourcedSeq(), PulledTotal: ss.GetPulledTotal(), LastError: ss.GetLastError(), LastErrorTsMs: ss.GetLastErrorTsMs(), } } return out, nil } // CreateConsumer creates a new consumer on this stream. func (s *Stream) CreateConsumer(ctx context.Context, config ConsumerConfig) (*Consumer, error) { if config.DurableName == "" { return nil, invalidErr("ConsumerConfig.DurableName is required") } c := s.client.streamsClient() r, err := c.CreateConsumer(ctx, &pb.CreateConsumerRequest{ Stream: s.Name, Config: config.toPB(), }) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return newConsumer(s.client, s.Name, config.DurableName), nil } // GetOrCreateConsumer is the idempotent variant. func (s *Stream) GetOrCreateConsumer(ctx context.Context, config ConsumerConfig) (*Consumer, error) { if config.DurableName == "" { return nil, invalidErr("ConsumerConfig.DurableName is required") } c, err := s.GetConsumer(ctx, config.DurableName) if err == nil { return c, nil } if IsServerCode(err, "no_such_consumer") { return s.CreateConsumer(ctx, config) } return nil, err } // GetConsumer returns a handle to an existing consumer. func (s *Stream) GetConsumer(ctx context.Context, name string) (*Consumer, error) { c := s.client.streamsClient() r, err := c.GetConsumerInfo(ctx, &pb.GetConsumerInfoRequest{ Stream: s.Name, Consumer: name, }) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return newConsumer(s.client, s.Name, name), nil } // DeleteConsumer deletes a consumer by name. func (s *Stream) DeleteConsumer(ctx context.Context, name string) error { c := s.client.streamsClient() r, err := c.DeleteConsumer(ctx, &pb.DeleteConsumerRequest{ Stream: s.Name, Consumer: name, }) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // --- Consumer --- // DeliverPolicy controls where a consumer starts delivering. type DeliverPolicy int const ( DeliverAll DeliverPolicy = iota // from the first message DeliverNew // from messages published after subscription DeliverLast // from the last message DeliverByStartSeq // from a given sequence DeliverByStartTime // from a given timestamp ) // ConsumerConfig is the consumer creation config. type ConsumerConfig struct { DurableName string FilterSubject string DeliverPolicy DeliverPolicy StartSeq uint64 // for DeliverByStartSeq StartTimeMs int64 // for DeliverByStartTime AckWait time.Duration MaxDeliver uint32 DeliverGroup string DeadLetterSubject string } func (c ConsumerConfig) toPB() *pb.ConsumerConfigPb { ackWaitMs := uint64(30_000) if c.AckWait > 0 { ackWaitMs = uint64(c.AckWait.Milliseconds()) } maxDeliver := c.MaxDeliver if maxDeliver == 0 { maxDeliver = 5 } var policy *pb.DeliveryPolicyPb switch c.DeliverPolicy { case DeliverNew: now := time.Now().UnixMilli() policy = &pb.DeliveryPolicyPb{ Type: pb.DeliveryPolicyType_DELIVERY_BY_START_TIME, StartTimeMs: now, } case DeliverLast: policy = &pb.DeliveryPolicyPb{Type: pb.DeliveryPolicyType_DELIVERY_LAST} case DeliverByStartSeq: policy = &pb.DeliveryPolicyPb{ Type: pb.DeliveryPolicyType_DELIVERY_BY_START_SEQ, StartSeq: c.StartSeq, } case DeliverByStartTime: policy = &pb.DeliveryPolicyPb{ Type: pb.DeliveryPolicyType_DELIVERY_BY_START_TIME, StartTimeMs: c.StartTimeMs, } default: // DeliverAll policy = &pb.DeliveryPolicyPb{Type: pb.DeliveryPolicyType_DELIVERY_ALL} } return &pb.ConsumerConfigPb{ Name: c.DurableName, FilterSubject: c.FilterSubject, DeliveryPolicy: policy, AckWaitMs: ackWaitMs, MaxDeliver: maxDeliver, DeliverGroup: c.DeliverGroup, DeadLetterSubject: c.DeadLetterSubject, } } // Consumer is a consumer handle. Cheap to copy. type Consumer struct { client *Client StreamName string Name string } func newConsumer(c *Client, stream, name string) *Consumer { return &Consumer{client: c, StreamName: stream, Name: name} } // --- Message --- // Message is a delivered message. Carries enough context to Ack/Nak/Term/ // InProgress through the parent consumer. type Message struct { Subject string Payload []byte Headers [][2]string Sequence uint64 TsMs int64 DeliverCount uint32 client *Client stream string consumer string } func msgFromPB(c *Client, stream, consumer string, m *pb.MessagePb) *Message { hdrs := make([][2]string, len(m.GetHeaders())) for i, h := range m.GetHeaders() { hdrs[i] = [2]string{h.GetKey(), h.GetValue()} } return &Message{ Subject: m.GetSubject(), Payload: m.GetPayload(), Headers: hdrs, Sequence: m.GetSeq(), TsMs: m.GetTsMs(), DeliverCount: m.GetDeliverCount(), client: c, stream: stream, consumer: consumer, } } // Ack positively acknowledges the message. func (m *Message) Ack(ctx context.Context) error { c := m.client.streamsClient() r, err := c.Ack(ctx, &pb.AckRequest{Stream: m.stream, Consumer: m.consumer, Seq: m.Sequence}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // Nak negatively acknowledges — request immediate redelivery. func (m *Message) Nak(ctx context.Context) error { return m.NakWithDelay(ctx, 0) } // NakWithDelay negatively acknowledges with a redelivery delay. func (m *Message) NakWithDelay(ctx context.Context, delay time.Duration) error { c := m.client.streamsClient() r, err := c.Nak(ctx, &pb.NakRequest{ Stream: m.stream, Consumer: m.consumer, Seq: m.Sequence, DelayMs: uint64(delay.Milliseconds()), }) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // Term terminates delivery permanently regardless of max_deliver. func (m *Message) Term(ctx context.Context) error { c := m.client.streamsClient() r, err := c.Term(ctx, &pb.TermRequest{Stream: m.stream, Consumer: m.consumer, Seq: m.Sequence}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // InProgress extends the ack_wait window without acking. func (m *Message) InProgress(ctx context.Context) error { c := m.client.streamsClient() r, err := c.InProgress(ctx, &pb.InProgressRequest{Stream: m.stream, Consumer: m.consumer, Seq: m.Sequence}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // --- Consumer methods --- // Messages opens a push-mode subscription. Returns a *Messages iterator. func (con *Consumer) Messages(ctx context.Context) (*Messages, error) { return con.MessagesWithBatchSize(ctx, 0) } // MessagesWithBatchSize opens a subscription with a custom server-side batch // hint. 0 = server default. func (con *Consumer) MessagesWithBatchSize(ctx context.Context, batchSize uint32) (*Messages, error) { c := con.client.streamsClient() stream, err := c.Subscribe(ctx, &pb.SubscribeRequest{ Stream: con.StreamName, Consumer: con.Name, BatchSize: batchSize, StopWhenEmpty: false, }) if err != nil { return nil, rpcErr(err) } return &Messages{ inner: stream, client: con.client, stream: con.StreamName, consumer: con.Name, }, nil } // Fetch pull-style: fetches up to batchSize messages immediately. func (con *Consumer) Fetch(ctx context.Context, batchSize uint32) ([]*Message, error) { c := con.client.streamsClient() r, err := c.Fetch(ctx, &pb.FetchRequest{ Stream: con.StreamName, Consumer: con.Name, BatchSize: batchSize, }) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } msgs := make([]*Message, len(r.GetMessages())) for i, m := range r.GetMessages() { msgs[i] = msgFromPB(con.client, con.StreamName, con.Name, m) } return msgs, nil } // --- Messages iterator --- // Messages is a push-mode subscription iterator. Call Next to get each // delivered message. Receiving from Messages is blocking; cancel the ctx // to stop. type Messages struct { inner pb.WaymakerStreamsService_SubscribeClient client *Client stream string consumer string } // Next blocks until the next message arrives or an error occurs. // Returns (nil, nil) on normal end-of-stream. func (m *Messages) Next() (*Message, error) { for { ev, err := m.inner.Recv() if err != nil { if err == io.EOF { return nil, nil } return nil, rpcErr(err) } switch e := ev.GetEvent().(type) { case *pb.SubscribeEvent_Message: return msgFromPB(m.client, m.stream, m.consumer, e.Message), nil case *pb.SubscribeEvent_Stopped: if e.Stopped.GetReason() == "" { return nil, nil } return nil, serverErr("subscribe_stopped", e.Stopped.GetReason()) } } } // --- Client entry points for streams --- // CreateStream creates a new stream. func (c *Client) CreateStream(ctx context.Context, config StreamConfig) (*Stream, error) { sc := c.streamsClient() r, err := sc.CreateStream(ctx, &pb.CreateStreamRequest{Config: config.toPB()}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return newStream(c, config.Name), nil } // GetStream returns a handle to an existing stream. Confirms existence // via GetStreamInfo. func (c *Client) GetStream(ctx context.Context, name string) (*Stream, error) { sc := c.streamsClient() r, err := sc.GetStreamInfo(ctx, &pb.GetStreamInfoRequest{Name: name}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return newStream(c, name), nil } // GetOrCreateStream is the idempotent create-or-get. func (c *Client) GetOrCreateStream(ctx context.Context, config StreamConfig) (*Stream, error) { s, err := c.GetStream(ctx, config.Name) if err == nil { return s, nil } if IsServerCode(err, "no_such_stream") { return c.CreateStream(ctx, config) } return nil, err } // UpdateStream applies the mutable subset of a stream's config. func (c *Client) UpdateStream(ctx context.Context, name string, update StreamUpdate) error { var maxAgeMs *uint64 if update.MaxAge != nil { v := uint64(update.MaxAge.Milliseconds()) maxAgeMs = &v } sc := c.streamsClient() r, err := sc.UpdateStream(ctx, &pb.UpdateStreamRequest{ Name: name, MaxAgeMs: maxAgeMs, MaxMsgs: update.MaxMessages, MaxBytes: update.MaxBytes, MaxMsgBytes: update.MaxMsgBytes, StrictLimits: update.StrictLimits, }) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // DeleteStream deletes a stream and all its consumers. func (c *Client) DeleteStream(ctx context.Context, name string) error { sc := c.streamsClient() r, err := sc.DeleteStream(ctx, &pb.DeleteStreamRequest{Name: name}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // GetStreamSources enumerates every (sourcing, source) tail running on // the node serving the request. Multi-node clusters need to query each // node. func (c *Client) GetStreamSources(ctx context.Context) ([]SourceStatus, error) { sc := c.streamsClient() r, err := sc.GetStreamSources(ctx, &pb.GetStreamSourcesRequest{}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } out := make([]SourceStatus, len(r.GetEntries())) for i, e := range r.GetEntries() { out[i] = SourceStatus{ SourcingStream: e.GetSourcingStream(), SourceStream: e.GetSourceStream(), LastSourcedSeq: e.GetLastSourcedSeq(), PulledTotal: e.GetPulledTotal(), LastError: e.GetLastError(), LastErrorTsMs: e.GetLastErrorTsMs(), } } return out, nil }