封面

AWS SQS/SNS/Redis的pub/sub用法总结

最近做一个功能的时候需要用到消息队列,提起消息队列首先想到的就是mq系列的rabbitMQ或者rockMQ,Kafuka之类的服务。另外aws也有自己的消息队列服务SNS和SQS,今天简单记录一下SQS/SNS/Redis的用法。

mq和kafuka对比

业务场景分析

kafka 适合简单业务,性能极强,用于大数据领域的实时计算以及日志采集。

rocketmq 功能多,坑多,性能好, 用于大公司的业务

rabbitmq 功能多,简单易用,性能较差。用于中小公司的业务

activeMQ, 它是消息队列,叫做messagequeen,我们是使用在电商平台上。比如当用户商品上架的时候,它将用户的商品信息,它就是一个消息服务中间件,里面保存了参数值,就是商品上架的同时,将商品的ID信息保存到消息服务的中间件中,这是这一端,然后我到另一端,solr服务端和我的freemarker服务端都会反复不断的监听,监听activeMQ中的一些相应的参数信息,就是ID,如果找到了ID信息,那么我就会相应处理,而且activeMQ可以采用一对一模式和主题订阅模式。当引用了activeMQ的时候,它将用户的商品信息保存到消息服务中间件,然后我的另外2个,一个solr,一个freemarker这2个服务器都可以同时或者很短的时间内异步监听到ID信息,同时取出这个信息,异步的进行先相关处理,这样可以节约大量的时间,这就是为什么用,activeMQ是节省时间的。

社区

rocketmq 社区不太活跃,文档少,相对不太成熟,需要踩坑,不过是用 java 编写。

其他两个比较活跃,rabbitmq 是 erlang 编写,源码学习成本较高。

功能

rocketmq 和 rabbitmq 支持的功能较多,更适合业务。

接口

rocketmq 自己一套接口,有学习成本。

延迟和性能

rabbitmq 延迟低,性能差 rocketmq 一倍。kafka 最强。

rabbitmq 的集群模式感觉不太合理, producer 不知道 queue 数据在哪台机器上,只能挑一台发送,broker如果发现queue数据不在当前节点上就帮你转发,这样产生一次转发的开销。

Kafka、RabbitMQ、RocketMQ等消息中间件的介绍和对比_Brave Soul Blog-CSDN博客blog.csdn.net图标

队列数对性能的影响

  • kafka 单台机器 队列数量大于 64的时候,负载大幅增加。
  • rabbitmq 的集群模式下,会在所有集群节点中同步队列的元信息。有一定的开销,也意味着队列数不宜过多。
  • rocketmq 单台机器撑住五万的队列,由于有专门的namesrv节点存储元信息。所以集群能够存储很多很多队列。

kafka 表现最差,但是得益于性能高,还是适用于队列数少,数据量大的场景。

有序性

rocketmq 可以很容易实现消息有序性。

rabbitmq 很麻烦

kafka 不清楚。

支持事务

mq 支持这个功能不是刚需,有别的办法可以实现类似事务的效果。

SQS是什么

SQS(Simple Queue Service)提供高吞吐量的系统到系统消息收发队列。您可以使用队列来解耦高开销流程,并实现作业的缓冲和批处理。Amazon SQS 会在微服务和无服务器应用程序完成处理前一直存储消息。

SQS作用

高度可扩展的标准队列和 FIFO 队列

队列可根据应用程序的需要弹性扩展。标准队列提供几乎无限的吞吐量,每个队列的消息数量也无限制。FIFO 队列按先进先出原则传送消息,并确保仅处理一次。

持久性和可用性

在多个服务器上分布队列。冗余基础设施确保了对消息的高度并发访问。

安全性

保护传输中的数据和静态数据。通过加密队列传输敏感数据。在 Virtual Private Cloud 中发送消息。

批处理

批量发送、接收或删除消息,每批最多 10 条消息或 256KB。

SQS用法

spring 读取配置文件

import com.amazonaws.regions.Regions;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;

@Validated
@ConfigurationProperties("aws.credentials")
@Data
public class AwsCredentialsProperties {
@NotBlank private String accessKey;
@NotBlank private String secretKey;
@NotNull private Regions region;
}

设置用于开发的 AWS 凭证和区域 将读取到的配置文件填入snsClient中,注入一个bean

import info.xiamo.property.AwsCredentialsProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;

@Configuration
public class AwsSnsConfiguration {

@Bean
public SnsClient sqsClient(AwsCredentialsProperties awsCredentialsProperties) {
AwsBasicCredentials credentials =
AwsBasicCredentials.create(
awsCredentialsProperties.getAccessKey(),
awsCredentialsProperties.getSecretKey());

StaticCredentialsProvider credentialsProvider =
StaticCredentialsProvider.create(credentials);

String name = awsCredentialsProperties.getRegion().name();
return SnsClient.builder()
.region(Region.of(name))
.credentialsProvider(credentialsProvider)
.build();
}
}
import info.xiaomo.property.AwsCredentialsProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

@Configuration
public class AwsSqsConfiguration {

@Bean
public SqsClient sqsClient(AwsCredentialsProperties awsCredentialsProperties) {
return SqsClient.builder()
.region(Region.of(awsCredentialsProperties.getRegion().getName()))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
awsCredentialsProperties.getAccessKey(),
awsCredentialsProperties.getSecretKey())))
.build();
}
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.StringWriter;
import java.util.*;

