row.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package xsql
  15. import (
  16. "github.com/lf-edge/ekuiper/internal/conf"
  17. "github.com/lf-edge/ekuiper/pkg/ast"
  18. "strings"
  19. )
  20. type Row interface {
  21. Valuer
  22. AliasValuer
  23. Wildcarder
  24. // Set Only for some ops like functionOp
  25. Set(col string, value interface{})
  26. // Clone when broadcast to make sure each row are dealt single threaded
  27. Clone() Row
  28. // ToMap converts the row to a map to export to other systems
  29. ToMap() map[string]interface{}
  30. }
  31. // Collection A collection of rows as a table. It is used for window, join, group by, etc.
  32. type Collection interface {
  33. Index(index int) Row
  34. Len() int
  35. }
  36. // Message is a valuer that substitutes values for the mapped interface. It is the basic type for data events.
  37. type Message map[string]interface{}
  38. var _ Valuer = Message{}
  39. type Metadata Message
  40. // Alias will not need to convert cases
  41. type Alias struct {
  42. AliasMap map[string]interface{}
  43. }
  44. // All rows definitions, watermark, barrier
  45. // Tuple The input row, produced by the source
  46. type Tuple struct {
  47. Emitter string
  48. Message Message // immutable
  49. Timestamp int64
  50. Metadata Metadata // immutable
  51. Alias
  52. }
  53. var _ Row = &Tuple{}
  54. // JoinTuple is a row produced by a join operation
  55. type JoinTuple struct {
  56. Tuples []Tuple
  57. Alias
  58. }
  59. var _ Row = &JoinTuple{}
  60. // GroupedTuples is a collection of tuples grouped by a key
  61. type GroupedTuples struct {
  62. Content []Row
  63. *WindowRange
  64. }
  65. var _ Row = &GroupedTuples{}
  66. // Message implementation
  67. func ToMessage(input interface{}) (Message, bool) {
  68. var result Message
  69. switch m := input.(type) {
  70. case Message:
  71. result = m
  72. case Metadata:
  73. result = Message(m)
  74. case map[string]interface{}:
  75. result = m
  76. default:
  77. return nil, false
  78. }
  79. return result, true
  80. }
  81. func (m Message) Value(key, _ string) (interface{}, bool) {
  82. if v, ok := m[key]; ok {
  83. return v, ok
  84. } else if conf.Config == nil || conf.Config.Basic.IgnoreCase {
  85. //Only when with 'SELECT * FROM ...' and 'schemaless', the key in map is not convert to lower case.
  86. //So all of keys in map should be convert to lowercase and then compare them.
  87. return m.getIgnoreCase(key)
  88. } else {
  89. return nil, false
  90. }
  91. }
  92. func (m Message) getIgnoreCase(key interface{}) (interface{}, bool) {
  93. if k, ok := key.(string); ok {
  94. for mk, v := range m {
  95. if strings.EqualFold(k, mk) {
  96. return v, true
  97. }
  98. }
  99. }
  100. return nil, false
  101. }
  102. func (m Message) Meta(key, table string) (interface{}, bool) {
  103. if key == "*" {
  104. return map[string]interface{}(m), true
  105. }
  106. return m.Value(key, table)
  107. }
  108. // MetaData implementation
  109. func (m Metadata) Value(key, table string) (interface{}, bool) {
  110. msg := Message(m)
  111. return msg.Value(key, table)
  112. }
  113. func (m Metadata) Meta(key, table string) (interface{}, bool) {
  114. if key == "*" {
  115. return map[string]interface{}(m), true
  116. }
  117. msg := Message(m)
  118. return msg.Meta(key, table)
  119. }
  120. // Alias implementation
  121. func (a *Alias) AppendAlias(key string, value interface{}) bool {
  122. if a.AliasMap == nil {
  123. a.AliasMap = make(map[string]interface{})
  124. }
  125. a.AliasMap[key] = value
  126. return true
  127. }
  128. func (a *Alias) AliasValue(key string) (interface{}, bool) {
  129. if a.AliasMap == nil {
  130. return nil, false
  131. }
  132. v, ok := a.AliasMap[key]
  133. return v, ok
  134. }
  135. // Tuple implementation
  136. func (t *Tuple) Value(key, table string) (interface{}, bool) {
  137. r, ok := t.AliasValue(key)
  138. if ok {
  139. return r, ok
  140. }
  141. return t.Message.Value(key, table)
  142. }
  143. func (t *Tuple) Meta(key, table string) (interface{}, bool) {
  144. if key == "*" {
  145. return map[string]interface{}(t.Metadata), true
  146. }
  147. return t.Metadata.Value(key, table)
  148. }
  149. func (t *Tuple) Set(col string, value interface{}) {
  150. //TODO implement me
  151. panic("implement me")
  152. }
  153. func (t *Tuple) Clone() Row {
  154. c := &Tuple{
  155. Emitter: t.Emitter,
  156. Timestamp: t.Timestamp,
  157. }
  158. if t.Message != nil {
  159. m := Message{}
  160. for k, v := range t.Message {
  161. m[k] = v
  162. }
  163. c.Message = m
  164. }
  165. if t.Metadata != nil {
  166. md := Metadata{}
  167. for k, v := range t.Metadata {
  168. md[k] = v
  169. }
  170. c.Metadata = md
  171. }
  172. return c
  173. }
  174. func (t *Tuple) ToMap() map[string]interface{} {
  175. //TODO implement me
  176. panic("implement me")
  177. }
  178. func (t *Tuple) All(string) (Message, bool) {
  179. return t.Message, true
  180. }
  181. func (t *Tuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  182. return []interface{}{Eval(expr, MultiValuer(t, v, &WildcardValuer{t}))}
  183. }
  184. func (t *Tuple) GetTimestamp() int64 {
  185. return t.Timestamp
  186. }
  187. func (t *Tuple) GetMetadata() Metadata {
  188. return t.Metadata
  189. }
  190. func (t *Tuple) IsWatermark() bool {
  191. return false
  192. }
  193. // JoinTuple implementation
  194. func (jt *JoinTuple) AddTuple(tuple Tuple) {
  195. jt.Tuples = append(jt.Tuples, tuple)
  196. }
  197. func (jt *JoinTuple) AddTuples(tuples []Tuple) {
  198. for _, t := range tuples {
  199. jt.Tuples = append(jt.Tuples, t)
  200. }
  201. }
  202. func (jt *JoinTuple) doGetValue(key, table string, isVal bool) (interface{}, bool) {
  203. tuples := jt.Tuples
  204. if table == "" {
  205. if len(tuples) > 1 {
  206. for _, tuple := range tuples { //TODO support key without modifier?
  207. v, ok := getTupleValue(tuple, key, isVal)
  208. if ok {
  209. return v, ok
  210. }
  211. }
  212. conf.Log.Debugf("Wrong key: %s not found", key)
  213. return nil, false
  214. } else {
  215. return getTupleValue(tuples[0], key, isVal)
  216. }
  217. } else {
  218. //TODO should use hash here
  219. for _, tuple := range tuples {
  220. if tuple.Emitter == table {
  221. return getTupleValue(tuple, key, isVal)
  222. }
  223. }
  224. return nil, false
  225. }
  226. }
  227. func (jt *JoinTuple) Value(key, table string) (interface{}, bool) {
  228. r, ok := jt.AliasValue(key)
  229. if ok {
  230. return r, ok
  231. }
  232. return jt.doGetValue(key, table, true)
  233. }
  234. func (jt *JoinTuple) Meta(key, table string) (interface{}, bool) {
  235. return jt.doGetValue(key, table, false)
  236. }
  237. func (jt *JoinTuple) All(stream string) (Message, bool) {
  238. if stream != "" {
  239. for _, t := range jt.Tuples {
  240. if t.Emitter == stream {
  241. return t.Message, true
  242. }
  243. }
  244. } else {
  245. var r Message = make(map[string]interface{})
  246. for _, t := range jt.Tuples {
  247. for k, v := range t.Message {
  248. if _, ok := r[k]; !ok {
  249. r[k] = v
  250. }
  251. }
  252. }
  253. return r, true
  254. }
  255. return nil, false
  256. }
  257. func (jt *JoinTuple) Clone() Row {
  258. ts := make([]Tuple, len(jt.Tuples))
  259. for i, t := range jt.Tuples {
  260. ts[i] = *(t.Clone().(*Tuple))
  261. }
  262. return &JoinTuple{Tuples: ts}
  263. }
  264. func (jt *JoinTuple) Set(col string, value interface{}) {
  265. //TODO implement me
  266. panic("implement me")
  267. }
  268. func (jt *JoinTuple) ToMap() map[string]interface{} {
  269. //TODO implement me
  270. panic("implement me")
  271. }
  272. // GroupedTuple implementation
  273. func (s GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
  274. var result []interface{}
  275. for _, t := range s.Content {
  276. result = append(result, Eval(expr, MultiValuer(t, &WindowRangeValuer{WindowRange: s.WindowRange}, v, &WildcardValuer{t})))
  277. }
  278. return result
  279. }
  280. func (s GroupedTuples) Value(key, table string) (interface{}, bool) {
  281. //TODO implement me
  282. panic("implement me")
  283. }
  284. func (s GroupedTuples) Meta(key, table string) (interface{}, bool) {
  285. //TODO implement me
  286. panic("implement me")
  287. }
  288. func (s GroupedTuples) AliasValue(name string) (interface{}, bool) {
  289. //TODO implement me
  290. panic("implement me")
  291. }
  292. func (s GroupedTuples) Set(col string, value interface{}) {
  293. //TODO implement me
  294. panic("implement me")
  295. }
  296. func (s GroupedTuples) AppendAlias(key string, value interface{}) bool {
  297. //TODO implement me
  298. panic("implement me")
  299. }
  300. func (s GroupedTuples) Clone() Row {
  301. //TODO implement me
  302. panic("implement me")
  303. }
  304. func (s GroupedTuples) ToMap() map[string]interface{} {
  305. //TODO implement me
  306. panic("implement me")
  307. }
  308. func (s GroupedTuples) All(stream string) (Message, bool) {
  309. //TODO implement me
  310. panic("implement me")
  311. }