Browse Source

feat(native plugin): add sql source native plugin (#1240)

* feat(native plugin): add sql source native plugin

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
superxan 3 years ago
parent
commit
6c0c3df539
52 changed files with 4332 additions and 42 deletions
  1. 1 0
      .github/workflows/build_packages.yaml
  2. 1 0
      Makefile
  3. 253 0
      etc/sources/sql.json
  4. 40 0
      etc/sources/sql.yaml
  5. 172 15
      extensions.mod
  6. 1028 15
      extensions.sum
  7. 193 1
      extensions/go.mod
  8. 1049 5
      extensions/go.sum
  9. 184 0
      extensions/sources/sql/sql.go
  10. 21 0
      extensions/sqldatabase/driver/adodb.go
  11. 51 0
      extensions/sqldatabase/driver/apply.go
  12. 21 0
      extensions/sqldatabase/driver/athena.go
  13. 21 0
      extensions/sqldatabase/driver/avatica.go
  14. 21 0
      extensions/sqldatabase/driver/clickhouse.go
  15. 21 0
      extensions/sqldatabase/driver/cosmos.go
  16. 21 0
      extensions/sqldatabase/driver/couchbase.go
  17. 21 0
      extensions/sqldatabase/driver/firebird.go
  18. 21 0
      extensions/sqldatabase/driver/godror.go
  19. 21 0
      extensions/sqldatabase/driver/h2.go
  20. 21 0
      extensions/sqldatabase/driver/hive.go
  21. 21 0
      extensions/sqldatabase/driver/ignite.go
  22. 21 0
      extensions/sqldatabase/driver/impala.go
  23. 21 0
      extensions/sqldatabase/driver/maxcompute.go
  24. 21 0
      extensions/sqldatabase/driver/moderncsqlite.go
  25. 21 0
      extensions/sqldatabase/driver/mymysql.go
  26. 21 0
      extensions/sqldatabase/driver/mysql.go
  27. 21 0
      extensions/sqldatabase/driver/netezza.go
  28. 21 0
      extensions/sqldatabase/driver/odbc.go
  29. 21 0
      extensions/sqldatabase/driver/oracle.go
  30. 21 0
      extensions/sqldatabase/driver/pgx.go
  31. 21 0
      extensions/sqldatabase/driver/postgres.go
  32. 21 0
      extensions/sqldatabase/driver/presto.go
  33. 21 0
      extensions/sqldatabase/driver/ql.go
  34. 21 0
      extensions/sqldatabase/driver/sapase.go
  35. 21 0
      extensions/sqldatabase/driver/snowflake.go
  36. 21 0
      extensions/sqldatabase/driver/spanner.go
  37. 21 0
      extensions/sqldatabase/driver/sqlite3.go
  38. 21 0
      extensions/sqldatabase/driver/sqlserver.go
  39. 21 0
      extensions/sqldatabase/driver/trino.go
  40. 21 0
      extensions/sqldatabase/driver/vertica.go
  41. 21 0
      extensions/sqldatabase/driver/voltdb.go
  42. 94 0
      extensions/sqldatabase/sqlgen/commonSqlDialect.go
  43. 92 0
      extensions/sqldatabase/sqlgen/sqlServerDialect.go
  44. 112 0
      extensions/sqldatabase/sqlgen/sqlServerDialect_test.go
  45. 108 0
      extensions/sqldatabase/sqlgen/sqlgen.go
  46. 91 0
      extensions/sqldatabase/sqlgen/templateSqlDialect.go
  47. 124 0
      extensions/sqldatabase/sqlgen/templateSqlDialect_test.go
  48. 2 2
      go.mod
  49. 2 2
      go.sum
  50. 2 0
      internal/pkg/store/encoding/encoding.go
  51. 1 1
      internal/server/meta_plugin_init.go
  52. 81 1
      pkg/cast/time_test.go

+ 1 - 0
.github/workflows/build_packages.yaml

@@ -166,6 +166,7 @@ jobs:
           - sinks/redis
           - sources/random
           - sources/zmq
+          - sources/sql
           - functions/accumulateWordCount
           - functions/countPlusOne
           - functions/echo

+ 1 - 0
Makefile

@@ -87,6 +87,7 @@ PLUGINS := sinks/file \
 	sinks/redis \
 	sources/random \
 	sources/zmq \
+	sources/sql \
 	sinks/tdengine \
 	functions/accumulateWordCount \
 	functions/countPlusOne \

+ 253 - 0
etc/sources/sql.json

@@ -0,0 +1,253 @@
+{
+  "about": {
+    "trial": true,
+    "author": {
+      "name": "EMQ",
+      "email": "contact@emqx.io",
+      "company": "EMQ Technologies Co., Ltd",
+      "website": "https://www.emqx.io"
+    },
+    "helpUrl": {
+      "en_US": "https://github.com/lf-edge/ekuiper/blob/master/docs/en_US/rules/sources/sql.md",
+      "zh_CN": "https://github.com/lf-edge/ekuiper/blob/master/docs/zh_CN/rules/sources/sql.md"
+    },
+    "description": {
+      "en_US": "Read message from sql database",
+      "zh_CN": "从数据库中读取消息"
+    }
+  },
+  "libs": [],
+  "properties": {
+    "default": [
+      {
+        "name": "url",
+        "default": "",
+        "optional": false,
+        "control": "text",
+        "type": "string",
+        "hint": {
+          "en_US": "The url of the database",
+          "zh_CN": "数据库服务器的 URL"
+        },
+        "label": {
+          "en_US": "server address",
+          "zh_CN": "数据库地址"
+        }
+      },
+      {
+        "name": "interval",
+        "default": 1000,
+        "optional": false,
+        "control": "text",
+        "type": "int",
+        "hint": {
+          "en_US": "The interval (ms) to issue a query",
+          "zh_CN": "发出消息的间隔(毫秒)"
+        },
+        "label": {
+          "en_US": "Interval",
+          "zh_CN": "间隔时间"
+        }
+      },
+      {
+        "name": "internalSqlQueryCfg",
+        "default": [
+          {
+            "name": "table",
+            "default": "tableName",
+            "optional": false,
+            "control": "text",
+            "type": "string",
+            "hint": {
+              "en_US": "table name to query",
+              "zh_CN": "指定查询的数据库表名"
+            },
+            "label": {
+              "en_US": "tableName",
+              "zh_CN": "表名"
+            }
+          },
+          {
+            "name": "indexField",
+            "default": "",
+            "optional": true,
+            "control": "text",
+            "type": "string",
+            "hint": {
+              "en_US": "index field",
+              "zh_CN": "索引字段名"
+            },
+            "label": {
+              "en_US": "indexField",
+              "zh_CN": "索引字段名"
+            }
+          },
+          {
+            "name": "indexValue",
+            "default": "",
+            "optional": true,
+            "control": "text",
+            "type": "string",
+            "hint": {
+              "en_US": "index init value",
+              "zh_CN": "索引字段初始值"
+            },
+            "label": {
+              "en_US": "index init value",
+              "zh_CN": "索引字段初始值"
+            }
+          },
+          {
+            "name": "limit",
+            "default": 10,
+            "optional": true,
+            "control": "text",
+            "type": "int",
+            "hint": {
+              "en_US": "query result limit",
+              "zh_CN": "查询结果条数限制"
+            },
+            "label": {
+              "en_US": "Limit",
+              "zh_CN": "查询条数限制"
+            }
+          },
+          {
+            "name": "indexFieldType",
+            "default": "",
+            "optional": true,
+            "control": "select",
+            "type": "string",
+            "values": ["DATETIME"],
+            "hint": {
+              "en_US": "is the index datetime type",
+              "zh_CN": "是否为时间格式"
+            },
+            "label": {
+              "en_US": "indexFieldType",
+              "zh_CN": "indexFieldType"
+            }
+          },
+          {
+            "name": "dateTimeFormat",
+            "default": "",
+            "optional": true,
+            "control": "text",
+            "type": "string",
+            "hint": {
+              "en_US": "dateTimeFormat",
+              "zh_CN": "dateTimeFormat"
+            },
+            "label": {
+              "en_US": "dateTimeFormat",
+              "zh_CN": "dateTimeFormat"
+            }
+          }
+        ],
+        "optional": true,
+        "control": "list",
+        "type": "list_object",
+        "hint": {
+          "en_US": "basic configuration for the query",
+          "zh_CN": "查询基础配置"
+        },
+        "label": {
+          "en_US": "QueryPattern",
+          "zh_CN": "查询样式"
+        }
+      },
+      {
+        "name": "templateSqlQueryCfg",
+        "default": [
+          {
+            "name": "TemplateSql",
+            "default": "",
+            "optional": false,
+            "control": "text",
+            "type": "string",
+            "hint": {
+              "en_US": "query template",
+              "zh_CN": "查询语句模版"
+            },
+            "label": {
+              "en_US": "query template",
+              "zh_CN": "查询语句模版"
+            }
+          },
+          {
+            "name": "indexField",
+            "default": "",
+            "optional": true,
+            "control": "text",
+            "type": "string",
+            "hint": {
+              "en_US": "index field",
+              "zh_CN": "索引字段名"
+            },
+            "label": {
+              "en_US": "indexField",
+              "zh_CN": "索引字段名"
+            }
+          },
+          {
+            "name": "indexValue",
+            "default": "",
+            "optional": true,
+            "control": "text",
+            "type": "string",
+            "hint": {
+              "en_US": "index init value",
+              "zh_CN": "索引字段初始值"
+            },
+            "label": {
+              "en_US": "index init value",
+              "zh_CN": "索引字段初始值"
+            }
+          },
+          {
+            "name": "indexFieldType",
+            "default": "",
+            "optional": true,
+            "control": "select",
+            "type": "string",
+            "values": ["DATETIME"],
+            "hint": {
+              "en_US": "is the index datetime type",
+              "zh_CN": "是否为时间格式"
+            },
+            "label": {
+              "en_US": "indexFieldType",
+              "zh_CN": "indexFieldType"
+            }
+          },
+          {
+            "name": "dateTimeFormat",
+            "default": "",
+            "optional": true,
+            "control": "text",
+            "type": "string",
+            "hint": {
+              "en_US": "dateTimeFormat",
+              "zh_CN": "dateTimeFormat"
+            },
+            "label": {
+              "en_US": "dateTimeFormat",
+              "zh_CN": "dateTimeFormat"
+            }
+          }
+        ],
+        "optional": true,
+        "control": "list",
+        "type": "list_object",
+        "hint": {
+          "en_US": "user defined query",
+          "zh_CN": "自定义查询"
+        },
+        "label": {
+          "en_US": "user defined query",
+          "zh_CN": "自定义查询"
+        }
+      }
+    ]
+  }
+}

+ 40 - 0
etc/sources/sql.yaml

@@ -0,0 +1,40 @@
+default:
+  interval: 10000
+  url: mysql://user:test@140.210.204.147/user?parseTime=true
+  internalSqlQueryCfg:
+    table: test
+    limit: 1
+    indexField: registerTime
+    indexValue: "2022-04-21 10:23:55"
+    indexFieldType: "DATETIME"
+    dateTimeFormat: "YYYY-MM-dd HH:mm:ss"
+
+sqlserver_config:
+  url: sqlserver://username:password@140.210.204.147/testdb
+  internalSqlQueryCfg:
+    table: Student
+    limit: 10
+    indexField: id
+    indexValue: 1000
+#    indexFieldType: "DATETIME"
+#    dateTimeFormat: "YYYY-MM-dd HH:mm:ssSSS"
+# select top 10 * from Student where id > 1010 order by id ASC
+
+mysql_config:
+  interval: 10000
+  url: mysql://user:test@140.210.204.147/user?parseTime=true
+  internalSqlQueryCfg:
+    table: test
+    limit: 1
+    indexField: registerTime
+    indexValue: "2022-04-21 10:23:55"
+    indexFieldType: "DATETIME"
+    dateTimeFormat: "YYYY-MM-dd HH:mm:ss"
+
+sqltemplate_config:
+  templateSqlQueryCfg:
+    TemplateSql: "select * from table where entry_data > {{.entry_data}}"
+    indexField: entry_data
+    indexValue: "2022-04-13 06:22:32.233"
+    indexFieldType: "DATETIME"
+    dateTimeFormat: "YYYY-MM-dd HH:mm:ssSSS"

+ 172 - 15
extensions.mod

@@ -18,71 +18,228 @@ require (
 	github.com/google/uuid v1.3.0
 	github.com/gorilla/handlers v1.4.2
 	github.com/gorilla/mux v1.7.3
-	github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
 	github.com/jhump/protoreflect v1.8.2
 	github.com/keepeye/logrus-filename v0.0.0-20190711075016-ce01a4391dd1
 	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
-	github.com/mattn/go-sqlite3 v1.14.5
+	github.com/mattn/go-sqlite3 v1.14.12
 	github.com/mitchellh/mapstructure v1.4.1
 	github.com/msgpack-rpc/msgpack-rpc-go v0.0.0-20131026060856-c76397e1782b
 	github.com/pebbe/zmq4 v1.2.7
 	github.com/prometheus/client_golang v1.11.0
 	github.com/sirupsen/logrus v1.8.1
 	github.com/ugorji/go/codec v1.2.5
-	github.com/urfave/cli v1.22.0
+	github.com/urfave/cli v1.22.4
 	go.nanomsg.org/mangos/v3 v3.2.1
-	golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
-	google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c
-	google.golang.org/grpc v1.38.0
-	google.golang.org/protobuf v1.27.1
+	google.golang.org/genproto v0.0.0-20220211171837-173942840c17
+	google.golang.org/grpc v1.44.0
+	google.golang.org/protobuf v1.28.0
 	gopkg.in/ini.v1 v1.62.0
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
 )
 
 require (
+	cloud.google.com/go v0.100.2 // indirect
+	cloud.google.com/go/compute v1.2.0 // indirect
+	cloud.google.com/go/spanner v1.29.0 // indirect
+	github.com/Azure/azure-pipeline-go v0.2.3 // indirect
+	github.com/Azure/azure-storage-blob-go v0.14.0 // indirect
+	github.com/ClickHouse/clickhouse-go v1.5.4 // indirect
+	github.com/DATA-DOG/go-sqlmock v1.4.1 // indirect
+	github.com/IBM/nzgo v11.1.0+incompatible // indirect
 	github.com/Masterminds/goutils v1.1.1 // indirect
+	github.com/Masterminds/semver v1.4.2 // indirect
 	github.com/Masterminds/semver/v3 v3.1.1 // indirect
 	github.com/Microsoft/go-winio v0.4.11 // indirect
+	github.com/VoltDB/voltdb-client-go v1.0.13 // indirect
+	github.com/alexbrainman/odbc v0.0.0-20211220213544-9c9a2e61c5e2 // indirect
 	github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
+	github.com/amsokol/ignite-go-client v0.12.2 // indirect
+	github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
+	github.com/apache/calcite-avatica-go/v5 v5.1.0 // indirect
+	github.com/apache/thrift v0.16.0 // indirect
+	github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
+	github.com/aws/aws-sdk-go v1.38.68 // indirect
+	github.com/aws/aws-sdk-go-v2 v1.11.0 // indirect
+	github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect
+	github.com/aws/aws-sdk-go-v2/credentials v1.6.1 // indirect
+	github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.7.1 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.9.0 // indirect
+	github.com/aws/aws-sdk-go-v2/service/s3 v1.19.0 // indirect
+	github.com/aws/smithy-go v1.9.0 // indirect
+	github.com/beltran/gohive v1.5.3 // indirect
+	github.com/beltran/gosasl v0.0.0-20200715011608-d5475aebb293 // indirect
+	github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
+	github.com/bippio/go-impala v2.1.0+incompatible // indirect
+	github.com/btnguyen2k/consu/gjrc v0.1.1 // indirect
+	github.com/btnguyen2k/consu/olaf v0.1.3 // indirect
+	github.com/btnguyen2k/consu/reddo v0.1.4 // indirect
+	github.com/btnguyen2k/consu/semita v0.1.4 // indirect
+	github.com/btnguyen2k/gocosmos v0.1.6 // indirect
+	github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect
 	github.com/cespare/xxhash/v2 v2.1.1 // indirect
-	github.com/cpuguy83/go-md2man v1.0.10 // indirect
+	github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
+	github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 // indirect
+	github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 // indirect
+	github.com/couchbase/go-couchbase v0.1.1 // indirect
+	github.com/couchbase/go_n1ql v0.0.0-20220303011133-0ed4bf93e31d // indirect
+	github.com/couchbase/gomemcached v0.1.4 // indirect
+	github.com/couchbase/goutils v0.1.2 // indirect
+	github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect
+	github.com/creack/pty v1.1.11 // indirect
+	github.com/denisenkom/go-mssqldb v0.12.0 // indirect
+	github.com/edsrzf/mmap-go v1.1.0 // indirect
+	github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021 // indirect
+	github.com/envoyproxy/protoc-gen-validate v0.1.0 // indirect
 	github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
+	github.com/fatih/color v1.9.0 // indirect
+	github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
 	github.com/fxamacker/cbor/v2 v2.3.0 // indirect
+	github.com/gabriel-vasile/mimetype v1.4.0 // indirect
+	github.com/go-logfmt/logfmt v0.5.1 // indirect
+	github.com/go-logr/logr v1.2.3 // indirect
+	github.com/go-ole/go-ole v1.2.5 // indirect
+	github.com/go-openapi/errors v0.19.2 // indirect
+	github.com/go-openapi/strfmt v0.19.5 // indirect
 	github.com/go-playground/locales v0.14.0 // indirect
 	github.com/go-playground/universal-translator v0.18.0 // indirect
 	github.com/go-playground/validator/v10 v10.9.0 // indirect
+	github.com/go-sql-driver/mysql v1.6.0 // indirect
+	github.com/go-stack/stack v1.8.0 // indirect
+	github.com/go-zookeeper/zk v1.0.2 // indirect
+	github.com/godror/godror v0.33.0 // indirect
+	github.com/godror/knownpb v0.1.0 // indirect
+	github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect
+	github.com/golang-sql/sqlexp v0.0.0-20170517235910-f1bb20e5a188 // indirect
+	github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
+	github.com/google/flatbuffers v2.0.0+incompatible // indirect
+	github.com/google/go-cmp v0.5.7 // indirect
+	github.com/googleapis/gax-go/v2 v2.1.1 // indirect
+	github.com/googleapis/go-sql-spanner v0.0.0-20220321120010-12780e57be1c // indirect
 	github.com/gorilla/websocket v1.4.2 // indirect
+	github.com/hashicorp/go-uuid v1.0.2 // indirect
 	github.com/huandu/xstrings v1.3.2 // indirect
 	github.com/imdario/mergo v0.3.12 // indirect
+	github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c // indirect
+	github.com/jackc/chunkreader/v2 v2.0.1 // indirect
+	github.com/jackc/pgconn v1.12.0 // indirect
+	github.com/jackc/pgio v1.0.0 // indirect
+	github.com/jackc/pgpassfile v1.0.0 // indirect
+	github.com/jackc/pgproto3/v2 v2.3.0 // indirect
+	github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
+	github.com/jackc/pgtype v1.11.0 // indirect
+	github.com/jackc/pgx/v4 v4.16.0 // indirect
+	github.com/jcmturner/aescts/v2 v2.0.0 // indirect
+	github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
+	github.com/jcmturner/gofork v1.0.0 // indirect
+	github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
+	github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
+	github.com/jcmturner/rpc/v2 v2.0.3 // indirect
+	github.com/jedib0t/go-pretty v4.3.0+incompatible // indirect
 	github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
+	github.com/jmespath/go-jmespath v0.4.0 // indirect
+	github.com/jmrobles/h2go v0.5.0 // indirect
 	github.com/jonboulle/clockwork v0.2.2 // indirect
+	github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
+	github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
+	github.com/klauspost/compress v1.13.6 // indirect
 	github.com/leodido/go-urn v1.2.1 // indirect
 	github.com/lestrrat-go/strftime v1.0.3 // indirect
-	github.com/mattn/go-tflite v1.0.1 // indirect
+	github.com/lf-edge/ekuiper/extensions v0.0.0-20220425035204-e059190dab55 // indirect
+	github.com/lib/pq v1.10.5 // indirect
+	github.com/mattn/go-adodb v0.0.1 // indirect
+	github.com/mattn/go-colorable v0.1.6 // indirect
+	github.com/mattn/go-ieproxy v0.0.1 // indirect
+	github.com/mattn/go-isatty v0.0.12 // indirect
+	github.com/mattn/go-runewidth v0.0.9 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
 	github.com/mitchellh/copystructure v1.2.0 // indirect
 	github.com/mitchellh/reflectwalk v1.0.2 // indirect
 	github.com/mmcloughlin/geohash v0.10.0 // indirect
 	github.com/msgpack/msgpack-go v0.0.0-20130625150338-8224460e6fa3 // indirect
+	github.com/nakagami/firebirdsql v0.9.4 // indirect
 	github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 // indirect
+	github.com/mattn/go-tflite v1.0.1 // indirect
+	github.com/mattn/go-pointer v0.0.0-20190911064623-a0a44394634f // indirect
+	github.com/pierrec/lz4/v4 v4.1.11 // indirect
+	github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
+	github.com/posener/order v0.0.1 // indirect
+	github.com/prestodb/presto-go-client v0.0.0-20211201125635-ad28cec17d6c // indirect
 	github.com/prometheus/client_model v0.2.0 // indirect
 	github.com/prometheus/common v0.26.0 // indirect
 	github.com/prometheus/procfs v0.6.0 // indirect
-	github.com/russross/blackfriday v1.5.2 // indirect
+	github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
+	github.com/russross/blackfriday/v2 v2.0.1 // indirect
 	github.com/shopspring/decimal v1.2.0 // indirect
+	github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
+	github.com/sijms/go-ora/v2 v2.4.16 // indirect
+	github.com/snowflakedb/gosnowflake v1.6.9 // indirect
+	github.com/spaolacci/murmur3 v1.1.0 // indirect
 	github.com/spf13/cast v1.3.1 // indirect
-	github.com/taosdata/driver-go/v2 v2.0.0 // indirect
+	github.com/taosdata/driver-go/v2 v2.0.1 // indirect
 	github.com/tebeka/strftime v0.1.5 // indirect
+	github.com/thda/tds v0.1.7 // indirect
+	github.com/trinodb/trino-go-client v0.300.0 // indirect
+	github.com/uber-go/tally v3.3.17+incompatible // indirect
+	github.com/uber/athenadriver v1.1.13 // indirect
+	github.com/unchartedsoftware/witch v0.0.0-20200617171400-4f405404126f // indirect
+	github.com/vertica/vertica-sql-go v1.2.1 // indirect
 	github.com/x448/float16 v0.8.4 // indirect
+	github.com/xinsnake/go-http-digest-auth-client v0.6.0 // indirect
+	github.com/xo/dburl v0.9.1 // indirect
+	github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 // indirect
 	github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
-	golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
-	golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect
-	golang.org/x/text v0.3.6 // indirect
+	github.com/ziutek/mymysql v1.5.4 // indirect
+	gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b // indirect
+	go.mongodb.org/mongo-driver v1.0.3 // indirect
+	go.opencensus.io v0.23.0 // indirect
+	go.uber.org/atomic v1.7.0 // indirect
+	go.uber.org/multierr v1.6.0 // indirect
+	go.uber.org/zap v1.17.0 // indirect
+	golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 // indirect
+	golang.org/x/mod v0.4.2 // indirect
+	golang.org/x/net v0.0.0-20211118161319-6a13c67c3ce4 // indirect
+	golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
+	golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a // indirect
+	golang.org/x/text v0.3.7 // indirect
+	golang.org/x/tools v0.1.5 // indirect
+	golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
+	google.golang.org/api v0.68.0 // indirect
+	google.golang.org/appengine v1.6.7 // indirect
+	gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
+	gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
+	gopkg.in/jcmturner/gokrb5.v6 v6.1.1 // indirect
+	gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
+	lukechampine.com/uint128 v1.1.1 // indirect
+	modernc.org/b v1.0.2 // indirect
+	modernc.org/cc/v3 v3.35.26 // indirect
+	modernc.org/ccgo/v3 v3.16.2 // indirect
+	modernc.org/db v1.0.3 // indirect
+	modernc.org/file v1.0.3 // indirect
+	modernc.org/fileutil v1.0.0 // indirect
+	modernc.org/golex v1.0.1 // indirect
+	modernc.org/internal v1.0.3 // indirect
+	modernc.org/libc v1.15.0 // indirect
+	modernc.org/lldb v1.0.2 // indirect
+	modernc.org/mathutil v1.4.1 // indirect
+	modernc.org/memory v1.0.7 // indirect
+	modernc.org/opt v0.1.1 // indirect
+	modernc.org/ql v1.4.1 // indirect
+	modernc.org/sortutil v1.1.0 // indirect
+	modernc.org/sqlite v1.17.0 // indirect
+	modernc.org/strutil v1.1.1 // indirect
+	modernc.org/token v1.0.0 // indirect
+	modernc.org/zappy v1.0.3 // indirect
+	sqlflow.org/gohive v0.0.0-20200521083454-ed52ee669b84 // indirect
+	sqlflow.org/gomaxcompute v0.0.0-20210805062559-c14ae028b44c // indirect
 )
 
-replace github.com/lf-edge/ekuiper/extensions => ./extensions
+replace github.com/lf-edge/ekuiper/extensions v0.0.0-20220425035204-e059190dab55 => ./extensions
 
 go 1.17

File diff suppressed because it is too large
+ 1028 - 15
extensions.sum


+ 193 - 1
extensions/go.mod

@@ -3,16 +3,208 @@ module github.com/lf-edge/ekuiper/extensions
 go 1.17
 
 require (
+	github.com/ClickHouse/clickhouse-go v1.5.4
+	github.com/IBM/nzgo v11.1.0+incompatible
+	github.com/VoltDB/voltdb-client-go v1.0.13
+	github.com/alexbrainman/odbc v0.0.0-20211220213544-9c9a2e61c5e2
+	github.com/amsokol/ignite-go-client v0.12.2
+	github.com/apache/calcite-avatica-go/v5 v5.1.0
+	github.com/bippio/go-impala v2.1.0+incompatible
+	github.com/btnguyen2k/gocosmos v0.1.6
+	github.com/couchbase/go_n1ql v0.0.0-20220303011133-0ed4bf93e31d
+	github.com/denisenkom/go-mssqldb v0.12.0
 	github.com/go-redis/redis/v7 v7.3.0
+	github.com/go-sql-driver/mysql v1.6.0
+	github.com/godror/godror v0.33.0
+	github.com/googleapis/go-sql-spanner v0.0.0-20220321120010-12780e57be1c
 	github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
+	github.com/jackc/pgx/v4 v4.16.0
+	github.com/jmrobles/h2go v0.5.0
 	github.com/lf-edge/ekuiper v0.0.0-20210705062157-b68b45211d6e
+	github.com/lib/pq v1.10.5
+	github.com/mattn/go-adodb v0.0.1
+	github.com/mattn/go-sqlite3 v1.14.12
 	github.com/mattn/go-tflite v1.0.1
 	github.com/mmcloughlin/geohash v0.10.0
+	github.com/nakagami/firebirdsql v0.9.4
 	github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
 	github.com/pebbe/zmq4 v1.2.7
+	github.com/posener/order v0.0.1
+	github.com/prestodb/presto-go-client v0.0.0-20211201125635-ad28cec17d6c
+	github.com/sijms/go-ora/v2 v2.4.16
+	github.com/snowflakedb/gosnowflake v1.6.9
 	github.com/taosdata/driver-go/v2 v2.0.0
+	github.com/thda/tds v0.1.7
+	github.com/trinodb/trino-go-client v0.300.0
+	github.com/uber/athenadriver v1.1.13
+	github.com/vertica/vertica-sql-go v1.2.1
+	github.com/xo/dburl v0.9.1
+	github.com/ziutek/mymysql v1.5.4
+	modernc.org/ql v1.4.1
+	modernc.org/sqlite v1.17.0
+	sqlflow.org/gohive v0.0.0-20200521083454-ed52ee669b84
+	sqlflow.org/gomaxcompute v0.0.0-20210805062559-c14ae028b44c
 )
 
-require gopkg.in/yaml.v2 v2.4.0 // indirect
+require (
+	cloud.google.com/go v0.100.2 // indirect
+	cloud.google.com/go/compute v1.2.0 // indirect
+	cloud.google.com/go/spanner v1.29.0 // indirect
+	github.com/Azure/azure-pipeline-go v0.2.3 // indirect
+	github.com/Azure/azure-storage-blob-go v0.14.0 // indirect
+	github.com/DATA-DOG/go-sqlmock v1.4.1 // indirect
+	github.com/Masterminds/goutils v1.1.1 // indirect
+	github.com/Masterminds/semver v1.4.2 // indirect
+	github.com/Masterminds/semver/v3 v3.1.1 // indirect
+	github.com/Masterminds/sprig/v3 v3.2.1 // indirect
+	github.com/PaesslerAG/gval v1.0.0 // indirect
+	github.com/PaesslerAG/jsonpath v0.1.1 // indirect
+	github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
+	github.com/apache/thrift v0.16.0 // indirect
+	github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
+	github.com/aws/aws-sdk-go v1.38.68 // indirect
+	github.com/aws/aws-sdk-go-v2 v1.11.0 // indirect
+	github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect
+	github.com/aws/aws-sdk-go-v2/credentials v1.6.1 // indirect
+	github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.7.1 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.9.0 // indirect
+	github.com/aws/aws-sdk-go-v2/service/s3 v1.19.0 // indirect
+	github.com/aws/smithy-go v1.9.0 // indirect
+	github.com/beltran/gohive v1.5.3 // indirect
+	github.com/beltran/gosasl v0.0.0-20200715011608-d5475aebb293 // indirect
+	github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab // indirect
+	github.com/benbjohnson/clock v1.0.0 // indirect
+	github.com/btnguyen2k/consu/gjrc v0.1.1 // indirect
+	github.com/btnguyen2k/consu/olaf v0.1.3 // indirect
+	github.com/btnguyen2k/consu/reddo v0.1.4 // indirect
+	github.com/btnguyen2k/consu/semita v0.1.4 // indirect
+	github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect
+	github.com/cespare/xxhash/v2 v2.1.1 // indirect
+	github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
+	github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 // indirect
+	github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 // indirect
+	github.com/couchbase/go-couchbase v0.1.1 // indirect
+	github.com/couchbase/gomemcached v0.1.4 // indirect
+	github.com/couchbase/goutils v0.1.2 // indirect
+	github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect
+	github.com/creack/pty v1.1.11 // indirect
+	github.com/edsrzf/mmap-go v1.1.0 // indirect
+	github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021 // indirect
+	github.com/envoyproxy/protoc-gen-validate v0.1.0 // indirect
+	github.com/fatih/color v1.9.0 // indirect
+	github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
+	github.com/gabriel-vasile/mimetype v1.4.0 // indirect
+	github.com/go-logfmt/logfmt v0.5.1 // indirect
+	github.com/go-logr/logr v1.2.3 // indirect
+	github.com/go-ole/go-ole v1.2.5 // indirect
+	github.com/go-openapi/errors v0.19.2 // indirect
+	github.com/go-openapi/strfmt v0.19.5 // indirect
+	github.com/go-stack/stack v1.8.0 // indirect
+	github.com/go-zookeeper/zk v1.0.2 // indirect
+	github.com/godror/knownpb v0.1.0 // indirect
+	github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect
+	github.com/golang-sql/sqlexp v0.0.0-20170517235910-f1bb20e5a188 // indirect
+	github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
+	github.com/golang/protobuf v1.5.2 // indirect
+	github.com/gomodule/redigo v2.0.0+incompatible // indirect
+	github.com/google/flatbuffers v2.0.0+incompatible // indirect
+	github.com/google/go-cmp v0.5.7 // indirect
+	github.com/google/uuid v1.3.0 // indirect
+	github.com/googleapis/gax-go/v2 v2.1.1 // indirect
+	github.com/hashicorp/go-uuid v1.0.2 // indirect
+	github.com/huandu/xstrings v1.3.2 // indirect
+	github.com/imdario/mergo v0.3.12 // indirect
+	github.com/jackc/chunkreader/v2 v2.0.1 // indirect
+	github.com/jackc/pgconn v1.12.0 // indirect
+	github.com/jackc/pgio v1.0.0 // indirect
+	github.com/jackc/pgpassfile v1.0.0 // indirect
+	github.com/jackc/pgproto3/v2 v2.3.0 // indirect
+	github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
+	github.com/jackc/pgtype v1.11.0 // indirect
+	github.com/jcmturner/aescts/v2 v2.0.0 // indirect
+	github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
+	github.com/jcmturner/gofork v1.0.0 // indirect
+	github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
+	github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
+	github.com/jcmturner/rpc/v2 v2.0.3 // indirect
+	github.com/jedib0t/go-pretty v4.3.0+incompatible // indirect
+	github.com/jmespath/go-jmespath v0.4.0 // indirect
+	github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
+	github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
+	github.com/keepeye/logrus-filename v0.0.0-20190711075016-ce01a4391dd1 // indirect
+	github.com/klauspost/compress v1.13.6 // indirect
+	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
+	github.com/lestrrat-go/strftime v1.0.3 // indirect
+	github.com/mattn/go-colorable v0.1.6 // indirect
+	github.com/mattn/go-ieproxy v0.0.1 // indirect
+	github.com/mattn/go-isatty v0.0.12 // indirect
+	github.com/mattn/go-pointer v0.0.0-20190911064623-a0a44394634f // indirect
+	github.com/mattn/go-runewidth v0.0.9 // indirect
+	github.com/mitchellh/copystructure v1.2.0 // indirect
+	github.com/mitchellh/mapstructure v1.4.1 // indirect
+	github.com/mitchellh/reflectwalk v1.0.2 // indirect
+	github.com/pierrec/lz4/v4 v4.1.11 // indirect
+	github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
+	github.com/pkg/errors v0.9.1 // indirect
+	github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
+	github.com/rogpeppe/go-internal v1.8.0 // indirect
+	github.com/russross/blackfriday/v2 v2.0.1 // indirect
+	github.com/shopspring/decimal v1.2.0 // indirect
+	github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
+	github.com/sirupsen/logrus v1.8.1 // indirect
+	github.com/spaolacci/murmur3 v1.1.0 // indirect
+	github.com/spf13/cast v1.3.1 // indirect
+	github.com/uber-go/tally v3.3.17+incompatible // indirect
+	github.com/unchartedsoftware/witch v0.0.0-20200617171400-4f405404126f // indirect
+	github.com/urfave/cli v1.22.4 // indirect
+	github.com/xinsnake/go-http-digest-auth-client v0.6.0 // indirect
+	github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 // indirect
+	gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b // indirect
+	go.mongodb.org/mongo-driver v1.0.3 // indirect
+	go.opencensus.io v0.23.0 // indirect
+	go.uber.org/atomic v1.7.0 // indirect
+	go.uber.org/multierr v1.6.0 // indirect
+	go.uber.org/zap v1.17.0 // indirect
+	golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 // indirect
+	golang.org/x/mod v0.4.2 // indirect
+	golang.org/x/net v0.0.0-20211118161319-6a13c67c3ce4 // indirect
+	golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
+	golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a // indirect
+	golang.org/x/text v0.3.7 // indirect
+	golang.org/x/tools v0.1.5 // indirect
+	golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
+	google.golang.org/api v0.68.0 // indirect
+	google.golang.org/appengine v1.6.7 // indirect
+	google.golang.org/genproto v0.0.0-20220211171837-173942840c17 // indirect
+	google.golang.org/grpc v1.44.0 // indirect
+	google.golang.org/protobuf v1.28.0 // indirect
+	gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
+	gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
+	gopkg.in/jcmturner/gokrb5.v6 v6.1.1 // indirect
+	gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
+	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
+	lukechampine.com/uint128 v1.1.1 // indirect
+	modernc.org/b v1.0.2 // indirect
+	modernc.org/cc/v3 v3.35.26 // indirect
+	modernc.org/ccgo/v3 v3.16.2 // indirect
+	modernc.org/db v1.0.3 // indirect
+	modernc.org/file v1.0.3 // indirect
+	modernc.org/fileutil v1.0.0 // indirect
+	modernc.org/golex v1.0.1 // indirect
+	modernc.org/internal v1.0.3 // indirect
+	modernc.org/libc v1.15.0 // indirect
+	modernc.org/lldb v1.0.2 // indirect
+	modernc.org/mathutil v1.4.1 // indirect
+	modernc.org/memory v1.0.7 // indirect
+	modernc.org/opt v0.1.1 // indirect
+	modernc.org/sortutil v1.1.0 // indirect
+	modernc.org/strutil v1.1.1 // indirect
+	modernc.org/token v1.0.0 // indirect
+	modernc.org/zappy v1.0.3 // indirect
+)
 
 replace github.com/lf-edge/ekuiper => ../

File diff suppressed because it is too large
+ 1049 - 5
extensions/go.sum


+ 184 - 0
extensions/sources/sql/sql.go

@@ -0,0 +1,184 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"database/sql"
+	"database/sql/driver"
+	"fmt"
+	driver2 "github.com/lf-edge/ekuiper/extensions/sqldatabase/driver"
+	"github.com/lf-edge/ekuiper/extensions/sqldatabase/sqlgen"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/xo/dburl"
+	"reflect"
+	"time"
+)
+
+type sqlConConfig struct {
+	Interval int    `json:"interval"`
+	Url      string `json:"url"`
+}
+
+type sqlsource struct {
+	conf  *sqlConConfig
+	Query sqlgen.SqlQueryGenerator
+	//The db connection instance
+	db *sql.DB
+}
+
+func (m *sqlsource) Configure(_ string, props map[string]interface{}) error {
+	cfg := &sqlConConfig{}
+
+	err := cast.MapToStruct(props, cfg)
+	if err != nil {
+		return fmt.Errorf("read properties %v fail with error: %v", props, err)
+	}
+	if cfg.Url == "" {
+		return fmt.Errorf("property Url is required")
+	}
+	if cfg.Interval == 0 {
+		return fmt.Errorf("property interval is required")
+	}
+
+	Db, err := dburl.Parse(cfg.Url)
+	if err != nil {
+		return fmt.Errorf("dburl.Parse %s fail with error: %v", cfg.Url, err)
+	}
+
+	generator, err := sqlgen.GetQueryGenerator(Db, props)
+	if err != nil {
+		return fmt.Errorf("GetQueryGenerator %s fail with error: %v", cfg.Url, err)
+	}
+
+	m.Query = generator
+	m.conf = cfg
+	return nil
+}
+
+func (m *sqlsource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
+	logger := ctx.GetLogger()
+	logger.Debugf("Opening sql stream %v", m.conf)
+
+	db, err := dburl.Open(m.conf.Url)
+	if err != nil {
+		logger.Errorf("connection to %s Open with error %v", m.conf.Url, err)
+		logger.Errorf("support build tags are %v", driver2.KnownBuildTags())
+		errCh <- err
+		return
+	}
+	m.db = db
+
+	defer func() {
+		_ = m.db.Close()
+	}()
+
+	t := time.NewTicker(time.Duration(m.conf.Interval) * time.Millisecond)
+	defer t.Stop()
+	for {
+		select {
+		case <-t.C:
+			query, err := m.Query.SqlQueryStatement()
+			if err != nil {
+				logger.Errorf("Get sql query error %v", err)
+			}
+			logger.Debugf("Query the database with %s", query)
+			rows, err := m.db.Query(query)
+			if err != nil {
+				logger.Errorf("Run sql query(%s) error %v", query, err)
+				errCh <- err
+				return
+			}
+
+			cols, _ := rows.Columns()
+
+			types, err := rows.ColumnTypes()
+			if err != nil {
+				logger.Errorf("row ColumnTypes error %v", query, err)
+				errCh <- err
+				return
+			}
+			for rows.Next() {
+				data := make(map[string]interface{})
+				columns := make([]interface{}, len(cols))
+				m.prepareValues(columns, types, cols)
+
+				err := rows.Scan(columns...)
+				if err != nil {
+					logger.Errorf("Run sql scan(%s) error %v", query, err)
+					errCh <- err
+					return
+				}
+
+				m.scanIntoMap(data, columns, cols)
+				m.Query.UpdateMaxIndexValue(data)
+				consumer <- api.NewDefaultSourceTuple(data, nil)
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func (m *sqlsource) prepareValues(values []interface{}, columnTypes []*sql.ColumnType, columns []string) {
+	if len(columnTypes) > 0 {
+		for idx, columnType := range columnTypes {
+			if columnType.ScanType() != nil {
+				values[idx] = reflect.New(reflect.PtrTo(columnType.ScanType())).Interface()
+			} else {
+				values[idx] = new(interface{})
+			}
+		}
+	} else {
+		for idx := range columns {
+			values[idx] = new(interface{})
+		}
+	}
+}
+
+func (m *sqlsource) scanIntoMap(mapValue map[string]interface{}, values []interface{}, columns []string) {
+	for idx, column := range columns {
+		if reflectValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(values[idx]))); reflectValue.IsValid() {
+			mapValue[column] = reflectValue.Interface()
+			if valuer, ok := mapValue[column].(driver.Valuer); ok {
+				mapValue[column], _ = valuer.Value()
+			} else if b, ok := mapValue[column].(sql.RawBytes); ok {
+				mapValue[column] = string(b)
+			}
+		} else {
+			mapValue[column] = nil
+		}
+	}
+}
+
+func (m *sqlsource) GetOffset() (interface{}, error) {
+	return m.Query.GetIndexValue(), nil
+}
+
+func (m *sqlsource) Rewind(offset interface{}) error {
+	m.Query.SetIndexValue(offset)
+	return nil
+}
+
+func (m *sqlsource) Close(ctx api.StreamContext) error {
+	logger := ctx.GetLogger()
+	logger.Debugf("Closing sql stream to %v", m.conf)
+
+	return nil
+}
+
+func Sql() api.Source {
+	return &sqlsource{}
+}

+ 21 - 0
extensions/sqldatabase/driver/adodb.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || adodb) && !no_adodb
+
+package driver
+
+import (
+	_ "github.com/mattn/go-adodb" // Microsoft ADODB driver
+)

+ 51 - 0
extensions/sqldatabase/driver/apply.go

@@ -0,0 +1,51 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package driver
+
+func KnownBuildTags() map[string]string {
+	return map[string]string{
+		"adodb":         "adodb",         // github.com/mattn/go-adodb
+		"athena":        "athena",        // github.com/uber/athenadriver/go
+		"avatica":       "avatica",       // github.com/apache/calcite-avatica-go/v5
+		"clickhouse":    "clickhouse",    // github.com/ClickHouse/clickhouse-go
+		"cosmos":        "cosmos",        // github.com/btnguyen2k/gocosmos
+		"couchbase":     "n1ql",          // github.com/couchbase/go_n1ql
+		"firebird":      "firebird",      // github.com/nakagami/firebirdsql
+		"godror":        "godror",        // github.com/godror/godror
+		"h2":            "h2",            // github.com/jmrobles/h2go
+		"hive":          "hive",          // sqlflow.org/gohive
+		"ignite":        "ignite",        // github.com/amsokol/ignite-go-client/sql
+		"impala":        "impala",        // github.com/bippio/go-impala
+		"maxcompute":    "maxcompute",    // sqlflow.org/gomaxcompute
+		"moderncsqlite": "moderncsqlite", // modernc.org/sqlite
+		"mymysql":       "mymysql",       // github.com/ziutek/mymysql/godrv
+		"mysql":         "mysql",         // github.com/go-sql-sqlgen/mysql
+		"netezza":       "netezza",       // github.com/IBM/nzgo
+		"odbc":          "odbc",          // github.com/alexbrainman/odbc
+		"oracle":        "oracle",        // github.com/sijms/go-ora/v2
+		"pgx":           "pgx",           // github.com/jackc/pgx/v4/stdlib
+		"postgres":      "postgres",      // github.com/lib/pq
+		"presto":        "presto",        // github.com/prestodb/presto-go-client/presto
+		"ql":            "ql",            // modernc.org/ql
+		"sapase":        "sapase",        // github.com/thda/tds
+		"snowflake":     "snowflake",     // github.com/snowflakedb/gosnowflake
+		"spanner":       "spanner",       // github.com/cloudspannerecosystem/go-sql-spanner
+		"sqlite3":       "sqlite3",       // github.com/mattn/go-sqlite3
+		"sqlserver":     "sqlserver",     // github.com/denisenkom/go-mssqldb
+		"trino":         "trino",         // github.com/trinodb/trino-go-client/trino
+		"vertica":       "vertica",       // github.com/vertica/vertica-sql-go
+		"voltdb":        "voltdb",        // github.com/VoltDB/voltdb-client-go/voltdbclient
+	}
+}

+ 21 - 0
extensions/sqldatabase/driver/athena.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || athena) && !no_athena
+
+package driver
+
+import (
+	_ "github.com/uber/athenadriver/go" // AWS Athena driver
+)

