Bladeren bron

feat(source, sink) memory source and sink

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Robert Wadowski 3 jaren geleden
bovenliggende
commit
114babcbdb

+ 125 - 0
internal/topo/node/shared/manager.go

@@ -0,0 +1,125 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package shared
+
+import (
+	"fmt"
+	"sync"
+)
+
+const IdProperty = "id"
+
+type channels struct {
+	id        string
+	consumers map[string]chan map[string]interface{}
+}
+
+var sinkChannels = make(map[string]*channels)
+var mu = sync.Mutex{}
+
+func GetSink(props map[string]interface{}) (*sink, error) {
+	id, err := getId(props)
+	if err != nil {
+		return nil, err
+	}
+	ch, err := getOrCreateSinkChannels(id)
+	if err != nil {
+		return nil, err
+	}
+	s := &sink{
+		id: id,
+		ch: ch,
+	}
+	return s, nil
+}
+
+func GetSource() *source {
+	return &source{}
+}
+
+func getOrCreateSinkChannels(sink string) (*channels, error) {
+	mu.Lock()
+	defer mu.Unlock()
+
+	if c, exists := sinkChannels[sink]; exists {
+		return c, nil
+	}
+	c := createChannels(sink)
+	sinkChannels[sink] = c
+	return c, nil
+}
+
+func getOrCreateSinkConsumerChannel(sink string, source string) (chan map[string]interface{}, error) {
+	mu.Lock()
+	defer mu.Unlock()
+	var sinkConsumerChannels *channels
+	if c, exists := sinkChannels[sink]; exists {
+		sinkConsumerChannels = c
+	} else {
+		sinkConsumerChannels = createChannels(sink)
+	}
+	var ch chan map[string]interface{}
+	if sourceChannel, exists := sinkConsumerChannels.consumers[source]; exists {
+		ch = sourceChannel
+	} else {
+		ch = make(chan map[string]interface{})
+		sinkConsumerChannels.consumers[source] = ch
+	}
+	return ch, nil
+}
+
+func getId(props map[string]interface{}) (string, error) {
+	if t, ok := props[IdProperty]; ok {
+		if id, casted := t.(string); casted {
+			return id, nil
+		}
+		return "", fmt.Errorf("can't cast value %s to string", t)
+	}
+	return "", fmt.Errorf("there is no topic property in the memory action")
+}
+
+func closeSourceConsumerChannel(sink string, source string) error {
+	mu.Lock()
+	defer mu.Unlock()
+
+	if sinkConsumerChannels, exists := sinkChannels[sink]; exists {
+		if sourceChannel, exists := sinkConsumerChannels.consumers[source]; exists {
+			close(sourceChannel)
+			delete(sinkConsumerChannels.consumers, source)
+		}
+	}
+	return nil
+}
+
+func closeSink(sink string) error {
+	mu.Lock()
+	defer mu.Unlock()
+
+	if sinkConsumerChannels, exists := sinkChannels[sink]; exists {
+		for s, c := range sinkConsumerChannels.consumers {
+			close(c)
+			delete(sinkConsumerChannels.consumers, s)
+		}
+	}
+	delete(sinkChannels, sink)
+	return nil
+}
+
+func createChannels(sink string) *channels {
+	return &channels{
+		id:        sink,
+		consumers: make(map[string]chan map[string]interface{}),
+	}
+}

+ 85 - 0
internal/topo/node/shared/manager_test.go

@@ -0,0 +1,85 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package shared
+
+import (
+	"encoding/json"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"reflect"
+	"testing"
+)
+
+func TestSharedInmemoryNode(t *testing.T) {
+
+	id := "test_id"
+	sinkProps := make(map[string]interface{})
+	sinkProps[IdProperty] = id
+	src := GetSource()
+	snk, err := GetSink(sinkProps)
+	if err != nil {
+		t.Error(err)
+	}
+	ctx := context.Background()
+	consumer := make(chan api.SourceTuple)
+	errorChannel := make(chan error)
+	srcProps := make(map[string]interface{})
+	srcProps["option"] = "value"
+
+	go func() {
+		src.Open(ctx, consumer, errorChannel)
+	}()
+	err = src.Configure(id, srcProps)
+	if err != nil {
+		t.Error(err)
+	}
+
+	srcProps[IdProperty] = id
+
+	if _, contains := sinkChannels[id]; !contains {
+		t.Errorf("there should be memory node for topic")
+	}
+
+	data := make(map[string]interface{})
+	data["temperature"] = 33.0
+	list := make([]map[string]interface{}, 0)
+	list = append(list, data)
+	go func() {
+		var buf []byte
+		buf, err = asJsonBytes(list)
+		if err != nil {
+			t.Error(err)
+		}
+		err = snk.Collect(ctx, buf)
+		if err != nil {
+			t.Error(err)
+		}
+	}()
+	for {
+		select {
+		case res := <-consumer:
+			expected := api.NewDefaultSourceTuple(data, make(map[string]interface{}))
+			if !reflect.DeepEqual(expected, res) {
+				t.Errorf("result %s should be equal to %s", res, expected)
+			}
+			return
+		default:
+		}
+	}
+}
+
+func asJsonBytes(m []map[string]interface{}) ([]byte, error) {
+	return json.Marshal(m)
+}

+ 63 - 0
internal/topo/node/shared/sink.go

@@ -0,0 +1,63 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package shared
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/api"
+)
+
+type sink struct {
+	id string
+	ch *channels
+}
+
+func (s *sink) Open(ctx api.StreamContext) error {
+	return nil
+}
+
+func (s *sink) Configure(props map[string]interface{}) error {
+	return nil
+}
+
+func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
+	if b, casted := data.([]byte); casted {
+		d, err := toMap(b)
+		if err != nil {
+			return err
+		}
+		for _, el := range d {
+			for _, c := range s.ch.consumers {
+				c <- el
+			}
+		}
+		return nil
+	}
+	return fmt.Errorf("unrecognized format of %s", data)
+}
+
+func (s *sink) Close(ctx api.StreamContext) error {
+	return closeSink(s.id)
+}
+
+func toMap(data []byte) ([]map[string]interface{}, error) {
+	res := make([]map[string]interface{}, 0)
+	err := json.Unmarshal(data, &res)
+	if err != nil {
+		return nil, err
+	}
+	return res, nil
+}

+ 41 - 0
internal/topo/node/shared/source.go

@@ -0,0 +1,41 @@
+package shared
+
+import (
+	"github.com/lf-edge/ekuiper/pkg/api"
+)
+
+type source struct {
+	sink  string
+	id    string
+	input chan map[string]interface{}
+}
+
+func (s *source) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
+	s.id = ctx.GetOpId()
+	ch, err := getOrCreateSinkConsumerChannel(s.sink, s.id)
+	if err != nil {
+		errCh <- err
+		return
+	}
+	s.input = ch
+	for {
+		select {
+		case v, opened := <-s.input:
+			if !opened {
+				return
+			}
+			consumer <- api.NewDefaultSourceTuple(v, make(map[string]interface{}))
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func (s *source) Configure(datasource string, props map[string]interface{}) error {
+	s.sink = datasource
+	return nil
+}
+
+func (s *source) Close(ctx api.StreamContext) error {
+	return closeSourceConsumerChannel(s.sink, s.id)
+}