watermark.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package operators
  import (
	"context"
	"fmt"
	"math"
	"sort"
	"sync"
	"time"

	"engine/common"
	"engine/xsql"
)
  // WatermarkTuple is the synthetic event pushed into a window operator's input
// to announce that event time has progressed up to Timestamp.
type WatermarkTuple struct {
	Timestamp int64
}

// GetTimestamp returns the event time carried by the watermark.
func (wt *WatermarkTuple) GetTimestamp() int64 { return wt.Timestamp }

// IsWatermark always reports true, distinguishing watermarks from data tuples.
func (*WatermarkTuple) IsWatermark() bool { return true }
  20. type WatermarkGenerator struct {
  21. lastWatermarkTs int64
  22. inputTopics []string
  23. topicToTs map[string]int64
  24. window *WindowConfig
  25. lateTolerance int64
  26. interval int
  27. ticker common.Ticker
  28. stream chan<- interface{}
  29. }
  30. func NewWatermarkGenerator(window *WindowConfig, l int64, s []string, stream chan<- interface{}) (*WatermarkGenerator, error){
  31. w := &WatermarkGenerator{
  32. window: window,
  33. topicToTs: make(map[string]int64),
  34. lateTolerance: l,
  35. inputTopics: s,
  36. stream: stream,
  37. }
  38. //Tickers to update watermark
  39. switch window.Type{
  40. case xsql.NOT_WINDOW:
  41. case xsql.TUMBLING_WINDOW:
  42. w.ticker = common.GetTicker(window.Length)
  43. w.interval = window.Length
  44. case xsql.HOPPING_WINDOW:
  45. w.ticker = common.GetTicker(window.Interval)
  46. w.interval = window.Interval
  47. case xsql.SLIDING_WINDOW:
  48. w.interval = window.Length
  49. case xsql.SESSION_WINDOW:
  50. //Use timeout to update watermark
  51. w.ticker = common.GetTicker(window.Interval)
  52. w.interval = window.Interval
  53. default:
  54. return nil, fmt.Errorf("unsupported window type %d", window.Type)
  55. }
  56. return w, nil
  57. }
  58. func (w *WatermarkGenerator) track(s string, ts int64, ctx context.Context) bool {
  59. log := common.GetLogger(ctx)
  60. log.Infof("watermark generator track event from topic %s at %d", s, ts)
  61. currentVal, ok := w.topicToTs[s]
  62. if !ok || ts > currentVal {
  63. w.topicToTs[s] = ts
  64. }
  65. r := ts >= w.lastWatermarkTs
  66. if r{
  67. switch w.window.Type{
  68. case xsql.SLIDING_WINDOW:
  69. w.trigger(ctx)
  70. }
  71. }
  72. return r
  73. }
  74. func (w *WatermarkGenerator) start(ctx context.Context) {
  75. exeCtx, cancel := context.WithCancel(ctx)
  76. log := common.GetLogger(ctx)
  77. var c <-chan time.Time
  78. if w.ticker != nil {
  79. c = w.ticker.GetC()
  80. }
  81. for {
  82. select {
  83. case <-c:
  84. w.trigger(ctx)
  85. case <-exeCtx.Done():
  86. log.Println("Cancelling watermark generator....")
  87. if w.ticker != nil{
  88. w.ticker.Stop()
  89. }
  90. cancel()
  91. return
  92. }
  93. }
  94. }
  95. func (w *WatermarkGenerator) trigger(ctx context.Context) {
  96. log := common.GetLogger(ctx)
  97. watermark := w.computeWatermarkTs(ctx)
  98. log.Infof("compute watermark event at %d with last %d", watermark, w.lastWatermarkTs)
  99. if watermark > w.lastWatermarkTs {
  100. t := &WatermarkTuple{Timestamp: watermark}
  101. select {
  102. case w.stream <- t:
  103. default: //TODO need to set buffer
  104. }
  105. w.lastWatermarkTs = watermark
  106. log.Infof("scan watermark event at %d", watermark)
  107. }
  108. }
  109. func (w *WatermarkGenerator) computeWatermarkTs(ctx context.Context) int64{
  110. var ts int64
  111. if len(w.topicToTs) >= len(w.inputTopics) {
  112. ts = math.MaxInt64
  113. for _, key := range w.inputTopics {
  114. if ts > w.topicToTs[key]{
  115. ts = w.topicToTs[key]
  116. }
  117. }
  118. }
  119. return ts - w.lateTolerance
  120. }
  // getNextWindow returns the end timestamp of the next window to evaluate after
