event_window_trigger.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "math"
  18. "time"
  19. "github.com/lf-edge/ekuiper/internal/xsql"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "github.com/lf-edge/ekuiper/pkg/ast"
  22. )
  23. // EventTimeTrigger scans the input tuples and find out the tuples in the current window
  24. // The inputs are sorted by watermark op
  25. type EventTimeTrigger struct {
  26. window *WindowConfig
  27. interval int64
  28. }
  29. func NewEventTimeTrigger(window *WindowConfig) (*EventTimeTrigger, error) {
  30. w := &EventTimeTrigger{
  31. window: window,
  32. }
  33. switch window.Type {
  34. case ast.NOT_WINDOW:
  35. case ast.TUMBLING_WINDOW:
  36. w.interval = window.Length
  37. case ast.HOPPING_WINDOW:
  38. w.interval = window.Interval
  39. case ast.SLIDING_WINDOW:
  40. w.interval = window.Length
  41. case ast.SESSION_WINDOW:
  42. // Use timeout to update watermark
  43. w.interval = window.Interval
  44. default:
  45. return nil, fmt.Errorf("unsupported window type %d", window.Type)
  46. }
  47. return w, nil
  48. }
  49. // If the window end cannot be determined yet, return max int64 so that it can be recalculated for the next watermark
  50. func (w *EventTimeTrigger) getNextWindow(inputs []*xsql.Tuple, current int64, watermark int64) int64 {
  51. switch w.window.Type {
  52. case ast.TUMBLING_WINDOW, ast.HOPPING_WINDOW:
  53. if current > 0 {
  54. return current + w.interval
  55. } else { // first run without a previous window
  56. nextTs := getEarliestEventTs(inputs, current, watermark)
  57. if nextTs == math.MaxInt64 {
  58. return nextTs
  59. }
  60. return getAlignedWindowEndTime(time.UnixMilli(nextTs), w.window.RawInterval, w.window.TimeUnit).UnixMilli()
  61. }
  62. case ast.SLIDING_WINDOW:
  63. nextTs := getEarliestEventTs(inputs, current, watermark)
  64. return nextTs
  65. default:
  66. return math.MaxInt64
  67. }
  68. }
  69. func (w *EventTimeTrigger) getNextSessionWindow(inputs []*xsql.Tuple, now int64) (int64, bool) {
  70. if len(inputs) > 0 {
  71. timeout, duration := w.window.Interval, w.window.Length
  72. et := inputs[0].Timestamp
  73. tick := getAlignedWindowEndTime(time.UnixMilli(et), w.window.RawInterval, w.window.TimeUnit).UnixMilli()
  74. var p int64
  75. ticked := false
  76. for _, tuple := range inputs {
  77. var r int64 = math.MaxInt64
  78. if p > 0 {
  79. if tuple.Timestamp-p > timeout {
  80. r = p + timeout
  81. }
  82. }
  83. if tuple.Timestamp > tick {
  84. if tick-duration > et && tick < r {
  85. r = tick
  86. ticked = true
  87. }
  88. tick += duration
  89. }
  90. if r < math.MaxInt64 {
  91. return r, ticked
  92. }
  93. p = tuple.Timestamp
  94. }
  95. if p > 0 {
  96. if now-p > timeout {
  97. return p + timeout, ticked
  98. }
  99. }
  100. }
  101. return math.MaxInt64, false
  102. }
  103. func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.Tuple, _ chan<- error) {
  104. log := ctx.GetLogger()
  105. var (
  106. nextWindowEndTs int64
  107. prevWindowEndTs int64
  108. lastTicked bool
  109. )
  110. isTupleMatch := make(map[*xsql.Tuple]bool, 0)
  111. for {
  112. select {
  113. // process incoming item
  114. case item, opened := <-o.input:
  115. if !opened {
  116. o.statManager.IncTotalExceptions("input channel closed")
  117. break
  118. }
  119. processed := false
  120. if item, processed = o.preprocess(item); processed {
  121. break
  122. }
  123. switch d := item.(type) {
  124. case error:
  125. _ = o.Broadcast(d)
  126. o.statManager.IncTotalExceptions(d.Error())
  127. case *xsql.WatermarkTuple:
  128. ctx.GetLogger().Debug("WatermarkTuple", d.GetTimestamp())
  129. watermarkTs := d.GetTimestamp()
  130. if o.window.Type == ast.SLIDING_WINDOW {
  131. for len(o.delayTS) > 0 && watermarkTs >= o.delayTS[0] {
  132. inputs = o.scan(inputs, o.delayTS[0], ctx)
  133. o.delayTS = o.delayTS[1:]
  134. }
  135. }
  136. windowEndTs := nextWindowEndTs
  137. ticked := false
  138. // Session window needs a recalculation of window because its window end depends on the inputs
  139. if windowEndTs == math.MaxInt64 || o.window.Type == ast.SESSION_WINDOW || o.window.Type == ast.SLIDING_WINDOW {
  140. if o.window.Type == ast.SESSION_WINDOW {
  141. windowEndTs, ticked = o.trigger.getNextSessionWindow(inputs, watermarkTs)
  142. } else {
  143. windowEndTs = o.trigger.getNextWindow(inputs, prevWindowEndTs, watermarkTs)
  144. }
  145. }
  146. for windowEndTs <= watermarkTs && windowEndTs >= 0 {
  147. log.Debugf("Current input count %d", len(inputs))
  148. // scan all events and find out the event in the current window
  149. if o.window.Type == ast.SESSION_WINDOW && !lastTicked {
  150. o.triggerTime = inputs[0].Timestamp
  151. }
  152. if windowEndTs > 0 {
  153. if o.window.Type == ast.SLIDING_WINDOW {
  154. var targetTuple *xsql.Tuple
  155. if len(inputs) > 0 && o.window.Type == ast.SLIDING_WINDOW {
  156. for _, t := range inputs {
  157. if t.Timestamp == windowEndTs {
  158. targetTuple = t
  159. break
  160. }
  161. }
  162. }
  163. isMatch := isTupleMatch[targetTuple]
  164. if isMatch {
  165. if o.window.Delay > 0 && o.window.Type == ast.SLIDING_WINDOW {
  166. o.delayTS = append(o.delayTS, windowEndTs+o.window.Delay)
  167. } else {
  168. inputs = o.scan(inputs, windowEndTs, ctx)
  169. }
  170. }
  171. delete(isTupleMatch, targetTuple)
  172. } else {
  173. inputs = o.scan(inputs, windowEndTs, ctx)
  174. }
  175. }
  176. prevWindowEndTs = windowEndTs
  177. lastTicked = ticked
  178. if o.window.Type == ast.SESSION_WINDOW {
  179. windowEndTs, ticked = o.trigger.getNextSessionWindow(inputs, watermarkTs)
  180. } else {
  181. windowEndTs = o.trigger.getNextWindow(inputs, prevWindowEndTs, watermarkTs)
  182. }
  183. log.Debugf("Window end ts %d Watermark ts %d\n", windowEndTs, watermarkTs)
  184. }
  185. nextWindowEndTs = windowEndTs
  186. log.Debugf("next window end %d", nextWindowEndTs)
  187. case *xsql.Tuple:
  188. ctx.GetLogger().Debug("Tuple", d.GetTimestamp())
  189. o.statManager.ProcessTimeStart()
  190. o.statManager.IncTotalRecordsIn()
  191. log.Debugf("event window receive tuple %s", d.Message)
  192. // first tuple, set the window start time, which will set to triggerTime
  193. if o.triggerTime == 0 {
  194. o.triggerTime = d.Timestamp
  195. }
  196. isTupleMatch[d] = o.isMatchCondition(ctx, d)
  197. inputs = append(inputs, d)
  198. o.statManager.ProcessTimeEnd()
  199. _ = ctx.PutState(WindowInputsKey, inputs)
  200. default:
  201. e := fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d)
  202. _ = o.Broadcast(e)
  203. o.statManager.IncTotalExceptions(e.Error())
  204. }
  205. // is cancelling
  206. case <-ctx.Done():
  207. log.Infoln("Cancelling window....")
  208. if o.ticker != nil {
  209. o.ticker.Stop()
  210. }
  211. return
  212. }
  213. }
  214. }
  215. func getEarliestEventTs(inputs []*xsql.Tuple, startTs int64, endTs int64) int64 {
  216. var minTs int64 = math.MaxInt64
  217. for _, t := range inputs {
  218. if t.Timestamp > startTs && t.Timestamp <= endTs && t.Timestamp < minTs {
  219. minTs = t.Timestamp
  220. }
  221. }
  222. return minTs
  223. }