window_op.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package operators
  2. import (
  3. "context"
  4. "engine/common"
  5. "engine/xsql"
  6. "fmt"
  7. "math"
  8. "time"
  9. )
  // WindowType selects how a WindowOperator groups buffered tuples before
// emitting them.
type WindowType int

// The supported window types, with fixed numeric values 0 through 4.
const (
	// NO_WINDOW triggers on every incoming tuple and retains nothing.
	NO_WINDOW WindowType = 0
	// TUMBLING_WINDOW triggers every Length milliseconds and retains nothing.
	TUMBLING_WINDOW WindowType = 1
	// HOPPING_WINDOW triggers every Interval milliseconds and keeps tuples
	// buffered until they fall out of the Length-wide window.
	HOPPING_WINDOW WindowType = 2
	// SLIDING_WINDOW triggers on every incoming tuple and keeps tuples
	// buffered until they fall out of the Length-wide window.
	SLIDING_WINDOW WindowType = 3
	// SESSION_WINDOW opens on the first tuple, ticks every Length
	// milliseconds while open, and closes after Interval milliseconds without
	// a new tuple.
	SESSION_WINDOW WindowType = 4
)

// WindowConfig describes a window. Both durations are in milliseconds.
type WindowConfig struct {
	// Type is the kind of window.
	Type WindowType
	// Length is the tumbling period, the span of a hopping or sliding window,
	// or the ticker period of an open session window.
	Length int64
	// Interval is the hop period of a hopping window or the inactivity
	// timeout of a session window. When it is not set it is meant to equal
	// Length. NOTE(review): confirm the constructor applies that default.
	Interval int64
}
  23. type WindowOperator struct {
  24. input chan interface{}
  25. outputs map[string]chan<- interface{}
  26. name string
  27. ticker *time.Ticker
  28. window *WindowConfig
  29. interval int64
  30. triggerTime int64
  31. }
  32. func NewWindowOp(name string, config *WindowConfig) *WindowOperator {
  33. o := new(WindowOperator)
  34. o.input = make(chan interface{}, 1024)
  35. o.outputs = make(map[string]chan<- interface{})
  36. o.name = name
  37. o.window = config
  38. switch config.Type{
  39. case NO_WINDOW:
  40. case TUMBLING_WINDOW:
  41. o.ticker = time.NewTicker(time.Duration(config.Length) * time.Millisecond)
  42. o.interval = config.Length
  43. case HOPPING_WINDOW:
  44. o.ticker = time.NewTicker(time.Duration(config.Interval) * time.Millisecond)
  45. o.interval = config.Interval
  46. case SLIDING_WINDOW:
  47. o.interval = config.Length
  48. case SESSION_WINDOW:
  49. o.interval = config.Interval
  50. default:
  51. log.Errorf("Unsupported window type %d", config.Type)
  52. }
  53. return o
  54. }
  55. func (o *WindowOperator) GetName() string {
  56. return o.name
  57. }
  58. func (o *WindowOperator) AddOutput(output chan<- interface{}, name string) {
  59. if _, ok := o.outputs[name]; !ok{
  60. o.outputs[name] = output
  61. }else{
  62. log.Error("fail to add output %s, operator %s already has an output of the same name", name, o.name)
  63. }
  64. }
  65. func (o *WindowOperator) GetInput() (chan<- interface{}, string) {
  66. return o.input, o.name
  67. }
  68. // Exec is the entry point for the executor
  69. func (o *WindowOperator) Exec(ctx context.Context) (err error) {
  70. log.Printf("Window operator %s is started.\n", o.name)
  71. if len(o.outputs) <= 0 {
  72. err = fmt.Errorf("no output channel found")
  73. return
  74. }
  75. go func() {
  76. var (
  77. inputs []*xsql.Tuple
  78. c <-chan time.Time
  79. timeoutTicker *time.Timer
  80. timeout <-chan time.Time
  81. )
  82. if o.ticker != nil {
  83. c = o.ticker.C
  84. }
  85. for {
  86. select {
  87. // process incoming item
  88. case item, opened := <-o.input:
  89. if !opened {
  90. return
  91. }
  92. if d, ok := item.(*xsql.Tuple); !ok {
  93. log.Errorf("Expect xsql.Tuple type.\n")
  94. return
  95. }else{
  96. inputs = append(inputs, d)
  97. switch o.window.Type{
  98. case NO_WINDOW:
  99. inputs = o.trigger(inputs, d.Timestamp)
  100. case SLIDING_WINDOW:
  101. inputs = o.trigger(inputs, d.Timestamp)
  102. case SESSION_WINDOW:
  103. if o.ticker == nil{ //Stopped by timeout or init
  104. o.ticker = time.NewTicker(time.Duration(o.window.Length) * time.Millisecond)
  105. c = o.ticker.C
  106. }
  107. if timeoutTicker != nil {
  108. timeoutTicker.Stop()
  109. timeoutTicker.Reset(time.Duration(o.window.Interval) * time.Millisecond)
  110. } else {
  111. timeoutTicker = time.NewTimer(time.Duration(o.window.Interval) * time.Millisecond)
  112. timeout = timeoutTicker.C
  113. }
  114. }
  115. }
  116. case now := <-c:
  117. if len(inputs) > 0 {
  118. log.Infof("triggered by ticker")
  119. inputs = o.trigger(inputs, common.TimeToUnixMilli(now))
  120. }
  121. case now := <-timeout:
  122. if len(inputs) > 0 {
  123. log.Infof("triggered by timeout")
  124. inputs = o.trigger(inputs, common.TimeToUnixMilli(now))
  125. }
  126. o.ticker.Stop()
  127. o.ticker = nil
  128. // is cancelling
  129. case <-ctx.Done():
  130. log.Println("Cancelling....")
  131. o.ticker.Stop()
  132. return
  133. }
  134. }
  135. }()
  136. return nil
  137. }
  138. func (o *WindowOperator) trigger(inputs []*xsql.Tuple, triggerTime int64) []*xsql.Tuple{
  139. log.Printf("window %s triggered at %s", o.name, triggerTime)
  140. var delta int64
  141. if o.window.Type == HOPPING_WINDOW || o.window.Type == SLIDING_WINDOW {
  142. lastTriggerTime := o.triggerTime
  143. o.triggerTime = triggerTime
  144. if lastTriggerTime <= 0 {
  145. delta = math.MaxInt32 //max int, all events for the initial window
  146. }else{
  147. delta = o.triggerTime - lastTriggerTime - o.window.Interval
  148. if delta > 100 && o.window.Interval > 0 {
  149. log.Warnf("Possible long computation in window; Previous eviction time: %d, current eviction time: %d", lastTriggerTime, o.triggerTime)
  150. }
  151. }
  152. }
  153. var results xsql.MultiEmitterTuples = make([]xsql.EmitterTuples, 0)
  154. i := 0
  155. //Sync table
  156. for _, tuple := range inputs{
  157. if o.window.Type == HOPPING_WINDOW || o.window.Type == SLIDING_WINDOW {
  158. diff := o.triggerTime - tuple.Timestamp
  159. if diff >= o.window.Length + delta {
  160. log.Infof("diff: %d, length: %d, delta: %d", diff, o.window.Length, delta)
  161. log.Infof("tuple %s emitted at %d expired", tuple, tuple.Timestamp)
  162. //Expired tuple, remove it by not adding back to inputs
  163. continue
  164. }
  165. //All tuples in tumbling window are not added back
  166. inputs[i] = tuple
  167. i++
  168. }
  169. results.AddTuple(tuple)
  170. }
  171. if len(results) > 0{
  172. log.Printf("window %s triggered for %d tuples", o.name, len(results))
  173. for _, output := range o.outputs{
  174. output <- results
  175. }
  176. }
  177. return inputs[:i]
  178. }