View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufInputStream;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandlerAdapter;
24  
25  import java.io.IOException;
26  
27  import org.apache.hadoop.hbase.CellScanner;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
30  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
31  import org.apache.hadoop.ipc.RemoteException;
32  
33  import com.google.protobuf.Message;
34  
35  /**
36   * Handles Hbase responses
37   */
38  @InterfaceAudience.Private
39  public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
40    private final AsyncRpcChannel channel;
41  
42    /**
43     * Constructor
44     *
45     * @param channel on which this response handler operates
46     */
47    public AsyncServerResponseHandler(AsyncRpcChannel channel) {
48      this.channel = channel;
49    }
50  
51    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
52      ByteBuf inBuffer = (ByteBuf) msg;
53      ByteBufInputStream in = new ByteBufInputStream(inBuffer);
54      int totalSize = inBuffer.readableBytes();
55      try {
56        // Read the header
57        RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
58        int id = responseHeader.getCallId();
59        AsyncCall call = channel.removePendingCall(id);
60        if (call == null) {
61          // So we got a response for which we have no corresponding 'call' here on the client-side.
62          // We probably timed out waiting, cleaned up all references, and now the server decides
63          // to return a response.  There is nothing we can do w/ the response at this stage. Clean
64          // out the wire of the response so its out of the way and we can get other responses on
65          // this connection.
66          int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
67          int whatIsLeftToRead = totalSize - readSoFar;
68  
69          // This is done through a Netty ByteBuf which has different behavior than InputStream.
70          // It does not return number of bytes read but will update pointer internally and throws an
71          // exception when too many bytes are to be skipped.
72          inBuffer.skipBytes(whatIsLeftToRead);
73          return;
74        }
75  
76        if (responseHeader.hasException()) {
77          RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException();
78          RemoteException re = createRemoteException(exceptionResponse);
79          if (exceptionResponse.getExceptionClassName().
80              equals(FatalConnectionException.class.getName())) {
81            channel.close(re);
82          } else {
83            call.setFailed(re);
84          }
85        } else {
86          Message value = null;
87          // Call may be null because it may have timedout and been cleaned up on this side already
88          if (call.responseDefaultType != null) {
89            Message.Builder builder = call.responseDefaultType.newBuilderForType();
90            ProtobufUtil.mergeDelimitedFrom(builder, in);
91            value = builder.build();
92          }
93          CellScanner cellBlockScanner = null;
94          if (responseHeader.hasCellBlockMeta()) {
95            int size = responseHeader.getCellBlockMeta().getLength();
96            byte[] cellBlock = new byte[size];
97            inBuffer.readBytes(cellBlock, 0, cellBlock.length);
98            cellBlockScanner = channel.client.createCellScanner(cellBlock);
99          }
100         call.setSuccess(value, cellBlockScanner);
101         call.callStats.setResponseSizeBytes(totalSize);
102       }
103     } catch (IOException e) {
104       // Treat this as a fatal condition and close this connection
105       channel.close(e);
106     } finally {
107       inBuffer.release();
108     }
109   }
110 
111   /**
112    * @param e Proto exception
113    * @return RemoteException made from passed <code>e</code>
114    */
115   private RemoteException createRemoteException(final RPCProtos.ExceptionResponse e) {
116     String innerExceptionClassName = e.getExceptionClassName();
117     boolean doNotRetry = e.getDoNotRetry();
118     return e.hasHostname() ?
119         // If a hostname then add it to the RemoteWithExtrasException
120         new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
121             e.getPort(), doNotRetry) :
122         new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
123   }
124 }