symbol.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Runtime for symbol, to establish data connection
  15. package runtime
  16. import (
  17. "encoding/json"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/sdk/go/api"
  20. "github.com/lf-edge/ekuiper/sdk/go/connection"
  21. "github.com/lf-edge/ekuiper/sdk/go/context"
  22. )
  23. type RuntimeInstance interface {
  24. run()
  25. stop() error
  26. isRunning() bool
  27. }
  28. func broadcast(ctx api.StreamContext, sock connection.DataOutChannel, data interface{}) {
  29. // encode
  30. var (
  31. result []byte
  32. err error
  33. )
  34. switch dt := data.(type) {
  35. case error:
  36. result, err = json.Marshal(fmt.Sprintf("{\"error\":\"%v\"}", dt))
  37. if err != nil {
  38. ctx.GetLogger().Errorf("%v", err)
  39. return
  40. }
  41. default:
  42. result, err = json.Marshal(dt)
  43. if err != nil {
  44. ctx.GetLogger().Errorf("%v", err)
  45. return
  46. }
  47. }
  48. if err = sock.Send(result); err != nil {
  49. ctx.GetLogger().Errorf("Failed publishing: %s", err.Error())
  50. }
  51. }
  52. func parseContext(con *Control) (api.StreamContext, error) {
  53. if con.Meta.RuleId == "" || con.Meta.OpId == "" {
  54. err := fmt.Sprintf("invalid arg %v, ruleId and opId are required", con)
  55. context.Log.Errorf(err)
  56. return nil, fmt.Errorf(err)
  57. }
  58. contextLogger := context.LogEntry("rule", con.Meta.RuleId)
  59. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta(con.Meta.RuleId, con.Meta.OpId)
  60. return ctx, nil
  61. }