123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- // Copyright 2021-2022 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package runtime
- import (
- "encoding/json"
- "fmt"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/pkg/api"
- "github.com/lf-edge/ekuiper/pkg/infra"
- "os"
- "os/exec"
- "sync"
- )
- var (
- once sync.Once
- pm *pluginInsManager
- )
- // TODO setting configuration
- var PortbleConf = &PortableConfig{
- SendTimeout: 1000,
- }
- // PluginIns created at two scenarios
- // 1. At runtime, plugin is created/updated: in order to be able to reload rules that already uses previous ins
- // 2. At system start/restart, when plugin is used by a rule
- // Once created, never deleted until delete plugin command or system shutdown
- type PluginIns struct {
- sync.RWMutex
- name string
- ctrlChan ControlChannel // the same lifecycle as pluginIns, once created keep listening
- // audit the commands, so that when restarting the plugin, we can replay the commands
- commands map[Meta][]byte
- process *os.Process // created when used by rule and deleted when no rule uses it
- }
- func NewPluginIns(name string, ctrlChan ControlChannel, process *os.Process) *PluginIns {
- return &PluginIns{
- process: process,
- ctrlChan: ctrlChan,
- name: name,
- commands: make(map[Meta][]byte),
- }
- }
- func NewPluginInsForTest(name string, ctrlChan ControlChannel) *PluginIns {
- commands := make(map[Meta][]byte)
- commands[Meta{
- RuleId: "test",
- OpId: "test",
- InstanceId: 0,
- }] = []byte{}
- return &PluginIns{
- process: nil,
- ctrlChan: ctrlChan,
- name: name,
- commands: commands,
- }
- }
- func (i *PluginIns) StartSymbol(ctx api.StreamContext, ctrl *Control) error {
- arg, err := json.Marshal(ctrl)
- if err != nil {
- return err
- }
- c := Command{
- Cmd: CMD_START,
- Arg: string(arg),
- }
- jsonArg, err := json.Marshal(c)
- if err != nil {
- return err
- }
- err = i.ctrlChan.SendCmd(jsonArg)
- if err == nil {
- i.Lock()
- i.commands[ctrl.Meta] = jsonArg
- i.Unlock()
- ctx.GetLogger().Infof("started symbol %s", ctrl.SymbolName)
- }
- return err
- }
- func (i *PluginIns) StopSymbol(ctx api.StreamContext, ctrl *Control) error {
- arg, err := json.Marshal(ctrl)
- if err != nil {
- return err
- }
- c := Command{
- Cmd: CMD_STOP,
- Arg: string(arg),
- }
- jsonArg, err := json.Marshal(c)
- if err != nil {
- return err
- }
- err = i.ctrlChan.SendCmd(jsonArg)
- if err == nil {
- referred := false
- i.Lock()
- delete(i.commands, ctrl.Meta)
- referred = len(i.commands) > 0
- i.Unlock()
- ctx.GetLogger().Infof("stopped symbol %s", ctrl.SymbolName)
- if !referred {
- err := GetPluginInsManager().Kill(i.name)
- if err != nil {
- ctx.GetLogger().Infof("fail to stop plugin %s: %v", i.name, err)
- return err
- }
- ctx.GetLogger().Infof("stop plugin %s", i.name)
- }
- }
- return err
- }
- // Stop intentionally
- func (i *PluginIns) Stop() error {
- var err error
- i.RLock()
- defer i.RUnlock()
- if i.process != nil { // will also trigger process exit clean up
- err = i.process.Kill()
- }
- return err
- }
- // Manager plugin process and control socket
- type pluginInsManager struct {
- instances map[string]*PluginIns
- sync.RWMutex
- }
- func GetPluginInsManager() *pluginInsManager {
- once.Do(func() {
- pm = &pluginInsManager{
- instances: make(map[string]*PluginIns),
- }
- })
- return pm
- }
- func (p *pluginInsManager) getPluginIns(name string) (*PluginIns, bool) {
- p.RLock()
- defer p.RUnlock()
- ins, ok := p.instances[name]
- return ins, ok
- }
- // deletePluginIns should only run when there is no state aka. commands
- func (p *pluginInsManager) deletePluginIns(name string) {
- p.Lock()
- defer p.Unlock()
- delete(p.instances, name)
- }
- // AddPluginIns For mock only
- func (p *pluginInsManager) AddPluginIns(name string, ins *PluginIns) {
- p.Lock()
- defer p.Unlock()
- p.instances[name] = ins
- }
- // CreateIns Run when plugin is created/updated
- func (p *pluginInsManager) CreateIns(pluginMeta *PluginMeta) {
- p.Lock()
- defer p.Unlock()
- if ins, ok := p.instances[pluginMeta.Name]; ok {
- if len(ins.commands) != 0 {
- go p.getOrStartProcess(pluginMeta, PortbleConf, true)
- }
- }
- }
- // getOrStartProcess Control the plugin process lifecycle.
- // Need to manage the resources: instances map, control socket, plugin process
- // May be called at plugin creation or restart with previous state(ctrlCh, commands)
- // PluginIns is created by plugin manager but started by rule/funcop.
- // During plugin delete/update, if the commands is not empty, keep the ins for next creation and restore
- // 1. During creation, clean up those resources for any errors in defer immediately after the resource is created.
- // 2. During plugin running, when detecting plugin process exit, clean up those resources for the current ins.
- func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *PortableConfig, pluginCreation bool) (*PluginIns, error) {
- p.Lock()
- defer p.Unlock()
- var (
- ins *PluginIns
- ok bool
- )
- // run initialization for firstly creating plugin instance
- ins, ok = p.instances[pluginMeta.Name]
- if !ok {
- ins = NewPluginIns(pluginMeta.Name, nil, nil)
- p.instances[pluginMeta.Name] = ins
- }
- // ins process has not run yet
- if !pluginCreation && ins.ctrlChan != nil {
- return ins, nil
- }
- // should only happen for first start, then the ctrl channel will keep running
- if ins.ctrlChan == nil {
- conf.Log.Infof("create control channel")
- ctrlChan, err := CreateControlChannel(pluginMeta.Name)
- if err != nil {
- return nil, fmt.Errorf("can't create new control channel: %s", err.Error())
- }
- defer func() {
- if err != nil {
- _ = ctrlChan.Close()
- }
- }()
- ins.ctrlChan = ctrlChan
- }
- // init or restart all need to run the process
- conf.Log.Infof("executing plugin")
- jsonArg, err := json.Marshal(pconf)
- if err != nil {
- return nil, fmt.Errorf("invalid conf: %v", pconf)
- }
- var cmd *exec.Cmd
- err = infra.SafeRun(func() error {
- switch pluginMeta.Language {
- case "go":
- conf.Log.Printf("starting go plugin executable %s", pluginMeta.Executable)
- cmd = exec.Command(pluginMeta.Executable, string(jsonArg))
- case "python":
- conf.Log.Printf("starting python plugin executable %s with script %s\n", conf.Config.Portable.PythonBin, pluginMeta.Executable)
- cmd = exec.Command(conf.Config.Portable.PythonBin, pluginMeta.Executable, string(jsonArg))
- default:
- return fmt.Errorf("unsupported language: %s", pluginMeta.Language)
- }
- return nil
- })
- if err != nil {
- return nil, fmt.Errorf("fail to start plugin %s: %v", pluginMeta.Name, err)
- }
- cmd.Stdout = conf.Log.Out
- cmd.Stderr = conf.Log.Out
- conf.Log.Println("plugin starting")
- err = cmd.Start()
- if err != nil {
- return nil, fmt.Errorf("plugin executable %s stops with error %v", pluginMeta.Executable, err)
- }
- process := cmd.Process
- conf.Log.Printf("plugin started pid: %d\n", process.Pid)
- defer func() {
- if err != nil {
- _ = process.Kill()
- }
- }()
- go infra.SafeRun(func() error { // just print out error inside
- err = cmd.Wait()
- if err != nil {
- conf.Log.Printf("plugin executable %s stops with error %v", pluginMeta.Executable, err)
- }
- // must make sure the plugin ins is not cleaned up yet by checking the process identity
- // clean up for stop unintentionally
- if ins, ok := p.getPluginIns(pluginMeta.Name); ok && ins.process == cmd.Process {
- ins.Lock()
- if len(ins.commands) == 0 {
- if ins.ctrlChan != nil {
- _ = ins.ctrlChan.Close()
- }
- p.deletePluginIns(pluginMeta.Name)
- }
- ins.process = nil
- ins.Unlock()
- }
- return nil
- })
- conf.Log.Println("waiting handshake")
- err = ins.ctrlChan.Handshake()
- if err != nil {
- return nil, fmt.Errorf("plugin %s control handshake error: %v", pluginMeta.Executable, err)
- }
- ins.process = process
- p.instances[pluginMeta.Name] = ins
- conf.Log.Println("plugin start running")
- // restore symbols by sending commands when restarting plugin
- conf.Log.Info("restore plugin symbols")
- for m, c := range ins.commands {
- go func(key Meta, jsonArg []byte) {
- e := ins.ctrlChan.SendCmd(jsonArg)
- if e != nil {
- conf.Log.Errorf("send command to %v error: %v", key, e)
- }
- }(m, c)
- }
- return ins, nil
- }
- func (p *pluginInsManager) Kill(name string) error {
- p.Lock()
- defer p.Unlock()
- var err error
- if ins, ok := p.instances[name]; ok {
- err = ins.Stop()
- } else {
- conf.Log.Warnf("instance %s not found when deleting", name)
- return nil
- }
- return err
- }
- func (p *pluginInsManager) KillAll() error {
- p.Lock()
- defer p.Unlock()
- for _, ins := range p.instances {
- _ = ins.Stop()
- }
- return nil
- }
- type PluginMeta struct {
- Name string `json:"name"`
- Version string `json:"version"`
- Language string `json:"language"`
- Executable string `json:"executable"`
- }
|