
doc(source): doc for file source

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang, 2 years ago
Commit c6047eb3a9

+ 24 - 2
docs/en_US/rules/sources/builtin/file.md

@@ -1,8 +1,12 @@
 ## File source
 
+<span style="background:green;color:white;">stream source</span>
 <span style="background:green;color:white">scan table source</span>
 
-eKuiper provides built-in support for reading file content into the eKuiper processing pipeline. The file source is usually used as a [table](../../../sqls/tables.md) and it is the default type for create table statement.
+eKuiper provides built-in support for reading file content into the eKuiper processing pipeline. The file source is
+usually used as a [table](../../../sqls/tables.md) and it is the default type for the create table statement. File
+sources are also supported as streams, where it is usually necessary to set the `interval` parameter to pull updates at
+regular intervals.
 
 ```sql
 create table table1 (
@@ -18,12 +22,30 @@ The configure file for the file source is in */etc/sources/file.yaml* in which t
 
 ```yaml
 default:
+  # The type of the file, could be json, csv or lines
   fileType: json
-  # The directory of the file relative to eKuiper root or an absolute path.
+  # The directory of the file relative to kuiper root or an absolute path.
   # Do not include the file name here. The file name should be defined in the stream data source
   path: data
   # The interval between reading the files, time unit is ms. If only read once, set it to 0
   interval: 0
+  # The sending interval between each event in milliseconds
+  sendInterval: 0
+  # After read
+  # 0: keep the file
+  # 1: delete the file
+  # 2: move the file to moveTo
+  actionAfterRead: 0
+  # The path to move the file to after read, only valid when actionAfterRead is 2
+  moveTo: /tmp/kuiper/moved
+  # Whether the first line is a header
+  hasHeader: false
+  # Define the columns. If a header is defined, this will be overridden
+  # columns: [id, name]
+  # How many lines to ignore at the beginning. Note that empty lines are skipped and not counted.
+  ignoreStartLines: 0
+  # How many lines to ignore at the end. Note that empty lines are skipped and not counted.
+  ignoreEndLines: 0
 ```
 
 With this yaml file, the table will refer to the file *${eKuiper}/data/lookup.json* and read it in json format.
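When the file source is used as a stream instead of a table, a minimal sketch could look like the following; the stream name and the data source file *demo.json* are illustrative, not part of this commit:

```sql
create stream fileStreamDemo () WITH (FORMAT="JSON", DATASOURCE="demo.json", TYPE="file")
```

Combined with a non-zero `interval` in the configuration, the file would be re-read periodically and its content pushed into the stream.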

+ 100 - 2
docs/zh_CN/rules/sources/builtin/file.md

@@ -1,8 +1,10 @@
 ## 文件源
 
+<span style="background:green;color:white;">stream source</span>
 <span style="background:green;color:white">scan table source</span>
 
-eKuiper 提供了内置支持,可将文件内容读入 eKuiper 处理管道。 文件源通常用作 [表格](../../../sqls/tables.md), 并且采用 create table 语句的默认类型。
+eKuiper 提供了内置支持,可将文件内容读入 eKuiper 处理管道。文件源通常用作[表格](../../../sqls/tables.md),并且采用 create
+table 语句的默认类型。文件源也支持用作流,此时通常需要设置 `interval` 参数以定时拉取更新。
 
 ```sql
 CREATE TABLE table1 (
@@ -18,12 +20,108 @@ CREATE TABLE table1 (
 
 ```yaml
 default:
+  # 文件的类型,支持 json, csv 和 lines
   fileType: json
   # 文件以 eKuiper 为根目录的目录或文件的绝对路径。
   # 请勿在此处包含文件名。文件名应在流数据源中定义
   path: data
   # 读取文件的时间间隔,单位为ms。 如果只读取一次,则将其设置为 0
   interval: 0
+  # 读取后,两条数据发送的间隔时间
+  sendInterval: 0
+  # 文件读取后的操作
+  # 0: 文件保持不变
+  # 1: 删除文件
+  # 2: 移动文件到 moveTo 定义的位置
+  actionAfterRead: 0
+  # 移动文件的位置,仅用于 actionAfterRead 为 2 的情况
+  moveTo: /tmp/kuiper/moved
+  # 是否包含文件头,多用于 csv。若为 true,则第一行解析为文件头。
+  hasHeader: false
+  # 定义文件的列。如果定义了文件头,该选项将被覆盖。
+  # columns: [id, name]
+  # 忽略开头多少行的内容。
+  ignoreStartLines: 0
+  # 忽略结尾多少行的内容。最后的空行不计算在内。
+  ignoreEndLines: 0
 ```
 
-通过这个 yaml 文件,该表将引用文件 *${eKuiper}/data/lookup.json* 并以 json 格式读取它。
+### File Types
+
+The file source supports monitoring files or folders. If the monitored location is a folder, all files in the folder are
+required to be of the same type. When monitoring a folder, it reads the files in alphabetical order by file name.
+
+The supported file types are
+
+- json: standard JSON array format files,
+  see [example](https://github.com/lf-edge/ekuiper/tree/master/internal/topo/source/test/test.json). If the file format
+  is a line-separated JSON string, it needs to be defined in lines format.
+- csv: comma-separated csv files are supported, as well as custom separators.
+- lines: line-separated file. The decoding method of each line can be defined by the format parameter in the stream
+  definition. For example, for a line-separated JSON string, the file type is set to lines and the format is set to
+  json.
+
+Some files may have most of the data in standard format, but have some metadata in the opening and closing lines of the
+file. The user can use the `ignoreStartLines` and `ignoreEndLines` arguments to remove the non-standard parts of the
+beginning and end so that the above file types can be parsed.
+
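As an illustration, a conf key for a csv file that carries two metadata lines at the top and one at the bottom could be sketched as follows; the key name `csvWithMeta` is hypothetical:

```yaml
csvWithMeta:
  # parse as csv with a header row, skipping the metadata lines
  fileType: csv
  hasHeader: true
  ignoreStartLines: 2
  ignoreEndLines: 1
```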
+### Example
+
+File sources involve parsing file contents, which interacts with the format-related definitions in data streams. The
+following examples describe how to combine file types and formats to parse file sources.
+
+#### Read a csv with a custom separator
+
+The standard csv separator is a comma, but there are a large number of files that use the csv-like format with custom
+separators. Some csv-like files have column names defined in the first line instead of data.
+
+```csv
+id name age
+1 John 56
+2 Jane 34
+```
+
+When the file is read, the configuration file is as follows, specifying that the file has a header.
+
+```yaml
+csv:
+  fileType: csv
+  hasHeader: true
+```
+
+In the stream definition, set the stream data to ``DELIMITED`` format, specifying the separator with the ``DELIMITER``
+parameter.
+
+```SQL
+create stream cscFileDemo () WITH (FORMAT="DELIMITED", DATASOURCE="abc.csv", TYPE="file", DELIMITER=" ", CONF_KEY="csv")
+```
+
+#### Read multi-line JSON data
+
+With a standard JSON file, the entire file should be a JSON object or an array. In practice, we often need to parse
+files that contain multiple JSON objects. These files are not actually JSON themselves, but are considered to be
+multiple lines of JSON data, assuming that each JSON object is a single line.
+
+```text
+{"id": 1, "name": "John Doe"}
+{"id": 2, "name": "Jane Doe"}
+{"id": 3, "name": "John Smith"}
+```
+
+When reading this file, the configuration file is as follows, specifying the file type as lines.
+
+```yaml
+jsonlines:
+  fileType: lines
+```
+
+In the stream definition, set the stream data to be in ``JSON`` format.
+
+```SQL
+create stream linesFileDemo () WITH (FORMAT="JSON", TYPE="file", CONF_KEY="jsonlines")
+```
+
+Moreover, the lines file type can be combined with any format. For example, if you set the format to protobuf and
+configure the schema, it can be used to parse data that contains multiple Protobuf encoded lines.
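A sketch of that combination, assuming a Protobuf schema has already been registered in eKuiper and using hypothetical stream, schema, and conf key names:

```SQL
create stream protoLinesDemo () WITH (FORMAT="PROTOBUF", SCHEMAID="demo.DemoMessage", TYPE="file", CONF_KEY="protolines")
```

Here `protolines` would be a conf key whose `fileType` is `lines`, so each line of the file is decoded independently with the configured format.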

+ 115 - 7
etc/sources/file.json

@@ -12,8 +12,8 @@
       "zh_CN": "https://github.com/lf-edge/ekuiper/blob/master/docs/zh_CN/rules/sources/builtin/file.md"
     },
     "description": {
-      "en_US": "Reading file content into the eKuiper processing pipeline. The file source is usually used as a table and it is the default type for create table statement.",
-      "zh_CN": "读取文件内容,并且将数据放入 eKuiper 数据处理流水线中。通常用于表格,且为使用 create table 语句时,默认的源类型。"
+      "en_US": "Monitor file system and load the content into the eKuiper processing pipeline.",
+      "zh_CN": "监控文件系统,读取文件内容,并且将数据放入 eKuiper 数据处理流水线中。"
     }
   },
   "libs": [],
@@ -24,8 +24,8 @@
       "zh_CN": "不含路径的文件名,例如 test.json"
     },
     "label": {
-      "en_US": "Data Source (File Name)",
-      "zh_CN": "数据源(文件)"
+      "en_US": "Data Source (File or directory relative path)",
+      "zh_CN": "数据源(文件或者目录的相对地址)"
     }
   },
   "properties": {
@@ -34,8 +34,13 @@
         "name": "fileType",
         "default": "json",
         "optional": true,
-        "control": "text",
+        "control": "select",
         "type": "string",
+        "values": [
+          "json",
+          "csv",
+          "lines"
+        ],
         "hint": {
           "en_US": "The file format type.",
           "zh_CN": "文件格式类型"
@@ -61,17 +66,120 @@
       },{
         "name": "interval",
         "default": 0,
-        "optional": false,
+        "optional": true,
         "control": "text",
         "type": "int",
         "hint": {
           "en_US": "The interval between reading the files, time unit is ms. If only read once, set it to 0",
-          "zh_CN": "读取文件的间隔时间。如果只需读取一次,设置为0。"
+          "zh_CN": "读取文件的间隔时间,单位为毫秒。如果只需读取一次,设置为0。"
         },
         "label": {
           "en_US": "Interval",
           "zh_CN": "间隔时间"
         }
+      },{
+        "name": "sendInterval",
+        "default": 0,
+        "optional": true,
+        "control": "text",
+        "type": "int",
+        "hint": {
+          "en_US": "The sending interval between each event in milliseconds.",
+          "zh_CN": "事件发送的间隔时间,单位为毫秒。"
+        },
+        "label": {
+          "en_US": "Send Interval",
+          "zh_CN": "发送间隔"
+        }
+      },{
+        "name": "actionAfterRead",
+        "default": 0,
+        "optional": true,
+        "control": "select",
+        "type": "int",
+        "values": [
+          0,
+          1,
+          2
+        ],
+        "hint": {
+          "en_US": "The action after read. 0 means keep the file; 1 means delete the file; 2 means move the file to the path defined in the property 'moveTo'",
+          "zh_CN": "读取后的操作。0表示保留文件;1表示删除文件;2表示将文件移动到属性 'moveTo' 中定义的路径。"
+        },
+        "label": {
+          "en_US": "Action after read",
+          "zh_CN": "读取后动作"
+        }
+      },{
+        "name": "moveTo",
+        "default": "",
+        "optional": true,
+        "control": "text",
+        "type": "string",
+        "hint": {
+          "en_US": "The path to move the file to after read, only valid when actionAfterRead is 2.",
+          "zh_CN": "移动文件的位置,仅用于 actionAfterRead 为 2 的情况"
+        },
+        "label": {
+          "en_US": "Move to path",
+          "zh_CN": "移动位置"
+        }
+      },{
+        "name": "hasHeader",
+        "default": false,
+        "optional": true,
+        "control": "radio",
+        "type": "bool",
+        "hint": {
+          "en_US": "Whether the first line is a header, usually used for csv files.",
+          "zh_CN": "是否包含文件头,多用于 csv。若为 true,则第一行解析为文件头。"
+        },
+        "label": {
+          "en_US": "Has header",
+          "zh_CN": "是否包含文件头"
+        }
+      },{
+        "name": "columns",
+        "default": [],
+        "optional": true,
+        "control": "list",
+        "type": "list_string",
+        "hint": {
+          "en_US": "Define the columns. If a header is defined, this will be overridden.",
+          "zh_CN": "定义文件的列。如果定义了文件头,该选项将被覆盖。"
+        },
+        "label": {
+          "en_US": "Columns",
+          "zh_CN": "字段列表"
+        }
+      },{
+        "name": "ignoreStartLines",
+        "default": 0,
+        "optional": true,
+        "control": "text",
+        "type": "int",
+        "hint": {
+          "en_US": "How many lines to ignore at the beginning. Note that empty lines are skipped and not counted.",
+          "zh_CN": "忽略开头多少行的内容。"
+        },
+        "label": {
+          "en_US": "Ignore start lines",
+          "zh_CN": "文件开头忽略的行数"
+        }
+      },{
+        "name": "ignoreEndLines",
+        "default": 0,
+        "optional": true,
+        "control": "text",
+        "type": "int",
+        "hint": {
+          "en_US": "How many lines to ignore at the end. Note that empty lines are skipped and not counted.",
+          "zh_CN": "忽略结尾多少行的内容。最后的空行不计算在内。"
+        },
+        "label": {
+          "en_US": "Ignore end lines",
+          "zh_CN": "文件结尾忽略的行数"
+        }
       }]
   },
   "outputs": [

+ 3 - 3
etc/sources/file.yaml

@@ -6,14 +6,14 @@ default:
   path: data
   # The interval between reading the files, time unit is ms. If only read once, set it to 0
   interval: 0
-  # When using as a stream, the sending interval between each event in millisecond
-  sendInterval: 10
+  # The sending interval between each event in milliseconds
+  sendInterval: 0
   # After read
   # 0: keep the file
   # 1: delete the file
   # 2: move the file to moveTo
   actionAfterRead: 0
-  # The path to move the file to after read, only valid when the actionAfterRead is 1
+  # The path to move the file to after read, only valid when the actionAfterRead is 2
   moveTo: /tmp/kuiper/moved
   # If the first line is header
   hasHeader: false