123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package nodes
- import (
- "github.com/emqx/kuiper/common"
- "github.com/emqx/kuiper/xsql"
- "github.com/emqx/kuiper/xstream/api"
- "github.com/emqx/kuiper/xstream/extensions"
- )
- // Node for table source
- type TableNode struct {
- *defaultNode
- sourceType string
- options map[string]string
- }
- func NewTableNode(name string, options map[string]string) *TableNode {
- t, ok := options["TYPE"]
- if !ok {
- t = "file"
- }
- return &TableNode{
- sourceType: t,
- defaultNode: &defaultNode{
- name: name,
- outputs: make(map[string]chan<- interface{}),
- concurrency: 1,
- },
- options: options,
- }
- }
- func (m *TableNode) Open(ctx api.StreamContext, errCh chan<- error) {
- m.ctx = ctx
- logger := ctx.GetLogger()
- logger.Infof("open table node %s with option %v", m.name, m.options)
- go func() {
- props := getSourceConf(ctx, m.sourceType, m.options)
- //TODO apply properties like concurrency
- source, err := doGetTableSource(m.sourceType)
- if err != nil {
- m.drainError(errCh, err, ctx)
- return
- }
- err = source.Configure(m.options["DATASOURCE"], props)
- if err != nil {
- m.drainError(errCh, err, ctx)
- return
- }
- stats, err := NewStatManager("source", ctx)
- if err != nil {
- m.drainError(errCh, err, ctx)
- return
- }
- m.statManagers = append(m.statManagers, stats)
- stats.ProcessTimeStart()
- if data, err := source.Load(ctx); err != nil {
- stats.IncTotalExceptions()
- stats.ProcessTimeEnd()
- m.drainError(errCh, err, ctx)
- return
- } else {
- stats.IncTotalRecordsIn()
- stats.ProcessTimeEnd()
- logger.Debugf("table node %s is sending result", m.name)
- result := make([]*xsql.Tuple, len(data))
- for i, t := range data {
- tuple := &xsql.Tuple{Emitter: m.name, Message: t.Message(), Metadata: t.Meta(), Timestamp: common.GetNowInMilli()}
- result[i] = tuple
- }
- m.doBroadcast(result)
- stats.IncTotalRecordsOut()
- logger.Debugf("table node %s has consumed all data", m.name)
- }
- }()
- }
- func (m *TableNode) drainError(errCh chan<- error, err error, ctx api.StreamContext) {
- select {
- case errCh <- err:
- case <-ctx.Done():
- }
- return
- }
- func doGetTableSource(t string) (api.TableSource, error) {
- var s api.TableSource
- switch t {
- case "file":
- s = &extensions.FileSource{}
- default: //TODO table source plugin
- //s, err = plugins.GetTableSource(t)
- //if err != nil {
- // return nil, err
- //}
- }
- return s, nil
- }
|