default.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package contexts
  2. import (
  3. "context"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "github.com/sirupsen/logrus"
  7. "time"
  8. )
  9. const LoggerKey = "$$logger"
  10. type DefaultContext struct {
  11. ruleId string
  12. opId string
  13. instanceId int
  14. ctx context.Context
  15. err error
  16. }
  17. func Background() *DefaultContext {
  18. c := &DefaultContext{
  19. ctx: context.Background(),
  20. }
  21. return c
  22. }
  23. func WithValue(parent *DefaultContext, key, val interface{}) *DefaultContext {
  24. parent.ctx = context.WithValue(parent.ctx, key, val)
  25. return parent
  26. }
  27. //Implement context interface
  28. func (c *DefaultContext) Deadline() (deadline time.Time, ok bool) {
  29. return c.ctx.Deadline()
  30. }
  31. func (c *DefaultContext) Done() <-chan struct{} {
  32. return c.ctx.Done()
  33. }
  34. func (c *DefaultContext) Err() error {
  35. if c.err != nil {
  36. return c.err
  37. }
  38. return c.ctx.Err()
  39. }
  40. func (c *DefaultContext) Value(key interface{}) interface{} {
  41. return c.ctx.Value(key)
  42. }
  43. // Stream metas
  44. func (c *DefaultContext) GetContext() context.Context {
  45. return c.ctx
  46. }
  47. func (c *DefaultContext) GetLogger() api.Logger {
  48. l, ok := c.ctx.Value(LoggerKey).(*logrus.Entry)
  49. if l != nil && ok {
  50. return l
  51. }
  52. return common.Log.WithField("caller", "default")
  53. }
  54. func (c *DefaultContext) GetRuleId() string {
  55. return c.ruleId
  56. }
  57. func (c *DefaultContext) GetOpId() string {
  58. return c.opId
  59. }
  60. func (c *DefaultContext) GetInstanceId() int {
  61. return c.instanceId
  62. }
  63. func (c *DefaultContext) SetError(err error) {
  64. c.err = err
  65. }
  66. func (c *DefaultContext) WithMeta(ruleId string, opId string) api.StreamContext {
  67. return &DefaultContext{
  68. ruleId: ruleId,
  69. opId: opId,
  70. instanceId: 0,
  71. ctx: c.ctx,
  72. }
  73. }
  74. func (c *DefaultContext) WithInstance(instanceId int) api.StreamContext {
  75. return &DefaultContext{
  76. instanceId: instanceId,
  77. ruleId: c.ruleId,
  78. opId: c.opId,
  79. ctx: c.ctx,
  80. }
  81. }
  82. func (c *DefaultContext) WithCancel() (api.StreamContext, context.CancelFunc) {
  83. ctx, cancel := context.WithCancel(c.ctx)
  84. return &DefaultContext{
  85. ruleId: c.ruleId,
  86. opId: c.opId,
  87. instanceId: c.instanceId,
  88. ctx: ctx,
  89. }, cancel
  90. }