util.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. package common
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/benbjohnson/clock"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/go-yaml/yaml"
  9. "github.com/keepeye/logrus-filename"
  10. "github.com/lestrrat-go/file-rotatelogs"
  11. "github.com/sirupsen/logrus"
  12. "io"
  13. "io/ioutil"
  14. "os"
  15. "path"
  16. "path/filepath"
  17. "sort"
  18. "strings"
  19. "sync"
  20. "time"
  21. )
  22. const (
  23. logFileName = "stream.log"
  24. etc_dir = "/etc/"
  25. data_dir = "/data/"
  26. log_dir = "/log/"
  27. StreamConf = "kuiper.yaml"
  28. KuiperBaseKey = "KuiperBaseKey"
  29. KuiperSyslogKey = "KuiperSyslogKey"
  30. MetaKey = "__meta"
  31. )
  32. var (
  33. Log *logrus.Logger
  34. Config *KuiperConf
  35. IsTesting bool
  36. Clock clock.Clock
  37. logFile *os.File
  38. LoadFileType = "relative"
  39. )
  40. func LoadConf(confName string) ([]byte, error) {
  41. confDir, err := GetConfLoc()
  42. if err != nil {
  43. return nil, err
  44. }
  45. file := path.Join(confDir, confName)
  46. b, err := ioutil.ReadFile(file)
  47. if err != nil {
  48. return nil, err
  49. }
  50. return b, nil
  51. }
// tlsConf holds a certificate file and a key file setting, read from the
// "certfile" and "keyfile" YAML keys.
type tlsConf struct {
	Certfile string `yaml:"certfile"`
	Keyfile  string `yaml:"keyfile"`
}
  56. type KuiperConf struct {
  57. Basic struct {
  58. Debug bool `yaml:"debug"`
  59. ConsoleLog bool `yaml:"consoleLog"`
  60. FileLog bool `yaml:"fileLog"`
  61. RotateTime int `yaml:"rotateTime"`
  62. MaxAge int `yaml:"maxAge"`
  63. Ip string `yaml:"ip"`
  64. Port int `yaml:"port"`
  65. RestIp string `yaml:"restIp"`
  66. RestPort int `yaml:"restPort"`
  67. RestTls *tlsConf `yaml:"restTls"`
  68. Prometheus bool `yaml:"prometheus"`
  69. PrometheusPort int `yaml:"prometheusPort"`
  70. PluginHosts string `yaml:"pluginHosts"`
  71. }
  72. Rule api.RuleOption
  73. Sink struct {
  74. CacheThreshold int `yaml:"cacheThreshold"`
  75. CacheTriggerCount int `yaml:"cacheTriggerCount"`
  76. DisableCache bool `yaml:"disableCache""`
  77. }
  78. }
  79. func init() {
  80. Log = logrus.New()
  81. initSyslog()
  82. filenameHook := filename.NewHook()
  83. filenameHook.Field = "file"
  84. Log.AddHook(filenameHook)
  85. Log.SetFormatter(&logrus.TextFormatter{
  86. TimestampFormat: "2006-01-02 15:04:05",
  87. DisableColors: true,
  88. FullTimestamp: true,
  89. })
  90. Log.Debugf("init with args %s", os.Args)
  91. for _, arg := range os.Args {
  92. if strings.HasPrefix(arg, "-test.") {
  93. IsTesting = true
  94. break
  95. }
  96. }
  97. if IsTesting {
  98. Log.Debugf("running in testing mode")
  99. Clock = clock.NewMock()
  100. } else {
  101. Clock = clock.New()
  102. }
  103. }
  104. func InitConf() {
  105. b, err := LoadConf(StreamConf)
  106. if err != nil {
  107. Log.Fatal(err)
  108. }
  109. kc := KuiperConf{
  110. Rule: api.RuleOption{
  111. LateTol: 1000,
  112. Concurrency: 1,
  113. BufferLength: 1024,
  114. CheckpointInterval: 300000, //5 minutes
  115. },
  116. }
  117. if err := yaml.Unmarshal(b, &kc); err != nil {
  118. Log.Fatal(err)
  119. } else {
  120. Config = &kc
  121. }
  122. if 0 == len(Config.Basic.Ip) {
  123. Config.Basic.Ip = "0.0.0.0"
  124. }
  125. if 0 == len(Config.Basic.RestIp) {
  126. Config.Basic.RestIp = "0.0.0.0"
  127. }
  128. if Config.Basic.Debug {
  129. Log.SetLevel(logrus.DebugLevel)
  130. }
  131. if Config.Basic.FileLog {
  132. logDir, err := GetLoc(log_dir)
  133. if err != nil {
  134. Log.Fatal(err)
  135. }
  136. file := path.Join(logDir, logFileName)
  137. logWriter, err := rotatelogs.New(
  138. file+".%Y-%m-%d_%H-%M-%S",
  139. rotatelogs.WithLinkName(file),
  140. rotatelogs.WithRotationTime(time.Hour*time.Duration(Config.Basic.RotateTime)),
  141. rotatelogs.WithMaxAge(time.Hour*time.Duration(Config.Basic.MaxAge)),
  142. )
  143. if err != nil {
  144. fmt.Println("Failed to init log file settings..." + err.Error())
  145. Log.Infof("Failed to log to file, using default stderr.")
  146. } else if Config.Basic.ConsoleLog {
  147. mw := io.MultiWriter(os.Stdout, logWriter)
  148. Log.SetOutput(mw)
  149. } else if !Config.Basic.ConsoleLog {
  150. Log.SetOutput(logWriter)
  151. }
  152. } else if Config.Basic.ConsoleLog {
  153. Log.SetOutput(os.Stdout)
  154. }
  155. }
  156. func PrintMap(m map[string]string, buff *bytes.Buffer) {
  157. si := make([]string, 0, len(m))
  158. for s := range m {
  159. si = append(si, s)
  160. }
  161. sort.Strings(si)
  162. for _, s := range si {
  163. buff.WriteString(fmt.Sprintf("%s: %s\n", s, m[s]))
  164. }
  165. }
  166. func CloseLogger() {
  167. if logFile != nil {
  168. logFile.Close()
  169. }
  170. }
