浏览代码

Merge pull request #603 from emqx/dev/1.0.1

Release 1.0.1
jinfahua 4 年之前
父节点
当前提交
4f877590e0

+ 8 - 5
.ci/Dockerfile-plugins

@@ -19,11 +19,14 @@ RUN set -e -u -x \
            ;; \
          tdengine ) \
            if [ "$(uname -m)" = "x86_64" ]; then \
-             wget "https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-2.0.3.1-Linux-x64.tar.gz" -O /tmp/TDengine-client-2.0.3.1-Linux-x64.tar.gz \
-             && tar -zxvf /tmp/TDengine-client-2.0.3.1-Linux-x64.tar.gz \
-             && cd TDengine-client && ./install_client.sh && cd - \
-             && go build --buildmode=plugin -tags plugins -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go; \
-           fi \
+             wget "https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-2.0.6.0-Linux-x64.tar.gz" -O /tmp/TDengine-client-2.0.6.0.tar.gz; \
+           fi; \
+           if [ "$(uname -m)" = "aarch64" ]; then \
+             wget "https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-2.0.6.0-Linux-aarch64.tar.gz" -O /tmp/TDengine-client-2.0.6.0.tar.gz; \
+           fi; \
+           tar -zxvf /tmp/TDengine-client-2.0.6.0.tar.gz \
+           && cd TDengine-client-2.0.6.0 && ./install_client.sh && cd - \
+           && go build --buildmode=plugin -tags plugins -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \
            ;; \
          * ) \
            go build --buildmode=plugin -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \

+ 12 - 7
.github/workflows/build_packages.yaml

@@ -262,13 +262,12 @@ jobs:
             cd _packages && for var in $( ls |grep -v sha256); do
               echo "$(cat $var.sha256) $var" | sha256sum -c || exit 1
             done
-        - name: update github release
+        - uses: zhanghongtong/upload-release-asset@v1
           if: github.event_name == 'release'
-          run: |
-            version=$(echo ${{ github.ref }} | sed -r  "s .*/.*/(.*) \1 g")
-            for var in $(ls _packages) ; do
-              .github/workflows/script/upload_github_release_asset.sh owner=emqx repo=kuiper tag=$version filename=_packages/$var github_api_token=$(echo ${{ secrets.AccessToken }})
-            done
+          with:
+            repo: kuiper
+            path: "_packages/kuiper-*"
+            token: ${{ secrets.AccessToken }}
         - name: create invalidation for cloudfront
           if: github.event_name == 'release'
           run: |
@@ -287,7 +286,13 @@ jobs:
           if: github.event_name == 'release'
           run: |
             version=$(echo ${{ github.ref }} | sed -r  "s .*/.*/(.*) \1 g")
-            curl -w %{http_code} --insecure -H ${{ secrets.EmqxHeader }} https://admin.emqx.io/admin_api/v1/kuiper_github_release_callback?tag=$version
+            curl -w %{http_code} \
+               --insecure \
+               -H "Content-Type: application/json" \
+               -H "token: ${{ secrets.EMQX_IO_TOKEN }}" \
+               -X POST \
+               -d "{\"repo\":\"emqx/kuiper\", \"tag\": \"${version}\" }" \
+               ${{ secrets.EMQX_IO_RELEASE_API }}
         - name: update helm packages
           if: github.event_name == 'release'
           run: |

+ 4 - 4
.github/workflows/run_fvt_tests.yaml

@@ -23,9 +23,9 @@ jobs:
         - name: install jmeter
           timeout-minutes: 10
           env:
-            JMETER_VERSION: 5.2.1
+              JMETER_VERSION: 5.3
           run: |
-            wget --no-check-certificate -O /tmp/apache-jmeter.tgz http://us.mirrors.quenda.co/apache//jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz --no-check-certificate
+            wget --no-check-certificate -O /tmp/apache-jmeter.tgz https://downloads.apache.org//jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz
             cd /tmp && tar -xvf apache-jmeter.tgz
             echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties
             echo "jmeter.save.saveservice.response_data.on_error=true" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties
@@ -84,9 +84,9 @@ jobs:
       - name: install jmeter
         timeout-minutes: 10
         env:
-          JMETER_VERSION: 5.2.1
+            JMETER_VERSION: 5.3
         run: |
-          wget --no-check-certificate -O /tmp/apache-jmeter.tgz http://us.mirrors.quenda.co/apache//jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz
+          wget --no-check-certificate -O /tmp/apache-jmeter.tgz https://downloads.apache.org//jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz
           cd /tmp && tar -xvf apache-jmeter.tgz
           echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties
           echo "jmeter.save.saveservice.response_data.on_error=true" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties

+ 0 - 64
.github/workflows/script/upload_github_release_asset.sh

@@ -1,64 +0,0 @@
-#!/usr/bin/env bash
-#
-# Author: Stefan Buck
-# License: MIT
-# https://gist.github.com/stefanbuck/ce788fee19ab6eb0b4447a85fc99f447
-#
-#
-# This script accepts the following parameters:
-#
-# * owner
-# * repo
-# * tag
-# * filename
-# * github_api_token
-#
-# Script to upload a release asset using the GitHub API v3.
-#
-# Example:
-#
-# upload_github_release_asset.sh github_api_token=TOKEN owner=stefanbuck repo=playground tag=v0.1.0 filename=./build.zip
-#
-
-# Check dependencies.
-set -e
-xargs=$(which gxargs || which xargs)
-
-# Validate settings.
-[ "$TRACE" ] && set -x
-
-CONFIG=$@
-
-for line in $CONFIG; do
-  eval "$line"
-done
-
-# Define variables.
-GH_API="https://api.github.com"
-GH_REPO="$GH_API/repos/$owner/$repo"
-GH_TAGS="$GH_REPO/releases/tags/$tag"
-AUTH="Authorization: token $github_api_token"
-WGET_ARGS="--content-disposition --auth-no-challenge --no-cookie"
-CURL_ARGS="-LJO#"
-
-if [[ "$tag" == 'LATEST' ]]; then
-  GH_TAGS="$GH_REPO/releases/latest"
-fi
-
-# Validate token.
-curl -o /dev/null -sH "$AUTH" $GH_REPO || { echo "Error: Invalid repo, token or network issue!";  exit 1; }
-
-# Read asset tags.
-response=$(curl -sH "$AUTH" $GH_TAGS)
-
-# Get ID of the asset based on given filename.
-eval $(echo "$response" | grep -m 1 "id.:" | grep -w id | tr : = | tr -cd '[[:alnum:]]=')
-[ "$id" ] || { echo "Error: Failed to get release id for tag: $tag"; echo "$response" | awk 'length($0)<100' >&2; exit 1; }
-
-# Upload asset
-echo "Uploading asset... "
-
-# Construct url
-GH_ASSET="https://uploads.github.com/repos/$owner/$repo/releases/$id/assets?name=$(basename $filename)"
-
-curl "$GITHUB_OAUTH_BASIC" --data-binary @"$filename" -H "Authorization: token $github_api_token" -H "Content-Type: application/octet-stream" $GH_ASSET

