ソースを参照

feat(lookup): add fields parameter to the lookup func

Enable to create a better SQL template

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 年 前
コミット
fc5a3c6c8f

+ 18 - 7
extensions/sources/sql/sqlLookup.go

@@ -27,9 +27,9 @@ type sqlLookupConfig struct {
 }
 }
 
 
 type sqlLookupSource struct {
 type sqlLookupSource struct {
-	url      string
-	template string
-	db       *sql.DB
+	url   string
+	table string
+	db    *sql.DB
 }
 }
 
 
 // Open establish a connection to the database
 // Open establish a connection to the database
@@ -57,13 +57,24 @@ func (s *sqlLookupSource) Configure(datasource string, props map[string]interfac
 		return fmt.Errorf("dburl.Parse %s fail with error: %v", cfg.Url, err)
 		return fmt.Errorf("dburl.Parse %s fail with error: %v", cfg.Url, err)
 	}
 	}
 	s.url = cfg.Url
 	s.url = cfg.Url
-	s.template = fmt.Sprintf("SELECT * FROM `%s` WHERE ", datasource)
+	s.table = datasource
 	return nil
 	return nil
 }
 }
 
 
-func (s *sqlLookupSource) Lookup(ctx api.StreamContext, keys []string, values []interface{}) ([]api.SourceTuple, error) {
+func (s *sqlLookupSource) Lookup(ctx api.StreamContext, fields []string, keys []string, values []interface{}) ([]api.SourceTuple, error) {
 	ctx.GetLogger().Debug("Start to lookup tuple")
 	ctx.GetLogger().Debug("Start to lookup tuple")
-	query := s.template
+	query := "SELECT "
+	if len(fields) == 0 {
+		query += "*"
+	} else {
+		for i, f := range fields {
+			if i > 0 {
+				query += ","
+			}
+			query += f
+		}
+	}
+	query += fmt.Sprintf(" FROM %s WHERE ", s.table)
 	for i, k := range keys {
 	for i, k := range keys {
 		if i > 0 {
 		if i > 0 {
 			query += " AND "
 			query += " AND "
@@ -76,7 +87,6 @@ func (s *sqlLookupSource) Lookup(ctx api.StreamContext, keys []string, values []
 		}
 		}
 	}
 	}
 	ctx.GetLogger().Debugf("Query is %s", query)
 	ctx.GetLogger().Debugf("Query is %s", query)
-	// TODO extract common functions
 	rows, err := s.db.Query(query)
 	rows, err := s.db.Query(query)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -104,6 +114,7 @@ func (s *sqlLookupSource) Lookup(ctx api.StreamContext, keys []string, values []
 }
 }
 
 
 func (s *sqlLookupSource) Close(ctx api.StreamContext) error {
 func (s *sqlLookupSource) Close(ctx api.StreamContext) error {
+	ctx.GetLogger().Debugf("Closing sql lookup source")
 	defer func() { s.db = nil }()
 	defer func() { s.db = nil }()
 	if s.db != nil {
 	if s.db != nil {
 		return s.db.Close()
 		return s.db.Close()

+ 1 - 1
internal/topo/memory/lookupsource.go

@@ -55,7 +55,7 @@ func (s *lookupsource) Configure(datasource string, props map[string]interface{}
 	return nil
 	return nil
 }
 }
 
 
-func (s *lookupsource) Lookup(ctx api.StreamContext, keys []string, values []interface{}) ([]api.SourceTuple, error) {
+func (s *lookupsource) Lookup(ctx api.StreamContext, _ []string, keys []string, values []interface{}) ([]api.SourceTuple, error) {
 	ctx.GetLogger().Debugf("lookup source %s is looking up keys %v with values %v", s.topic, keys, values)
 	ctx.GetLogger().Debugf("lookup source %s is looking up keys %v with values %v", s.topic, keys, values)
 	return s.table.Read(keys, values)
 	return s.table.Read(keys, values)
 }
 }

+ 2 - 2
internal/topo/memory/lookupsource_test.go

@@ -62,7 +62,7 @@ func TestNoIndexLookup(t *testing.T) {
 		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test"}),
 		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test"}),
 		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test"}),
 		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test"}),
 	}
 	}
-	result, err := ls.Lookup(ctx, []string{"ff"}, []interface{}{"value1"})
+	result, err := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
 	if !reflect.DeepEqual(result, expected) {
 	if !reflect.DeepEqual(result, expected) {
 		t.Errorf("expect %v but got %v", expected, result)
 		t.Errorf("expect %v but got %v", expected, result)
 	}
 	}
@@ -110,7 +110,7 @@ func TestSingleIndexLookup(t *testing.T) {
 		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test2"}),
 		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test2"}),
 		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test2"}),
 		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test2"}),
 	}
 	}
-	result, err := ls.Lookup(ctx, []string{"ff"}, []interface{}{"value1"})
+	result, err := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
 	if !reflect.DeepEqual(result, expected) {
 	if !reflect.DeepEqual(result, expected) {
 		t.Errorf("expect %v but got %v", expected, result)
 		t.Errorf("expect %v but got %v", expected, result)
 	}
 	}

+ 4 - 2
internal/topo/node/lookup_node.go

@@ -34,15 +34,17 @@ type LookupNode struct {
 	vals        []ast.Expr
 	vals        []ast.Expr
 
 
 	srcOptions *ast.Options
 	srcOptions *ast.Options
+	fields     []string
 	keys       []string
 	keys       []string
 }
 }
 
 
-func NewLookupNode(name string, keys []string, joinType ast.JoinType, vals []ast.Expr, srcOptions *ast.Options, options *api.RuleOption) (*LookupNode, error) {
+func NewLookupNode(name string, fields []string, keys []string, joinType ast.JoinType, vals []ast.Expr, srcOptions *ast.Options, options *api.RuleOption) (*LookupNode, error) {
 	t := srcOptions.TYPE
 	t := srcOptions.TYPE
 	if t == "" {
 	if t == "" {
 		return nil, fmt.Errorf("source type is not specified")
 		return nil, fmt.Errorf("source type is not specified")
 	}
 	}
 	n := &LookupNode{
 	n := &LookupNode{
+		fields:     fields,
 		keys:       keys,
 		keys:       keys,
 		srcOptions: srcOptions,
 		srcOptions: srcOptions,
 		sourceType: t,
 		sourceType: t,
@@ -173,7 +175,7 @@ func (n *LookupNode) lookup(ctx api.StreamContext, d xsql.TupleRow, fv *xsql.Fun
 		e error
 		e error
 	)
 	)
 	if !hasNil { // if any of the value is nil, the lookup will always return empty result
 	if !hasNil { // if any of the value is nil, the lookup will always return empty result
-		r, e = ns.Lookup(ctx, n.keys, cvs)
+		r, e = ns.Lookup(ctx, n.fields, n.keys, cvs)
 	}
 	}
 	if e != nil {
 	if e != nil {
 		return e
 		return e

+ 2 - 2
internal/topo/node/lookup_node_test.go

@@ -41,7 +41,7 @@ func (m *mockLookupSrc) Configure(_ string, _ map[string]interface{}) error {
 }
 }
 
 
 // Lookup accept int value as the first array value
 // Lookup accept int value as the first array value
-func (m *mockLookupSrc) Lookup(_ api.StreamContext, _ []string, values []interface{}) ([]api.SourceTuple, error) {
+func (m *mockLookupSrc) Lookup(_ api.StreamContext, _ []string, _ []string, values []interface{}) ([]api.SourceTuple, error) {
 	a1, ok := values[0].(int)
 	a1, ok := values[0].(int)
 	if ok {
 	if ok {
 		var result []api.SourceTuple
 		var result []api.SourceTuple
@@ -299,7 +299,7 @@ func TestLookup(t *testing.T) {
 	lookup.CreateInstance("mock", "mock", options)
 	lookup.CreateInstance("mock", "mock", options)
 	contextLogger := conf.Log.WithField("rule", "TestLookup")
 	contextLogger := conf.Log.WithField("rule", "TestLookup")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
-	l, _ := NewLookupNode("mock", []string{"a"}, ast.INNER_JOIN, []ast.Expr{&ast.FieldRef{
+	l, _ := NewLookupNode("mock", []string{}, []string{"a"}, ast.INNER_JOIN, []ast.Expr{&ast.FieldRef{
 		StreamName: "",
 		StreamName: "",
 		Name:       "a",
 		Name:       "a",
 	}}, options, &api.RuleOption{
 	}}, options, &api.RuleOption{

+ 1 - 1
internal/topo/planner/dataSourcePlan.go

@@ -48,7 +48,7 @@ func (p DataSourcePlan) Init() *DataSourcePlan {
 	return &p
 	return &p
 }
 }
 
 
-// Presume no children for data source
+// PushDownPredicate Presume no children for data source
 func (p *DataSourcePlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
 func (p *DataSourcePlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
 	if p.streamStmt.StreamType == ast.TypeTable {
 	if p.streamStmt.StreamType == ast.TypeTable {
 		return condition, p.self
 		return condition, p.self

+ 36 - 1
internal/topo/planner/lookupPlan.go

@@ -21,9 +21,9 @@ import (
 // LookupPlan is the plan for table lookup and then merged/joined
 // LookupPlan is the plan for table lookup and then merged/joined
 type LookupPlan struct {
 type LookupPlan struct {
 	baseLogicalPlan
 	baseLogicalPlan
-
 	joinExpr   ast.Join
 	joinExpr   ast.Join
 	keys       []string
 	keys       []string
+	fields     []string
 	valvars    []ast.Expr
 	valvars    []ast.Expr
 	options    *ast.Options
 	options    *ast.Options
 	conditions ast.Expr
 	conditions ast.Expr
@@ -143,3 +143,38 @@ func flatConditions(condition ast.Expr) ([]*ast.BinaryExpr, []ast.Expr) {
 	}
 	}
 	return []*ast.BinaryExpr{}, []ast.Expr{condition}
 	return []*ast.BinaryExpr{}, []ast.Expr{condition}
 }
 }
+
+func (p *LookupPlan) PruneColumns(fields []ast.Expr) error {
+	newFields := make([]ast.Expr, 0, len(fields))
+	isWildcard := false
+	strName := p.joinExpr.Name
+	fieldMap := make(map[string]struct{})
+	for _, field := range fields {
+		switch f := field.(type) {
+		case *ast.Wildcard:
+			isWildcard = true
+		case *ast.FieldRef:
+			if !isWildcard && (f.StreamName == ast.DefaultStream || string(f.StreamName) == strName) {
+				if f.Name == "*" {
+					isWildcard = true
+				} else {
+					fieldMap[f.Name] = struct{}{}
+				}
+				continue
+			}
+		case *ast.SortField:
+			if !isWildcard {
+				fieldMap[f.Name] = struct{}{}
+				continue
+			}
+		}
+		newFields = append(newFields, field)
+	}
+	if !isWildcard {
+		p.fields = make([]string, 0, len(fieldMap))
+		for k := range fieldMap {
+			p.fields = append(p.fields, k)
+		}
+	}
+	return p.baseLogicalPlan.PruneColumns(newFields)
+}

+ 1 - 1
internal/topo/planner/planner.go

@@ -172,7 +172,7 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 			return nil, 0, err
 			return nil, 0, err
 		}
 		}
 	case *LookupPlan:
 	case *LookupPlan:
-		op, err = node.NewLookupNode(t.joinExpr.Name, t.keys, t.joinExpr.JoinType, t.valvars, t.options, options)
+		op, err = node.NewLookupNode(t.joinExpr.Name, t.fields, t.keys, t.joinExpr.JoinType, t.valvars, t.options, options)
 	case *JoinAlignPlan:
 	case *JoinAlignPlan:
 		op, err = node.NewJoinAlignNode(fmt.Sprintf("%d_join_aligner", newIndex), t.Emitters, options)
 		op, err = node.NewJoinAlignNode(fmt.Sprintf("%d_join_aligner", newIndex), t.Emitters, options)
 	case *JoinPlan:
 	case *JoinPlan:

+ 16 - 26
internal/topo/planner/planner_test.go

@@ -2155,7 +2155,8 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 									},
 									},
 								},
 								},
 							},
 							},
-							keys: []string{"id"},
+							keys:   []string{"id"},
+							fields: []string{"b"},
 							valvars: []ast.Expr{
 							valvars: []ast.Expr{
 								&ast.FieldRef{
 								&ast.FieldRef{
 									StreamName: "src1",
 									StreamName: "src1",
@@ -2195,7 +2196,7 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 			}.Init(),
 			}.Init(),
 		},
 		},
 		{ // 1
 		{ // 1
-			sql: `SELECT src1.a, table1.b FROM src1 INNER JOIN table1 ON table1.b > 20 AND src1.c < 40 AND src1.id = table1.id`,
+			sql: `SELECT src1.a, table1.* FROM src1 INNER JOIN table1 ON table1.b > 20 AND src1.c < 40 AND src1.id = table1.id`,
 			p: ProjectPlan{
 			p: ProjectPlan{
 				baseLogicalPlan: baseLogicalPlan{
 				baseLogicalPlan: baseLogicalPlan{
 					children: []LogicalPlan{
 					children: []LogicalPlan{
@@ -2326,9 +2327,9 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 					{
 					{
 						Expr: &ast.FieldRef{
 						Expr: &ast.FieldRef{
 							StreamName: "table1",
 							StreamName: "table1",
-							Name:       "b",
+							Name:       "*",
 						},
 						},
-						Name:  "b",
+						Name:  "*",
 						AName: "",
 						AName: "",
 					},
 					},
 				},
 				},
@@ -2336,7 +2337,7 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 				sendMeta:    false,
 				sendMeta:    false,
 			}.Init(),
 			}.Init(),
 		},
 		},
-		{ // 0
+		{ // 2
 			sql: `SELECT src1.a, table1.b, table2.c FROM src1 INNER JOIN table1 ON src1.id = table1.id INNER JOIN table2 on table1.id = table2.id`,
 			sql: `SELECT src1.a, table1.b, table2.c FROM src1 INNER JOIN table1 ON src1.id = table1.id INNER JOIN table2 on table1.id = table2.id`,
 			p: ProjectPlan{
 			p: ProjectPlan{
 				baseLogicalPlan: baseLogicalPlan{
 				baseLogicalPlan: baseLogicalPlan{
@@ -2374,7 +2375,8 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 												},
 												},
 											},
 											},
 										},
 										},
-										keys: []string{"id"},
+										keys:   []string{"id"},
+										fields: []string{"b"},
 										valvars: []ast.Expr{
 										valvars: []ast.Expr{
 											&ast.FieldRef{
 											&ast.FieldRef{
 												StreamName: "src1",
 												StreamName: "src1",
@@ -2407,7 +2409,8 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 									},
 									},
 								},
 								},
 							},
 							},
-							keys: []string{"id"},
+							keys:   []string{"id"},
+							fields: []string{"c"},
 							valvars: []ast.Expr{
 							valvars: []ast.Expr{
 								&ast.FieldRef{
 								&ast.FieldRef{
 									StreamName: "table1",
 									StreamName: "table1",
@@ -2454,7 +2457,7 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 			}.Init(),
 			}.Init(),
 		},
 		},
 		{ // 3
 		{ // 3
-			sql: `SELECT src1.a, table1.b FROM src1 INNER JOIN table1 ON src1.id = table1.id GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			sql: `SELECT * FROM src1 INNER JOIN table1 ON src1.id = table1.id GROUP BY TUMBLINGWINDOW(ss, 10)`,
 			p: ProjectPlan{
 			p: ProjectPlan{
 				baseLogicalPlan: baseLogicalPlan{
 				baseLogicalPlan: baseLogicalPlan{
 					children: []LogicalPlan{
 					children: []LogicalPlan{
@@ -2467,11 +2470,9 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 												DataSourcePlan{
 												DataSourcePlan{
 													baseLogicalPlan: baseLogicalPlan{},
 													baseLogicalPlan: baseLogicalPlan{},
 													name:            "src1",
 													name:            "src1",
-													streamFields: []interface{}{
-														"a",
-													},
-													streamStmt: streams["src1"],
-													metaFields: []string{},
+													streamStmt:      streams["src1"],
+													metaFields:      []string{},
+													isWildCard:      true,
 												}.Init(),
 												}.Init(),
 											},
 											},
 										},
 										},
@@ -2518,19 +2519,8 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 				},
 				},
 				fields: []ast.Field{
 				fields: []ast.Field{
 					{
 					{
-						Expr: &ast.FieldRef{
-							StreamName: "src1",
-							Name:       "a",
-						},
-						Name:  "a",
-						AName: "",
-					},
-					{
-						Expr: &ast.FieldRef{
-							StreamName: "table1",
-							Name:       "b",
-						},
-						Name:  "b",
+						Expr:  &ast.Wildcard{Token: ast.ASTERISK},
+						Name:  "",
 						AName: "",
 						AName: "",
 					},
 					},
 				},
 				},

+ 1 - 1
internal/topo/redis/lookup.go

@@ -72,7 +72,7 @@ func (s *lookupSource) Open(ctx api.StreamContext) error {
 	return nil
 	return nil
 }
 }
 
 
-func (s *lookupSource) Lookup(ctx api.StreamContext, keys []string, values []interface{}) ([]api.SourceTuple, error) {
+func (s *lookupSource) Lookup(ctx api.StreamContext, _ []string, keys []string, values []interface{}) ([]api.SourceTuple, error) {
 	ctx.GetLogger().Debugf("Lookup redis %v", keys)
 	ctx.GetLogger().Debugf("Lookup redis %v", keys)
 	if len(keys) != 1 {
 	if len(keys) != 1 {
 		return nil, fmt.Errorf("redis lookup only support one key, but got %v", keys)
 		return nil, fmt.Errorf("redis lookup only support one key, but got %v", keys)

+ 2 - 2
internal/topo/redis/lookup_test.go

@@ -78,7 +78,7 @@ func TestSingle(t *testing.T) {
 		},
 		},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
-		actual, err := ls.Lookup(ctx, []string{"id"}, []interface{}{tt.value})
+		actual, err := ls.Lookup(ctx, []string{}, []string{"id"}, []interface{}{tt.value})
 		if err != nil {
 		if err != nil {
 			t.Errorf("Test %d: %v", i, err)
 			t.Errorf("Test %d: %v", i, err)
 			continue
 			continue
@@ -125,7 +125,7 @@ func TestList(t *testing.T) {
 		},
 		},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
-		actual, err := ls.Lookup(ctx, []string{"id"}, []interface{}{tt.value})
+		actual, err := ls.Lookup(ctx, []string{}, []string{"id"}, []interface{}{tt.value})
 		if err != nil {
 		if err != nil {
 			t.Errorf("Test %d: %v", i, err)
 			t.Errorf("Test %d: %v", i, err)
 			continue
 			continue

+ 1 - 1
pkg/api/stream.go

@@ -85,7 +85,7 @@ type LookupSource interface {
 	//read from the yaml
 	//read from the yaml
 	Configure(datasource string, props map[string]interface{}) error
 	Configure(datasource string, props map[string]interface{}) error
 	// Lookup receive lookup values to construct the query and return query results
 	// Lookup receive lookup values to construct the query and return query results
-	Lookup(ctx StreamContext, keys []string, values []interface{}) ([]SourceTuple, error)
+	Lookup(ctx StreamContext, fields []string, keys []string, values []interface{}) ([]SourceTuple, error)
 	Closable
 	Closable
 }
 }