collection.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package xsql
  15. import (
  16. "github.com/lf-edge/ekuiper/pkg/ast"
  17. "sort"
  18. )
  19. /*
  20. * Collection interfaces
  21. */
  22. // AggregateData Could be a tuple or collection
  23. type AggregateData interface {
  24. AggregateEval(expr ast.Expr, v CallValuer) []interface{}
  25. }
  26. type SortingData interface {
  27. Len() int
  28. Swap(i, j int)
  29. Index(i int) Row
  30. }
  31. // Collection A collection of rows as a table. It is used for window, join, group by, etc.
  32. type Collection interface {
  33. SortingData
  34. // GroupRange through each group. For non-grouped collection, the whole data is a single group
  35. GroupRange(func(i int, aggRow CollectionRow) (bool, error)) error
  36. // Range through each row. For grouped collection, each row is an aggregation of groups
  37. Range(func(i int, r ReadonlyRow) (bool, error)) error
  38. // RangeSet range through each row by cloneing the row
  39. RangeSet(func(i int, r Row) (bool, error)) error
  40. Filter(indexes []int) Collection
  41. GetWindowRange() *WindowRange
  42. Clone() Collection
  43. // ToMaps returns the data as a map
  44. ToMaps() []map[string]interface{}
  45. }
  46. type SingleCollection interface {
  47. Collection
  48. CollectionRow
  49. SetIsAgg(isAgg bool)
  50. // ToAggMaps returns the aggregated data as a map
  51. ToAggMaps() []map[string]interface{}
  52. // ToRowMaps returns all the data in the collection
  53. ToRowMaps() []map[string]interface{}
  54. }
  55. type GroupedCollection interface {
  56. Collection
  57. }
  58. // MergedCollection is a collection of rows that are from different sources
  59. type MergedCollection interface {
  60. Collection
  61. GetBySrc(emitter string) []TupleRow
  62. }
  63. /*
  64. * Collection types definitions
  65. */
  66. type WindowTuples struct {
  67. Content []TupleRow // immutable
  68. *WindowRange
  69. contentBySrc map[string][]TupleRow // volatile, temporary cache]
  70. AffiliateRow
  71. cachedMap map[string]interface{}
  72. isAgg bool
  73. }
  74. var _ MergedCollection = &WindowTuples{}
  75. var _ SingleCollection = &WindowTuples{}
  76. // Window Tuples is also an aggregate row
  77. var _ CollectionRow = &WindowTuples{}
  78. type JoinTuples struct {
  79. Content []*JoinTuple
  80. *WindowRange
  81. AffiliateRow
  82. cachedMap map[string]interface{}
  83. isAgg bool
  84. }
  85. var _ SingleCollection = &JoinTuples{}
  86. var _ CollectionRow = &JoinTuples{}
  87. type GroupedTuplesSet struct {
  88. Groups []*GroupedTuples
  89. *WindowRange
  90. }
  91. var _ GroupedCollection = &GroupedTuplesSet{}
  92. /*
  93. * Collection implementations
  94. */
  95. func (w *WindowTuples) Index(index int) Row {
  96. return w.Content[index]
  97. }
  98. func (w *WindowTuples) Len() int {
  99. return len(w.Content)
  100. }
  101. func (w *WindowTuples) Swap(i, j int) {
  102. w.cachedMap = nil
  103. w.Content[i], w.Content[j] = w.Content[j], w.Content[i]
  104. }
  105. func (w *WindowTuples) GetBySrc(emitter string) []TupleRow {
  106. if w.contentBySrc == nil {
  107. w.contentBySrc = make(map[string][]TupleRow)
  108. for _, t := range w.Content {
  109. e := t.GetEmitter()
  110. if _, hasEmitter := w.contentBySrc[e]; !hasEmitter {
  111. w.contentBySrc[e] = make([]TupleRow, 0)
  112. }
  113. w.contentBySrc[e] = append(w.contentBySrc[e], t)
  114. }
  115. }
  116. return w.contentBySrc[emitter]
  117. }
  118. func (w *WindowTuples) GetWindowRange() *WindowRange {
  119. return w.WindowRange
  120. }
  121. func (w *WindowTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error {
  122. for i, r := range w.Content {
  123. b, e := f(i, r)
  124. if e != nil {
  125. return e
  126. }
  127. if !b {
  128. break
  129. }
  130. }
  131. return nil
  132. }
  133. func (w *WindowTuples) RangeSet(f func(i int, r Row) (bool, error)) error {
  134. for i, r := range w.Content {
  135. rc := r.Clone()
  136. b, e := f(i, rc)
  137. if e != nil {
  138. return e
  139. }
  140. if !b {
  141. break
  142. }
  143. w.Content[i] = rc
  144. }
  145. return nil
  146. }
  147. func (w *WindowTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
  148. _, err := f(0, w)
  149. return err
  150. }
  151. func (w *WindowTuples) AddTuple(tuple *Tuple) *WindowTuples {
  152. w.Content = append(w.Content, tuple)
  153. return w
  154. }
  155. // Sort by tuple timestamp
  156. func (w *WindowTuples) Sort() {
  157. w.cachedMap = nil
  158. sort.SliceStable(w.Content, func(i, j int) bool {
  159. return w.Content[i].(Event).GetTimestamp() < w.Content[j].(Event).GetTimestamp()
  160. })
  161. }
  162. func (w *WindowTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  163. var result []interface{}
  164. for _, t := range w.Content {
  165. result = append(result, Eval(expr, MultiValuer(t, &WindowRangeValuer{WindowRange: w.WindowRange}, v, &WildcardValuer{t})))
  166. }
  167. return result
  168. }
  169. // Filter the tuples by the given predicate
  170. func (w *WindowTuples) Filter(indexes []int) Collection {
  171. w.cachedMap = nil
  172. newC := make([]TupleRow, 0, len(indexes))
  173. for _, i := range indexes {
  174. newC = append(newC, w.Content[i])
  175. }
  176. w.Content = newC
  177. return w
  178. }
  179. func (w *WindowTuples) Value(key, table string) (interface{}, bool) {
  180. r, ok := w.AffiliateRow.Value(key, table)
  181. if ok {
  182. return r, ok
  183. }
  184. if len(w.Content) > 0 {
  185. return w.Content[0].Value(key, table)
  186. }
  187. return nil, false
  188. }
  189. func (w *WindowTuples) Meta(key, table string) (interface{}, bool) {
  190. if len(w.Content) > 0 {
  191. return w.Content[0].Value(key, table)
  192. }
  193. return nil, false
  194. }
  195. func (w *WindowTuples) All(_ string) (Message, bool) {
  196. return w.ToMap(), true
  197. }
  198. func (w *WindowTuples) ToMap() map[string]interface{} {
  199. if w.cachedMap == nil {
  200. m := make(map[string]interface{})
  201. if len(w.Content) > 0 {
  202. for k, v := range w.Content[0].ToMap() {
  203. m[k] = v
  204. }
  205. }
  206. w.cachedMap = m
  207. }
  208. w.AffiliateRow.MergeMap(w.cachedMap)
  209. return w.cachedMap
  210. }
  211. func (w *WindowTuples) Clone() Collection {
  212. ts := make([]TupleRow, len(w.Content))
  213. for i, t := range w.Content {
  214. ts[i] = t.Clone()
  215. }
  216. c := &WindowTuples{
  217. Content: ts,
  218. WindowRange: w.WindowRange,
  219. AffiliateRow: w.AffiliateRow.Clone(),
  220. isAgg: w.isAgg,
  221. }
  222. return c
  223. }
  224. func (w *WindowTuples) ToAggMaps() []map[string]interface{} {
  225. return []map[string]interface{}{w.ToMap()}
  226. }
  227. func (w *WindowTuples) ToRowMaps() []map[string]interface{} {
  228. r := make([]map[string]interface{}, len(w.Content))
  229. for i, t := range w.Content {
  230. r[i] = t.ToMap()
  231. }
  232. return r
  233. }
  234. func (w *WindowTuples) ToMaps() []map[string]interface{} {
  235. if w.isAgg {
  236. return w.ToAggMaps()
  237. } else {
  238. return w.ToRowMaps()
  239. }
  240. }
  241. func (w *WindowTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
  242. cols = w.AffiliateRow.Pick(cols)
  243. for i, t := range w.Content {
  244. tc := t.Clone()
  245. tc.Pick(allWildcard, cols, wildcardEmitters)
  246. w.Content[i] = tc
  247. }
  248. }
  249. func (w *WindowTuples) SetIsAgg(_ bool) {
  250. w.isAgg = true
  251. }
  252. func (s *JoinTuples) Len() int { return len(s.Content) }
  253. func (s *JoinTuples) Swap(i, j int) {
  254. s.cachedMap = nil
  255. s.Content[i], s.Content[j] = s.Content[j], s.Content[i]
  256. }
  257. func (s *JoinTuples) Index(i int) Row { return s.Content[i] }
  258. func (s *JoinTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  259. var result []interface{}
  260. for _, t := range s.Content {
  261. result = append(result, Eval(expr, MultiValuer(t, &WindowRangeValuer{WindowRange: s.WindowRange}, v, &WildcardValuer{t})))
  262. }
  263. return result
  264. }
  265. func (s *JoinTuples) GetWindowRange() *WindowRange {
  266. return s.WindowRange
  267. }
  268. func (s *JoinTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error {
  269. for i, r := range s.Content {
  270. b, e := f(i, r)
  271. if e != nil {
  272. return e
  273. }
  274. if !b {
  275. break
  276. }
  277. }
  278. return nil
  279. }
  280. func (s *JoinTuples) RangeSet(f func(i int, r Row) (bool, error)) error {
  281. for i, r := range s.Content {
  282. rc := r.Clone()
  283. b, e := f(i, rc)
  284. if e != nil {
  285. return e
  286. }
  287. if !b {
  288. break
  289. }
  290. s.Content[i] = rc.(*JoinTuple)
  291. }
  292. return nil
  293. }
  294. func (s *JoinTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
  295. _, err := f(0, s)
  296. return err
  297. }
  298. // Filter the tuples by the given predicate
  299. func (s *JoinTuples) Filter(indexes []int) Collection {
  300. newC := make([]*JoinTuple, 0, len(indexes))
  301. for _, i := range indexes {
  302. newC = append(newC, s.Content[i])
  303. }
  304. s.Content = newC
  305. s.cachedMap = nil
  306. return s
  307. }
  308. func (s *JoinTuples) Value(key, table string) (interface{}, bool) {
  309. r, ok := s.AffiliateRow.Value(key, table)
  310. if ok {
  311. return r, ok
  312. }
  313. return s.Content[0].Value(key, table)
  314. }
  315. func (s *JoinTuples) Meta(key, table string) (interface{}, bool) {
  316. return s.Content[0].Meta(key, table)
  317. }
  318. func (s *JoinTuples) All(_ string) (Message, bool) {
  319. return s.ToMap(), true
  320. }
  321. func (s *JoinTuples) ToMap() map[string]interface{} {
  322. if s.cachedMap == nil {
  323. m := make(map[string]interface{})
  324. for k, v := range s.Content[0].ToMap() {
  325. m[k] = v
  326. }
  327. s.cachedMap = m
  328. }
  329. s.AffiliateRow.MergeMap(s.cachedMap)
  330. return s.cachedMap
  331. }
  332. func (s *JoinTuples) Clone() Collection {
  333. ts := make([]*JoinTuple, len(s.Content))
  334. for i, t := range s.Content {
  335. ts[i] = t.Clone().(*JoinTuple)
  336. }
  337. c := &JoinTuples{
  338. Content: ts,
  339. WindowRange: s.WindowRange,
  340. AffiliateRow: s.AffiliateRow.Clone(),
  341. isAgg: s.isAgg,
  342. }
  343. return c
  344. }
  345. func (s *JoinTuples) ToAggMaps() []map[string]interface{} {
  346. return []map[string]interface{}{s.ToMap()}
  347. }
  348. func (s *JoinTuples) ToRowMaps() []map[string]interface{} {
  349. r := make([]map[string]interface{}, len(s.Content))
  350. for i, t := range s.Content {
  351. r[i] = t.ToMap()
  352. }
  353. return r
  354. }
  355. func (s *JoinTuples) ToMaps() []map[string]interface{} {
  356. if s.isAgg {
  357. return s.ToAggMaps()
  358. } else {
  359. return s.ToRowMaps()
  360. }
  361. }
  362. func (s *JoinTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
  363. cols = s.AffiliateRow.Pick(cols)
  364. for i, t := range s.Content {
  365. tc := t.Clone().(*JoinTuple)
  366. tc.Pick(allWildcard, cols, wildcardEmitters)
  367. s.Content[i] = tc
  368. }
  369. }
  370. func (s *JoinTuples) SetIsAgg(_ bool) {
  371. s.isAgg = true
  372. }
  373. func (s *GroupedTuplesSet) Len() int { return len(s.Groups) }
  374. func (s *GroupedTuplesSet) Swap(i, j int) { s.Groups[i], s.Groups[j] = s.Groups[j], s.Groups[i] }
  375. func (s *GroupedTuplesSet) Index(i int) Row { return s.Groups[i] }
  376. func (s *GroupedTuplesSet) GetWindowRange() *WindowRange {
  377. return s.WindowRange
  378. }
  379. func (s *GroupedTuplesSet) Range(f func(i int, r ReadonlyRow) (bool, error)) error {
  380. for i, r := range s.Groups {
  381. b, e := f(i, r)
  382. if e != nil {
  383. return e
  384. }
  385. if !b {
  386. break
  387. }
  388. }
  389. return nil
  390. }
  391. func (s *GroupedTuplesSet) RangeSet(f func(i int, r Row) (bool, error)) error {
  392. for i, r := range s.Groups {
  393. rc := r.Clone()
  394. b, e := f(i, rc)
  395. if e != nil {
  396. return e
  397. }
  398. if !b {
  399. break
  400. }
  401. s.Groups[i] = rc.(*GroupedTuples)
  402. }
  403. return nil
  404. }
  405. func (s *GroupedTuplesSet) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error {
  406. for i, r := range s.Groups {
  407. b, e := f(i, r)
  408. if e != nil {
  409. return e
  410. }
  411. if !b {
  412. break
  413. }
  414. }
  415. return nil
  416. }
  417. // Filter clone and return the filtered set
  418. func (s *GroupedTuplesSet) Filter(groups []int) Collection {
  419. newC := make([]*GroupedTuples, 0, len(groups))
  420. for _, i := range groups {
  421. newC = append(newC, s.Groups[i])
  422. }
  423. s.Groups = newC
  424. return s
  425. }
  426. func (s *GroupedTuplesSet) Clone() Collection {
  427. ng := make([]*GroupedTuples, len(s.Groups))
  428. for i, g := range s.Groups {
  429. ng[i] = g.Clone().(*GroupedTuples)
  430. }
  431. return &GroupedTuplesSet{
  432. Groups: ng,
  433. WindowRange: s.WindowRange,
  434. }
  435. }
  436. func (s *GroupedTuplesSet) ToMaps() []map[string]interface{} {
  437. r := make([]map[string]interface{}, len(s.Groups))
  438. for i, t := range s.Groups {
  439. r[i] = t.ToMap()
  440. }
  441. return r
  442. }
  443. /*
  444. * WindowRange definitions. It should be immutable
  445. */
  446. type WindowRangeValuer struct {
  447. *WindowRange
  448. }
  449. func (w WindowRangeValuer) Value(_, _ string) (interface{}, bool) {
  450. return nil, false
  451. }
  452. func (w WindowRangeValuer) Meta(_, _ string) (interface{}, bool) {
  453. return nil, false
  454. }
  455. type WindowRange struct {
  456. windowStart int64
  457. windowEnd int64
  458. }
  459. func NewWindowRange(windowStart int64, windowEnd int64) *WindowRange {
  460. return &WindowRange{windowStart, windowEnd}
  461. }
  462. func (r *WindowRange) FuncValue(key string) (interface{}, bool) {
  463. switch key {
  464. case "window_start":
  465. return r.windowStart, true
  466. case "window_end":
  467. return r.windowEnd, true
  468. default:
  469. return nil, false
  470. }
  471. }