with_edgex.go 567 B

12345678910111213141516171819202122232425262728
  1. // +build edgex
  2. package nodes
  3. import (
  4. "github.com/emqx/kuiper/xstream/api"
  5. "github.com/emqx/kuiper/xstream/extensions"
  6. "github.com/emqx/kuiper/xstream/sinks"
  7. )
  8. func getSource(t string) (api.Source, error) {
  9. if t == "edgex" {
  10. return &extensions.EdgexSource{}, nil
  11. }
  12. return doGetSource(t)
  13. }
  14. func getSink(name string, action map[string]interface{}) (api.Sink, error) {
  15. if name == "edgex" {
  16. s := &sinks.EdgexMsgBusSink{}
  17. if err := s.Configure(action); err != nil {
  18. return nil, err
  19. } else {
  20. return s, nil
  21. }
  22. }
  23. return doGetSink(name, action)
  24. }