table_node_test.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/contexts"
  6. "github.com/emqx/kuiper/xstream/topotest/mockclock"
  7. "reflect"
  8. "testing"
  9. )
  10. func TestTableNode(t *testing.T) {
  11. mockclock.ResetClock(1541152486000)
  12. var tests = []struct {
  13. name string
  14. options map[string]string
  15. result []*xsql.Tuple
  16. }{
  17. { //0
  18. name: "test0",
  19. options: map[string]string{
  20. "TYPE": "file",
  21. "DATASOURCE": "lookup.json",
  22. "CONF_KEY": "test",
  23. },
  24. result: []*xsql.Tuple{
  25. {
  26. Emitter: "test0",
  27. Message: map[string]interface{}{
  28. "id": float64(1541152486013),
  29. "name": "name1",
  30. "size": float64(2),
  31. },
  32. Timestamp: common.GetNowInMilli(),
  33. },
  34. {
  35. Emitter: "test0",
  36. Message: map[string]interface{}{
  37. "id": float64(1541152487632),
  38. "name": "name2",
  39. "size": float64(6),
  40. },
  41. Timestamp: common.GetNowInMilli(),
  42. },
  43. {
  44. Emitter: "test0",
  45. Message: map[string]interface{}{
  46. "id": float64(1541152489252),
  47. "name": "name3",
  48. "size": float64(4),
  49. },
  50. Timestamp: common.GetNowInMilli(),
  51. },
  52. },
  53. },
  54. }
  55. t.Logf("The test bucket size is %d.\n\n", len(tests))
  56. for i, tt := range tests {
  57. n := NewTableNode(tt.name, tt.options)
  58. resultCh := make(chan interface{})
  59. errCh := make(chan error)
  60. contextLogger := common.Log.WithField("test", "test")
  61. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  62. n.AddOutput(resultCh, "test")
  63. n.Open(ctx, errCh)
  64. select {
  65. case err := <-errCh:
  66. t.Error(err)
  67. case d := <-resultCh:
  68. r, ok := d.([]*xsql.Tuple)
  69. if !ok {
  70. t.Errorf("%d. \nresult is not tuple list:got=%#v\n\n", i, d)
  71. break
  72. }
  73. if !reflect.DeepEqual(tt.result, r) {
  74. t.Errorf("%d. \nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, r)
  75. }
  76. }
  77. }
  78. }