+ 21 - 0
extensions/sqldatabase/driver/avatica.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || avatica) && !no_avatica
+
+package driver
+
+import (
+	_ "github.com/apache/calcite-avatica-go/v5" // Apache Avatica driver
+)

+ 21 - 0
extensions/sqldatabase/driver/clickhouse.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || clickhouse) && !no_clickhouse
+
+package driver
+
+import (
+	_ "github.com/ClickHouse/clickhouse-go" // ClickHouse driver
+)

+ 21 - 0
extensions/sqldatabase/driver/cosmos.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || cosmos) && !no_cosmos
+
+package driver
+
+import (
+	_ "github.com/btnguyen2k/gocosmos" // Azure CosmosDB driver
+)

+ 21 - 0
extensions/sqldatabase/driver/couchbase.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || couchbase) && !no_couchbase
+
+package driver
+
+import (
+	_ "github.com/couchbase/go_n1ql" // Couchbase driver
+)

+ 21 - 0
extensions/sqldatabase/driver/firebird.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || firebird) && !no_firebird
+
+package driver
+
+import (
+	_ "github.com/nakagami/firebirdsql" // Firebird driver
+)

+ 21 - 0
extensions/sqldatabase/driver/godror.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || godror) && !no_godror
+
+package driver
+
+import (
+	_ "github.com/godror/godror" // GO DRiver for ORacle driver
+)

