collection.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package xsql
  15. import (
  16. "sort"
  17. "github.com/lf-edge/ekuiper/pkg/ast"
  18. )
  19. /*
  20. * Collection interfaces
  21. */
  22. // AggregateData Could be a tuple or collection
  23. type AggregateData interface {
  24. AggregateEval(expr ast.Expr, v CallValuer) []interface{}
  25. }
  26. type SortingData interface {
  27. Len() int
  28. Swap(i, j int)
  29. Index(i int) Row
  30. }
  31. // Collection A collection of rows as a table. It is used for window, join, group by, etc.
  32. type Collection interface {
  33. SortingData
  34. // GroupRange through each group. For non-grouped collection, the whole data is a single group
  35. GroupRange(func(i int, aggRow CollectionRow) (bool, error)) error
  36. // Range through each row. For grouped collection, each row is an aggregation of groups
  37. Range(func(i int, r ReadonlyRow) (bool, error)) error
  38. // RangeSet range through each row by cloneing the row
  39. RangeSet(func(i int, r Row) (bool, error)) error
  40. Filter(indexes []int) Collection
  41. GetWindowRange() *WindowRange
  42. Clone() Collection
  43. // ToMaps returns the data as a map
  44. ToMaps() []map[string]interface{}
  45. }
  46. type SingleCollection interface {
  47. Collection
  48. CollectionRow
  49. SetIsAgg(isAgg bool)
  50. // ToAggMaps returns the aggregated data as a map
  51. ToAggMaps() []map[string]interface{}
  52. // ToRowMaps returns all the data in the collection
  53. ToRowMaps() []map[string]interface{}
  54. }
  55. type GroupedCollection interface {
  56. Collection
  57. }
  58. // MergedCollection is a collection of rows that are from different sources
  59. type MergedCollection interface {
  60. Collection
  61. GetBySrc(emitter string) []TupleRow
  62. }
  63. /*
  64. * Collection types definitions
  65. */
  66. type WindowTuples struct {
  67. Content []TupleRow // immutable
  68. *WindowRange
  69. contentBySrc map[string][]TupleRow // volatile, temporary cache]
  70. AffiliateRow
  71. cachedMap map[string]interface{}
  72. isAgg bool
  73. }
  74. var (
  75. _ MergedCollection = &WindowTuples{}
  76. _ SingleCollection = &WindowTuples{}
  77. )
  78. // Window Tuples is also an aggregate row
  79. var _ CollectionRow = &WindowTuples{}
  80. type JoinTuples struct {
  81. Content []*JoinTuple
  82. *WindowRange
  83. AffiliateRow
  84. cachedMap map[string]interface{}
  85. isAgg bool
  86. }
  87. var (
  88. _ SingleCollection = &JoinTuples{}
  89. _ CollectionRow = &JoinTuples{}
  90. )
  91. type GroupedTuplesSet struct {
  92. Groups []*GroupedTuples
  93. *WindowRange
  94. }
  95. var _ GroupedCollection = &GroupedTuplesSet{}
  96. /*
  97. * Collection implementations
  98. */
  99. func (w *WindowTuples) Index(index int) Row {
  100. return w.Content[index]
  101. }
  102. func (w *WindowTuples) Len() int {
  103. return len(w.Content)
  104. }
  105. func (w *WindowTuples) Swap(i, j int) {
  106. w.cachedMap = nil
  107. w.Content[i], w.Content[j] = w.Content[j], w.Content[i]
  108. }
  109. func (w *WindowTuples) GetBySrc(emitter string) []TupleRow {
  110. if w.contentBySrc == nil {
  111. w.contentBySrc = make(map[string][]TupleRow)
  112. for _, t := range w.Content {
  113. e := t.GetEmitter()
  114. if _, hasEmitter := w.contentBySrc[e]; !hasEmitter {
  115. w.contentBySrc[e] = make([]TupleRow, 0)
  116. }
  117. w.contentBySrc[e] = append(w.contentBySrc[e], t)
  118. }
  119. }
  120. return w.contentBySrc[emitter]
  121. }
  122. func (w *WindowTuples) GetWindowRange() *WindowRange {
  123. return w.WindowRange
  124. }
  125. func (w *WindowTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error {
  126. for i, r := range w.Content {
  127. b, e := f(i, r)
  128. if e != nil {
  129. return e
  130. }
  131. if !b {
  132. break
  133. }
  134. }
  135. return nil
  136. }
  137. func (w *WindowTuples) RangeSet(f func(i int, r Row) (bool, error)) error {
  138. for i, r := range w.Content {
  139. rc := r.Clone()
  140. b, e := f(i, rc)
  141. if e != nil {
  142. return e
  143. }
  144. if !b {
  145. break
  146. }
  147. w.Content[i] = rc.(TupleRow)
  148. }
  149. return nil
  150. }
  151. func (w *WindowTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
  152. _, err := f(0, w)
  153. return err
  154. }
  155. func (w *WindowTuples) AddTuple(tuple *Tuple) *WindowTuples {
  156. w.Content = append(w.Content, tuple)
  157. return w
  158. }
  159. // Sort by tuple timestamp
  160. func (w *WindowTuples) Sort() {
  161. w.cachedMap = nil
  162. sort.SliceStable(w.Content, func(i, j int) bool {
  163. return w.Content[i].(Event).GetTimestamp() < w.Content[j].(Event).GetTimestamp()
  164. })
  165. }
  166. func (w *WindowTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  167. var result []interface{}
  168. for _, t := range w.Content {
  169. result = append(result, Eval(expr, MultiValuer(t, &WindowRangeValuer{WindowRange: w.WindowRange}, v, &WildcardValuer{t})))
  170. }
  171. return result
  172. }
  173. // Filter the tuples by the given predicate
  174. func (w *WindowTuples) Filter(indexes []int) Collection {
  175. w.cachedMap = nil
  176. newC := make([]TupleRow, 0, len(indexes))
  177. for _, i := range indexes {
  178. newC = append(newC, w.Content[i])
  179. }
  180. w.Content = newC
  181. return w
  182. }
  183. func (w *WindowTuples) Value(key, table string) (interface{}, bool) {
  184. r, ok := w.AffiliateRow.Value(key, table)
  185. if ok {
  186. return r, ok
  187. }
  188. if len(w.Content) > 0 {
  189. return w.Content[0].Value(key, table)
  190. }
  191. return nil, false
  192. }
  193. func (w *WindowTuples) Meta(key, table string) (interface{}, bool) {
  194. if len(w.Content) > 0 {
  195. return w.Content[0].Value(key, table)
  196. }
  197. return nil, false
  198. }
  199. func (w *WindowTuples) All(_ string) (Message, bool) {
  200. return w.ToMap(), true
  201. }
  202. func (w *WindowTuples) ToMap() map[string]interface{} {
  203. if w.cachedMap == nil {
  204. m := make(map[string]interface{})
  205. if len(w.Content) > 0 {
  206. for k, v := range w.Content[0].ToMap() {
  207. m[k] = v
  208. }
  209. }
  210. w.cachedMap = m
  211. }
  212. w.AffiliateRow.MergeMap(w.cachedMap)
  213. return w.cachedMap
  214. }
  215. func (w *WindowTuples) Clone() Collection {
  216. ts := make([]TupleRow, len(w.Content))
  217. for i, t := range w.Content {
  218. ts[i] = t.Clone().(TupleRow)
  219. }
  220. c := &WindowTuples{
  221. Content: ts,
  222. WindowRange: w.WindowRange,
  223. AffiliateRow: w.AffiliateRow.Clone(),
  224. isAgg: w.isAgg,
  225. }
  226. return c
  227. }
  228. func (w *WindowTuples) ToAggMaps() []map[string]interface{} {
  229. return []map[string]interface{}{w.ToMap()}
  230. }
  231. func (w *WindowTuples) ToRowMaps() []map[string]interface{} {
  232. r := make([]map[string]interface{}, len(w.Content))
  233. for i, t := range w.Content {
  234. r[i] = t.ToMap()
  235. }
  236. return r
  237. }
  238. func (w *WindowTuples) ToMaps() []map[string]interface{} {
  239. if w.isAgg {
  240. return w.ToAggMaps()
  241. } else {
  242. return w.ToRowMaps()
  243. }
  244. }
  245. func (w *WindowTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
  246. cols = w.AffiliateRow.Pick(cols)
  247. for i, t := range w.Content {
  248. tc := t.Clone()
  249. tc.Pick(allWildcard, cols, wildcardEmitters)
  250. w.Content[i] = tc.(TupleRow)
  251. }
  252. }
  253. func (w *WindowTuples) SetIsAgg(_ bool) {
  254. w.isAgg = true
  255. }
  256. func (s *JoinTuples) Len() int { return len(s.Content) }
  257. func (s *JoinTuples) Swap(i, j int) {
  258. s.cachedMap = nil
  259. s.Content[i], s.Content[j] = s.Content[j], s.Content[i]
  260. }
  261. func (s *JoinTuples) Index(i int) Row { return s.Content[i] }
  262. func (s *JoinTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  263. var result []interface{}
  264. for _, t := range s.Content {
  265. result = append(result, Eval(expr, MultiValuer(t, &WindowRangeValuer{WindowRange: s.WindowRange}, v, &WildcardValuer{t})))
  266. }
  267. return result
  268. }
  269. func (s *JoinTuples) GetWindowRange() *WindowRange {
  270. return s.WindowRange
  271. }
  272. func (s *JoinTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error {
  273. for i, r := range s.Content {
  274. b, e := f(i, r)
  275. if e != nil {
  276. return e
  277. }
  278. if !b {
  279. break
  280. }
  281. }
  282. return nil
  283. }
  284. func (s *JoinTuples) RangeSet(f func(i int, r Row) (bool, error)) error {
  285. for i, r := range s.Content {
  286. rc := r.Clone()
  287. b, e := f(i, rc)
  288. if e != nil {
  289. return e
  290. }
  291. if !b {
  292. break
  293. }
  294. s.Content[i] = rc.(*JoinTuple)
  295. }
  296. return nil
  297. }
  298. func (s *JoinTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
  299. _, err := f(0, s)
  300. return err
  301. }
  302. // Filter the tuples by the given predicate
  303. func (s *JoinTuples) Filter(indexes []int) Collection {
  304. newC := make([]*JoinTuple, 0, len(indexes))
  305. for _, i := range indexes {
  306. newC = append(newC, s.Content[i])
  307. }
  308. s.Content = newC
  309. s.cachedMap = nil
  310. return s
  311. }
  312. func (s *JoinTuples) Value(key, table string) (interface{}, bool) {
  313. r, ok := s.AffiliateRow.Value(key, table)
  314. if ok {
  315. return r, ok
  316. }
  317. return s.Content[0].Value(key, table)
  318. }
  319. func (s *JoinTuples) Meta(key, table string) (interface{}, bool) {
  320. return s.Content[0].Meta(key, table)
  321. }
  322. func (s *JoinTuples) All(_ string) (Message, bool) {
  323. return s.ToMap(), true
  324. }
  325. func (s *JoinTuples) ToMap() map[string]interface{} {
  326. if s.cachedMap == nil {
  327. m := make(map[string]interface{})
  328. for k, v := range s.Content[0].ToMap() {
  329. m[k] = v
  330. }
  331. s.cachedMap = m
  332. }
  333. s.AffiliateRow.MergeMap(s.cachedMap)
  334. return s.cachedMap
  335. }
  336. func (s *JoinTuples) Clone() Collection {
  337. ts := make([]*JoinTuple, len(s.Content))
  338. for i, t := range s.Content {
  339. ts[i] = t.Clone().(*JoinTuple)
  340. }
  341. c := &JoinTuples{
  342. Content: ts,
  343. WindowRange: s.WindowRange,
  344. AffiliateRow: s.AffiliateRow.Clone(),
  345. isAgg: s.isAgg,
  346. }
  347. return c
  348. }
  349. func (s *JoinTuples) ToAggMaps() []map[string]interface{} {
  350. return []map[string]interface{}{s.ToMap()}
  351. }
  352. func (s *JoinTuples) ToRowMaps() []map[string]interface{} {
  353. r := make([]map[string]interface{}, len(s.Content))
  354. for i, t := range s.Content {
  355. r[i] = t.ToMap()
  356. }
  357. return r
  358. }
  359. func (s *JoinTuples) ToMaps() []map[string]interface{} {
  360. if s.isAgg {
  361. return s.ToAggMaps()
  362. } else {
  363. return s.ToRowMaps()
  364. }
  365. }
  366. func (s *JoinTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
  367. cols = s.AffiliateRow.Pick(cols)
  368. for i, t := range s.Content {
  369. tc := t.Clone().(*JoinTuple)
  370. tc.Pick(allWildcard, cols, wildcardEmitters)
  371. s.Content[i] = tc
  372. }
  373. }
  374. func (s *JoinTuples) SetIsAgg(_ bool) {
  375. s.isAgg = true
  376. }
  377. func (s *GroupedTuplesSet) Len() int { return len(s.Groups) }
  378. func (s *GroupedTuplesSet) Swap(i, j int) { s.Groups[i], s.Groups[j] = s.Groups[j], s.Groups[i] }
  379. func (s *GroupedTuplesSet) Index(i int) Row { return s.Groups[i] }
  380. func (s *GroupedTuplesSet) GetWindowRange() *WindowRange {
  381. return s.WindowRange
  382. }
  383. func (s *GroupedTuplesSet) Range(f func(i int, r ReadonlyRow) (bool, error)) error {
  384. for i, r := range s.Groups {
  385. b, e := f(i, r)
  386. if e != nil {
  387. return e
  388. }
  389. if !b {
  390. break
  391. }
  392. }
  393. return nil
  394. }
  395. func (s *GroupedTuplesSet) RangeSet(f func(i int, r Row) (bool, error)) error {
  396. for i, r := range s.Groups {
  397. rc := r.Clone()
  398. b, e := f(i, rc)
  399. if e != nil {
  400. return e
  401. }
  402. if !b {
  403. break
  404. }
  405. s.Groups[i] = rc.(*GroupedTuples)
  406. }
  407. return nil
  408. }
  409. func (s *GroupedTuplesSet) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
  410. for i, r := range s.Groups {
  411. b, e := f(i, r)
  412. if e != nil {
  413. return e
  414. }
  415. if !b {
  416. break
  417. }
  418. }
  419. return nil
  420. }
  421. // Filter clone and return the filtered set
  422. func (s *GroupedTuplesSet) Filter(groups []int) Collection {
  423. newC := make([]*GroupedTuples, 0, len(groups))
  424. for _, i := range groups {
  425. newC = append(newC, s.Groups[i])
  426. }
  427. s.Groups = newC
  428. return s
  429. }
  430. func (s *GroupedTuplesSet) Clone() Collection {
  431. ng := make([]*GroupedTuples, len(s.Groups))
  432. for i, g := range s.Groups {
  433. ng[i] = g.Clone().(*GroupedTuples)
  434. }
  435. return &GroupedTuplesSet{
  436. Groups: ng,
  437. WindowRange: s.WindowRange,
  438. }
  439. }
  440. func (s *GroupedTuplesSet) ToMaps() []map[string]interface{} {
  441. r := make([]map[string]interface{}, len(s.Groups))
  442. for i, t := range s.Groups {
  443. r[i] = t.ToMap()
  444. }
  445. return r
  446. }
  447. /*
  448. * WindowRange definitions. It should be immutable
  449. */
  // WindowRangeValuer exposes a WindowRange during expression evaluation.
  // It resolves no plain field or metadata lookups. The window bounds are
  // reachable only through the FuncValue method promoted from the embedded
  // *WindowRange.
  type WindowRangeValuer struct {
  	*WindowRange
  }

  // Value never resolves anything and always reports (nil, false).
  func (w WindowRangeValuer) Value(_, _ string) (interface{}, bool) {
  	return nil, false
  }

  // Meta never resolves anything and always reports (nil, false).
  func (w WindowRangeValuer) Meta(_, _ string) (interface{}, bool) {
  	return nil, false
  }

  // WindowRange holds the start and end bounds of a window. Its fields are
  // unexported and only read here, so it should be treated as immutable. The
  // unit of the bounds is chosen by the caller (presumably epoch
  // milliseconds — confirm).
  type WindowRange struct {
  	windowStart int64
  	windowEnd   int64
  }

  // NewWindowRange builds a WindowRange from its start and end bounds.
  func NewWindowRange(windowStart int64, windowEnd int64) *WindowRange {
  	return &WindowRange{
  		windowStart: windowStart,
  		windowEnd:   windowEnd,
  	}
  }

  // FuncValue resolves the window_start and window_end functions to the
  // corresponding int64 bound. Any other key reports (nil, false).
  func (r *WindowRange) FuncValue(key string) (interface{}, bool) {
  	if key == "window_start" {
  		return r.windowStart, true
  	}
  	if key == "window_end" {
  		return r.windowEnd, true
  	}
  	return nil, false
  }