瀏覽代碼

fix(edgex): fix wrong topic format

The profile name must be in front of device name.

Closes: #947

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 年之前
父節點
當前提交
8e049db9d6
共有 3 個文件被更改,包括 17 次插入15 次删除
  1. 7 7
      docs/en_US/rules/sinks/edgex.md
  2. 6 6
      docs/zh_CN/rules/sinks/edgex.md
  3. 4 2
      internal/topo/sink/edgex_sink.go

文件差異過大導致無法顯示
+ 7 - 7
docs/en_US/rules/sinks/edgex.md


+ 6 - 6
docs/zh_CN/rules/sinks/edgex.md

@@ -13,13 +13,13 @@
 | host        | 是    | 消息总线主机地址,使用缺省值 `*` 。                    |
 | port        | 是    | 消息总线端口号。 如未指定,使用缺省值 `5563` 。              |
 | topic       | 是    | 发布的主题名称。该主题为固定值。若不同的消息需要动态指定主题,则将该属性置空,并设置 topicPrefix 属性。这两个属性只能设置一个。若两者都未设置,则使用缺省主题 `application` 。          |
-| topicPrefix | 是     | 发布的主题的前缀。发送的主题将采用动态拼接,格式为`$topicPrefix/$deviceName/$profileName/$sourceName` 。|
+| topicPrefix | 是     | 发布的主题的前缀。发送的主题将采用动态拼接,格式为`$topicPrefix/$profileName/$deviceName/$sourceName` 。|
 | contentType | 是    | 发布消息的内容类型,如未指定,使用缺省值 `application/json` 。|
 | messageType   | 是   | EdgeX 消息模型类型。若要将消息发送为类似 apllication service 的 event 类型,则应设置为 `event`。否则,若要将消息发送为类似 device service 或者 core data service 的 event request 类型,则应设置为 `request`。如未指定,使用缺省值 ``event`` 。|
 
 | metadata    | 是    | 该属性为一个字段名称,该字段是 SQL SELECT 子句的一个字段名称,这个字段应该类似于 `meta(*) AS xxx` ,用于选出消息中所有的 EdgeX 元数据 。 |
-| deviceName  | 是    | 允许用户指定设备名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的设备名称。若在 metadata 中设置了 deviceName 将会优先采用。 |
 | profileName  | 是    | 允许用户指定设备名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的 profile 名称。若在 metadata 中设置了 profileName 将会优先采用。|
+| deviceName  | 是    | 允许用户指定设备名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的设备名称。若在 metadata 中设置了 deviceName 将会优先采用。 |
 | sourceName    | 是   | 允许用户指定源名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的源名称。若在 metadata 中设置了 sourceName 将会优先采用。 |
 | optional      | 是    | 如果指定了 `mqtt` 消息总线,那么还可以指定一下可选的值。请参考以下可选的支持的配置类型。 |
 
@@ -58,8 +58,8 @@
         "host": "localhost",
         "port": 6379,
         "topic": "application",
-        "deviceName": "ekuiper",
         "profileName": "ekuiperProfile",
+        "deviceName": "ekuiper",        
         "contentType": "application/json"
       }
     }
@@ -69,7 +69,7 @@
 
 ### 像 device service 一样发送到 redis 消息总线
 
-通过更改 `topicPrefix` 和 `messageType` 属性,我们可以让 EdgeX sink 模拟设备。设备默认情况下会发送消息到 `edgex/events/device/$deviceName/$profileName/$sourceName` 格式的主题中。所以,我们需要设置 `topicPrefix` 属性为 `edgex/events/device` 以确保消息路由为设备消息。此外,通过与 `metadata` 结合,我们可以发送到动态的主题中,从而模拟多个设备。详情参考下一节[动态元数据](#动态元数据)。
+通过更改 `topicPrefix` 和 `messageType` 属性,我们可以让 EdgeX sink 模拟设备。设备默认情况下会发送消息到 `edgex/events/device/$profileName/$deviceName/$sourceName` 格式的主题中。所以,我们需要设置 `topicPrefix` 属性为 `edgex/events/device` 以确保消息路由为设备消息。此外,通过与 `metadata` 结合,我们可以发送到动态的主题中,从而模拟多个设备。详情参考下一节[动态元数据](#动态元数据)。
 
 ```json
 {
@@ -133,8 +133,8 @@
         "host": "*",
         "port": 5571,
         "topic": "application",
-        "deviceName": "kuiper",
-        "profileName": "kuiperProfile",
+        "profileName": "myprofile",
+        "deviceName": "mydevice",        
         "contentType": "application/json"
       }
     }

+ 4 - 2
internal/topo/sink/edgex_sink.go

@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//go:build edgex
 // +build edgex
 
 package sink
@@ -134,7 +135,7 @@ func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
 	} else if ems.c.Topic != "" {
 		ems.topic = ems.c.Topic
 	} else if ems.c.Metadata == "" { // If meta data are static, the "dynamic" topic is static
-		ems.topic = fmt.Sprintf("%s/%s/%s/%s", ems.c.TopicPrefix, ems.c.DeviceName, ems.c.ProfileName, ems.c.SourceName)
+		ems.topic = fmt.Sprintf("%s/%s/%s/%s", ems.c.TopicPrefix, ems.c.ProfileName, ems.c.DeviceName, ems.c.SourceName)
 	} else {
 		ems.topic = "" // calculate dynamically
 	}
@@ -411,7 +412,7 @@ func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) err
 		env.ContentType = ems.c.ContentType
 
 		if ems.topic == "" { // dynamic topic
-			topic = fmt.Sprintf("%s/%s/%s/%s", ems.c.TopicPrefix, evt.DeviceName, evt.ProfileName, evt.SourceName)
+			topic = fmt.Sprintf("%s/%s/%s/%s", ems.c.TopicPrefix, evt.ProfileName, evt.DeviceName, evt.SourceName)
 		} else {
 			topic = ems.topic
 		}
@@ -420,6 +421,7 @@ func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) err
 			logger.Errorf("Found error %s when publish to EdgeX message bus.\n", e)
 			return e
 		}
+		logger.Debugf("Published %+v to EdgeX message bus topic %s", evt, topic)
 	} else {
 		return fmt.Errorf("Unkown type %t, the message cannot be published.\n", item)
 	}