row.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package xsql
  15. import (
  16. "github.com/lf-edge/ekuiper/internal/conf"
  17. "github.com/lf-edge/ekuiper/pkg/ast"
  18. "strings"
  19. )
  20. /*
  21. * Interfaces definition
  22. */
// Wildcarder is implemented by rows that can hand out their whole content at once.
type Wildcarder interface {
	// All returns the content for the given stream name together with a flag
	// telling whether such content exists.
	All(stream string) (Message, bool)
}
// Event is implemented by items that carry a timestamp and can report
// whether they are a watermark.
type Event interface {
	// GetTimestamp returns the timestamp attached to the event.
	GetTimestamp() int64
	// IsWatermark reports whether the event is a watermark.
	IsWatermark() bool
}
// Row is the contract shared by all row types. It combines Valuer,
// AliasValuer and Wildcarder with the ability to set a column and to export
// the row as a map. Methods whose comment ends with * may modify the row.
type Row interface {
	Valuer
	AliasValuer
	Wildcarder
	// Set stores value under col. Only for some ops like functionOp *
	Set(col string, value interface{})
	// ToMap converts the row to a map to export to other systems *
	ToMap() map[string]interface{}
}
// TupleRow is a mutable row. Functions marked with * in Row may modify the row.
type TupleRow interface {
	Row
	// GetEmitter returns the emitter of the row
	GetEmitter() string
	// Clone is used when a row is broadcast, so that each copy is handled by
	// a single thread.
	Clone() TupleRow
}
// CollectionRow is the aggregation row of a non-grouped collection. Think of it as a single group.
// The row data is immutable.
type CollectionRow interface {
	Row
	AggregateData
	// Clone is used when a row is broadcast, so that each copy is handled by
	// a single thread. It is currently not part of the interface.
	//Clone() CollectionRow
}
// AffiliateRow is embedded in the other row types and holds the columns that
// are added to a row during calculation: calculated columns and aliases.
type AffiliateRow struct {
	CalCols map[string]interface{} // mutable and must be cloned when broadcast
	Alias
}
  61. func (d *AffiliateRow) Value(key, table string) (interface{}, bool) {
  62. if table == "" {
  63. r, ok := d.AliasValue(key)
  64. if ok {
  65. return r, ok
  66. }
  67. r, ok = d.CalCols[key]
  68. if ok {
  69. return r, ok
  70. }
  71. }
  72. return nil, false
  73. }
  74. func (d *AffiliateRow) Set(col string, value interface{}) {
  75. if d.CalCols == nil {
  76. d.CalCols = make(map[string]interface{})
  77. }
  78. d.CalCols[col] = value
  79. }
  80. func (d *AffiliateRow) Clone() AffiliateRow {
  81. nd := &AffiliateRow{}
  82. if d.CalCols != nil && len(d.CalCols) > 0 {
  83. nd.CalCols = make(map[string]interface{}, len(d.CalCols))
  84. for k, v := range d.CalCols {
  85. nd.CalCols[k] = v
  86. }
  87. }
  88. if d.AliasMap != nil && len(d.AliasMap) > 0 {
  89. nd.AliasMap = make(map[string]interface{}, len(d.AliasMap))
  90. for k, v := range d.AliasMap {
  91. nd.AliasMap[k] = v
  92. }
  93. }
  94. return *nd
  95. }
  96. func (d *AffiliateRow) IsEmpty() bool {
  97. return len(d.CalCols) == 0 && len(d.AliasMap) == 0
  98. }
  99. func (d *AffiliateRow) MergeMap(cachedMap map[string]interface{}) {
  100. for k, v := range d.CalCols {
  101. cachedMap[k] = v
  102. }
  103. for k, v := range d.AliasMap {
  104. cachedMap[k] = v
  105. }
  106. }
  107. /*
  108. * Message definition
  109. */
// Message is a valuer that substitutes values for the mapped interface. It is the basic type for data events.
type Message map[string]interface{}

// Compile-time check that Message implements Valuer.
var _ Valuer = Message{}

// Metadata has the same underlying type as Message and is used for the
// metadata attached to a row.
type Metadata Message
// Alias holds the alias values of a row. Alias keys do not need case conversion.
type Alias struct {
	AliasMap map[string]interface{} // alias name -> value; nil until the first alias is appended
}
  118. /*
  119. * All row types definitions, watermark, barrier
  120. */
// Tuple is the input row, produced by the source.
type Tuple struct {
	Emitter   string   // value returned by GetEmitter
	Message   Message  // the original pointer is immutable & big; may be cloned
	Timestamp int64    // value returned by GetTimestamp
	Metadata  Metadata // immutable
	AffiliateRow
	cachedMap map[string]interface{} // clone of the row and cached for performance
}

// Compile-time check that *Tuple implements TupleRow.
var _ TupleRow = &Tuple{}
// JoinTuple is a row produced by a join operation
type JoinTuple struct {
	Tuples []TupleRow // The content is immutable, but elements may be added to or removed from the slice
	AffiliateRow
	cachedMap map[string]interface{} // clone of the row and cached for performance of toMap
}
  137. func (jt *JoinTuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  138. return []interface{}{Eval(expr, MultiValuer(jt, v, &WildcardValuer{jt}))}
  139. }
