Bladeren bron

feat(extension): move source and sink extension to sources and sinks folder inside plugins

ngjaying 5 jaren geleden
bovenliggende
commit
2d3826c3b0
5 gewijzigde bestanden met toevoegingen van 18 en 13 verwijderingen
  1. 0 0
      etc/sources/RandomSource.yaml
  2. 1 1
      plugins/memory_sink.go
  3. 1 1
      plugins/RandomSource.go
  4. 4 4
      xsql/processors/xsql_processor.go
  5. 12 7
      xstream/api/stream.go

etc/RandomSource.yaml → etc/sources/RandomSource.yaml


+ 1 - 1
plugins/memory_sink.go

@@ -1,4 +1,4 @@
-package main
+package sinks
 
 import "engine/xstream/api"
 

+ 1 - 1
plugins/RandomSource.go

@@ -1,4 +1,4 @@
-package main
+package sources
 
 import (
 	"context"

+ 4 - 4
xsql/processors/xsql_processor.go

@@ -448,7 +448,7 @@ func getSource(streamStmt *xsql.StreamStmt) (api.Source, error) {
 		log.Tracef("Source mqtt created")
 		return mqs, nil
 	default:
-		nf, err := getPlugin(t)
+		nf, err := getPlugin(t, "sources")
 		if err != nil {
 			return nil, err
 		}
@@ -485,8 +485,8 @@ func getConf(t string, confkey string) map[string]interface{} {
 	return props
 }
 
-func getPlugin(t string) (plugin.Symbol, error) {
-	mod := "plugins/" + t + ".so"
+func getPlugin(t string, ptype string) (plugin.Symbol, error) {
+	mod := "plugins/" + ptype + "/" + t + ".so"
 	plug, err := plugin.Open(mod)
 	if err != nil {
 		return nil, fmt.Errorf("cannot open %s: %v", mod, err)
@@ -506,7 +506,7 @@ func getSink(name string, action interface{}) (api.Sink, error) {
 	case "mqtt":
 		return sinks.NewMqttSink(action)
 	default:
-		nf, err := getPlugin(name)
+		nf, err := getPlugin(name, "sinks")
 		if err != nil {
 			return nil, err
 		}

+ 12 - 7
xstream/api/stream.go

@@ -6,24 +6,29 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
-type ConsumeFunc func(xsql.Message, xsql.Metadata)
+//The function to call when data is emitted by the source.
+type ConsumeFunc func(message xsql.Message, metadata xsql.Metadata)
 
 type Closable interface {
-	Close(StreamContext) error
+	Close(ctx StreamContext) error
 }
 
 type Source interface {
 	//Should be sync function for normal case. The container will run it in go func
-	Open(StreamContext, ConsumeFunc) error
-	Configure(string, map[string]interface{}) error
+	Open(ctx StreamContext, consume ConsumeFunc) error
+	//Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
+	//read from the yaml
+	Configure(datasource string, props map[string]interface{}) error
 	Closable
 }
 
 type Sink interface {
 	//Should be sync function for normal case. The container will run it in go func
-	Open(StreamContext) error
-	Configure(map[string]interface{}) error
-	Collect(StreamContext, interface{}) error
+	Open(ctx StreamContext) error
+	//Called during initialization. Configure the sink with the properties from rule action definition
+	Configure(props map[string]interface{}) error
+	//Called when each row of data has transferred to this sink
+	Collect(ctx StreamContext, data interface{}) error
 	Closable
 }