Browse Source

Bug fixes (#104)

* feat(server): support more concurrent connection

* bug(stream): fix problem of broadcasting to multiple output
ngjaying 5 years atrás
parent
commit
a700032aa8
3 changed files with 10 additions and 8 deletions
  1. 6 6
      xstream/nodes/common_func.go
  2. 3 1
      xstream/server/server/rest.go
  3. 1 1
      xstream/sinks/mqtt_sink.go

+ 6 - 6
xstream/nodes/common_func.go

@@ -8,15 +8,15 @@ import (
 //Blocking broadcast
 func Broadcast(outputs map[string]chan<- interface{}, val interface{}, ctx api.StreamContext) {
 	logger := ctx.GetLogger()
-	var barrier sync.WaitGroup
-	barrier.Add(len(outputs))
+	var wg sync.WaitGroup
+	wg.Add(len(outputs))
 	for n, out := range outputs {
-		go func(wg *sync.WaitGroup) {
-			out <- val
+		go func(output chan<- interface{}) {
+			output <- val
 			wg.Done()
 			logger.Debugf("broadcast from %s to %s done", ctx.GetOpId(), n)
-		}(&barrier)
+		}(out)
 	}
 	logger.Debugf("broadcasting from %s", ctx.GetOpId())
-	barrier.Wait()
+	wg.Wait()
 }

+ 3 - 1
xstream/server/server/rest.go

@@ -60,7 +60,7 @@ func createRestServer(port int) *http.Server {
 	r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
 	r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
 
-	return &http.Server{
+	server := &http.Server{
 		Addr: fmt.Sprintf("0.0.0.0:%d", port),
 		// Good practice to set timeouts to avoid Slowloris attacks.
 		WriteTimeout: time.Second * 15,
@@ -68,6 +68,8 @@ func createRestServer(port int) *http.Server {
 		IdleTimeout:  time.Second * 60,
 		Handler:      r, // Pass our instance of gorilla/mux in.
 	}
+	server.SetKeepAlivesEnabled(false)
+	return server
 }
 
 //list or create streams

+ 1 - 1
xstream/sinks/mqtt_sink.go

@@ -156,7 +156,7 @@ func (ms *MQTTSink) Open(ctx api.StreamContext) error {
 func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	c := ms.conn
-	logger.Infof("publish %s", item)
+	logger.Debugf("%s publish %s", ctx.GetOpId(), item)
 	if token := c.Publish(ms.tpc, 0, false, item); token.Wait() && token.Error() != nil {
 		return fmt.Errorf("publish error: %s", token.Error())
 	}