preprocessor.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/xsql"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. "github.com/lf-edge/ekuiper/pkg/ast"
  21. "github.com/lf-edge/ekuiper/pkg/cast"
  22. "github.com/lf-edge/ekuiper/pkg/message"
  23. )
  24. // Preprocessor only planned when
  25. // 1. eventTime, to convert the timestamp field
  26. // 2. schema validate and convert, when strict_validation is on and field type is not binary
  27. // Do not convert types
  28. type Preprocessor struct {
  29. //Pruned stream fields. Could be streamField(with data type info) or string
  30. defaultFieldProcessor
  31. //allMeta bool
  32. //metaFields []string //only needed if not allMeta
  33. isEventTime bool
  34. timestampField string
  35. checkSchema bool
  36. isBinary bool
  37. }
  38. func NewPreprocessor(isSchemaless bool, fields map[string]*ast.JsonStreamField, _ bool, _ []string, iet bool, timestampField string, timestampFormat string, isBinary bool, strictValidation bool) (*Preprocessor, error) {
  39. p := &Preprocessor{
  40. isEventTime: iet, timestampField: timestampField, isBinary: isBinary}
  41. conf.Log.Infof("preprocessor isSchemaless %v, strictValidation %v, isBinary %v", isSchemaless, strictValidation, strictValidation)
  42. if !isSchemaless && (strictValidation || isBinary) {
  43. p.checkSchema = true
  44. conf.Log.Infof("preprocessor check schema")
  45. p.defaultFieldProcessor = defaultFieldProcessor{
  46. streamFields: fields, timestampFormat: timestampFormat,
  47. }
  48. }
  49. return p, nil
  50. }
  51. // Apply the preprocessor to the tuple
  52. /* input: *xsql.Tuple
  53. * output: *xsql.Tuple
  54. */
  55. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  56. log := ctx.GetLogger()
  57. tuple, ok := data.(*xsql.Tuple)
  58. if !ok {
  59. return fmt.Errorf("expect tuple data type")
  60. }
  61. log.Debugf("preprocessor receive %s", tuple.Message)
  62. if p.checkSchema {
  63. if !p.isBinary {
  64. err := p.validateAndConvert(tuple)
  65. if err != nil {
  66. return fmt.Errorf("error in preprocessor: %s", err)
  67. }
  68. } else {
  69. for name := range p.streamFields {
  70. tuple.Message[name] = tuple.Message[message.DefaultField]
  71. delete(tuple.Message, message.DefaultField)
  72. break
  73. }
  74. }
  75. }
  76. if p.isEventTime {
  77. if t, ok := tuple.Message[p.timestampField]; ok {
  78. if ts, err := cast.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  79. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  80. } else {
  81. tuple.Timestamp = ts
  82. log.Debugf("preprocessor calculate timestamp %d", tuple.Timestamp)
  83. }
  84. } else {
  85. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, tuple.Message)
  86. }
  87. }
  88. // No need to reconstruct meta as the memory has been allocated earlier
  89. //if !p.allMeta && p.metaFields != nil && len(p.metaFields) > 0 {
  90. // newMeta := make(xsql.Metadata)
  91. // for _, f := range p.metaFields {
  92. // if m, ok := tuple.Metadata.Value(f, ""); ok {
  93. // newMeta[f] = m
  94. // }
  95. // }
  96. // tuple.Metadata = newMeta
  97. //}
  98. return tuple
  99. }