preprocessor.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package operators
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "strings"
  8. )
  9. type Preprocessor struct {
  10. //Pruned stream fields. Could be streamField(with data type info) or string
  11. defaultFieldProcessor
  12. allMeta bool
  13. metaFields []string //only needed if not allMeta
  14. isEventTime bool
  15. timestampField string
  16. }
  17. func NewPreprocessor(fields []interface{}, fs xsql.Fields, allMeta bool, metaFields []string, iet bool, timestampField string, timestampFormat string, isBinary bool) (*Preprocessor, error) {
  18. p := &Preprocessor{
  19. allMeta: allMeta, metaFields: metaFields, isEventTime: iet, timestampField: timestampField}
  20. p.defaultFieldProcessor = defaultFieldProcessor{
  21. streamFields: fields, aliasFields: fs, isBinary: isBinary, timestampFormat: timestampFormat,
  22. }
  23. return p, nil
  24. }
  25. /*
  26. * input: *xsql.Tuple
  27. * output: *xsql.Tuple
  28. */
  29. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  30. log := ctx.GetLogger()
  31. tuple, ok := data.(*xsql.Tuple)
  32. if !ok {
  33. return fmt.Errorf("expect tuple data type")
  34. }
  35. log.Debugf("preprocessor receive %s", tuple.Message)
  36. result, err := p.processField(tuple, fv)
  37. if err != nil {
  38. return fmt.Errorf("error in preprocessor: %s", err)
  39. }
  40. tuple.Message = result
  41. if p.isEventTime {
  42. if t, ok := result[p.timestampField]; ok {
  43. if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  44. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  45. } else {
  46. tuple.Timestamp = ts
  47. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  48. }
  49. } else {
  50. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  51. }
  52. }
  53. if !p.allMeta && p.metaFields != nil && len(p.metaFields) > 0 {
  54. newMeta := make(xsql.Metadata)
  55. for _, f := range p.metaFields {
  56. if m, ok := tuple.Metadata.Value(f); ok {
  57. newMeta[strings.ToLower(f)] = m
  58. }
  59. }
  60. tuple.Metadata = newMeta
  61. }
  62. return tuple
  63. }