上一篇

Zipkin+Sleuth 链路追踪整合

增加基于 MQ 向 Zipkin 埋点功能

1.rabbitmq

docker run --name rabbitmq -d -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=spring -e RABBITMQ_DEFAULT_PASS=spring rabbitmq:management

2.启动 Zipkin绑定 rabbitmq

docker run --name rabbit-zipkin -d -p 9411:9411 --link rabbitmq -e RABBIT_ADDRESSES=rabbitmq:5672 -e RABBIT_USER=spring -e RABBIT_PASSWORD=spring openzipkin/zipkin

3.添加依赖

org.springframework.cloud

spring-cloud-stream-binder-rabbit

org.springframework.cloud

spring-cloud-starter-zipkin

4.示例

(1)provider

配置

server.port=8010management.endpoints.web.exposure.include=*management.endpoint.health.show-details=alwaysspring.application.name=service-providerspring.cloud.consul.host=192.168.99.100spring.cloud.consul.port=8500spring.cloud.consul.discovery.health-check-path=/actuator/healthspring.cloud.consul.discovery.service-name=${spring.application.name}spring.cloud.consul.discovery.heartbeat.enabled=truespring.cloud.consul.discovery.prefer-ip-address=truespring.rabbitmq.host=192.168.99.100spring.rabbitmq.port=5672spring.rabbitmq.username=springspring.rabbitmq.password=springspring.cloud.stream.bindings.finishedOrders.group=service-providerspring.sleuth.sampler.probability=1.0spring.zipkin.sender.type=rabbit

启动类

package com.xyz.provider;

import com.xyz.provider.integration.Barista;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableDiscoveryClient

@SpringBootApplication

@EnableBinding(Barista.class)

public class ProviderApplication {

public static void main(String[] args) {

SpringApplication.run(ProviderApplication.class, args);

}

}

View Code

Barista.java

package com.xyz.provider.integration;

import org.springframework.cloud.stream.annotation.Input;

import org.springframework.cloud.stream.annotation.Output;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.SubscribableChannel;

public interface Barista {

String NEW_ORDERS = "newOrders";

String FINISHED_ORDERS = "finishedOrders";

@Input

SubscribableChannel finishedOrders();

@Output

MessageChannel newOrders();

}

View Code

OrderListener.java

package com.xyz.provider.integration;

import lombok.extern.slf4j.Slf4j;

import org.springframework.cloud.stream.annotation.StreamListener;

import org.springframework.stereotype.Component;

@Component

@Slf4j

public class OrderListener {

@StreamListener(Barista.FINISHED_ORDERS)

public void listenFinishedOrders(Integer num) {

log.info("We've finished an order [{}].", num);

}

}

View Code

OrderService.java

package com.xyz.provider.service;

import com.xyz.provider.integration.Barista;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.stereotype.Service;

import org.springframework.transaction.annotation.Transactional;

@Service

@Transactional

@Slf4j

public class OrderService{

@Autowired

private Barista barista;

public boolean updateNum(Integer num) {

num++;

System.out.println(num);

barista.newOrders().send(MessageBuilder.withPayload(num).build());

return true;

}

}

View Code

控制器

package com.xyz.provider.controller;

import com.xyz.provider.service.OrderService;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

@RestController

@Slf4j

public class demoController {

@Autowired

private OrderService orderService;

@RequestMapping("/rabbitmq")

public String rabbitmq(Integer num) {

log.info("msq num: ", num);

orderService.updateNum(num);

return "ok";

}

}

(2)customer

配置

server.port=8015

spring.application.name=service-comsumer

management.endpoints.web.exposure.include=*

management.endpoint.health.show-details=always

spring.cloud.consul.host=192.168.99.100

spring.cloud.consul.port=8500

spring.cloud.consul.discovery.health-check-path=/actuator/health

spring.cloud.consul.discovery.service-name=${spring.application.name}

spring.cloud.consul.discovery.heartbeat.enabled=true

spring.rabbitmq.host=192.168.99.100

spring.rabbitmq.port=5672

spring.rabbitmq.username=spring

spring.rabbitmq.password=spring

spring.cloud.stream.bindings.newOrders.group=service-comsumer

spring.sleuth.sampler.probability=1.0

spring.zipkin.sender.type=rabbit

启动类

package com.xyz.comsumer;

import com.xyz.comsumer.integration.Waiter;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.cloud.client.loadbalancer.LoadBalanced;

import org.springframework.cloud.openfeign.EnableFeignClients;

import org.springframework.cloud.stream.annotation.EnableBinding;

import org.springframework.context.annotation.Bean;

import org.springframework.web.client.RestTemplate;

@EnableFeignClients

@SpringBootApplication

@EnableBinding(Waiter.class)

public class ComsumerApplication {

public static void main(String[] args) {

SpringApplication.run(ComsumerApplication.class, args);

}

@Bean

@LoadBalanced

public RestTemplate restTemplate() {

return new RestTemplate();

}

}

View Code

Waiter.java

package com.xyz.comsumer.integration;

import org.springframework.cloud.stream.annotation.Input;

import org.springframework.cloud.stream.annotation.Output;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.SubscribableChannel;

public interface Waiter {

String NEW_ORDERS = "newOrders";

String FINISHED_ORDERS = "finishedOrders";

@Input(NEW_ORDERS)

SubscribableChannel newOrders();

@Output(FINISHED_ORDERS)

MessageChannel finishedOrders();

}

View Code

OrderListener.java

package com.xyz.comsumer.integration;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.cloud.stream.annotation.StreamListener;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.stereotype.Component;

import org.springframework.transaction.annotation.Transactional;

@Component

@Slf4j

@Transactional

public class OrderListener {

@Autowired

@Qualifier(Waiter.FINISHED_ORDERS)

private MessageChannel finishedOrdersMessageChannel;

@StreamListener(Waiter.NEW_ORDERS)

public void processNewOrder(Integer num) {

num++;

log.info("Receive a new order",

num);

System.out.println(num);

finishedOrdersMessageChannel.send(MessageBuilder.withPayload(num).build());

}

}

View Code

启动consul

启动provider

启动customer

GET   

  http://localhost:8010/rabbitmq

返回

  ok

浏览器打开

  http://192.168.99.100:9411

进入详情页

 

 

 RabbitMQ 的管理界面,可以看到 zipkin 这个 Queue 有消息处理

 

查看原文