default.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package context
  15. import (
  16. "context"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. "github.com/sirupsen/logrus"
  22. "sync"
  23. "time"
  24. )
  25. const LoggerKey = "$$logger"
  26. type DefaultContext struct {
  27. ruleId string
  28. opId string
  29. instanceId int
  30. ctx context.Context
  31. err error
  32. //Only initialized after withMeta set
  33. store api.Store
  34. state *sync.Map
  35. snapshot map[string]interface{}
  36. }
  37. func Background() *DefaultContext {
  38. c := &DefaultContext{
  39. ctx: context.Background(),
  40. }
  41. return c
  42. }
  43. func WithValue(parent *DefaultContext, key, val interface{}) *DefaultContext {
  44. parent.ctx = context.WithValue(parent.ctx, key, val)
  45. return parent
  46. }
// Implementation of the context.Context interface.
  48. func (c *DefaultContext) Deadline() (deadline time.Time, ok bool) {
  49. return c.ctx.Deadline()
  50. }
  51. func (c *DefaultContext) Done() <-chan struct{} {
  52. return c.ctx.Done()
  53. }
  54. func (c *DefaultContext) Err() error {
  55. if c.err != nil {
  56. return c.err
  57. }
  58. return c.ctx.Err()
  59. }
  60. func (c *DefaultContext) Value(key interface{}) interface{} {
  61. return c.ctx.Value(key)
  62. }
// Stream metadata accessors.
  64. func (c *DefaultContext) GetContext() context.Context {
  65. return c.ctx
  66. }
  67. func (c *DefaultContext) GetLogger() api.Logger {
  68. l, ok := c.ctx.Value(LoggerKey).(*logrus.Entry)
  69. if l != nil && ok {
  70. return l
  71. }
  72. return conf.Log.WithField("caller", "default")
  73. }
  74. func (c *DefaultContext) GetRuleId() string {
  75. return c.ruleId
  76. }
  77. func (c *DefaultContext) GetOpId() string {
  78. return c.opId
  79. }
  80. func (c *DefaultContext) GetInstanceId() int {
  81. return c.instanceId
  82. }
  83. func (c *DefaultContext) GetRootPath() string {
  84. loc, _ := conf.GetLoc("")
  85. return loc
  86. }
  87. func (c *DefaultContext) SetError(err error) {
  88. c.err = err
  89. }
  90. func (c *DefaultContext) WithMeta(ruleId string, opId string, store api.Store) api.StreamContext {
  91. s, err := store.GetOpState(opId)
  92. if err != nil {
  93. c.GetLogger().Warnf("Initialize context store error for %s: %s", opId, err)
  94. }
  95. return &DefaultContext{
  96. ruleId: ruleId,
  97. opId: opId,
  98. instanceId: 0,
  99. ctx: c.ctx,
  100. store: store,
  101. state: s,
  102. }
  103. }
  104. func (c *DefaultContext) WithInstance(instanceId int) api.StreamContext {
  105. return &DefaultContext{
  106. instanceId: instanceId,
  107. ruleId: c.ruleId,
  108. opId: c.opId,
  109. ctx: c.ctx,
  110. state: c.state,
  111. }
  112. }
  113. func (c *DefaultContext) WithCancel() (api.StreamContext, context.CancelFunc) {
  114. ctx, cancel := context.WithCancel(c.ctx)
  115. return &DefaultContext{
  116. ruleId: c.ruleId,
  117. opId: c.opId,
  118. instanceId: c.instanceId,
  119. ctx: ctx,
  120. state: c.state,
  121. }, cancel
  122. }
  123. func (c *DefaultContext) IncrCounter(key string, amount int) error {
  124. if v, ok := c.state.Load(key); ok {
  125. if vi, err := cast.ToInt(v, cast.STRICT); err != nil {
  126. return fmt.Errorf("state[%s] must be an int", key)
  127. } else {
  128. c.state.Store(key, vi+amount)
  129. }
  130. } else {
  131. c.state.Store(key, amount)
  132. }
  133. return nil
  134. }
  135. func (c *DefaultContext) GetCounter(key string) (int, error) {
  136. if v, ok := c.state.Load(key); ok {
  137. if vi, err := cast.ToInt(v, cast.STRICT); err != nil {
  138. return 0, fmt.Errorf("state[%s] is not a number, but %v", key, v)
  139. } else {
  140. return vi, nil
  141. }
  142. } else {
  143. c.state.Store(key, 0)
  144. return 0, nil
  145. }
  146. }
  147. func (c *DefaultContext) PutState(key string, value interface{}) error {
  148. c.state.Store(key, value)
  149. return nil
  150. }
  151. func (c *DefaultContext) GetState(key string) (interface{}, error) {
  152. if v, ok := c.state.Load(key); ok {
  153. return v, nil
  154. } else {
  155. return nil, nil
  156. }
  157. }
  158. func (c *DefaultContext) DeleteState(key string) error {
  159. c.state.Delete(key)
  160. return nil
  161. }
  162. func (c *DefaultContext) Snapshot() error {
  163. c.snapshot = cast.SyncMapToMap(c.state)
  164. return nil
  165. }
  166. func (c *DefaultContext) SaveState(checkpointId int64) error {
  167. err := c.store.SaveState(checkpointId, c.opId, c.snapshot)
  168. if err != nil {
  169. return err
  170. }
  171. c.snapshot = nil
  172. return nil
  173. }