@Component
public class AwsSqsClient {

private final String QUEUE_NAME = "clearCacheQueue";

@Autowired
private SqsClient sqsClient;

/** 清除消息队列 */
public void purgeMyQueue() {
GetQueueUrlRequest getQueueRequest =
GetQueueUrlRequest.builder().queueName(QUEUE_NAME).build();

PurgeQueueRequest queueRequest =
PurgeQueueRequest.builder()
.queueUrl(sqsClient.getQueueUrl(getQueueRequest).queueUrl())
.build();

sqsClient.purgeQueue(queueRequest);
}

/**
* 获取消息
*
* @return String
*/
public String getMessages() {
List<String> attr = new ArrayList<>();
attr.add("Name");

try {

GetQueueUrlRequest getQueueRequest =
GetQueueUrlRequest.builder().queueName(QUEUE_NAME).build();

String queueUrl = sqsClient.getQueueUrl(getQueueRequest).queueUrl();

ReceiveMessageRequest receiveRequest =
ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(10)
.messageAttributeNames(attr)
.build();
List<Message> messages = sqsClient.receiveMessage(receiveRequest).messages();

SqsMessage myMessage;

List<SqsMessage> allMessages = new ArrayList<>();

for (Message m : messages) {

myMessage = new SqsMessage();
myMessage.setBody(m.body());

Map<String, MessageAttributeValue> map = m.messageAttributes();
MessageAttributeValue val = map.get("Name");
myMessage.setName(val.stringValue());

allMessages.add(myMessage);
}

return convertToString(toXml(allMessages));

} catch (SqsException e) {
e.getStackTrace();
}
return "";
}

public void processMessage(SqsMessage msg) {
try {
MessageAttributeValue attributeValue =
MessageAttributeValue.builder()
.stringValue(msg.getName())
.dataType("String")
.build();

Map<String, MessageAttributeValue> myMap = new HashMap<>();
myMap.put("Name", attributeValue);

GetQueueUrlRequest getQueueRequest =
GetQueueUrlRequest.builder().queueName(QUEUE_NAME).build();

String queueUrl = sqsClient.getQueueUrl(getQueueRequest).queueUrl();

UUID uuid = UUID.randomUUID();
String msgId1 = uuid.toString();

SendMessageRequest sendMsgRequest =
SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageAttributes(myMap)
.messageGroupId("GroupA")
.messageDeduplicationId(msgId1)
.messageBody(msg.getBody())
.build();
sqsClient.sendMessage(sendMsgRequest);

} catch (SqsException e) {
e.getStackTrace();
}
}

private Document toXml(List<SqsMessage> itemList) {

try {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
Document doc = builder.newDocument();

Element root = doc.createElement("Messages");
doc.appendChild(root);

for (SqsMessage myMessage : itemList) {

Element item = doc.createElement("Message");
root.appendChild(item);

Element id = doc.createElement("Data");
id.appendChild(doc.createTextNode(myMessage.getBody()));
item.appendChild(id);

Element name = doc.createElement("User");
name.appendChild(doc.createTextNode(myMessage.getName()));
item.appendChild(name);
}

return doc;
} catch (ParserConfigurationException e) {
e.printStackTrace();
}
return null;
}

private String convertToString(Document xml) {
try {
Transformer transformer = TransformerFactory.newInstance().newTransformer();
StreamResult result = new StreamResult(new StringWriter());
DOMSource source = new DOMSource(xml);
transformer.transform(source, result);
return result.getWriter().toString();

} catch (TransformerException ex) {
ex.printStackTrace();
}
return null;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
static class SqsMessage {
private String id;
private String body;
private String name;
}
}

SNS 是什么

SNS(Simple Notification Service)是发布/订阅主题的消息托管消息收发服务,可让您将发布者与订阅者分离。这对于用于微服务、分布式架构和无服务器应用程序的系统到系统消息收发很有用。

sns-how-works

SNS作用

Amazon SNS 可让您将推送通知发送到移动应用程序,将文本消息发送到电话号码,并将纯文本电子邮件发送到电子邮件地址。您可以通过主题来散发消息,或直接将消息发布到移动终端节点。

SQS VS SNS 区别

Amazon SQS 和 Amazon SNS 都是 AWS 中的消息发送服务,但为开发人员提供了不同的优势。Amazon SNS 允许应用程序通过“推送”机制向多个订阅者发送时间关键型消息,并且无需定期检查或“轮询”更新。Amazon SQS 是供分布式应用程序使用的消息队列服务,它通过轮询模式交换消息,可用于解耦收发组件。Amazon SQS 使应用程序的分布式组件可以灵活地收发消息,并且不要求每个组件同时可用。

SNS重点在推送,需要接收组件同时可用

SQS更多是一个消息队列服务,需要使用组件通过轮询模式交换消息,不要求每个组件同时可用

SNS用法

pdf文档

sns 命令行

您可以使用 AWS Command Line Interface (AWS CLI) 访问 Amazon Simple Notification Service (Amazon SNS) 的功能。要列出 Amazon SNS 的 AWS CLI 命令,请使用以下命令。

aws sns help

在运行任何命令之前,请设置默认证书。有关更多信息,请参阅 配置 AWS CLI

本主题显示执行 Amazon SNS 常见任务的 CLI 命令。

主题

创建主题

要创建主题,请使用 create-topic 命令并指定要分配给该主题的名称。

$ aws sns create-topic --name my-topic
{
"TopicArn": "arn:aws:sns:us-west-2:123456789012:my-topic"
}

记下响应的 TopicArn,您随后将用它来发布消息。

订阅主题

要订阅主题,请使用 subscribe 命令。

以下示例为 notification-endpoint 指定 email 协议和电子邮件地址。

$ aws sns subscribe --topic-arn arn:aws:sns:us-west-2:123456789012:my-topic --protocol email --notification-endpoint saanvi@example.com
{
"SubscriptionArn": "pending confirmation"
}

AWS 通过向您在 subscribe 命令中指定的地址发送电子邮件,立即发送确认电子邮件。电子邮件具有以下文本。

You have chosen to subscribe to the topic:
arn:aws:sns:us-west-2:123456789012:my-topic
To confirm this subscription, click or visit the following link (If this was in error no action is necessary):
Confirm subscription

收件人单击确认订阅链接后,收件人的浏览器显示通知消息,信息类似于以下内容。

Subscription confirmed!

You have subscribed saanvi@example.com to the topic:my-topic.

Your subscription's id is:
arn:aws:sns:us-west-2:123456789012:my-topic:1328f057-de93-4c15-512e-8bb22EXAMPLE

If it was not your intention to subscribe, click here to unsubscribe.

向主题发布

要将消息发送给某一主题的所有订阅者,请使用 publish 命令。

以下示例向指定主题的所有订阅者发送消息“Hello World!”。

$ aws sns publish --topic-arn arn:aws:sns:us-west-2:123456789012:my-topic --message "Hello World!"
{
"MessageId": "4e41661d-5eec-5ddf-8dab-2c867EXAMPLE"
}

在本示例中,AWS 将包含文本“Hello World!”的电子邮件发送到 `saanvi@example.com`。

取消订阅主题

要取消订阅某个主题并停止接收向该主题发布的消息,请使用 unsubscribe 命令并指定您要取消订阅的主题的 ARN。

$ aws sns unsubscribe --subscription-arn arn:aws:sns:us-west-2:123456789012:my-topic:1328f057-de93-4c15-512e-8bb22EXAMPLE

要验证您已成功取消订阅,请使用 list-subscriptions 命令以确认该 ARN 不再显示在列表中。

$ aws sns list-subscriptions

删除主题

要删除主题,请运行 delete-topic 命令。

$ aws sns delete-topic --topic-arn arn:aws:sns:us-west-2:123456789012:my-topic

要验证 AWS 已成功删除主题,请使用 list-topics 命令以确认该主题不再显示在列表中。

$ aws sns list-topics

java集成SNS(警告触发)

gradle

implementation ("software.amazon.awssdk:sns:2.15.19")

maven

<!-- https://mvnrepository.com/artifact/software.amazon.awssdk/sns -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sns</artifactId>
<version>2.15.19</version>
<scope>test</scope>
</dependency>

yaml配置文件

// application.yml 配置文件
aws:
credentials:
region: ap_northeast_1
accessKey: your accessKey
secretKey: you secretKey

spring 读取配置文件

import com.amazonaws.regions.Regions;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;

@Validated
@ConfigurationProperties("aws.credentials")
@Data
public class AwsCredentialsProperties {
@NotBlank private String accessKey;
@NotBlank private String secretKey;
@NotNull private Regions region;
}

设置用于开发的 AWS 凭证和区域 将读取到的配置文件填入snsClient中,注入一个bean

import info.xiamo.property.AwsCredentialsProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;

@Configuration
public class AwsSnsConfiguration {

@Bean
public SnsClient snsClient(AwsCredentialsProperties awsCredentialsProperties) {
AwsBasicCredentials credentials =
AwsBasicCredentials.create(
awsCredentialsProperties.getAccessKey(),
awsCredentialsProperties.getSecretKey());

StaticCredentialsProvider credentialsProvider =
StaticCredentialsProvider.create(credentials);

String name = awsCredentialsProperties.getRegion().getName();
return SnsClient.builder()
.region(Region.of(name))
.credentialsProvider(credentialsProvider)
.build();
}
}

使用注册好的SnsClient调用api

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.*;

import java.util.List;
import java.util.Map;

@Component
@Slf4j
public class AwsSnsClient {

@Autowired private SnsClient snsClient;

// 创建主题
//主题是通信通道的逻辑分组,可定义要将消息发送到的系统,例如,将消息发送到 AWS Lambda 和 HTTP Webhook。将消息发送到 Amazon SNS 之后,这些消息将分发到主题中定义的各个通道。这将使订阅者能够收到这些消息。要创建主题,请先构建一个 CreateTopicRequest 对象,并使用构建器中的 name() 方法设置主题的名称。然后,使用 SnsClient 的 createTopic() 方法将请求对象发送到 Amazon SNS。可以将此请求的结果作为 CreateTopicResponse 对象捕获,如以下代码段中所示。
public String createSNSTopic(String topicName) {
CreateTopicResponse result;
try {
CreateTopicRequest request = CreateTopicRequest.builder().name(topicName).build();

result = snsClient.createTopic(request);
return result.topicArn();
} catch (SnsException e) {
log.error("create sns topic error:{}", e.awsErrorDetails().errorMessage());
}
return null;
}

// 删除主题
// 要删除 Amazon SNS 主题,请先构建一个 DeleteTopicRequest 对象,并将主题的 ARN 设置为构建器中的 topicArn() 方法。然后,使用 SnsClient 的 deleteTopic() 方法将请求对象发送到 Amazon SNS。可以将此请求的结果作为 DeleteTopicResponse 对象捕获,如以下代码段中所示。
public DeleteTopicResponse deleteSNSTopic(String topicArn) {
try {
DeleteTopicRequest request = DeleteTopicRequest.builder().topicArn(topicArn).build();

DeleteTopicResponse result = snsClient.deleteTopic(request);
return result;

} catch (SnsException e) {
log.error("delete sns topic error:{}", e.awsErrorDetails().errorMessage());
}
return null;
}

public Map<String, String> getSNSAttributes(String topicArn) {

try {
GetSubscriptionAttributesRequest request =
GetSubscriptionAttributesRequest.builder().subscriptionArn(topicArn).build();

GetSubscriptionAttributesResponse res = snsClient.getSubscriptionAttributes(request);
return res.attributes();

} catch (SnsException e) {
log.error("get sns attribute error:{}", e.awsErrorDetails().errorMessage());
}
return null;
}

// 获取sns属性
public Map<String, String> getSNSTopicAttributes(String topicArn) {

try {
GetTopicAttributesRequest request =
GetTopicAttributesRequest.builder().topicArn(topicArn).build();

GetTopicAttributesResponse result = snsClient.getTopicAttributes(request);
return result.attributes();

} catch (SnsException e) {
log.error("get sns topic attributes error:{}", e.awsErrorDetails().errorMessage());
return null;
}
}

public List<Subscription> listSNSSubscriptions() {
try {
ListSubscriptionsRequest request = ListSubscriptionsRequest.builder().build();
ListSubscriptionsResponse result = snsClient.listSubscriptions(request);
return result.subscriptions();

} catch (SnsException e) {
log.error("list sns subscriptions error:{}", e.awsErrorDetails().errorMessage());
return null;
}
}

// 展示主题列表
// 要检索现有 Amazon SNS 主题的列表,请构建一个 ListTopicsRequest 对象。然后,使用 SnsClient 的 listTopics() 方法将请求对象发送到 Amazon SNS。可以将此请求的结果作为 ListTopicsResponse 对象捕获。以下代码段输出请求的 HTTP 状态代码以及您的 Amazon SNS 主题的 Amazon 资源名称 (ARN) 列表。
public List<Topic> listSNSTopics() {
try {
ListTopicsRequest request = ListTopicsRequest.builder().build();

ListTopicsResponse result = snsClient.listTopics(request);
log.info(
"Status was {} Topics :{}",
result.sdkHttpResponse().statusCode(),
result.topics());

return result.topics();
} catch (SnsException e) {
log.error("list sns topics error: {}", e.awsErrorDetails().errorMessage());
return null;
}
}

// 发布消息
// 如果您拥有一个主题并且已为该主题配置一个或多个终端节点,则可向该主题发布消息。首先,请构建一个 PublishRequest 对象。指定要发送的 message(),并指定要将消息发送到的主题的 ARN (topicArn())。然后,使用 SnsClient 的 publish() 方法将请求对象发送到 Amazon SNS。可以将此请求的结果作为 PublishResponse 对象捕获。
public PublishResponse publishTopic(String message, String topicArn) {
try {
PublishRequest request =
PublishRequest.builder().message(message).topicArn(topicArn).build();

return snsClient.publish(request);

} catch (SnsException e) {
log.error("public topic error:{}", e.awsErrorDetails().errorMessage());
return null;
}
}

// 订阅https
// 创建主题后,您可以配置将哪些通信通道作为该主题的终端节点。在 Amazon SNS 收到消息后,消息将分发给这些终端节点。要将通信通道配置为主题的终端节点,请为该终端节点订阅主题。首先,请构建一个 SubscribeRequest 对象。将通信通道(例如,lambda 或 email)指定为 protocol()。将 endpoint() 设置为相关输出位置(例如,Lambda 函数的 ARN 或电子邮件地址),然后将要订阅的主题的 ARN 设置为 topicArn()。然后,使用 SnsClient 的 subscribe() 方法将请求对象发送到 SNS。可以将此请求的结果作为 SubscribeResponse 对象捕获。

以下代码段说明如何为电子邮件地址订阅主题。
public SubscribeResponse subscribeHttps(String topicArn, String url) {

try {
SubscribeRequest request =
SubscribeRequest.builder()
.protocol("https")
.endpoint(url)
.returnSubscriptionArn(true)
.topicArn(topicArn)
.build();

return snsClient.subscribe(request);

} catch (SnsException e) {
log.error("subscribe https error:{}", e.awsErrorDetails().errorMessage());
return null;
}
}

// 取消订阅
// 可以删除配置为主题的终端节点的通信通道。执行此操作后,主题本身将继续存在,并会将消息分发到为该主题配置的任何其他终端节点。要删除作为主题的终端节点的通信通道,请为该终端节点取消订阅主题。首先,请构建一个 UnsubscribeRequest 对象,并将要取消订阅的主题的 ARN 设置为 subscriptionArn()。然后,使用 SnsClient 的 unsubscribe() 方法将请求对象发送到 SNS。可以将此请求的结果作为 UnsubscribeResponse 对象捕获。
public UnsubscribeResponse unSubscription(String subscriptionToken) {
try {
UnsubscribeRequest request =
UnsubscribeRequest.builder().subscriptionArn(subscriptionToken).build();
return snsClient.unsubscribe(request);
} catch (SnsException e) {
log.error("unSubscription error:{}", e.awsErrorDetails().errorMessage());
return null;
}
}
}

在controller中注入并测试

import info.xiaomo.client.AwsSnsClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import software.amazon.awssdk.services.sns.model.*;

import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/api/v1/sns")
@Slf4j
public class SnsController {

private static final String TOPIC_NAME = "clear-cache";

@Autowired private AwsSnsClient awsSnsClient;

/**
* 查询所有的主题
*
* @return 主题列表
*/
@RequestMapping("/listSNSTopics")
public List<Topic> listSNSTopics() {
return awsSnsClient.listSNSTopics();
}

/**
* 查询所有的订阅
*
* @return 订阅列表
*/
@RequestMapping("/listSnsSubscriptions")
public List<Subscription> listSnsSubscriptions() {
return awsSnsClient.listSNSSubscriptions();
}

/**
* 创建主题
*
* @return CreateTopicResponse
*/
@RequestMapping("/createSnsTopic")
public CreateTopicResponse createSnsTopic() {
return awsSnsClient.createSNSTopic(TOPIC_NAME);
}

/**
* 删除主题
*
* @return 删除结果
*/
@RequestMapping("/deleteSnsTopic")
public boolean deleteSnsTopic() {
DeleteTopicResponse deleteTopicResponse = awsSnsClient.deleteSNSTopic(TOPIC_NAME);
return deleteTopicResponse.sdkHttpResponse().isSuccessful();
}

/**
* 发布主题 如果您拥有一个主题并且已为该主题配置一个或多个终端节点,则可向该主题发布消息。
*
* @param message message
* @return message
*/
@RequestMapping("/publishTopic")
public PublishResponse publishTopic(String message) {
return awsSnsClient.publishTopic(message, TOPIC_NAME);
}

/**
* 订阅 创建主题后,您可以配置将哪些通信通道作为该主题的终端节点。在 Amazon SNS 收到消息后,消息将分发给这些终端节点。
*
* @param url url
* @return SubscribeResponse
*/
@RequestMapping("/subscribe/{url}")
public SubscribeResponse subscribe(@PathVariable String url) {
return awsSnsClient.subscribeHttps(TOPIC_NAME, url);
}

/**
* 取消订阅
*
* @param subscriptionToken subscriptionToken
* @return 是否取消成功
*/
@RequestMapping("/unSubscribe/{subscriptionToken}")
public boolean unSubscribe(@PathVariable String subscriptionToken) {
UnsubscribeResponse unsubscribeResponse = awsSnsClient.unSubscription(subscriptionToken);
return unsubscribeResponse.sdkHttpResponse().isSuccessful();
}

/**
* 查询所有的属性
*
* @return 属性Map
*/
@RequestMapping("/getSNSAttributes")
public Map<String, String> getSNSAttributes() {
return awsSnsClient.getSNSAttributes(TOPIC_NAME);
}

/**
* 查询主题的属性
*
* @return 主题属性Map
*/
@RequestMapping("/getSNSTopicAttributes")
public Map<String, String> getSNSTopicAttributes() {
return awsSnsClient.getSNSTopicAttributes(TOPIC_NAME);
}

/** 测试 */
@RequestMapping("")
public void test() {
List<Topic> topics = awsSnsClient.listSNSTopics();
List<Subscription> subscriptions = awsSnsClient.listSNSSubscriptions();
System.out.println(topics);
System.out.println(subscriptions);

String topicName = "clearCacheTopic";
String topic = awsSnsClient.createSNSTopic(topicName).topicArn();
System.out.println(topic);

awsSnsClient.subscribeHttps(topicName, "xxx");

PublishResponse publishResponse = awsSnsClient.publishTopic("hello", topicName);
if (publishResponse != null) {
System.out.println(publishResponse.messageId());
}

UnsubscribeResponse unsubscribeResponse = awsSnsClient.unSubscription("1123");
if (unsubscribeResponse != null) {
System.out.println(unsubscribeResponse.sdkHttpResponse().statusCode());
}

Map<String, String> clearCacheTopic = awsSnsClient.getSNSAttributes(topicName);
for (String value : clearCacheTopic.values()) {
System.out.println(value);
}

DeleteTopicResponse deleteTopicResponse = awsSnsClient.deleteSNSTopic(topicName);
System.out.println(deleteTopicResponse.toString());

Map<String, String> attributes = awsSnsClient.getSNSTopicAttributes(topicName);
for (String value : attributes.values()) {
log.info(value);
}
}
}

Amazon ElastiCache for Redis

Amazon ElastiCache for Redis 是速度超快的内存数据存储,能够提供亚毫秒级延迟来支持 Internet 范围内的实时应用程序。适用于 Redis 的 ElastiCache 基于开源 Redis 构建,可与 Redis API 兼容,能够与 Redis 客户端配合工作,并使用开放的 Redis 数据格式来存储数据。自我管理型 Redis 应用程序可与适用于 Redis 的 ElastiCache 无缝配合使用,无需更改任何代码。适用于 Redis 的 ElastiCache 兼具开源 Redis 的速度、简单性和多功能性与 Amazon 的可管理性、安全性和可扩展性,能够在游戏、广告技术、电子商务、医疗保健、金融服务和物联网领域支持要求最严苛的实时应用程序。支持pub/sub模式。

极致性能

适用于 Redis 的 Amazon ElastiCache 可以用作内存数据存储,能够支持要求最严苛且需要亚毫秒级响应时间的应用程序。与基于磁盘的数据库(其中大多数操作都需要往返于磁盘)相比,内存数据存储可以管理数据内存,其速度比磁盘提高了一个数量级。它可以提供超快的性能,读取或写入操作的平均时间不到一毫秒,并支持集群内每秒数亿次的操作。ElastiCache 可为您提供经过优化的端到端硬件和软件堆栈,以实现超快的性能。

安全

从 Amazon ElastiCache for Redis 6 开始,ElastiCache 现在提供了创建和管理用户和用户组的功能,可用于为 Redis 命令设置基于角色的访问控制 (RBAC)。您现在可以在保持安全边界的同时简化架构,还可以利用精细访问控制来管理用户组。Amazon ElastiCache for Redis 支持 Amazon VPC,这使您能够将集群隔离在您为节点选择的 IP 范围内。ElastiCache 团队持续监控开源 Redis、操作系统和固件中已知的安全漏洞,以确保您的 Redis 环境安全无虞。它符合 PCI 和 HIPAA 要求,由 FedRAMP 授权,提供动态和静态加密(包括 AWS KMS 中存储的客户管理 CMK)以及 Redis AUTH 来实现安全的节点间通信,从而帮助保护个人身份识别信息 (PII) 等敏感数据的安全。

详细了解基于角色的访问控制 (RBAC) » 详细了解适用于 ElastiCache 的 VPC »

完全托管并已强化

适用于 Redis 的 Amazon ElastiCache 是一项完全托管的服务。您无需执行硬件预置、软件修补、设置、配置、监控、故障恢复和备份等管理任务。ElastiCache 会持续监控您的集群,以保障您的 Redis 正常运行,使您可以集中精力开发更高价值的应用程序。其提供有关您 Redis 使用情况的详细监控指标,使您能够跟踪应用程序趋势并按需调整集群配置。ElastiCache 增加了自动写入限制、智能交换内存管理和故障转移增强功能,以便改进开源 Redis 的可用性和可管理性。详细了解适用于 Redis 的 Amazon ElastiCache »

与 REDIS 兼容

Redis 是一种得到广泛采用的内存数据存储,可用作数据库、缓存、消息代理、队列、会话存储和排行榜。Amazon ElastiCache for Redis 可与开源 Redis 数据格式、Redis API 兼容,并与 Redis 客户端配合使用。您可以将自行管理的 Redis 工作负载迁移到 ElastiCache for Redis,而无需更改任何代码。详细了解开源 Redis »

高度可用且可靠

Amazon ElastiCache for Redis 支持 Redis 集群和非集群模式,能够通过自动故障转移支持提供高可用性,而自动故障转移支持是通过检测主节点故障和在最大限度降低影响的情况下将副本提升为主节点来实现的。它通过跨可用区支持只读副本来为应用程序提供读取可用性,以便在主节点忙于应对增加的工作负载时提供读取内容。适用于 Redis 的 ElastiCache 支持增强的故障转移逻辑,能够在 Redis 集群模式的大多数主节点不可用时支持自动故障转移。从 Redis 5.0.5 开始,启用了自动故障转移的群集可为所有计划的操作提供在线配置更改。详细了解 ElastiCache 的自动故障转移 »

轻松扩展

借助 Amazon ElastiCache for Redis,您可以先从小规模起步,然后随应用程序的不断增多轻松扩展您的 Redis 数据 — 最终可扩展到一个内存数据高达 340 TB 的集群。它支持您将 Redis 集群环境扩展到 500 个节点和 500 个分片。它支持在线集群规模调整以扩展和缩减您的 Redis 集群,无需停机,可自动适应不断变化的需求。为扩展读取容量,ElastiCache 允许您跨多个可用区添加多达五个只读副本。为扩展写入容量,ElastiCache 支持 Redis 集群模式,这使您能够跨多个主节点对写入流量进行分区。详细了解扩展 ElastiCache »

使用案例

适用于 Redis 的 Amazon ElastiCache 非常适用于实时事务和分析处理使用案例,例如缓存聊天/消息收发游戏排行榜地理空间机器学习媒体流队列实时分析,以及会话存储

缓存

适用于 Redis 的 Amazon ElastiCache 是实施高度可用且安全的分布式内存中缓存的上好选择,它可以降低访问延迟、提高吞吐量,并可以减轻关系数据库或 NoSQL 数据库和应用程序的负载。ElastiCache 能够以亚毫秒级响应时间为频繁请求的项目提供支持,并且无需增加昂贵的后端数据库,即可轻松扩展以满足更高负载的需求。数据库查询结果缓存、持久性会话缓存以及整页缓存都是适用于 Redis 的 ElastiCache 的常见缓存示例。了解如何利用适用于 Redis 的 ElastiCache 构建缓存应用程序

聊天和消息传送

适用于 Redis 的 Amazon ElastiCache 支持 PUB/SUB 标准功能和模式匹配。这使得适用于 Redis 的 ElastiCache 能够支持高性能的聊天室、实时评论流以及服务器相互通信。您也可以使用 PUB/SUB 基于发布的事件触发操作。了解如何利用适用于 Redis 的 ElastiCache 构建聊天应用程序

地理空间

适用于 Redis 的 Amazon ElastiCache 提供专门构建的内存中数据结构和运算符,以便从规模和速度方面管理实时地理空间数据。您可以使用适用于 Redis 的 ElastiCache 向应用程序添加基于位置的功能,如驾驶时间、驾驶距离和兴趣点。了解如何利用适用于 Redis 的 ElastiCache 构建地理空间应用程序

Machine Learning

适用于 Redis 的 Amazon ElastiCache 为您提供了快速的内存中数据存储,可快速构建和部署机器学习模型。适用于 Redis 的 ElastiCache 可用于以下使用案例,例如游戏和金融服务中的欺诈检测、广告技术中的实时竞价,以及共享约会和共享单车中的配对,它能够在几十毫秒内处理实时数据并做出决策。了解 Coffee Meets Bagel 如何使用 ElastiCache 提供基于机器学习的实时约会建议

媒体流

适用于 Redis 的 Amazon ElastiCache 提供了一个快速的内存中数据存储,支持实时流使用案例。适用于 Redis 的 ElastiCache 可存储用于用户配置文件和查看历史记录的元数据、数百万用户的身份验证信息/令牌,以及清单文件,以便 CDN 能够将视频一次性流式传输到数百万移动和桌面用户。

队列

适用于 Redis 的 Amazon ElastiCache 提供了列表数据结构,可轻松实施轻量级持久队列。这类列表提供了原子操作和屏蔽功能,适用于各种需要可靠消息代理或循环表的应用程序。

实时分析

可将 Amazon ElastiCache for Redis 与流解决方案(例如 Apache Kafka 和 Amazon Kinesis)搭配使用来作为内存数据存储,从而以亚毫秒级延迟提取、处理和分析实时数据。ElastiCache 非常适合实时分析使用案例,比如社交媒体、广告定位、个性化及物联网和时间序列数据分析

pub/sub

Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。 同样,Redis的pub/sub是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合,Redis作为一个pub/sub的server,在订阅者和发布者之间起到了消息路由的功能。

Redis pub/sub的实现

Redis通过publish和subscribe命令实现订阅和发布的功能。订阅者可以通过subscribe向redis server订阅自己感兴趣的消息类型。redis将信息类型称为通道(channel)。当发布者通过publish命令向redis server发送特定类型的信息时,订阅该消息类型的全部订阅者都会收到此消息。

Pub/Sub在java中的实现

导入Redis驱动:

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.0.8.RELEASE</version>
</dependency>

gradle

compile group: 'redis.clients', name: 'jedis', version: '3.3.0'
compile group: 'org.springframework.data', name: 'spring-data-redis', version: '2.4.0'

Redis驱动包提供了一个抽象类:JedisPubSub…继承这个类就完成了对客户端对订阅的监听。示例代码:

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.JedisPubSub;

/** redis发布订阅消息监听器 */
@Slf4j
public class RedisClient extends JedisPubSub {

@Override
public void unsubscribe() {
super.unsubscribe();
}

@Override
public void unsubscribe(String... channels) {
super.unsubscribe(channels);
}

@Override
public void subscribe(String... channels) {
super.subscribe(channels);
}

@Override
public void psubscribe(String... patterns) {
super.psubscribe(patterns);
}

@Override
public void punsubscribe() {
super.punsubscribe();
}

@Override
public void punsubscribe(String... patterns) {
super.punsubscribe(patterns);
}

@Override
public void onMessage(String channel, String message) {
log.info("onMessage: channel[{}], message[{}]", channel, message);
}

@Override
public void onPMessage(String pattern, String channel, String message) {
log.info("onPMessage: pattern[{}], channel[{}], message[{}]", pattern, channel, message);
}

@Override
public void onSubscribe(String channel, int subscribedChannels) {
log.info("onSubscribe: channel[{}], subscribedChannels[{}]", channel, subscribedChannels);
}

@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
log.info(
"onPUnsubscribe: pattern[{}], subscribedChannels[{}]", pattern, subscribedChannels);
}

@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
log.info("onPSubscribe: pattern[{}], subscribedChannels[{}]", pattern, subscribedChannels);
}