+ 21 - 0
extensions/sqldatabase/driver/h2.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || h2) && !no_h2
+
+package driver
+
+import (
+	_ "github.com/jmrobles/h2go" // Apache H2 driver
+)

+ 21 - 0
extensions/sqldatabase/driver/hive.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || hive) && !no_hive
+
+package driver
+
+import (
+	_ "sqlflow.org/gohive" // Apache Hive driver
+)

+ 21 - 0
extensions/sqldatabase/driver/ignite.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || ignite) && !no_ignite
+
+package driver
+
+import (
+	_ "github.com/amsokol/ignite-go-client/sql" // Apache Ignite driver
+)

+ 21 - 0
extensions/sqldatabase/driver/impala.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || impala) && !no_impala
+
+package driver
+
+import (
+	_ "github.com/bippio/go-impala" // Apache Impala driver
+)

+ 21 - 0
extensions/sqldatabase/driver/maxcompute.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || maxcompute) && !no_maxcompute
+
+package driver
+
+import (
+	_ "sqlflow.org/gomaxcompute" // Alibaba MaxCompute driver
+)

+ 21 - 0
extensions/sqldatabase/driver/moderncsqlite.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || moderncsqlite) && !no_moderncsqlite
+
+package driver
+
+import (
+	_ "modernc.org/sqlite" // ModernC SQLite3 driver
+)

