preprocessor.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package operators
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. )
  8. type Preprocessor struct {
  9. //Pruned stream fields. Could be streamField(with data type info) or string
  10. defaultFieldProcessor
  11. allMeta bool
  12. metaFields []string //only needed if not allMeta
  13. isEventTime bool
  14. timestampField string
  15. }
  16. func NewPreprocessor(fields []interface{}, fs xsql.Fields, allMeta bool, metaFields []string, iet bool, timestampField string, timestampFormat string, isBinary bool) (*Preprocessor, error) {
  17. p := &Preprocessor{
  18. allMeta: allMeta, metaFields: metaFields, isEventTime: iet, timestampField: timestampField}
  19. p.defaultFieldProcessor = defaultFieldProcessor{
  20. streamFields: fields, aliasFields: fs, isBinary: isBinary, timestampFormat: timestampFormat,
  21. }
  22. return p, nil
  23. }
  24. /*
  25. * input: *xsql.Tuple
  26. * output: *xsql.Tuple
  27. */
  28. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  29. log := ctx.GetLogger()
  30. tuple, ok := data.(*xsql.Tuple)
  31. if !ok {
  32. return fmt.Errorf("expect tuple data type")
  33. }
  34. log.Debugf("preprocessor receive %s", tuple.Message)
  35. result, err := p.processField(tuple, fv)
  36. if err != nil {
  37. return fmt.Errorf("error in preprocessor: %s", err)
  38. }
  39. tuple.Message = result
  40. if p.isEventTime {
  41. if t, ok := result[p.timestampField]; ok {
  42. if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  43. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  44. } else {
  45. tuple.Timestamp = ts
  46. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  47. }
  48. } else {
  49. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  50. }
  51. }
  52. if !p.allMeta && p.metaFields != nil && len(p.metaFields) > 0 {
  53. newMeta := make(xsql.Metadata)
  54. for _, f := range p.metaFields {
  55. newMeta[f] = tuple.Metadata[f]
  56. }
  57. tuple.Metadata = newMeta
  58. }
  59. return tuple
  60. }