@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
log.info("channel:{} is been subscribed:{}", channel, subscribedChannels);
}
}

如上所示,抽象类中存在的方法。分别表示

监听到订阅模式接受到消息时的回调 (onPMessage)

监听到订阅频道接受到消息时的回调 (onMessage )

订阅频道时的回调( onSubscribe )

取消订阅频道时的回调( onUnsubscribe )

订阅频道模式时的回调 ( onPSubscribe )

取消订阅模式时的回调( onPUnsubscribe )

配置Redis(RedisConfiguration)

import info.xiaomo.redis.CustomerInfoSubscriber;
import info.xiaomo.redis.RedisCustomerInfoPublisher;
import info.xiaomo.service.AppService;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericToStringSerializer;

import java.util.concurrent.Executors;

@Configuration
@ComponentScan("info.xiaomo.redis")
public class RedisConfiguration {

@Autowired
private AppService appService;

@Bean
JedisConnectionFactory jedisConnectionFactory() {
return new JedisConnectionFactory();
}


@Bean
public RedisTemplate<String, Object> redisTemplate() {
final RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(jedisConnectionFactory());
template.setValueSerializer(new GenericToStringSerializer<>(Object.class));
return template;
}

@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new CustomerInfoSubscriber(appService));
}

@Bean
RedisMessageListenerContainer redisContainer() {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(jedisConnectionFactory());
container.addMessageListener(messageListener(), topic());
container.setTaskExecutor(Executors.newFixedThreadPool(4));
return container;
}

