memory_state.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package states
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "sync"
  6. )
  7. type MemoryState struct {
  8. storage sync.Map
  9. }
  10. func newMemoryState() *MemoryState {
  11. return &MemoryState{
  12. storage: sync.Map{},
  13. }
  14. }
  15. func (s *MemoryState) IncrCounter(key string, amount int) error {
  16. if v, ok := s.storage.Load(key); ok {
  17. if vi, err := common.ToInt(v); err != nil {
  18. return fmt.Errorf("state[%s] must be an int", key)
  19. } else {
  20. s.storage.Store(key, vi+amount)
  21. }
  22. } else {
  23. s.storage.Store(key, amount)
  24. }
  25. return nil
  26. }
  27. func (s *MemoryState) GetCounter(key string) (int, error) {
  28. if v, ok := s.storage.Load(key); ok {
  29. if vi, err := common.ToInt(v); err != nil {
  30. return 0, fmt.Errorf("state[%s] is not a number, but %v", key, v)
  31. } else {
  32. return vi, nil
  33. }
  34. } else {
  35. s.storage.Store(key, 0)
  36. return 0, nil
  37. }
  38. }
  39. func (s *MemoryState) PutState(key string, value interface{}) error {
  40. s.storage.Store(key, value)
  41. return nil
  42. }
  43. func (s *MemoryState) GetState(key string) (interface{}, error) {
  44. if v, ok := s.storage.Load(key); ok {
  45. return v, nil
  46. } else {
  47. return nil, nil
  48. }
  49. }
  50. func (s *MemoryState) DeleteState(key string) error {
  51. s.storage.Delete(key)
  52. return nil
  53. }