Browse Source

Merge pull request #51 from emqx/spec_kuiper_base

Spec kuiper base
ngjaying 5 years atrás
parent
commit
a39c06bf3c
2 changed files with 24 additions and 22 deletions
  1. 11 10
      common/util.go
  2. 13 12
      xstream/server/server/server.go

+ 11 - 10
common/util.go

@@ -3,7 +3,6 @@ package common
 import (
 	"bytes"
 	"encoding/json"
-	"flag"
 	"fmt"
 	"github.com/go-yaml/yaml"
 	"github.com/patrickmn/go-cache"
@@ -72,14 +71,16 @@ type XStreamConf struct {
 }
 
 var StreamConf = "kuiper.yaml"
-var kpbase = flag.String("kuiper_base", "", "Specify Kuiper base directory")
-
+const KuiperBaseKey = "KuiperBaseKey"
 func init(){
 	Log = logrus.New()
 	Log.SetFormatter(&logrus.TextFormatter{
 		DisableColors: true,
 		FullTimestamp: true,
 	})
+}
+
+func InitConf() {
 	b, err := LoadConf(StreamConf)
 	if err != nil {
 		Log.Fatal(err)
@@ -90,8 +91,8 @@ func init(){
 	}
 
 	if c, ok := cfg["basic"]; !ok{
-		Log.Fatal("no basic config in kuiper.yaml")
-	}else{
+		Log.Fatal("No basic config in kuiper.yaml")
+	} else {
 		Config = &c
 	}
 
@@ -107,7 +108,7 @@ func init(){
 		} else {
 			Log.Infof("Failed to log to file, using default stderr")
 		}
-	}else{
+	} else {
 		Log.SetLevel(logrus.DebugLevel)
 	}
 }
@@ -231,10 +232,10 @@ func GetLoc(subdir string)(string, error) {
 		return "", err
 	}
 
-	//flag.Parse()
-	//if loc := *kpbase; loc != "" {
-	//	dir = loc
-	//}
+	if base := os.Getenv(KuiperBaseKey); base != "" {
+		Log.Infof("Specified Kuiper base folder at location %s.\n", base)
+		dir = base
+	}
 
 	confDir := dir + subdir
 	if _, err := os.Stat(confDir); os.IsNotExist(err) {

+ 13 - 12
xstream/server/server/server.go

@@ -239,17 +239,6 @@ func (t *Server) DropRule(name string, reply *string) error{
 }
 
 func init(){
-	var err error
-	dataDir, err = common.GetDataLoc()
-	if err != nil {
-		log.Panic(err)
-	}else{
-		log.Infof("db location is %s", dataDir)
-	}
-
-	processor = processors.NewRuleProcessor(path.Dir(dataDir))
-	registry = make(RuleRegistry)
-
 	ticker := time.NewTicker(time.Second * 5)
 	go func() {
 		for {
@@ -271,6 +260,18 @@ func init(){
 
 
 func StartUp(Version string) {
+	common.InitConf()
+
+	dr, err := common.GetDataLoc()
+	if err != nil {
+		log.Panic(err)
+	}else{
+		log.Infof("db location is %s", dr)
+		dataDir = dr
+	}
+	processor = processors.NewRuleProcessor(path.Dir(dataDir))
+	registry = make(RuleRegistry)
+
 	server := new(Server)
 	//Start rules
 	if rules, err := processor.GetAllRules(); err != nil{
@@ -289,7 +290,7 @@ func StartUp(Version string) {
 	}
 
 	//Start server
-	err := rpc.Register(server)
+	err = rpc.Register(server)
 	if err != nil {
 		log.Fatal("Format of service Server isn't correct. ", err)
 	}