
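Contents

Gray release
AB Test practice
    Logical architecture diagram
    That covers external access to internal services; how do internal services make their own calls?
Demo implementation
    Analysis
    Key changes
Practice
    Optimization points
    ReactiveLoadBalancerClientFilter source code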

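Gray release

Gray release comes in several forms: blue-green, AB Test, and canary. Its purpose is to reduce the impact a gray (pre-release) version has on the production environment.

| Strategy | What it does | Drawback |
| --- | --- | --- |
| Blue-green | Two environments exist. After the gray release, traffic is switched from green to blue and then verified. | Wasteful of environments, and production traffic is routed straight to the gray version. |
| AB Test | The gateway routes per user. Only specific users have their traffic switched to the gray version, so production users are not affected. | More complex than traffic weighting, because users have to be distinguished and treated differently. |
| Canary | Part of the traffic is switched over to verify that the gray version is correct. If there are no problems, all traffic is switched. | Production users still end up testing the gray version directly. |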
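The difference between AB Test and canary can be sketched in a few lines. This is only an illustration, not the gateway code from this post: `GrayStrategySketch`, `abTestIsGray`, `canaryIsGray`, the user IDs and the percentage are all hypothetical names made up for the example.

```java
import java.util.Set;

// Hypothetical sketch: how AB Test and canary decide who sees the gray version.
class GrayStrategySketch {

    // AB Test: only users on an explicit allow list go to the gray version.
    static boolean abTestIsGray(String userId, Set<String> grayUsers) {
        return grayUsers.contains(userId);
    }

    // Canary: a fixed percentage of all users goes to the gray version.
    // Hashing the user ID keeps the decision stable for the same user.
    static boolean canaryIsGray(String userId, int grayPercent) {
        int bucket = Math.floorMod(userId.hashCode(), 100); // always in 0..99
        return bucket < grayPercent;
    }
}
```

With AB Test the production users never match the allow list, which is why they are not affected. With canary any production user can fall into the gray bucket.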

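AB Test practice

Logical architecture diagram

This is the first version. We implement a custom load balancer: a filter reads the relevant request header, or the request is tagged passively by arriving through a different domain name, and routing follows from that mark. When traffic is routed, the metadata mark on the service registered in Nacos decides which instance receives the request. Once gray testing is finished, the production pods are updated with a rolling update.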
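The routing rule described above can be sketched without any Spring or Nacos dependency. Everything here is an assumption for illustration: `MetadataRoutingSketch`, the `Instance` stand-in and the fallback rule for requests without a header are not taken from the demo code later in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of header-driven routing against instance metadata.
class MetadataRoutingSketch {

    // Stand-in for a registered instance; a real one would come from the registry.
    static final class Instance {
        final String host;
        final Map<String, String> metadata;

        Instance(String host, Map<String, String> metadata) {
            this.host = host;
            this.metadata = metadata;
        }
    }

    // Keep only instances whose "version" metadata equals the request's version header.
    // Assumption: a request without the header goes to instances that carry no version mark.
    static List<Instance> candidates(List<Instance> all, String versionHeader) {
        List<Instance> matched = new ArrayList<>();
        for (Instance instance : all) {
            String version = instance.metadata.get("version");
            boolean hit = (versionHeader == null) ? version == null : versionHeader.equals(version);
            if (hit) {
                matched.add(instance);
            }
        }
        return matched;
    }
}
```

A normal load-balancing algorithm would then pick one instance from the returned candidates.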

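That covers external access to internal services; how do internal services make their own calls?

Feign or plain HTTP requests: the gray version of a service is reached through its domain name. Dubbo requests: Dubbo has its own load balancer, which needs the corresponding mark, for example a version number, to balance on.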
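For internal calls the mark has to travel with the request. One common way to do this is to keep it in a per-request context and copy it onto every outgoing call. The sketch below assumes a thread-per-request service; `GrayContextSketch` and the `version` key are hypothetical names. In a real project the copy step would typically live in a Feign request interceptor or a Dubbo filter, which is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: carry the gray mark from the incoming request to outgoing calls.
class GrayContextSketch {

    // Holds the version mark of the request handled by the current thread.
    private static final ThreadLocal<String> VERSION = new ThreadLocal<>();

    static void enter(String version) {
        VERSION.set(version);
    }

    static void clear() {
        VERSION.remove();
    }

    // Copy the current version mark onto the headers (or attachments) of an outgoing call.
    static Map<String, String> outgoingHeaders(Map<String, String> base) {
        Map<String, String> headers = new HashMap<>(base);
        String version = VERSION.get();
        if (version != null) {
            headers.put("version", version);
        }
        return headers;
    }
}
```

The downstream load balancer can then read the same mark and choose a matching instance.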

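Advanced, second version: in the first version we defined a custom load balancer plus a route marker. A gateway route is normally configured as lb:xxx; to see why it starts with lb, read the ReactiveLoadBalancerClientFilter source. What we actually need is to define a different marker of our own. That raises a question: once gray testing is finished, how do we switch the load balancing back to the normal lb?

This comes down to dynamic configuration of gateway routes; calling publishEvent is enough (not covered in this chapter).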
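Switching a route back mostly means rewriting the scheme of its URI. Here is a minimal sketch of that one step using only `java.net.URI`; `RouteSchemeSketch` and `withScheme` are hypothetical names, and the service name is made up. Saving the updated route definition and publishing the refresh event is the Spring Cloud Gateway part and is left out.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical sketch: rewrite only the scheme of a route URI.
class RouteSchemeSketch {

    // Return the same route URI with a different scheme, e.g. grayLb://svc becomes lb://svc.
    static URI withScheme(URI routeUri, String scheme) {
        try {
            return new URI(scheme, routeUri.getAuthority(), routeUri.getPath(),
                    routeUri.getQuery(), routeUri.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + routeUri, e);
        }
    }
}
```

Calling `withScheme(URI.create("grayLb://user-service"), "lb")` gives back the normal load-balanced form of the same route.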

Demo implementation

See this other article:

Gateway gray release

GrayGatewayReactiveLoadBalancerClientAutoConfiguration

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.client.loadbalancer.LoadBalancerProperties;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author javachen
 * @description GrayGatewayReactiveLoadBalancerClientAutoConfiguration
 */
@Configuration
public class GrayGatewayReactiveLoadBalancerClientAutoConfiguration {

    public GrayGatewayReactiveLoadBalancerClientAutoConfiguration() {
    }

    // Registers the gray filter unless the application already defines one.
    @Bean
    @ConditionalOnMissingBean({GrayReactiveLoadBalancerClientFilter.class})
    public GrayReactiveLoadBalancerClientFilter grayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
        return new GrayReactiveLoadBalancerClientFilter(clientFactory, properties);
    }
}

GrayLoadBalancer

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.http.HttpHeaders;
import reactor.core.publisher.Mono;

import java.util.*;

/**
 * @author javachen
 * @description GrayLoadBalancer
 */
public class GrayLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private static final Log log = LogFactory.getLog(GrayLoadBalancer.class);

    private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

    private String serviceId;

    public GrayLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        // The gray filter passes the request headers as the Request context.
        HttpHeaders headers = (HttpHeaders) request.getContext();
        if (this.serviceInstanceListSupplierProvider != null) {
            ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
            return supplier.get().next().map(list -> getInstanceResponse(list, headers));
        }
        // Return an empty response rather than null, so callers never subscribe to a null Mono.
        return Mono.just(getServiceInstanceEmptyResponse());
    }

    // Note: this currently always distributes by weight; the headers are not used here.
    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, HttpHeaders headers) {
        if (instances.isEmpty()) {
            return getServiceInstanceEmptyResponse();
        } else {
            return getServiceInstanceResponseWithWeight(instances);
        }
    }

    /**
     * Distribute by version: pick the first instance whose metadata
     * "version" equals the "version" request header.
     * @param instances
     * @param headers
     * @return
     */
    private Response<ServiceInstance> getServiceInstanceResponseByVersion(List<ServiceInstance> instances, HttpHeaders headers) {
        String versionNo = headers.getFirst("version");
        Map<String, String> versionMap = new HashMap<>();
        versionMap.put("version", versionNo);
        final Set<Map.Entry<String, String>> attributes =
                Collections.unmodifiableSet(versionMap.entrySet());
        ServiceInstance serviceInstance = null;
        for (ServiceInstance instance : instances) {
            Map<String, String> metadata = instance.getMetadata();
            if (metadata.entrySet().containsAll(attributes)) {
                serviceInstance = instance;
                break;
            }
        }
        if (ObjectUtils.isEmpty(serviceInstance)) {
            return getServiceInstanceEmptyResponse();
        }
        return new DefaultResponse(serviceInstance);
    }

    /**
     * Distribute by weight. Instances whose Nacos metadata contains the
     * "gray" key get weight 1000; all other instances get weight 1.
     * @param instances
     * @return
     */
    private Response<ServiceInstance> getServiceInstanceResponseWithWeight(List<ServiceInstance> instances) {
        Map<ServiceInstance, Integer> weightMap = new HashMap<>();
        for (ServiceInstance instance : instances) {
            Map<String, String> metadata = instance.getMetadata();
            if (metadata.containsKey("gray")) {
                weightMap.put(instance, 1000);
            } else {
                weightMap.put(instance, 1);
            }
        }
        WeightMeta<ServiceInstance> weightMeta = WeightRandomUtils.buildWeightMeta(weightMap);
        if (ObjectUtils.isEmpty(weightMeta)) {
            return getServiceInstanceEmptyResponse();
        }
        ServiceInstance serviceInstance = weightMeta.random();
        if (ObjectUtils.isEmpty(serviceInstance)) {
            return getServiceInstanceEmptyResponse();
        }
        return new DefaultResponse(serviceInstance);
    }

    private Response<ServiceInstance> getServiceInstanceEmptyResponse() {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
}

GrayReactiveLoadBalancerClientFilter

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.*;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.net.URI;

/**
 * @author javachen
 * @description GrayReactiveLoadBalancerClientFilter
 */
public class GrayReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(GrayReactiveLoadBalancerClientFilter.class);

    private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;

    private final LoadBalancerClientFactory clientFactory;

    private LoadBalancerProperties properties;

    public GrayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
        this.clientFactory = clientFactory;
        this.properties = properties;
    }

    @Override
    public int getOrder() {
        return LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
        // Only handle routes whose URI uses our own grayLb marker; everything else passes through.
        if (url != null && ("grayLb".equals(url.getScheme()) || "grayLb".equals(schemePrefix))) {
            ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
            if (log.isTraceEnabled()) {
                log.trace(GrayReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
            }
            return this.choose(exchange).doOnNext((response) -> {
                if (!response.hasServer()) {
                    throw NotFoundException.create(true, "Unable to find instance for " + url.getHost());
                } else {
                    URI uri = exchange.getRequest().getURI();
                    String overrideScheme = null;
                    if (schemePrefix != null) {
                        overrideScheme = url.getScheme();
                    }
                    DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(response.getServer(), overrideScheme);
                    URI requestUrl = this.reconstructURI(serviceInstance, uri);
                    if (log.isTraceEnabled()) {
                        log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                    }
                    // Replace the grayLb://service URL with the address of the chosen instance.
                    exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                }
            }).then(chain.filter(exchange));
        } else {
            return chain.filter(exchange);
        }
    }

    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

    private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
        URI uri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        // A freshly constructed load balancer is never null, so no null check is needed here.
        GrayLoadBalancer loadBalancer = new GrayLoadBalancer(clientFactory.getLazyProvider(uri.getHost(), ServiceInstanceListSupplier.class), uri.getHost());
        return loadBalancer.choose(this.createRequest(exchange));
    }

    // Wrap the request headers so GrayLoadBalancer can read them from the Request context.
    private Request<HttpHeaders> createRequest(ServerWebExchange exchange) {
        HttpHeaders headers = exchange.getRequest().getHeaders();
        return new DefaultRequest<>(headers);
    }
}

WeightMeta

import java.util.Arrays;
import java.util.Random;

/**
 * @author javachen
 * @description Weight metadata object
 */
public class WeightMeta<T> {

    private final Random ran = new Random();

    private final T[] nodes;

    // Cumulative weights: weights[i] is the sum of the weights of nodes[0..i].
    private final int[] weights;

    private final int maxW;

    public WeightMeta(T[] nodes, int[] weights) {
        this.nodes = nodes;
        this.weights = weights;
        this.maxW = weights[weights.length - 1];
    }

    /**
     * Returns a node chosen at random according to the weights.
     * @return
     */
    public T random() {
        int index = Arrays.binarySearch(weights, ran.nextInt(maxW) + 1);
        if (index < 0) {
            index = -1 - index;
        }
        return nodes[index];
    }

    public T random(int ranInt) {
        // Use >= here: with > a value equal to maxW would be incremented past
        // the last cumulative weight and index out of bounds.
        if (ranInt >= maxW) {
            ranInt = maxW;
        } else if (ranInt < 0) {
            ranInt = 1;
        } else {
            ranInt++;
        }
        int index = Arrays.binarySearch(weights, ranInt);
        if (index < 0) {
            index = -1 - index;
        }
        return nodes[index];
    }

    @Override
    public String toString() {
        StringBuilder l1 = new StringBuilder();
        StringBuilder l2 = new StringBuilder("[random]\t");
        StringBuilder l3 = new StringBuilder("[node]\t\t");
        l1.append(this.getClass().getName()).append(":").append(this.hashCode()).append(":\n").append("[index]\t\t");
        for (int i = 0; i < weights.length; i++) {
            l1.append(i).append("\t");
            l2.append(weights[i]).append("\t");
            l3.append(nodes[i]).append("\t");
        }
        l1.append("\n");
        l2.append("\n");
        l3.append("\n");
        return l1.append(l2).append(l3).toString();
    }
}

WeightRandomUtils

import java.util.HashMap;
import java.util.Map;

/**
 * The weighting algorithm is taken from: @see https://www.ctolib.com/topics-61571.html
 *
 * Random utility class.
 *
 * Builds a random metadata object from a Map of weighted items.
 *
 * For example, given 3 url addresses with weights 1, 2 and 3,
 * we can pick a url at random according to those weights:
 *
 * map.put(url1, 1);
 * map.put(url2, 2);
 * map.put(url3, 3);
 * WeightMeta<String> md = WeightRandomUtils.buildWeightMeta(map);
 * String weightRandomUrl = md.random();
 */
public class WeightRandomUtils {

    @SuppressWarnings("unchecked")
    public static <T> WeightMeta<T> buildWeightMeta(final Map<T, Integer> weightMap) {
        if (weightMap.isEmpty()) {
            return null;
        }
        final int size = weightMap.size();
        Object[] nodes = new Object[size];
        int[] weights = new int[size];
        int index = 0;
        int weightAdder = 0;
        for (Map.Entry<T, Integer> each : weightMap.entrySet()) {
            nodes[index] = each.getKey();
            // Store the running total so WeightMeta can binary-search it.
            weights[index++] = (weightAdder = weightAdder + each.getValue());
        }
        return new WeightMeta<>((T[]) nodes, weights);
    }

    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        map.put("v1", 1);
        map.put("v2", 2);
        WeightMeta<String> nodes = WeightRandomUtils.buildWeightMeta(map);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                System.out.println(nodes.random());
            }).start();
        }
    }
}
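To see why the cumulative weights plus a binary search give a weighted pick, it helps to fix the random draw by hand. The sketch below uses the weights 1, 2 and 3 for three urls, the same setup as the Javadoc example; `CumulativeWeightSketch` and `pick` are hypothetical names used only for this walkthrough.

```java
import java.util.Arrays;

// Hypothetical walkthrough of the cumulative-weight lookup with a fixed draw.
class CumulativeWeightSketch {

    static final String[] NODES = {"url1", "url2", "url3"};

    // Weights 1, 2, 3 stored as running totals: 1, 1+2, 1+2+3.
    static final int[] CUMULATIVE = {1, 3, 6};

    // Map a draw in 1..6 to the first node whose running total is >= draw.
    static String pick(int draw) {
        int index = Arrays.binarySearch(CUMULATIVE, draw);
        if (index < 0) {
            index = -1 - index; // on a miss binarySearch returns -(insertionPoint) - 1
        }
        return NODES[index];
    }
}
```

Draw 1 lands on url1, draws 2 and 3 land on url2, and draws 4 to 6 land on url3, so the three urls are picked in the ratio 1:2:3.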

Analysis

For reference, see ReactiveLoadBalancerClientFilter and RoundRobinLoadBalancer: the filter calls the load balancer's choose method to pick an instance.
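As a rough picture of what a round-robin choose does, here is a simplified sketch in plain Java. It is not the Spring source: `RoundRobinSketch` is a hypothetical class, and it works on plain strings instead of ServiceInstance and Mono.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified round-robin selection.
class RoundRobinSketch {

    // Starts at -1 so the first call selects index 0.
    private final AtomicInteger position = new AtomicInteger(-1);

    String choose(List<String> instances) {
        if (instances.isEmpty()) {
            return null; // the real load balancer would return an empty response instead
        }
        // Masking with Integer.MAX_VALUE keeps the counter non-negative after overflow.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }
}
```

GrayLoadBalancer keeps the same shape and only swaps the selection step for the weighted pick.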

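Key changes

GrayLoadBalancer: this is what gets activated, triggered by the user's header and by the host domain of the request.

Practice

Register two instances in Nacos with different versions in their metadata. Send a request from Postman carrying the specific mark, and the call lands on the gray version.

Optimization points

After gray testing, leaving the load balancer unchanged also works, but it is better to switch back to the normal load balancer.

That calls for dynamic configuration of the gateway routes, which can be implemented in many ways.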

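ReactiveLoadBalancerClientFilter source code

ReactiveLoadBalancerClientFilter implements two interfaces, GlobalFilter and Ordered. Anyone who has worked with the gateway knows that these two mean a filter plus an ordering.

Why does the gateway route configuration start with lb? Because the filter only handles a request when the scheme (or scheme prefix) of the route URL is lb.

The key point is the choose method: the ReactorLoadBalancer algorithm decides which machine to route to.

ReactorLoadBalancer source (RoundRobinLoadBalancer is the implementation class): this is the routing algorithm. It is why we implemented GrayLoadBalancer and WeightRandomUtils earlier; both are modelled on RoundRobinLoadBalancer.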
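The scheme check is the whole reason a custom marker such as grayLb can coexist with lb. A minimal sketch of that check in plain Java, assuming the same condition as the gray filter in the demo; `SchemeCheckSketch` and `handledBy` are hypothetical names.

```java
import java.net.URI;

// Hypothetical sketch of the scheme check each load-balancer filter performs.
class SchemeCheckSketch {

    // A filter handles the request only when the route URI scheme, or the
    // scheme prefix attribute, equals the marker that filter is responsible for.
    static boolean handledBy(String filterScheme, URI routeUri, String schemePrefix) {
        return routeUri != null
                && (filterScheme.equals(routeUri.getScheme()) || filterScheme.equals(schemePrefix));
    }
}
```

A route configured as grayLb://user-service is skipped by the lb check and picked up by the grayLb check, so each filter only sees its own routes.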
