httppull_lookup_test.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package http
  15. import (
  16. "fmt"
  17. "net/http"
  18. "reflect"
  19. "testing"
  20. "github.com/stretchr/testify/require"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/topo/context"
  23. )
  24. func TestConfigureLookup(t *testing.T) {
  25. tests := []struct {
  26. name string
  27. props map[string]interface{}
  28. err error
  29. config *RawConf
  30. accessConf *AccessTokenConf
  31. refreshConf *RefreshTokenConf
  32. tokens map[string]interface{}
  33. }{
  34. // Test oAuth
  35. {
  36. name: "oAuth with access token and constant expire",
  37. props: map[string]interface{}{
  38. "url": "http://localhost:52345/",
  39. "headers": map[string]interface{}{
  40. "Authorization": "Bearer {{.token}}",
  41. },
  42. "oAuth": map[string]interface{}{
  43. "access": map[string]interface{}{
  44. "url": "http://localhost:52345/token",
  45. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  46. "expire": "3600",
  47. },
  48. },
  49. },
  50. config: &RawConf{
  51. Url: "http://localhost:52345/",
  52. ResendUrl: "http://localhost:52345/",
  53. Method: http.MethodGet,
  54. Interval: DefaultInterval,
  55. Timeout: DefaultTimeout,
  56. BodyType: "none",
  57. ResponseType: "code",
  58. InsecureSkipVerify: true,
  59. Headers: map[string]interface{}{
  60. "Authorization": "Bearer {{.token}}",
  61. },
  62. HeadersMap: map[string]string{
  63. "Authorization": "Bearer {{.token}}",
  64. },
  65. OAuth: map[string]map[string]interface{}{
  66. "access": {
  67. "url": "http://localhost:52345/token",
  68. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  69. "expire": "3600",
  70. },
  71. },
  72. },
  73. accessConf: &AccessTokenConf{
  74. Url: "http://localhost:52345/token",
  75. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  76. Expire: "3600",
  77. ExpireInSecond: 3600,
  78. },
  79. tokens: map[string]interface{}{
  80. "token": DefaultToken,
  81. "refresh_token": RefreshToken,
  82. "client_id": "test",
  83. "expires": float64(36000),
  84. },
  85. },
  86. {
  87. name: "oAuth with access token and dynamic expire",
  88. props: map[string]interface{}{
  89. "url": "http://localhost:52345/",
  90. "headers": map[string]interface{}{
  91. "Authorization": "Bearer {{.token}}",
  92. },
  93. "oAuth": map[string]interface{}{
  94. "access": map[string]interface{}{
  95. "url": "http://localhost:52345/token",
  96. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  97. "expire": "{{.expires}}",
  98. },
  99. },
  100. },
  101. config: &RawConf{
  102. Url: "http://localhost:52345/",
  103. ResendUrl: "http://localhost:52345/",
  104. Method: http.MethodGet,
  105. Interval: DefaultInterval,
  106. Timeout: DefaultTimeout,
  107. BodyType: "none",
  108. ResponseType: "code",
  109. InsecureSkipVerify: true,
  110. Headers: map[string]interface{}{
  111. "Authorization": "Bearer {{.token}}",
  112. },
  113. HeadersMap: map[string]string{
  114. "Authorization": "Bearer {{.token}}",
  115. },
  116. OAuth: map[string]map[string]interface{}{
  117. "access": {
  118. "url": "http://localhost:52345/token",
  119. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  120. "expire": "{{.expires}}",
  121. },
  122. },
  123. },
  124. accessConf: &AccessTokenConf{
  125. Url: "http://localhost:52345/token",
  126. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  127. Expire: "{{.expires}}",
  128. ExpireInSecond: 36000,
  129. },
  130. tokens: map[string]interface{}{
  131. "token": DefaultToken,
  132. "refresh_token": RefreshToken,
  133. "client_id": "test",
  134. "expires": float64(36000),
  135. },
  136. },
  137. {
  138. name: "oAuth with access token and refresh token",
  139. props: map[string]interface{}{
  140. "url": "http://localhost:52345/",
  141. "headers": map[string]interface{}{
  142. "Authorization": "Bearer {{.token}}",
  143. },
  144. "oAuth": map[string]interface{}{
  145. "access": map[string]interface{}{
  146. "url": "http://localhost:52345/token",
  147. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  148. "expire": "3600",
  149. },
  150. "refresh": map[string]interface{}{
  151. "url": "http://localhost:52345/refresh",
  152. "headers": map[string]interface{}{
  153. "Authorization": "Bearer {{.token}}",
  154. "RefreshToken": "{{.refresh_token}}",
  155. },
  156. },
  157. },
  158. },
  159. config: &RawConf{
  160. Url: "http://localhost:52345/",
  161. ResendUrl: "http://localhost:52345/",
  162. Method: http.MethodGet,
  163. Interval: DefaultInterval,
  164. Timeout: DefaultTimeout,
  165. BodyType: "none",
  166. ResponseType: "code",
  167. InsecureSkipVerify: true,
  168. Headers: map[string]interface{}{
  169. "Authorization": "Bearer {{.token}}",
  170. },
  171. HeadersMap: map[string]string{
  172. "Authorization": "Bearer {{.token}}",
  173. },
  174. OAuth: map[string]map[string]interface{}{
  175. "access": {
  176. "url": "http://localhost:52345/token",
  177. "body": "{\"username\": \"admin\",\"password\": \"0000\"}",
  178. "expire": "3600",
  179. },
  180. "refresh": {
  181. "url": "http://localhost:52345/refresh",
  182. "headers": map[string]interface{}{
  183. "Authorization": "Bearer {{.token}}",
  184. "RefreshToken": "{{.refresh_token}}",
  185. },
  186. },
  187. },
  188. },
  189. accessConf: &AccessTokenConf{
  190. Url: "http://localhost:52345/token",
  191. Body: "{\"username\": \"admin\",\"password\": \"0000\"}",
  192. Expire: "3600",
  193. ExpireInSecond: 3600,
  194. },
  195. refreshConf: &RefreshTokenConf{
  196. Url: "http://localhost:52345/refresh",
  197. Headers: map[string]string{
  198. "Authorization": "Bearer {{.token}}",
  199. "RefreshToken": "{{.refresh_token}}",
  200. },
  201. },
  202. tokens: map[string]interface{}{
  203. "token": DefaultToken,
  204. "refresh_token": RefreshToken,
  205. "client_id": "test",
  206. "expires": float64(36000),
  207. },
  208. },
  209. // test default
  210. {
  211. name: "default",
  212. props: map[string]interface{}{
  213. "url": "http://localhost:9090/",
  214. },
  215. config: &RawConf{
  216. Url: "http://localhost:9090/",
  217. ResendUrl: "http://localhost:9090/",
  218. Method: http.MethodGet,
  219. Interval: DefaultInterval,
  220. Timeout: DefaultTimeout,
  221. BodyType: "none",
  222. ResponseType: "code",
  223. InsecureSkipVerify: true,
  224. },
  225. },
  226. }
  227. server := mockAuthServer()
  228. server.Start()
  229. defer server.Close()
  230. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  231. for i, tt := range tests {
  232. t.Run(fmt.Sprintf("Test %d: %s", i, tt.name), func(t *testing.T) {
  233. r := &lookupSource{}
  234. err := r.Configure("", tt.props)
  235. if err != nil {
  236. if tt.err == nil {
  237. t.Errorf("Expected error: %v", err)
  238. } else {
  239. if err.Error() != tt.err.Error() {
  240. t.Errorf("Error mismatch\nexp\t%v\ngot\t%v", tt.err, err)
  241. }
  242. }
  243. return
  244. }
  245. if !reflect.DeepEqual(r.config, tt.config) {
  246. t.Errorf("Config mismatch\nexp\t%+v\ngot\t%+v", tt.config, r.config)
  247. }
  248. if !reflect.DeepEqual(r.accessConf, tt.accessConf) {
  249. t.Errorf("AccessConf mismatch\nexp\t%+v\ngot\t%+v", tt.accessConf, r.accessConf)
  250. }
  251. if !reflect.DeepEqual(r.refreshConf, tt.refreshConf) {
  252. t.Errorf("RefreshConf mismatch\nexp\t%+v\ngot\t%+v", tt.refreshConf, r.refreshConf)
  253. }
  254. if !reflect.DeepEqual(r.tokens, tt.tokens) {
  255. t.Errorf("Tokens mismatch\nexp\t%s\ngot\t%s", tt.tokens, r.tokens)
  256. }
  257. })
  258. }
  259. }
  260. func TestLookupPull(t *testing.T) {
  261. conf.IsTesting = false
  262. conf.InitClock()
  263. r := &lookupSource{}
  264. server := mockAuthServer()
  265. server.Start()
  266. defer server.Close()
  267. err := r.Configure("data3", map[string]interface{}{
  268. "url": "http://localhost:52345/",
  269. "responseType": "body",
  270. })
  271. require.NoError(t, err)
  272. resp, err := r.pull(context.Background())
  273. require.NoError(t, err)
  274. require.Equal(t, []map[string]interface{}{
  275. {
  276. "code": float64(200),
  277. "data": map[string]interface{}{
  278. "device_id": "d1",
  279. "temperature": float64(25.5),
  280. "humidity": float64(60),
  281. },
  282. },
  283. {
  284. "code": float64(200),
  285. "data": map[string]interface{}{
  286. "device_id": "d2",
  287. "temperature": float64(25.5),
  288. "humidity": float64(60),
  289. },
  290. },
  291. }, resp)
  292. }
  293. func TestLookupJoin(t *testing.T) {
  294. datas := []map[string]interface{}{
  295. {
  296. "a": 1,
  297. "b": 3,
  298. "c": 5,
  299. },
  300. {
  301. "a": 2,
  302. "b": 4,
  303. "c": 6,
  304. },
  305. }
  306. keys := []string{"a"}
  307. values := []interface{}{1}
  308. l := &lookupSource{}
  309. got := l.lookupJoin(datas, keys, values)
  310. require.Equal(t, []map[string]interface{}{
  311. {
  312. "a": 1,
  313. "b": 3,
  314. "c": 5,
  315. },
  316. }, got)
  317. }
  318. func TestLookup(t *testing.T) {
  319. conf.IsTesting = false
  320. conf.InitClock()
  321. r := &lookupSource{}
  322. server := mockAuthServer()
  323. server.Start()
  324. defer server.Close()
  325. err := r.Configure("data3", map[string]interface{}{
  326. "url": "http://localhost:52345/",
  327. "responseType": "body",
  328. })
  329. require.NoError(t, err)
  330. tuples, err := r.Lookup(context.Background(), nil, []string{"code"}, []interface{}{float64(200)})
  331. require.NoError(t, err)
  332. require.Len(t, tuples, 2)
  333. }
  334. func TestLookupActions(t *testing.T) {
  335. r := &lookupSource{}
  336. require.NoError(t, r.Open(context.Background()))
  337. require.NoError(t, r.Close(context.Background()))
  338. require.NotNil(t, GetLookUpSource())
  339. }