Przeglądaj źródła

fix(plugin): can not use timestamp from message as tdengine timestamp index

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran 2 lat temu
rodzic
commit
797aec3e84

+ 6 - 2
extensions/sinks/tdengine/tdengine.go

@@ -84,10 +84,14 @@ func (t *taosConfig) buildSql(ctx api.StreamContext, mapData map[string]interfac
 
 
 	if t.ProvideTs {
 	if t.ProvideTs {
 		if v, ok := mapData[t.TsFieldName]; !ok {
 		if v, ok := mapData[t.TsFieldName]; !ok {
-			return "", fmt.Errorf("Timestamp field not found : %s.", t.TsFieldName)
+			return "", fmt.Errorf("timestamp field not found : %s", t.TsFieldName)
 		} else {
 		} else {
 			keys = append(keys, t.TsFieldName)
 			keys = append(keys, t.TsFieldName)
-			vals = append(vals, fmt.Sprintf(`"%v"`, v))
+			timeStamp, err := cast.ToInt64(v, cast.CONVERT_SAMEKIND)
+			if err != nil {
+				return "", fmt.Errorf("timestamp field can not convert to int64 : %v", v)
+			}
+			vals = append(vals, fmt.Sprintf(`%v`, timeStamp))
 		}
 		}
 	} else {
 	} else {
 		vals = append(vals, "now")
 		vals = append(vals, "now")

+ 5 - 5
extensions/sinks/tdengine/tdengine_test.go

@@ -157,7 +157,7 @@ func TestBuildSql(t *testing.T) {
 			data: map[string]interface{}{
 			data: map[string]interface{}{
 				"f1": "v1",
 				"f1": "v1",
 			},
 			},
-			expected: `INSERT INTO t (ts,f1) values (now,"v1");`,
+			expected: `t (ts,f1) values (now,"v1")`,
 		},
 		},
 		{
 		{
 			conf: &taosConfig{
 			conf: &taosConfig{
@@ -173,10 +173,10 @@ func TestBuildSql(t *testing.T) {
 				Fields:      nil,
 				Fields:      nil,
 			},
 			},
 			data: map[string]interface{}{
 			data: map[string]interface{}{
-				"ts": 12345678,
+				"ts": 1.2345678e+06,
 				"f2": 65,
 				"f2": 65,
 			},
 			},
-			expected: `INSERT INTO t (ts,f2) values ("12345678",65);`,
+			expected: `t (ts,f2) values (12345678,65)`,
 		},
 		},
 		{
 		{
 			conf: &taosConfig{
 			conf: &taosConfig{
@@ -195,7 +195,7 @@ func TestBuildSql(t *testing.T) {
 				"ts": 12345678,
 				"ts": 12345678,
 				"f2": 65,
 				"f2": 65,
 			},
 			},
-			expected: `INSERT INTO t (ts,f2) values ("12345678",65);`,
+			expected: `t (ts,f2) values (12345678,65)`,
 		},
 		},
 		{
 		{
 			conf: &taosConfig{
 			conf: &taosConfig{
@@ -220,7 +220,7 @@ func TestBuildSql(t *testing.T) {
 				"a":     "a1",
 				"a":     "a1",
 				"b":     2,
 				"b":     2,
 			},
 			},
-			expected: `INSERT INTO t1 (tst,f1,f2) using s tags ("a1",2) values (now,12.3,65);`,
+			expected: `t1 (tst,f1,f2) using s tags ("a1",2) values (now,12.3,65)`,
 		},
 		},
 	}
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))