12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- // Copyright 2022-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package store
- import (
- "context"
- "fmt"
- "regexp"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
- "github.com/lf-edge/ekuiper/pkg/ast"
- )
- // Reg registers a topic to save it to memory store
- // Create a new go routine to listen to the topic and save the data to memory
- func Reg(topic string, topicRegex *regexp.Regexp, key string) (*Table, error) {
- t, isNew := db.addTable(topic, key)
- if isNew {
- go runTable(topic, topicRegex, t)
- }
- return t, nil
- }
- // runTable should only run in a single instance.
- // This go routine is used to accumulate data in memory
- // If the go routine close, the go routine exits but the data will be kept until table dropped
- func runTable(topic string, topicRegex *regexp.Regexp, t *Table) {
- conf.Log.Infof("runTable %s", topic)
- ch := pubsub.CreateSub(topic, topicRegex, fmt.Sprintf("store_%s", topic), 1024)
- ctx, cancel := context.WithCancel(context.Background())
- t.cancel = cancel
- for {
- select {
- case v, opened := <-ch:
- if !opened { // exit go routine is not sync with drop table
- return
- }
- switch vv := v.(type) {
- case *pubsub.UpdatableTuple:
- switch vv.Rowkind {
- case ast.RowkindInsert, ast.RowkindUpdate, ast.RowkindUpsert:
- t.add(vv.DefaultSourceTuple)
- case ast.RowkindDelete:
- t.delete(vv.Keyval)
- }
- default:
- t.add(v)
- }
- conf.Log.Debugf("receive data %v for %s", v, topic)
- case <-ctx.Done():
- return
- }
- }
- }
- // Unreg unregisters a topic to remove it from memory store
- func Unreg(topic string, key string) error {
- // Must be an atomic operation
- return db.dropTable(topic, key)
- }
|