watermark.go 7.0 KB


  1. package operators
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/benbjohnson/clock"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/emqx/kuiper/xstream/nodes"
  10. "math"
  11. "sort"
  12. "time"
  13. )
  14. type WatermarkTuple struct {
  15. Timestamp int64
  16. }
  17. func (t *WatermarkTuple) GetTimestamp() int64 {
  18. return t.Timestamp
  19. }
  20. func (t *WatermarkTuple) IsWatermark() bool {
  21. return true
  22. }
  23. type WatermarkGenerator struct {
  24. lastWatermarkTs int64
  25. inputTopics []string
  26. topicToTs map[string]int64
  27. window *WindowConfig
  28. lateTolerance int64
  29. interval int
  30. ticker *clock.Ticker
  31. stream chan<- interface{}
  32. }
  33. func NewWatermarkGenerator(window *WindowConfig, l int64, s []string, stream chan<- interface{}) (*WatermarkGenerator, error) {
  34. w := &WatermarkGenerator{
  35. window: window,
  36. topicToTs: make(map[string]int64),
  37. lateTolerance: l,
  38. inputTopics: s,
  39. stream: stream,
  40. }
  41. //Tickers to update watermark
  42. switch window.Type {
  43. case xsql.NOT_WINDOW:
  44. case xsql.TUMBLING_WINDOW:
  45. w.ticker = common.GetTicker(window.Length)
  46. w.interval = window.Length
  47. case xsql.HOPPING_WINDOW:
  48. w.ticker = common.GetTicker(window.Interval)
  49. w.interval = window.Interval
  50. case xsql.SLIDING_WINDOW:
  51. w.interval = window.Length
  52. case xsql.SESSION_WINDOW:
  53. //Use timeout to update watermark
  54. w.ticker = common.GetTicker(window.Interval)
  55. w.interval = window.Interval
  56. default:
  57. return nil, fmt.Errorf("unsupported window type %d", window.Type)
  58. }
  59. return w, nil
  60. }
  61. func (w *WatermarkGenerator) track(s string, ts int64, ctx api.StreamContext) bool {
  62. log := ctx.GetLogger()
  63. log.Debugf("watermark generator track event from topic %s at %d", s, ts)
  64. currentVal, ok := w.topicToTs[s]
  65. if !ok || ts > currentVal {
  66. w.topicToTs[s] = ts
  67. }
  68. r := ts >= w.lastWatermarkTs
  69. if r {
  70. switch w.window.Type {
  71. case xsql.SLIDING_WINDOW:
  72. w.trigger(ctx)
  73. }
  74. }
  75. return r
  76. }
  77. func (w *WatermarkGenerator) start(ctx api.StreamContext) {
  78. log := ctx.GetLogger()
  79. var c <-chan time.Time
  80. if w.ticker != nil {
  81. c = w.ticker.C
  82. }
  83. for {
  84. select {
  85. case <-c:
  86. w.trigger(ctx)
  87. case <-ctx.Done():
  88. log.Infoln("Cancelling watermark generator....")
  89. if w.ticker != nil {
  90. w.ticker.Stop()
  91. }
  92. return
  93. }
  94. }
  95. }
  96. func (w *WatermarkGenerator) trigger(ctx api.StreamContext) {
  97. log := ctx.GetLogger()
  98. watermark := w.computeWatermarkTs(ctx)
  99. log.Debugf("compute watermark event at %d with last %d", watermark, w.lastWatermarkTs)
  100. if watermark > w.lastWatermarkTs {
  101. t := &WatermarkTuple{Timestamp: watermark}
  102. select {
  103. case w.stream <- t:
  104. default: //TODO need to set buffer
  105. }
  106. w.lastWatermarkTs = watermark
  107. log.Debugf("scan watermark event at %d", watermark)
  108. }
  109. }
  110. func (w *WatermarkGenerator) computeWatermarkTs(ctx context.Context) int64 {
  111. var ts int64
  112. if len(w.topicToTs) >= len(w.inputTopics) {
  113. ts = math.MaxInt64
  114. for _, key := range w.inputTopics {
  115. if ts > w.topicToTs[key] {
  116. ts = w.topicToTs[key]
  117. }
  118. }
  119. }
  120. return ts - w.lateTolerance
  121. }
  122. //If window end cannot be determined yet, return max int64 so that it can be recalculated for the next watermark
  123. func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64, watermark int64, triggered bool) int64 {
  124. switch w.window.Type {
  125. case xsql.TUMBLING_WINDOW, xsql.HOPPING_WINDOW:
  126. if triggered {
  127. return current + int64(w.interval)
  128. } else {
  129. interval := int64(w.interval)
  130. nextTs := getEarliestEventTs(inputs, current, watermark)
  131. if nextTs == math.MaxInt64 || nextTs%interval == 0 {
  132. return nextTs
  133. }
  134. return nextTs + (interval - nextTs%interval)
  135. }
  136. case xsql.SLIDING_WINDOW:
  137. nextTs := getEarliestEventTs(inputs, current, watermark)
  138. return nextTs
  139. case xsql.SESSION_WINDOW:
  140. if len(inputs) > 0 {
  141. timeout, duration := int64(w.window.Interval), int64(w.window.Length)
  142. sort.SliceStable(inputs, func(i, j int) bool {
  143. return inputs[i].Timestamp < inputs[j].Timestamp
  144. })
  145. et := inputs[0].Timestamp
  146. tick := et + (duration - et%duration)
  147. if et%duration == 0 {
  148. tick = et
  149. }
  150. var p int64
  151. for _, tuple := range inputs {
  152. var r int64 = math.MaxInt64
  153. if p > 0 {
  154. if tuple.Timestamp-p > timeout {
  155. r = p + timeout
  156. }
  157. }
  158. if tuple.Timestamp > tick {
  159. if tick-duration > et && tick < r {
  160. r = tick
  161. }
  162. tick += duration
  163. }
  164. if r < math.MaxInt64 {
  165. return r
  166. }
  167. p = tuple.Timestamp
  168. }
  169. }
  170. return math.MaxInt64
  171. default:
  172. return math.MaxInt64
  173. }
  174. }
  175. func (o *WindowOperator) execEventWindow(ctx api.StreamContext, errCh chan<- error) {
  176. exeCtx, cancel := ctx.WithCancel()
  177. log := ctx.GetLogger()
  178. go o.watermarkGenerator.start(exeCtx)
  179. var (
  180. inputs []*xsql.Tuple
  181. triggered bool
  182. nextWindowEndTs int64
  183. prevWindowEndTs int64
  184. )
  185. for {
  186. select {
  187. // process incoming item
  188. case item, opened := <-o.input:
  189. o.statManager.ProcessTimeStart()
  190. if !opened {
  191. o.statManager.IncTotalExceptions()
  192. break
  193. }
  194. switch d := item.(type) {
  195. case error:
  196. o.statManager.IncTotalRecordsIn()
  197. nodes.Broadcast(o.outputs, d, ctx)
  198. o.statManager.IncTotalExceptions()
  199. case xsql.Event:
  200. if d.IsWatermark() {
  201. watermarkTs := d.GetTimestamp()
  202. windowEndTs := nextWindowEndTs
  203. //Session window needs a recalculation of window because its window end depends the inputs
  204. if windowEndTs == math.MaxInt64 || o.window.Type == xsql.SESSION_WINDOW || o.window.Type == xsql.SLIDING_WINDOW {
  205. windowEndTs = o.watermarkGenerator.getNextWindow(inputs, prevWindowEndTs, watermarkTs, triggered)
  206. }
  207. for windowEndTs <= watermarkTs && windowEndTs >= 0 {
  208. log.Debugf("Window end ts %d Watermark ts %d", windowEndTs, watermarkTs)
  209. log.Debugf("Current input count %d", len(inputs))
  210. //scan all events and find out the event in the current window
  211. inputs, triggered = o.scan(inputs, windowEndTs, ctx)
  212. prevWindowEndTs = windowEndTs
  213. windowEndTs = o.watermarkGenerator.getNextWindow(inputs, windowEndTs, watermarkTs, triggered)
  214. }
  215. nextWindowEndTs = windowEndTs
  216. log.Debugf("next window end %d", nextWindowEndTs)
  217. } else {
  218. o.statManager.IncTotalRecordsIn()
  219. tuple, ok := d.(*xsql.Tuple)
  220. if !ok {
  221. log.Debugf("receive non tuple element %v", d)
  222. }
  223. log.Debugf("event window receive tuple %s", tuple.Message)
  224. if o.watermarkGenerator.track(tuple.Emitter, d.GetTimestamp(), ctx) {
  225. inputs = append(inputs, tuple)
  226. }
  227. }
  228. o.statManager.ProcessTimeEnd()
  229. default:
  230. o.statManager.IncTotalRecordsIn()
  231. nodes.Broadcast(o.outputs, fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d), ctx)
  232. o.statManager.IncTotalExceptions()
  233. }
  234. // is cancelling
  235. case <-ctx.Done():
  236. log.Infoln("Cancelling window....")
  237. if o.ticker != nil {
  238. o.ticker.Stop()
  239. }
  240. cancel()
  241. return
  242. }
  243. }
  244. }
  245. func getEarliestEventTs(inputs []*xsql.Tuple, startTs int64, endTs int64) int64 {
  246. var minTs int64 = math.MaxInt64
  247. for _, t := range inputs {
  248. if t.Timestamp > startTs && t.Timestamp <= endTs && t.Timestamp < minTs {
  249. minTs = t.Timestamp
  250. }
  251. }
  252. return minTs
  253. }