Quellcode durchsuchen

fix: do not print out the password in config & meta fields problem (#770)

* fix(preprocessor): multiple meta field may be set

* fix(edgex): do not print out the password in config

* fix(test): add more waiting time for frequent timeout test

* fix(build): avoid printing password in conf util
ngjaying vor 4 Jahren
Ursprung
Commit
d1122eea81

+ 25 - 2
deploy/docker/conf_util.go

@@ -66,7 +66,7 @@ func main() {
 		if bs, err := yaml.Marshal(v); err != nil {
 			fmt.Println(err)
 		} else {
-			message := fmt.Sprintf("-------------------\nConf file %s: \n %s", f, string(bs))
+			message := fmt.Sprintf("-------------------\nConf file %s: \n %s", f, printable(v))
 			fmt.Println(message)
 			if fname, ok := fileMap[f]; ok {
 				if e := ioutil.WriteFile(fname, bs, 0644); e != nil {
@@ -79,6 +79,29 @@ func main() {
 	}
 }
 
+func printable(m map[interface{}]interface{}) map[interface{}]interface{} {
+	printableMap := make(map[interface{}]interface{})
+	for k, v := range m {
+		ks, ok := k.(string)
+		if ok && strings.ToLower(ks) == "password" {
+			printableMap[k] = "*"
+		} else {
+			if vm, ok := v.(map[interface{}]interface{}); ok {
+				printableMap[k] = printable(vm)
+			} else {
+				printableMap[k] = v
+			}
+		}
+	}
+	return printableMap
+}
+
+func toPrintableString(m map[interface{}]interface{}) string {
+	p := printable(m)
+	b, _ := yaml.Marshal(p)
+	return string(b)
+}
+
 func ProcessEnv(files map[string]map[interface{}]interface{}, vars []string) {
 	for _, e := range vars {
 		pair := strings.SplitN(e, "=", 2)
@@ -116,7 +139,7 @@ func ProcessEnv(files map[string]map[interface{}]interface{}, vars []string) {
 					fmt.Printf("%s\n", err)
 				} else {
 					m := make(map[interface{}]interface{})
-					err = yaml.Unmarshal([]byte(data), &m)
+					err = yaml.Unmarshal(data, &m)
 					if err != nil {
 						fmt.Println(err)
 					}

+ 13 - 0
deploy/docker/conf_util_test.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"fmt"
 	"reflect"
 	"testing"
 )
@@ -171,11 +172,13 @@ func TestProcessEnv(t *testing.T) {
 		vars []string
 		file string
 		expt map[interface{}]interface{}
+		out  string
 	}{
 		{
 			vars: []string{
 				"EDGEX__DEFAULT__TYPE=zmq",
 				"EDGEX__DEFAULT__OPTIONAL__CLIENTID=clientid_0000",
+				"EDGEX__DEFAULT__OPTIONAL__PASSWORD=should_not_print",
 				"EDGEX__APPLICATION_CONF__PROTOCOL=ssl",
 			},
 			file: "edgex",
@@ -185,12 +188,14 @@ func TestProcessEnv(t *testing.T) {
 					"type":     "zmq",
 					"optional": map[interface{}]interface{}{
 						"ClientId": "clientid_0000",
+						"Password": "should_not_print",
 					},
 				},
 				"application_conf": map[interface{}]interface{}{
 					"protocol": "ssl",
 				},
 			},
+			out: "application_conf:\n  protocol: ssl\ndefault:\n  optional:\n    ClientId: clientid_0000\n    Password: '*'\n  protocol: tcp\n  type: zmq\n",
 		},
 	}
 	files := make(map[string]map[interface{}]interface{})
@@ -199,6 +204,14 @@ func TestProcessEnv(t *testing.T) {
 		if !reflect.DeepEqual(tt.expt, files[tt.file]) {
 			t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.expt, files[tt.file])
 		}
+		for f, v := range files {
+			p := toPrintableString(v)
+			if !reflect.DeepEqual(tt.out, p) {
+				t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.out, p)
+			}
+			message := fmt.Sprintf("-------------------\nConf file %s: \n %s", f, p)
+			fmt.Println(message)
+		}
 	}
 }
 

+ 13 - 13
xstream/extensions/edgex_source.go

@@ -29,7 +29,7 @@ type EdgexSource struct {
 	valueDescs map[string]string
 }
 
-func (es *EdgexSource) Configure(device string, props map[string]interface{}) error {
+func (es *EdgexSource) Configure(_ string, props map[string]interface{}) error {
 	if f, ok := props["format"]; ok {
 		if f != common.FORMAT_JSON {
 			return fmt.Errorf("edgex source only supports `json` format")
@@ -84,6 +84,7 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 		}
 		mbconf.Optional = optional
 	}
+	printConf(mbconf)
 	common.Log.Infof("Use configuration for edgex messagebus %v\n", mbconf)
 
 	if client, err := messaging.NewMessageClient(mbconf); err != nil {
@@ -95,19 +96,18 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 
 }
 
-func castToString(v interface{}) (result string, ok bool) {
-	switch v := v.(type) {
-	case int:
-		return strconv.Itoa(v), true
-	case string:
-		return v, true
-	case bool:
-		return strconv.FormatBool(v), true
-	case float64, float32:
-		return fmt.Sprintf("%.2f", v), true
-	default:
-		return "", false
+// Modify the copied conf to print no password.
+func printConf(mbconf types.MessageBusConfig) {
+	var printableOptional = make(map[string]string)
+	for k, v := range mbconf.Optional {
+		if strings.ToLower(k) == "password" {
+			printableOptional[k] = "*"
+		} else {
+			printableOptional[k] = v
+		}
 	}
+	mbconf.Optional = printableOptional
+	common.Log.Infof("Use configuration for edgex messagebus %v\n", mbconf)
 }
 
 func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {

+ 19 - 0
xstream/extensions/edgex_source_test.go

@@ -6,7 +6,9 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/edgexfoundry/go-mod-core-contracts/models"
+	"github.com/edgexfoundry/go-mod-messaging/pkg/types"
 	"github.com/emqx/kuiper/common"
+	"reflect"
 	"testing"
 )
 
@@ -312,3 +314,20 @@ func TestCastToString(t *testing.T) {
 		t.Errorf("Failed to cast float.")
 	}
 }
+
+func TestPrintConf(t *testing.T) {
+	expMbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: "tcp", Host: "127.0.0.1", Port: 6625}, Type: "mbus", Optional: map[string]string{
+		"proa":     "proa",
+		"Password": "fafsadfsadf=",
+		"Prob":     "Prob",
+	}}
+	mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: "tcp", Host: "127.0.0.1", Port: 6625}, Type: "mbus", Optional: map[string]string{
+		"proa":     "proa",
+		"Password": "fafsadfsadf=",
+		"Prob":     "Prob",
+	}}
+	printConf(mbconf)
+	if !reflect.DeepEqual(expMbconf, mbconf) {
+		t.Errorf("conf changed after printing")
+	}
+}

+ 4 - 1
xstream/operators/preprocessor.go

@@ -5,6 +5,7 @@ import (
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
+	"strings"
 )
 
 type Preprocessor struct {
@@ -59,7 +60,9 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.F
 	if !p.allMeta && p.metaFields != nil && len(p.metaFields) > 0 {
 		newMeta := make(xsql.Metadata)
 		for _, f := range p.metaFields {
-			newMeta[f] = tuple.Metadata[f]
+			if m, ok := tuple.Metadata.Value(f); ok {
+				newMeta[strings.ToLower(f)] = m
+			}
 		}
 		tuple.Metadata = newMeta
 	}

+ 7 - 5
xstream/planner/dataSourcePlan.go

@@ -26,7 +26,7 @@ type DataSourcePlan struct {
 	// intermediate status
 	isWildCard bool
 	fields     map[string]interface{}
-	metaMap    map[string]bool
+	metaMap    map[string]string
 }
 
 func (p DataSourcePlan) Init() *DataSourcePlan {
@@ -76,7 +76,7 @@ func (p *DataSourcePlan) PruneColumns(fields []xsql.Expr) error {
 	p.getProps()
 	p.fields = make(map[string]interface{})
 	if !p.allMeta {
-		p.metaMap = make(map[string]bool)
+		p.metaMap = make(map[string]string)
 	}
 	if p.timestampField != "" {
 		p.fields[p.timestampField] = p.timestampField
@@ -103,7 +103,7 @@ func (p *DataSourcePlan) PruneColumns(fields []xsql.Expr) error {
 					p.allMeta = true
 					p.metaMap = nil
 				} else if !p.allMeta {
-					p.metaMap[f.Name] = true
+					p.metaMap[strings.ToLower(f.Name)] = f.Name
 				}
 			}
 		case *xsql.SortField:
@@ -164,9 +164,11 @@ func (p *DataSourcePlan) getAllFields() {
 		p.streamFields = sfs
 	}
 	p.metaFields = make([]string, 0, len(p.metaMap))
-	for k, _ := range p.metaMap {
-		p.metaFields = append(p.metaFields, k)
+	for _, v := range p.metaMap {
+		p.metaFields = append(p.metaFields, v)
 	}
+	// for consistency of results for testing
+	sort.Strings(p.metaFields)
 	p.fields = nil
 	p.metaMap = nil
 }

+ 82 - 0
xstream/planner/planner_test.go

@@ -814,6 +814,88 @@ func Test_createLogicalPlan(t *testing.T) {
 				isAggregate: false,
 				sendMeta:    false,
 			}.Init(),
+		}, { // 9 meta
+			sql: `SELECT temp, meta(id) AS eid,meta(Humidity->Device) AS hdevice FROM src1 WHERE meta(device)="demo2"`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						FilterPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									DataSourcePlan{
+										name: "src1",
+										streamFields: []interface{}{
+											&xsql.StreamField{
+												Name:      "temp",
+												FieldType: &xsql.BasicType{Type: xsql.BIGINT},
+											},
+										},
+										streamStmt: streams["src1"],
+										metaFields: []string{"Humidity", "device", "id"},
+										alias: xsql.Fields{
+											xsql.Field{
+												Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{&xsql.MetaRef{
+													Name: "id",
+												}}},
+												Name:  "meta",
+												AName: "eid",
+											},
+											xsql.Field{
+												Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{
+													&xsql.BinaryExpr{
+														OP:  xsql.ARROW,
+														LHS: &xsql.MetaRef{Name: "Humidity"},
+														RHS: &xsql.MetaRef{Name: "Device"},
+													},
+												}},
+												Name:  "meta",
+												AName: "hdevice",
+											},
+										},
+									}.Init(),
+								},
+							},
+							condition: &xsql.BinaryExpr{
+								LHS: &xsql.Call{
+									Name: "meta",
+									Args: []xsql.Expr{&xsql.MetaRef{
+										Name: "device",
+									}},
+								},
+								OP: xsql.EQ,
+								RHS: &xsql.StringLiteral{
+									Val: "demo2",
+								},
+							},
+						}.Init(),
+					},
+				},
+				fields: []xsql.Field{
+					{
+						Expr:  &xsql.FieldRef{Name: "temp"},
+						Name:  "temp",
+						AName: "",
+					}, {
+						Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{&xsql.MetaRef{
+							Name: "id",
+						}}},
+						Name:  "meta",
+						AName: "eid",
+					}, {
+						Expr: &xsql.Call{Name: "meta", Args: []xsql.Expr{
+							&xsql.BinaryExpr{
+								OP:  xsql.ARROW,
+								LHS: &xsql.MetaRef{Name: "Humidity"},
+								RHS: &xsql.MetaRef{Name: "Device"},
+							},
+						}},
+						Name:  "meta",
+						AName: "hdevice",
+					},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 0 - 19
xstream/topotest/mock_topo.go

