sources_for_test_with_edgex.go 698 B

1234567891011121314151617181920212223242526272829303132
  1. // +build test
  2. // +build edgex
  3. package nodes
  4. import (
  5. "github.com/emqx/kuiper/xstream/api"
  6. "github.com/emqx/kuiper/xstream/extensions"
  7. "github.com/emqx/kuiper/xstream/sinks"
  8. "github.com/emqx/kuiper/xstream/topotest/mocknodes"
  9. )
  10. func getSource(t string) (api.Source, error) {
  11. if t == "edgex" {
  12. return &extensions.EdgexSource{}, nil
  13. } else if t == "mock" {
  14. return &mocknodes.MockSource{}, nil
  15. }
  16. return doGetSource(t)
  17. }
  18. func getSink(name string, action map[string]interface{}) (api.Sink, error) {
  19. if name == "edgex" {
  20. s := &sinks.EdgexMsgBusSink{}
  21. if err := s.Configure(action); err != nil {
  22. return nil, err
  23. } else {
  24. return s, nil
  25. }
  26. }
  27. return doGetSink(name, action)
  28. }