+ 21 - 0
extensions/sqldatabase/driver/mymysql.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || mymysql) && !no_mymysql
+
+package driver
+
+import (
+	_ "github.com/ziutek/mymysql/godrv" // MySQL MyMySQL driver
+)

+ 21 - 0
extensions/sqldatabase/driver/mysql.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (!no_base || mysql) && !no_mysql
+
+package driver
+
+import (
+	_ "github.com/go-sql-driver/mysql" // Microsoft SQL Server sqlgen
+)

+ 21 - 0
extensions/sqldatabase/driver/netezza.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || netezza) && !no_netezza
+
+package driver
+
+import (
+	_ "github.com/IBM/nzgo" // Netezza driver
+)

+ 21 - 0
extensions/sqldatabase/driver/odbc.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || odbc) && !no_odbc
+
+package driver
+
+import (
+	_ "github.com/alexbrainman/odbc" // ODBC driver
+)

+ 21 - 0
extensions/sqldatabase/driver/oracle.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (!no_base || oracle) && !no_oracle
+
+package driver
+
+import (
+	_ "github.com/sijms/go-ora/v2" // Oracle Database driver
+)

+ 21 - 0
extensions/sqldatabase/driver/pgx.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || pgx) && !no_pgx
+
+package driver
+
+import (
+	_ "github.com/jackc/pgx/v4/stdlib" // PostgreSQL PGX driver
+)

