funcs.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package xstream
  2. import (
  3. "context"
  4. "fmt"
  5. "engine/xstream/operators"
  6. "reflect"
  7. )
  // unaryFuncForm identifies which of the supported call signatures a
  // user-defined unary function has.
  type unaryFuncForm byte

  // Recognized forms of a user-defined unary function.
  const (
  	// unaryFuncUnsupported is the zero value and marks a signature that
  	// is not supported.
  	unaryFuncUnsupported unaryFuncForm = 0
  	// unaryFuncForm1 marks a function taking a single input: func(T) R.
  	unaryFuncForm1 unaryFuncForm = 1
  	// unaryFuncForm2 marks a function taking a context followed by the
  	// input: func(context.Context, T) R.
  	unaryFuncForm2 unaryFuncForm = 2
  )
  14. // ProcessFunc returns a unary function which applies the specified
  15. // user-defined function that processes data items from upstream and
  16. // returns a result value. The provided function must be of type:
  17. // func(T) R
  18. // where T is the type of incoming item
  19. // R the type of returned processed item
  20. func ProcessFunc(f interface{}) (operators.UnFunc, error) {
  21. fntype := reflect.TypeOf(f)
  22. funcForm, err := isUnaryFuncForm(fntype)
  23. if err != nil {
  24. return nil, err
  25. }
  26. if funcForm == unaryFuncUnsupported {
  27. return nil, fmt.Errorf("unsupported unary func type")
  28. }
  29. fnval := reflect.ValueOf(f)
  30. return operators.UnFunc(func(ctx context.Context, data interface{}) interface{} {
  31. result := callOpFunc(fnval, ctx, data, funcForm)
  32. return result.Interface()
  33. }), nil
  34. }
  35. // FilterFunc returns a unary function (api.UnFunc) which applies the user-defined
  36. // filtering to apply predicates that filters out data items from being included
  37. // in the downstream. The provided user-defined function must be of type:
  38. // func(T)bool - where T is the type of incoming data item, bool is the value of the predicate
  39. // When the user-defined function returns false, the current processed data item will not
  40. // be placed in the downstream processing.
  41. func FilterFunc(f interface{}) (operators.UnFunc, error) {
  42. fntype := reflect.TypeOf(f)
  43. funcForm, err := isUnaryFuncForm(fntype)
  44. if err != nil {
  45. return nil, err
  46. }
  47. if funcForm == unaryFuncUnsupported {
  48. return nil, fmt.Errorf("unsupported unary func type")
  49. }
  50. // ensure bool ret type
  51. if fntype.Out(0).Kind() != reflect.Bool {
  52. return nil, fmt.Errorf("unary filter func must return bool")
  53. }
  54. fnval := reflect.ValueOf(f)
  55. return operators.UnFunc(func(ctx context.Context, data interface{}) interface{} {
  56. result := callOpFunc(fnval, ctx, data, funcForm)
  57. predicate := result.Bool()
  58. if !predicate {
  59. return nil
  60. }
  61. return data
  62. }), nil
  63. }
  64. // MapFunc returns an unary function which applies the user-defined function which
  65. // maps, one-to-one, the incomfing value to a new value. The user-defined function
  66. // must be of type:
  67. // func(T) R - where T is the incoming item, R is the type of the returned mapped item
  68. func MapFunc(f interface{}) (operators.UnFunc, error) {
  69. return ProcessFunc(f)
  70. }
  71. // FlatMapFunc returns an unary function which applies a user-defined function which
  72. // takes incoming comsite items and deconstruct them into individual items which can
  73. // then be re-streamed. The type for the user-defined function is:
  74. // func (T) R - where R is the original item, R is a slice of decostructed items
  75. // The slice returned should be restreamed by placing each item onto the stream for
  76. // downstream processing.
  77. func FlatMapFunc(f interface{}) (operators.UnFunc, error) {
  78. fntype := reflect.TypeOf(f)
  79. funcForm, err := isUnaryFuncForm(fntype)
  80. if err != nil {
  81. return nil, err
  82. }
  83. if funcForm == unaryFuncUnsupported {
  84. return nil, fmt.Errorf("unsupported unary func type")
  85. }
  86. if fntype.Out(0).Kind() != reflect.Slice {
  87. return nil, fmt.Errorf("unary FlatMap func must return slice")
  88. }
  89. fnval := reflect.ValueOf(f)
  90. return operators.UnFunc(func(ctx context.Context, data interface{}) interface{} {
  91. result := callOpFunc(fnval, ctx, data, funcForm)
  92. return result.Interface()
  93. }), nil
  94. }
  95. // isUnaryFuncForm ensures ftype is of supported function of
  96. // form func(in) out or func(context, in) out
  97. func isUnaryFuncForm(ftype reflect.Type) (unaryFuncForm, error) {
  98. if ftype.NumOut() != 1 {
  99. return unaryFuncUnsupported, fmt.Errorf("unary func must return one param")
  100. }
  101. switch ftype.Kind() {
  102. case reflect.Func:
  103. switch ftype.NumIn() {
  104. case 1:
  105. // f(in)out, ok
  106. return unaryFuncForm1, nil
  107. case 2:
  108. // func(context,in)out
  109. param0 := ftype.In(0)
  110. if param0.Kind() != reflect.Interface {
  111. return unaryFuncUnsupported, fmt.Errorf("unary must be type func(T)R or func(context.Context, T)R")
  112. }
  113. return unaryFuncForm2, nil
  114. }
  115. }
  116. return unaryFuncUnsupported, fmt.Errorf("unary func must be of type func(T)R or func(context.Context,T)R")
  117. }
  118. func callOpFunc(fnval reflect.Value, ctx context.Context, data interface{}, funcForm unaryFuncForm) reflect.Value {
  119. var result reflect.Value
  120. switch funcForm {
  121. case unaryFuncForm1:
  122. arg0 := reflect.ValueOf(data)
  123. result = fnval.Call([]reflect.Value{arg0})[0]
  124. case unaryFuncForm2:
  125. arg0 := reflect.ValueOf(ctx)
  126. arg1 := reflect.ValueOf(data)
  127. if !arg0.IsValid() {
  128. arg0 = reflect.ValueOf(context.Background())
  129. }
  130. result = fnval.Call([]reflect.Value{arg0, arg1})[0]
  131. }
  132. return result
  133. }
  // isArgContext reports whether the value held by val implements
  // context.Context. It returns false, rather than panicking, for the zero
  // reflect.Value and for values that cannot be exported through Interface
  // (for example values read from unexported struct fields).
  func isArgContext(val reflect.Value) bool {
  	if !val.IsValid() || !val.CanInterface() {
  		return false
  	}
  	_, ok := val.Interface().(context.Context)
  	return ok
  }