Browse Source

doc(stream): docs for options concurrency and bufferLength

ngjaying 5 years atrás
parent
commit
6e83174ad3

+ 4 - 1
docs/en_US/extension/source.md

@@ -54,7 +54,10 @@ A configuration system is supported for Kuiper extension which will automaticall
  
 #### common configuration field
 
-There is a common configuration field ``concurrency`` to specify how many instances will be started to run the source.
+There are 2 common configuration fields.
+ 
+* ``concurrency`` to specify how many instances will be started to run the source.
+* ``bufferLength`` to specify the maximum number of messages to be buffered in the memory. This is used to avoid the extra large memory usage that would cause out of memory error. Notice that the memory usage will be varied to the actual buffer. Increase the length here won't increase the initial memory allocation so it is safe to set a large buffer length. The default value is 102400, that is if each payload size is about 100 bytes, the maximum buffer size will be about 102400 * 100B ~= 10MB.
 
 ### Package the source
 Build the implemented source as a go plugin and make sure the output so file resides in the plugins/sources folder.

+ 6 - 3
docs/en_US/rules/overview.md

@@ -45,11 +45,12 @@ The sql query to run for the rule.
 
 ### actions
 
-Currently, 3 kinds of actions are supported: [log](sinks/logs.md), [mqtt](sinks/mqtt.md) and [rest](sinks/rest.md). Each action can define its own properties. There are 2 common properties:
+Currently, 3 kinds of actions are supported: [log](sinks/logs.md), [mqtt](sinks/mqtt.md) and [rest](sinks/rest.md). Each action can define its own properties. There are 3 common properties:
 
 | property name | Type & Default Value | Description                                                  |
 | ------------- | -------- | ------------------------------------------------------------ |
-| concurrency | int: 1   | Specify how many instances of the sink will be run. |
+| concurrency | int: 1   | Specify how many instances of the sink will be run. If the value is bigger than 1, the order of the messages may not be retained. |
+| bufferLength | int: 1024   | Specify how many messages can be buffered in memory. If the buffered messages exceed the limit, the sink will block message receiving until the buffered messages have been sent out so that the buffered size is less than the limit. |
 | runAsync        | bool:false   | Whether the sink will run asynchronously for better performance. If it is true, the sink result order is not promised.  |
 
 Actions could be customized to support different kinds of outputs, see [extension](../extension/overview.md) for more detailed info.
@@ -60,4 +61,6 @@ The current options includes:
 | Option name | Type & Default Value | Description                                                  |
 | ------------- | -------- | ------------------------------------------------------------ |
 | isEventTime | boolean: false   | Whether to use event time or processing time as the timestamp for an event. If event time is used, the timestamp will be extracted from the payload. The timestamp filed must be specified by the [stream]([extension](../sqls/streams.md)) definition. |
-| lateTolerance        | int64:0   | When working with event-time windowing, it can happen that elements arrive late. LateTolerance can specify by how much time(unit is millisecond) elements can be late before they are dropped. By default, the value is 0 which means late elements are dropped.  |
+| lateTolerance        | int64:0   | When working with event-time windowing, it can happen that elements arrive late. LateTolerance can specify by how much time(unit is millisecond) elements can be late before they are dropped. By default, the value is 0 which means late elements are dropped.  |
+| concurrency | int: 1   | A rule is processed by several phases of plans according to the sql statement. This option will specify how many instances will be run for each plan. If the value is bigger than 1, the order of the messages may not be retained. |
+| bufferLength | int: 1024   | Specify how many messages can be buffered in memory for each plan. If the buffered messages exceed the limit, the plan will block message receiving until the buffered messages have been sent out so that the buffered size is less than the limit. A bigger value will accommodate more throughput but will also take up more memory footprint.  |

+ 3 - 0
docs/en_US/rules/sources/mqtt.md

@@ -57,6 +57,9 @@ The location of certification path. It can be an absolute path, or a relative pa
 
 The location of private key path. It can be an absolute path, or a relative path.  For more detailed information, please refer to ``certificationPath``. Such as ``d3807d9fa5-private.pem.key``.
 