// current, given the buffered inputs, the latest watermark and whether the
// previous scan triggered.
//
// If the window end cannot be determined yet, it returns math.MaxInt64 so that
// it can be recalculated for the next watermark.
//
//   - Tumbling/hopping: after a triggered window the next end is simply
//     current + interval. Otherwise it is the earliest buffered event in
//     (current, watermark], rounded up to the next multiple of interval
//     (math.MaxInt64 is passed through unchanged). Assumes interval > 0.
//   - Sliding: the earliest buffered event in (current, watermark].
//   - Session: inputs is sorted by timestamp IN PLACE, which reorders the
//     caller's slice. window.Interval is the inactivity timeout and
//     window.Length the maximum duration (assumed > 0); ticks are multiples of
//     Length starting at the first one at or after the first event (et).
//     Scanning events in order, the window ends either at previous event +
//     timeout when the gap to the next event exceeds the timeout, or at a tick
//     lying more than one Length after et, whichever is smaller for that
//     event. current, watermark and triggered are not consulted here.
//   - Any other type: math.MaxInt64.
func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64, watermark int64, triggered bool) int64 {
	switch w.window.Type {
	case xsql.TUMBLING_WINDOW, xsql.HOPPING_WINDOW:
		if triggered {
			return current + int64(w.interval)
		} else {
			interval := int64(w.interval)
			nextTs := getEarliestEventTs(inputs, current, watermark)
			// Already aligned, or no event yet: nothing to round.
			if nextTs == math.MaxInt64 || nextTs%interval == 0 {
				return nextTs
			}
			// Round up to the next interval boundary.
			return nextTs + (interval - nextTs%interval)
		}
	case xsql.SLIDING_WINDOW:
		nextTs := getEarliestEventTs(inputs, current, watermark)
		return nextTs
	case xsql.SESSION_WINDOW:
		if len(inputs) > 0 {
			timeout, duration := int64(w.window.Interval), int64(w.window.Length)
			sort.SliceStable(inputs, func(i, j int) bool {
				return inputs[i].Timestamp < inputs[j].Timestamp
			})
			// et: first event; tick: first Length-aligned boundary at or after et.
			et := inputs[0].Timestamp
			tick := et + (duration - et%duration)
			if et%duration == 0 {
				tick = et
			}
			// p: timestamp of the previous event.
			// NOTE(review): the gap check is skipped while p is 0, so it is also
			// skipped after an event whose timestamp is <= 0 — confirm timestamps
			// are always positive.
			var p int64
			for _, tuple := range inputs {
				var r int64 = math.MaxInt64
				// Inactivity timeout: close at previous event + timeout.
				if p > 0 {
					if tuple.Timestamp-p > timeout {
						r = p + timeout
					}
				}
				// Max duration: close at a tick more than one Length after et.
				// NOTE(review): tick advances by a single Length per event, so an
				// event that jumps several ticks is only caught up on later events
				// (if none follow, MaxInt64 is returned) — verify this is intended.
				if tuple.Timestamp > tick {
					if tick-duration > et && tick < r {
						r = tick
					}
					tick += duration
				}
				if r < math.MaxInt64 {
					return r
				}
				p = tuple.Timestamp
			}
		}
		return math.MaxInt64
	default:
		return math.MaxInt64
	}
}
  174. func (o *WindowOperator) execEventWindow(ctx context.Context) {
  175. exeCtx, cancel := context.WithCancel(ctx)
  176. log := common.GetLogger(ctx)
  177. go o.watermarkGenerator.start(ctx)
  178. var (
  179. inputs []*xsql.Tuple
  180. triggered bool
  181. nextWindowEndTs int64
  182. prevWindowEndTs int64
  183. )
  184. for {
  185. select {
  186. // process incoming item
  187. case item, opened := <-o.input:
  188. if !opened {
  189. break
  190. }
  191. if d, ok := item.(xsql.Event); !ok {
  192. log.Errorf("Expect xsql.Event type")
  193. break
  194. }else{
  195. if d.IsWatermark(){
  196. watermarkTs := d.GetTimestamp()
  197. windowEndTs := nextWindowEndTs
  198. //Session window needs a recalculation of window because its window end depends the inputs
  199. if windowEndTs == math.MaxInt64 || o.window.Type == xsql.SESSION_WINDOW || o.window.Type == xsql.SLIDING_WINDOW{
  200. windowEndTs = o.watermarkGenerator.getNextWindow(inputs, prevWindowEndTs, watermarkTs, triggered)
  201. }
  202. for windowEndTs <= watermarkTs && windowEndTs >= 0 {
  203. log.Debugf("Window end ts %d Watermark ts %d", windowEndTs, watermarkTs)
  204. log.Debugf("Current input count %d", len(inputs))
  205. //scan all events and find out the event in the current window
  206. inputs, triggered = o.scan(inputs, windowEndTs, ctx)
  207. prevWindowEndTs = windowEndTs
  208. windowEndTs = o.watermarkGenerator.getNextWindow(inputs, windowEndTs, watermarkTs, triggered)
  209. }
  210. nextWindowEndTs = windowEndTs
  211. log.Debugf("next window end %d", nextWindowEndTs)
  212. }else{
  213. tuple, ok := d.(*xsql.Tuple)
  214. if !ok{
  215. log.Infof("receive non tuple element %v", d)
  216. }
  217. log.Debugf("event window receive tuple %s", tuple.Message)
  218. if o.watermarkGenerator.track(tuple.Emitter, d.GetTimestamp(),ctx){
  219. inputs = append(inputs, tuple)
  220. }
  221. }
  222. }
  223. // is cancelling
  224. case <-exeCtx.Done():
  225. log.Println("Cancelling window....")
  226. if o.ticker != nil{
  227. o.ticker.Stop()
  228. }
  229. cancel()
  230. return
  231. }
  232. }
  233. }
  234. func getEarliestEventTs(inputs []*xsql.Tuple, startTs int64, endTs int64) int64{
  235. var minTs int64 = math.MaxInt64
  236. for _, t := range inputs{
  237. if t.Timestamp > startTs && t.Timestamp <= endTs && t.Timestamp < minTs{
  238. minTs = t.Timestamp
  239. }
  240. }
  241. return minTs
  242. }