waymaker-client/go/sketches.go

502 lines
14 KiB
Go

package waymaker
// Sketches subsystem — Bloom filter, HyperLogLog, Count-Min Sketch, Top-K,
// t-digest. Thin RPC bindings over WaymakerSketchesService.
//
// Entry points on *Client:
// Bloom: CreateBloom / BloomHandle(name) / DeleteBloom
// HLL: CreateHLL / HLLHandle(name) / DeleteHLL
// CMS: CreateCMS / CMSHandle(name) / DeleteCMS
// TopK: CreateTopK / TopKHandle(name) / DeleteTopK
// TDigest: CreateTDigest / TDigestHandle(name) / DeleteTDigest
import (
"context"
pb "git.awesomike.com/pub/waymaker-client/go/genpb/sketches"
)
// ============================================================
// Bloom filter
// ============================================================
// BloomConfig is the Bloom filter creation config.
// Each field is forwarded unchanged to the BloomReserve RPC by CreateBloom.
type BloomConfig struct {
// Name identifies the filter on the server; the returned handle is bound to it.
Name string
// Capacity is sent as BloomReserveRequest.Capacity (presumably the expected
// number of items — confirm against the server documentation).
Capacity uint64
// ErrorRate is the target false-positive rate (e.g. 0.01 for 1%).
// 0 = server default (0.01).
ErrorRate float64
}
// BloomInfo is diagnostic information about a Bloom filter.
// Bloom.Info fills every field from the matching field of the BloomInfo RPC
// response. The interpretations below are inferred from the field names;
// NOTE(review): confirm them against the server documentation.
type BloomInfo struct {
// Capacity is the capacity the filter reports (presumably as reserved).
Capacity uint64
// ErrorRate is the false-positive rate the filter reports.
ErrorRate float64
// BitsSet is presumably the number of bits currently set to 1.
BitsSet uint64
// BitCount is presumably the total size of the bit array.
BitCount uint64
// HashCount is presumably the number of hash functions per item.
HashCount uint32
// ItemsAdded is presumably the number of add operations recorded by the server.
ItemsAdded uint64
}
// Bloom is a reference to a Bloom filter.
// It holds no filter data locally. Every method issues an RPC through
// client against the server-side filter identified by Name.
type Bloom struct {
// client supplies the sketches RPC stub via sketchesClient().
client *Client
// Name is the server-side filter name sent with every request.
Name string
}
// newBloom builds a Bloom handle bound to client c and filter name.
// It performs no RPC.
func newBloom(c *Client, name string) *Bloom {
	handle := new(Bloom)
	handle.client = c
	handle.Name = name
	return handle
}
// Add inserts item into the filter via the BloomAdd RPC.
// A transport failure is returned through rpcErr. A reply with Success=false
// is returned through serverErr with the server's result code and message.
func (b *Bloom) Add(ctx context.Context, item []byte) error {
	svc := b.client.sketchesClient()
	resp, err := svc.BloomAdd(ctx, &pb.BloomAddRequest{Name: b.Name, Item: item})
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return nil
	}
}
// AddMany inserts every element of items in a single BloomMultiAdd RPC.
// Errors are mapped exactly as in Add: rpcErr for transport failures and
// serverErr for replies with Success=false.
func (b *Bloom) AddMany(ctx context.Context, items [][]byte) error {
	svc := b.client.sketchesClient()
	req := &pb.BloomMultiAddRequest{Name: b.Name, Items: items}
	resp, err := svc.BloomMultiAdd(ctx, req)
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return nil
	}
}
// Exists asks the server (BloomExists RPC) whether item may be in the filter.
// true means probably present; false means definitely absent.
// On any error the boolean result is false.
func (b *Bloom) Exists(ctx context.Context, item []byte) (bool, error) {
	svc := b.client.sketchesClient()
	req := &pb.BloomExistsRequest{Name: b.Name, Item: item}
	resp, err := svc.BloomExists(ctx, req)
	switch {
	case err != nil:
		return false, rpcErr(err)
	case !resp.GetSuccess():
		return false, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return resp.GetExists(), nil
	}
}
// ExistsMany checks membership for all of items in one BloomMultiExists RPC.
// It returns the server's per-item results as-is; the slice is nil on error.
func (b *Bloom) ExistsMany(ctx context.Context, items [][]byte) ([]bool, error) {
	svc := b.client.sketchesClient()
	req := &pb.BloomMultiExistsRequest{Name: b.Name, Items: items}
	resp, err := svc.BloomMultiExists(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return resp.GetExists(), nil
	}
}
// Info fetches diagnostic counters for the filter via the BloomInfo RPC.
// Each BloomInfo field is copied from the response field of the same name.
// On error the zero BloomInfo is returned.
func (b *Bloom) Info(ctx context.Context) (BloomInfo, error) {
	svc := b.client.sketchesClient()
	resp, err := svc.BloomInfo(ctx, &pb.BloomInfoRequest{Name: b.Name})
	if err != nil {
		return BloomInfo{}, rpcErr(err)
	}
	if ok := resp.GetSuccess(); !ok {
		return BloomInfo{}, serverErr(resp.GetResultCode(), resp.GetMessage())
	}
	var info BloomInfo
	info.Capacity = resp.GetCapacity()
	info.ErrorRate = resp.GetErrorRate()
	info.BitsSet = resp.GetBitsSet()
	info.BitCount = resp.GetBitCount()
	info.HashCount = resp.GetHashCount()
	info.ItemsAdded = resp.GetItemsAdded()
	return info, nil
}
// CreateBloom reserves a Bloom filter on the server via the BloomReserve RPC.
// Name, Capacity and ErrorRate from config are forwarded unchanged. On success
// it returns a handle bound to config.Name; on failure the handle is nil.
func (c *Client) CreateBloom(ctx context.Context, config BloomConfig) (*Bloom, error) {
	svc := c.sketchesClient()
	req := &pb.BloomReserveRequest{
		Name:      config.Name,
		Capacity:  config.Capacity,
		ErrorRate: config.ErrorRate,
	}
	resp, err := svc.BloomReserve(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return newBloom(c, config.Name), nil
	}
}
// BloomHandle returns a handle to the Bloom filter called name.
// It does not contact the server, so the filter's existence is not checked.
func (c *Client) BloomHandle(name string) *Bloom {
	handle := newBloom(c, name)
	return handle
}
// DeleteBloom removes the Bloom filter called name via the BloomDelete RPC.
// Transport failures go through rpcErr; Success=false replies go through serverErr.
func (c *Client) DeleteBloom(ctx context.Context, name string) error {
	svc := c.sketchesClient()
	resp, err := svc.BloomDelete(ctx, &pb.BloomDeleteRequest{Name: name})
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return nil
	}
}
// ============================================================
// HyperLogLog
// ============================================================
// HLLConfig is the HLL creation config.
// CreateHLL forwards both fields unchanged in an HllReserve RPC.
type HLLConfig struct {
// Name identifies the HLL on the server; the returned handle is bound to it.
Name string
// Precision controls register count: 2^Precision. Valid range 4..18.
// 0 = server default (14, ~1% error).
Precision uint32
}
// HLL is a reference to a HyperLogLog.
// It keeps no registers locally. Every method issues an RPC through client
// against the server-side HLL identified by Name.
type HLL struct {
// client supplies the sketches RPC stub via sketchesClient().
client *Client
// Name is the server-side HLL name sent with every request.
Name string
}
// newHLL builds an HLL handle bound to client c and sketch name.
// It performs no RPC.
func newHLL(c *Client, name string) *HLL {
	handle := new(HLL)
	handle.client = c
	handle.Name = name
	return handle
}
// Add feeds items into the HyperLogLog in a single HllAdd RPC.
// Transport failures go through rpcErr; Success=false replies go through serverErr.
func (h *HLL) Add(ctx context.Context, items [][]byte) error {
	svc := h.client.sketchesClient()
	resp, err := svc.HllAdd(ctx, &pb.HllAddRequest{Name: h.Name, Items: items})
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return nil
	}
}
// Count returns the server's cardinality estimate (HllCount RPC).
// The estimate is 0 whenever an error is returned.
func (h *HLL) Count(ctx context.Context) (uint64, error) {
	svc := h.client.sketchesClient()
	resp, err := svc.HllCount(ctx, &pb.HllCountRequest{Name: h.Name})
	switch {
	case err != nil:
		return 0, rpcErr(err)
	case !resp.GetSuccess():
		return 0, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return resp.GetEstimate(), nil
	}
}
// MergeFrom merges the HLLs named in sources into this one (union of
// registers) via an HllMerge RPC. This HLL's name is sent as Destination.
func (h *HLL) MergeFrom(ctx context.Context, sources []string) error {
	svc := h.client.sketchesClient()
	req := &pb.HllMergeRequest{Destination: h.Name, Sources: sources}
	resp, err := svc.HllMerge(ctx, req)
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return nil
	}
}
// CreateHLL reserves a HyperLogLog on the server via the HllReserve RPC.
// Name and Precision from config are forwarded unchanged. On success it
// returns a handle bound to config.Name; on failure the handle is nil.
func (c *Client) CreateHLL(ctx context.Context, config HLLConfig) (*HLL, error) {
	svc := c.sketchesClient()
	req := &pb.HllReserveRequest{Name: config.Name, Precision: config.Precision}
	resp, err := svc.HllReserve(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return newHLL(c, config.Name), nil
	}
}
// HLLHandle returns a handle to the HyperLogLog called name.
// It does not contact the server, so the sketch's existence is not checked.
func (c *Client) HLLHandle(name string) *HLL {
	handle := newHLL(c, name)
	return handle
}
// DeleteHLL removes the HyperLogLog called name via the HllDelete RPC.
// Transport failures go through rpcErr; Success=false replies go through serverErr.
func (c *Client) DeleteHLL(ctx context.Context, name string) error {
	svc := c.sketchesClient()
	resp, err := svc.HllDelete(ctx, &pb.HllDeleteRequest{Name: name})
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return nil
	}
}
// ============================================================
// Count-Min Sketch
// ============================================================
// CMSConfig is the CMS creation config.
// CreateCMS forwards all fields unchanged in a CmsReserve RPC.
type CMSConfig struct {
// Name identifies the sketch on the server; the returned handle is bound to it.
Name string
Width uint64 // 0 = server default
Depth uint64 // 0 = server default
}
// CMSIncrItem is one (item, count) pair for CMS.Incr.
// Incr converts each pair to a pb.CmsIncrByItem with the same field values.
type CMSIncrItem struct {
// Item is the key whose counter is incremented.
Item []byte
// Count is the amount to add to Item's counter.
Count uint64
}
// CMS is a reference to a Count-Min Sketch.
// It keeps no counters locally. Every method issues an RPC through client
// against the server-side sketch identified by Name.
type CMS struct {
// client supplies the sketches RPC stub via sketchesClient().
client *Client
// Name is the server-side sketch name sent with every request.
Name string
}
// newCMS builds a CMS handle bound to client c and sketch name.
// It performs no RPC.
func newCMS(c *Client, name string) *CMS {
	handle := new(CMS)
	handle.client = c
	handle.Name = name
	return handle
}
// Incr adds each item's Count to its counter in a single CmsIncrBy RPC.
// It returns the server-provided counts, expected to be one estimate per
// input item. The slice is nil on error.
func (c *CMS) Incr(ctx context.Context, items []CMSIncrItem) ([]uint64, error) {
	svc := c.client.sketchesClient()
	req := &pb.CmsIncrByRequest{
		Name:  c.Name,
		Items: make([]*pb.CmsIncrByItem, 0, len(items)),
	}
	for idx := range items {
		req.Items = append(req.Items, &pb.CmsIncrByItem{
			Item:  items[idx].Item,
			Count: items[idx].Count,
		})
	}
	resp, err := svc.CmsIncrBy(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return resp.GetCounts(), nil
	}
}
// Query returns the sketch's estimated frequencies for items (CmsQuery RPC).
// The counts are passed through exactly as the server returns them.
func (c *CMS) Query(ctx context.Context, items [][]byte) ([]uint64, error) {
	svc := c.client.sketchesClient()
	req := &pb.CmsQueryRequest{Name: c.Name, Items: items}
	resp, err := svc.CmsQuery(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return resp.GetCounts(), nil
	}
}
// CreateCMS reserves a Count-Min Sketch on the server via the CmsReserve RPC.
// Name, Width and Depth from config are forwarded unchanged. On success it
// returns a handle bound to config.Name; on failure the handle is nil.
func (c *Client) CreateCMS(ctx context.Context, config CMSConfig) (*CMS, error) {
	svc := c.sketchesClient()
	req := &pb.CmsReserveRequest{
		Name:  config.Name,
		Width: config.Width,
		Depth: config.Depth,
	}
	resp, err := svc.CmsReserve(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return newCMS(c, config.Name), nil
	}
}
// CMSHandle returns a handle to the Count-Min Sketch called name.
// It does not contact the server, so the sketch's existence is not checked.
func (c *Client) CMSHandle(name string) *CMS {
	handle := newCMS(c, name)
	return handle
}
// DeleteCMS removes the Count-Min Sketch called name via the CmsDelete RPC.
// Transport failures go through rpcErr; Success=false replies go through serverErr.
func (c *Client) DeleteCMS(ctx context.Context, name string) error {
	svc := c.sketchesClient()
	resp, err := svc.CmsDelete(ctx, &pb.CmsDeleteRequest{Name: name})
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return nil
	}
}
// ============================================================
// Top-K
// ============================================================
// TopKConfig is the Top-K creation config.
// CreateTopK forwards all fields unchanged in a TopKReserve RPC.
type TopKConfig struct {
// Name identifies the sketch on the server; the returned handle is bound to it.
Name string
// K is presumably the number of heavy hitters to track — confirm with server docs.
K uint32
Width uint64 // 0 = server default
Depth uint64 // 0 = server default
Decay float64 // 0 = server default
}
// TopKListEntry is one (item, count) in the top-K list.
// TopK.List copies both fields from the corresponding server response entry.
type TopKListEntry struct {
// Item is the tracked key.
Item []byte
// Count is the count the server reports for Item.
Count uint64
}
// TopK is a reference to a Top-K sketch.
// It keeps no sketch data locally. Every method issues an RPC through client
// against the server-side sketch identified by Name.
type TopK struct {
// client supplies the sketches RPC stub via sketchesClient().
client *Client
// Name is the server-side sketch name sent with every request.
Name string
}
// newTopK builds a TopK handle bound to client c and sketch name.
// It performs no RPC.
func newTopK(c *Client, name string) *TopK {
	handle := new(TopK)
	handle.client = c
	handle.Name = name
	return handle
}
// Add records items in the Top-K sketch via a TopKAdd RPC.
// It returns the server's evicted list, expected to hold one entry per input
// item, with empty bytes where nothing was evicted. The slice is nil on error.
func (t *TopK) Add(ctx context.Context, items [][]byte) ([][]byte, error) {
	svc := t.client.sketchesClient()
	resp, err := svc.TopKAdd(ctx, &pb.TopKAddRequest{Name: t.Name, Items: items})
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return resp.GetEvicted(), nil
	}
}
// Query reports, for each of items, whether it is currently in the top-K
// list (TopKQuery RPC). The flags are returned as the server sends them.
func (t *TopK) Query(ctx context.Context, items [][]byte) ([]bool, error) {
	svc := t.client.sketchesClient()
	req := &pb.TopKQueryRequest{Name: t.Name, Items: items}
	resp, err := svc.TopKQuery(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return resp.GetInTopK(), nil
	}
}
// List fetches the current top-K entries (TopKList RPC), preserving the
// server's order. It converts each entry to a TopKListEntry. On success the
// result is non-nil, even when empty.
func (t *TopK) List(ctx context.Context) ([]TopKListEntry, error) {
	svc := t.client.sketchesClient()
	resp, err := svc.TopKList(ctx, &pb.TopKListRequest{Name: t.Name})
	if err != nil {
		return nil, rpcErr(err)
	}
	if ok := resp.GetSuccess(); !ok {
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	}
	entries := resp.GetEntries()
	result := make([]TopKListEntry, 0, len(entries))
	for _, entry := range entries {
		result = append(result, TopKListEntry{
			Item:  entry.GetItem(),
			Count: entry.GetCount(),
		})
	}
	return result, nil
}
// CreateTopK reserves a Top-K sketch on the server via the TopKReserve RPC.
// Every field of config is forwarded unchanged. On success it returns a handle
// bound to config.Name; on failure the handle is nil.
func (c *Client) CreateTopK(ctx context.Context, config TopKConfig) (*TopK, error) {
	svc := c.sketchesClient()
	req := &pb.TopKReserveRequest{
		Name:  config.Name,
		K:     config.K,
		Width: config.Width,
		Depth: config.Depth,
		Decay: config.Decay,
	}
	resp, err := svc.TopKReserve(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return newTopK(c, config.Name), nil
	}
}
// TopKHandle returns a handle to the Top-K sketch called name.
// It does not contact the server, so the sketch's existence is not checked.
func (c *Client) TopKHandle(name string) *TopK {
	handle := newTopK(c, name)
	return handle
}
// DeleteTopK removes the Top-K sketch called name via the TopKDelete RPC.
// Transport failures go through rpcErr; Success=false replies go through serverErr.
func (c *Client) DeleteTopK(ctx context.Context, name string) error {
	svc := c.sketchesClient()
	resp, err := svc.TopKDelete(ctx, &pb.TopKDeleteRequest{Name: name})
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return nil
	}
}
// ============================================================
// t-digest
// ============================================================
// TDigestConfig is the t-digest creation config.
// CreateTDigest forwards both fields unchanged in a TDigestCreate RPC.
type TDigestConfig struct {
// Name identifies the digest on the server; the returned handle is bound to it.
Name string
Compression uint32 // 0 = server default
}
// TDigest is a reference to a t-digest quantile sketch.
// It keeps no centroids locally. Every method issues an RPC through client
// against the server-side digest identified by Name.
type TDigest struct {
// client supplies the sketches RPC stub via sketchesClient().
client *Client
// Name is the server-side digest name sent with every request.
Name string
}
// newTDigest builds a TDigest handle bound to client c and digest name.
// It performs no RPC.
func newTDigest(c *Client, name string) *TDigest {
	handle := new(TDigest)
	handle.client = c
	handle.Name = name
	return handle
}
// Add feeds values into the t-digest in a single TDigestAdd RPC.
// Transport failures go through rpcErr; Success=false replies go through serverErr.
func (t *TDigest) Add(ctx context.Context, values []float64) error {
	svc := t.client.sketchesClient()
	req := &pb.TDigestAddRequest{Name: t.Name, Values: values}
	resp, err := svc.TDigestAdd(ctx, req)
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return nil
	}
}
// Quantile asks the server for value estimates at each requested quantile
// (TDigestQuantile RPC). The server is expected to return one value per
// input quantile. The slice is nil on error.
func (t *TDigest) Quantile(ctx context.Context, quantiles []float64) ([]float64, error) {
	svc := t.client.sketchesClient()
	req := &pb.TDigestQuantileRequest{Name: t.Name, Quantiles: quantiles}
	resp, err := svc.TDigestQuantile(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return resp.GetValues(), nil
	}
}
// MinMax returns the minimum and maximum values observed by the t-digest
// (TDigestMinMax RPC).
//
// The named results are minVal/maxVal rather than min/max so they do not
// shadow Go's predeclared min and max builtins (Go 1.21+). On failure both
// values are 0. err is the result of rpcErr for transport failures, or of
// serverErr when the reply has Success=false.
func (t *TDigest) MinMax(ctx context.Context) (minVal, maxVal float64, err error) {
	svc := t.client.sketchesClient()
	resp, callErr := svc.TDigestMinMax(ctx, &pb.TDigestMinMaxRequest{Name: t.Name})
	if callErr != nil {
		return 0, 0, rpcErr(callErr)
	}
	if !resp.GetSuccess() {
		return 0, 0, serverErr(resp.GetResultCode(), resp.GetMessage())
	}
	return resp.GetMin(), resp.GetMax(), nil
}
// CreateTDigest creates a t-digest on the server via the TDigestCreate RPC.
// Name and Compression from config are forwarded unchanged. On success it
// returns a handle bound to config.Name; on failure the handle is nil.
func (c *Client) CreateTDigest(ctx context.Context, config TDigestConfig) (*TDigest, error) {
	svc := c.sketchesClient()
	req := &pb.TDigestCreateRequest{Name: config.Name, Compression: config.Compression}
	resp, err := svc.TDigestCreate(ctx, req)
	switch {
	case err != nil:
		return nil, rpcErr(err)
	case !resp.GetSuccess():
		return nil, serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return newTDigest(c, config.Name), nil
	}
}
// TDigestHandle returns a handle to the t-digest called name.
// It does not contact the server, so the digest's existence is not checked.
func (c *Client) TDigestHandle(name string) *TDigest {
	handle := newTDigest(c, name)
	return handle
}
// DeleteTDigest removes the t-digest called name via the TDigestDelete RPC.
// Transport failures go through rpcErr; Success=false replies go through serverErr.
func (c *Client) DeleteTDigest(ctx context.Context, name string) error {
	svc := c.sketchesClient()
	resp, err := svc.TDigestDelete(ctx, &pb.TDigestDeleteRequest{Name: name})
	switch {
	case err != nil:
		return rpcErr(err)
	case !resp.GetSuccess():
		return serverErr(resp.GetResultCode(), resp.GetMessage())
	default:
		return nil
	}
}