row.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package xsql
  15. import (
  16. "github.com/lf-edge/ekuiper/internal/conf"
  17. "github.com/lf-edge/ekuiper/pkg/ast"
  18. "strings"
  19. )
  20. /*
  21. * Interfaces definition
  22. */
  23. // TODO how to be more efficient?
  24. type Wildcarder interface {
  25. // All Value returns the value and existence flag for a given key.
  26. All(stream string) (Message, bool)
  27. }
  28. type Event interface {
  29. GetTimestamp() int64
  30. IsWatermark() bool
  31. }
  32. type Row interface {
  33. Valuer
  34. AliasValuer
  35. Wildcarder
  36. }
  37. type TupleRow interface {
  38. Row
  39. // Set Only for some ops like functionOp
  40. Set(col string, value interface{})
  41. // GetEmitter returns the emitter of the row
  42. GetEmitter() string
  43. // Clone when broadcast to make sure each row are dealt single threaded
  44. Clone() TupleRow
  45. // ToMap converts the row to a map to export to other systems
  46. ToMap() map[string]interface{}
  47. }
  48. // CollectionRow is the aggregation row of a non-grouped collection. Thinks of it as a single group.
  49. // The row data is immutable
  50. type CollectionRow interface {
  51. Row
  52. AggregateData
  53. }
  54. /*
  55. * Message definition
  56. */
  57. // Message is a valuer that substitutes values for the mapped interface. It is the basic type for data events.
  58. type Message map[string]interface{}
  59. var _ Valuer = Message{}
  60. type Metadata Message
  61. // Alias will not need to convert cases
  62. type Alias struct {
  63. AliasMap map[string]interface{}
  64. }
  65. /*
  66. * All row types definitions, watermark, barrier
  67. */
  68. // Tuple The input row, produced by the source
  69. type Tuple struct {
  70. Emitter string
  71. Message Message // immutable
  72. Timestamp int64
  73. Metadata Metadata // immutable
  74. Alias
  75. }
  76. var _ TupleRow = &Tuple{}
  77. // JoinTuple is a row produced by a join operation
  78. type JoinTuple struct {
  79. Tuples []TupleRow
  80. Alias
  81. }
  82. var _ TupleRow = &JoinTuple{}
  83. // GroupedTuples is a collection of tuples grouped by a key
  84. type GroupedTuples struct {
  85. Content []TupleRow
  86. *WindowRange
  87. Alias
  88. }
  89. var _ CollectionRow = &GroupedTuples{}
  90. /*
  91. * Implementations
  92. */
  93. func ToMessage(input interface{}) (Message, bool) {
  94. var result Message
  95. switch m := input.(type) {
  96. case Message:
  97. result = m
  98. case Metadata:
  99. result = Message(m)
  100. case map[string]interface{}:
  101. result = m
  102. default:
  103. return nil, false
  104. }
  105. return result, true
  106. }
  107. func (m Message) Value(key, _ string) (interface{}, bool) {
  108. if v, ok := m[key]; ok {
  109. return v, ok
  110. } else if conf.Config == nil || conf.Config.Basic.IgnoreCase {
  111. //Only when with 'SELECT * FROM ...' and 'schemaless', the key in map is not convert to lower case.
  112. //So all of keys in map should be convert to lowercase and then compare them.
  113. return m.getIgnoreCase(key)
  114. } else {
  115. return nil, false
  116. }
  117. }
  118. func (m Message) getIgnoreCase(key interface{}) (interface{}, bool) {
  119. if k, ok := key.(string); ok {
  120. for mk, v := range m {
  121. if strings.EqualFold(k, mk) {
  122. return v, true
  123. }
  124. }
  125. }
  126. return nil, false
  127. }
  128. func (m Message) Meta(key, table string) (interface{}, bool) {
  129. if key == "*" {
  130. return map[string]interface{}(m), true
  131. }
  132. return m.Value(key, table)
  133. }
  134. // MetaData implementation
  135. func (m Metadata) Value(key, table string) (interface{}, bool) {
  136. msg := Message(m)
  137. return msg.Value(key, table)
  138. }
  139. func (m Metadata) Meta(key, table string) (interface{}, bool) {
  140. if key == "*" {
  141. return map[string]interface{}(m), true
  142. }
  143. msg := Message(m)
  144. return msg.Meta(key, table)
  145. }
  146. // Alias implementation
  147. func (a *Alias) AppendAlias(key string, value interface{}) bool {
  148. if a.AliasMap == nil {
  149. a.AliasMap = make(map[string]interface{})
  150. }
  151. a.AliasMap[key] = value
  152. return true
  153. }
  154. func (a *Alias) AliasValue(key string) (interface{}, bool) {
  155. if a.AliasMap == nil {
  156. return nil, false
  157. }
  158. v, ok := a.AliasMap[key]
  159. return v, ok
  160. }
  161. // Tuple implementation
  162. func (t *Tuple) Value(key, table string) (interface{}, bool) {
  163. r, ok := t.AliasValue(key)
  164. if ok {
  165. return r, ok
  166. }
  167. return t.Message.Value(key, table)
  168. }
  169. func (t *Tuple) Meta(key, table string) (interface{}, bool) {
  170. if key == "*" {
  171. return map[string]interface{}(t.Metadata), true
  172. }
  173. return t.Metadata.Value(key, table)
  174. }
  175. func (t *Tuple) Set(col string, value interface{}) {
  176. //TODO implement me
  177. panic("implement me")
  178. }
  179. func (t *Tuple) Clone() TupleRow {
  180. c := &Tuple{
  181. Emitter: t.Emitter,
  182. Timestamp: t.Timestamp,
  183. }
  184. if t.Message != nil {
  185. m := Message{}
  186. for k, v := range t.Message {
  187. m[k] = v
  188. }
  189. c.Message = m
  190. }
  191. if t.Metadata != nil {
  192. md := Metadata{}
  193. for k, v := range t.Metadata {
  194. md[k] = v
  195. }
  196. c.Metadata = md
  197. }
  198. return c
  199. }
  200. func (t *Tuple) ToMap() map[string]interface{} {
  201. return t.Message
  202. }
  203. func (t *Tuple) GetEmitter() string {
  204. return t.Emitter
  205. }
  206. func (t *Tuple) All(string) (Message, bool) {
  207. return t.Message, true
  208. }
  209. func (t *Tuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  210. return []interface{}{Eval(expr, MultiValuer(t, v, &WildcardValuer{t}))}
  211. }
  212. func (t *Tuple) GetTimestamp() int64 {
  213. return t.Timestamp
  214. }
  215. func (t *Tuple) GetMetadata() Metadata {
  216. return t.Metadata
  217. }
  218. func (t *Tuple) IsWatermark() bool {
  219. return false
  220. }
  221. // JoinTuple implementation
  222. func (jt *JoinTuple) AddTuple(tuple TupleRow) {
  223. jt.Tuples = append(jt.Tuples, tuple)
  224. }
  225. func (jt *JoinTuple) AddTuples(tuples []TupleRow) {
  226. for _, t := range tuples {
  227. jt.Tuples = append(jt.Tuples, t)
  228. }
  229. }
  230. func (jt *JoinTuple) doGetValue(key, table string, isVal bool) (interface{}, bool) {
  231. tuples := jt.Tuples
  232. if table == "" {
  233. if len(tuples) > 1 {
  234. for _, tuple := range tuples { //TODO support key without modifier?
  235. v, ok := getTupleValue(tuple, key, isVal)
  236. if ok {
  237. return v, ok
  238. }
  239. }
  240. conf.Log.Debugf("Wrong key: %s not found", key)
  241. return nil, false
  242. } else {
  243. return getTupleValue(tuples[0], key, isVal)
  244. }
  245. } else {
  246. //TODO should use hash here
  247. for _, tuple := range tuples {
  248. if tuple.GetEmitter() == table {
  249. return getTupleValue(tuple, key, isVal)
  250. }
  251. }
  252. return nil, false
  253. }
  254. }
  255. func getTupleValue(tuple Row, key string, isVal bool) (interface{}, bool) {
  256. if isVal {
  257. return tuple.Value(key, "")
  258. } else {
  259. return tuple.Meta(key, "")
  260. }
  261. }
  262. func (jt *JoinTuple) Value(key, table string) (interface{}, bool) {
  263. r, ok := jt.AliasValue(key)
  264. if ok {
  265. return r, ok
  266. }
  267. return jt.doGetValue(key, table, true)
  268. }
  269. func (jt *JoinTuple) Meta(key, table string) (interface{}, bool) {
  270. return jt.doGetValue(key, table, false)
  271. }
  272. func (jt *JoinTuple) All(stream string) (Message, bool) {
  273. if stream != "" {
  274. for _, t := range jt.Tuples {
  275. if t.GetEmitter() == stream {
  276. return t.ToMap(), true
  277. }
  278. }
  279. } else {
  280. return jt.ToMap(), true
  281. }
  282. return nil, false
  283. }
  284. // TODO deal with cascade
  285. func (jt *JoinTuple) Clone() TupleRow {
  286. ts := make([]TupleRow, len(jt.Tuples))
  287. for i, t := range jt.Tuples {
  288. ts[i] = t.Clone().(TupleRow)
  289. }
  290. return &JoinTuple{Tuples: ts}
  291. }
  292. func (jt *JoinTuple) Set(col string, value interface{}) {
  293. //TODO implement me
  294. panic("implement me")
  295. }
  296. func (jt *JoinTuple) ToMap() map[string]interface{} {
  297. m := make(map[string]interface{})
  298. for i := len(jt.Tuples) - 1; i >= 0; i-- {
  299. for k, v := range jt.Tuples[i].ToMap() {
  300. m[k] = v
  301. }
  302. }
  303. return m
  304. }
  305. func (jt *JoinTuple) GetEmitter() string {
  306. return "$$JOIN"
  307. }
  308. // GroupedTuple implementation
  309. func (s *GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  310. var result []interface{}
  311. for _, t := range s.Content {
  312. result = append(result, Eval(expr, MultiValuer(t, &WindowRangeValuer{WindowRange: s.WindowRange}, v, &WildcardValuer{t})))
  313. }
  314. return result
  315. }
  316. func (s *GroupedTuples) Value(key, table string) (interface{}, bool) {
  317. return s.Content[0].Value(key, table)
  318. }
  319. func (s *GroupedTuples) Meta(key, table string) (interface{}, bool) {
  320. return s.Content[0].Meta(key, table)
  321. }
  322. func (s *GroupedTuples) All(stream string) (Message, bool) {
  323. return s.Content[0].All(stream)
  324. }