plugin.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
// Package runtime controls the whole plugin through a control channel: it distributes symbol data connections, stops individual symbols, and stops the plugin.
  15. package runtime
  16. import (
  17. "encoding/json"
  18. "fmt"
  19. "os"
  20. "os/signal"
  21. "sync"
  22. "syscall"
  23. "github.com/lf-edge/ekuiper/sdk/go/api"
  24. "github.com/lf-edge/ekuiper/sdk/go/connection"
  25. "github.com/lf-edge/ekuiper/sdk/go/context"
  26. )
var (
	// logger is the package-wide logger; it is assigned by initVars.
	logger api.Logger
	// reg is the registry of runtime instances started through the control
	// channel; it is (re)created by initVars.
	reg runtimes
)
  31. func initVars(args []string, conf *PluginConfig) {
  32. logger = context.LogEntry("plugin", conf.Name)
  33. reg = runtimes{
  34. content: make(map[string]RuntimeInstance),
  35. RWMutex: sync.RWMutex{},
  36. }
  37. // parse Args
  38. if len(args) == 2 {
  39. pc := &PortableConfig{}
  40. err := json.Unmarshal([]byte(args[1]), pc)
  41. if err != nil {
  42. panic(fmt.Sprintf("fail to parse args %v", args))
  43. }
  44. logger.Infof("config parsed to %v", pc)
  45. }
  46. }
// Constructor function types registered in PluginConfig. Each call is
// expected to yield the instance used for one started symbol.
type (
	// NewSourceFunc builds an api.Source.
	NewSourceFunc func() api.Source
	// NewFunctionFunc builds an api.Function.
	NewFunctionFunc func() api.Function
	// NewSinkFunc builds an api.Sink.
	NewSinkFunc func() api.Sink
)
  52. // PluginConfig construct once and then read only
  53. type PluginConfig struct {
  54. Name string
  55. Sources map[string]NewSourceFunc
  56. Functions map[string]NewFunctionFunc
  57. Sinks map[string]NewSinkFunc
  58. }
  59. func (conf *PluginConfig) Get(pluginType string, symbolName string) (builderFunc interface{}) {
  60. switch pluginType {
  61. case TYPE_SOURCE:
  62. if f, ok := conf.Sources[symbolName]; ok {
  63. return f
  64. }
  65. case TYPE_FUNC:
  66. if f, ok := conf.Functions[symbolName]; ok {
  67. return f
  68. }
  69. case TYPE_SINK:
  70. if f, ok := conf.Sinks[symbolName]; ok {
  71. return f
  72. }
  73. }
  74. return nil
  75. }
  76. // Start Connect to control plane
  77. // Only run once at process startup
  78. func Start(args []string, conf *PluginConfig) {
  79. initVars(args, conf)
  80. logger.Info("starting plugin")
  81. ch, err := connection.CreateControlChannel(conf.Name)
  82. if err != nil {
  83. panic(err)
  84. }
  85. defer ch.Close()
  86. go func() {
  87. logger.Info("running control channel")
  88. err = ch.Run(func(req []byte) []byte { // not parallel run now
  89. c := &Command{}
  90. err := json.Unmarshal(req, c)
  91. if err != nil {
  92. return []byte(err.Error())
  93. }
  94. logger.Infof("received command %s with arg:'%s'", c.Cmd, c.Arg)
  95. ctrl := &Control{}
  96. err = json.Unmarshal([]byte(c.Arg), ctrl)
  97. if err != nil {
  98. return []byte(err.Error())
  99. }
  100. switch c.Cmd {
  101. case CMD_START:
  102. f := conf.Get(ctrl.PluginType, ctrl.SymbolName)
  103. if f == nil {
  104. return []byte("symbol not found")
  105. }
  106. switch ctrl.PluginType {
  107. case TYPE_SOURCE:
  108. sf := f.(NewSourceFunc)
  109. sr, err := setupSourceRuntime(ctrl, sf())
  110. if err != nil {
  111. return []byte(err.Error())
  112. }
  113. go sr.run()
  114. regKey := fmt.Sprintf("%s_%s_%d_%s", ctrl.Meta.RuleId, ctrl.Meta.OpId, ctrl.Meta.InstanceId, ctrl.SymbolName)
  115. reg.Set(regKey, sr)
  116. logger.Infof("running source %s", ctrl.SymbolName)
  117. case TYPE_SINK:
  118. sf := f.(NewSinkFunc)
  119. sr, err := setupSinkRuntime(ctrl, sf())
  120. if err != nil {
  121. return []byte(err.Error())
  122. }
  123. go sr.run()
  124. regKey := fmt.Sprintf("%s_%s_%d_%s", ctrl.Meta.RuleId, ctrl.Meta.OpId, ctrl.Meta.InstanceId, ctrl.SymbolName)
  125. reg.Set(regKey, sr)
  126. logger.Infof("running sink %s", ctrl.SymbolName)
  127. case TYPE_FUNC:
  128. regKey := fmt.Sprintf("func_%s", ctrl.SymbolName)
  129. _, ok := reg.Get(regKey)
  130. if ok {
  131. logger.Infof("got running function instance %s, do nothing", ctrl.SymbolName)
  132. } else {
  133. ff := f.(NewFunctionFunc)
  134. fr, err := setupFuncRuntime(ctrl, ff())
  135. if err != nil {
  136. return []byte(err.Error())
  137. }
  138. go fr.run()
  139. reg.Set(regKey, fr)
  140. logger.Infof("running function %s", ctrl.SymbolName)
  141. }
  142. default:
  143. return []byte(fmt.Sprintf("invalid plugin type %s", ctrl.PluginType))
  144. }
  145. return []byte(REPLY_OK)
  146. case CMD_STOP:
  147. // never stop a function symbol here.
  148. regKey := fmt.Sprintf("%s_%s_%d_%s", ctrl.Meta.RuleId, ctrl.Meta.OpId, ctrl.Meta.InstanceId, ctrl.SymbolName)
  149. logger.Infof("stopping %s", regKey)
  150. runtime, ok := reg.Get(regKey)
  151. if !ok {
  152. return []byte(fmt.Sprintf("symbol %s not found", regKey))
  153. }
  154. if runtime.isRunning() {
  155. err = runtime.stop()
  156. if err != nil {
  157. return []byte(err.Error())
  158. }
  159. }
  160. return []byte(REPLY_OK)
  161. default:
  162. return []byte(fmt.Sprintf("invalid command received: %s", c.Cmd))
  163. }
  164. })
  165. if err != nil {
  166. logger.Error(err)
  167. }
  168. os.Exit(1)
  169. }()
  170. // Stop the whole plugin
  171. sigint := make(chan os.Signal, 1)
  172. signal.Notify(sigint, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL) //nolint:staticcheck
  173. <-sigint
  174. logger.Infof("stopping plugin %s", conf.Name)
  175. os.Exit(0)
  176. }
  177. // key is rule_op_ins_symbol
  178. type runtimes struct {
  179. content map[string]RuntimeInstance
  180. sync.RWMutex
  181. }
  182. func (r *runtimes) Set(name string, instance RuntimeInstance) {
  183. r.Lock()
  184. defer r.Unlock()
  185. r.content[name] = instance
  186. }
  187. func (r *runtimes) Get(name string) (RuntimeInstance, bool) {
  188. r.RLock()
  189. defer r.RUnlock()
  190. result, ok := r.content[name]
  191. return result, ok
  192. }
  193. func (r *runtimes) Delete(name string) {
  194. r.Lock()
  195. defer r.Unlock()
  196. delete(r.content, name)
  197. }