row.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package xsql
  15. import (
  16. "strings"
  17. "sync"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. )
// The original message map may be big. Treat it as immutable so that it never has to be copied.
// Cloning a tuple should therefore stay cheap.
  23. /*
  24. * Interfaces definition
  25. */
  26. type Wildcarder interface {
  27. // All Value returns the value and existence flag for a given key.
  28. All(stream string) (map[string]interface{}, bool)
  29. }
  30. type Event interface {
  31. GetTimestamp() int64
  32. IsWatermark() bool
  33. }
  34. type ReadonlyRow interface {
  35. Valuer
  36. AliasValuer
  37. Wildcarder
  38. }
  39. type Row interface {
  40. ReadonlyRow
  41. // Del Only for some ops like functionOp * and Alias
  42. Del(col string)
  43. // Set Only for some ops like functionOp *
  44. Set(col string, value interface{})
  45. // ToMap converts the row to a map to export to other systems *
  46. ToMap() map[string]interface{}
  47. // Pick the columns and discard others. It replaces the underlying message with a new value. There are 3 types to pick: column, alias and annonymous expressions.
  48. // cols is a list [columnname, tablename]
  49. Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string)
  50. }
  51. type CloneAbleRow interface {
  52. Row
  53. // Clone when broadcast to make sure each row are dealt single threaded
  54. Clone() CloneAbleRow
  55. }
  56. // TupleRow is a mutable row. Function with * could modify the row.
  57. type TupleRow interface {
  58. CloneAbleRow
  59. // GetEmitter returns the emitter of the row
  60. GetEmitter() string
  61. }
  62. // CollectionRow is the aggregation row of a non-grouped collection. Thinks of it as a single group.
  63. // The row data is immutable
  64. type CollectionRow interface {
  65. Row
  66. AggregateData
  67. // Clone when broadcast to make sure each row are dealt single threaded
  68. // Clone() CollectionRow
  69. }
  70. // AffiliateRow part of other row types do help calculation of newly added cols
  71. type AffiliateRow struct {
  72. lock sync.RWMutex
  73. CalCols map[string]interface{} // mutable and must be cloned when broadcast
  74. AliasMap map[string]interface{}
  75. }
  76. func (d *AffiliateRow) AppendAlias(key string, value interface{}) bool {
  77. d.lock.Lock()
  78. defer d.lock.Unlock()
  79. if d.AliasMap == nil {
  80. d.AliasMap = make(map[string]interface{})
  81. }
  82. d.AliasMap[key] = value
  83. return true
  84. }
  85. func (d *AffiliateRow) AliasValue(key string) (interface{}, bool) {
  86. d.lock.RLock()
  87. defer d.lock.RUnlock()
  88. if d.AliasMap == nil {
  89. return nil, false
  90. }
  91. v, ok := d.AliasMap[key]
  92. return v, ok
  93. }
  94. func (d *AffiliateRow) Value(key, table string) (interface{}, bool) {
  95. d.lock.RLock()
  96. defer d.lock.RUnlock()
  97. if table == "" {
  98. r, ok := d.AliasValue(key)
  99. if ok {
  100. return r, ok
  101. }
  102. r, ok = d.CalCols[key]
  103. if ok {
  104. return r, ok
  105. }
  106. }
  107. return nil, false
  108. }
  109. func (d *AffiliateRow) Set(col string, value interface{}) {
  110. d.lock.Lock()
  111. defer d.lock.Unlock()
  112. if d.CalCols == nil {
  113. d.CalCols = make(map[string]interface{})
  114. }
  115. d.CalCols[col] = value
  116. }
  117. func (d *AffiliateRow) Del(col string) {
  118. d.lock.Lock()
  119. defer d.lock.Unlock()
  120. if d.CalCols != nil {
  121. delete(d.CalCols, col)
  122. }
  123. if d.AliasMap != nil {
  124. delete(d.AliasMap, col)
  125. }
  126. }
  127. func (d *AffiliateRow) Clone() AffiliateRow {
  128. d.lock.RLock()
  129. defer d.lock.RUnlock()
  130. nd := &AffiliateRow{}
  131. if d.CalCols != nil && len(d.CalCols) > 0 {
  132. nd.CalCols = make(map[string]interface{}, len(d.CalCols))
  133. for k, v := range d.CalCols {
  134. nd.CalCols[k] = v
  135. }
  136. }
  137. if d.AliasMap != nil && len(d.AliasMap) > 0 {
  138. nd.AliasMap = make(map[string]interface{}, len(d.AliasMap))
  139. for k, v := range d.AliasMap {
  140. nd.AliasMap[k] = v
  141. }
  142. }
  143. return *nd //nolint:govet
  144. }
  145. func (d *AffiliateRow) IsEmpty() bool {
  146. d.lock.RLock()
  147. defer d.lock.RUnlock()
  148. return len(d.CalCols) == 0 && len(d.AliasMap) == 0
  149. }
  150. func (d *AffiliateRow) MergeMap(cachedMap map[string]interface{}) {
  151. d.lock.RLock()
  152. defer d.lock.RUnlock()
  153. for k, v := range d.CalCols {
  154. // Do not write out the internal fields
  155. if !strings.HasPrefix(k, "$$") {
  156. cachedMap[k] = v
  157. }
  158. }
  159. for k, v := range d.AliasMap {
  160. cachedMap[k] = v
  161. }
  162. }
  163. func (d *AffiliateRow) Pick(cols [][]string) [][]string {
  164. d.lock.Lock()
  165. defer d.lock.Unlock()
  166. if len(cols) > 0 {
  167. newAliasMap := make(map[string]interface{})
  168. newCalCols := make(map[string]interface{})
  169. var newCols [][]string
  170. for _, a := range cols {
  171. if a[1] == "" || a[1] == string(ast.DefaultStream) {
  172. if v, ok := d.AliasMap[a[0]]; ok {
  173. newAliasMap[a[0]] = v
  174. continue
  175. }
  176. if v, ok := d.CalCols[a[0]]; ok {
  177. newCalCols[a[0]] = v
  178. continue
  179. }
  180. }
  181. newCols = append(newCols, a)
  182. }
  183. d.AliasMap = newAliasMap
  184. d.CalCols = newCalCols
  185. return newCols
  186. } else {
  187. d.AliasMap = nil
  188. d.CalCols = nil
  189. return cols
  190. }
  191. }
  192. /*
  193. * Message definition
  194. */
  195. // Message is a valuer that substitutes values for the mapped interface. It is the basic type for data events.
  196. type Message map[string]interface{}
  197. var _ Valuer = Message{}
  198. type Metadata Message
  199. // Alias will not need to convert cases
  200. type Alias struct {
  201. AliasMap map[string]interface{}
  202. }
  203. /*
  204. * All row types definitions, watermark, barrier
  205. */
  206. // Tuple The input row, produced by the source
  207. type Tuple struct {
  208. Emitter string
  209. Message Message // the original pointer is immutable & big; may be cloned
  210. Timestamp int64
  211. Metadata Metadata // immutable
  212. AffiliateRow
  213. lock sync.Mutex // lock for the cachedMap, because it is possible to access by multiple sinks
  214. cachedMap map[string]interface{} // clone of the row and cached for performance
  215. }
  216. var _ TupleRow = &Tuple{}
  217. type WatermarkTuple struct {
  218. Timestamp int64
  219. }
  220. func (t *WatermarkTuple) GetTimestamp() int64 {
  221. return t.Timestamp
  222. }
  223. func (t *WatermarkTuple) IsWatermark() bool {
  224. return true
  225. }
  226. // JoinTuple is a row produced by a join operation
  227. type JoinTuple struct {
  228. Tuples []TupleRow // The content is immutable, but the slice may be add or removed
  229. AffiliateRow
  230. lock sync.Mutex
  231. cachedMap map[string]interface{} // clone of the row and cached for performance of toMap
  232. }
  233. func (jt *JoinTuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  234. return []interface{}{Eval(expr, MultiValuer(jt, v, &WildcardValuer{jt}))}
  235. }
  236. var _ TupleRow = &JoinTuple{}
  237. // GroupedTuples is a collection of tuples grouped by a key
  238. type GroupedTuples struct {
  239. Content []TupleRow
  240. *WindowRange
  241. AffiliateRow
  242. lock sync.Mutex
  243. cachedMap map[string]interface{} // clone of the row and cached for performance of toMap
  244. }
  245. var _ CollectionRow = &GroupedTuples{}
  246. /*
  247. * Implementations
  248. */
  249. func ToMessage(input interface{}) (Message, bool) {
  250. var result Message
  251. switch m := input.(type) {
  252. case Message:
  253. result = m
  254. case Metadata:
  255. result = Message(m)
  256. case map[string]interface{}:
  257. result = m
  258. default:
  259. return nil, false
  260. }
  261. return result, true
  262. }
  263. func (m Message) Value(key, _ string) (interface{}, bool) {
  264. if v, ok := m[key]; ok {
  265. return v, ok
  266. } else if conf.Config == nil || conf.Config.Basic.IgnoreCase {
  267. // Only when with 'SELECT * FROM ...' and 'schemaless', the key in map is not convert to lower case.
  268. // So all of keys in map should be convert to lowercase and then compare them.
  269. return m.getIgnoreCase(key)
  270. } else {
  271. return nil, false
  272. }
  273. }
  274. func (m Message) getIgnoreCase(key interface{}) (interface{}, bool) {
  275. if k, ok := key.(string); ok {
  276. for mk, v := range m {
  277. if strings.EqualFold(k, mk) {
  278. return v, true
  279. }
  280. }
  281. }
  282. return nil, false
  283. }
  284. func (m Message) Meta(key, table string) (interface{}, bool) {
  285. if key == "*" {
  286. return map[string]interface{}(m), true
  287. }
  288. return m.Value(key, table)
  289. }
  290. // MetaData implementation
  291. func (m Metadata) Value(key, table string) (interface{}, bool) {
  292. msg := Message(m)
  293. return msg.Value(key, table)
  294. }
  295. func (m Metadata) Meta(key, table string) (interface{}, bool) {
  296. if key == "*" {
  297. return map[string]interface{}(m), true
  298. }
  299. msg := Message(m)
  300. return msg.Meta(key, table)
  301. }
  302. // Tuple implementation
  303. func (t *Tuple) Value(key, table string) (interface{}, bool) {
  304. r, ok := t.AffiliateRow.Value(key, table)
  305. if ok {
  306. return r, ok
  307. }
  308. return t.Message.Value(key, table)
  309. }
  310. func (t *Tuple) All(string) (map[string]interface{}, bool) {
  311. return t.Message, true
  312. }
  313. func (t *Tuple) Clone() CloneAbleRow {
  314. return &Tuple{
  315. Emitter: t.Emitter,
  316. Timestamp: t.Timestamp,
  317. Message: t.Message,
  318. Metadata: t.Metadata,
  319. AffiliateRow: t.AffiliateRow.Clone(),
  320. }
  321. }
  322. // ToMap should only use in sink.
  323. func (t *Tuple) ToMap() map[string]interface{} {
  324. t.lock.Lock()
  325. defer t.lock.Unlock()
  326. if t.AffiliateRow.IsEmpty() {
  327. return t.Message
  328. }
  329. if t.cachedMap == nil { // clone the message
  330. m := make(map[string]interface{})
  331. for k, v := range t.Message {
  332. m[k] = v
  333. }
  334. t.cachedMap = m
  335. t.AffiliateRow.MergeMap(t.cachedMap)
  336. }
  337. return t.cachedMap
  338. }
  339. func (t *Tuple) Meta(key, table string) (interface{}, bool) {
  340. if key == "*" {
  341. return map[string]interface{}(t.Metadata), true
  342. }
  343. return t.Metadata.Value(key, table)
  344. }
  345. func (t *Tuple) GetEmitter() string {
  346. return t.Emitter
  347. }
  348. func (t *Tuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  349. return []interface{}{Eval(expr, MultiValuer(t, v, &WildcardValuer{t}))}
  350. }
  351. func (t *Tuple) GetTimestamp() int64 {
  352. return t.Timestamp
  353. }
  354. func (t *Tuple) IsWatermark() bool {
  355. return false
  356. }
  357. func (t *Tuple) FuncValue(key string) (interface{}, bool) {
  358. switch key {
  359. case "event_time":
  360. return t.Timestamp, true
  361. default:
  362. return nil, false
  363. }
  364. }
  365. func (t *Tuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string) {
  366. cols = t.AffiliateRow.Pick(cols)
  367. if !allWildcard && wildcardEmitters[t.Emitter] {
  368. allWildcard = true
  369. }
  370. if !allWildcard {
  371. if len(cols) > 0 {
  372. pickedMap := make(map[string]interface{})
  373. for _, colTab := range cols {
  374. if colTab[1] == "" || colTab[1] == string(ast.DefaultStream) || colTab[1] == t.Emitter {
  375. if v, ok := t.Message.Value(colTab[0], colTab[1]); ok {
  376. pickedMap[colTab[0]] = v
  377. }
  378. }
  379. }
  380. t.Message = pickedMap
  381. } else {
  382. t.Message = make(map[string]interface{})
  383. // invalidate cache, will calculate again
  384. t.cachedMap = nil
  385. }
  386. } else if len(except) > 0 {
  387. pickedMap := make(map[string]interface{})
  388. for key, mess := range t.Message {
  389. if !contains(except, key) {
  390. pickedMap[key] = mess
  391. }
  392. }
  393. t.Message = pickedMap
  394. }
  395. }
  396. // JoinTuple implementation
  397. func (jt *JoinTuple) AddTuple(tuple TupleRow) {
  398. jt.Tuples = append(jt.Tuples, tuple)
  399. }
  400. func (jt *JoinTuple) AddTuples(tuples []TupleRow) {
  401. for _, t := range tuples {
  402. jt.Tuples = append(jt.Tuples, t)
  403. }
  404. }
  405. func (jt *JoinTuple) doGetValue(key, table string, isVal bool) (interface{}, bool) {
  406. tuples := jt.Tuples
  407. if table == "" {
  408. if len(tuples) > 1 {
  409. for _, tuple := range tuples { // TODO support key without modifier?
  410. v, ok := getTupleValue(tuple, key, isVal)
  411. if ok {
  412. return v, ok
  413. }
  414. }
  415. conf.Log.Debugf("Wrong key: %s not found", key)
  416. return nil, false
  417. } else {
  418. return getTupleValue(tuples[0], key, isVal)
  419. }
  420. } else {
  421. // TODO should use hash here
  422. for _, tuple := range tuples {
  423. if tuple.GetEmitter() == table {
  424. return getTupleValue(tuple, key, isVal)
  425. }
  426. }
  427. return nil, false
  428. }
  429. }
  430. func getTupleValue(tuple Row, key string, isVal bool) (interface{}, bool) {
  431. if isVal {
  432. return tuple.Value(key, "")
  433. } else {
  434. return tuple.Meta(key, "")
  435. }
  436. }
  437. func (jt *JoinTuple) GetEmitter() string {
  438. return "$$JOIN"
  439. }
  440. func (jt *JoinTuple) Value(key, table string) (interface{}, bool) {
  441. r, ok := jt.AffiliateRow.Value(key, table)
  442. if ok {
  443. return r, ok
  444. }
  445. return jt.doGetValue(key, table, true)
  446. }
  447. func (jt *JoinTuple) Meta(key, table string) (interface{}, bool) {
  448. return jt.doGetValue(key, table, false)
  449. }
  450. func (jt *JoinTuple) All(stream string) (map[string]interface{}, bool) {
  451. if stream != "" {
  452. for _, t := range jt.Tuples {
  453. if t.GetEmitter() == stream {
  454. return t.All("")
  455. }
  456. }
  457. }
  458. result := make(map[string]interface{})
  459. for _, t := range jt.Tuples {
  460. if m, ok := t.All(""); ok {
  461. for k, v := range m {
  462. result[k] = v
  463. }
  464. }
  465. }
  466. return result, true
  467. }
  468. func (jt *JoinTuple) Clone() CloneAbleRow {
  469. ts := make([]TupleRow, len(jt.Tuples))
  470. for i, t := range jt.Tuples {
  471. ts[i] = t.Clone().(TupleRow)
  472. }
  473. c := &JoinTuple{
  474. Tuples: ts,
  475. AffiliateRow: jt.AffiliateRow.Clone(),
  476. }
  477. return c
  478. }
  479. func (jt *JoinTuple) ToMap() map[string]interface{} {
  480. jt.lock.Lock()
  481. defer jt.lock.Unlock()
  482. if jt.cachedMap == nil { // clone the message
  483. m := make(map[string]interface{})
  484. for i := len(jt.Tuples) - 1; i >= 0; i-- {
  485. for k, v := range jt.Tuples[i].ToMap() {
  486. m[k] = v
  487. }
  488. }
  489. jt.cachedMap = m
  490. jt.AffiliateRow.MergeMap(jt.cachedMap)
  491. }
  492. return jt.cachedMap
  493. }
  494. func (jt *JoinTuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string) {
  495. cols = jt.AffiliateRow.Pick(cols)
  496. if !allWildcard {
  497. if len(cols) > 0 {
  498. for i, tuple := range jt.Tuples {
  499. if _, ok := wildcardEmitters[tuple.GetEmitter()]; ok {
  500. continue
  501. }
  502. nt := tuple.Clone().(TupleRow)
  503. nt.Pick(allWildcard, cols, wildcardEmitters, except)
  504. jt.Tuples[i] = nt
  505. }
  506. } else {
  507. jt.Tuples = jt.Tuples[:0]
  508. }
  509. }
  510. jt.cachedMap = nil
  511. }
  512. // GroupedTuple implementation
  513. func (s *GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  514. var result []interface{}
  515. for _, t := range s.Content {
  516. result = append(result, Eval(expr, MultiValuer(t, &WindowRangeValuer{WindowRange: s.WindowRange}, v, &WildcardValuer{t})))
  517. }
  518. return result
  519. }
  520. func (s *GroupedTuples) Value(key, table string) (interface{}, bool) {
  521. r, ok := s.AffiliateRow.Value(key, table)
  522. if ok {
  523. return r, ok
  524. }
  525. return s.Content[0].Value(key, table)
  526. }
  527. func (s *GroupedTuples) Meta(key, table string) (interface{}, bool) {
  528. return s.Content[0].Meta(key, table)
  529. }
  530. func (s *GroupedTuples) All(_ string) (map[string]interface{}, bool) {
  531. return s.Content[0].All("")
  532. }
  533. func (s *GroupedTuples) ToMap() map[string]interface{} {
  534. s.lock.Lock()
  535. defer s.lock.Unlock()
  536. if s.cachedMap == nil {
  537. m := make(map[string]interface{})
  538. for k, v := range s.Content[0].ToMap() {
  539. m[k] = v
  540. }
  541. s.cachedMap = m
  542. s.AffiliateRow.MergeMap(s.cachedMap)
  543. }
  544. return s.cachedMap
  545. }
  546. func (s *GroupedTuples) Clone() CloneAbleRow {
  547. ts := make([]TupleRow, len(s.Content))
  548. for i, t := range s.Content {
  549. ts[i] = t
  550. }
  551. c := &GroupedTuples{
  552. Content: ts,
  553. WindowRange: s.WindowRange,
  554. AffiliateRow: s.AffiliateRow.Clone(),
  555. }
  556. return c
  557. }
  558. func (s *GroupedTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string) {
  559. cols = s.AffiliateRow.Pick(cols)
  560. sc := s.Content[0].Clone().(TupleRow)
  561. sc.Pick(allWildcard, cols, wildcardEmitters, except)
  562. s.Content[0] = sc
  563. }