preprocessor.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/xsql"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. "github.com/lf-edge/ekuiper/pkg/message"
  22. )
  23. // Preprocessor only planned when
  24. // 1. eventTime, to convert the timestamp field
  25. // 2. schema validate and convert, when strict_validation is on and field type is not binary
  26. // Do not convert types
  27. type Preprocessor struct {
  28. //Pruned stream fields. Could be streamField(with data type info) or string
  29. defaultFieldProcessor
  30. //allMeta bool
  31. //metaFields []string //only needed if not allMeta
  32. isEventTime bool
  33. timestampField string
  34. checkSchema bool
  35. isBinary bool
  36. }
  37. func NewPreprocessor(isSchemaless bool, fields map[string]*ast.JsonStreamField, _ bool, _ []string, iet bool, timestampField string, timestampFormat string, isBinary bool, strictValidation bool) (*Preprocessor, error) {
  38. p := &Preprocessor{
  39. isEventTime: iet, timestampField: timestampField, isBinary: isBinary}
  40. if !isSchemaless && (strictValidation || isBinary) {
  41. p.checkSchema = true
  42. p.defaultFieldProcessor = defaultFieldProcessor{
  43. streamFields: fields, timestampFormat: timestampFormat,
  44. }
  45. }
  46. return p, nil
  47. }
  48. // Apply the preprocessor to the tuple
  49. /* input: *xsql.Tuple
  50. * output: *xsql.Tuple
  51. */
  52. func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  53. log := ctx.GetLogger()
  54. tuple, ok := data.(*xsql.Tuple)
  55. if !ok {
  56. return fmt.Errorf("expect tuple data type")
  57. }
  58. log.Debugf("preprocessor receive %s", tuple.Message)
  59. if p.checkSchema {
  60. if !p.isBinary {
  61. err := p.validateAndConvert(tuple)
  62. if err != nil {
  63. return fmt.Errorf("error in preprocessor: %s", err)
  64. }
  65. } else {
  66. for name := range p.streamFields {
  67. tuple.Message[name] = tuple.Message[message.DefaultField]
  68. delete(tuple.Message, message.DefaultField)
  69. break
  70. }
  71. }
  72. }
  73. if p.isEventTime {
  74. if t, ok := tuple.Message[p.timestampField]; ok {
  75. if ts, err := cast.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
  76. return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
  77. } else {
  78. tuple.Timestamp = ts
  79. log.Debugf("preprocessor calculate timestamp %d", tuple.Timestamp)
  80. }
  81. } else {
  82. return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, tuple.Message)
  83. }
  84. }
  85. // No need to reconstruct meta as the memory has been allocated earlier
  86. //if !p.allMeta && p.metaFields != nil && len(p.metaFields) > 0 {
  87. // newMeta := make(xsql.Metadata)
  88. // for _, f := range p.metaFields {
  89. // if m, ok := tuple.Metadata.Value(f, ""); ok {
  90. // newMeta[f] = m
  91. // }
  92. // }
  93. // tuple.Metadata = newMeta
  94. //}
  95. return tuple
  96. }