1.服務器端主要類
public abstractclass?Server
{
??public static final ByteBuffer HEADER =ByteBuffer.wrap("hrpc".getBytes());
??public static final byte CURRENT_VERSION =4;
??private static finalThreadLocal<Server>?SERVER= new ThreadLocal<Server>();??
??private static finalThreadLocal<Call> CurCall = newThreadLocal<Call>();
??private String bindAddress; ??
??private int port; ?? ? ?? ??
??private int handlerCount; ?? ?
??private int readThreads; ?? ?
??private Class<? extendsWritable> paramClass; ???
??private int maxIdleTime; ??
??private int thresholdIdleConnections;? ? ?
??private Configuration conf;
??private int maxQueueSize;
??private final int maxRespSize;
??private int socketSendBufferSize;
??volatile private boolean running =true;
??privateBlockingQueue<Call> callQueue;
??privateList<Connection> connectionList = Collections.synchronizedList(newLinkedList<Connection>());
??private?Listener?listener= null;
??private?Responder?responder= null;
??private int numConnections = 0;
??private?Handler[]handlers = null;
?
??//內部類Server.Call,包裝請求參數
??private static class?Call?{
?? ? private int id;? ? ?? ? ?? ? ?? ? ?? ? ? // theclient's call id ? ?
?? ? private Writable param;? ? ?? ? ?? ? ?? ? // the parameter passed? ?
?? ? private?Connection?connection;? ?
?? ? private ByteBuffer response;? ? ?
? }? ?
?
//內部類Server.Listener?,線程??
??private class?Listener?extendsThread {
?? ??privateServerSocketChannel acceptChannel =null; //the accept channel
?? ??privateSelector selector = null; //theselector that we use for the server
?? ? private?Reader[]readers = null;
?? ? private int currentReader =0;
?? ? private InetSocketAddressaddress; //the address we bind at
?? ? private Random rand = newRandom();
?? ? private longlastCleanupRunTime = 0;?
?? ? private ExecutorServicereadPool; ? ?
?
??//內部類Server.Listener.Reader?,?線程????
?? ?privateclass?Reader?implementsRunnable {
?? ? ? privatevolatile boolean adding = false;
?? ? ? privateSelector readSelector =null;
?? }??
?
???//內部類Server.Responder??,?線程??
?? ?privateclass?Responder?extendsThread {
?? ? ?privateSelector writeSelector;
?? ? ?private intpending; ? ??
?? }?
?
//內部類Server.Connection,而Client.Connection是線程?
?? ?publicclass?Connection?{
?? ? ?privateboolean rpcHeaderRead = false; // if initial rpc header isread
?? ? ?privateboolean headerRead = false;??
?? ? ?privateSocketChannel channel;
?? ? ?privateByteBuffer data;
?? ? ?privateByteBuffer dataLengthBuffer;
?? ? ?privateLinkedList<Call> responseQueue;
?? ? ?privatevolatile int rpcCount = 0; // number of outstanding rpcs
?? ? ?privatelong lastContact;
?? ? ?private intdataLength;
?? ? ?privateSocket socket;
?? ? ?privateString hostAddress;
?? ? ?private intremotePort;
?? ? ?privateInetAddress addr;
?? ??ConnectionHeader header = new ConnectionHeader();
?? ??Class<?> protocol;
?? ? ?privateAuthMethod authMethod;?
?? }??
?
??//內部類Server.Handler,線程?
??private class?Handler?extendsThread {
?}
?
}
?
2.客戶端主要類
public classClient {
privateHashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId,Connection>();?
? privateClass<? extendsWritable>?valueClass;??
? private intcounter; ? ? ?? ? ?? ? ?? ? ?? ?// counter for call ids
? privateAtomicBoolean running = new AtomicBoolean(true); // if clientruns
? finalprivate Configuration conf;
? privateSocketFactory socketFactory; ? ?? ? ? // how tocreate sockets
?
? private intrefCount = 1;
//內部類Client.Call
?? private class Call {
?? ? int id; ?? ? ?? ? ?? ? ?? ? ?? ? ?? ? ? // callid
?? ? Writable param;? ? ?? ? ?? ? ?? ? ?? ? ? //parameter
?? ? Writable value;? ? ?? ? ?? ? ?? ? ?? ? ? // value,null if error
?? ? IOException error;? ? ?? ? ?? ? ?? ? ?? ?// exception, null ifvalue
?? ? boolean done;?
??}
???//內部類Client.Connection?,線程?,而Server.Connection不是線程?
?? private class Connection extendsThread {
????? ??privateInetSocketAddress server; ? ?? ? ?? // server ip:port
????? ??privateString serverPrincipal; ?// server's krb5principal name
????? ??privateConnectionHeader header; ? ?? ? ?? ?// connection header
????? ??privatefinal ConnectionId remoteId; ?? ? ?? ? ??// connection id
????? ??privateAuthMethod authMethod; // authentication method
?? ? ?? private Socket socket = null; ?? ? ?? ? ?? // connected socket
??? ????privateDataInputStream in;
????? ??privateDataOutputStream out;
????? ??private intrpcTimeout;
????? ??private intmaxIdleTime; ?
????? ??private intmaxRetries; //the max. no. of retries for socket connections
????? ??privateboolean tcpNoDelay; // if T then disable Nagle's Algorithm
????? ??private intpingInterval; /?
??? ????privateHashtable<Integer, Call> calls= new Hashtable<Integer, Call>();
????? ??privateAtomicLong lastActivity = new AtomicLong();?
????? ??privateAtomicBoolean shouldCloseConnection = new AtomicBoolean();??
?? ? ?? private IOException closeException; // closereason ?
?? ?//內部類Client.Connection.PingInputStream
?? ? ?? private class?PingInputStream?extendsFilterInputStream {
?? }
?? }
???//內部類Client.ParallelCall?
?privateclass ParallelCall extends Call {
?? ? ?privateParallelResults results;
?? ? ?private intindex;
?? } ??
???//內部類Client.ParallelResults?
?? ?private static classParallelResults {
?? ? ? privateWritable[] values;
?? ? ? privateint size;
?? ? ? privateint count;
?? ? //
?? ?}
???
??//內部類Client.ConnectionId?
?? static class ConnectionId {
?? ???InetSocketAddress address;
??????UserGroupInformationticket;
??????Class<?>protocol;
??????privatestatic final int PRIME = 16777619;
??????privateint rpcTimeout;
??????privateString serverPrincipal;
??????privateint maxIdleTime; ?
??????privateint maxRetries; //the max. no. of retries for socketconnections
??????privateboolean tcpNoDelay; // if T then disable Nagle's Algorithm
??????privateint pingInterval; // how often sends ping to the server inmsecs
?? ?}
?? ?
}
3.RPC主要類
public class RPC {
??private static ClientCache CLIENTS=newClientCache();
??//內部類RPC.ClientCache?
??static privateclass?ClientCache?{
?? ? private Map<SocketFactory,?Client>?clients= ?new HashMap<SocketFactory, Client>();
? }
?//內部類RPC.Invocation ,只是一個包裝請求參數的普通類,不執行動態代理方法
?? private static class Invocationimplements Writable, Configurable {
?? ? ?privateString methodName;
?? ? ?privateClass[] parameterClasses;
?? ? ?privateObject[] parameters;
?
?? ? ?privateConfiguration conf;
? }
??//內部類RPC.Invoker?,執行動態代理方法
?? private static class Invoker implementsInvocationHandler {
?? ? ? privateClient.ConnectionId remoteId;
?? ? ? privateClient client;
?? ? ? privateboolean isClosed = false;
?? }
????//內部類RPC.VersionMismatch?
?? ?public static classVersionMismatch extends IOException {
?? ? ? privateString interfaceName;
?? ? ? privatelong clientVersion;
?? ? ? privatelong serverVersion;
?? } ?
???//內部類RPC.Server?,添加了兩個成員??instance,verbose
??public static class Server extendsorg.apache.hadoop.ipc.Server {
?? ? ?? private Object instance;
?? ? ?? private boolean verbose;
?? }
}
4.其他類
?
//IPC所有類都要實現的接口
/** Interface that all IPC classes have to implement. */
public interface VersionedProtocol {
  /**
   * Returns a protocol version number.
   *
   * @param protocol      protocol identifier supplied by the caller
   * @param clientVersion version number supplied by the caller
   * @return a version number as a {@code long}
   * @throws IOException declared by the contract; the conditions are up to the implementation
   */
  public long getProtocolVersion(String protocol, long clientVersion) throws IOException;
}
?
?
//連接頭信息,包括protocol、userGroupInformation、authMethod三個成員變量
class?ConnectionHeader?implementsWritable ?{?
?? private String protocol;
?? private UserGroupInformation ugi =null;
?? private AuthMethod authMethod; ......
}?
?
//訪問狀況,包括SUCCESS、ERROR、FATAL
/** Call status: SUCCESS, ERROR or FATAL, each carrying an integer code. */
enum Status {
  SUCCESS(0),
  ERROR(1),
  FATAL(-1);

  /** Integer code given in the constant's declaration. */
  final int state;

  // The constants above are declared with an int argument, so a matching
  // constructor is required; the outline elided it.
  Status(int state) {
    this.state = state;
  }
}
?
//包裝IO異常
/** Wraps an I/O exception; no members are shown in this outline. */
public class RemoteException extends IOException {
}
?
?