1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.ByteBufInputStream;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelInboundHandlerAdapter;
24
25 import java.io.IOException;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.CellScanner;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
32 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
33 import org.apache.hadoop.ipc.RemoteException;
34
35 import com.google.protobuf.Message;
36
37
38
39
40 @InterfaceAudience.Private
41 public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
42 public static final Log LOG = LogFactory.getLog(AsyncServerResponseHandler.class.getName());
43
44 private final AsyncRpcChannel channel;
45
46
47
48
49
50
51 public AsyncServerResponseHandler(AsyncRpcChannel channel) {
52 this.channel = channel;
53 }
54
55 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
56 ByteBuf inBuffer = (ByteBuf) msg;
57 ByteBufInputStream in = new ByteBufInputStream(inBuffer);
58 int totalSize = inBuffer.readableBytes();
59 try {
60
61 RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
62 int id = responseHeader.getCallId();
63 AsyncCall call = channel.removePendingCall(id);
64 if (call == null) {
65
66
67
68
69
70 int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
71 int whatIsLeftToRead = totalSize - readSoFar;
72
73
74
75
76 inBuffer.skipBytes(whatIsLeftToRead);
77 return;
78 }
79
80 if (responseHeader.hasException()) {
81 RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException();
82 RemoteException re = createRemoteException(exceptionResponse);
83 if (exceptionResponse.getExceptionClassName().
84 equals(FatalConnectionException.class.getName())) {
85 channel.close(re);
86 } else {
87 call.setFailed(re);
88 }
89 } else {
90 Message value = null;
91
92 if (call.responseDefaultType != null) {
93 Message.Builder builder = call.responseDefaultType.newBuilderForType();
94 ProtobufUtil.mergeDelimitedFrom(builder, in);
95 value = builder.build();
96 }
97 CellScanner cellBlockScanner = null;
98 if (responseHeader.hasCellBlockMeta()) {
99 int size = responseHeader.getCellBlockMeta().getLength();
100 byte[] cellBlock = new byte[size];
101 inBuffer.readBytes(cellBlock, 0, cellBlock.length);
102 cellBlockScanner = channel.client.createCellScanner(cellBlock);
103 }
104 call.setSuccess(value, cellBlockScanner);
105 }
106 } catch (IOException e) {
107
108 channel.close(e);
109 } finally {
110 inBuffer.release();
111 }
112 }
113
114
115
116
117
118 private RemoteException createRemoteException(final RPCProtos.ExceptionResponse e) {
119 String innerExceptionClassName = e.getExceptionClassName();
120 boolean doNotRetry = e.getDoNotRetry();
121 return e.hasHostname() ?
122
123 new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
124 e.getPort(), doNotRetry) :
125 new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
126 }
127 }