syntax = "proto3"; package waymaker.streams; option go_package = "/apis/waymaker_streams"; // VERSION COMPATIBILITY POLICY // ---------------------------- // Same rules as waymaker.proto: never reuse field numbers, append-only // enums, no type changes, RPC removal = new package. See the comment // at the top of waymaker.proto for the full rationale. // // This is the Phase-1 surface for the JetStream-lite streams subsystem. // The handler set is intentionally narrow — pull-mode delivery only, // unary RPCs only. Push consumers (server-streaming Subscribe) and // admin streaming endpoints arrive in Phase 2. service WaymakerStreamsService { // --- Stream lifecycle --- rpc CreateStream (CreateStreamRequest) returns (CreateStreamResponse); rpc DeleteStream (DeleteStreamRequest) returns (DeleteStreamResponse); rpc GetStreamInfo (GetStreamInfoRequest) returns (GetStreamInfoResponse); rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse); // Slice 3 cross-stream sources admin: enumerate every // (sourcing, source) tail running on this node, with current // last_sourced_seq + pulled_total + last_error. Useful for // operators auditing the cluster's source topology without // ListStreams + GetStreamInfo per stream. rpc GetStreamSources (GetStreamSourcesRequest) returns (GetStreamSourcesResponse); // Update the *mutable* subset of a stream's config — the Limits // retention bounds (max_age_ms / max_msgs / max_bytes), the per- // message size cap, and the strict-limits toggle. Immutable fields // (name, subjects_filter, block_size, retention policy type) are // not touched. Lowering a bound triggers an immediate prune to // bring stats under the new limit; the primary fans the resulting // truncation out via `ReplicateTruncate` so secondaries mirror. // Partial-update semantics: only fields explicitly set in the // request are applied; unset fields leave the on-disk value // unchanged. rpc UpdateStream (UpdateStreamRequest) returns (UpdateStreamResponse); // --- Messages --- rpc Publish (PublishRequest) returns (PublishResponse); rpc Fetch (FetchRequest) returns (FetchResponse); rpc Ack (AckRequest) returns (AckResponse); // KV RPCs moved to `WaymakerKvService` in // `crates/kv/proto/kv.proto`. The underlying message types // (KvPutRequest etc.) still live in this file because the // server-internal helper `kv_publish_internal` still returns // them; the KV service trait converts at the boundary. // Hash / Set / Queue collection RPCs moved to // `WaymakerCollectionsService` in // `crates/collections/proto/collections.proto`. Message types // (HashSetRequest, etc.) still live in this proto file // because some streams-internal helpers reference them; the // collections service trait converts at the boundary. // Probabilistic data structures moved to their own service in // `WaymakerSketchesService` (see crates/sketches/proto/sketches.proto). // Negative-acknowledge: server resets the pending entry's // delivered_at_ms so the next fetch redelivers. `delay_ms` defers // eligibility by that wall-clock window (0 = immediate). The // message's `deliver_count` keeps climbing toward `max_deliver`. rpc Nak (NakRequest) returns (NakResponse); // Terminal-acknowledge: drop the pending entry permanently // without redelivery, regardless of `max_deliver`. Does NOT // trigger WorkQueue delete — other consumers can still observe // the message. rpc Term (TermRequest) returns (TermResponse); // Heartbeat-acknowledge: bump delivered_at_ms = now to extend // the ack_wait window. `deliver_count` is unchanged. rpc InProgress (InProgressRequest) returns (InProgressResponse); // Push-mode delivery: the server fetches in a loop and streams // each delivered message back to the client as it arrives. The // client acks via the unary Ack RPC just like pull-mode. The // stream stays open until the client disconnects, the server // returns an error, or the consumer is deleted. Wakes // immediately on new appends via the storage layer's subscribe // primitive — no polling for empty streams. rpc Subscribe (SubscribeRequest) returns (stream SubscribeEvent); // --- Consumers --- rpc CreateConsumer (CreateConsumerRequest) returns (CreateConsumerResponse); rpc DeleteConsumer (DeleteConsumerRequest) returns (DeleteConsumerResponse); rpc ListConsumers (ListConsumersRequest) returns (ListConsumersResponse); rpc GetConsumerInfo (GetConsumerInfoRequest) returns (GetConsumerInfoResponse); // --- Rebalancing (Phase 1: operator-driven only) --- // // The current owner of a stream serves its raw redb bytes to a peer // that's pulling the stream over. The handler atomically removes the // stream from its local registry first, refusing the call if any // outside reference is still live (operator must drain writers). On // RPC success the source deletes the local file. See // STREAMS_SPEC.md §11 for the model and limitations (no automatic // ring-change sweep yet; the operator is responsible for triggering // a migrate when membership moves a stream's authority). rpc TransferStream (TransferStreamRequest) returns (stream TransferStreamChunk); // Admin trigger on the receiving side: pull stream `name` from // `source_node_id`'s `TransferStream` and own it locally. rpc MigrateStream (MigrateStreamRequest) returns (MigrateStreamResponse); // Cluster-wide stream inventory + skew report. The receiving node // queries every cluster member's local `StreamsRegistry` (via the // existing proxy channel pool) and aggregates the result. Used by // operators to identify hash-skew imbalance before triggering // `RebalanceStreams`. Also exposed via the `wmkr-status` CLI. rpc GetClusterStreamStats (GetClusterStreamStatsRequest) returns (GetClusterStreamStatsResponse); // Server-streamed admin watch — emits a WatchEvent each time the // local node's state mutates (stream / consumer create / delete / // update). Useful for live dashboards or service-discovery // clients that want to react to topology changes without // polling. Local-only for now: each watcher sees events generated // on the node it connected to. Cluster-wide watch can be built // on top via a fan-out client; the server doesn't fan out // automatically because the events would arrive out of any // single-source ordering anyway under proxy hops. rpc WatchStreams (WatchStreamsRequest) returns (stream WatchEvent); // Read the latest message at a given subject within a stream. // The foundation for KV-style "last-value wins" lookups on top // of a stream — KV put = Publish to `.`; KV get = // this RPC against the same subject. Returns the full // MessagePb (including headers) so callers can detect KV // tombstones (`wmkv.tombstone` header). // // Returns `success: true` with `message` unset when no message // has ever been published at this subject (or all have been // pruned). Routes via `try_route!` like every other per-stream // RPC. rpc ReadLatestAtSubject (ReadLatestAtSubjectRequest) returns (ReadLatestAtSubjectResponse); // List every distinct subject in `stream` whose name starts // with `prefix`. Cost is O(matching subjects); independent of // message count. The foundation for `streams-cli kv-keys` and // service-discovery-style "everything under this namespace" // lookups. Returns subjects whose latest message is a // tombstone too — clients that want live-keys-only filter // tombstones via a follow-up `ReadLatestAtSubject`. rpc ListSubjectsByPrefix (ListSubjectsByPrefixRequest) returns (ListSubjectsByPrefixResponse); // Scan all messages published at an exact subject within // `stream`, in seq order, starting at `from_seq` (0 = from the // beginning), bounded by `limit`. The foundation for // `streams-cli kv-history` — operators want to inspect every // value ever published under a KV key (including tombstones) // for debugging/audit. Cost is O(matching messages); independent // of total stream size. Routes via `try_route!` like every // other per-stream RPC. rpc ScanExactAtSubject (ScanExactAtSubjectRequest) returns (ScanExactAtSubjectResponse); // Remove a Phase 3 per-stream authority override. Routing // reverts to the ring's hash owner. Idempotent: clearing a // stream with no override succeeds silently. Operators use this // to retire a stale override (e.g. after a ring shift made the // override redundant). Commits via a Raft entry so the clear // applies on every node before the response returns. rpc ClearStreamAuthority (ClearStreamAuthorityRequest) returns (ClearStreamAuthorityResponse); // List every Phase 3 stream_authority override active on the // responding node. The map is Raft-replicated, so any node's // response reflects the cluster-wide view (modulo apply lag). // Useful for ops triage when an unexpected number of overrides // shows up on /metrics. No fan-out — single-node RPC; the // returned set is the canonical truth. rpc ListStreamAuthorityOverrides (ListStreamAuthorityOverridesRequest) returns (ListStreamAuthorityOverridesResponse); // Toggle pinned state for `stream`. Pinned streams are exempt // from the auto-GC sweep that retires redundant overrides — use // when you want a stream to stay on its current authority node // even if the ring shifts to make the override redundant. // Idempotent. Independent of the override itself (pinning a // stream with no override is benign; the marker sits dormant). rpc SetStreamPinned (SetStreamPinnedRequest) returns (SetStreamPinnedResponse); // ---- Phase 4 — Object Store ---- // // Convention layer over streams: one stream per bucket, named // `obj:`. Two subject shapes: // - `objm.` — JSON-encoded metadata (size, chunk_count, // sha256, etc.). Latest-revision-wins KV semantics. // - `objc..` — raw chunk bytes. // // Atomicity model: chunks published first, metadata last. A // crash between steps leaves orphan chunks but no live object // (readers can't see the object because metadata is missing). // Orphan-chunk GC sweep handles cleanup. See // waymaker-streams/OBJECT_STORE_DESIGN.md §"Atomicity model". // // v0 surface: unary RPCs only (max object size ~16 MiB — the // default gRPC message cap). Streaming variants for large // objects come in a follow-up slice. rpc PutObject (PutObjectRequest) returns (PutObjectResponse); rpc GetObject (GetObjectRequest) returns (GetObjectResponse); rpc DeleteObject (DeleteObjectRequest) returns (DeleteObjectResponse); rpc GetObjectInfo (GetObjectInfoRequest) returns (GetObjectInfoResponse); rpc ListObjects (ListObjectsRequest) returns (ListObjectsResponse); // Client-streamed PutObject for arbitrary-size objects. First // frame MUST set `start { bucket, name, chunk_size, headers, // sha256 }`. Subsequent frames carry `data` only — each frame's // `data` is ONE chunk message at `objc..`. The server // accumulates a running SHA-256 and total-byte count, publishes // chunks as they arrive (replication fires async), and on the // last frame (`finish=true`) publishes the metadata. A client // disconnect before `finish=true` leaves orphan chunks; the GC // sweep cleans them up. rpc PutObjectStream (stream PutObjectStreamFrame) returns (PutObjectResponse); // Server-streamed GetObject. First frame carries `info`; // subsequent frames carry `data` only — one per chunk. Last // frame sets `done=true`. The client reassembles; the response // is sent over the wire in chunk-sized pieces so memory usage // stays bounded on both sides. rpc GetObjectStream (GetObjectRequest) returns (stream GetObjectStreamFrame); // Every revision of `name`'s metadata in seq order — covers // overwrites + tombstones. Returns one entry per metadata // message at `objm.`. Chunks are not enumerated; this RPC // is for object versioning / audit, not for binary diffing. rpc ListObjectRevisions (ListObjectRevisionsRequest) returns (ListObjectRevisionsResponse); // Read a byte range `[offset, offset + len)` from an object's // assembled payload. Only the chunks that intersect the range // are loaded server-side — useful for resumable downloads of // large objects. // - `offset + len > total_bytes` → returns whatever bytes exist // in the range (success, possibly empty). // - `offset > total_bytes` → returns empty payload (success). // - `len == 0` → returns empty payload (success). rpc GetObjectRange (GetObjectRangeRequest) returns (GetObjectRangeResponse); // Operator-driven rebalance. Takes an explicit plan — a list of // (stream, target_node) — and executes each step by issuing a // `MigrateStream` to the target. The plan is *not* auto-generated; // the operator (or a future automatic planner) is responsible for // building it from a `GetClusterStreamStats` snapshot. Steps run // sequentially with a per-step timeout; the response carries // per-step outcomes so partial success is visible. rpc RebalanceStreams (RebalanceStreamsRequest) returns (RebalanceStreamsResponse); // --- Consumer-state replication (Phase 2 §G) --- // // The primary for a stream pushes its consumers' full state to the // stream's `replication_factor - 1` secondaries after every // state-mutating consumer operation (create_consumer, fetch, ack, // delete_consumer). The push is fire-and-forget on the primary's // side — the client RPC has already returned to the caller; the // replication runs in a background task. Secondaries hold the // snapshot in memory; adoption-on-failover is a future slice. rpc ReplicateConsumerState (ReplicateConsumerStateRequest) returns (ReplicateConsumerStateResponse); // --- Cross-stream sources state replication (slice 2E) --- // // The primary for a sourcing stream pushes the current per-source // tail watermark to each secondary after every successful batch // (i.e. once per ~128 source messages). Secondaries persist the // snapshot via their own SourceTailStore so that on adoption (ring // shift → secondary becomes primary), `spawn_source_tail_tasks` // reads the replicated state and resumes from `last_sourced_seq + 1` // instead of re-pulling from `start_seq` (which would emit // duplicates with already-replicated provenance headers). rpc ReplicateSourceTailState (ReplicateSourceTailStateRequest) returns (ReplicateSourceTailStateResponse); // --- Stream-data replication (Phase 3, chunk 1) --- // // The primary for a stream pushes: // 1. ReplicateStreamCreate once at create time, so secondaries // know what stream to open in their replica registry with // what config (block_size, retention, max_msg_bytes, etc.). // 2. ReplicateMessage on every successful Publish, with the // seq the primary assigned, so the secondary's replica // mirrors the message log by seq exactly. // // Replica streams live in a per-node "replica registry" rooted at // `/replicas/.redb`, distinct from the // primary-owned namespace. The streams handler never serves // client requests from the replica — it's purely catastrophe // recovery state until the (future) adoption-on-failover slice // promotes a replica to primary. rpc ReplicateStreamCreate (ReplicateStreamCreateRequest) returns (ReplicateStreamCreateResponse); rpc ReplicateMessage (ReplicateMessageRequest) returns (ReplicateMessageResponse); // Tear down the replica when the primary deletes the stream. // Idempotent — missing replica is success. rpc ReplicateStreamDelete (ReplicateStreamDeleteRequest) returns (ReplicateStreamDeleteResponse); // The primary's retention sweep removed messages below // `first_seq`; the secondary mirrors the same truncation so its // replica's first_seq advances in lockstep. Idempotent. rpc ReplicateTruncate (ReplicateTruncateRequest) returns (ReplicateTruncateResponse); // The primary applied an UpdateStream; secondaries mirror the // mutable subset of the config so a future failover lands on a // replica whose retention matches the primary's. Carries the same // narrow shape as UpdateStreamRequest — only the mutable fields, // with partial-update semantics. rpc ReplicateStreamUpdate (ReplicateStreamUpdateRequest) returns (ReplicateStreamUpdateResponse); // Under `RetentionPolicy::WorkQueue` the primary deletes a message // on ack (delete-on-first-ack). Without this fan-out, secondaries' // replica files would still hold the acked message — and after a // failover, a fresh consumer on the new primary would see it and // re-deliver, breaking the "each message belongs to exactly one // consumer at a time" invariant. Idempotent: missing seq on // secondary is success. rpc ReplicateWorkQueueAck (ReplicateWorkQueueAckRequest) returns (ReplicateWorkQueueAckResponse); } // --------------------------------------------------------------------- // Shared types // --------------------------------------------------------------------- // `Limits` retention with three optional bounds. Any bound that's // unset (`*` field omitted) means "no limit on that dimension". message LimitsRetention { optional uint64 max_age_ms = 1; optional uint64 max_msgs = 2; optional uint64 max_bytes = 3; // `false` = block-aligned approximate pruning (default). `true` = // per-message exact pruning. See STREAMS_SPEC.md §6. bool strict_limits = 4; } message WorkQueueRetention {} // `Interest` retention: drop a block once every consumer's // `ack_floor` has advanced past its `last_seq`. With zero // consumers, every block is eligible. Block-aligned (not // per-message) so retention sweeps stay cheap. message InterestRetention {} message Retention { oneof policy { LimitsRetention limits = 1; WorkQueueRetention work_queue = 2; InterestRetention interest = 3; } } message StreamConfigPb { string name = 1; // Subject patterns this stream accepts. Empty Vec = no filter. repeated string subjects_filter = 2; Retention retention = 3; // Messages per block; 0 = server default (currently 100_000). uint64 block_size = 4; // Optional per-message size cap; 0 = no cap. uint64 max_msg_bytes = 5; // If true, the stream is stored entirely in memory — no redb // file is created. State survives node failover via the // existing replication path but a full-cluster restart loses // it. Matches the NATS JetStream `memory` storage mode. // Immutable after create. bool ephemeral = 6; // Cross-stream sources — this stream pulls messages from each // listed source stream as a tail subscriber and appends them // locally with provenance headers (`waymaker-source-stream`, // `waymaker-source-seq`). See `SOURCES_DESIGN.md`. Slice 1 // accepts at most one entry; the wire is `repeated` for forward // compatibility with slice 2 (multi-source fan-in). repeated StreamSourceConfigPb sources = 7; // Per-subject revision cap. `0` (default) = unbounded — history // bounded only by stream-level retention. When N > 0, after a // successful publish, older messages at that subject beyond the // N most recent are dropped via per-message pruning. Mirrors // NATS JetStream's MaxMsgsPerSubject. Backs KV's max_revisions. uint64 max_msgs_per_subject = 8; } // One source feeding a sourcing stream. Slice 1 honours only // `source_stream`; the remaining fields land in slice 2/3. message StreamSourceConfigPb { string source_stream = 1; // Optional NATS-style filter; empty = pull every subject. // Honoured since slice 2B. string filter_subject = 2; // Start position. 0/0 = pull from beginning (slice 1 default). // start_seq honoured since 2C. start_time_ms reserved (rejected). uint64 start_seq = 3; int64 start_time_ms = 4; // Optional subject rewrite. Slice 3. SubjectTransformPb subject_transform = 5; // Slice 2F: cap on the initial backfill window. When > 0 AND // there's no persisted state for this (sourcing, source), the // tail seeds its watermark at max(0, source.last_seq - // max_initial_backfill) instead of pulling from seq 1. Once // there's persisted state (i.e. after the first batch), this // knob is ignored — the tail resumes from the persisted seq. // Use 0 (default) for "unbounded" (slice 1 behaviour). uint64 max_initial_backfill = 6; // Slice 3: behaviour when the source's retention sweep drops // messages past our last_sourced_seq (we've fallen behind and // the source no longer has the messages we'd next pull). // * ON_DROP_HALT (default, 0): tail surfaces a persistent // error and stops advancing — operator must intervene. // * ON_DROP_SKIP_TO_FIRST_AVAILABLE (1): tail jumps its // watermark to source.first_seq - 1 and resumes, with // a warn event surfaced via last_error for one cycle so // operators can alert on it. OnDropPolicy on_drop = 7; // Slice 3: optional dead-letter stream. When the tail records // an error (subject_transform mismatch, append_failed, // on_drop=halt firing), publish a JSON record describing the // event to this stream so operators can triage without // scraping logs. Empty (default) = no DLQ. string dlq_stream = 8; } enum OnDropPolicy { ON_DROP_HALT = 0; ON_DROP_SKIP_TO_FIRST_AVAILABLE = 1; } message SubjectTransformPb { // NATS-style: e.g. "events.>" with destination "audit.{{wildcard(1)}}". string source_pattern = 1; string destination = 2; } message StreamStatsPb { uint64 last_seq = 1; uint64 msg_count = 2; uint64 bytes = 3; uint64 block_count = 4; // 0 if there are no blocks (empty stream). uint64 first_block = 5; } message MessageHeader { string key = 1; string value = 2; } message MessagePb { uint64 seq = 1; string subject = 2; int64 ts_ms = 3; repeated MessageHeader headers = 4; bytes payload = 5; // Delivery attempt count assigned by the consumer at fetch time. // Populated only for Fetch responses; 0 otherwise. uint32 deliver_count = 6; } enum DeliveryPolicyType { DELIVERY_ALL = 0; DELIVERY_LAST = 1; DELIVERY_BY_START_SEQ = 2; DELIVERY_BY_START_TIME = 3; } message DeliveryPolicyPb { DeliveryPolicyType type = 1; // Used only when type == DELIVERY_BY_START_SEQ. uint64 start_seq = 2; // Used only when type == DELIVERY_BY_START_TIME. Wall-clock ms. int64 start_time_ms = 3; } message ConsumerConfigPb { string name = 1; // Empty = no filter. string filter_subject = 2; DeliveryPolicyPb delivery_policy = 3; // 0 = server default (30s). uint64 ack_wait_ms = 4; // 0 = server default (5). uint32 max_deliver = 5; // Empty = no queue group. string deliver_group = 6; // Phase 2 dead-letter routing. When non-empty, every message // this consumer drops after `max_deliver` attempts is republished // into the same stream under this subject. Original metadata is // preserved as `x-waymaker-dlq-*` headers. Empty = silent drop. // The stream's `subjects_filter` must accept this subject — // operators typically reserve a pattern like `dlq.>` and include // it in the stream's filter. string dead_letter_subject = 7; } message ConsumerStatePb { ConsumerConfigPb config = 1; uint64 ack_floor = 2; uint64 last_delivered = 3; int64 created_at_ms = 4; uint64 redelivered_dropped = 5; } // --------------------------------------------------------------------- // Stream lifecycle // --------------------------------------------------------------------- message CreateStreamRequest { StreamConfigPb config = 1; } message CreateStreamResponse { bool success = 1; string result_code = 2; // "ok" | "already_exists" | "invalid_config" | "internal" string message = 3; } message DeleteStreamRequest { string name = 1; } message DeleteStreamResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "internal" string message = 3; } message GetStreamInfoRequest { string name = 1; } message GetStreamInfoResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" string message = 3; StreamConfigPb config = 4; StreamStatsPb stats = 5; // Phase 3 — if a `stream_authority` override is active for this // stream, the routing claimant + the fence epoch at which it was // committed. Unset when the stream routes via the ring's hash // owner. Useful for operators auditing "why is this stream on // node N when the ring says M?". optional StreamAuthorityOverride authority_override = 6; // The ring's hash owner for this stream (ignoring any override). // When `authority_override` is set and `claimant_node_id != // ring_owner_node_id`, the override is actively redirecting // routing. 0 = the response node couldn't compute the ring owner // (e.g. mid-membership-transition). uint64 ring_owner_node_id = 7; // Phase 3 — `true` when an operator has pinned this stream // (auto-GC will not retire its override even when redundant). bool pinned = 8; // Per-source tail state. Populated when this stream has // `sources` set in its config and the request lands on the // sourcing primary. Empty otherwise. repeated SourceStatusPb sources_status = 9; } message SourceStatusPb { string source_stream = 1; // Last seq successfully appended to the sourcing stream. uint64 last_sourced_seq = 2; // Total messages pulled since the tail task started. uint64 pulled_total = 3; // Most recent error message; empty when healthy. string last_error = 4; int64 last_error_ts_ms = 5; } message StreamAuthorityOverride { uint64 claimant_node_id = 1; uint64 fence_epoch = 2; } message ClearStreamAuthorityRequest { string stream = 1; } message ClearStreamAuthorityResponse { bool success = 1; string result_code = 2; // "ok" | "no_leader" | "internal" string message = 3; } message ListStreamAuthorityOverridesRequest {} message ListStreamAuthorityOverridesResponse { bool success = 1; string result_code = 2; string message = 3; repeated AuthorityOverrideEntry entries = 4; } message AuthorityOverrideEntry { string stream = 1; uint64 claimant_node_id = 2; uint64 fence_epoch = 3; } message SetStreamPinnedRequest { string stream = 1; bool pinned = 2; } message SetStreamPinnedResponse { bool success = 1; string result_code = 2; // "ok" | "no_leader" | "internal" string message = 3; } message ListStreamsRequest {} message ListStreamsResponse { repeated string names = 1; } message GetStreamSourcesRequest {} message GetStreamSourcesResponse { bool success = 1; string result_code = 2; // "ok" string message = 3; repeated GetStreamSourcesEntry entries = 4; } message GetStreamSourcesEntry { string sourcing_stream = 1; string source_stream = 2; uint64 last_sourced_seq = 3; uint64 pulled_total = 4; string last_error = 5; int64 last_error_ts_ms = 6; } // Partial-update of the mutable subset of a stream's config. Fields // that are present are applied; absent fields leave the existing // on-disk value unchanged. Setting a Limits bound's optional to 0 is // a valid way to *clear* that bound (equivalent to "no limit"); to // leave it unchanged, omit the field. Immutable fields (name, // subjects_filter, block_size, retention policy type) are not in // this message — changing them requires a delete + recreate. message UpdateStreamRequest { string name = 1; optional uint64 max_age_ms = 2; optional uint64 max_msgs = 3; optional uint64 max_bytes = 4; optional uint64 max_msg_bytes = 5; optional bool strict_limits = 6; } message UpdateStreamResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "invalid_config" | "immutable_field" | "internal" string message = 3; // Effective config after the update — what the next GetStreamInfo // would return. Useful for clients that want to confirm what // landed without a follow-up round trip. StreamConfigPb config = 4; // Number of messages the primary pruned to bring stats under the // new bounds. 0 = no prune (raise-only update, or already under). // For drift monitoring. uint64 pruned = 5; } // --------------------------------------------------------------------- // Messages // --------------------------------------------------------------------- message PublishRequest { string stream = 1; string subject = 2; bytes payload = 3; repeated MessageHeader headers = 4; // 0 = server uses wall clock. int64 ts_ms = 5; // Optimistic-concurrency hint. When set, the server only // commits the publish if the latest seq at `subject` matches // `expected_last_seq` (use 0 to require "subject has never been // published to"). On mismatch the response carries // `result_code="wrong_revision"` and `seq` = the current actual // last seq at the subject. Absent / unset = no check. optional uint64 expected_last_seq = 6; } message PublishResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "subject_rejected" | "oversize" | "wrong_revision" | "internal" string message = 3; uint64 seq = 4; } message FetchRequest { string stream = 1; string consumer = 2; uint32 batch_size = 3; } message FetchResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "no_such_consumer" | "internal" string message = 3; repeated MessagePb messages = 4; } message AckRequest { string stream = 1; string consumer = 2; uint64 seq = 3; } message AckResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "no_such_consumer" | "internal" string message = 3; } message NakRequest { string stream = 1; string consumer = 2; uint64 seq = 3; // Wall-clock ms to defer redelivery. 0 = eligible immediately. uint64 delay_ms = 4; } message NakResponse { bool success = 1; string result_code = 2; string message = 3; } message TermRequest { string stream = 1; string consumer = 2; uint64 seq = 3; } message TermResponse { bool success = 1; string result_code = 2; string message = 3; } message InProgressRequest { string stream = 1; string consumer = 2; uint64 seq = 3; } message InProgressResponse { bool success = 1; string result_code = 2; string message = 3; } message SubscribeRequest { string stream = 1; string consumer = 2; // How many messages per server-side fetch. Smaller batches // trade throughput for finer-grained per-message latency. 0 // means use the server default (16). uint32 batch_size = 3; // If true, the server tears down the subscription after the // first fetch returns 0 messages (after the initial backlog // drains). Useful for one-shot replays. Default false — keep // the stream open indefinitely and re-fetch on new appends. bool stop_when_empty = 4; } // Server-streamed events on a Subscribe stream. Currently one // variant — a delivered message — with a tail end-of-stream // signal if the client requested `stop_when_empty`. message SubscribeEvent { oneof event { MessagePb message = 1; SubscribeStopped stopped = 2; } } message SubscribeStopped { string reason = 1; } // --------------------------------------------------------------------- // Consumers // --------------------------------------------------------------------- message CreateConsumerRequest { string stream = 1; ConsumerConfigPb config = 2; } message CreateConsumerResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "already_exists" | "invalid_config" | "internal" string message = 3; } message DeleteConsumerRequest { string stream = 1; string consumer = 2; } message DeleteConsumerResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "no_such_consumer" | "internal" string message = 3; } message ListConsumersRequest { string stream = 1; } message ListConsumersResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" string message = 3; repeated ConsumerStatePb consumers = 4; } message GetConsumerInfoRequest { string stream = 1; string consumer = 2; } message GetConsumerInfoResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "no_such_consumer" string message = 3; ConsumerStatePb consumer = 4; } // --------------------------------------------------------------------- // Rebalancing // --------------------------------------------------------------------- message TransferStreamRequest { string name = 1; } // One chunk of redb bytes plus end-of-stream signalling. The body is // either `data` (a chunk of raw bytes — order-preserving via gRPC's // stream ordering) or `summary` (the final marker carrying totals so // the receiver can sanity-check what it got). Implementations should // stream multiple `data` chunks followed by exactly one `summary`. message TransferStreamChunk { oneof body { bytes data = 1; TransferStreamSummary summary = 2; } } message TransferStreamSummary { uint64 total_bytes = 1; // Last seq seen by the source at the moment of transfer — the // receiver re-opens the file and verifies its stats match, surfacing // any transfer corruption as a load failure. uint64 stream_last_seq = 2; } message MigrateStreamRequest { // Stream to acquire. string name = 1; // Node ID currently holding the data. The receiver opens a // `TransferStream` against this node via the existing proxy channel // pool. Must be a current cluster member. uint64 source_node_id = 2; } message MigrateStreamResponse { bool success = 1; string result_code = 2; // "ok" | "source_busy" | "source_unreachable" | "already_exists" | "transfer_corrupted" | "internal" string message = 3; uint64 total_bytes = 4; uint64 stream_last_seq = 5; } // --- Cluster-wide rebalance — Phase 2 --- message GetClusterStreamStatsRequest { // When set, also include per-stream stats (msg_count, bytes, // last_seq) for every stream on every node. Without this the // response carries only per-node aggregates — much smaller, and // sufficient for skew-based planning. bool include_per_stream = 1; // Internal flag set on the fan-out sub-calls. When `true`, the // receiving node skips fanning out to peers and reports only its // own local registry. The orchestrator's outermost call leaves // this `false` so a single round-trip from an operator pulls the // whole cluster's view. Mirrors the lock proxy's `iteration` cap. bool local_only = 2; } // One stream's stats as seen by its primary node. message PerStreamStats { string name = 1; uint64 owner_node_id = 2; uint64 msg_count = 3; uint64 bytes = 4; uint64 last_seq = 5; } // Per-node summary. Bytes/msg counts are summed across the node's // local streams. message PerNodeSummary { uint64 node_id = 1; uint64 stream_count = 2; uint64 total_msg_count = 3; uint64 total_bytes = 4; // "ok" if the node responded; "unreachable" / "node_standby" / // "internal" otherwise. The aggregator still emits a row per // member node so the operator can see which nodes failed to report. string status = 5; } message GetClusterStreamStatsResponse { bool success = 1; string result_code = 2; // "ok" | "no_leader" | "internal" string message = 3; repeated PerNodeSummary nodes = 4; // Populated when the request set `include_per_stream`. repeated PerStreamStats streams = 5; // Cluster-wide totals + skew. `skew_count` = max stream_count - // min stream_count across responding nodes. `skew_bytes` is the // same in bytes. Both are 0 for a perfectly-balanced cluster. uint64 total_stream_count = 6; uint64 total_msg_count = 7; uint64 total_bytes = 8; uint64 skew_count = 9; uint64 skew_bytes = 10; } message RebalancePlanEntry { string name = 1; uint64 target_node_id = 2; } message RebalanceStreamsRequest { repeated RebalancePlanEntry plan = 1; // Per-step `MigrateStream` timeout, in milliseconds. 0 = server // default (currently 30s). uint64 per_step_timeout_ms = 2; } message RebalanceStepOutcome { string name = 1; uint64 target_node_id = 2; bool success = 3; string result_code = 4; // mirrors MigrateStream codes + "skipped_same_node" / "no_source" string message = 5; } message RebalanceStreamsResponse { bool success = 1; // true iff every step succeeded string result_code = 2; // "ok" | "partial" | "no_plan" | "internal" string message = 3; repeated RebalanceStepOutcome steps = 4; } // --- Admin watch — Phase 2 --- message WatchStreamsRequest { // Currently no filters. A future slice can add subject / name // patterns; today every watcher sees every event the node emits. } // Identifies which kind of state change happened. The full // WatchEvent carries one detail oneof matching this type. enum WatchEventType { WATCH_UNKNOWN = 0; WATCH_STREAM_CREATED = 1; WATCH_STREAM_DELETED = 2; WATCH_STREAM_UPDATED = 3; WATCH_CONSUMER_CREATED = 4; WATCH_CONSUMER_DELETED = 5; // Phase 3 — emitted on every apply of StreamAuthorityClaim or // ClearStreamAuthority. `claimant_node_id == 0` in the detail // distinguishes a clear from a set (since 0 isn't a valid node // id). WATCH_STREAM_AUTHORITY_CHANGED = 6; } message StreamWatchDetail { string name = 1; } message ConsumerWatchDetail { string stream = 1; string consumer = 2; } // Detail carried on WATCH_STREAM_AUTHORITY_CHANGED events. // `claimant_node_id == 0` + `fence_epoch == 0` means the override // was cleared (routing reverts to the ring's hash owner); // otherwise the override is now `(claimant, fence_epoch)`. message AuthorityWatchDetail { string stream = 1; uint64 claimant_node_id = 2; uint64 fence_epoch = 3; } // --- KV / subject-state lookups — Phase 3 --- message ReadLatestAtSubjectRequest { string stream = 1; string subject = 2; } message ReadLatestAtSubjectResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "internal" string message = 3; // Unset when no message has ever been published at this // subject. Use the presence of `latest` to distinguish // "subject is empty" from "no such stream" (the latter is in // result_code). optional MessagePb latest = 4; } message ListSubjectsByPrefixRequest { string stream = 1; // Empty prefix matches every subject in the stream. string prefix = 2; } message ListSubjectsByPrefixResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "internal" string message = 3; repeated string subjects = 4; } message ScanExactAtSubjectRequest { string stream = 1; string subject = 2; // Start scanning at seq >= `from_seq`. 0 = scan from the // beginning of the stream. uint64 from_seq = 3; // Cap on returned messages. 0 = server default (1000). uint64 limit = 4; } message ScanExactAtSubjectResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "internal" string message = 3; // Messages at the subject, in seq order. Empty if the subject // has never been published to, or if the limit returned no // results in the requested range. repeated MessagePb messages = 4; } message WatchEvent { WatchEventType type = 1; // Server wall-clock at emit time (ms since epoch). Useful for // ordering across nodes when a client multiplexes watchers. int64 ts_ms = 2; // The watching node's id. For cluster-wide watch built on top of // per-node streams, the client can deduplicate by (node_id, ts_ms, // type, detail). uint64 node_id = 3; oneof detail { StreamWatchDetail stream = 4; ConsumerWatchDetail consumer = 5; AuthorityWatchDetail authority = 7; } // Set when this watcher fell behind the server's broadcast buffer // and missed events. The receiver should treat this as an // explicit "you missed N events" signal — typically by re-listing // the cluster to catch back up. After this event, the stream // continues with fresh events; client need not reconnect. uint64 lagged_count = 6; } // One pending-delivery entry shipped with a replication snapshot. message PendingDeliveryPb { uint64 seq = 1; int64 delivered_at_ms = 2; uint32 deliver_count = 3; } // Full snapshot of one consumer's state at the moment the primary // committed a fetch/ack/create. Includes the immutable config (so a // secondary that has never seen this consumer can reconstruct it // from this message alone), the floor/last_delivered counters, the // active pending set, the create-time wall-clock, and the running // `redelivered_dropped` total. message ConsumerStateSnapshot { string stream = 1; ConsumerConfigPb config = 2; uint64 ack_floor = 3; uint64 last_delivered = 4; int64 created_at_ms = 5; uint64 redelivered_dropped = 6; repeated PendingDeliveryPb pending = 7; // Whether this snapshot represents a deleted consumer — secondaries // remove the (stream, consumer) entry from their replica store // rather than overwriting it. bool tombstone = 8; } message ReplicateConsumerStateRequest { ConsumerStateSnapshot snapshot = 1; } message ReplicateConsumerStateResponse { bool success = 1; string result_code = 2; // "ok" | "internal" string message = 3; } // One snapshot of a source-tail's persisted progress, pushed from // the primary to each secondary after each successful batch. // `tombstone=true` signals "remove this entry" — sent when the // sourcing stream is deleted so secondaries don't keep stale rows // they might adopt later. message SourceTailStateSnapshot { string sourcing_stream = 1; string source_stream = 2; uint64 last_sourced_seq = 3; uint64 pulled_total = 4; int64 updated_ts_ms = 5; bool tombstone = 6; } message ReplicateSourceTailStateRequest { SourceTailStateSnapshot snapshot = 1; } message ReplicateSourceTailStateResponse { bool success = 1; string result_code = 2; // "ok" | "internal" string message = 3; } message ReplicateStreamCreateRequest { // Same shape as CreateStreamRequest's config — the secondary // opens an identical stream in its replica registry so subsequent // ReplicateMessage calls land in a config-matched file. StreamConfigPb config = 1; } message ReplicateStreamCreateResponse { bool success = 1; string result_code = 2; // "ok" | "already_exists" | "invalid_config" | "internal" string message = 3; } message ReplicateMessageRequest { string stream = 1; // The seq the primary assigned. The secondary applies the message // at this exact seq via `apply_replicated_append` (idempotent on // replay, errors on out-of-order or divergence). uint64 seq = 2; string subject = 3; bytes payload = 4; repeated MessageHeader headers = 5; int64 ts_ms = 6; } message ReplicateMessageResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "out_of_order" | "divergence" | "internal" string message = 3; // Receiver's last_seq AFTER applying — primary uses this to detect // when a secondary has fallen behind and needs a `MigrateStream` // re-seed. uint64 receiver_last_seq = 4; } message ReplicateStreamDeleteRequest { string name = 1; } message ReplicateStreamDeleteResponse { bool success = 1; string result_code = 2; // "ok" | "internal" string message = 3; } message ReplicateTruncateRequest { string stream = 1; // Drop every message with seq < first_seq. Also raises the // receiver's `last_seq` to at least `first_seq - 1` so a lagging // secondary aligns with the primary's expected-next-seq for // subsequent replication pushes. uint64 first_seq = 2; } message ReplicateTruncateResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "internal" string message = 3; // Number of messages the secondary actually dropped (0 on a no-op // / idempotent re-call). For drift monitoring. uint64 dropped = 4; } // Mirror of UpdateStreamRequest sent from the primary to each // secondary after a successful UpdateStream. Same partial-update // semantics: absent fields leave the secondary's on-disk value // unchanged. The accompanying prune (if any) is replicated via the // existing ReplicateTruncate path — this message carries only the // config change. message ReplicateStreamUpdateRequest { string name = 1; optional uint64 max_age_ms = 2; optional uint64 max_msgs = 3; optional uint64 max_bytes = 4; optional uint64 max_msg_bytes = 5; optional bool strict_limits = 6; } message ReplicateStreamUpdateResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "internal" string message = 3; } message ReplicateWorkQueueAckRequest { string stream = 1; uint64 seq = 2; } message ReplicateWorkQueueAckResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_stream" | "internal" string message = 3; // Whether the secondary's replica had the seq present before the // delete (the operation is idempotent, so `false` here is normal // for a retry / late-arriving call). bool was_present = 4; } // ---- Phase 4 — Object Store messages ---- // Metadata about a stored object. Sent back on Get/Info/List; the // server reconstructs this from the `objm.` message body // (JSON-encoded) plus the message seq. Treat this message as a // blob description, not a payload — payload is fetched via // GetObject. message ObjectInfo { // Object name (the part after the bucket prefix). string name = 1; // Total payload bytes across all chunks (after assembly). uint64 total_bytes = 2; // Bytes per chunk (last chunk may be smaller). 0 for empty // objects. uint64 chunk_size = 3; // Number of `objc..` messages required to reconstitute // the payload. 0 for empty objects. uint64 chunk_count = 4; // SHA-256 of the assembled payload, hex-encoded. Set by the // server; verified by Get. string sha256 = 5; // Server wall-clock at metadata-publish time (ms since epoch). int64 ts_ms = 6; // Opaque headers the client attached at Put time. Preserved // verbatim on Get. repeated MessageHeader headers = 7; // The metadata message's seq number — doubles as the object // revision id. A second Put with the same name bumps it. uint64 metadata_seq = 8; // Phase 5 — `true` when the object was Put with `dedupe=true`. // Chunks are stored at `obj_chunk.` (shared across // objects in the bucket); `false` for legacy `objc..`. bool deduped = 9; } message PutObjectRequest { string bucket = 1; string name = 2; bytes payload = 3; // Bytes per chunk. 0 = server default (1 MiB). uint64 chunk_size = 4; // Optional headers — preserved verbatim in the metadata blob. repeated MessageHeader headers = 5; // Optional pre-computed SHA-256 hex; the server verifies after // chunking + before publishing metadata. Empty = the server // computes its own hash from the payload. string sha256 = 6; // Phase 5 cross-object dedupe. When set, each chunk is hashed // and stored at the content-addressed subject `obj_chunk.`; // identical content across objects shares storage. Metadata // records the chunk hashes in order so Get can re-assemble. // See `waymaker-streams/DEDUPE_DESIGN.md`. bool dedupe = 7; } message PutObjectResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_bucket" | "internal" | "sha_mismatch" string message = 3; ObjectInfo info = 4; } // Streaming Put — first frame sets `start`; subsequent frames // carry `data`. Each non-empty `data` becomes one chunk message // in seq order. Last frame sets `finish=true` so the server // commits metadata; closing the stream without `finish=true` // leaves the upload aborted (orphan chunks). message PutObjectStreamFrame { optional PutObjectStart start = 1; bytes data = 2; bool finish = 3; } message PutObjectStart { string bucket = 1; string name = 2; // Bytes per chunk. 0 = server default. Note: with streaming Put // the client controls chunk boundaries by frame size — this // field is purely metadata-recorded, not used to re-chunk. uint64 chunk_size = 3; repeated MessageHeader headers = 4; // Optional SHA-256 hex. Server verifies against the running // hash before committing metadata; mismatch aborts the Put // (chunks already published become orphan; GC reclaims them). string sha256 = 5; // Phase 5 cross-object dedupe. When set, each chunk is hashed // and stored at the content-addressed subject `obj_chunk.`; // identical content across objects shares storage. See // `waymaker-streams/DEDUPE_DESIGN.md`. bool dedupe = 6; } message GetObjectRequest { string bucket = 1; string name = 2; } message GetObjectResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_bucket" | "no_such_object" | "incomplete" | "internal" | "sha_mismatch" string message = 3; ObjectInfo info = 4; bytes payload = 5; } // Streaming Get — first frame carries `info` (metadata only, no // data); subsequent frames carry `data` (one per chunk). // Final frame sets `done=true`. The server stops streaming on // the first error; in particular `sha_mismatch` is sent as a // gRPC Status (Aborted), not as a result_code in a frame. message GetObjectStreamFrame { optional ObjectInfo info = 1; bytes data = 2; bool done = 3; } message DeleteObjectRequest { string bucket = 1; string name = 2; } message DeleteObjectResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_bucket" | "internal" string message = 3; // Tombstone metadata seq, useful for client confirmations. uint64 tombstone_seq = 4; } message GetObjectInfoRequest { string bucket = 1; string name = 2; } message GetObjectInfoResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_bucket" | "no_such_object" | "internal" string message = 3; // Unset when the object name has no live metadata (never put, // or tombstoned). optional ObjectInfo info = 4; // True if the latest metadata is a tombstone (logical delete). bool deleted = 5; } message ListObjectsRequest { string bucket = 1; // Optional name prefix filter (no leading `objm.` — pass just // the object-name prefix). string name_prefix = 2; // Include tombstoned entries? Default false. bool include_deleted = 3; } message ListObjectsResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_bucket" | "internal" string message = 3; repeated ObjectListEntry entries = 4; } message ObjectListEntry { string name = 1; uint64 total_bytes = 2; bool deleted = 3; } message ListObjectRevisionsRequest { string bucket = 1; string name = 2; // Start scanning at metadata seq >= `from_seq`. 0 = beginning. uint64 from_seq = 3; // Cap on returned revisions. 0 = server default (100). uint64 limit = 4; } message ListObjectRevisionsResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_bucket" | "internal" string message = 3; repeated ObjectRevisionEntry revisions = 4; } message GetObjectRangeRequest { string bucket = 1; string name = 2; uint64 offset = 3; // Bytes to return. 0 = whole tail (`total_bytes - offset`). uint64 len = 4; } message GetObjectRangeResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_bucket" | "no_such_object" | "incomplete" | "internal" string message = 3; // The full object's info (size, hash, etc.). Useful for the // client to know the total size when paginating. ObjectInfo info = 4; // Bytes [offset, offset + actual_len). `actual_len` may be less // than the requested `len` when the range extends past the // object's end. uint64 actual_offset = 5; bytes payload = 6; } message ObjectRevisionEntry { // Metadata message seq — doubles as the revision id. uint64 metadata_seq = 1; // Always present, including for tombstones (where `deleted=true` // and the other fields fall back to 0/empty). bool deleted = 2; uint64 total_bytes = 3; uint64 chunk_count = 4; string sha256 = 5; int64 ts_ms = 6; } // ===== KV ===================================================== message KvCreateBucketRequest { string bucket = 1; uint64 max_bytes = 2; // 0 = unbounded uint64 max_value_size = 3; // 0 = no per-value cap // Bucket-level TTL (ms). 0 = no time-based eviction. // Bucket-level TTL is independent of per-key TTL set via KvPut. uint64 max_age_ms = 4; bool ephemeral = 5; } message KvCreateBucketResponse { bool success = 1; string result_code = 2; // "ok" | "already_exists" | "invalid_config" | "internal" string message = 3; } message KvDeleteBucketRequest { string bucket = 1; } message KvDeleteBucketResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_bucket" | "internal" string message = 3; } message KvPutRequest { string bucket = 1; string key = 2; bytes value = 3; // Per-key TTL in milliseconds. 0 = no TTL. uint64 ttl_ms = 4; } message KvCreateRequest { string bucket = 1; string key = 2; bytes value = 3; uint64 ttl_ms = 4; } message KvUpdateRequest { string bucket = 1; string key = 2; bytes value = 3; // The revision the caller believes is current. Server returns // wrong_revision if mismatch. uint64 expected_revision = 4; uint64 ttl_ms = 5; } message KvPutResponse { bool success = 1; // "ok" | "no_such_bucket" | "wrong_revision" | "invalid_key" | "internal" string result_code = 2; string message = 3; // Assigned revision (stream sequence) of the newly-written // value. On wrong_revision, this is the *current* server-side // revision the caller can retry against. uint64 revision = 4; } message KvGetRequest { string bucket = 1; string key = 2; } message KvGetResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_bucket" | "internal" string message = 3; // Unset when the key has no value or is tombstoned. optional KvEntry entry = 4; } message KvEntry { bytes value = 1; uint64 revision = 2; int64 ts_ms = 3; } message KvDeleteRequest { string bucket = 1; string key = 2; } message KvDeleteResponse { bool success = 1; string result_code = 2; // "ok" | "no_such_bucket" | "internal" string message = 3; uint64 revision = 4; } message KvKeysRequest { string bucket = 1; } message KvKeysResponse { bool success = 1; string result_code = 2; string message = 3; repeated KvKeyEntry entries = 4; } message KvKeyEntry { string key = 1; uint64 revision = 2; // True if the latest message at this key is a tombstone. bool deleted = 3; } message KvHistoryRequest { string bucket = 1; string key = 2; uint64 from_revision = 3; // 0 = from beginning uint64 limit = 4; // 0 = server default } message KvHistoryResponse { bool success = 1; string result_code = 2; string message = 3; repeated KvHistoryEntry entries = 4; } message KvHistoryEntry { bytes value = 1; uint64 revision = 2; int64 ts_ms = 3; bool deleted = 4; } message KvTouchRequest { string bucket = 1; string key = 2; uint64 ttl_ms = 3; } message KvWatchRequest { string bucket = 1; // Empty = watch every key in the bucket. Non-empty = watch only // this key. string key = 2; } message KvWatchEvent { oneof event { KvPutEvent put = 1; KvDeleteEvent delete = 2; } } message KvPutEvent { string key = 1; bytes value = 2; uint64 revision = 3; int64 ts_ms = 4; } message KvDeleteEvent { string key = 1; uint64 revision = 2; int64 ts_ms = 3; } // ===== Cache::Hash ============================================ message CreateHashStoreRequest { string name = 1; uint64 max_bytes = 2; bool ephemeral = 3; } message CreateHashStoreResponse { bool success = 1; string result_code = 2; string message = 3; } message DeleteHashStoreRequest { string name = 1; } message DeleteHashStoreResponse { bool success = 1; string result_code = 2; string message = 3; } message HashSetRequest { string bucket = 1; string hash_key = 2; string field = 3; bytes value = 4; } message HashSetResponse { bool success = 1; string result_code = 2; string message = 3; uint64 revision = 4; } message HashGetRequest { string bucket = 1; string hash_key = 2; string field = 3; } message HashGetResponse { bool success = 1; string result_code = 2; string message = 3; // Unset when the field has no value or is tombstoned. optional bytes value = 4; uint64 revision = 5; } message HashExistsRequest { string bucket = 1; string hash_key = 2; string field = 3; } message HashExistsResponse { bool success = 1; string result_code = 2; string message = 3; bool exists = 4; } message HashDeleteRequest { string bucket = 1; string hash_key = 2; string field = 3; } message HashDeleteResponse { bool success = 1; string result_code = 2; string message = 3; } message HashGetAllRequest { string bucket = 1; string hash_key = 2; } message HashGetAllResponse { bool success = 1; string result_code = 2; string message = 3; repeated HashFieldEntry entries = 4; } message HashFieldEntry { string field = 1; bytes value = 2; uint64 revision = 3; } message HashFieldsRequest { string bucket = 1; string hash_key = 2; } message HashFieldsResponse { bool success = 1; string result_code = 2; string message = 3; repeated string fields = 4; } message HashLenRequest { string bucket = 1; string hash_key = 2; } message HashLenResponse { bool success = 1; string result_code = 2; string message = 3; uint64 count = 4; } // ===== Cache::Set ============================================= message CreateSetStoreRequest { string name = 1; uint64 max_bytes = 2; bool ephemeral = 3; } message CreateSetStoreResponse { bool success = 1; string result_code = 2; string message = 3; } message DeleteSetStoreRequest { string name = 1; } message DeleteSetStoreResponse { bool success = 1; string result_code = 2; string message = 3; } message SetAddRequest { string bucket = 1; string set_key = 2; string member = 3; } message SetAddResponse { bool success = 1; string result_code = 2; string message = 3; } message SetRemoveRequest { string bucket = 1; string set_key = 2; string member = 3; } message SetRemoveResponse { bool success = 1; string result_code = 2; string message = 3; } message SetIsMemberRequest { string bucket = 1; string set_key = 2; string member = 3; } message SetIsMemberResponse { bool success = 1; string result_code = 2; string message = 3; bool is_member = 4; } message SetMembersRequest { string bucket = 1; string set_key = 2; } message SetMembersResponse { bool success = 1; string result_code = 2; string message = 3; repeated string members = 4; } message SetLenRequest { string bucket = 1; string set_key = 2; } message SetLenResponse { bool success = 1; string result_code = 2; string message = 3; uint64 count = 4; } // ===== Cache::Queue =========================================== message CreateQueueRequest { string name = 1; uint64 max_bytes = 2; uint64 max_messages = 3; bool ephemeral = 4; } message CreateQueueResponse { bool success = 1; string result_code = 2; string message = 3; } message DeleteQueueRequest { string name = 1; } message DeleteQueueResponse { bool success = 1; string result_code = 2; string message = 3; } message QueuePushRequest { string bucket = 1; bytes value = 2; } message QueuePushResponse { bool success = 1; string result_code = 2; string message = 3; uint64 sequence = 4; } message QueuePopRequest { string bucket = 1; } message QueuePopResponse { bool success = 1; string result_code = 2; string message = 3; // Unset when the queue is empty. optional bytes value = 4; } message QueueRangeRequest { string bucket = 1; uint64 from_sequence = 2; uint64 limit = 3; } message QueueRangeResponse { bool success = 1; string result_code = 2; string message = 3; repeated bytes values = 4; } message QueueLenRequest { string bucket = 1; } message QueueLenResponse { bool success = 1; string result_code = 2; string message = 3; uint64 count = 4; }