util.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. package common
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/benbjohnson/clock"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/go-yaml/yaml"
  9. "github.com/keepeye/logrus-filename"
  10. "github.com/lestrrat-go/file-rotatelogs"
  11. "github.com/sirupsen/logrus"
  12. "io"
  13. "io/ioutil"
  14. "os"
  15. "path"
  16. "path/filepath"
  17. "sort"
  18. "strings"
  19. "sync"
  20. "time"
  21. )
  22. const (
  23. logFileName = "stream.log"
  24. etc_dir = "/etc/"
  25. data_dir = "/data/"
  26. log_dir = "/log/"
  27. StreamConf = "kuiper.yaml"
  28. KuiperBaseKey = "KuiperBaseKey"
  29. KuiperSyslogKey = "KuiperSyslogKey"
  30. MetaKey = "__meta"
  31. )
  32. var (
  33. Log *logrus.Logger
  34. Config *KuiperConf
  35. IsTesting bool
  36. Clock clock.Clock
  37. logFile *os.File
  38. LoadFileType = "relative"
  39. )
  40. func LoadConf(confName string) ([]byte, error) {
  41. confDir, err := GetConfLoc()
  42. if err != nil {
  43. return nil, err
  44. }
  45. file := path.Join(confDir, confName)
  46. b, err := ioutil.ReadFile(file)
  47. if err != nil {
  48. return nil, err
  49. }
  50. return b, nil
  51. }
  52. type tlsConf struct {
  53. Certfile string `yaml:"certfile"`
  54. Keyfile string `yaml:"keyfile"`
  55. }
  56. type KuiperConf struct {
  57. Basic struct {
  58. Debug bool `yaml:"debug"`
  59. ConsoleLog bool `yaml:"consoleLog"`
  60. FileLog bool `yaml:"fileLog"`
  61. RotateTime int `yaml:"rotateTime"`
  62. MaxAge int `yaml:"maxAge"`
  63. Port int `yaml:"port"`
  64. RestPort int `yaml:"restPort"`
  65. RestTls *tlsConf `yaml:"restTls"`
  66. Prometheus bool `yaml:"prometheus"`
  67. PrometheusPort int `yaml:"prometheusPort"`
  68. PluginHosts string `yaml:"pluginHosts"`
  69. }
  70. Rule api.RuleOption
  71. Sink struct {
  72. CacheThreshold int `yaml:"cacheThreshold"`
  73. CacheTriggerCount int `yaml:"cacheTriggerCount"`
  74. DisableCache bool `yaml:"disableCache""`
  75. }
  76. }
  77. func init() {
  78. Log = logrus.New()
  79. initSyslog()
  80. filenameHook := filename.NewHook()
  81. filenameHook.Field = "file"
  82. Log.AddHook(filenameHook)
  83. Log.SetFormatter(&logrus.TextFormatter{
  84. TimestampFormat: "2006-01-02 15:04:05",
  85. DisableColors: true,
  86. FullTimestamp: true,
  87. })
  88. Log.Debugf("init with args %s", os.Args)
  89. for _, arg := range os.Args {
  90. if strings.HasPrefix(arg, "-test.") {
  91. IsTesting = true
  92. break
  93. }
  94. }
  95. if IsTesting {
  96. Log.Debugf("running in testing mode")
  97. Clock = clock.NewMock()
  98. } else {
  99. Clock = clock.New()
  100. }
  101. }
  102. func InitConf() {
  103. b, err := LoadConf(StreamConf)
  104. if err != nil {
  105. Log.Fatal(err)
  106. }
  107. kc := KuiperConf{
  108. Rule: api.RuleOption{
  109. LateTol: 1000,
  110. Concurrency: 1,
  111. BufferLength: 1024,
  112. CheckpointInterval: 300000, //5 minutes
  113. },
  114. }
  115. if err := yaml.Unmarshal(b, &kc); err != nil {
  116. Log.Fatal(err)
  117. } else {
  118. Config = &kc
  119. }
  120. if Config.Basic.Debug {
  121. Log.SetLevel(logrus.DebugLevel)
  122. }
  123. logDir, err := GetLoc(log_dir)
  124. if err != nil {
  125. Log.Fatal(err)
  126. }
  127. file := path.Join(logDir, logFileName)
  128. logFile, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  129. if err == nil {
  130. if Config.Basic.ConsoleLog {
  131. if Config.Basic.FileLog {
  132. mw := io.MultiWriter(os.Stdout, logFile)
  133. Log.SetOutput(mw)
  134. }
  135. } else {
  136. if Config.Basic.FileLog {
  137. writer, err := rotatelogs.New(
  138. file+".%Y-%m-%d_%H:%M:%S",
  139. rotatelogs.WithLinkName(file),
  140. rotatelogs.WithRotationTime(time.Hour*time.Duration(Config.Basic.RotateTime)),
  141. rotatelogs.WithMaxAge(time.Hour*time.Duration(Config.Basic.MaxAge)),
  142. )
  143. if err != nil {
  144. logrus.Errorf("config local file system for logger error: %v", err)
  145. } else {
  146. Log.SetOutput(writer)
  147. }
  148. }
  149. }
  150. } else {
  151. fmt.Println("Failed to init log file settings..." + err.Error())
  152. Log.Infof("Failed to log to file, using default stderr.")
  153. }
  154. }
  155. func PrintMap(m map[string]string, buff *bytes.Buffer) {
  156. si := make([]string, 0, len(m))
  157. for s := range m {
  158. si = append(si, s)
  159. }
  160. sort.Strings(si)
  161. for _, s := range si {
  162. buff.WriteString(fmt.Sprintf("%s: %s\n", s, m[s]))
  163. }
  164. }
  165. func CloseLogger() {
  166. if logFile != nil {
  167. logFile.Close()
  168. }
  169. }
  170. func GetConfLoc() (string, error) {
  171. return GetLoc(etc_dir)
  172. }
  173. func GetDataLoc() (string, error) {
  174. if IsTesting {
  175. dataDir, err := GetLoc(data_dir)
  176. if err != nil {
  177. return "", err
  178. }
  179. d := path.Join(path.Dir(dataDir), "test")
  180. if _, err := os.Stat(d); os.IsNotExist(err) {
  181. err = os.MkdirAll(d, 0755)
  182. if err != nil {
  183. return "", err
  184. }
  185. }
  186. return d, nil
  187. }
  188. return GetLoc(data_dir)
  189. }
  190. func absolutePath(subdir string) (dir string, err error) {
  191. subdir = strings.TrimLeft(subdir, `/`)
  192. subdir = strings.TrimRight(subdir, `/`)
  193. switch subdir {
  194. case "etc":
  195. dir = "/etc/kuiper/"
  196. break
  197. case "data":
  198. dir = "/var/lib/kuiper/data/"
  199. break
  200. case "log":
  201. dir = "/var/log/kuiper/"
  202. break
  203. case "plugins":
  204. dir = "/var/lib/kuiper/plugins/"
  205. break
  206. }
  207. if 0 == len(dir) {
  208. return "", fmt.Errorf("no find such file : %s", subdir)
  209. }
  210. return dir, nil
  211. }
  212. func GetLoc(subdir string) (string, error) {
  213. if "relative" == LoadFileType {
  214. return relativePath(subdir)
  215. }
  216. if "absolute" == LoadFileType {
  217. return absolutePath(subdir)
  218. }
  219. return "", fmt.Errorf("Unrecognized loading method.")
  220. }
  221. func relativePath(subdir string) (dir string, err error) {
  222. dir, err = os.Getwd()
  223. if err != nil {
  224. return "", err
  225. }
  226. if base := os.Getenv(KuiperBaseKey); base != "" {
  227. Log.Infof("Specified Kuiper base folder at location %s.\n", base)
  228. dir = base
  229. }
  230. confDir := dir + subdir
  231. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  232. lastdir := dir
  233. for len(dir) > 0 {
  234. dir = filepath.Dir(dir)
  235. if lastdir == dir {
  236. break
  237. }
  238. confDir = dir + subdir
  239. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  240. lastdir = dir
  241. continue
  242. } else {
  243. //Log.Printf("Trying to load file from %s", confDir)
  244. return confDir, nil
  245. }
  246. }
  247. } else {
  248. //Log.Printf("Trying to load file from %s", confDir)
  249. return confDir, nil
  250. }
  251. return "", fmt.Errorf("conf dir not found, please set KuiperBaseKey program environment variable correctly.")
  252. }
  253. func ProcessPath(p string) (string, error) {
  254. if abs, err := filepath.Abs(p); err != nil {
  255. return "", nil
  256. } else {
  257. if _, err := os.Stat(abs); os.IsNotExist(err) {
  258. return "", err
  259. }
  260. return abs, nil
  261. }
  262. }
  263. /*********** Type Cast Utilities *****/
  264. //TODO datetime type
  265. func ToString(input interface{}) string {
  266. return fmt.Sprintf("%v", input)
  267. }
  268. func ToInt(input interface{}) (int, error) {
  269. switch t := input.(type) {
  270. case float64:
  271. return int(t), nil
  272. case int64:
  273. return int(t), nil
  274. case int:
  275. return t, nil
  276. default:
  277. return 0, fmt.Errorf("unsupported type %T of %[1]v", input)
  278. }
  279. }
  280. /*
  281. * Convert a map into a struct. The output parameter must be a pointer to a struct
  282. * The struct can have the json meta data
  283. */
  284. func MapToStruct(input, output interface{}) error {
  285. // convert map to json
  286. jsonString, err := json.Marshal(input)
  287. if err != nil {
  288. return err
  289. }
  290. // convert json to struct
  291. return json.Unmarshal(jsonString, output)
  292. }
  293. func ConvertMap(s map[interface{}]interface{}) map[string]interface{} {
  294. r := make(map[string]interface{})
  295. for k, v := range s {
  296. switch t := v.(type) {
  297. case map[interface{}]interface{}:
  298. v = ConvertMap(t)
  299. case []interface{}:
  300. v = ConvertArray(t)
  301. }
  302. r[fmt.Sprintf("%v", k)] = v
  303. }
  304. return r
  305. }
  306. func ConvertArray(s []interface{}) []interface{} {
  307. r := make([]interface{}, len(s))
  308. for i, e := range s {
  309. switch t := e.(type) {
  310. case map[interface{}]interface{}:
  311. e = ConvertMap(t)
  312. case []interface{}:
  313. e = ConvertArray(t)
  314. }
  315. r[i] = e
  316. }
  317. return r
  318. }
  319. func SyncMapToMap(sm *sync.Map) map[string]interface{} {
  320. m := make(map[string]interface{})
  321. sm.Range(func(k interface{}, v interface{}) bool {
  322. m[fmt.Sprintf("%v", k)] = v
  323. return true
  324. })
  325. return m
  326. }
  327. func MapToSyncMap(m map[string]interface{}) *sync.Map {
  328. sm := new(sync.Map)
  329. for k, v := range m {
  330. sm.Store(k, v)
  331. }
  332. return sm
  333. }
  334. func ReadJsonUnmarshal(path string, ret interface{}) error {
  335. sliByte, err := ioutil.ReadFile(path)
  336. if nil != err {
  337. return err
  338. }
  339. err = json.Unmarshal(sliByte, ret)
  340. if nil != err {
  341. return err
  342. }
  343. return nil
  344. }
  345. func ReadYamlUnmarshal(path string, ret interface{}) error {
  346. sliByte, err := ioutil.ReadFile(path)
  347. if nil != err {
  348. return err
  349. }
  350. err = yaml.Unmarshal(sliByte, ret)
  351. if nil != err {
  352. return err
  353. }
  354. return nil
  355. }
  356. func WriteYamlMarshal(path string, data interface{}) error {
  357. y, err := yaml.Marshal(data)
  358. if nil != err {
  359. return err
  360. }
  361. return ioutil.WriteFile(path, y, 0666)
  362. }