/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

/**
 * An internal, non-instantiable class that manages creation of {@link HConnection}s.
 */
@SuppressWarnings("serial")
@InterfaceAudience.Private
// NOTE: DO NOT make this class public. It was made package-private on purpose.
class ConnectionManager {
  static final Log LOG = LogFactory.getLog(ConnectionManager.class);

  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
  private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
  // An LRU Map of HConnectionKey -> HConnection (TableServer).  All
  // access must be synchronized.  This map is not private because tests
  // need to be able to tinker with it.
  static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;

  public static final int MAX_CACHED_CONNECTION_INSTANCES;

  /**
   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
   * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
   */
  private static volatile NonceGenerator nonceGenerator = null;
  /** The nonce generator lock. Only taken when creating HConnection, which gets a private copy. */
  private static Object nonceGeneratorCreateLock = new Object();

  static {
    // We set instances to one more than the value specified for {@link
    // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
    // connections to the ensemble from the one client is 30, so in that case we
    // should run into zk issues before the LRU hits this value of 31.
    MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
      HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
    CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
        (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
      @Override
      protected boolean removeEldestEntry(
          Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
        return size() > MAX_CACHED_CONNECTION_INSTANCES;
      }
    };
  }
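
  // A minimal sketch of the eviction behavior configured above (sizes are
  // illustrative): the map is access-ordered, so once a 32nd distinct
  // HConnectionKey is inserted while MAX_CACHED_CONNECTION_INSTANCES is 31,
  // removeEldestEntry returns true and the least recently used entry is
  // silently dropped from the cache. The evicted connection itself is not
  // closed here; only the cache's reference to it is released.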

  /** Dummy nonce generator for disabled nonces. */
  static class NoNonceGenerator implements NonceGenerator {
    @Override
    public long getNonceGroup() {
      return HConstants.NO_NONCE;
    }
    @Override
    public long newNonce() {
      return HConstants.NO_NONCE;
    }
  }

  /*
   * Non-instantiable.
   */
  private ConnectionManager() {
    super();
  }

  /**
   * @param conn The connection for which to replace the generator.
   * @param cnm Replaces the nonce generator used, for testing.
   * @return old nonce generator.
   */
  @VisibleForTesting
  static NonceGenerator injectNonceGeneratorForTesting(
      ClusterConnection conn, NonceGenerator cnm) {
    HConnectionImplementation connImpl = (HConnectionImplementation)conn;
    NonceGenerator ng = connImpl.getNonceGenerator();
    LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
    connImpl.nonceGenerator = cnm;
    return ng;
  }

  /**
   * Get the connection that goes with the passed <code>conf</code> configuration instance.
   * If no current connection exists, method creates a new connection and keys it using
   * connection-specific properties from the passed {@link Configuration}; see
   * {@link HConnectionKey}.
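   * <p>A minimal sketch of the shared life cycle (illustrative; for a managed
   * connection obtained this way, {@link HConnection#close()} decrements the
   * shared reference count rather than tearing the connection down while other
   * users still hold it):
   * {@code
   * HConnection shared = ConnectionManager.getConnection(conf);
   * try {
   *   // use the shared connection
   * } finally {
   *   shared.close();
   * }
   * }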
   * @param conf configuration
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  @Deprecated
  public static HConnection getConnection(final Configuration conf) throws IOException {
    return getConnectionInternal(conf);
  }


  static ClusterConnection getConnectionInternal(final Configuration conf)
    throws IOException {
    HConnectionKey connectionKey = new HConnectionKey(conf);
    synchronized (CONNECTION_INSTANCES) {
      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
      if (connection == null) {
        connection = (HConnectionImplementation)createConnection(conf, true);
        CONNECTION_INSTANCES.put(connectionKey, connection);
      } else if (connection.isClosed()) {
        ConnectionManager.deleteConnection(connectionKey, true);
        connection = (HConnectionImplementation)createConnection(conf, true);
        CONNECTION_INSTANCES.put(connectionKey, connection);
      }
      connection.incCount();
      return connection;
    }
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   *
   * This is the recommended way to create HConnections.
   * {@code
   * HConnection connection = HConnectionManager.createConnection(conf);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   *
   * @param conf configuration
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf) throws IOException {
    return createConnectionInternal(conf);
  }

  static ClusterConnection createConnectionInternal(Configuration conf) throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, null, provider.getCurrent());
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   * This is the recommended way to create HConnections.
   * {@code
   * ExecutorService pool = ...;
   * HConnection connection = HConnectionManager.createConnection(conf, pool);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   * @param conf configuration
   * @param pool the thread pool to use for batch operation in HTables used via this HConnection
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf, ExecutorService pool)
  throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, pool, provider.getCurrent());
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   * This is the recommended way to create HConnections.
   * {@code
   * HConnection connection = HConnectionManager.createConnection(conf, user);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   * @param conf configuration
   * @param user the user the connection is for
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf, User user)
  throws IOException {
    return createConnection(conf, false, null, user);
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   * This is the recommended way to create HConnections.
   * {@code
   * ExecutorService pool = ...;
   * HConnection connection = HConnectionManager.createConnection(conf, pool, user);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   * @param conf configuration
   * @param pool the thread pool to use for batch operation in HTables used via this HConnection
   * @param user the user the connection is for
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
  throws IOException {
    return createConnection(conf, false, pool, user);
  }

  @Deprecated
  static HConnection createConnection(final Configuration conf, final boolean managed)
      throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, managed, null, provider.getCurrent());
  }

  @Deprecated
  static ClusterConnection createConnection(final Configuration conf, final boolean managed,
      final ExecutorService pool, final User user)
  throws IOException {
    return (ClusterConnection) ConnectionFactory.createConnection(conf, managed, pool, user);
  }

  /**
   * Delete connection information for the instance specified by passed configuration.
   * If there are no more references to the designated connection, this method will
   * then close connection to the zookeeper ensemble and let go of all associated resources.
   *
   * @param conf configuration whose identity is used to find {@link HConnection} instance.
   * @deprecated
   */
  @Deprecated
  public static void deleteConnection(Configuration conf) {
    deleteConnection(new HConnectionKey(conf), false);
  }

  /**
   * Cleanup a known stale connection.
   * This will then close connection to the zookeeper ensemble and let go of all resources.
   *
   * @param connection
   * @deprecated
   */
  @Deprecated
  public static void deleteStaleConnection(HConnection connection) {
    deleteConnection(connection, true);
  }

  /**
   * Delete information for all connections. Whether a connection is actually closed
   * depends on the staleConnection boolean and the ref count. By default, you should
   * use it with staleConnection set to true.
   * @deprecated
   */
  @Deprecated
  public static void deleteAllConnections(boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
      connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
      for (HConnectionKey connectionKey : connectionKeys) {
        deleteConnection(connectionKey, staleConnection);
      }
      CONNECTION_INSTANCES.clear();
    }
  }

  /**
   * Delete information for all connections.
   * @deprecated kept for backward compatibility, but the behavior is broken. HBASE-8983
   */
  @Deprecated
  public static void deleteAllConnections() {
    deleteAllConnections(false);
  }


  @Deprecated
  private static void deleteConnection(HConnection connection, boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
        if (e.getValue() == connection) {
          deleteConnection(e.getKey(), staleConnection);
          break;
        }
      }
    }
  }

  @Deprecated
  private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
      if (connection != null) {
        connection.decCount();
        if (connection.isZeroReference() || staleConnection) {
          CONNECTION_INSTANCES.remove(connectionKey);
          connection.internalClose();
        }
      } else {
        LOG.error("Connection not found in the list, can't delete it " +
          "(connection key=" + connectionKey + "). Maybe the key was modified?", new Exception());
      }
    }
  }


  /**
   * This convenience method invokes the given {@link HConnectable#connect}
   * implementation using a {@link HConnection} instance that lasts just for the
   * duration of the invocation.
   *
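   * <p>A minimal usage sketch (the master liveness check is illustrative; any
   * work done against the short-lived connection would do):
   * {@code
   * Boolean running = ConnectionManager.execute(new HConnectable<Boolean>(conf) {
   *   public Boolean connect(HConnection connection) throws IOException {
   *     return connection.isMasterRunning();
   *   }
   * });
   * }
   *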
   * @param <T> the return type of the connect method
   * @param connectable the {@link HConnectable} instance
   * @return the value returned by the connect method
   * @throws IOException
   */
  @InterfaceAudience.Private
  public static <T> T execute(HConnectable<T> connectable) throws IOException {
    if (connectable == null || connectable.conf == null) {
      return null;
    }
    Configuration conf = connectable.conf;
    HConnection connection = getConnection(conf);
    boolean connectSucceeded = false;
    try {
      T returnValue = connectable.connect(connection);
      connectSucceeded = true;
      return returnValue;
    } finally {
      try {
        connection.close();
      } catch (Exception e) {
        ExceptionUtil.rethrowIfInterrupt(e);
        if (connectSucceeded) {
          throw new IOException("The connection to " + connection
              + " could not be deleted.", e);
        }
      }
    }
  }

  /** Encapsulates connection to zookeeper and regionservers. */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
      justification="Access to the concurrent hash map is under a lock so should be fine.")
  static class HConnectionImplementation implements ClusterConnection, Closeable {
    static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
    private final long pause;
    private final boolean useMetaReplicas;
    private final int numTries;
    final int rpcTimeout;
    private NonceGenerator nonceGenerator = null;
    private final AsyncProcess asyncProcess;
    // single tracker per connection
    private final ServerStatisticTracker stats;

    private volatile boolean closed;
    private volatile boolean aborted;

    // package protected for the tests
    ClusterStatusListener clusterStatusListener;


    private final Object metaRegionLock = new Object();

    // We have a single lock for master & zk to prevent deadlocks. Having
    //  one lock for ZK and one lock for master is not possible:
    //  When creating a connection to master, we need a connection to ZK to get
    //  its address. But another thread could have taken the ZK lock, and could
    //  be waiting for the master lock => deadlock.
    private final Object masterAndZKLock = new Object();

    private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;

    // thread executor shared by all HTableInterface instances created
    // by this connection
    private volatile ExecutorService batchPool = null;
    // meta thread executor shared by all HTableInterface instances created
    // by this connection
    private volatile ExecutorService metaLookupPool = null;
    private volatile boolean cleanupPool = false;

    private final Configuration conf;

    // cache the configuration value for tables so that we can avoid calling
    // the expensive Configuration to fetch the value multiple times.
    private final ConnectionConfiguration connectionConfig;

    // Client rpc instance.
    private RpcClient rpcClient;

    private MetaCache metaCache = new MetaCache();

    private int refCount;

    // indicates whether this connection's life cycle is managed (by us)
    private boolean managed;

    private User user;

    private RpcRetryingCallerFactory rpcCallerFactory;

    private RpcControllerFactory rpcControllerFactory;

    private final RetryingCallerInterceptor interceptor;

    /**
     * Cluster registry of basic info such as clusterid and meta region location.
     */
    Registry registry;

    private final ClientBackoffPolicy backoffPolicy;

    HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
      this(conf, managed, null, null);
    }

    /**
     * constructor
     * @param conf Configuration object
     * @param managed If true, does not do full shutdown on close; i.e. cleanup of connection
     * to zk and shutdown of all services; we just close down the resources this connection was
     * responsible for and decrement usage counters.  It is up to the caller to do the full
     * cleanup.  It is set when we want to have connection sharing going on -- reuse of zk
     * connection, and cached region locations, established regionserver connections, etc.  When
     * connections are shared, we have reference counting going on and will only do full cleanup
     * when there are no more users of an HConnectionImplementation instance.
     */
    HConnectionImplementation(Configuration conf, boolean managed,
        ExecutorService pool, User user) throws IOException {
      this(conf);
      this.user = user;
      this.batchPool = pool;
      this.managed = managed;
      this.registry = setupRegistry();
      retrieveClusterId();

      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
      this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);

      // Do we publish the status?
      boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
          HConstants.STATUS_PUBLISHED_DEFAULT);
      Class<? extends ClusterStatusListener.Listener> listenerClass =
          conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
              ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
              ClusterStatusListener.Listener.class);
      if (shouldListen) {
        if (listenerClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
        } else {
          clusterStatusListener = new ClusterStatusListener(
              new ClusterStatusListener.DeadServerHandler() {
                @Override
                public void newDead(ServerName sn) {
                  clearCaches(sn);
                  rpcClient.cancelConnections(sn);
                }
              }, conf, listenerClass);
        }
      }
    }

    /**
     * For tests.
     */
    protected HConnectionImplementation(Configuration conf) {
      this.conf = conf;
      this.connectionConfig = new ConnectionConfiguration(conf);
      this.closed = false;
      this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
      this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
          HConstants.DEFAULT_USE_META_REPLICAS);
      this.numTries = connectionConfig.getRetriesNumber();
      this.rpcTimeout = conf.getInt(
          HConstants.HBASE_RPC_TIMEOUT_KEY,
          HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
      if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
        synchronized (nonceGeneratorCreateLock) {
          if (ConnectionManager.nonceGenerator == null) {
            ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
          }
          this.nonceGenerator = ConnectionManager.nonceGenerator;
        }
      } else {
        this.nonceGenerator = new NoNonceGenerator();
      }
      stats = ServerStatisticTracker.create(conf);
      this.asyncProcess = createAsyncProcess(this.conf);
      this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
      this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
    }

    @Override
    public HTableInterface getTable(String tableName) throws IOException {
      return getTable(TableName.valueOf(tableName));
    }

    @Override
    public HTableInterface getTable(byte[] tableName) throws IOException {
      return getTable(TableName.valueOf(tableName));
    }

    @Override
    public HTableInterface getTable(TableName tableName) throws IOException {
      return getTable(tableName, getBatchPool());
    }

    @Override
    public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
      return getTable(TableName.valueOf(tableName), pool);
    }

    @Override
    public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
      return getTable(TableName.valueOf(tableName), pool);
    }

    @Override
    public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
      if (managed) {
        throw new NeedUnmanagedConnectionException();
      }
      return new HTable(tableName, this, connectionConfig,
          rpcCallerFactory, rpcControllerFactory, pool);
    }

    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
      if (params.getTableName() == null) {
        throw new IllegalArgumentException("TableName cannot be null.");
      }
      if (params.getPool() == null) {
        params.pool(HTable.getDefaultExecutor(getConfiguration()));
      }
      if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
        params.writeBufferSize(connectionConfig.getWriteBufferSize());
      }
      if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
        params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
      }
      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) {
      return getBufferedMutator(new BufferedMutatorParams(tableName));
    }
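
    // A minimal usage sketch for the mutators above (table, family, qualifier
    // and value are illustrative):
    //   try (BufferedMutator mutator =
    //       connection.getBufferedMutator(TableName.valueOf("mytable"))) {
    //     mutator.mutate(new Put(Bytes.toBytes("row1"))
    //         .addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")));
    //   } // close() flushes any buffered mutations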

    @Override
    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
      return new HRegionLocator(tableName, this);
    }

    @Override
    public Admin getAdmin() throws IOException {
      if (managed) {
        throw new NeedUnmanagedConnectionException();
      }
      return new HBaseAdmin(this);
    }

    private ExecutorService getBatchPool() {
      if (batchPool == null) {
        synchronized (this) {
          if (batchPool == null) {
            this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
                conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
            this.cleanupPool = true;
          }
        }
      }
      return this.batchPool;
    }

    private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
        BlockingQueue<Runnable> passedWorkQueue) {
      // shared HTable thread executor not yet initialized
      if (maxThreads == 0) {
        maxThreads = Runtime.getRuntime().availableProcessors() * 8;
      }
      if (coreThreads == 0) {
        coreThreads = Runtime.getRuntime().availableProcessors() * 8;
      }
      long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
      BlockingQueue<Runnable> workQueue = passedWorkQueue;
      if (workQueue == null) {
        workQueue =
          new LinkedBlockingQueue<Runnable>(maxThreads *
              conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
                  HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
      }
      ThreadPoolExecutor tpe = new ThreadPoolExecutor(
          coreThreads,
          maxThreads,
          keepAliveTime,
          TimeUnit.SECONDS,
          workQueue,
          Threads.newDaemonThreadFactory(toString() + nameHint));
      tpe.allowCoreThreadTimeOut(true);
      return tpe;
    }
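
    // A rough sizing sketch for the pool above (defaults are illustrative):
    // with "hbase.hconnection.threads.max" = 256 and
    // "hbase.client.max.total.tasks" left at its default of 100, the work
    // queue can hold up to 256 * 100 runnables before the pool would reject
    // work; allowCoreThreadTimeOut(true) lets idle threads, core ones
    // included, exit after the 60 second keep-alive.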

    private ExecutorService getMetaLookupPool() {
      if (this.metaLookupPool == null) {
        synchronized (this) {
          if (this.metaLookupPool == null) {
            // Some of the threads would be used for meta replicas.
            // To start with, threads.max.core threads can hit the meta (including replicas).
            // After that, requests will get queued up in the passed queue, and only after
            // the queue is full, a new thread will be started.
            this.metaLookupPool = getThreadPool(
                conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
                conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
                "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
          }
        }
      }
      return this.metaLookupPool;
    }

    protected ExecutorService getCurrentMetaLookupPool() {
      return metaLookupPool;
    }

    protected ExecutorService getCurrentBatchPool() {
      return batchPool;
    }

    private void shutdownPools() {
      if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
        shutdownBatchPool(this.batchPool);
      }
      if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
        shutdownBatchPool(this.metaLookupPool);
      }
    }

    private void shutdownBatchPool(ExecutorService pool) {
      pool.shutdown();
      try {
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
          pool.shutdownNow();
        }
      } catch (InterruptedException e) {
        pool.shutdownNow();
      }
    }

    /**
     * @return The cluster registry implementation to use.
     * @throws IOException
     */
    private Registry setupRegistry() throws IOException {
      return RegistryFactory.getRegistry(this);
    }

    /**
     * For tests only.
     */
    @VisibleForTesting
    RpcClient getRpcClient() {
      return rpcClient;
    }

    /**
     * An identifier that will remain the same for a given connection.
     */
    @Override
    public String toString() {
      return "hconnection-0x" + Integer.toHexString(hashCode());
    }

    protected String clusterId = null;

    void retrieveClusterId() {
      if (clusterId != null) return;
      this.clusterId = this.registry.getClusterId();
      if (clusterId == null) {
        clusterId = HConstants.CLUSTER_ID_DEFAULT;
        LOG.debug("clusterid came back null, using default " + clusterId);
      }
    }

    @Override
    public Configuration getConfiguration() {
      return this.conf;
    }

    private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
      throws MasterNotRunningException {
      String errorMsg;
      try {
        if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
          errorMsg = "The node " + zkw.baseZNode + " is not in ZooKeeper. "
            + "It should have been written by the master. "
            + "Check the value configured in 'zookeeper.znode.parent'. "
            + "There could be a mismatch with the one configured in the master.";
          LOG.error(errorMsg);
          throw new MasterNotRunningException(errorMsg);
        }
      } catch (KeeperException e) {
        errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
        LOG.error(errorMsg);
        throw new MasterNotRunningException(errorMsg, e);
      }
    }

    /**
     * @return true if the master is running, throws an exception otherwise
     * @throws MasterNotRunningException - if the master is not running
     * @throws ZooKeeperConnectionException
     */
    @Deprecated
    @Override
    public boolean isMasterRunning()
    throws MasterNotRunningException, ZooKeeperConnectionException {
      // When getting the master connection, we check it's running,
      // so if there is no exception, it means we've been able to get a
      // connection on a running master
      MasterKeepAliveConnection m = getKeepAliveMasterService();
      m.close();
      return true;
    }

    @Override
    public HRegionLocation getRegionLocation(final TableName tableName,
        final byte [] row, boolean reload)
    throws IOException {
      return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
    }

    @Override
    public HRegionLocation getRegionLocation(final byte[] tableName,
        final byte [] row, boolean reload)
    throws IOException {
      return getRegionLocation(TableName.valueOf(tableName), row, reload);
    }

    @Override
    public boolean isTableEnabled(TableName tableName) throws IOException {
      return this.registry.isTableOnlineState(tableName, true);
    }

    @Override
    public boolean isTableEnabled(byte[] tableName) throws IOException {
      return isTableEnabled(TableName.valueOf(tableName));
    }

    @Override
    public boolean isTableDisabled(TableName tableName) throws IOException {
      return this.registry.isTableOnlineState(tableName, false);
    }

    @Override
    public boolean isTableDisabled(byte[] tableName) throws IOException {
      return isTableDisabled(TableName.valueOf(tableName));
    }

    @Override
    public boolean isTableAvailable(final TableName tableName) throws IOException {
      final AtomicBoolean available = new AtomicBoolean(true);
      final AtomicInteger regionCount = new AtomicInteger(0);
      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
        @Override
        public boolean processRow(Result row) throws IOException {
          HRegionInfo info = MetaScanner.getHRegionInfo(row);
          if (info != null && !info.isSplitParent()) {
            if (tableName.equals(info.getTable())) {
              ServerName server = HRegionInfo.getServerName(row);
              if (server == null) {
                available.set(false);
                return false;
              }
              regionCount.incrementAndGet();
            } else if (tableName.compareTo(info.getTable()) < 0) {
              // Return if we are done with the current table
              return false;
            }
          }
          return true;
        }
      };
      MetaScanner.metaScan(this, visitor, tableName);
      return available.get() && (regionCount.get() > 0);
    }

    @Override
    public boolean isTableAvailable(final byte[] tableName) throws IOException {
      return isTableAvailable(TableName.valueOf(tableName));
    }

    @Override
    public boolean isTableAvailable(final TableName tableName, final byte[][] splitKeys)
        throws IOException {
      final AtomicBoolean available = new AtomicBoolean(true);
      final AtomicInteger regionCount = new AtomicInteger(0);
      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
        @Override
        public boolean processRow(Result row) throws IOException {
          HRegionInfo info = MetaScanner.getHRegionInfo(row);
          if (info != null && !info.isSplitParent()) {
            if (tableName.equals(info.getTable())) {
              ServerName server = HRegionInfo.getServerName(row);
              if (server == null) {
                available.set(false);
                return false;
              }
              if (!Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
                for (byte[] splitKey : splitKeys) {
                  // Just check if the split key is available
                  if (Bytes.equals(info.getStartKey(), splitKey)) {
                    regionCount.incrementAndGet();
                    break;
                  }
                }
              } else {
                // The region with the empty start key should always be counted
                regionCount.incrementAndGet();
              }
            } else if (tableName.compareTo(info.getTable()) < 0) {
              // Return if we are done with the current table
              return false;
            }
          }
          return true;
        }
      };
      MetaScanner.metaScan(this, visitor, tableName);
      // +1 needs to be added so that the empty start row is also taken into account
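      // (Illustrative: a table pre-split with splitKeys {b, c} has regions
      // whose start keys are {empty, b, c}, so a fully available table yields
      // regionCount == splitKeys.length + 1.)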
      return available.get() && (regionCount.get() == splitKeys.length + 1);
    }

    @Override
    public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
        throws IOException {
      return isTableAvailable(TableName.valueOf(tableName), splitKeys);
    }

    @Override
    public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
      RegionLocations locations = locateRegion(HRegionInfo.getTable(regionName),
        HRegionInfo.getStartKey(regionName), false, true);
      return locations == null ? null : locations.getRegionLocation();
    }

    @Override
    public boolean isDeadServer(ServerName sn) {
      if (clusterStatusListener == null) {
        return false;
      } else {
        return clusterStatusListener.isDeadServer(sn);
      }
    }

    @Override
    public List<HRegionLocation> locateRegions(final TableName tableName)
    throws IOException {
      return locateRegions(tableName, false, true);
    }

    @Override
    public List<HRegionLocation> locateRegions(final byte[] tableName)
    throws IOException {
      return locateRegions(TableName.valueOf(tableName));
    }

    @Override
    public List<HRegionLocation> locateRegions(final TableName tableName,
        final boolean useCache, final boolean offlined) throws IOException {
      NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(this, tableName);
      final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
      for (HRegionInfo regionInfo : regions.keySet()) {
        RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
        if (list != null) {
          for (HRegionLocation loc : list.getRegionLocations()) {
            if (loc != null) {
              locations.add(loc);
            }
          }
        }
      }
      return locations;
    }

    @Override
    public List<HRegionLocation> locateRegions(final byte[] tableName,
        final boolean useCache, final boolean offlined) throws IOException {
      return locateRegions(TableName.valueOf(tableName), useCache, offlined);
    }

    @Override
    public HRegionLocation locateRegion(
        final TableName tableName, final byte[] row) throws IOException {
      RegionLocations locations = locateRegion(tableName, row, true, true);
      return locations == null ? null : locations.getRegionLocation();
    }

    @Override
    public HRegionLocation locateRegion(final byte[] tableName,
        final byte [] row)
    throws IOException {
      return locateRegion(TableName.valueOf(tableName), row);
    }

    @Override
    public HRegionLocation relocateRegion(final TableName tableName,
        final byte [] row) throws IOException {
      RegionLocations locations = relocateRegion(tableName, row,
        RegionReplicaUtil.DEFAULT_REPLICA_ID);
      return locations == null ? null :
        locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
    }

    @Override
    public RegionLocations relocateRegion(final TableName tableName,
        final byte [] row, int replicaId) throws IOException {
      // Since this is an explicit request not to use any caching, finding a
      // disabled table is not desirable. Checking here ensures that an exception
      // is thrown the first time a disabled table is interacted with.
      if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
        throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
      }

      return locateRegion(tableName, row, false, true, replicaId);
    }

    @Override
    public HRegionLocation relocateRegion(final byte[] tableName,
        final byte [] row) throws IOException {
      return relocateRegion(TableName.valueOf(tableName), row);
    }

    @Override
    public RegionLocations locateRegion(final TableName tableName,
      final byte [] row, boolean useCache, boolean retry)
    throws IOException {
      return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
    }

    @Override
    public RegionLocations locateRegion(final TableName tableName,
      final byte [] row, boolean useCache, boolean retry, int replicaId)
    throws IOException {
      if (this.closed) throw new IOException(toString() + " closed");
      if (tableName == null || tableName.getName().length == 0) {
        throw new IllegalArgumentException(
            "table name cannot be null or zero length");
      }
      if (tableName.equals(TableName.META_TABLE_NAME)) {
        return locateMeta(tableName, useCache, replicaId);
      } else {
        // Region not in the cache - have to go to the meta RS
        return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
      }
    }

    private RegionLocations locateMeta(final TableName tableName,
        boolean useCache, int replicaId) throws IOException {
      // HBASE-10785: We cache the location of the META itself, so that we are not overloading
      // zookeeper with one request for every region lookup. We cache the META with empty row
      // key in MetaCache.
      byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
      RegionLocations locations = null;
      if (useCache) {
        locations = getCachedLocation(tableName, metaCacheKey);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      }

      // only one thread should do the lookup.
      synchronized (metaRegionLock) {
        // Check the cache again for a hit in case some other thread made the
        // same query while we were waiting on the lock.
        if (useCache) {
          locations = getCachedLocation(tableName, metaCacheKey);
          if (locations != null && locations.getRegionLocation(replicaId) != null) {
            return locations;
          }
        }

        // Look up from zookeeper
        locations = this.registry.getMetaRegionLocation();
        if (locations != null) {
          cacheLocation(tableName, locations);
        }
      }
      return locations;
    }

    /*
     * Search the hbase:meta table for the HRegionLocation
     * info that contains the table and row we're seeking.
     */
    private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
        boolean useCache, boolean retry, int replicaId) throws IOException {

      // If we are supposed to be using the cache, look in the cache to see if
      // we already have the region.
      if (useCache) {
        RegionLocations locations = getCachedLocation(tableName, row);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      }

      // build the key of the meta region we should be looking for.
      // the extra 9's on the end are necessary to allow "exact" matches
      // without knowing the precise region names.
      byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
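      // (Illustrative: for table "t1" and row "r1" this builds the key
      // "t1,r1,99999999999999"; the reversed scan below, starting from that
      // key, lands on the hbase:meta entry for the region containing "r1",
      // since HConstants.NINES sorts after any real region's id suffix.)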

      Scan s = new Scan();
      s.setReversed(true);
      s.setStartRow(metaKey);
      s.setSmall(true);
      s.setCaching(1);
      if (this.useMetaReplicas) {
        s.setConsistency(Consistency.TIMELINE);
      }

      int localNumRetries = (retry ? numTries : 1);

      for (int tries = 0; true; tries++) {
        if (tries >= localNumRetries) {
          throw new NoServerForRegionException("Unable to find region for "
              + Bytes.toStringBinary(row) + " in " + tableName +
              " after " + localNumRetries + " tries.");
        }
        if (useCache) {
          RegionLocations locations = getCachedLocation(tableName, row);
          if (locations != null && locations.getRegionLocation(replicaId) != null) {
            return locations;
          }
        } else {
          // If we are not supposed to be using the cache, delete any existing cached location
          // so it won't interfere.
          metaCache.clearCache(tableName, row);
        }

        // Query the meta region
        try {
          Result regionInfoRow = null;
          ReversedClientScanner rcs = null;
          try {
            rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
              rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
            regionInfoRow = rcs.next();
          } finally {
            if (rcs != null) {
              rcs.close();
            }
          }

          if (regionInfoRow == null) {
            throw new TableNotFoundException(tableName);
          }

          // convert the row result into the HRegionLocation we need!
          RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
          if (locations == null || locations.getRegionLocation(replicaId) == null) {
            throw new IOException("HRegionInfo was null in " +
              tableName + ", row=" + regionInfoRow);
          }
          HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
          if (regionInfo == null) {
            throw new IOException("HRegionInfo was null or empty in " +
              TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
          }

          // possible we got a region of a different table...
          if (!regionInfo.getTable().equals(tableName)) {
            throw new TableNotFoundException(
                  "Table '" + tableName + "' was not found, got: " +
                  regionInfo.getTable() + ".");
          }
          if (regionInfo.isSplit()) {
            throw new RegionOfflineException("the only available region for" +
              " the required row is a split parent," +
              " the daughters should be online soon: " +
              regionInfo.getRegionNameAsString());
          }
          if (regionInfo.isOffline()) {
            throw new RegionOfflineException("the region is offline, could" +
              " be caused by a disable table call: " +
              regionInfo.getRegionNameAsString());
          }

          ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
          if (serverName == null) {
            throw new NoServerForRegionException("No server address listed " +
              "in " + TableName.META_TABLE_NAME + " for region " +
              regionInfo.getRegionNameAsString() + " containing row " +
              Bytes.toStringBinary(row));
          }

          if (isDeadServer(serverName)) {
1300             throw new RegionServerStoppedException("hbase:meta says the region "+
1301                 regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
1302                 ", but it is dead.");
1303           }
1304           // Cache the location and return it
1305           cacheLocation(tableName, locations);
1306           return locations;
1307         } catch (TableNotFoundException e) {
1308           // if we got this error, probably means the table just plain doesn't
1309           // exist. rethrow the error immediately. this should always be coming
1310           // from the HTable constructor.
1311           throw e;
1312         } catch (IOException e) {
1313           ExceptionUtil.rethrowIfInterrupt(e);
1314 
1315           if (e instanceof RemoteException) {
1316             e = ((RemoteException)e).unwrapRemoteException();
1317           }
1318           if (tries < localNumRetries - 1) {
1319             if (LOG.isDebugEnabled()) {
1320             LOG.debug("locateRegionInMeta parentTable=" +
1321                 TableName.META_TABLE_NAME +
1322                 ", attempt=" + tries + " of " +
1323                 localNumRetries + " failed; retrying after sleep of " +
1324                 ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
1325             }
1326           } else {
1327             throw e;
1328           }
1329           // Only relocate the parent region if necessary
1330           if(!(e instanceof RegionOfflineException ||
1331               e instanceof NoServerForRegionException)) {
1332             relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
1333           }
1334         }
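             // Added note: in this era of the code, ConnectionUtils.getPauseTime(pause, tries)
             // scales the base pause by HConstants.RETRY_BACKOFF[tries] (1, 2, 3, 5, 10, ...)
             // plus a small jitter, so with the default 100ms pause the sleeps run roughly
             // 100ms, 200ms, 300ms, 500ms, 1s, and so on.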
1335         try {
1336           Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
1337         } catch (InterruptedException e) {
1338           throw new InterruptedIOException("Giving up trying to locate region in " +
1339             "meta: thread is interrupted.");
1340         }
1341       }
1342     }
1343 
1344     /**
1345      * Put a newly discovered HRegionLocation into the cache.
1346      * @param tableName The table name.
1347      * @param location the new location
1348      */
1349     @Override
1350     public void cacheLocation(final TableName tableName, final RegionLocations location) {
1351       metaCache.cacheLocation(tableName, location);
1352     }
1353 
1354     /**
1355      * Search the cache for a location that fits our table and row key.
1356      * Return null if no suitable region is located.
1357      *
1358      * @param tableName the table to look up
1359      * @param row the row key to locate
1360      * @return Null or region location found in cache.
1361      */
1362     RegionLocations getCachedLocation(final TableName tableName,
1363         final byte [] row) {
1364       return metaCache.getCachedLocation(tableName, row);
1365     }
1366 
1367     public void clearRegionCache(final TableName tableName, byte[] row) {
1368       metaCache.clearCache(tableName, row);
1369     }
1370 
1371     /*
1372      * Delete all cached entries that map to a specific server.
1373      */
1374     @Override
1375     public void clearCaches(final ServerName serverName) {
1376       metaCache.clearCache(serverName);
1377     }
1378 
1379     @Override
1380     public void clearRegionCache() {
1381       metaCache.clearCache();
1382     }
1383 
1384     @Override
1385     public void clearRegionCache(final TableName tableName) {
1386       metaCache.clearCache(tableName);
1387     }
1388 
1389     @Override
1390     public void clearRegionCache(final byte[] tableName) {
1391       clearRegionCache(TableName.valueOf(tableName));
1392     }
1393 
1394     /**
1395      * Put a newly discovered HRegionLocation into the cache.
1396      * @param tableName The table name.
1397      * @param source the source of the new location, if it's not coming from meta
1398      * @param location the new location
1399      */
1400     private void cacheLocation(final TableName tableName, final ServerName source,
1401         final HRegionLocation location) {
1402       metaCache.cacheLocation(tableName, source, location);
1403     }
1404 
1405     // Map keyed by service name + regionserver to service stub implementation
1406     private final ConcurrentHashMap<String, Object> stubs =
1407       new ConcurrentHashMap<String, Object>();
1408     // Map of locks used creating service stubs per regionserver.
1409     private final ConcurrentHashMap<String, String> connectionLock =
1410       new ConcurrentHashMap<String, String>();
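         // Added commentary: connectionLock implements per-key locking. putIfAbsent interns one
         // canonical String per key, and callers synchronize on connectionLock.get(key), so two
         // threads racing to build the same stub serialize while stubs for different servers can
         // be created concurrently. The pattern, as used in makeStubNoRetries/getAdmin/getClient:
         //
         //   connectionLock.putIfAbsent(key, key);     // ensure a canonical lock object exists
         //   synchronized (connectionLock.get(key)) {  // every thread locks the same instance
         //     // consult the stubs map; create and cache the stub if absent
         //   }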
1411 
1412     /**
1413      * State of the MasterService connection/setup.
1414      */
1415     static class MasterServiceState {
1416       HConnection connection;
1417       MasterService.BlockingInterface stub;
1418       int userCount;
1419 
1420       MasterServiceState(final HConnection connection) {
1421         super();
1422         this.connection = connection;
1423       }
1424 
1425       @Override
1426       public String toString() {
1427         return "MasterService";
1428       }
1429 
1430       Object getStub() {
1431         return this.stub;
1432       }
1433 
1434       void clearStub() {
1435         this.stub = null;
1436       }
1437 
1438       boolean isMasterRunning() throws ServiceException {
1439         IsMasterRunningResponse response =
1440           this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1441         return response != null ? response.getIsMasterRunning() : false;
1442       }
1443     }
1444 
1445     /**
1446      * Makes a client-side stub for master services. Sub-class to specialize.
1447      * Depends on hosting class so not static.  Exists so we avoid duplicating a bunch of code
1448      * when setting up the MasterMonitorService and MasterAdminService.
1449      */
1450     abstract class StubMaker {
1451       /**
1452        * Returns the name of the service stub being created.
1453        */
1454       protected abstract String getServiceName();
1455 
1456       /**
1457        * Make the stub and cache it internally so it can be used later for the isMasterRunning call.
1458        * @param channel the RPC channel on which to build the stub
1459        */
1460       protected abstract Object makeStub(final BlockingRpcChannel channel);
1461 
1462       /**
1463        * Once set up, check that it works by making an isMasterRunning call.
1464        * @throws ServiceException
1465        */
1466       protected abstract void isMasterRunning() throws ServiceException;
1467 
1468       /**
1469        * Create a stub. Try once only.  It is not typed because there is no common type for
1470        * protobuf services or their interfaces.  Let the caller do appropriate casting.
1471        * @return A stub for master services.
1472        * @throws IOException
1473        * @throws KeeperException
1474        * @throws ServiceException
1475        */
1476       private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
1477         ZooKeeperKeepAliveConnection zkw;
1478         try {
1479           zkw = getKeepAliveZooKeeperWatcher();
1480         } catch (IOException e) {
1481           ExceptionUtil.rethrowIfInterrupt(e);
1482           throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
1483         }
1484         try {
1485           checkIfBaseNodeAvailable(zkw);
1486           ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
1487           if (sn == null) {
1488             String msg = "ZooKeeper available but no active master location found";
1489             LOG.info(msg);
1490             throw new MasterNotRunningException(msg);
1491           }
1492           if (isDeadServer(sn)) {
1493             throw new MasterNotRunningException(sn + " is dead.");
1494           }
1495           // Use the security info interface name as our stub key
1496           String key = getStubKey(getServiceName(), sn.getHostname(), sn.getPort());
1497           connectionLock.putIfAbsent(key, key);
1498           Object stub = null;
1499           synchronized (connectionLock.get(key)) {
1500             stub = stubs.get(key);
1501             if (stub == null) {
1502               BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1503               stub = makeStub(channel);
1504               isMasterRunning();
1505               stubs.put(key, stub);
1506             }
1507           }
1508           return stub;
1509         } finally {
1510           zkw.close();
1511         }
1512       }
1513 
1514       /**
1515        * Create a stub against the master.  Retry if necessary.
1516        * @return A stub with which to make <code>intf</code> calls against the master
1517        * @throws MasterNotRunningException
1518        */
1519       Object makeStub() throws IOException {
1520         // The lock must be taken at the beginning to prevent multiple master creations
1521         //  (and leaks) in a multithreaded context
1522         synchronized (masterAndZKLock) {
1523           Exception exceptionCaught = null;
1524           if (!closed) {
1525             try {
1526               return makeStubNoRetries();
1527             } catch (IOException e) {
1528               exceptionCaught = e;
1529             } catch (KeeperException e) {
1530               exceptionCaught = e;
1531             } catch (ServiceException e) {
1532               exceptionCaught = e;
1533             }
1534 
1535             throw new MasterNotRunningException(exceptionCaught);
1536           } else {
1537             throw new DoNotRetryIOException("Connection was closed while trying to get master");
1538           }
1539         }
1540       }
1541     }
1542 
1543     /**
1544      * Class to make a MasterServiceStubMaker stub.
1545      */
1546     class MasterServiceStubMaker extends StubMaker {
1547       private MasterService.BlockingInterface stub;
1548       @Override
1549       protected String getServiceName() {
1550         return MasterService.getDescriptor().getName();
1551       }
1552 
1553       @Override
1554       MasterService.BlockingInterface makeStub() throws IOException {
1555         return (MasterService.BlockingInterface)super.makeStub();
1556       }
1557 
1558       @Override
1559       protected Object makeStub(BlockingRpcChannel channel) {
1560         this.stub = MasterService.newBlockingStub(channel);
1561         return this.stub;
1562       }
1563 
1564       @Override
1565       protected void isMasterRunning() throws ServiceException {
1566         this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1567       }
1568     }
1569 
1570     @Override
1571     public AdminService.BlockingInterface getAdmin(final ServerName serverName)
1572         throws IOException {
1573       return getAdmin(serverName, false);
1574     }
1575 
1576     @Override
1577     // Nothing is done w/ the 'master' parameter.  It is ignored.
1578     public AdminService.BlockingInterface getAdmin(final ServerName serverName,
1579       final boolean master)
1580     throws IOException {
1581       if (isDeadServer(serverName)) {
1582         throw new RegionServerStoppedException(serverName + " is dead.");
1583       }
1584       String key = getStubKey(AdminService.BlockingInterface.class.getName(),
1585           serverName.getHostname(), serverName.getPort());
1586       this.connectionLock.putIfAbsent(key, key);
1587       AdminService.BlockingInterface stub = null;
1588       synchronized (this.connectionLock.get(key)) {
1589         stub = (AdminService.BlockingInterface)this.stubs.get(key);
1590         if (stub == null) {
1591           BlockingRpcChannel channel =
1592               this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1593           stub = AdminService.newBlockingStub(channel);
1594           this.stubs.put(key, stub);
1595         }
1596       }
1597       return stub;
1598     }
1599 
1600     @Override
1601     public ClientService.BlockingInterface getClient(final ServerName sn)
1602     throws IOException {
1603       if (isDeadServer(sn)) {
1604         throw new RegionServerStoppedException(sn + " is dead.");
1605       }
1606       String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostname(),
1607           sn.getPort());
1608       this.connectionLock.putIfAbsent(key, key);
1609       ClientService.BlockingInterface stub = null;
1610       synchronized (this.connectionLock.get(key)) {
1611         stub = (ClientService.BlockingInterface)this.stubs.get(key);
1612         if (stub == null) {
1613           BlockingRpcChannel channel =
1614               this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1615           stub = ClientService.newBlockingStub(channel);
1616           // In the old days, after getting stub/proxy, we'd make a call.  We are not doing that here.
1617           // Just fail on first actual call rather than in here on setup.
1618           this.stubs.put(key, stub);
1619         }
1620       }
1621       return stub;
1622     }
1623 
1624     static String getStubKey(final String serviceName, final String rsHostname, int port) {
1625       // Sometimes, servers go down and they come back up with the same hostname but a different
1626       // IP address. Force a resolution of the rsHostname by trying to instantiate an
1627       // InetSocketAddress, and this way we will rightfully get a new stubKey.
1628       // Also, include the hostname in the key so as to take care of those cases where the
1629       // DNS name is different but IP address remains the same.
1630       InetAddress i =  new InetSocketAddress(rsHostname, port).getAddress();
1631       String address = rsHostname;
1632       if (i != null) {
1633         address = i.getHostAddress() + "-" + rsHostname;
1634       }
1635       return serviceName + "@" + address + ":" + port;
1636     }
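         // Example (added; values hypothetical): for a service named "ClientService" on host
         // "rs1.example.com" resolving to 10.0.0.5, port 16020, the key comes out as
         // "ClientService@10.0.0.5-rs1.example.com:16020".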
1637 
1638     private ZooKeeperKeepAliveConnection keepAliveZookeeper;
1639     private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
1640     private boolean canCloseZKW = true;
1641 
1642     // keepAlive time, in ms. No reason to make it configurable.
1643     private static final long keepAlive = 5 * 60 * 1000;
1644 
1645     /**
1646      * Retrieve a shared ZooKeeperWatcher. You must close it once you have finished with it.
1647      * @return The shared instance. Never returns null.
1648      */
1649     ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
1650       throws IOException {
1651       synchronized (masterAndZKLock) {
1652         if (keepAliveZookeeper == null) {
1653           if (this.closed) {
1654             throw new IOException(toString() + " closed");
1655           }
1656           // We don't check that our link to ZooKeeper is still valid,
1657           // but there is a retry mechanism in the ZooKeeperWatcher itself.
1658           keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
1659         }
1660         keepAliveZookeeperUserCount.addAndGet(1);
1661         keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
1662         return keepAliveZookeeper;
1663       }
1664     }
1665 
1666     void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
1667       if (zkw == null){
1668         return;
1669       }
1670       if (keepAliveZookeeperUserCount.addAndGet(-1) <= 0 ){
1671         keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
1672       }
1673     }
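         // Typical acquire/release pairing (added sketch; see makeStubNoRetries for a real use):
         //
         //   ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher(); // count++
         //   try {
         //     // ... use zkw ...
         //   } finally {
         //     zkw.close();  // releases rather than closes; count--
         //   }
         //
         // Once the user count drops to zero, the watcher is kept around for another
         // 'keepAlive' ms (five minutes) before it becomes eligible for the real close.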
1674 
1675     private void closeZooKeeperWatcher() {
1676       synchronized (masterAndZKLock) {
1677         if (keepAliveZookeeper != null) {
1678           LOG.info("Closing zookeeper sessionid=0x" +
1679             Long.toHexString(
1680               keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
1681           keepAliveZookeeper.internalClose();
1682           keepAliveZookeeper = null;
1683         }
1684         keepAliveZookeeperUserCount.set(0);
1685       }
1686     }
1687 
1688     final MasterServiceState masterServiceState = new MasterServiceState(this);
1689 
1690     @Override
1691     public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
1692       return getKeepAliveMasterService();
1693     }
1694 
1695     private void resetMasterServiceState(final MasterServiceState mss) {
1696       mss.userCount++;
1697     }
1698 
1699     @Override
1700     public MasterKeepAliveConnection getKeepAliveMasterService()
1701     throws MasterNotRunningException {
1702       synchronized (masterAndZKLock) {
1703         if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1704           MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1705           try {
1706             this.masterServiceState.stub = stubMaker.makeStub();
1707           } catch (MasterNotRunningException ex) {
1708             throw ex;
1709           } catch (IOException e) {
1710             // rethrow as MasterNotRunningException so that we can keep the method signature
1711             throw new MasterNotRunningException(e);
1712           }
1713         }
1714         resetMasterServiceState(this.masterServiceState);
1715       }
1716       // Ugly delegation just so we can add in a close() method.
1717       final MasterService.BlockingInterface stub = this.masterServiceState.stub;
1718       return new MasterKeepAliveConnection() {
1719         MasterServiceState mss = masterServiceState;
1720         @Override
1721         public MasterProtos.AbortProcedureResponse abortProcedure(
1722           RpcController controller,
1723           MasterProtos.AbortProcedureRequest request) throws ServiceException {
1724           return stub.abortProcedure(controller, request);
1725         }
1726         @Override
1727         public MasterProtos.ListProceduresResponse listProcedures(
1728             RpcController controller,
1729             MasterProtos.ListProceduresRequest request) throws ServiceException {
1730           return stub.listProcedures(controller, request);
1731         }
1732         @Override
1733         public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
1734         throws ServiceException {
1735           return stub.addColumn(controller, request);
1736         }
1737 
1738         @Override
1739         public DeleteColumnResponse deleteColumn(RpcController controller,
1740             DeleteColumnRequest request)
1741         throws ServiceException {
1742           return stub.deleteColumn(controller, request);
1743         }
1744 
1745         @Override
1746         public ModifyColumnResponse modifyColumn(RpcController controller,
1747             ModifyColumnRequest request)
1748         throws ServiceException {
1749           return stub.modifyColumn(controller, request);
1750         }
1751 
1752         @Override
1753         public MoveRegionResponse moveRegion(RpcController controller,
1754             MoveRegionRequest request) throws ServiceException {
1755           return stub.moveRegion(controller, request);
1756         }
1757 
1758         @Override
1759         public DispatchMergingRegionsResponse dispatchMergingRegions(
1760             RpcController controller, DispatchMergingRegionsRequest request)
1761             throws ServiceException {
1762           return stub.dispatchMergingRegions(controller, request);
1763         }
1764 
1765         @Override
1766         public AssignRegionResponse assignRegion(RpcController controller,
1767             AssignRegionRequest request) throws ServiceException {
1768           return stub.assignRegion(controller, request);
1769         }
1770 
1771         @Override
1772         public UnassignRegionResponse unassignRegion(RpcController controller,
1773             UnassignRegionRequest request) throws ServiceException {
1774           return stub.unassignRegion(controller, request);
1775         }
1776 
1777         @Override
1778         public OfflineRegionResponse offlineRegion(RpcController controller,
1779             OfflineRegionRequest request) throws ServiceException {
1780           return stub.offlineRegion(controller, request);
1781         }
1782 
1783         @Override
1784         public DeleteTableResponse deleteTable(RpcController controller,
1785             DeleteTableRequest request) throws ServiceException {
1786           return stub.deleteTable(controller, request);
1787         }
1788 
1789         @Override
1790         public TruncateTableResponse truncateTable(RpcController controller,
1791             TruncateTableRequest request) throws ServiceException {
1792           return stub.truncateTable(controller, request);
1793         }
1794 
1795         @Override
1796         public EnableTableResponse enableTable(RpcController controller,
1797             EnableTableRequest request) throws ServiceException {
1798           return stub.enableTable(controller, request);
1799         }
1800 
1801         @Override
1802         public DisableTableResponse disableTable(RpcController controller,
1803             DisableTableRequest request) throws ServiceException {
1804           return stub.disableTable(controller, request);
1805         }
1806 
1807         @Override
1808         public ModifyTableResponse modifyTable(RpcController controller,
1809             ModifyTableRequest request) throws ServiceException {
1810           return stub.modifyTable(controller, request);
1811         }
1812 
1813         @Override
1814         public CreateTableResponse createTable(RpcController controller,
1815             CreateTableRequest request) throws ServiceException {
1816           return stub.createTable(controller, request);
1817         }
1818 
1819         @Override
1820         public ShutdownResponse shutdown(RpcController controller,
1821             ShutdownRequest request) throws ServiceException {
1822           return stub.shutdown(controller, request);
1823         }
1824 
1825         @Override
1826         public StopMasterResponse stopMaster(RpcController controller,
1827             StopMasterRequest request) throws ServiceException {
1828           return stub.stopMaster(controller, request);
1829         }
1830 
1831         @Override
1832         public BalanceResponse balance(RpcController controller,
1833             BalanceRequest request) throws ServiceException {
1834           return stub.balance(controller, request);
1835         }
1836 
1837         @Override
1838         public SetBalancerRunningResponse setBalancerRunning(
1839             RpcController controller, SetBalancerRunningRequest request)
1840             throws ServiceException {
1841           return stub.setBalancerRunning(controller, request);
1842         }
1843 
1844         @Override
1845         public RunCatalogScanResponse runCatalogScan(RpcController controller,
1846             RunCatalogScanRequest request) throws ServiceException {
1847           return stub.runCatalogScan(controller, request);
1848         }
1849 
1850         @Override
1851         public EnableCatalogJanitorResponse enableCatalogJanitor(
1852             RpcController controller, EnableCatalogJanitorRequest request)
1853             throws ServiceException {
1854           return stub.enableCatalogJanitor(controller, request);
1855         }
1856 
1857         @Override
1858         public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1859             RpcController controller, IsCatalogJanitorEnabledRequest request)
1860             throws ServiceException {
1861           return stub.isCatalogJanitorEnabled(controller, request);
1862         }
1863 
1864         @Override
1865         public CoprocessorServiceResponse execMasterService(
1866             RpcController controller, CoprocessorServiceRequest request)
1867             throws ServiceException {
1868           return stub.execMasterService(controller, request);
1869         }
1870 
1871         @Override
1872         public SnapshotResponse snapshot(RpcController controller,
1873             SnapshotRequest request) throws ServiceException {
1874           return stub.snapshot(controller, request);
1875         }
1876 
1877         @Override
1878         public GetCompletedSnapshotsResponse getCompletedSnapshots(
1879             RpcController controller, GetCompletedSnapshotsRequest request)
1880             throws ServiceException {
1881           return stub.getCompletedSnapshots(controller, request);
1882         }
1883 
1884         @Override
1885         public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1886             DeleteSnapshotRequest request) throws ServiceException {
1887           return stub.deleteSnapshot(controller, request);
1888         }
1889 
1890         @Override
1891         public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1892             IsSnapshotDoneRequest request) throws ServiceException {
1893           return stub.isSnapshotDone(controller, request);
1894         }
1895 
1896         @Override
1897         public RestoreSnapshotResponse restoreSnapshot(
1898             RpcController controller, RestoreSnapshotRequest request)
1899             throws ServiceException {
1900           return stub.restoreSnapshot(controller, request);
1901         }
1902 
1903         @Override
1904         public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
1905             RpcController controller, IsRestoreSnapshotDoneRequest request)
1906             throws ServiceException {
1907           return stub.isRestoreSnapshotDone(controller, request);
1908         }
1909 
1910         @Override
1911         public ExecProcedureResponse execProcedure(
1912             RpcController controller, ExecProcedureRequest request)
1913             throws ServiceException {
1914           return stub.execProcedure(controller, request);
1915         }
1916 
1917         @Override
1918         public ExecProcedureResponse execProcedureWithRet(
1919             RpcController controller, ExecProcedureRequest request)
1920             throws ServiceException {
1921           return stub.execProcedureWithRet(controller, request);
1922         }
1923 
1924         @Override
1925         public IsProcedureDoneResponse isProcedureDone(RpcController controller,
1926             IsProcedureDoneRequest request) throws ServiceException {
1927           return stub.isProcedureDone(controller, request);
1928         }
1929 
1930         @Override
1931         public GetProcedureResultResponse getProcedureResult(RpcController controller,
1932             GetProcedureResultRequest request) throws ServiceException {
1933           return stub.getProcedureResult(controller, request);
1934         }
1935 
1936         @Override
1937         public IsMasterRunningResponse isMasterRunning(
1938             RpcController controller, IsMasterRunningRequest request)
1939             throws ServiceException {
1940           return stub.isMasterRunning(controller, request);
1941         }
1942 
1943         @Override
1944         public ModifyNamespaceResponse modifyNamespace(RpcController controller,
1945             ModifyNamespaceRequest request)
1946         throws ServiceException {
1947           return stub.modifyNamespace(controller, request);
1948         }
1949 
1950         @Override
1951         public CreateNamespaceResponse createNamespace(
1952             RpcController controller, CreateNamespaceRequest request) throws ServiceException {
1953           return stub.createNamespace(controller, request);
1954         }
1955 
1956         @Override
1957         public DeleteNamespaceResponse deleteNamespace(
1958             RpcController controller, DeleteNamespaceRequest request) throws ServiceException {
1959           return stub.deleteNamespace(controller, request);
1960         }
1961 
1962         @Override
1963         public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
1964             GetNamespaceDescriptorRequest request) throws ServiceException {
1965           return stub.getNamespaceDescriptor(controller, request);
1966         }
1967 
1968         @Override
1969         public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
1970             ListNamespaceDescriptorsRequest request) throws ServiceException {
1971           return stub.listNamespaceDescriptors(controller, request);
1972         }
1973 
1974         @Override
1975         public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
1976             RpcController controller, ListTableDescriptorsByNamespaceRequest request)
1977                 throws ServiceException {
1978           return stub.listTableDescriptorsByNamespace(controller, request);
1979         }
1980 
1981         @Override
1982         public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
1983             RpcController controller, ListTableNamesByNamespaceRequest request)
1984                 throws ServiceException {
1985           return stub.listTableNamesByNamespace(controller, request);
1986         }
1987 
1988         @Override
1989         public void close() {
1990           release(this.mss);
1991         }
1992 
1993         @Override
1994         public GetSchemaAlterStatusResponse getSchemaAlterStatus(
1995             RpcController controller, GetSchemaAlterStatusRequest request)
1996             throws ServiceException {
1997           return stub.getSchemaAlterStatus(controller, request);
1998         }
1999 
2000         @Override
2001         public GetTableDescriptorsResponse getTableDescriptors(
2002             RpcController controller, GetTableDescriptorsRequest request)
2003             throws ServiceException {
2004           return stub.getTableDescriptors(controller, request);
2005         }
2006 
2007         @Override
2008         public GetTableNamesResponse getTableNames(
2009             RpcController controller, GetTableNamesRequest request)
2010             throws ServiceException {
2011           return stub.getTableNames(controller, request);
2012         }
2013 
2014         @Override
2015         public GetClusterStatusResponse getClusterStatus(
2016             RpcController controller, GetClusterStatusRequest request)
2017             throws ServiceException {
2018           return stub.getClusterStatus(controller, request);
2019         }
2020 
2021         @Override
2022         public SetQuotaResponse setQuota(RpcController controller, SetQuotaRequest request)
2023             throws ServiceException {
2024           return stub.setQuota(controller, request);
2025         }
2026 
2027         @Override
2028         public MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
2029             RpcController controller, MajorCompactionTimestampRequest request)
2030             throws ServiceException {
2031           return stub.getLastMajorCompactionTimestamp(controller, request);
2032         }
2033 
2034         @Override
2035         public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
2036             RpcController controller, MajorCompactionTimestampForRegionRequest request)
2037             throws ServiceException {
2038           return stub.getLastMajorCompactionTimestampForRegion(controller, request);
2039         }
2040 
2041         @Override
2042         public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
2043             IsBalancerEnabledRequest request) throws ServiceException {
2044           return stub.isBalancerEnabled(controller, request);
2045         }
2046       };
2047     }
2048 
2049 
2050     private static void release(MasterServiceState mss) {
2051       if (mss != null && mss.connection != null) {
2052         ((HConnectionImplementation)mss.connection).releaseMaster(mss);
2053       }
2054     }
2055 
2056     private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
2057       if (mss.getStub() == null){
2058         return false;
2059       }
2060       try {
2061         return mss.isMasterRunning();
2062       } catch (UndeclaredThrowableException e) {
2063         // It's somewhat messy, but we can receive exceptions such as
2064         //  java.net.ConnectException even though they're not declared, so we catch them here.
2065         LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
2066         return false;
2067       } catch (ServiceException se) {
2068         LOG.warn("Checking master connection", se);
2069         return false;
2070       }
2071     }
2072 
2073     void releaseMaster(MasterServiceState mss) {
2074       if (mss.getStub() == null) return;
2075       synchronized (masterAndZKLock) {
2076         --mss.userCount;
2077       }
2078     }
2079 
2080     private void closeMasterService(MasterServiceState mss) {
2081       if (mss.getStub() != null) {
2082         LOG.info("Closing master protocol: " + mss);
2083         mss.clearStub();
2084       }
2085       mss.userCount = 0;
2086     }
2087 
2088     /**
2089      * Immediate close of the shared master. Can be triggered by the delayed close or when
2090      * closing the connection itself.
2091      */
2092     private void closeMaster() {
2093       synchronized (masterAndZKLock) {
2094         closeMasterService(masterServiceState);
2095       }
2096     }
2097 
2098     void updateCachedLocation(HRegionInfo hri, ServerName source,
2099                               ServerName serverName, long seqNum) {
2100       HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
2101       cacheLocation(hri.getTable(), source, newHrl);
2102     }
2103 
2104     @Override
2105     public void deleteCachedRegionLocation(final HRegionLocation location) {
2106       metaCache.clearCache(location);
2107     }
2108 
2109     @Override
2110     public void updateCachedLocations(final TableName tableName, byte[] rowkey,
2111         final Object exception, final HRegionLocation source) {
2112       assert source != null;
2113       updateCachedLocations(tableName, source.getRegionInfo().getRegionName(),
2114         rowkey, exception, source.getServerName());
2115     }
2116 
2117     /**
2118      * Update the location with the new value (if the exception is a RegionMovedException)
2119      * or delete it from the cache. Does nothing if we can be sure from the exception that
2120      * the location is still accurate, or if the cache has already been updated.
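          * <p>
          * In short: a RegionMovedException updates the cache with the new location; a
          * RegionTooBusyException or RegionOpeningException leaves the cache untouched (the
          * region has not moved); any other cause evicts the stale entry.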
2121      * @param exception an object (to simplify user code) on which we will try to find a
2122      *                  nested or wrapped RegionMovedException
2123      * @param source server that is the source of the location update.
2124      */
2125     @Override
2126     public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
2127       final Object exception, final ServerName source) {
2128       if (rowkey == null || tableName == null) {
2129         LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : Bytes.toStringBinary(rowkey)) +
2130             ", tableName=" + (tableName == null ? "null" : tableName));
2131         return;
2132       }
2133 
2134       if (source == null) {
2135         // This should not happen, but let's secure ourselves.
2136         return;
2137       }
2138 
2139       if (regionName == null) {
2140         // we do not know which region, so just remove the cache entry for the row and server
2141         metaCache.clearCache(tableName, rowkey, source);
2142         return;
2143       }
2144 
2145       // Is it something we have already updated?
2146       final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
2147       HRegionLocation oldLocation = null;
2148       if (oldLocations != null) {
2149         oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
2150       }
2151       if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
2152         // There is no such location in the cache (it's been removed already) or
2153         // the cache has already been refreshed with a different location.  => nothing to do
2154         return;
2155       }
2156 
2157       HRegionInfo regionInfo = oldLocation.getRegionInfo();
2158       Throwable cause = findException(exception);
2159       if (cause != null) {
2160         if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException) {
2161           // We know that the region is still on this region server
2162           return;
2163         }
2164 
2165         if (cause instanceof RegionMovedException) {
2166           RegionMovedException rme = (RegionMovedException) cause;
2167           if (LOG.isTraceEnabled()) {
2168             LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
2169                 rme.getHostname() + ":" + rme.getPort() +
2170                 " according to " + source.getHostAndPort());
2171           }
2172           // We know that the region is not anymore on this region server, but we know
2173           //  the new location.
2174           updateCachedLocation(
2175               regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2176           return;
2177         }
2178       }
2179 
2180       // If we're here, it means that we cannot be sure about the location, so we remove it from
2181       // the cache. Do not send the source because source can be a new server in the same host:port
2182       metaCache.clearCache(regionInfo);
2183     }
2184 
2185     @Override
2186     public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
2187       final Object exception, final HRegionLocation source) {
2188       updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
2189     }
2190 
2191     @Override
2192     @Deprecated
2193     public void processBatch(List<? extends Row> list,
2194         final TableName tableName,
2195         ExecutorService pool,
2196         Object[] results) throws IOException, InterruptedException {
2197       // This belongs in HTable!!! Not in here.  St.Ack
2198 
2199       // results must be the same size as list
2200       if (results.length != list.size()) {
2201         throw new IllegalArgumentException(
2202           "argument results must be the same size as argument list");
2203       }
2204       processBatchCallback(list, tableName, pool, results, null);
2205     }
2206 
2207     @Override
2208     @Deprecated
2209     public void processBatch(List<? extends Row> list,
2210         final byte[] tableName,
2211         ExecutorService pool,
2212         Object[] results) throws IOException, InterruptedException {
2213       processBatch(list, TableName.valueOf(tableName), pool, results);
2214     }
2215 
2216     /**
2217      * Send the queries in parallel on the different region servers. Retries on failures.
2218      * If the method returns normally, there was no error and the 'results' array will
2219      * contain no exceptions. On error, an exception is thrown, and the 'results' array will
2220      * contain a mix of results and exceptions.
2221      * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
2222      */
2223     @Override
2224     @Deprecated
2225     public <R> void processBatchCallback(
2226       List<? extends Row> list,
2227       TableName tableName,
2228       ExecutorService pool,
2229       Object[] results,
2230       Batch.Callback<R> callback)
2231       throws IOException, InterruptedException {
2232 
2233       AsyncRequestFuture ars = this.asyncProcess.submitAll(
2234           pool, tableName, list, callback, results);
2235       ars.waitUntilDone();
2236       if (ars.hasError()) {
2237         throw ars.getErrors();
2238       }
2239     }
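         // Usage sketch (added; names hypothetical):
         //
         //   List<Put> puts = ...;                        // one Row per operation
         //   Object[] results = new Object[puts.size()];  // must match the list size
         //   connection.processBatchCallback(puts, tableName, pool, results,
         //       new Batch.Callback<Object>() {
         //         @Override
         //         public void update(byte[] region, byte[] row, Object result) {
         //           // invoked as each operation completes
         //         }
         //       });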
2240 
2241     @Override
2242     @Deprecated
2243     public <R> void processBatchCallback(
2244       List<? extends Row> list,
2245       byte[] tableName,
2246       ExecutorService pool,
2247       Object[] results,
2248       Batch.Callback<R> callback)
2249       throws IOException, InterruptedException {
2250       processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
2251     }
2252 
2253     // For tests to override.
2254     protected AsyncProcess createAsyncProcess(Configuration conf) {
2255       // No default pool available.
2256       return new AsyncProcess(this, conf, this.batchPool,
2257           RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
2258           RpcControllerFactory.instantiate(conf));
2259     }
2260 
2261     @Override
2262     public AsyncProcess getAsyncProcess() {
2263       return asyncProcess;
2264     }
2265 
2266     @Override
2267     public ServerStatisticTracker getStatisticsTracker() {
2268       return this.stats;
2269     }
2270 
2271     @Override
2272     public ClientBackoffPolicy getBackoffPolicy() {
2273       return this.backoffPolicy;
2274     }
2275 
2276     /*
2277      * Return the number of cached regions for a table. It will only be called
2278      * from a unit test.
2279      */
2280     @VisibleForTesting
2281     int getNumberOfCachedRegionLocations(final TableName tableName) {
2282       return metaCache.getNumberOfCachedRegionLocations(tableName);
2283     }
2284 
2285     @Override
2286     @Deprecated
2287     public void setRegionCachePrefetch(final TableName tableName, final boolean enable) {
2288     }
2289 
2290     @Override
2291     @Deprecated
2292     public void setRegionCachePrefetch(final byte[] tableName,
2293         final boolean enable) {
2294     }
2295 
2296     @Override
2297     @Deprecated
2298     public boolean getRegionCachePrefetch(TableName tableName) {
2299       return false;
2300     }
2301 
2302     @Override
2303     @Deprecated
2304     public boolean getRegionCachePrefetch(byte[] tableName) {
2305       return false;
2306     }
2307 
2308     @Override
2309     public void abort(final String msg, Throwable t) {
2310       if (t instanceof KeeperException.SessionExpiredException
2311         && keepAliveZookeeper != null) {
2312         synchronized (masterAndZKLock) {
2313           if (keepAliveZookeeper != null) {
2314             LOG.warn("This client just lost its session with ZooKeeper," +
2315               " closing it." +
2316               " It will be recreated next time someone needs it", t);
2317             closeZooKeeperWatcher();
2318           }
2319         }
2320       } else {
2321         if (t != null) {
2322           LOG.fatal(msg, t);
2323         } else {
2324           LOG.fatal(msg);
2325         }
2326         this.aborted = true;
2327         close();
2328         this.closed = true;
2329       }
2330     }
2331 
2332     @Override
2333     public boolean isClosed() {
2334       return this.closed;
2335     }
2336 
2337     @Override
2338     public boolean isAborted(){
2339       return this.aborted;
2340     }
2341 
2342     @Override
2343     public int getCurrentNrHRS() throws IOException {
2344       return this.registry.getCurrentNrHRS();
2345     }
2346 
2347     /**
2348      * Increment this client's reference count.
2349      */
2350     void incCount() {
2351       ++refCount;
2352     }
2353 
2354     /**
2355      * Decrement this client's reference count.
2356      */
2357     void decCount() {
2358       if (refCount > 0) {
2359         --refCount;
2360       }
2361     }
2362 
2363     /**
2364      * Return whether this client has no remaining references
2365      *
2366      * @return true if this client has no reference; false otherwise
2367      */
2368     boolean isZeroReference() {
2369       return refCount == 0;
2370     }
2371 
2372     void internalClose() {
2373       if (this.closed) {
2374         return;
2375       }
2376       closeMaster();
2377       shutdownPools();
2378       this.closed = true;
2379       closeZooKeeperWatcher();
2380       this.stubs.clear();
2381       if (clusterStatusListener != null) {
2382         clusterStatusListener.close();
2383       }
2384       if (rpcClient != null) {
2385         rpcClient.close();
2386       }
2387     }
2388 
2389     @Override
2390     public void close() {
2391       if (managed) {
2392         if (aborted) {
2393           ConnectionManager.deleteStaleConnection(this);
2394         } else {
2395           ConnectionManager.deleteConnection(this, false);
2396         }
2397       } else {
2398         internalClose();
2399       }
2400     }
2401 
2402     /**
2403      * Close the connection for good, regardless of what the current value of
2404      * {@link #refCount} is. Ideally, {@link #refCount} should be zero at this
2405      * point, which would be the case if all of its consumers close the
2406      * connection. However, on the off chance that someone is unable to close
2407      * the connection, perhaps because it bailed out prematurely, the method
2408      * below will ensure that this {@link HConnection} instance is cleaned up.
2409      * Caveat: The JVM may take an unknown amount of time to call finalize on an
2410      * unreachable object, so our hope is that every consumer cleans up after
2411      * itself, like any good citizen.
2412      */
2413     @Override
2414     protected void finalize() throws Throwable {
2415       super.finalize();
2416       // Pretend as if we are about to release the last remaining reference
2417       refCount = 1;
2418       close();
2419     }
2420 
2421     /**
2422      * @deprecated Use {@link Admin#listTables()} instead
2423      */
2424     @Deprecated
2425     @Override
2426     public HTableDescriptor[] listTables() throws IOException {
2427       MasterKeepAliveConnection master = getKeepAliveMasterService();
2428       try {
2429         GetTableDescriptorsRequest req =
2430           RequestConverter.buildGetTableDescriptorsRequest((List<TableName>)null);
2431         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2432       } catch (ServiceException se) {
2433         throw ProtobufUtil.getRemoteException(se);
2434       } finally {
2435         master.close();
2436       }
2437     }
2438 
2439     /**
2440      * @deprecated Use {@link Admin#listTableNames()} instead
2441      */
2442     @Deprecated
2443     @Override
2444     public String[] getTableNames() throws IOException {
2445       TableName[] tableNames = listTableNames();
2446       String[] result = new String[tableNames.length];
2447       for (int i = 0; i < tableNames.length; i++) {
2448         result[i] = tableNames[i].getNameAsString();
2449       }
2450       return result;
2451     }
2452 
2453     /**
2454      * @deprecated Use {@link Admin#listTableNames()} instead
2455      */
2456     @Deprecated
2457     @Override
2458     public TableName[] listTableNames() throws IOException {
2459       MasterKeepAliveConnection master = getKeepAliveMasterService();
2460       try {
2461         return ProtobufUtil.getTableNameArray(master.getTableNames(null,
2462             GetTableNamesRequest.newBuilder().build())
2463           .getTableNamesList());
2464       } catch (ServiceException se) {
2465         throw ProtobufUtil.getRemoteException(se);
2466       } finally {
2467         master.close();
2468       }
2469     }
2470 
2471     /**
2472      * @deprecated Use {@link Admin#getTableDescriptorsByTableName(List)} instead
2473      */
2474     @Deprecated
2475     @Override
2476     public HTableDescriptor[] getHTableDescriptorsByTableName(
2477         List<TableName> tableNames) throws IOException {
2478       if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
2479       MasterKeepAliveConnection master = getKeepAliveMasterService();
2480       try {
2481         GetTableDescriptorsRequest req =
2482           RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2483         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2484       } catch (ServiceException se) {
2485         throw ProtobufUtil.getRemoteException(se);
2486       } finally {
2487         master.close();
2488       }
2489     }
2490 
2491     /**
2492      * @deprecated Use {@link Admin#getTableDescriptorsByTableName(List)} instead
2493      */
2494     @Deprecated
2495     @Override
2496     public HTableDescriptor[] getHTableDescriptors(
2497         List<String> names) throws IOException {
2498       List<TableName> tableNames = new ArrayList<TableName>(names.size());
2499       for(String name : names) {
2500         tableNames.add(TableName.valueOf(name));
2501       }
2502 
2503       return getHTableDescriptorsByTableName(tableNames);
2504     }
2505 
2506     @Override
2507     public NonceGenerator getNonceGenerator() {
2508       return this.nonceGenerator;
2509     }
2510 
2511     /**
2512      * Connects to the master to get the table descriptor.
2513      * @param tableName table name
2514      * @throws IOException if the connection to master fails or if the table
2515      *  is not found.
2516      * @deprecated Use {@link Admin#getTableDescriptor(TableName)} instead
2517      */
2518     @Deprecated
2519     @Override
2520     public HTableDescriptor getHTableDescriptor(final TableName tableName)
2521     throws IOException {
2522       if (tableName == null) return null;
2523       MasterKeepAliveConnection master = getKeepAliveMasterService();
2524       GetTableDescriptorsResponse htds;
2525       try {
2526         GetTableDescriptorsRequest req =
2527           RequestConverter.buildGetTableDescriptorsRequest(tableName);
2528         htds = master.getTableDescriptors(null, req);
2529       } catch (ServiceException se) {
2530         throw ProtobufUtil.getRemoteException(se);
2531       } finally {
2532         master.close();
2533       }
2534       if (!htds.getTableSchemaList().isEmpty()) {
2535         return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
2536       }
2537       throw new TableNotFoundException(tableName.getNameAsString());
2538     }
2539 
2540     /**
2541      * @deprecated Use {@link Admin#getTableDescriptor(TableName)} instead
2542      */
2543     @Deprecated
2544     @Override
2545     public HTableDescriptor getHTableDescriptor(final byte[] tableName)
2546     throws IOException {
2547       return getHTableDescriptor(TableName.valueOf(tableName));
2548     }
2549 
2550     @Override
2551     public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
2552       return RpcRetryingCallerFactory
2553           .instantiate(conf, this.interceptor, this.getStatisticsTracker());
2554     }
2555 
2556     @Override
2557     public boolean isManaged() {
2558       return managed;
2559     }
2560 
2561     @Override
2562     public boolean hasCellBlockSupport() {
2563       return this.rpcClient.hasCellBlockSupport();
2564     }
2565     
2566     @Override
2567     public ConnectionConfiguration getConnectionConfiguration() {
2568       return this.connectionConfig;
2569     }
2570 
2571     @Override
2572     public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
2573       return this.rpcCallerFactory;
2574     }
2575 
2576     @Override
2577     public RpcControllerFactory getRpcControllerFactory() {
2578       return this.rpcControllerFactory;
2579     }
2580   }
2581 
2582   /**
2583    * The record of errors for servers.
2584    */
2585   static class ServerErrorTracker {
2586     // We need a concurrent map here, as we could have multiple threads updating it in parallel.
2587     private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
2588         new ConcurrentHashMap<ServerName, ServerErrors>();
2589     private final long canRetryUntil;
2590     private final int maxRetries;
2591     private final long startTrackingTime;
2592 
2593     public ServerErrorTracker(long timeout, int maxRetries) {
2594       this.maxRetries = maxRetries;
2595       this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
2596       this.startTrackingTime = new Date().getTime();
2597     }
2598 
2599     /**
2600      * We stop retrying when we have exhausted BOTH the number of retries and the time allocated.
2601      */
2602     boolean canRetryMore(int numRetry) {
2603       // If there is only a single try, we must not take the time into account.
2604       return numRetry < maxRetries || (maxRetries > 1 &&
2605           EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
2606     }
2607 
2608     /**
2609      * Calculates the back-off time for a retrying request to a particular server.
2610      *
2611      * @param server    The server in question.
2612      * @param basePause The default connection pause, in milliseconds.
2613      * @return The time to wait before sending next request.
2614      */
2615     long calculateBackoffTime(ServerName server, long basePause) {
2616       long result;
2617       ServerErrors errorStats = errorsByServer.get(server);
2618       if (errorStats != null) {
2619         result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
2620       } else {
2621         result = 0; // yes, if the server is not in our list we don't wait before retrying.
2622       }
2623       return result;
2624     }
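         // Example (added): a server with no recorded errors gets a 0ms wait; after three
         // recorded errors, calculateBackoffTime(server, 100) comes to roughly
         // 100 * RETRY_BACKOFF[3] = 500ms (modulo jitter), per ConnectionUtils.getPauseTime.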
2625 
2626     /**
2627      * Reports that there was an error on the server, so we can do whatever bookkeeping is necessary.
2628      *
2629      * @param server The server in question.
2630      */
2631     void reportServerError(ServerName server) {
2632       ServerErrors errors = errorsByServer.get(server);
2633       if (errors != null) {
2634         errors.addError();
2635       } else {
2636         errors = errorsByServer.putIfAbsent(server, new ServerErrors());
2637         if (errors != null){
2638           errors.addError();
2639         }
2640       }
2641     }
2642 
2643     long getStartTrackingTime() {
2644       return startTrackingTime;
2645     }
2646 
2647     /**
2648      * The record of errors for a server.
2649      */
2650     private static class ServerErrors {
2651       public final AtomicInteger retries = new AtomicInteger(0);
2652 
2653       public void addError() {
2654         retries.incrementAndGet();
2655       }
2656     }
2657   }
2658 
2659   /**
2660    * Look for an exception we know in the remote exception:
2661    * - hadoop.ipc wrapped exceptions
2662    * - nested exceptions
2663    *
2664    * Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException
2665    * @return null if we didn't find the exception, the exception otherwise.
2666    */
2667   public static Throwable findException(Object exception) {
2668     if (exception == null || !(exception instanceof Throwable)) {
2669       return null;
2670     }
2671     Throwable cur = (Throwable) exception;
2672     while (cur != null) {
2673       if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
2674           || cur instanceof RegionTooBusyException) {
2675         return cur;
2676       }
2677       if (cur instanceof RemoteException) {
2678         RemoteException re = (RemoteException) cur;
2679         cur = re.unwrapRemoteException(
2680             RegionOpeningException.class, RegionMovedException.class,
2681             RegionTooBusyException.class);
2682         if (cur == null) {
2683           cur = re.unwrapRemoteException();
2684         }
2685         // unwrapRemoteException can return the exception given as a parameter when it cannot
2686         //  unwrap it. In this case, there is no need to look further
2687         // noinspection ObjectEquality
2688         if (cur == re) {
2689           return null;
2690         }
2691       } else {
2692         cur = cur.getCause();
2693       }
2694     }
2695 
2696     return null;
2697   }
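       // Usage sketch (added): updateCachedLocations above applies the unwrapped cause like so:
       //
       //   Throwable cause = findException(exception);
       //   if (cause instanceof RegionMovedException) {
       //     // the region moved: refresh the cached location from the exception
       //   } else if (cause != null) {
       //     // busy/opening: the location is still valid, keep the cache as-is
       //   } else {
       //     // unknown cause: evict the cached location
       //   }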
2698 }