@@ -49,23 +49,6 @@ func getImg() ([]byte, string) {
 	return image, b64img
 }
 
-func CleanStateData() {
-	//dbDir, err := common.GetDataLoc()
-	//if err != nil {
-	//	common.Log.Panic(err)
-	//}
-	//c := path.Join(dbDir, "checkpoints")
-	//err = os.RemoveAll(c)
-	//if err != nil {
-	//	common.Log.Errorf("%s", err)
-	//}
-	//s := path.Join(dbDir, "sink", "cache")
-	//err = os.RemoveAll(s)
-	//if err != nil {
-	//	common.Log.Errorf("%s", err)
-	//}
-}
-
 func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}) (err error) {
 	keys, values := tp.GetMetrics()
 	for k, v := range m {
@@ -1105,8 +1088,6 @@ func sendData(t *testing.T, dataLength int, metrics map[string]interface{}, data
 }
 
 func createStream(t *testing.T, tt RuleTest, j int, opt *api.RuleOption, sinkProps map[string]interface{}) ([][]*xsql.Tuple, int, *xstream.TopologyNew, *mocknodes.MockSink, <-chan error) {
-	// Rest for each test
-	CleanStateData()
 	mockclock.ResetClock(1541152486000)
 	// Create stream
 	var (

+ 1 - 0
xstream/topotest/window_rule_test.go

@@ -284,6 +284,7 @@ func TestWindow(t *testing.T) {
 		}, {
 			Name: `TestWindowRule5`,
 			Sql:  `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
+			W:    10,
 			R: [][]map[string]interface{}{
 				{{
 					"temp": 25.5,