watermark.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "context"
  17. "fmt"
  18. "math"
  19. "sort"
  20. "time"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. "github.com/lf-edge/ekuiper/pkg/infra"
  25. )
  26. type WatermarkTuple struct {
  27. Timestamp int64
  28. }
  29. func (t *WatermarkTuple) GetTimestamp() int64 {
  30. return t.Timestamp
  31. }
  32. func (t *WatermarkTuple) IsWatermark() bool {
  33. return true
  34. }
  35. const WATERMARK_KEY = "$$wartermark"
  36. type WatermarkGenerator struct {
  37. inputTopics []string
  38. topicToTs map[string]int64
  39. window *WindowConfig
  40. lateTolerance int64
  41. interval int64
  42. rawInterval int
  43. timeUnit ast.Token
  44. // ticker *clock.Ticker
  45. stream chan<- interface{}
  46. // state
  47. lastWatermarkTs int64
  48. }
  49. func NewWatermarkGenerator(window *WindowConfig, l int64, s []string, stream chan<- interface{}) (*WatermarkGenerator, error) {
  50. w := &WatermarkGenerator{
  51. window: window,
  52. topicToTs: make(map[string]int64),
  53. lateTolerance: l,
  54. inputTopics: s,
  55. stream: stream,
  56. rawInterval: window.RawInterval,
  57. timeUnit: window.TimeUnit,
  58. }
  59. switch window.Type {
  60. case ast.NOT_WINDOW:
  61. case ast.TUMBLING_WINDOW:
  62. w.interval = window.Length
  63. case ast.HOPPING_WINDOW:
  64. w.interval = window.Interval
  65. case ast.SLIDING_WINDOW:
  66. w.interval = window.Length
  67. case ast.SESSION_WINDOW:
  68. // Use timeout to update watermark
  69. w.interval = window.Interval
  70. default:
  71. return nil, fmt.Errorf("unsupported window type %d", window.Type)
  72. }
  73. return w, nil
  74. }
  75. func (w *WatermarkGenerator) track(s string, ts int64, ctx api.StreamContext) bool {
  76. log := ctx.GetLogger()
  77. log.Debugf("watermark generator track event from topic %s at %d", s, ts)
  78. currentVal, ok := w.topicToTs[s]
  79. if !ok || ts > currentVal {
  80. w.topicToTs[s] = ts
  81. }
  82. r := ts >= w.lastWatermarkTs
  83. if r {
  84. w.trigger(ctx)
  85. }
  86. return r
  87. }
  88. func (w *WatermarkGenerator) trigger(ctx api.StreamContext) {
  89. log := ctx.GetLogger()
  90. watermark := w.computeWatermarkTs(ctx)
  91. log.Debugf("compute watermark event at %d with last %d", watermark, w.lastWatermarkTs)
  92. if watermark > w.lastWatermarkTs {
  93. t := &WatermarkTuple{Timestamp: watermark}
  94. select {
  95. case w.stream <- t:
  96. default: // TODO need to set buffer
  97. }
  98. w.lastWatermarkTs = watermark
  99. ctx.PutState(WATERMARK_KEY, w.lastWatermarkTs)
  100. log.Debugf("scan watermark event at %d", watermark)
  101. }
  102. }
  103. func (w *WatermarkGenerator) computeWatermarkTs(_ context.Context) int64 {
  104. var ts int64
  105. if len(w.topicToTs) >= len(w.inputTopics) {
  106. ts = math.MaxInt64
  107. for _, key := range w.inputTopics {
  108. if ts > w.topicToTs[key] {
  109. ts = w.topicToTs[key]
  110. }
  111. }
  112. }
  113. return ts - w.lateTolerance
  114. }
  115. // If window end cannot be determined yet, return max int64 so that it can be recalculated for the next watermark
  116. func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64, watermark int64) int64 {
  117. switch w.window.Type {
  118. case ast.TUMBLING_WINDOW, ast.HOPPING_WINDOW:
  119. if current > 0 {
  120. return current + w.interval
  121. } else { // first run without a previous window
  122. nextTs := getEarliestEventTs(inputs, current, watermark)
  123. if nextTs == math.MaxInt64 {
  124. return nextTs
  125. }
  126. return getAlignedWindowEndTime(time.UnixMilli(nextTs), w.rawInterval, w.timeUnit).UnixMilli()
  127. }
  128. case ast.SLIDING_WINDOW:
  129. nextTs := getEarliestEventTs(inputs, current, watermark)
  130. return nextTs
  131. default:
  132. return math.MaxInt64
  133. }
  134. }
  135. func (w *WatermarkGenerator) getNextSessionWindow(inputs []*xsql.Tuple) (int64, bool) {
  136. if len(inputs) > 0 {
  137. timeout, duration := w.window.Interval, w.window.Length
  138. sort.SliceStable(inputs, func(i, j int) bool {
  139. return inputs[i].Timestamp < inputs[j].Timestamp
  140. })
  141. et := inputs[0].Timestamp
  142. tick := getAlignedWindowEndTime(time.UnixMilli(et), w.rawInterval, w.timeUnit).UnixMilli()
  143. var p int64
  144. ticked := false
  145. for _, tuple := range inputs {
  146. var r int64 = math.MaxInt64
  147. if p > 0 {
  148. if tuple.Timestamp-p > timeout {
  149. r = p + timeout
  150. }
  151. }
  152. if tuple.Timestamp > tick {
  153. if tick-duration > et && tick < r {
  154. r = tick
  155. ticked = true
  156. }
  157. tick += duration
  158. }
  159. if r < math.MaxInt64 {
  160. return r, ticked
  161. }
  162. p = tuple.Timestamp
  163. }
  164. }
  165. return math.MaxInt64, false
  166. }
  167. func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.Tuple, errCh chan<- error) {
  168. log := ctx.GetLogger()
  169. var (
  170. nextWindowEndTs int64
  171. prevWindowEndTs int64
  172. lastTicked bool
  173. )
  174. o.watermarkGenerator.lastWatermarkTs = 0
  175. if s, err := ctx.GetState(WATERMARK_KEY); err == nil && s != nil {
  176. if si, ok := s.(int64); ok {
  177. o.watermarkGenerator.lastWatermarkTs = si
  178. } else {
  179. infra.DrainError(ctx, fmt.Errorf("restore window state `lastWatermarkTs` %v error, invalid type", s), errCh)
  180. return
  181. }
  182. }
  183. log.Infof("Start with window state lastWatermarkTs: %d", o.watermarkGenerator.lastWatermarkTs)
  184. for {
  185. select {
  186. // process incoming item
  187. case item, opened := <-o.input:
  188. processed := false
  189. if item, processed = o.preprocess(item); processed {
  190. break
  191. }
  192. o.statManager.ProcessTimeStart()
  193. if !opened {
  194. o.statManager.IncTotalExceptions("input channel closed")
  195. break
  196. }
  197. switch d := item.(type) {
  198. case error:
  199. o.statManager.IncTotalRecordsIn()
  200. o.Broadcast(d)
  201. o.statManager.IncTotalExceptions(d.Error())
  202. case xsql.Event:
  203. if d.IsWatermark() {
  204. watermarkTs := d.GetTimestamp()
  205. windowEndTs := nextWindowEndTs
  206. ticked := false
  207. // Session window needs a recalculation of window because its window end depends on the inputs
  208. if windowEndTs == math.MaxInt64 || o.window.Type == ast.SESSION_WINDOW || o.window.Type == ast.SLIDING_WINDOW {
  209. if o.window.Type == ast.SESSION_WINDOW {
  210. windowEndTs, ticked = o.watermarkGenerator.getNextSessionWindow(inputs)
  211. } else {
  212. windowEndTs = o.watermarkGenerator.getNextWindow(inputs, prevWindowEndTs, watermarkTs)
  213. }
  214. }
  215. for windowEndTs <= watermarkTs && windowEndTs >= 0 {
  216. log.Debugf("Window end ts %d Watermark ts %d", windowEndTs, watermarkTs)
  217. log.Debugf("Current input count %d", len(inputs))
  218. // scan all events and find out the event in the current window
  219. if o.window.Type == ast.SESSION_WINDOW && !lastTicked {
  220. o.triggerTime = inputs[0].Timestamp
  221. }
  222. if windowEndTs > 0 {
  223. inputs = o.scan(inputs, windowEndTs, ctx)
  224. }
  225. prevWindowEndTs = windowEndTs
  226. lastTicked = ticked
  227. if o.window.Type == ast.SESSION_WINDOW {
  228. windowEndTs, ticked = o.watermarkGenerator.getNextSessionWindow(inputs)
  229. } else {
  230. windowEndTs = o.watermarkGenerator.getNextWindow(inputs, prevWindowEndTs, watermarkTs)
  231. }
  232. }
  233. nextWindowEndTs = windowEndTs
  234. log.Debugf("next window end %d", nextWindowEndTs)
  235. } else {
  236. o.statManager.IncTotalRecordsIn()
  237. tuple, ok := d.(*xsql.Tuple)
  238. if !ok {
  239. log.Debugf("receive non tuple element %v", d)
  240. }
  241. log.Debugf("event window receive tuple %s", tuple.Message)
  242. // first tuple, set the window start time, which will set to triggerTime
  243. if o.triggerTime == 0 {
  244. o.triggerTime = tuple.Timestamp
  245. }
  246. if o.watermarkGenerator.track(tuple.Emitter, d.GetTimestamp(), ctx) {
  247. inputs = append(inputs, tuple)
  248. }
  249. }
  250. o.statManager.ProcessTimeEnd()
  251. ctx.PutState(WINDOW_INPUTS_KEY, inputs)
  252. default:
  253. o.statManager.IncTotalRecordsIn()
  254. e := fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d)
  255. o.Broadcast(e)
  256. o.statManager.IncTotalExceptions(e.Error())
  257. }
  258. // is cancelling
  259. case <-ctx.Done():
  260. log.Infoln("Cancelling window....")
  261. if o.ticker != nil {
  262. o.ticker.Stop()
  263. }
  264. return
  265. }
  266. }
  267. }
  268. func getEarliestEventTs(inputs []*xsql.Tuple, startTs int64, endTs int64) int64 {
  269. var minTs int64 = math.MaxInt64
  270. for _, t := range inputs {
  271. if t.Timestamp > startTs && t.Timestamp <= endTs && t.Timestamp < minTs {
  272. minTs = t.Timestamp
  273. }
  274. }
  275. return minTs
  276. }