RockyJin преди 4 години
родител
ревизия
b74a25433e
променени са 43 файла, в които са добавени 1241 реда и са изтрити 280 реда
  1. 1 1
      docs/en_US/extension/function.md
  2. 6 0
      docs/en_US/extension/overview.md
  3. 4 2
      docs/en_US/extension/sink.md
  4. 4 2
      docs/en_US/extension/source.md
  5. 12 7
      docs/en_US/rules/overview.md
  6. 12 7
      docs/zh_CN/rules/overview.md
  7. 20 0
      fvt_scripts/edgex_mqtt_sink_rule.jmx
  8. 19 8
      fvt_scripts/edgex_sink_rule.jmx
  9. 53 1
      plugins/manager.go
  10. 3 1
      plugins/sinks/file.go
  11. 3 1
      plugins/sinks/memory.go
  12. 3 1
      plugins/sinks/zmq.go
  13. 7 5
      plugins/sources/random.go
  14. 3 1
      plugins/sources/zmq.go
  15. 52 22
      xsql/ast.go
  16. 42 17
      xsql/funcs_aggregate.go
  17. 2 7
      xsql/funcs_ast_validator.go
  18. 23 14
      xsql/functions.go
  19. 102 44
      xsql/plans/aggregate_operator.go
  20. 377 4
      xsql/plans/aggregate_test.go
  21. 4 4
      xsql/plans/filter_operator.go
  22. 4 4
      xsql/plans/filter_test.go
  23. 17 20
      xsql/plans/having_operator.go
  24. 156 3
      xsql/plans/having_test.go
  25. 2 1
      xsql/plans/join_multi_test.go
  26. 15 15
      xsql/plans/join_operator.go
  27. 12 6
      xsql/plans/join_test.go
  28. 2 1
      xsql/plans/math_func_test.go
  29. 6 3
      xsql/plans/misc_func_test.go
  30. 2 2
      xsql/plans/order_operator.go
  31. 41 1
      xsql/plans/order_test.go
  32. 16 18
      xsql/plans/preprocessor.go
  33. 8 4
      xsql/plans/preprocessor_test.go
  34. 10 9
      xsql/plans/project_operator.go
  35. 106 14
      xsql/plans/project_test.go
  36. 2 1
      xsql/plans/str_func_test.go
  37. 6 1
      xsql/processors/extension_test.go
  38. 14 4
      xsql/processors/xsql_processor.go
  39. 50 0
      xsql/processors/xsql_processor_test.go
  40. 5 6
      xstream/extensions/edgex_source.go
  41. 6 9
      xstream/nodes/sink_node.go
  42. 5 7
      xstream/nodes/source_node.go
  43. 4 2
      xstream/operators/operations.go

+ 1 - 1
docs/en_US/extension/function.md

@@ -30,7 +30,7 @@ The main task for a Function is to implement _exec_ method. The method will be l
 Exec(args []interface{}) (interface{}, bool)
 Exec(args []interface{}) (interface{}, bool)
 ```  
 ```  
 
 
