拼多多实习二面复盘


zookeeper判断服务器掉线

image-20220328160503228

感知上线:服务器启动的时候,会在servers结点下创建一个新的短暂结点来存储当前服务器的信息,客户端通过对servers结点的watch就可以立刻知道有服务器上线了。

感知下线:有服务器下线后,对应servers下面的短暂结点会被删除,此时watch servers结点的客户端也能立刻知道哪个服务器下线了。

问题:zookeeper怎么知道服务器下线了?

watch监听机制:客户端向zookeeper的一个znode注册了一个watcher监听,当触发了这个watcher,就会向客户端发一个事件通知。

临时结点:当创建临时结点的程序停掉以后,这个临时结点就会消失。

问题: 如果服务端因为宕机或网络问题下线了,缓存却还在就会导致客户端请求已经不可用的服务端,增加请求失败率。

解决方案:由于服务端注册的是临时节点,所以如果服务端下线节点会被移除。只要监听zookeeper的子节点,如果新增或删除子节点就直接清空本地缓存即可。

zookeeper架构图

image-20220328191411909

rpc的服务端和其中一台zookeeper机器保持连接,发送读写请求。这些交互,基于长连接,通过心跳检测保持有效的会话,同时接收watch事件通知。

rpc消息重复、延迟问题

rpc中:因为消息重试或者没有实现幂等性导致的数据重复问题。

客户端调用出错重试策略:

  • Failover: 通过负载均衡策略再选一个结点进行调用,直到达到重复次数
  • Failfast: 发生异常直接抛出
  • Failsafe:失败抛出一个空结果
  • Failback:消费发生异常返回空结果,失败请求进行异步重试
  • Forking:并行调用策略,通过线程池并发调用多个生产者,一个成功就行

幂等问题

例如用户支付成功后,支付系统将支付成功的消息,发送给消息队列,物流系统订阅到这个消息,准备为这笔订单创建物流单。

问题:可能重复创建物流单。

解决方案:

  • 幂等表。新建一张表,用来做幂等,无其他业务意义,有一个字段名key建唯一索引。物流系统订阅到消息之后,首先尝试插入幂等表,订单编号作为key,如果成功继续创建物流单,如果已经存在则违反唯一性幂等原则,无法插入,丢弃消息。或者用redis,设置一定的过期时间,如一周。
  • 状态机。绘制状态机图,分析状态转换,如果状态1为生成订单,为最终态,则即使再收到物流单消息也不处理。

动态代理具体调用流程

首先就调用者来说,只有一个暴露出来的API接口:

public interface HelloWordService {
    String sayHello(String name);
}

例如controller里面,来了个请求,调用了这个函数:

@Controller
public class HelloWorldController {
    @RpcAutowired(version = "1.0")
    private HelloWordService helloWordService;

    @GetMapping("/hello/world")
    public ResponseEntity<String> pullServiceInfo(@RequestParam("name") String name){
        return  ResponseEntity.ok(helloWordService.sayHello(name));
    }
}

里面用到了一个自定义注解:

