12345678910111213141516171819202122232425262728293031323334353637383940414243444546 |
- package operators
- import (
- "fmt"
- "github.com/emqx/kuiper/xsql"
- "github.com/emqx/kuiper/xstream/api"
- )
- type TableProcessor struct {
- //Pruned stream fields. Could be streamField(with data type info) or string
- defaultFieldProcessor
- }
- func NewTableProcessor(fields []interface{}, fs xsql.Fields, timestampFormat string) (*TableProcessor, error) {
- p := &TableProcessor{}
- p.defaultFieldProcessor = defaultFieldProcessor{
- streamFields: fields, aliasFields: fs, isBinary: false, timestampFormat: timestampFormat,
- }
- return p, nil
- }
- /*
- * input: []*xsql.Tuple
- * output: WindowTuples
- */
- func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
- logger := ctx.GetLogger()
- tuples, ok := data.([]*xsql.Tuple)
- if !ok {
- return fmt.Errorf("expect []*xsql.Tuple data type")
- }
- logger.Debugf("Start to process table fields")
- w := xsql.WindowTuples{
- Emitter: tuples[0].Emitter,
- Tuples: make([]xsql.Tuple, len(tuples)),
- }
- for i, t := range tuples {
- result, err := p.processField(t, fv)
- if err != nil {
- return fmt.Errorf("error in table processor: %s", err)
- }
- t.Message = result
- w.Tuples[i] = *t
- }
- return w
- }
|