-As the function itself is a plugin, it must be in the main package. Given the function struct name is myFunction. At last of the file, the source must be exported as a symbol as below.
+As the function itself is a plugin, it must be in the main package. Given the function struct name is myFunction. At last of the file, the source must be exported as a symbol as below. There are [2 types of exported symbol supported](overview.md#plugin-development). For function extension, if there is no internal state, it is recommended to export a singleton instance.
 
 
 ```go
 ```go
 var MyFunction myFunction
 var MyFunction myFunction

+ 6 - 0
docs/en_US/extension/overview.md

@@ -39,6 +39,12 @@ go build --buildmode=plugin -o plugins/sources/MySource.so plugins/sources/my_so
 ```
 ```
 
 
 
 
+### Plugin development
+The development of plugins is to implement a specific interface according to the plugin type and export the implementation with a specific name. There are two types of exported symbol supported:
+
+1. Export a constructor function: Kuiper will use the constructor function to create a new instance of the plugin implementation for each load. So each rule will have one instance of the plugin and each instance will be isolated from others. This is the recommended way.
+
+2. Export an instance: Kuiper will use the instance as singleton for all plugin load. So all rules will share the same instance. For such implementation, the developer will need to handle the shared states to avoid any potential multi-thread problems. This mode is recommended where there are no shared states and the performance is critical. Especially, function extension is usually functional without internal state which is suitable for this mode.
 
 
 Please read below for how to realize the different extensions.
 Please read below for how to realize the different extensions.
 
 

+ 4 - 2
docs/en_US/extension/sink.md

@@ -35,10 +35,12 @@ The last method to implement is _Close_ which literally close the connection. It
 Close(ctx StreamContext) error
 Close(ctx StreamContext) error
 ```
 ```
 
 
-As the sink itself is a plugin, it must be in the main package. Given the sink struct name is mySink. At last of the file, the sink must be exported as a symbol as below.
+As the sink itself is a plugin, it must be in the main package. Given the sink struct name is mySink. At last of the file, the sink must be exported as a symbol as below. There are [2 types of exported symbol supported](overview.md#plugin-development). For sink extension, states are usually needed, so it is recommended to export a constructor function.
 
 
 ```go
 ```go
-var MySink mySink
+func MySink() api.Sink {
+	return &mySink{}
+}
 ```
 ```
 
 
 The [Memory Sink](../../../plugins/sinks/memory.go) is a good example.
 The [Memory Sink](../../../plugins/sinks/memory.go) is a good example.

+ 4 - 2
docs/en_US/extension/source.md

@@ -30,10 +30,12 @@ The last method to implement is _Close_ which literally close the connection. It
 Close(ctx StreamContext) error
 Close(ctx StreamContext) error
 ```
 ```
 
 
-As the source itself is a plugin, it must be in the main package. Given the source struct name is mySource. At last of the file, the source must be exported as a symbol as below.
+As the source itself is a plugin, it must be in the main package. Given the source struct name is mySource. At last of the file, the source must be exported as a symbol as below. There are [2 types of exported symbol supported](overview.md#plugin-development). For source extension, states are usually needed, so it is recommended to export a constructor function.
 
 
 ```go
 ```go
-var MySource mySource
+function MySource() api.Source{
+    return &mySource{}
+}
 ```
 ```
 
 
 The [Randome Source](../../../plugins/sources/random.go) is a good example.
 The [Randome Source](../../../plugins/sources/random.go) is a good example.

+ 12 - 7
docs/en_US/rules/overview.md

@@ -86,37 +86,42 @@ In sendSingle=true mode:
 - Print out the whole record
 - Print out the whole record
 
 
 ```
 ```
-"dataTemplate": `{"content":{{json .}}}`,
+"dataTemplate": "{\"content\":{{json .}}}",
 ```
 ```
-- Print out the the ab field
+- Print out the ab field
 
 
 ```
 ```
-"dataTemplate": `{"content":{{.ab}}}`,
+"dataTemplate": "{\"content\":{{.ab}}}",
+```
+
+if the ab field is a string, add the quotes
+```
+"dataTemplate": "{\"content\":\"{{.ab}}\"}",
 ```
 ```
 
 
 In sendSingle=false mode:
 In sendSingle=false mode:
 - Print out the whole record array
 - Print out the whole record array
 
 
 ```
 ```
-"dataTemplate": `{"content":{{json .}}}`,
+"dataTemplate": "{\"content\":{{json .}}}",
 ```
 ```
 
 
 - Print out the first record
 - Print out the first record
 
 
 ```
 ```
-"dataTemplate": `{"content":{{json (index . 0)}}}`,
+"dataTemplate": "{\"content\":{{json (index . 0)}}}",
 ```
 ```
 
 
 - Print out the field ab of the first record
 - Print out the field ab of the first record
 
 
 ```
 ```
-"dataTemplate": `{"content":{{index . 0 "ab"}}}`,
+"dataTemplate": "{\"content\":{{index . 0 \"ab\"}}}",
 ```
 ```
 
 
 - Print out field ab of each record in the array to html format
 - Print out field ab of each record in the array to html format
 
 
 ```
 ```
-"dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
+"dataTemplate": "<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>",
 ```
 ```
 
 
 Actions could be customized to support different kinds of outputs, see [extension](../extension/overview.md) for more detailed info.
 Actions could be customized to support different kinds of outputs, see [extension](../extension/overview.md) for more detailed info.

+ 12 - 7
docs/zh_CN/rules/overview.md

@@ -76,38 +76,43 @@ In sendSingle=true mode:
 - Print out the whole record
 - Print out the whole record
 
 
 ```
 ```
-"dataTemplate": `{"content":{{json .}}}`,
+"dataTemplate": "{\"content\":{{json .}}}",
 ```
 ```
 
 
-- Print out the the ab field
+- Print out the ab field
 
 
 ```
 ```
-"dataTemplate": `{"content":{{.ab}}}`,
+"dataTemplate": "{\"content\":{{.ab}}}",
+```
+
+if the ab field is a string, add the quotes
+```
+"dataTemplate": "{\"content\":\"{{.ab}}\"}",
 ```
 ```
 
 
 In sendSingle=false mode:
 In sendSingle=false mode:
 - Print out the whole record array
 - Print out the whole record array
 
 
 ```
 ```
-"dataTemplate": `{"content":{{json .}}}`,
+"dataTemplate": "{\"content\":{{json .}}}",
 ```
 ```
 
 
 - Print out the first record
 - Print out the first record
 
 
 ```
 ```
-"dataTemplate": `{"content":{{json (index . 0)}}}`,
+"dataTemplate": "{\"content\":{{json (index . 0)}}}",
 ```
 ```
 
 
 - Print out the field ab of the first record
 - Print out the field ab of the first record
 
 
 ```
 ```
-"dataTemplate": `{"content":{{index . 0 "ab"}}}`,
+"dataTemplate": "{\"content\":{{index . 0 \"ab\"}}}",
 ```
 ```
 
 
 - Print out field ab of each record in the array to html format
 - Print out field ab of each record in the array to html format
 
 
 ```
 ```
-"dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
+"dataTemplate": "<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>",
 ```
 ```
 
 
 
 

+ 20 - 0
fvt_scripts/edgex_mqtt_sink_rule.jmx

@@ -487,6 +487,26 @@
             <boolProp name="ISREGEX">false</boolProp>
             <boolProp name="ISREGEX">false</boolProp>
           </JSONPathAssertion>
           </JSONPathAssertion>
           <hashTree/>
           <hashTree/>
+          <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="temperature name assertion" enabled="true">
+            <collectionProp name="Asserion.test_strings">
+              <stringProp name="321701236">temperature</stringProp>
+            </collectionProp>
+            <stringProp name="Assertion.custom_message"></stringProp>
+            <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+            <boolProp name="Assertion.assume_success">false</boolProp>
+            <intProp name="Assertion.test_type">2</intProp>
+          </ResponseAssertion>
+          <hashTree/>
+          <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="humidity name assertion" enabled="true">
+            <collectionProp name="Asserion.test_strings">
+              <stringProp name="548027571">humidity</stringProp>
+            </collectionProp>
+            <stringProp name="Assertion.custom_message"></stringProp>
+            <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+            <boolProp name="Assertion.assume_success">false</boolProp>
+            <intProp name="Assertion.test_type">2</intProp>
+          </ResponseAssertion>
+          <hashTree/>
         </hashTree>
         </hashTree>
       </hashTree>
       </hashTree>
     </hashTree>
     </hashTree>

+ 19 - 8
fvt_scripts/edgex_sink_rule.jmx

@@ -444,14 +444,25 @@
             <boolProp name="ISREGEX">false</boolProp>
             <boolProp name="ISREGEX">false</boolProp>
           </JSONPathAssertion>
           </JSONPathAssertion>
           <hashTree/>
           <hashTree/>
-          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="humidity name Assertion" enabled="true">
-            <stringProp name="JSON_PATH">$.readings[0].name</stringProp>
-            <stringProp name="EXPECTED_VALUE">81</stringProp>
-            <boolProp name="JSONVALIDATION">false</boolProp>
-            <boolProp name="EXPECT_NULL">false</boolProp>
-            <boolProp name="INVERT">false</boolProp>
-            <boolProp name="ISREGEX">false</boolProp>
-          </JSONPathAssertion>
+          <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Humidity name assertion" enabled="true">
+            <collectionProp name="Asserion.test_strings">
+              <stringProp name="612671699">Humidity</stringProp>
+            </collectionProp>
+            <stringProp name="Assertion.custom_message"></stringProp>
+            <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+            <boolProp name="Assertion.assume_success">false</boolProp>
+            <intProp name="Assertion.test_type">2</intProp>
+          </ResponseAssertion>
+          <hashTree/>
+          <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="temperature name assertion" enabled="true">
+            <collectionProp name="Asserion.test_strings">
+              <stringProp name="321701236">temperature</stringProp>
+            </collectionProp>
+            <stringProp name="Assertion.custom_message"></stringProp>
+            <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+            <boolProp name="Assertion.assume_success">false</boolProp>
+            <intProp name="Assertion.test_type">2</intProp>
+          </ResponseAssertion>
           <hashTree/>
           <hashTree/>
         </hashTree>
         </hashTree>
       </hashTree>
       </hashTree>

+ 53 - 1
plugins/manager.go

@@ -5,6 +5,7 @@ import (
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xstream/api"
 	"io"
 	"io"
 	"io/ioutil"
 	"io/ioutil"
 	"net/http"
 	"net/http"
@@ -88,7 +89,7 @@ func (rr *Registry) Get(t PluginType, name string) (string, bool) {
 
 
 var symbolRegistry = make(map[string]plugin.Symbol)
 var symbolRegistry = make(map[string]plugin.Symbol)
 
 
-func GetPlugin(t string, pt PluginType) (plugin.Symbol, error) {
+func getPlugin(t string, pt PluginType) (plugin.Symbol, error) {
 	ut := ucFirst(t)
 	ut := ucFirst(t)
 	ptype := PluginTypes[pt]
 	ptype := PluginTypes[pt]
 	key := ptype + "/" + t
 	key := ptype + "/" + t
@@ -121,6 +122,57 @@ func GetPlugin(t string, pt PluginType) (plugin.Symbol, error) {
 	return nf, nil
 	return nf, nil
 }
 }
 
 
+func GetSource(t string) (api.Source, error) {
+	nf, err := getPlugin(t, SOURCE)
+	if err != nil {
+		return nil, err
+	}
+	var s api.Source
+	switch t := nf.(type) {
+	case api.Source:
+		s = t
+	case func() api.Source:
+		s = t()
+	default:
+		return nil, fmt.Errorf("exported symbol %s is not type of api.Source or function that return api.Source", t)
+	}
+	return s, nil
+}
+
+func GetSink(t string) (api.Sink, error) {
+	nf, err := getPlugin(t, SINK)
+	if err != nil {
+		return nil, err
+	}
+	var s api.Sink
+	switch t := nf.(type) {
+	case api.Sink:
+		s = t
+	case func() api.Sink:
+		s = t()
+	default:
+		return nil, fmt.Errorf("exported symbol %s is not type of api.Sink or function that return api.Sink", t)
+	}
+	return s, nil
+}
+
+func GetFunction(t string) (api.Function, error) {
+	nf, err := getPlugin(t, FUNCTION)
+	if err != nil {
+		return nil, err
+	}
+	var s api.Function
+	switch t := nf.(type) {
+	case api.Function:
+		s = t
+	case func() api.Function:
+		s = t()
+	default:
+		return nil, fmt.Errorf("exported symbol %s is not type of api.Function or function that return api.Function", t)
+	}
+	return s, nil
+}
+
 type Manager struct {
 type Manager struct {
 	pluginDir string
 	pluginDir string
 	etcDir    string
 	etcDir    string

+ 3 - 1
plugins/sinks/file.go

@@ -115,4 +115,6 @@ func (m *fileSink) Close(ctx api.StreamContext) error {
 	return nil
 	return nil
 }
 }
 
 
-var File fileSink
+func File() api.Sink {
+	return &fileSink{}
+}

+ 3 - 1
plugins/sinks/memory.go

@@ -33,4 +33,6 @@ func (m *memory) Configure(props map[string]interface{}) error {
 	return nil
 	return nil
 }
 }
 
 
-var Memory memory
+func Memory() api.Sink {
+	return &memory{}
+}

+ 3 - 1
plugins/sinks/zmq.go

@@ -79,4 +79,6 @@ func (m *zmqSink) Close(ctx api.StreamContext) error {
 	return nil
 	return nil
 }
 }
 
 
-var Zmq zmqSink
+func Zmq() api.Sink {
+	return &zmqSink{}
+}

+ 7 - 5
plugins/sources/random.go

@@ -16,8 +16,8 @@ type randomSource struct {
 }
 }
 
 
 func (s *randomSource) Configure(topic string, props map[string]interface{}) error {
 func (s *randomSource) Configure(topic string, props map[string]interface{}) error {
-	if i, ok := props["interval"].(float64); ok {
-		s.interval = int(i)
+	if i, ok := props["interval"].(int); ok {
+		s.interval = i
 	} else {
 	} else {
 		s.interval = 1000
 		s.interval = 1000
 	}
 	}
@@ -27,8 +27,8 @@ func (s *randomSource) Configure(topic string, props map[string]interface{}) err
 		s.pattern = make(map[string]interface{})
 		s.pattern = make(map[string]interface{})
 		s.pattern["count"] = 50
 		s.pattern["count"] = 50
 	}
 	}
-	if i, ok := props["seed"].(float64); ok {
-		s.seed = int(i)
+	if i, ok := props["seed"].(int); ok {
+		s.seed = i
 	} else {
 	} else {
 		s.seed = 1
 		s.seed = 1
 	}
 	}
@@ -66,4 +66,6 @@ func (s *randomSource) Close(ctx api.StreamContext) error {
 	return nil
 	return nil
 }
 }
 
 
-var Random randomSource
+func Random() api.Source {
+	return &randomSource{}
+}

+ 3 - 1
plugins/sources/zmq.go

@@ -86,4 +86,6 @@ func (s *zmqSource) Close(ctx api.StreamContext) error {
 	return nil
 	return nil
 }
 }
 
 
-var Zmq zmqSource
+func Zmq() api.Source {
+	return &zmqSource{}
+}

+ 52 - 22
xsql/ast.go

@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/plugins"
-	"github.com/emqx/kuiper/xstream/api"
 	"math"
 	"math"
 	"reflect"
 	"reflect"
 	"sort"
 	"sort"
@@ -504,6 +503,7 @@ type CallValuer interface {
 type AggregateCallValuer interface {
 type AggregateCallValuer interface {
 	CallValuer
 	CallValuer
 	GetAllTuples() AggregateData
 	GetAllTuples() AggregateData
+	GetSingleCallValuer() CallValuer
 }
 }
 
 
 type Wildcarder interface {
 type Wildcarder interface {
@@ -543,7 +543,7 @@ func (wv *WildcardValuer) Meta(key string) (interface{}, bool) {
  */
  */
 
 
 type AggregateData interface {
 type AggregateData interface {
-	AggregateEval(expr Expr) []interface{}
+	AggregateEval(expr Expr, v CallValuer) []interface{}
 }
 }
 
 
 // Message is a valuer that substitutes values for the mapped interface.
 // Message is a valuer that substitutes values for the mapped interface.
@@ -551,7 +551,17 @@ type Message map[string]interface{}
 
 
 // Value returns the value for a key in the Message.
 // Value returns the value for a key in the Message.
 func (m Message) Value(key string) (interface{}, bool) {
 func (m Message) Value(key string) (interface{}, bool) {
-	key = strings.ToLower(key)
+	key1 := strings.ToLower(key)
+	if v, ok := m.valueUtil(key1); ok {
+		return v, ok
+	} else {
+		//Only when with 'SELECT * FROM ...'  and 'schemaless', the key in map is not convert to lower case.
+		//So all of keys in map should be convert to lowercase and then compare them.
+		return m.getIgnoreCase(key)
+	}
+}
+
+func (m Message) valueUtil(key string) (interface{}, bool) {
 	if keys := strings.Split(key, "."); len(keys) == 1 {
 	if keys := strings.Split(key, "."); len(keys) == 1 {
 		v, ok := m[key]
 		v, ok := m[key]
 		return v, ok
 		return v, ok
@@ -563,6 +573,18 @@ func (m Message) Value(key string) (interface{}, bool) {
 	return nil, false
 	return nil, false
 }
 }
 
 
+func (m Message) getIgnoreCase(key interface{}) (interface{}, bool) {
+	if k, ok := key.(string); ok {
+		key = strings.ToLower(k)
+		for k, v := range m {
+			if strings.ToLower(k) == key {
+				return v, true
+			}
+		}
+	}
+	return nil, false
+}
+
 func (m Message) Meta(key string) (interface{}, bool) {
 func (m Message) Meta(key string) (interface{}, bool) {
 	if key == "*" {
 	if key == "*" {
 		return map[string]interface{}(m), true
 		return map[string]interface{}(m), true
@@ -612,8 +634,8 @@ func (t *Tuple) All(stream string) (interface{}, bool) {
 	return t.Message, true
 	return t.Message, true
 }
 }
 
 
-func (t *Tuple) AggregateEval(expr Expr) []interface{} {
-	return []interface{}{Eval(expr, t)}
+func (t *Tuple) AggregateEval(expr Expr, v CallValuer) []interface{} {
+	return []interface{}{Eval(expr, t, v)}
 }
 }
 
 
 func (t *Tuple) GetTimestamp() int64 {
 func (t *Tuple) GetTimestamp() int64 {
@@ -694,13 +716,13 @@ func (w WindowTuplesSet) Sort() {
 	}
 	}
 }
 }
 
 
-func (w WindowTuplesSet) AggregateEval(expr Expr) []interface{} {
+func (w WindowTuplesSet) AggregateEval(expr Expr, v CallValuer) []interface{} {
 	var result []interface{}
 	var result []interface{}
 	if len(w) != 1 { //should never happen
 	if len(w) != 1 { //should never happen
 		return nil
 		return nil
 	}
 	}
 	for _, t := range w[0].Tuples {
 	for _, t := range w[0].Tuples {
-		result = append(result, Eval(expr, &t))
+		result = append(result, Eval(expr, &t, v))
 	}
 	}
 	return result
 	return result
 }
 }
@@ -798,20 +820,20 @@ func (s JoinTupleSets) Len() int           { return len(s) }
 func (s JoinTupleSets) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
 func (s JoinTupleSets) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
 func (s JoinTupleSets) Index(i int) Valuer { return &(s[i]) }
 func (s JoinTupleSets) Index(i int) Valuer { return &(s[i]) }
 
 
-func (s JoinTupleSets) AggregateEval(expr Expr) []interface{} {
+func (s JoinTupleSets) AggregateEval(expr Expr, v CallValuer) []interface{} {
 	var result []interface{}
 	var result []interface{}
 	for _, t := range s {
 	for _, t := range s {
-		result = append(result, Eval(expr, &t))
+		result = append(result, Eval(expr, &t, v))
 	}
 	}
 	return result
 	return result
 }
 }
 
 
 type GroupedTuples []DataValuer
 type GroupedTuples []DataValuer
 
 
-func (s GroupedTuples) AggregateEval(expr Expr) []interface{} {
+func (s GroupedTuples) AggregateEval(expr Expr, v CallValuer) []interface{} {
 	var result []interface{}
 	var result []interface{}
 	for _, t := range s {
 	for _, t := range s {
-		result = append(result, Eval(expr, t))
+		result = append(result, Eval(expr, t, v))
 	}
 	}
 	return result
 	return result
 }
 }
@@ -832,14 +854,16 @@ type SortingData interface {
 type MultiSorter struct {
 type MultiSorter struct {
 	SortingData
 	SortingData
 	fields SortFields
 	fields SortFields
+	valuer CallValuer
 	values []map[string]interface{}
 	values []map[string]interface{}
 }
 }
 
 
 // OrderedBy returns a Sorter that sorts using the less functions, in order.
 // OrderedBy returns a Sorter that sorts using the less functions, in order.
 // Call its Sort method to sort the data.
 // Call its Sort method to sort the data.
-func OrderedBy(fields SortFields) *MultiSorter {
+func OrderedBy(fields SortFields, fv *FunctionValuer) *MultiSorter {
 	return &MultiSorter{
 	return &MultiSorter{
 		fields: fields,
 		fields: fields,
+		valuer: fv,
 	}
 	}
 }
 }
 
 
@@ -851,7 +875,7 @@ func OrderedBy(fields SortFields) *MultiSorter {
 // exercise for the reader.
 // exercise for the reader.
 func (ms *MultiSorter) Less(i, j int) bool {
 func (ms *MultiSorter) Less(i, j int) bool {
 	p, q := ms.values[i], ms.values[j]
 	p, q := ms.values[i], ms.values[j]
-	v := &ValuerEval{Valuer: MultiValuer(&FunctionValuer{})}
+	v := &ValuerEval{Valuer: MultiValuer(ms.valuer)}
 	for _, field := range ms.fields {
 	for _, field := range ms.fields {
 		n := field.Name
 		n := field.Name
 		vp, _ := p[n]
 		vp, _ := p[n]
@@ -891,7 +915,7 @@ func (ms *MultiSorter) Sort(data SortingData) error {
 	for i := 0; i < data.Len(); i++ {
 	for i := 0; i < data.Len(); i++ {
 		ms.values[i] = make(map[string]interface{})
 		ms.values[i] = make(map[string]interface{})
 		p := data.Index(i)
 		p := data.Index(i)
-		vep := &ValuerEval{Valuer: MultiValuer(p, &FunctionValuer{})}
+		vep := &ValuerEval{Valuer: MultiValuer(p, ms.valuer)}
 		for j, field := range ms.fields {
 		for j, field := range ms.fields {
 			n := field.Name
 			n := field.Name
 			vp, _ := vep.Valuer.Value(n)
 			vp, _ := vep.Valuer.Value(n)
@@ -959,8 +983,8 @@ type EvalResultMessage struct {
 type ResultsAndMessages []EvalResultMessage
 type ResultsAndMessages []EvalResultMessage
 
 
 // Eval evaluates expr against a map.
 // Eval evaluates expr against a map.
-func Eval(expr Expr, m Valuer) interface{} {
-	eval := ValuerEval{Valuer: MultiValuer(m, &FunctionValuer{})}
+func Eval(expr Expr, m Valuer, v CallValuer) interface{} {
+	eval := ValuerEval{Valuer: MultiValuer(m, v)}
 	return eval.Eval(expr)
 	return eval.Eval(expr)
 }
 }
 
 
@@ -1015,12 +1039,14 @@ func (a multiValuer) Call(name string, args []interface{}) (interface{}, bool) {
 type multiAggregateValuer struct {
 type multiAggregateValuer struct {
 	data AggregateData
 	data AggregateData
 	multiValuer
 	multiValuer
+	singleCallValuer CallValuer
 }
 }
 
 
-func MultiAggregateValuer(data AggregateData, valuers ...Valuer) Valuer {
+func MultiAggregateValuer(data AggregateData, singleCallValuer CallValuer, valuers ...Valuer) Valuer {
 	return &multiAggregateValuer{
 	return &multiAggregateValuer{
-		data:        data,
-		multiValuer: valuers,
+		data:             data,
+		multiValuer:      valuers,
+		singleCallValuer: singleCallValuer,
 	}
 	}
 }
 }
 
 
@@ -1057,6 +1083,10 @@ func (a *multiAggregateValuer) GetAllTuples() AggregateData {
 	return a.data
 	return a.data
 }
 }
 
 
+func (a *multiAggregateValuer) GetSingleCallValuer() CallValuer {
+	return a.singleCallValuer
+}
+
 type BracketEvalResult struct {
 type BracketEvalResult struct {
 	Start, End int
 	Start, End int
 }
 }
@@ -1097,7 +1127,7 @@ func (v *ValuerEval) Eval(expr Expr) interface{} {
 				args = make([]interface{}, len(expr.Args))
 				args = make([]interface{}, len(expr.Args))
 				if aggreValuer, ok := valuer.(AggregateCallValuer); ok {
 				if aggreValuer, ok := valuer.(AggregateCallValuer); ok {
 					for i := range expr.Args {
 					for i := range expr.Args {
-						args[i] = aggreValuer.GetAllTuples().AggregateEval(expr.Args[i])
+						args[i] = aggreValuer.GetAllTuples().AggregateEval(expr.Args[i], aggreValuer.GetSingleCallValuer())
 					}
 					}
 				} else {
 				} else {
 					for i := range expr.Args {
 					for i := range expr.Args {
@@ -1711,8 +1741,8 @@ func isAggFunc(f *Call) bool {
 	} else if _, ok := mathFuncMap[fn]; ok {
 	} else if _, ok := mathFuncMap[fn]; ok {
 		return false
 		return false
 	} else {
 	} else {
-		if nf, err := plugins.GetPlugin(f.Name, plugins.FUNCTION); err == nil {
-			if ef, ok := nf.(api.Function); ok && ef.IsAggregate() {
+		if nf, err := plugins.GetFunction(f.Name); err == nil {
+			if nf.IsAggregate() {
 				return true
 				return true
 			}
 			}
 		}
 		}

+ 42 - 17
xsql/funcs_aggregate.go

@@ -9,18 +9,36 @@ import (
 )
 )
 
 
 type AggregateFunctionValuer struct {
 type AggregateFunctionValuer struct {
-	Data AggregateData
+	data    AggregateData
+	fv      *FunctionValuer
+	plugins map[string]api.Function
 }
 }
 
 
-func (v AggregateFunctionValuer) Value(key string) (interface{}, bool) {
+//Should only be called by stream to make sure a single instance for an operation
+func NewAggregateFunctionValuers() (*FunctionValuer, *AggregateFunctionValuer) {
+	fv := &FunctionValuer{}
+	return fv, &AggregateFunctionValuer{
+		fv: fv,
+	}
+}
+
+func (v *AggregateFunctionValuer) SetData(data AggregateData) {
+	v.data = data
+}
+
+func (v *AggregateFunctionValuer) GetSingleCallValuer() CallValuer {
+	return v.fv
+}
+
+func (v *AggregateFunctionValuer) Value(key string) (interface{}, bool) {
 	return nil, false
 	return nil, false
 }
 }
 
 
-func (v AggregateFunctionValuer) Meta(key string) (interface{}, bool) {
+func (v *AggregateFunctionValuer) Meta(key string) (interface{}, bool) {
 	return nil, false
 	return nil, false
 }
 }
 
 
-func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
+func (v *AggregateFunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
 	lowerName := strings.ToLower(name)
 	lowerName := strings.ToLower(name)
 	switch lowerName {
 	switch lowerName {
 	case "avg":
 	case "avg":
@@ -140,25 +158,32 @@ func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interfac
 		return 0, true
 		return 0, true
 	default:
 	default:
 		common.Log.Debugf("run aggregate func %s", name)
 		common.Log.Debugf("run aggregate func %s", name)
-		if nf, err := plugins.GetPlugin(name, plugins.FUNCTION); err != nil {
-			return nil, false
-		} else {
-			f, ok := nf.(api.Function)
-			if !ok {
-				return nil, false
-			}
-			if !f.IsAggregate() {
-				return nil, false
+		if v.plugins == nil {
+			v.plugins = make(map[string]api.Function)
+		}
+		var (
+			nf  api.Function
+			ok  bool
+			err error
+		)
+		if nf, ok = v.plugins[name]; !ok {
+			nf, err = plugins.GetFunction(name)
+			if err != nil {
+				return err, false
 			}
 			}
-			result, ok := f.Exec(args)
-			common.Log.Debugf("run custom aggregate function %s, get result %v", name, result)
-			return result, ok
+			v.plugins[name] = nf
+		}
+		if !nf.IsAggregate() {
+			return nil, false
 		}
 		}
+		result, ok := nf.Exec(args)
+		common.Log.Debugf("run custom aggregate function %s, get result %v", name, result)
+		return result, ok
 	}
 	}
 }
 }
 
 
 func (v *AggregateFunctionValuer) GetAllTuples() AggregateData {
 func (v *AggregateFunctionValuer) GetAllTuples() AggregateData {
-	return v.Data
+	return v.data
 }
 }
 
 
 func getFirstValidArg(s []interface{}) interface{} {
 func getFirstValidArg(s []interface{}) interface{} {

+ 2 - 7
xsql/funcs_ast_validator.go

@@ -3,7 +3,6 @@ package xsql
 import (
 import (
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/plugins"
-	"github.com/emqx/kuiper/xstream/api"
 	"strings"
 	"strings"
 )
 )
 
 
@@ -26,18 +25,14 @@ func validateFuncs(funcName string, args []Expr) error {
 	} else if _, ok := aggFuncMap[lowerName]; ok {
 	} else if _, ok := aggFuncMap[lowerName]; ok {
 		return validateAggFunc(lowerName, args)
 		return validateAggFunc(lowerName, args)
 	} else {
 	} else {
-		if nf, err := plugins.GetPlugin(funcName, plugins.FUNCTION); err != nil {
+		if nf, err := plugins.GetFunction(funcName); err != nil {
 			return err
 			return err
 		} else {
 		} else {
-			f, ok := nf.(api.Function)
-			if !ok {
-				return fmt.Errorf("exported symbol %s is not type of api.Function", funcName)
-			}
 			var targs []interface{}
 			var targs []interface{}
 			for _, arg := range args {
 			for _, arg := range args {
 				targs = append(targs, arg)
 				targs = append(targs, arg)
 			}
 			}
-			return f.Validate(targs)
+			return nf.Validate(targs)
 		}
 		}
 	}
 	}
 }
 }

+ 23 - 14
xsql/functions.go

@@ -7,7 +7,9 @@ import (
 	"strings"
 	"strings"
 )
 )
 
 
-type FunctionValuer struct{}
+type FunctionValuer struct {
+	plugins map[string]api.Function
+}
 
 
 func (*FunctionValuer) Value(key string) (interface{}, bool) {
 func (*FunctionValuer) Value(key string) (interface{}, bool) {
 	return nil, false
 	return nil, false
@@ -60,7 +62,7 @@ var otherFuncMap = map[string]string{"isnull": "",
 	"newuuid": "", "timestamp": "", "mqtt": "", "meta": "",
 	"newuuid": "", "timestamp": "", "mqtt": "", "meta": "",
 }
 }
 
 
-func (*FunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
+func (fv *FunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
 	lowerName := strings.ToLower(name)
 	lowerName := strings.ToLower(name)
 	if _, ok := mathFuncMap[lowerName]; ok {
 	if _, ok := mathFuncMap[lowerName]; ok {
 		return mathCall(name, args)
 		return mathCall(name, args)
@@ -76,19 +78,26 @@ func (*FunctionValuer) Call(name string, args []interface{}) (interface{}, bool)
 		return nil, false
 		return nil, false
 	} else {
 	} else {
 		common.Log.Debugf("run func %s", name)
 		common.Log.Debugf("run func %s", name)
-		if nf, err := plugins.GetPlugin(name, plugins.FUNCTION); err != nil {
-			return err, false
-		} else {
-			f, ok := nf.(api.Function)
-			if !ok {
-				return nil, false
-			}
-			if f.IsAggregate() {
-				return nil, false
+		if fv.plugins == nil {
+			fv.plugins = make(map[string]api.Function)
+		}
+		var (
+			nf  api.Function
+			ok  bool
+			err error
+		)
+		if nf, ok = fv.plugins[name]; !ok {
+			nf, err = plugins.GetFunction(name)
+			if err != nil {
+				return err, false
 			}
 			}
-			result, ok := f.Exec(args)
-			common.Log.Debugf("run custom function %s, get result %v", name, result)
-			return result, ok
+			fv.plugins[name] = nf
+		}
+		if nf.IsAggregate() {
+			return nil, false
 		}
 		}
+		result, ok := nf.Exec(args)
+		common.Log.Debugf("run custom function %s, get result %v", name, result)
+		return result, ok
 	}
 	}
 }
 }

+ 102 - 44
xsql/plans/aggregate_operator.go

@@ -8,66 +8,124 @@ import (
 
 
 type AggregatePlan struct {
 type AggregatePlan struct {
 	Dimensions xsql.Dimensions
 	Dimensions xsql.Dimensions
+	Alias      xsql.Fields
 }
 }
 
 
 /**
 /**
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  output: xsql.GroupedTuplesSet
  *  output: xsql.GroupedTuplesSet
  */
  */
-func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
 	log.Debugf("aggregate plan receive %s", data)
 	log.Debugf("aggregate plan receive %s", data)
-	var ms []xsql.DataValuer
-	switch input := data.(type) {
-	case error:
-		return input
-	case xsql.DataValuer:
-		ms = append(ms, input)
-	case xsql.WindowTuplesSet:
-		if len(input) != 1 {
-			return fmt.Errorf("run Group By error: the input WindowTuplesSet with multiple tuples cannot be evaluated")
-		}
-		ms = make([]xsql.DataValuer, len(input[0].Tuples))
-		for i, m := range input[0].Tuples {
-			//this is needed or it will always point to the last
-			t := m
-			ms[i] = &t
-		}
-	case xsql.JoinTupleSets:
-		ms = make([]xsql.DataValuer, len(input))
-		for i, m := range input {
-			t := m
-			ms[i] = &t
+	grouped := data
+	if p.Dimensions != nil {
+		var ms []xsql.DataValuer
+		switch input := data.(type) {
+		case error:
+			return input
+		case xsql.DataValuer:
+			ms = append(ms, input)
+		case xsql.WindowTuplesSet:
+			if len(input) != 1 {
+				return fmt.Errorf("run Group By error: the input WindowTuplesSet with multiple tuples cannot be evaluated")
+			}
+			ms = make([]xsql.DataValuer, len(input[0].Tuples))
+			for i, m := range input[0].Tuples {
+				//this is needed or it will always point to the last
+				t := m
+				ms[i] = &t
+			}
+		case xsql.JoinTupleSets:
+			ms = make([]xsql.DataValuer, len(input))
+			for i, m := range input {
+				t := m
+				ms[i] = &t
+			}
+		default:
+			return fmt.Errorf("run Group By error: invalid input %[1]T(%[1]v)", input)
 		}
 		}
-	default:
-		return fmt.Errorf("run Group By error: invalid input %[1]T(%[1]v)", input)
-	}
 
 
-	result := make(map[string]xsql.GroupedTuples)
-	for _, m := range ms {
-		var name string
-		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, &xsql.FunctionValuer{})}
-		for _, d := range p.Dimensions {
-			r := ve.Eval(d.Expr)
-			if _, ok := r.(error); ok {
-				return fmt.Errorf("run Group By error: %s", r)
+		result := make(map[string]xsql.GroupedTuples)
+		for _, m := range ms {
+			var name string
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, fv)}
+			for _, d := range p.Dimensions {
+				r := ve.Eval(d.Expr)
+				if _, ok := r.(error); ok {
+					return fmt.Errorf("run Group By error: %s", r)
+				} else {
+					name += fmt.Sprintf("%v,", r)
+				}
+			}
+			if ts, ok := result[name]; !ok {
+				result[name] = xsql.GroupedTuples{m}
 			} else {
 			} else {
-				name += fmt.Sprintf("%v,", r)
+				result[name] = append(ts, m)
 			}
 			}
 		}
 		}
-		if ts, ok := result[name]; !ok {
-			result[name] = xsql.GroupedTuples{m}
+		if len(result) > 0 {
+			g := make([]xsql.GroupedTuples, 0, len(result))
+			for _, v := range result {
+				g = append(g, v)
+			}
+			grouped = xsql.GroupedTuplesSet(g)
 		} else {
 		} else {
-			result[name] = append(ts, m)
+			grouped = nil
 		}
 		}
 	}
 	}
-	if len(result) > 0 {
-		g := make([]xsql.GroupedTuples, 0, len(result))
-		for _, v := range result {
-			g = append(g, v)
+	if len(p.Alias) != 0 {
+		switch input := grouped.(type) {
+		case *xsql.Tuple:
+			if err := p.calculateAlias(input, input, fv, afv); err != nil {
+				return fmt.Errorf("run Aggregate function alias error: %s", err)
+			}
+		case xsql.GroupedTuplesSet:
+			for _, v := range input {
+				if err := p.calculateAlias(v[0], v, fv, afv); err != nil {
+					return fmt.Errorf("run Aggregate function alias error: %s", err)
+				}
+			}
+		case xsql.WindowTuplesSet:
+			if len(input) != 1 {
+				return fmt.Errorf("run Aggregate function alias error: the input WindowTuplesSet with multiple tuples cannot be evaluated)")
+			}
+			if err := p.calculateAlias(&input[0].Tuples[0], input, fv, afv); err != nil {
+				return fmt.Errorf("run Aggregate function alias error: %s", err)
+			}
+		case xsql.JoinTupleSets:
+			if err := p.calculateAlias(&input[0], input, fv, afv); err != nil {
+				return fmt.Errorf("run Aggregate function alias error: %s", err)
+			}
+		default:
+			return fmt.Errorf("run Aggregate function alias error: invalid input %[1]T(%[1]v)", input)
 		}
 		}
-		return xsql.GroupedTuplesSet(g)
-	} else {
-		return nil
 	}
 	}
+
+	return grouped
+}
+
+func (p *AggregatePlan) calculateAlias(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) error {
+	afv.SetData(agg)
+	ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(agg, fv, tuple, fv, afv, &xsql.WildcardValuer{Data: tuple})}
+	for _, f := range p.Alias {
+		v := ve.Eval(f.Expr)
+		err := setTuple(tuple, f.AName, v)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func setTuple(tuple xsql.DataValuer, name string, value interface{}) error {
+	switch t := tuple.(type) {
+	case *xsql.Tuple:
+		t.Message[name] = value
+	case *xsql.JoinTuple:
+		t.Tuples[0].Message[name] = value
+	default:
+		return fmt.Errorf("invalid tuple to set alias")
+	}
+	return nil
 }
 }

+ 377 - 4
xsql/plans/aggregate_test.go

@@ -309,9 +309,9 @@ func TestAggregatePlan_Apply(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
-
+		fv, afv := xsql.NewAggregateFunctionValuers()
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
-		result := pp.Apply(ctx, tt.data)
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		gr, ok := result.(xsql.GroupedTuplesSet)
 		gr, ok := result.(xsql.GroupedTuplesSet)
 		if !ok {
 		if !ok {
 			t.Errorf("result is not GroupedTuplesSet")
 			t.Errorf("result is not GroupedTuplesSet")
@@ -334,6 +334,379 @@ func TestAggregatePlan_Apply(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestAggregatePlanGroupAlias_Apply(t *testing.T) {
+	var tests = []struct {
+		sql    string
+		data   interface{}
+		result xsql.GroupedTuplesSet
+	}{
+		{
+			sql: "SELECT count(*) as c FROM tbl group by abc",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(6),
+					"def": "hello",
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "tbl",
+						Message: xsql.Message{
+							"abc": int64(6),
+							"def": "hello",
+							"c":   1,
+						},
+					},
+				},
+			},
+		},
+
+		{
+			sql: "SELECT count(*) as c FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1"},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
+					},
+				},
+			},
+		},
+		{
+			sql: "SELECT abc, count(*) as c FROM src1 GROUP BY id1, TUMBLINGWINDOW(ss, 10), f1",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "c": 1},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1", "c": 1},
+					},
+				},
+			},
+		},
+		{
+			sql: "SELECT count(*) as c FROM src1 GROUP BY meta(topic), TUMBLINGWINDOW(ss, 10)",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter:  "src1",
+							Message:  xsql.Message{"id1": 1, "f1": "v1"},
+							Metadata: xsql.Metadata{"topic": "topic1"},
+						}, {
+							Emitter:  "src1",
+							Message:  xsql.Message{"id1": 2, "f1": "v2"},
+							Metadata: xsql.Metadata{"topic": "topic2"},
+						}, {
+							Emitter:  "src1",
+							Message:  xsql.Message{"id1": 3, "f1": "v1"},
+							Metadata: xsql.Metadata{"topic": "topic1"},
+						},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter:  "src1",
+						Message:  xsql.Message{"id1": 1, "f1": "v1", "c": 2},
+						Metadata: xsql.Metadata{"topic": "topic1"},
+					},
+					&xsql.Tuple{
+						Emitter:  "src1",
+						Message:  xsql.Message{"id1": 3, "f1": "v1"},
+						Metadata: xsql.Metadata{"topic": "topic1"},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter:  "src1",
+						Message:  xsql.Message{"id1": 2, "f1": "v2", "c": 1},
+						Metadata: xsql.Metadata{"topic": "topic2"},
+					},
+				},
+			},
+		},
+		{
+			sql: "SELECT count(*) as c FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
+			data: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1", "c": 1}},
+							{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+						},
+					},
+				},
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1}},
+							{Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
+						},
+					},
+				},
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1", "c": 1}},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestFilterPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("statement parse error %s", err)
+			break
+		}
+		var aggregateAlias xsql.Fields
+		for _, f := range stmt.Fields {
+			if f.AName != "" {
+				if xsql.HasAggFuncs(f.Expr) {
+					aggregateAlias = append(aggregateAlias, f)
+				}
+			}
+		}
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups(), Alias: aggregateAlias}
+		result := pp.Apply(ctx, tt.data, fv, afv)
+		gr, ok := result.(xsql.GroupedTuplesSet)
+		if !ok {
+			t.Errorf("result is not GroupedTuplesSet")
+		}
+		if len(tt.result) != len(gr) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, gr)
+		}
+
+		for _, r := range tt.result {
+			matched := false
+			for _, gre := range gr {
+				if reflect.DeepEqual(r, gre) {
+					matched = true
+				}
+			}
+			if !matched {
+				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, r)
+			}
+		}
+	}
+}
+
+func TestAggregatePlanAlias_Apply(t *testing.T) {
+	var tests = []struct {
+		sql    string
+		data   interface{}
+		result interface{}
+	}{
+		{
+			sql: "SELECT count(*) as c FROM demo",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(6),
+					"def": "hello",
+				},
+			},
+			result: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(6),
+					"def": "hello",
+					"c":   1,
+				},
+			},
+		},
+		{
+			sql: `SELECT count(*) as c FROM src1`,
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 5, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1", "c": 3},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 5, "f1": "v1"},
+						},
+					},
+				},
+			},
+		}, {
+			sql: "SELECT count(*) as c FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
+			data: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+						{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+					},
+				},
+			},
+			result: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 3}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+						{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+					},
+				},
+			},
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestFilterPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("statement parse error %s", err)
+			break
+		}
+		var aggregateAlias xsql.Fields
+		for _, f := range stmt.Fields {
+			if f.AName != "" {
+				if xsql.HasAggFuncs(f.Expr) {
+					aggregateAlias = append(aggregateAlias, f)
+				}
+			}
+		}
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups(), Alias: aggregateAlias}
+		result := pp.Apply(ctx, tt.data, fv, afv)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
+		}
+	}
+}
+
 func TestAggregatePlanError(t *testing.T) {
 func TestAggregatePlanError(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
 		sql    string
 		sql    string
@@ -378,9 +751,9 @@ func TestAggregatePlanError(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
-
+		fv, afv := xsql.NewAggregateFunctionValuers()
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
 		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
-		result := pp.Apply(ctx, tt.data)
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}

+ 4 - 4
xsql/plans/filter_operator.go

@@ -14,14 +14,14 @@ type FilterPlan struct {
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  output: *xsql.Tuple | xsql.WindowTuplesSet | xsql.JoinTupleSets
  *  output: *xsql.Tuple | xsql.WindowTuplesSet | xsql.JoinTupleSets
  */
  */
-func (p *FilterPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (p *FilterPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
 	log.Debugf("filter plan receive %s", data)
 	log.Debugf("filter plan receive %s", data)
 	switch input := data.(type) {
 	switch input := data.(type) {
 	case error:
 	case error:
 		return input
 		return input
 	case xsql.Valuer:
 	case xsql.Valuer:
-		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(input, &xsql.FunctionValuer{})}
+		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(input, fv)}
 		result := ve.Eval(p.Condition)
 		result := ve.Eval(p.Condition)
 		switch r := result.(type) {
 		switch r := result.(type) {
 		case error:
 		case error:
@@ -40,7 +40,7 @@ func (p *FilterPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 		ms := input[0].Tuples
 		ms := input[0].Tuples
 		r := ms[:0]
 		r := ms[:0]
 		for _, v := range ms {
 		for _, v := range ms {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, fv)}
 			result := ve.Eval(p.Condition)
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {
 			switch val := result.(type) {
 			case error:
 			case error:
@@ -61,7 +61,7 @@ func (p *FilterPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 		ms := input
 		ms := input
 		r := ms[:0]
 		r := ms[:0]
 		for _, v := range ms {
 		for _, v := range ms {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, fv)}
 			result := ve.Eval(p.Condition)
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {
 			switch val := result.(type) {
 			case error:
 			case error:

+ 4 - 4
xsql/plans/filter_test.go

@@ -263,9 +263,9 @@ func TestFilterPlan_Apply(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
-
+		fv, afv := xsql.NewAggregateFunctionValuers()
 		pp := &FilterPlan{Condition: stmt.Condition}
 		pp := &FilterPlan{Condition: stmt.Condition}
-		result := pp.Apply(ctx, tt.data)
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}
@@ -386,9 +386,9 @@ func TestFilterPlanError(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
-
+		fv, afv := xsql.NewAggregateFunctionValuers()
 		pp := &FilterPlan{Condition: stmt.Condition}
 		pp := &FilterPlan{Condition: stmt.Condition}
-		result := pp.Apply(ctx, tt.data)
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}

+ 17 - 20
xsql/plans/having_operator.go

@@ -10,7 +10,7 @@ type HavingPlan struct {
 	Condition xsql.Expr
 	Condition xsql.Expr
 }
 }
 
 
-func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
 	log.Debugf("having plan receive %s", data)
 	log.Debugf("having plan receive %s", data)
 	switch input := data.(type) {
 	switch input := data.(type) {
@@ -19,7 +19,8 @@ func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 	case xsql.GroupedTuplesSet:
 	case xsql.GroupedTuplesSet:
 		r := xsql.GroupedTuplesSet{}
 		r := xsql.GroupedTuplesSet{}
 		for _, v := range input {
 		for _, v := range input {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(v, v[0], &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: v}, &xsql.WildcardValuer{Data: v[0]})}
+			afv.SetData(v)
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(v, fv, v[0], fv, afv, &xsql.WildcardValuer{Data: v[0]})}
 			result := ve.Eval(p.Condition)
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {
 			switch val := result.(type) {
 			case error:
 			case error:
@@ -40,30 +41,26 @@ func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 			return fmt.Errorf("run Having error: input WindowTuplesSet with multiple tuples cannot be evaluated")
 			return fmt.Errorf("run Having error: input WindowTuplesSet with multiple tuples cannot be evaluated")
 		}
 		}
 		ms := input[0].Tuples
 		ms := input[0].Tuples
-		r := ms[:0]
-		for _, v := range ms {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, &v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: input}, &xsql.WildcardValuer{Data: &v})}
-			result := ve.Eval(p.Condition)
-			switch val := result.(type) {
-			case error:
-				return fmt.Errorf("run Having error: %s", val)
-			case bool:
-				if val {
-					r = append(r, v)
-				}
-			default:
-				return fmt.Errorf("run Having error: invalid condition that returns non-bool value %[1]T(%[1]v)", val)
+		v := ms[0]
+		afv.SetData(input)
+		ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, fv, &v, fv, afv, &xsql.WildcardValuer{Data: &v})}
+		result := ve.Eval(p.Condition)
+		switch val := result.(type) {
+		case error:
+			return fmt.Errorf("run Having error: %s", val)
+		case bool:
+			if val {
+				return input
 			}
 			}
-		}
-		if len(r) > 0 {
-			input[0].Tuples = r
-			return input
+		default:
+			return fmt.Errorf("run Having error: invalid condition that returns non-bool value %[1]T(%[1]v)", val)
 		}
 		}
 	case xsql.JoinTupleSets:
 	case xsql.JoinTupleSets:
 		ms := input
 		ms := input
 		r := ms[:0]
 		r := ms[:0]
+		afv.SetData(input)
 		for _, v := range ms {
 		for _, v := range ms {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, &v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: input}, &xsql.WildcardValuer{Data: &v})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, fv, &v, fv, afv, &xsql.WildcardValuer{Data: &v})}
 			result := ve.Eval(p.Condition)
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {
 			switch val := result.(type) {
 			case error:
 			case error:

+ 156 - 3
xsql/plans/having_test.go

@@ -261,9 +261,162 @@ func TestHavingPlan_Apply(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		pp := &HavingPlan{Condition: stmt.Having}
+		result := pp.Apply(ctx, tt.data, fv, afv)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
+		}
+	}
+}
 
 
+func TestHavingPlanAlias_Apply(t *testing.T) {
+	var tests = []struct {
+		sql    string
+		data   interface{}
+		result interface{}
+	}{
+		{
+			sql: `SELECT avg(id1) as a FROM src1 HAVING a > 1`,
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1", "a": 8 / 3},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 5, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1", "a": 8 / 3},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 5, "f1": "v1"},
+						},
+					},
+				},
+			},
+		},
+
+		{
+			sql: `SELECT sum(id1) as s FROM src1 HAVING s > 1`,
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1", "s": 1},
+						},
+					},
+				},
+			},
+			result: nil,
+		}, {
+			sql: "SELECT count(*) as c FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having c > 1",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1"},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1"},
+					},
+				},
+			},
+		}, {
+			sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color having c > 1",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2}},
+							{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+						},
+					},
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+							{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+						},
+					},
+				},
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 1}},
+							{Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
+						},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2}},
+							{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+						},
+					},
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+							{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("statement parse error %s", err)
+			break
+		}
+		fv, afv := xsql.NewAggregateFunctionValuers()
 		pp := &HavingPlan{Condition: stmt.Having}
 		pp := &HavingPlan{Condition: stmt.Having}