@Target({ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.PARAMETER, ElementType.FIELD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Autowired
public @interface RpcAutowired {
    String version() default "1.0";
}

这个bean会经过一个后置处理器,如果被rpcAutowired修饰,就给属性赋动态代理。

ReflectionUtils.doWithFields(clazz, field -> {
    RpcAutowired rpcAutowired = AnnotationUtils.getAnnotation(field, RpcAutowired.class);
    if (rpcAutowired != null) {
        Object bean = applicationContext.getBean(clazz);
        field.setAccessible(true);
        // 修改为代理对象
        ReflectionUtils.setField(field, bean, clientStubProxyFactory.getProxy(field.getType(), rpcAutowired.version(), discoveryService, properties));
    }
});

动态代理:

public class ClientStubProxyFactory {

    private Map<Class<?>, Object> objectCache = new HashMap<>();

    /**
     * 获取代理对象
     *
     * @param clazz   接口
     * @param version 服务版本
     * @param <T>
     * @return 代理对象
     */
    public <T> T getProxy(Class<T> clazz, String version, DiscoveryService discoveryService, RpcClientProperties properties) {
        return (T) objectCache.computeIfAbsent(clazz, clz ->
                Proxy.newProxyInstance(clz.getClassLoader(), new Class[]{clz}, new ClientStubInvocationHandler(discoveryService, properties, clz, version))
        );
    }
}

然后就到了这个handler里面。

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // 1、获得服务信息
    ServiceInfo serviceInfo = discoveryService.discovery(ServiceUtil.serviceKey(this.clazz.getName(), this.version));
    if (serviceInfo == null) {
        throw new ResourceNotFoundException("404");
    }

    MessageProtocol<RpcRequest> messageProtocol = new MessageProtocol<>();
    // 设置请求头
    /*
    +---------------------------------------------------------------+
    | 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte  |
    +---------------------------------------------------------------+
    | 状态 1byte |        消息 ID 32byte     |      数据长度 4byte     |
    +---------------------------------------------------------------+
    */
    messageProtocol.setHeader(MessageHeader.build(properties.getSerialization()));
    // 设置请求体
    // 服务名,方法,参数类型数组,参数值数组
    RpcRequest request = new RpcRequest();
    request.setServiceName(ServiceUtil.serviceKey(this.clazz.getName(), this.version));
    request.setMethod(method.getName());
    request.setParameterTypes(method.getParameterTypes());
    request.setParameters(args);
    messageProtocol.setBody(request);

    // 发送网络请求 拿到结果
    // netty短连接
    // MessageProtocol<RpcRequest> 地址端口,超时时间
    MessageProtocol<RpcResponse> responseMessageProtocol = NetClientTransportFactory.getNetClientTransport()
            .sendRequest(RequestMetadata.builder().protocol(messageProtocol).address(serviceInfo.getAddress())
                    .port(serviceInfo.getPort()).timeout(properties.getTimeout()).build());

    if (responseMessageProtocol == null) {
        log.error("请求超时");
        throw new RpcException("rpc调用结果失败, 请求超时 timeout:" + properties.getTimeout());
    }

    if (!MsgStatus.isSuccess(responseMessageProtocol.getHeader().getStatus())) {
        log.error("rpc调用结果失败, message:{}", responseMessageProtocol.getBody().getMessage());
        throw new RpcException(responseMessageProtocol.getBody().getMessage());
    }
    return responseMessageProtocol.getBody().getData();
}

sendRequest具体:

@Override
public MessageProtocol<RpcResponse> sendRequest(RequestMetadata metadata) throws Exception {
    MessageProtocol<RpcRequest> protocol = metadata.getProtocol();
    RpcFuture<MessageProtocol<RpcResponse>> future = new RpcFuture<>();
    LocalRpcResponseCache.add(protocol.getHeader().getRequestId(), future);

    // TCP 连接
    ChannelFuture channelFuture = bootstrap.connect(metadata.getAddress(), metadata.getPort()).sync();
    channelFuture.addListener((ChannelFutureListener) arg0 -> {
        if (channelFuture.isSuccess()) {
            log.info("connect rpc server {} on port {} success.", metadata.getAddress(), metadata.getPort());
        } else {
            log.error("connect rpc server {} on port {} failed.", metadata.getAddress(), metadata.getPort());
            channelFuture.cause().printStackTrace();
            eventLoopGroup.shutdownGracefully();
        }
    });
    // 写入数据
    channelFuture.channel().writeAndFlush(protocol);
    return metadata.getTimeout() != null ? future.get(metadata.getTimeout(), TimeUnit.MILLISECONDS) : future.get();
}

根据MessageProtocol的header写入消息头到bytebuf,然后将请求体数据序列化,加入编码里面。

详看:

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol<T> messageProtocol, ByteBuf byteBuf) throws Exception {
    MessageHeader header = messageProtocol.getHeader();
    // 魔数
    byteBuf.writeShort(header.getMagic());
    // 协议版本号
    byteBuf.writeByte(header.getVersion());
    // 序列化算法
    byteBuf.writeByte(header.getSerialization());
    // 报文类型
    byteBuf.writeByte(header.getMsgType());
    // 状态
    byteBuf.writeByte(header.getStatus());
    // 消息 ID
    byteBuf.writeCharSequence(header.getRequestId(), Charset.forName("UTF-8"));
    RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(SerializationTypeEnum.parseByType(header.getSerialization()));
    byte[] data = rpcSerialization.serialize(messageProtocol.getBody());
    // 数据长度
    byteBuf.writeInt(data.length);
    // 数据内容
    byteBuf.writeBytes(data);
}

List size()问题

手写最小堆,完了之后面试官还和我争论了一会size()问题,搞得我都有点不确定了。我说的是对的。

首先是例子:

List<Integer> list = new ArrayList<>(16);
System.out.println(list.size()); // 0
list.add(2);
System.out.println(list.size()); // 1
list.remove(0);
System.out.println(list.size()); // 0

源码:

// list内部还是数组
// transient Object[] elementData;
private void grow(int minCapacity) {
    // overflow-conscious code
    int oldCapacity = elementData.length;
    int newCapacity = oldCapacity + (oldCapacity >> 1);
    //......
}

public int size() {
    return size;
}

size()capacity两个概念。size是确切的元素个数,capacity是对象数组的大小(可能没占满)。

http头部

通用头:

cache-control: 控制缓存
connection:是否需要永久连接
date:消息发送的时间,缓存在评估响应的新的程度时会用到
transfer-encoding:编码
。。。

请求头:

accept: 告诉服务器自己接受什么样的类型,*/*任何类型,type/*表示该类型下面的所有子类型
accept-charset: 自己能接受的字符集
accept-encoding: 自己能接受的编码方法,通常指定压缩方法
range: 自己想获取数据的哪一部分
host
user-agent

响应头:

server: Apache/2.0.61例如
location:让客户端重定向到URI

实体首部字段:

content-encoding
content-language
content-length
conteng-location: 
content-type: text/html text/plain
expires
last-modified