View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import com.google.protobuf.Descriptors;
21  import com.google.protobuf.Message;
22  import io.netty.channel.EventLoop;
23  import io.netty.util.concurrent.DefaultPromise;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.CellScanner;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
29  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
30  import org.apache.hadoop.hbase.util.ExceptionUtil;
31  import org.apache.hadoop.ipc.RemoteException;
32  
33  import java.io.IOException;
34  
35  /**
36   * Represents an Async Hbase call and its response.
37   *
38   * Responses are passed on to its given doneHandler and failures to the rpcController
39   */
40  @InterfaceAudience.Private
41  public class AsyncCall extends DefaultPromise<Message> {
42    public static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
43  
44    final int id;
45  
46    final Descriptors.MethodDescriptor method;
47    final Message param;
48    final PayloadCarryingRpcController controller;
49    final Message responseDefaultType;
50    final long startTime;
51    final long rpcTimeout;
52  
53    /**
54     * Constructor
55     *
56     * @param eventLoop           for call
57     * @param connectId           connection id
58     * @param md                  the method descriptor
59     * @param param               parameters to send to Server
60     * @param controller          controller for response
61     * @param responseDefaultType the default response type
62     */
63    public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message
64        param, PayloadCarryingRpcController controller, Message responseDefaultType) {
65      super(eventLoop);
66  
67      this.id = connectId;
68  
69      this.method = md;
70      this.param = param;
71      this.controller = controller;
72      this.responseDefaultType = responseDefaultType;
73  
74      this.startTime = EnvironmentEdgeManager.currentTime();
75      this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
76    }
77  
78    /**
79     * Get the start time
80     *
81     * @return start time for the call
82     */
83    public long getStartTime() {
84      return this.startTime;
85    }
86  
87    @Override
88    public String toString() {
89      return "callId=" + this.id + ", method=" + this.method.getName() +
90        ", rpcTimeout=" + this.rpcTimeout + ", param {" +
91        (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
92    }
93  
94    /**
95     * Set success with a cellBlockScanner
96     *
97     * @param value            to set
98     * @param cellBlockScanner to set
99     */
100   public void setSuccess(Message value, CellScanner cellBlockScanner) {
101     if (cellBlockScanner != null) {
102       controller.setCellScanner(cellBlockScanner);
103     }
104 
105     if (LOG.isTraceEnabled()) {
106       long callTime = EnvironmentEdgeManager.currentTime() - startTime;
107       LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + "ms");
108     }
109 
110     this.setSuccess(value);
111   }
112 
113   /**
114    * Set failed
115    *
116    * @param exception to set
117    */
118   public void setFailed(IOException exception) {
119     if (ExceptionUtil.isInterrupt(exception)) {
120       exception = ExceptionUtil.asInterrupt(exception);
121     }
122     if (exception instanceof RemoteException) {
123       exception = ((RemoteException) exception).unwrapRemoteException();
124     }
125 
126     this.setFailure(exception);
127   }
128 
129   /**
130    * Get the rpc timeout
131    *
132    * @return current timeout for this call
133    */
134   public long getRpcTimeout() {
135     return rpcTimeout;
136   }
137 }