Forráskód Böngészése

feat(sql): support sql in clause (#1303)

* feat(sql in): support sql in clause

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* feat(sql in): support sql not in clause

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* feat(sql in): support sql not in clause

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
superxan 2 éve
szülő
commit
3e262f47a8

+ 1 - 1
docs/en_US/sqls/lexical_elements.md

@@ -64,7 +64,7 @@ CREATE STREAM `stream` (
 Following operators are provided.
 
 ```
-+, -, *, /, %, &, |, ^, =, !=, <, <=, >, >=, [], ->, ()
++, -, *, /, %, &, |, ^, =, !=, <, <=, >, >=, [], ->, (), IN, NOT IN
 ```
 
 ## Literals

+ 23 - 8
docs/en_US/sqls/query_language_elements.md

@@ -3,15 +3,15 @@
 
 eKuiper provides a variety of elements for building queries. They are summarized below.
 
-| Element               | Summary                                                      |
-| --------------------- | ------------------------------------------------------------ |
-| [SELECT](#select)     | SELECT is used to retrieve rows from input streams and enables the selection of one or many columns from one or many input streams in eKuiper. |
-| [FROM](#from)         | FROM specifies the input stream. The FROM clause is always required for any SELECT statement. |
+| Element               | Summary                                                                                                                                                                                                                                       |
+|-----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| [SELECT](#select)     | SELECT is used to retrieve rows from input streams and enables the selection of one or many columns from one or many input streams in eKuiper.                                                                                                |
+| [FROM](#from)         | FROM specifies the input stream. The FROM clause is always required for any SELECT statement.                                                                                                                                                 |
 | [JOIN](#join)         | JOIN is used to combine records from two or more input streams. JOIN includes LEFT, RIGHT, FULL & CROSS. Join can apply to multiple streams join or stream/table join. To join multiple streams, it must run within a [window](./windows.md). |
-| [WHERE](#where)       | WHERE specifies the search condition for the rows returned by the query. |
-| [GROUP BY](#group-by) | GROUP BY groups a selected set of rows into a set of summary rows grouped by the values of one or more columns or expressions. It must run within a [window](./windows.md). |
-| [ORDER BY](#order-by) | Order the rows by values of one or more columns.             |
-| [HAVING](#having)     | HAVING specifies a search condition for a group or an aggregate. HAVING can be used only with the SELECT expression.             |
+| [WHERE](#where)       | WHERE specifies the search condition for the rows returned by the query.                                                                                                                                                                      |
+| [GROUP BY](#group-by) | GROUP BY groups a selected set of rows into a set of summary rows grouped by the values of one or more columns or expressions. It must run within a [window](./windows.md).                                                                   |
+| [ORDER BY](#order-by) | Order the rows by values of one or more columns.                                                                                                                                                                                              |
+| [HAVING](#having)     | HAVING specifies a search condition for a group or an aggregate. HAVING can be used only with the SELECT expression.                                                                                                                          |
 
 ## SELECT
 
@@ -208,6 +208,21 @@ Is the operator used to test the condition of one expression being less than the
 
 Is the operator used to test the condition of one expression being less than or equal to the other expression.
 
+**[NOT] IN**
+
+Is the operator used to test whether one expression is (or is not) a member of the other expression. Two formats are supported:
+
+```sql
+  expression [NOT] IN (expression2,...n)
+```
+*Note*: Multiple expressions are supported at the same time, but each expression must return a single value.
+
+```sql
+  expression [NOT] IN expression2
+```
+
+*Note*: The user must make sure that the result of expression2 is an array.
+
 ```sql
 SELECT column1, column2, ...
 FROM table_name

+ 1 - 1
docs/zh_CN/sqls/lexical_elements.md

@@ -64,7 +64,7 @@ CREATE STREAM `stream` (
 提供了以下运算符。
 
 ```
-+, -, *, /, %, &, |, ^, =, !=, <, <=, >, >=, [], ->, ()
++, -, *, /, %, &, |, ^, =, !=, <, <=, >, >=, [], ->, (), IN, NOT IN
 ```
 
 ## 字面量(Literals)

+ 27 - 11
docs/zh_CN/sqls/query_language_elements.md

@@ -3,16 +3,16 @@
 
 eKuiper 提供了用于构建查询的各种元素。 总结如下。
 
-| 元素                  | 总结                                                         |
-| --------------------- | ------------------------------------------------------------ |
-| [SELECT](#select)     | SELECT 用于从输入流中检索行,并允许从 eKuiper 中的一个或多个输入流中选择一个或多个列。 |
-| [FROM](#from)         | FROM 指定输入流。 任何 SELECT 语句始终需要 FROM 子句。       |
-| [JOIN](#join)         | JOIN 用于合并来自两个或更多输入流的记录。 JOIN 包括 LEFT,RIGHT,FULL 和 CROSS。JOIN 可用于多个流或者流和表格。当用于多个流时,必须运行在[窗口](./windows.md)中,否则每次单条数据,JOIN 没有意义。|
-| [WHERE](#where)       | WHERE 指定查询返回的行的搜索条件。                           |
-| [GROUP BY](#group-by) | GROUP BY 将一组选定的行分组为一组汇总行,这些汇总行按一个或多个列或表达式的值分组。该语句必须运行在[窗口](./windows.md)中。 |
-| [ORDER BY](#order-by) | 按一列或多列的值对行进行排序。                               |
-| [HAVING](#having)     | HAVING 为组或集合指定搜索条件。 HAVING 只能与 SELECT 表达式一起使用。 |
-|                       |                                                              |
+| 元素                    | 总结                                                                                                                             |
+|-----------------------|--------------------------------------------------------------------------------------------------------------------------------|
+| [SELECT](#select)     | SELECT 用于从输入流中检索行,并允许从 eKuiper 中的一个或多个输入流中选择一个或多个列。                                                                            |
+| [FROM](#from)         | FROM 指定输入流。 任何 SELECT 语句始终需要 FROM 子句。                                                                                          |
+| [JOIN](#join)         | JOIN 用于合并来自两个或更多输入流的记录。 JOIN 包括 LEFT,RIGHT,FULL 和 CROSS。JOIN 可用于多个流或者流和表格。当用于多个流时,必须运行在[窗口](./windows.md)中,否则每次单条数据,JOIN 没有意义。 |
+| [WHERE](#where)       | WHERE 指定查询返回的行的搜索条件。                                                                                                           |
+| [GROUP BY](#group-by) | GROUP BY 将一组选定的行分组为一组汇总行,这些汇总行按一个或多个列或表达式的值分组。该语句必须运行在[窗口](./windows.md)中。                                                     |
+| [ORDER BY](#order-by) | 按一列或多列的值对行进行排序。                                                                                                                |
+| [HAVING](#having)     | HAVING 为组或集合指定搜索条件。 HAVING 只能与 SELECT 表达式一起使用。                                                                                 |
+|                       |                                                                                                                                |
 
 ## SELECT
 
@@ -154,7 +154,7 @@ WHERE <search_condition>
     [ { AND | OR } { <predicate> | ( <search_condition> ) } ]   
 [ ,...n ]   
 <predicate> ::=   
-    { expression { = | < > | ! = | > | > = | < | < = } expression   
+    { expression { = | < > | ! = | > | > = | < | < = | [NOT] IN } expression   
 ```
 
 ### 参数
@@ -209,6 +209,22 @@ WHERE <search_condition>
 
 用于测试一个表达式小于或等于另一个表达式的条件的运算符。
 
+**[NOT] IN**
+
+用于测试一个表达式是否属于另一个表达式的条件的运算符。
+使用方法支持以下两种
+
+```sql
+  expression [NOT] IN (expression2,...n)
+```
+*注意*: 支持同时设置多个表达式, 但用户须确保每个表达式返回值为单一值
+
+```sql
+  expression [NOT] IN expression2
+```
+
+*注意*: 用户须确保 expression2 的返回值为数组
+
 ```sql
 SELECT column1, column2, ...
 FROM table_name

+ 8 - 0
internal/topo/operator/filter_operator.go

@@ -45,6 +45,8 @@ func (p *FilterOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Funct
 			if r {
 				return input
 			}
+		case nil: // nil is false
+			break
 		default:
 			return fmt.Errorf("run Where error: invalid condition that returns non-bool value %[1]T(%[1]v)", r)
 		}
@@ -60,6 +62,8 @@ func (p *FilterOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Funct
 				if val {
 					f = append(f, t)
 				}
+			case nil:
+				break
 			default:
 				return fmt.Errorf("run Where error: invalid condition that returns non-bool value %[1]T(%[1]v)", val)
 			}
@@ -82,6 +86,8 @@ func (p *FilterOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Funct
 				if val {
 					r = append(r, v)
 				}
+			case nil:
+				break
 			default:
 				return fmt.Errorf("run Where error: invalid condition that returns non-bool value %[1]T(%[1]v)", val)
 			}
@@ -103,6 +109,8 @@ func (p *FilterOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Funct
 				if val {
 					r = append(r, v)
 				}
+			case nil:
+				break
 			default:
 				return fmt.Errorf("run Where error: invalid condition that returns non-bool value %[1]T(%[1]v)", val)
 			}

+ 218 - 1
internal/topo/operator/filter_test.go

@@ -131,6 +131,177 @@ func TestFilterPlan_Apply(t *testing.T) {
 		},
 
 		{
+			sql: "SELECT abc FROM tbl WHERE def IN (\"hello\") AND abc IN (34, 33)",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(34),
+					"def": "hello",
+				},
+			},
+			result: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(34),
+					"def": "hello",
+				},
+			},
+		},
+
+		{
+			sql: "SELECT abc FROM tbl WHERE def NOT IN (\"ello\") AND abc NOT IN (35, 33)",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(34),
+					"def": "hello",
+				},
+			},
+			result: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(34),
+					"def": "hello",
+				},
+			},
+		},
+
+		{
+			sql: "SELECT abc FROM tbl WHERE def IN strArraySet AND abc IN intArraySet",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc":         int64(34),
+					"def":         "hello",
+					"strArraySet": []string{"hello", "world"},
+					"intArraySet": []int{33, 34},
+				},
+			},
+			result: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc":         int64(34),
+					"def":         "hello",
+					"strArraySet": []string{"hello", "world"},
+					"intArraySet": []int{33, 34},
+				},
+			},
+		},
+
+		{
+			sql: "SELECT abc FROM tbl WHERE def NOT IN strArraySet AND abc NOT IN intArraySet",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc":         int64(34),
+					"def":         "hello",
+					"strArraySet": []string{"ello", "world"},
+					"intArraySet": []int{33, 35},
+				},
+			},
+			result: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc":         int64(34),
+					"def":         "hello",
+					"strArraySet": []string{"ello", "world"},
+					"intArraySet": []int{33, 35},
+				},
+			},
+		},
+
+		{
+			sql: "SELECT abc FROM tbl WHERE def IN (\"ello\")",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(34),
+				},
+			},
+			result: nil,
+		},
+
+		{
+			sql: "SELECT abc FROM tbl WHERE def NOT IN (\"ello\")",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(34),
+				},
+			},
+			result: nil,
+		},
+
+		{
+			sql: "SELECT abc FROM tbl WHERE def IN strArraySet",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc":         int64(34),
+					"def":         "hello",
+					"strArraySet": nil,
+				},
+			},
+			result: nil,
+		},
+
+		{
+			sql: "SELECT abc FROM tbl WHERE def NOT IN strArraySet",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc":         int64(34),
+					"def":         "hello",
+					"strArraySet": nil,
+				},
+			},
+			result: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc":         int64(34),
+					"def":         "hello",
+					"strArraySet": nil,
+				},
+			},
+		},
+
+		{
+			sql: "SELECT abc FROM tbl WHERE abc IN (abc, def, ghm)",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(34),
+					"def": "hello",
+				},
+			},
+			result: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(34),
+					"def": "hello",
+				},
+			},
+		},
+
+		{
+			sql: "SELECT abc FROM tbl WHERE abc NOT IN (def, ghm)",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(34),
+					"def": int64(35),
+				},
+			},
+			result: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"abc": int64(34),
+					"def": int64(35),
+				},
+			},
+		},
+
+		{
 			sql: "SELECT abc FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{
@@ -248,7 +419,53 @@ func TestFilterPlan_Apply(t *testing.T) {
 				},
 			},
 		},
-
+		{
+			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 IN (\"v1\") GROUP BY TUMBLINGWINDOW(ss, 10)",
+			data: &xsql.JoinTupleSets{
+				Content: []xsql.JoinTuple{
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+							{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+						},
+					},
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
+							{Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
+						},
+					},
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
+						},
+					},
+				},
+				WindowRange: &xsql.WindowRange{
+					WindowStart: 1541152486013,
+					WindowEnd:   1541152487013,
+				},
+			},
+			result: &xsql.JoinTupleSets{
+				Content: []xsql.JoinTuple{
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+							{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+						},
+					},
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
+						},
+					},
+				},
+				WindowRange: &xsql.WindowRange{
+					WindowStart: 1541152486013,
+					WindowEnd:   1541152487013,
+				},
+			},
+		},
 		{
 			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v22\" GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: &xsql.JoinTupleSets{

+ 4 - 0
internal/xsql/lexical.go

@@ -213,6 +213,10 @@ func (s *Scanner) ScanIdent() (tok ast.Token, lit string) {
 		return ast.ELSE, lit
 	case "END":
 		return ast.END, lit
+	case "IN":
+		return ast.IN, lit
+	case "NOT":
+		return ast.NOT, lit
 	case "CREATE":
 		return ast.CREATE, lit
 	case "DROP":

+ 54 - 0
internal/xsql/parser.go

@@ -503,6 +503,19 @@ func (p *Parser) ParseExpr() (ast.Expr, error) {
 		} else if op == ast.LBRACKET { //LBRACKET is a special token, need to unscan
 			op = ast.SUBSET
 			p.unscan()
+		} else if op == ast.IN { //IN is a special token, need to unscan
+			op = ast.IN
+			p.unscan()
+		} else if op == ast.NOT {
+			afterNot, tk1 := p.scanIgnoreWhitespace()
+			switch afterNot {
+			case ast.IN: //IN is a special token, need to unscan
+				op = ast.NOTIN
+				p.unscan()
+				break
+			default:
+				return nil, fmt.Errorf("found %q, expected expression", tk1)
+			}
 		}
 
 		var rhs ast.Expr
@@ -535,6 +548,8 @@ func (p *Parser) parseUnaryExpr(isSubField bool) (ast.Expr, error) {
 		return &ast.ParenExpr{Expr: expr}, nil
 	} else if tok1 == ast.LBRACKET {
 		return p.parseBracketExpr()
+	} else if tok1 == ast.IN {
+		return p.parseValueSetExpr()
 	}
 
 	p.unscan()
@@ -595,6 +610,45 @@ func (p *Parser) parseUnaryExpr(isSubField bool) (ast.Expr, error) {
 	return nil, fmt.Errorf("found %q, expected expression.", lit)
 }
 
+func (p *Parser) parseValueSetExpr() (ast.Expr, error) {
+	valsetExpr := &ast.ValueSetExpr{
+		LiteralExprs: nil,
+		ArrayExpr:    nil,
+	}
+	// IN ("A", "B") or IN expression
+	tk, _ := p.scanIgnoreWhitespace()
+	if tk == ast.LPAREN {
+		for {
+			element, err := p.ParseExpr()
+			if err != nil {
+				return nil, fmt.Errorf("expect elements for IN expression, but %v", err)
+			}
+			valsetExpr.LiteralExprs = append(valsetExpr.LiteralExprs, element)
+
+			if tok2, _ := p.scanIgnoreWhitespace(); tok2 != ast.COMMA {
+				p.unscan()
+				break
+			}
+		}
+
+		if tok, lit := p.scanIgnoreWhitespace(); tok != ast.RPAREN {
+			return nil, fmt.Errorf("expect ) for IN expression, but got %q", lit)
+		}
+
+		return valsetExpr, nil
+	} else {
+		//back to IN
+		p.unscan()
+	}
+
+	if exp, err := p.parseUnaryExpr(false); err != nil {
+		return nil, fmt.Errorf("expect Ident expression after IN, but got error %v", err)
+	} else {
+		valsetExpr.ArrayExpr = exp
+		return valsetExpr, nil
+	}
+}
+
 func (p *Parser) parseBracketExpr() (ast.Expr, error) {
 	tok2, lit2 := p.scanIgnoreWhiteSpaceWithNegativeNum()
 	if tok2 == ast.RBRACKET {

+ 116 - 0
internal/xsql/parser_test.go

@@ -886,6 +886,70 @@ func TestParser_ParseStatement(t *testing.T) {
 		},
 
 		{
+			s: `SELECT temp AS t, name FROM topic/sensor1 WHERE t IN arraySet OR name IN arraySet`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{Expr: &ast.FieldRef{Name: "temp", StreamName: ast.DefaultStream}, Name: "temp", AName: "t"},
+					{Expr: &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream}, Name: "name", AName: ""},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "topic/sensor1"}},
+				Condition: &ast.BinaryExpr{
+					LHS: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "t", StreamName: ast.DefaultStream}, OP: ast.IN, RHS: &ast.ValueSetExpr{ArrayExpr: &ast.FieldRef{Name: "arraySet", StreamName: ast.DefaultStream}}},
+					OP:  ast.OR,
+					RHS: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream}, OP: ast.IN, RHS: &ast.ValueSetExpr{ArrayExpr: &ast.FieldRef{Name: "arraySet", StreamName: ast.DefaultStream}}},
+				},
+			},
+		},
+
+		{
+			s: `SELECT temp AS t, name FROM topic/sensor1 WHERE t NOT IN arraySet OR name NOT IN arraySet`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{Expr: &ast.FieldRef{Name: "temp", StreamName: ast.DefaultStream}, Name: "temp", AName: "t"},
+					{Expr: &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream}, Name: "name", AName: ""},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "topic/sensor1"}},
+				Condition: &ast.BinaryExpr{
+					LHS: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "t", StreamName: ast.DefaultStream}, OP: ast.NOTIN, RHS: &ast.ValueSetExpr{ArrayExpr: &ast.FieldRef{Name: "arraySet", StreamName: ast.DefaultStream}}},
+					OP:  ast.OR,
+					RHS: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream}, OP: ast.NOTIN, RHS: &ast.ValueSetExpr{ArrayExpr: &ast.FieldRef{Name: "arraySet", StreamName: ast.DefaultStream}}},
+				},
+			},
+		},
+
+		{
+			s: `SELECT temp AS t, name FROM topic/sensor1 WHERE t IN (20.5, 20.4) OR name IN ("dname", "ename")`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{Expr: &ast.FieldRef{Name: "temp", StreamName: ast.DefaultStream}, Name: "temp", AName: "t"},
+					{Expr: &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream}, Name: "name", AName: ""},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "topic/sensor1"}},
+				Condition: &ast.BinaryExpr{
+					LHS: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "t", StreamName: ast.DefaultStream}, OP: ast.IN, RHS: &ast.ValueSetExpr{LiteralExprs: []ast.Expr{&ast.NumberLiteral{Val: 20.5}, &ast.NumberLiteral{Val: 20.4}}}},
+					OP:  ast.OR,
+					RHS: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream}, OP: ast.IN, RHS: &ast.ValueSetExpr{LiteralExprs: []ast.Expr{&ast.StringLiteral{Val: "dname"}, &ast.StringLiteral{Val: "ename"}}}},
+				},
+			},
+		},
+
+		{
+			s: `SELECT temp AS t, name FROM topic/sensor1 WHERE t NOT IN (20.5, 20.4) OR name IN ("dname", "ename")`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{Expr: &ast.FieldRef{Name: "temp", StreamName: ast.DefaultStream}, Name: "temp", AName: "t"},
+					{Expr: &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream}, Name: "name", AName: ""},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "topic/sensor1"}},
+				Condition: &ast.BinaryExpr{
+					LHS: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "t", StreamName: ast.DefaultStream}, OP: ast.NOTIN, RHS: &ast.ValueSetExpr{LiteralExprs: []ast.Expr{&ast.NumberLiteral{Val: 20.5}, &ast.NumberLiteral{Val: 20.4}}}},
+					OP:  ast.OR,
+					RHS: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream}, OP: ast.IN, RHS: &ast.ValueSetExpr{LiteralExprs: []ast.Expr{&ast.StringLiteral{Val: "dname"}, &ast.StringLiteral{Val: "ename"}}}},
+				},
+			},
+		},
+
+		{
 			s: `SELECT temp AS t, name FROM topic/sensor1 WHERE name = "dname" GROUP BY name`,
 			stmt: &ast.SelectStatement{
 				Fields: []ast.Field{
@@ -1517,6 +1581,27 @@ func TestParser_ParseStatement(t *testing.T) {
 			s:   `SELECT ".*(/)(?!.*\1)" FROM topic/sensor1 AS t1`,
 			err: `found "invalid string: \".*(/)(?!.*\\1)\"", expected expression.`,
 		},
+		{
+			s: `SELECT name FROM tbl WHERE name IN ("A", "B","C")`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						Expr:  &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream},
+						Name:  "name",
+						AName: ""},
+				},
+				Sources:   []ast.Source{&ast.Table{Name: "tbl"}},
+				Condition: &ast.BinaryExpr{LHS: &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream}, OP: ast.IN, RHS: &ast.ValueSetExpr{LiteralExprs: []ast.Expr{&ast.StringLiteral{Val: "A"}, &ast.StringLiteral{Val: "B"}, &ast.StringLiteral{Val: "C"}}}},
+			},
+		},
+		{
+			s:   `SELECT name FROM tbl WHERE name IN ()`,
+			err: `expect elements for IN expression, but found ")", expected expression.`,
+		},
+		{
+			s:   `SELECT name FROM tbl WHERE name IN (abc,def OR name in (abc)`,
+			err: `expect ) for IN expression, but got "EOF"`,
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -1739,6 +1824,37 @@ func TestParser_ParseWindowsExpr(t *testing.T) {
 				},
 			},
 		},
+
+		{
+			s: `SELECT * FROM demo GROUP BY department, COUNTWINDOW(3,1) FILTER( where revenue IN (100, 200)), year`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						Expr:  &ast.Wildcard{Token: ast.ASTERISK},
+						Name:  "",
+						AName: ""},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "demo"}},
+				Dimensions: ast.Dimensions{
+					ast.Dimension{Expr: &ast.FieldRef{Name: "department", StreamName: ast.DefaultStream}},
+					ast.Dimension{
+						Expr: &ast.Window{
+							WindowType: ast.COUNT_WINDOW,
+							Length:     &ast.IntegerLiteral{Val: 3},
+							Interval:   &ast.IntegerLiteral{Val: 1},
+							Filter: &ast.BinaryExpr{
+								LHS: &ast.FieldRef{Name: "revenue", StreamName: ast.DefaultStream},
+								OP:  ast.IN,
+								RHS: &ast.ValueSetExpr{
+									LiteralExprs: []ast.Expr{&ast.IntegerLiteral{Val: 100}, &ast.IntegerLiteral{Val: 200}},
+								},
+							},
+						},
+					},
+					ast.Dimension{Expr: &ast.FieldRef{Name: "year", StreamName: ast.DefaultStream}},
+				},
+			},
+		},
 		//to be supported
 		{
 			s:    `SELECT sum(f1) FILTER( where revenue > 100 ) FROM tbl GROUP BY year`,

+ 99 - 0
internal/xsql/valuer.go

@@ -562,6 +562,8 @@ func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 		return val
 	case *ast.CaseExpr:
 		return v.evalCase(expr)
+	case *ast.ValueSetExpr:
+		return v.evalValueSet(expr)
 	default:
 		return nil
 	}
@@ -595,6 +597,9 @@ func (v *ValuerEval) evalBinaryExpr(expr *ast.BinaryExpr) interface{} {
 	if _, ok := rhs.(error); ok {
 		return rhs
 	}
+	if isSetOperator(expr.OP) {
+		return v.evalSetsExpr(lhs, expr.OP, rhs)
+	}
 	return v.simpleDataEval(lhs, rhs, expr.OP)
 }
 
@@ -630,11 +635,105 @@ func (v *ValuerEval) evalCase(expr *ast.CaseExpr) interface{} {
 	return nil
 }
 
+func (v *ValuerEval) evalValueSet(expr *ast.ValueSetExpr) interface{} {
+	var valueSet []interface{}
+
+	if expr.LiteralExprs != nil {
+		for _, exp := range expr.LiteralExprs {
+			valueSet = append(valueSet, v.Eval(exp))
+		}
+		return valueSet
+	}
+
+	value := v.Eval(expr.ArrayExpr)
+	if isSliceOrArray(value) {
+		return value
+	}
+	return nil
+}
+
+func isBlank(value reflect.Value) bool {
+	switch value.Kind() {
+	case reflect.String:
+		return value.Len() == 0
+	case reflect.Bool:
+		return !value.Bool()
+	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+		return value.Int() == 0
+	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
+		return value.Uint() == 0
+	case reflect.Float32, reflect.Float64:
+		return value.Float() == 0
+	case reflect.Interface, reflect.Ptr:
+		return value.IsNil()
+	}
+	return reflect.DeepEqual(value.Interface(), reflect.Zero(value.Type()).Interface())
+}
+
+func (v *ValuerEval) evalSetsExpr(lhs interface{}, op ast.Token, rhsSet interface{}) interface{} {
+	switch op {
+	/*Semantic rules
+
+	When using the IN operator, the following semantics apply in this order:
+
+	Returns FALSE if value_set is empty.
+	Returns NULL if search_value is NULL.
+	Returns TRUE if value_set contains a value equal to search_value.
+	Returns NULL if value_set contains a NULL.
+	Returns FALSE.
+	When using the NOT IN operator, the following semantics apply in this order:
+
+	Returns TRUE if value_set is empty.
+	Returns NULL if search_value is NULL.
+	Returns FALSE if value_set contains a value equal to search_value.
+	Returns NULL if value_set contains a NULL.
+	Returns TRUE.
+	*/
+	case ast.IN, ast.NOTIN:
+		if rhsSet == nil {
+			if op == ast.IN {
+				return false
+			} else {
+				return true
+			}
+		}
+		if lhs == nil {
+			return nil
+		}
+		rhsSetVals := reflect.ValueOf(rhsSet)
+		for i := 0; i < rhsSetVals.Len(); i++ {
+			switch r := v.simpleDataEval(lhs, rhsSetVals.Index(i).Interface(), ast.EQ).(type) {
+			case error:
+				return fmt.Errorf("evaluate in expression error: %s", r)
+			case bool:
+				if r {
+					if op == ast.IN {
+						return true
+					} else {
+						return false
+					}
+				}
+			}
+		}
+		if op == ast.IN {
+			return false
+		} else {
+			return true
+		}
+	default:
+		return fmt.Errorf("%v is an invalid operation for %T", op, lhs)
+	}
+}
+
 func isSliceOrArray(v interface{}) bool {
 	kind := reflect.ValueOf(v).Kind()
 	return kind == reflect.Array || kind == reflect.Slice
 }
 
+func isSetOperator(op ast.Token) bool {
+	return op == ast.IN || op == ast.NOTIN
+}
+
 func (v *ValuerEval) evalJsonExpr(result interface{}, op ast.Token, expr ast.Expr) interface{} {
 	switch op {
 	case ast.ARROW:

+ 8 - 0
pkg/ast/expr.go

@@ -159,6 +159,14 @@ type CaseExpr struct {
 func (c *CaseExpr) expr() {}
 func (c *CaseExpr) node() {}
 
+type ValueSetExpr struct {
+	LiteralExprs []Expr // ("A", "B", "C") or (1, 2, 3)
+	ArrayExpr    Expr
+}
+
+func (c *ValueSetExpr) expr() {}
+func (c *ValueSetExpr) node() {}
+
 type StreamName string
 
 func (sn *StreamName) node() {}

+ 12 - 1
pkg/ast/token.go

@@ -55,6 +55,9 @@ const (
 
 	SUBSET //[
 	ARROW  //->
+	IN     // IN
+	NOT    // NOT
+	NOTIN  // NOT IN
 
 	operatorEnd
 
@@ -165,6 +168,7 @@ var Tokens = []string{
 
 	SUBSET: "[]",
 	ARROW:  "->",
+	IN:     "IN",
 
 	ASTERISK: "*",
 	COMMA:    ",",
@@ -192,6 +196,12 @@ var Tokens = []string{
 	BY:     "BY",
 	ASC:    "ASC",
 	DESC:   "DESC",
+	FILTER: "FILTER",
+	CASE:   "CASE",
+	WHEN:   "WHEN",
+	THEN:   "THEN",
+	ELSE:   "ELSE",
+	END:    "END",
 
 	CREATE:   "CREATE",
 	DROP:     "RROP",
@@ -228,6 +238,7 @@ var Tokens = []string{
 	OR:    "OR",
 	TRUE:  "TRUE",
 	FALSE: "FALSE",
+	NOTIN: "NOTIN",
 
 	DD: "DD",
 	HH: "HH",
@@ -264,7 +275,7 @@ func (tok Token) Precedence() int {
 		return 1
 	case AND:
 		return 2
-	case EQ, NEQ, LT, LTE, GT, GTE:
+	case EQ, NEQ, LT, LTE, GT, GTE, IN, NOTIN:
 		return 3
 	case ADD, SUB, BITWISE_OR, BITWISE_XOR:
 		return 4

+ 6 - 0
pkg/ast/visitor.go

@@ -113,6 +113,12 @@ func Walk(v Visitor, node Node) {
 
 	case *ColFuncField:
 		Walk(v, n.Expr)
+
+	case *ValueSetExpr:
+		for _, l := range n.LiteralExprs {
+			Walk(v, l)
+		}
+		Walk(v, n.ArrayExpr)
 	}
 }