source_node_test.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package node
  2. import (
  3. "github.com/lf-edge/ekuiper/internal/conf"
  4. "github.com/lf-edge/ekuiper/internal/topo/context"
  5. "github.com/lf-edge/ekuiper/pkg/ast"
  6. "github.com/lf-edge/ekuiper/pkg/cast"
  7. "reflect"
  8. "testing"
  9. )
  10. func TestGetConf_Apply(t *testing.T) {
  11. result := map[string]interface{}{
  12. "interval": 1000,
  13. "ashost": "192.168.1.100",
  14. "sysnr": "02",
  15. "client": "900",
  16. "user": "SPERF",
  17. "passwd": "PASSPASS",
  18. "format": "json",
  19. "params": map[string]interface{}{
  20. "QUERY_TABLE": "VBAP",
  21. "ROWCOUNT": 10,
  22. "FIELDS": []interface{}{
  23. map[string]interface{}{"FIELDNAME": "MANDT"},
  24. map[string]interface{}{"FIELDNAME": "VBELN"},
  25. map[string]interface{}{"FIELDNAME": "POSNR"},
  26. },
  27. },
  28. }
  29. n := NewSourceNode("test", ast.TypeStream, &ast.Options{
  30. DATASOURCE: "RFC_READ_TABLE",
  31. TYPE: "test",
  32. })
  33. contextLogger := conf.Log.WithField("rule", "test")
  34. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  35. conf := getSourceConf(ctx, n.sourceType, n.options)
  36. if !reflect.DeepEqual(result, conf) {
  37. t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
  38. }
  39. }
  40. func TestGetConfAndConvert_Apply(t *testing.T) {
  41. result := map[string]interface{}{
  42. "interval": 100,
  43. "seed": 1,
  44. "format": "json",
  45. "pattern": map[string]interface{}{
  46. "count": 50,
  47. },
  48. "deduplicate": 50,
  49. }
  50. n := NewSourceNode("test", ast.TypeStream, &ast.Options{
  51. DATASOURCE: "test",
  52. TYPE: "random",
  53. CONF_KEY: "dedup",
  54. })
  55. contextLogger := conf.Log.WithField("rule", "test")
  56. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  57. conf := getSourceConf(ctx, n.sourceType, n.options)
  58. if !reflect.DeepEqual(result, conf) {
  59. t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
  60. return
  61. }
  62. r := &randomSourceConfig{
  63. Interval: 100,
  64. Seed: 1,
  65. Pattern: map[string]interface{}{
  66. "count": float64(50),
  67. },
  68. Deduplicate: 50,
  69. }
  70. cfg := &randomSourceConfig{}
  71. err := cast.MapToStruct(conf, cfg)
  72. if err != nil {
  73. t.Errorf("map to sturct error %s", err)
  74. return
  75. }
  76. if !reflect.DeepEqual(r, cfg) {
  77. t.Errorf("result mismatch:\n\nexp=%v\n\ngot=%v\n\n", r, cfg)
  78. return
  79. }
  80. }
  // randomSourceConfig is the typed target that TestGetConfAndConvert_Apply
  // fills from a source configuration map via cast.MapToStruct. Each field is
  // tagged with the JSON key it corresponds to.
  type randomSourceConfig struct {
  	// Interval carries the value tagged "interval".
  	Interval int `json:"interval"`
  	// Seed carries the value tagged "seed".
  	Seed int `json:"seed"`
  	// Pattern carries the free-form nested object tagged "pattern".
  	Pattern map[string]interface{} `json:"pattern"`
  	// Deduplicate carries the value tagged "deduplicate".
  	Deduplicate int `json:"deduplicate"`
  }