default.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package context
  15. import (
  16. "context"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/connection"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "github.com/lf-edge/ekuiper/pkg/cast"
  22. "github.com/sirupsen/logrus"
  23. "sync"
  24. "time"
  25. )
  26. const LoggerKey = "$$logger"
  27. type DefaultContext struct {
  28. ruleId string
  29. opId string
  30. instanceId int
  31. ctx context.Context
  32. err error
  33. //Only initialized after withMeta set
  34. store api.Store
  35. state *sync.Map
  36. snapshot map[string]interface{}
  37. }
  38. func Background() *DefaultContext {
  39. c := &DefaultContext{
  40. ctx: context.Background(),
  41. }
  42. return c
  43. }
  44. func WithValue(parent *DefaultContext, key, val interface{}) *DefaultContext {
  45. parent.ctx = context.WithValue(parent.ctx, key, val)
  46. return parent
  47. }
  48. //Implement context interface
  49. func (c *DefaultContext) Deadline() (deadline time.Time, ok bool) {
  50. return c.ctx.Deadline()
  51. }
  52. func (c *DefaultContext) Done() <-chan struct{} {
  53. return c.ctx.Done()
  54. }
  55. func (c *DefaultContext) Err() error {
  56. if c.err != nil {
  57. return c.err
  58. }
  59. return c.ctx.Err()
  60. }
  61. func (c *DefaultContext) Value(key interface{}) interface{} {
  62. return c.ctx.Value(key)
  63. }
  64. // Stream metas
  65. func (c *DefaultContext) GetContext() context.Context {
  66. return c.ctx
  67. }
  68. func (c *DefaultContext) GetLogger() api.Logger {
  69. l, ok := c.ctx.Value(LoggerKey).(*logrus.Entry)
  70. if l != nil && ok {
  71. return l
  72. }
  73. return conf.Log.WithField("caller", "default")
  74. }
  75. func (c *DefaultContext) GetRuleId() string {
  76. return c.ruleId
  77. }
  78. func (c *DefaultContext) GetOpId() string {
  79. return c.opId
  80. }
  81. func (c *DefaultContext) GetInstanceId() int {
  82. return c.instanceId
  83. }
  84. func (c *DefaultContext) GetRootPath() string {
  85. loc, _ := conf.GetLoc("")
  86. return loc
  87. }
  88. func (c *DefaultContext) SetError(err error) {
  89. c.err = err
  90. }
  91. func (c *DefaultContext) WithMeta(ruleId string, opId string, store api.Store) api.StreamContext {
  92. s, err := store.GetOpState(opId)
  93. if err != nil {
  94. c.GetLogger().Warnf("Initialize context store error for %s: %s", opId, err)
  95. }
  96. return &DefaultContext{
  97. ruleId: ruleId,
  98. opId: opId,
  99. instanceId: 0,
  100. ctx: c.ctx,
  101. store: store,
  102. state: s,
  103. }
  104. }
  105. func (c *DefaultContext) WithInstance(instanceId int) api.StreamContext {
  106. return &DefaultContext{
  107. instanceId: instanceId,
  108. ruleId: c.ruleId,
  109. opId: c.opId,
  110. ctx: c.ctx,
  111. state: c.state,
  112. }
  113. }
  114. func (c *DefaultContext) WithCancel() (api.StreamContext, context.CancelFunc) {
  115. ctx, cancel := context.WithCancel(c.ctx)
  116. return &DefaultContext{
  117. ruleId: c.ruleId,
  118. opId: c.opId,
  119. instanceId: c.instanceId,
  120. ctx: ctx,
  121. state: c.state,
  122. }, cancel
  123. }
  124. func (c *DefaultContext) IncrCounter(key string, amount int) error {
  125. if v, ok := c.state.Load(key); ok {
  126. if vi, err := cast.ToInt(v, cast.STRICT); err != nil {
  127. return fmt.Errorf("state[%s] must be an int", key)
  128. } else {
  129. c.state.Store(key, vi+amount)
  130. }
  131. } else {
  132. c.state.Store(key, amount)
  133. }
  134. return nil
  135. }
  136. func (c *DefaultContext) GetCounter(key string) (int, error) {
  137. if v, ok := c.state.Load(key); ok {
  138. if vi, err := cast.ToInt(v, cast.STRICT); err != nil {
  139. return 0, fmt.Errorf("state[%s] is not a number, but %v", key, v)
  140. } else {
  141. return vi, nil
  142. }
  143. } else {
  144. c.state.Store(key, 0)
  145. return 0, nil
  146. }
  147. }
  148. func (c *DefaultContext) PutState(key string, value interface{}) error {
  149. c.state.Store(key, value)
  150. return nil
  151. }
  152. func (c *DefaultContext) GetState(key string) (interface{}, error) {
  153. if v, ok := c.state.Load(key); ok {
  154. return v, nil
  155. } else {
  156. return nil, nil
  157. }
  158. }
  159. func (c *DefaultContext) DeleteState(key string) error {
  160. c.state.Delete(key)
  161. return nil
  162. }
  163. func (c *DefaultContext) Snapshot() error {
  164. c.snapshot = cast.SyncMapToMap(c.state)
  165. return nil
  166. }
  167. func (c *DefaultContext) SaveState(checkpointId int64) error {
  168. err := c.store.SaveState(checkpointId, c.opId, c.snapshot)
  169. if err != nil {
  170. return err
  171. }
  172. c.snapshot = nil
  173. return nil
  174. }
  175. func (c *DefaultContext) GetConnection(connectSelector string) (interface{}, error) {
  176. return connection.GetConnection(connectSelector)
  177. }
  178. func (c *DefaultContext) ReleaseConnection(connectSelector string) {
  179. connection.ReleaseConnection(connectSelector)
  180. }