+ 21 - 0
extensions/sqldatabase/driver/postgres.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (!no_base || postgres) && !no_postgres
+
+package driver
+
+import (
+	_ "github.com/lib/pq" // PostgreSQL driver
+)

+ 21 - 0
extensions/sqldatabase/driver/presto.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || presto) && !no_presto
+
+package driver
+
+import (
+	_ "github.com/prestodb/presto-go-client/presto" // Presto driver
+)

+ 21 - 0
extensions/sqldatabase/driver/ql.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || ql) && !no_ql
+
+package driver
+
+import (
+	_ "modernc.org/ql" // Cznic QL driver
+)

+ 21 - 0
extensions/sqldatabase/driver/sapase.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || sapase) && !no_sapase
+
+package driver
+
+import (
+	_ "github.com/thda/tds" // SAP ASE driver
+)

+ 21 - 0
extensions/sqldatabase/driver/snowflake.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || snowflake) && !no_snowflake
+
+package driver
+
+import (
+	_ "github.com/snowflakedb/gosnowflake" // Snowflake driver
+)

+ 21 - 0
extensions/sqldatabase/driver/spanner.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || spanner) && !no_spanner
+
+package driver
+
+import (
+	_ "github.com/googleapis/go-sql-spanner" // Google Spanner driver
+)

