util.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. package common
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/go-yaml/yaml"
  7. "github.com/patrickmn/go-cache"
  8. "github.com/sirupsen/logrus"
  9. "io/ioutil"
  10. "os"
  11. "path"
  12. "path/filepath"
  13. "time"
  14. )
  15. const (
  16. logFileName = "stream.log"
  17. etc_dir = "/etc/"
  18. data_dir = "/data/"
  19. log_dir = "/log/"
  20. )
  21. var (
  22. Log *logrus.Logger
  23. Config *XStreamConf
  24. IsTesting bool
  25. logFile *os.File
  26. mockTicker *MockTicker
  27. mockTimer *MockTimer
  28. mockNow int64
  29. )
  30. type logRedirect struct {
  31. }
  32. func (l *logRedirect) Errorf(f string, v ...interface{}) {
  33. Log.Error(fmt.Sprintf(f, v...))
  34. }
  35. func (l *logRedirect) Infof(f string, v ...interface{}) {
  36. Log.Info(fmt.Sprintf(f, v...))
  37. }
  38. func (l *logRedirect) Warningf(f string, v ...interface{}) {
  39. Log.Warning(fmt.Sprintf(f, v...))
  40. }
  41. func (l *logRedirect) Debugf(f string, v ...interface{}) {
  42. Log.Debug(fmt.Sprintf(f, v...))
  43. }
  44. func LoadConf(confName string) ([]byte, error) {
  45. confDir, err := GetConfLoc()
  46. if err != nil {
  47. return nil, err
  48. }
  49. file := confDir + confName
  50. b, err := ioutil.ReadFile(file)
  51. if err != nil {
  52. return nil, err
  53. }
  54. return b, nil
  55. }
  56. type XStreamConf struct {
  57. Debug bool `yaml:"debug"`
  58. Port int `yaml:"port"`
  59. }
  60. var StreamConf = "kuiper.yaml"
  61. const KuiperBaseKey = "KuiperBaseKey"
  62. func init(){
  63. Log = logrus.New()
  64. Log.SetFormatter(&logrus.TextFormatter{
  65. DisableColors: true,
  66. FullTimestamp: true,
  67. })
  68. }
  69. func InitConf() {
  70. b, err := LoadConf(StreamConf)
  71. if err != nil {
  72. Log.Fatal(err)
  73. }
  74. var cfg map[string]XStreamConf
  75. if err := yaml.Unmarshal(b, &cfg); err != nil {
  76. Log.Fatal(err)
  77. }
  78. if c, ok := cfg["basic"]; !ok{
  79. Log.Fatal("No basic config in kuiper.yaml")
  80. } else {
  81. Config = &c
  82. }
  83. if !Config.Debug {
  84. logDir, err := GetLoc(log_dir)
  85. if err != nil {
  86. Log.Fatal(err)
  87. }
  88. file := logDir + logFileName
  89. logFile, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  90. if err == nil {
  91. Log.Out = logFile
  92. } else {
  93. Log.Infof("Failed to log to file, using default stderr")
  94. }
  95. } else {
  96. Log.SetLevel(logrus.DebugLevel)
  97. }
  98. }
  99. type KeyValue interface {
  100. Open() error
  101. Close() error
  102. Set(key string, value interface{}) error
  103. Get(key string) (interface{}, bool)
  104. Delete(key string) error
  105. Keys() (keys []string, err error)
  106. }
  107. type SimpleKVStore struct {
  108. path string
  109. c *cache.Cache;
  110. }
  111. var stores = make(map[string]*SimpleKVStore)
  112. func GetSimpleKVStore(path string) *SimpleKVStore {
  113. if s, ok := stores[path]; ok {
  114. return s
  115. } else {
  116. c := cache.New(cache.NoExpiration, 0)
  117. if _, err := os.Stat(path); os.IsNotExist(err) {
  118. os.MkdirAll(path, os.ModePerm)
  119. }
  120. sStore := &SimpleKVStore{path: path + "/stores.data", c: c}
  121. stores[path] = sStore
  122. return sStore
  123. }
  124. }
  125. func (m *SimpleKVStore) Open() error {
  126. if _, err := os.Stat(m.path); os.IsNotExist(err) {
  127. return nil
  128. }
  129. if e := m.c.LoadFile(m.path); e != nil {
  130. return e
  131. }
  132. return nil
  133. }
  134. func (m *SimpleKVStore) Close() error {
  135. e := m.saveToFile()
  136. m.c.Flush() //Delete all of the values from memory.
  137. return e
  138. }
  139. func (m *SimpleKVStore) saveToFile() error {
  140. if e := m.c.SaveFile(m.path); e != nil {
  141. return e
  142. }
  143. return nil
  144. }
  145. func (m *SimpleKVStore) Set(key string, value interface{}) error {
  146. if m.c == nil {
  147. return fmt.Errorf("cache %s has not been initialized yet", m.path)
  148. }
  149. if err := m.c.Add(key, value, cache.NoExpiration); err != nil {
  150. return err
  151. }
  152. return m.saveToFile()
  153. }
  154. func (m *SimpleKVStore) Get(key string) (interface{}, bool) {
  155. return m.c.Get(key)
  156. }
  157. func (m *SimpleKVStore) Delete(key string) error {
  158. if m.c == nil {
  159. return fmt.Errorf("cache %s has not been initialized yet", m.path)
  160. }
  161. if _, found := m.c.Get(key); found {
  162. m.c.Delete(key)
  163. }else{
  164. return fmt.Errorf("%s is not found", key)
  165. }
  166. return m.saveToFile()
  167. }
  168. func (m *SimpleKVStore) Keys() (keys []string, err error) {
  169. if m.c == nil {
  170. return nil, fmt.Errorf("Cache %s has not been initialized yet.", m.path)
  171. }
  172. its := m.c.Items()
  173. keys = make([]string, 0, len(its))
  174. for k := range its {
  175. keys = append(keys, k)
  176. }
  177. return keys, nil
  178. }
  179. func PrintMap(m map[string]string, buff *bytes.Buffer) {
  180. for k, v := range m {
  181. buff.WriteString(fmt.Sprintf("%s: %s\n", k, v))
  182. }
  183. }
  184. func CloseLogger(){
  185. if logFile != nil {
  186. logFile.Close()
  187. }
  188. }
  189. func GetConfLoc()(string, error){
  190. return GetLoc(etc_dir)
  191. }
  192. func GetDataLoc() (string, error) {
  193. return GetLoc(data_dir)
  194. }
  195. func GetLoc(subdir string)(string, error) {
  196. dir, err := os.Getwd()
  197. if err != nil {
  198. return "", err
  199. }
  200. if base := os.Getenv(KuiperBaseKey); base != "" {
  201. Log.Infof("Specified Kuiper base folder at location %s.\n", base)
  202. dir = base
  203. }
  204. confDir := dir + subdir
  205. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  206. lastdir := dir
  207. for len(dir) > 0 {
  208. dir = filepath.Dir(dir)
  209. if lastdir == dir {
  210. break
  211. }
  212. confDir = dir + subdir
  213. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  214. lastdir = dir
  215. continue
  216. } else {
  217. //Log.Printf("Trying to load file from %s", confDir)
  218. return confDir, nil
  219. }
  220. }
  221. } else {
  222. //Log.Printf("Trying to load file from %s", confDir)
  223. return confDir, nil
  224. }
  225. return "", fmt.Errorf("conf dir not found")
  226. }
  227. func GetAndCreateDataLoc(dir string) (string, error) {
  228. dataDir, err := GetDataLoc()
  229. if err != nil {
  230. return "", err
  231. }
  232. d := path.Join(path.Dir(dataDir), dir)
  233. if _, err := os.Stat(d); os.IsNotExist(err) {
  234. err = os.MkdirAll(d, 0755)
  235. if err != nil {
  236. return "", err
  237. }
  238. }
  239. return d, nil
  240. }
  241. //Time related. For Mock
  242. func GetTicker(duration int) Ticker {
  243. if IsTesting{
  244. if mockTicker == nil{
  245. mockTicker = NewMockTicker(duration)
  246. }else{
  247. mockTicker.SetDuration(duration)
  248. }
  249. return mockTicker
  250. }else{
  251. return NewDefaultTicker(duration)
  252. }
  253. }
  254. func GetTimer(duration int) Timer {
  255. if IsTesting{
  256. if mockTimer == nil{
  257. mockTimer = NewMockTimer(duration)
  258. }else{
  259. mockTimer.SetDuration(duration)
  260. }
  261. return mockTimer
  262. }else{
  263. return NewDefaultTimer(duration)
  264. }
  265. }
  266. func GetNowInMilli() int64{
  267. if IsTesting {
  268. return GetMockNow()
  269. }else{
  270. return TimeToUnixMilli(time.Now())
  271. }
  272. }
  273. func ProcessPath(p string) (string, error) {
  274. if abs, err := filepath.Abs(p); err != nil {
  275. return "", nil
  276. } else {
  277. if _, err := os.Stat(abs); os.IsNotExist(err) {
  278. return "", err;
  279. }
  280. return abs, nil
  281. }
  282. }
  283. /****** For Test Only ********/
  284. func GetMockTicker() *MockTicker{
  285. return mockTicker
  286. }
  287. func ResetMockTicker(){
  288. if mockTicker != nil{
  289. mockTicker.lastTick = 0
  290. }
  291. }
  292. func GetMockTimer() *MockTimer{
  293. return mockTimer
  294. }
  295. func SetMockNow(now int64){
  296. mockNow = now
  297. }
  298. func GetMockNow() int64{
  299. return mockNow
  300. }
  301. /*********** Type Cast Utilities *****/
  302. //TODO datetime type
  303. func ToString(input interface{}) string{
  304. return fmt.Sprintf("%v", input)
  305. }
  306. func ToInt(input interface{}) (int, error){
  307. switch t := input.(type) {
  308. case float64:
  309. return int(t), nil
  310. case int64:
  311. return int(t), nil
  312. case int:
  313. return t, nil
  314. default:
  315. return 0, fmt.Errorf("unsupported type %T of %[1]v", input)
  316. }
  317. }
  318. /*
  319. * Convert a map into a struct. The output parameter must be a pointer to a struct
  320. * The struct can have the json meta data
  321. */
  322. func MapToStruct(input map[string]interface{}, output interface{}) error{
  323. // convert map to json
  324. jsonString, err := json.Marshal(input)
  325. if err != nil{
  326. return err
  327. }
  328. // convert json to struct
  329. return json.Unmarshal(jsonString, output)
  330. }