Przeglądaj źródła

fix(meta): fix function/operator meta data (#1437)

* fix(meta): fix function/operator meta data

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(meta): fix function/operator meta data

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(meta): update meta jsons

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* fix(graph): func op should init the isAgg property

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Co-authored-by: Jiyong Huang <huangjy@emqx.io>
superxan 2 lat temu
rodzic
commit
bfcfd54c6d

Plik diff jest za duży
+ 2511 - 13
etc/functions/internal.json


+ 222 - 0
etc/functions/windows.json

@@ -22,6 +22,45 @@
 		"hint": {
 			"en_US": "Tumbling window functions are used to segment a data stream into distinct time segments and perform a function against them, such as the example below. The key differentiators of a Tumbling window are that they repeat, do not overlap, and an event cannot belong to more than one tumbling window.",
 			"zh_CN": "滚动窗口函数用于将数据流分割成不同的时间段,并对其执行函数,例如下面的示例。滚动窗口的关键区别在于它们重复不重叠,并且一个事件不能属于多个翻滚窗口。"
+		},
+		"args": [
+			{
+				"name": "timeunit",
+				"optional": false,
+				"control": "select",
+				"type": "string",
+				"values": ["ms", "ss", "mi", "hh", "dd"],
+				"hint": {
+					"en_US": "The time unit to calculate window time",
+					"zh_CN": "窗口时间单位"
+				},
+				"label": {
+					"en_US": "Timeunit",
+					"zh_CN": "时间单位"
+				}
+			},
+			{
+				"name": "length",
+				"optional": false,
+				"control": "text",
+				"type": "int",
+				"hint": {
+					"en_US": "Window length",
+					"zh_CN": "窗口长度"
+				},
+				"label": {
+					"en_US": "Window Length",
+					"zh_CN": "窗口长度"
+				}
+			}
+		],
+		"node": {
+			"category": "function",
+			"icon": "iconPath",
+			"label": {
+				"en_US": "Tumbling Window",
+				"zh_CN": "滚动窗口"
+			}
 		}
 	}, {
 		"name": "HOPPINGWINDOW",
@@ -29,6 +68,59 @@
 		"hint": {
 			"en_US": "Hopping window functions hop forward in time by a fixed period. It may be easy to think of them as Tumbling windows that can overlap, so events can belong to more than one Hopping window result set. To make a Hopping window the same as a Tumbling window, specify the hop size to be the same as the window size.",
 			"zh_CN": "跳跃窗口功能会在时间上向前跳一段固定的时间。 将它们视为可能重叠的翻转窗口可能很容易,因此事件可以属于多个跳跃窗口结果集。 要使跳跃窗口与翻转窗口相同,请将跳跃大小指定为与窗口大小相同。"
+		},
+		"args": [
+			{
+				"name": "timeunit",
+				"optional": false,
+				"control": "select",
+				"type": "string",
+				"values": ["ms", "ss", "mi", "hh", "dd"],
+				"hint": {
+					"en_US": "The time unit to calculate window time",
+					"zh_CN": "窗口时间单位"
+				},
+				"label": {
+					"en_US": "Timeunit",
+					"zh_CN": "时间单位"
+				}
+			},
+			{
+				"name": "length",
+				"optional": false,
+				"control": "text",
+				"type": "int",
+				"hint": {
+					"en_US": "Window length",
+					"zh_CN": "窗口长度"
+				},
+				"label": {
+					"en_US": "Window Length",
+					"zh_CN": "窗口长度"
+				}
+			},
+			{
+				"name": "hopsize",
+				"optional": false,
+				"control": "text",
+				"type": "int",
+				"hint": {
+					"en_US": "Hop size",
+					"zh_CN": "窗口跳跃长度"
+				},
+				"label": {
+					"en_US": "Hop Size",
+					"zh_CN": "窗口跳跃长度"
+				}
+			}
+		],
+		"node": {
+			"category": "function",
+			"icon": "iconPath",
+			"label": {
+				"en_US": "Hopping Window",
+				"zh_CN": "跳跃窗口"
+			}
 		}
 	}, {
 		"name": "SLIDINGWINDOW",
@@ -36,6 +128,45 @@
 		"hint": {
 			"en_US": "Sliding window functions, unlike Tumbling or Hopping windows, produce an output ONLY when an event occurs. Every window will have at least one event and the window continuously moves forward by an € (epsilon). Like hopping windows, events can belong to more than one sliding window.",
 			"zh_CN": "滑动窗口功能与翻转或跳动窗口不同,仅在事件发生时会产生输出。 每个窗口至少会有一个事件,并且该窗口连续向前移动€(ε)。 就像跳跃窗口一样,事件可以属于多个滑动窗口。"
+		},
+		"args": [
+			{
+				"name": "timeunit",
+				"optional": false,
+				"control": "select",
+				"type": "string",
+				"values": ["ms", "ss", "mi", "hh", "dd"],
+				"hint": {
+					"en_US": "The time unit to calculate window time",
+					"zh_CN": "窗口时间单位"
+				},
+				"label": {
+					"en_US": "Timeunit",
+					"zh_CN": "时间单位"
+				}
+			},
+			{
+				"name": "length",
+				"optional": false,
+				"control": "text",
+				"type": "int",
+				"hint": {
+					"en_US": "Window length",
+					"zh_CN": "窗口长度"
+				},
+				"label": {
+					"en_US": "Window Length",
+					"zh_CN": "窗口长度"
+				}
+			}
+		],
+		"node": {
+			"category": "function",
+			"icon": "iconPath",
+			"label": {
+				"en_US": "Sliding Window",
+				"zh_CN": "滑动窗口"
+			}
 		}
 	}, {
 		"name": "SESSIONWINDOW",
@@ -43,6 +174,59 @@
 		"hint": {
 			"en_US": "Session window functions group events that arrive at similar times, filtering out periods of time where there is no data. It has two main parameters: timeout and maximum duration.",
 			"zh_CN": "会话窗口功能对在相似时间到达的事件进行分组,以过滤掉没有数据的时间段。 它有两个主要参数:超时和最大持续时间。"
+		},
+		"args": [
+			{
+				"name": "timeunit",
+				"optional": false,
+				"control": "select",
+				"type": "string",
+				"values": ["ms", "ss", "mi", "hh", "dd"],
+				"hint": {
+					"en_US": "The time unit to calculate window time",
+					"zh_CN": "窗口时间单位"
+				},
+				"label": {
+					"en_US": "Timeunit",
+					"zh_CN": "时间单位"
+				}
+			},
+			{
+				"name": "length",
+				"optional": false,
+				"control": "text",
+				"type": "int",
+				"hint": {
+					"en_US": "Window length",
+					"zh_CN": "最大窗口长度"
+				},
+				"label": {
+					"en_US": "Window Length",
+					"zh_CN": "最大窗口长度"
+				}
+			},
+			{
+				"name": "timeout",
+				"optional": false,
+				"control": "text",
+				"type": "int",
+				"hint": {
+					"en_US": "Timeout",
+					"zh_CN": "超时时长"
+				},
+				"label": {
+					"en_US": "Timeout",
+					"zh_CN": "超时时长"
+				}
+			}
+		],
+		"node": {
+			"category": "function",
+			"icon": "iconPath",
+			"label": {
+				"en_US": "Session Window",
+				"zh_CN": "会话窗口"
+			}
 		}
 	}, {
 		"name": "COUNTWINDOW",
@@ -50,6 +234,44 @@
 		"hint": {
 			"en_US": "Tumbling count window is similar to general tumbling window, events in a tumbling window can not repeat, do not overlap, and an event cannot belong to more than one tumbling window. Below is a count window with 5 events length.",
 			"zh_CN": "滚动计数窗口与一般的滚动窗口类似,在滚动窗口中的事件不重复、不重叠,一个事件不会属于多个滚动窗口。以下是一个长度为 5 的滚动计数窗口。"
+		},
+		"args": [
+			{
+				"name": "length",
+				"optional": false,
+				"control": "text",
+				"type": "int",
+				"hint": {
+					"en_US": "Window length",
+					"zh_CN": "窗口长度"
+				},
+				"label": {
+					"en_US": "Window Length",
+					"zh_CN": "窗口长度"
+				}
+			},
+			{
+				"name": "timeout",
+				"optional": true,
+				"control": "text",
+				"type": "int",
+				"hint": {
+					"en_US": "Trigger Length",
+					"zh_CN": "触发长度"
+				},
+				"label": {
+					"en_US": "Trigger Length",
+					"zh_CN": "触发长度"
+				}
+			}
+		],
+		"node": {
+			"category": "function",
+			"icon": "iconPath",
+			"label": {
+				"en_US": "Count Window",
+				"zh_CN": "计数窗口"
+			}
 		}
 	}]
 }

