event_window_trigger.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "math"
  18. "time"
  19. "github.com/lf-edge/ekuiper/internal/xsql"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "github.com/lf-edge/ekuiper/pkg/ast"
  22. )
  23. // EventTimeTrigger scans the input tuples and find out the tuples in the current window
  24. // The inputs are sorted by watermark op
  25. type EventTimeTrigger struct {
  26. window *WindowConfig
  27. interval int64
  28. }
  29. func NewEventTimeTrigger(window *WindowConfig) (*EventTimeTrigger, error) {
  30. w := &EventTimeTrigger{
  31. window: window,
  32. }
  33. switch window.Type {
  34. case ast.NOT_WINDOW:
  35. case ast.TUMBLING_WINDOW:
  36. w.interval = window.Length
  37. case ast.HOPPING_WINDOW:
  38. w.interval = window.Interval
  39. case ast.SLIDING_WINDOW:
  40. w.interval = window.Length
  41. case ast.SESSION_WINDOW:
  42. // Use timeout to update watermark
  43. w.interval = window.Interval
  44. default:
  45. return nil, fmt.Errorf("unsupported window type %d", window.Type)
  46. }
  47. return w, nil
  48. }
  49. // If the window end cannot be determined yet, return max int64 so that it can be recalculated for the next watermark
  50. func (w *EventTimeTrigger) getNextWindow(inputs []*xsql.Tuple, current int64, watermark int64) int64 {
  51. switch w.window.Type {
  52. case ast.TUMBLING_WINDOW, ast.HOPPING_WINDOW:
  53. if current > 0 {
  54. return current + w.interval
  55. } else { // first run without a previous window
  56. nextTs := getEarliestEventTs(inputs, current, watermark)
  57. if nextTs == math.MaxInt64 {
  58. return nextTs
  59. }
  60. return getAlignedWindowEndTime(time.UnixMilli(nextTs), w.window.RawInterval, w.window.TimeUnit).UnixMilli()
  61. }
  62. case ast.SLIDING_WINDOW:
  63. nextTs := getEarliestEventTs(inputs, current, watermark)
  64. return nextTs
  65. default:
  66. return math.MaxInt64
  67. }
  68. }
  69. func (w *EventTimeTrigger) getNextSessionWindow(inputs []*xsql.Tuple, now int64) (int64, bool) {
  70. if len(inputs) > 0 {
  71. timeout, duration := w.window.Interval, w.window.Length
  72. et := inputs[0].Timestamp
  73. tick := getAlignedWindowEndTime(time.UnixMilli(et), w.window.RawInterval, w.window.TimeUnit).UnixMilli()
  74. var p int64
  75. ticked := false
  76. for _, tuple := range inputs {
  77. var r int64 = math.MaxInt64
  78. if p > 0 {
  79. if tuple.Timestamp-p > timeout {
  80. r = p + timeout
  81. }
  82. }
  83. if tuple.Timestamp > tick {
  84. if tick-duration > et && tick < r {
  85. r = tick
  86. ticked = true
  87. }
  88. tick += duration
  89. }
  90. if r < math.MaxInt64 {
  91. return r, ticked
  92. }
  93. p = tuple.Timestamp
  94. }
  95. if p > 0 {
  96. if now-p > timeout {
  97. return p + timeout, ticked
  98. }
  99. }
  100. }
  101. return math.MaxInt64, false
  102. }
  103. func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.Tuple, _ chan<- error) {
  104. log := ctx.GetLogger()
  105. var (
  106. nextWindowEndTs int64
  107. prevWindowEndTs int64
  108. lastTicked bool
  109. )
  110. for {
  111. select {
  112. // process incoming item
  113. case item, opened := <-o.input:
  114. if !opened {
  115. o.statManager.IncTotalExceptions("input channel closed")
  116. break
  117. }
  118. processed := false
  119. if item, processed = o.preprocess(item); processed {
  120. break
  121. }
  122. switch d := item.(type) {
  123. case error:
  124. _ = o.Broadcast(d)
  125. o.statManager.IncTotalExceptions(d.Error())
  126. case *xsql.WatermarkTuple:
  127. ctx.GetLogger().Debug("WatermarkTuple", d.GetTimestamp())
  128. watermarkTs := d.GetTimestamp()
  129. windowEndTs := nextWindowEndTs
  130. ticked := false
  131. // Session window needs a recalculation of window because its window end depends on the inputs
  132. if windowEndTs == math.MaxInt64 || o.window.Type == ast.SESSION_WINDOW || o.window.Type == ast.SLIDING_WINDOW {
  133. if o.window.Type == ast.SESSION_WINDOW {
  134. windowEndTs, ticked = o.trigger.getNextSessionWindow(inputs, watermarkTs)
  135. } else {
  136. windowEndTs = o.trigger.getNextWindow(inputs, prevWindowEndTs, watermarkTs)
  137. }
  138. }
  139. for windowEndTs <= watermarkTs && windowEndTs >= 0 {
  140. log.Debugf("Current input count %d", len(inputs))
  141. // scan all events and find out the event in the current window
  142. if o.window.Type == ast.SESSION_WINDOW && !lastTicked {
  143. o.triggerTime = inputs[0].Timestamp
  144. }
  145. if windowEndTs > 0 {
  146. inputs = o.scan(inputs, windowEndTs, ctx)
  147. }
  148. prevWindowEndTs = windowEndTs
  149. lastTicked = ticked
  150. if o.window.Type == ast.SESSION_WINDOW {
  151. windowEndTs, ticked = o.trigger.getNextSessionWindow(inputs, watermarkTs)
  152. } else {
  153. windowEndTs = o.trigger.getNextWindow(inputs, prevWindowEndTs, watermarkTs)
  154. }
  155. log.Debugf("Window end ts %d Watermark ts %d\n", windowEndTs, watermarkTs)
  156. }
  157. nextWindowEndTs = windowEndTs
  158. log.Debugf("next window end %d", nextWindowEndTs)
  159. case *xsql.Tuple:
  160. ctx.GetLogger().Debug("Tuple", d.GetTimestamp())
  161. o.statManager.ProcessTimeStart()
  162. o.statManager.IncTotalRecordsIn()
  163. log.Debugf("event window receive tuple %s", d.Message)
  164. // first tuple, set the window start time, which will set to triggerTime
  165. if o.triggerTime == 0 {
  166. o.triggerTime = d.Timestamp
  167. }
  168. inputs = append(inputs, d)
  169. o.statManager.ProcessTimeEnd()
  170. _ = ctx.PutState(WindowInputsKey, inputs)
  171. default:
  172. e := fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d)
  173. _ = o.Broadcast(e)
  174. o.statManager.IncTotalExceptions(e.Error())
  175. }
  176. // is cancelling
  177. case <-ctx.Done():
  178. log.Infoln("Cancelling window....")
  179. if o.ticker != nil {
  180. o.ticker.Stop()
  181. }
  182. return
  183. }
  184. }
  185. }
  186. func getEarliestEventTs(inputs []*xsql.Tuple, startTs int64, endTs int64) int64 {
  187. var minTs int64 = math.MaxInt64
  188. for _, t := range inputs {
  189. if t.Timestamp > startTs && t.Timestamp <= endTs && t.Timestamp < minTs {
  190. minTs = t.Timestamp
  191. }
  192. }
  193. return minTs
  194. }