package waymaker // Sketches subsystem — Bloom filter, HyperLogLog, Count-Min Sketch, Top-K, // t-digest. Thin RPC bindings over WaymakerSketchesService. // // Entry points on *Client: // Bloom: CreateBloom / Bloom(name) / DeleteBloom // HLL: CreateHLL / HLL(name) / DeleteHLL // CMS: CreateCMS / CMS(name) / DeleteCMS // TopK: CreateTopK / TopK(name) / DeleteTopK // TDigest: CreateTDigest / TDigest(name) / DeleteTDigest import ( "context" pb "git.awesomike.com/pub/waymaker-client/go/genpb/sketches" ) // ============================================================ // Bloom filter // ============================================================ // BloomConfig is the Bloom filter creation config. type BloomConfig struct { Name string Capacity uint64 // ErrorRate is the target false-positive rate (e.g. 0.01 for 1%). // 0 = server default (0.01). ErrorRate float64 } // BloomInfo is diagnostic information about a Bloom filter. type BloomInfo struct { Capacity uint64 ErrorRate float64 BitsSet uint64 BitCount uint64 HashCount uint32 ItemsAdded uint64 } // Bloom is a reference to a Bloom filter. type Bloom struct { client *Client Name string } func newBloom(c *Client, name string) *Bloom { return &Bloom{client: c, Name: name} } // Add adds item to the filter. func (b *Bloom) Add(ctx context.Context, item []byte) error { c := b.client.sketchesClient() r, err := c.BloomAdd(ctx, &pb.BloomAddRequest{Name: b.Name, Item: item}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // AddMany adds multiple items. func (b *Bloom) AddMany(ctx context.Context, items [][]byte) error { c := b.client.sketchesClient() r, err := c.BloomMultiAdd(ctx, &pb.BloomMultiAddRequest{Name: b.Name, Items: items}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // Exists tests membership. true = probably present; false = definitely absent. func (b *Bloom) Exists(ctx context.Context, item []byte) (bool, error) { c := b.client.sketchesClient() r, err := c.BloomExists(ctx, &pb.BloomExistsRequest{Name: b.Name, Item: item}) if err != nil { return false, rpcErr(err) } if !r.GetSuccess() { return false, serverErr(r.GetResultCode(), r.GetMessage()) } return r.GetExists(), nil } // ExistsMany tests membership for multiple items. func (b *Bloom) ExistsMany(ctx context.Context, items [][]byte) ([]bool, error) { c := b.client.sketchesClient() r, err := c.BloomMultiExists(ctx, &pb.BloomMultiExistsRequest{Name: b.Name, Items: items}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return r.GetExists(), nil } // Info returns diagnostic info about the filter. func (b *Bloom) Info(ctx context.Context) (BloomInfo, error) { c := b.client.sketchesClient() r, err := c.BloomInfo(ctx, &pb.BloomInfoRequest{Name: b.Name}) if err != nil { return BloomInfo{}, rpcErr(err) } if !r.GetSuccess() { return BloomInfo{}, serverErr(r.GetResultCode(), r.GetMessage()) } return BloomInfo{ Capacity: r.GetCapacity(), ErrorRate: r.GetErrorRate(), BitsSet: r.GetBitsSet(), BitCount: r.GetBitCount(), HashCount: r.GetHashCount(), ItemsAdded: r.GetItemsAdded(), }, nil } func (c *Client) CreateBloom(ctx context.Context, config BloomConfig) (*Bloom, error) { sc := c.sketchesClient() r, err := sc.BloomReserve(ctx, &pb.BloomReserveRequest{ Name: config.Name, Capacity: config.Capacity, ErrorRate: config.ErrorRate, }) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return newBloom(c, config.Name), nil } func (c *Client) BloomHandle(name string) *Bloom { return newBloom(c, name) } func (c *Client) DeleteBloom(ctx context.Context, name string) error { sc := c.sketchesClient() r, err := sc.BloomDelete(ctx, &pb.BloomDeleteRequest{Name: name}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // ============================================================ // HyperLogLog // ============================================================ // HLLConfig is the HLL creation config. type HLLConfig struct { Name string // Precision controls register count: 2^Precision. Valid range 4..18. // 0 = server default (14, ~1% error). Precision uint32 } // HLL is a reference to a HyperLogLog. type HLL struct { client *Client Name string } func newHLL(c *Client, name string) *HLL { return &HLL{client: c, Name: name} } // Add adds items to the estimator. func (h *HLL) Add(ctx context.Context, items [][]byte) error { c := h.client.sketchesClient() r, err := c.HllAdd(ctx, &pb.HllAddRequest{Name: h.Name, Items: items}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // Count returns the estimated cardinality. func (h *HLL) Count(ctx context.Context) (uint64, error) { c := h.client.sketchesClient() r, err := c.HllCount(ctx, &pb.HllCountRequest{Name: h.Name}) if err != nil { return 0, rpcErr(err) } if !r.GetSuccess() { return 0, serverErr(r.GetResultCode(), r.GetMessage()) } return r.GetEstimate(), nil } // MergeFrom merges sources into this HLL (union of registers). func (h *HLL) MergeFrom(ctx context.Context, sources []string) error { c := h.client.sketchesClient() r, err := c.HllMerge(ctx, &pb.HllMergeRequest{Destination: h.Name, Sources: sources}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } func (c *Client) CreateHLL(ctx context.Context, config HLLConfig) (*HLL, error) { sc := c.sketchesClient() r, err := sc.HllReserve(ctx, &pb.HllReserveRequest{Name: config.Name, Precision: config.Precision}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return newHLL(c, config.Name), nil } func (c *Client) HLLHandle(name string) *HLL { return newHLL(c, name) } func (c *Client) DeleteHLL(ctx context.Context, name string) error { sc := c.sketchesClient() r, err := sc.HllDelete(ctx, &pb.HllDeleteRequest{Name: name}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // ============================================================ // Count-Min Sketch // ============================================================ // CMSConfig is the CMS creation config. type CMSConfig struct { Name string Width uint64 // 0 = server default Depth uint64 // 0 = server default } // CMSIncrItem is one (item, count) pair for CMS.Incr. type CMSIncrItem struct { Item []byte Count uint64 } // CMS is a reference to a Count-Min Sketch. type CMS struct { client *Client Name string } func newCMS(c *Client, name string) *CMS { return &CMS{client: c, Name: name} } // Incr increments counts. Returns one estimate per input item. func (c *CMS) Incr(ctx context.Context, items []CMSIncrItem) ([]uint64, error) { sc := c.client.sketchesClient() pbItems := make([]*pb.CmsIncrByItem, len(items)) for i, it := range items { pbItems[i] = &pb.CmsIncrByItem{Item: it.Item, Count: it.Count} } r, err := sc.CmsIncrBy(ctx, &pb.CmsIncrByRequest{Name: c.Name, Items: pbItems}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return r.GetCounts(), nil } // Query returns estimated frequencies for items. func (c *CMS) Query(ctx context.Context, items [][]byte) ([]uint64, error) { sc := c.client.sketchesClient() r, err := sc.CmsQuery(ctx, &pb.CmsQueryRequest{Name: c.Name, Items: items}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return r.GetCounts(), nil } func (c *Client) CreateCMS(ctx context.Context, config CMSConfig) (*CMS, error) { sc := c.sketchesClient() r, err := sc.CmsReserve(ctx, &pb.CmsReserveRequest{Name: config.Name, Width: config.Width, Depth: config.Depth}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return newCMS(c, config.Name), nil } func (c *Client) CMSHandle(name string) *CMS { return newCMS(c, name) } func (c *Client) DeleteCMS(ctx context.Context, name string) error { sc := c.sketchesClient() r, err := sc.CmsDelete(ctx, &pb.CmsDeleteRequest{Name: name}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // ============================================================ // Top-K // ============================================================ // TopKConfig is the Top-K creation config. type TopKConfig struct { Name string K uint32 Width uint64 // 0 = server default Depth uint64 // 0 = server default Decay float64 // 0 = server default } // TopKListEntry is one (item, count) in the top-K list. type TopKListEntry struct { Item []byte Count uint64 } // TopK is a reference to a Top-K sketch. type TopK struct { client *Client Name string } func newTopK(c *Client, name string) *TopK { return &TopK{client: c, Name: name} } // Add adds items. Returns one evicted item per slot (empty bytes if none). func (t *TopK) Add(ctx context.Context, items [][]byte) ([][]byte, error) { c := t.client.sketchesClient() r, err := c.TopKAdd(ctx, &pb.TopKAddRequest{Name: t.Name, Items: items}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return r.GetEvicted(), nil } // Query tests whether items are in the top-K list. func (t *TopK) Query(ctx context.Context, items [][]byte) ([]bool, error) { c := t.client.sketchesClient() r, err := c.TopKQuery(ctx, &pb.TopKQueryRequest{Name: t.Name, Items: items}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return r.GetInTopK(), nil } // List returns the current top-K list. func (t *TopK) List(ctx context.Context) ([]TopKListEntry, error) { c := t.client.sketchesClient() r, err := c.TopKList(ctx, &pb.TopKListRequest{Name: t.Name}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } out := make([]TopKListEntry, len(r.GetEntries())) for i, e := range r.GetEntries() { out[i] = TopKListEntry{Item: e.GetItem(), Count: e.GetCount()} } return out, nil } func (c *Client) CreateTopK(ctx context.Context, config TopKConfig) (*TopK, error) { sc := c.sketchesClient() r, err := sc.TopKReserve(ctx, &pb.TopKReserveRequest{ Name: config.Name, K: config.K, Width: config.Width, Depth: config.Depth, Decay: config.Decay, }) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return newTopK(c, config.Name), nil } func (c *Client) TopKHandle(name string) *TopK { return newTopK(c, name) } func (c *Client) DeleteTopK(ctx context.Context, name string) error { sc := c.sketchesClient() r, err := sc.TopKDelete(ctx, &pb.TopKDeleteRequest{Name: name}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // ============================================================ // t-digest // ============================================================ // TDigestConfig is the t-digest creation config. type TDigestConfig struct { Name string Compression uint32 // 0 = server default } // TDigest is a reference to a t-digest quantile sketch. type TDigest struct { client *Client Name string } func newTDigest(c *Client, name string) *TDigest { return &TDigest{client: c, Name: name} } // Add adds values to the sketch. func (t *TDigest) Add(ctx context.Context, values []float64) error { c := t.client.sketchesClient() r, err := c.TDigestAdd(ctx, &pb.TDigestAddRequest{Name: t.Name, Values: values}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil } // Quantile computes quantile estimates. One result per input quantile. func (t *TDigest) Quantile(ctx context.Context, quantiles []float64) ([]float64, error) { c := t.client.sketchesClient() r, err := c.TDigestQuantile(ctx, &pb.TDigestQuantileRequest{Name: t.Name, Quantiles: quantiles}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return r.GetValues(), nil } // MinMax returns the minimum and maximum observed values. func (t *TDigest) MinMax(ctx context.Context) (min, max float64, err error) { c := t.client.sketchesClient() r, err := c.TDigestMinMax(ctx, &pb.TDigestMinMaxRequest{Name: t.Name}) if err != nil { return 0, 0, rpcErr(err) } if !r.GetSuccess() { return 0, 0, serverErr(r.GetResultCode(), r.GetMessage()) } return r.GetMin(), r.GetMax(), nil } func (c *Client) CreateTDigest(ctx context.Context, config TDigestConfig) (*TDigest, error) { sc := c.sketchesClient() r, err := sc.TDigestCreate(ctx, &pb.TDigestCreateRequest{Name: config.Name, Compression: config.Compression}) if err != nil { return nil, rpcErr(err) } if !r.GetSuccess() { return nil, serverErr(r.GetResultCode(), r.GetMessage()) } return newTDigest(c, config.Name), nil } func (c *Client) TDigestHandle(name string) *TDigest { return newTDigest(c, name) } func (c *Client) DeleteTDigest(ctx context.Context, name string) error { sc := c.sketchesClient() r, err := sc.TDigestDelete(ctx, &pb.TDigestDeleteRequest{Name: name}) if err != nil { return rpcErr(err) } if !r.GetSuccess() { return serverErr(r.GetResultCode(), r.GetMessage()) } return nil }