source_node_test.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "reflect"
  17. "testing"
  18. nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. )
  22. func TestGetConf_Apply(t *testing.T) {
  23. result := map[string]interface{}{
  24. "url": "http://localhost",
  25. "method": "post",
  26. "interval": 10000,
  27. "timeout": 5000,
  28. "bodyType": "json",
  29. "key": "",
  30. "format": "json",
  31. "responseType": "code",
  32. "incremental": false,
  33. "insecureSkipVerify": true,
  34. "headers": map[string]interface{}{
  35. "accept": "application/json",
  36. },
  37. }
  38. n := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
  39. DATASOURCE: "/feed",
  40. TYPE: "httppull",
  41. }, false, nil)
  42. conf := nodeConf.GetSourceConf(n.sourceType, n.options)
  43. if !reflect.DeepEqual(result, conf) {
  44. t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
  45. }
  46. }
  47. func TestGetConfAndConvert_Apply(t *testing.T) {
  48. result := map[string]interface{}{
  49. "url": "http://localhost:9090/",
  50. "method": "post",
  51. "interval": 10000,
  52. "timeout": 5000,
  53. "bodyType": "json",
  54. "key": "",
  55. "format": "json",
  56. "responseType": "code",
  57. "incremental": true,
  58. "insecureSkipVerify": true,
  59. "headers": map[string]interface{}{
  60. "accept": "application/json",
  61. },
  62. }
  63. n := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
  64. DATASOURCE: "/feed",
  65. TYPE: "httppull",
  66. CONF_KEY: "application_conf",
  67. }, false, nil)
  68. conf := nodeConf.GetSourceConf(n.sourceType, n.options)
  69. if !reflect.DeepEqual(result, conf) {
  70. t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
  71. return
  72. }
  73. r := &httpPullSourceConfig{
  74. Url: "http://localhost:9090/",
  75. Method: "post",
  76. Interval: 10000,
  77. Timeout: 5000,
  78. Incremental: true,
  79. BodyType: "json",
  80. InsecureSkipVerify: true,
  81. Headers: map[string]interface{}{
  82. "accept": "application/json",
  83. },
  84. }
  85. cfg := &httpPullSourceConfig{}
  86. err := cast.MapToStruct(conf, cfg)
  87. if err != nil {
  88. t.Errorf("map to sturct error %s", err)
  89. return
  90. }
  91. if !reflect.DeepEqual(r, cfg) {
  92. t.Errorf("result mismatch:\n\nexp=%v\n\ngot=%v\n\n", r, cfg)
  93. return
  94. }
  95. }
  96. type httpPullSourceConfig struct {
  97. Url string `json:"url"`
  98. Method string `json:"method"`
  99. Interval int `json:"interval"`
  100. Timeout int `json:"timeout"`
  101. Incremental bool `json:"incremental"`
  102. Body string `json:"body"`
  103. BodyType string `json:"bodyType"`
  104. InsecureSkipVerify bool `json:"insecureSkipVerify"`
  105. Headers map[string]interface{} `json:"headers"`
  106. }