+ 6 - 3
Makefile

@@ -182,7 +182,7 @@ PLUGINS := sinks/file \
 plugins: cross_prepare sinks/tdengine $(PLUGINS)
 sinks/tdengine:
 	@docker buildx build --no-cache \
-    --platform=linux/amd64 \
+    --platform=linux/amd64,linux/arm64 \
     -t cross_build \
     --build-arg VERSION=$(VERSION) \
     --build-arg PLUGIN_TYPE=sinks \
@@ -191,9 +191,12 @@ sinks/tdengine:
     -f .ci/Dockerfile-plugins .
 
 	@mkdir -p _plugins/debian/sinks
-	@tar -xvf /tmp/cross_build_plugins_sinks_tdengine.tar --wildcards "go/kuiper/plugins/sinks/tdengine/tdengine_amd64.zip" \
-	&& mv go/kuiper/plugins/sinks/tdengine/tdengine_amd64.zip _plugins/debian/sinks
+	@for arch in amd64 arm64; do \
+		tar -xvf /tmp/cross_build_plugins_sinks_tdengine.tar --wildcards "linux_$${arch}/go/kuiper/plugins/sinks/tdengine/tdengine_$$(echo $${arch%%_*}).zip" \
+		&& mv $$(ls linux_$${arch}/go/kuiper/plugins/sinks/tdengine/tdengine_$$(echo $${arch%%_*}).zip) _plugins/debian/sinks; \
+	done
 	@rm -f /tmp/cross_build_plugins_sinks_tdengine.tar
+
 $(PLUGINS): PLUGIN_TYPE = $(word 1, $(subst /, , $@))
 $(PLUGINS): PLUGIN_NAME = $(word 2, $(subst /, , $@))
 $(PLUGINS):

+ 20 - 19
common/util.go

