window_op.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "encoding/gob"
  17. "fmt"
  18. "github.com/benbjohnson/clock"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. "github.com/lf-edge/ekuiper/pkg/infra"
  25. "math"
  26. "time"
  27. )
// WindowConfig describes the window applied by a WindowOperator.
type WindowConfig struct {
	// Type selects the window kind (tumbling, hopping, sliding, session, count or none).
	Type ast.WindowType
	// Length is the window size. For time-based windows it is used as milliseconds;
	// for count windows it is the number of tuples per window.
	Length int
	// Interval is the trigger/hop interval (session windows use it as the timeout in milliseconds).
	// If Interval is not set for a count window, NewWindowOp sets it equal to Length.
	Interval int
}
// WindowOperator buffers incoming tuples and broadcasts them grouped into
// windows according to its WindowConfig.
type WindowOperator struct {
	*defaultSinkNode
	// window is the window configuration this operator was created with.
	window *WindowConfig
	// interval is the effective trigger interval chosen per window type when
	// the processing-time loop starts.
	interval int
	// isEventTime selects the event-time loop instead of the processing-time loop.
	isEventTime        bool
	watermarkGenerator *WatermarkGenerator // For event time only
	statManager        StatManager
	ticker             *clock.Ticker // For processing time only
	// states
	// triggerTime is the time (Unix milliseconds) of the last window trigger.
	triggerTime int64
	// msgCount counts tuples received since the last count-window trigger.
	msgCount int
}
  45. const WINDOW_INPUTS_KEY = "$$windowInputs"
  46. const TRIGGER_TIME_KEY = "$$triggerTime"
  47. const MSG_COUNT_KEY = "$$msgCount"
  48. func init() {
  49. gob.Register([]*xsql.Tuple{})
  50. }
  51. func NewWindowOp(name string, w WindowConfig, streams []string, options *api.RuleOption) (*WindowOperator, error) {
  52. o := new(WindowOperator)
  53. o.defaultSinkNode = &defaultSinkNode{
  54. input: make(chan interface{}, options.BufferLength),
  55. defaultNode: &defaultNode{
  56. outputs: make(map[string]chan<- interface{}),
  57. name: name,
  58. sendError: options.SendError,
  59. },
  60. }
  61. o.isEventTime = options.IsEventTime
  62. o.window = &w
  63. if o.window.Interval == 0 && o.window.Type == ast.COUNT_WINDOW {
  64. //if no interval value is set and it's count window, then set interval to length value.
  65. o.window.Interval = o.window.Length
  66. }
  67. if options.IsEventTime {
  68. //Create watermark generator
  69. if w, err := NewWatermarkGenerator(o.window, options.LateTol, streams, o.input); err != nil {
  70. return nil, err
  71. } else {
  72. o.watermarkGenerator = w
  73. }
  74. }
  75. return o, nil
  76. }
  77. // Exec is the entry point for the executor
  78. // input: *xsql.Tuple from preprocessor
  79. // output: xsql.WindowTuplesSet
  80. func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
  81. o.ctx = ctx
  82. log := ctx.GetLogger()
  83. log.Debugf("Window operator %s is started", o.name)
  84. if len(o.outputs) <= 0 {
  85. infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
  86. return
  87. }
  88. stats, err := NewStatManager(ctx, "op")
  89. if err != nil {
  90. infra.DrainError(ctx, err, errCh)
  91. return
  92. }
  93. o.statManager = stats
  94. var inputs []*xsql.Tuple
  95. if s, err := ctx.GetState(WINDOW_INPUTS_KEY); err == nil {
  96. switch st := s.(type) {
  97. case []*xsql.Tuple:
  98. inputs = st
  99. log.Infof("Restore window state %+v", inputs)
  100. case nil:
  101. log.Debugf("Restore window state, nothing")
  102. default:
  103. infra.DrainError(ctx, fmt.Errorf("restore window state `inputs` %v error, invalid type", st), errCh)
  104. return
  105. }
  106. } else {
  107. log.Warnf("Restore window state fails: %s", err)
  108. }
  109. o.triggerTime = conf.GetNowInMilli()
  110. if s, err := ctx.GetState(TRIGGER_TIME_KEY); err == nil && s != nil {
  111. if si, ok := s.(int64); ok {
  112. o.triggerTime = si
  113. } else {
  114. errCh <- fmt.Errorf("restore window state `triggerTime` %v error, invalid type", s)
  115. }
  116. }
  117. o.msgCount = 0
  118. if s, err := ctx.GetState(MSG_COUNT_KEY); err == nil && s != nil {
  119. if si, ok := s.(int); ok {
  120. o.msgCount = si
  121. } else {
  122. infra.DrainError(ctx, fmt.Errorf("restore window state `msgCount` %v error, invalid type", s), errCh)
  123. return
  124. }
  125. }
  126. log.Infof("Start with window state triggerTime: %d, msgCount: %d", o.triggerTime, o.msgCount)
  127. if o.isEventTime {
  128. go func() {
  129. err := infra.SafeRun(func() error {
  130. o.execEventWindow(ctx, inputs, errCh)
  131. return nil
  132. })
  133. if err != nil {
  134. infra.DrainError(ctx, err, errCh)
  135. }
  136. }()
  137. } else {
  138. go func() {
  139. err := infra.SafeRun(func() error {
  140. o.execProcessingWindow(ctx, inputs, errCh)
  141. return nil
  142. })
  143. if err != nil {
  144. infra.DrainError(ctx, err, errCh)
  145. }
  146. }()
  147. }
  148. }
  149. func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*xsql.Tuple, errCh chan<- error) {
  150. log := ctx.GetLogger()
  151. var (
  152. c <-chan time.Time
  153. timeoutTicker *clock.Timer
  154. timeout <-chan time.Time
  155. )
  156. switch o.window.Type {
  157. case ast.NOT_WINDOW:
  158. case ast.TUMBLING_WINDOW:
  159. o.ticker = conf.GetTicker(o.window.Length)
  160. o.interval = o.window.Length
  161. case ast.HOPPING_WINDOW:
  162. o.ticker = conf.GetTicker(o.window.Interval)
  163. o.interval = o.window.Interval
  164. case ast.SLIDING_WINDOW:
  165. o.interval = o.window.Length
  166. case ast.SESSION_WINDOW:
  167. o.ticker = conf.GetTicker(o.window.Length)
  168. o.interval = o.window.Interval
  169. case ast.COUNT_WINDOW:
  170. o.interval = o.window.Interval
  171. }
  172. if o.ticker != nil {
  173. c = o.ticker.C
  174. //resume previous window
  175. if len(inputs) > 0 && o.triggerTime > 0 {
  176. nextTick := conf.GetNowInMilli() + int64(o.interval)
  177. next := o.triggerTime
  178. switch o.window.Type {
  179. case ast.TUMBLING_WINDOW, ast.HOPPING_WINDOW:
  180. for {
  181. next = next + int64(o.interval)
  182. if next > nextTick {
  183. break
  184. }
  185. log.Debugf("triggered by restore inputs")
  186. inputs, _ = o.scan(inputs, next, ctx)
  187. ctx.PutState(WINDOW_INPUTS_KEY, inputs)
  188. ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
  189. }
  190. case ast.SESSION_WINDOW:
  191. timeout, duration := int64(o.window.Interval), int64(o.window.Length)
  192. for {
  193. et := inputs[0].Timestamp
  194. tick := et + (duration - et%duration)
  195. if et%duration == 0 {
  196. tick = et
  197. }
  198. var p int64
  199. for _, tuple := range inputs {
  200. var r int64 = math.MaxInt64
  201. if p > 0 {
  202. if tuple.Timestamp-p > timeout {
  203. r = p + timeout
  204. }
  205. }
  206. if tuple.Timestamp > tick {
  207. if tick-duration > et && tick < r {
  208. r = tick
  209. }
  210. tick += duration
  211. }
  212. if r < math.MaxInt64 {
  213. next = r
  214. break
  215. }
  216. p = tuple.Timestamp
  217. }
  218. if next > nextTick {
  219. break
  220. }
  221. log.Debugf("triggered by restore inputs")
  222. inputs, _ = o.scan(inputs, next, ctx)
  223. ctx.PutState(WINDOW_INPUTS_KEY, inputs)
  224. ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
  225. }
  226. }
  227. }
  228. }
  229. for {
  230. select {
  231. // process incoming item
  232. case item, opened := <-o.input:
  233. processed := false
  234. if item, processed = o.preprocess(item); processed {
  235. break
  236. }
  237. o.statManager.IncTotalRecordsIn()
  238. o.statManager.ProcessTimeStart()
  239. if !opened {
  240. o.statManager.IncTotalExceptions()
  241. break
  242. }
  243. switch d := item.(type) {
  244. case error:
  245. o.Broadcast(d)
  246. o.statManager.IncTotalExceptions()
  247. case *xsql.Tuple:
  248. log.Debugf("Event window receive tuple %s", d.Message)
  249. inputs = append(inputs, d)
  250. switch o.window.Type {
  251. case ast.NOT_WINDOW:
  252. inputs, _ = o.scan(inputs, d.Timestamp, ctx)
  253. case ast.SLIDING_WINDOW:
  254. inputs, _ = o.scan(inputs, d.Timestamp, ctx)
  255. case ast.SESSION_WINDOW:
  256. if timeoutTicker != nil {
  257. timeoutTicker.Stop()
  258. timeoutTicker.Reset(time.Duration(o.window.Interval) * time.Millisecond)
  259. } else {
  260. timeoutTicker = conf.GetTimer(o.window.Interval)
  261. timeout = timeoutTicker.C
  262. o.triggerTime = d.Timestamp
  263. ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
  264. log.Debugf("Session window set start time %d", o.triggerTime)
  265. }
  266. case ast.COUNT_WINDOW:
  267. o.msgCount++
  268. log.Debugf(fmt.Sprintf("msgCount: %d", o.msgCount))
  269. if o.msgCount%o.window.Interval != 0 {
  270. continue
  271. } else {
  272. o.msgCount = 0
  273. }
  274. if tl, er := NewTupleList(inputs, o.window.Length); er != nil {
  275. log.Error(fmt.Sprintf("Found error when trying to "))
  276. infra.DrainError(ctx, er, errCh)
  277. return
  278. } else {
  279. log.Debugf(fmt.Sprintf("It has %d of count window.", tl.count()))
  280. triggerTime := conf.GetNowInMilli()
  281. for tl.hasMoreCountWindow() {
  282. tsets := tl.nextCountWindow()
  283. tsets.WindowStart = triggerTime
  284. triggerTime = conf.GetNowInMilli()
  285. tsets.WindowEnd = triggerTime
  286. log.Debugf("Sent: %v", tsets)
  287. o.Broadcast(tsets)
  288. o.statManager.IncTotalRecordsOut()
  289. }
  290. inputs = tl.getRestTuples()
  291. }
  292. }
  293. o.statManager.ProcessTimeEnd()
  294. o.statManager.SetBufferLength(int64(len(o.input)))
  295. ctx.PutState(WINDOW_INPUTS_KEY, inputs)
  296. ctx.PutState(MSG_COUNT_KEY, o.msgCount)
  297. default:
  298. o.Broadcast(fmt.Errorf("run Window error: expect xsql.Tuple type but got %[1]T(%[1]v)", d))
  299. o.statManager.IncTotalExceptions()
  300. }
  301. case now := <-c:
  302. n := cast.TimeToUnixMilli(now)
  303. if o.window.Type == ast.SESSION_WINDOW {
  304. log.Debugf("session window update trigger time %d with %d inputs", n, len(inputs))
  305. if len(inputs) == 0 || n-int64(o.window.Length) < inputs[0].Timestamp {
  306. if len(inputs) > 0 {
  307. log.Debugf("session window last trigger time %d < first tuple %d", n-int64(o.window.Length), inputs[0].Timestamp)
  308. }
  309. break
  310. }
  311. }
  312. if len(inputs) > 0 {
  313. o.statManager.ProcessTimeStart()
  314. log.Debugf("triggered by ticker at %d", n)
  315. inputs, _ = o.scan(inputs, n, ctx)
  316. o.statManager.ProcessTimeEnd()
  317. ctx.PutState(WINDOW_INPUTS_KEY, inputs)
  318. ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
  319. }
  320. case now := <-timeout:
  321. if len(inputs) > 0 {
  322. o.statManager.ProcessTimeStart()
  323. log.Debugf("triggered by timeout")
  324. inputs, _ = o.scan(inputs, cast.TimeToUnixMilli(now), ctx)
  325. //expire all inputs, so that when timer scan there is no item
  326. inputs = make([]*xsql.Tuple, 0)
  327. o.statManager.ProcessTimeEnd()
  328. ctx.PutState(WINDOW_INPUTS_KEY, inputs)
  329. ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
  330. timeoutTicker = nil
  331. }
  332. // is cancelling
  333. case <-ctx.Done():
  334. log.Infoln("Cancelling window....")
  335. if o.ticker != nil {
  336. o.ticker.Stop()
  337. }
  338. return
  339. }
  340. }
  341. }
