View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import com.google.protobuf.Descriptors;
21  import com.google.protobuf.Message;
22  import io.netty.channel.EventLoop;
23  import io.netty.util.concurrent.DefaultPromise;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.CellScanner;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.client.MetricsConnection;
29  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
30  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
31  import org.apache.hadoop.hbase.util.ExceptionUtil;
32  import org.apache.hadoop.ipc.RemoteException;
33  
34  import java.io.IOException;
35  
36  /**
37   * Represents an Async Hbase call and its response.
38   *
39   * Responses are passed on to its given doneHandler and failures to the rpcController
40   */
41  @InterfaceAudience.Private
42  public class AsyncCall extends DefaultPromise<Message> {
43    private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
44  
45    final int id;
46  
47    final Descriptors.MethodDescriptor method;
48    final Message param;
49    final PayloadCarryingRpcController controller;
50    final Message responseDefaultType;
51    final long startTime;
52    final long rpcTimeout;
53    final MetricsConnection.CallStats callStats;
54  
55    /**
56     * Constructor
57     *
58     * @param eventLoop           for call
59     * @param connectId           connection id
60     * @param md                  the method descriptor
61     * @param param               parameters to send to Server
62     * @param controller          controller for response
63     * @param responseDefaultType the default response type
64     */
65    public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message
66        param, PayloadCarryingRpcController controller, Message responseDefaultType,
67        MetricsConnection.CallStats callStats) {
68      super(eventLoop);
69  
70      this.id = connectId;
71  
72      this.method = md;
73      this.param = param;
74      this.controller = controller;
75      this.responseDefaultType = responseDefaultType;
76  
77      this.startTime = EnvironmentEdgeManager.currentTime();
78      this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
79      this.callStats = callStats;
80    }
81  
82    /**
83     * Get the start time
84     *
85     * @return start time for the call
86     */
87    public long getStartTime() {
88      return this.startTime;
89    }
90  
91    @Override
92    public String toString() {
93      return "callId=" + this.id + ", method=" + this.method.getName() +
94        ", rpcTimeout=" + this.rpcTimeout + ", param {" +
95        (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
96    }
97  
98    /**
99     * Set success with a cellBlockScanner
100    *
101    * @param value            to set
102    * @param cellBlockScanner to set
103    */
104   public void setSuccess(Message value, CellScanner cellBlockScanner) {
105     if (cellBlockScanner != null) {
106       controller.setCellScanner(cellBlockScanner);
107     }
108 
109     if (LOG.isTraceEnabled()) {
110       long callTime = EnvironmentEdgeManager.currentTime() - startTime;
111       LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + "ms");
112     }
113 
114     this.setSuccess(value);
115   }
116 
117   /**
118    * Set failed
119    *
120    * @param exception to set
121    */
122   public void setFailed(IOException exception) {
123     if (ExceptionUtil.isInterrupt(exception)) {
124       exception = ExceptionUtil.asInterrupt(exception);
125     }
126     if (exception instanceof RemoteException) {
127       exception = ((RemoteException) exception).unwrapRemoteException();
128     }
129 
130     this.setFailure(exception);
131   }
132 
133   /**
134    * Get the rpc timeout
135    *
136    * @return current timeout for this call
137    */
138   public long getRpcTimeout() {
139     return rpcTimeout;
140   }
141 }