funcs.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package xstream
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "github.com/emqx/kuiper/xstream/nodes"
  7. "reflect"
  8. )
  9. type unaryFuncForm byte
  10. const (
  11. unaryFuncUnsupported unaryFuncForm = iota
  12. unaryFuncForm1
  13. unaryFuncForm2
  14. )
  15. // ProcessFunc returns a unary function which applies the specified
  16. // user-defined function that processes data items from upstream and
  17. // returns a result value. The provided function must be of type:
  18. // func(T) R
  19. // where T is the type of incoming item
  20. // R the type of returned processed item
  21. func ProcessFunc(f interface{}) (nodes.UnFunc, error) {
  22. fntype := reflect.TypeOf(f)
  23. funcForm, err := isUnaryFuncForm(fntype)
  24. if err != nil {
  25. return nil, err
  26. }
  27. if funcForm == unaryFuncUnsupported {
  28. return nil, fmt.Errorf("unsupported unary func type")
  29. }
  30. fnval := reflect.ValueOf(f)
  31. return nodes.UnFunc(func(ctx api.StreamContext, data interface{}) interface{} {
  32. result := callOpFunc(fnval, ctx, data, funcForm)
  33. return result.Interface()
  34. }), nil
  35. }
  36. // FilterFunc returns a unary function (api.UnFunc) which applies the user-defined
  37. // filtering to apply predicates that filters out data items from being included
  38. // in the downstream. The provided user-defined function must be of type:
  39. // func(T)bool - where T is the type of incoming data item, bool is the value of the predicate
  40. // When the user-defined function returns false, the current processed data item will not
  41. // be placed in the downstream processing.
  42. func FilterFunc(f interface{}) (nodes.UnFunc, error) {
  43. fntype := reflect.TypeOf(f)
  44. funcForm, err := isUnaryFuncForm(fntype)
  45. if err != nil {
  46. return nil, err
  47. }
  48. if funcForm == unaryFuncUnsupported {
  49. return nil, fmt.Errorf("unsupported unary func type")
  50. }
  51. // ensure bool ret type
  52. if fntype.Out(0).Kind() != reflect.Bool {
  53. return nil, fmt.Errorf("unary filter func must return bool")
  54. }
  55. fnval := reflect.ValueOf(f)
  56. return nodes.UnFunc(func(ctx api.StreamContext, data interface{}) interface{} {
  57. result := callOpFunc(fnval, ctx, data, funcForm)
  58. predicate := result.Bool()
  59. if !predicate {
  60. return nil
  61. }
  62. return data
  63. }), nil
  64. }
  65. // MapFunc returns an unary function which applies the user-defined function which
  66. // maps, one-to-one, the incomfing value to a new value. The user-defined function
  67. // must be of type:
  68. // func(T) R - where T is the incoming item, R is the type of the returned mapped item
  69. func MapFunc(f interface{}) (nodes.UnFunc, error) {
  70. return ProcessFunc(f)
  71. }
  72. // FlatMapFunc returns an unary function which applies a user-defined function which
  73. // takes incoming comsite items and deconstruct them into individual items which can
  74. // then be re-streamed. The type for the user-defined function is:
  75. // func (T) R - where R is the original item, R is a slice of decostructed items
  76. // The slice returned should be restreamed by placing each item onto the stream for
  77. // downstream processing.
  78. func FlatMapFunc(f interface{}) (nodes.UnFunc, error) {
  79. fntype := reflect.TypeOf(f)
  80. funcForm, err := isUnaryFuncForm(fntype)
  81. if err != nil {
  82. return nil, err
  83. }
  84. if funcForm == unaryFuncUnsupported {
  85. return nil, fmt.Errorf("unsupported unary func type")
  86. }
  87. if fntype.Out(0).Kind() != reflect.Slice {
  88. return nil, fmt.Errorf("unary FlatMap func must return slice")
  89. }
  90. fnval := reflect.ValueOf(f)
  91. return nodes.UnFunc(func(ctx api.StreamContext, data interface{}) interface{} {
  92. result := callOpFunc(fnval, ctx, data, funcForm)
  93. return result.Interface()
  94. }), nil
  95. }
  96. // isUnaryFuncForm ensures ftype is of supported function of
  97. // form func(in) out or func(context, in) out
  98. func isUnaryFuncForm(ftype reflect.Type) (unaryFuncForm, error) {
  99. if ftype.NumOut() != 1 {
  100. return unaryFuncUnsupported, fmt.Errorf("unary func must return one param")
  101. }
  102. switch ftype.Kind() {
  103. case reflect.Func:
  104. switch ftype.NumIn() {
  105. case 1:
  106. // f(in)out, ok
  107. return unaryFuncForm1, nil
  108. case 2:
  109. // func(context,in)out
  110. param0 := ftype.In(0)
  111. if param0.Kind() != reflect.Interface {
  112. return unaryFuncUnsupported, fmt.Errorf("unary must be type func(T)R or func(context.Context, T)R")
  113. }
  114. return unaryFuncForm2, nil
  115. }
  116. }
  117. return unaryFuncUnsupported, fmt.Errorf("unary func must be of type func(T)R or func(context.Context,T)R")
  118. }
  119. func callOpFunc(fnval reflect.Value, ctx context.Context, data interface{}, funcForm unaryFuncForm) reflect.Value {
  120. var result reflect.Value
  121. switch funcForm {
  122. case unaryFuncForm1:
  123. arg0 := reflect.ValueOf(data)
  124. result = fnval.Call([]reflect.Value{arg0})[0]
  125. case unaryFuncForm2:
  126. arg0 := reflect.ValueOf(ctx)
  127. arg1 := reflect.ValueOf(data)
  128. if !arg0.IsValid() {
  129. arg0 = reflect.ValueOf(context.Background())
  130. }
  131. result = fnval.Call([]reflect.Value{arg0, arg1})[0]
  132. }
  133. return result
  134. }
  135. func isArgContext(val reflect.Value) bool {
  136. _, ok := val.Interface().(context.Context)
  137. return ok
  138. }