Ver código fonte

fix(state): add extStateType config (#1923)

* fix(state): add extStateType config

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(state): add default value for extStateType

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

---------

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
superxan 1 ano atrás
pai
commit
ada7383ac7

+ 1 - 0
deploy/chart/ekuiper/values.yaml

@@ -170,6 +170,7 @@ kuiperConfig:
     store:
     store:
       #Type of store that will be used for keeping state of the application
       #Type of store that will be used for keeping state of the application
       type: sqlite
       type: sqlite
+      extStateType: sqlite
       redis:
       redis:
         host: localhost
         host: localhost
         port: 6379
         port: 6379

+ 9 - 2
docs/en_US/configuration/global_configurations.md

@@ -164,13 +164,21 @@ It has properties
   * only applicable to redis connection information
   * only applicable to redis connection information
   * the server, port and password in connection info will overwrite the host port and password above
   * the server, port and password in connection info will overwrite the host port and password above
   * [more info](../guide/sources/builtin/edgex.md#connectionselector)
   * [more info](../guide/sources/builtin/edgex.md#connectionselector)
-    
+
+### External State
+
+There is also a configuration item named `extStateType`.
+The configuration's usage is user can store some information in database in advance, when stream processing rules need these information,
+they can get them easily by [get_keyed_state](../sqls/built-in_functions.md#other-functions) function in SQL.
+
+*Note*: `type` and `extStateType` can be configured differently.
 
 
 ### Config
 ### Config
 ```yaml
 ```yaml
     store:
     store:
       #Type of store that will be used for keeping state of the application
       #Type of store that will be used for keeping state of the application
       type: sqlite
       type: sqlite
+      extStateType: redis
       redis:
       redis:
         host: localhost
         host: localhost
         port: 6379
         port: 6379
@@ -181,7 +189,6 @@ It has properties
         #Sqlite file name, if left empty name of db will be sqliteKV.db
         #Sqlite file name, if left empty name of db will be sqliteKV.db
         name:
         name:
 ```
 ```
-
 ## Portable plugin configurations
 ## Portable plugin configurations
 
 
 This section configures the portable plugin runtime.
 This section configures the portable plugin runtime.

+ 1 - 1
docs/en_US/sqls/built-in_functions.md

@@ -263,7 +263,7 @@ select lag(Status) as Status, ts - lag(ts, 1, ts) OVER (WHEN had_changed(true, s
 | meta            | meta(topic)                          | Returns the meta-data of specified key. The key could be:<br/> - a standalone key if there is only one source in the from clause, such as `meta(device)`<br />- A qualified key to specify the stream, such as `meta(src1.device)` <br />- A key with arrow for multi level meta data, such as `meta(src1.reading->device->name)` This assumes reading is a map structure meta data.              |
 | meta            | meta(topic)                          | Returns the meta-data of specified key. The key could be:<br/> - a standalone key if there is only one source in the from clause, such as `meta(device)`<br />- A qualified key to specify the stream, such as `meta(src1.device)` <br />- A key with arrow for multi level meta data, such as `meta(src1.reading->device->name)` This assumes reading is a map structure meta data.              |
 | window_start    | window_start()                       | Return the window start timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                        |
 | window_start    | window_start()                       | Return the window start timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                        |
 | window_end      | window_end()                         | Return the window end timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                          |
 | window_end      | window_end()                         | Return the window end timestamp in int64 format. If there is no time window, it returns 0. The window time is aligned with the timestamp notion of the rule. If the rule is using processing time, then the window start timestamp is the processing timestamp. If the rule is using event time, then the window start timestamp is the event timestamp.                                          |
-| get_keyed_state | get_keyed_state(expr1, expr2, expr3) | Return the keyed value in database. First parameter is the key, second is data format of the value, support bigint, float, string, boolean and datetime. third is the default value if key not exist                                                                                                                                                                                              |
+| get_keyed_state | get_keyed_state(expr1, expr2, expr3) | Return the keyed value in database. First parameter is the key, second is data format of the value, support bigint, float, string, boolean and datetime. third is the default value if key not exist. Default database is sqlite, users can change the database by this [configuration](../configuration/global_configurations.md#external-state).                                                |
 | delay           | delay(delayTime, returnVal)          | Delay the execution of the rule for a specified time and then return the returnVal.                                                                                                                                                                                                                                                                                                               |
 | delay           | delay(delayTime, returnVal)          | Delay the execution of the rule for a specified time and then return the returnVal.                                                                                                                                                                                                                                                                                                               |
 
 
 ## Multiple Column Functions
 ## Multiple Column Functions

+ 5 - 0
docs/zh_CN/configuration/global_configurations.md

@@ -164,6 +164,10 @@ GET http://localhost:9081/plugins/functions/prebuild
     * 连接信息中的 server, port 和 password 会覆盖以上定义的 host, port 和 password
     * 连接信息中的 server, port 和 password 会覆盖以上定义的 host, port 和 password
     * [具体信息可参考](../guide/sources/builtin/edgex.md#connectionselector)
     * [具体信息可参考](../guide/sources/builtin/edgex.md#connectionselector)
 
 
+### 外部状态
+
+还有一个名为 `extStateType` 的配置项。 这个配置的用途是用户可以预先在数据库中存储一些信息,当流处理规则需要这些信息时,他们可以通过 SQL 中的 [get_keyed_state](../sqls/built-in_functions.md#其它函数) 函数轻松获取它们。
+*注意*:`type` 和 `extStateType` 可以使用不同的存储配置。
 
 
 ### 配置示例
 ### 配置示例
 
 
@@ -171,6 +175,7 @@ GET http://localhost:9081/plugins/functions/prebuild
     store:
     store:
       #Type of store that will be used for keeping state of the application
       #Type of store that will be used for keeping state of the application
       type: sqlite
       type: sqlite
+      extStateType: redis
       redis:
       redis:
         host: localhost
         host: localhost
         port: 6379
         port: 6379

+ 7 - 7
docs/zh_CN/sqls/built-in_functions.md

@@ -180,12 +180,12 @@ eKuiper 具有许多内置函数,可以对数据执行计算。
 
 
 ## 列表函数
 ## 列表函数
 
 
-| 函数         | 示例                    | 说明              |
-|------------|-----------------------|-----------------|
-| array_position | array_postion(array, value) | 返回第二个参数在列表参数中的索引下标位置,索引下标从 0 开始,若该元素不存在,则返回 -1 |
-| element_at | element_at(array, index) | 返回列表参数中在给定索引下的元素,索引下标从 0 开始,若该索引小于 0,则该元素从列表末向列表头进行计数 |
-| array_contains | array_contains(array, value) | 返回给定元素是否存在列表参数中,存在则返回 true,否则返回 false |
-| array_create | array_create(value1, ......) | 将给定的元素参数们创建为一个列表元素 |
+| 函数             | 示例                           | 说明                                                    |
+|----------------|------------------------------|-------------------------------------------------------|
+| array_position | array_postion(array, value)  | 返回第二个参数在列表参数中的索引下标位置,索引下标从 0 开始,若该元素不存在,则返回 -1        |
+| element_at     | element_at(array, index)     | 返回列表参数中在给定索引下的元素,索引下标从 0 开始,若该索引小于 0,则该元素从列表末向列表头进行计数 |
+| array_contains | array_contains(array, value) | 返回给定元素是否存在列表参数中,存在则返回 true,否则返回 false                 |
+| array_create   | array_create(value1, ......) | 将给定的元素参数们创建为一个列表元素                                    |
 
 
 ## 对象函数
 ## 对象函数
 
 
@@ -260,7 +260,7 @@ select lag(Status) as Status, ts - lag(ts, 1, ts) OVER (WHEN had_changed(true, s
 | meta            | meta(topic)                          | 返回指定键的元数据。 键可能是:<br/>-如果 from 子句中只有一个来源,则为独立键,例如`meta(device)`<br />-用于指定流的合格键,例如 `meta(src1.device)` <br />-用于多级元数据的带有箭头的键,例如 `meta(src1.reading->device->name)`。这里假定读取是地图结构元数据。 |
 | meta            | meta(topic)                          | 返回指定键的元数据。 键可能是:<br/>-如果 from 子句中只有一个来源,则为独立键,例如`meta(device)`<br />-用于指定流的合格键,例如 `meta(src1.device)` <br />-用于多级元数据的带有箭头的键,例如 `meta(src1.reading->device->name)`。这里假定读取是地图结构元数据。 |
 | window_start    | window_start()                       | 返回窗口的开始时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |
 | window_start    | window_start()                       | 返回窗口的开始时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |
 | window_end      | window_end()                         | 返回窗口的结束时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |
 | window_end      | window_end()                         | 返回窗口的结束时间戳,格式为 int64。若运行时没有时间窗口,则返回默认值0。窗口的时间与规则所用的时间系统相同。若规则采用处理时间,则窗口的时间也为处理时间;若规则采用事件事件,则窗口的时间也为事件时间。                                                                          |
-| get_keyed_state | get_keyed_state(expr1, expr2, expr3) | 返回键在数据库中对应的值。第一个参数为 键 表达式,第二个参数为值类型,支持 bigint, float, string, boolean and datetime 格式,第三个参数为默认值                                                                                   |
+| get_keyed_state | get_keyed_state(expr1, expr2, expr3) | 返回键在数据库中对应的值。第一个参数为 键 表达式,第二个参数为值类型,支持 bigint, float, string, boolean and datetime 格式,第三个参数为默认值。默认数据库是sqlite,用户可以通过这个[配置](../configuration/global_configurations.md#外部状态)更改数据库。   |
 | delay           | delay(delayTime, returnVal)          | 延迟执行规则一段时间后返回第二个参数作为返回值。                                                                                                                                                          |
 | delay           | delay(delayTime, returnVal)          | 延迟执行规则一段时间后返回第二个参数作为返回值。                                                                                                                                                          |
 
 
 ## 多列函数
 ## 多列函数

+ 1 - 0
etc/kuiper.yaml

@@ -89,6 +89,7 @@ source:
 store:
 store:
   #Type of store that will be used for keeping state of the application
   #Type of store that will be used for keeping state of the application
   type: sqlite
   type: sqlite
+  extStateType: sqlite
   redis:
   redis:
     host: localhost
     host: localhost
     port: 6379
     port: 6379

+ 6 - 2
internal/conf/conf.go

@@ -141,8 +141,9 @@ type KuiperConf struct {
 	Sink   *SinkConf
 	Sink   *SinkConf
 	Source *SourceConf
 	Source *SourceConf
 	Store  struct {
 	Store  struct {
-		Type  string `yaml:"type"`
-		Redis struct {
+		Type         string `yaml:"type"`
+		ExtStateType string `yaml:"extStateType"`
+		Redis        struct {
 			Host               string `yaml:"host"`
 			Host               string `yaml:"host"`
 			Port               int    `yaml:"port"`
 			Port               int    `yaml:"port"`
 			Password           string `yaml:"password"`
 			Password           string `yaml:"password"`
@@ -230,6 +231,9 @@ func InitConf() {
 			Log.Fatal(err)
 			Log.Fatal(err)
 		}
 		}
 	}
 	}
+	if Config.Store.ExtStateType == "" {
+		Config.Store.ExtStateType = "sqlite"
+	}
 
 
 	if Config.Portable.PythonBin == "" {
 	if Config.Portable.PythonBin == "" {
 		Config.Portable.PythonBin = "python"
 		Config.Portable.PythonBin = "python"

+ 1 - 1
internal/keyedstate/kv.go

@@ -26,7 +26,7 @@ type Manager struct {
 }
 }
 
 
 func InitKeyedStateKV() {
 func InitKeyedStateKV() {
-	kv, _ = store.GetKV("keyed_state")
+	kv, _ = store.GetExtStateKV("keyed_state")
 }
 }
 
 
 func GetKeyedState(key string) (interface{}, error) {
 func GetKeyedState(key string) (interface{}, error) {

+ 5 - 4
internal/pkg/store/definition/db.go

@@ -1,4 +1,4 @@
-// Copyright 2022-2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -20,9 +20,10 @@ type Database interface {
 }
 }
 
 
 type Config struct {
 type Config struct {
-	Type   string
-	Redis  RedisConfig
-	Sqlite SqliteConfig
+	Type         string
+	ExtStateType string
+	Redis        RedisConfig
+	Sqlite       SqliteConfig
 }
 }
 
 
 type RedisConfig struct {
 type RedisConfig struct {

+ 11 - 4
internal/pkg/store/setup.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -26,8 +26,9 @@ func SetupDefault() error {
 	}
 	}
 
 
 	c := definition.Config{
 	c := definition.Config{
-		Type:  "sqlite",
-		Redis: definition.RedisConfig{},
+		Type:         "sqlite",
+		ExtStateType: "sqlite",
+		Redis:        definition.RedisConfig{},
 		Sqlite: definition.SqliteConfig{
 		Sqlite: definition.SqliteConfig{
 			Path: dir,
 			Path: dir,
 			Name: "",
 			Name: "",
@@ -43,7 +44,8 @@ func SetupWithKuiperConfig(kconf *conf.KuiperConf) error {
 		return err
 		return err
 	}
 	}
 	c := definition.Config{
 	c := definition.Config{
-		Type: kconf.Store.Type,
+		Type:         kconf.Store.Type,
+		ExtStateType: kconf.Store.ExtStateType,
 		Redis: definition.RedisConfig{
 		Redis: definition.RedisConfig{
 			Host:     kconf.Store.Redis.Host,
 			Host:     kconf.Store.Redis.Host,
 			Port:     kconf.Store.Redis.Port,
 			Port:     kconf.Store.Redis.Port,
@@ -69,5 +71,10 @@ func Setup(config definition.Config) error {
 		return err
 		return err
 	}
 	}
 	cacheStores = s
 	cacheStores = s
+	s, err = newExtStateStores(config, "extState.db")
+	if err != nil {
+		return err
+	}
+	extStateStores = s
 	return nil
 	return nil
 }
 }

+ 31 - 3
internal/pkg/store/stores.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -31,8 +31,9 @@ var (
 	storeBuilders = map[string]StoreCreator{
 	storeBuilders = map[string]StoreCreator{
 		"sqlite": sql.BuildStores,
 		"sqlite": sql.BuildStores,
 	}
 	}
-	globalStores *stores = nil
-	cacheStores  *stores = nil
+	globalStores   *stores = nil
+	cacheStores    *stores = nil
+	extStateStores *stores = nil
 )
 )
 
 
 type stores struct {
 type stores struct {
@@ -63,6 +64,26 @@ func newStores(c definition.Config, name string) (*stores, error) {
 	}
 	}
 }
 }
 
 
+func newExtStateStores(c definition.Config, name string) (*stores, error) {
+	databaseType := c.ExtStateType
+	if builder, ok := storeBuilders[databaseType]; ok {
+		kvBuilder, tsBuilder, err := builder(c, name)
+		if err != nil {
+			return nil, err
+		} else {
+			return &stores{
+				kv:        make(map[string]kv.KeyValue),
+				ts:        make(map[string]kv.Tskv),
+				mu:        sync.Mutex{},
+				kvBuilder: kvBuilder,
+				tsBuilder: tsBuilder,
+			}, nil
+		}
+	} else {
+		return nil, fmt.Errorf("unknown extStateStore type: %s", databaseType)
+	}
+}
+
 func (s *stores) GetKV(table string) (kv.KeyValue, error) {
 func (s *stores) GetKV(table string) (kv.KeyValue, error) {
 	s.mu.Lock()
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
@@ -175,3 +196,10 @@ func DropCacheKVForRule(rule string) error {
 	cacheStores.DropRefKVs(path.Join("sink", rule))
 	cacheStores.DropRefKVs(path.Join("sink", rule))
 	return nil
 	return nil
 }
 }
+
+func GetExtStateKV(table string) (kv.KeyValue, error) {
+	if extStateStores == nil {
+		return nil, fmt.Errorf("extState stores are not initialized")
+	}
+	return extStateStores.GetKV(table)
+}