+ 17 - 0
etc/mqtt_source.json

@@ -212,5 +212,22 @@
 				"zh_CN": "KubeEdge 模型文件"
 			}
 		}]
+	},
+	"outputs": [
+		{
+			"label": {
+				"en_US": "Output",
+				"zh_CN": "输出"
+			},
+			"value": "signal"
+		}
+	],
+	"node": {
+		"category": "source",
+		"icon": "iconPath",
+		"label": {
+			"en_US": "MQTT",
+			"zh_CN": "MQTT"
+		}
 	}
 }

+ 18 - 15
etc/ops/filter.json

@@ -1,4 +1,5 @@
 {
+  "name": "filter",
   "about": {
     "trial": false,
     "author": {
@@ -16,24 +17,26 @@
       "zh_CN": "用于过滤数据流的操作"
     }
   },
-  "properties": [{
-    "name": "expr",
-    "default": "",
-    "optional": false,
-    "control": "text",
-    "type": "string",
-    "hint": {
-      "en_US": "filter condition expression",
-      "zh_CN": "过滤条件语句"
-    },
-    "label": {
-      "en_US": "Condition",
-      "zh_CN": "条件"
+  "properties": [
+    {
+      "name": "expr",
+      "default": "12",
+      "optional": false,
+      "control": "text",
+      "type": "string",
+      "hint": {
+        "en_US": "filter condition expression",
+        "zh_CN": "过滤条件语句"
+      },
+      "label": {
+        "en_US": "Condition",
+        "zh_CN": "条件"
+      }
     }
-  }],
+  ],
   "node": {
     "display": true,
-    "category": "op",
+    "category": "operator",
     "input": {
       "type": "any",
       "rowType": "any",

+ 2 - 1
etc/ops/function.json

@@ -1,4 +1,5 @@
 {
+  "name": "function",
   "about": {
     "trial": false,
     "author": {
@@ -33,7 +34,7 @@
   }],
   "node": {
     "display": false,
-    "category": "op",
+    "category": "operator",
     "input": {
       "type": "any",
       "rowType": "any",

+ 3 - 2
etc/ops/groupby.json

@@ -1,4 +1,5 @@
 {
+  "name": "groupby",
   "about": {
     "trial": false,
     "author": {
@@ -18,7 +19,7 @@
   },
   "properties": [{
     "name": "dimensions",
-    "default": "[]",
+    "default": "",
     "optional": false,
     "control": "text",
     "type": "list_string",
@@ -33,7 +34,7 @@
   }],
   "node": {
     "display": true,
-    "category": "op",
+    "category": "operator",
     "input": {
       "type": "collection",
       "rowType": "single",

+ 2 - 1
etc/ops/join.json

@@ -1,4 +1,5 @@
 {
+  "name": "join",
   "about": {
     "trial": false,
     "author": {
@@ -100,7 +101,7 @@
   }],
   "node": {
     "display": true,
-    "category": "op",
+    "category": "operator",
     "input": {
       "type": "collection",
       "rowType": "single",

+ 4 - 3
etc/ops/orderby.json

@@ -1,4 +1,5 @@
 {
+  "name": "orderby",
   "about": {
     "trial": false,
     "author": {
@@ -64,7 +65,7 @@
   }],
   "node": {
     "display": true,
-    "category": "op",
+    "category": "operator",
     "input": {
       "type": "any",
       "rowType": "any",
@@ -76,8 +77,8 @@
     },
     "icon": "iconPath",
     "label": {
-      "en": "Filter",
-      "zh": "过滤"
+      "en": "Sort",
+      "zh": "排序"
     }
   }
 }

+ 3 - 2
etc/ops/pick.json

@@ -1,4 +1,5 @@
 {
+  "name": "pick",
   "about": {
     "trial": false,
     "author": {
@@ -20,7 +21,7 @@
     "name": "fields",
     "default": "",
     "optional": false,
-    "control": "text",
+    "control": "list",
     "type": "list_string",
     "hint": {
       "en_US": "select fields",
@@ -33,7 +34,7 @@
   }],
   "node": {
     "display": true,
-    "category": "op",
+    "category": "operator",
     "input": {
       "type": "any",
       "rowType": "any",

+ 3 - 2
etc/ops/window.json

@@ -1,4 +1,5 @@
 {
+  "name": "window",
   "about": {
     "trial": false,
     "author": {
@@ -21,7 +22,7 @@
       "name": "type",
       "default": "",
       "optional": false,
-      "control": "text",
+      "control": "select",
       "type": "string",
       "values": [
         "tumblingwindow",
@@ -90,7 +91,7 @@
   }],
   "node": {
     "display": false,
-    "category": "op",
+    "category": "operator",
     "input": {
       "type": "row",
       "rowType": "any",

+ 3 - 3
etc/sinks/edgex.json

@@ -418,11 +418,11 @@
     }
   ],
   "node": {
-    "category": "Sink",
+    "category": "sink",
     "icon": "iconPath",
     "label": {
-      "en": "EDGEX output",
-      "zh": "EDGEX 输出"
+      "en": "EDGEX",
+      "zh": "EDGEX"
     }
   }
 }

+ 3 - 3
etc/sinks/log.json

@@ -18,11 +18,11 @@
   },
   "properties": [],
   "node": {
-    "category": "Sink",
+    "category": "sink",
     "icon": "iconPath",
     "label": {
-      "en": "Log output",
-      "zh": "Log 输出"
+      "en": "Log",
+      "zh": "Log"
     }
   }
 }

+ 2 - 2
etc/sinks/memory.json

@@ -34,10 +34,10 @@
     }
   ],
   "node": {
-    "category": "Sink",
+    "category": "sink",
     "icon": "iconPath",
     "label": {
-      "en": "Memory output",
+      "en": "Memory",
       "zh": "内存输出"
     }
   }

+ 3 - 3
etc/sinks/mqtt.json

@@ -218,11 +218,11 @@
     }
   ],
   "node": {
-    "category": "Sink",
+    "category": "sink",
     "icon": "iconPath",
     "label": {
-      "en": "MQTT output",
-      "zh": "MQTT 输出"
+      "en": "MQTT",
+      "zh": "MQTT"
     }
   }
 }

+ 3 - 3
etc/sinks/neuron.json

@@ -79,11 +79,11 @@
     }
   ],
   "node": {
-    "category": "Sink",
+    "category": "sink",
     "icon": "iconPath",
     "label": {
-      "en": "Neuron output",
-      "zh": "Neuron 输出"
+      "en": "Neuron",
+      "zh": "Neuron"
     }
   }
 }

+ 3 - 3
etc/sinks/nop.json

@@ -34,11 +34,11 @@
     }
   ],
   "node": {
-    "category": "Sink",
+    "category": "sink",
     "icon": "iconPath",
     "label": {
-      "en": "Nop output",
-      "zh": "Nop 输出"
+      "en": "Nop",
+      "zh": "Nop"
     }
   }
 }

+ 3 - 3
etc/sinks/rest.json

@@ -188,11 +188,11 @@
     }
   ],
   "node": {
-    "category": "Sink",
+    "category": "sink",
     "icon": "iconPath",
     "label": {
-      "en": "Rest output",
-      "zh": "rest 输出"
+      "en": "Rest",
+      "zh": "Rest"
     }
   }
 }

+ 1 - 1
etc/sources/edgex.json

@@ -350,7 +350,7 @@
 		}
 	],
 	"node": {
-		"category": "Protocol/Protocol1",
+		"category": "source",
 		"icon": "iconPath",
 		"label": {
 			"en_US": "Edgex",

+ 1 - 1
etc/sources/file.json

@@ -73,7 +73,7 @@
     }
   ],
   "node": {
-    "category": "Protocol/Protocol1",
+    "category": "source",
     "icon": "iconPath",
     "label": {
       "en_US": "File",

+ 1 - 1
etc/sources/httppull.json

@@ -216,7 +216,7 @@
 		}
 	],
 	"node": {
-		"category": "Protocol/Protocol1",
+		"category": "source",
 		"icon": "iconPath",
 		"label": {
 			"en_US": "HTTP PULL",

+ 1 - 1
etc/sources/httppush.json

@@ -43,7 +43,7 @@
 		"value": "signal"
 	}],
 	"node": {
-		"category": "Protocol/Protocol1",
+		"category": "source",
 		"icon": "iconPath",
 		"label": {
 			"en_US": "HTTP PUSH",

+ 1 - 1
etc/sources/memory.json

@@ -29,7 +29,7 @@
     }
   ],
   "node": {
-    "category": "Protocol/Protocol1",
+    "category": "source",
     "icon": "iconPath",
     "label": {
       "en_US": "Memory",

+ 1 - 1
etc/sources/neuron.json

@@ -28,7 +28,7 @@
     }
   ],
   "node": {
-    "category": "Protocol/Protocol1",
+    "category": "source",
     "icon": "iconPath",
     "label": {
       "en_US": "Neuron",

+ 1 - 1
etc/sources/redis.json

@@ -86,7 +86,7 @@
     ]
   },
   "node": {
-    "category": ["Protocol","Protocol1"],
+    "category": "source",
     "icon": "iconPath",
     "label": {
       "en_US": "Redis",

+ 0 - 6
internal/binder/meta/bind.go

@@ -15,7 +15,6 @@
 package meta
 
 import (
-	"github.com/lf-edge/ekuiper/internal/binder/function"
 	"github.com/lf-edge/ekuiper/internal/binder/io"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/meta"
@@ -35,11 +34,6 @@ func Bind() {
 	}); nil != err {
 		conf.Log.Errorf("readSinkMetaDir:%v", err)
 	}
-	if err := meta.ReadFuncMetaDir(func(name string) bool {
-		return function.HasFunctionSet(name)
-	}); nil != err {
-		conf.Log.Errorf("readFuncMetaDir:%v", err)
-	}
 	if err := meta.ReadUiMsgDir(); nil != err {
 		conf.Log.Errorf("readUiMsgDir:%v", err)
 	}

+ 16 - 100
internal/meta/func_meta.go

@@ -15,147 +15,63 @@
 package meta
 
 import (
-	"fmt"
+	"bytes"
 	"os"
 	"path"
 	"strings"
 
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/pkg/filex"
 )
 
-type (
-	fileFunc struct {
-		Name       string        `json:"name"`
-		Example    string        `json:"example"`
-		Hint       *fileLanguage `json:"hint"`
-		Aggregate  bool          `json:"aggregate"`
-		ArgsFields []*fileField  `json:"args"`
-		Node       *fileNode     `json:"node"`
-		Outputs    []interface{} `json:"outputs"`
-	}
-	fileFuncs struct {
-		About   *fileAbout  `json:"about"`
-		Name    string      `json:"name"`
-		Version string      `json:"version"`
-		FiFuncs []*fileFunc `json:"functions"`
-	}
-	uiFunc struct {
-		Name       string        `json:"name"`
-		Example    string        `json:"example"`
-		Hint       *language     `json:"hint"`
-		Aggregate  bool          `json:"aggregate"`
-		ArgsFields []*fileField  `json:"args"`
-		Node       *node         `json:"node"`
-		Outputs    []interface{} `json:"outputs"`
-	}
-	uiFuncs struct {
-		About   *about    `json:"about"`
-		Name    string    `json:"name"`
-		Version string    `json:"version"`
-		UiFuncs []*uiFunc `json:"functions"`
-	}
-)
-
-func newUiFuncs(fi *fileFuncs) *uiFuncs {
-	if nil == fi {
-		return nil
-	}
-	uis := new(uiFuncs)
-	uis.About = newAbout(fi.About)
-	uis.Name = fi.Name
-	for _, v := range fi.FiFuncs {
-		ui := new(uiFunc)
-		ui.Name = v.Name
-		ui.Example = v.Example
-		ui.Hint = newLanguage(v.Hint)
-		ui.Aggregate = v.Aggregate
-		ui.ArgsFields = v.ArgsFields
-		ui.Node = newNode(v.Node)
-		ui.Outputs = make([]interface{}, len(v.Outputs))
-		for k, field := range v.Outputs {
-			ui.Outputs[k] = field
-		}
-		uis.UiFuncs = append(uis.UiFuncs, ui)
-	}
-	return uis
-}
+func readFuncMetaDir() []fileContent {
+	var filesByte []fileContent
 
-var gFuncmetadata = make(map[string]*uiFuncs)
-
-func ReadFuncMetaDir(checker InstallChecker) error {
 	confDir, err := conf.GetConfLoc()
 	if nil != err {
-		return err
+		return nil
 	}
 
 	dir := path.Join(confDir, "functions")
 	files, err := os.ReadDir(dir)
 	if nil != err {
-		return err
+		return nil
 	}
 	for _, file := range files {
 		fname := file.Name()
 		if !strings.HasSuffix(fname, ".json") {
 			continue
 		}
-
-		if err := ReadFuncMetaFile(path.Join(dir, fname), checker(strings.TrimSuffix(fname, ".json"))); nil != err {
-			return err
-		}
+		filesByte = append(filesByte, readFuncMetaFile(path.Join(dir, fname)))
 	}
 
 	confDir, err = conf.GetDataLoc()
 	if nil != err {
-		return err
+		return nil
 	}
 
 	dir = path.Join(confDir, "functions")
 	files, err = os.ReadDir(dir)
 	if nil != err {
-		return err
+		return nil
 	}
 	for _, file := range files {
 		fname := file.Name()
 		if !strings.HasSuffix(fname, ".json") {
 			continue
 		}
-
-		if err := ReadFuncMetaFile(path.Join(dir, fname), checker(strings.TrimSuffix(fname, ".json"))); nil != err {
-			return err
-		}
-	}
-	return nil
-}
-
-func UninstallFunc(name string) {
-	if ui, ok := gFuncmetadata[name+".json"]; ok {
-		if nil != ui.About {
-			ui.About.Installed = false
-		}
+		filesByte = append(filesByte, readFuncMetaFile(path.Join(dir, fname)))
 	}
+	return filesByte
 }
 
-func ReadFuncMetaFile(filePath string, installed bool) error {
+func readFuncMetaFile(filePath string) fileContent {
 	fiName := path.Base(filePath)
-	fis := new(fileFuncs)
-	err := filex.ReadJsonUnmarshal(filePath, fis)
-	if nil != err {
-		return fmt.Errorf("filePath:%s err:%v", filePath, err)
-	}
-	if nil == fis.About {
-		return fmt.Errorf("not found about of %s", filePath)
-	} else {
-		fis.About.Installed = installed
-	}
-	gFuncmetadata[fiName] = newUiFuncs(fis)
+	sliByte, _ := os.ReadFile(filePath)
 	conf.Log.Infof("funcMeta file : %s", fiName)
-	return nil
+	return sliByte
 }
 
-func GetFunctions() (ret []*uiFuncs) {
-	for _, v := range gFuncmetadata {
-		ret = append(ret, v)
-	}
-	return ret
+func GetFunctions() bytes.Buffer {
+	files := readFuncMetaDir()
+	return ConstructJsonArray(files)
 }

+ 21 - 0
internal/meta/msgUtil.go

@@ -15,6 +15,7 @@
 package meta
 
 import (
+	"bytes"
 	"os"
 	"path"
 
@@ -59,3 +60,23 @@ func ReadUiMsgDir() error {
 	}
 	return nil
 }
+
+func ConstructJsonArray(jsonByteItems []fileContent) bytes.Buffer {
+	var buf bytes.Buffer
+	var length = len(jsonByteItems)
+	if length == 0 {
+		buf.Write([]byte("[]"))
+		return buf
+	}
+
+	buf.Write([]byte("["))
+	buf.Write(jsonByteItems[0])
+
+	for i := 1; i < length; i++ {
+		buf.Write([]byte(","))
+		buf.Write(jsonByteItems[i])
+	}
+
+	buf.Write([]byte("]"))
+	return buf
+}

+ 76 - 0
internal/meta/msgUtil_test.go

@@ -0,0 +1,76 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package meta
+
+import (
+	"bytes"
+	"reflect"
+	"testing"
+)
+
+func TestConstructJsonArray(t *testing.T) {
+	type args struct {
+		jsonByteItems []fileContent
+	}
+
+	var buf1 bytes.Buffer
+	buf1.Write([]byte("[]"))
+
+	var buf2 bytes.Buffer
+	buf2.Write([]byte(`[{"key": "key1"}]`))
+
+	var buf3 bytes.Buffer
+	buf3.Write([]byte(`[{"key1": "value1"},{"key2": "value2"}]`))
+
+	tests := []struct {
+		name string
+		args args
+		want bytes.Buffer
+	}{
+		{
+			name: "no value",
+			args: args{
+				jsonByteItems: nil,
+			},
+			want: buf1,
+		},
+		{
+			name: "one value",
+			args: args{
+				jsonByteItems: []fileContent{
+					[]byte(`{"key": "key1"}`),
+				},
+			},
+			want: buf2,
+		},
+		{
+			name: "two value",
+			args: args{
+				jsonByteItems: []fileContent{
+					[]byte(`{"key1": "value1"}`),
+					[]byte(`{"key2": "value2"}`),
+				},
+			},
+			want: buf3,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := ConstructJsonArray(tt.args.jsonByteItems); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("ConstructJsonArray() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}

+ 64 - 0
internal/meta/operator_meta.go

@@ -0,0 +1,64 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package meta
+
+import (
+	"bytes"
+	"os"
+	"path"
+	"strings"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+)
+
+type fileContent []byte
+
+func readOpsMetaDir() ([]fileContent, error) {
+	var filesByte []fileContent
+
+	confDir, err := conf.GetConfLoc()
+	if nil != err {
+		return nil, err
+	}
+
+	dir := path.Join(confDir, "ops")
+	files, err := os.ReadDir(dir)
+	if nil != err {
+		return nil, err
+	}
+	for _, file := range files {
+		fname := file.Name()
+		if !strings.HasSuffix(fname, ".json") {
+			continue
+		}
+
+		filesByte = append(filesByte, readOpsMetaFile(path.Join(dir, fname)))
+
+	}
+
+	return filesByte, nil
+}
+
+func readOpsMetaFile(filePath string) fileContent {
+	fiName := path.Base(filePath)
+	sliByte, _ := os.ReadFile(filePath)
+	conf.Log.Infof("operatorMeta file : %s", fiName)
+	return sliByte
+}
+
+func GetOperators() bytes.Buffer {
+	files, _ := readOpsMetaDir()
+	return ConstructJsonArray(files)
+}

+ 22 - 21
internal/meta/sinkMeta.go

@@ -59,16 +59,16 @@ type (
 		HelpUrl     *fileLanguage `json:"helpUrl"`
 		Description *fileLanguage `json:"description"`
 	}
-	fileNode struct {
-		Category string        `json:"category"`
-		Icon     string        `json:"iconPath"`
-		Label    *fileLanguage `json:"label"`
-	}
+	//fileNode struct {
+	//	Category string        `json:"category"`
+	//	Icon     string        `json:"iconPath"`
+	//	Label    *fileLanguage `json:"label"`
+	//}
 	fileSink struct {
 		About  *fileAbout   `json:"about"`
 		Libs   []string     `json:"libs"`
 		Fields []*fileField `json:"properties"`
-		Node   *fileNode    `json:"node"`
+		Node   interface{}  `json:"node"`
 	}
 	language struct {
 		English string `json:"en"`
@@ -99,10 +99,10 @@ type (
 		Label    *language `json:"label"`
 	}
 	uiSink struct {
-		About  *about   `json:"about"`
-		Libs   []string `json:"libs"`
-		Fields []field  `json:"properties"`
-		Node   *node    `json:"node"`
+		About  *about      `json:"about"`
+		Libs   []string    `json:"libs"`
+		Fields []field     `json:"properties"`
+		Node   interface{} `json:"node"`
 	}
 )
 
@@ -158,16 +158,17 @@ func newAbout(fi *fileAbout) *about {
 	ui.Description = newLanguage(fi.Description)
 	return ui
 }
-func newNode(fi *fileNode) *node {
-	if nil == fi {
-		return nil
-	}
-	ui := new(node)
-	ui.Category = fi.Category
-	ui.Icon = fi.Icon
-	ui.Label = newLanguage(fi.Label)
-	return ui
-}
+
+//func newNode(fi *fileNode) *node {
+//	if nil == fi {
+//		return nil
+//	}
+//	ui := new(node)
+//	ui.Category = fi.Category
+//	ui.Icon = fi.Icon
+//	ui.Label = newLanguage(fi.Label)
+//	return ui
+//}
 func newUiSink(fi *fileSink) (*uiSink, error) {
 	if nil == fi {
 		return nil, nil
@@ -175,8 +176,8 @@ func newUiSink(fi *fileSink) (*uiSink, error) {
 	var err error
 	ui := new(uiSink)
 	ui.Libs = fi.Libs
+	ui.Node = fi.Node
 	ui.About = newAbout(fi.About)
-	ui.Node = newNode(fi.Node)
 	ui.Fields, err = newField(fi.Fields)
 	return ui, err
 }

+ 16 - 25
internal/meta/sinkMeta_test.go

@@ -15,37 +15,28 @@
 package meta
 
 import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"path"
 	"testing"
 )
 
 func TestHintWhenModifySink(t *testing.T) {
-	taosMeta := &uiSink{
-		Fields: []field{
-			{
-				Name:    "ip",
-				Default: "911.911.911.911",
-			},
-		},
-	}
-	logMeta := &uiSink{
-		Fields: []field{
-			{
-				Name:    "ip",
-				Default: "911.911.911.911",
-			},
-		},
+	confDir, err := conf.GetConfLoc()
+	if nil != err {
+		return
 	}
 
-	gSinkmetadata = make(map[string]*uiSink)
-	gSinkmetadata["taos.json"] = taosMeta
-	gSinkmetadata["log.json"] = logMeta
+	if err = ReadSinkMetaFile(path.Join(confDir, "sinks", "mqtt.json"), true); nil != err {
+		t.Error(err)
+		return
+	}
 
-	oldSink, err := GetSinkMeta("taos", "en_US")
-	if err != nil {
-		t.Errorf("%v", err)
-	} else {
-		if "911.911.911.911" != oldSink.Fields[0].Default {
-			t.Errorf("fail")
-		}
+	showMeta, err := GetSinkMeta("mqtt", "zh_CN")
+	if nil != err {
+		t.Error(err)
 	}
+
+	fmt.Printf("%+v", showMeta)
+
 }

+ 3 - 9
internal/meta/sourceMeta.go

@@ -30,15 +30,13 @@ type (
 		About    *fileAbout              `json:"about"`
 		Libs     []string                `json:"libs"`
 		ConfKeys map[string][]*fileField `json:"properties"`
-		Node     *fileNode               `json:"node"`
-		Outputs  []interface{}           `json:"outputs"`
+		Node     interface{}             `json:"node"`
 	}
 	uiSource struct {
 		About    *about             `json:"about"`
 		Libs     []string           `json:"libs"`
 		ConfKeys map[string][]field `json:"properties"`
-		Node     *node              `json:"node"`
-		Outputs  []interface{}      `json:"outputs"`
+		Node     interface{}        `json:"node"`
 	}
 )
 
@@ -50,11 +48,7 @@ func newUiSource(fi *fileSource) (*uiSource, error) {
 	ui := new(uiSource)
 	ui.Libs = fi.Libs
 	ui.About = newAbout(fi.About)
-	ui.Node = newNode(fi.Node)
-	ui.Outputs = make([]interface{}, len(fi.Outputs))
-	for k, field := range fi.Outputs {
-		ui.Outputs[k] = field
-	}
+	ui.Node = fi.Node
 	ui.ConfKeys = make(map[string][]field)
 	for k, fields := range fi.ConfKeys {
 		if ui.ConfKeys[k], err = newField(fields); nil != err {

+ 0 - 5
internal/plugin/native/manager.go

@@ -289,10 +289,6 @@ func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
 		if err := meta.ReadSourceMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), true); nil != err {
 			conf.Log.Errorf("readSourceFile:%v", err)
 		}
-	case plugin2.FUNCTION:
-		if err := meta.ReadFuncMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), true); nil != err {
-			conf.Log.Errorf("readFuncFile:%v", err)
-		}
 	}
 	return nil
 }
@@ -362,7 +358,6 @@ func (rr *Manager) Delete(t plugin2.PluginType, name string, stop bool) error {
 		} else if !ok {
 			rr.removeSymbols([]string{name})
 		}
-		meta.UninstallFunc(name)
 	}
 
 	for _, p := range paths {

+ 0 - 6
internal/plugin/portable/manager.go

@@ -142,11 +142,6 @@ func (m *Manager) doRegister(name string, pi *PluginInfo, isInit bool) error {
 				conf.Log.Errorf("read sink json file:%v", err)
 			}
 		}
-		for _, s := range pi.Functions {
-			if err := meta.ReadFuncMetaFile(path.Join(m.pluginConfDir, plugin.PluginTypes[plugin.FUNCTION], s+`.json`), true); nil != err {
-				conf.Log.Errorf("read function json file:%v", err)
-			}
-		}
 	}
 
 	conf.Log.Infof("Installed portable plugin %s successfully", name)
@@ -368,7 +363,6 @@ func (m *Manager) Delete(name string) error {
 	for _, s := range pinfo.Functions {
 		p := path.Join(m.pluginConfDir, plugin.PluginTypes[plugin.FUNCTION], s+".json")
 		os.Remove(p)
-		meta.UninstallFunc(s)
 	}
 	_ = os.RemoveAll(path.Join(m.pluginDir, name))
 	return nil

+ 11 - 2
internal/server/meta_init.go

@@ -42,6 +42,7 @@ func (m metaComp) register() {
 
 func (m metaComp) rest(r *mux.Router) {
 	r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/operators", operatorsMetaHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
 	r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
@@ -85,8 +86,16 @@ func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
 // list functions
 func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
 	defer r.Body.Close()
-	sinks := meta.GetFunctions()
-	jsonResponse(sinks, w, logger)
+	funcs := meta.GetFunctions()
+	jsonByteResponse(funcs, w, logger)
+	return
+}
+
+// list operators
+func operatorsMetaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	ops := meta.GetOperators()
+	jsonByteResponse(ops, w, logger)
 	return
 }
 

+ 12 - 0
internal/server/rest.go

@@ -98,6 +98,18 @@ func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
 	}
 }
 
+func jsonByteResponse(buffer bytes.Buffer, w http.ResponseWriter, logger api.Logger) {
+	w.Header().Add(ContentType, ContentTypeJSON)
+
+	w.Header().Add("Content-Length", strconv.Itoa(buffer.Len()))
+
+	_, err := w.Write(buffer.Bytes())
+	// Problems encoding
+	if err != nil {
+		handleError(w, err, "", logger)
+	}
+}
+
 func createRestServer(ip string, port int, needToken bool) *http.Server {
 	dataDir, err := conf.GetDataLoc()
 	if err != nil {

+ 2 - 1
internal/topo/planner/planner_graph.go

@@ -17,6 +17,7 @@ package planner
 import (
 	"errors"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/binder/function"
 	"github.com/lf-edge/ekuiper/internal/topo"
 	"github.com/lf-edge/ekuiper/internal/topo/graph"
 	"github.com/lf-edge/ekuiper/internal/topo/node"
@@ -460,7 +461,7 @@ func parseFunc(props map[string]interface{}) (*operator.FuncOp, error) {
 	} else {
 		name = f.Name
 	}
-	return &operator.FuncOp{CallExpr: c, Name: name}, nil
+	return &operator.FuncOp{CallExpr: c, Name: name, IsAgg: function.IsAggFunc(name)}, nil
 }
 
 func parseFilter(props map[string]interface{}) (*operator.FilterOp, error) {