default.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package contexts
  2. import (
  3. "context"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "github.com/emqx/kuiper/xstream/states"
  7. "github.com/sirupsen/logrus"
  8. "time"
  9. )
  const (
  	// LoggerKey is the context value key under which GetLogger looks for a
  	// *logrus.Entry logger.
  	LoggerKey = "$$logger"
  )
  11. type DefaultContext struct {
  12. ruleId string
  13. opId string
  14. instanceId int
  15. ctx context.Context
  16. err error
  17. state states.StateContext
  18. }
  19. func Background() *DefaultContext {
  20. c := &DefaultContext{
  21. ctx: context.Background(),
  22. }
  23. s := states.NewStateContext(states.MEMORY, c.GetLogger())
  24. c.state = s
  25. return c
  26. }
  27. func WithValue(parent *DefaultContext, key, val interface{}) *DefaultContext {
  28. parent.ctx = context.WithValue(parent.ctx, key, val)
  29. return parent
  30. }
  31. //Implement context interface
  32. func (c *DefaultContext) Deadline() (deadline time.Time, ok bool) {
  33. return c.ctx.Deadline()
  34. }
  35. func (c *DefaultContext) Done() <-chan struct{} {
  36. return c.ctx.Done()
  37. }
  38. func (c *DefaultContext) Err() error {
  39. if c.err != nil {
  40. return c.err
  41. }
  42. return c.ctx.Err()
  43. }
  44. func (c *DefaultContext) Value(key interface{}) interface{} {
  45. return c.ctx.Value(key)
  46. }
  47. // Stream metas
  48. func (c *DefaultContext) GetContext() context.Context {
  49. return c.ctx
  50. }
  51. func (c *DefaultContext) GetLogger() api.Logger {
  52. l, ok := c.ctx.Value(LoggerKey).(*logrus.Entry)
  53. if l != nil && ok {
  54. return l
  55. }
  56. return common.Log.WithField("caller", "default")
  57. }
  58. func (c *DefaultContext) GetRuleId() string {
  59. return c.ruleId
  60. }
  61. func (c *DefaultContext) GetOpId() string {
  62. return c.opId
  63. }
  64. func (c *DefaultContext) GetInstanceId() int {
  65. return c.instanceId
  66. }
  67. func (c *DefaultContext) SetError(err error) {
  68. c.err = err
  69. }
  70. func (c *DefaultContext) WithMeta(ruleId string, opId string) api.StreamContext {
  71. return &DefaultContext{
  72. ruleId: ruleId,
  73. opId: opId,
  74. instanceId: 0,
  75. ctx: c.ctx,
  76. state: c.state,
  77. }
  78. }
  79. func (c *DefaultContext) WithInstance(instanceId int) api.StreamContext {
  80. return &DefaultContext{
  81. instanceId: instanceId,
  82. ruleId: c.ruleId,
  83. opId: c.opId,
  84. ctx: c.ctx,
  85. state: c.state,
  86. }
  87. }
  88. func (c *DefaultContext) WithCancel() (api.StreamContext, context.CancelFunc) {
  89. ctx, cancel := context.WithCancel(c.ctx)
  90. return &DefaultContext{
  91. ruleId: c.ruleId,
  92. opId: c.opId,
  93. instanceId: c.instanceId,
  94. ctx: ctx,
  95. state: c.state,
  96. }, cancel
  97. }
  98. func (c *DefaultContext) IncrCounter(key string, amount int) error {
  99. return c.state.IncrCounter(key, amount)
  100. }
  101. func (c *DefaultContext) GetCounter(key string) (int, error) {
  102. return c.state.GetCounter(key)
  103. }
  104. func (c *DefaultContext) PutState(key string, value interface{}) error {
  105. return c.state.PutState(key, value)
  106. }
  107. func (c *DefaultContext) GetState(key string) (interface{}, error) {
  108. return c.state.GetState(key)
  109. }
  110. func (c *DefaultContext) DeleteState(key string) error {
  111. return c.state.DeleteState(key)
  112. }