1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import com.google.protobuf.ServiceException;
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.HConstants;
29  import org.apache.hadoop.hbase.HRegionLocation;
30  import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
31  import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
32  import org.apache.hadoop.hbase.ipc.RpcClient;
33  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
36  import org.apache.hadoop.ipc.RemoteException;
37  
38  import java.io.IOException;
39  import java.lang.reflect.UndeclaredThrowableException;
40  import java.net.ConnectException;
41  import java.net.SocketTimeoutException;
42  import java.util.ArrayList;
43  import java.util.List;
44  import java.util.concurrent.Callable;
45  
46  /**
47   * Abstract class that implements {@link Callable}.  Implementation stipulates
48   * return type and method we actually invoke on remote Server.  Usually
49   * used inside a try/catch that fields usual connection failures all wrapped
50   * up in a retry loop.
51   * <p>Call {@link #prepare(boolean)} to connect to server hosting region
52   * that contains the passed row in the passed table before invoking
53   * {@link #call()}.
54   * @see HConnection#getRegionServerWithoutRetries(ServerCallable)
55   * @param <T> the class that the ServerCallable handles
56   */
57  @InterfaceAudience.Public
58  @InterfaceStability.Stable
59  public abstract class ServerCallable<T> implements Callable<T> {
60    static final Log LOG = LogFactory.getLog(ServerCallable.class);
61  
62    protected final HConnection connection;
63    protected final byte [] tableName;
64    protected final byte [] row;
65    protected HRegionLocation location;
66    protected ClientService.BlockingInterface stub;
67    protected int callTimeout;
68    protected long globalStartTime;
69    protected long startTime, endTime;
70    protected final static int MIN_RPC_TIMEOUT = 2000;
71    protected final static int MIN_WAIT_DEAD_SERVER = 10000;
72  
73    /**
74     * @param connection Connection to use.
75     * @param tableName Table name to which <code>row</code> belongs.
76     * @param row The row we want in <code>tableName</code>.
77     */
78    public ServerCallable(HConnection connection, byte [] tableName, byte [] row) {
79      this(connection, tableName, row, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
80    }
81  
82    public ServerCallable(HConnection connection, byte [] tableName, byte [] row, int callTimeout) {
83      this.connection = connection;
84      this.tableName = tableName;
85      this.row = row;
86      this.callTimeout = callTimeout;
87    }
88  
89    /**
90     * Prepare for connection to the server hosting region with row from tablename.  Does lookup
91     * to find region location and hosting server.
92     * @param reload Set this to true if connection should re-find the region
93     * @throws IOException e
94     */
95    public void prepare(final boolean reload) throws IOException {
96      this.location = connection.getRegionLocation(tableName, row, reload);
97      this.stub = connection.getClient(location.getServerName());
98    }
99  
100   /** @return the server name
101    * @deprecated Just use {@link #toString()} instead.
102    */
103   public String getServerName() {
104     if (location == null) return null;
105     return location.getHostnamePort();
106   }
107 
108   /** @return the region name
109    * @deprecated Just use {@link #toString()} instead.
110    */
111   public byte[] getRegionName() {
112     if (location == null) return null;
113     return location.getRegionInfo().getRegionName();
114   }
115 
116   /** @return the row
117    * @deprecated Just use {@link #toString()} instead.
118    */
119   public byte [] getRow() {
120     return row;
121   }
122 
123   public void beforeCall() {
124     this.startTime = EnvironmentEdgeManager.currentTimeMillis();
125     int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime));
126     if (remaining < MIN_RPC_TIMEOUT) {
127       // If there is no time left, we're trying anyway. It's too late.
128       // 0 means no timeout, and it's not the intent here. So we secure both cases by
129       // resetting to the minimum.
130       remaining = MIN_RPC_TIMEOUT;
131     }
132     RpcClient.setRpcTimeout(remaining);
133   }
134 
135   public void afterCall() {
136     RpcClient.resetRpcTimeout();
137     this.endTime = EnvironmentEdgeManager.currentTimeMillis();
138   }
139 
140   /**
141    * @return {@link HConnection} instance used by this Callable.
142    */
143   HConnection getConnection() {
144     return this.connection;
145   }
146 
147   /**
148    * Run this instance with retries, timed waits,
149    * and refinds of missing regions.
150    *
151    * @return an object of type T
152    * @throws IOException if a remote or network exception occurs
153    * @throws RuntimeException other unspecified error
154    */
155   public T withRetries()
156   throws IOException, RuntimeException {
157     Configuration c = getConnection().getConfiguration();
158     final long pause = c.getLong(HConstants.HBASE_CLIENT_PAUSE,
159       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
160     final int numRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
161       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
162     List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
163       new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
164     this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
165     for (int tries = 0;; tries++) {
166       long expectedSleep = 0;
167       try {
168         beforeCall();
169         prepare(tries != 0); // if called with false, check table status on ZK
170         return call();
171       } catch (Throwable t) {
172         LOG.warn("Call exception, tries=" + tries + ", numRetries=" + numRetries + ": " + t);
173 
174         t = translateException(t);
175         // translateException throws an exception when we should not retry, i.e. when it's the
176         //  request that is bad.
177 
178         if (t instanceof SocketTimeoutException ||
179             t instanceof ConnectException ||
180             t instanceof RetriesExhaustedException ||
181             (location != null && getConnection().isDeadServer(location.getServerName()))) {
182           // if thrown these exceptions, we clear all the cache entries that
183           // map to that slow/dead server; otherwise, let cache miss and ask
184           // .META. again to find the new location
185           getConnection().clearCaches(location.getServerName());
186         } else if (t instanceof NotServingRegionException && numRetries == 1) {
187           // Purge cache entries for this specific region from META cache
188           // since we don't call connect(true) when number of retries is 1.
189           getConnection().deleteCachedRegionLocation(location);
190         }
191 
192         RetriesExhaustedException.ThrowableWithExtraContext qt =
193           new RetriesExhaustedException.ThrowableWithExtraContext(t,
194               EnvironmentEdgeManager.currentTimeMillis(), toString());
195         exceptions.add(qt);
196         if (tries >= numRetries - 1) {
197           throw new RetriesExhaustedException(tries, exceptions);
198         }
199 
200         // If the server is dead, we need to wait a little before retrying, to give
201         //  a chance to the regions to be
202         expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
203         if (expectedSleep < MIN_WAIT_DEAD_SERVER 
204             && (location == null || getConnection().isDeadServer(location.getServerName()))) {
205           expectedSleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
206         }
207 
208         // If, after the planned sleep, there won't be enough time left, we stop now.
209         if (((this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep) >
210             this.callTimeout) {
211           throw (SocketTimeoutException) new SocketTimeoutException(
212               "Call to access row '" + Bytes.toString(row) + "' on table '"
213                   + Bytes.toString(tableName)
214                   + "' failed on timeout. " + " callTimeout=" + this.callTimeout +
215                   ", time=" + (this.endTime - this.startTime)).initCause(t);
216         }
217       } finally {
218         afterCall();
219       }
220       try {
221         Thread.sleep(expectedSleep);
222       } catch (InterruptedException e) {
223         Thread.currentThread().interrupt();
224         throw new IOException("Interrupted after " + tries + " tries  on " + numRetries, e);
225       }
226     }
227   }
228 
229   /**
230    * Run this instance against the server once.
231    * @return an object of type T
232    * @throws IOException if a remote or network exception occurs
233    * @throws RuntimeException other unspecified error
234    */
235   public T withoutRetries()
236   throws IOException, RuntimeException {
237     // The code of this method should be shared with withRetries.
238     this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
239     try {
240       beforeCall();
241       prepare(false);
242       return call();
243     } catch (Throwable t) {
244       Throwable t2 = translateException(t);
245       // It would be nice to clear the location cache here.
246       if (t2 instanceof IOException) {
247         throw (IOException)t2;
248       } else {
249         throw new RuntimeException(t2);
250       }
251     } finally {
252       afterCall();
253     }
254   }
255 
256   /**
257    * Get the good or the remote exception if any, throws the DoNotRetryIOException.
258    * @param t the throwable to analyze
259    * @return the translated exception, if it's not a DoNotRetryIOException
260    * @throws DoNotRetryIOException - if we find it, we throw it instead of translating.
261    */
262   protected static Throwable translateException(Throwable t) throws DoNotRetryIOException {
263     if (t instanceof UndeclaredThrowableException) {
264       if(t.getCause() != null) {
265         t = t.getCause();
266       }
267     }
268     if (t instanceof RemoteException) {
269       t = ((RemoteException)t).unwrapRemoteException();
270     }
271     if (t instanceof ServiceException) {
272       ServiceException se = (ServiceException)t;
273       Throwable cause = se.getCause();
274       if (cause != null && cause instanceof DoNotRetryIOException) {
275         throw (DoNotRetryIOException)cause;
276       }
277     } else if (t instanceof DoNotRetryIOException) {
278       throw (DoNotRetryIOException)t;
279     }
280     return t;
281   }
282 }