source_node_test.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "github.com/lf-edge/ekuiper/internal/conf"
  17. "github.com/lf-edge/ekuiper/internal/topo/context"
  18. "github.com/lf-edge/ekuiper/pkg/ast"
  19. "github.com/lf-edge/ekuiper/pkg/cast"
  20. "reflect"
  21. "testing"
  22. )
  23. func TestGetConf_Apply(t *testing.T) {
  24. result := map[string]interface{}{
  25. "url": "http://localhost",
  26. "method": "post",
  27. "interval": 10000,
  28. "timeout": 5000,
  29. "incremental": false,
  30. "body": "{}",
  31. "bodyType": "json",
  32. "format": "json",
  33. "insecureSkipVerify": true,
  34. "headers": map[string]interface{}{
  35. "Accept": "application/json",
  36. },
  37. }
  38. n := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
  39. DATASOURCE: "/feed",
  40. TYPE: "httppull",
  41. }, false)
  42. contextLogger := conf.Log.WithField("rule", "test")
  43. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  44. conf := getSourceConf(ctx, n.sourceType, n.options)
  45. if !reflect.DeepEqual(result, conf) {
  46. t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
  47. }
  48. }
  49. func TestGetConfAndConvert_Apply(t *testing.T) {
  50. result := map[string]interface{}{
  51. "url": "http://localhost:9090/",
  52. "method": "post",
  53. "interval": 10000,
  54. "timeout": 5000,
  55. "incremental": true,
  56. "body": "{}",
  57. "bodyType": "json",
  58. "format": "json",
  59. "insecureSkipVerify": true,
  60. "headers": map[string]interface{}{
  61. "Accept": "application/json",
  62. },
  63. }
  64. n := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
  65. DATASOURCE: "/feed",
  66. TYPE: "httppull",
  67. CONF_KEY: "application_conf",
  68. }, false)
  69. contextLogger := conf.Log.WithField("rule", "test")
  70. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  71. conf := getSourceConf(ctx, n.sourceType, n.options)
  72. if !reflect.DeepEqual(result, conf) {
  73. t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
  74. return
  75. }
  76. r := &httpPullSourceConfig{
  77. Url: "http://localhost:9090/",
  78. Method: "post",
  79. Interval: 10000,
  80. Timeout: 5000,
  81. Incremental: true,
  82. Body: "{}",
  83. BodyType: "json",
  84. InsecureSkipVerify: true,
  85. Headers: map[string]interface{}{
  86. "Accept": "application/json",
  87. },
  88. }
  89. cfg := &httpPullSourceConfig{}
  90. err := cast.MapToStruct(conf, cfg)
  91. if err != nil {
  92. t.Errorf("map to sturct error %s", err)
  93. return
  94. }
  95. if !reflect.DeepEqual(r, cfg) {
  96. t.Errorf("result mismatch:\n\nexp=%v\n\ngot=%v\n\n", r, cfg)
  97. return
  98. }
  99. }
  100. type httpPullSourceConfig struct {
  101. Url string `json:"url"`
  102. Method string `json:"method"`
  103. Interval int `json:"interval"`
  104. Timeout int `json:"timeout"`
  105. Incremental bool `json:"incremental"`
  106. Body string `json:"body"`
  107. BodyType string `json:"bodyType"`
  108. InsecureSkipVerify bool `json:"insecureSkipVerify"`
  109. Headers map[string]interface{} `json:"headers"`
  110. }