View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.client;
22  
23  import java.io.IOException;
24  import java.lang.reflect.UndeclaredThrowableException;
25  import java.net.ConnectException;
26  import java.net.SocketTimeoutException;
27  import java.util.ArrayList;
28  import java.util.List;
29  import java.util.concurrent.Callable;
30  
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.DoNotRetryIOException;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HRegionInfo;
35  import org.apache.hadoop.hbase.HRegionLocation;
36  import org.apache.hadoop.hbase.NotServingRegionException;
37  import org.apache.hadoop.hbase.ipc.HBaseRPC;
38  import org.apache.hadoop.hbase.ipc.HRegionInterface;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
41  import org.apache.hadoop.ipc.RemoteException;
42  
43  /**
44   * Abstract class that implements {@link Callable}.  Implementation stipulates
45   * return type and method we actually invoke on remote Server.  Usually
46   * used inside a try/catch that fields usual connection failures all wrapped
47   * up in a retry loop.
48   * <p>Call {@link #connect(boolean)} to connect to server hosting region
49   * that contains the passed row in the passed table before invoking
50   * {@link #call()}.
51   * @see HConnection#getRegionServerWithoutRetries(ServerCallable)
52   * @param <T> the class that the ServerCallable handles
53   */
54  public abstract class ServerCallable<T> implements Callable<T> {
55    protected final HConnection connection;
56    protected final byte [] tableName;
57    protected final byte [] row;
58    protected HRegionLocation location;
59    protected HRegionInterface server;
60    protected int callTimeout;
61    protected long globalStartTime;
62    protected long startTime, endTime;
63    protected final static int MIN_RPC_TIMEOUT = 2000;
64  
65    /**
66     * @param connection Connection to use.
67     * @param tableName Table name to which <code>row</code> belongs.
68     * @param row The row we want in <code>tableName</code>.
69     */
70    public ServerCallable(HConnection connection, byte [] tableName, byte [] row) {
71      this(connection, tableName, row, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
72    }
73  
74    public ServerCallable(HConnection connection, byte [] tableName, byte [] row, int callTimeout) {
75      this.connection = connection;
76      this.tableName = tableName;
77      this.row = row;
78      this.callTimeout = callTimeout;
79    }
80  
81    /**
82     * Connect to the server hosting region with row from tablename.
83     * @param reload Set this to true if connection should re-find the region
84     * @throws IOException e
85     */
86    public void connect(final boolean reload) throws IOException {
87      this.location = connection.getRegionLocation(tableName, row, reload);
88      this.server = connection.getHRegionConnection(location.getHostname(),
89        location.getPort());
90    }
91  
92    /** @return the server name
93     * @deprecated Just use {@link #toString()} instead.
94     */
95    public String getServerName() {
96      if (location == null) return null;
97      return location.getHostnamePort();
98    }
99  
100   /** @return the region name
101    * @deprecated Just use {@link #toString()} instead.
102    */
103   public byte[] getRegionName() {
104     if (location == null) return null;
105     return location.getRegionInfo().getRegionName();
106   }
107 
108   /** @return the row
109    * @deprecated Just use {@link #toString()} instead.
110    */
111   public byte [] getRow() {
112     return row;
113   }
114 
115   public void beforeCall() {
116     this.startTime = EnvironmentEdgeManager.currentTimeMillis();
117     int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime));
118     if (remaining < MIN_RPC_TIMEOUT) {
119       // If there is no time left, we're trying anyway. It's too late.
120       // 0 means no timeout, and it's not the intent here. So we secure both cases by
121       // resetting to the minimum.
122       remaining = MIN_RPC_TIMEOUT;
123     }
124     HBaseRPC.setRpcTimeout(remaining);
125   }
126 
127   public void afterCall() {
128     HBaseRPC.resetRpcTimeout();
129     this.endTime = EnvironmentEdgeManager.currentTimeMillis();
130   }
131 
132   /**
133    * @return {@link HConnection} instance used by this Callable.
134    */
135   HConnection getConnection() {
136     return this.connection;
137   }
138 
139   /**
140    * Run this instance with retries, timed waits,
141    * and refinds of missing regions.
142    *
143    * @param <T> the type of the return value
144    * @return an object of type T
145    * @throws IOException if a remote or network exception occurs
146    * @throws RuntimeException other unspecified error
147    */
148   public T withRetries()
149   throws IOException, RuntimeException {
150     Configuration c = getConnection().getConfiguration();
151     final long pause = c.getLong(HConstants.HBASE_CLIENT_PAUSE,
152       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
153     final int numRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
154       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
155     List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
156       new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
157     globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
158     long expectedSleep = 0;
159 
160     for (int tries = 0; tries < numRetries; tries++) {
161       try {
162         beforeCall();
163         connect(tries != 0);
164         return call();
165       } catch (Throwable t) {
166         t = translateException(t);
167         if (t instanceof SocketTimeoutException ||
168             t instanceof ConnectException ||
169             t instanceof RetriesExhaustedException) {
170           // if thrown these exceptions, we clear all the cache entries that
171           // map to that slow/dead server; otherwise, let cache miss and ask
172           // .META. again to find the new location
173           HRegionLocation hrl = location;
174           if (hrl != null) {
175             getConnection().clearCaches(hrl.getHostnamePort());
176           }
177         } else if (t instanceof NotServingRegionException && numRetries == 1) {
178           // Purge cache entries for this specific region from META cache
179           // since we don't call connect(true) when number of retries is 1.
180           getConnection().deleteCachedRegionLocation(location);
181         }
182         RetriesExhaustedException.ThrowableWithExtraContext qt =
183           new RetriesExhaustedException.ThrowableWithExtraContext(t,
184             EnvironmentEdgeManager.currentTimeMillis(), toString());
185         exceptions.add(qt);
186         if (tries == numRetries - 1) {
187           throw new RetriesExhaustedException(tries, exceptions);
188         }
189         // If the server is dead, we need to wait a little before retrying, to give
190         //  a chance to the regions to be
191         // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
192         expectedSleep = ConnectionUtils.getPauseTime(pause, tries + 1);
193 
194         // If, after the planned sleep, there won't be enough time left, we stop now.
195         long duration = singleCallDuration(expectedSleep);
196         if (duration > this.callTimeout) {
197           throw (SocketTimeoutException) new SocketTimeoutException(
198             "Call to access row '" + Bytes.toString(row) + "' on table '"
199               + Bytes.toString(tableName) + "' failed on timeout. "
200               + " callTimeout=" + this.callTimeout + ", callDuration="
201               + duration).initCause(t);
202         }
203       } finally {
204         afterCall();
205       }
206       try {
207         Thread.sleep(expectedSleep);
208       } catch (InterruptedException e) {
209         Thread.currentThread().interrupt();
210         throw new IOException("Giving up after tries=" + tries, e);
211       }
212     }
213     return null;
214   }
215 
216   /**
217    * Run this instance against the server once.
218    * @param <T> the type of the return value
219    * @return an object of type T
220    * @throws IOException if a remote or network exception occurs
221    * @throws RuntimeException other unspecified error
222    */
223   public T withoutRetries()
224   throws IOException, RuntimeException {
225     globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
226     try {
227       beforeCall();
228       connect(false);
229       return call();
230     } catch (Throwable t) {
231       Throwable t2 = translateException(t);
232       if (t2 instanceof IOException) {
233         throw (IOException)t2;
234       } else {
235         throw new RuntimeException(t2);
236       }
237     } finally {
238       afterCall();
239     }
240   }
241 
242   /**
243    * @param expectedSleep
244    * @return Calculate how long a single call took
245    */
246   private long singleCallDuration(final long expectedSleep) {
247     return (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime)
248       + MIN_RPC_TIMEOUT + expectedSleep;
249   }
250 
251   private static Throwable translateException(Throwable t) throws IOException {
252     if (t instanceof UndeclaredThrowableException) {
253       t = t.getCause();
254     }
255     if (t instanceof RemoteException) {
256       t = ((RemoteException)t).unwrapRemoteException();
257     }
258     if (t instanceof DoNotRetryIOException) {
259       throw (DoNotRetryIOException)t;
260     }
261     return t;
262   }
263 
264   /**
265    * @return the HRegionInfo for the current region
266    */
267   public HRegionInfo getHRegionInfo() {
268     if (this.location == null) {
269       return null;
270     }
271     return this.location.getRegionInfo();
272   }
273 }