-		result := pp.Apply(ctx, tt.data)
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}
@@ -333,9 +486,9 @@ func TestHavingPlanError(t *testing.T) {
 			t.Errorf("statement parse error %s", err)
 			t.Errorf("statement parse error %s", err)
 			break
 			break
 		}
 		}
-
+		fv, afv := xsql.NewAggregateFunctionValuers()
 		pp := &HavingPlan{Condition: stmt.Having}
 		pp := &HavingPlan{Condition: stmt.Having}
-		result := pp.Apply(ctx, tt.data)
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}

+ 2 - 1
xsql/plans/join_multi_test.go

@@ -396,8 +396,9 @@ func TestMultiJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
 			}

+ 15 - 15
xsql/plans/join_operator.go

@@ -14,7 +14,7 @@ type JoinPlan struct {
 
 
 // input:  xsql.WindowTuplesSet from windowOp, window is required for join
 // input:  xsql.WindowTuplesSet from windowOp, window is required for join
 // output: xsql.JoinTupleSets
 // output: xsql.JoinTupleSets
-func (jp *JoinPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (jp *JoinPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
 	var input xsql.WindowTuplesSet
 	var input xsql.WindowTuplesSet
 	switch v := data.(type) {
 	switch v := data.(type) {
@@ -29,13 +29,13 @@ func (jp *JoinPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
 	result := xsql.JoinTupleSets{}
 	result := xsql.JoinTupleSets{}
 	for i, join := range jp.Joins {
 	for i, join := range jp.Joins {
 		if i == 0 {
 		if i == 0 {
-			v, err := jp.evalSet(input, join)
+			v, err := jp.evalSet(input, join, fv)
 			if err != nil {
 			if err != nil {
 				return fmt.Errorf("run Join error: %s", err)
 				return fmt.Errorf("run Join error: %s", err)
 			}
 			}
 			result = v
 			result = v
 		} else {
 		} else {
-			r1, err := jp.evalJoinSets(&result, input, join)
+			r1, err := jp.evalJoinSets(&result, input, join, fv)
 			if err != nil {
 			if err != nil {
 				return fmt.Errorf("run Join error: %s", err)
 				return fmt.Errorf("run Join error: %s", err)
 			}
 			}
@@ -67,7 +67,7 @@ func getStreamNames(join *xsql.Join) ([]string, error) {
 	return srcs, nil
 	return srcs, nil
 }
 }
 
 
-func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.JoinTupleSets, error) {
+func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join, fv *xsql.FunctionValuer) (xsql.JoinTupleSets, error) {
 	var leftStream, rightStream string
 	var leftStream, rightStream string
 
 
 	if join.JoinType != xsql.CROSS_JOIN {
 	if join.JoinType != xsql.CROSS_JOIN {
@@ -99,7 +99,7 @@ func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.Jo
 	sets := xsql.JoinTupleSets{}
 	sets := xsql.JoinTupleSets{}
 
 
 	if join.JoinType == xsql.RIGHT_JOIN {
 	if join.JoinType == xsql.RIGHT_JOIN {
-		return jp.evalSetWithRightJoin(input, join, false)
+		return jp.evalSetWithRightJoin(input, join, false, fv)
 	}
 	}
 	for _, left := range lefts {
 	for _, left := range lefts {
 		merged := &xsql.JoinTuple{}
 		merged := &xsql.JoinTuple{}
@@ -113,7 +113,7 @@ func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.Jo
 				temp := &xsql.JoinTuple{}
 				temp := &xsql.JoinTuple{}
 				temp.AddTuple(left)
 				temp.AddTuple(left)
 				temp.AddTuple(right)
 				temp.AddTuple(right)
-				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, &xsql.FunctionValuer{})}
+				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, fv)}
 				result := ve.Eval(join.Expr)
 				result := ve.Eval(join.Expr)
 				switch val := result.(type) {
 				switch val := result.(type) {
 				case error:
 				case error:
@@ -140,7 +140,7 @@ func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.Jo
 	}
 	}
 
 
 	if join.JoinType == xsql.FULL_JOIN {
 	if join.JoinType == xsql.FULL_JOIN {
-		if rightJoinSet, err := jp.evalSetWithRightJoin(input, join, true); err == nil {
+		if rightJoinSet, err := jp.evalSetWithRightJoin(input, join, true, fv); err == nil {
 			if len(rightJoinSet) > 0 {
 			if len(rightJoinSet) > 0 {
 				for _, jt := range rightJoinSet {
 				for _, jt := range rightJoinSet {
 					sets = append(sets, jt)
 					sets = append(sets, jt)
@@ -153,7 +153,7 @@ func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.Jo
 	return sets, nil
 	return sets, nil
 }
 }
 
 
-func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool) (xsql.JoinTupleSets, error) {
+func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool, fv *xsql.FunctionValuer) (xsql.JoinTupleSets, error) {
 	streams, err := getStreamNames(&join)
 	streams, err := getStreamNames(&join)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -176,7 +176,7 @@ func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.J
 			temp := &xsql.JoinTuple{}
 			temp := &xsql.JoinTuple{}
 			temp.AddTuple(right)
 			temp.AddTuple(right)
 			temp.AddTuple(left)
 			temp.AddTuple(left)
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, &xsql.FunctionValuer{})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, fv)}
 			result := ve.Eval(join.Expr)
 			result := ve.Eval(join.Expr)
 			switch val := result.(type) {
 			switch val := result.(type) {
 			case error:
 			case error:
@@ -203,7 +203,7 @@ func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.J
 	return sets, nil
 	return sets, nil
 }
 }
 
 
-func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join) (interface{}, error) {
+func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join, fv *xsql.FunctionValuer) (interface{}, error) {
 	var rightStream string
 	var rightStream string
 	if join.Alias == "" {
 	if join.Alias == "" {
 		rightStream = join.Name
 		rightStream = join.Name
@@ -215,7 +215,7 @@ func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuple
 
 
 	newSets := xsql.JoinTupleSets{}
 	newSets := xsql.JoinTupleSets{}
 	if join.JoinType == xsql.RIGHT_JOIN {
 	if join.JoinType == xsql.RIGHT_JOIN {
-		return jp.evalRightJoinSets(set, input, join, false)
+		return jp.evalRightJoinSets(set, input, join, false, fv)
 	}
 	}
 	for _, left := range *set {
 	for _, left := range *set {
 		merged := &xsql.JoinTuple{}
 		merged := &xsql.JoinTuple{}
@@ -227,7 +227,7 @@ func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuple
 			if join.JoinType == xsql.CROSS_JOIN {
 			if join.JoinType == xsql.CROSS_JOIN {
 				merged.AddTuple(right)
 				merged.AddTuple(right)
 			} else {
 			} else {
-				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&left, &right, &xsql.FunctionValuer{})}
+				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&left, &right, fv)}
 				result := ve.Eval(join.Expr)
 				result := ve.Eval(join.Expr)
 				switch val := result.(type) {
 				switch val := result.(type) {
 				case error:
 				case error:
@@ -252,7 +252,7 @@ func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuple
 	}
 	}
 
 
 	if join.JoinType == xsql.FULL_JOIN {
 	if join.JoinType == xsql.FULL_JOIN {
-		if rightJoinSet, err := jp.evalRightJoinSets(set, input, join, true); err == nil && len(rightJoinSet) > 0 {
+		if rightJoinSet, err := jp.evalRightJoinSets(set, input, join, true, fv); err == nil && len(rightJoinSet) > 0 {
 			for _, jt := range rightJoinSet {
 			for _, jt := range rightJoinSet {
 				newSets = append(newSets, jt)
 				newSets = append(newSets, jt)
 			}
 			}
@@ -262,7 +262,7 @@ func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuple
 	return newSets, nil
 	return newSets, nil
 }
 }
 
 
-func (jp *JoinPlan) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool) (xsql.JoinTupleSets, error) {
+func (jp *JoinPlan) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesSet, join xsql.Join, excludeJoint bool, fv *xsql.FunctionValuer) (xsql.JoinTupleSets, error) {
 	var rightStream string
 	var rightStream string
 	if join.Alias == "" {
 	if join.Alias == "" {
 		rightStream = join.Name
 		rightStream = join.Name
@@ -278,7 +278,7 @@ func (jp *JoinPlan) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.Window
 		merged.AddTuple(right)
 		merged.AddTuple(right)
 		isJoint := false
 		isJoint := false
 		for _, left := range *set {
 		for _, left := range *set {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&right, &left, &xsql.FunctionValuer{})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&right, &left, fv)}
 			result := ve.Eval(join.Expr)
 			result := ve.Eval(join.Expr)
 			switch val := result.(type) {
 			switch val := result.(type) {
 			case error:
 			case error:

+ 12 - 6
xsql/plans/join_test.go

@@ -662,8 +662,9 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
 			}
@@ -1127,8 +1128,9 @@ func TestInnerJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
 			}
@@ -1310,8 +1312,9 @@ func TestRightJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
 			}
@@ -1560,8 +1563,9 @@ func TestFullJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
 			}
@@ -1690,8 +1694,9 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
 			}
@@ -1763,8 +1768,9 @@ func TestCrossJoinPlanError(t *testing.T) {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
 			t.Errorf("statement source is not a table")
 			t.Errorf("statement source is not a table")
 		} else {
 		} else {
+			fv, afv := xsql.NewAggregateFunctionValuers()
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(ctx, tt.data)
+			result := pp.Apply(ctx, tt.data, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
 			}

+ 2 - 1
xsql/plans/math_func_test.go

@@ -467,7 +467,8 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
 			err := json.Unmarshal(v, &mapRes)

+ 6 - 3
xsql/plans/misc_func_test.go

@@ -183,7 +183,8 @@ func TestMiscFunc_Apply1(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
 			err := json.Unmarshal(v, &mapRes)
@@ -235,7 +236,8 @@ func TestMqttFunc_Apply2(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
 			err := json.Unmarshal(v, &mapRes)
@@ -336,7 +338,8 @@ func TestMetaFunc_Apply1(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
 			err := json.Unmarshal(v, &mapRes)

+ 2 - 2
xsql/plans/order_operator.go

@@ -14,10 +14,10 @@ type OrderPlan struct {
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  *  output: *xsql.Tuple | xsql.WindowTuplesSet | xsql.JoinTupleSets
  *  output: *xsql.Tuple | xsql.WindowTuplesSet | xsql.JoinTupleSets
  */
  */
-func (p *OrderPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (p *OrderPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
 	log.Debugf("order plan receive %s", data)
 	log.Debugf("order plan receive %s", data)
-	sorter := xsql.OrderedBy(p.SortFields)
+	sorter := xsql.OrderedBy(p.SortFields, fv)
 	switch input := data.(type) {
 	switch input := data.(type) {
 	case error:
 	case error:
 		return input
 		return input

+ 41 - 1
xsql/plans/order_test.go

@@ -369,6 +369,45 @@ func TestOrderPlan_Apply(t *testing.T) {
 			},
 			},
 		},
 		},
 		{
 		{
+			sql: "SELECT count(*) as c FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 ORDER BY c",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1"},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "c": 1},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "c": 2},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1"},
+					},
+				},
+			},
+		},
+		{
 			sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10) ORDER BY src2.id2 DESC",
 			sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10) ORDER BY src2.id2 DESC",
 			data: xsql.GroupedTuplesSet{
 			data: xsql.GroupedTuplesSet{
 				{
 				{
@@ -434,7 +473,8 @@ func TestOrderPlan_Apply(t *testing.T) {
 		}
 		}
 
 
 		pp := &OrderPlan{SortFields: stmt.SortFields}
 		pp := &OrderPlan{SortFields: stmt.SortFields}
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}

+ 16 - 18
xsql/plans/preprocessor.go

@@ -15,14 +15,15 @@ import (
 
 
 type Preprocessor struct {
 type Preprocessor struct {
 	streamStmt      *xsql.StreamStmt
 	streamStmt      *xsql.StreamStmt
-	fields          xsql.Fields
+	aliasFields     xsql.Fields
+	isSelectAll     bool
 	isEventTime     bool
 	isEventTime     bool
 	timestampField  string
 	timestampField  string
 	timestampFormat string
 	timestampFormat string
 }
 }
 
 
-func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool) (*Preprocessor, error) {
-	p := &Preprocessor{streamStmt: s, fields: fs, isEventTime: iet}
+func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool, isa bool) (*Preprocessor, error) {
+	p := &Preprocessor{streamStmt: s, aliasFields: fs, isEventTime: iet, isSelectAll: isa}
 	if iet {
 	if iet {
 		if tf, ok := s.Options["TIMESTAMP"]; ok {
 		if tf, ok := s.Options["TIMESTAMP"]; ok {
 			p.timestampField = tf
 			p.timestampField = tf
@@ -41,7 +42,7 @@ func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool) (*Preprocesso
  *	input: *xsql.Tuple
  *	input: *xsql.Tuple
  *	output: *xsql.Tuple
  *	output: *xsql.Tuple
  */
  */
-func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
 	tuple, ok := data.(*xsql.Tuple)
 	tuple, ok := data.(*xsql.Tuple)
 	if !ok {
 	if !ok {
@@ -53,13 +54,12 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{
 	result := make(map[string]interface{})
 	result := make(map[string]interface{})
 	if p.streamStmt.StreamFields != nil {
 	if p.streamStmt.StreamFields != nil {
 		for _, f := range p.streamStmt.StreamFields {
 		for _, f := range p.streamStmt.StreamFields {
-			fname := strings.ToLower(f.Name)
-			if e := p.addRecField(f.FieldType, result, tuple.Message, fname); e != nil {
+			if e := p.addRecField(f.FieldType, result, tuple.Message, f.Name); e != nil {
 				return fmt.Errorf("error in preprocessor: %s", e)
 				return fmt.Errorf("error in preprocessor: %s", e)
 			}
 			}
 		}
 		}
 	} else {
 	} else {
-		if p.fields.IsSelectAll() {
+		if p.isSelectAll {
 			result = tuple.Message
 			result = tuple.Message
 		} else {
 		} else {
 			result = xsql.LowercaseKeyMap(tuple.Message)
 			result = xsql.LowercaseKeyMap(tuple.Message)
@@ -68,15 +68,13 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{
 
 
 	//If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
 	//If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.
 	//Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
 	//Otherwise, the GROUP BY, ORDER BY statement cannot get the value.
-	for _, f := range p.fields {
-		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{})}
-			v := ve.Eval(f.Expr)
-			if _, ok := v.(error); ok {
-				return v
-			} else {
-				result[strings.ToLower(f.AName)] = v
-			}
+	for _, f := range p.aliasFields {
+		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv)}
+		v := ve.Eval(f.Expr)
+		if _, ok := v.(error); ok {
+			return v
+		} else {
+			result[f.AName] = v
 		}
 		}
 	}
 	}
 
 
@@ -104,8 +102,8 @@ func (p *Preprocessor) parseTime(s string) (time.Time, error) {
 	}
 	}
 }
 }
 
 
-func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j map[string]interface{}, n string) error {
-	if t, ok := j[n]; ok {
+func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{}, j xsql.Message, n string) error {
+	if t, ok := j.Value(n); ok {
 		v := reflect.ValueOf(t)
 		v := reflect.ValueOf(t)
 		jtype := v.Kind()
 		jtype := v.Kind()
 		switch st := ft.(type) {
 		switch st := ft.(type) {

+ 8 - 4
xsql/plans/preprocessor_test.go

@@ -530,7 +530,8 @@ func TestPreprocessor_Apply(t *testing.T) {
 			return
 			return
 		} else {
 		} else {
 			tuple := &xsql.Tuple{Message: dm}
 			tuple := &xsql.Tuple{Message: dm}
-			result := pp.Apply(ctx, tuple)
+			fv, afv := xsql.NewAggregateFunctionValuers()
+			result := pp.Apply(ctx, tuple, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
 			}
 			}
@@ -663,7 +664,8 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 			return
 			return
 		} else {
 		} else {
 			tuple := &xsql.Tuple{Message: dm}
 			tuple := &xsql.Tuple{Message: dm}
-			result := pp.Apply(ctx, tuple)
+			fv, afv := xsql.NewAggregateFunctionValuers()
+			result := pp.Apply(ctx, tuple, fv, afv)
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			if rt, ok := result.(*xsql.Tuple); ok {
 			if rt, ok := result.(*xsql.Tuple); ok {
 				if rtt, ok := rt.Message["abc"].(time.Time); ok {
 				if rtt, ok := rt.Message["abc"].(time.Time); ok {
@@ -836,7 +838,8 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			return
 			return
 		} else {
 		} else {
 			tuple := &xsql.Tuple{Message: dm}
 			tuple := &xsql.Tuple{Message: dm}
-			result := pp.Apply(ctx, tuple)
+			fv, afv := xsql.NewAggregateFunctionValuers()
+			result := pp.Apply(ctx, tuple, fv, afv)
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			if rt, ok := result.(*xsql.Tuple); ok {
 			if rt, ok := result.(*xsql.Tuple); ok {
 				if rtt, ok := rt.Message["abc"].(time.Time); ok {
 				if rtt, ok := rt.Message["abc"].(time.Time); ok {
@@ -914,7 +917,8 @@ func TestPreprocessorError(t *testing.T) {
 			return
 			return
 		} else {
 		} else {
 			tuple := &xsql.Tuple{Message: dm}
 			tuple := &xsql.Tuple{Message: dm}
-			result := pp.Apply(ctx, tuple)
+			fv, afv := xsql.NewAggregateFunctionValuers()
+			result := pp.Apply(ctx, tuple, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
 			}
 			}

+ 10 - 9
xsql/plans/project_operator.go

@@ -20,7 +20,7 @@ type ProjectPlan struct {
  *  input: *xsql.Tuple from preprocessor or filterOp | xsql.WindowTuplesSet from windowOp or filterOp | xsql.JoinTupleSets from joinOp or filterOp
  *  input: *xsql.Tuple from preprocessor or filterOp | xsql.WindowTuplesSet from windowOp or filterOp | xsql.JoinTupleSets from joinOp or filterOp
  *  output: []map[string]interface{}
  *  output: []map[string]interface{}
  */
  */
-func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
 	log.Debugf("project plan receive %s", data)
 	log.Debugf("project plan receive %s", data)
 	var results []map[string]interface{}
 	var results []map[string]interface{}
@@ -28,7 +28,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	case error:
 	case error:
 		return input
 		return input
 	case *xsql.Tuple:
 	case *xsql.Tuple:
-		ve := pp.getVE(input, input)
+		ve := pp.getVE(input, input, fv, afv)
 		if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
 		if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
 			return fmt.Errorf("run Select error: %s", err)
 			return fmt.Errorf("run Select error: %s", err)
 		} else {
 		} else {
@@ -40,7 +40,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 		}
 		}
 		ms := input[0].Tuples
 		ms := input[0].Tuples
 		for _, v := range ms {
 		for _, v := range ms {
-			ve := pp.getVE(&v, input)
+			ve := pp.getVE(&v, input, fv, afv)
 			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
 			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
 				return fmt.Errorf("run Select error: %s", err)
 				return fmt.Errorf("run Select error: %s", err)
 			} else {
 			} else {
@@ -53,7 +53,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	case xsql.JoinTupleSets:
 	case xsql.JoinTupleSets:
 		ms := input
 		ms := input
 		for _, v := range ms {
 		for _, v := range ms {
-			ve := pp.getVE(&v, input)
+			ve := pp.getVE(&v, input, fv, afv)
 			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
 			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
 				return err
 				return err
 			} else {
 			} else {
@@ -65,7 +65,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 		}
 		}
 	case xsql.GroupedTuplesSet:
 	case xsql.GroupedTuplesSet:
 		for _, v := range input {
 		for _, v := range input {
-			ve := pp.getVE(v[0], v)
+			ve := pp.getVE(v[0], v, fv, afv)
 			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
 			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
 				return fmt.Errorf("run Select error: %s", err)
 				return fmt.Errorf("run Select error: %s", err)
 			} else {
 			} else {
@@ -83,11 +83,12 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	}
 	}
 }
 }
 
 
-func (pp *ProjectPlan) getVE(tuple xsql.DataValuer, agg xsql.AggregateData) *xsql.ValuerEval {
+func (pp *ProjectPlan) getVE(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) *xsql.ValuerEval {
+	afv.SetData(agg)
 	if pp.IsAggregate {
 	if pp.IsAggregate {
-		return &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(agg, tuple, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: agg}, &xsql.WildcardValuer{Data: tuple})}
+		return &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(agg, fv, tuple, fv, afv, &xsql.WildcardValuer{Data: tuple})}
 	} else {
 	} else {
-		return &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{}, &xsql.WildcardValuer{Data: tuple})}
+		return &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, fv, &xsql.WildcardValuer{Data: tuple})}
 	}
 	}
 }
 }
 
 
@@ -95,7 +96,7 @@ func project(fs xsql.Fields, ve *xsql.ValuerEval, isTest bool) (map[string]inter
 	result := make(map[string]interface{})
 	result := make(map[string]interface{})
 	for _, f := range fs {
 	for _, f := range fs {
 		//Avoid to re-evaluate for non-agg field has alias name, which was already evaluated in pre-processor operator.
 		//Avoid to re-evaluate for non-agg field has alias name, which was already evaluated in pre-processor operator.
-		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) && !isTest {
+		if f.AName != "" && !isTest {
 			fr := &xsql.FieldRef{StreamName: "", Name: f.AName}
 			fr := &xsql.FieldRef{StreamName: "", Name: f.AName}
 			v := ve.Eval(fr)
 			v := ve.Eval(fr)
 			if e, ok := v.(error); ok {
 			if e, ok := v.(error); ok {

+ 106 - 14
xsql/plans/project_test.go

@@ -389,7 +389,8 @@ func TestProjectPlan_Apply1(t *testing.T) {
 
 
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
 			err := json.Unmarshal(v, &mapRes)
@@ -939,7 +940,8 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 
 
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
 			err := json.Unmarshal(v, &mapRes)
@@ -1139,7 +1141,8 @@ func TestProjectPlan_Funcs(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp.isTest = true
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
 			err := json.Unmarshal(v, &mapRes)
@@ -1171,7 +1174,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				{
 				{
 					&xsql.JoinTuple{
 					&xsql.JoinTuple{
 						Tuples: []xsql.Tuple{
 						Tuples: []xsql.Tuple{
-							{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
+							{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
 							{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
 							{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
 						},
 						},
 					},
 					},
@@ -1185,7 +1188,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				{
 				{
 					&xsql.JoinTuple{
 					&xsql.JoinTuple{
 						Tuples: []xsql.Tuple{
 						Tuples: []xsql.Tuple{
-							{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
+							{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
 							{Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
 							{Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
 						},
 						},
 					},
 					},
@@ -1206,7 +1209,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			}},
 			}},
 		},
 		},
 		{
 		{
-			sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
+			sql: "SELECT avg(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 			data: xsql.GroupedTuplesSet{
 				{
 				{
 					&xsql.JoinTuple{
 					&xsql.JoinTuple{
@@ -1256,7 +1259,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			}},
 			}},
 		},
 		},
 		{
 		{
-			sql: "SELECT max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
+			sql: "SELECT max(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 			data: xsql.GroupedTuplesSet{
 				{
 				{
 					&xsql.JoinTuple{
 					&xsql.JoinTuple{
@@ -1300,7 +1303,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			}},
 			}},
 		},
 		},
 		{
 		{
-			sql: "SELECT min(a) as min FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
+			sql: "SELECT min(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.JoinTupleSets{
 			data: xsql.JoinTupleSets{
 				xsql.JoinTuple{
 				xsql.JoinTuple{
 					Tuples: []xsql.Tuple{
 					Tuples: []xsql.Tuple{
@@ -1326,7 +1329,33 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"min": 68.55,
 				"min": 68.55,
 			}},
 			}},
 		}, {
 		}, {
-			sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
+			sql: "SELECT min(a) as m FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
+			data: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "m": 68.55}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+						{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+					},
+				},
+			},
+
+			result: []map[string]interface{}{{
+				"m": 68.55,
+			}},
+		}, {
+			sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
 				xsql.WindowTuples{
 					Emitter: "test",
 					Emitter: "test",
@@ -1348,7 +1377,29 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"sum": float64(123203),
 				"sum": float64(123203),
 			}},
 			}},
 		}, {
 		}, {
-			sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
+			sql: "SELECT sum(a) as s FROM test GROUP BY TumblingWindow(ss, 10)",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "test",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"a": 53, "s": 123203},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"a": 27},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"a": 123123},
+						},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"s": float64(123203),
+			}},
+		}, {
+			sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
 				xsql.WindowTuples{
 					Emitter: "test",
 					Emitter: "test",
@@ -1371,7 +1422,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			}},
 			}},
 		},
 		},
 		{
 		{
-			sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
+			sql: "SELECT count(*), meta(test1.device) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 			data: xsql.GroupedTuplesSet{
 				{
 				{
 					&xsql.JoinTuple{
 					&xsql.JoinTuple{
@@ -1403,6 +1454,46 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
+				"count": float64(2),
+				"meta":  "devicea",
+			}, {
+				"count": float64(2),
+				"meta":  "devicec",
+			}},
+		},
+		{
+			sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "d": "devicea"}},
+							{Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
+						},
+					},
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+							{Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
+						},
+					},
+				},
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "d": "devicec"}},
+							{Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
+						},
+					},
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
+							{Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
+						},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
 				"c": float64(2),
 				"c": float64(2),
 				"d": "devicea",
 				"d": "devicea",
 			}, {
 			}, {
@@ -1421,8 +1512,8 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			t.Error(err)
 			t.Error(err)
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: true}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: true}
-		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
 			err := json.Unmarshal(v, &mapRes)
@@ -1572,7 +1663,8 @@ func TestProjectPlanError(t *testing.T) {
 
 
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp.isTest = true
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}

+ 2 - 1
xsql/plans/str_func_test.go

@@ -419,7 +419,8 @@ func TestStrFunc_Apply1(t *testing.T) {
 		}
 		}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp := &ProjectPlan{Fields: stmt.Fields}
 		pp.isTest = true
 		pp.isTest = true
-		result := pp.Apply(ctx, tt.data)
+		fv, afv := xsql.NewAggregateFunctionValuers()
+		result := pp.Apply(ctx, tt.data, fv, afv)
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
 			err := json.Unmarshal(v, &mapRes)

+ 6 - 1
xsql/processors/extension_test.go

@@ -52,7 +52,7 @@ func TestExtensions(t *testing.T) {
 	}{
 	}{
 		{
 		{
 			name: `$$test1`,
 			name: `$$test1`,
-			rj:   "{\"sql\": \"SELECT echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\":  {\"path\":\"" + CACHE_FILE + "\"}}]}",
+			rj:   "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\":  {\"path\":\"" + CACHE_FILE + "\"}}]}",
 		},
 		},
 	}
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -105,6 +105,11 @@ func TestExtensions(t *testing.T) {
 				break
 				break
 			}
 			}
 			r := r[0]
 			r := r[0]
+			c := int((r["c"]).(float64))
+			if c != 1 {
+				t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
+				break
+			}
 			e := int((r["e"]).(float64))
 			e := int((r["e"]).(float64))
 			if e != 50 && e != 51 {
 			if e != 50 && e != 51 {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
 				t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)

+ 14 - 4
xsql/processors/xsql_processor.go

@@ -406,12 +406,22 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			}
 			}
 			defer store.Close()
 			defer store.Close()
 
 
+			var alias, aggregateAlias xsql.Fields
+			for _, f := range selectStmt.Fields {
+				if f.AName != "" {
+					if !xsql.HasAggFuncs(f.Expr) {
+						alias = append(alias, f)
+					} else {
+						aggregateAlias = append(aggregateAlias, f)
+					}
+				}
+			}
 			for i, s := range streamsFromStmt {
 			for i, s := range streamsFromStmt {
 				streamStmt, err := GetStream(store, s)
 				streamStmt, err := GetStream(store, s)
 				if err != nil {
 				if err != nil {
 					return nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
 					return nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
 				}
 				}
-				pp, err := plans.NewPreprocessor(streamStmt, selectStmt.Fields, isEventTime)
+				pp, err := plans.NewPreprocessor(streamStmt, alias, isEventTime, selectStmt.Fields.IsSelectAll())
 				if err != nil {
 				if err != nil {
 					return nil, nil, err
 					return nil, nil, err
 				}
 				}
@@ -459,10 +469,10 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			}
 			}
 
 
 			var ds xsql.Dimensions
 			var ds xsql.Dimensions
-			if dimensions != nil {
+			if dimensions != nil || len(aggregateAlias) > 0 {
 				ds = dimensions.GetGroups()
 				ds = dimensions.GetGroups()
-				if ds != nil && len(ds) > 0 {
-					aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds}, "aggregate", bufferLength)
+				if (ds != nil && len(ds) > 0) || len(aggregateAlias) > 0 {
+					aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds, Alias: aggregateAlias}, "aggregate", bufferLength)
 					aggregateOp.SetConcurrency(concurrency)
 					aggregateOp.SetConcurrency(concurrency)
 					tp.AddOperator(inputs, aggregateOp)
 					tp.AddOperator(inputs, aggregateOp)
 					inputs = []api.Emitter{aggregateOp}
 					inputs = []api.Emitter{aggregateOp}

+ 50 - 0
xsql/processors/xsql_processor_test.go

@@ -1895,6 +1895,56 @@ func TestWindow(t *testing.T) {
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_in_total":   int64(5),
 				"op_window_0_records_out_total":  int64(3),
 				"op_window_0_records_out_total":  int64(3),
 			},
 			},
+		}, {
+			name: `rule8`,
+			sql:  `SELECT color, ts, count(*) as c FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1) having c > 1`,
+			size: 5,
+			r: [][]map[string]interface{}{
+				{{
+					"color": "red",
+					"ts":    float64(1541152486013),
+					"c":     float64(2),
+				}},
+			},
+			m: map[string]interface{}{
+				"op_preprocessor_demo_0_exceptions_total":   int64(0),
+				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_records_in_total":   int64(5),
+				"op_preprocessor_demo_0_records_out_total":  int64(5),
+
+				"op_project_0_exceptions_total":   int64(0),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(1),
+				"op_project_0_records_out_total":  int64(1),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(1),
+				"sink_mockSink_0_records_out_total": int64(1),
+
+				"source_demo_0_exceptions_total":  int64(0),
+				"source_demo_0_records_in_total":  int64(5),
+				"source_demo_0_records_out_total": int64(5),
+
+				"op_window_0_exceptions_total":   int64(0),
+				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_records_in_total":   int64(5),
+				"op_window_0_records_out_total":  int64(3),
+
+				"op_filter_0_exceptions_total":   int64(0),
+				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_records_in_total":   int64(3),
+				"op_filter_0_records_out_total":  int64(2),
+
+				"op_aggregate_0_exceptions_total":   int64(0),
+				"op_aggregate_0_process_latency_ms": int64(0),
+				"op_aggregate_0_records_in_total":   int64(2),
+				"op_aggregate_0_records_out_total":  int64(2),
+
+				"op_having_0_exceptions_total":   int64(0),
+				"op_having_0_process_latency_ms": int64(0),
+				"op_having_0_records_in_total":   int64(2),
+				"op_having_0_records_out_total":  int64(1),
+			},
 		},
 		},
 	}
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 5 - 6
xstream/extensions/edgex_source.go

@@ -215,16 +215,15 @@ func (es *EdgexSource) getValue(r models.Reading, logger api.Logger) (interface{
 
 
 func (es *EdgexSource) getFloatValue(r models.Reading, logger api.Logger) (interface{}, error) {
 func (es *EdgexSource) getFloatValue(r models.Reading, logger api.Logger) (interface{}, error) {
 	if len(r.FloatEncoding) == 0 {
 	if len(r.FloatEncoding) == 0 {
-		if strings.Contains(r.Value, "==") {
+		if strings.Contains(r.Value, "=") {
 			r.FloatEncoding = models.Base64Encoding
 			r.FloatEncoding = models.Base64Encoding
 		} else {
 		} else {
 			r.FloatEncoding = models.ENotation
 			r.FloatEncoding = models.ENotation
 		}
 		}
 	}
 	}
-	switch r.ValueType {
-	case models.ValueTypeFloat32:
+	switch strings.ToLower(r.ValueType) {
+	case strings.ToLower(models.ValueTypeFloat32):
 		var value float64
 		var value float64
-
 		switch r.FloatEncoding {
 		switch r.FloatEncoding {
 		case models.Base64Encoding:
 		case models.Base64Encoding:
 			data, err := base64.StdEncoding.DecodeString(r.Value)
 			data, err := base64.StdEncoding.DecodeString(r.Value)
@@ -253,7 +252,7 @@ func (es *EdgexSource) getFloatValue(r models.Reading, logger api.Logger) (inter
 		}
 		}
 		return value, nil
 		return value, nil
 
 
-	case models.ValueTypeFloat64:
+	case strings.ToLower(models.ValueTypeFloat64):
 		var value float64
 		var value float64
 		switch r.FloatEncoding {
 		switch r.FloatEncoding {
 		case models.Base64Encoding:
 		case models.Base64Encoding:
@@ -278,7 +277,7 @@ func (es *EdgexSource) getFloatValue(r models.Reading, logger api.Logger) (inter
 			return false, fmt.Errorf("unkown FloatEncoding for float64 value: %s", r.FloatEncoding)
 			return false, fmt.Errorf("unkown FloatEncoding for float64 value: %s", r.FloatEncoding)
 		}
 		}
 	default:
 	default:
-		return nil, fmt.Errorf("unkown value type: %s, %v", r.ValueType, r)
+		return nil, fmt.Errorf("unkown value type: %s, reading:%v", r.ValueType, r)
 	}
 	}
 }
 }
 
 

+ 6 - 9
xstream/nodes/sink_node.go

@@ -287,7 +287,10 @@ func doCollect(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval
 }
 }
 
 
 func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
 func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
-	var s api.Sink
+	var (
+		s   api.Sink
+		err error
+	)
 	switch name {
 	switch name {
 	case "log":
 	case "log":
 		s = sinks.NewLogSink()
 		s = sinks.NewLogSink()
@@ -300,18 +303,12 @@ func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
 	case "nop":
 	case "nop":
 		s = &sinks.NopSink{}
 		s = &sinks.NopSink{}
 	default:
 	default:
-		nf, err := plugins.GetPlugin(name, plugins.SINK)
+		s, err = plugins.GetSink(name)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
-		var ok bool
-		s, ok = nf.(api.Sink)
-		if !ok {
-			return nil, fmt.Errorf("exported symbol %s is not type of api.Sink", name)
-		}
 	}
 	}
-
-	err := s.Configure(action)
+	err = s.Configure(action)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}

+ 5 - 7
xstream/nodes/source_node.go

@@ -147,20 +147,18 @@ func (m *SourceNode) reset() {
 }
 }
 
 
 func doGetSource(t string) (api.Source, error) {
 func doGetSource(t string) (api.Source, error) {
-	var s api.Source
-	var ok bool
+	var (
+		s   api.Source
+		err error
+	)
 	switch t {
 	switch t {
 	case "mqtt":
 	case "mqtt":
 		s = &extensions.MQTTSource{}
 		s = &extensions.MQTTSource{}
 	default:
 	default:
-		nf, err := plugins.GetPlugin(t, plugins.SOURCE)
+		s, err = plugins.GetSource(t)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
-		s, ok = nf.(api.Source)
-		if !ok {
-			return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
-		}
 	}
 	}
 	return s, nil
 	return s, nil
 }
 }

+ 4 - 2
xstream/operators/operations.go

@@ -2,6 +2,7 @@ package operators
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/nodes"
 	"github.com/emqx/kuiper/xstream/nodes"
 	"sync"
 	"sync"
@@ -9,7 +10,7 @@ import (
 
 
 // UnOperation interface represents unary operations (i.e. Map, Filter, etc)
 // UnOperation interface represents unary operations (i.e. Map, Filter, etc)
 type UnOperation interface {
 type UnOperation interface {
-	Apply(ctx api.StreamContext, data interface{}) interface{}
+	Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{}
 }
 }
 
 
 // UnFunc implements UnOperation as type func (context.Context, interface{})
 // UnFunc implements UnOperation as type func (context.Context, interface{})
@@ -125,6 +126,7 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 	o.mutex.Lock()
 	o.mutex.Lock()
 	o.statManagers = append(o.statManagers, stats)
 	o.statManagers = append(o.statManagers, stats)
 	o.mutex.Unlock()
 	o.mutex.Unlock()
+	fv, afv := xsql.NewAggregateFunctionValuers()
 
 
 	for {
 	for {
 		select {
 		select {
@@ -132,7 +134,7 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 		case item := <-o.input:
 		case item := <-o.input:
 			stats.IncTotalRecordsIn()
 			stats.IncTotalRecordsIn()
 			stats.ProcessTimeStart()
 			stats.ProcessTimeStart()
-			result := o.op.Apply(exeCtx, item)
+			result := o.op.Apply(exeCtx, item, fv, afv)
 
 
 			switch val := result.(type) {
 			switch val := result.(type) {
 			case nil:
 			case nil: