Quellcode durchsuchen

add lowercase support for field names with select * statement

RockyJin vor 4 Jahren
Ursprung
Commit
74022e4bd8

+ 2 - 2
fvt_scripts/select_all_rule.jmx

@@ -259,7 +259,7 @@
               <boolProp name="mqtt.add_timestamp">false</boolProp>
               <boolProp name="mqtt.add_timestamp">false</boolProp>
               <stringProp name="mqtt.message_type">String</stringProp>
               <stringProp name="mqtt.message_type">String</stringProp>
               <stringProp name="mqtt.message_type_fixed_length">1024</stringProp>
               <stringProp name="mqtt.message_type_fixed_length">1024</stringProp>
-              <stringProp name="mqtt.message_to_sent">{&quot;temperature&quot;: ${temperature}, &quot;humidity&quot; : ${humidity}}</stringProp>
+              <stringProp name="mqtt.message_to_sent">{&quot;Temperature&quot;: ${temperature}, &quot;humidity&quot; : ${humidity}}</stringProp>
             </net.xmeter.samplers.PubSampler>
             </net.xmeter.samplers.PubSampler>
             <hashTree/>
             <hashTree/>
           </hashTree>
           </hashTree>
@@ -484,7 +484,7 @@
         </net.xmeter.samplers.SubSampler>
         </net.xmeter.samplers.SubSampler>
         <hashTree>
         <hashTree>
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="temperature Assertion" enabled="true">
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="temperature Assertion" enabled="true">
-            <stringProp name="JSON_PATH">$[0].temperature</stringProp>
+            <stringProp name="JSON_PATH">$[0].Temperature</stringProp>
             <stringProp name="EXPECTED_VALUE">${temperature}</stringProp>
             <stringProp name="EXPECTED_VALUE">${temperature}</stringProp>
             <boolProp name="JSONVALIDATION">true</boolProp>
             <boolProp name="JSONVALIDATION">true</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>

+ 2 - 2
fvt_scripts/select_edgex_another_bus_rule.jmx

@@ -470,7 +470,7 @@
         </net.xmeter.samplers.SubSampler>
         </net.xmeter.samplers.SubSampler>
         <hashTree>
         <hashTree>
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="temperature Assertion" enabled="true">
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="temperature Assertion" enabled="true">
-            <stringProp name="JSON_PATH">$[0].temperature</stringProp>
+            <stringProp name="JSON_PATH">$[0].Temperature</stringProp>
             <stringProp name="EXPECTED_VALUE"></stringProp>
             <stringProp name="EXPECTED_VALUE"></stringProp>
             <boolProp name="JSONVALIDATION">false</boolProp>
             <boolProp name="JSONVALIDATION">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>
@@ -479,7 +479,7 @@
           </JSONPathAssertion>
           </JSONPathAssertion>
           <hashTree/>
           <hashTree/>
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="humidity Assertion" enabled="true">
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="humidity Assertion" enabled="true">
-            <stringProp name="JSON_PATH">$[0].humidity</stringProp>
+            <stringProp name="JSON_PATH">$[0].Humidity</stringProp>
             <stringProp name="EXPECTED_VALUE"></stringProp>
             <stringProp name="EXPECTED_VALUE"></stringProp>
             <boolProp name="JSONVALIDATION">false</boolProp>
             <boolProp name="JSONVALIDATION">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>

+ 4 - 4
fvt_scripts/select_edgex_condition_rule.jmx

@@ -442,7 +442,7 @@
         </net.xmeter.samplers.SubSampler>
         </net.xmeter.samplers.SubSampler>
         <hashTree>
         <hashTree>
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="temperature Assertion" enabled="true">
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="temperature Assertion" enabled="true">
-            <stringProp name="JSON_PATH">$[0].temperature</stringProp>
+            <stringProp name="JSON_PATH">$[0].Temperature</stringProp>
             <stringProp name="EXPECTED_VALUE"></stringProp>
             <stringProp name="EXPECTED_VALUE"></stringProp>
             <boolProp name="JSONVALIDATION">false</boolProp>
             <boolProp name="JSONVALIDATION">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>
@@ -451,7 +451,7 @@
           </JSONPathAssertion>
           </JSONPathAssertion>
           <hashTree/>
           <hashTree/>
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="humidity Assertion" enabled="true">
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="humidity Assertion" enabled="true">
-            <stringProp name="JSON_PATH">$[0].humidity</stringProp>
+            <stringProp name="JSON_PATH">$[0].Humidity</stringProp>
             <stringProp name="EXPECTED_VALUE"></stringProp>
             <stringProp name="EXPECTED_VALUE"></stringProp>
             <boolProp name="JSONVALIDATION">false</boolProp>
             <boolProp name="JSONVALIDATION">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>
@@ -475,10 +475,10 @@ import net.sf.json.JSONObject;
 String response = SampleResult.getResponseDataAsString();
 String response = SampleResult.getResponseDataAsString();
 JSONArray arr = JSONArray.fromObject(response);
 JSONArray arr = JSONArray.fromObject(response);
 JSONObject json = arr.getJSONObject(0);
 JSONObject json = arr.getJSONObject(0);
-int temp = json.getInt(&quot;temperature&quot;);
+int temp = json.getInt(&quot;Temperature&quot;);
 if(temp &lt;= 30) {
 if(temp &lt;= 30) {
 	Failure = true;
 	Failure = true;
-	FailureMessage = &quot;The temperature result should not be less than 30!&quot;;
+	FailureMessage = &quot;The Temperature result should not be less than 30!&quot;;
 }</stringProp>
 }</stringProp>
             <stringProp name="BeanShellAssertion.filename"></stringProp>
             <stringProp name="BeanShellAssertion.filename"></stringProp>
             <stringProp name="BeanShellAssertion.parameters"></stringProp>
             <stringProp name="BeanShellAssertion.parameters"></stringProp>

+ 7 - 4
xsql/ast.go

@@ -579,11 +579,14 @@ func (m Metadata) Meta(key string) (interface{}, bool) {
 	return msg.Meta(key)
 	return msg.Meta(key)
 }
 }
 
 