@Bean
RedisCustomerInfoPublisher redisPublisher() {
return new RedisCustomerInfoPublisher();
}

/**
*
*
*/
@Bean
ChannelTopic webTopic() {
return new ChannelTopic("pubsub:clear-web-cache");
}

@Bean
ChannelTopic authTopic() {
return new ChannelTopic("pubsub:clear-auth-cache");
}
}

pub端(RedisCustomerInfoPublisher)

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RedisCustomerInfoPublisher {
@Autowired private RedisTemplate<String, Object> redisTemplate;
@Autowired private ChannelTopic webApiTopic;
@Autowired private ChannelTopic authTopic;

public void publishWebApi() {
redisTemplate.convertAndSend(webApiTopic.getTopic(), RedisConst.clearWebCacheMessage);
}

public void publicAuth(){
redisTemplate.convertAndSend(authTopic.getTopic(), RedisConst.clearAuthCacheMessage);
}
}

sub端(CustomerInfoSubscriber)(注意直接在sub端使用@autowired的话会出现空指针)

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RequiredArgsConstructor
public class CustomerInfoSubscriber implements MessageListener {
private final OAuthService oAuthService;

@Override
public void onMessage(@NotNull Message message, byte[] pattern) {
log.info("Received >> {}, thread:{}", message, Thread.currentThread().getName());
if (new String(message.getBody()).equals(RedisConst.clearAuthCacheMessage)) {
oAuthService.reloadClients();
}
}
}

