table_processor.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. package operators
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. )
  7. type TableProcessor struct {
  8. //Pruned stream fields. Could be streamField(with data type info) or string
  9. defaultFieldProcessor
  10. }
  11. func NewTableProcessor(fields []interface{}, fs xsql.Fields, timestampFormat string) (*TableProcessor, error) {
  12. p := &TableProcessor{}
  13. p.defaultFieldProcessor = defaultFieldProcessor{
  14. streamFields: fields, aliasFields: fs, isBinary: false, timestampFormat: timestampFormat,
  15. }
  16. return p, nil
  17. }
  18. /*
  19. * input: []*xsql.Tuple
  20. * output: WindowTuples
  21. */
  22. func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  23. logger := ctx.GetLogger()
  24. tuples, ok := data.([]*xsql.Tuple)
  25. if !ok {
  26. return fmt.Errorf("expect []*xsql.Tuple data type")
  27. }
  28. logger.Debugf("Start to process table fields")
  29. w := xsql.WindowTuples{
  30. Emitter: tuples[0].Emitter,
  31. Tuples: make([]xsql.Tuple, len(tuples)),
  32. }
  33. for i, t := range tuples {
  34. result, err := p.processField(t, fv)
  35. if err != nil {
  36. return fmt.Errorf("error in table processor: %s", err)
  37. }
  38. t.Message = result
  39. w.Tuples[i] = *t
  40. }
  41. return w
  42. }