watermark.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package operators
  import (
	"context"
	"fmt"
	"math"
	"sort"
	"sync"
	"time"

	"github.com/emqx/kuiper/common"
	"github.com/emqx/kuiper/xsql"
	"github.com/emqx/kuiper/xstream/api"
)
  // WatermarkTuple is the event a WatermarkGenerator emits on its stream when the
// computed watermark advances; Timestamp carries the watermark value.
type WatermarkTuple struct {
	Timestamp int64
}

// GetTimestamp reports the watermark value carried by the tuple.
func (wt *WatermarkTuple) GetTimestamp() int64 {
	ts := wt.Timestamp
	return ts
}

// IsWatermark always reports true, which is how consumers tell a watermark
// apart from a data tuple.
func (wt *WatermarkTuple) IsWatermark() bool {
	const isWatermark = true
	return isWatermark
}
  21. type WatermarkGenerator struct {
  22. lastWatermarkTs int64
  23. inputTopics []string
  24. topicToTs map[string]int64
  25. window *WindowConfig
  26. lateTolerance int64
  27. interval int
  28. ticker common.Ticker
  29. stream chan<- interface{}
  30. }
  31. func NewWatermarkGenerator(window *WindowConfig, l int64, s []string, stream chan<- interface{}) (*WatermarkGenerator, error){
  32. w := &WatermarkGenerator{
  33. window: window,
  34. topicToTs: make(map[string]int64),
  35. lateTolerance: l,
  36. inputTopics: s,
  37. stream: stream,
  38. }
  39. //Tickers to update watermark
  40. switch window.Type{
  41. case xsql.NOT_WINDOW:
  42. case xsql.TUMBLING_WINDOW:
  43. w.ticker = common.GetTicker(window.Length)
  44. w.interval = window.Length
  45. case xsql.HOPPING_WINDOW:
  46. w.ticker = common.GetTicker(window.Interval)
  47. w.interval = window.Interval
  48. case xsql.SLIDING_WINDOW:
  49. w.interval = window.Length
  50. case xsql.SESSION_WINDOW:
  51. //Use timeout to update watermark
  52. w.ticker = common.GetTicker(window.Interval)
  53. w.interval = window.Interval
  54. default:
  55. return nil, fmt.Errorf("unsupported window type %d", window.Type)
  56. }
  57. return w, nil
  58. }
  59. func (w *WatermarkGenerator) track(s string, ts int64, ctx api.StreamContext) bool {
  60. log := ctx.GetLogger()
  61. log.Infof("watermark generator track event from topic %s at %d", s, ts)
  62. currentVal, ok := w.topicToTs[s]
  63. if !ok || ts > currentVal {
  64. w.topicToTs[s] = ts
  65. }
  66. r := ts >= w.lastWatermarkTs
  67. if r{
  68. switch w.window.Type{
  69. case xsql.SLIDING_WINDOW:
  70. w.trigger(ctx)
  71. }
  72. }
  73. return r
  74. }
  75. func (w *WatermarkGenerator) start(ctx api.StreamContext) {
  76. log := ctx.GetLogger()
  77. var c <-chan time.Time
  78. if w.ticker != nil {
  79. c = w.ticker.GetC()
  80. }
  81. for {
  82. select {
  83. case <-c:
  84. w.trigger(ctx)
  85. case <-ctx.Done():
  86. log.Infoln("Cancelling watermark generator....")
  87. if w.ticker != nil{
  88. w.ticker.Stop()
  89. }
  90. return
  91. }
  92. }
  93. }
  94. func (w *WatermarkGenerator) trigger(ctx api.StreamContext) {
  95. log := ctx.GetLogger()
  96. watermark := w.computeWatermarkTs(ctx)
  97. log.Infof("compute watermark event at %d with last %d", watermark, w.lastWatermarkTs)
  98. if watermark > w.lastWatermarkTs {
  99. t := &WatermarkTuple{Timestamp: watermark}
  100. select {
  101. case w.stream <- t:
  102. default: //TODO need to set buffer
  103. }
  104. w.lastWatermarkTs = watermark
  105. log.Infof("scan watermark event at %d", watermark)
  106. }
  107. }
  108. func (w *WatermarkGenerator) computeWatermarkTs(ctx context.Context) int64{
  109. var ts int64
  110. if len(w.topicToTs) >= len(w.inputTopics) {
  111. ts = math.MaxInt64
  112. for _, key := range w.inputTopics {
  113. if ts > w.topicToTs[key]{
  114. ts = w.topicToTs[key]
  115. }
  116. }
  117. }
  118. return ts - w.lateTolerance
  119. }
  // getNextWindow returns the end timestamp of the next window to fire. Its inputs
