View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufInputStream;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandlerAdapter;
24  
25  import java.io.IOException;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.CellScanner;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
32  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
33  import org.apache.hadoop.ipc.RemoteException;
34  
35  import com.google.protobuf.Message;
36  
37  /**
38   * Handles Hbase responses
39   */
40  @InterfaceAudience.Private
41  public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
42    public static final Log LOG = LogFactory.getLog(AsyncServerResponseHandler.class.getName());
43  
44    private final AsyncRpcChannel channel;
45  
46    /**
47     * Constructor
48     *
49     * @param channel on which this response handler operates
50     */
51    public AsyncServerResponseHandler(AsyncRpcChannel channel) {
52      this.channel = channel;
53    }
54  
55    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
56      ByteBuf inBuffer = (ByteBuf) msg;
57      ByteBufInputStream in = new ByteBufInputStream(inBuffer);
58      int totalSize = inBuffer.readableBytes();
59      try {
60        // Read the header
61        RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
62        int id = responseHeader.getCallId();
63        AsyncCall call = channel.removePendingCall(id);
64        if (call == null) {
65          // So we got a response for which we have no corresponding 'call' here on the client-side.
66          // We probably timed out waiting, cleaned up all references, and now the server decides
67          // to return a response.  There is nothing we can do w/ the response at this stage. Clean
68          // out the wire of the response so its out of the way and we can get other responses on
69          // this connection.
70          int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
71          int whatIsLeftToRead = totalSize - readSoFar;
72  
73          // This is done through a Netty ByteBuf which has different behavior than InputStream.
74          // It does not return number of bytes read but will update pointer internally and throws an
75          // exception when too many bytes are to be skipped.
76          inBuffer.skipBytes(whatIsLeftToRead);
77          return;
78        }
79  
80        if (responseHeader.hasException()) {
81          RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException();
82          RemoteException re = createRemoteException(exceptionResponse);
83          if (exceptionResponse.getExceptionClassName().
84              equals(FatalConnectionException.class.getName())) {
85            channel.close(re);
86          } else {
87            call.setFailed(re);
88          }
89        } else {
90          Message value = null;
91          // Call may be null because it may have timedout and been cleaned up on this side already
92          if (call.responseDefaultType != null) {
93            Message.Builder builder = call.responseDefaultType.newBuilderForType();
94            ProtobufUtil.mergeDelimitedFrom(builder, in);
95            value = builder.build();
96          }
97          CellScanner cellBlockScanner = null;
98          if (responseHeader.hasCellBlockMeta()) {
99            int size = responseHeader.getCellBlockMeta().getLength();
100           byte[] cellBlock = new byte[size];
101           inBuffer.readBytes(cellBlock, 0, cellBlock.length);
102           cellBlockScanner = channel.client.createCellScanner(cellBlock);
103         }
104         call.setSuccess(value, cellBlockScanner);
105       }
106     } catch (IOException e) {
107       // Treat this as a fatal condition and close this connection
108       channel.close(e);
109     } finally {
110       inBuffer.release();
111     }
112   }
113 
114   /**
115    * @param e Proto exception
116    * @return RemoteException made from passed <code>e</code>
117    */
118   private RemoteException createRemoteException(final RPCProtos.ExceptionResponse e) {
119     String innerExceptionClassName = e.getExceptionClassName();
120     boolean doNotRetry = e.getDoNotRetry();
121     return e.hasHostname() ?
122         // If a hostname then add it to the RemoteWithExtrasException
123         new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
124             e.getPort(), doNotRetry) :
125         new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
126   }
127 }