collection.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package xsql
  15. import (
  16. "github.com/lf-edge/ekuiper/pkg/ast"
  17. )
  18. /*
  19. * Collection interfaces
  20. */
  21. // AggregateData Could be a tuple or collection
  22. type AggregateData interface {
  23. AggregateEval(expr ast.Expr, v CallValuer) []interface{}
  24. }
  25. type SortingData interface {
  26. Len() int
  27. Swap(i, j int)
  28. Index(i int) Row
  29. }
  30. // Collection A collection of rows as a table. It is used for window, join, group by, etc.
  31. type Collection interface {
  32. SortingData
  33. // GroupRange through each group. For non-grouped collection, the whole data is a single group
  34. GroupRange(func(i int, aggRow CollectionRow) (bool, error)) error
  35. // Range through each row. For grouped collection, each row is an aggregation of groups
  36. Range(func(i int, r ReadonlyRow) (bool, error)) error
  37. // RangeSet range through each row by cloneing the row
  38. RangeSet(func(i int, r Row) (bool, error)) error
  39. Filter(indexes []int) Collection
  40. GetWindowRange() *WindowRange
  41. Clone() Collection
  42. // ToMaps returns the data as a map
  43. ToMaps() []map[string]interface{}
  44. }
  45. type SingleCollection interface {
  46. Collection
  47. CollectionRow
  48. SetIsAgg(isAgg bool)
  49. // ToAggMaps returns the aggregated data as a map
  50. ToAggMaps() []map[string]interface{}
  51. // ToRowMaps returns all the data in the collection
  52. ToRowMaps() []map[string]interface{}
  53. }
  54. type GroupedCollection interface {
  55. Collection
  56. }
  57. // MergedCollection is a collection of rows that are from different sources
  58. type MergedCollection interface {
  59. Collection
  60. GetBySrc(emitter string) []TupleRow
  61. }
  62. /*
  63. * Collection types definitions
  64. */
  65. type WindowTuples struct {
  66. Content []TupleRow // immutable
  67. *WindowRange
  68. contentBySrc map[string][]TupleRow // volatile, temporary cache]
  69. AffiliateRow
  70. cachedMap map[string]interface{}
  71. isAgg bool
  72. }
  73. var (
  74. _ MergedCollection = &WindowTuples{}
  75. _ SingleCollection = &WindowTuples{}
  76. )
  77. // Window Tuples is also an aggregate row
  78. var _ CollectionRow = &WindowTuples{}
  79. type JoinTuples struct {
  80. Content []*JoinTuple
  81. *WindowRange
  82. AffiliateRow
  83. cachedMap map[string]interface{}
  84. isAgg bool
  85. }
  86. var (
  87. _ SingleCollection = &JoinTuples{}
  88. _ CollectionRow = &JoinTuples{}
  89. )
  90. type GroupedTuplesSet struct {
  91. Groups []*GroupedTuples
  92. *WindowRange
  93. }
  94. var _ GroupedCollection = &GroupedTuplesSet{}
  95. /*
  96. * Collection implementations
  97. */
  98. func (w *WindowTuples) Index(index int) Row {
  99. return w.Content[index]
  100. }
  101. func (w *WindowTuples) Len() int {
  102. return len(w.Content)
  103. }
  104. func (w *WindowTuples) Swap(i, j int) {
  105. w.cachedMap = nil
  106. w.Content[i], w.Content[j] = w.Content[j], w.Content[i]
  107. }
  108. func (w *WindowTuples) GetBySrc(emitter string) []TupleRow {
  109. if w.contentBySrc == nil {
  110. w.contentBySrc = make(map[string][]TupleRow)
  111. for _, t := range w.Content {
  112. e := t.GetEmitter()
  113. if _, hasEmitter := w.contentBySrc[e]; !hasEmitter {
  114. w.contentBySrc[e] = make([]TupleRow, 0)
  115. }
  116. w.contentBySrc[e] = append(w.contentBySrc[e], t)
  117. }
  118. }
  119. return w.contentBySrc[emitter]
  120. }
  121. func (w *WindowTuples) GetWindowRange() *WindowRange {
  122. return w.WindowRange
  123. }
  124. func (w *WindowTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error {
  125. for i, r := range w.Content {
  126. b, e := f(i, r)
  127. if e != nil {
  128. return e
  129. }
  130. if !b {
  131. break
  132. }
  133. }
  134. return nil
  135. }
  136. func (w *WindowTuples) RangeSet(f func(i int, r Row) (bool, error)) error {
  137. for i, r := range w.Content {
  138. rc := r.Clone()
  139. b, e := f(i, rc)
  140. if e != nil {
  141. return e
  142. }
  143. if !b {
  144. break
  145. }
  146. w.Content[i] = rc.(TupleRow)
  147. }
  148. return nil
  149. }
  150. func (w *WindowTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
  151. _, err := f(0, w)
  152. return err
  153. }
  154. func (w *WindowTuples) AddTuple(tuple *Tuple) *WindowTuples {
  155. w.Content = append(w.Content, tuple)
  156. return w
  157. }
  158. func (w *WindowTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  159. var result []interface{}
  160. for _, t := range w.Content {
  161. result = append(result, Eval(expr, MultiValuer(t, &WindowRangeValuer{WindowRange: w.WindowRange}, v, &WildcardValuer{t})))
  162. }
  163. return result
  164. }
  165. // Filter the tuples by the given predicate
  166. func (w *WindowTuples) Filter(indexes []int) Collection {
  167. w.cachedMap = nil
  168. newC := make([]TupleRow, 0, len(indexes))
  169. for _, i := range indexes {
  170. newC = append(newC, w.Content[i])
  171. }
  172. w.Content = newC
  173. return w
  174. }
  175. func (w *WindowTuples) Value(key, table string) (interface{}, bool) {
  176. r, ok := w.AffiliateRow.Value(key, table)
  177. if ok {
  178. return r, ok
  179. }
  180. if len(w.Content) > 0 {
  181. return w.Content[0].Value(key, table)
  182. }
  183. return nil, false
  184. }
  185. func (w *WindowTuples) Meta(key, table string) (interface{}, bool) {
  186. if len(w.Content) > 0 {
  187. return w.Content[0].Value(key, table)
  188. }
  189. return nil, false
  190. }
  191. func (w *WindowTuples) All(_ string) (map[string]interface{}, bool) {
  192. return w.ToMap(), true
  193. }
  194. func (w *WindowTuples) ToMap() map[string]interface{} {
  195. if w.cachedMap == nil {
  196. m := make(map[string]interface{})
  197. if len(w.Content) > 0 {
  198. for k, v := range w.Content[0].ToMap() {
  199. m[k] = v
  200. }
  201. }
  202. w.cachedMap = m
  203. }
  204. w.AffiliateRow.MergeMap(w.cachedMap)
  205. return w.cachedMap
  206. }
  207. func (w *WindowTuples) Clone() Collection {
  208. ts := make([]TupleRow, len(w.Content))
  209. for i, t := range w.Content {
  210. ts[i] = t.Clone().(TupleRow)
  211. }
  212. c := &WindowTuples{
  213. Content: ts,
  214. WindowRange: w.WindowRange,
  215. AffiliateRow: w.AffiliateRow.Clone(),
  216. isAgg: w.isAgg,
  217. }
  218. return c
  219. }
  220. func (w *WindowTuples) ToAggMaps() []map[string]interface{} {
  221. return []map[string]interface{}{w.ToMap()}
  222. }
  223. func (w *WindowTuples) ToRowMaps() []map[string]interface{} {
  224. r := make([]map[string]interface{}, len(w.Content))
  225. for i, t := range w.Content {
  226. r[i] = t.ToMap()
  227. }
  228. return r
  229. }
  230. func (w *WindowTuples) ToMaps() []map[string]interface{} {
  231. if w.isAgg {
  232. return w.ToAggMaps()
  233. } else {
  234. return w.ToRowMaps()
  235. }
  236. }
  237. func (w *WindowTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
  238. cols = w.AffiliateRow.Pick(cols)
  239. for i, t := range w.Content {
  240. tc := t.Clone()
  241. tc.Pick(allWildcard, cols, wildcardEmitters)
  242. w.Content[i] = tc.(TupleRow)
  243. }
  244. }
  245. func (w *WindowTuples) SetIsAgg(_ bool) {
  246. w.isAgg = true
  247. }
  248. func (s *JoinTuples) Len() int { return len(s.Content) }
  249. func (s *JoinTuples) Swap(i, j int) {
  250. s.cachedMap = nil
  251. s.Content[i], s.Content[j] = s.Content[j], s.Content[i]
  252. }
  253. func (s *JoinTuples) Index(i int) Row { return s.Content[i] }
  254. func (s *JoinTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  255. var result []interface{}
  256. for _, t := range s.Content {
  257. result = append(result, Eval(expr, MultiValuer(t, &WindowRangeValuer{WindowRange: s.WindowRange}, v, &WildcardValuer{t})))
  258. }
  259. return result
  260. }
  261. func (s *JoinTuples) GetWindowRange() *WindowRange {
  262. return s.WindowRange
  263. }
  264. func (s *JoinTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error {
  265. for i, r := range s.Content {
  266. b, e := f(i, r)
  267. if e != nil {
  268. return e
  269. }
  270. if !b {
  271. break
  272. }
  273. }
  274. return nil
  275. }
  276. func (s *JoinTuples) RangeSet(f func(i int, r Row) (bool, error)) error {
  277. for i, r := range s.Content {
  278. rc := r.Clone()
  279. b, e := f(i, rc)
  280. if e != nil {
  281. return e
  282. }
  283. if !b {
  284. break
  285. }
  286. s.Content[i] = rc.(*JoinTuple)
  287. }
  288. return nil
  289. }
  290. func (s *JoinTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
  291. _, err := f(0, s)
  292. return err
  293. }
  294. // Filter the tuples by the given predicate
  295. func (s *JoinTuples) Filter(indexes []int) Collection {
  296. newC := make([]*JoinTuple, 0, len(indexes))
  297. for _, i := range indexes {
  298. newC = append(newC, s.Content[i])
  299. }
  300. s.Content = newC
  301. s.cachedMap = nil
  302. return s
  303. }
  304. func (s *JoinTuples) Value(key, table string) (interface{}, bool) {
  305. r, ok := s.AffiliateRow.Value(key, table)
  306. if ok {
  307. return r, ok
  308. }
  309. return s.Content[0].Value(key, table)
  310. }
  311. func (s *JoinTuples) Meta(key, table string) (interface{}, bool) {
  312. return s.Content[0].Meta(key, table)
  313. }
  314. func (s *JoinTuples) All(_ string) (map[string]interface{}, bool) {
  315. return s.ToMap(), true
  316. }
  317. func (s *JoinTuples) ToMap() map[string]interface{} {
  318. if s.cachedMap == nil {
  319. m := make(map[string]interface{})
  320. for k, v := range s.Content[0].ToMap() {
  321. m[k] = v
  322. }
  323. s.cachedMap = m
  324. }
  325. s.AffiliateRow.MergeMap(s.cachedMap)
  326. return s.cachedMap
  327. }
  328. func (s *JoinTuples) Clone() Collection {
  329. ts := make([]*JoinTuple, len(s.Content))
  330. for i, t := range s.Content {
  331. ts[i] = t.Clone().(*JoinTuple)
  332. }
  333. c := &JoinTuples{
  334. Content: ts,
  335. WindowRange: s.WindowRange,
  336. AffiliateRow: s.AffiliateRow.Clone(),
  337. isAgg: s.isAgg,
  338. }
  339. return c
  340. }
  341. func (s *JoinTuples) ToAggMaps() []map[string]interface{} {
  342. return []map[string]interface{}{s.ToMap()}
  343. }
  344. func (s *JoinTuples) ToRowMaps() []map[string]interface{} {
  345. r := make([]map[string]interface{}, len(s.Content))
  346. for i, t := range s.Content {
  347. r[i] = t.ToMap()
  348. }
  349. return r
  350. }
  351. func (s *JoinTuples) ToMaps() []map[string]interface{} {
  352. if s.isAgg {
  353. return s.ToAggMaps()
  354. } else {
  355. return s.ToRowMaps()
  356. }
  357. }
  358. func (s *JoinTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
  359. cols = s.AffiliateRow.Pick(cols)
  360. for i, t := range s.Content {
  361. tc := t.Clone().(*JoinTuple)
  362. tc.Pick(allWildcard, cols, wildcardEmitters)
  363. s.Content[i] = tc
  364. }
  365. }
  366. func (s *JoinTuples) SetIsAgg(_ bool) {
  367. s.isAgg = true
  368. }
  369. func (s *GroupedTuplesSet) Len() int { return len(s.Groups) }
  370. func (s *GroupedTuplesSet) Swap(i, j int) { s.Groups[i], s.Groups[j] = s.Groups[j], s.Groups[i] }
  371. func (s *GroupedTuplesSet) Index(i int) Row { return s.Groups[i] }
  372. func (s *GroupedTuplesSet) GetWindowRange() *WindowRange {
  373. return s.WindowRange
  374. }
  375. func (s *GroupedTuplesSet) Range(f func(i int, r ReadonlyRow) (bool, error)) error {
  376. for i, r := range s.Groups {
  377. b, e := f(i, r)
  378. if e != nil {
  379. return e
  380. }
  381. if !b {
  382. break
  383. }
  384. }
  385. return nil
  386. }
  387. func (s *GroupedTuplesSet) RangeSet(f func(i int, r Row) (bool, error)) error {
  388. for i, r := range s.Groups {
  389. rc := r.Clone()
  390. b, e := f(i, rc)
  391. if e != nil {
  392. return e
  393. }
  394. if !b {
  395. break
  396. }
  397. s.Groups[i] = rc.(*GroupedTuples)
  398. }
  399. return nil
  400. }
  401. func (s *GroupedTuplesSet) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
  402. for i, r := range s.Groups {
  403. b, e := f(i, r)
  404. if e != nil {
  405. return e
  406. }
  407. if !b {
  408. break
  409. }
  410. }
  411. return nil
  412. }
  413. // Filter clone and return the filtered set
  414. func (s *GroupedTuplesSet) Filter(groups []int) Collection {
  415. newC := make([]*GroupedTuples, 0, len(groups))
  416. for _, i := range groups {
  417. newC = append(newC, s.Groups[i])
  418. }
  419. s.Groups = newC
  420. return s
  421. }
  422. func (s *GroupedTuplesSet) Clone() Collection {
  423. ng := make([]*GroupedTuples, len(s.Groups))
  424. for i, g := range s.Groups {
  425. ng[i] = g.Clone().(*GroupedTuples)
  426. }
  427. return &GroupedTuplesSet{
  428. Groups: ng,
  429. WindowRange: s.WindowRange,
  430. }
  431. }
  432. func (s *GroupedTuplesSet) ToMaps() []map[string]interface{} {
  433. r := make([]map[string]interface{}, len(s.Groups))
  434. for i, t := range s.Groups {
  435. r[i] = t.ToMap()
  436. }
  437. return r
  438. }
  439. /*
  440. * WindowRange definitions. It should be immutable
  441. */
  // WindowRangeValuer exposes a window range to expression evaluation. It
  // holds no column or metadata values of its own; the window bounds are
  // reached through the FuncValue method promoted from the embedded
  // *WindowRange.
  type WindowRangeValuer struct {
      *WindowRange
  }

  // Value always reports that the key is not found.
  func (w WindowRangeValuer) Value(_, _ string) (interface{}, bool) {
      return nil, false
  }

  // Meta always reports that the key is not found.
  func (w WindowRangeValuer) Meta(_, _ string) (interface{}, bool) {
      return nil, false
  }

  // WindowRange holds the start and end values of a window.
  type WindowRange struct {
      windowStart int64
      windowEnd   int64
  }

  // NewWindowRange returns a WindowRange with the given start and end.
  func NewWindowRange(windowStart int64, windowEnd int64) *WindowRange {
      return &WindowRange{windowStart: windowStart, windowEnd: windowEnd}
  }

  // FuncValue returns the window bound named by key: "window_start" or
  // "window_end", as an int64. Any other key reports false.
  //
  // A nil receiver reports false for every key. This happens when a
  // WindowRangeValuer is built for a collection that has no window range;
  // without the guard the lookup would dereference a nil pointer.
  func (r *WindowRange) FuncValue(key string) (interface{}, bool) {
      if r == nil {
          return nil, false
      }
      switch key {
      case "window_start":
          return r.windowStart, true
      case "window_end":
          return r.windowEnd, true
      default:
          return nil, false
      }
  }