+ 21 - 0
extensions/sqldatabase/driver/sqlite3.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (!no_base || sqlite3) && !no_sqlite3
+
+package driver
+
+import (
+	_ "github.com/mattn/go-sqlite3" // SQLite3 driver
+)

+ 21 - 0
extensions/sqldatabase/driver/sqlserver.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (!no_base || sqlserver) && !no_sqlserver
+
+package driver
+
+import (
+	_ "github.com/denisenkom/go-mssqldb" // Microsoft SQL Server
+)

+ 21 - 0
extensions/sqldatabase/driver/trino.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || trino) && !no_trino
+
+package driver
+
+import (
+	_ "github.com/trinodb/trino-go-client/trino" // Trino driver
+)

+ 21 - 0
extensions/sqldatabase/driver/vertica.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || vertica) && !no_vertica
+
+package driver
+
+import (
+	_ "github.com/vertica/vertica-sql-go" // Vertica driver
+)

+ 21 - 0
extensions/sqldatabase/driver/voltdb.go

@@ -0,0 +1,21 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (all || most || voltdb) && !no_voltdb
+
+package driver
+
+import (
+	_ "github.com/VoltDB/voltdb-client-go/voltdbclient" // VoltDB driver
+)

+ 94 - 0
extensions/sqldatabase/sqlgen/commonSqlDialect.go

@@ -0,0 +1,94 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlgen
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+)
+
+type CommonQueryGenerator struct {
+	*InternalSqlQueryCfg
+}
+
+func (q *CommonQueryGenerator) quoteIdentifier(identifier string) string {
+	return "'" + identifier + "'"
+}
+
+func (q *CommonQueryGenerator) getSelect() string {
+	return "select * from " + q.Table + " "
+}
+
+func (q *CommonQueryGenerator) getCondition() (string, error) {
+	var val string
+	if q.IndexField != "" {
+		if q.IndexFieldType == DATETIME_TYPE && q.DateTimeFormat != "" {
+			t, err := cast.InterfaceToTime(q.IndexValue, q.DateTimeFormat)
+			if err != nil {
+				err = fmt.Errorf("SqlQueryStatement InterfaceToTime datetime convert got error %v", err)
+				return "", err
+			}
+			val, err = cast.FormatTime(t, q.DateTimeFormat)
+			if err != nil {
+				err = fmt.Errorf("SqlQueryStatement FormatTime datetime convert got error %v", err)
+				return "", err
+			}
+		} else {
+			val = fmt.Sprintf("%v", q.IndexValue)
+		}
+		return "where " + q.IndexField + " > " + q.quoteIdentifier(val) + " ", nil
+	}
+
+	return "", nil
+}
+
+func (q *CommonQueryGenerator) getOrderby() string {
+	if q.IndexField != "" {
+		return "order by " + q.quoteIdentifier(q.IndexField) + " ASC "
+	}
+	return ""
+}
+
+func (q *CommonQueryGenerator) getLimit() string {
+	if q.Limit != 0 {
+		return fmt.Sprintf("limit %d", q.Limit)
+	}
+	return ""
+}
+
+func NewCommonSqlQuery(cfg *InternalSqlQueryCfg) SqlQueryGenerator {
+	in := &CommonQueryGenerator{
+		InternalSqlQueryCfg: cfg,
+	}
+	return in
+}
+
+func (q *CommonQueryGenerator) SqlQueryStatement() (string, error) {
+	con, err := q.getCondition()
+	if err != nil {
+		return "", err
+	}
+	return q.getSelect() + con + q.getOrderby() + q.getLimit(), nil
+}
+
+func (q *CommonQueryGenerator) UpdateMaxIndexValue(row map[string]interface{}) {
+	if q.IndexField != "" {
+		v, found := row[q.IndexField]
+		if !found {
+			return
+		}
+		q.IndexValue = v
+	}
+}

+ 92 - 0
extensions/sqldatabase/sqlgen/sqlServerDialect.go

