文章
问答
冒泡
EdgeX Foundry之MessageBus

介绍

EdgeX MessageBusEdgeX的内部消息总线,用于EdgeX服务(EdgeX Core/Support/Application/Device Service 获取是其他基于EdgeX SDKs自定义的 Application or Device Service)之间的内部通信。

下图展示了每个EdgeX服务对EdgeX MessageBus的使用:

EdgeX MessageBus不是外部服务与内部EdgeX服务通信的方式。

EdgeX服务与外部服务通信的方式可以归纳为以下几点:

  • 通过REST API与所有的EdgeX服务进行通信

  • ApplicationService使用外部的MQTT Trigger进行通信

  • ApplicationService使用HTTP Trigger进行通信

  • ApplicationService使用自定义的Trigger进行通信

  • Core Command集成外部MQTT

Message Envelope

EdgeX MessageBus发送的所有消息都被封装成MessageEnvelope模型类。我们可以看看Golang中的具体实现:

// Versionable shows the API version in DTOs
type Versionable struct {
	ApiVersion string `json:"apiVersion" yaml:"apiVersion" validate:"required"`
}

// MessageEnvelope is the data structure for messages. It wraps the generic message payload with attributes.
type MessageEnvelope struct {
	// ApiVersion (from Versionable) shows the API version for the message envelope.
	commonDTO.Versionable
	// ReceivedTopic is the topic that the message was received on.
	ReceivedTopic string `json:"receivedTopic"`
	// CorrelationID is an object id to identify the envelope.
	CorrelationID string `json:"correlationID"`
	// RequestID is an object id to identify the request.
	RequestID string `json:"requestID"`
	// ErrorCode provides the indication of error. '0' indicates no error, '1' indicates error.
	// Additional codes may be added in the future. If non-0, the payload will contain the error.
	ErrorCode int `json:"errorCode"`
	// Payload is byte representation of the data being transferred.
	Payload []byte `json:"payload"`
	// ContentType is the marshaled type of payload, i.e. application/json, application/xml, application/cbor, etc
	ContentType string `json:"contentType"`
	// QueryParams is optionally provided key/value pairs.
	QueryParams map[string]string `json:"queryParams,omitempty"`
}

实现配置

在edgex中,提供了四种EdgeX MessageBus的实现方式:Redis Pub/Sub、MQTT 3.1、NATS Core、NATS JetStream,默认使用的是Redis Pub/Sub实现。

MessageBus配置源码(Golang)

// MessageBusInfo provides parameters related to connecting to the EdgeX MessageBus
type MessageBusInfo struct {
	// Disabled indicates if the use of the EdgeX MessageBus is disabled.
	Disabled bool
	// Indicates the message bus implementation to use, i.e. zero, mqtt, redisstreams...
	Type string
	// Protocol indicates the protocol to use when accessing the message bus.
	Protocol string
	// Host is the hostname or IP address of the broker, if applicable.
	Host string
	// Port defines the port on which to access the message bus.
	Port int
	// AuthMode specifies the type of secure connection to the message bus which are 'none', 'usernamepassword'
	// 'clientcert' or 'cacert'. Not all option supported by each implementation.
	// RedisStreams only supports 'none' & 'usernamepassword' while MQTT and NATS support all options.
	AuthMode string
	// SecretName is the name of the secret in the SecretStore that contains the Auth Credentials. The credential are
	// dynamically loaded using this name and store the Option property below where the implementation expected to
	// find them.
	SecretName string
	// BaseTopicPrefix is the base topic prefix that all topics start with.
	// If not set the DefaultBaseTopic constant is used.
	BaseTopicPrefix string
	// Provides additional configuration properties which do not fit within the existing field.
	// Typically, the key is the name of the configuration property and the value is a string representation of the
	// desired value for the configuration property.
	Optional map[string]string
}

配置参数

在所有的edgex services通用的配置文件中,配置如下方所示(yaml)(以mqtt为例):

MessageBus:
  Type: "mqtt"
  Protocol: "tcp" 
  Host: "localhost" # in docker this must be overriden to be the docker host name of the MQTT Broker
  Port: 1883
  AuthMode: "none"  # set to "usernamepassword" when running in secure mode
  SecreName: "message-bus"

公共参数说明

参数名 默认值 备注
Disabled false 是否启用/禁用MessageBus
Type redis MessageBus实现类型:redis、mqtt、nats-core、nats-jetstream
Protocol redis 协议
Host localhost 服务ip地址
Port 6379 服务端口号
AuthMode usernamepassword 认证模式
SecretName redisb 在服务的SecretStore中存储的凭证名称
BaseTopicPrefix edgex MessageBus中所有topic的前缀
Optional   可选配置

Redis Pub/Sub

安全配置

参数名 默认值 备注
AuthMode usernamepassword 认证模式,可选:none,usernamepassword,clientcert,cacert。在安全模式下Redis Pub/Sub使用usernamepassword
SecretName redisb 在服务的SecretStore中存储的凭证名称

可选配置

当前模式没有可选配置

MQTT 3.1

安全配置

参数名 默认值 备注
AuthMode usernamepassword 认证模式,可选:none,usernamepassword,clientcert,cacert。在安全模式下MQTT使用usernamepassword
SecretName redisb 在服务的SecretStore中存储的凭证名称

可选配置

参数名 默认值 备注
ClientId service key 连接mqtt的客户端唯一标识
Qos 0 0: At most once delivery 1: At least once delivery 2: Exactly once delivery
KeepAlive 10 mqtt client连接保持时长(单位:秒)
Retained false 如果为true,则服务器必须存储应用消息及其QoS,可参阅MQTT的保留消息
AutoReconnect true 是否在连接断开时自动重连
ConnectTimeout 30 连接超时时间
CleanSession false 如果为true,则服务器必须放弃任何以前的会话并启动新会话

MessageBus的通配符配置

EdgeX MessageBus使用通配符来允许订阅过滤数据,类似MQTT的方案。

在Redis Pub/Sub中,“.”用作级别分隔符,星号“”后面跟着一个级别分隔符用作单层通配符,末尾的星号“”用作多级通配符。它们分别转换为MQTT使用的“/”、“+”和“#”。

golang
edgexfoundry

关于作者

justin
123456
获得点赞
文章被阅读