source_node_test.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/contexts"
  6. "reflect"
  7. "testing"
  8. )
  9. func TestGetConf_Apply(t *testing.T) {
  10. result := map[string]interface{}{
  11. "interval": 1000,
  12. "ashost": "192.168.1.100",
  13. "sysnr": "02",
  14. "client": "900",
  15. "user": "SPERF",
  16. "passwd": "PASSPASS",
  17. "format": "json",
  18. "params": map[string]interface{}{
  19. "QUERY_TABLE": "VBAP",
  20. "ROWCOUNT": 10,
  21. "FIELDS": []interface{}{
  22. map[string]interface{}{"FIELDNAME": "MANDT"},
  23. map[string]interface{}{"FIELDNAME": "VBELN"},
  24. map[string]interface{}{"FIELDNAME": "POSNR"},
  25. },
  26. },
  27. }
  28. n := NewSourceNode("test", xsql.TypeStream, map[string]string{
  29. "DATASOURCE": "RFC_READ_TABLE",
  30. "TYPE": "test",
  31. })
  32. contextLogger := common.Log.WithField("rule", "test")
  33. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  34. conf := getSourceConf(ctx, n.sourceType, n.options)
  35. if !reflect.DeepEqual(result, conf) {
  36. t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
  37. }
  38. }
  39. func TestGetConfAndConvert_Apply(t *testing.T) {
  40. result := map[string]interface{}{
  41. "interval": 100,
  42. "seed": 1,
  43. "format": "json",
  44. "pattern": map[string]interface{}{
  45. "count": 50,
  46. },
  47. "deduplicate": 50,
  48. }
  49. n := NewSourceNode("test", xsql.TypeStream, map[string]string{
  50. "DATASOURCE": "test",
  51. "TYPE": "random",
  52. "CONF_KEY": "dedup",
  53. })
  54. contextLogger := common.Log.WithField("rule", "test")
  55. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  56. conf := getSourceConf(ctx, n.sourceType, n.options)
  57. if !reflect.DeepEqual(result, conf) {
  58. t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
  59. return
  60. }
  61. r := &randomSourceConfig{
  62. Interval: 100,
  63. Seed: 1,
  64. Pattern: map[string]interface{}{
  65. "count": float64(50),
  66. },
  67. Deduplicate: 50,
  68. }
  69. cfg := &randomSourceConfig{}
  70. err := common.MapToStruct(conf, cfg)
  71. if err != nil {
  72. t.Errorf("map to sturct error %s", err)
  73. return
  74. }
  75. if !reflect.DeepEqual(r, cfg) {
  76. t.Errorf("result mismatch:\n\nexp=%v\n\ngot=%v\n\n", r, cfg)
  77. return
  78. }
  79. }
  80. type randomSourceConfig struct {
  81. Interval int `json:"interval"`
  82. Seed int `json:"seed"`
  83. Pattern map[string]interface{} `json:"pattern"`
  84. Deduplicate int `json:"deduplicate"`
  85. }