operations.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package operators
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "github.com/emqx/kuiper/xstream/nodes"
  7. "sync"
  8. )
  9. // UnOperation interface represents unary operations (i.e. Map, Filter, etc)
  10. type UnOperation interface {
  11. Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{}
  12. }
  13. // UnFunc implements UnOperation as type func (context.Context, interface{})
  14. type UnFunc func(api.StreamContext, interface{}) interface{}
  15. // Apply implements UnOperation.Apply method
  16. func (f UnFunc) Apply(ctx api.StreamContext, data interface{}) interface{} {
  17. return f(ctx, data)
  18. }
  19. type UnaryOperator struct {
  20. op UnOperation
  21. concurrency int
  22. input chan interface{}
  23. outputs map[string]chan<- interface{}
  24. mutex sync.RWMutex
  25. cancelled bool
  26. name string
  27. statManagers []nodes.StatManager
  28. }
  29. // NewUnary creates *UnaryOperator value
  30. func New(name string, bufferLength int) *UnaryOperator {
  31. // extract logger
  32. o := new(UnaryOperator)
  33. o.concurrency = 1
  34. o.input = make(chan interface{}, bufferLength)
  35. o.outputs = make(map[string]chan<- interface{})
  36. o.name = name
  37. return o
  38. }
  39. func (o *UnaryOperator) GetName() string {
  40. return o.name
  41. }
  42. // SetOperation sets the executor operation
  43. func (o *UnaryOperator) SetOperation(op UnOperation) {
  44. o.op = op
  45. }
  46. // SetConcurrency sets the concurrency level for the operation
  47. func (o *UnaryOperator) SetConcurrency(concurr int) {
  48. o.concurrency = concurr
  49. if o.concurrency < 1 {
  50. o.concurrency = 1
  51. }
  52. }
  53. func (o *UnaryOperator) AddOutput(output chan<- interface{}, name string) error {
  54. if _, ok := o.outputs[name]; !ok {
  55. o.outputs[name] = output
  56. } else {
  57. return fmt.Errorf("fail to add output %s, operator %s already has an output of the same name", name, o.name)
  58. }
  59. return nil
  60. }
  61. func (o *UnaryOperator) GetInput() (chan<- interface{}, string) {
  62. return o.input, o.name
  63. }
  64. // Exec is the entry point for the executor
  65. func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
  66. log := ctx.GetLogger()
  67. log.Debugf("Unary operator %s is started", o.name)
  68. if len(o.outputs) <= 0 {
  69. go func() { errCh <- fmt.Errorf("no output channel found") }()
  70. return
  71. }
  72. // validate p
  73. if o.concurrency < 1 {
  74. o.concurrency = 1
  75. }
  76. //reset status
  77. o.statManagers = nil
  78. for i := 0; i < o.concurrency; i++ { // workers
  79. instance := i
  80. go o.doOp(ctx.WithInstance(instance), errCh)
  81. }
  82. }
  83. func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
  84. logger := ctx.GetLogger()
  85. if o.op == nil {
  86. logger.Infoln("Unary operator missing operation")
  87. return
  88. }
  89. exeCtx, cancel := ctx.WithCancel()
  90. defer func() {
  91. logger.Infof("unary operator %s instance %d done, cancelling future items", o.name, ctx.GetInstanceId())
  92. cancel()
  93. }()
  94. stats, err := nodes.NewStatManager("op", ctx)
  95. if err != nil {
  96. select {
  97. case errCh <- err:
  98. case <-ctx.Done():
  99. logger.Infof("unary operator %s cancelling....", o.name)
  100. o.mutex.Lock()
  101. cancel()
  102. o.cancelled = true
  103. o.mutex.Unlock()
  104. }
  105. return
  106. }
  107. o.mutex.Lock()
  108. o.statManagers = append(o.statManagers, stats)
  109. o.mutex.Unlock()
  110. fv, afv := xsql.NewAggregateFunctionValuers()
  111. for {
  112. select {
  113. // process incoming item
  114. case item := <-o.input:
  115. stats.IncTotalRecordsIn()
  116. stats.ProcessTimeStart()
  117. result := o.op.Apply(exeCtx, item, fv, afv)
  118. switch val := result.(type) {
  119. case nil:
  120. continue
  121. case error:
  122. logger.Errorf("Operation %s error: %s", ctx.GetOpId(), val)
  123. nodes.Broadcast(o.outputs, val, ctx)
  124. stats.IncTotalExceptions()
  125. continue
  126. default:
  127. stats.ProcessTimeEnd()
  128. nodes.Broadcast(o.outputs, val, ctx)
  129. stats.IncTotalRecordsOut()
  130. stats.SetBufferLength(int64(len(o.input)))
  131. }
  132. // is cancelling
  133. case <-ctx.Done():
  134. logger.Infof("unary operator %s instance %d cancelling....", o.name, ctx.GetInstanceId())
  135. o.mutex.Lock()
  136. cancel()
  137. o.cancelled = true
  138. o.mutex.Unlock()
  139. return
  140. }
  141. }
  142. }
  143. func (m *UnaryOperator) GetMetrics() (result [][]interface{}) {
  144. for _, stats := range m.statManagers {
  145. result = append(result, stats.GetMetrics())
  146. }
  147. return result
  148. }