// are the buffered tuples, the end of the previously fired window (current),
// the current watermark, and the triggered flag from the preceding scan.
// If the window end cannot be determined yet, it returns math.MaxInt64 so that
// it can be recalculated for the next watermark.
//
//   - Tumbling/hopping: after a triggering scan the next end is current plus
//     one interval. Otherwise it is the earliest event in (current, watermark]
//     rounded up to a multiple of the interval.
//   - Sliding: the earliest event timestamp in (current, watermark].
//   - Session: inputs are sorted by timestamp in place. The window ends at the
//     first event where either the gap from the previous event exceeds the
//     timeout (end = previous + timeout), or the event passes the current
//     duration-aligned tick and that tick is more than one duration after the
//     first event (end = tick). When both apply the earlier one wins. The tick
//     advances by at most one duration per event.
//   - Any other window type: math.MaxInt64.
func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64, watermark int64, triggered bool) int64 {
	switch w.window.Type {
	case xsql.TUMBLING_WINDOW, xsql.HOPPING_WINDOW:
		step := int64(w.interval)
		if triggered {
			return current + step
		}
		earliest := getEarliestEventTs(inputs, current, watermark)
		if earliest == math.MaxInt64 {
			return earliest
		}
		if rem := earliest % step; rem != 0 {
			return earliest + step - rem
		}
		return earliest
	case xsql.SLIDING_WINDOW:
		return getEarliestEventTs(inputs, current, watermark)
	case xsql.SESSION_WINDOW:
		if len(inputs) == 0 {
			return math.MaxInt64
		}
		timeout := int64(w.window.Interval)
		duration := int64(w.window.Length)
		sort.SliceStable(inputs, func(a, b int) bool {
			return inputs[a].Timestamp < inputs[b].Timestamp
		})
		first := inputs[0].Timestamp
		tick := first
		if rem := first % duration; rem != 0 {
			tick = first + duration - rem
		}
		// prev == 0 means "no previous event yet", so the gap check is skipped.
		var prev int64
		for _, tuple := range inputs {
			end := int64(math.MaxInt64)
			if prev > 0 && tuple.Timestamp-prev > timeout {
				end = prev + timeout
			}
			if tuple.Timestamp > tick {
				if tick-duration > first && tick < end {
					end = tick
				}
				tick += duration
			}
			if end < math.MaxInt64 {
				return end
			}
			prev = tuple.Timestamp
		}
		return math.MaxInt64
	default:
		return math.MaxInt64
	}
}
  // execEventWindow runs the event-time window loop. It starts the watermark
// generator, then buffers incoming tuples that the generator accepts. On every
// watermark it scans each window whose end is at or before the watermark.
// It returns when ctx is cancelled, stopping the operator ticker (if any) and
// cancelling the generator's context. errCh is currently unused.
func (o *WindowOperator) execEventWindow(ctx api.StreamContext, errCh chan<- error) {
	exeCtx, cancel := ctx.WithCancel()
	log := ctx.GetLogger()
	go o.watermarkGenerator.start(exeCtx)
	var (
		inputs          []*xsql.Tuple
		triggered       bool
		nextWindowEndTs int64
		prevWindowEndTs int64
	)
	// Receive through a local copy so the channel can be disabled once closed.
	// A closed channel is always ready, and `break` inside a select only leaves
	// the select, so the loop would otherwise spin at full CPU.
	input := o.input
	for {
		select {
		// process incoming item
		case item, opened := <-input:
			if !opened {
				// A nil channel never becomes ready; from now on only wait for cancellation.
				input = nil
				continue
			}
			d, ok := item.(xsql.Event)
			if !ok {
				log.Errorf("Expect xsql.Event type")
				continue
			}
			if d.IsWatermark() {
				watermarkTs := d.GetTimestamp()
				windowEndTs := nextWindowEndTs
				// Session and sliding windows need a recalculation because their window end depends on the inputs
				if windowEndTs == math.MaxInt64 || o.window.Type == xsql.SESSION_WINDOW || o.window.Type == xsql.SLIDING_WINDOW {
					windowEndTs = o.watermarkGenerator.getNextWindow(inputs, prevWindowEndTs, watermarkTs, triggered)
				}
				// Fire every window that ends at or before the watermark.
				for windowEndTs <= watermarkTs && windowEndTs >= 0 {
					log.Debugf("Window end ts %d Watermark ts %d", windowEndTs, watermarkTs)
					log.Debugf("Current input count %d", len(inputs))
					// scan all events and find out the event in the current window
					inputs, triggered = o.scan(inputs, windowEndTs, ctx)
					prevWindowEndTs = windowEndTs
					windowEndTs = o.watermarkGenerator.getNextWindow(inputs, windowEndTs, watermarkTs, triggered)
				}
				nextWindowEndTs = windowEndTs
				log.Debugf("next window end %d", nextWindowEndTs)
				continue
			}
			tuple, ok := d.(*xsql.Tuple)
			if !ok {
				// Skip it: tuple is nil here and must not be dereferenced below.
				log.Infof("receive non tuple element %v", d)
				continue
			}
			log.Debugf("event window receive tuple %s", tuple.Message)
			if o.watermarkGenerator.track(tuple.Emitter, d.GetTimestamp(), ctx) {
				inputs = append(inputs, tuple)
			}
		// is cancelling
		case <-ctx.Done():
			log.Infoln("Cancelling window....")
			if o.ticker != nil {
				o.ticker.Stop()
			}
			cancel()
			return
		}
	}
}
  233. func getEarliestEventTs(inputs []*xsql.Tuple, startTs int64, endTs int64) int64{
  234. var minTs int64 = math.MaxInt64
  235. for _, t := range inputs{
  236. if t.Timestamp > startTs && t.Timestamp <= endTs && t.Timestamp < minTs{
  237. minTs = t.Timestamp
  238. }
  239. }
  240. return minTs
  241. }