preprocessor.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/xsql"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/cast"
  20. "strings"
  21. )
  22. type Preprocessor struct {
  23. //Pruned stream fields. Could be streamField(with data type info) or string
  24. defaultFieldProcessor
  25. allMeta bool
  26. metaFields []string //only needed if not allMeta
  27. isEventTime bool
  28. timestampField string
  29. }
  30. func NewPreprocessor(fields []interface{}, allMeta bool, metaFields []string, iet bool, timestampField string, timestampFormat string, isBinary bool, strictValidation bool) (*Preprocessor, error) {
  31. p := &Preprocessor{
  32. allMeta: allMeta, metaFields: metaFields, isEventTime: iet, timestampField: timestampField}
  33. p.defaultFieldProcessor = defaultFieldProcessor{
  34. streamFields: fields, isBinary: isBinary, timestampFormat: timestampFormat, strictValidation: strictValidation,
  35. }
  36. return p, nil
  37. }
  38. /*
  39. * input: *xsql.Tuple
  40. * output: *xsql.Tuple
  41. */
  42. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  43. log := ctx.GetLogger()
  44. tuple, ok := data.(*xsql.Tuple)
  45. if !ok {
  46. return fmt.Errorf("expect tuple data type")
  47. }
  48. log.Debugf("preprocessor receive %s", tuple.Message)
  49. result, err := p.processField(tuple, fv)
  50. if err != nil {
  51. return fmt.Errorf("error in preprocessor: %s", err)
  52. }
  53. tuple.Message = result
  54. if p.isEventTime {
  55. if t, ok := result[p.timestampField]; ok {
  56. if ts, err := cast.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  57. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  58. } else {
  59. tuple.Timestamp = ts
  60. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  61. }
  62. } else {
  63. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  64. }
  65. }
  66. if !p.allMeta && p.metaFields != nil && len(p.metaFields) > 0 {
  67. newMeta := make(xsql.Metadata)
  68. for _, f := range p.metaFields {
  69. if m, ok := tuple.Metadata.Value(f); ok {
  70. newMeta[strings.ToLower(f)] = m
  71. }
  72. }
  73. tuple.Metadata = newMeta
  74. }
  75. return tuple
  76. }