1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.ByteBufInputStream;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelInboundHandlerAdapter;
24
25 import java.io.IOException;
26
27 import org.apache.hadoop.hbase.CellScanner;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
30 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
31 import org.apache.hadoop.ipc.RemoteException;
32
33 import com.google.protobuf.Message;
34
35
36
37
38 @InterfaceAudience.Private
39 public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
40 private final AsyncRpcChannel channel;
41
42
43
44
45
46
47 public AsyncServerResponseHandler(AsyncRpcChannel channel) {
48 this.channel = channel;
49 }
50
51 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
52 ByteBuf inBuffer = (ByteBuf) msg;
53 ByteBufInputStream in = new ByteBufInputStream(inBuffer);
54 int totalSize = inBuffer.readableBytes();
55 try {
56
57 RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
58 int id = responseHeader.getCallId();
59 AsyncCall call = channel.removePendingCall(id);
60 if (call == null) {
61
62
63
64
65
66 int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
67 int whatIsLeftToRead = totalSize - readSoFar;
68
69
70
71
72 inBuffer.skipBytes(whatIsLeftToRead);
73 return;
74 }
75
76 if (responseHeader.hasException()) {
77 RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException();
78 RemoteException re = createRemoteException(exceptionResponse);
79 if (exceptionResponse.getExceptionClassName().
80 equals(FatalConnectionException.class.getName())) {
81 channel.close(re);
82 } else {
83 call.setFailed(re);
84 }
85 } else {
86 Message value = null;
87
88 if (call.responseDefaultType != null) {
89 Message.Builder builder = call.responseDefaultType.newBuilderForType();
90 ProtobufUtil.mergeDelimitedFrom(builder, in);
91 value = builder.build();
92 }
93 CellScanner cellBlockScanner = null;
94 if (responseHeader.hasCellBlockMeta()) {
95 int size = responseHeader.getCellBlockMeta().getLength();
96 byte[] cellBlock = new byte[size];
97 inBuffer.readBytes(cellBlock, 0, cellBlock.length);
98 cellBlockScanner = channel.client.createCellScanner(cellBlock);
99 }
100 call.setSuccess(value, cellBlockScanner);
101 call.callStats.setResponseSizeBytes(totalSize);
102 }
103 } catch (IOException e) {
104
105 channel.close(e);
106 } finally {
107 inBuffer.release();
108 }
109 }
110
111
112
113
114
115 private RemoteException createRemoteException(final RPCProtos.ExceptionResponse e) {
116 String innerExceptionClassName = e.getExceptionClassName();
117 boolean doNotRetry = e.getDoNotRetry();
118 return e.hasHostname() ?
119
120 new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
121 e.getPort(), doNotRetry) :
122 new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
123 }
124 }