preprocessor.go 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/xsql"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/cast"
  20. )
  21. type Preprocessor struct {
  22. //Pruned stream fields. Could be streamField(with data type info) or string
  23. defaultFieldProcessor
  24. allMeta bool
  25. metaFields []string //only needed if not allMeta
  26. isEventTime bool
  27. timestampField string
  28. }
  29. func NewPreprocessor(fields []interface{}, allMeta bool, metaFields []string, iet bool, timestampField string, timestampFormat string, isBinary bool, strictValidation bool) (*Preprocessor, error) {
  30. p := &Preprocessor{
  31. allMeta: allMeta, metaFields: metaFields, isEventTime: iet, timestampField: timestampField}
  32. p.defaultFieldProcessor = defaultFieldProcessor{
  33. streamFields: fields, isBinary: isBinary, timestampFormat: timestampFormat, strictValidation: strictValidation,
  34. }
  35. return p, nil
  36. }
  37. /*
  38. * input: *xsql.Tuple
  39. * output: *xsql.Tuple
  40. */
  41. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  42. log := ctx.GetLogger()
  43. tuple, ok := data.(*xsql.Tuple)
  44. if !ok {
  45. return fmt.Errorf("expect tuple data type")
  46. }
  47. log.Debugf("preprocessor receive %s", tuple.Message)
  48. result, err := p.processField(tuple, nil)
  49. if err != nil {
  50. return fmt.Errorf("error in preprocessor: %s", err)
  51. }
  52. tuple.Message = result
  53. if p.isEventTime {
  54. if t, ok := result[p.timestampField]; ok {
  55. if ts, err := cast.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  56. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  57. } else {
  58. tuple.Timestamp = ts
  59. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  60. }
  61. } else {
  62. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  63. }
  64. }
  65. if !p.allMeta && p.metaFields != nil && len(p.metaFields) > 0 {
  66. newMeta := make(xsql.Metadata)
  67. for _, f := range p.metaFields {
  68. if m, ok := tuple.Metadata.Value(f); ok {
  69. newMeta[f] = m
  70. }
  71. }
  72. tuple.Metadata = newMeta
  73. }
  74. return tuple
  75. }