|
@@ -78,10 +78,15 @@ func (s *TopologyNew) drainErr(err error) {
|
|
|
}
|
|
|
|
|
|
func (s *TopologyNew) Open() <-chan error {
|
|
|
+
|
|
|
+ //if stream has opened, do nothing
|
|
|
+ if s.ctx != nil && s.ctx.Err() == nil {
|
|
|
+ s.ctx.GetLogger().Infoln("rule is already running, do nothing")
|
|
|
+ return s.drain
|
|
|
+ }
|
|
|
s.prepareContext() // ensure context is set
|
|
|
log := s.ctx.GetLogger()
|
|
|
log.Infoln("Opening stream")
|
|
|
-
|
|
|
// open stream
|
|
|
go func() {
|
|
|
// open stream sink, after log sink is ready.
|