event_window_trigger.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "math"
  18. "time"
  19. "github.com/lf-edge/ekuiper/internal/xsql"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "github.com/lf-edge/ekuiper/pkg/ast"
  22. )
  23. // EventTimeTrigger scans the input tuples and find out the tuples in the current window
  24. // The inputs are sorted by watermark op
  25. type EventTimeTrigger struct {
  26. window *WindowConfig
  27. interval int64
  28. }
  29. func NewEventTimeTrigger(window *WindowConfig) (*EventTimeTrigger, error) {
  30. w := &EventTimeTrigger{
  31. window: window,
  32. }
  33. switch window.Type {
  34. case ast.NOT_WINDOW:
  35. case ast.TUMBLING_WINDOW:
  36. w.interval = window.Length
  37. case ast.HOPPING_WINDOW:
  38. w.interval = window.Interval
  39. case ast.SLIDING_WINDOW:
  40. w.interval = window.Length
  41. case ast.SESSION_WINDOW:
  42. // Use timeout to update watermark
  43. w.interval = window.Interval
  44. default:
  45. return nil, fmt.Errorf("unsupported window type %d", window.Type)
  46. }
  47. return w, nil
  48. }
  49. // If the window end cannot be determined yet, return max int64 so that it can be recalculated for the next watermark
  50. func (w *EventTimeTrigger) getNextWindow(inputs []*xsql.Tuple, current int64, watermark int64) int64 {
  51. switch w.window.Type {
  52. case ast.TUMBLING_WINDOW, ast.HOPPING_WINDOW:
  53. if current > 0 {
  54. return current + w.interval
  55. } else { // first run without a previous window
  56. nextTs := getEarliestEventTs(inputs, current, watermark)
  57. if nextTs == math.MaxInt64 {
  58. return nextTs
  59. }
  60. return getAlignedWindowEndTime(time.UnixMilli(nextTs), w.window.RawInterval, w.window.TimeUnit).UnixMilli()
  61. }
  62. case ast.SLIDING_WINDOW:
  63. nextTs := getEarliestEventTs(inputs, current, watermark)
  64. return nextTs
  65. default:
  66. return math.MaxInt64
  67. }
  68. }
  69. func (w *EventTimeTrigger) getNextSessionWindow(inputs []*xsql.Tuple, now int64) (int64, bool) {
  70. if len(inputs) > 0 {
  71. timeout, duration := w.window.Interval, w.window.Length
  72. et := inputs[0].Timestamp
  73. tick := getAlignedWindowEndTime(time.UnixMilli(et), w.window.RawInterval, w.window.TimeUnit).UnixMilli()
  74. var p int64
  75. ticked := false
  76. for _, tuple := range inputs {
  77. var r int64 = math.MaxInt64
  78. if p > 0 {
  79. if tuple.Timestamp-p > timeout {
  80. r = p + timeout
  81. }
  82. }
  83. if tuple.Timestamp > tick {
  84. if tick-duration > et && tick < r {
  85. r = tick
  86. ticked = true
  87. }
  88. tick += duration
  89. }
  90. if r < math.MaxInt64 {
  91. return r, ticked
  92. }
  93. p = tuple.Timestamp
  94. }
  95. if p > 0 {
  96. if now-p > timeout {
  97. return p + timeout, ticked
  98. }
  99. }
  100. }
  101. return math.MaxInt64, false
  102. }
  103. func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.Tuple, _ chan<- error) {
  104. log := ctx.GetLogger()
  105. var (
  106. nextWindowEndTs int64
  107. prevWindowEndTs int64
  108. lastTicked bool
  109. )
  110. for {
  111. select {
  112. // process incoming item
  113. case item, opened := <-o.input:
  114. if !opened {
  115. o.statManager.IncTotalExceptions("input channel closed")
  116. break
  117. }
  118. processed := false
  119. if item, processed = o.preprocess(item); processed {
  120. break
  121. }
  122. switch d := item.(type) {
  123. case error:
  124. _ = o.Broadcast(d)
  125. o.statManager.IncTotalExceptions(d.Error())
  126. case *xsql.WatermarkTuple:
  127. ctx.GetLogger().Debug("WatermarkTuple", d.GetTimestamp())
  128. watermarkTs := d.GetTimestamp()
  129. if o.window.Type == ast.SLIDING_WINDOW {
  130. for len(o.delayTS) > 0 && watermarkTs >= o.delayTS[0] {
  131. inputs = o.scan(inputs, o.delayTS[0], ctx)
  132. o.delayTS = o.delayTS[1:]
  133. }
  134. }
  135. windowEndTs := nextWindowEndTs
  136. ticked := false
  137. // Session window needs a recalculation of window because its window end depends on the inputs
  138. if windowEndTs == math.MaxInt64 || o.window.Type == ast.SESSION_WINDOW || o.window.Type == ast.SLIDING_WINDOW {
  139. if o.window.Type == ast.SESSION_WINDOW {
  140. windowEndTs, ticked = o.trigger.getNextSessionWindow(inputs, watermarkTs)
  141. } else {
  142. windowEndTs = o.trigger.getNextWindow(inputs, prevWindowEndTs, watermarkTs)
  143. }
  144. }
  145. for windowEndTs <= watermarkTs && windowEndTs >= 0 {
  146. log.Debugf("Current input count %d", len(inputs))
  147. // scan all events and find out the event in the current window
  148. if o.window.Type == ast.SESSION_WINDOW && !lastTicked {
  149. o.triggerTime = inputs[0].Timestamp
  150. }
  151. if windowEndTs > 0 {
  152. if o.window.Type == ast.SLIDING_WINDOW {
  153. for len(o.triggerTS) > 0 && o.triggerTS[0] <= watermarkTs {
  154. if o.window.Delay > 0 {
  155. o.delayTS = append(o.delayTS, o.triggerTS[0]+o.window.Delay)
  156. } else {
  157. inputs = o.scan(inputs, o.triggerTS[0], ctx)
  158. }
  159. o.triggerTS = o.triggerTS[1:]
  160. }
  161. } else {
  162. inputs = o.scan(inputs, windowEndTs, ctx)
  163. }
  164. }
  165. prevWindowEndTs = windowEndTs
  166. lastTicked = ticked
  167. if o.window.Type == ast.SESSION_WINDOW {
  168. windowEndTs, ticked = o.trigger.getNextSessionWindow(inputs, watermarkTs)
  169. } else {
  170. windowEndTs = o.trigger.getNextWindow(inputs, prevWindowEndTs, watermarkTs)
  171. }
  172. log.Debugf("Window end ts %d Watermark ts %d\n", windowEndTs, watermarkTs)
  173. }
  174. nextWindowEndTs = windowEndTs
  175. log.Debugf("next window end %d", nextWindowEndTs)
  176. case *xsql.Tuple:
  177. ctx.GetLogger().Debug("Tuple", d.GetTimestamp())
  178. o.statManager.ProcessTimeStart()
  179. o.statManager.IncTotalRecordsIn()
  180. log.Debugf("event window receive tuple %s", d.Message)
  181. // first tuple, set the window start time, which will set to triggerTime
  182. if o.triggerTime == 0 {
  183. o.triggerTime = d.Timestamp
  184. }
  185. if o.window.Type == ast.SLIDING_WINDOW && o.isMatchCondition(ctx, d) {
  186. o.triggerTS = append(o.triggerTS, d.GetTimestamp())
  187. }
  188. inputs = append(inputs, d)
  189. o.statManager.ProcessTimeEnd()
  190. _ = ctx.PutState(WindowInputsKey, inputs)
  191. default:
  192. e := fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d)
  193. _ = o.Broadcast(e)
  194. o.statManager.IncTotalExceptions(e.Error())
  195. }
  196. // is cancelling
  197. case <-ctx.Done():
  198. log.Infoln("Cancelling window....")
  199. if o.ticker != nil {
  200. o.ticker.Stop()
  201. }
  202. return
  203. }
  204. }
  205. }
  206. func getEarliestEventTs(inputs []*xsql.Tuple, startTs int64, endTs int64) int64 {
  207. var minTs int64 = math.MaxInt64
  208. for _, t := range inputs {
  209. if t.Timestamp > startTs && t.Timestamp <= endTs && t.Timestamp < minTs {
  210. minTs = t.Timestamp
  211. }
  212. }
  213. return minTs
  214. }