@@ -15,19 +15,22 @@ import (
 	"path"
 	"path/filepath"
 	//"runtime"
+	logrus_syslog "github.com/sirupsen/logrus/hooks/syslog"
+	"log/syslog"
 	"sort"
 	"strings"
 	"sync"
 )
 
 const (
-	logFileName   = "stream.log"
-	etc_dir       = "/etc/"
-	data_dir      = "/data/"
-	log_dir       = "/log/"
-	StreamConf    = "kuiper.yaml"
-	KuiperBaseKey = "KuiperBaseKey"
-	MetaKey       = "__meta"
+	logFileName     = "stream.log"
+	etc_dir         = "/etc/"
+	data_dir        = "/data/"
+	log_dir         = "/log/"
+	StreamConf      = "kuiper.yaml"
+	KuiperBaseKey   = "KuiperBaseKey"
+	KuiperSyslogKey = "KuiperSyslogKey"
+	MetaKey         = "__meta"
 )
 
 var (
@@ -79,21 +82,19 @@ type KuiperConf struct {
 }
 
 func init() {
+	Log = logrus.New()
+	if "true" == os.Getenv(KuiperSyslogKey) {
+		if hook, err := logrus_syslog.NewSyslogHook("", "", syslog.LOG_INFO, ""); err != nil {
+			Log.Error("Unable to connect to local syslog daemon")
+		} else {
+			Log.AddHook(hook)
+		}
+	}
+
 	filenameHook := filename.NewHook()
 	filenameHook.Field = "file"
-	Log = logrus.New()
 	Log.AddHook(filenameHook)
-	/*
-		Log.SetReportCaller(true)
-			Log.SetFormatter(&logrus.TextFormatter{
-				CallerPrettyfier: func(f *runtime.Frame) (string, string) {
-					filename := path.Base(f.File)
-					return "", fmt.Sprintf("%s:%d", filename, f.Line)
-				},
-				DisableColors: true,
-				FullTimestamp: true,
-			})
-	*/
+
 	Log.SetFormatter(&logrus.TextFormatter{
 		TimestampFormat: "2006-01-02 15:04:05",
 		DisableColors:   true,

文件差异内容过多而无法显示
+ 4 - 0
docs/en_US/edgex/edgex_rule_engine_tutorial.md


+ 2 - 1
docs/en_US/operation/configuration_file.md

@@ -12,7 +12,8 @@ basic:
   # true|false, if it's set to true, then the log will be print to log file
   fileLog: true
 ```
-
+## system log
+When the user sets the value of the environment variable named KuiperSyslogKey to true, the log will be printed to the syslog.
 ## Cli Port
 ```yaml
 basic:

文件差异内容过多而无法显示
+ 4 - 0
docs/zh_CN/edgex/edgex_rule_engine_tutorial.md


+ 2 - 0
docs/zh_CN/operation/configuration_file.md

@@ -12,6 +12,8 @@ basic:
   # true|false, if it's set to true, then the log will be print to log file
   fileLog: true
 ```
+## 系统日志
+用户将名为 KuiperSyslogKey 的环境变量的值设置为 true 时,日志将打印到系统日志中。
 ## Cli 端口
 ```yaml
 basic:

+ 3 - 3
etc/sinks/rest.json

@@ -95,10 +95,10 @@
     },
     {
       "name": "headers",
-      "default": "",
+      "default": [],
       "optional": true,
-      "control": "text",
-      "type": "string",
+      "control": "list",
+      "type": "list_object",
       "hint": {
         "en_US": "The additional headers to be set for the HTTP request.",
         "zh_CN": "要为 HTTP 请求设置的其他标头"

+ 5 - 1
plugins/sinks/tdengine/install.sh

@@ -8,6 +8,9 @@ then
 fi
 
 url="https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-$1-Linux-x64.tar.gz"
+if [ "$(uname -m)" = "aarch64" ]; then \
+	url="https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-$1-Linux-aarch64.tar.gz"
+fi
 zip="TDengine-client.tar.gz"
 wget -T 280 -O "$zip" "$url"
 
@@ -18,7 +21,8 @@ then
 fi
 
 dir="TDengine-client"
-tar -zxvf "$zip"
+mkdir "$dir"
+tar -xzvf "$zip" -C ./"$dir" --strip-components 1
 rm "$zip"
 
 if ! [ -e $dir ]

+ 2 - 2
xsql/ast.go

@@ -314,10 +314,10 @@ type StreamField struct {
 
 func (u *StreamField) MarshalJSON() ([]byte, error) {
 	return json.Marshal(&struct {
-		FieldType string
+		FieldType interface{}
 		Name      string
 	}{
-		FieldType: PrintFieldType(u.FieldType),
+		FieldType: PrintFieldTypeForJson(u.FieldType),
 		Name:      u.Name,
 	})
 }

+ 40 - 0
xsql/ast_test.go

@@ -1,6 +1,7 @@
 package xsql
 
 import (
+	"encoding/json"
 	"fmt"
 	"reflect"
 	"testing"
@@ -108,3 +109,42 @@ func Test_MessageValTest(t *testing.T) {
 		}
 	}
 }
+
+func Test_StreamFieldsMarshall(t *testing.T) {
+	var tests = []struct {
+		sf StreamFields
+		r  string
+	}{{
+		sf: []StreamField{
+			{Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
+			{Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
+			{Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
+			{Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}},
+			{Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}},
+			{Name: "ADDRESS", FieldType: &RecType{
+				StreamFields: []StreamField{
+					{Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
+					{Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
+				},
+			}},
+		},
+		r: `[{"FieldType":"bigint","Name":"USERID"},{"FieldType":"string","Name":"FIRST_NAME"},{"FieldType":"string","Name":"LAST_NAME"},{"FieldType":{"Type":"array","ElementType":"string"},"Name":"NICKNAMES"},{"FieldType":"boolean","Name":"Gender"},{"FieldType":{"Type":"struct","Fields":[{"FieldType":"string","Name":"STREET_NAME"},{"FieldType":"bigint","Name":"NUMBER"}]},"Name":"ADDRESS"}]`,
+	}, {
+		sf: []StreamField{
+			{Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
+		},
+		r: `[{"FieldType":"bigint","Name":"USERID"}]`,
+	}}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		r, err := json.Marshal(tt.sf)
+		if err != nil {
+			t.Errorf("%d. \nmarshall error: %v", i, err)
+			t.FailNow()
+		}
+		result := string(r)
+		if !reflect.DeepEqual(tt.r, result) {
+			t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.r, result)
+		}
+	}
+}

+ 1 - 1
xsql/processors/common_test.go

@@ -71,7 +71,7 @@ func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}) (err erro
 		)
 		for index, key = range keys {
 			if k == key {
-				if strings.HasSuffix(k, "process_latency_ms") {
+				if strings.HasSuffix(k, "process_latency_us") {
 					if values[index].(int64) >= v.(int64) {
 						matched = true
 						continue

+ 6 - 6
xsql/processors/extension_test.go

@@ -180,12 +180,12 @@ func TestFuncState(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_text_0_exceptions_total":   int64(0),
-				"op_preprocessor_text_0_process_latency_ms": int64(0),
+				"op_preprocessor_text_0_process_latency_us": int64(0),
 				"op_preprocessor_text_0_records_in_total":   int64(8),
 				"op_preprocessor_text_0_records_out_total":  int64(8),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(8),
 				"op_project_0_records_out_total":  int64(8),
 
@@ -245,12 +245,12 @@ func TestFuncStateCheckpoint(t *testing.T) {
 				},
 				m: map[string]interface{}{
 					"op_preprocessor_text_0_exceptions_total":   int64(0),
-					"op_preprocessor_text_0_process_latency_ms": int64(0),
+					"op_preprocessor_text_0_process_latency_us": int64(0),
 					"op_preprocessor_text_0_records_in_total":   int64(6),
 					"op_preprocessor_text_0_records_out_total":  int64(6),
 
 					"op_project_0_exceptions_total":   int64(0),
-					"op_project_0_process_latency_ms": int64(0),
+					"op_project_0_process_latency_us": int64(0),
 					"op_project_0_records_in_total":   int64(6),
 					"op_project_0_records_out_total":  int64(6),
 
@@ -267,12 +267,12 @@ func TestFuncStateCheckpoint(t *testing.T) {
 			cc:        1,
 			pauseMetric: map[string]interface{}{
 				"op_preprocessor_text_0_exceptions_total":   int64(0),
-				"op_preprocessor_text_0_process_latency_ms": int64(0),
+				"op_preprocessor_text_0_process_latency_us": int64(0),
 				"op_preprocessor_text_0_records_in_total":   int64(3),
 				"op_preprocessor_text_0_records_out_total":  int64(3),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(3),
 				"op_project_0_records_out_total":  int64(3),
 

+ 33 - 33
xsql/processors/rule_test.go

@@ -45,12 +45,12 @@ func TestSingleSQL(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(5),
 				"op_project_0_records_out_total":  int64(5),
 
@@ -85,12 +85,12 @@ func TestSingleSQL(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(2),
 				"op_project_0_records_out_total":  int64(2),
 
@@ -103,7 +103,7 @@ func TestSingleSQL(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 
 				"op_filter_0_exceptions_total":   int64(0),
-				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_process_latency_us": int64(0),
 				"op_filter_0_records_in_total":   int64(5),
 				"op_filter_0_records_out_total":  int64(2),
 			},
@@ -122,12 +122,12 @@ func TestSingleSQL(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(2),
 				"op_project_0_records_out_total":  int64(2),
 
@@ -140,7 +140,7 @@ func TestSingleSQL(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 
 				"op_filter_0_exceptions_total":   int64(0),
-				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_process_latency_us": int64(0),
 				"op_filter_0_records_in_total":   int64(5),
 				"op_filter_0_records_out_total":  int64(2),
 			},
@@ -164,12 +164,12 @@ func TestSingleSQL(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demoError_0_exceptions_total":   int64(3),
-				"op_preprocessor_demoError_0_process_latency_ms": int64(0),
+				"op_preprocessor_demoError_0_process_latency_us": int64(0),
 				"op_preprocessor_demoError_0_records_in_total":   int64(5),
 				"op_preprocessor_demoError_0_records_out_total":  int64(2),
 
 				"op_project_0_exceptions_total":   int64(3),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(4),
 				"op_project_0_records_out_total":  int64(1),
 
@@ -182,7 +182,7 @@ func TestSingleSQL(t *testing.T) {
 				"source_demoError_0_records_out_total": int64(5),
 
 				"op_filter_0_exceptions_total":   int64(3),
-				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_process_latency_us": int64(0),
 				"op_filter_0_records_in_total":   int64(5),
 				"op_filter_0_records_out_total":  int64(1),
 			},
@@ -206,12 +206,12 @@ func TestSingleSQL(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demoError_0_exceptions_total":   int64(3),
-				"op_preprocessor_demoError_0_process_latency_ms": int64(0),
+				"op_preprocessor_demoError_0_process_latency_us": int64(0),
 				"op_preprocessor_demoError_0_records_in_total":   int64(5),
 				"op_preprocessor_demoError_0_records_out_total":  int64(2),
 
 				"op_project_0_exceptions_total":   int64(3),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(4),
 				"op_project_0_records_out_total":  int64(1),
 
@@ -224,7 +224,7 @@ func TestSingleSQL(t *testing.T) {
 				"source_demoError_0_records_out_total": int64(5),
 
 				"op_filter_0_exceptions_total":   int64(3),
-				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_process_latency_us": int64(0),
 				"op_filter_0_records_in_total":   int64(5),
 				"op_filter_0_records_out_total":  int64(1),
 			},
@@ -255,12 +255,12 @@ func TestSingleSQL(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(5),
 				"op_project_0_records_out_total":  int64(5),
 
@@ -287,12 +287,12 @@ func TestSingleSQL(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(2),
 				"op_project_0_records_out_total":  int64(2),
 
@@ -305,7 +305,7 @@ func TestSingleSQL(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 
 				"op_filter_0_exceptions_total":   int64(0),
-				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_process_latency_us": int64(0),
 				"op_filter_0_records_in_total":   int64(5),
 				"op_filter_0_records_out_total":  int64(2),
 			},
@@ -331,12 +331,12 @@ func TestSingleSQL(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo1_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo1_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo1_0_process_latency_us": int64(0),
 				"op_preprocessor_demo1_0_records_in_total":   int64(5),
 				"op_preprocessor_demo1_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(5),
 				"op_project_0_records_out_total":  int64(5),
 
@@ -367,17 +367,17 @@ func TestSingleSQL(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo1_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo1_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo1_0_process_latency_us": int64(0),
 				"op_preprocessor_demo1_0_records_in_total":   int64(5),
 				"op_preprocessor_demo1_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(2),
 				"op_project_0_records_out_total":  int64(2),
 
 				"op_filter_0_exceptions_total":   int64(0),
-				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_process_latency_us": int64(0),
 				"op_filter_0_records_in_total":   int64(5),
 				"op_filter_0_records_out_total":  int64(2),
 
@@ -433,12 +433,12 @@ func TestSingleSQLError(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_process_latency_us": int64(0),
 				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
 				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(1),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(3),
 				"op_project_0_records_out_total":  int64(2),
 
@@ -451,7 +451,7 @@ func TestSingleSQLError(t *testing.T) {
 				"source_ldemo_0_records_out_total": int64(5),
 
 				"op_filter_0_exceptions_total":   int64(1),
-				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_process_latency_us": int64(0),
 				"op_filter_0_records_in_total":   int64(5),
 				"op_filter_0_records_out_total":  int64(2),
 			},
@@ -475,12 +475,12 @@ func TestSingleSQLError(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_process_latency_us": int64(0),
 				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
 				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(1),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(5),
 				"op_project_0_records_out_total":  int64(4),
 
@@ -533,12 +533,12 @@ func TestSingleSQLTemplate(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(5),
 				"op_project_0_records_out_total":  int64(5),
 
@@ -591,12 +591,12 @@ func TestNoneSingleSQLTemplate(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(5),
 				"op_project_0_records_out_total":  int64(5),
 

+ 91 - 91
xsql/processors/window_rule_test.go

@@ -58,12 +58,12 @@ func TestWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(4),
 				"op_project_0_records_out_total":  int64(4),
 
@@ -76,7 +76,7 @@ func TestWindow(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(4),
 			},
@@ -98,12 +98,12 @@ func TestWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(2),
 				"op_project_0_records_out_total":  int64(2),
 
@@ -116,12 +116,12 @@ func TestWindow(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(4),
 
 				"op_filter_0_exceptions_total":   int64(0),
-				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_process_latency_us": int64(0),
 				"op_filter_0_records_in_total":   int64(4),
 				"op_filter_0_records_out_total":  int64(2),
 			},
@@ -173,17 +173,17 @@ func TestWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_preprocessor_demo1_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo1_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo1_0_process_latency_us": int64(0),
 				"op_preprocessor_demo1_0_records_in_total":   int64(5),
 				"op_preprocessor_demo1_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(8),
 				"op_project_0_records_out_total":  int64(8),
 
@@ -200,12 +200,12 @@ func TestWindow(t *testing.T) {
 				"source_demo1_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(10),
 				"op_window_0_records_out_total":  int64(10),
 
 				"op_join_0_exceptions_total":   int64(0),
-				"op_join_0_process_latency_ms": int64(0),
+				"op_join_0_process_latency_us": int64(0),
 				"op_join_0_records_in_total":   int64(10),
 				"op_join_0_records_out_total":  int64(8),
 			},
@@ -249,12 +249,12 @@ func TestWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(5),
 				"op_project_0_records_out_total":  int64(5),
 
@@ -267,17 +267,17 @@ func TestWindow(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(5),
 
 				"op_aggregate_0_exceptions_total":   int64(0),
-				"op_aggregate_0_process_latency_ms": int64(0),
+				"op_aggregate_0_process_latency_us": int64(0),
 				"op_aggregate_0_records_in_total":   int64(5),
 				"op_aggregate_0_records_out_total":  int64(5),
 
 				"op_order_0_exceptions_total":   int64(0),
-				"op_order_0_process_latency_ms": int64(0),
+				"op_order_0_process_latency_us": int64(0),
 				"op_order_0_records_in_total":   int64(5),
 				"op_order_0_records_out_total":  int64(5),
 			},
@@ -311,12 +311,12 @@ func TestWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_sessionDemo_0_exceptions_total":   int64(0),
-				"op_preprocessor_sessionDemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_sessionDemo_0_process_latency_us": int64(0),
 				"op_preprocessor_sessionDemo_0_records_in_total":   int64(11),
 				"op_preprocessor_sessionDemo_0_records_out_total":  int64(11),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(4),
 				"op_project_0_records_out_total":  int64(4),
 
@@ -329,7 +329,7 @@ func TestWindow(t *testing.T) {
 				"source_sessionDemo_0_records_out_total": int64(11),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(11),
 				"op_window_0_records_out_total":  int64(4),
 			},
@@ -365,17 +365,17 @@ func TestWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_preprocessor_demo1_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo1_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo1_0_process_latency_us": int64(0),
 				"op_preprocessor_demo1_0_records_in_total":   int64(5),
 				"op_preprocessor_demo1_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(8),
 				"op_project_0_records_out_total":  int64(8),
 
@@ -392,12 +392,12 @@ func TestWindow(t *testing.T) {
 				"source_demo1_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(10),
 				"op_window_0_records_out_total":  int64(10),
 
 				"op_join_0_exceptions_total":   int64(0),
-				"op_join_0_process_latency_ms": int64(0),
+				"op_join_0_process_latency_us": int64(0),
 				"op_join_0_records_in_total":   int64(10),
 				"op_join_0_records_out_total":  int64(8),
 			},
@@ -436,12 +436,12 @@ func TestWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demoError_0_exceptions_total":   int64(3),
-				"op_preprocessor_demoError_0_process_latency_ms": int64(0),
+				"op_preprocessor_demoError_0_process_latency_us": int64(0),
 				"op_preprocessor_demoError_0_records_in_total":   int64(5),
 				"op_preprocessor_demoError_0_records_out_total":  int64(2),
 
 				"op_project_0_exceptions_total":   int64(3),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(6),
 				"op_project_0_records_out_total":  int64(3),
 
@@ -454,7 +454,7 @@ func TestWindow(t *testing.T) {
 				"source_demoError_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(3),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(3),
 			},
@@ -470,12 +470,12 @@ func TestWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(1),
 				"op_project_0_records_out_total":  int64(1),
 
@@ -488,22 +488,22 @@ func TestWindow(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(4),
 
 				"op_filter_0_exceptions_total":   int64(0),
-				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_process_latency_us": int64(0),
 				"op_filter_0_records_in_total":   int64(4),
 				"op_filter_0_records_out_total":  int64(2),
 
 				"op_aggregate_0_exceptions_total":   int64(0),
-				"op_aggregate_0_process_latency_ms": int64(0),
+				"op_aggregate_0_process_latency_us": int64(0),
 				"op_aggregate_0_records_in_total":   int64(2),
 				"op_aggregate_0_records_out_total":  int64(2),
 
 				"op_having_0_exceptions_total":   int64(0),
-				"op_having_0_process_latency_ms": int64(0),
+				"op_having_0_process_latency_us": int64(0),
 				"op_having_0_records_in_total":   int64(2),
 				"op_having_0_records_out_total":  int64(1),
 			},
@@ -542,12 +542,12 @@ func TestWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(4),
 				"op_project_0_records_out_total":  int64(4),
 
@@ -560,7 +560,7 @@ func TestWindow(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(3),
 				"op_window_0_records_out_total":  int64(4),
 			},
@@ -574,12 +574,12 @@ func TestWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(1),
 				"op_project_0_records_out_total":  int64(1),
 
@@ -592,7 +592,7 @@ func TestWindow(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(1),
 			},
@@ -610,12 +610,12 @@ func TestWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_process_latency_us": int64(0),
 				"op_preprocessor_demo_0_records_in_total":   int64(5),
 				"op_preprocessor_demo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(5),
 				"op_project_0_records_out_total":  int64(5),
 
@@ -628,7 +628,7 @@ func TestWindow(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(5),
 			},
@@ -700,12 +700,12 @@ func TestEventWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demoE_0_exceptions_total":   int64(0),
-				"op_preprocessor_demoE_0_process_latency_ms": int64(0),
+				"op_preprocessor_demoE_0_process_latency_us": int64(0),
 				"op_preprocessor_demoE_0_records_in_total":   int64(6),
 				"op_preprocessor_demoE_0_records_out_total":  int64(6),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(5),
 				"op_project_0_records_out_total":  int64(5),
 
@@ -718,7 +718,7 @@ func TestEventWindow(t *testing.T) {
 				"source_demoE_0_records_out_total": int64(6),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(6),
 				"op_window_0_records_out_total":  int64(5),
 			},
@@ -737,12 +737,12 @@ func TestEventWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demoE_0_exceptions_total":   int64(0),
-				"op_preprocessor_demoE_0_process_latency_ms": int64(0),
+				"op_preprocessor_demoE_0_process_latency_us": int64(0),
 				"op_preprocessor_demoE_0_records_in_total":   int64(6),
 				"op_preprocessor_demoE_0_records_out_total":  int64(6),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(2),
 				"op_project_0_records_out_total":  int64(2),
 
@@ -755,12 +755,12 @@ func TestEventWindow(t *testing.T) {
 				"source_demoE_0_records_out_total": int64(6),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(6),
 				"op_window_0_records_out_total":  int64(4),
 
 				"op_filter_0_exceptions_total":   int64(0),
-				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_process_latency_us": int64(0),
 				"op_filter_0_records_in_total":   int64(4),
 				"op_filter_0_records_out_total":  int64(2),
 			},
@@ -800,17 +800,17 @@ func TestEventWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demoE_0_exceptions_total":   int64(0),
-				"op_preprocessor_demoE_0_process_latency_ms": int64(0),
+				"op_preprocessor_demoE_0_process_latency_us": int64(0),
 				"op_preprocessor_demoE_0_records_in_total":   int64(6),
 				"op_preprocessor_demoE_0_records_out_total":  int64(6),
 
 				"op_preprocessor_demo1E_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo1E_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo1E_0_process_latency_us": int64(0),
 				"op_preprocessor_demo1E_0_records_in_total":   int64(6),
 				"op_preprocessor_demo1E_0_records_out_total":  int64(6),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(5),
 				"op_project_0_records_out_total":  int64(5),
 
@@ -827,12 +827,12 @@ func TestEventWindow(t *testing.T) {
 				"source_demo1E_0_records_out_total": int64(6),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(12),
 				"op_window_0_records_out_total":  int64(5),
 
 				"op_join_0_exceptions_total":   int64(0),
-				"op_join_0_process_latency_ms": int64(0),
+				"op_join_0_process_latency_us": int64(0),
 				"op_join_0_records_in_total":   int64(5),
 				"op_join_0_records_out_total":  int64(5),
 			},
@@ -860,12 +860,12 @@ func TestEventWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demoE_0_exceptions_total":   int64(0),
-				"op_preprocessor_demoE_0_process_latency_ms": int64(0),
+				"op_preprocessor_demoE_0_process_latency_us": int64(0),
 				"op_preprocessor_demoE_0_records_in_total":   int64(6),
 				"op_preprocessor_demoE_0_records_out_total":  int64(6),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(4),
 				"op_project_0_records_out_total":  int64(4),
 
@@ -878,17 +878,17 @@ func TestEventWindow(t *testing.T) {
 				"source_demoE_0_records_out_total": int64(6),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(6),
 				"op_window_0_records_out_total":  int64(4),
 
 				"op_aggregate_0_exceptions_total":   int64(0),
-				"op_aggregate_0_process_latency_ms": int64(0),
+				"op_aggregate_0_process_latency_us": int64(0),
 				"op_aggregate_0_records_in_total":   int64(4),
 				"op_aggregate_0_records_out_total":  int64(4),
 
 				"op_order_0_exceptions_total":   int64(0),
-				"op_order_0_process_latency_ms": int64(0),
+				"op_order_0_process_latency_us": int64(0),
 				"op_order_0_records_in_total":   int64(4),
 				"op_order_0_records_out_total":  int64(4),
 			},
@@ -920,12 +920,12 @@ func TestEventWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_sessionDemoE_0_exceptions_total":   int64(0),
-				"op_preprocessor_sessionDemoE_0_process_latency_ms": int64(0),
+				"op_preprocessor_sessionDemoE_0_process_latency_us": int64(0),
 				"op_preprocessor_sessionDemoE_0_records_in_total":   int64(12),
 				"op_preprocessor_sessionDemoE_0_records_out_total":  int64(12),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(4),
 				"op_project_0_records_out_total":  int64(4),
 
@@ -938,7 +938,7 @@ func TestEventWindow(t *testing.T) {
 				"source_sessionDemoE_0_records_out_total": int64(12),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(12),
 				"op_window_0_records_out_total":  int64(4),
 			},
@@ -965,17 +965,17 @@ func TestEventWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demoE_0_exceptions_total":   int64(0),
-				"op_preprocessor_demoE_0_process_latency_ms": int64(0),
+				"op_preprocessor_demoE_0_process_latency_us": int64(0),
 				"op_preprocessor_demoE_0_records_in_total":   int64(6),
 				"op_preprocessor_demoE_0_records_out_total":  int64(6),
 
 				"op_preprocessor_demo1E_0_exceptions_total":   int64(0),
-				"op_preprocessor_demo1E_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo1E_0_process_latency_us": int64(0),
 				"op_preprocessor_demo1E_0_records_in_total":   int64(6),
 				"op_preprocessor_demo1E_0_records_out_total":  int64(6),
 
 				"op_project_0_exceptions_total":   int64(0),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(5),
 				"op_project_0_records_out_total":  int64(5),
 
@@ -996,7 +996,7 @@ func TestEventWindow(t *testing.T) {
 				"op_window_0_records_out_total": int64(5),
 
 				"op_join_0_exceptions_total":   int64(0),
-				"op_join_0_process_latency_ms": int64(0),
+				"op_join_0_process_latency_us": int64(0),
 				"op_join_0_records_in_total":   int64(5),
 				"op_join_0_records_out_total":  int64(5),
 			},
@@ -1037,12 +1037,12 @@ func TestEventWindow(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_demoErr_0_exceptions_total":   int64(1),
-				"op_preprocessor_demoErr_0_process_latency_ms": int64(0),
+				"op_preprocessor_demoErr_0_process_latency_us": int64(0),
 				"op_preprocessor_demoErr_0_records_in_total":   int64(6),
 				"op_preprocessor_demoErr_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(1),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(6),
 				"op_project_0_records_out_total":  int64(5),
 
@@ -1055,7 +1055,7 @@ func TestEventWindow(t *testing.T) {
 				"source_demoErr_0_records_out_total": int64(6),
 
 				"op_window_0_exceptions_total":   int64(1),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(6),
 				"op_window_0_records_out_total":  int64(5),
 			},
@@ -1103,12 +1103,12 @@ func TestWindowError(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_process_latency_us": int64(0),
 				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
 				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(1),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(2),
 				"op_project_0_records_out_total":  int64(1),
 
@@ -1121,7 +1121,7 @@ func TestWindowError(t *testing.T) {
 				"source_ldemo_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(2),
 			},
@@ -1137,12 +1137,12 @@ func TestWindowError(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_process_latency_us": int64(0),
 				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
 				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(1),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(2),
 				"op_project_0_records_out_total":  int64(1),
 
@@ -1155,12 +1155,12 @@ func TestWindowError(t *testing.T) {
 				"source_ldemo_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(4),
 
 				"op_filter_0_exceptions_total":   int64(1),
-				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_process_latency_us": int64(0),
 				"op_filter_0_records_in_total":   int64(4),
 				"op_filter_0_records_out_total":  int64(1),
 			},
@@ -1196,17 +1196,17 @@ func TestWindowError(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_process_latency_us": int64(0),
 				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
 				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
 
 				"op_preprocessor_ldemo1_0_exceptions_total":   int64(0),
-				"op_preprocessor_ldemo1_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo1_0_process_latency_us": int64(0),
 				"op_preprocessor_ldemo1_0_records_in_total":   int64(5),
 				"op_preprocessor_ldemo1_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(3),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(8),
 				"op_project_0_records_out_total":  int64(5),
 
@@ -1223,12 +1223,12 @@ func TestWindowError(t *testing.T) {
 				"source_ldemo1_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(10),
 				"op_window_0_records_out_total":  int64(10),
 
 				"op_join_0_exceptions_total":   int64(3),
-				"op_join_0_process_latency_ms": int64(0),
+				"op_join_0_process_latency_us": int64(0),
 				"op_join_0_records_in_total":   int64(10),
 				"op_join_0_records_out_total":  int64(5),
 			},
@@ -1250,12 +1250,12 @@ func TestWindowError(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_process_latency_us": int64(0),
 				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
 				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(3),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(5),
 				"op_project_0_records_out_total":  int64(2),
 
@@ -1268,17 +1268,17 @@ func TestWindowError(t *testing.T) {
 				"source_ldemo_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(5),
 
 				"op_aggregate_0_exceptions_total":   int64(0),
-				"op_aggregate_0_process_latency_ms": int64(0),
+				"op_aggregate_0_process_latency_us": int64(0),
 				"op_aggregate_0_records_in_total":   int64(5),
 				"op_aggregate_0_records_out_total":  int64(5),
 
 				"op_having_0_exceptions_total":   int64(3),
-				"op_having_0_process_latency_ms": int64(0),
+				"op_having_0_process_latency_us": int64(0),
 				"op_having_0_records_in_total":   int64(5),
 				"op_having_0_records_out_total":  int64(2),
 			},
@@ -1299,12 +1299,12 @@ func TestWindowError(t *testing.T) {
 			},
 			m: map[string]interface{}{
 				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_process_latency_us": int64(0),
 				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
 				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
 
 				"op_project_0_exceptions_total":   int64(1),
-				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_process_latency_us": int64(0),
 				"op_project_0_records_in_total":   int64(4),
 				"op_project_0_records_out_total":  int64(3),
 
@@ -1317,12 +1317,12 @@ func TestWindowError(t *testing.T) {
 				"source_ldemo_0_records_out_total": int64(5),
 
 				"op_window_0_exceptions_total":   int64(0),
-				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_process_latency_us": int64(0),
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(4),
 
 				"op_order_0_exceptions_total":   int64(1),
-				"op_order_0_process_latency_ms": int64(0),
+				"op_order_0_process_latency_us": int64(0),
 				"op_order_0_records_in_total":   int64(4),
 				"op_order_0_records_out_total":  int64(3),
 			},

+ 48 - 0
xsql/processors/xsql_processor.go

@@ -77,6 +77,33 @@ func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, statement stri
 	}
 }
 
+func (p *StreamProcessor) ExecReplaceStream(statement string) (string, error) {
+	parser := xsql.NewParser(strings.NewReader(statement))
+	stmt, err := xsql.Language.Parse(parser)
+	if err != nil {
+		return "", err
+	}
+
+	switch s := stmt.(type) {
+	case *xsql.StreamStmt:
+		if err = p.db.Open(); nil != err {
+			return "", fmt.Errorf("Replace stream fails, error when opening db: %v.", err)
+		}
+		defer p.db.Close()
+
+		if err = p.db.Replace(string(s.Name), statement); nil != err {
+			return "", fmt.Errorf("Replace stream fails: %v.", err)
+		} else {
+			info := fmt.Sprintf("Stream %s is replaced.", s.Name)
+			log.Printf("%s", info)
+			return info, nil
+		}
+	default:
+		return "", fmt.Errorf("Invalid stream statement: %s", statement)
+	}
+	return "", nil
+}
+
 func (p *StreamProcessor) ExecStreamSql(statement string) (string, error) {
 	r, err := p.ExecStmt(statement)
 	if err != nil {
@@ -228,6 +255,27 @@ func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
 
 	return rule, nil
 }
+func (p *RuleProcessor) ExecUpdate(name, ruleJson string) (*api.Rule, error) {
+	rule, err := p.getRuleByJson(name, ruleJson)
+	if err != nil {
+		return nil, err
+	}
+
+	err = p.db.Open()
+	if err != nil {
+		return nil, err
+	}
+	defer p.db.Close()
+
+	err = p.db.Replace(rule.Id, ruleJson)
+	if err != nil {
+		return nil, err
+	} else {
+		log.Infof("Rule %s is update.", rule.Id)
+	}
+
+	return rule, nil
+}
 
 func (p *RuleProcessor) ExecReplaceRuleState(name string, triggered bool) (err error) {
 	rule, err := p.GetRuleByName(name)

+ 55 - 1
xsql/util.go

@@ -1,6 +1,10 @@
 package xsql
 
-import "strings"
+import (
+	"encoding/json"
+	"fmt"
+	"strings"
+)
 
 func PrintFieldType(ft FieldType) (result string) {
 	switch t := ft.(type) {
@@ -30,6 +34,56 @@ func PrintFieldType(ft FieldType) (result string) {
 	return
 }
 
+func PrintFieldTypeForJson(ft FieldType) (result interface{}) {
+	r, q := doPrintFieldTypeForJson(ft)
+	if q {
+		return r
+	} else {
+		return json.RawMessage(r)
+	}
+}
+
+func doPrintFieldTypeForJson(ft FieldType) (result string, isLiteral bool) {
+	switch t := ft.(type) {
+	case *BasicType:
+		return t.Type.String(), true
+	case *ArrayType:
+		var (
+			fieldType string
+			q         bool
+		)
+		if t.FieldType != nil {
+			fieldType, q = doPrintFieldTypeForJson(t.FieldType)
+		} else {
+			fieldType, q = t.Type.String(), true
+		}
+		if q {
+			result = fmt.Sprintf(`{"Type":"array","ElementType":"%s"}`, fieldType)
+		} else {
+			result = fmt.Sprintf(`{"Type":"array","ElementType":%s}`, fieldType)
+		}
+
+	case *RecType:
+		result = `{"Type":"struct","Fields":[`
+		isFirst := true
+		for _, f := range t.StreamFields {
+			if isFirst {
+				isFirst = false
+			} else {
+				result += ","
+			}
+			fieldType, q := doPrintFieldTypeForJson(f.FieldType)
+			if q {
+				result = fmt.Sprintf(`%s{"FieldType":"%s","Name":"%s"}`, result, fieldType, f.Name)
+			} else {
+				result = fmt.Sprintf(`%s{"FieldType":"%s","Name":"%s"}`, result, fieldType, f.Name)
+			}
+		}
+		result += `]}`
+	}
+	return result, false
+}
+
 func GetStreams(stmt *SelectStatement) (result []string) {
 	if stmt == nil {
 		return nil

+ 45 - 1
xsql/util_test.go

@@ -61,10 +61,54 @@ func TestLowercaseKeyMap(t *testing.T) {
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	for i, tt := range tests {
-		//fmt.Printf("Parsing SQL %q.\n", tt.s)
 		result := LowercaseKeyMap(tt.src)
 		if !reflect.DeepEqual(tt.dest, result) {
 			t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.dest, result)
 		}
 	}
 }
+
+func TestPrintFieldType(t *testing.T) {
+	var tests = []struct {
+		ft      FieldType
+		printed string
+	}{{
+		ft: &RecType{
+			StreamFields: []StreamField{
+				{Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
+				{Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
+			},
+		},
+		printed: `{"Type":"struct","Fields":[{"FieldType":"string","Name":"STREET_NAME"},{"FieldType":"bigint","Name":"NUMBER"}]}`,
+	}, {
+		ft: &ArrayType{
+			Type: STRUCT,
+			FieldType: &RecType{
+				StreamFields: []StreamField{
+					{Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
+					{Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
+				},
+			},
+		},
+		printed: `{"Type":"array","ElementType":{"Type":"struct","Fields":[{"FieldType":"string","Name":"STREET_NAME"},{"FieldType":"bigint","Name":"NUMBER"}]}}`,
+	}, {
+		ft: &ArrayType{
+			Type:      STRUCT,
+			FieldType: &BasicType{Type: STRINGS},
+		},
+		printed: `{"Type":"array","ElementType":"string"}`,
+	}, {
+		ft: &BasicType{
+			Type: STRINGS,
+		},
+		printed: `string`,
+	}}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		//fmt.Printf("Parsing SQL %q.\n",tt.s)
+		result, _ := doPrintFieldTypeForJson(tt.ft)
+		if !reflect.DeepEqual(tt.printed, result) {
+			t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.printed, result)
+		}
+	}
+}

+ 1 - 1
xstream/nodes/dynamic_channel_buffer.go

@@ -7,10 +7,10 @@ import (
 )
 
 type DynamicChannelBuffer struct {
+	limit  int64
 	In     chan api.SourceTuple
 	Out    chan api.SourceTuple
 	buffer []api.SourceTuple
-	limit  int64
 }
 
 func NewDynamicChannelBuffer() *DynamicChannelBuffer {

+ 3 - 3
xstream/nodes/prometheus.go

@@ -8,12 +8,12 @@ import (
 const RecordsInTotal = "records_in_total"
 const RecordsOutTotal = "records_out_total"
 const ExceptionsTotal = "exceptions_total"
-const ProcessLatencyMs = "process_latency_ms"
+const ProcessLatencyUs = "process_latency_us"
 const LastInvocation = "last_invocation"
 const BufferLength = "buffer_length"
 
 var (
-	MetricNames        = []string{RecordsInTotal, RecordsOutTotal, ExceptionsTotal, ProcessLatencyMs, BufferLength, LastInvocation}
+	MetricNames        = []string{RecordsInTotal, RecordsOutTotal, ExceptionsTotal, ProcessLatencyUs, BufferLength, LastInvocation}
 	prometheuseMetrics *PrometheusMetrics
 	mutex              sync.RWMutex
 )
@@ -60,7 +60,7 @@ func newPrometheusMetrics() *PrometheusMetrics {
 			Help: "Total number of user exceptions of " + prefix,
 		}, labelNames)
 		processLatency := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-			Name: prefix + "_" + ProcessLatencyMs,
+			Name: prefix + "_" + ProcessLatencyUs,
 			Help: "Process latency in millisecond of " + prefix,
 		}, labelNames)
 		bufferLength := prometheus.NewGaugeVec(prometheus.GaugeOpts{

+ 2 - 2
xstream/nodes/stats_manager.go

@@ -111,7 +111,7 @@ func (sm *DefaultStatManager) ProcessTimeStart() {
 
 func (sm *DefaultStatManager) ProcessTimeEnd() {
 	if !sm.processTimeStart.IsZero() {
-		sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Millisecond)
+		sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Microsecond)
 	}
 }
 
@@ -138,7 +138,7 @@ func (sm *PrometheusStatManager) IncTotalExceptions() {
 
 func (sm *PrometheusStatManager) ProcessTimeEnd() {
 	if !sm.processTimeStart.IsZero() {
-		sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Millisecond)
+		sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Microsecond)
 		sm.pProcessLatency.Set(float64(sm.processLatency))
 	}
 }

+ 44 - 2
xstream/server/server/rest.go

@@ -76,9 +76,9 @@ func createRestServer(port int) *http.Server {
 	r := mux.NewRouter()
 	r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete)
+	r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
 	r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet)
+	r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
 	r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
 	r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
 	r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
@@ -191,6 +191,19 @@ func streamHandler(w http.ResponseWriter, r *http.Request) {
 		}
 		w.WriteHeader(http.StatusOK)
 		w.Write([]byte(content))
+	case http.MethodPut:
+		v, err := decodeStatementDescriptor(r.Body)
+		if err != nil {
+			handleError(w, err, "Invalid body", logger)
+			return
+		}
+		content, err := streamProcessor.ExecReplaceStream(v.Sql)
+		if err != nil {
+			handleError(w, err, "Stream command error", logger)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte(content))
 	}
 }
 
@@ -258,6 +271,35 @@ func ruleHandler(w http.ResponseWriter, r *http.Request) {
 		}
 		w.WriteHeader(http.StatusOK)
 		w.Write([]byte(content))
+	case http.MethodPut:
+		_, err := ruleProcessor.GetRuleByName(name)
+		if err != nil {
+			handleError(w, err, "not found this rule", logger)
+			return
+		}
+
+		body, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			handleError(w, err, "Invalid body", logger)
+			return
+		}
+
+		r, err := ruleProcessor.ExecUpdate(name, string(body))
+		var result string
+		if err != nil {
+			handleError(w, err, "Update rule error", logger)
+			return
+		} else {
+			result = fmt.Sprintf("Rule %s was updated successfully.", r.Id)
+		}
+
+		err = restartRule(name)
+		if err != nil {
+			handleError(w, err, "restart rule error", logger)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte(result))
 	}
 }