@@ -0,0 +1,92 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlgen
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+)
+
+type SqlServerQueryGenerator struct {
+	*InternalSqlQueryCfg
+}
+
+func (q *SqlServerQueryGenerator) quoteIdentifier(identifier string) string {
+	return "'" + identifier + "'"
+}
+
+func (q *SqlServerQueryGenerator) getSelect() string {
+	if q.Limit != 0 {
+		return fmt.Sprintf("select top %d * from %s ", q.Limit, q.Table)
+	} else {
+		return "select * from " + q.Table + " "
+	}
+}
+
+func (q *SqlServerQueryGenerator) getCondition() (string, error) {
+	var val string
+	if q.IndexField != "" {
+		if q.IndexFieldType == DATETIME_TYPE && q.DateTimeFormat != "" {
+			t, err := cast.InterfaceToTime(q.IndexValue, q.DateTimeFormat)
+			if err != nil {
+				err = fmt.Errorf("SqlQueryStatement InterfaceToTime datetime convert got error %v", err)
+				return "", err
+			}
+			val, err = cast.FormatTime(t, q.DateTimeFormat)
+			if err != nil {
+				err = fmt.Errorf("SqlQueryStatement FormatTime datetime convert got error %v", err)
+				return "", err
+			}
+		} else {
+			val = fmt.Sprintf("%v", q.IndexValue)
+		}
+		return "where " + q.IndexField + " > " + q.quoteIdentifier(val) + " ", nil
+	}
+
+	return "", nil
+}
+
+func (q *SqlServerQueryGenerator) getOrderby() string {
+	if q.IndexField != "" {
+		return "order by " + q.IndexField + " ASC"
+	}
+	return ""
+}
+
+func NewSqlServerQuery(cfg *InternalSqlQueryCfg) SqlQueryGenerator {
+	in := &SqlServerQueryGenerator{
+		InternalSqlQueryCfg: cfg,
+	}
+	return in
+}
+
+func (q *SqlServerQueryGenerator) SqlQueryStatement() (string, error) {
+	con, err := q.getCondition()
+	if err != nil {
+		return "", err
+	}
+	return q.getSelect() + con + q.getOrderby(), nil
+}
+
+func (q *SqlServerQueryGenerator) UpdateMaxIndexValue(row map[string]interface{}) {
+	// since internal sql have asc clause, so the last element is largest
+	if q.IndexField != "" {
+		v, found := row[q.IndexField]
+		if !found {
+			return
+		}
+		q.IndexValue = v
+	}
+}

+ 112 - 0
extensions/sqldatabase/sqlgen/sqlServerDialect_test.go

