123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- package nodes
- import (
- "context"
- "fmt"
- "github.com/emqx/kuiper/xsql"
- "github.com/emqx/kuiper/xstream/api"
- "math"
- "sort"
- )
- type WatermarkTuple struct {
- Timestamp int64
- }
- func (t *WatermarkTuple) GetTimestamp() int64 {
- return t.Timestamp
- }
- func (t *WatermarkTuple) IsWatermark() bool {
- return true
- }
- const WATERMARK_KEY = "$$wartermark"
- type WatermarkGenerator struct {
- inputTopics []string
- topicToTs map[string]int64
- window *WindowConfig
- lateTolerance int64
- interval int
- //ticker *clock.Ticker
- stream chan<- interface{}
- //state
- lastWatermarkTs int64
- }
- func NewWatermarkGenerator(window *WindowConfig, l int64, s []string, stream chan<- interface{}) (*WatermarkGenerator, error) {
- w := &WatermarkGenerator{
- window: window,
- topicToTs: make(map[string]int64),
- lateTolerance: l,
- inputTopics: s,
- stream: stream,
- }
- switch window.Type {
- case xsql.NOT_WINDOW:
- case xsql.TUMBLING_WINDOW:
- w.interval = window.Length
- case xsql.HOPPING_WINDOW:
- w.interval = window.Interval
- case xsql.SLIDING_WINDOW:
- w.interval = window.Length
- case xsql.SESSION_WINDOW:
- //Use timeout to update watermark
- w.interval = window.Interval
- default:
- return nil, fmt.Errorf("unsupported window type %d", window.Type)
- }
- return w, nil
- }
- func (w *WatermarkGenerator) track(s string, ts int64, ctx api.StreamContext) bool {
- log := ctx.GetLogger()
- log.Debugf("watermark generator track event from topic %s at %d", s, ts)
- currentVal, ok := w.topicToTs[s]
- if !ok || ts > currentVal {
- w.topicToTs[s] = ts
- }
- r := ts >= w.lastWatermarkTs
- if r {
- w.trigger(ctx)
- }
- return r
- }
- func (w *WatermarkGenerator) trigger(ctx api.StreamContext) {
- log := ctx.GetLogger()
- watermark := w.computeWatermarkTs(ctx)
- log.Debugf("compute watermark event at %d with last %d", watermark, w.lastWatermarkTs)
- if watermark > w.lastWatermarkTs {
- t := &WatermarkTuple{Timestamp: watermark}
- select {
- case w.stream <- t:
- default: //TODO need to set buffer
- }
- w.lastWatermarkTs = watermark
- ctx.PutState(WATERMARK_KEY, w.lastWatermarkTs)
- log.Debugf("scan watermark event at %d", watermark)
- }
- }
- func (w *WatermarkGenerator) computeWatermarkTs(_ context.Context) int64 {
- var ts int64
- if len(w.topicToTs) >= len(w.inputTopics) {
- ts = math.MaxInt64
- for _, key := range w.inputTopics {
- if ts > w.topicToTs[key] {
- ts = w.topicToTs[key]
- }
- }
- }
- return ts - w.lateTolerance
- }
- //If window end cannot be determined yet, return max int64 so that it can be recalculated for the next watermark
- func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64, watermark int64, triggered bool) int64 {
- switch w.window.Type {
- case xsql.TUMBLING_WINDOW, xsql.HOPPING_WINDOW:
- if triggered {
- return current + int64(w.interval)
- } else {
- interval := int64(w.interval)
- nextTs := getEarliestEventTs(inputs, current, watermark)
- if nextTs == math.MaxInt64 || nextTs%interval == 0 {
- return nextTs
- }
- return nextTs + (interval - nextTs%interval)
- }
- case xsql.SLIDING_WINDOW:
- nextTs := getEarliestEventTs(inputs, current, watermark)
- return nextTs
- case xsql.SESSION_WINDOW:
- if len(inputs) > 0 {
- timeout, duration := int64(w.window.Interval), int64(w.window.Length)
- sort.SliceStable(inputs, func(i, j int) bool {
- return inputs[i].Timestamp < inputs[j].Timestamp
- })
- et := inputs[0].Timestamp
- tick := et + (duration - et%duration)
- if et%duration == 0 {
- tick = et
- }
- var p int64
- for _, tuple := range inputs {
- var r int64 = math.MaxInt64
- if p > 0 {
- if tuple.Timestamp-p > timeout {
- r = p + timeout
- }
- }
- if tuple.Timestamp > tick {
- if tick-duration > et && tick < r {
- r = tick
- }
- tick += duration
- }
- if r < math.MaxInt64 {
- return r
- }
- p = tuple.Timestamp
- }
- }
- return math.MaxInt64
- default:
- return math.MaxInt64
- }
- }
- func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.Tuple, errCh chan<- error) {
- log := ctx.GetLogger()
- var (
- triggered bool
- nextWindowEndTs int64
- prevWindowEndTs int64
- )
- o.watermarkGenerator.lastWatermarkTs = 0
- if s, err := ctx.GetState(WATERMARK_KEY); err == nil && s != nil {
- if si, ok := s.(int64); ok {
- o.watermarkGenerator.lastWatermarkTs = si
- } else {
- errCh <- fmt.Errorf("restore window state `lastWatermarkTs` %v error, invalid type", s)
- }
- }
- log.Infof("Start with window state lastWatermarkTs: %d", o.watermarkGenerator.lastWatermarkTs)
- for {
- select {
- // process incoming item
- case item, opened := <-o.input:
- processed := false
- if item, processed = o.preprocess(item); processed {
- break
- }
- o.statManager.ProcessTimeStart()
- if !opened {
- o.statManager.IncTotalExceptions()
- break
- }
- switch d := item.(type) {
- case error:
- o.statManager.IncTotalRecordsIn()
- o.Broadcast(d)
- o.statManager.IncTotalExceptions()
- case xsql.Event:
- if d.IsWatermark() {
- watermarkTs := d.GetTimestamp()
- windowEndTs := nextWindowEndTs
- //Session window needs a recalculation of window because its window end depends the inputs
- if windowEndTs == math.MaxInt64 || o.window.Type == xsql.SESSION_WINDOW || o.window.Type == xsql.SLIDING_WINDOW {
- windowEndTs = o.watermarkGenerator.getNextWindow(inputs, prevWindowEndTs, watermarkTs, triggered)
- }
- for windowEndTs <= watermarkTs && windowEndTs >= 0 {
- log.Debugf("Window end ts %d Watermark ts %d", windowEndTs, watermarkTs)
- log.Debugf("Current input count %d", len(inputs))
- //scan all events and find out the event in the current window
- inputs, triggered = o.scan(inputs, windowEndTs, ctx)
- prevWindowEndTs = windowEndTs
- windowEndTs = o.watermarkGenerator.getNextWindow(inputs, windowEndTs, watermarkTs, triggered)
- }
- nextWindowEndTs = windowEndTs
- log.Debugf("next window end %d", nextWindowEndTs)
- } else {
- o.statManager.IncTotalRecordsIn()
- tuple, ok := d.(*xsql.Tuple)
- if !ok {
- log.Debugf("receive non tuple element %v", d)
- }
- log.Debugf("event window receive tuple %s", tuple.Message)
- if o.watermarkGenerator.track(tuple.Emitter, d.GetTimestamp(), ctx) {
- inputs = append(inputs, tuple)
- }
- }
- o.statManager.ProcessTimeEnd()
- ctx.PutState(WINDOW_INPUTS_KEY, inputs)
- default:
- o.statManager.IncTotalRecordsIn()
- o.Broadcast(fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d))
- o.statManager.IncTotalExceptions()
- }
- // is cancelling
- case <-ctx.Done():
- log.Infoln("Cancelling window....")
- if o.ticker != nil {
- o.ticker.Stop()
- }
- return
- }
- }
- }
- func getEarliestEventTs(inputs []*xsql.Tuple, startTs int64, endTs int64) int64 {
- var minTs int64 = math.MaxInt64
- for _, t := range inputs {
- if t.Timestamp > startTs && t.Timestamp <= endTs && t.Timestamp < minTs {
- minTs = t.Timestamp
- }
- }
- return minTs
- }
|