Просмотр исходного кода

feat(neuron): source, sink and connection

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 лет назад
Родитель
Сommit
3f97d37542

+ 4 - 1
internal/binder/io/builtin.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@ package io
 
 
 import (
 import (
 	"github.com/lf-edge/ekuiper/internal/topo/memory"
 	"github.com/lf-edge/ekuiper/internal/topo/memory"
+	"github.com/lf-edge/ekuiper/internal/topo/neuron"
 	"github.com/lf-edge/ekuiper/internal/topo/sink"
 	"github.com/lf-edge/ekuiper/internal/topo/sink"
 	"github.com/lf-edge/ekuiper/internal/topo/source"
 	"github.com/lf-edge/ekuiper/internal/topo/source"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
@@ -30,6 +31,7 @@ var (
 		"httppull": func() api.Source { return &source.HTTPPullSource{} },
 		"httppull": func() api.Source { return &source.HTTPPullSource{} },
 		"file":     func() api.Source { return &source.FileSource{} },
 		"file":     func() api.Source { return &source.FileSource{} },
 		"memory":   func() api.Source { return memory.GetSource() },
 		"memory":   func() api.Source { return memory.GetSource() },
+		"neuron":   func() api.Source { return neuron.GetSource() },
 	}
 	}
 	sinks = map[string]NewSinkFunc{
 	sinks = map[string]NewSinkFunc{
 		"log":         sink.NewLogSink,
 		"log":         sink.NewLogSink,
@@ -38,6 +40,7 @@ var (
 		"rest":        func() api.Sink { return &sink.RestSink{} },
 		"rest":        func() api.Sink { return &sink.RestSink{} },
 		"nop":         func() api.Sink { return &sink.NopSink{} },
 		"nop":         func() api.Sink { return &sink.NopSink{} },
 		"memory":      func() api.Sink { return memory.GetSink() },
 		"memory":      func() api.Sink { return memory.GetSink() },
+		"neuron":      func() api.Sink { return neuron.GetSink() },
 	}
 	}
 )
 )
 
 

+ 5 - 5
internal/topo/memory/manager.go

@@ -47,7 +47,7 @@ func GetSource() *source {
 	return &source{}
 	return &source{}
 }
 }
 
 
-func createPub(topic string) {
+func CreatePub(topic string) {
 	mu.Lock()
 	mu.Lock()
 	defer mu.Unlock()
 	defer mu.Unlock()
 
 
@@ -67,7 +67,7 @@ func createPub(topic string) {
 	}
 	}
 }
 }
 
 
-func createSub(wildcard string, regex *regexp.Regexp, sourceId string, bufferLength int) chan api.SourceTuple {
+func CreateSub(wildcard string, regex *regexp.Regexp, sourceId string, bufferLength int) chan api.SourceTuple {
 	mu.Lock()
 	mu.Lock()
 	defer mu.Unlock()
 	defer mu.Unlock()
 	ch := make(chan api.SourceTuple, bufferLength)
 	ch := make(chan api.SourceTuple, bufferLength)
@@ -87,7 +87,7 @@ func createSub(wildcard string, regex *regexp.Regexp, sourceId string, bufferLen
 	return ch
 	return ch
 }
 }
 
 
-func closeSourceConsumerChannel(topic string, sourceId string) {
+func CloseSourceConsumerChannel(topic string, sourceId string) {
 	mu.Lock()
 	mu.Lock()
 	defer mu.Unlock()
 	defer mu.Unlock()
 
 
@@ -104,7 +104,7 @@ func closeSourceConsumerChannel(topic string, sourceId string) {
 	}
 	}
 }
 }
 
 
-func closeSink(topic string) {
+func RemovePub(topic string) {
 	mu.Lock()
 	mu.Lock()
 	defer mu.Unlock()
 	defer mu.Unlock()
 
 
@@ -116,7 +116,7 @@ func closeSink(topic string) {
 	}
 	}
 }
 }
 
 