在sub接收到消息后,执行对应的操作

测试控制器

import info.xiaomo.redis.RedisCustomerInfoPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/admin/api/v2/redis")
public class RedisPubSubController {

// 这里的名字一定要和注入的名字一样
@Autowired private RedisCustomerInfoPublisher redisPublisher;

/**
* 在当前项目中pub,在当前项目中sub(既做发送方又做接收方)
*/
@RequestMapping("clearWebCache")
public void publishWebApi() {
redisPublisher.publishWebApi();
}

/**
* 在当前项目中发送,在auth中接收(当前项目只做为服务方)
*/
@RequestMapping("clearAuthCache")
public void publicAuth() {
redisPublisher.publicAuth();
}
}

思路整理

服务端

在RedisConfig中要定义一个topic,然后对外提供一个public的接口,用户触发publish。

客户端

在RedisConfig定义一个topic,名字要和服务的中定义的名字一致才能订阅到。用户在服务端publish,客户端收到后在onMessage中做对应的处理。

即做服务端又做客户端

需要在RedisConfig中监听(subscribe)(客户端的工作),然后提供一个接口给用户触发publish,如果只是作为服务端的话不需要监听消息。

文章目录
  1. 1. mq和kafuka对比
    1. 1.1. 业务场景分析
      1. 1.1.1. 社区
      2. 1.1.2. 功能
      3. 1.1.3. 接口
      4. 1.1.4. 延迟和性能
      5. 1.1.5. 队列数对性能的影响
      6. 1.1.6. 有序性
      7. 1.1.7. 支持事务
  2. 2. SQS是什么
  3. 3. SQS作用
    1. 3.0.1. 高度可扩展的标准队列和 FIFO 队列
    2. 3.0.2. 持久性和可用性
    3. 3.0.3. 安全性
    4. 3.0.4. 批处理
  • 4. SQS用法
  • 5. SNS 是什么
  • 6. SNS作用
  • 7. SQS VS SNS 区别
  • 8. SNS用法
    1. 8.1. sns 命令行
    2. 8.2. 创建主题
    3. 8.3. 订阅主题
    4. 8.4. 向主题发布
    5. 8.5. 取消订阅主题
    6. 8.6. 删除主题
    7. 8.7. java集成SNS(警告触发)
    8. 8.8. Amazon ElastiCache for Redis
    9. 8.9. 极致性能
      1. 8.9.1. 安全
      2. 8.9.2. 完全托管并已强化
      3. 8.9.3. 与 REDIS 兼容
      4. 8.9.4. 高度可用且可靠
      5. 8.9.5. 轻松扩展
    10. 8.10. 使用案例
      1. 8.10.1. 缓存
      2. 8.10.2. 聊天和消息传送
      3. 8.10.3. 地理空间
      4. 8.10.4. Machine Learning
      5. 8.10.5. 媒体流
      6. 8.10.6. 队列
      7. 8.10.7. 实时分析
    11. 8.11. pub/sub
    12. 8.12. Redis pub/sub的实现
    13. 8.13. 思路整理


  • twitter分享


    如果想及时收到回复,可在 订阅中心Participating中勾选Email

    Fork me on GitHub