plugin_ins_manager_test.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package runtime
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/topo/context"
  19. "github.com/lf-edge/ekuiper/internal/topo/state"
  20. "go.nanomsg.org/mangos/v3"
  21. "go.nanomsg.org/mangos/v3/protocol/req"
  22. "sync"
  23. "testing"
  24. )
  25. // Plugin manager involves process, only covered in the integration test
  26. // TestPluginInstance test the encode/decode of command
  27. func TestPluginInstance(t *testing.T) {
  28. pluginName := "test"
  29. ch, err := CreateControlChannel(pluginName)
  30. if err != nil {
  31. t.Errorf("create channel error %v", err)
  32. return
  33. }
  34. client, err := createMockClient(pluginName)
  35. if err != nil {
  36. t.Errorf("normal process: create client error %v", err)
  37. return
  38. }
  39. err = client.Send([]byte("handshake"))
  40. if err != nil {
  41. t.Errorf("can't send handshake: %s", err.Error())
  42. return
  43. }
  44. err = ch.Handshake()
  45. if err != nil {
  46. t.Errorf("can't ack handshake: %s", err.Error())
  47. return
  48. }
  49. ins := NewPluginIns("test", ch, nil)
  50. var tests = []struct {
  51. c *Control
  52. sj string
  53. ej string
  54. }{
  55. {
  56. c: &Control{
  57. SymbolName: "symbol1",
  58. Meta: Meta{
  59. RuleId: "rule1",
  60. OpId: "op1",
  61. InstanceId: 0,
  62. },
  63. PluginType: "sources",
  64. DataSource: "topic",
  65. Config: map[string]interface{}{"abc": 1},
  66. },
  67. sj: "{\"cmd\":\"start\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol1\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op1\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"sources\\\",\\\"dataSource\\\":\\\"topic\\\",\\\"config\\\":{\\\"abc\\\":1}}\"}",
  68. ej: "{\"cmd\":\"stop\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol1\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op1\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"sources\\\",\\\"dataSource\\\":\\\"topic\\\",\\\"config\\\":{\\\"abc\\\":1}}\"}",
  69. }, {
  70. c: &Control{
  71. SymbolName: "symbol2",
  72. Meta: Meta{
  73. RuleId: "rule1",
  74. OpId: "op2",
  75. InstanceId: 0,
  76. },
  77. PluginType: "functions",
  78. },
  79. sj: "{\"cmd\":\"start\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol2\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op2\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"functions\\\"}\"}",
  80. ej: "{\"cmd\":\"stop\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol2\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op2\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"functions\\\"}\"}",
  81. }, {
  82. c: &Control{
  83. SymbolName: "symbol3",
  84. Meta: Meta{
  85. RuleId: "rule1",
  86. OpId: "op3",
  87. InstanceId: 0,
  88. },
  89. PluginType: "sinks",
  90. Config: map[string]interface{}{"def": map[string]interface{}{"ci": "aaa"}},
  91. },
  92. sj: "{\"cmd\":\"start\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol3\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op3\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"sinks\\\",\\\"config\\\":{\\\"def\\\":{\\\"ci\\\":\\\"aaa\\\"}}}\"}",
  93. ej: "{\"cmd\":\"stop\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol3\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op3\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"sinks\\\",\\\"config\\\":{\\\"def\\\":{\\\"ci\\\":\\\"aaa\\\"}}}\"}",
  94. },
  95. }
  96. ctx := context.WithValue(context.Background(), context.LoggerKey, conf.Log)
  97. sctx := ctx.WithMeta("rule1", "op1", &state.MemoryStore{}).WithInstance(1)
  98. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  99. var wg sync.WaitGroup
  100. wg.Add(1)
  101. go func() {
  102. defer wg.Done()
  103. err := ins.StartSymbol(sctx, tests[0].c)
  104. if err != nil {
  105. t.Errorf("start command err %v", err)
  106. return
  107. }
  108. for _, tt := range tests {
  109. err := ins.StartSymbol(sctx, tt.c)
  110. if err != nil {
  111. t.Errorf("start command err %v", err)
  112. return
  113. }
  114. err = ins.StopSymbol(sctx, tt.c)
  115. if err != nil {
  116. fmt.Printf("stop command err %v\n", err)
  117. continue
  118. }
  119. }
  120. }()
  121. // start symbol1 to avoid instance clean
  122. msg, err := client.Recv()
  123. if err != nil {
  124. t.Errorf("receive start command err %v", err)
  125. }
  126. client.Send(okMsg)
  127. sj := string(msg)
  128. if sj != tests[0].sj {
  129. t.Errorf("start command mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", tests[0].sj, sj)
  130. }
  131. for _, tt := range tests {
  132. msg, err := client.Recv()
  133. if err != nil {
  134. t.Errorf("receive start command err %v", err)
  135. break
  136. }
  137. client.Send(okMsg)
  138. sj := string(msg)
  139. if sj != tt.sj {
  140. t.Errorf("start command mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", tt.sj, sj)
  141. }
  142. msg, err = client.Recv()
  143. if err != nil {
  144. t.Errorf("receive stop command err %v", err)
  145. break
  146. }
  147. client.Send(okMsg)
  148. ej := string(msg)
  149. if ej != tt.ej {
  150. t.Errorf("end command mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", tt.ej, ej)
  151. }
  152. }
  153. err = client.Close()
  154. if err != nil {
  155. t.Errorf("close client error %v", err)
  156. }
  157. err = ins.ctrlChan.Close()
  158. if err != nil {
  159. t.Errorf("close ins error %v", err)
  160. }
  161. wg.Wait()
  162. }
  163. func createMockClient(pluginName string) (mangos.Socket, error) {
  164. var (
  165. sock mangos.Socket
  166. err error
  167. )
  168. if sock, err = req.NewSocket(); err != nil {
  169. return nil, fmt.Errorf("can't get new req socket: %s", err)
  170. }
  171. setSockOptions(sock, map[string]interface{}{
  172. mangos.OptionRetryTime: 0,
  173. })
  174. url := fmt.Sprintf("ipc:///tmp/plugin_%s.ipc", pluginName)
  175. if err = sock.Dial(url); err != nil {
  176. return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
  177. }
  178. return sock, nil
  179. }