-func produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
+func Produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
 	c, exists := pubTopics[topic]
 	c, exists := pubTopics[topic]
 	if !exists {
 	if !exists {
 		return
 		return

+ 5 - 5
internal/topo/memory/manager_test.go

@@ -100,7 +100,7 @@ func TestCreateAndClose(t *testing.T) {
 		chans        []chan api.SourceTuple
 		chans        []chan api.SourceTuple
 	)
 	)
 	for i, topic := range sinkTopics {
 	for i, topic := range sinkTopics {
-		createPub(topic)
+		CreatePub(topic)
 		var (
 		var (
 			r   *regexp.Regexp
 			r   *regexp.Regexp
 			err error
 			err error
@@ -112,7 +112,7 @@ func TestCreateAndClose(t *testing.T) {
 				return
 				return
 			}
 			}
 		}
 		}
-		c := createSub(sourceTopics[i], r, fmt.Sprintf("%d", i), 100)
+		c := CreateSub(sourceTopics[i], r, fmt.Sprintf("%d", i), 100)
 		chans = append(chans, c)
 		chans = append(chans, c)
 	}
 	}
 
 
@@ -151,8 +151,8 @@ func TestCreateAndClose(t *testing.T) {
 	}
 	}
 	i := 0
 	i := 0
 	for i < 3 {
 	for i < 3 {
-		closeSourceConsumerChannel(sourceTopics[i], fmt.Sprintf("%d", i))
-		closeSink(sinkTopics[i])
+		CloseSourceConsumerChannel(sourceTopics[i], fmt.Sprintf("%d", i))
+		RemovePub(sinkTopics[i])
 		i++
 		i++
 	}
 	}
 	expPub = map[string]*pubConsumers{
 	expPub = map[string]*pubConsumers{
@@ -480,7 +480,7 @@ func TestMultipleTopics(t *testing.T) {
 		for c < 3 {
 		for c < 3 {
 			for i, v := range sinkData {
 			for i, v := range sinkData {
 				time.Sleep(10 * time.Millisecond)
 				time.Sleep(10 * time.Millisecond)
-				produce(ctx, sinkTopics[i], v[c])
+				Produce(ctx, sinkTopics[i], v[c])
 			}
 			}
 			c++
 			c++
 		}
 		}

+ 4 - 4
internal/topo/memory/sink.go

@@ -26,7 +26,7 @@ type sink struct {
 
 
 func (s *sink) Open(ctx api.StreamContext) error {
 func (s *sink) Open(ctx api.StreamContext) error {
 	ctx.GetLogger().Debugf("Opening memory sink: %v", s.topic)
 	ctx.GetLogger().Debugf("Opening memory sink: %v", s.topic)
-	createPub(s.topic)
+	CreatePub(s.topic)
 	return nil
 	return nil
 }
 }
 
 
@@ -54,10 +54,10 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 	switch d := data.(type) {
 	switch d := data.(type) {
 	case []map[string]interface{}:
 	case []map[string]interface{}:
 		for _, el := range d {
 		for _, el := range d {
-			produce(ctx, topic, el)
+			Produce(ctx, topic, el)
 		}
 		}
 	case map[string]interface{}:
 	case map[string]interface{}:
-		produce(ctx, topic, d)
+		Produce(ctx, topic, d)
 	default:
 	default:
 		return fmt.Errorf("unrecognized format of %s", data)
 		return fmt.Errorf("unrecognized format of %s", data)
 	}
 	}
@@ -66,6 +66,6 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 
 
 func (s *sink) Close(ctx api.StreamContext) error {
 func (s *sink) Close(ctx api.StreamContext) error {
 	ctx.GetLogger().Debugf("closing memory sink")
 	ctx.GetLogger().Debugf("closing memory sink")
-	closeSink(s.topic)
+	RemovePub(s.topic)
 	return nil
 	return nil
 }
 }

+ 2 - 2
internal/topo/memory/source.go

@@ -29,7 +29,7 @@ type source struct {
 }
 }
 
 
 func (s *source) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, _ chan<- error) {
 func (s *source) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, _ chan<- error) {
-	ch := createSub(s.topic, s.topicRegex, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), s.bufferLength)
+	ch := CreateSub(s.topic, s.topicRegex, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), s.bufferLength)
 	for {
 	for {
 		select {
 		select {
 		case v, opened := <-ch:
 		case v, opened := <-ch:
@@ -78,6 +78,6 @@ func getRegexp(topic string) (*regexp.Regexp, error) {
 
 
 func (s *source) Close(ctx api.StreamContext) error {
 func (s *source) Close(ctx api.StreamContext) error {
 	ctx.GetLogger().Debugf("closing memory source")
 	ctx.GetLogger().Debugf("closing memory source")
-	closeSourceConsumerChannel(s.topic, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
+	CloseSourceConsumerChannel(s.topic, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
 	return nil
 	return nil
 }
 }

+ 29 - 0
internal/topo/mock/mock.go

@@ -0,0 +1,29 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mock
+
+import (
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/state"
+	"github.com/lf-edge/ekuiper/pkg/api"
+)
+
+func newMockContext(ruleId string, opId string) api.StreamContext {
+	contextLogger := conf.Log.WithField("rule", "mockRule0")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+	tempStore, _ := state.CreateStore(ruleId, api.AtMostOnce)
+	return ctx.WithMeta(ruleId, opId, tempStore)
+}

+ 38 - 0
internal/topo/mock/test_sink.go

@@ -0,0 +1,38 @@
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mock
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"time"
+)
+
+func RunSinkCollect(s api.Sink, data []interface{}) error {
+	ctx := newMockContext("rule1", "op1")
+	err := s.Open(ctx)
+	if err != nil {
+		return err
+	}
+	for _, e := range data {
+		err := s.Collect(ctx, e)
+		if err != nil {
+			return err
+		}
+	}
+	time.Sleep(time.Second)
+	fmt.Println("closing sink")
+	return s.Close(ctx)
+}

+ 70 - 0
internal/topo/mock/test_source.go

@@ -0,0 +1,70 @@
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mock
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"reflect"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+var count atomic.Value
+
+func TestSourceOpen(r api.Source, exp []api.SourceTuple, t *testing.T) {
+	c := count.Load()
+	if c == nil {
+		count.Store(1)
+		c = 0
+	}
+	ctx, cancel := newMockContext(fmt.Sprintf("rule%d", c), "op1").WithCancel()
+	count.Store(c.(int) + 1)
+	consumer := make(chan api.SourceTuple)
+	errCh := make(chan error)
+	go r.Open(ctx, consumer, errCh)
+	ticker := time.After(10 * time.Second)
+	limit := len(exp)
+	var result []api.SourceTuple
+outerloop:
+	for {
+		select {
+		case err := <-errCh:
+			t.Errorf("received error: %v", err)
+			cancel()
+			return
+		case tuple := <-consumer:
+			result = append(result, tuple)
+			limit--
+			if limit <= 0 {
+				break outerloop
+			}
+		case <-ticker:
+			t.Errorf("stop after timeout")
+			t.Errorf("expect %v, but got %v", exp, result)
+			cancel()
+			return
+		}
+	}
+	err := r.Close(ctx)
+	if err != nil {
+		t.Errorf(err.Error())
+		return
+	}
+	if !reflect.DeepEqual(exp, result) {
+		t.Errorf("result mismatch:\n  exp=%s\n  got=%s\n\n", exp, result)
+	}
+}

+ 160 - 0
internal/topo/neuron/connection.go

@@ -0,0 +1,160 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package neuron
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/memory"
+	"github.com/lf-edge/ekuiper/internal/topo/state"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/message"
+	"go.nanomsg.org/mangos/v3"
+	"go.nanomsg.org/mangos/v3/protocol/pair"
+	_ "go.nanomsg.org/mangos/v3/transport/ipc"
+	"sync"
+	"time"
+)
+
+const (
+	NeuronTopic = "$$neuron"
+	NeuronUrl   = "ipc:///tmp/neuron-ekuiper.ipc"
+)
+
+var (
+	m               sync.RWMutex
+	connectionCount int
+	sock            mangos.Socket
+	sendTimeout     int
+)
+
+// createOrGetNeuronConnection creates a new neuron connection or returns an existing one
+// This is the entry function for creating a neuron connection singleton
+// The context is from a rule, but the singleton will server for multiple rules
+func createOrGetConnection(sc api.StreamContext, url string) error {
+	m.Lock()
+	defer m.Unlock()
+	sc.GetLogger().Infof("createOrGetConnection count: %d", connectionCount)
+	if connectionCount == 0 {
+		sc.GetLogger().Infof("Creating neuron connection")
+		err := connect(url)
+		if err != nil {
+			return err
+		}
+		sc.GetLogger().Infof("Neuron connected")
+		contextLogger := conf.Log.WithField("neuron_connection", 0)
+		ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+		ruleId := "$$neuron_connection"
+		opId := "$$neuron_connection"
+		store, err := state.CreateStore(ruleId, 0)
+		if err != nil {
+			ctx.GetLogger().Errorf("neuron connection create store error %v", err)
+			return err
+		}
+		sctx := ctx.WithMeta(ruleId, opId, store)
+		memory.CreatePub(NeuronTopic)
+		go run(sctx)
+		connectionCount++
+	}
+	return nil
+}
+
+func closeConnection(ctx api.StreamContext, url string) error {
+	m.Lock()
+	defer m.Unlock()
+	ctx.GetLogger().Infof("closeConnection count: %d", connectionCount)
+	memory.RemovePub(NeuronTopic)
+	connectionCount--
+	if connectionCount == 0 {
+		err := disconnect(url)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// nng connections
+
+// connect to nng
+func connect(url string) error {
+	var err error
+	sock, err = pair.NewSocket()
+	if err != nil {
+		return err
+	}
+	// options consider to export
+	err = sock.SetOption(mangos.OptionSendDeadline, time.Duration(sendTimeout)*time.Millisecond)
+	if err != nil {
+		return err
+	}
+	//sock.SetOption(mangos.OptionWriteQLen, 100)
+	//sock.SetOption(mangos.OptionReadQLen, 100)
+	//sock.SetOption(mangos.OptionBestEffort, false)
+	if err = sock.DialOptions(url, map[string]interface{}{
+		mangos.OptionDialAsynch:       false, // will reports error after max reconnect time
+		mangos.OptionMaxReconnectTime: 5 * time.Second,
+		mangos.OptionReconnectTime:    100 * time.Millisecond,
+	}); err != nil {
+		return fmt.Errorf("can't dial to neuron: %s", err.Error())
+	}
+
+	return nil
+}
+
+// run the loop to receive message from the nng connection singleton
+// exit when connection is closed
+func run(ctx api.StreamContext) {
+	ctx.GetLogger().Infof("neuron source receiving loop started")
+	for {
+		// no receiving deadline, will wait until the socket closed
+		if msg, err := sock.Recv(); err == nil {
+			ctx.GetLogger().Debugf("neuron received message %s", string(msg))
+			result, err := message.Decode(msg, message.FormatJson)
+			if err != nil {
+				ctx.GetLogger().Errorf("neuron decode message error %v", err)
+				continue
+			}
+			memory.Produce(ctx, NeuronTopic, result)
+		} else if err == mangos.ErrClosed {
+			ctx.GetLogger().Infof("neuron connection closed, exit receiving loop")
+			return
+		} else {
+			ctx.GetLogger().Errorf("neuron receiving error %v", err)
+		}
+	}
+}
+
+func publish(ctx api.StreamContext, data []byte) error {
+	ctx.GetLogger().Debugf("publish to neuron: %s", string(data))
+	if sock != nil {
+		return sock.Send(data)
+	}
+	return fmt.Errorf("neuron connection is not established")
+}
+
+func disconnect(_ string) error {
+	defer func() {
+		sock = nil
+	}()
+	if sock != nil {
+		err := sock.Close()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}

+ 173 - 0
internal/topo/neuron/neuron_test.go

@@ -0,0 +1,173 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package neuron
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/mock"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"go.nanomsg.org/mangos/v3"
+	"go.nanomsg.org/mangos/v3/protocol/pair"
+	_ "go.nanomsg.org/mangos/v3/transport/ipc"
+	"log"
+	"reflect"
+	"sync"
+	"testing"
+	"time"
+)
+
+var data = [][]byte{
+	[]byte("{\"timestamp\": 1646125996000, \"node_name\": \"node1\", \"group_name\": \"group1\", \"values\": {\"tag_name1\": 11.22, \"tag_name2\": \"yellow\"}, \"errors\": {\"tag_name3\": 122}}"),
+	[]byte(`{"timestamp": 1646125996000, "node_name": "node1", "group_name": "group1", "values": {"tag_name1": 11.22, "tag_name2": "green","tag_name3":60}, "errors": {}}`),
+	[]byte(`{"timestamp": 1646125996000, "node_name": "node1", "group_name": "group1", "values": {"tag_name1": 15.4, "tag_name2": "green","tag_name3":70}, "errors": {}}`),
+}
+
+// mockNeuron start the nng pair server
+func mockNeuron(send bool, recv bool) (mangos.Socket, chan []byte) {
+	var (
+		sock mangos.Socket
+		err  error
+		ch   chan []byte
+	)
+	if sock, err = pair.NewSocket(); err != nil {
+		log.Fatalf("can't get new pair socket: %s", err)
+	}
+	if err = sock.Listen("ipc:///tmp/neuron-ekuiper.ipc"); err != nil {
+		log.Fatalf("can't listen on pair socket: %s", err.Error())
+	} else {
+		log.Printf("listen on pair socket")
+	}
+	if recv {
+		ch = make(chan []byte)
+		go func() {
+			for {
+				var msg []byte
+				var err error
+				if msg, err = sock.Recv(); err == nil {
+					fmt.Printf("Neuron RECEIVED: \"%s\"\n", string(msg))
+					ch <- msg
+					fmt.Println("Neuron Sent out")
+				}
+			}
+		}()
+	}
+	if send {
+		go func() {
+			for _, msg := range data {
+				time.Sleep(10 * time.Millisecond)
+				fmt.Printf("Neuron SENDING \"%s\"\n", msg)
+				if err := sock.Send(msg); err != nil {
+					fmt.Printf("failed sending: %s\n", err)
+				}
+			}
+		}()
+	}
+	return sock, ch
+}
+
+// Test scenario of multiple neuron sources and sinks
+func TestMultiSourceSink(t *testing.T) {
+	// start and test 2 sources
+	exp := []api.SourceTuple{
+		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron"}),
+	}
+	s1 := GetSource()
+	err := s1.Configure("new", nil)
+	if err != nil {
+		t.Errorf(err.Error())
+		return
+	}
+	s2 := GetSource()
+	err = s2.Configure("new2", nil)
+	if err != nil {
+		t.Errorf(err.Error())
+		return
+	}
+
+	sin := GetSink()
+	sin.Configure(map[string]interface{}{
+		"nodeName":  "testM",
+		"raw":       false,
+		"groupName": "grp",
+	})
+
+	server, ch := mockNeuron(true, true)
+	defer server.Close()
+	wg := sync.WaitGroup{}
+	wg.Add(3)
+	go func() {
+		mock.TestSourceOpen(s1, exp, t)
+		wg.Done()
+	}()
+	go func() {
+		mock.TestSourceOpen(s2, exp, t)
+		wg.Done()
+	}()
+
+	data := []interface{}{
+		map[string]interface{}{
+			"temperature": 22,
+			"humidity":    50,
+			"status":      "green",
+		},
+		map[string]interface{}{
+			"temperature": 25,
+			"humidity":    82,
+			"status":      "wet",
+		},
+		map[string]interface{}{
+			"temperature": 33,
+			"humidity":    60,
+			"status":      "hot",
+		},
+	}
+	go func() {
+		err = mock.RunSinkCollect(sin, data)
+		if err != nil {
+			t.Errorf(err.Error())
+			return
+		}
+		wg.Done()
+	}()
+	sexp := []string{
+		`{"group_name":"grp","node_name":"testM","tag_name":"humidity","value":50}`,
+		`{"group_name":"grp","node_name":"testM","tag_name":"status","value":"green"}`,
+		`{"group_name":"grp","node_name":"testM","tag_name":"temperature","value":22}`,
+		`{"group_name":"grp","node_name":"testM","tag_name":"humidity","value":82}`,
+		`{"group_name":"grp","node_name":"testM","tag_name":"status","value":"wet"}`,
+		`{"group_name":"grp","node_name":"testM","tag_name":"temperature","value":25}`,
+		`{"group_name":"grp","node_name":"testM","tag_name":"humidity","value":60}`,
+		`{"group_name":"grp","node_name":"testM","tag_name":"status","value":"hot"}`,
+		`{"group_name":"grp","node_name":"testM","tag_name":"temperature","value":33}`,
+	}
+	var actual []string
+	ticker := time.After(10 * time.Second)
+	for i := 0; i < len(sexp); i++ {
+		select {
+		case <-ticker:
+			t.Errorf("timeout")
+			return
+		case d := <-ch:
+			actual = append(actual, string(d))
+		}
+	}
+
+	if !reflect.DeepEqual(actual, sexp) {
+		t.Errorf("result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", sexp, actual)
+	}
+	wg.Wait()
+}

+ 187 - 0
internal/topo/neuron/sink.go

@@ -0,0 +1,187 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package neuron
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+	"sort"
+)
+
+type sink struct {
+	url       string
+	c         *c
+	connected bool
+}
+
+type c struct {
+	NodeName  string   `json:"nodeName"`
+	GroupName string   `json:"groupName"`
+	Tags      []string `json:"tags"`
+	// If sent with the raw converted string or let us range over the result map
+	Raw bool `json:"raw"`
+}
+
+type neuronTemplate struct {
+	GroupName string      `json:"group_name"`
+	NodeName  string      `json:"node_name"`
+	TagName   string      `json:"tag_name"`
+	Value     interface{} `json:"value"`
+}
+
+func (s *sink) Configure(props map[string]interface{}) error {
+	s.url = NeuronUrl
+	cc := &c{
+		NodeName:  "unknown",
+		GroupName: "unknown",
+		Raw:       false,
+	}
+	err := cast.MapToStruct(props, cc)
+	if err != nil {
+		return err
+	}
+	s.c = cc
+	return nil
+}
+
+func (s *sink) Open(ctx api.StreamContext) error {
+	ctx.GetLogger().Debugf("Opening neuron sink")
+	err := createOrGetConnection(ctx, s.url)
+	if err != nil {
+		return err
+	}
+	s.connected = true
+	return nil
+}
+
+func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
+	ctx.GetLogger().Debugf("receive %+v", data)
+	if s.c.Raw {
+		r, _, err := ctx.TransformOutput(data)
+		if err != nil {
+			return err
+		}
+		return publish(ctx, r)
+
+	} else {
+		switch d := data.(type) {
+		case []map[string]interface{}:
+			for _, el := range d {
+				err := s.SendMapToNeuron(ctx, el)
+				if err != nil {
+					ctx.GetLogger().Errorf("Error sending map to neuron: %v", err)
+				}
+			}
+			return nil
+		case map[string]interface{}:
+			return s.SendMapToNeuron(ctx, d)
+		default:
+			ctx.GetLogger().Errorf("unrecognized format of %s", data)
+			return nil
+		}
+	}
+}
+
+func (s *sink) Close(ctx api.StreamContext) error {
+	ctx.GetLogger().Debugf("closing neuron sink")
+	if s.connected {
+		return closeConnection(ctx, s.url)
+	}
+	return nil
+}
+
+func (s *sink) SendMapToNeuron(ctx api.StreamContext, el map[string]interface{}) error {
+	n, err := ctx.ParseTemplate(s.c.NodeName, el)
+	if err != nil {
+		return err
+	}
+	g, err := ctx.ParseTemplate(s.c.GroupName, el)
+	if err != nil {
+		return err
+	}
+	t := &neuronTemplate{
+		NodeName:  n,
+		GroupName: g,
+	}
+	var (
+		ok bool
+	)
+	if len(s.c.Tags) == 0 {
+		if conf.IsTesting {
+			var keys []string
+			for k := range el {
+				keys = append(keys, k)
+			}
+			sort.Strings(keys)
+			for _, k := range keys {
+				t.TagName = k
+				t.Value = el[k]
+				err := doPublish(ctx, t)
+				if err != nil {
+					ctx.GetLogger().Error(err)
+					continue
+				}
+			}
+		} else {
+			for k, v := range el {
+				t.TagName = k
+				t.Value = v
+				err := doPublish(ctx, t)
+				if err != nil {
+					ctx.GetLogger().Error(err)
+					continue
+				}
+			}
+		}
+	} else {
+		for _, tag := range s.c.Tags {
+			t.TagName, err = ctx.ParseTemplate(tag, el)
+			if err != nil {
+				ctx.GetLogger().Errorf("Error parsing tag %s: %v", tag, err)
+				continue
+			}
+			t.Value, ok = el[t.TagName]
+			if !ok {
+				ctx.GetLogger().Errorf("Error get the value of tag %s: %v", t.TagName, err)
+				continue
+			}
+			err := doPublish(ctx, t)
+			if err != nil {
+				ctx.GetLogger().Error(err)
+			}
+		}
+	}
+	return nil
+}
+
+func doPublish(ctx api.StreamContext, t *neuronTemplate) error {
+	r, err := json.Marshal(t)
+	if err != nil {
+		return fmt.Errorf("Error marshall the tag payload %v: %v", t, err)
+	}
+	err = publish(ctx, r)
+	if err != nil {
+		return fmt.Errorf("Error publish the tag payload %s: %v", t.TagName, err)
+	}
+	ctx.GetLogger().Debugf("Publish %s", r)
+	return nil
+}
+
+func GetSink() *sink {
+	return &sink{}
+}

+ 82 - 0
internal/topo/neuron/sink_test.go

@@ -0,0 +1,82 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package neuron
+
+import (
+	"github.com/lf-edge/ekuiper/internal/topo/mock"
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestSink(t *testing.T) {
+	server, ch := mockNeuron(false, true)
+	defer server.Close()
+
+	s := GetSink()
+	s.Configure(map[string]interface{}{
+		"nodeName":  "test1",
+		"groupName": "grp",
+		"tags":      []string{"temperature", "status"},
+		"raw":       false,
+	})
+
+	data := []interface{}{
+		map[string]interface{}{
+			"temperature": 22,
+			"humidity":    50,
+			"status":      "green",
+		},
+		map[string]interface{}{
+			"temperature": 25,
+			"humidity":    82,
+			"status":      "wet",
+		},
+		map[string]interface{}{
+			"temperature": 33,
+			"humidity":    60,
+			"status":      "hot",
+		},
+	}
+	err := mock.RunSinkCollect(s, data)
+	if err != nil {
+		t.Errorf(err.Error())
+		return
+	}
+
+	exp := []string{
+		`{"group_name":"grp","node_name":"test1","tag_name":"temperature","value":22}`,
+		`{"group_name":"grp","node_name":"test1","tag_name":"status","value":"green"}`,
+		`{"group_name":"grp","node_name":"test1","tag_name":"temperature","value":25}`,
+		`{"group_name":"grp","node_name":"test1","tag_name":"status","value":"wet"}`,
+		`{"group_name":"grp","node_name":"test1","tag_name":"temperature","value":33}`,
+		`{"group_name":"grp","node_name":"test1","tag_name":"status","value":"hot"}`,
+	}
+	var actual []string
+	ticker := time.After(10 * time.Second)
+	for i := 0; i < len(exp); i++ {
+		select {
+		case <-ticker:
+			t.Errorf("timeout")
+			return
+		case d := <-ch:
+			actual = append(actual, string(d))
+		}
+	}
+
+	if !reflect.DeepEqual(actual, exp) {
+		t.Errorf("result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", exp, actual)
+	}
+}

+ 71 - 0
internal/topo/neuron/source.go

@@ -0,0 +1,71 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package neuron
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/memory"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/infra"
+)
+
+type source struct {
+	url          string
+	bufferLength int
+	connected    bool
+}
+
+func (s *source) Configure(_ string, props map[string]interface{}) error {
+	s.url = NeuronUrl
+	s.bufferLength = 1024
+	if c, ok := props["bufferLength"]; ok {
+		if bl, err := cast.ToInt(c, cast.STRICT); err != nil || bl > 0 {
+			s.bufferLength = bl
+		}
+	}
+	return nil
+}
+
+func (s *source) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
+	err := createOrGetConnection(ctx, s.url)
+	if err != nil {
+		infra.DrainError(ctx, err, errCh)
+		return
+	}
+	defer closeConnection(ctx, s.url)
+	ch := memory.CreateSub(NeuronTopic, nil, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), s.bufferLength)
+	defer memory.CloseSourceConsumerChannel(NeuronTopic, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
+	for {
+		select {
+		case v, opened := <-ch:
+			if !opened {
+				return
+			}
+			consumer <- v
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func (s *source) Close(ctx api.StreamContext) error {
+	ctx.GetLogger().Infof("closing neuron source")
+	return nil
+}
+
+func GetSource() *source {
+	return &source{}
+}

+ 39 - 0
internal/topo/neuron/source_test.go

@@ -0,0 +1,39 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package neuron
+
+import (
+	"github.com/lf-edge/ekuiper/internal/topo/mock"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	_ "go.nanomsg.org/mangos/v3/transport/ipc"
+	"testing"
+)
+
+func TestRun(t *testing.T) {
+	exp := []api.SourceTuple{
+		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron"}),
+	}
+	s := GetSource()
+	err := s.Configure("new", nil)
+	if err != nil {
+		t.Errorf(err.Error())
+		return
+	}
+	server, _ := mockNeuron(true, false)
+	defer server.Close()
+	mock.TestSourceOpen(s, exp, t)
+}

+ 1 - 0
pkg/api/stream.go

@@ -156,6 +156,7 @@ type StreamContext interface {
 	// ParseJsonPath parse the jsonPath string with the given data
 	// ParseJsonPath parse the jsonPath string with the given data
 	ParseJsonPath(jsonPath string, data interface{}) (interface{}, error)
 	ParseJsonPath(jsonPath string, data interface{}) (interface{}, error)
 	// TransformOutput Transform output according to the properties including dataTemplate, sendSingle
 	// TransformOutput Transform output according to the properties including dataTemplate, sendSingle
+	// The second parameter is whether the data is transformed or just return as its json format.
 	TransformOutput(data interface{}) ([]byte, bool, error)
 	TransformOutput(data interface{}) ([]byte, bool, error)
 }
 }