手写rpc框架3
参考视频: 自己动手实现RPC框架,本内容为视频学习笔记。
三篇内容:
理论篇:rpc核心原理、现有框架对比、相关技术
实战篇:代码实现、使用案例
总结篇
上节:手写rpc框架2
实战篇
Client
首先是配置文件,选取什么类,服务端地址。
@Data
public class RpcClientConfig {
private Class<? extends TransportClient> transportClass
= HTTPTransportClient.class;
private Class<?extends Encoder> encoderClass = JSONEncoder.class;
private Class<?extends Decoder> decoderClass = JSONDecoder.class;
private Class<? extends TransportSelector> selectorClass =
RandomTransportSelector.class;
private int connectCount = 1;
private List<Peer> servers = Arrays.asList(
new Peer("127.0.0.1",3000)
);
}
server可能有多个,选择server的接口,包括选择、释放
/**
* 表示选择哪个server连接
* @author hqingLau
**/
public interface TransportSelector {
/**
* 初始化selector
* @param peers 可以连接的server端点信息
* @param count client与server建立多少个连接
* @param clazz client实现类class
*/
void init(List<Peer> peers,
int count,
Class<? extends TransportClient> clazz);
/**
* 选择一个transport和server做交互
* @return 网络client
*/
TransportClient select();
/**
* 释放用完的client
* @param client
*/
void release(TransportClient client);
void close();
}
随机选择server的实现类:
@Slf4j
public class RandomTransportSelector implements TransportSelector{
// 已经连接好的client
private List<TransportClient> clients;
public RandomTransportSelector() {
clients = new ArrayList<>();
}
/**
* 初始化selector
*
* @param peers 可以连接的server端点信息
* @param count client与server建立多少个连接
* @param clazz client实现类class
*/
@Override
public void init(List<Peer> peers, int count, Class<? extends TransportClient> clazz) {
count = Math.max(count,1);
for(Peer peer:peers) {
for (int i = 0; i < count; i++) {
TransportClient client = ReflectUtils.newInstance(clazz);
client.connect(peer);
clients.add(client);
}
log.info("connect server: {}",peer);
}
}
/**
* 选择一个transport和server做交互
*
* @return 网络client
*/
@Override
public synchronized TransportClient select() {
int i = new Random().nextInt(clients.size());
return clients.remove(i);
}
/**
* 释放用完的client,就是重新放回到list
*
* @param client
*/
@Override
public synchronized void release(TransportClient client) {
clients.add(client);
}
@Override
public synchronized void close() {
for(TransportClient client:clients) {
client.close();
}
clients.clear();
}
}
RpcClient:
public class RpcClient {
private RpcClientConfig config;
private Encoder encoder;
private Decoder decoder;
private TransportSelector selector;
public RpcClient() {
this(new RpcClientConfig());
}
public RpcClient(RpcClientConfig config) {
this.config = config;
this.encoder = ReflectUtils.newInstance(config.getEncoderClass());
this.decoder = ReflectUtils.newInstance(config.getDecoderClass());
this.selector = ReflectUtils.newInstance(config.getSelectorClass());
this.selector.init(this.config.getServers(),
this.config.getConnectCount(),
this.config.getTransportClass());
}
// 获取接口的代理对象
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(
getClass().getClassLoader(),
new Class[]{clazz},
new RemoteInvoker(clazz,encoder,decoder,selector)
);
}
}
可以获取接口的代理对象,就是客户端只要知道接口,方法,就ok了,具体的实现和运行交给服务端。客户端生成了一个动态代理对象来调用,其实内部handler调用的是server的。
RemoteInvoker:
/**
* 调用远程服务的代理类
* @author hqingLau
**/
@Slf4j
public class RemoteInvoker implements InvocationHandler {
private Encoder encoder;
private Decoder decoder;
private TransportSelector selector;
private Class clazz;
RemoteInvoker(Class clazz, Encoder encoder,
Decoder decoder,
TransportSelector selector) {
this.decoder = decoder;
this.encoder = encoder;
this.selector = selector;
this.clazz = clazz;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request request = new Request();
request.setServiceDescriptor(ServiceDescriptor.from(clazz,method));
request.setParameters(args);
Response resp = invokeRemote(request);
if(resp == null || resp.getCode()!=0) {
throw new IllegalStateException("fail to invoke remote: "+resp);
}
return resp.getData();
}
private Response invokeRemote(Request request) {
Response resp = null;
TransportClient client = null;
try {
client = selector.select();
byte[] outBytes = encoder.encode(request);
InputStream recive = client.write(new ByteArrayInputStream(outBytes));
byte[] inBytes = IOUtils.readFully(recive,recive.available());
resp = decoder.decode(inBytes,Response.class);
} catch (IOException e) {
log.warn(e.getMessage(),e);
resp = new Response();
resp.setCode(1);
resp.setMessage("RpcClient got error: "+
e.getClass() +
": "+
e.getMessage()
);
} finally {
if(client!=null) {
selector.release(client);
}
}
return resp;
}
}
测试
测试代码:
// 服务接口
public interface CalcService {
int add(int a,int b);
int minus(int a,int b);
}
//服务具体实现
public class CalServiceImpl implements CalcService{
@Override
public int add(int a, int b) {
return a+b;
}
@Override
public int minus(int a, int b) {
return a-b;
}
}
//服务端
public class Server {
public static void main(String[] args) {
RpcServer server = new RpcServer();
server.register(CalcService.class,new CalServiceImpl());
server.start();
}
}
//客户端
public class Client {
public static void main(String[] args) {
RpcClient client = new RpcClient();
CalcService service = client.getProxy(CalcService.class);
int r1 = service.add(1,2);
int r2 = service.minus(10,3);
System.out.println(r1);
System.out.println(r2);
}
}
输出信息比较多,可以配置下logback配置文件:
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
<property name="log.pattern"
value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"></property><!--日志格式-->
<!--控制台设置 定义输出到控制台的信息-->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
总结
明天梳理一下。