123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- // Copyright 2022 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package neuron
- import (
- "fmt"
- "github.com/lf-edge/ekuiper/internal/conf"
- kctx "github.com/lf-edge/ekuiper/internal/topo/context"
- "github.com/lf-edge/ekuiper/internal/topo/memory"
- "github.com/lf-edge/ekuiper/internal/topo/state"
- "github.com/lf-edge/ekuiper/pkg/api"
- "github.com/lf-edge/ekuiper/pkg/message"
- "go.nanomsg.org/mangos/v3"
- "go.nanomsg.org/mangos/v3/protocol/pair"
- _ "go.nanomsg.org/mangos/v3/transport/ipc"
- "sync"
- "time"
- )
- const (
- NeuronTopic = "$$neuron"
- NeuronUrl = "ipc:///tmp/neuron-ekuiper.ipc"
- )
- var (
- m sync.RWMutex
- connectionCount int
- sock mangos.Socket
- sendTimeout int
- )
- // createOrGetNeuronConnection creates a new neuron connection or returns an existing one
- // This is the entry function for creating a neuron connection singleton
- // The context is from a rule, but the singleton will server for multiple rules
- func createOrGetConnection(sc api.StreamContext, url string) error {
- m.Lock()
- defer m.Unlock()
- sc.GetLogger().Infof("createOrGetConnection count: %d", connectionCount)
- if connectionCount == 0 {
- sc.GetLogger().Infof("Creating neuron connection")
- err := connect(url)
- if err != nil {
- return err
- }
- sc.GetLogger().Infof("Neuron connected")
- contextLogger := conf.Log.WithField("neuron_connection", 0)
- ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
- ruleId := "$$neuron_connection"
- opId := "$$neuron_connection"
- store, err := state.CreateStore(ruleId, 0)
- if err != nil {
- ctx.GetLogger().Errorf("neuron connection create store error %v", err)
- return err
- }
- sctx := ctx.WithMeta(ruleId, opId, store)
- memory.CreatePub(NeuronTopic)
- go run(sctx)
- }
- connectionCount++
- return nil
- }
- func closeConnection(ctx api.StreamContext, url string) error {
- m.Lock()
- defer m.Unlock()
- ctx.GetLogger().Infof("closeConnection count: %d", connectionCount)
- memory.RemovePub(NeuronTopic)
- if connectionCount == 1 {
- err := disconnect(url)
- if err != nil {
- return err
- }
- }
- connectionCount--
- return nil
- }
- // nng connections
- // connect to nng
- func connect(url string) error {
- var err error
- sock, err = pair.NewSocket()
- if err != nil {
- return err
- }
- // options consider to export
- err = sock.SetOption(mangos.OptionSendDeadline, time.Duration(sendTimeout)*time.Millisecond)
- if err != nil {
- return err
- }
- //sock.SetOption(mangos.OptionWriteQLen, 100)
- //sock.SetOption(mangos.OptionReadQLen, 100)
- //sock.SetOption(mangos.OptionBestEffort, false)
- if err = sock.DialOptions(url, map[string]interface{}{
- mangos.OptionDialAsynch: false, // will reports error after max reconnect time
- mangos.OptionMaxReconnectTime: 5 * time.Second,
- mangos.OptionReconnectTime: 100 * time.Millisecond,
- }); err != nil {
- return fmt.Errorf("please make sure neuron has started and configured, can't dial to neuron: %s", err.Error())
- }
- return nil
- }
- // run the loop to receive message from the nng connection singleton
- // exit when connection is closed
- func run(ctx api.StreamContext) {
- ctx.GetLogger().Infof("neuron source receiving loop started")
- for {
- // no receiving deadline, will wait until the socket closed
- if msg, err := sock.Recv(); err == nil {
- ctx.GetLogger().Debugf("neuron received message %s", string(msg))
- result, err := message.Decode(msg, message.FormatJson)
- if err != nil {
- ctx.GetLogger().Errorf("neuron decode message error %v", err)
- continue
- }
- memory.Produce(ctx, NeuronTopic, result)
- } else if err == mangos.ErrClosed {
- ctx.GetLogger().Infof("neuron connection closed, exit receiving loop")
- return
- } else {
- ctx.GetLogger().Errorf("neuron receiving error %v", err)
- }
- }
- }
- func publish(ctx api.StreamContext, data []byte) error {
- ctx.GetLogger().Debugf("publish to neuron: %s", string(data))
- if sock != nil {
- return sock.Send(data)
- }
- return fmt.Errorf("neuron connection is not established")
- }
- func disconnect(_ string) error {
- if sock != nil {
- err := sock.Close()
- if err != nil {
- return err
- }
- }
- return nil
- }
|