拼多多实习二面复盘
zookeeper判断服务器掉线
感知上线:服务器启动的时候,会在servers结点下创建一个新的短暂结点来存储当前服务器的信息,客户端通过对servers结点的watch就可以立刻知道有服务器上线了。
感知下线:有服务器下线后,对应servers下面的短暂结点会被删除,此时watch servers结点的客户端也能立刻知道哪个服务器下线了。
问题:zookeeper怎么知道服务器下线了?
watch监听机制:客户端向zookeeper的一个znode注册了一个watcher监听,当触发了这个watcher,就会向客户端发一个事件通知。
临时结点:当创建临时结点的程序停掉以后,这个临时结点就会消失。
问题: 如果服务端因为宕机或网络问题下线了,缓存却还在就会导致客户端请求已经不可用的服务端,增加请求失败率。
解决方案:由于服务端注册的是临时节点,所以如果服务端下线节点会被移除。只要监听zookeeper的子节点,如果新增或删除子节点就直接清空本地缓存即可。
zookeeper架构图:
rpc的服务端和其中一台zookeeper机器保持连接,发送读写请求。这些交互,基于长连接,通过心跳检测保持有效的会话,同时接收watch事件通知。
rpc消息重复、延迟问题
rpc中:因为消息重试或者没有实现幂等性导致的数据重复问题。
客户端调用出错重试策略:
- Failover: 通过负载均衡策略再选一个结点进行调用,直到达到重复次数
- Failfast: 发生异常直接抛出
- Failsafe:失败抛出一个空结果
- Failback:消费发生异常返回空结果,失败请求进行异步重试
- Forking:并行调用策略,通过线程池并发调用多个生产者,一个成功就行
幂等问题
例如用户支付成功后,支付系统将支付成功的消息,发送给消息队列,物流系统订阅到这个消息,准备为这笔订单创建物流单。
问题:可能重复创建物流单。
解决方案:
- 幂等表。新建一张表,用来做幂等,无其他业务意义,有一个字段名key建唯一索引。物流系统订阅到消息之后,首先尝试插入幂等表,订单编号作为key,如果成功继续创建物流单,如果已经存在则违反唯一性幂等原则,无法插入,丢弃消息。或者用redis,设置一定的过期时间,如一周。
- 状态机。绘制状态机图,分析状态转换,如果状态1为生成订单,为最终态,则即使再收到物流单消息也不处理。
动态代理具体调用流程
首先就调用者来说,只有一个暴露出来的API接口:
public interface HelloWordService {
String sayHello(String name);
}
例如controller里面,来了个请求,调用了这个函数:
@Controller
public class HelloWorldController {
@RpcAutowired(version = "1.0")
private HelloWordService helloWordService;
@GetMapping("/hello/world")
public ResponseEntity<String> pullServiceInfo(@RequestParam("name") String name){
return ResponseEntity.ok(helloWordService.sayHello(name));
}
}
里面用到了一个自定义注解:
@Target({ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.PARAMETER, ElementType.FIELD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Autowired
public @interface RpcAutowired {
String version() default "1.0";
}
这个bean会经过一个后置处理器,如果被rpcAutowired修饰,就给属性赋动态代理。
ReflectionUtils.doWithFields(clazz, field -> {
RpcAutowired rpcAutowired = AnnotationUtils.getAnnotation(field, RpcAutowired.class);
if (rpcAutowired != null) {
Object bean = applicationContext.getBean(clazz);
field.setAccessible(true);
// 修改为代理对象
ReflectionUtils.setField(field, bean, clientStubProxyFactory.getProxy(field.getType(), rpcAutowired.version(), discoveryService, properties));
}
});
动态代理:
public class ClientStubProxyFactory {
private Map<Class<?>, Object> objectCache = new HashMap<>();
/**
* 获取代理对象
*
* @param clazz 接口
* @param version 服务版本
* @param <T>
* @return 代理对象
*/
public <T> T getProxy(Class<T> clazz, String version, DiscoveryService discoveryService, RpcClientProperties properties) {
return (T) objectCache.computeIfAbsent(clazz, clz ->
Proxy.newProxyInstance(clz.getClassLoader(), new Class[]{clz}, new ClientStubInvocationHandler(discoveryService, properties, clz, version))
);
}
}
然后就到了这个handler
里面。
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 1、获得服务信息
ServiceInfo serviceInfo = discoveryService.discovery(ServiceUtil.serviceKey(this.clazz.getName(), this.version));
if (serviceInfo == null) {
throw new ResourceNotFoundException("404");
}
MessageProtocol<RpcRequest> messageProtocol = new MessageProtocol<>();
// 设置请求头
/*
+---------------------------------------------------------------+
| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte |
+---------------------------------------------------------------+
| 状态 1byte | 消息 ID 32byte | 数据长度 4byte |
+---------------------------------------------------------------+
*/
messageProtocol.setHeader(MessageHeader.build(properties.getSerialization()));
// 设置请求体
// 服务名,方法,参数类型数组,参数值数组
RpcRequest request = new RpcRequest();
request.setServiceName(ServiceUtil.serviceKey(this.clazz.getName(), this.version));
request.setMethod(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
messageProtocol.setBody(request);
// 发送网络请求 拿到结果
// netty短连接
// MessageProtocol<RpcRequest> 地址端口,超时时间
MessageProtocol<RpcResponse> responseMessageProtocol = NetClientTransportFactory.getNetClientTransport()
.sendRequest(RequestMetadata.builder().protocol(messageProtocol).address(serviceInfo.getAddress())
.port(serviceInfo.getPort()).timeout(properties.getTimeout()).build());
if (responseMessageProtocol == null) {
log.error("请求超时");
throw new RpcException("rpc调用结果失败, 请求超时 timeout:" + properties.getTimeout());
}
if (!MsgStatus.isSuccess(responseMessageProtocol.getHeader().getStatus())) {
log.error("rpc调用结果失败, message:{}", responseMessageProtocol.getBody().getMessage());
throw new RpcException(responseMessageProtocol.getBody().getMessage());
}
return responseMessageProtocol.getBody().getData();
}
sendRequest具体:
@Override
public MessageProtocol<RpcResponse> sendRequest(RequestMetadata metadata) throws Exception {
MessageProtocol<RpcRequest> protocol = metadata.getProtocol();
RpcFuture<MessageProtocol<RpcResponse>> future = new RpcFuture<>();
LocalRpcResponseCache.add(protocol.getHeader().getRequestId(), future);
// TCP 连接
ChannelFuture channelFuture = bootstrap.connect(metadata.getAddress(), metadata.getPort()).sync();
channelFuture.addListener((ChannelFutureListener) arg0 -> {
if (channelFuture.isSuccess()) {
log.info("connect rpc server {} on port {} success.", metadata.getAddress(), metadata.getPort());
} else {
log.error("connect rpc server {} on port {} failed.", metadata.getAddress(), metadata.getPort());
channelFuture.cause().printStackTrace();
eventLoopGroup.shutdownGracefully();
}
});
// 写入数据
channelFuture.channel().writeAndFlush(protocol);
return metadata.getTimeout() != null ? future.get(metadata.getTimeout(), TimeUnit.MILLISECONDS) : future.get();
}
根据MessageProtocol的header写入消息头到bytebuf,然后将请求体数据序列化,加入编码里面。
详看:
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol<T> messageProtocol, ByteBuf byteBuf) throws Exception {
MessageHeader header = messageProtocol.getHeader();
// 魔数
byteBuf.writeShort(header.getMagic());
// 协议版本号
byteBuf.writeByte(header.getVersion());
// 序列化算法
byteBuf.writeByte(header.getSerialization());
// 报文类型
byteBuf.writeByte(header.getMsgType());
// 状态
byteBuf.writeByte(header.getStatus());
// 消息 ID
byteBuf.writeCharSequence(header.getRequestId(), Charset.forName("UTF-8"));
RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(SerializationTypeEnum.parseByType(header.getSerialization()));
byte[] data = rpcSerialization.serialize(messageProtocol.getBody());
// 数据长度
byteBuf.writeInt(data.length);
// 数据内容
byteBuf.writeBytes(data);
}
List size()问题
手写最小堆,完了之后面试官还和我争论了一会size()
问题,搞得我都有点不确定了。我说的是对的。
首先是例子:
List<Integer> list = new ArrayList<>(16);
System.out.println(list.size()); // 0
list.add(2);
System.out.println(list.size()); // 1
list.remove(0);
System.out.println(list.size()); // 0
源码:
// list内部还是数组
// transient Object[] elementData;
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + (oldCapacity >> 1);
//......
}
public int size() {
return size;
}
有size()
和capacity
两个概念。size
是确切的元素个数,capacity
是对象数组的大小(可能没占满)。
http头部
通用头:
cache-control: 控制缓存
connection:是否需要永久连接
date:消息发送的时间,缓存在评估响应的新的程度时会用到
transfer-encoding:编码
。。。
请求头:
accept: 告诉服务器自己接受什么样的类型,*/*任何类型,type/*表示该类型下面的所有子类型
accept-charset: 自己能接受的字符集
accept-encoding: 自己能接受的编码方法,通常指定压缩方法
range: 自己想获取数据的哪一部分
host
user-agent
响应头:
server: Apache/2.0.61例如
location:让客户端重定向到URI
实体首部字段:
content-encoding
content-language
content-length
conteng-location:
content-type: text/html text/plain
expires
last-modified