plugin_ins_manager_test.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package runtime
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/topo/context"
  19. "github.com/lf-edge/ekuiper/internal/topo/state"
  20. "go.nanomsg.org/mangos/v3"
  21. "go.nanomsg.org/mangos/v3/protocol/req"
  22. "sync"
  23. "testing"
  24. )
  25. // Plugin manager involves process, only covered in the integration test
  26. // TestPluginInstance test the encode/decode of command
  27. func TestPluginInstance(t *testing.T) {
  28. pluginName := "test"
  29. ch, err := CreateControlChannel(pluginName)
  30. if err != nil {
  31. t.Errorf("create channel error %v", err)
  32. return
  33. }
  34. client, err := createMockClient(pluginName)
  35. if err != nil {
  36. t.Errorf("normal process: create client error %v", err)
  37. return
  38. }
  39. err = client.Send([]byte("handshake"))
  40. if err != nil {
  41. t.Errorf("can't send handshake: %s", err.Error())
  42. return
  43. }
  44. err = ch.Handshake()
  45. if err != nil {
  46. t.Errorf("can't ack handshake: %s", err.Error())
  47. return
  48. }
  49. ins := &PluginIns{
  50. name: "test",
  51. process: nil,
  52. ctrlChan: ch,
  53. }
  54. var tests = []struct {
  55. c *Control
  56. sj string
  57. ej string
  58. }{
  59. {
  60. c: &Control{
  61. SymbolName: "symbol1",
  62. Meta: &Meta{
  63. RuleId: "rule1",
  64. OpId: "op1",
  65. InstanceId: 0,
  66. },
  67. PluginType: "sources",
  68. DataSource: "topic",
  69. Config: map[string]interface{}{"abc": 1},
  70. },
  71. sj: "{\"cmd\":\"start\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol1\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op1\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"sources\\\",\\\"dataSource\\\":\\\"topic\\\",\\\"config\\\":{\\\"abc\\\":1}}\"}",
  72. ej: "{\"cmd\":\"stop\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol1\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op1\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"sources\\\",\\\"dataSource\\\":\\\"topic\\\",\\\"config\\\":{\\\"abc\\\":1}}\"}",
  73. }, {
  74. c: &Control{
  75. SymbolName: "symbol2",
  76. Meta: &Meta{
  77. RuleId: "rule1",
  78. OpId: "op2",
  79. InstanceId: 0,
  80. },
  81. PluginType: "functions",
  82. },
  83. sj: "{\"cmd\":\"start\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol2\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op2\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"functions\\\"}\"}",
  84. ej: "{\"cmd\":\"stop\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol2\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op2\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"functions\\\"}\"}",
  85. }, {
  86. c: &Control{
  87. SymbolName: "symbol3",
  88. Meta: &Meta{
  89. RuleId: "rule1",
  90. OpId: "op3",
  91. InstanceId: 0,
  92. },
  93. PluginType: "sinks",
  94. Config: map[string]interface{}{"def": map[string]interface{}{"ci": "aaa"}},
  95. },
  96. sj: "{\"cmd\":\"start\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol3\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op3\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"sinks\\\",\\\"config\\\":{\\\"def\\\":{\\\"ci\\\":\\\"aaa\\\"}}}\"}",
  97. ej: "{\"cmd\":\"stop\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol3\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op3\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"sinks\\\",\\\"config\\\":{\\\"def\\\":{\\\"ci\\\":\\\"aaa\\\"}}}\"}",
  98. },
  99. }
  100. ctx := context.WithValue(context.Background(), context.LoggerKey, conf.Log)
  101. sctx := ctx.WithMeta("rule1", "op1", &state.MemoryStore{}).WithInstance(1)
  102. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  103. var wg sync.WaitGroup
  104. wg.Add(1)
  105. go func() {
  106. err := ins.StartSymbol(sctx, tests[0].c)
  107. if err != nil {
  108. t.Errorf("start command err %v", err)
  109. return
  110. }
  111. for _, tt := range tests {
  112. err := ins.StartSymbol(sctx, tt.c)
  113. if err != nil {
  114. t.Errorf("start command err %v", err)
  115. return
  116. }
  117. err = ins.StopSymbol(sctx, tt.c)
  118. if err != nil {
  119. t.Errorf("stop command err %v", err)
  120. return
  121. }
  122. }
  123. wg.Done()
  124. }()
  125. // start symbol1 to avoild instance clean
  126. msg, err := client.Recv()
  127. if err != nil {
  128. t.Errorf("receive start command err %v", err)
  129. }
  130. client.Send(okMsg)
  131. sj := string(msg)
  132. if sj != tests[0].sj {
  133. t.Errorf("start command mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", tests[0].sj, sj)
  134. }
  135. for _, tt := range tests {
  136. msg, err := client.Recv()
  137. if err != nil {
  138. t.Errorf("receive start command err %v", err)
  139. break
  140. }
  141. client.Send(okMsg)
  142. sj := string(msg)
  143. if sj != tt.sj {
  144. t.Errorf("start command mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", tt.sj, sj)
  145. }
  146. msg, err = client.Recv()
  147. if err != nil {
  148. t.Errorf("receive stop command err %v", err)
  149. break
  150. }
  151. client.Send(okMsg)
  152. ej := string(msg)
  153. if ej != tt.ej {
  154. t.Errorf("end command mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", tt.ej, ej)
  155. }
  156. }
  157. err = client.Close()
  158. if err != nil {
  159. t.Errorf("close client error %v", err)
  160. }
  161. err = ins.ctrlChan.Close()
  162. if err != nil {
  163. t.Errorf("close ins error %v", err)
  164. }
  165. wg.Wait()
  166. }
  167. func createMockClient(pluginName string) (mangos.Socket, error) {
  168. var (
  169. sock mangos.Socket
  170. err error
  171. )
  172. if sock, err = req.NewSocket(); err != nil {
  173. return nil, fmt.Errorf("can't get new req socket: %s", err)
  174. }
  175. setSockOptions(sock)
  176. url := fmt.Sprintf("ipc:///tmp/plugin_%s.ipc", pluginName)
  177. if err = sock.Dial(url); err != nil {
  178. return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
  179. }
  180. return sock, nil
  181. }