+### bufferLength
+specify the maximum number of messages to be buffered in the memory. This is used to avoid the extra large memory usage that would cause out of memory error. Notice that the memory usage will be varied to the actual buffer. Increase the length here won't increase the initial memory allocation so it is safe to set a large buffer length. The default value is 102400, that is if each payload size is about 100 bytes, the maximum buffer size will be about 102400 * 100B ~= 10MB.
+
 ## Override the default settings
 
 If you have a specific connection that need to overwrite the default settings, you can create a customized section. In the previous sample, we create a specific setting named with ``demo``.  Then you can specify the configuration with option ``CONF_KEY`` when creating the stream definition (see [stream specs](../../sqls/streams.md) for more info).

+ 6 - 3
docs/zh_CN/rules/overview.md

@@ -45,11 +45,12 @@
 
 ### 动作
 
-当前,支持两种操作: [log](sinks/logs.md) 、[mqtt](sinks/mqtt.md) 和 [rest](sinks/rest.md)。 每个动作可以定义自己的属性。当前有个公共属性:
+当前,支持两种操作: [log](sinks/logs.md) 、[mqtt](sinks/mqtt.md) 和 [rest](sinks/rest.md)。 每个动作可以定义自己的属性。当前有个公共属性:
 
 | 属性名 | 类型和默认值 | 描述                                                  |
 | ------------- | -------- | ------------------------------------------------------------ |
-| concurrency | int: 1   | 设置运行的线程数。 |
+| concurrency | int: 1   | 设置运行的线程数。该参数值大于1时,消息发出的顺序可能无法保证。 |
+| bufferLength | int: 1024   | 设置可缓存消息数目。若缓存消息数超过此限制,sink将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。|
 | runAsync        | bool:false   | 设置是否异步运行输出操作以提升性能。请注意,异步运行的情况下,输出结果顺序不能保证。  |
 
 可以自定义动作以支持不同种类的输出,有关更多详细信息,请参见 [extension](../extension/overview.md) 。
@@ -60,4 +61,6 @@
 | 选项名 | 类型和默认值 | Description                                                  |
 | ------------- | -------- | ------------------------------------------------------------ |
 | isEventTime | 布尔值:false | 使用事件时间还是将时间用作事件的时间戳。 如果使用事件时间,则将从有效负载中提取时间戳。 必须通过[stream]([extension](../sqls/streams.md))定义指定时间戳记。 |
-| lateTolerance        | int64:0   | 在使用事件时间窗口时,可能会出现元素延迟到达的情况。 LateTolerance可以指定在删除元素之前可以延迟多少时间(单位为毫秒)。 默认情况下,该值为0,表示后期元素将被删除。 |
+| lateTolerance        | int64:0   | 在使用事件时间窗口时,可能会出现元素延迟到达的情况。 LateTolerance可以指定在删除元素之前可以延迟多少时间(单位为毫秒)。 默认情况下,该值为0,表示后期元素将被删除。 |
+| concurrency | int: 1   | 一条规则运行时会根据sql语句分解成多个plan运行。该参数设置每个plan运行的线程数。该参数值大于1时,消息处理顺序可能无法保证。 |
+| bufferLength | int: 1024   | 指定每个plan可缓存消息数。若缓存消息数超过此限制,plan将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。此选项值越大,则消息吞吐能力越强,但是内存占用也会越多。|

+ 4 - 0
docs/zh_CN/rules/sources/mqtt.md

@@ -56,6 +56,10 @@ MQTT 连接密码。如果指定了``certificationPath`` 或者 ``privateKeyPath
 
 私钥路径。可以为绝对路径,也可以为相对路径。更详细的信息,请参考 ``certificationPath``. 比如``d3807d9fa5-private.pem.key``.
 
+### bufferLength
+
+指定最大缓存消息数目。该参数主要用于防止内存溢出。实际内存用量会根据当前缓存消息数目动态变化。增大该参数不会增加初始内存分配量,因此设置较大的数值是安全的。该参数默认值为102400;如果每条消息为100字节,则默认情况下,缓存最大占用内存量为102400 * 100B ~= 10MB. 
+
 ## 覆盖默认设置
 
 如果您有一个特定连接需要覆盖默认设置,则可以创建一个自定义模块。 在上一个示例中,我们创建一个名为“ demo”的特定设置。 然后,您可以在创建流定义时使用选项“ CONF_KEY”指定配置(有关更多信息,请参见 [stream specs](../../sqls/streams.md) )。