// Copyright 2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

  package operator

  import (
  	"fmt"

  	"github.com/lf-edge/ekuiper/internal/xsql"
  	"github.com/lf-edge/ekuiper/pkg/api"
  )
  // ProjectSetOperator expands the array produced by a set-returning function
  // (SRF) column into one output row per array element.
  type ProjectSetOperator struct {
  	// SrfMapping is used as a set: its keys are the names of the columns that
  	// hold SRF results. Only the keys are read; the values carry no data.
  	SrfMapping map[string]struct{}
  }
  23. // Apply implement UnOperation
  24. // ProjectSetOperator will extract the results from the set-returning-function into multi rows by aligning other columns
  25. // For tuple, ProjectSetOperator will do the following transform:
  26. // {"a":[1,2],"b":3} => {"a":1,"b":3},{"a":2,"b":3}
  27. // For Collection, ProjectSetOperator will do the following transform:
  28. // [{"a":[1,2],"b":3},{"a":[1,2],"b":4}] = > [{"a":"1","b":3},{"a":"2","b":3},{"a":"1","b":4},{"a":"2","b":4}]
  29. func (ps *ProjectSetOperator) Apply(_ api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  30. switch input := data.(type) {
  31. case error:
  32. return input
  33. case xsql.TupleRow:
  34. results, err := ps.handleSRFRow(input)
  35. if err != nil {
  36. return err
  37. }
  38. return results.rows
  39. case xsql.Collection:
  40. if err := ps.handleSRFRowForCollection(input); err != nil {
  41. return err
  42. }
  43. return input
  44. default:
  45. return fmt.Errorf("run Select error: invalid input %[1]T(%[1]v)", input)
  46. }
  47. }
  48. func (ps *ProjectSetOperator) handleSRFRowForCollection(data xsql.Collection) error {
  49. switch collection := data.(type) {
  50. case *xsql.JoinTuples:
  51. newContent := make([]*xsql.JoinTuple, 0)
  52. for _, c := range collection.Content {
  53. rs, err := ps.handleSRFRow(c)
  54. if err != nil {
  55. return err
  56. }
  57. newContent = append(newContent, rs.joinTuples...)
  58. }
  59. collection.Content = newContent
  60. case *xsql.GroupedTuplesSet:
  61. newGroups := make([]*xsql.GroupedTuples, 0)
  62. for _, c := range collection.Groups {
  63. rs, err := ps.handleSRFRow(c)
  64. if err != nil {
  65. return err
  66. }
  67. newGroups = append(newGroups, rs.groupTuples...)
  68. }
  69. collection.Groups = newGroups
  70. case *xsql.WindowTuples:
  71. newContent := make([]xsql.TupleRow, 0)
  72. for _, c := range collection.Content {
  73. rs, err := ps.handleSRFRow(c)
  74. if err != nil {
  75. return err
  76. }
  77. newContent = append(newContent, rs.rows...)
  78. }
  79. collection.Content = newContent
  80. default:
  81. return fmt.Errorf("run Select error: invalid input %[1]T(%[1]v)", data)
  82. }
  83. return nil
  84. }
  85. func (ps *ProjectSetOperator) handleSRFRow(row xsql.CloneAbleRow) (*resultWrapper, error) {
  86. // for now we only support 1 srf function in the field
  87. srfName := ""
  88. for k := range ps.SrfMapping {
  89. srfName = k
  90. break
  91. }
  92. aValue, ok := row.Value(srfName, "")
  93. if !ok {
  94. return nil, fmt.Errorf("can't find the result from the %v function", srfName)
  95. }
  96. aValues, ok := aValue.([]interface{})
  97. if !ok {
  98. return nil, fmt.Errorf("the argument for the %v function should be array", srfName)
  99. }
  100. res := newResultWrapper(len(aValues), row)
  101. for i, v := range aValues {
  102. newTupleRow := row.Clone()
  103. // clear original column value
  104. newTupleRow.Del(srfName)
  105. if mv, ok := v.(map[string]interface{}); ok {
  106. for k, v := range mv {
  107. newTupleRow.Set(k, v)
  108. }
  109. } else {
  110. newTupleRow.Set(srfName, v)
  111. }
  112. res.appendTuple(i, newTupleRow)
  113. }
  114. return res, nil
  115. }
  116. type resultWrapper struct {
  117. joinTuples []*xsql.JoinTuple
  118. groupTuples []*xsql.GroupedTuples
  119. rows []xsql.TupleRow
  120. }
  121. func newResultWrapper(len int, row xsql.CloneAbleRow) *resultWrapper {
  122. r := &resultWrapper{}
  123. switch row.(type) {
  124. case *xsql.JoinTuple:
  125. r.joinTuples = make([]*xsql.JoinTuple, len)
  126. case *xsql.GroupedTuples:
  127. r.groupTuples = make([]*xsql.GroupedTuples, len)
  128. case xsql.TupleRow:
  129. r.rows = make([]xsql.TupleRow, len)
  130. }
  131. return r
  132. }
  133. func (r *resultWrapper) appendTuple(index int, newRow xsql.CloneAbleRow) {
  134. switch row := newRow.(type) {
  135. case *xsql.JoinTuple:
  136. r.joinTuples[index] = row
  137. case *xsql.GroupedTuples:
  138. r.groupTuples[index] = row
  139. case xsql.TupleRow:
  140. r.rows[index] = row
  141. }
  142. }