row.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package xsql
  15. import (
  16. "strings"
  17. "sync"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. )
  // The original message map may be large. Keep it immutable so that it never has to be copied;
  // this keeps cloning a tuple cheap.
  23. /*
  24. * Interfaces definition
  25. */
  26. type Wildcarder interface {
  27. // All Value returns the value and existence flag for a given key.
  28. All(stream string) (Message, bool)
  29. }
  30. type Event interface {
  31. GetTimestamp() int64
  32. IsWatermark() bool
  33. }
  34. type ReadonlyRow interface {
  35. Valuer
  36. AliasValuer
  37. Wildcarder
  38. }
  39. type Row interface {
  40. ReadonlyRow
  41. // Del Only for some ops like functionOp * and Alias
  42. Del(col string)
  43. // Set Only for some ops like functionOp *
  44. Set(col string, value interface{})
  45. // ToMap converts the row to a map to export to other systems *
  46. ToMap() map[string]interface{}
  47. // Pick the columns and discard others. It replaces the underlying message with a new value. There are 3 types to pick: column, alias and annonymous expressions.
  48. // cols is a list [columnname, tablename]
  49. Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool)
  50. }
  51. type CloneAbleRow interface {
  52. Row
  53. // Clone when broadcast to make sure each row are dealt single threaded
  54. Clone() CloneAbleRow
  55. }
  56. // TupleRow is a mutable row. Function with * could modify the row.
  57. type TupleRow interface {
  58. CloneAbleRow
  59. // GetEmitter returns the emitter of the row
  60. GetEmitter() string
  61. }
  62. // CollectionRow is the aggregation row of a non-grouped collection. Thinks of it as a single group.
  63. // The row data is immutable
  64. type CollectionRow interface {
  65. Row
  66. AggregateData
  67. // Clone when broadcast to make sure each row are dealt single threaded
  68. // Clone() CollectionRow
  69. }
  // AffiliateRow is embedded in the concrete row types to hold values added
  // while a rule is processed: calculated columns (CalCols) and the values of
  // aliased expressions (AliasMap). The methods below guard both maps with lock.
  type AffiliateRow struct {
  	lock     sync.RWMutex
  	CalCols  map[string]interface{} // mutable and must be cloned when broadcast
  	AliasMap map[string]interface{}
  }

  // AppendAlias stores value under the alias key, creating the alias map on
  // first use. It always returns true.
  func (d *AffiliateRow) AppendAlias(key string, value interface{}) bool {
  	d.lock.Lock()
  	defer d.lock.Unlock()
  	if d.AliasMap == nil {
  		d.AliasMap = make(map[string]interface{})
  	}
  	d.AliasMap[key] = value
  	return true
  }

  // AliasValue returns the value stored under the alias key and whether it exists.
  func (d *AffiliateRow) AliasValue(key string) (interface{}, bool) {
  	d.lock.RLock()
  	defer d.lock.RUnlock()
  	v, ok := d.AliasMap[key] // a lookup in a nil map reports (nil, false)
  	return v, ok
  }

  // Value resolves an unqualified column (table == "") against the aliases
  // first and then the calculated columns. Qualified lookups are never served
  // here and report (nil, false).
  func (d *AffiliateRow) Value(key, table string) (interface{}, bool) {
  	if table != "" {
  		return nil, false
  	}
  	d.lock.RLock()
  	defer d.lock.RUnlock()
  	// Read AliasMap directly instead of calling AliasValue: that would take the
  	// read lock a second time, and recursive read locking of a sync.RWMutex
  	// deadlocks when a writer calls Lock between the two RLock calls.
  	if v, ok := d.AliasMap[key]; ok {
  		return v, true
  	}
  	v, ok := d.CalCols[key]
  	return v, ok
  }

  // Set stores value as the calculated column col, creating the map on first use.
  func (d *AffiliateRow) Set(col string, value interface{}) {
  	d.lock.Lock()
  	defer d.lock.Unlock()
  	if d.CalCols == nil {
  		d.CalCols = make(map[string]interface{})
  	}
  	d.CalCols[col] = value
  }

  // Del removes col from both the calculated columns and the aliases.
  func (d *AffiliateRow) Del(col string) {
  	d.lock.Lock()
  	defer d.lock.Unlock()
  	// delete is a no-op on a nil map, so no nil checks are needed.
  	delete(d.CalCols, col)
  	delete(d.AliasMap, col)
  }

  // Clone returns an independent copy with its own lock and shallow copies of
  // both maps. Empty maps are not copied; the clone holds nil instead.
  func (d *AffiliateRow) Clone() AffiliateRow {
  	d.lock.RLock()
  	defer d.lock.RUnlock()
  	// A composite literal gives the copy a fresh zero-valued lock instead of
  	// copying d.lock.
  	return AffiliateRow{
  		CalCols:  cloneAffiliateMap(d.CalCols),
  		AliasMap: cloneAffiliateMap(d.AliasMap),
  	}
  }

  // cloneAffiliateMap returns a shallow copy of m, or nil when m is empty.
  func cloneAffiliateMap(m map[string]interface{}) map[string]interface{} {
  	if len(m) == 0 {
  		return nil
  	}
  	c := make(map[string]interface{}, len(m))
  	for k, v := range m {
  		c[k] = v
  	}
  	return c
  }

  // IsEmpty reports whether there are neither calculated columns nor aliases.
  func (d *AffiliateRow) IsEmpty() bool {
  	d.lock.RLock()
  	defer d.lock.RUnlock()
  	return len(d.CalCols) == 0 && len(d.AliasMap) == 0
  }

  // MergeMap copies the calculated columns, except internal ones whose names
  // start with "$$", and then the aliases into cachedMap. An alias therefore
  // overrides a calculated column of the same name.
  func (d *AffiliateRow) MergeMap(cachedMap map[string]interface{}) {
  	d.lock.RLock()
  	defer d.lock.RUnlock()
  	for k, v := range d.CalCols {
  		// Do not write out the internal fields
  		if !strings.HasPrefix(k, "$$") {
  			cachedMap[k] = v
  		}
  	}
  	for k, v := range d.AliasMap {
  		cachedMap[k] = v
  	}
  }
  163. func (d *AffiliateRow) Pick(cols [][]string) [][]string {
  164. d.lock.Lock()
  165. defer d.lock.Unlock()
  166. if len(cols) > 0 {
  167. newAliasMap := make(map[string]interface{})
  168. newCalCols := make(map[string]interface{})
  169. var newCols [][]string
  170. for _, a := range cols {
  171. if a[1] == "" || a[1] == string(ast.DefaultStream) {
  172. if v, ok := d.AliasMap[a[0]]; ok {
  173. newAliasMap[a[0]] = v
  174. continue
  175. }
  176. if v, ok := d.CalCols[a[0]]; ok {
  177. newCalCols[a[0]] = v
  178. continue
  179. }
  180. }
  181. newCols = append(newCols, a)
  182. }
  183. d.AliasMap = newAliasMap
  184. d.CalCols = newCalCols
  185. return newCols
  186. } else {
  187. d.AliasMap = nil
  188. d.CalCols = nil
  189. return cols
  190. }
  191. }
  192. /*
  193. * Message definition
  194. */
  195. // Message is a valuer that substitutes values for the mapped interface. It is the basic type for data events.
  196. type Message map[string]interface{}
  197. var _ Valuer = Message{}
  198. type Metadata Message
  199. // Alias will not need to convert cases
  200. type Alias struct {
  201. AliasMap map[string]interface{}
  202. }
  203. /*
  204. * All row types definitions, watermark, barrier
  205. */
  206. // Tuple The input row, produced by the source
  207. type Tuple struct {
  208. Emitter string
  209. Message Message // the original pointer is immutable & big; may be cloned
  210. Timestamp int64
  211. Metadata Metadata // immutable
  212. AffiliateRow
  213. lock sync.Mutex // lock for the cachedMap, because it is possible to access by multiple sinks
  214. cachedMap map[string]interface{} // clone of the row and cached for performance
  215. }
  216. var _ TupleRow = &Tuple{}
  217. // JoinTuple is a row produced by a join operation
  218. type JoinTuple struct {
  219. Tuples []TupleRow // The content is immutable, but the slice may be add or removed
  220. AffiliateRow
  221. lock sync.Mutex
  222. cachedMap map[string]interface{} // clone of the row and cached for performance of toMap
  223. }
  224. func (jt *JoinTuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  225. return []interface{}{Eval(expr, MultiValuer(jt, v, &WildcardValuer{jt}))}
  226. }
  227. var _ TupleRow = &JoinTuple{}
  228. // GroupedTuples is a collection of tuples grouped by a key
  229. type GroupedTuples struct {
  230. Content []TupleRow
  231. *WindowRange
  232. AffiliateRow
  233. lock sync.Mutex
  234. cachedMap map[string]interface{} // clone of the row and cached for performance of toMap
  235. }
  236. var _ CollectionRow = &GroupedTuples{}
  237. /*
  238. * Implementations
  239. */
  240. func ToMessage(input interface{}) (Message, bool) {
  241. var result Message
  242. switch m := input.(type) {
  243. case Message:
  244. result = m
  245. case Metadata:
  246. result = Message(m)
  247. case map[string]interface{}:
  248. result = m
  249. default:
  250. return nil, false
  251. }
  252. return result, true
  253. }
  254. func (m Message) Value(key, _ string) (interface{}, bool) {
  255. if v, ok := m[key]; ok {
  256. return v, ok
  257. } else if conf.Config == nil || conf.Config.Basic.IgnoreCase {
  258. // Only when with 'SELECT * FROM ...' and 'schemaless', the key in map is not convert to lower case.
  259. // So all of keys in map should be convert to lowercase and then compare them.
  260. return m.getIgnoreCase(key)
  261. } else {
  262. return nil, false
  263. }
  264. }
  265. func (m Message) getIgnoreCase(key interface{}) (interface{}, bool) {
  266. if k, ok := key.(string); ok {
  267. for mk, v := range m {
  268. if strings.EqualFold(k, mk) {
  269. return v, true
  270. }
  271. }
  272. }
  273. return nil, false
  274. }
  275. func (m Message) Meta(key, table string) (interface{}, bool) {
  276. if key == "*" {
  277. return map[string]interface{}(m), true
  278. }
  279. return m.Value(key, table)
  280. }
  281. // MetaData implementation
  282. func (m Metadata) Value(key, table string) (interface{}, bool) {
  283. msg := Message(m)
  284. return msg.Value(key, table)
  285. }
  286. func (m Metadata) Meta(key, table string) (interface{}, bool) {
  287. if key == "*" {
  288. return map[string]interface{}(m), true
  289. }
  290. msg := Message(m)
  291. return msg.Meta(key, table)
  292. }
  293. // Tuple implementation
  294. func (t *Tuple) Value(key, table string) (interface{}, bool) {
  295. r, ok := t.AffiliateRow.Value(key, table)
  296. if ok {
  297. return r, ok
  298. }
  299. return t.Message.Value(key, table)
  300. }
  301. func (t *Tuple) All(string) (Message, bool) {
  302. return t.Message, true
  303. }
  304. func (t *Tuple) Clone() CloneAbleRow {
  305. return &Tuple{
  306. Emitter: t.Emitter,
  307. Timestamp: t.Timestamp,
  308. Message: t.Message,
  309. Metadata: t.Metadata,
  310. AffiliateRow: t.AffiliateRow.Clone(),
  311. }
  312. }
  313. // ToMap should only use in sink.
  314. func (t *Tuple) ToMap() map[string]interface{} {
  315. t.lock.Lock()
  316. defer t.lock.Unlock()
  317. if t.AffiliateRow.IsEmpty() {
  318. return t.Message
  319. }
  320. if t.cachedMap == nil { // clone the message
  321. m := make(map[string]interface{})
  322. for k, v := range t.Message {
  323. m[k] = v
  324. }
  325. t.cachedMap = m
  326. t.AffiliateRow.MergeMap(t.cachedMap)
  327. }
  328. return t.cachedMap
  329. }
  330. func (t *Tuple) Meta(key, table string) (interface{}, bool) {
  331. if key == "*" {
  332. return map[string]interface{}(t.Metadata), true
  333. }
  334. return t.Metadata.Value(key, table)
  335. }
  336. func (t *Tuple) GetEmitter() string {
  337. return t.Emitter
  338. }
  339. func (t *Tuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  340. return []interface{}{Eval(expr, MultiValuer(t, v, &WildcardValuer{t}))}
  341. }
  342. func (t *Tuple) GetTimestamp() int64 {
  343. return t.Timestamp
  344. }
  345. func (t *Tuple) IsWatermark() bool {
  346. return false
  347. }
  348. func (t *Tuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
  349. cols = t.AffiliateRow.Pick(cols)
  350. if !allWildcard && wildcardEmitters[t.Emitter] {
  351. allWildcard = true
  352. }
  353. if !allWildcard {
  354. if len(cols) > 0 {
  355. pickedMap := make(map[string]interface{})
  356. for _, colTab := range cols {
  357. if colTab[1] == "" || colTab[1] == string(ast.DefaultStream) || colTab[1] == t.Emitter {
  358. if v, ok := t.Message.Value(colTab[0], colTab[1]); ok {
  359. pickedMap[colTab[0]] = v
  360. }
  361. }
  362. }
  363. t.Message = pickedMap
  364. } else {
  365. t.Message = make(map[string]interface{})
  366. // invalidate cache, will calculate again
  367. t.cachedMap = nil
  368. }
  369. }
  370. }
  371. // JoinTuple implementation
  372. func (jt *JoinTuple) AddTuple(tuple TupleRow) {
  373. jt.Tuples = append(jt.Tuples, tuple)
  374. }
  375. func (jt *JoinTuple) AddTuples(tuples []TupleRow) {
  376. for _, t := range tuples {
  377. jt.Tuples = append(jt.Tuples, t)
  378. }
  379. }
  380. func (jt *JoinTuple) doGetValue(key, table string, isVal bool) (interface{}, bool) {
  381. tuples := jt.Tuples
  382. if table == "" {
  383. if len(tuples) > 1 {
  384. for _, tuple := range tuples { // TODO support key without modifier?
  385. v, ok := getTupleValue(tuple, key, isVal)
  386. if ok {
  387. return v, ok
  388. }
  389. }
  390. conf.Log.Debugf("Wrong key: %s not found", key)
  391. return nil, false
  392. } else {
  393. return getTupleValue(tuples[0], key, isVal)
  394. }
  395. } else {
  396. // TODO should use hash here
  397. for _, tuple := range tuples {
  398. if tuple.GetEmitter() == table {
  399. return getTupleValue(tuple, key, isVal)
  400. }
  401. }
  402. return nil, false
  403. }
  404. }
  405. func getTupleValue(tuple Row, key string, isVal bool) (interface{}, bool) {
  406. if isVal {
  407. return tuple.Value(key, "")
  408. } else {
  409. return tuple.Meta(key, "")
  410. }
  411. }
  412. func (jt *JoinTuple) GetEmitter() string {
  413. return "$$JOIN"
  414. }
  415. func (jt *JoinTuple) Value(key, table string) (interface{}, bool) {
  416. r, ok := jt.AffiliateRow.Value(key, table)
  417. if ok {
  418. return r, ok
  419. }
  420. return jt.doGetValue(key, table, true)
  421. }
  422. func (jt *JoinTuple) Meta(key, table string) (interface{}, bool) {
  423. return jt.doGetValue(key, table, false)
  424. }
  425. func (jt *JoinTuple) All(stream string) (Message, bool) {
  426. if stream != "" {
  427. for _, t := range jt.Tuples {
  428. if t.GetEmitter() == stream {
  429. return t.All("")
  430. }
  431. }
  432. }
  433. result := make(map[string]interface{})
  434. for _, t := range jt.Tuples {
  435. if m, ok := t.All(""); ok {
  436. for k, v := range m {
  437. result[k] = v
  438. }
  439. }
  440. }
  441. return result, true
  442. }
  443. func (jt *JoinTuple) Clone() CloneAbleRow {
  444. ts := make([]TupleRow, len(jt.Tuples))
  445. for i, t := range jt.Tuples {
  446. ts[i] = t.Clone().(TupleRow)
  447. }
  448. c := &JoinTuple{
  449. Tuples: ts,
  450. AffiliateRow: jt.AffiliateRow.Clone(),
  451. }
  452. return c
  453. }
  454. func (jt *JoinTuple) ToMap() map[string]interface{} {
  455. jt.lock.Lock()
  456. defer jt.lock.Unlock()
  457. if jt.cachedMap == nil { // clone the message
  458. m := make(map[string]interface{})
  459. for i := len(jt.Tuples) - 1; i >= 0; i-- {
  460. for k, v := range jt.Tuples[i].ToMap() {
  461. m[k] = v
  462. }
  463. }
  464. jt.cachedMap = m
  465. jt.AffiliateRow.MergeMap(jt.cachedMap)
  466. }
  467. return jt.cachedMap
  468. }
  469. func (jt *JoinTuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
  470. cols = jt.AffiliateRow.Pick(cols)
  471. if !allWildcard {
  472. if len(cols) > 0 {
  473. for i, tuple := range jt.Tuples {
  474. if _, ok := wildcardEmitters[tuple.GetEmitter()]; ok {
  475. continue
  476. }
  477. nt := tuple.Clone().(TupleRow)
  478. nt.Pick(allWildcard, cols, wildcardEmitters)
  479. jt.Tuples[i] = nt
  480. }
  481. } else {
  482. jt.Tuples = jt.Tuples[:0]
  483. }
  484. }
  485. jt.cachedMap = nil
  486. }
  487. // GroupedTuple implementation
  488. func (s *GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  489. var result []interface{}
  490. for _, t := range s.Content {
  491. result = append(result, Eval(expr, MultiValuer(t, &WindowRangeValuer{WindowRange: s.WindowRange}, v, &WildcardValuer{t})))
  492. }
  493. return result
  494. }
  495. func (s *GroupedTuples) Value(key, table string) (interface{}, bool) {
  496. r, ok := s.AffiliateRow.Value(key, table)
  497. if ok {
  498. return r, ok
  499. }
  500. return s.Content[0].Value(key, table)
  501. }
  502. func (s *GroupedTuples) Meta(key, table string) (interface{}, bool) {
  503. return s.Content[0].Meta(key, table)
  504. }
  505. func (s *GroupedTuples) All(_ string) (Message, bool) {
  506. return s.Content[0].All("")
  507. }
  508. func (s *GroupedTuples) ToMap() map[string]interface{} {
  509. s.lock.Lock()
  510. defer s.lock.Unlock()
  511. if s.cachedMap == nil {
  512. m := make(map[string]interface{})
  513. for k, v := range s.Content[0].ToMap() {
  514. m[k] = v
  515. }
  516. s.cachedMap = m
  517. s.AffiliateRow.MergeMap(s.cachedMap)
  518. }
  519. return s.cachedMap
  520. }
  521. func (s *GroupedTuples) Clone() CloneAbleRow {
  522. ts := make([]TupleRow, len(s.Content))
  523. for i, t := range s.Content {
  524. ts[i] = t
  525. }
  526. c := &GroupedTuples{
  527. Content: ts,
  528. WindowRange: s.WindowRange,
  529. AffiliateRow: s.AffiliateRow.Clone(),
  530. }
  531. return c
  532. }
  533. func (s *GroupedTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
  534. cols = s.AffiliateRow.Pick(cols)
  535. sc := s.Content[0].Clone().(TupleRow)
  536. sc.Pick(allWildcard, cols, wildcardEmitters)
  537. s.Content[0] = sc
  538. }