介绍
EdgeX MessageBus
是EdgeX
的内部消息总线,用于EdgeX服务(EdgeX Core/Support/Application/Device Service 获取是其他基于EdgeX SDKs自定义的 Application or Device Service)之间的内部通信。
下图展示了每个EdgeX服务对EdgeX MessageBus
的使用:
EdgeX MessageBus
不是外部服务与内部EdgeX服务通信的方式。
EdgeX服务与外部服务通信的方式可以归纳为以下几点:
-
通过REST API与所有的EdgeX服务进行通信
-
ApplicationService使用外部的MQTT Trigger进行通信
-
ApplicationService使用HTTP Trigger进行通信
-
ApplicationService使用自定义的Trigger进行通信
-
Core Command集成外部MQTT
Message Envelope
向EdgeX MessageBus
发送的所有消息都被封装成MessageEnvelope
模型类。我们可以看看Golang中的具体实现:
// Versionable shows the API version in DTOs
type Versionable struct {
ApiVersion string `json:"apiVersion" yaml:"apiVersion" validate:"required"`
}
// MessageEnvelope is the data structure for messages. It wraps the generic message payload with attributes.
type MessageEnvelope struct {
// ApiVersion (from Versionable) shows the API version for the message envelope.
commonDTO.Versionable
// ReceivedTopic is the topic that the message was received on.
ReceivedTopic string `json:"receivedTopic"`
// CorrelationID is an object id to identify the envelope.
CorrelationID string `json:"correlationID"`
// RequestID is an object id to identify the request.
RequestID string `json:"requestID"`
// ErrorCode provides the indication of error. '0' indicates no error, '1' indicates error.
// Additional codes may be added in the future. If non-0, the payload will contain the error.
ErrorCode int `json:"errorCode"`
// Payload is byte representation of the data being transferred.
Payload []byte `json:"payload"`
// ContentType is the marshaled type of payload, i.e. application/json, application/xml, application/cbor, etc
ContentType string `json:"contentType"`
// QueryParams is optionally provided key/value pairs.
QueryParams map[string]string `json:"queryParams,omitempty"`
}
实现配置
在edgex中,提供了四种EdgeX MessageBus
的实现方式:Redis Pub/Sub、MQTT 3.1、NATS Core、NATS JetStream,默认使用的是Redis Pub/Sub
实现。
MessageBus配置源码(Golang)
// MessageBusInfo provides parameters related to connecting to the EdgeX MessageBus
type MessageBusInfo struct {
// Disabled indicates if the use of the EdgeX MessageBus is disabled.
Disabled bool
// Indicates the message bus implementation to use, i.e. zero, mqtt, redisstreams...
Type string
// Protocol indicates the protocol to use when accessing the message bus.
Protocol string
// Host is the hostname or IP address of the broker, if applicable.
Host string
// Port defines the port on which to access the message bus.
Port int
// AuthMode specifies the type of secure connection to the message bus which are 'none', 'usernamepassword'
// 'clientcert' or 'cacert'. Not all option supported by each implementation.
// RedisStreams only supports 'none' & 'usernamepassword' while MQTT and NATS support all options.
AuthMode string
// SecretName is the name of the secret in the SecretStore that contains the Auth Credentials. The credential are
// dynamically loaded using this name and store the Option property below where the implementation expected to
// find them.
SecretName string
// BaseTopicPrefix is the base topic prefix that all topics start with.
// If not set the DefaultBaseTopic constant is used.
BaseTopicPrefix string
// Provides additional configuration properties which do not fit within the existing field.
// Typically, the key is the name of the configuration property and the value is a string representation of the
// desired value for the configuration property.
Optional map[string]string
}
配置参数
在所有的edgex services通用的配置文件中,配置如下方所示(yaml)(以mqtt为例):
MessageBus:
Type: "mqtt"
Protocol: "tcp"
Host: "localhost" # in docker this must be overriden to be the docker host name of the MQTT Broker
Port: 1883
AuthMode: "none" # set to "usernamepassword" when running in secure mode
SecreName: "message-bus"
公共参数说明
参数名 | 默认值 | 备注 |
---|---|---|
Disabled | false | 是否启用/禁用MessageBus |
Type | redis | MessageBus实现类型:redis、mqtt、nats-core、nats-jetstream |
Protocol | redis | 协议 |
Host | localhost | 服务ip地址 |
Port | 6379 | 服务端口号 |
AuthMode | usernamepassword | 认证模式 |
SecretName | redisb | 在服务的SecretStore中存储的凭证名称 |
BaseTopicPrefix | edgex | MessageBus中所有topic的前缀 |
Optional | 可选配置 |
Redis Pub/Sub
安全配置
参数名 | 默认值 | 备注 |
---|---|---|
AuthMode | usernamepassword | 认证模式,可选:none,usernamepassword,clientcert,cacert。在安全模式下Redis Pub/Sub使用usernamepassword |
SecretName | redisb | 在服务的SecretStore中存储的凭证名称 |
可选配置
当前模式没有可选配置
MQTT 3.1
安全配置
参数名 | 默认值 | 备注 |
---|---|---|
AuthMode | usernamepassword | 认证模式,可选:none,usernamepassword,clientcert,cacert。在安全模式下MQTT使用usernamepassword |
SecretName | redisb | 在服务的SecretStore中存储的凭证名称 |
可选配置
参数名 | 默认值 | 备注 |
---|---|---|
ClientId | service key | 连接mqtt的客户端唯一标识 |
Qos | 0 | 0: At most once delivery 1: At least once delivery 2: Exactly once delivery |
KeepAlive | 10 | mqtt client连接保持时长(单位:秒) |
Retained | false | 如果为true,则服务器必须存储应用消息及其QoS,可参阅MQTT的保留消息 |
AutoReconnect | true | 是否在连接断开时自动重连 |
ConnectTimeout | 30 | 连接超时时间 |
CleanSession | false | 如果为true,则服务器必须放弃任何以前的会话并启动新会话 |
MessageBus的通配符配置
EdgeX MessageBus使用通配符来允许订阅过滤数据,类似MQTT的方案。
在Redis Pub/Sub中,“.”用作级别分隔符,星号“”后面跟着一个级别分隔符用作单层通配符,末尾的星号“”用作多级通配符。它们分别转换为MQTT使用的“/”、“+”和“#”。