123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- package nodes
- import (
- "github.com/emqx/kuiper/common"
- "github.com/emqx/kuiper/xsql"
- "github.com/emqx/kuiper/xstream/api"
- "fmt"
- "github.com/go-yaml/yaml"
- )
- type SourceNode struct {
- source api.Source
- outs map[string]chan<- interface{}
- name string
- ctx api.StreamContext
- options map[string]string
- }
- func NewSourceNode(name string, source api.Source, options map[string]string) *SourceNode {
- return &SourceNode{
- source: source,
- outs: make(map[string]chan<- interface{}),
- name: name,
- options: options,
- ctx: nil,
- }
- }
- func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
- m.ctx = ctx
- logger := ctx.GetLogger()
- logger.Debugf("open source node %s with option %v", m.name, m.options)
- go func(){
- props := getConf(m.options["TYPE"], m.options["CONF_KEY"], ctx)
- err := m.source.Configure(m.options["DATASOURCE"], props)
- if err != nil{
- m.drainError(errCh, err, ctx, logger)
- return
- }
- if err := m.source.Open(ctx, func(message map[string]interface{}, meta map[string]interface{}) {
- tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
- m.Broadcast(tuple)
- logger.Debugf("%s consume data %v complete", m.name, tuple)
- }); err != nil {
- m.drainError(errCh, err, ctx, logger)
- return
- }
- for {
- select {
- case <-ctx.Done():
- logger.Infof("source %s done", m.name)
- if err := m.source.Close(ctx); err != nil {
- logger.Warnf("close source fails: %v", err)
- }
- return
- }
- }
- }()
- }
- func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
- select {
- case errCh <- err:
- case <-ctx.Done():
- if err := m.source.Close(ctx); err != nil {
- logger.Warnf("close source fails: %v", err)
- }
- }
- return
- }
- func getConf(t string, confkey string, ctx api.StreamContext) map[string]interface{} {
- logger := ctx.GetLogger()
- if t == ""{
- t = "mqtt"
- }
- confPath := "sources/" + t + ".yaml"
- if t == "mqtt"{
- confPath = "mqtt_source.yaml"
- }
- conf, err := common.LoadConf(confPath)
- props := make(map[string]interface{})
- if err == nil {
- cfg := make(map[string]map[string]interface{})
- if err := yaml.Unmarshal(conf, &cfg); err != nil {
- logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", t)
- } else {
- var ok bool
- props, ok = cfg["default"]
- if !ok {
- logger.Warnf("default conf is not found", confkey)
- }
- if c, ok := cfg[confkey]; ok {
- for k, v := range c {
- props[k] = v
- }
- }
- }
- } else {
- logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", t)
- }
- logger.Debugf("get conf for %s with conf key %s: %v", t, confkey, props)
- return props
- }
- func (m *SourceNode) Broadcast(data interface{}) int{
- return Broadcast(m.outs, data, m.ctx)
- }
- func (m *SourceNode) GetName() string{
- return m.name
- }
- func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
- if _, ok := m.outs[name]; !ok{
- m.outs[name] = output
- }else{
- return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
- }
- return nil
- }
|