source_node_test.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/xstream/contexts"
  5. "reflect"
  6. "testing"
  7. )
  8. func TestGetConf_Apply(t *testing.T) {
  9. result := map[string]interface{}{
  10. "interval": 1000,
  11. "ashost": "192.168.1.100",
  12. "sysnr": "02",
  13. "client": "900",
  14. "user": "SPERF",
  15. "passwd": "PASSPASS",
  16. "params": map[string]interface{}{
  17. "QUERY_TABLE": "VBAP",
  18. "ROWCOUNT": 10,
  19. "FIELDS": []interface{}{
  20. map[string]interface{}{"FIELDNAME": "MANDT"},
  21. map[string]interface{}{"FIELDNAME": "VBELN"},
  22. map[string]interface{}{"FIELDNAME": "POSNR"},
  23. },
  24. },
  25. }
  26. n := NewSourceNode("test", map[string]string{
  27. "DATASOURCE": "RFC_READ_TABLE",
  28. "TYPE": "test",
  29. })
  30. contextLogger := common.Log.WithField("rule", "test")
  31. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  32. conf := n.getConf(ctx)
  33. if !reflect.DeepEqual(result, conf) {
  34. t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
  35. }
  36. }
  37. func TestGetConfAndConvert_Apply(t *testing.T) {
  38. result := map[string]interface{}{
  39. "interval": 100,
  40. "seed": 1,
  41. "pattern": map[string]interface{}{
  42. "count": 50,
  43. },
  44. "deduplicate": 50,
  45. }
  46. n := NewSourceNode("test", map[string]string{
  47. "DATASOURCE": "test",
  48. "TYPE": "random",
  49. "CONF_KEY": "dedup",
  50. })
  51. contextLogger := common.Log.WithField("rule", "test")
  52. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  53. conf := n.getConf(ctx)
  54. if !reflect.DeepEqual(result, conf) {
  55. t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
  56. return
  57. }
  58. r := &randomSourceConfig{
  59. Interval: 100,
  60. Seed: 1,
  61. Pattern: map[string]interface{}{
  62. "count": float64(50),
  63. },
  64. Deduplicate: 50,
  65. }
  66. cfg := &randomSourceConfig{}
  67. err := common.MapToStruct(conf, cfg)
  68. if err != nil {
  69. t.Errorf("map to sturct error %s", err)
  70. return
  71. }
  72. if !reflect.DeepEqual(r, cfg) {
  73. t.Errorf("result mismatch:\n\nexp=%v\n\ngot=%v\n\n", r, cfg)
  74. return
  75. }
  76. }
  77. type randomSourceConfig struct {
  78. Interval int `json:"interval"`
  79. Seed int `json:"seed"`
  80. Pattern map[string]interface{} `json:"pattern"`
  81. Deduplicate int `json:"deduplicate"`
  82. }