@@ -0,0 +1,112 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlgen
+
+import (
+	"reflect"
+	"testing"
+)
+
+func TestQueryGenerator_SqlQueryStatement(t *testing.T) {
+	type fields struct {
+		indexSlice          []interface{}
+		InternalSqlQueryCfg *InternalSqlQueryCfg
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		want    string
+		wantErr bool
+	}{
+		{
+			name: "int index",
+			fields: fields{
+				indexSlice: nil,
+				InternalSqlQueryCfg: &InternalSqlQueryCfg{
+					Table:          "table",
+					Limit:          2,
+					IndexField:     "responseTime",
+					IndexValue:     10,
+					IndexFieldType: "",
+					DateTimeFormat: "",
+				},
+			},
+			want:    "select top 2 * from table where responseTime > '10' order by responseTime ASC",
+			wantErr: false,
+		},
+		{
+			name: "time string index",
+			fields: fields{
+				indexSlice: nil,
+				InternalSqlQueryCfg: &InternalSqlQueryCfg{
+					Table:          "table",
+					Limit:          2,
+					IndexField:     "responseTime",
+					IndexValue:     "2022-04-13 06:22:32.233",
+					IndexFieldType: "DATETIME",
+					DateTimeFormat: "YYYY-MM-dd HH:mm:ssSSS",
+				},
+			},
+			want:    "select top 2 * from table where responseTime > '2022-04-13 06:22:32.233' order by responseTime ASC",
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			q := &SqlServerQueryGenerator{
+				InternalSqlQueryCfg: tt.fields.InternalSqlQueryCfg,
+			}
+			got, err := q.SqlQueryStatement()
+			if (err != nil) != tt.wantErr {
+				t.Errorf("SqlQueryStatement() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if got != tt.want {
+				t.Errorf("SqlQueryStatement() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestInternalQuery(t *testing.T) {
+
+	s := NewSqlServerQuery(&InternalSqlQueryCfg{
+		Table:      "table",
+		Limit:      10,
+		IndexField: "responseTime",
+		IndexValue: 10,
+	})
+
+	s.UpdateMaxIndexValue(map[string]interface{}{
+		"responseTime": 20,
+	})
+	s.UpdateMaxIndexValue(map[string]interface{}{
+		"responseTime": 30,
+	})
+	s.UpdateMaxIndexValue(map[string]interface{}{
+		"responseTime": 40,
+	})
+	s.UpdateMaxIndexValue(map[string]interface{}{
+		"responseTime": 50,
+	})
+
+	nextSqlStr, _ := s.SqlQueryStatement()
+
+	want := "select top 10 * from table where responseTime > '50' order by responseTime ASC"
+
+	if !reflect.DeepEqual(nextSqlStr, want) {
+		t.Errorf("SqlQueryStatement() = %v, want %v", nextSqlStr, want)
+	}
+}

+ 108 - 0
extensions/sqldatabase/sqlgen/sqlgen.go

@@ -0,0 +1,108 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlgen
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/xo/dburl"
+)
+
+type SqlQueryGenerator interface {
+	IndexValuer
+	SqlQueryStatement() (string, error)
+	UpdateMaxIndexValue(rows map[string]interface{})
+}
+
+type IndexValuer interface {
+	SetIndexValue(interface{})
+	GetIndexValue() interface{}
+}
+
+const DATETIME_TYPE = "DATETIME"
+
+type InternalSqlQueryCfg struct {
+	Table          string      `json:"table"`
+	Limit          int         `json:"limit"`
+	IndexField     string      `json:"indexField"`
+	IndexValue     interface{} `json:"indexValue"`
+	IndexFieldType string      `json:"indexFieldType"`
+	DateTimeFormat string      `json:"dateTimeFormat"`
+}
+
+func (i *InternalSqlQueryCfg) SetIndexValue(val interface{}) {
+	i.IndexValue = val
+}
+
+func (i *InternalSqlQueryCfg) GetIndexValue() interface{} {
+	return i.IndexValue
+}
+
+type TemplateSqlQueryCfg struct {
+	TemplateSql    string      `json:"templateSql"`
+	IndexField     string      `json:"indexField"`
+	IndexValue     interface{} `json:"indexValue"`
+	IndexFieldType string      `json:"indexFieldType"`
+	DateTimeFormat string      `json:"dateTimeFormat"`
+}
+
+func (i *TemplateSqlQueryCfg) SetIndexValue(val interface{}) {
+	i.IndexValue = val
+}
+
+func (i *TemplateSqlQueryCfg) GetIndexValue() interface{} {
+	return i.IndexValue
+}
+
+type sqlConfig struct {
+	TemplateSqlQueryCfg *TemplateSqlQueryCfg `json:"templateSqlQueryCfg"`
+	InternalSqlQueryCfg *InternalSqlQueryCfg `json:"internalSqlQueryCfg"`
+}
+
+func (cfg *sqlConfig) Init(props map[string]interface{}) error {
+	err := cast.MapToStruct(props, &cfg)
+	if err != nil {
+		return fmt.Errorf("read properties %v fail with error: %v", props, err)
+	}
+
+	if cfg.TemplateSqlQueryCfg == nil && cfg.InternalSqlQueryCfg == nil {
+		return fmt.Errorf("read properties %v fail with error: %v", props, err)
+	}
+	return nil
+}
+
+func GetQueryGenerator(u *dburl.URL, props map[string]interface{}) (SqlQueryGenerator, error) {
+	cfg := &sqlConfig{}
+	err := cfg.Init(props)
+	if err != nil {
+		return nil, err
+	}
+
+	if cfg.TemplateSqlQueryCfg != nil {
+		ge, err := NewTemplateSqlQuery(cfg.TemplateSqlQueryCfg)
+		if err != nil {
+			return nil, err
+		} else {
+			return ge, nil
+		}
+	}
+
+	switch u.Driver {
+	case "sqlserver":
+		return NewSqlServerQuery(cfg.InternalSqlQueryCfg), nil
+	default:
+		return NewCommonSqlQuery(cfg.InternalSqlQueryCfg), nil
+	}
+}

+ 91 - 0
extensions/sqldatabase/sqlgen/templateSqlDialect.go

@@ -0,0 +1,91 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlgen
+
+import (
+	"bytes"
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/posener/order"
+	"text/template"
+)
+
+type templateSqlQuery struct {
+	tp *template.Template
+	*TemplateSqlQueryCfg
+}
+
+func NewTemplateSqlQuery(cfg *TemplateSqlQueryCfg) (SqlQueryGenerator, error) {
+	t := &templateSqlQuery{
+		tp:                  nil,
+		TemplateSqlQueryCfg: cfg,
+	}
+
+	if err := t.init(); err != nil {
+		return nil, err
+	} else {
+		return t, nil
+	}
+}
+
+func (t *templateSqlQuery) init() error {
+	tp, err := template.New("sql").Parse(t.TemplateSql)
+	if err != nil {
+		return err
+	}
+	t.tp = tp
+	return nil
+}
+
+func (t *templateSqlQuery) SqlQueryStatement() (string, error) {
+	var val string
+	if t.IndexFieldType == DATETIME_TYPE && t.DateTimeFormat != "" {
+		time, err := cast.InterfaceToTime(t.IndexValue, t.DateTimeFormat)
+		if err != nil {
+			err = fmt.Errorf("SqlQueryStatement InterfaceToTime datetime convert got error %v", err)
+			return "", err
+		}
+		val, err = cast.FormatTime(time, t.DateTimeFormat)
+		if err != nil {
+			err = fmt.Errorf("SqlQueryStatement FormatTime datetime convert got error %v", err)
+			return "", err
+		}
+	} else {
+		val = fmt.Sprintf("%v", t.IndexValue)
+	}
+
+	input := map[string]interface{}{
+		t.IndexField: val,
+	}
+
+	var output bytes.Buffer
+	err := t.tp.Execute(&output, input)
+	if err != nil {
+		return "", err
+	}
+	return string(output.Bytes()), nil
+}
+
+func (t *templateSqlQuery) UpdateMaxIndexValue(row map[string]interface{}) {
+	if t.IndexField != "" {
+		v, found := row[t.IndexField]
+		if !found {
+			return
+		}
+		if val := order.Is(v); val.Greater(t.IndexValue) {
+			t.IndexValue = v
+		}
+	}
+}

+ 124 - 0
extensions/sqldatabase/sqlgen/templateSqlDialect_test.go

@@ -0,0 +1,124 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlgen
+
+import (
+	"reflect"
+	"testing"
+	"time"
+)
+
+func Test_templateSqlQueryCfg_getSqlQueryStatement(t1 *testing.T) {
+	type fields struct {
+		TemplateSql string
+		FieldName   string
+		FieldValue  interface{}
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   error
+		want1  string
+	}{
+		{
+			name: "select * from table",
+			fields: fields{
+				TemplateSql: "select * from table",
+				FieldName:   "",
+				FieldValue:  nil,
+			},
+			want:  nil,
+			want1: "select * from table",
+		},
+
+		{
+			name: "select * from table where id > {{.id}} ",
+			fields: fields{
+				TemplateSql: "select * from table where id > {{.id}}",
+				FieldName:   "id",
+				FieldValue:  100,
+			},
+			want:  nil,
+			want1: "select * from table where id > 100",
+		},
+
+		{
+			name: "select * from table where responseTime > `{{.responseTime}}` ",
+			fields: fields{
+				TemplateSql: "select * from table where responseTime > `{{.responseTime}}`",
+				FieldName:   "responseTime",
+				FieldValue:  "2008-10-29 14:56:59",
+			},
+			want:  nil,
+			want1: "select * from table where responseTime > `2008-10-29 14:56:59`",
+		},
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			cfg := &TemplateSqlQueryCfg{
+				TemplateSql: tt.fields.TemplateSql,
+				IndexField:  tt.fields.FieldName,
+				IndexValue:  tt.fields.FieldValue,
+			}
+			query, _ := NewTemplateSqlQuery(cfg)
+
+			got1, got := query.SqlQueryStatement()
+			if !reflect.DeepEqual(got, tt.want) {
+				t1.Errorf("SqlQueryStatement() got = %v, want %v", got, tt.want)
+			}
+			if got1 != tt.want1 {
+				t1.Errorf("SqlQueryStatement() got1 = %v, want %v", got1, tt.want1)
+			}
+		})
+	}
+}
+
+func getDatetimeFromstring(dateStr string) time.Time {
+	myDate, _ := time.Parse("2006-01-02 15:04:05", dateStr)
+	return myDate
+}
+
+func TestTemplateQuery(t *testing.T) {
+	cfg := &TemplateSqlQueryCfg{
+		TemplateSql:    "select * from table where responseTime > `{{.responseTime}}`",
+		IndexField:     "responseTime",
+		IndexValue:     getDatetimeFromstring("2008-10-25 14:56:59"),
+		IndexFieldType: DATETIME_TYPE,
+		DateTimeFormat: "YYYY-MM-dd HH:mm:ssSSS",
+	}
+
+	s, _ := NewTemplateSqlQuery(cfg)
+
+	s.UpdateMaxIndexValue(map[string]interface{}{
+		"responseTime": getDatetimeFromstring("2008-10-29 14:56:59"),
+	})
+	s.UpdateMaxIndexValue(map[string]interface{}{
+		"responseTime": getDatetimeFromstring("2008-11-11 11:12:01"),
+	})
+	s.UpdateMaxIndexValue(map[string]interface{}{
+		"responseTime": getDatetimeFromstring("2008-11-09 15:45:21"),
+	})
+	s.UpdateMaxIndexValue(map[string]interface{}{
+		"responseTime": getDatetimeFromstring("2008-11-11 13:23:44"),
+	})
+
+	nextSqlStr, _ := s.SqlQueryStatement()
+
+	want := "select * from table where responseTime > `2008-11-11 13:23:44.000`"
+
+	if !reflect.DeepEqual(nextSqlStr, want) {
+		t.Errorf("SqlQueryStatement() = %v, want %v", nextSqlStr, want)
+	}
+}

+ 2 - 2
go.mod

@@ -21,7 +21,7 @@ require (
 	github.com/jhump/protoreflect v1.8.2
 	github.com/keepeye/logrus-filename v0.0.0-20190711075016-ce01a4391dd1
 	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
-	github.com/mattn/go-sqlite3 v1.14.5
+	github.com/mattn/go-sqlite3 v1.14.12
 	github.com/mitchellh/mapstructure v1.4.1
 	github.com/msgpack-rpc/msgpack-rpc-go v0.0.0-20131026060856-c76397e1782b
 	github.com/pebbe/zmq4 v1.2.7
@@ -30,7 +30,6 @@ require (
 	github.com/ugorji/go/codec v1.2.5
 	github.com/urfave/cli v1.22.0
 	go.nanomsg.org/mangos/v3 v3.2.1
-	golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
 	google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c
 	google.golang.org/grpc v1.38.0
 	google.golang.org/protobuf v1.27.1
@@ -73,6 +72,7 @@ require (
 	github.com/x448/float16 v0.8.4 // indirect
 	github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
 	golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
+	golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
 	golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect
 	golang.org/x/text v0.3.6 // indirect
 )

+ 2 - 2
go.sum

@@ -255,8 +255,8 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
 github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
 github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
 github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
-github.com/mattn/go-sqlite3 v1.14.5 h1:1IdxlwTNazvbKJQSxoJ5/9ECbEeaTTyeU7sEAZ5KKTQ=
-github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
+github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ1Z0=
+github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
 github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
 github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=

+ 2 - 0
internal/pkg/store/encoding/encoding.go

@@ -17,10 +17,12 @@ package encoding
 import (
 	"bytes"
 	"encoding/gob"
+	"time"
 )
 
 func Encode(value interface{}) (error, []byte) {
 	var buff bytes.Buffer
+	gob.Register(time.Time{})
 	gob.Register(value)
 	enc := gob.NewEncoder(&buff)
 	if err := enc.Encode(value); err != nil {

+ 1 - 1
internal/server/meta_plugin_init.go

@@ -87,7 +87,7 @@ func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.Plu
 	}
 }
 
-var NativeSourcePlugin = []string{"random", "zmq"}
+var NativeSourcePlugin = []string{"random", "zmq", "sql"}
 var NativeSinkPlugin = []string{"file", "image", "influx", "redis", "tdengine", "zmq"}
 var NativeFunctionPlugin = []string{"accumulateWordCount", "countPlusOne", "echo", "geohash", "image", "labelImage"}
 

+ 81 - 1
pkg/cast/time_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
 package cast
 
 import (
+	"reflect"
 	"testing"
 	"time"
 )
@@ -40,3 +41,82 @@ func TestDateToAndFromMilli(t *testing.T) {
 		}
 	}
 }
+
+func TestFormatTime(t *testing.T) {
+	type args struct {
+		time time.Time
+		f    string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    string
+		wantErr bool
+	}{
+		{
+			name: "test1",
+			args: args{
+				time: time.Date(2020, time.January, 16, 2, 14, 24, 913000000, time.UTC),
+				f:    "YYYY-MM-dd HH:mm:ssSSS",
+			},
+			want:    "2020-01-16 02:14:24.913",
+			wantErr: false,
+		},
+		{
+			name: "test1",
+			args: args{
+				time: time.Date(2020, time.January, 16, 2, 14, 24, 913000000, time.UTC),
+				f:    "YYYY-MM-dd T HH:mm:ss",
+			},
+			want:    "2020-01-16 T 02:14:24",
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := FormatTime(tt.args.time, tt.args.f)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("FormatTime() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if got != tt.want {
+				t.Errorf("FormatTime() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestInterfaceToTime(t *testing.T) {
+	type args struct {
+		i      interface{}
+		format string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    time.Time
+		wantErr bool
+	}{
+		{
+			name: "test string",
+			args: args{
+				i:      "2022-04-13 06:22:32.233",
+				format: "YYYY-MM-dd HH:mm:ssSSS",
+			},
+			want:    time.Date(2022, time.April, 13, 6, 22, 32, 233000000, time.UTC),
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := InterfaceToTime(tt.args.i, tt.args.format)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("InterfaceToTime() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("InterfaceToTime() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}