util.go 5.9 KB


  1. package common
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/dgraph-io/badger"
  7. "github.com/go-yaml/yaml"
  8. "github.com/sirupsen/logrus"
  9. "io/ioutil"
  10. "os"
  11. "path"
  12. "path/filepath"
  13. )
  14. const (
  15. logFileName = "stream.log"
  16. LoggerKey = "logger"
  17. etc_dir = "/etc/"
  18. data_dir = "/data/"
  19. log_dir = "/log/"
  20. )
  21. var (
  22. Log *logrus.Logger
  23. Config *XStreamConf
  24. IsTesting bool
  25. logFile *os.File
  26. mockTicker *MockTicker
  27. mockTimer *MockTimer
  28. mockNow int64
  29. )
  30. type logRedirect struct {
  31. }
  32. func (l *logRedirect) Errorf(f string, v ...interface{}) {
  33. Log.Error(fmt.Sprintf(f, v...))
  34. }
  35. func (l *logRedirect) Infof(f string, v ...interface{}) {
  36. Log.Info(fmt.Sprintf(f, v...))
  37. }
  38. func (l *logRedirect) Warningf(f string, v ...interface{}) {
  39. Log.Warning(fmt.Sprintf(f, v...))
  40. }
  41. func (l *logRedirect) Debugf(f string, v ...interface{}) {
  42. Log.Debug(fmt.Sprintf(f, v...))
  43. }
  44. func GetLogger(ctx context.Context) *logrus.Entry {
  45. if ctx != nil{
  46. l, ok := ctx.Value(LoggerKey).(*logrus.Entry)
  47. if l != nil && ok {
  48. return l
  49. }
  50. }
  51. return Log.WithField("caller", "default")
  52. }
  53. func LoadConf(confName string) []byte {
  54. confDir, err := GetConfLoc()
  55. if err != nil {
  56. Log.Fatal(err)
  57. }
  58. file := confDir + confName
  59. b, err := ioutil.ReadFile(file)
  60. if err != nil {
  61. Log.Fatal(err)
  62. }
  63. return b
  64. }
  65. type XStreamConf struct {
  66. Debug bool `yaml:"debug"`
  67. Port int `yaml:"port"`
  68. }
  69. var StreamConf = "kuiper.yaml"
  70. func init(){
  71. Log = logrus.New()
  72. Log.SetFormatter(&logrus.TextFormatter{
  73. DisableColors: true,
  74. FullTimestamp: true,
  75. })
  76. b := LoadConf(StreamConf)
  77. var cfg map[string]XStreamConf
  78. if err := yaml.Unmarshal(b, &cfg); err != nil {
  79. Log.Fatal(err)
  80. }
  81. if c, ok := cfg["basic"]; !ok{
  82. Log.Fatal("no basic config in kuiper.yaml")
  83. }else{
  84. Config = &c
  85. }
  86. if !Config.Debug {
  87. logDir, err := GetLoc(log_dir)
  88. if err != nil {
  89. Log.Fatal(err)
  90. }
  91. file := logDir + logFileName
  92. logFile, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  93. if err == nil {
  94. Log.Out = logFile
  95. } else {
  96. Log.Infof("Failed to log to file, using default stderr")
  97. }
  98. }else{
  99. Log.SetLevel(logrus.DebugLevel)
  100. }
  101. }
  102. func DbOpen(dir string) (*badger.DB, error) {
  103. opts := badger.DefaultOptions(dir)
  104. opts.Logger = &logRedirect{}
  105. db, err := badger.Open(opts)
  106. return db, err
  107. }
  108. func DbClose(db *badger.DB) error {
  109. return db.Close()
  110. }
  111. func DbSet(db *badger.DB, key string, value string) error {
  112. err := db.Update(func(txn *badger.Txn) error {
  113. _, err := txn.Get([]byte(key))
  114. //key not found
  115. if err != nil {
  116. err = txn.Set([]byte(key), []byte(value))
  117. }else{
  118. err = fmt.Errorf("key %s already exist, delete it before creating a new one", key)
  119. }
  120. return err
  121. })
  122. return err
  123. }
  124. func DbGet(db *badger.DB, key string) (value string, err error) {
  125. err = db.View(func(txn *badger.Txn) error {
  126. item, err := txn.Get([]byte(key))
  127. if err != nil {
  128. return err
  129. }
  130. err = item.Value(func(val []byte) error {
  131. value = string(val)
  132. return nil
  133. })
  134. return err
  135. })
  136. return
  137. }
  138. func DbDelete(db *badger.DB, key string) error {
  139. err := db.Update(func(txn *badger.Txn) error {
  140. _, err := txn.Get([]byte(key))
  141. //key not found
  142. if err != nil {
  143. return err
  144. }else{
  145. err = txn.Delete([]byte(key))
  146. }
  147. return err
  148. })
  149. return err
  150. }
  151. func DbKeys(db *badger.DB) (keys []string, err error) {
  152. err = db.View(func(txn *badger.Txn) error {
  153. opts := badger.DefaultIteratorOptions
  154. opts.PrefetchSize = 10
  155. it := txn.NewIterator(opts)
  156. defer it.Close()
  157. for it.Rewind(); it.Valid(); it.Next() {
  158. item := it.Item()
  159. k := item.Key()
  160. keys = append(keys, string(k))
  161. }
  162. return nil
  163. })
  164. return
  165. }
  166. func PrintMap(m map[string]string, buff *bytes.Buffer) {
  167. for k, v := range m {
  168. buff.WriteString(fmt.Sprintf("%s: %s\n", k, v))
  169. }
  170. }
  171. func CloseLogger(){
  172. if logFile != nil {
  173. logFile.Close()
  174. }
  175. }
  176. func GetConfLoc()(string, error){
  177. return GetLoc(etc_dir)
  178. }
  179. func GetDataLoc() (string, error) {
  180. return GetLoc(data_dir)
  181. }
  182. func GetLoc(subdir string)(string, error) {
  183. dir, err := os.Getwd()
  184. if err != nil {
  185. return "", err
  186. }
  187. confDir := dir + subdir
  188. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  189. lastdir := dir
  190. for len(dir) > 0 {
  191. dir = filepath.Dir(dir)
  192. if lastdir == dir {
  193. break
  194. }
  195. confDir = dir + subdir
  196. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  197. lastdir = dir
  198. continue
  199. } else {
  200. //Log.Printf("Trying to load file from %s", confDir)
  201. return confDir, nil
  202. }
  203. }
  204. } else {
  205. //Log.Printf("Trying to load file from %s", confDir)
  206. return confDir, nil
  207. }
  208. return "", fmt.Errorf("conf dir not found")
  209. }
  210. func GetAndCreateDataLoc(dir string) (string, error) {
  211. dataDir, err := GetDataLoc()
  212. if err != nil {
  213. return "", err
  214. }
  215. d := path.Join(path.Dir(dataDir), dir)
  216. if _, err := os.Stat(d); os.IsNotExist(err) {
  217. err = os.MkdirAll(d, 0755)
  218. if err != nil {
  219. return "", err
  220. }
  221. }
  222. return d, nil
  223. }
  224. //Time related. For Mock
  225. func GetTicker(duration int) Ticker {
  226. if IsTesting{
  227. if mockTicker == nil{
  228. mockTicker = NewMockTicker(duration)
  229. }else{
  230. mockTicker.SetDuration(duration)
  231. }
  232. return mockTicker
  233. }else{
  234. return NewDefaultTicker(duration)
  235. }
  236. }
  237. func GetTimer(duration int) Timer {
  238. if IsTesting{
  239. if mockTimer == nil{
  240. mockTimer = NewMockTimer(duration)
  241. }else{
  242. mockTimer.SetDuration(duration)
  243. }
  244. return mockTimer
  245. }else{
  246. return NewDefaultTimer(duration)
  247. }
  248. }
  249. /****** For Test Only ********/
  250. func GetMockTicker() *MockTicker{
  251. return mockTicker
  252. }
  253. func ResetMockTicker(){
  254. if mockTicker != nil{
  255. mockTicker.lastTick = 0
  256. }
  257. }
  258. func GetMockTimer() *MockTimer{
  259. return mockTimer
  260. }
  261. func SetMockNow(now int64){
  262. mockNow = now
  263. }
  264. func GetMockNow() int64{
  265. return mockNow
  266. }
  267. /*********** Type Cast Utilities *****/
  268. //TODO datetime type
  269. func ToString(input interface{}) string{
  270. return fmt.Sprintf("%v", input)
  271. }
  272. func ToInt(input interface{}) (int, error){
  273. switch t := input.(type) {
  274. case float64:
  275. return int(t), nil
  276. case int64:
  277. return int(t), nil
  278. case int:
  279. return t, nil
  280. default:
  281. return 0, fmt.Errorf("unsupported type %T of %[1]v", input)
  282. }
  283. }