/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;

import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

/**
 * An internal, non-instantiable class that manages creation of {@link HConnection}s.
 */
@SuppressWarnings("serial")
@InterfaceAudience.Private
// NOTE: DO NOT make this class public. It was made package-private on purpose.
class ConnectionManager {
  static final Log LOG = LogFactory.getLog(ConnectionManager.class);

  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
  private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

  // An LRU Map of HConnectionKey -> HConnection (TableServer).  All
  // access must be synchronized.  This map is not private because tests
  // need to be able to tinker with it.
  static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;

  public static final int MAX_CACHED_CONNECTION_INSTANCES;

  /**
   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
   * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
   */
  private static volatile NonceGenerator nonceGenerator = null;
  /** The nonce generator lock. Only taken when creating HConnection, which gets a private copy. */
  private static Object nonceGeneratorCreateLock = new Object();
  static {
    // We set instances to one more than the value specified for {@link
    // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
    // connections to the ensemble from the one client is 30, so in that case we
    // should run into zk issues before the LRU hits this value of 31.
    MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
      HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
    CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
        (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
      @Override
      protected boolean removeEldestEntry(
          Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
        return size() > MAX_CACHED_CONNECTION_INSTANCES;
      }
    };
  }

  /** Dummy nonce generator for disabled nonces. */
  static class NoNonceGenerator implements NonceGenerator {
    @Override
    public long getNonceGroup() {
      return HConstants.NO_NONCE;
    }
    @Override
    public long newNonce() {
      return HConstants.NO_NONCE;
    }
  }

  /*
   * Non-instantiable.
   */
  private ConnectionManager() {
    super();
  }

  /**
   * @param conn The connection for which to replace the generator.
   * @param cnm Replaces the nonce generator used, for testing.
   * @return old nonce generator.
   */
  @VisibleForTesting
  static NonceGenerator injectNonceGeneratorForTesting(
      ClusterConnection conn, NonceGenerator cnm) {
    HConnectionImplementation connImpl = (HConnectionImplementation)conn;
    NonceGenerator ng = connImpl.getNonceGenerator();
    LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
    connImpl.nonceGenerator = cnm;
    return ng;
  }

  /**
   * Get the connection that goes with the passed <code>conf</code> configuration instance.
   * If no current connection exists, method creates a new connection and keys it using
   * connection-specific properties from the passed {@link Configuration}; see
   * {@link HConnectionKey}.
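   *
   * <p>A hedged usage sketch: connections returned here are shared and reference-counted,
   * so {@code close()} only decrements the count. Note that {@code getTable} and
   * {@code getAdmin} are disallowed on these managed connections; use
   * {@link #createConnection(Configuration)} when you need those.
   * {@code
   * HConnection connection = ConnectionManager.getConnection(conf);
   * try {
   *   HRegionLocation loc = connection.locateRegion(
   *       TableName.valueOf("mytable"), Bytes.toBytes("somerow"));
   * } finally {
   *   connection.close();
   * }
   * }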
   * @param conf configuration
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  @Deprecated
  public static HConnection getConnection(final Configuration conf) throws IOException {
    return getConnectionInternal(conf);
  }

  static ClusterConnection getConnectionInternal(final Configuration conf)
    throws IOException {
    HConnectionKey connectionKey = new HConnectionKey(conf);
    synchronized (CONNECTION_INSTANCES) {
      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
      if (connection == null) {
        connection = (HConnectionImplementation)createConnection(conf, true);
        CONNECTION_INSTANCES.put(connectionKey, connection);
      } else if (connection.isClosed()) {
        ConnectionManager.deleteConnection(connectionKey, true);
        connection = (HConnectionImplementation)createConnection(conf, true);
        CONNECTION_INSTANCES.put(connectionKey, connection);
      }
      connection.incCount();
      return connection;
    }
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   *
   * This is the recommended way to create HConnections.
   * {@code
   * HConnection connection = ConnectionManager.createConnection(conf);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   *
   * @param conf configuration
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf) throws IOException {
    return createConnectionInternal(conf);
  }

  static ClusterConnection createConnectionInternal(Configuration conf) throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, null, provider.getCurrent());
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   * This is the recommended way to create HConnections.
   * {@code
   * ExecutorService pool = ...;
   * HConnection connection = HConnectionManager.createConnection(conf, pool);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   * @param conf configuration
   * @param pool the thread pool to use for batch operation in HTables used via this HConnection
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf, ExecutorService pool)
  throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, pool, provider.getCurrent());
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   * This is the recommended way to create HConnections.
   * {@code
   * HConnection connection = HConnectionManager.createConnection(conf, user);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   * @param conf configuration
   * @param user the user the connection is for
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf, User user)
  throws IOException {
    return createConnection(conf, false, null, user);
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   * This is the recommended way to create HConnections.
   * {@code
   * ExecutorService pool = ...;
   * HConnection connection = HConnectionManager.createConnection(conf, pool, user);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   * @param conf configuration
   * @param pool the thread pool to use for batch operation in HTables used via this HConnection
   * @param user the user the connection is for
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
  throws IOException {
    return createConnection(conf, false, pool, user);
  }

  @Deprecated
  static HConnection createConnection(final Configuration conf, final boolean managed)
      throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, managed, null, provider.getCurrent());
  }

  @Deprecated
  static ClusterConnection createConnection(final Configuration conf, final boolean managed,
      final ExecutorService pool, final User user)
  throws IOException {
    return (ClusterConnection) ConnectionFactory.createConnection(conf, managed, pool, user);
  }

  /**
   * Delete connection information for the instance specified by passed configuration.
   * If there are no more references to the designated connection, this method will
   * then close the connection to the zookeeper ensemble and let go of all associated resources.
   *
   * @param conf configuration whose identity is used to find {@link HConnection} instance.
   * @deprecated
   */
  @Deprecated
  public static void deleteConnection(Configuration conf) {
    deleteConnection(new HConnectionKey(conf), false);
  }

  /**
   * Cleanup a known stale connection.
   * This will then close the connection to the zookeeper ensemble and let go of all resources.
   *
   * @param connection
   * @deprecated
   */
  @Deprecated
  public static void deleteStaleConnection(HConnection connection) {
    deleteConnection(connection, true);
  }

  /**
   * Delete information for all connections. Whether each connection is actually closed
   * depends on the staleConnection boolean and its reference count. In general, you should
   * call this with staleConnection set to true.
   * @deprecated
   */
  @Deprecated
  public static void deleteAllConnections(boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
      connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
      for (HConnectionKey connectionKey : connectionKeys) {
        deleteConnection(connectionKey, staleConnection);
      }
      CONNECTION_INSTANCES.clear();
    }
  }

  /**
   * Delete information for all connections.
   * @deprecated kept for backward compatibility, but the behavior is broken. HBASE-8983
   */
  @Deprecated
  public static void deleteAllConnections() {
    deleteAllConnections(false);
  }

  @Deprecated
  private static void deleteConnection(HConnection connection, boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
        if (e.getValue() == connection) {
          deleteConnection(e.getKey(), staleConnection);
          break;
        }
      }
    }
  }

  @Deprecated
  private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
      if (connection != null) {
        connection.decCount();
        if (connection.isZeroReference() || staleConnection) {
          CONNECTION_INSTANCES.remove(connectionKey);
          connection.internalClose();
        }
      } else {
        LOG.error("Connection not found in the list, can't delete it "+
          "(connection key=" + connectionKey + "). Maybe the key was modified?", new Exception());
      }
    }
  }

  /**
   * This convenience method invokes the given {@link HConnectable#connect}
   * implementation using a {@link HConnection} instance that lasts just for the
   * duration of the invocation.
   *
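   * <p>A hedged sketch of a typical caller; the anonymous {@link HConnectable} below is
   * illustrative, not part of this class:
   * {@code
   * HTableDescriptor[] tables = ConnectionManager.execute(
   *     new HConnectable<HTableDescriptor[]>(conf) {
   *       @Override
   *       public HTableDescriptor[] connect(HConnection connection) throws IOException {
   *         return connection.listTables();
   *       }
   *     });
   * }
   *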
   * @param <T> the return type of the connect method
   * @param connectable the {@link HConnectable} instance
   * @return the value returned by the connect method
   * @throws IOException
   */
  @InterfaceAudience.Private
  public static <T> T execute(HConnectable<T> connectable) throws IOException {
    if (connectable == null || connectable.conf == null) {
      return null;
    }
    Configuration conf = connectable.conf;
    HConnection connection = getConnection(conf);
    boolean connectSucceeded = false;
    try {
      T returnValue = connectable.connect(connection);
      connectSucceeded = true;
      return returnValue;
    } finally {
      try {
        connection.close();
      } catch (Exception e) {
        ExceptionUtil.rethrowIfInterrupt(e);
        if (connectSucceeded) {
          throw new IOException("The connection to " + connection
              + " could not be closed.", e);
        }
      }
    }
  }

  /** Encapsulates connection to zookeeper and regionservers.*/
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
      justification="Access to the concurrent hash map is under a lock so should be fine.")
  static class HConnectionImplementation implements ClusterConnection, Closeable {
    static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
    private final boolean hostnamesCanChange;
    private final long pause;
    private final boolean useMetaReplicas;
    private final int numTries;
    final int rpcTimeout;
    private NonceGenerator nonceGenerator = null;
    private final AsyncProcess asyncProcess;
    // single tracker per connection
    private final ServerStatisticTracker stats;

    private volatile boolean closed;
    private volatile boolean aborted;

    // package protected for the tests
    ClusterStatusListener clusterStatusListener;

    private final Object metaRegionLock = new Object();

    // We have a single lock for master & zk to prevent deadlocks. Having
    //  one lock for ZK and one lock for master is not possible:
    //  When creating a connection to master, we need a connection to ZK to get
    //  its address. But another thread could have taken the ZK lock, and could
    //  be waiting for the master lock => deadlock.
    private final Object masterAndZKLock = new Object();

    private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;

    // thread executor shared by all HTableInterface instances created
    // by this connection
    private volatile ExecutorService batchPool = null;
    // meta thread executor shared by all HTableInterface instances created
    // by this connection
    private volatile ExecutorService metaLookupPool = null;
    private volatile boolean cleanupPool = false;

    private final Configuration conf;

    // cache the configuration value for tables so that we can avoid calling
    // the expensive Configuration to fetch the value multiple times.
    private final ConnectionConfiguration connectionConfig;

    // Client rpc instance.
    private RpcClient rpcClient;

    private final MetaCache metaCache;
    private final MetricsConnection metrics;

    private int refCount;

    // indicates whether this connection's life cycle is managed (by us)
    private boolean managed;

    protected User user;

    private RpcRetryingCallerFactory rpcCallerFactory;

    private RpcControllerFactory rpcControllerFactory;

    private final RetryingCallerInterceptor interceptor;

    /**
     * Cluster registry of basic info such as clusterid and meta region location.
     */
    Registry registry;

    private final ClientBackoffPolicy backoffPolicy;

    HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
      this(conf, managed, null, null);
    }

    /**
     * constructor
     * @param conf Configuration object
     * @param managed If true, does not do full shutdown on close; i.e. cleanup of connection
     * to zk and shutdown of all services; we just close down the resources this connection was
     * responsible for and decrement usage counters.  It is up to the caller to do the full
     * cleanup.  It is set when we want to have connection sharing going on -- reuse of zk
     * connection, and cached region locations, established regionserver connections, etc.  When
     * connections are shared, we have reference counting going on and will only do full cleanup
     * when there are no more users of an HConnectionImplementation instance.
     */
    HConnectionImplementation(Configuration conf, boolean managed,
        ExecutorService pool, User user) throws IOException {
      this(conf);
      this.user = user;
      this.batchPool = pool;
      this.managed = managed;
      this.registry = setupRegistry();
      retrieveClusterId();

      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
      this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);

      // Do we publish the status?
      boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
          HConstants.STATUS_PUBLISHED_DEFAULT);
      Class<? extends ClusterStatusListener.Listener> listenerClass =
          conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
              ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
              ClusterStatusListener.Listener.class);
      if (shouldListen) {
        if (listenerClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
        } else {
          clusterStatusListener = new ClusterStatusListener(
              new ClusterStatusListener.DeadServerHandler() {
                @Override
                public void newDead(ServerName sn) {
                  clearCaches(sn);
                  rpcClient.cancelConnections(sn);
                }
              }, conf, listenerClass);
        }
      }
    }

    /**
     * For tests.
     */
    protected HConnectionImplementation(Configuration conf) {
      this.conf = conf;
      this.connectionConfig = new ConnectionConfiguration(conf);
      this.closed = false;
      this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
      this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
          HConstants.DEFAULT_USE_META_REPLICAS);
      this.numTries = connectionConfig.getRetriesNumber();
      this.rpcTimeout = conf.getInt(
          HConstants.HBASE_RPC_TIMEOUT_KEY,
          HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
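      // Nonces let servers detect retries of non-idempotent operations (e.g. Append or
      // Increment) so a replayed call is not applied twice. The generator is shared by
      // all connections in this JVM (see the static field above).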
      if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
        synchronized (nonceGeneratorCreateLock) {
          if (ConnectionManager.nonceGenerator == null) {
            ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
          }
          this.nonceGenerator = ConnectionManager.nonceGenerator;
        }
      } else {
        this.nonceGenerator = new NoNonceGenerator();
      }
      stats = ServerStatisticTracker.create(conf);
      this.asyncProcess = createAsyncProcess(this.conf);
      this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
      this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
      if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
        this.metrics = new MetricsConnection(this);
      } else {
        this.metrics = null;
      }

      this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
      this.metaCache = new MetaCache(this.metrics);
    }

    @Override
    public HTableInterface getTable(String tableName) throws IOException {
      return getTable(TableName.valueOf(tableName));
    }

    @Override
    public HTableInterface getTable(byte[] tableName) throws IOException {
      return getTable(TableName.valueOf(tableName));
    }

    @Override
    public HTableInterface getTable(TableName tableName) throws IOException {
      return getTable(tableName, getBatchPool());
    }

    @Override
    public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
      return getTable(TableName.valueOf(tableName), pool);
    }

    @Override
    public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
      return getTable(TableName.valueOf(tableName), pool);
    }

    @Override
    public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
      if (managed) {
        throw new NeedUnmanagedConnectionException();
      }
      return new HTable(tableName, this, connectionConfig, rpcCallerFactory,
          rpcControllerFactory, pool);
    }

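    /**
     * A hedged usage sketch for the method below (table name and values are illustrative):
     * {@code
     * BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("mytable"))
     *     .writeBufferSize(4 * 1024 * 1024);
     * try (BufferedMutator mutator = connection.getBufferedMutator(params)) {
     *   Put put = new Put(Bytes.toBytes("row1"));
     *   put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
     *   mutator.mutate(put);
     * } // close() flushes any buffered mutations
     * }
     */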
    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
      if (params.getTableName() == null) {
        throw new IllegalArgumentException("TableName cannot be null.");
      }
      if (params.getPool() == null) {
        params.pool(HTable.getDefaultExecutor(getConfiguration()));
      }
      if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
        params.writeBufferSize(connectionConfig.getWriteBufferSize());
      }
      if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
        params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
      }
      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) {
      return getBufferedMutator(new BufferedMutatorParams(tableName));
    }

    @Override
    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
      return new HRegionLocator(tableName, this);
    }

    @Override
    public Admin getAdmin() throws IOException {
      if (managed) {
        throw new NeedUnmanagedConnectionException();
      }
      return new HBaseAdmin(this);
    }

    @Override
    public MetricsConnection getConnectionMetrics() {
      return this.metrics;
    }

    private ExecutorService getBatchPool() {
      if (batchPool == null) {
        synchronized (this) {
          if (batchPool == null) {
            this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
                conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
            this.cleanupPool = true;
          }
        }
      }
      return this.batchPool;
    }

    private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
        BlockingQueue<Runnable> passedWorkQueue) {
      // A value of 0 means "unset"; fall back to a multiple of the processor count.
      if (maxThreads == 0) {
        maxThreads = Runtime.getRuntime().availableProcessors() * 8;
      }
      if (coreThreads == 0) {
        coreThreads = Runtime.getRuntime().availableProcessors() * 8;
      }
      long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
      BlockingQueue<Runnable> workQueue = passedWorkQueue;
      if (workQueue == null) {
        workQueue =
          new LinkedBlockingQueue<Runnable>(maxThreads *
              conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
                  HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
      }
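      // A ThreadPoolExecutor only grows past coreThreads once the work queue is full, so
      // the queue capacity chosen above decides when threads beyond the core size are
      // actually created.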
      ThreadPoolExecutor tpe = new ThreadPoolExecutor(
          coreThreads,
          maxThreads,
          keepAliveTime,
          TimeUnit.SECONDS,
          workQueue,
          Threads.newDaemonThreadFactory(toString() + nameHint));
      tpe.allowCoreThreadTimeOut(true);
      return tpe;
    }

    private ExecutorService getMetaLookupPool() {
      if (this.metaLookupPool == null) {
        synchronized (this) {
          if (this.metaLookupPool == null) {
            // Some of the threads would be used for meta replicas.
            // To start with, up to the configured core number of threads can hit the
            // meta (including replicas). After that, requests will get queued up in the
            // passed queue, and only after the queue is full will a new thread be started.
            this.metaLookupPool = getThreadPool(
                conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
                conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
                "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
          }
        }
      }
      return this.metaLookupPool;
    }

    protected ExecutorService getCurrentMetaLookupPool() {
      return metaLookupPool;
    }

    protected ExecutorService getCurrentBatchPool() {
      return batchPool;
    }

    private void shutdownPools() {
      if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
        shutdownBatchPool(this.batchPool);
      }
      if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
        shutdownBatchPool(this.metaLookupPool);
      }
    }

    private void shutdownBatchPool(ExecutorService pool) {
      pool.shutdown();
      try {
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
          pool.shutdownNow();
        }
      } catch (InterruptedException e) {
        pool.shutdownNow();
      }
    }

    /**
     * @return The cluster registry implementation to use.
     * @throws IOException
     */
    private Registry setupRegistry() throws IOException {
      return RegistryFactory.getRegistry(this);
    }

    /**
     * For tests only.
     */
    @VisibleForTesting
    RpcClient getRpcClient() {
      return rpcClient;
    }

    /**
     * An identifier that will remain the same for a given connection.
     */
    @Override
    public String toString() {
      return "hconnection-0x" + Integer.toHexString(hashCode());
    }

    protected String clusterId = null;

    void retrieveClusterId() {
      if (clusterId != null) return;
      this.clusterId = this.registry.getClusterId();
      if (clusterId == null) {
        clusterId = HConstants.CLUSTER_ID_DEFAULT;
        LOG.debug("clusterid came back null, using default " + clusterId);
      }
    }

    @Override
    public Configuration getConfiguration() {
      return this.conf;
    }

    private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
      throws MasterNotRunningException {
      String errorMsg;
      try {
        if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
          errorMsg = "The node " + zkw.baseZNode + " is not in ZooKeeper. "
            + "It should have been written by the master. "
            + "Check the value configured in 'zookeeper.znode.parent'. "
            + "There could be a mismatch with the one configured in the master.";
          LOG.error(errorMsg);
          throw new MasterNotRunningException(errorMsg);
        }
      } catch (KeeperException e) {
        errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
        LOG.error(errorMsg);
        throw new MasterNotRunningException(errorMsg, e);
      }
    }

    /**
     * @return true if the master is running, throws an exception otherwise
     * @throws MasterNotRunningException - if the master is not running
     * @throws ZooKeeperConnectionException
     */
    @Deprecated
    @Override
    public boolean isMasterRunning()
    throws MasterNotRunningException, ZooKeeperConnectionException {
      // When getting the master connection, we check it's running,
      // so if there is no exception, it means we've been able to get a
      // connection on a running master
      MasterKeepAliveConnection m = getKeepAliveMasterService();
      m.close();
      return true;
    }

    @Override
    public HRegionLocation getRegionLocation(final TableName tableName,
        final byte [] row, boolean reload)
    throws IOException {
      return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
    }

    @Override
    public HRegionLocation getRegionLocation(final byte[] tableName,
        final byte [] row, boolean reload)
    throws IOException {
      return getRegionLocation(TableName.valueOf(tableName), row, reload);
    }

    @Override
    public boolean isTableEnabled(TableName tableName) throws IOException {
      return this.registry.isTableOnlineState(tableName, true);
    }

    @Override
    public boolean isTableEnabled(byte[] tableName) throws IOException {
      return isTableEnabled(TableName.valueOf(tableName));
    }

    @Override
    public boolean isTableDisabled(TableName tableName) throws IOException {
      return this.registry.isTableOnlineState(tableName, false);
    }

    @Override
    public boolean isTableDisabled(byte[] tableName) throws IOException {
      return isTableDisabled(TableName.valueOf(tableName));
    }

    @Override
    public boolean isTableAvailable(final TableName tableName) throws IOException {
      final AtomicBoolean available = new AtomicBoolean(true);
      final AtomicInteger regionCount = new AtomicInteger(0);
      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
        @Override
        public boolean processRow(Result row) throws IOException {
          HRegionInfo info = MetaScanner.getHRegionInfo(row);
          if (info != null && !info.isSplitParent()) {
            if (tableName.equals(info.getTable())) {
              ServerName server = HRegionInfo.getServerName(row);
              if (server == null) {
                available.set(false);
                return false;
              }
              regionCount.incrementAndGet();
            } else if (tableName.compareTo(info.getTable()) < 0) {
              // Return if we are done with the current table
              return false;
            }
          }
          return true;
        }
      };
      MetaScanner.metaScan(this, visitor, tableName);
      return available.get() && (regionCount.get() > 0);
    }

    @Override
    public boolean isTableAvailable(final byte[] tableName) throws IOException {
      return isTableAvailable(TableName.valueOf(tableName));
    }

    @Override
    public boolean isTableAvailable(final TableName tableName, final byte[][] splitKeys)
        throws IOException {
      final AtomicBoolean available = new AtomicBoolean(true);
      final AtomicInteger regionCount = new AtomicInteger(0);
      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
        @Override
        public boolean processRow(Result row) throws IOException {
          HRegionInfo info = MetaScanner.getHRegionInfo(row);
          if (info != null && !info.isSplitParent()) {
            if (tableName.equals(info.getTable())) {
              ServerName server = HRegionInfo.getServerName(row);
              if (server == null) {
                available.set(false);
                return false;
              }
              if (!Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
                for (byte[] splitKey : splitKeys) {
                  // Just check if the splitkey is available
                  if (Bytes.equals(info.getStartKey(), splitKey)) {
                    regionCount.incrementAndGet();
                    break;
                  }
                }
              } else {
                // Always empty start row should be counted
                regionCount.incrementAndGet();
              }
            } else if (tableName.compareTo(info.getTable()) < 0) {
              // Return if we are done with the current table
              return false;
            }
          }
          return true;
        }
      };
      MetaScanner.metaScan(this, visitor, tableName);
      // +1 needs to be added so that the empty start row is also taken into account
      return available.get() && (regionCount.get() == splitKeys.length + 1);
    }

    @Override
    public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
        throws IOException {
      return isTableAvailable(TableName.valueOf(tableName), splitKeys);
    }

    @Override
    public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
      RegionLocations locations = locateRegion(HRegionInfo.getTable(regionName),
        HRegionInfo.getStartKey(regionName), false, true);
      return locations == null ? null : locations.getRegionLocation();
    }

    @Override
    public boolean isDeadServer(ServerName sn) {
      if (clusterStatusListener == null) {
        return false;
      } else {
        return clusterStatusListener.isDeadServer(sn);
      }
    }

    @Override
    public List<HRegionLocation> locateRegions(final TableName tableName)
    throws IOException {
      return locateRegions(tableName, false, true);
    }

    @Override
    public List<HRegionLocation> locateRegions(final byte[] tableName)
    throws IOException {
      return locateRegions(TableName.valueOf(tableName));
    }

    @Override
    public List<HRegionLocation> locateRegions(final TableName tableName,
        final boolean useCache, final boolean offlined) throws IOException {
      NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(this, tableName);
      final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
      for (HRegionInfo regionInfo : regions.keySet()) {
        RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
        if (list != null) {
          for (HRegionLocation loc : list.getRegionLocations()) {
            if (loc != null) {
              locations.add(loc);
            }
          }
        }
      }
      return locations;
    }

    @Override
    public List<HRegionLocation> locateRegions(final byte[] tableName,
       final boolean useCache, final boolean offlined) throws IOException {
      return locateRegions(TableName.valueOf(tableName), useCache, offlined);
    }

    @Override
    public HRegionLocation locateRegion(
        final TableName tableName, final byte[] row) throws IOException {
      RegionLocations locations = locateRegion(tableName, row, true, true);
      return locations == null ? null : locations.getRegionLocation();
    }

    @Override
    public HRegionLocation locateRegion(final byte[] tableName,
        final byte [] row)
    throws IOException {
      return locateRegion(TableName.valueOf(tableName), row);
    }

    @Override
    public HRegionLocation relocateRegion(final TableName tableName,
        final byte [] row) throws IOException {
      RegionLocations locations = relocateRegion(tableName, row,
        RegionReplicaUtil.DEFAULT_REPLICA_ID);
      return locations == null ? null :
        locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
    }

    @Override
    public RegionLocations relocateRegion(final TableName tableName,
        final byte [] row, int replicaId) throws IOException {
      // Since this is an explicit request not to use any caching, finding a location for a
      // disabled table is not desirable. Checking here ensures that an exception is thrown
      // the first time a disabled table is interacted with.
      if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
        throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
      }

      return locateRegion(tableName, row, false, true, replicaId);
    }

    @Override
    public HRegionLocation relocateRegion(final byte[] tableName,
        final byte [] row) throws IOException {
      return relocateRegion(TableName.valueOf(tableName), row);
    }

    @Override
    public RegionLocations locateRegion(final TableName tableName,
      final byte [] row, boolean useCache, boolean retry)
    throws IOException {
      return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
    }

    @Override
    public RegionLocations locateRegion(final TableName tableName,
      final byte [] row, boolean useCache, boolean retry, int replicaId)
    throws IOException {
      if (this.closed) throw new DoNotRetryIOException(toString() + " closed");
      if (tableName == null || tableName.getName().length == 0) {
        throw new IllegalArgumentException(
            "table name cannot be null or zero length");
      }
      if (tableName.equals(TableName.META_TABLE_NAME)) {
        return locateMeta(tableName, useCache, replicaId);
      } else {
        // Region not in the cache - have to go to the meta RS
        return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
      }
    }

    private RegionLocations locateMeta(final TableName tableName,
        boolean useCache, int replicaId) throws IOException {
      // HBASE-10785: We cache the location of the META itself, so that we are not overloading
      // zookeeper with one request for every region lookup. We cache the META with empty row
      // key in MetaCache.
      byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
      RegionLocations locations = null;
      if (useCache) {
        locations = getCachedLocation(tableName, metaCacheKey);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      }

      // only one thread should do the lookup.
      synchronized (metaRegionLock) {
        // Check the cache again for a hit in case some other thread made the
        // same query while we were waiting on the lock.
        if (useCache) {
          locations = getCachedLocation(tableName, metaCacheKey);
          if (locations != null && locations.getRegionLocation(replicaId) != null) {
            return locations;
          }
        }

        // Look up from zookeeper
        locations = this.registry.getMetaRegionLocation();
        if (locations != null) {
          cacheLocation(tableName, locations);
        }
      }
      return locations;
    }

1219     /*
1220      * Search the hbase:meta table for the HRegionLocation
1221      * info that contains the table and row we're seeking.
1222      */
1223     private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
1224                    boolean useCache, boolean retry, int replicaId) throws IOException {
1225 
1226       // If we are supposed to be using the cache, look in the cache to see if
1227       // we already have the region.
1228       if (useCache) {
1229         RegionLocations locations = getCachedLocation(tableName, row);
1230         if (locations != null && locations.getRegionLocation(replicaId) != null) {
1231           return locations;
1232         }
1233       }
1234 
1235       // build the key of the meta region we should be looking for.
1236       // the extra 9's on the end are necessary to allow "exact" matches
1237       // without knowing the precise region names.
1238       byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
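           // e.g. (hypothetical names) for table "t1" and row "abc" this yields the key
           // "t1,abc,99999999999999"; the reversed scan below then finds the greatest
           // hbase:meta row sorting at or before that key, i.e. the region containing "abc".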
1239 
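           // "Small" scans are served in a single RPC; caching(1) because we only
           // need the first row returned.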
1240       Scan s = new Scan();
1241       s.setReversed(true);
1242       s.setStartRow(metaKey);
1243       s.setSmall(true);
1244       s.setCaching(1);
1245       if (this.useMetaReplicas) {
1246         s.setConsistency(Consistency.TIMELINE);
1247       }
1248 
1249       int localNumRetries = (retry ? numTries : 1);
1250 
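         // When retry is disabled we make exactly one attempt; otherwise we loop up
         // to numTries times, sleeping with increasing back-off between attempts.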
1251       for (int tries = 0; true; tries++) {
1252         if (tries >= localNumRetries) {
1253           throw new NoServerForRegionException("Unable to find region for "
1254               + Bytes.toStringBinary(row) + " in " + tableName +
1255               " after " + localNumRetries + " tries.");
1256         }
1257         if (useCache) {
1258           RegionLocations locations = getCachedLocation(tableName, row);
1259           if (locations != null && locations.getRegionLocation(replicaId) != null) {
1260             return locations;
1261           }
1262         } else {
1263           // If we are not supposed to be using the cache, delete any existing cached location
1264           // so it won't interfere.
1265           metaCache.clearCache(tableName, row);
1266         }
1267 
1268         // Query the meta region
1269         try {
1270           Result regionInfoRow = null;
1271           ReversedClientScanner rcs = null;
1272           try {
1273             rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
1274               rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
1275             regionInfoRow = rcs.next();
1276           } finally {
1277             if (rcs != null) {
1278               rcs.close();
1279             }
1280           }
1281 
1282           if (regionInfoRow == null) {
1283             throw new TableNotFoundException(tableName);
1284           }
1285 
1286           // convert the row result into the HRegionLocation we need!
1287           RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
1288           if (locations == null || locations.getRegionLocation(replicaId) == null) {
1289             throw new IOException("HRegionInfo was null in " +
1290               tableName + ", row=" + regionInfoRow);
1291           }
1292           HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
1293           if (regionInfo == null) {
1294             throw new IOException("HRegionInfo was null or empty in " +
1295               TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
1296           }
1297 
1298           // possible we got a region of a different table...
1299           if (!regionInfo.getTable().equals(tableName)) {
1300             throw new TableNotFoundException(
1301                   "Table '" + tableName + "' was not found, got: " +
1302                   regionInfo.getTable() + ".");
1303           }
1304           if (regionInfo.isSplit()) {
1305             throw new RegionOfflineException("the only available region for" +
1306               " the required row is a split parent," +
1307               " the daughters should be online soon: " +
1308               regionInfo.getRegionNameAsString());
1309           }
1310           if (regionInfo.isOffline()) {
1311             throw new RegionOfflineException("the region is offline, could" +
1312               " be caused by a disable table call: " +
1313               regionInfo.getRegionNameAsString());
1314           }
1315 
1316           ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
1317           if (serverName == null) {
1318             throw new NoServerForRegionException("No server address listed " +
1319               "in " + TableName.META_TABLE_NAME + " for region " +
1320               regionInfo.getRegionNameAsString() + " containing row " +
1321               Bytes.toStringBinary(row));
1322           }
1323 
1324           if (isDeadServer(serverName)) {
1325             throw new RegionServerStoppedException("hbase:meta says the region " +
1326                 regionInfo.getRegionNameAsString() + " is managed by the server " + serverName +
1327                 ", but it is dead.");
1328           }
1329           // Instantiate the location
1330           cacheLocation(tableName, locations);
1331           return locations;
1332         } catch (TableNotFoundException e) {
1333           // if we got this error, probably means the table just plain doesn't
1334           // exist. rethrow the error immediately. this should always be coming
1335           // from the HTable constructor.
1336           throw e;
1337         } catch (IOException e) {
1338           ExceptionUtil.rethrowIfInterrupt(e);
1339 
1340           if (e instanceof RemoteException) {
1341             e = ((RemoteException)e).unwrapRemoteException();
1342           }
1343           if (tries < localNumRetries - 1) {
1344             if (LOG.isDebugEnabled()) {
1345               LOG.debug("locateRegionInMeta parentTable=" +
1346                   TableName.META_TABLE_NAME +
1347                 ", attempt=" + tries + " of " +
1348                 localNumRetries + " failed; retrying after sleep of " +
1349                 ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
1350             }
1351           } else {
1352             throw e;
1353           }
1354           // Only relocate the parent region if necessary
1355           if (!(e instanceof RegionOfflineException ||
1356               e instanceof NoServerForRegionException)) {
1357             relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
1358           }
1359         }
1360         try {
1361           Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
1362         } catch (InterruptedException e) {
1363           throw new InterruptedIOException("Giving up trying to locate region in " +
1364             "meta: thread is interrupted.");
1365         }
1366       }
1367     }
1368 
1369     /**
1370      * Put a newly discovered HRegionLocation into the cache.
1371      * @param tableName The table name.
1372      * @param location the new location
1373      */
1374     @Override
1375     public void cacheLocation(final TableName tableName, final RegionLocations location) {
1376       metaCache.cacheLocation(tableName, location);
1377     }
1378 
1379     /**
1380      * Search the cache for a location that fits our table and row key.
1381      * Return null if no suitable region is located.
1382      *
1383      * @param tableName table in which to look up the row
1384      * @param row row key whose containing region is sought
1385      * @return Null or region location found in cache.
1386      */
1387     RegionLocations getCachedLocation(final TableName tableName,
1388         final byte [] row) {
1389       return metaCache.getCachedLocation(tableName, row);
1390     }
1391 
1392     public void clearRegionCache(final TableName tableName, byte[] row) {
1393       metaCache.clearCache(tableName, row);
1394     }
1395 
1396     /*
1397      * Delete all cached entries that map to the given server.
1398      */
1399     @Override
1400     public void clearCaches(final ServerName serverName) {
1401       metaCache.clearCache(serverName);
1402     }
1403 
1404     @Override
1405     public void clearRegionCache() {
1406       metaCache.clearCache();
1407     }
1408 
1409     @Override
1410     public void clearRegionCache(final TableName tableName) {
1411       metaCache.clearCache(tableName);
1412     }
1413 
1414     @Override
1415     public void clearRegionCache(final byte[] tableName) {
1416       clearRegionCache(TableName.valueOf(tableName));
1417     }
1418 
1419     /**
1420      * Put a newly discovered HRegionLocation into the cache.
1421      * @param tableName The table name.
1422      * @param source the source of the new location, if it's not coming from meta
1423      * @param location the new location
1424      */
1425     private void cacheLocation(final TableName tableName, final ServerName source,
1426         final HRegionLocation location) {
1427       metaCache.cacheLocation(tableName, source, location);
1428     }
1429 
1430     // Map keyed by service name + regionserver to service stub implementation
1431     private final ConcurrentHashMap<String, Object> stubs =
1432       new ConcurrentHashMap<String, Object>();
1433     // Map of locks used creating service stubs per regionserver.
1434     private final ConcurrentHashMap<String, String> connectionLock =
1435       new ConcurrentHashMap<String, String>();
1436 
1437     /**
1438      * State of the MasterService connection/setup.
1439      */
1440     static class MasterServiceState {
1441       HConnection connection;
1442       MasterService.BlockingInterface stub;
1443       int userCount;
1444 
1445       MasterServiceState(final HConnection connection) {
1446         super();
1447         this.connection = connection;
1448       }
1449 
1450       @Override
1451       public String toString() {
1452         return "MasterService";
1453       }
1454 
1455       Object getStub() {
1456         return this.stub;
1457       }
1458 
1459       void clearStub() {
1460         this.stub = null;
1461       }
1462 
1463       boolean isMasterRunning() throws ServiceException {
1464         IsMasterRunningResponse response =
1465           this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1466         return response != null ? response.getIsMasterRunning() : false;
1467       }
1468     }
1469 
1470     /**
1471      * Makes a client-side stub for master services. Sub-class to specialize.
1472      * Depends on hosting class so not static.  Exists so we avoid duplicating a bunch of code
1473      * when setting up the MasterMonitorService and MasterAdminService.
1474      */
1475     abstract class StubMaker {
1476       /**
1477        * Returns the name of the service stub being created.
1478        */
1479       protected abstract String getServiceName();
1480 
1481       /**
1482        * Make the stub and cache it internally so it can be used later for the isMasterRunning call.
1483        * @param channel
1484        */
1485       protected abstract Object makeStub(final BlockingRpcChannel channel);
1486 
1487       /**
1488        * Once setup, check it works by doing isMasterRunning check.
1489        * @throws ServiceException
1490        */
1491       protected abstract void isMasterRunning() throws ServiceException;
1492 
1493       /**
1494        * Create a stub. Try once only.  It is not typed because there is no common type across
1495        * protobuf services or their interfaces.  Let the caller do appropriate casting.
1496        * @return A stub for master services.
1497        * @throws IOException
1498        * @throws KeeperException
1499        * @throws ServiceException
1500        */
1501       private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
1502         ZooKeeperKeepAliveConnection zkw;
1503         try {
1504           zkw = getKeepAliveZooKeeperWatcher();
1505         } catch (IOException e) {
1506           ExceptionUtil.rethrowIfInterrupt(e);
1507           throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
1508         }
1509         try {
1510           checkIfBaseNodeAvailable(zkw);
1511           ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
1512           if (sn == null) {
1513             String msg = "ZooKeeper available but no active master location found";
1514             LOG.info(msg);
1515             throw new MasterNotRunningException(msg);
1516           }
1517           if (isDeadServer(sn)) {
1518             throw new MasterNotRunningException(sn + " is dead.");
1519           }
1520           // Use the security info interface name as our stub key
1521           String key = getStubKey(getServiceName(),
1522               sn.getHostname(), sn.getPort(), hostnamesCanChange);
1523           connectionLock.putIfAbsent(key, key);
1524           Object stub = null;
1525           synchronized (connectionLock.get(key)) {
1526             stub = stubs.get(key);
1527             if (stub == null) {
1528               BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1529               stub = makeStub(channel);
1530               isMasterRunning();
1531               stubs.put(key, stub);
1532             }
1533           }
1534           return stub;
1535         } finally {
1536           zkw.close();
1537         }
1538       }
1539 
1540       /**
1541        * Create a stub against the master.  Retry if necessary.
1542        * @return A stub to do <code>intf</code> against the master
1543        * @throws MasterNotRunningException
1544        */
1545       Object makeStub() throws IOException {
1546         // The lock must be at the beginning to prevent multiple master creations
1547         //  (and leaks) in a multithreaded context
1548         synchronized (masterAndZKLock) {
1549           Exception exceptionCaught = null;
1550           if (!closed) {
1551             try {
1552               return makeStubNoRetries();
1553             } catch (IOException e) {
1554               exceptionCaught = e;
1555             } catch (KeeperException e) {
1556               exceptionCaught = e;
1557             } catch (ServiceException e) {
1558               exceptionCaught = e;
1559             }
1560 
1561             throw new MasterNotRunningException(exceptionCaught);
1562           } else {
1563             throw new DoNotRetryIOException("Connection was closed while trying to get master");
1564           }
1565         }
1566       }
1567     }
1568 
1569     /**
1570      * Class to make a MasterServiceStubMaker stub.
1571      */
1572     class MasterServiceStubMaker extends StubMaker {
1573       private MasterService.BlockingInterface stub;
1574       @Override
1575       protected String getServiceName() {
1576         return MasterService.getDescriptor().getName();
1577       }
1578 
1579       @Override
1580       MasterService.BlockingInterface makeStub() throws IOException {
1581         return (MasterService.BlockingInterface)super.makeStub();
1582       }
1583 
1584       @Override
1585       protected Object makeStub(BlockingRpcChannel channel) {
1586         this.stub = MasterService.newBlockingStub(channel);
1587         return this.stub;
1588       }
1589 
1590       @Override
1591       protected void isMasterRunning() throws ServiceException {
1592         this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1593       }
1594     }
1595 
1596     @Override
1597     public AdminService.BlockingInterface getAdmin(final ServerName serverName)
1598         throws IOException {
1599       return getAdmin(serverName, false);
1600     }
1601 
1602     @Override
1603     // Nothing is done w/ the 'master' parameter.  It is ignored.
1604     public AdminService.BlockingInterface getAdmin(final ServerName serverName,
1605       final boolean master)
1606     throws IOException {
1607       if (isDeadServer(serverName)) {
1608         throw new RegionServerStoppedException(serverName + " is dead.");
1609       }
1610       String key = getStubKey(AdminService.BlockingInterface.class.getName(),
1611           serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange);
1612       this.connectionLock.putIfAbsent(key, key);
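           // connectionLock interns the key: putIfAbsent plus synchronizing on the
           // canonical value acts as a per-server lock, so only one thread builds a
           // stub for a given server/service at a time.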
1613       AdminService.BlockingInterface stub = null;
1614       synchronized (this.connectionLock.get(key)) {
1615         stub = (AdminService.BlockingInterface)this.stubs.get(key);
1616         if (stub == null) {
1617           BlockingRpcChannel channel =
1618               this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1619           stub = AdminService.newBlockingStub(channel);
1620           this.stubs.put(key, stub);
1621         }
1622       }
1623       return stub;
1624     }
1625 
1626     @Override
1627     public ClientService.BlockingInterface getClient(final ServerName sn)
1628     throws IOException {
1629       if (isDeadServer(sn)) {
1630         throw new RegionServerStoppedException(sn + " is dead.");
1631       }
1632       String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostname(),
1633           sn.getPort(), this.hostnamesCanChange);
1634       this.connectionLock.putIfAbsent(key, key);
1635       ClientService.BlockingInterface stub = null;
1636       synchronized (this.connectionLock.get(key)) {
1637         stub = (ClientService.BlockingInterface)this.stubs.get(key);
1638         if (stub == null) {
1639           BlockingRpcChannel channel =
1640               this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1641           stub = ClientService.newBlockingStub(channel);
1642           // In old days, after getting stub/proxy, we'd make a call.  We are not doing that here.
1643           // Just fail on first actual call rather than in here on setup.
1644           this.stubs.put(key, stub);
1645         }
1646       }
1647       return stub;
1648     }
1649 
1650     static String getStubKey(final String serviceName,
1651                              final String rsHostname,
1652                              int port,
1653                              boolean resolveHostnames) {
1654 
1655       // Sometimes, servers go down and they come back up with the same hostname but a different
1656       // IP address. Force a resolution of the rsHostname by trying to instantiate an
1657       // InetSocketAddress, and this way we will rightfully get a new stubKey.
1658       // Also, include the hostname in the key so as to take care of those cases where the
1659       // DNS name is different but IP address remains the same.
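         // For example (hypothetical host), a resolved key looks like
         //   "...ClientService$BlockingInterface@10.0.0.1-rs1.example.com:16020",
         // while an unresolved one is "...ClientService$BlockingInterface@rs1.example.com:16020".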
1660       String address = rsHostname;
1661       if (resolveHostnames) {
1662         InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
1663         if (i != null) {
1664           address = i.getHostAddress() + "-" + rsHostname;
1665         }
1666       }
1667       return serviceName + "@" + address + ":" + port;
1668     }
1669 
1670     private ZooKeeperKeepAliveConnection keepAliveZookeeper;
1671     private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
1672     private boolean canCloseZKW = true;
1673 
1674     // keepAlive time, in ms. No reason to make it configurable.
1675     private static final long keepAlive = 5 * 60 * 1000;
1676 
1677     /**
1678      * Retrieve a shared ZooKeeperWatcher. You must close it once you have finished with it.
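          * A minimal usage sketch (mirroring what makeStubNoRetries() does):
          * <pre>
          * ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
          * try {
          *   // ... use zkw ...
          * } finally {
          *   zkw.close();
          * }
          * </pre>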
1679      * @return The shared instance. Never returns null.
1680      */
1681     ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
1682       throws IOException {
1683       synchronized (masterAndZKLock) {
1684         if (keepAliveZookeeper == null) {
1685           if (this.closed) {
1686             throw new IOException(toString() + " closed");
1687           }
1688           // We don't check that our link to ZooKeeper is still valid,
1689           // but there is a retry mechanism in the ZooKeeperWatcher itself.
1690           keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
1691         }
1692         keepAliveZookeeperUserCount.addAndGet(1);
1693         keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
1694         return keepAliveZookeeper;
1695       }
1696     }
1697 
1698     void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
1699       if (zkw == null) {
1700         return;
1701       }
1702       if (keepAliveZookeeperUserCount.addAndGet(-1) <= 0) {
1703         keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
1704       }
1705     }
1706 
1707     private void closeZooKeeperWatcher() {
1708       synchronized (masterAndZKLock) {
1709         if (keepAliveZookeeper != null) {
1710           LOG.info("Closing zookeeper sessionid=0x" +
1711             Long.toHexString(
1712               keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
1713           keepAliveZookeeper.internalClose();
1714           keepAliveZookeeper = null;
1715         }
1716         keepAliveZookeeperUserCount.set(0);
1717       }
1718     }
1719 
1720     final MasterServiceState masterServiceState = new MasterServiceState(this);
1721 
1722     @Override
1723     public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
1724       return getKeepAliveMasterService();
1725     }
1726 
1727     private void resetMasterServiceState(final MasterServiceState mss) {
1728       mss.userCount++;
1729     }
1730 
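         // Callers must pair getKeepAliveMasterService() with close() on the returned
         // MasterKeepAliveConnection, e.g. (sketch, as in listTables() below):
         //   MasterKeepAliveConnection master = getKeepAliveMasterService();
         //   try { /* ... issue master RPCs ... */ } finally { master.close(); }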
1731     @Override
1732     public MasterKeepAliveConnection getKeepAliveMasterService()
1733     throws MasterNotRunningException {
1734       synchronized (masterAndZKLock) {
1735         if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1736           MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1737           try {
1738             this.masterServiceState.stub = stubMaker.makeStub();
1739           } catch (MasterNotRunningException ex) {
1740             throw ex;
1741           } catch (IOException e) {
1742             // rethrow as MasterNotRunningException so that we can keep the method sig
1743             throw new MasterNotRunningException(e);
1744           }
1745         }
1746         resetMasterServiceState(this.masterServiceState);
1747       }
1748       // Ugly delegation just so we can add in a Close method.
1749       final MasterService.BlockingInterface stub = this.masterServiceState.stub;
1750       return new MasterKeepAliveConnection() {
1751         MasterServiceState mss = masterServiceState;
1752         @Override
1753         public MasterProtos.AbortProcedureResponse abortProcedure(
1754           RpcController controller,
1755           MasterProtos.AbortProcedureRequest request) throws ServiceException {
1756           return stub.abortProcedure(controller, request);
1757         }
1758         @Override
1759         public MasterProtos.ListProceduresResponse listProcedures(
1760             RpcController controller,
1761             MasterProtos.ListProceduresRequest request) throws ServiceException {
1762           return stub.listProcedures(controller, request);
1763         }
1764         @Override
1765         public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
1766         throws ServiceException {
1767           return stub.addColumn(controller, request);
1768         }
1769 
1770         @Override
1771         public DeleteColumnResponse deleteColumn(RpcController controller,
1772             DeleteColumnRequest request)
1773         throws ServiceException {
1774           return stub.deleteColumn(controller, request);
1775         }
1776 
1777         @Override
1778         public ModifyColumnResponse modifyColumn(RpcController controller,
1779             ModifyColumnRequest request)
1780         throws ServiceException {
1781           return stub.modifyColumn(controller, request);
1782         }
1783 
1784         @Override
1785         public MoveRegionResponse moveRegion(RpcController controller,
1786             MoveRegionRequest request) throws ServiceException {
1787           return stub.moveRegion(controller, request);
1788         }
1789 
1790         @Override
1791         public DispatchMergingRegionsResponse dispatchMergingRegions(
1792             RpcController controller, DispatchMergingRegionsRequest request)
1793             throws ServiceException {
1794           return stub.dispatchMergingRegions(controller, request);
1795         }
1796 
1797         @Override
1798         public AssignRegionResponse assignRegion(RpcController controller,
1799             AssignRegionRequest request) throws ServiceException {
1800           return stub.assignRegion(controller, request);
1801         }
1802 
1803         @Override
1804         public UnassignRegionResponse unassignRegion(RpcController controller,
1805             UnassignRegionRequest request) throws ServiceException {
1806           return stub.unassignRegion(controller, request);
1807         }
1808 
1809         @Override
1810         public OfflineRegionResponse offlineRegion(RpcController controller,
1811             OfflineRegionRequest request) throws ServiceException {
1812           return stub.offlineRegion(controller, request);
1813         }
1814 
1815         @Override
1816         public DeleteTableResponse deleteTable(RpcController controller,
1817             DeleteTableRequest request) throws ServiceException {
1818           return stub.deleteTable(controller, request);
1819         }
1820 
1821         @Override
1822         public TruncateTableResponse truncateTable(RpcController controller,
1823             TruncateTableRequest request) throws ServiceException {
1824           return stub.truncateTable(controller, request);
1825         }
1826 
1827         @Override
1828         public EnableTableResponse enableTable(RpcController controller,
1829             EnableTableRequest request) throws ServiceException {
1830           return stub.enableTable(controller, request);
1831         }
1832 
1833         @Override
1834         public DisableTableResponse disableTable(RpcController controller,
1835             DisableTableRequest request) throws ServiceException {
1836           return stub.disableTable(controller, request);
1837         }
1838 
1839         @Override
1840         public ModifyTableResponse modifyTable(RpcController controller,
1841             ModifyTableRequest request) throws ServiceException {
1842           return stub.modifyTable(controller, request);
1843         }
1844 
1845         @Override
1846         public CreateTableResponse createTable(RpcController controller,
1847             CreateTableRequest request) throws ServiceException {
1848           return stub.createTable(controller, request);
1849         }
1850 
1851         @Override
1852         public ShutdownResponse shutdown(RpcController controller,
1853             ShutdownRequest request) throws ServiceException {
1854           return stub.shutdown(controller, request);
1855         }
1856 
1857         @Override
1858         public StopMasterResponse stopMaster(RpcController controller,
1859             StopMasterRequest request) throws ServiceException {
1860           return stub.stopMaster(controller, request);
1861         }
1862 
1863         @Override
1864         public BalanceResponse balance(RpcController controller,
1865             BalanceRequest request) throws ServiceException {
1866           return stub.balance(controller, request);
1867         }
1868 
1869         @Override
1870         public SetBalancerRunningResponse setBalancerRunning(
1871             RpcController controller, SetBalancerRunningRequest request)
1872             throws ServiceException {
1873           return stub.setBalancerRunning(controller, request);
1874         }
1875 
1876         @Override
1877         public NormalizeResponse normalize(RpcController controller,
1878                                        NormalizeRequest request) throws ServiceException {
1879           return stub.normalize(controller, request);
1880         }
1881 
1882         @Override
1883         public SetNormalizerRunningResponse setNormalizerRunning(
1884           RpcController controller, SetNormalizerRunningRequest request)
1885           throws ServiceException {
1886           return stub.setNormalizerRunning(controller, request);
1887         }
1888 
1889         @Override
1890         public RunCatalogScanResponse runCatalogScan(RpcController controller,
1891             RunCatalogScanRequest request) throws ServiceException {
1892           return stub.runCatalogScan(controller, request);
1893         }
1894 
1895         @Override
1896         public EnableCatalogJanitorResponse enableCatalogJanitor(
1897             RpcController controller, EnableCatalogJanitorRequest request)
1898             throws ServiceException {
1899           return stub.enableCatalogJanitor(controller, request);
1900         }
1901 
1902         @Override
1903         public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1904             RpcController controller, IsCatalogJanitorEnabledRequest request)
1905             throws ServiceException {
1906           return stub.isCatalogJanitorEnabled(controller, request);
1907         }
1908 
1909         @Override
1910         public CoprocessorServiceResponse execMasterService(
1911             RpcController controller, CoprocessorServiceRequest request)
1912             throws ServiceException {
1913           return stub.execMasterService(controller, request);
1914         }
1915 
1916         @Override
1917         public SnapshotResponse snapshot(RpcController controller,
1918             SnapshotRequest request) throws ServiceException {
1919           return stub.snapshot(controller, request);
1920         }
1921 
1922         @Override
1923         public GetCompletedSnapshotsResponse getCompletedSnapshots(
1924             RpcController controller, GetCompletedSnapshotsRequest request)
1925             throws ServiceException {
1926           return stub.getCompletedSnapshots(controller, request);
1927         }
1928 
1929         @Override
1930         public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1931             DeleteSnapshotRequest request) throws ServiceException {
1932           return stub.deleteSnapshot(controller, request);
1933         }
1934 
1935         @Override
1936         public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1937             IsSnapshotDoneRequest request) throws ServiceException {
1938           return stub.isSnapshotDone(controller, request);
1939         }
1940 
1941         @Override
1942         public RestoreSnapshotResponse restoreSnapshot(
1943             RpcController controller, RestoreSnapshotRequest request)
1944             throws ServiceException {
1945           return stub.restoreSnapshot(controller, request);
1946         }
1947 
1948         @Override
1949         public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
1950             RpcController controller, IsRestoreSnapshotDoneRequest request)
1951             throws ServiceException {
1952           return stub.isRestoreSnapshotDone(controller, request);
1953         }
1954 
1955         @Override
1956         public ExecProcedureResponse execProcedure(
1957             RpcController controller, ExecProcedureRequest request)
1958             throws ServiceException {
1959           return stub.execProcedure(controller, request);
1960         }
1961 
1962         @Override
1963         public ExecProcedureResponse execProcedureWithRet(
1964             RpcController controller, ExecProcedureRequest request)
1965             throws ServiceException {
1966           return stub.execProcedureWithRet(controller, request);
1967         }
1968 
1969         @Override
1970         public IsProcedureDoneResponse isProcedureDone(RpcController controller,
1971             IsProcedureDoneRequest request) throws ServiceException {
1972           return stub.isProcedureDone(controller, request);
1973         }
1974 
1975         @Override
1976         public GetProcedureResultResponse getProcedureResult(RpcController controller,
1977             GetProcedureResultRequest request) throws ServiceException {
1978           return stub.getProcedureResult(controller, request);
1979         }
1980 
1981         @Override
1982         public IsMasterRunningResponse isMasterRunning(
1983             RpcController controller, IsMasterRunningRequest request)
1984             throws ServiceException {
1985           return stub.isMasterRunning(controller, request);
1986         }
1987 
1988         @Override
1989         public ModifyNamespaceResponse modifyNamespace(RpcController controller,
1990             ModifyNamespaceRequest request)
1991         throws ServiceException {
1992           return stub.modifyNamespace(controller, request);
1993         }
1994 
1995         @Override
1996         public CreateNamespaceResponse createNamespace(
1997             RpcController controller, CreateNamespaceRequest request) throws ServiceException {
1998           return stub.createNamespace(controller, request);
1999         }
2000 
2001         @Override
2002         public DeleteNamespaceResponse deleteNamespace(
2003             RpcController controller, DeleteNamespaceRequest request) throws ServiceException {
2004           return stub.deleteNamespace(controller, request);
2005         }
2006 
2007         @Override
2008         public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
2009             GetNamespaceDescriptorRequest request) throws ServiceException {
2010           return stub.getNamespaceDescriptor(controller, request);
2011         }
2012 
2013         @Override
2014         public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
2015             ListNamespaceDescriptorsRequest request) throws ServiceException {
2016           return stub.listNamespaceDescriptors(controller, request);
2017         }
2018 
2019         @Override
2020         public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
2021             RpcController controller, ListTableDescriptorsByNamespaceRequest request)
2022                 throws ServiceException {
2023           return stub.listTableDescriptorsByNamespace(controller, request);
2024         }
2025 
2026         @Override
2027         public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
2028             RpcController controller, ListTableNamesByNamespaceRequest request)
2029                 throws ServiceException {
2030           return stub.listTableNamesByNamespace(controller, request);
2031         }
2032 
2033         @Override
2034         public void close() {
2035           release(this.mss);
2036         }
2037 
2038         @Override
2039         public GetSchemaAlterStatusResponse getSchemaAlterStatus(
2040             RpcController controller, GetSchemaAlterStatusRequest request)
2041             throws ServiceException {
2042           return stub.getSchemaAlterStatus(controller, request);
2043         }
2044 
2045         @Override
2046         public GetTableDescriptorsResponse getTableDescriptors(
2047             RpcController controller, GetTableDescriptorsRequest request)
2048             throws ServiceException {
2049           return stub.getTableDescriptors(controller, request);
2050         }
2051 
2052         @Override
2053         public GetTableNamesResponse getTableNames(
2054             RpcController controller, GetTableNamesRequest request)
2055             throws ServiceException {
2056           return stub.getTableNames(controller, request);
2057         }
2058 
2059         @Override
2060         public GetClusterStatusResponse getClusterStatus(
2061             RpcController controller, GetClusterStatusRequest request)
2062             throws ServiceException {
2063           return stub.getClusterStatus(controller, request);
2064         }
2065 
2066         @Override
2067         public SetQuotaResponse setQuota(RpcController controller, SetQuotaRequest request)
2068             throws ServiceException {
2069           return stub.setQuota(controller, request);
2070         }
2071 
2072         @Override
2073         public MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
2074             RpcController controller, MajorCompactionTimestampRequest request)
2075             throws ServiceException {
2076           return stub.getLastMajorCompactionTimestamp(controller, request);
2077         }
2078 
2079         @Override
2080         public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
2081             RpcController controller, MajorCompactionTimestampForRegionRequest request)
2082             throws ServiceException {
2083           return stub.getLastMajorCompactionTimestampForRegion(controller, request);
2084         }
2085 
2086         @Override
2087         public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
2088             IsBalancerEnabledRequest request) throws ServiceException {
2089           return stub.isBalancerEnabled(controller, request);
2090         }
2091 
2092         @Override
2093         public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
2094             IsNormalizerEnabledRequest request) throws ServiceException {
2095           return stub.isNormalizerEnabled(controller, request);
2096         }
2097 
2098         @Override
2099         public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
2100             SecurityCapabilitiesRequest request) throws ServiceException {
2101           return stub.getSecurityCapabilities(controller, request);
2102         }
2103       };
2104     }
2105 
2106 
2107     private static void release(MasterServiceState mss) {
2108       if (mss != null && mss.connection != null) {
2109         ((HConnectionImplementation)mss.connection).releaseMaster(mss);
2110       }
2111     }
2112 
2113     private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
2114       if (mss.getStub() == null) {
2115         return false;
2116       }
2117       try {
2118         return mss.isMasterRunning();
2119       } catch (UndeclaredThrowableException e) {
2120         // It's somewhat messy, but we can receive exceptions such as
2121         //  java.net.ConnectException that are not declared, so we catch them here.
2122         LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
2123         return false;
2124       } catch (ServiceException se) {
2125         LOG.warn("Checking master connection", se);
2126         return false;
2127       }
2128     }
2129 
2130     void releaseMaster(MasterServiceState mss) {
2131       if (mss.getStub() == null) return;
2132       synchronized (masterAndZKLock) {
2133         --mss.userCount;
2134       }
2135     }
2136 
2137     private void closeMasterService(MasterServiceState mss) {
2138       if (mss.getStub() != null) {
2139         LOG.info("Closing master protocol: " + mss);
2140         mss.clearStub();
2141       }
2142       mss.userCount = 0;
2143     }
2144 
2145     /**
2146      * Immediate close of the shared master. Can be triggered by the delayed close or when
2147      * closing the connection itself.
2148      */
2149     private void closeMaster() {
2150       synchronized (masterAndZKLock) {
2151         closeMasterService(masterServiceState);
2152       }
2153     }
2154 
2155     void updateCachedLocation(HRegionInfo hri, ServerName source,
2156                               ServerName serverName, long seqNum) {
2157       HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
2158       cacheLocation(hri.getTable(), source, newHrl);
2159     }
2160 
2161     @Override
2162     public void deleteCachedRegionLocation(final HRegionLocation location) {
2163       metaCache.clearCache(location);
2164     }
2165 
2166     @Override
2167     public void updateCachedLocations(final TableName tableName, byte[] rowkey,
2168         final Object exception, final HRegionLocation source) {
2169       assert source != null;
2170       updateCachedLocations(tableName, source.getRegionInfo().getRegionName(),
2171           rowkey, exception, source.getServerName());
2172     }
2173 
2174     /**
2175      * Update the location with the new value (if the exception is a RegionMovedException)
2176      * or delete it from the cache. Does nothing if we can be sure from the exception that
2177      * the location is still accurate, or if the cache has already been updated.
2178      * @param exception an object (to simplify user code) in which we will look for a nested
2179      *                  or wrapped (or both) RegionMovedException
2180      * @param source server that is the source of the location update.
2181      */
2182     @Override
2183     public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
2184       final Object exception, final ServerName source) {
2185       if (rowkey == null || tableName == null) {
2186         LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null"
2187             : Bytes.toStringBinary(rowkey)) + ", tableName=" + (tableName == null ? "null" : tableName));
2188         return;
2189       }
2190 
2191       if (source == null) {
2192         // This should not happen, but let's protect ourselves anyway.
2193         return;
2194       }
2195 
2196       if (regionName == null) {
2197         // we do not know which region, so just remove the cache entry for the row and server
2198         metaCache.clearCache(tableName, rowkey, source);
2199         return;
2200       }
2201 
2202       // Is it something we have already updated?
2203       final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
2204       HRegionLocation oldLocation = null;
2205       if (oldLocations != null) {
2206         oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
2207       }
2208       if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
2209         // There is no such location in the cache (it's been removed already) or
2210         // the cache has already been refreshed with a different location.  => nothing to do
2211         return;
2212       }
2213 
2214       HRegionInfo regionInfo = oldLocation.getRegionInfo();
2215       Throwable cause = ClientExceptionsUtil.findException(exception);
2216       if (cause != null) {
2217         if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
2218           // We know that the region is still on this region server
2219           return;
2220         }
2221 
2222         if (cause instanceof RegionMovedException) {
2223           RegionMovedException rme = (RegionMovedException) cause;
2224           if (LOG.isTraceEnabled()) {
2225             LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
2226                 rme.getHostname() + ":" + rme.getPort() +
2227                 " according to " + source.getHostAndPort());
2228           }
2229           // We know that the region is no longer on this region server, but we know
2230           //  the new location.
2231           updateCachedLocation(
2232               regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2233           return;
2234         }
2235       }
2236 
2237       // If we're here, it means that we cannot be sure about the location, so we remove it from
2238       // the cache. Do not pass the source because it can be a new server at the same host:port.
2239       metaCache.clearCache(regionInfo);
2240     }
2241 
2242     @Override
2243     public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
2244       final Object exception, final HRegionLocation source) {
2245       updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
2246     }
2247 
2248     @Override
2249     @Deprecated
2250     public void processBatch(List<? extends Row> list,
2251         final TableName tableName,
2252         ExecutorService pool,
2253         Object[] results) throws IOException, InterruptedException {
2254       // This belongs in HTable!!! Not in here.  St.Ack
2255 
2256       // results must be the same size as list
2257       if (results.length != list.size()) {
2258         throw new IllegalArgumentException(
2259           "argument results must be the same size as argument list");
2260       }
2261       processBatchCallback(list, tableName, pool, results, null);
2262     }
2263 
2264     @Override
2265     @Deprecated
2266     public void processBatch(List<? extends Row> list,
2267         final byte[] tableName,
2268         ExecutorService pool,
2269         Object[] results) throws IOException, InterruptedException {
2270       processBatch(list, TableName.valueOf(tableName), pool, results);
2271     }
2272 
2273     /**
2274      * Send the queries in parallel on the different region servers. Retries on failures.
2275      * If the method returns, there was no error and the 'results' array contains no
2276      * exceptions. On error, an exception is thrown and the 'results' array contains a
2277      * mix of results and exceptions.
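          * A minimal (hypothetical) invocation:
          * <pre>
          * Object[] results = new Object[actions.size()];
          * connection.processBatchCallback(actions, tableName, pool, results, null);
          * </pre>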
2278      * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
2279      */
2280     @Override
2281     @Deprecated
2282     public <R> void processBatchCallback(
2283       List<? extends Row> list,
2284       TableName tableName,
2285       ExecutorService pool,
2286       Object[] results,
2287       Batch.Callback<R> callback)
2288       throws IOException, InterruptedException {
2289 
2290       AsyncRequestFuture ars = this.asyncProcess.submitAll(
2291           pool, tableName, list, callback, results);
2292       ars.waitUntilDone();
2293       if (ars.hasError()) {
2294         throw ars.getErrors();
2295       }
2296     }
2297 
2298     @Override
2299     @Deprecated
2300     public <R> void processBatchCallback(
2301       List<? extends Row> list,
2302       byte[] tableName,
2303       ExecutorService pool,
2304       Object[] results,
2305       Batch.Callback<R> callback)
2306       throws IOException, InterruptedException {
2307       processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
2308     }
2309 
2310     // For tests to override.
2311     protected AsyncProcess createAsyncProcess(Configuration conf) {
2312       // No default pool available.
2313       return new AsyncProcess(this, conf, this.batchPool,
2314           RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
2315           RpcControllerFactory.instantiate(conf));
2316     }
2317 
2318     @Override
2319     public AsyncProcess getAsyncProcess() {
2320       return asyncProcess;
2321     }
2322 
2323     @Override
2324     public ServerStatisticTracker getStatisticsTracker() {
2325       return this.stats;
2326     }
2327 
2328     @Override
2329     public ClientBackoffPolicy getBackoffPolicy() {
2330       return this.backoffPolicy;
2331     }
2332 
2333     /*
2334      * Return the number of cached regions for a table. It will only be called
2335      * from a unit test.
2336      */
2337     @VisibleForTesting
2338     int getNumberOfCachedRegionLocations(final TableName tableName) {
2339       return metaCache.getNumberOfCachedRegionLocations(tableName);
2340     }
2341 
2342     @Override
2343     @Deprecated
2344     public void setRegionCachePrefetch(final TableName tableName, final boolean enable) {
2345     }
2346 
2347     @Override
2348     @Deprecated
2349     public void setRegionCachePrefetch(final byte[] tableName,
2350         final boolean enable) {
2351     }
2352 
2353     @Override
2354     @Deprecated
2355     public boolean getRegionCachePrefetch(TableName tableName) {
2356       return false;
2357     }
2358 
2359     @Override
2360     @Deprecated
2361     public boolean getRegionCachePrefetch(byte[] tableName) {
2362       return false;
2363     }
2364 
2365     @Override
2366     public void abort(final String msg, Throwable t) {
2367       if (t instanceof KeeperException.SessionExpiredException
2368         && keepAliveZookeeper != null) {
2369         synchronized (masterAndZKLock) {
2370           if (keepAliveZookeeper != null) {
2371             LOG.warn("This client just lost its session with ZooKeeper," +
2372               " closing it." +
2373               " It will be recreated next time someone needs it", t);
2374             closeZooKeeperWatcher();
2375           }
2376         }
2377       } else {
2378         if (t != null) {
2379           LOG.fatal(msg, t);
2380         } else {
2381           LOG.fatal(msg);
2382         }
2383         this.aborted = true;
2384         close();
2385         this.closed = true;
2386       }
2387     }
2388 
2389     @Override
2390     public boolean isClosed() {
2391       return this.closed;
2392     }
2393 
2394     @Override
2395     public boolean isAborted() {
2396       return this.aborted;
2397     }
2398 
2399     @Override
2400     public int getCurrentNrHRS() throws IOException {
2401       return this.registry.getCurrentNrHRS();
2402     }
2403 
2404     /**
2405      * Increment this client's reference count.
2406      */
2407     void incCount() {
2408       ++refCount;
2409     }
2410 
2411     /**
2412      * Decrement this client's reference count.
2413      */
2414     void decCount() {
2415       if (refCount > 0) {
2416         --refCount;
2417       }
2418     }
2419 
2420     /**
2421      * Return whether this client has no remaining references.
2422      *
2423      * @return true if this client has no reference; false otherwise
2424      */
2425     boolean isZeroReference() {
2426       return refCount == 0;
2427     }
2428 
2429     void internalClose() {
2430       if (this.closed) {
2431         return;
2432       }
2433       closeMaster();
2434       shutdownPools();
2435       if (this.metrics != null) {
2436         this.metrics.shutdown();
2437       }
2438       this.closed = true;
2439       closeZooKeeperWatcher();
2440       this.stubs.clear();
2441       if (clusterStatusListener != null) {
2442         clusterStatusListener.close();
2443       }
2444       if (rpcClient != null) {
2445         rpcClient.close();
2446       }
2447     }
2448 
2449     @Override
2450     public void close() {
2451       if (managed) {
2452         if (aborted) {
2453           ConnectionManager.deleteStaleConnection(this);
2454         } else {
2455           ConnectionManager.deleteConnection(this, false);
2456         }
2457       } else {
2458         internalClose();
2459       }
2460     }
2461 
2462     /**
2463      * Close the connection for good, regardless of what the current value of
2464      * {@link #refCount} is. Ideally, {@link #refCount} should be zero at this
2465      * point, which would be the case if all of its consumers close the
2466      * connection. However, on the off chance that someone is unable to close
2467      * the connection, perhaps because it bailed out prematurely, the method
2468      * below will ensure that this {@link HConnection} instance is cleaned up.
2469      * Caveat: The JVM may take an unknown amount of time to call finalize on an
2470      * unreachable object, so our hope is that every consumer cleans up after
2471      * itself, like any good citizen.
2472      */
2473     @Override
2474     protected void finalize() throws Throwable {
2475       super.finalize();
2476       // Pretend as if we are about to release the last remaining reference
2477       refCount = 1;
2478       close();
2479     }
2480 
2481     /**
2482      * @deprecated Use {@link Admin#listTables()} instead
2483      */
2484     @Deprecated
2485     @Override
2486     public HTableDescriptor[] listTables() throws IOException {
2487       MasterKeepAliveConnection master = getKeepAliveMasterService();
2488       try {
2489         GetTableDescriptorsRequest req =
2490           RequestConverter.buildGetTableDescriptorsRequest((List<TableName>)null);
2491         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2492       } catch (ServiceException se) {
2493         throw ProtobufUtil.getRemoteException(se);
2494       } finally {
2495         master.close();
2496       }
2497     }
2498 
2499     /**
2500      * @deprecated Use {@link Admin#listTableNames()} instead
2501      */
2502     @Deprecated
2503     @Override
2504     public String[] getTableNames() throws IOException {
2505       TableName[] tableNames = listTableNames();
2506       String[] result = new String[tableNames.length];
2507       for (int i = 0; i < tableNames.length; i++) {
2508         result[i] = tableNames[i].getNameAsString();
2509       }
2510       return result;
2511     }
2512 
2513     /**
2514      * @deprecated Use {@link Admin#listTableNames()} instead
2515      */
2516     @Deprecated
2517     @Override
2518     public TableName[] listTableNames() throws IOException {
2519       MasterKeepAliveConnection master = getKeepAliveMasterService();
2520       try {
2521         return ProtobufUtil.getTableNameArray(master.getTableNames(null,
2522             GetTableNamesRequest.newBuilder().build())
2523           .getTableNamesList());
2524       } catch (ServiceException se) {
2525         throw ProtobufUtil.getRemoteException(se);
2526       } finally {
2527         master.close();
2528       }
2529     }
2530 
2531     /**
2532      * @deprecated Use {@link Admin#getTableDescriptorsByTableName(List)} instead
2533      */
2534     @Deprecated
2535     @Override
2536     public HTableDescriptor[] getHTableDescriptorsByTableName(
2537         List<TableName> tableNames) throws IOException {
2538       if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
2539       MasterKeepAliveConnection master = getKeepAliveMasterService();
2540       try {
2541         GetTableDescriptorsRequest req =
2542           RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2543         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2544       } catch (ServiceException se) {
2545         throw ProtobufUtil.getRemoteException(se);
2546       } finally {
2547         master.close();
2548       }
2549     }
2550 
2551     /**
2552      * @deprecated Use {@link Admin#getTableDescriptorsByTableName(List)} instead
2553      */
2554     @Deprecated
2555     @Override
2556     public HTableDescriptor[] getHTableDescriptors(
2557         List<String> names) throws IOException {
2558       List<TableName> tableNames = new ArrayList<TableName>(names.size());
2559       for (String name : names) {
2560         tableNames.add(TableName.valueOf(name));
2561       }
2562 
2563       return getHTableDescriptorsByTableName(tableNames);
2564     }
2565 
2566     @Override
2567     public NonceGenerator getNonceGenerator() {
2568       return this.nonceGenerator;
2569     }
2570 
2571     /**
2572      * Connects to the master to get the table descriptor.
2573      * @param tableName table name
2574      * @throws IOException if the connection to master fails or if the table
2575      *  is not found.
2576      * @deprecated Use {@link Admin#getTableDescriptor(TableName)} instead
2577      */
2578     @Deprecated
2579     @Override
2580     public HTableDescriptor getHTableDescriptor(final TableName tableName)
2581     throws IOException {
2582       if (tableName == null) return null;
2583       MasterKeepAliveConnection master = getKeepAliveMasterService();
2584       GetTableDescriptorsResponse htds;
2585       try {
2586         GetTableDescriptorsRequest req =
2587           RequestConverter.buildGetTableDescriptorsRequest(tableName);
2588         htds = master.getTableDescriptors(null, req);
2589       } catch (ServiceException se) {
2590         throw ProtobufUtil.getRemoteException(se);
2591       } finally {
2592         master.close();
2593       }
2594       if (!htds.getTableSchemaList().isEmpty()) {
2595         return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
2596       }
2597       throw new TableNotFoundException(tableName.getNameAsString());
2598     }
2599 
2600     /**
2601      * @deprecated Use {@link Admin#getTableDescriptor(TableName)} instead
2602      */
2603     @Deprecated
2604     @Override
2605     public HTableDescriptor getHTableDescriptor(final byte[] tableName)
2606     throws IOException {
2607       return getHTableDescriptor(TableName.valueOf(tableName));
2608     }
2609 
2610     @Override
2611     public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
2612       return RpcRetryingCallerFactory
2613           .instantiate(conf, this.interceptor, this.getStatisticsTracker());
2614     }
2615 
2616     @Override
2617     public boolean isManaged() {
2618       return managed;
2619     }
2620 
2621     @Override
2622     public boolean hasCellBlockSupport() {
2623       return this.rpcClient.hasCellBlockSupport();
2624     }
2625
2626     @Override
2627     public ConnectionConfiguration getConnectionConfiguration() {
2628       return this.connectionConfig;
2629     }
2630 
2631     @Override
2632     public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
2633       return this.rpcCallerFactory;
2634     }
2635 
2636     @Override
2637     public RpcControllerFactory getRpcControllerFactory() {
2638       return this.rpcControllerFactory;
2639     }
2640   }
2641 
2642   /**
2643    * The record of errors for servers.
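        * <p>Illustrative use in a retry loop ({@code callServer}, {@code server},
        * {@code basePause} and {@code Result} are hypothetical caller-side names,
        * not part of this class):
        * <pre>{@code
        * Result callWithRetries(long timeout, int maxRetries)
        *     throws IOException, InterruptedException {
        *   ServerErrorTracker tracker = new ServerErrorTracker(timeout, maxRetries);
        *   for (int tries = 0; tracker.canRetryMore(tries); tries++) {
        *     try {
        *       return callServer(server); // hypothetical RPC
        *     } catch (IOException e) {
        *       tracker.reportServerError(server);
        *       Thread.sleep(tracker.calculateBackoffTime(server, basePause));
        *     }
        *   }
        *   throw new IOException("retries exhausted");
        * }
        * }</pre>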
2644    */
2645   static class ServerErrorTracker {
2646     // We need a concurrent map here, as we could have multiple threads updating it in parallel.
2647     private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
2648         new ConcurrentHashMap<ServerName, ServerErrors>();
2649     private final long canRetryUntil;
2650     private final int maxRetries;
2651     private final long startTrackingTime;
2652 
2653     public ServerErrorTracker(long timeout, int maxRetries) {
2654       this.maxRetries = maxRetries;
           // The deadline uses EnvironmentEdgeManager so tests can inject a clock.
2655       this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
           // Wall-clock time at which tracking began (java.util.Date, not the
           // injectable clock above); exposed via getStartTrackingTime().
2656       this.startTrackingTime = new Date().getTime();
2657     }
2658 
2659     /**
2660      * We stop retrying when we have exhausted BOTH the number of retries and the time allocated.
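          * <p>For example: with {@code maxRetries = 1}, the time clause is ignored and
          * a single failed try exhausts the budget even if the deadline has not passed;
          * with {@code maxRetries > 1}, retries may continue past the retry count as
          * long as {@code canRetryUntil} has not been reached.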
2661      */
2662     boolean canRetryMore(int numRetry) {
2663       // If there is a single try we must not take into account the time.
2664       return numRetry < maxRetries || (maxRetries > 1 &&
2665           EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
2666     }
2667 
2668     /**
2669      * Calculates the back-off time for a retrying request to a particular server.
2670      *
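          * <p>The pause grows with the server's recorded error count via
          * {@code ConnectionUtils.getPauseTime}, which scales {@code basePause} by the
          * {@code HConstants.RETRY_BACKOFF} table; e.g. a 100 ms base pause grows
          * roughly as 100, 200, 300, 500, 1000 ms over successive errors (plus a
          * small random jitter).
          *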
2671      * @param server    The server in question.
2672      * @param basePause The default pause (the connection's configured {@code hbase.client.pause}).
2673      * @return The time to wait before sending next request.
2674      */
2675     long calculateBackoffTime(ServerName server, long basePause) {
2676       long result;
2677       ServerErrors errorStats = errorsByServer.get(server);
2678       if (errorStats != null) {
2679         result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
2680       } else {
2681         result = 0; // yes, if the server is not in our list we don't wait before retrying.
2682       }
2683       return result;
2684     }
2685 
2686     /**
2687      * Reports that there was an error on the server, so the tracker can do whatever bean-counting is necessary.
2688      *
2689      * @param server The server in question.
2690      */
2691     void reportServerError(ServerName server) {
2692       ServerErrors errors = errorsByServer.get(server);
2693       if (errors != null) {
2694         errors.addError();
2695       } else {
             // putIfAbsent returns the previous value if another thread won the
             // race to insert; in that case record the error on the winner's entry.
             // If our fresh (zero-count) ServerErrors was inserted, the count stays
             // at zero, so the first retry against this server uses the base pause.
2696         errors = errorsByServer.putIfAbsent(server, new ServerErrors());
2697         if (errors != null) {
2698           errors.addError();
2699         }
2700       }
2701     }
2702 
2703     long getStartTrackingTime() {
2704       return startTrackingTime;
2705     }
2706 
2707     /**
2708      * The record of errors for a server.
2709      */
2710     private static class ServerErrors {
2711       public final AtomicInteger retries = new AtomicInteger(0);
2712 
2713       public void addError() {
2714         retries.incrementAndGet();
2715       }
2716     }
2717   }
2718 }