watermark.go 6.7 KB


  1. package operators
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/benbjohnson/clock"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "math"
  10. "sort"
  11. "time"
  12. )
  13. type WatermarkTuple struct {
  14. Timestamp int64
  15. }
  16. func (t *WatermarkTuple) GetTimestamp() int64 {
  17. return t.Timestamp
  18. }
  19. func (t *WatermarkTuple) IsWatermark() bool {
  20. return true
  21. }
  22. type WatermarkGenerator struct {
  23. lastWatermarkTs int64
  24. inputTopics []string
  25. topicToTs map[string]int64
  26. window *WindowConfig
  27. lateTolerance int64
  28. interval int
  29. ticker *clock.Ticker
  30. stream chan<- interface{}
  31. }
  32. func NewWatermarkGenerator(window *WindowConfig, l int64, s []string, stream chan<- interface{}) (*WatermarkGenerator, error) {
  33. w := &WatermarkGenerator{
  34. window: window,
  35. topicToTs: make(map[string]int64),
  36. lateTolerance: l,
  37. inputTopics: s,
  38. stream: stream,
  39. }
  40. //Tickers to update watermark
  41. switch window.Type {
  42. case xsql.NOT_WINDOW:
  43. case xsql.TUMBLING_WINDOW:
  44. w.ticker = common.GetTicker(window.Length)
  45. w.interval = window.Length
  46. case xsql.HOPPING_WINDOW:
  47. w.ticker = common.GetTicker(window.Interval)
  48. w.interval = window.Interval
  49. case xsql.SLIDING_WINDOW:
  50. w.interval = window.Length
  51. case xsql.SESSION_WINDOW:
  52. //Use timeout to update watermark
  53. w.ticker = common.GetTicker(window.Interval)
  54. w.interval = window.Interval
  55. default:
  56. return nil, fmt.Errorf("unsupported window type %d", window.Type)
  57. }
  58. return w, nil
  59. }
  60. func (w *WatermarkGenerator) track(s string, ts int64, ctx api.StreamContext) bool {
  61. log := ctx.GetLogger()
  62. log.Debugf("watermark generator track event from topic %s at %d", s, ts)
  63. currentVal, ok := w.topicToTs[s]
  64. if !ok || ts > currentVal {
  65. w.topicToTs[s] = ts
  66. }
  67. r := ts >= w.lastWatermarkTs
  68. if r {
  69. switch w.window.Type {
  70. case xsql.SLIDING_WINDOW:
  71. w.trigger(ctx)
  72. }
  73. }
  74. return r
  75. }
  76. func (w *WatermarkGenerator) start(ctx api.StreamContext) {
  77. log := ctx.GetLogger()
  78. var c <-chan time.Time
  79. if w.ticker != nil {
  80. c = w.ticker.C
  81. }
  82. for {
  83. select {
  84. case <-c:
  85. w.trigger(ctx)
  86. case <-ctx.Done():
  87. log.Infoln("Cancelling watermark generator....")
  88. if w.ticker != nil {
  89. w.ticker.Stop()
  90. }
  91. return
  92. }
  93. }
  94. }
  95. func (w *WatermarkGenerator) trigger(ctx api.StreamContext) {
  96. log := ctx.GetLogger()
  97. watermark := w.computeWatermarkTs(ctx)
  98. log.Debugf("compute watermark event at %d with last %d", watermark, w.lastWatermarkTs)
  99. if watermark > w.lastWatermarkTs {
  100. t := &WatermarkTuple{Timestamp: watermark}
  101. select {
  102. case w.stream <- t:
  103. default: //TODO need to set buffer
  104. }
  105. w.lastWatermarkTs = watermark
  106. log.Debugf("scan watermark event at %d", watermark)
  107. }
  108. }
  109. func (w *WatermarkGenerator) computeWatermarkTs(ctx context.Context) int64 {
  110. var ts int64
  111. if len(w.topicToTs) >= len(w.inputTopics) {
  112. ts = math.MaxInt64
  113. for _, key := range w.inputTopics {
  114. if ts > w.topicToTs[key] {
  115. ts = w.topicToTs[key]
  116. }
  117. }
  118. }
  119. return ts - w.lateTolerance
  120. }
  121. //If window end cannot be determined yet, return max int64 so that it can be recalculated for the next watermark
  122. func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64, watermark int64, triggered bool) int64 {
  123. switch w.window.Type {
  124. case xsql.TUMBLING_WINDOW, xsql.HOPPING_WINDOW:
  125. if triggered {
  126. return current + int64(w.interval)
  127. } else {
  128. interval := int64(w.interval)
  129. nextTs := getEarliestEventTs(inputs, current, watermark)
  130. if nextTs == math.MaxInt64 || nextTs%interval == 0 {
  131. return nextTs
  132. }
  133. return nextTs + (interval - nextTs%interval)
  134. }
  135. case xsql.SLIDING_WINDOW:
  136. nextTs := getEarliestEventTs(inputs, current, watermark)
  137. return nextTs
  138. case xsql.SESSION_WINDOW:
  139. if len(inputs) > 0 {
  140. timeout, duration := int64(w.window.Interval), int64(w.window.Length)
  141. sort.SliceStable(inputs, func(i, j int) bool {
  142. return inputs[i].Timestamp < inputs[j].Timestamp
  143. })
  144. et := inputs[0].Timestamp
  145. tick := et + (duration - et%duration)
  146. if et%duration == 0 {
  147. tick = et
  148. }
  149. var p int64
  150. for _, tuple := range inputs {
  151. var r int64 = math.MaxInt64
  152. if p > 0 {
  153. if tuple.Timestamp-p > timeout {
  154. r = p + timeout
  155. }
  156. }
  157. if tuple.Timestamp > tick {
  158. if tick-duration > et && tick < r {
  159. r = tick
  160. }
  161. tick += duration
  162. }
  163. if r < math.MaxInt64 {
  164. return r
  165. }
  166. p = tuple.Timestamp
  167. }
  168. }
  169. return math.MaxInt64
  170. default:
  171. return math.MaxInt64
  172. }
  173. }
  174. func (o *WindowOperator) execEventWindow(ctx api.StreamContext, errCh chan<- error) {
  175. exeCtx, cancel := ctx.WithCancel()
  176. log := ctx.GetLogger()
  177. go o.watermarkGenerator.start(exeCtx)
  178. var (
  179. inputs []*xsql.Tuple
  180. triggered bool
  181. nextWindowEndTs int64
  182. prevWindowEndTs int64
  183. )
  184. for {
  185. select {
  186. // process incoming item
  187. case item, opened := <-o.input:
  188. o.statManager.ProcessTimeStart()
  189. if !opened {
  190. o.statManager.IncTotalExceptions()
  191. break
  192. }
  193. if d, ok := item.(xsql.Event); !ok {
  194. log.Errorf("Expect xsql.Event type")
  195. o.statManager.IncTotalExceptions()
  196. break
  197. } else {
  198. if d.IsWatermark() {
  199. watermarkTs := d.GetTimestamp()
  200. windowEndTs := nextWindowEndTs
  201. //Session window needs a recalculation of window because its window end depends the inputs
  202. if windowEndTs == math.MaxInt64 || o.window.Type == xsql.SESSION_WINDOW || o.window.Type == xsql.SLIDING_WINDOW {
  203. windowEndTs = o.watermarkGenerator.getNextWindow(inputs, prevWindowEndTs, watermarkTs, triggered)
  204. }
  205. for windowEndTs <= watermarkTs && windowEndTs >= 0 {
  206. log.Debugf("Window end ts %d Watermark ts %d", windowEndTs, watermarkTs)
  207. log.Debugf("Current input count %d", len(inputs))
  208. //scan all events and find out the event in the current window
  209. inputs, triggered = o.scan(inputs, windowEndTs, ctx)
  210. prevWindowEndTs = windowEndTs
  211. windowEndTs = o.watermarkGenerator.getNextWindow(inputs, windowEndTs, watermarkTs, triggered)
  212. }
  213. nextWindowEndTs = windowEndTs
  214. log.Debugf("next window end %d", nextWindowEndTs)
  215. } else {
  216. o.statManager.IncTotalRecordsIn()
  217. tuple, ok := d.(*xsql.Tuple)
  218. if !ok {
  219. log.Debugf("receive non tuple element %v", d)
  220. }
  221. log.Debugf("event window receive tuple %s", tuple.Message)
  222. if o.watermarkGenerator.track(tuple.Emitter, d.GetTimestamp(), ctx) {
  223. inputs = append(inputs, tuple)
  224. }
  225. }
  226. o.statManager.ProcessTimeEnd()
  227. }
  228. // is cancelling
  229. case <-ctx.Done():
  230. log.Infoln("Cancelling window....")
  231. if o.ticker != nil {
  232. o.ticker.Stop()
  233. }
  234. cancel()
  235. return
  236. }
  237. }
  238. }
  239. func getEarliestEventTs(inputs []*xsql.Tuple, startTs int64, endTs int64) int64 {
  240. var minTs int64 = math.MaxInt64
  241. for _, t := range inputs {
  242. if t.Timestamp > startTs && t.Timestamp <= endTs && t.Timestamp < minTs {
  243. minTs = t.Timestamp
  244. }
  245. }
  246. return minTs
  247. }