default.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package context
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/connection/clients"
  21. "github.com/lf-edge/ekuiper/internal/topo/transform"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. "github.com/sirupsen/logrus"
  25. "regexp"
  26. "sync"
  27. "text/template"
  28. "time"
  29. )
  30. const LoggerKey = "$$logger"
  31. type DefaultContext struct {
  32. ruleId string
  33. opId string
  34. instanceId int
  35. ctx context.Context
  36. err error
  37. //Only initialized after withMeta set
  38. store api.Store
  39. state *sync.Map
  40. snapshot map[string]interface{}
  41. // cache
  42. tpReg sync.Map
  43. jpReg sync.Map
  44. }
  45. func Background() *DefaultContext {
  46. c := &DefaultContext{
  47. ctx: context.Background(),
  48. }
  49. return c
  50. }
  51. func WithValue(parent *DefaultContext, key, val interface{}) *DefaultContext {
  52. parent.ctx = context.WithValue(parent.ctx, key, val)
  53. return parent
  54. }
  55. //Implement context interface
  56. func (c *DefaultContext) Deadline() (deadline time.Time, ok bool) {
  57. return c.ctx.Deadline()
  58. }
  59. func (c *DefaultContext) Done() <-chan struct{} {
  60. return c.ctx.Done()
  61. }
  62. func (c *DefaultContext) Err() error {
  63. if c.err != nil {
  64. return c.err
  65. }
  66. return c.ctx.Err()
  67. }
  68. func (c *DefaultContext) Value(key interface{}) interface{} {
  69. return c.ctx.Value(key)
  70. }
  71. func (c *DefaultContext) GetContext() context.Context {
  72. return c.ctx
  73. }
  74. func (c *DefaultContext) GetLogger() api.Logger {
  75. l, ok := c.ctx.Value(LoggerKey).(*logrus.Entry)
  76. if l != nil && ok {
  77. return l
  78. }
  79. return conf.Log.WithField("caller", "default")
  80. }
  81. func (c *DefaultContext) GetRuleId() string {
  82. return c.ruleId
  83. }
  84. func (c *DefaultContext) GetOpId() string {
  85. return c.opId
  86. }
  87. func (c *DefaultContext) GetInstanceId() int {
  88. return c.instanceId
  89. }
  90. func (c *DefaultContext) GetRootPath() string {
  91. loc, _ := conf.GetLoc("")
  92. return loc
  93. }
  94. func (c *DefaultContext) SetError(err error) {
  95. c.err = err
  96. }
  97. func (c *DefaultContext) ParseTemplate(prop string, data interface{}) (string, error) {
  98. re := regexp.MustCompile(`{{(.*?)}}`)
  99. if re.Match([]byte(prop)) {
  100. var (
  101. tp *template.Template
  102. err error
  103. )
  104. if raw, ok := c.tpReg.Load(prop); ok {
  105. tp = raw.(*template.Template)
  106. } else {
  107. tp, err = transform.GenTp(prop)
  108. if err != nil {
  109. return fmt.Sprintf("%v", data), err
  110. }
  111. c.tpReg.Store(prop, tp)
  112. }
  113. var output bytes.Buffer
  114. err = tp.Execute(&output, data)
  115. if err != nil {
  116. return fmt.Sprintf("%v", data), err
  117. }
  118. return output.String(), nil
  119. } else {
  120. return prop, nil
  121. }
  122. }
  123. func (c *DefaultContext) ParseJsonPath(prop string, data interface{}) (interface{}, error) {
  124. var (
  125. je conf.JsonPathEval
  126. err error
  127. )
  128. if raw, ok := c.jpReg.Load(prop); ok {
  129. je = raw.(conf.JsonPathEval)
  130. } else {
  131. je, err = conf.GetJsonPathEval(prop)
  132. if err != nil {
  133. return nil, err
  134. }
  135. c.jpReg.Store(prop, je)
  136. }
  137. return je.Eval(data)
  138. }
  139. func (c *DefaultContext) WithMeta(ruleId string, opId string, store api.Store) api.StreamContext {
  140. s, err := store.GetOpState(opId)
  141. if err != nil {
  142. c.GetLogger().Warnf("Initialize context store error for %s: %s", opId, err)
  143. }
  144. return &DefaultContext{
  145. ruleId: ruleId,
  146. opId: opId,
  147. instanceId: 0,
  148. ctx: c.ctx,
  149. store: store,
  150. state: s,
  151. tpReg: sync.Map{},
  152. jpReg: sync.Map{},
  153. }
  154. }
  155. func (c *DefaultContext) WithInstance(instanceId int) api.StreamContext {
  156. return &DefaultContext{
  157. instanceId: instanceId,
  158. ruleId: c.ruleId,
  159. opId: c.opId,
  160. ctx: c.ctx,
  161. state: c.state,
  162. }
  163. }
  164. func (c *DefaultContext) WithCancel() (api.StreamContext, context.CancelFunc) {
  165. ctx, cancel := context.WithCancel(c.ctx)
  166. return &DefaultContext{
  167. ruleId: c.ruleId,
  168. opId: c.opId,
  169. instanceId: c.instanceId,
  170. ctx: ctx,
  171. state: c.state,
  172. }, cancel
  173. }
  174. func (c *DefaultContext) IncrCounter(key string, amount int) error {
  175. if v, ok := c.state.Load(key); ok {
  176. if vi, err := cast.ToInt(v, cast.STRICT); err != nil {
  177. return fmt.Errorf("state[%s] must be an int", key)
  178. } else {
  179. c.state.Store(key, vi+amount)
  180. }
  181. } else {
  182. c.state.Store(key, amount)
  183. }
  184. return nil
  185. }
  186. func (c *DefaultContext) GetCounter(key string) (int, error) {
  187. if v, ok := c.state.Load(key); ok {
  188. if vi, err := cast.ToInt(v, cast.STRICT); err != nil {
  189. return 0, fmt.Errorf("state[%s] is not a number, but %v", key, v)
  190. } else {
  191. return vi, nil
  192. }
  193. } else {
  194. c.state.Store(key, 0)
  195. return 0, nil
  196. }
  197. }
  198. func (c *DefaultContext) PutState(key string, value interface{}) error {
  199. c.state.Store(key, value)
  200. return nil
  201. }
  202. func (c *DefaultContext) GetState(key string) (interface{}, error) {
  203. if v, ok := c.state.Load(key); ok {
  204. return v, nil
  205. } else {
  206. return nil, nil
  207. }
  208. }
  209. func (c *DefaultContext) DeleteState(key string) error {
  210. c.state.Delete(key)
  211. return nil
  212. }
  213. func (c *DefaultContext) Snapshot() error {
  214. c.snapshot = cast.SyncMapToMap(c.state)
  215. return nil
  216. }
  217. func (c *DefaultContext) SaveState(checkpointId int64) error {
  218. err := c.store.SaveState(checkpointId, c.opId, c.snapshot)
  219. if err != nil {
  220. return err
  221. }
  222. c.snapshot = nil
  223. return nil
  224. }
  225. func (c *DefaultContext) GetClient(clientType string, config map[string]interface{}) (api.MessageClient, error) {
  226. return clients.GetClient(clientType, config)
  227. }