default.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package context
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/connection/clients"
  21. "github.com/lf-edge/ekuiper/internal/topo/transform"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. "github.com/sirupsen/logrus"
  25. "regexp"
  26. "strings"
  27. "sync"
  28. "text/template"
  29. "time"
  30. )
  31. const LoggerKey = "$$logger"
  32. type DefaultContext struct {
  33. ruleId string
  34. opId string
  35. instanceId int
  36. ctx context.Context
  37. err error
  38. //Only initialized after withMeta set
  39. store api.Store
  40. state *sync.Map
  41. snapshot map[string]interface{}
  42. // cache
  43. tpReg sync.Map
  44. jpReg sync.Map
  45. }
  46. func Background() *DefaultContext {
  47. c := &DefaultContext{
  48. ctx: context.Background(),
  49. }
  50. return c
  51. }
  52. func WithValue(parent *DefaultContext, key, val interface{}) *DefaultContext {
  53. parent.ctx = context.WithValue(parent.ctx, key, val)
  54. return parent
  55. }
  56. //Implement context interface
  57. func (c *DefaultContext) Deadline() (deadline time.Time, ok bool) {
  58. return c.ctx.Deadline()
  59. }
  60. func (c *DefaultContext) Done() <-chan struct{} {
  61. return c.ctx.Done()
  62. }
  63. func (c *DefaultContext) Err() error {
  64. if c.err != nil {
  65. return c.err
  66. }
  67. return c.ctx.Err()
  68. }
  69. func (c *DefaultContext) Value(key interface{}) interface{} {
  70. return c.ctx.Value(key)
  71. }
  72. // Stream metas
  73. func (c *DefaultContext) GetContext() context.Context {
  74. return c.ctx
  75. }
  76. func (c *DefaultContext) GetLogger() api.Logger {
  77. l, ok := c.ctx.Value(LoggerKey).(*logrus.Entry)
  78. if l != nil && ok {
  79. return l
  80. }
  81. return conf.Log.WithField("caller", "default")
  82. }
  83. func (c *DefaultContext) GetRuleId() string {
  84. return c.ruleId
  85. }
  86. func (c *DefaultContext) GetOpId() string {
  87. return c.opId
  88. }
  89. func (c *DefaultContext) GetInstanceId() int {
  90. return c.instanceId
  91. }
  92. func (c *DefaultContext) GetRootPath() string {
  93. loc, _ := conf.GetLoc("")
  94. return loc
  95. }
  96. func (c *DefaultContext) SetError(err error) {
  97. c.err = err
  98. }
  99. func (c *DefaultContext) ParseDynamicProp(prop string, data interface{}) (interface{}, error) {
  100. re := regexp.MustCompile(`{{(.*?)}}`)
  101. if re.Match([]byte(prop)) {
  102. var (
  103. tp *template.Template
  104. err error
  105. )
  106. if raw, ok := c.tpReg.Load(prop); ok {
  107. tp = raw.(*template.Template)
  108. } else {
  109. tp, err = transform.GenTp(prop)
  110. if err != nil {
  111. return fmt.Sprintf("%v", data), err
  112. }
  113. c.tpReg.Store(prop, tp)
  114. }
  115. var output bytes.Buffer
  116. err = tp.Execute(&output, data)
  117. if err != nil {
  118. return fmt.Sprintf("%v", data), err
  119. }
  120. return output.String(), nil
  121. } else if strings.HasPrefix(prop, "{$") { //TO BE REMOVED: will be extracted as a new function in the next release
  122. var (
  123. je conf.JsonPathEval
  124. err error
  125. )
  126. if raw, ok := c.jpReg.Load(prop); ok {
  127. je = raw.(conf.JsonPathEval)
  128. } else {
  129. je, err = conf.GetJsonPathEval(prop[1:])
  130. if err != nil {
  131. return nil, err
  132. }
  133. c.jpReg.Store(prop, je)
  134. }
  135. return je.Eval(data)
  136. } else {
  137. return prop, nil
  138. }
  139. }
  140. func (c *DefaultContext) WithMeta(ruleId string, opId string, store api.Store) api.StreamContext {
  141. s, err := store.GetOpState(opId)
  142. if err != nil {
  143. c.GetLogger().Warnf("Initialize context store error for %s: %s", opId, err)
  144. }
  145. return &DefaultContext{
  146. ruleId: ruleId,
  147. opId: opId,
  148. instanceId: 0,
  149. ctx: c.ctx,
  150. store: store,
  151. state: s,
  152. tpReg: sync.Map{},
  153. jpReg: sync.Map{},
  154. }
  155. }
  156. func (c *DefaultContext) WithInstance(instanceId int) api.StreamContext {
  157. return &DefaultContext{
  158. instanceId: instanceId,
  159. ruleId: c.ruleId,
  160. opId: c.opId,
  161. ctx: c.ctx,
  162. state: c.state,
  163. }
  164. }
  165. func (c *DefaultContext) WithCancel() (api.StreamContext, context.CancelFunc) {
  166. ctx, cancel := context.WithCancel(c.ctx)
  167. return &DefaultContext{
  168. ruleId: c.ruleId,
  169. opId: c.opId,
  170. instanceId: c.instanceId,
  171. ctx: ctx,
  172. state: c.state,
  173. }, cancel
  174. }
  175. func (c *DefaultContext) IncrCounter(key string, amount int) error {
  176. if v, ok := c.state.Load(key); ok {
  177. if vi, err := cast.ToInt(v, cast.STRICT); err != nil {
  178. return fmt.Errorf("state[%s] must be an int", key)
  179. } else {
  180. c.state.Store(key, vi+amount)
  181. }
  182. } else {
  183. c.state.Store(key, amount)
  184. }
  185. return nil
  186. }
  187. func (c *DefaultContext) GetCounter(key string) (int, error) {
  188. if v, ok := c.state.Load(key); ok {
  189. if vi, err := cast.ToInt(v, cast.STRICT); err != nil {
  190. return 0, fmt.Errorf("state[%s] is not a number, but %v", key, v)
  191. } else {
  192. return vi, nil
  193. }
  194. } else {
  195. c.state.Store(key, 0)
  196. return 0, nil
  197. }
  198. }
  199. func (c *DefaultContext) PutState(key string, value interface{}) error {
  200. c.state.Store(key, value)
  201. return nil
  202. }
  203. func (c *DefaultContext) GetState(key string) (interface{}, error) {
  204. if v, ok := c.state.Load(key); ok {
  205. return v, nil
  206. } else {
  207. return nil, nil
  208. }
  209. }
  210. func (c *DefaultContext) DeleteState(key string) error {
  211. c.state.Delete(key)
  212. return nil
  213. }
  214. func (c *DefaultContext) Snapshot() error {
  215. c.snapshot = cast.SyncMapToMap(c.state)
  216. return nil
  217. }
  218. func (c *DefaultContext) SaveState(checkpointId int64) error {
  219. err := c.store.SaveState(checkpointId, c.opId, c.snapshot)
  220. if err != nil {
  221. return err
  222. }
  223. c.snapshot = nil
  224. return nil
  225. }
  226. func (c *DefaultContext) GetClient(clientType string, config map[string]interface{}) (api.MessageClient, error) {
  227. return clients.GetClient(clientType, config)
  228. }