eKuiper provides built-in support for reading file content into the eKuiper processing pipeline. This is useful in scenarios where data is batch-processed or when files need real-time processing by eKuiper.
The File Source Connector allows eKuiper to read data from local files, supporting multiple formats such as JSON, CSV, and line-separated values:
:::: tabs type:card
::: tab JSON
**Description**: Files in standard JSON array format.

**Example**:

```json
[
  {"id": 1, "name": "John Doe"},
  {"id": 2, "name": "Jane Smith"}
]
```

**Special Note**: If the file consists of line-separated JSON strings, you should define the file type as `lines` and set the format to `json`.
:::
::: tab CSV
**Description**: Traditional comma-separated values (CSV) files.

**Example**:

```csv
id,name,age
1,John Doe,30
2,Jane Smith,25
```

**Custom Separators**: Some CSV-like files might use different delimiters. For such files, you can specify custom separators. For instance, a space-separated CSV might look like:

```text
id name age
1 John Doe 30
2 Jane Smith 25
```
:::
::: tab Lines
**Description**: Files with line-separated data. Each line represents a distinct data point.

**Example**:

```json
{"id": 1, "name": "John Doe"}
{"id": 2, "name": "Jane Smith"}
```

**Combining with Formats**: The `lines` file type is versatile and can be used with various data formats. For instance, if you set the format to `protobuf` and provide the appropriate schema, you can parse files containing multiple lines of Protobuf-encoded data.
:::
::::
::: tip
When dealing with files that have metadata or non-standard content at the beginning or end, you can leverage the `ignoreStartLines` and `ignoreEndLines` parameters to ensure the main content is parsed correctly.
:::
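As a minimal sketch, assuming a CSV file that carries a two-line banner above its header row and one trailing summary line, these parameters could be combined in the source configuration described below (the conf key name `csvWithMeta` and the file layout are assumptions):

```yaml
csvWithMeta:
  fileType: csv
  hasHeader: true
  # Assumed layout: skip a two-line banner before the header row
  ignoreStartLines: 2
  # Assumed layout: skip one trailing summary line
  ignoreEndLines: 1
```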
The connector in eKuiper can be configured with environment variables, the REST API, or the configuration file. This section focuses on configuring eKuiper connectors with the configuration file.

The configuration file for the file source is located at `/etc/sources/file.yaml`.

**Example**
```yaml
default:
  # The type of the file, could be json, csv or lines
  fileType: json
  # The directory of the file relative to the kuiper root or an absolute path.
  # Do not include the file name here. The file name should be defined in the stream data source.
  path: data
  # The interval between reading the files, time unit is ms. If only read once, set it to 0
  interval: 0
  # The sending interval between each event in milliseconds
  sendInterval: 0
  # Read the files in a directory in parallel or not
  parallel: false
  # After read
  # 0: keep the file
  # 1: delete the file
  # 2: move the file to moveTo
  actionAfterRead: 0
  # The path to move the file to after read, only valid when actionAfterRead is 2
  moveTo: /tmp/kuiper/moved
  # If the first line is the header
  hasHeader: false
  # Define the columns. If a header is defined, this will be overridden
  # columns: [id, name]
  # How many lines to be ignored at the beginning. Note that empty lines are ignored and not counted.
  ignoreStartLines: 0
  # How many lines to be ignored at the end. Note that empty lines are ignored and not counted.
  ignoreEndLines: 0
  # Decompress the file with the specified compression method. Supports the gzip and zstd methods now.
  decompression: ""
```
- `fileType`: Defines the type of file. Supported values are `json`, `csv`, and `lines`.
- `path`: Specifies the directory of the file, either relative to the Kuiper root or an absolute path. The file name should be defined in the stream data source.
- `interval`: Sets the interval, in milliseconds, between file reads. If set to 0, the file is read only once.
- `sendInterval`: Determines the interval, in milliseconds, between sending each event.
- `parallel`: Determines whether the files in a directory should be read in parallel. If set to `true`, files in a directory are read in parallel.
- `actionAfterRead`: Determines the action after reading the file:
  - `0`: Keep the file.
  - `1`: Delete the file.
  - `2`: Move the file to the location specified in `moveTo`.
- `moveTo`: Specifies the path to move the file to after reading. Only valid if `actionAfterRead` is set to `2`.
- `hasHeader`: Indicates whether the file has a header line.
- `columns`: Defines the column names, particularly useful for CSV files. For instance, `columns: [id, name]`.
- `ignoreStartLines`: Specifies the number of lines to be ignored at the beginning of the file. Empty lines will be ignored and not counted.
- `ignoreEndLines`: Specifies the number of lines to be ignored at the end of the file. Again, empty lines will be ignored and not counted.
- `decompression`: Allows decompression of files. Currently, the `gzip` and `zstd` methods are supported.
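As a sketch of how several of these parameters combine, the hypothetical conf key below (the key name, path, and column names are assumptions) would read gzip-compressed CSV files that have no header row and delete each file once it has been ingested:

```yaml
gzippedCsv:
  fileType: csv
  # Assumed directory holding the compressed files
  path: data/archive
  # Files are gzip-compressed
  decompression: gzip
  # No header row, so name the columns explicitly (assumed names)
  hasHeader: false
  columns: [id, name, age]
  # 1 = delete each file after it has been read
  actionAfterRead: 1
```

A stream or table definition can then point at this configuration with `CONF_KEY="gzippedCsv"`, as shown in the examples later in this section.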
::: tip
The File Source connector can operate as either a stream source or a scan table source. When configured as a stream source, you have the option to set the `interval` parameter, allowing for regular updates at specified intervals. While the file source is commonly utilized as a table, which is also the default configuration for the `create table` statement, this section will primarily demonstrate its integration as a table source.
:::
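If you do use the file source as a stream, a conf key such as the hypothetical `watch` below (the key name and interval values are assumptions) would re-scan the files every 5 seconds and space the emitted events 100 ms apart:

```yaml
watch:
  fileType: json
  path: data
  # Re-read the files every 5 seconds (0 would mean read only once)
  interval: 5000
  # Emit events 100 ms apart
  sendInterval: 100
```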
For example, to create a table named `table1` with three columns (`name`, `size`, and `id`) and populate it with data from a JSON file named `lookup.json`, you can use the statement below:
```sql
create table table1 (
    name STRING,
    size BIGINT,
    id BIGINT
) WITH (DATASOURCE="lookup.json", FORMAT="json", TYPE="file");
```
After setting up your streams, you can integrate them with eKuiper rules to process the data.
```sql
CREATE RULE rule1 AS SELECT * FROM fileDemo WHERE temperature > 50 INTO mySink;
```
This rule selects all data from the `fileDemo` stream where the temperature exceeds 50 and sends it to `mySink`.
You can define the file source as the data source either via the REST API or the CLI tool.
File sources in eKuiper require parsing of content, which often intersects with format-related stream definitions. To illustrate how eKuiper parses different file formats, let's walk through a couple of examples.
While the standard separator for CSV files is a comma, many files utilize custom delimiters in a CSV-like format. Additionally, some of these files designate the first line for column names rather than data values.
```text
id name age
1 John 56
2 Jane 34
```
Before eKuiper can read the file, you need to specify its type and inform the system about the header. Modify the configuration file (`/etc/sources/file.yaml`) with the following content:
```yaml
csv:
  fileType: csv
  hasHeader: true
```
This configuration indicates that the file is of type `csv` and has a header.
In the stream definition, define a stream with the `DELIMITED` format. The `DELIMITER` parameter allows us to specify the custom separator (in this case, a space).
```sql
create stream csvFileDemo () WITH (FORMAT="DELIMITED", DATASOURCE="abc.csv", TYPE="file", DELIMITER=" ", CONF_KEY="csv");
```
This command creates a stream named `csvFileDemo` that reads from the `abc.csv` file, expecting space-separated values.
Typically, a standard JSON file contains a single JSON object or an array. However, some files consist of multiple JSON objects, each on a separate line.
{"id": 1, "name": "John Doe"}
{"id": 2, "name": "Jane Doe"}
{"id": 3, "name": "John Smith"}
To effectively process this file, modify the configuration file (`/etc/sources/file.yaml`) with the following content:
```yaml
jsonlines:
  fileType: lines
```
In the stream definition, set the stream data to be in `JSON` format.
```sql
create stream linesFileDemo () WITH (FORMAT="JSON", TYPE="file", CONF_KEY="jsonlines")
```
This command configures a stream named `linesFileDemo` to process line-separated JSON data from the source file.