preprocessor.go 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/xsql"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/cast"
  20. )
  21. type Preprocessor struct {
  22. //Pruned stream fields. Could be streamField(with data type info) or string
  23. defaultFieldProcessor
  24. allMeta bool
  25. metaFields []string //only needed if not allMeta
  26. isEventTime bool
  27. isSchemaless bool
  28. timestampField string
  29. }
  30. func NewPreprocessor(isSchemaless bool, fields []interface{}, allMeta bool, metaFields []string, iet bool, timestampField string, timestampFormat string, isBinary bool, strictValidation bool) (*Preprocessor, error) {
  31. p := &Preprocessor{
  32. allMeta: allMeta, metaFields: metaFields, isEventTime: iet,
  33. isSchemaless: isSchemaless, timestampField: timestampField}
  34. p.defaultFieldProcessor = defaultFieldProcessor{
  35. streamFields: fields, isBinary: isBinary, timestampFormat: timestampFormat, strictValidation: strictValidation,
  36. }
  37. return p, nil
  38. }
  39. /*
  40. * input: *xsql.Tuple
  41. * output: *xsql.Tuple
  42. */
  43. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  44. log := ctx.GetLogger()
  45. tuple, ok := data.(*xsql.Tuple)
  46. if !ok {
  47. return fmt.Errorf("expect tuple data type")
  48. }
  49. log.Debugf("preprocessor receive %s", tuple.Message)
  50. var (
  51. result map[string]interface{}
  52. err error
  53. )
  54. if !p.isSchemaless && p.streamFields != nil {
  55. result, err = p.processField(tuple, nil)
  56. if err != nil {
  57. return fmt.Errorf("error in preprocessor: %s", err)
  58. }
  59. tuple.Message = result
  60. } else {
  61. result = tuple.Message
  62. }
  63. if p.isEventTime {
  64. if t, ok := result[p.timestampField]; ok {
  65. if ts, err := cast.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  66. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  67. } else {
  68. tuple.Timestamp = ts
  69. log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
  70. }
  71. } else {
  72. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
  73. }
  74. }
  75. // No need to reconstruct meta as the memory has been allocated earlier
  76. //if !p.allMeta && p.metaFields != nil && len(p.metaFields) > 0 {
  77. // newMeta := make(xsql.Metadata)
  78. // for _, f := range p.metaFields {
  79. // if m, ok := tuple.Metadata.Value(f, ""); ok {
  80. // newMeta[f] = m
  81. // }
  82. // }
  83. // tuple.Metadata = newMeta
  84. //}
  85. return tuple
  86. }