【微服务学习笔记(二)】Docker、RabbitMQ、SpringAMQP、Elasticseach

Docker镜像和容器安装基础命令Dockerfile自定义镜像

MQ(服务异步通讯)RabbitMQ安装使用消息模型

SpringAMQP消息发送消息接收Work Queue 工作队列发布订阅Fanout ExchangeDirect ExchangeTopicExchange

消息转换器

elasticsearch(分布式搜索)倒排索引索引库语法操作文档

本篇内容为学习笔记,学习链接为SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课

课程资料链接可在视频下方找到,此处不粘贴,而以下的代码都是资料中有的,只不过做为记录单独粘贴,做为学习使用的参考步骤。

Docker

作用:解决开发部署时依赖、环境冲突的问题。

Docker如何解决依赖的兼容问题的?

将应用的Libs(函数库)、Deps(依赖)配置与应用一起打包。将每个应用放到一个隔离容器去运行,避免互相干扰。

​​

Docker如何解决不同系统环境的问题?

Docker将用户程序与所需要调用的系统(比如Ubuntu)函数库一起打包。Docker运行到不同操作系统时,直接基于打包的库函数,借助于操作系统的Linux内核来运行。

Docker与虚拟机的比较

docker是一个系统进程,虚拟机是在操作系统中的操作系统。

特性Docker虚拟机性能接近原生性能较差硬盘占用一般为MB一般为GB启动秒级分钟级

镜像和容器

镜像(Image):Docker将应用程序及其所需的依赖、函数库、环境、配置等文件打包在一起,称为镜像。

**容器(Container)*镜像中的应用程序运行后形成的进程就是容器,只是Docker会给容器做隔离,对外不可见。

Docker和DockerHub

DockerHub:DockerHub是一个Docker镜像的托管平台。这样的平台称为Docker Registry。国内也有类似于DockerHub 的公开服务,比如网易云镜像服务、阿里云镜像库等。

架构:

Docker是一个CS架构的程序,由两部分组成:

服务端(server):Docker守护进程,负责处理Docker指令,管理镜像、容器等。客户端(client):通过命令或RestAPI向Docker服务端发送指令。可以在本地或远程向服务端发送指令。

安装

可以参考官方文档: Docker安装文档

还可以参考这篇博客: Docker 安装 (完整详细版)

基础命令

可以通过 docker --help命令查看帮助文档,学习使用。 拉取命令步骤: 1、进入docker官网搜索需要拉取的镜像 2、按照官网搜索出来的指令放入控制台

数据卷是一个虚拟目录,指向宿主机文件系统中的某个目录。

容器与数据耦合的问题:

不便于修改 当我们要修改Nginx的html内容时,需要进入容器内部修改,很不方便。数据不可复用 在容器内的修改对外是不可见的。所有修改对新创建的容器是不可复用的。升级维护困难 数据在容器内,如果要升级容器必然删除旧容器,所有数据都跟着删除了。

命令:

docker volume [选项]

选项:

create 创建一个volumeinspect 显示一个或多个volume的信息ls 列出所有的volumeprune 删除未使用的volumerm 删除一个或多个指定的volume

在创建容器时,可以通过-v参数来挂载一个数据卷到某个容器目录:

Dockerfile自定义镜像

Dockerfile就是一个文本文件,其中包含一个个的指令(Instruction),用指令来说明要执行什么操作来构建镜像。每一个指令都会形成一层Layer。

搭建镜像仓库

Docker官方的Docker Registry是一个基础版本的Docker镜像仓库,具备仓库管理的完整功能,但是没有图形化界面。

搭建方式比较简单,命令如下:

docker run -d \

--restart=always \

--name registry \

-p 5000:5000 \

-v registry-data:/var/lib/registry \

registry

命令中挂载了一个数据卷registry-data到容器内的/var/lib/registry 目录,这是私有镜像库存放数据的目录。

访问http://YourIp:5000/v2/_catalog 可以查看当前私有镜像服务中包含的镜像

使用DockerCompose部署带有图像界面的DockerRegistry,命令如下:

version: '3.0'

services:

registry:

image: registry

volumes:

- ./registry-data:/var/lib/registry

ui:

image: joxit/docker-registry-ui:static

ports:

- 8080:80

environment:

- REGISTRY_TITLE=传智教育私有仓库

- REGISTRY_URL=http://registry:5000

depends_on:

- registry

我们的私服采用的是http协议,默认不被Docker信任,所以需要做一个配置:

# 打开要修改的文件

vi /etc/docker/daemon.json

# 添加内容:

"insecure-registries":["http://192.168.150.101:8080"]

# 重加载

systemctl daemon-reload

# 重启docker

systemctl restart docker

MQ(服务异步通讯)

MQ(MessageQueue),消息队列,事件驱动架构中的Broker。

同步调用: 调用方需要等待执行方的调用结果。(就像打电话一样,需要实时响应)

优点:时效性高

缺点:

耦合度高性能和吞吐能力下降有额外的资源消耗有级联失败问题

