watermark.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package node
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/emqx/kuiper/internal/xsql"
  6. "github.com/emqx/kuiper/pkg/api"
  7. "github.com/emqx/kuiper/pkg/ast"
  8. "math"
  9. "sort"
  10. )
  // WatermarkTuple is a marker event that carries nothing but a watermark
  // timestamp.
  type WatermarkTuple struct {
  	// Timestamp is the watermark value reported by GetTimestamp.
  	Timestamp int64
  }

  // GetTimestamp returns the timestamp stored in the watermark tuple.
  func (wt *WatermarkTuple) GetTimestamp() int64 {
  	return wt.Timestamp
  }

  // IsWatermark always reports true for a WatermarkTuple.
  func (wt *WatermarkTuple) IsWatermark() bool {
  	return true
  }
  // WATERMARK_KEY is the state key under which the last emitted watermark
  // timestamp is saved and restored. The literal (including its spelling) is
  // read and written at runtime, so it must stay exactly as it is.
  const (
  	WATERMARK_KEY = "$$wartermark"
  )
  21. type WatermarkGenerator struct {
  22. inputTopics []string
  23. topicToTs map[string]int64
  24. window *WindowConfig
  25. lateTolerance int64
  26. interval int
  27. //ticker *clock.Ticker
  28. stream chan<- interface{}
  29. //state
  30. lastWatermarkTs int64
  31. }
  32. func NewWatermarkGenerator(window *WindowConfig, l int64, s []string, stream chan<- interface{}) (*WatermarkGenerator, error) {
  33. w := &WatermarkGenerator{
  34. window: window,
  35. topicToTs: make(map[string]int64),
  36. lateTolerance: l,
  37. inputTopics: s,
  38. stream: stream,
  39. }
  40. switch window.Type {
  41. case ast.NOT_WINDOW:
  42. case ast.TUMBLING_WINDOW:
  43. w.interval = window.Length
  44. case ast.HOPPING_WINDOW:
  45. w.interval = window.Interval
  46. case ast.SLIDING_WINDOW:
  47. w.interval = window.Length
  48. case ast.SESSION_WINDOW:
  49. //Use timeout to update watermark
  50. w.interval = window.Interval
  51. default:
  52. return nil, fmt.Errorf("unsupported window type %d", window.Type)
  53. }
  54. return w, nil
  55. }
  56. func (w *WatermarkGenerator) track(s string, ts int64, ctx api.StreamContext) bool {
  57. log := ctx.GetLogger()
  58. log.Debugf("watermark generator track event from topic %s at %d", s, ts)
  59. currentVal, ok := w.topicToTs[s]
  60. if !ok || ts > currentVal {
  61. w.topicToTs[s] = ts
  62. }
  63. r := ts >= w.lastWatermarkTs
  64. if r {
  65. w.trigger(ctx)
  66. }
  67. return r
  68. }
  69. func (w *WatermarkGenerator) trigger(ctx api.StreamContext) {
  70. log := ctx.GetLogger()
  71. watermark := w.computeWatermarkTs(ctx)
  72. log.Debugf("compute watermark event at %d with last %d", watermark, w.lastWatermarkTs)
  73. if watermark > w.lastWatermarkTs {
  74. t := &WatermarkTuple{Timestamp: watermark}
  75. select {
  76. case w.stream <- t:
  77. default: //TODO need to set buffer
  78. }
  79. w.lastWatermarkTs = watermark
  80. ctx.PutState(WATERMARK_KEY, w.lastWatermarkTs)
  81. log.Debugf("scan watermark event at %d", watermark)
  82. }
  83. }
  84. func (w *WatermarkGenerator) computeWatermarkTs(_ context.Context) int64 {
  85. var ts int64
  86. if len(w.topicToTs) >= len(w.inputTopics) {
  87. ts = math.MaxInt64
  88. for _, key := range w.inputTopics {
  89. if ts > w.topicToTs[key] {
  90. ts = w.topicToTs[key]
  91. }
  92. }
  93. }
  94. return ts - w.lateTolerance
  95. }
  96. //If window end cannot be determined yet, return max int64 so that it can be recalculated for the next watermark
  97. func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64, watermark int64, triggered bool) int64 {
  98. switch w.window.Type {
  99. case ast.TUMBLING_WINDOW, ast.HOPPING_WINDOW:
  100. if triggered {
  101. return current + int64(w.interval)
  102. } else {
  103. interval := int64(w.interval)
  104. nextTs := getEarliestEventTs(inputs, current, watermark)
  105. if nextTs == math.MaxInt64 || nextTs%interval == 0 {
  106. return nextTs
  107. }
  108. return nextTs + (interval - nextTs%interval)
  109. }
  110. case ast.SLIDING_WINDOW:
  111. nextTs := getEarliestEventTs(inputs, current, watermark)
  112. return nextTs
  113. default:
  114. return math.MaxInt64
  115. }
  116. }
  117. func (w *WatermarkGenerator) getNextSessionWindow(inputs []*xsql.Tuple, current int64, watermark int64, triggered bool) (int64, bool) {
  118. if len(inputs) > 0 {
  119. timeout, duration := int64(w.window.Interval), int64(w.window.Length)
  120. sort.SliceStable(inputs, func(i, j int) bool {
  121. return inputs[i].Timestamp < inputs[j].Timestamp
  122. })
  123. et := inputs[0].Timestamp
  124. tick := et + (duration - et%duration)
  125. if et%duration == 0 {
  126. tick = et
  127. }
  128. var p int64
  129. ticked := false
  130. for _, tuple := range inputs {
  131. var r int64 = math.MaxInt64
  132. if p > 0 {
  133. if tuple.Timestamp-p > timeout {
  134. r = p + timeout
  135. }
  136. }
  137. if tuple.Timestamp > tick {
  138. if tick-duration > et && tick < r {
  139. r = tick
  140. ticked = true
  141. }
  142. tick += duration
  143. }
  144. if r < math.MaxInt64 {
  145. return r, ticked
  146. }
  147. p = tuple.Timestamp
  148. }
  149. }
  150. return math.MaxInt64, false
  151. }
  152. func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.Tuple, errCh chan<- error) {
  153. log := ctx.GetLogger()
  154. var (
  155. triggered bool
  156. nextWindowEndTs int64
  157. prevWindowEndTs int64
  158. lastTicked bool
  159. )
  160. o.watermarkGenerator.lastWatermarkTs = 0
  161. if s, err := ctx.GetState(WATERMARK_KEY); err == nil && s != nil {
  162. if si, ok := s.(int64); ok {
  163. o.watermarkGenerator.lastWatermarkTs = si
  164. } else {
  165. errCh <- fmt.Errorf("restore window state `lastWatermarkTs` %v error, invalid type", s)
  166. }
  167. }
  168. log.Infof("Start with window state lastWatermarkTs: %d", o.watermarkGenerator.lastWatermarkTs)
  169. for {
  170. select {
  171. // process incoming item
  172. case item, opened := <-o.input:
  173. processed := false
  174. if item, processed = o.preprocess(item); processed {
  175. break
  176. }
  177. o.statManager.ProcessTimeStart()
  178. if !opened {
  179. o.statManager.IncTotalExceptions()
  180. break
  181. }
  182. switch d := item.(type) {
  183. case error:
  184. o.statManager.IncTotalRecordsIn()
  185. o.Broadcast(d)
  186. o.statManager.IncTotalExceptions()
  187. case xsql.Event:
  188. if d.IsWatermark() {
  189. watermarkTs := d.GetTimestamp()
  190. windowEndTs := nextWindowEndTs
  191. ticked := false
  192. //Session window needs a recalculation of window because its window end depends the inputs
  193. if windowEndTs == math.MaxInt64 || o.window.Type == ast.SESSION_WINDOW || o.window.Type == ast.SLIDING_WINDOW {
  194. if o.window.Type == ast.SESSION_WINDOW {
  195. windowEndTs, ticked = o.watermarkGenerator.getNextSessionWindow(inputs, prevWindowEndTs, watermarkTs, triggered)
  196. } else {
  197. windowEndTs = o.watermarkGenerator.getNextWindow(inputs, prevWindowEndTs, watermarkTs, triggered)
  198. }
  199. }
  200. for windowEndTs <= watermarkTs && windowEndTs >= 0 {
  201. log.Debugf("Window end ts %d Watermark ts %d", windowEndTs, watermarkTs)
  202. log.Debugf("Current input count %d", len(inputs))
  203. //scan all events and find out the event in the current window
  204. if o.window.Type == ast.SESSION_WINDOW && !lastTicked {
  205. o.triggerTime = inputs[0].Timestamp
  206. }
  207. inputs, triggered = o.scan(inputs, windowEndTs, ctx)
  208. prevWindowEndTs = windowEndTs
  209. lastTicked = ticked
  210. if o.window.Type == ast.SESSION_WINDOW {
  211. windowEndTs, ticked = o.watermarkGenerator.getNextSessionWindow(inputs, prevWindowEndTs, watermarkTs, triggered)
  212. } else {
  213. windowEndTs = o.watermarkGenerator.getNextWindow(inputs, prevWindowEndTs, watermarkTs, triggered)
  214. }
  215. }
  216. nextWindowEndTs = windowEndTs
  217. log.Debugf("next window end %d", nextWindowEndTs)
  218. } else {
  219. o.statManager.IncTotalRecordsIn()
  220. tuple, ok := d.(*xsql.Tuple)
  221. if !ok {
  222. log.Debugf("receive non tuple element %v", d)
  223. }
  224. log.Debugf("event window receive tuple %s", tuple.Message)
  225. if o.watermarkGenerator.track(tuple.Emitter, d.GetTimestamp(), ctx) {
  226. inputs = append(inputs, tuple)
  227. }
  228. }
  229. o.statManager.ProcessTimeEnd()
  230. ctx.PutState(WINDOW_INPUTS_KEY, inputs)
  231. default:
  232. o.statManager.IncTotalRecordsIn()
  233. o.Broadcast(fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d))
  234. o.statManager.IncTotalExceptions()
  235. }
  236. // is cancelling
  237. case <-ctx.Done():
  238. log.Infoln("Cancelling window....")
  239. if o.ticker != nil {
  240. o.ticker.Stop()
  241. }
  242. return
  243. }
  244. }
  245. }
  246. func getEarliestEventTs(inputs []*xsql.Tuple, startTs int64, endTs int64) int64 {
  247. var minTs int64 = math.MaxInt64
  248. for _, t := range inputs {
  249. if t.Timestamp > startTs && t.Timestamp <= endTs && t.Timestamp < minTs {
  250. minTs = t.Timestamp
  251. }
  252. }
  253. return minTs
  254. }