function.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package runtime
  15. import (
  16. context2 "context"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/sdk/api"
  20. "github.com/lf-edge/ekuiper/sdk/connection"
  21. "github.com/lf-edge/ekuiper/sdk/context"
  22. "sync"
  23. )
  24. type funcRuntime struct {
  25. s api.Function
  26. ch connection.DataInOutChannel
  27. ctx context2.Context
  28. cancel context2.CancelFunc
  29. key string
  30. }
  31. func setupFuncRuntime(con *Control, s api.Function) (*funcRuntime, error) {
  32. // connect to mq server
  33. ch, err := connection.CreateFuncChannel(con.SymbolName)
  34. if err != nil {
  35. return nil, err
  36. }
  37. context.Log.Info("setup function channel")
  38. ctx, cancel := context2.WithCancel(context2.Background())
  39. return &funcRuntime{
  40. s: s,
  41. ch: ch,
  42. ctx: ctx,
  43. cancel: cancel,
  44. key: fmt.Sprintf("func_%s", con.SymbolName),
  45. }, nil
  46. }
  47. // TODO how to stop? Nearly never end because each function only have one instance
  48. func (s *funcRuntime) run() {
  49. defer s.stop()
  50. err := s.ch.Run(func(req []byte) []byte {
  51. d := &FuncData{}
  52. err := json.Unmarshal(req, d)
  53. if err != nil {
  54. return encodeReply(false, err)
  55. }
  56. context.Log.Debugf("running func with %+v", d)
  57. switch d.Func {
  58. case "Validate":
  59. arg, ok := d.Arg.([]interface{})
  60. if !ok {
  61. return encodeReply(false, "argument is not interface array")
  62. }
  63. err = s.s.Validate(arg)
  64. if err == nil {
  65. return encodeReply(true, "")
  66. } else {
  67. return encodeReply(false, err.Error())
  68. }
  69. case "Exec":
  70. arg, ok := d.Arg.([]interface{})
  71. if !ok {
  72. return encodeReply(false, "argument is not interface array")
  73. }
  74. farg, fctx, err := parseFuncContextArgs(arg)
  75. if err != nil {
  76. return encodeReply(false, err.Error())
  77. }
  78. r, b := s.s.Exec(farg, fctx)
  79. return encodeReply(b, r)
  80. case "IsAggregate":
  81. result := s.s.IsAggregate()
  82. return encodeReply(true, result)
  83. default:
  84. return encodeReply(false, fmt.Sprintf("invalid func %s", d.Func))
  85. }
  86. })
  87. context.Log.Error(err)
  88. }
  89. // TODO multiple error
  90. func (s *funcRuntime) stop() error {
  91. s.cancel()
  92. err := s.ch.Close()
  93. if err != nil {
  94. context.Log.Info(err)
  95. }
  96. context.Log.Info("closed function data channel")
  97. reg.Delete(s.key)
  98. return nil
  99. }
  100. func (s *funcRuntime) isRunning() bool {
  101. return s.ctx.Err() == nil
  102. }
  103. func encodeReply(state bool, arg interface{}) []byte {
  104. r, _ := json.Marshal(FuncReply{
  105. State: state,
  106. Result: arg,
  107. })
  108. return r
  109. }
  110. func parseFuncContextArgs(args []interface{}) ([]interface{}, api.FunctionContext, error) {
  111. if len(args) < 1 {
  112. return nil, nil, fmt.Errorf("exec function context not found")
  113. }
  114. fargs, temp := args[:len(args)-1], args[len(args)-1]
  115. rawCtx, ok := temp.(string)
  116. if !ok {
  117. return nil, nil, fmt.Errorf("cannot parse function raw context %v", temp)
  118. }
  119. m := &FuncMeta{}
  120. err := json.Unmarshal([]byte(rawCtx), m)
  121. if err != nil {
  122. return nil, nil, fmt.Errorf("cannot parse function context %v", rawCtx)
  123. }
  124. if m.RuleId == "" || m.OpId == "" {
  125. err := fmt.Sprintf("invalid arg %v, ruleId, opId are required", m)
  126. context.Log.Errorf(err)
  127. return nil, nil, fmt.Errorf(err)
  128. }
  129. key := fmt.Sprintf("%s_%s_%d_%d", m.RuleId, m.OpId, m.InstanceId, m.FuncId)
  130. if c, ok := exeFuncCtxMap.Load(key); ok {
  131. return fargs, c.(api.FunctionContext), nil
  132. } else {
  133. contextLogger := context.LogEntry("rule", m.RuleId)
  134. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta(m.RuleId, m.OpId)
  135. fctx := context.NewDefaultFuncContext(ctx, m.FuncId)
  136. exeFuncCtxMap.Store(key, fctx)
  137. return fargs, fctx, nil
  138. }
  139. }
  140. var exeFuncCtxMap = &sync.Map{}