sink.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package neuron
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. "github.com/lf-edge/ekuiper/pkg/errorx"
  22. "sort"
  23. )
  24. type sink struct {
  25. c *c
  26. cli *conninfo
  27. }
  // c is the property set the neuron sink accepts in Configure.
  type c struct {
  	// NodeName is the node name template, evaluated against every message.
  	// Configure rejects an empty value unless Raw is set.
  	NodeName string `json:"nodeName"`
  	// GroupName is the group name template, evaluated against every message.
  	// Configure rejects an empty value unless Raw is set.
  	GroupName string `json:"groupName"`
  	// Tags lists the tag name templates to send, in order. When empty, every
  	// key of the message map is sent as a tag.
  	Tags []string `json:"tags"`
  	// Raw selects the payload mode: when true the transformed output is
  	// published as is; when false the result map is walked and one payload is
  	// published per tag.
  	Raw bool `json:"raw"`
  	// Url is the address used to obtain the connection. Configure defaults it
  	// to DefaultNeuronUrl.
  	Url string `json:"url"`
  }
  // neuronTemplate is the JSON payload published for a single tag value.
  // Field order is significant: it determines the key order of the encoded
  // message.
  type neuronTemplate struct {
  	// GroupName is the resolved group name.
  	GroupName string `json:"group_name"`
  	// NodeName is the resolved node name.
  	NodeName string `json:"node_name"`
  	// TagName is the name of the tag being written.
  	TagName string `json:"tag_name"`
  	// Value is the tag value taken from the message map; it is encoded as is.
  	Value interface{} `json:"value"`
  }
  42. func (s *sink) Configure(props map[string]interface{}) error {
  43. cc := &c{
  44. Raw: false,
  45. Url: DefaultNeuronUrl,
  46. }
  47. err := cast.MapToStruct(props, cc)
  48. if err != nil {
  49. return err
  50. }
  51. if !cc.Raw {
  52. if cc.NodeName == "" {
  53. return fmt.Errorf("node name is required if raw is not set")
  54. }
  55. if cc.GroupName == "" {
  56. return fmt.Errorf("group name is required if raw is not set")
  57. }
  58. }
  59. s.c = cc
  60. return nil
  61. }
  62. func (s *sink) Open(ctx api.StreamContext) error {
  63. ctx.GetLogger().Debugf("Opening neuron sink")
  64. cli, err := createOrGetConnection(ctx, s.c.Url)
  65. if err != nil {
  66. return err
  67. }
  68. s.cli = cli
  69. return nil
  70. }
  71. func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
  72. ctx.GetLogger().Debugf("receive %+v", data)
  73. if s.c.Raw {
  74. r, _, err := ctx.TransformOutput(data)
  75. if err != nil {
  76. return err
  77. }
  78. return publish(ctx, r, s.cli)
  79. } else {
  80. switch d := data.(type) {
  81. case []map[string]interface{}:
  82. for _, el := range d {
  83. err := s.SendMapToNeuron(ctx, el)
  84. if err != nil {
  85. return err
  86. }
  87. }
  88. return nil
  89. case map[string]interface{}:
  90. return s.SendMapToNeuron(ctx, d)
  91. default:
  92. return fmt.Errorf("unrecognized format of %s", data)
  93. }
  94. }
  95. }
  96. func (s *sink) Close(ctx api.StreamContext) error {
  97. ctx.GetLogger().Debugf("closing neuron sink")
  98. if s.cli != nil {
  99. return closeConnection(ctx, s.c.Url)
  100. }
  101. return nil
  102. }
  103. func (s *sink) SendMapToNeuron(ctx api.StreamContext, el map[string]interface{}) error {
  104. n, err := ctx.ParseTemplate(s.c.NodeName, el)
  105. if err != nil {
  106. return err
  107. }
  108. g, err := ctx.ParseTemplate(s.c.GroupName, el)
  109. if err != nil {
  110. return err
  111. }
  112. t := &neuronTemplate{
  113. NodeName: n,
  114. GroupName: g,
  115. }
  116. var (
  117. ok bool
  118. )
  119. if len(s.c.Tags) == 0 {
  120. if conf.IsTesting {
  121. var keys []string
  122. for k := range el {
  123. keys = append(keys, k)
  124. }
  125. sort.Strings(keys)
  126. for _, k := range keys {
  127. t.TagName = k
  128. t.Value = el[k]
  129. err := doPublish(ctx, t, s.cli)
  130. if err != nil {
  131. return err
  132. }
  133. }
  134. } else {
  135. for k, v := range el {
  136. t.TagName = k
  137. t.Value = v
  138. err := doPublish(ctx, t, s.cli)
  139. if err != nil {
  140. return err
  141. }
  142. }
  143. }
  144. } else {
  145. // Send as many tags as possible in order and drop the tag if it is invalid
  146. for _, tag := range s.c.Tags {
  147. t.TagName, err = ctx.ParseTemplate(tag, el)
  148. if err != nil {
  149. ctx.GetLogger().Errorf("Error parsing tag %s: %v", tag, err)
  150. continue
  151. }
  152. t.Value, ok = el[t.TagName]
  153. if !ok {
  154. ctx.GetLogger().Errorf("Error get the value of tag %s: %v", t.TagName, err)
  155. continue
  156. }
  157. err := doPublish(ctx, t, s.cli)
  158. if err != nil {
  159. return err
  160. }
  161. }
  162. }
  163. return nil
  164. }
  165. func doPublish(ctx api.StreamContext, t *neuronTemplate, cli *conninfo) error {
  166. r, err := json.Marshal(t)
  167. if err != nil {
  168. return fmt.Errorf("Error marshall the tag payload %v: %v", t, err)
  169. }
  170. err = publish(ctx, r, cli)
  171. if err != nil {
  172. return fmt.Errorf("%s: Error publish the tag payload %s: %v", errorx.IOErr, t.TagName, err)
  173. }
  174. ctx.GetLogger().Debugf("Publish %s", r)
  175. return nil
  176. }
  177. func GetSink() *sink {
  178. return &sink{}
  179. }