+type OriginalKeys map[string]interface{}
+
 type Tuple struct {
 type Tuple struct {
-	Emitter   string
-	Message   Message
-	Timestamp int64
-	Metadata  Metadata
+	Emitter      string
+	Message      Message
+	Timestamp    int64
+	Metadata     Metadata
+	OriginalKeys OriginalKeys
 }
 }
 
 
 func (t *Tuple) Value(key string) (interface{}, bool) {
 func (t *Tuple) Value(key string) (interface{}, bool) {

+ 16 - 7
xsql/plans/project_operator.go

@@ -28,8 +28,9 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	case error:
 	case error:
 		return input
 		return input
 	case *xsql.Tuple:
 	case *xsql.Tuple:
+		okeys := input.OriginalKeys
 		ve := pp.getVE(input, input)
 		ve := pp.getVE(input, input)
-		if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
+		if r, err := project(pp.Fields, ve, okeys, pp.isTest); err != nil {
 			return fmt.Errorf("run Select error: %s", err)
 			return fmt.Errorf("run Select error: %s", err)
 		} else {
 		} else {
 			results = append(results, r)
 			results = append(results, r)
@@ -41,7 +42,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 		ms := input[0].Tuples
 		ms := input[0].Tuples
 		for _, v := range ms {
 		for _, v := range ms {
 			ve := pp.getVE(&v, input)
 			ve := pp.getVE(&v, input)
-			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
+			if r, err := project(pp.Fields, ve, nil, pp.isTest); err != nil {
 				return fmt.Errorf("run Select error: %s", err)
 				return fmt.Errorf("run Select error: %s", err)
 			} else {
 			} else {
 				results = append(results, r)
 				results = append(results, r)
@@ -54,7 +55,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 		ms := input
 		ms := input
 		for _, v := range ms {
 		for _, v := range ms {
 			ve := pp.getVE(&v, input)
 			ve := pp.getVE(&v, input)
-			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
+			if r, err := project(pp.Fields, ve, nil, pp.isTest); err != nil {
 				return err
 				return err
 			} else {
 			} else {
 				results = append(results, r)
 				results = append(results, r)
@@ -66,7 +67,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	case xsql.GroupedTuplesSet:
 	case xsql.GroupedTuplesSet:
 		for _, v := range input {
 		for _, v := range input {
 			ve := pp.getVE(v[0], v)
 			ve := pp.getVE(v[0], v)
-			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
+			if r, err := project(pp.Fields, ve, nil, pp.isTest); err != nil {
 				return fmt.Errorf("run Select error: %s", err)
 				return fmt.Errorf("run Select error: %s", err)
 			} else {
 			} else {
 				results = append(results, r)
 				results = append(results, r)
@@ -91,7 +92,7 @@ func (pp *ProjectPlan) getVE(tuple xsql.DataValuer, agg xsql.AggregateData) *xsq
 	}
 	}
 }
 }
 
 
-func project(fs xsql.Fields, ve *xsql.ValuerEval, isTest bool) (map[string]interface{}, error) {
+func project(fs xsql.Fields, ve *xsql.ValuerEval, okeys xsql.OriginalKeys, isTest bool) (map[string]interface{}, error) {
 	result := make(map[string]interface{})
 	result := make(map[string]interface{})
 	for _, f := range fs {
 	for _, f := range fs {
 		//Avoid to re-evaluate for non-agg field has alias name, which was already evaluated in pre-processor operator.
 		//Avoid to re-evaluate for non-agg field has alias name, which was already evaluated in pre-processor operator.
@@ -112,12 +113,18 @@ func project(fs xsql.Fields, ve *xsql.ValuerEval, isTest bool) (map[string]inter
 				case map[string]interface{}:
 				case map[string]interface{}:
 					for k, v := range val {
 					for k, v := range val {
 						if _, ok := result[k]; !ok {
 						if _, ok := result[k]; !ok {
-							result[k] = v
+							if ok, okey := xsql.GetOriginalKey(k, okeys); ok {
+								result[okey] = v
+							} else {
+								result[k] = v
+							}
 						}
 						}
 					}
 					}
 				case xsql.Message:
 				case xsql.Message:
 					for k, v := range val {
 					for k, v := range val {
-						if _, ok := result[k]; !ok {
+						if ok, okey := xsql.GetOriginalKey(k, okeys); ok {
+							result[okey] = v
+						} else {
 							result[k] = v
 							result[k] = v
 						}
 						}
 					}
 					}
@@ -137,6 +144,8 @@ func project(fs xsql.Fields, ve *xsql.ValuerEval, isTest bool) (map[string]inter
 	return result, nil
 	return result, nil
 }
 }
 
 
+
+
 const DEFAULT_FIELD_NAME_PREFIX string = "rengine_field_"
 const DEFAULT_FIELD_NAME_PREFIX string = "rengine_field_"
 
 
 func assignName(name, alias string, fields map[string]interface{}) string {
 func assignName(name, alias string, fields map[string]interface{}) string {

+ 15 - 2
xsql/util.go

@@ -48,14 +48,27 @@ func GetStreams(stmt *SelectStatement) (result []string) {
 	return
 	return
 }
 }
 
 
-func LowercaseKeyMap(m map[string]interface{}) map[string]interface{} {
+func LowercaseKeyMap(m map[string]interface{}, orig map[string]interface{}) map[string]interface{} {
 	m1 := make(map[string]interface{})
 	m1 := make(map[string]interface{})
 	for k, v := range m {
 	for k, v := range m {
 		if m2, ok := v.(map[string]interface{}); ok {
 		if m2, ok := v.(map[string]interface{}); ok {
-			m1[strings.ToLower(k)] = LowercaseKeyMap(m2)
+			o1 := make(map[string]interface{})
+			orig[k] = o1
+			m1[strings.ToLower(k)] = LowercaseKeyMap(m2, o1)
 		} else {
 		} else {
 			m1[strings.ToLower(k)] = v
 			m1[strings.ToLower(k)] = v
+			orig[k] = nil
 		}
 		}
 	}
 	}
 	return m1
 	return m1
 }
 }
+
+//TODO To handle nested types?
+func GetOriginalKey(lkey string, okeys map[string]interface{}) (bool, string){
+	for k, _ := range okeys {
+		if strings.ToLower(k) == lkey {
+			return true, k
+		}
+	}
+	return false, ""
+}

+ 28 - 3
xsql/util_test.go

@@ -8,14 +8,19 @@ import (
 
 
 func TestLowercaseKeyMap(t *testing.T) {
 func TestLowercaseKeyMap(t *testing.T) {
 	var tests = []struct {
 	var tests = []struct {
-		src  map[string]interface{}
-		dest map[string]interface{}
+		src    map[string]interface{}
+		expOrg map[string]interface{}
+		dest   map[string]interface{}
 	}{
 	}{
 		{
 		{
 			src: map[string]interface{}{
 			src: map[string]interface{}{
 				"Key1": "value1",
 				"Key1": "value1",
 				"key2": "value2",
 				"key2": "value2",
 			},
 			},
+			expOrg: map[string]interface{} {
+				"Key1": nil,
+				"key2": nil,
+			},
 			dest: map[string]interface{}{
 			dest: map[string]interface{}{
 				"key1": "value1",
 				"key1": "value1",
 				"key2": "value2",
 				"key2": "value2",
@@ -29,6 +34,12 @@ func TestLowercaseKeyMap(t *testing.T) {
 					"Sub1": "sub_value1",
 					"Sub1": "sub_value1",
 				},
 				},
 			},
 			},
+			expOrg: map[string]interface{} {
+				"Key1": nil,
+				"Complex": map[string]interface{} {
+					"Sub1": nil,
+				},
+			},
 			dest: map[string]interface{}{
 			dest: map[string]interface{}{
 				"key1": "value1",
 				"key1": "value1",
 				"complex": map[string]interface{}{
 				"complex": map[string]interface{}{
@@ -47,6 +58,15 @@ func TestLowercaseKeyMap(t *testing.T) {
 					},
 					},
 				},
 				},
 			},
 			},
+			expOrg: map[string]interface{} {
+				"Key1": nil,
+				"Complex": map[string]interface{}{
+					"Sub1": nil,
+					"Sub1_2": map[string]interface{}{
+						"Sub2": nil,
+					},
+				},
+			},
 			dest: map[string]interface{}{
 			dest: map[string]interface{}{
 				"key1": "value1",
 				"key1": "value1",
 				"complex": map[string]interface{}{
 				"complex": map[string]interface{}{
@@ -62,9 +82,14 @@ func TestLowercaseKeyMap(t *testing.T) {
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	for i, tt := range tests {
 	for i, tt := range tests {
 		//fmt.Printf("Parsing SQL %q.\n", tt.s)
 		//fmt.Printf("Parsing SQL %q.\n", tt.s)
-		result := LowercaseKeyMap(tt.src)
+		origKeys := make(map[string]interface{})
+		result := LowercaseKeyMap(tt.src, origKeys)
 		if !reflect.DeepEqual(tt.dest, result) {
 		if !reflect.DeepEqual(tt.dest, result) {
 			t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.dest, result)
 			t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.dest, result)
 		}
 		}
+
+		if !reflect.DeepEqual(tt.expOrg, origKeys) {
+			t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.dest, result)
+		}
 	}
 	}
 }
 }

+ 15 - 0
xstream/api/stream.go

@@ -7,17 +7,29 @@ import (
 type SourceTuple interface {
 type SourceTuple interface {
 	Message() map[string]interface{}
 	Message() map[string]interface{}
 	Meta() map[string]interface{}
 	Meta() map[string]interface{}
+	OriginalKeys() map[string]interface{}
 }
 }
 
 
 type DefaultSourceTuple struct {
 type DefaultSourceTuple struct {
 	message map[string]interface{}
 	message map[string]interface{}
 	meta    map[string]interface{}
 	meta    map[string]interface{}
+	origkey map[string]interface{}
 }
 }
 
 
 func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
 func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
+	origkey := make(map[string]interface{})
 	return &DefaultSourceTuple{
 	return &DefaultSourceTuple{
 		message: message,
 		message: message,
 		meta:    meta,
 		meta:    meta,
+		origkey: origkey,
+	}
+}
+
+func NewDefaultSourceTupleWithOrigKey(message map[string]interface{}, meta map[string]interface{}, origkeys map[string]interface{}) *DefaultSourceTuple {
+	return &DefaultSourceTuple{
+		message: message,
+		meta:    meta,
+		origkey: origkeys,
 	}
 	}
 }
 }
 
 
@@ -27,6 +39,9 @@ func (t *DefaultSourceTuple) Message() map[string]interface{} {
 func (t *DefaultSourceTuple) Meta() map[string]interface{} {
 func (t *DefaultSourceTuple) Meta() map[string]interface{} {
 	return t.meta
 	return t.meta
 }
 }
+func (t *DefaultSourceTuple) OriginalKeys() map[string]interface{} {
+	return t.origkey
+}
 
 
 type Logger interface {
 type Logger interface {
 	Debug(args ...interface{})
 	Debug(args ...interface{})

+ 3 - 2
xstream/extensions/edgex_source.go

@@ -122,7 +122,7 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 					} else {
 					} else {
 						result := make(map[string]interface{})
 						result := make(map[string]interface{})
 						meta := make(map[string]interface{})
 						meta := make(map[string]interface{})
-
+						origKeys := make(map[string]interface{})
 						log.Debugf("receive message %s from device %s", env.Payload, e.Device)
 						log.Debugf("receive message %s from device %s", env.Payload, e.Device)
 						for _, r := range e.Readings {
 						for _, r := range e.Readings {
 							if r.Name != "" {
 							if r.Name != "" {
@@ -139,6 +139,7 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 								r_meta["pushed"] = r.Pushed
 								r_meta["pushed"] = r.Pushed
 								r_meta["device"] = r.Device
 								r_meta["device"] = r.Device
 								meta[strings.ToLower(r.Name)] = r_meta
 								meta[strings.ToLower(r.Name)] = r_meta
+								origKeys[r.Name] = nil
 							} else {
 							} else {
 								log.Warnf("The name of readings should not be empty!")
 								log.Warnf("The name of readings should not be empty!")
 							}
 							}
@@ -153,7 +154,7 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 							meta["correlationid"] = env.CorrelationID
 							meta["correlationid"] = env.CorrelationID
 
 
 							select {
 							select {
-							case consumer <- api.NewDefaultSourceTuple(result, meta):
+							case consumer <- api.NewDefaultSourceTupleWithOrigKey(result, meta, origKeys):
 								log.Debugf("send data to device node")
 								log.Debugf("send data to device node")
 							case <-ctx.Done():
 							case <-ctx.Done():
 								return
 								return

+ 5 - 2
xstream/extensions/mqtt_source.go

@@ -150,14 +150,17 @@ func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer
 			log.Errorf("Invalid data format, cannot convert %s into JSON with error %s", string(msg.Payload()), e)
 			log.Errorf("Invalid data format, cannot convert %s into JSON with error %s", string(msg.Payload()), e)
 			return
 			return
 		}
 		}
+		originkey := make(map[string]interface{})
 		//Convert the keys to lowercase
 		//Convert the keys to lowercase
-		result = xsql.LowercaseKeyMap(result)
+		result = xsql.LowercaseKeyMap(result, originkey)
 
 
 		meta := make(map[string]interface{})
 		meta := make(map[string]interface{})
 		meta["topic"] = msg.Topic()
 		meta["topic"] = msg.Topic()
 		meta["messageid"] = strconv.Itoa(int(msg.MessageID()))
 		meta["messageid"] = strconv.Itoa(int(msg.MessageID()))
+
+
 		select {
 		select {
-		case consumer <- api.NewDefaultSourceTuple(result, meta):
+		case consumer <- api.NewDefaultSourceTupleWithOrigKey(result, meta, originkey):
 			log.Debugf("send data to source node")
 			log.Debugf("send data to source node")
 		case <-ctx.Done():
 		case <-ctx.Done():
 			return
 			return

+ 1 - 1
xstream/nodes/source_node.go

@@ -125,7 +125,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 					case data := <-buffer.Out:
 					case data := <-buffer.Out:
 						stats.IncTotalRecordsIn()
 						stats.IncTotalRecordsIn()
 						stats.ProcessTimeStart()
 						stats.ProcessTimeStart()
-						tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta()}
+						tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta(), OriginalKeys: data.OriginalKeys()}
 						stats.ProcessTimeEnd()
 						stats.ProcessTimeEnd()
 						//blocking
 						//blocking
 						Broadcast(m.outs, tuple, ctx)
 						Broadcast(m.outs, tuple, ctx)