// TupleList wraps the buffered tuples of a count window and hands out the
// window made of the most recent size tuples.
type TupleList struct {
	tuples []*xsql.Tuple
	index  int // Number of windows already produced by nextCountWindow
	size   int // The size for count window
}
  347. func NewTupleList(tuples []*xsql.Tuple, windowSize int) (TupleList, error) {
  348. if windowSize <= 0 {
  349. return TupleList{}, fmt.Errorf("Window size should not be less than zero.")
  350. } else if tuples == nil || len(tuples) == 0 {
  351. return TupleList{}, fmt.Errorf("The tuples should not be nil or empty.")
  352. }
  353. tl := TupleList{tuples: tuples, size: windowSize}
  354. return tl, nil
  355. }
  356. func (tl *TupleList) hasMoreCountWindow() bool {
  357. if len(tl.tuples) < tl.size {
  358. return false
  359. }
  360. return tl.index == 0
  361. }
  362. func (tl *TupleList) count() int {
  363. if len(tl.tuples) < tl.size {
  364. return 0
  365. } else {
  366. return 1
  367. }
  368. }
  369. func (tl *TupleList) nextCountWindow() xsql.WindowTuplesSet {
  370. results := xsql.WindowTuplesSet{
  371. Content: make([]xsql.WindowTuples, 0),
  372. WindowRange: &xsql.WindowRange{},
  373. }
  374. var subT []*xsql.Tuple
  375. subT = tl.tuples[len(tl.tuples)-tl.size : len(tl.tuples)]
  376. for _, tuple := range subT {
  377. results = results.AddTuple(tuple)
  378. }
  379. tl.index = tl.index + 1
  380. return results
  381. }
  382. func (tl *TupleList) getRestTuples() []*xsql.Tuple {
  383. if len(tl.tuples) < tl.size {
  384. return tl.tuples
  385. }
  386. return tl.tuples[len(tl.tuples)-tl.size+1:]
  387. }
  388. func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.StreamContext) ([]*xsql.Tuple, bool) {
  389. log := ctx.GetLogger()
  390. log.Debugf("window %s triggered at %s(%d)", o.name, time.Unix(triggerTime/1000, triggerTime%1000), triggerTime)
  391. var delta int64
  392. if o.window.Type == ast.HOPPING_WINDOW || o.window.Type == ast.SLIDING_WINDOW {
  393. delta = o.calDelta(triggerTime, delta, log)
  394. }
  395. results := xsql.WindowTuplesSet{
  396. Content: make([]xsql.WindowTuples, 0),
  397. WindowRange: &xsql.WindowRange{
  398. WindowEnd: triggerTime,
  399. },
  400. }
  401. i := 0
  402. //Sync table
  403. for _, tuple := range inputs {
  404. if o.window.Type == ast.HOPPING_WINDOW || o.window.Type == ast.SLIDING_WINDOW {
  405. diff := triggerTime - tuple.Timestamp
  406. if diff > int64(o.window.Length)+delta {
  407. log.Debugf("diff: %d, length: %d, delta: %d", diff, o.window.Length, delta)
  408. log.Debugf("tuple %s emitted at %d expired", tuple, tuple.Timestamp)
  409. //Expired tuple, remove it by not adding back to inputs
  410. continue
  411. }
  412. //Added back all inputs for non expired events
  413. inputs[i] = tuple
  414. i++
  415. } else if tuple.Timestamp > triggerTime {
  416. //Only added back early arrived events
  417. inputs[i] = tuple
  418. i++
  419. }
  420. if tuple.Timestamp <= triggerTime {
  421. results = results.AddTuple(tuple)
  422. }
  423. }
  424. triggered := false
  425. if len(results.Content) > 0 {
  426. switch o.window.Type {
  427. case ast.TUMBLING_WINDOW, ast.SESSION_WINDOW:
  428. results.WindowStart = o.triggerTime
  429. case ast.HOPPING_WINDOW:
  430. results.WindowStart = o.triggerTime - int64(o.window.Interval)
  431. case ast.SLIDING_WINDOW:
  432. results.WindowStart = triggerTime - int64(o.window.Length)
  433. }
  434. log.Debugf("window %s triggered for %d tuples", o.name, len(inputs))
  435. if o.isEventTime {
  436. results.Sort()
  437. }
  438. log.Debugf("Sent: %v", results)
  439. o.Broadcast(results)
  440. triggered = true
  441. o.triggerTime = triggerTime
  442. o.statManager.IncTotalRecordsOut()
  443. log.Debugf("done scan")
  444. }
  445. return inputs[:i], triggered
  446. }
  447. func (o *WindowOperator) calDelta(triggerTime int64, delta int64, log api.Logger) int64 {
  448. lastTriggerTime := o.triggerTime
  449. if lastTriggerTime <= 0 {
  450. delta = math.MaxInt16 //max int, all events for the initial window
  451. } else {
  452. if !o.isEventTime && o.window.Interval > 0 {
  453. delta = triggerTime - lastTriggerTime - int64(o.window.Interval)
  454. if delta > 100 {
  455. log.Warnf("Possible long computation in window; Previous eviction time: %d, current eviction time: %d", lastTriggerTime, triggerTime)
  456. }
  457. } else {
  458. delta = 0
  459. }
  460. }
  461. return delta
  462. }
  463. func (o *WindowOperator) GetMetrics() [][]interface{} {
  464. if o.statManager != nil {
  465. return [][]interface{}{
  466. o.statManager.GetMetrics(),
  467. }
  468. } else {
  469. return nil
  470. }
  471. }