异步调用: 调用方无需等待执行方的执行结果 (就像发微信,不需要马上回复)。 常见实现为事件驱动模式: 优点:

耦合度低吞吐量提升故障隔离流量削峰 异步通信的缺点:依赖于Broker的可靠性、安全性、吞吐能力架构复杂了,业务没有明显的流程线,不好追踪管理

RabbitMQ

安装

1、单机部署 在Centos7虚拟机中使用Docker来安装。

下载镜像:

docker pull rabbitmq:3-management

执行下面的命令来运行MQ容器:

docker run \

-e RABBITMQ_DEFAULT_USER=itcast \

-e RABBITMQ_DEFAULT_PASS=123321 \

--name mq \

--hostname mq1 \

-p 15672:15672 \

-p 5672:5672 \

-d \

rabbitmq:3-management

2、集群部署 在RabbitMQ的官方文档中,讲述了两种集群的配置方式:

普通模式:普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。

先来看普通模式集群。

首先,我们需要让3台MQ互相知道对方的存在。 分别在3台机器中,设置 /etc/hosts文件,添加如下内容:

192.168.150.101 mq1

192.168.150.102 mq2

192.168.150.103 mq3

并在每台机器上测试,是否可以ping通对方。

使用

开启后在浏览器中输入虚拟机的地址,以此进入RabbitMQ页面。

RabbitMQ页面介绍:

消息模型

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色

publisher:消息发布者,将消息发送到队列queuequeue:消息队列,负责接受并缓存消息consumer:订阅队列,处理队列中的消息

简单队列模型

public class PublisherTest {

@Test

public void testSendMessage() throws IOException, TimeoutException {

// 1.建立连接

ConnectionFactory factory = new ConnectionFactory();

// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码

factory.setHost("192.168.150.101");

factory.setPort(5672);

factory.setVirtualHost("/");

factory.setUsername("itcast");

factory.setPassword("123321");

// 1.2.建立连接

Connection connection = factory.newConnection();

// 2.创建通道Channel

Channel channel = connection.createChannel();

// 3.创建队列

String queueName = "simple.queue";

channel.queueDeclare(queueName, false, false, false, null);

// 4.发送消息

String message = "hello, rabbitmq!";

channel.basicPublish("", queueName, null, message.getBytes());

System.out.println("发送消息成功:【" + message + "】");

// 5.关闭通道和连接

channel.close();

connection.close();

}

}

public class ConsumerTest {

public static void main(String[] args) throws IOException, TimeoutException {

// 1.建立连接

ConnectionFactory factory = new ConnectionFactory();

// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码

factory.setHost("192.168.150.101");

factory.setPort(5672);

factory.setVirtualHost("/");

factory.setUsername("itcast");

factory.setPassword("123321");

// 1.2.建立连接

Connection connection = factory.newConnection();

// 2.创建通道Channel

Channel channel = connection.createChannel();

// 3.创建队列

String queueName = "simple.queue";

channel.queueDeclare(queueName, false, false, false, null);

// 4.订阅消息

channel.basicConsume(queueName, true, new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body) throws IOException {

// 5.处理消息

String message = new String(body);

System.out.println("接收到消息:【" + message + "】");

}

});

System.out.println("等待接收消息。。。。");

}

}

SpringAMQP

AMQP:Advanced Message Queuing Protocol,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

SpringAMQP:Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象spring-rabbit是底层的默认实现。

SpringAMQP特征:

侦听器容器,用于异步处理入站消息用于发送和接收消息的RabbitTemplateRabbitAdmin用于自动声明队列,交换和绑定

消息发送

1、引入依赖

org.springframework.boot

spring-boot-starter-amqp

2、在publisher服务中配置

spring:

rabbitmq:

host: 192.168.150.101 # rabbitMQ的ip地址

port: 5672 # 端口

username: itcast

password: 123321

virtual-host: /

3、测试方法:

@RunWith(SpringRunner.class)

@SpringBootTest

//单元测试使用该两个注解

public class SpringAmqpTest {

@Autowired

private RabbitTemplate rabbitTemplate;

@Test

public void testSendMessage2SimpleQueue() {

String queueName = "simple.queue";

String message = "hello, spring amqp!";

rabbitTemplate.convertAndSend(queueName, message);

}

}

消息接收

1、在consumer中

spring:

rabbitmq:

host: 192.168.150.101 # rabbitMQ的ip地址

port: 5672 # 端口

username: itcast

password: 123321

virtual-host: /

listener:

simple:

prefetch: 1

2、监听方法

@Component

public class SpringRabbitListener {

@RabbitListener(queues = "simple.queue")

public void listenSimpleQueue(String msg) {

System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");

}

}

Work Queue 工作队列

作用:提高消息处理的效率,避免消息堆积。

消息预取:预先取出队列中的消息。 消息预取限制:通过prefetch控制预期消息的上限,使得处理更快的消费者能取得更多消息,慢的消费者取得少的消息。

spring:

rabbitmq:

listener:

simple:

prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息

发布订阅

以上两个模型都针对一个消息发布一个消费者,哪怕是队列模型,也是对消息进行分配,不可能一个消息获得多次。 加入exchange(交换机),发布者不需要直接面对队列,交换机只负责发送,不负责存储。 常见exchange类型包括:

Fanout:广播Direct:路由Topic:话题

Fanout Exchange

和以上类型不同的点在于,会将接收到的消息路由到每一个跟其绑定的queue。

1、声明队列、交换机,并将其绑定

@Configuration

public class FanoutConfig {

// itcast.fanout

@Bean

public FanoutExchange fanoutExchange(){

return new FanoutExchange("itcast.fanout");

}

// fanout.queue1

@Bean

public Queue fanoutQueue1(){

return new Queue("fanout.queue1");

}

// 绑定队列1到交换机

@Bean

public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){

return BindingBuilder

.bind(fanoutQueue1)

.to(fanoutExchange);

}

// fanout.queue2

@Bean

public Queue fanoutQueue2(){

return new Queue("fanout.queue2");

}

// 绑定队列2到交换机

@Bean

public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){

return BindingBuilder

.bind(fanoutQueue2)

.to(fanoutExchange);

}

@Bean

public Queue objectQueue(){

return new Queue("object.queue");

}

}

2、查看

3、发送测试SpringAmqpTest中

@Test

public void testSendFanoutExchange() {

// 交换机名称

String exchangeName = "itcast.fanout";

// 消息

String message = "hello, every one!";

// 发送消息

rabbitTemplate.convertAndSend(exchangeName, "", message);

}

Direct Exchange

会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

每一个Queue都与Exchange设置一个BindingKey发布者发送消息时,指定消息的RoutingKeyExchange将消息路由到BindingKey与消息RoutingKey一致的队列

1、SpringRabbitListener中声明监听方法,分别监听两个队列

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "direct.queue1"),

exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),

key = {"red", "blue"}

))

public void listenDirectQueue1(String msg){

System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");

}

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "direct.queue2"),

exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),

key = {"red", "yellow"}

))

public void listenDirectQueue2(String msg){

System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");

}

2、SpringAmqpTest类发送测试

@Test

public void testSendDirectExchange() {

// 交换机名称

String exchangeName = "itcast.direct";

// 消息

String message = "hello, red!";

// 发送消息

rabbitTemplate.convertAndSend(exchangeName, "red", message);

}

TopicExchange

与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割。

1、SpringRabbitListener监听

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "topic.queue1"),

exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),

key = "china.#"

))

public void listenTopicQueue1(String msg){

System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");

}

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "topic.queue2"),

exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),

key = "#.news"

))

public void listenTopicQueue2(String msg){

System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");

}

2、SpringAmqpTest中测试

@Test

public void testSendTopicExchange() {

// 交换机名称

String exchangeName = "itcast.topic";

// 消息

String message = "今天天气不错,我的心情好极了!";

// 发送消息

rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);

}

消息转换器

RabbitMQ只支持字节注入,而SpringAMQP允许发对象,采用Java中jdk的序列化,将其转化。

Spring的对消息对象的处理是由org.springframework,amqp.suppor.converter.Messageconverter来处理的。默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

如果要修改只需要定义一个MessageConverter 类型的Bean即可。 推荐用JSON方式序列化,步骤如下:

1、引入依赖

com.fasterxml.jackson.core

jackson-databind

2、PublisherApplication中:

@Bean

public MessageConverter messageConverter(){

return new Jackson2JsonMessageConverter();

}

3、发送转换为JSON完成,接下来是接收:

SpringRabbitListener:

@RabbitListener(queues = "object.queue")

public void listenObjectQueue(Map msg){

System.out.println("接收到object.queue的消息:" + msg);

}

ConsumerApplication:

@Bean

public MessageConverter messageConverter(){

return new Jackson2JsonMessageConverter();

}

elasticsearch(分布式搜索)

elasticsearch是强大的开源搜索引擎,广泛应用于日志数据分析、实时监控等领域。

Lucene是一个]ava语言的搜索引擎类库,是Apache公司的顶级项目。

Lucene的优势:

易扩展高性能(基于倒排索引)

Lucene的缺点:

只限于Java语言开发学习曲线陡峭不支持水平扩展

倒排索引

与MySQL对比:

Mysql:擅长事务类型操作,可以确保数据的安全和一致性Elasticsearch:擅长海量数据的搜索、分析、计算

索引库语法

查看索引库语法

GET /索引库名

示例:

GET /heima

删除索引库的语法

DELETE /索引库名

示例:

DELETE /heima

索引库和mapping一旦创建无法修改,但是可以添加新的字段,语法如下:

操作文档

新增文档

查看文档语法

GET /索引库名/-doc/文档id

示例:

GET /heima/_doc/1

删除文档的语法:

DELETE /索引库名/-doc/文档id

示例:

DELETE /heima/ doc/1

修改文档

精彩链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。