// GetConfLoc returns the location of the configuration directory by
// resolving etc_dir through GetLoc.
func GetConfLoc() (string, error) {
	return GetLoc(etc_dir)
}
  174. func GetDataLoc() (string, error) {
  175. if IsTesting {
  176. dataDir, err := GetLoc(data_dir)
  177. if err != nil {
  178. return "", err
  179. }
  180. d := path.Join(path.Dir(dataDir), "test")
  181. if _, err := os.Stat(d); os.IsNotExist(err) {
  182. err = os.MkdirAll(d, 0755)
  183. if err != nil {
  184. return "", err
  185. }
  186. }
  187. return d, nil
  188. }
  189. return GetLoc(data_dir)
  190. }
  191. func absolutePath(subdir string) (dir string, err error) {
  192. subdir = strings.TrimLeft(subdir, `/`)
  193. subdir = strings.TrimRight(subdir, `/`)
  194. switch subdir {
  195. case "etc":
  196. dir = "/etc/kuiper/"
  197. break
  198. case "data":
  199. dir = "/var/lib/kuiper/data/"
  200. break
  201. case "log":
  202. dir = "/var/log/kuiper/"
  203. break
  204. case "plugins":
  205. dir = "/var/lib/kuiper/plugins/"
  206. break
  207. }
  208. if 0 == len(dir) {
  209. return "", fmt.Errorf("no find such file : %s", subdir)
  210. }
  211. return dir, nil
  212. }
  213. func GetLoc(subdir string) (string, error) {
  214. if "relative" == LoadFileType {
  215. return relativePath(subdir)
  216. }
  217. if "absolute" == LoadFileType {
  218. return absolutePath(subdir)
  219. }
  220. return "", fmt.Errorf("Unrecognized loading method.")
  221. }
  222. func relativePath(subdir string) (dir string, err error) {
  223. dir, err = os.Getwd()
  224. if err != nil {
  225. return "", err
  226. }
  227. if base := os.Getenv(KuiperBaseKey); base != "" {
  228. Log.Infof("Specified Kuiper base folder at location %s.\n", base)
  229. dir = base
  230. }
  231. confDir := dir + subdir
  232. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  233. lastdir := dir
  234. for len(dir) > 0 {
  235. dir = filepath.Dir(dir)
  236. if lastdir == dir {
  237. break
  238. }
  239. confDir = dir + subdir
  240. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  241. lastdir = dir
  242. continue
  243. } else {
  244. //Log.Printf("Trying to load file from %s", confDir)
  245. return confDir, nil
  246. }
  247. }
  248. } else {
  249. //Log.Printf("Trying to load file from %s", confDir)
  250. return confDir, nil
  251. }
  252. return "", fmt.Errorf("conf dir not found, please set KuiperBaseKey program environment variable correctly.")
  253. }
  254. func ProcessPath(p string) (string, error) {
  255. if abs, err := filepath.Abs(p); err != nil {
  256. return "", nil
  257. } else {
  258. if _, err := os.Stat(abs); os.IsNotExist(err) {
  259. return "", err
  260. }
  261. return abs, nil
  262. }
  263. }
  264. /*********** Type Cast Utilities *****/
  265. //TODO datetime type
  266. func ToString(input interface{}) string {
  267. return fmt.Sprintf("%v", input)
  268. }
  269. func ToInt(input interface{}) (int, error) {
  270. switch t := input.(type) {
  271. case float64:
  272. return int(t), nil
  273. case int64:
  274. return int(t), nil
  275. case int:
  276. return t, nil
  277. default:
  278. return 0, fmt.Errorf("unsupported type %T of %[1]v", input)
  279. }
  280. }
  281. /*
  282. * Convert a map into a struct. The output parameter must be a pointer to a struct
  283. * The struct can have the json meta data
  284. */
  285. func MapToStruct(input, output interface{}) error {
  286. // convert map to json
  287. jsonString, err := json.Marshal(input)
  288. if err != nil {
  289. return err
  290. }
  291. // convert json to struct
  292. return json.Unmarshal(jsonString, output)
  293. }
  294. func ConvertMap(s map[interface{}]interface{}) map[string]interface{} {
  295. r := make(map[string]interface{})
  296. for k, v := range s {
  297. switch t := v.(type) {
  298. case map[interface{}]interface{}:
  299. v = ConvertMap(t)
  300. case []interface{}:
  301. v = ConvertArray(t)
  302. }
  303. r[fmt.Sprintf("%v", k)] = v
  304. }
  305. return r
  306. }
  307. func ConvertArray(s []interface{}) []interface{} {
  308. r := make([]interface{}, len(s))
  309. for i, e := range s {
  310. switch t := e.(type) {
  311. case map[interface{}]interface{}:
  312. e = ConvertMap(t)
  313. case []interface{}:
  314. e = ConvertArray(t)
  315. }
  316. r[i] = e
  317. }
  318. return r
  319. }
  320. func SyncMapToMap(sm *sync.Map) map[string]interface{} {
  321. m := make(map[string]interface{})
  322. sm.Range(func(k interface{}, v interface{}) bool {
  323. m[fmt.Sprintf("%v", k)] = v
  324. return true
  325. })
  326. return m
  327. }
  328. func MapToSyncMap(m map[string]interface{}) *sync.Map {
  329. sm := new(sync.Map)
  330. for k, v := range m {
  331. sm.Store(k, v)
  332. }
  333. return sm
  334. }
  335. func ReadJsonUnmarshal(path string, ret interface{}) error {
  336. sliByte, err := ioutil.ReadFile(path)
  337. if nil != err {
  338. return err
  339. }
  340. err = json.Unmarshal(sliByte, ret)
  341. if nil != err {
  342. return err
  343. }
  344. return nil
  345. }
  346. func ReadYamlUnmarshal(path string, ret interface{}) error {
  347. sliByte, err := ioutil.ReadFile(path)
  348. if nil != err {
  349. return err
  350. }
  351. err = yaml.Unmarshal(sliByte, ret)
  352. if nil != err {
  353. return err
  354. }
  355. return nil
  356. }
  357. func WriteYamlMarshal(path string, data interface{}) error {
  358. y, err := yaml.Marshal(data)
  359. if nil != err {
  360. return err
  361. }
  362. return ioutil.WriteFile(path, y, 0666)
  363. }