// Compile-time check that *JoinTuple implements TupleRow.
var _ TupleRow = &JoinTuple{}

// GroupedTuples is a collection of tuples grouped by a key
type GroupedTuples struct {
	Content []TupleRow // the rows belonging to the group
	*WindowRange
	AffiliateRow
	cachedMap map[string]interface{} // clone of the row and cached for performance of toMap
}

// Compile-time check that *GroupedTuples implements CollectionRow.
var _ CollectionRow = &GroupedTuples{}
  149. /*
  150. * Implementations
  151. */
  152. func ToMessage(input interface{}) (Message, bool) {
  153. var result Message
  154. switch m := input.(type) {
  155. case Message:
  156. result = m
  157. case Metadata:
  158. result = Message(m)
  159. case map[string]interface{}:
  160. result = m
  161. default:
  162. return nil, false
  163. }
  164. return result, true
  165. }
  166. func (m Message) Value(key, _ string) (interface{}, bool) {
  167. if v, ok := m[key]; ok {
  168. return v, ok
  169. } else if conf.Config == nil || conf.Config.Basic.IgnoreCase {
  170. //Only when with 'SELECT * FROM ...' and 'schemaless', the key in map is not convert to lower case.
  171. //So all of keys in map should be convert to lowercase and then compare them.
  172. return m.getIgnoreCase(key)
  173. } else {
  174. return nil, false
  175. }
  176. }
  177. func (m Message) getIgnoreCase(key interface{}) (interface{}, bool) {
  178. if k, ok := key.(string); ok {
  179. for mk, v := range m {
  180. if strings.EqualFold(k, mk) {
  181. return v, true
  182. }
  183. }
  184. }
  185. return nil, false
  186. }
  187. func (m Message) Meta(key, table string) (interface{}, bool) {
  188. if key == "*" {
  189. return map[string]interface{}(m), true
  190. }
  191. return m.Value(key, table)
  192. }
  193. // MetaData implementation
  194. func (m Metadata) Value(key, table string) (interface{}, bool) {
  195. msg := Message(m)
  196. return msg.Value(key, table)
  197. }
  198. func (m Metadata) Meta(key, table string) (interface{}, bool) {
  199. if key == "*" {
  200. return map[string]interface{}(m), true
  201. }
  202. msg := Message(m)
  203. return msg.Meta(key, table)
  204. }
  205. // Alias implementation
  206. func (a *Alias) AppendAlias(key string, value interface{}) bool {
  207. if a.AliasMap == nil {
  208. a.AliasMap = make(map[string]interface{})
  209. }
  210. a.AliasMap[key] = value
  211. return true
  212. }
  213. func (a *Alias) AliasValue(key string) (interface{}, bool) {
  214. if a.AliasMap == nil {
  215. return nil, false
  216. }
  217. v, ok := a.AliasMap[key]
  218. return v, ok
  219. }
  220. // Tuple implementation
  221. func (t *Tuple) Value(key, table string) (interface{}, bool) {
  222. r, ok := t.AffiliateRow.Value(key, table)
  223. if ok {
  224. return r, ok
  225. }
  226. return t.Message.Value(key, table)
  227. }
  228. func (t *Tuple) All(string) (Message, bool) {
  229. return t.ToMap(), true
  230. }
  231. func (t *Tuple) Clone() TupleRow {
  232. return &Tuple{
  233. Emitter: t.Emitter,
  234. Timestamp: t.Timestamp,
  235. Message: t.Message,
  236. Metadata: t.Metadata,
  237. AffiliateRow: t.AffiliateRow.Clone(),
  238. }
  239. }
  240. // ToMap should only use in sink.
  241. func (t *Tuple) ToMap() map[string]interface{} {
  242. if t.AffiliateRow.IsEmpty() {
  243. return t.Message
  244. }
  245. if t.cachedMap == nil { // clone the message
  246. m := make(map[string]interface{})
  247. for k, v := range t.Message {
  248. m[k] = v
  249. }
  250. t.cachedMap = m
  251. t.Message = t.cachedMap
  252. }
  253. t.AffiliateRow.MergeMap(t.cachedMap)
  254. return t.cachedMap
  255. }
  256. func (t *Tuple) Meta(key, table string) (interface{}, bool) {
  257. if key == "*" {
  258. return map[string]interface{}(t.Metadata), true
  259. }
  260. return t.Metadata.Value(key, table)
  261. }
  262. func (t *Tuple) GetEmitter() string {
  263. return t.Emitter
  264. }
  265. func (t *Tuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  266. return []interface{}{Eval(expr, MultiValuer(t, v, &WildcardValuer{t}))}
  267. }
  268. func (t *Tuple) GetTimestamp() int64 {
  269. return t.Timestamp
  270. }
  271. func (t *Tuple) IsWatermark() bool {
  272. return false
  273. }
  274. // JoinTuple implementation
  275. func (jt *JoinTuple) AddTuple(tuple TupleRow) {
  276. jt.Tuples = append(jt.Tuples, tuple)
  277. }
  278. func (jt *JoinTuple) AddTuples(tuples []TupleRow) {
  279. for _, t := range tuples {
  280. jt.Tuples = append(jt.Tuples, t)
  281. }
  282. }
  283. func (jt *JoinTuple) doGetValue(key, table string, isVal bool) (interface{}, bool) {
  284. tuples := jt.Tuples
  285. if table == "" {
  286. if len(tuples) > 1 {
  287. for _, tuple := range tuples { //TODO support key without modifier?
  288. v, ok := getTupleValue(tuple, key, isVal)
  289. if ok {
  290. return v, ok
  291. }
  292. }
  293. conf.Log.Debugf("Wrong key: %s not found", key)
  294. return nil, false
  295. } else {
  296. return getTupleValue(tuples[0], key, isVal)
  297. }
  298. } else {
  299. //TODO should use hash here
  300. for _, tuple := range tuples {
  301. if tuple.GetEmitter() == table {
  302. return getTupleValue(tuple, key, isVal)
  303. }
  304. }
  305. return nil, false
  306. }
  307. }
  308. func getTupleValue(tuple Row, key string, isVal bool) (interface{}, bool) {
  309. if isVal {
  310. return tuple.Value(key, "")
  311. } else {
  312. return tuple.Meta(key, "")
  313. }
  314. }
  315. func (jt *JoinTuple) GetEmitter() string {
  316. return "$$JOIN"
  317. }
  318. func (jt *JoinTuple) Value(key, table string) (interface{}, bool) {
  319. r, ok := jt.AffiliateRow.Value(key, table)
  320. if ok {
  321. return r, ok
  322. }
  323. return jt.doGetValue(key, table, true)
  324. }
  325. func (jt *JoinTuple) Meta(key, table string) (interface{}, bool) {
  326. return jt.doGetValue(key, table, false)
  327. }
  328. func (jt *JoinTuple) All(stream string) (Message, bool) {
  329. if stream != "" {
  330. for _, t := range jt.Tuples {
  331. if t.GetEmitter() == stream {
  332. return t.ToMap(), true
  333. }
  334. }
  335. } else {
  336. return jt.ToMap(), true
  337. }
  338. return nil, false
  339. }
  340. func (jt *JoinTuple) Clone() TupleRow {
  341. ts := make([]TupleRow, len(jt.Tuples))
  342. for i, t := range jt.Tuples {
  343. ts[i] = t
  344. }
  345. c := &JoinTuple{
  346. Tuples: ts,
  347. AffiliateRow: jt.AffiliateRow.Clone(),
  348. }
  349. return c
  350. }
  351. func (jt *JoinTuple) ToMap() map[string]interface{} {
  352. if jt.cachedMap == nil { // clone the message
  353. m := make(map[string]interface{})
  354. for i := len(jt.Tuples) - 1; i >= 0; i-- {
  355. for k, v := range jt.Tuples[i].ToMap() {
  356. m[k] = v
  357. }
  358. }
  359. jt.cachedMap = m
  360. }
  361. jt.AffiliateRow.MergeMap(jt.cachedMap)
  362. return jt.cachedMap
  363. }
  364. // GroupedTuple implementation
  365. func (s *GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  366. var result []interface{}
  367. for _, t := range s.Content {
  368. result = append(result, Eval(expr, MultiValuer(t, &WindowRangeValuer{WindowRange: s.WindowRange}, v, &WildcardValuer{t})))
  369. }
  370. return result
  371. }
  372. func (s *GroupedTuples) Value(key, table string) (interface{}, bool) {
  373. r, ok := s.AffiliateRow.Value(key, table)
  374. if ok {
  375. return r, ok
  376. }
  377. return s.Content[0].Value(key, table)
  378. }
  379. func (s *GroupedTuples) Meta(key, table string) (interface{}, bool) {
  380. return s.Content[0].Meta(key, table)
  381. }
  382. func (s *GroupedTuples) All(_ string) (Message, bool) {
  383. return s.ToMap(), true
  384. }
  385. func (s *GroupedTuples) ToMap() map[string]interface{} {
  386. if s.cachedMap == nil {
  387. m := make(map[string]interface{})
  388. for k, v := range s.Content[0].ToMap() {
  389. m[k] = v
  390. }
  391. s.cachedMap = m
  392. }
  393. s.AffiliateRow.MergeMap(s.cachedMap)
  394. return s.cachedMap
  395. }
  396. func (s *GroupedTuples) Clone() CollectionRow {
  397. ts := make([]TupleRow, len(s.Content))
  398. for i, t := range s.Content {
  399. ts[i] = t
  400. }
  401. c := &GroupedTuples{
  402. Content: ts,
  403. WindowRange: s.WindowRange,
  404. AffiliateRow: s.AffiliateRow.Clone(),
  405. }
  406. return c
  407. }