source_node_test.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/xstream/contexts"
  5. "reflect"
  6. "testing"
  7. )
  8. func TestGetConf_Apply(t *testing.T) {
  9. result := map[string]interface{}{
  10. "interval": 1000,
  11. "ashost": "192.168.1.100",
  12. "sysnr": "02",
  13. "client": "900",
  14. "user": "SPERF",
  15. "passwd": "PASSPASS",
  16. "format": "json",
  17. "params": map[string]interface{}{
  18. "QUERY_TABLE": "VBAP",
  19. "ROWCOUNT": 10,
  20. "FIELDS": []interface{}{
  21. map[string]interface{}{"FIELDNAME": "MANDT"},
  22. map[string]interface{}{"FIELDNAME": "VBELN"},
  23. map[string]interface{}{"FIELDNAME": "POSNR"},
  24. },
  25. },
  26. }
  27. n := NewSourceNode("test", map[string]string{
  28. "DATASOURCE": "RFC_READ_TABLE",
  29. "TYPE": "test",
  30. })
  31. contextLogger := common.Log.WithField("rule", "test")
  32. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  33. conf := getSourceConf(ctx, n.sourceType, n.options)
  34. if !reflect.DeepEqual(result, conf) {
  35. t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
  36. }
  37. }
  38. func TestGetConfAndConvert_Apply(t *testing.T) {
  39. result := map[string]interface{}{
  40. "interval": 100,
  41. "seed": 1,
  42. "format": "json",
  43. "pattern": map[string]interface{}{
  44. "count": 50,
  45. },
  46. "deduplicate": 50,
  47. }
  48. n := NewSourceNode("test", map[string]string{
  49. "DATASOURCE": "test",
  50. "TYPE": "random",
  51. "CONF_KEY": "dedup",
  52. })
  53. contextLogger := common.Log.WithField("rule", "test")
  54. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  55. conf := getSourceConf(ctx, n.sourceType, n.options)
  56. if !reflect.DeepEqual(result, conf) {
  57. t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
  58. return
  59. }
  60. r := &randomSourceConfig{
  61. Interval: 100,
  62. Seed: 1,
  63. Pattern: map[string]interface{}{
  64. "count": float64(50),
  65. },
  66. Deduplicate: 50,
  67. }
  68. cfg := &randomSourceConfig{}
  69. err := common.MapToStruct(conf, cfg)
  70. if err != nil {
  71. t.Errorf("map to sturct error %s", err)
  72. return
  73. }
  74. if !reflect.DeepEqual(r, cfg) {
  75. t.Errorf("result mismatch:\n\nexp=%v\n\ngot=%v\n\n", r, cfg)
  76. return
  77. }
  78. }
  79. type randomSourceConfig struct {
  80. Interval int `json:"interval"`
  81. Seed int `json:"seed"`
  82. Pattern map[string]interface{} `json:"pattern"`
  83. Deduplicate int `json:"deduplicate"`
  84. }