preprocessor.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/xsql"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. "github.com/lf-edge/ekuiper/pkg/ast"
  21. "github.com/lf-edge/ekuiper/pkg/cast"
  22. "github.com/lf-edge/ekuiper/pkg/message"
  23. )
  24. // Preprocessor only planned when
  25. // 1. eventTime, to convert the timestamp field
  26. // 2. schema validate and convert, when strict_validation is on and field type is not binary
  27. // Do not convert types
  28. type Preprocessor struct {
  29. // Pruned stream fields. Could be streamField(with data type info) or string
  30. defaultFieldProcessor
  31. // allMeta bool
  32. // metaFields []string //only needed if not allMeta
  33. isEventTime bool
  34. timestampField string
  35. checkSchema bool
  36. isBinary bool
  37. }
  38. func NewPreprocessor(isSchemaless bool, fields map[string]*ast.JsonStreamField, _ bool, _ []string, iet bool, timestampField string, timestampFormat string, isBinary bool, strictValidation bool) (*Preprocessor, error) {
  39. p := &Preprocessor{
  40. isEventTime: iet, timestampField: timestampField, isBinary: isBinary,
  41. }
  42. p.defaultFieldProcessor = defaultFieldProcessor{
  43. timestampFormat: timestampFormat,
  44. }
  45. conf.Log.Infof("preprocessor isSchemaless %v, strictValidation %v, isBinary %v", isSchemaless, strictValidation, strictValidation)
  46. if !isSchemaless && (strictValidation || isBinary) {
  47. p.checkSchema = true
  48. conf.Log.Infof("preprocessor check schema")
  49. p.defaultFieldProcessor.streamFields = fields
  50. }
  51. return p, nil
  52. }
  53. // Apply the preprocessor to the tuple
  54. /* input: *xsql.Tuple
  55. * output: *xsql.Tuple
  56. */
  57. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  58. log := ctx.GetLogger()
  59. tuple, ok := data.(*xsql.Tuple)
  60. if !ok {
  61. return fmt.Errorf("expect tuple data type")
  62. }
  63. log.Debugf("preprocessor receive %s", tuple.Message)
  64. if p.checkSchema {
  65. if !p.isBinary {
  66. err := p.validateAndConvert(tuple)
  67. if err != nil {
  68. return fmt.Errorf("error in preprocessor: %s", err)
  69. }
  70. } else {
  71. for name := range p.streamFields {
  72. tuple.Message[name] = tuple.Message[message.DefaultField]
  73. delete(tuple.Message, message.DefaultField)
  74. break
  75. }
  76. }
  77. }
  78. if p.isEventTime {
  79. if t, ok := tuple.Message[p.timestampField]; ok {
  80. if ts, err := cast.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  81. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  82. } else {
  83. tuple.Timestamp = ts
  84. log.Debugf("preprocessor calculate timestamp %d", tuple.Timestamp)
  85. }
  86. } else {
  87. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, tuple.Message)
  88. }
  89. }
  90. // No need to reconstruct meta as the memory has been allocated earlier
  91. //if !p.allMeta && p.metaFields != nil && len(p.metaFields) > 0 {
  92. // newMeta := make(xsql.Metadata)
  93. // for _, f := range p.metaFields {
  94. // if m, ok := tuple.Metadata.Value(f, ""); ok {
  95. // newMeta[f] = m
  96. // }
  97. // }
  98. // tuple.Metadata = newMeta
  99. //}
  100. return tuple
  101. }