作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.
An expert in graphics, robotics, and backends, Adnan专注于用c++构建高性能解决方案, JS and many other languages.
微服务架构是设计和实现高度可扩展web应用程序的一种非常流行的方法. 单体应用程序中组件之间的通信通常基于同一进程中的方法或函数调用. A microservices‑based application, on the other hand, is a distributed system running on multiple machines.
为了拥有稳定和可扩展的系统,这些微服务之间的通信非常重要. There are multiple ways to do this. 基于消息的通信是可靠地做到这一点的一种方法.
在使用消息传递时,组件通过异步交换消息来相互交互. Messages are exchanged through channels.
When Service A wants to communicate with Service B, instead of sending it directly, A sends it to a specific channel. 当服务B想要读取消息时,它从一个特定的消息通道中获取消息.
In this Spring Integration tutorial, 您将学习如何使用Redis在Spring应用程序中实现消息传递. 您将了解一个示例应用程序,其中一个服务正在向队列中推送事件,而另一个服务正在逐个处理这些事件.
Spring Integration项目扩展了Spring框架,为基于Spring的应用程序之间或内部的消息传递提供支持. Components are wired together via the messaging paradigm. 单个组件可能不知道应用程序中的其他组件.
Spring Integration为与外部系统通信提供了广泛的机制选择. 通道适配器就是用于单向集成(发送或接收)的一种机制。. 网关用于请求/应答场景(入站或出站).
Apache Camel is an alternative that is widely used. 在现有的基于Spring的服务中,Spring集成通常是首选,因为它是Spring生态系统的一部分.
Redis is an extremely fast in-memory data store. It can optionally persist to a disk also. 它支持不同的数据结构,如简单的键值对、集合、队列等.
使用Redis作为队列使得组件之间的数据共享和水平扩展变得更加容易. 一个生产者或多个生产者可以将数据推送到队列, 一个或多个消费者可以提取数据并处理事件.
多个使用者不能使用相同的事件—这确保一个事件只处理一次.
Benefits of using Redis as a message queue:
Rules:
下面通过创建一个示例应用程序来解释如何使用Spring Integration与Redis.
假设您有一个允许用户发布帖子的应用程序. And you want to build a follow feature. 另一个要求是,每当有人发布帖子时, 所有的关注者应该通过一些通信渠道(如.g., email or push notification).
实现这一点的一种方法是,一旦用户发布内容,就向每个关注者发送电子邮件. But what happens when the user has 1,000 followers? 当1000个用户在10秒内发布内容时,每个用户都有1000个关注者? 此外,出版商的帖子将等到所有的电子邮件发送?
Distributed systems resolve this problem.
This specific problem could be resolved by using a queue. 负责发布帖子的服务A(生产者)将负责这一工作. 它将发布一个帖子,并推送一个事件,其中包含需要接收电子邮件和帖子本身的用户列表. The list of users could be fetched in service B, but for simplicity of this example, we will send it from service A.
This is an asynchronous operation. 这意味着正在发布的服务将不必等待发送电子邮件.
服务B(消费者)将从队列中提取事件并处理它. 通过这种方式,我们可以很容易地扩展我们的服务,我们可以做到 n
consumers sending emails (processing events).
因此,让我们从生产者服务中的实现开始. Necessary dependencies are:
redis.clients
jedis
org.springframework.data
spring-data-redis
org.springframework.integration
spring-integration-redis
These three Maven dependencies are necessary:
Next, we need to configure the Jedis client:
@Configuration
public class RedisConfig {
@Value("${redis.host}")
private String redisHost;
@Value("${redis.port:6379}")
private int redisPort;
@Bean
public JedisPoolConfig poolConfig() {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(128);
return poolConfig;
}
@Bean
RedisConnectionFactory (JedisPoolConfig)
最后JedisConnectionFactory连接工厂=新的JedisConnectionFactory();
connectionFactory.setHostName(redisHost);
connectionFactory.setPort(redisPort);
connectionFactory.setPoolConfig(poolConfig);
connectionFactory.setUsePool(true);
return connectionFactory;
}
}
The annotation @Value
意味着Spring将把应用程序属性中定义的值注入到字段中. This means redis.host
and redis.port
values should be defined in the application properties.
现在,我们需要定义要发送到队列的消息. A simple example message could look like:
@Getter
@Setter
@Builder
public class PostPublishedEvent {
private String postUrl;
private String postTitle;
private List emails;
}
Note: Project Lombok (http://projectlombok.org/) provides the @Getter
, @Setter
, @Builder
,以及许多其他注释,以避免与getter、setter和其他琐碎的东西混淆代码. You can learn more about it from this Toptal article.
消息本身将以JSON格式保存在队列中. 每次将事件发布到队列时,消息将被序列化为JSON. 当从队列中消费时,消息将被反序列化.
定义了消息后,我们需要定义队列本身. In Spring Integration, it can be easily done via an .xml
configuration. The configuration should be placed inside the resources/WEB-INF
directory.
在配置中,您可以看到“int-redis:queue-outbound-channel-adapter”部分.” Its properties are:
MessageChannel
from which this endpoint receives messages.RedisConnectionFactory
bean.#root
variable. This attribute is mutually exclusive with the queue.RedisSerializer
bean reference. By default, it is a JdkSerializationRedisSerializer
. However, for String
payloads, a StringRedisSerializer
is used if a serializer reference isn’t provided.true
.true
) or right push (when false
) to write messages to the Redis list. If true, 当与默认Redis队列入站通道适配器一起使用时,Redis列表充当FIFO队列. Set to false
与从左弹出列表中读取的软件一起使用,或实现类似堆栈的消息顺序. Its default value is true
.下一个步骤是定义网关,这在 .xml
configuration. For a gateway, we are using the RedisChannelGateway
class from the org.toptal.queue
package.
StringRedisSerializer
is used to serialize message before saving in Redis.
Also in the .xml
configuration, we defined the gateway and set RedisChannelGateway
as a gateway service. This means that the RedisChannelGateway
bean could be injected into other beans. We defined the property default-request-channel
方法来提供每个方法的通道引用 @Gateway
annotation. Class definition:
public interface RedisChannelGateway {
void enqueue(PostPublishedEvent event);
}
要将此配置连接到我们的应用程序中,我们必须导入它. This is implemented in the SpringIntegrationConfig
class.
@ImportResource("classpath:WEB-INF/event-queue-config.xml")
@AutoConfigureAfter(RedisConfig.class)
@Configuration
public class SpringIntegrationConfig {
}
@ImportResource
annotation is used to import Spring .xml
configuration files into @Configuration
. And @AutoConfigureAfter
注释用于提示应该在其他指定的自动配置类之后应用自动配置.
现在我们将创建一个服务并实现该方法 enqueue
events to the Redis queue.
public interface QueueService {
void enqueue(PostPublishedEvent event);
}
@Service
public class RedisQueueService implements QueueService {
private RedisChannelGateway channelGateway;
@Autowired
public RedisQueueService(RedisChannelGateway) {
this.channelGateway = channelGateway;
}
@Override
public void enqueue(PostPublishedEvent event) {
channelGateway.enqueue(event);
}
}
方法将消息发送到队列 enqueue
method from QueueService
.
Redis队列只是一个或多个生产者和消费者的列表. To publish a message to a queue, producers use the LPUSH
Redis command. And if you monitor Redis (hint: type redis-cli monitor
), you can see that the message is added to the queue:
“my-event-queue”“LPUSH{\“postUrl \”,\“测试\”,\“postTitle \”:\“测试\”,\“邮件\”:[\“测试”\]}”
Now, 我们需要创建一个消费者应用程序,它将从队列中提取这些事件并对其进行处理. 消费者服务需要与生产者服务相同的依赖关系.
Now we can reuse the PostPublishedEvent
class to deserialize messages.
我们需要创建队列配置,同样,它必须放在 resources/WEB-INF
directory. The content of the queue config is:
In the .xml
configuration, int-redis:queue-inbound-channel-adapter
can have the following properties:
MessageChannel
to which we send messages from this endpoint.SmartLifecycle
属性指定此端点是否应在应用程序上下文启动后自动启动. Its default value is true
.SmartLifecycle
属性指定将在哪个阶段启动此端点. Its default value is 0
.RedisConnectionFactory
bean.MessageChannel
to which we will send ErrorMessages
with Exceptions
from the listening task of the Endpoint
. By default, the underlying MessagePublishingErrorHandler
uses the default errorChannel
from the application context.RedisSerializer
bean reference. It can be an empty string, which means no serializer. In this case, the raw byte[]
从入站Redis消息发送到通道作为 Message
payload. By default, it is a JdkSerializationRedisSerializer
.true
, 序列化器不能是空字符串,因为消息需要某种形式的反序列化(默认是JDK序列化)。. Its default value is false
.TaskExecutor
(or standard JDK 1.5+ Executor) bean. It is used for the underlying listening task. By default, a SimpleAsyncTaskExecutor
is used.true
) or left pop (when false
) to read messages from the Redis list. If true
,当与默认Redis队列出站通道适配器一起使用时,Redis列表充当FIFO队列. Set to false
与用右压向列表写入的软件一起使用,或实现类似堆栈的消息顺序. Its default value is true
.The important part is the “service activator,,它定义了应该使用哪个服务和方法来处理事件.’
Also, the json-to-object-transformer
需要一个type属性以便将JSON转换为对象,上面设置为 type="com.toptal.integration.spring.model.PostPublishedEvent"
.
Again, to wire this config, we will need the SpringIntegrationConfig
class, which can be the same as before. 最后,我们需要一个实际处理事件的服务.
public interface EventProcessingService {
void process(PostPublishedEvent event);
}
@Service("RedisEventProcessingService")
RedisEventProcessingService实现EventProcessingService
@Override
public void process(PostPublishedEvent event) {
// TODO: Send emails here, retry strategy, etc :)
}
}
Once you run the application, you can see in Redis:
"BRPOP" "my-event-queue" "1"
With Spring Integration and Redis, 构建Spring微服务应用程序并不像通常那样令人生畏. 只需少量的配置和少量的样板代码, you can build the foundations of your microservice architecture in no time.
即使您不打算完全放弃当前的Spring项目并切换到新的体系结构, with the help of Redis, 使用队列获得巨大的性能改进非常简单.
基于微服务的应用程序是运行在多台机器上的分布式系统. 系统中的每个服务通过向其他服务传递消息进行通信.
In a monolithic application, 所有组件都位于同一进程中,通信通常基于同一进程中的方法或函数调用.
An expert in graphics, robotics, and backends, Adnan专注于用c++构建高性能解决方案, JS and many other languages.
11
World-class articles, delivered weekly.
World-class articles, delivered weekly.
Join the Toptal® community.