sql_test.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "database/sql"
  17. "database/sql/driver"
  18. "fmt"
  19. econf "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/context"
  21. "os"
  22. "reflect"
  23. "testing"
  24. )
  25. func TestSingle(t *testing.T) {
  26. db, err := sql.Open("sqlite3", "file:test.db")
  27. if err != nil {
  28. t.Error(err)
  29. return
  30. }
  31. contextLogger := econf.Log.WithField("rule", "test")
  32. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  33. s := &sqlSink{}
  34. defer func() {
  35. db.Close()
  36. s.Close(ctx)
  37. err := os.Remove("test.db")
  38. if err != nil {
  39. fmt.Println(err)
  40. }
  41. }()
  42. _, err = db.Exec("CREATE TABLE IF NOT EXISTS single (id BIGINT PRIMARY KEY, name TEXT NOT NULL, address varchar(20), mobile varchar(20))")
  43. if err != nil {
  44. panic(err)
  45. }
  46. err = s.Configure(map[string]interface{}{
  47. "url": "sqlite://test.db",
  48. "table": "single",
  49. })
  50. if err != nil {
  51. t.Error(err)
  52. return
  53. }
  54. err = s.Open(ctx)
  55. if err != nil {
  56. t.Error(err)
  57. return
  58. }
  59. var data = []map[string]interface{}{
  60. {"id": 1, "name": "John", "address": "343", "mobile": "334433"},
  61. {"id": 2, "name": "Susan", "address": "34", "mobile": "334433"},
  62. {"id": 3, "name": "Susan", "address": "34", "mobile": "334433"},
  63. }
  64. for _, d := range data {
  65. err = s.Collect(ctx, d)
  66. if err != nil {
  67. t.Error(err)
  68. return
  69. }
  70. }
  71. s.Close(ctx)
  72. rows, err := db.Query("SELECT * FROM single")
  73. if err != nil {
  74. t.Error(err)
  75. return
  76. }
  77. act, _ := rowsToMap(rows)
  78. exp := []map[string]interface{}{
  79. {"id": int64(1), "name": "John", "address": "343", "mobile": "334433"},
  80. {"id": int64(2), "name": "Susan", "address": "34", "mobile": "334433"},
  81. {"id": int64(3), "name": "Susan", "address": "34", "mobile": "334433"},
  82. }
  83. if !reflect.DeepEqual(act, exp) {
  84. t.Errorf("Expect %v but got %v", exp, act)
  85. }
  86. }
  87. func TestBatch(t *testing.T) {
  88. db, err := sql.Open("sqlite3", "file:test.db")
  89. if err != nil {
  90. t.Error(err)
  91. return
  92. }
  93. contextLogger := econf.Log.WithField("rule", "test")
  94. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  95. s := &sqlSink{}
  96. defer func() {
  97. db.Close()
  98. s.Close(ctx)
  99. err := os.Remove("test.db")
  100. if err != nil {
  101. fmt.Println(err)
  102. }
  103. }()
  104. _, err = db.Exec("CREATE TABLE IF NOT EXISTS batch (id BIGINT PRIMARY KEY, name TEXT NOT NULL)")
  105. if err != nil {
  106. panic(err)
  107. }
  108. err = s.Configure(map[string]interface{}{
  109. "url": "sqlite://test.db",
  110. "table": "batch",
  111. "fields": []string{"id", "name"},
  112. })
  113. if err != nil {
  114. t.Error(err)
  115. return
  116. }
  117. err = s.Open(ctx)
  118. if err != nil {
  119. t.Error(err)
  120. return
  121. }
  122. var data = []map[string]interface{}{
  123. {"id": 1, "name": "John", "address": "343", "mobile": "334433"},
  124. {"id": 2, "name": "Susan", "address": "34", "mobile": "334433"},
  125. {"id": 3, "name": "Susan", "address": "34", "mobile": "334433"},
  126. }
  127. err = s.Collect(ctx, data)
  128. if err != nil {
  129. t.Error(err)
  130. return
  131. }
  132. s.Close(ctx)
  133. rows, err := db.Query("SELECT * FROM batch")
  134. if err != nil {
  135. t.Error(err)
  136. return
  137. }
  138. act, _ := rowsToMap(rows)
  139. exp := []map[string]interface{}{
  140. {"id": int64(1), "name": "John"},
  141. {"id": int64(2), "name": "Susan"},
  142. {"id": int64(3), "name": "Susan"},
  143. }
  144. if !reflect.DeepEqual(act, exp) {
  145. t.Errorf("Expect %v but got %v", exp, act)
  146. }
  147. }
  148. func TestUpdate(t *testing.T) {
  149. db, err := sql.Open("sqlite3", "file:test.db")
  150. if err != nil {
  151. t.Error(err)
  152. return
  153. }
  154. contextLogger := econf.Log.WithField("rule", "test")
  155. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  156. s := &sqlSink{}
  157. defer func() {
  158. db.Close()
  159. s.Close(ctx)
  160. err := os.Remove("test.db")
  161. if err != nil {
  162. fmt.Println(err)
  163. }
  164. }()
  165. _, err = db.Exec("CREATE TABLE IF NOT EXISTS updateTable (id BIGINT PRIMARY KEY, name TEXT NOT NULL)")
  166. if err != nil {
  167. panic(err)
  168. }
  169. err = s.Configure(map[string]interface{}{
  170. "url": "sqlite://test.db",
  171. "table": "updateTable",
  172. "rowkindField": "action",
  173. "keyField": "id",
  174. "fields": []string{"id", "name"},
  175. })
  176. if err != nil {
  177. t.Error(err)
  178. return
  179. }
  180. err = s.Open(ctx)
  181. if err != nil {
  182. t.Error(err)
  183. return
  184. }
  185. var test = []struct {
  186. d []map[string]interface{}
  187. b bool
  188. r []map[string]interface{}
  189. }{
  190. {
  191. d: []map[string]interface{}{
  192. {"id": 1, "name": "John", "address": "343", "mobile": "334433"},
  193. {"action": "insert", "id": 2, "name": "Susan", "address": "34", "mobile": "334433"},
  194. {"action": "update", "id": 2, "name": "Diana"},
  195. },
  196. b: true,
  197. r: []map[string]interface{}{
  198. {"id": int64(1), "name": "John"},
  199. {"id": int64(2), "name": "Diana"},
  200. },
  201. }, {
  202. d: []map[string]interface{}{
  203. {"id": 4, "name": "Charles", "address": "343", "mobile": "334433"},
  204. {"action": "delete", "id": 2},
  205. {"action": "update", "id": 1, "name": "Lizz"},
  206. },
  207. b: false,
  208. r: []map[string]interface{}{
  209. {"id": int64(1), "name": "Lizz"},
  210. {"id": int64(4), "name": "Charles"},
  211. },
  212. }, {
  213. d: []map[string]interface{}{
  214. {"action": "upsert", "id": 4, "name": "Charles", "address": "343", "mobile": "334433"},
  215. {"action": "update", "id": 3, "name": "Lizz"},
  216. {"action": "update", "id": 1, "name": "Philips"},
  217. },
  218. b: true,
  219. r: []map[string]interface{}{
  220. {"id": int64(1), "name": "Philips"},
  221. {"id": int64(4), "name": "Charles"},
  222. },
  223. },
  224. }
  225. for i, tt := range test {
  226. if tt.b {
  227. err = s.Collect(ctx, tt.d)
  228. if err != nil {
  229. fmt.Println(err)
  230. }
  231. } else {
  232. for _, d := range tt.d {
  233. err = s.Collect(ctx, d)
  234. if err != nil {
  235. fmt.Println(err)
  236. }
  237. }
  238. }
  239. rows, err := db.Query("SELECT * FROM updateTable")
  240. if err != nil {
  241. t.Error(err)
  242. return
  243. }
  244. act, _ := rowsToMap(rows)
  245. if !reflect.DeepEqual(act, tt.r) {
  246. t.Errorf("Case %d Expect %v but got %v", i, tt.r, act)
  247. }
  248. }
  249. }
  250. func rowsToMap(rows *sql.Rows) ([]map[string]interface{}, error) {
  251. cols, _ := rows.Columns()
  252. types, err := rows.ColumnTypes()
  253. if err != nil {
  254. return nil, err
  255. }
  256. var result []map[string]interface{}
  257. for rows.Next() {
  258. data := make(map[string]interface{})
  259. columns := make([]interface{}, len(cols))
  260. prepareValues(columns, types, cols)
  261. err := rows.Scan(columns...)
  262. if err != nil {
  263. return nil, err
  264. }
  265. scanIntoMap(data, columns, cols)
  266. result = append(result, data)
  267. }
  268. return result, nil
  269. }
  270. func scanIntoMap(mapValue map[string]interface{}, values []interface{}, columns []string) {
  271. for idx, column := range columns {
  272. if reflectValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(values[idx]))); reflectValue.IsValid() {
  273. mapValue[column] = reflectValue.Interface()
  274. if valuer, ok := mapValue[column].(driver.Valuer); ok {
  275. mapValue[column], _ = valuer.Value()
  276. } else if b, ok := mapValue[column].(sql.RawBytes); ok {
  277. mapValue[column] = string(b)
  278. }
  279. } else {
  280. mapValue[column] = nil
  281. }
  282. }
  283. }
  284. func prepareValues(values []interface{}, columnTypes []*sql.ColumnType, columns []string) {
  285. if len(columnTypes) > 0 {
  286. for idx, columnType := range columnTypes {
  287. if columnType.ScanType() != nil {
  288. values[idx] = reflect.New(reflect.PtrTo(columnType.ScanType())).Interface()
  289. } else {
  290. values[idx] = new(interface{})
  291. }
  292. }
  293. } else {
  294. for idx := range columns {
  295. values[idx] = new(interface{})
  296. }
  297. }
  298. }