plugin_ins_manager_test.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package runtime
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/topo/context"
  19. "github.com/lf-edge/ekuiper/internal/topo/state"
  20. "go.nanomsg.org/mangos/v3"
  21. "go.nanomsg.org/mangos/v3/protocol/req"
  22. "testing"
  23. )
  24. // Plugin manager involves process, only covered in the integration test
  25. // TestPluginInstance test the encode/decode of command
  26. func TestPluginInstance(t *testing.T) {
  27. pluginName := "test"
  28. ch, err := CreateControlChannel(pluginName)
  29. if err != nil {
  30. t.Errorf("create channel error %v", err)
  31. return
  32. }
  33. client, err := createMockClient(pluginName)
  34. if err != nil {
  35. t.Errorf("normal process: create client error %v", err)
  36. return
  37. }
  38. err = client.Send([]byte("handshake"))
  39. if err != nil {
  40. t.Errorf("can't send handshake: %s", err.Error())
  41. return
  42. }
  43. err = ch.Handshake()
  44. if err != nil {
  45. t.Errorf("can't ack handshake: %s", err.Error())
  46. return
  47. }
  48. ins := &pluginIns{
  49. name: "test",
  50. process: nil,
  51. ctrlChan: ch,
  52. }
  53. var tests = []struct {
  54. c *Control
  55. sj string
  56. ej string
  57. }{
  58. {
  59. c: &Control{
  60. SymbolName: "symbol1",
  61. Meta: &Meta{
  62. RuleId: "rule1",
  63. OpId: "op1",
  64. InstanceId: 0,
  65. },
  66. PluginType: "sources",
  67. DataSource: "topic",
  68. Config: map[string]interface{}{"abc": 1},
  69. },
  70. sj: "{\"cmd\":\"start\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol1\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op1\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"sources\\\",\\\"dataSource\\\":\\\"topic\\\",\\\"config\\\":{\\\"abc\\\":1}}\"}",
  71. ej: "{\"cmd\":\"stop\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol1\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op1\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"sources\\\",\\\"dataSource\\\":\\\"topic\\\",\\\"config\\\":{\\\"abc\\\":1}}\"}",
  72. }, {
  73. c: &Control{
  74. SymbolName: "symbol2",
  75. Meta: &Meta{
  76. RuleId: "rule1",
  77. OpId: "op2",
  78. InstanceId: 0,
  79. },
  80. PluginType: "functions",
  81. },
  82. sj: "{\"cmd\":\"start\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol2\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op2\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"functions\\\"}\"}",
  83. ej: "{\"cmd\":\"stop\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol2\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op2\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"functions\\\"}\"}",
  84. }, {
  85. c: &Control{
  86. SymbolName: "symbol3",
  87. Meta: &Meta{
  88. RuleId: "rule1",
  89. OpId: "op3",
  90. InstanceId: 0,
  91. },
  92. PluginType: "sinks",
  93. Config: map[string]interface{}{"def": map[string]interface{}{"ci": "aaa"}},
  94. },
  95. sj: "{\"cmd\":\"start\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol3\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op3\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"sinks\\\",\\\"config\\\":{\\\"def\\\":{\\\"ci\\\":\\\"aaa\\\"}}}\"}",
  96. ej: "{\"cmd\":\"stop\",\"arg\":\"{\\\"symbolName\\\":\\\"symbol3\\\",\\\"meta\\\":{\\\"ruleId\\\":\\\"rule1\\\",\\\"opId\\\":\\\"op3\\\",\\\"instanceId\\\":0},\\\"pluginType\\\":\\\"sinks\\\",\\\"config\\\":{\\\"def\\\":{\\\"ci\\\":\\\"aaa\\\"}}}\"}",
  97. },
  98. }
  99. ctx := context.WithValue(context.Background(), context.LoggerKey, conf.Log)
  100. sctx := ctx.WithMeta("rule1", "op1", &state.MemoryStore{}).WithInstance(1)
  101. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  102. go func() {
  103. err := ins.StartSymbol(sctx, tests[0].c)
  104. if err != nil {
  105. t.Errorf("start command err %v", err)
  106. return
  107. }
  108. for _, tt := range tests {
  109. err := ins.StartSymbol(sctx, tt.c)
  110. if err != nil {
  111. t.Errorf("start command err %v", err)
  112. return
  113. }
  114. err = ins.StopSymbol(sctx, tt.c)
  115. if err != nil {
  116. t.Errorf("stop command err %v", err)
  117. return
  118. }
  119. }
  120. }()
  121. // start symbol1 to avoild instance clean
  122. msg, err := client.Recv()
  123. if err != nil {
  124. t.Errorf("receive start command err %v", err)
  125. }
  126. client.Send(okMsg)
  127. sj := string(msg)
  128. if sj != tests[0].sj {
  129. t.Errorf("start command mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", tests[0].sj, sj)
  130. }
  131. for _, tt := range tests {
  132. msg, err := client.Recv()
  133. if err != nil {
  134. t.Errorf("receive start command err %v", err)
  135. break
  136. }
  137. client.Send(okMsg)
  138. sj := string(msg)
  139. if sj != tt.sj {
  140. t.Errorf("start command mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", tt.sj, sj)
  141. }
  142. msg, err = client.Recv()
  143. if err != nil {
  144. t.Errorf("receive stop command err %v", err)
  145. break
  146. }
  147. client.Send(okMsg)
  148. ej := string(msg)
  149. if ej != tt.ej {
  150. t.Errorf("end command mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", tt.ej, ej)
  151. }
  152. }
  153. err = client.Close()
  154. if err != nil {
  155. t.Errorf("close client error %v", err)
  156. }
  157. err = ins.ctrlChan.Close()
  158. if err != nil {
  159. t.Errorf("close ins error %v", err)
  160. }
  161. }
  162. func createMockClient(pluginName string) (mangos.Socket, error) {
  163. var (
  164. sock mangos.Socket
  165. err error
  166. )
  167. if sock, err = req.NewSocket(); err != nil {
  168. return nil, fmt.Errorf("can't get new req socket: %s", err)
  169. }
  170. setSockOptions(sock)
  171. url := fmt.Sprintf("ipc:///tmp/plugin_%s.ipc", pluginName)
  172. if err = sock.Dial(url); err != nil {
  173. return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
  174. }
  175. return sock, nil
  176. }