/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;

import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

/**
 * An internal, non-instantiable class that manages creation of {@link HConnection}s.
 */
@SuppressWarnings("serial")
@InterfaceAudience.Private
// NOTE: DO NOT make this class public. It was made package-private on purpose.
class ConnectionManager {
  static final Log LOG = LogFactory.getLog(ConnectionManager.class);

  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
  private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

  // An LRU Map of HConnectionKey -> HConnection (TableServer).  All
  // access must be synchronized.  This map is not private because tests
  // need to be able to tinker with it.
  static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;

  public static final int MAX_CACHED_CONNECTION_INSTANCES;

  /**
   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
   * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
   */
  private static volatile NonceGenerator nonceGenerator = null;
  /** The nonce generator lock. Only taken when creating HConnection, which gets a private copy. */
  private static Object nonceGeneratorCreateLock = new Object();

  static {
    // We set instances to one more than the value specified for {@link
    // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk max number of
    // connections to the ensemble from one client is 30, so in that case we
    // should run into zk issues before the LRU hits this value of 31.
    MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
      HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
    CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
        (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
      @Override
      protected boolean removeEldestEntry(
          Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
        return size() > MAX_CACHED_CONNECTION_INSTANCES;
      }
    };
  }

  /** Dummy nonce generator for disabled nonces. */
  static class NoNonceGenerator implements NonceGenerator {
    @Override
    public long getNonceGroup() {
      return HConstants.NO_NONCE;
    }
    @Override
    public long newNonce() {
      return HConstants.NO_NONCE;
    }
  }

  /*
   * Non-instantiable.
   */
  private ConnectionManager() {
    super();
  }

  /**
   * @param conn The connection for which to replace the generator.
   * @param cnm Replaces the nonce generator used, for testing.
   * @return old nonce generator.
   */
  @VisibleForTesting
  static NonceGenerator injectNonceGeneratorForTesting(
      ClusterConnection conn, NonceGenerator cnm) {
    HConnectionImplementation connImpl = (HConnectionImplementation)conn;
    NonceGenerator ng = connImpl.getNonceGenerator();
    LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
    connImpl.nonceGenerator = cnm;
    return ng;
  }

  /**
   * Get the connection that goes with the passed <code>conf</code> configuration instance.
   * If no current connection exists, method creates a new connection and keys it using
   * connection-specific properties from the passed {@link Configuration}; see
   * {@link HConnectionKey}.
   * @param conf configuration
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  @Deprecated
  public static HConnection getConnection(final Configuration conf) throws IOException {
    return getConnectionInternal(conf);
  }

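  // A usage sketch of the shared-connection cache above (an illustration, not part of the
  // API). Connections returned here are "managed" and reference counted: each
  // getConnection(conf) must be balanced by a deleteConnection(conf) rather than
  // connection.close().
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   HConnection c1 = ConnectionManager.getConnection(conf);
  //   HConnection c2 = ConnectionManager.getConnection(conf); // same cached instance as c1
  //   ...
  //   ConnectionManager.deleteConnection(conf); // decrements the reference count
  //   ConnectionManager.deleteConnection(conf); // last reference: connection is closed
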
  static ClusterConnection getConnectionInternal(final Configuration conf)
    throws IOException {
    HConnectionKey connectionKey = new HConnectionKey(conf);
    synchronized (CONNECTION_INSTANCES) {
      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
      if (connection == null) {
        connection = (HConnectionImplementation)createConnection(conf, true);
        CONNECTION_INSTANCES.put(connectionKey, connection);
      } else if (connection.isClosed()) {
        ConnectionManager.deleteConnection(connectionKey, true);
        connection = (HConnectionImplementation)createConnection(conf, true);
        CONNECTION_INSTANCES.put(connectionKey, connection);
      }
      connection.incCount();
      return connection;
    }
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   *
   * This is the recommended way to create HConnections.
   * {@code
   * HConnection connection = ConnectionManager.createConnection(conf);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   *
   * @param conf configuration
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf) throws IOException {
    return createConnectionInternal(conf);
  }

  static ClusterConnection createConnectionInternal(Configuration conf) throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, null, provider.getCurrent());
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   * This is the recommended way to create HConnections.
   * {@code
   * ExecutorService pool = ...;
   * HConnection connection = HConnectionManager.createConnection(conf, pool);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   * @param conf configuration
   * @param pool the thread pool to use for batch operation in HTables used via this HConnection
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf, ExecutorService pool)
  throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, pool, provider.getCurrent());
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   * This is the recommended way to create HConnections.
   * {@code
   * ExecutorService pool = ...;
   * HConnection connection = HConnectionManager.createConnection(conf, pool);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   * @param conf configuration
   * @param user the user the connection is for
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf, User user)
  throws IOException {
    return createConnection(conf, false, null, user);
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   * This is the recommended way to create HConnections.
   * {@code
   * ExecutorService pool = ...;
   * HConnection connection = HConnectionManager.createConnection(conf, pool);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   * @param conf configuration
   * @param pool the thread pool to use for batch operation in HTables used via this HConnection
   * @param user the user the connection is for
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
  throws IOException {
    return createConnection(conf, false, pool, user);
  }

  @Deprecated
  static HConnection createConnection(final Configuration conf, final boolean managed)
      throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, managed, null, provider.getCurrent());
  }

  @Deprecated
  static ClusterConnection createConnection(final Configuration conf, final boolean managed,
      final ExecutorService pool, final User user)
  throws IOException {
    return (ClusterConnection) ConnectionFactory.createConnection(conf, managed, pool, user);
  }

  /**
   * Delete connection information for the instance specified by passed configuration.
   * If there are no more references to the designated connection, this method will
   * then close the connection to the zookeeper ensemble and let go of all associated resources.
   *
   * @param conf configuration whose identity is used to find {@link HConnection} instance.
   * @deprecated
   */
  @Deprecated
  public static void deleteConnection(Configuration conf) {
    deleteConnection(new HConnectionKey(conf), false);
  }

  /**
   * Cleanup a known stale connection.
   * This will then close the connection to the zookeeper ensemble and let go of all resources.
   *
   * @param connection
   * @deprecated
   */
  @Deprecated
  public static void deleteStaleConnection(HConnection connection) {
    deleteConnection(connection, true);
  }

  /**
   * Delete information for all connections. Whether each connection is actually closed
   * depends on the <code>staleConnection</code> boolean and its reference count. By default,
   * you should use this with <code>staleConnection</code> set to true.
   * @deprecated
   */
  @Deprecated
  public static void deleteAllConnections(boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
      connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
      for (HConnectionKey connectionKey : connectionKeys) {
        deleteConnection(connectionKey, staleConnection);
      }
      CONNECTION_INSTANCES.clear();
    }
  }

  /**
   * Delete information for all connections.
   * @deprecated kept for backward compatibility, but the behavior is broken. HBASE-8983
   */
  @Deprecated
  public static void deleteAllConnections() {
    deleteAllConnections(false);
  }

  @Deprecated
  private static void deleteConnection(HConnection connection, boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
        if (e.getValue() == connection) {
          deleteConnection(e.getKey(), staleConnection);
          break;
        }
      }
    }
  }

  @Deprecated
  private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
      if (connection != null) {
        connection.decCount();
        if (connection.isZeroReference() || staleConnection) {
          CONNECTION_INSTANCES.remove(connectionKey);
          connection.internalClose();
        }
      } else {
        LOG.error("Connection not found in the list, can't delete it " +
          "(connection key=" + connectionKey + "). Maybe the key was modified?", new Exception());
      }
    }
  }

  /**
   * This convenience method invokes the given {@link HConnectable#connect}
   * implementation using a {@link HConnection} instance that lasts just for the
   * duration of the invocation.
   *
   * @param <T> the return type of the connect method
   * @param connectable the {@link HConnectable} instance
   * @return the value returned by the connect method
   * @throws IOException
   */
  @InterfaceAudience.Private
  public static <T> T execute(HConnectable<T> connectable) throws IOException {
    if (connectable == null || connectable.conf == null) {
      return null;
    }
    Configuration conf = connectable.conf;
    HConnection connection = getConnection(conf);
    boolean connectSucceeded = false;
    try {
      T returnValue = connectable.connect(connection);
      connectSucceeded = true;
      return returnValue;
    } finally {
      try {
        connection.close();
      } catch (Exception e) {
        ExceptionUtil.rethrowIfInterrupt(e);
        if (connectSucceeded) {
          throw new IOException("The connection to " + connection
              + " could not be closed.", e);
        }
      }
    }
  }

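  // A minimal sketch of the execute() helper above: the HConnection exists only for the
  // duration of the connect() call. "mytable" is a hypothetical table name used purely
  // for illustration.
  //
  //   HTableDescriptor htd = ConnectionManager.execute(
  //       new HConnectable<HTableDescriptor>(conf) {
  //         @Override
  //         public HTableDescriptor connect(HConnection connection) throws IOException {
  //           return connection.getHTableDescriptor(TableName.valueOf("mytable"));
  //         }
  //       });
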
  /** Encapsulates connection to zookeeper and regionservers.*/
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
      justification="Access to the concurrent hash map is under a lock so should be fine.")
  static class HConnectionImplementation implements ClusterConnection, Closeable {
    static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
    private final boolean hostnamesCanChange;
    private final long pause;
    private final boolean useMetaReplicas;
    private final int numTries;
    final int rpcTimeout;
    private NonceGenerator nonceGenerator = null;
    private final AsyncProcess asyncProcess;
    // single tracker per connection
    private final ServerStatisticTracker stats;

    private volatile boolean closed;
    private volatile boolean aborted;

    // package protected for the tests
    ClusterStatusListener clusterStatusListener;

    private final Object metaRegionLock = new Object();

    // We have a single lock for master & zk to prevent deadlocks. Having
    //  one lock for ZK and one lock for master is not possible:
    //  When creating a connection to master, we need a connection to ZK to get
    //  its address. But another thread could have taken the ZK lock, and could
    //  be waiting for the master lock => deadlock.
    private final Object masterAndZKLock = new Object();

    private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;

    // thread executor shared by all HTableInterface instances created
    // by this connection
    private volatile ExecutorService batchPool = null;
    // meta thread executor shared by all HTableInterface instances created
    // by this connection
    private volatile ExecutorService metaLookupPool = null;
    private volatile boolean cleanupPool = false;

    private final Configuration conf;

    // cache the configuration value for tables so that we can avoid calling
    // the expensive Configuration to fetch the value multiple times.
    private final ConnectionConfiguration connectionConfig;

    // Client rpc instance.
    private RpcClient rpcClient;

    private final MetaCache metaCache;
    private final MetricsConnection metrics;

    private int refCount;

    // indicates whether this connection's life cycle is managed (by us)
    private boolean managed;

    protected User user;

    private RpcRetryingCallerFactory rpcCallerFactory;

    private RpcControllerFactory rpcControllerFactory;

    private final RetryingCallerInterceptor interceptor;

    /**
     * Cluster registry of basic info such as clusterid and meta region location.
     */
    Registry registry;

    private final ClientBackoffPolicy backoffPolicy;

    /** lock guards against multiple threads trying to query the meta region at the same time */
    private final ReentrantLock userRegionLock = new ReentrantLock();

    HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
      this(conf, managed, null, null);
    }

    HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool, User user)
      throws IOException {
      this(conf, managed, pool, user, null);
    }

    /**
     * constructor
     * @param conf Configuration object
     * @param managed If true, does not do full shutdown on close; i.e. cleanup of connection
     * to zk and shutdown of all services; we just close down the resources this connection was
     * responsible for and decrement usage counters.  It is up to the caller to do the full
     * cleanup.  It is set when we want connection sharing going on -- reuse of zk connection,
     * and cached region locations, established regionserver connections, etc.  When connections
     * are shared, we have reference counting going on and will only do full cleanup when there
     * are no more users of an HConnectionImplementation instance.
     */
    HConnectionImplementation(Configuration conf, boolean managed,
        ExecutorService pool, User user, String clusterId) throws IOException {
      this(conf);
      this.clusterId = clusterId;
      this.user = user;
      this.batchPool = pool;
      this.managed = managed;
      this.registry = setupRegistry();
      retrieveClusterId();

      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
      this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);

      // Do we publish the status?
      boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
          HConstants.STATUS_PUBLISHED_DEFAULT);
      Class<? extends ClusterStatusListener.Listener> listenerClass =
          conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
              ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
              ClusterStatusListener.Listener.class);
      if (shouldListen) {
        if (listenerClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening for status");
        } else {
          clusterStatusListener = new ClusterStatusListener(
              new ClusterStatusListener.DeadServerHandler() {
                @Override
                public void newDead(ServerName sn) {
                  clearCaches(sn);
                  rpcClient.cancelConnections(sn);
                }
              }, conf, listenerClass);
        }
      }
    }

    /**
     * For tests.
     */
    protected HConnectionImplementation(Configuration conf) {
      this.conf = conf;
      this.connectionConfig = new ConnectionConfiguration(conf);
      this.closed = false;
      this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
      this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
          HConstants.DEFAULT_USE_META_REPLICAS);
      this.numTries = connectionConfig.getRetriesNumber();
      this.rpcTimeout = conf.getInt(
          HConstants.HBASE_RPC_TIMEOUT_KEY,
          HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
      if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
        synchronized (nonceGeneratorCreateLock) {
          if (ConnectionManager.nonceGenerator == null) {
            ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
          }
          this.nonceGenerator = ConnectionManager.nonceGenerator;
        }
      } else {
        this.nonceGenerator = new NoNonceGenerator();
      }
      stats = ServerStatisticTracker.create(conf);
      this.asyncProcess = createAsyncProcess(this.conf);
      this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
      this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
      if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
        this.metrics = new MetricsConnection(this);
      } else {
        this.metrics = null;
      }

      this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
      this.metaCache = new MetaCache(this.metrics);
    }

    @Override
    public HTableInterface getTable(String tableName) throws IOException {
      return getTable(TableName.valueOf(tableName));
    }

    @Override
    public HTableInterface getTable(byte[] tableName) throws IOException {
      return getTable(TableName.valueOf(tableName));
    }

    @Override
    public HTableInterface getTable(TableName tableName) throws IOException {
      return getTable(tableName, getBatchPool());
    }

    @Override
    public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
      return getTable(TableName.valueOf(tableName), pool);
    }

    @Override
    public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
      return getTable(TableName.valueOf(tableName), pool);
    }

    @Override
    public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
      if (managed) {
        throw new NeedUnmanagedConnectionException();
      }
      return new HTable(tableName, this, connectionConfig, rpcCallerFactory,
          rpcControllerFactory, pool);
    }

    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
      if (params.getTableName() == null) {
        throw new IllegalArgumentException("TableName cannot be null.");
      }
      if (params.getPool() == null) {
        params.pool(HTable.getDefaultExecutor(getConfiguration()));
      }
      if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
        params.writeBufferSize(connectionConfig.getWriteBufferSize());
      }
      if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
        params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
      }
      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) {
      return getBufferedMutator(new BufferedMutatorParams(tableName));
    }

    @Override
    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
      return new HRegionLocator(tableName, this);
    }

    @Override
    public Admin getAdmin() throws IOException {
      if (managed) {
        throw new NeedUnmanagedConnectionException();
      }
      return new HBaseAdmin(this);
    }

    @Override
    public MetricsConnection getConnectionMetrics() {
      return this.metrics;
    }

    private ExecutorService getBatchPool() {
      if (batchPool == null) {
        synchronized (this) {
          if (batchPool == null) {
            this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
                conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
            this.cleanupPool = true;
          }
        }
      }
      return this.batchPool;
    }

    private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
        BlockingQueue<Runnable> passedWorkQueue) {
      // shared HTable thread executor not yet initialized
      if (maxThreads == 0) {
        maxThreads = Runtime.getRuntime().availableProcessors() * 8;
      }
      if (coreThreads == 0) {
        coreThreads = Runtime.getRuntime().availableProcessors() * 8;
      }
      long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
      BlockingQueue<Runnable> workQueue = passedWorkQueue;
      if (workQueue == null) {
        workQueue =
          new LinkedBlockingQueue<Runnable>(maxThreads *
              conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
                  HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
      }
      ThreadPoolExecutor tpe = new ThreadPoolExecutor(
          coreThreads,
          maxThreads,
          keepAliveTime,
          TimeUnit.SECONDS,
          workQueue,
          Threads.newDaemonThreadFactory(toString() + nameHint));
      tpe.allowCoreThreadTimeOut(true);
      return tpe;
    }

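    // Worked sizing example for getThreadPool() above, under the shipped defaults (an
    // illustration only, not additional configuration): with hbase.hconnection.threads.max
    // at 256 and hbase.client.max.total.tasks at its default of 100, the fallback
    // LinkedBlockingQueue holds up to 256 * 100 = 25600 pending Runnables before further
    // submissions are rejected; since allowCoreThreadTimeOut(true) is set, idle threads
    // (core ones included) are reclaimed after hbase.hconnection.threads.keepalivetime (60s).
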
    private ExecutorService getMetaLookupPool() {
      if (this.metaLookupPool == null) {
        synchronized (this) {
          if (this.metaLookupPool == null) {
            // Some of the threads would be used for meta replicas.
            // To start with, threads.max.core threads can hit the meta (including replicas).
            // After that, requests will get queued up in the passed queue, and only after
            // the queue is full, a new thread will be started.
            this.metaLookupPool = getThreadPool(
               conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
               conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
               "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
          }
        }
      }
      return this.metaLookupPool;
    }

    protected ExecutorService getCurrentMetaLookupPool() {
      return metaLookupPool;
    }

    protected ExecutorService getCurrentBatchPool() {
      return batchPool;
    }

    private void shutdownPools() {
      if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
        shutdownBatchPool(this.batchPool);
      }
      if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
        shutdownBatchPool(this.metaLookupPool);
      }
    }

    private void shutdownBatchPool(ExecutorService pool) {
      pool.shutdown();
      try {
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
          pool.shutdownNow();
        }
      } catch (InterruptedException e) {
        pool.shutdownNow();
      }
    }

    /**
     * @return The cluster registry implementation to use.
     * @throws IOException
     */
    private Registry setupRegistry() throws IOException {
      return RegistryFactory.getRegistry(this);
    }

    /**
     * For tests only.
     */
    @VisibleForTesting
    RpcClient getRpcClient() {
      return rpcClient;
    }

    /**
     * An identifier that will remain the same for a given connection.
     */
    @Override
    public String toString() {
      return "hconnection-0x" + Integer.toHexString(hashCode());
    }

    protected String clusterId = null;

    void retrieveClusterId() {
      if (clusterId != null) return;
      this.clusterId = this.registry.getClusterId();
      if (clusterId == null) {
        clusterId = HConstants.CLUSTER_ID_DEFAULT;
        LOG.debug("clusterid came back null, using default " + clusterId);
      }
    }

    @Override
    public Configuration getConfiguration() {
      return this.conf;
    }

    private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
      throws MasterNotRunningException {
      String errorMsg;
      try {
        if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
          errorMsg = "The node " + zkw.baseZNode + " is not in ZooKeeper. "
            + "It should have been written by the master. "
            + "Check the value configured in 'zookeeper.znode.parent'. "
            + "There could be a mismatch with the one configured in the master.";
          LOG.error(errorMsg);
          throw new MasterNotRunningException(errorMsg);
        }
      } catch (KeeperException e) {
        errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
        LOG.error(errorMsg);
        throw new MasterNotRunningException(errorMsg, e);
      }
    }

    /**
     * @return true if the master is running, throws an exception otherwise
     * @throws MasterNotRunningException - if the master is not running
     * @throws ZooKeeperConnectionException
     */
    @Deprecated
    @Override
    public boolean isMasterRunning()
    throws MasterNotRunningException, ZooKeeperConnectionException {
      // When getting the master connection, we check it's running,
      // so if there is no exception, it means we've been able to get a
      // connection on a running master
      MasterKeepAliveConnection m = getKeepAliveMasterService();
      m.close();
      return true;
    }

    @Override
    public HRegionLocation getRegionLocation(final TableName tableName,
        final byte [] row, boolean reload)
    throws IOException {
      return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
    }

    @Override
    public HRegionLocation getRegionLocation(final byte[] tableName,
        final byte [] row, boolean reload)
    throws IOException {
      return getRegionLocation(TableName.valueOf(tableName), row, reload);
    }

    @Override
    public boolean isTableEnabled(TableName tableName) throws IOException {
      return this.registry.isTableOnlineState(tableName, true);
    }

    @Override
    public boolean isTableEnabled(byte[] tableName) throws IOException {
      return isTableEnabled(TableName.valueOf(tableName));
    }

    @Override
    public boolean isTableDisabled(TableName tableName) throws IOException {
      return this.registry.isTableOnlineState(tableName, false);
    }

    @Override
    public boolean isTableDisabled(byte[] tableName) throws IOException {
      return isTableDisabled(TableName.valueOf(tableName));
    }

    @Override
    public boolean isTableAvailable(final TableName tableName) throws IOException {
      final AtomicBoolean available = new AtomicBoolean(true);
      final AtomicInteger regionCount = new AtomicInteger(0);
      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
        @Override
        public boolean processRow(Result row) throws IOException {
          HRegionInfo info = MetaScanner.getHRegionInfo(row);
          if (info != null && !info.isSplitParent()) {
            if (tableName.equals(info.getTable())) {
              ServerName server = HRegionInfo.getServerName(row);
              if (server == null) {
                available.set(false);
                return false;
              }
              regionCount.incrementAndGet();
            } else if (tableName.compareTo(info.getTable()) < 0) {
              // Return if we are done with the current table
              return false;
            }
          }
          return true;
        }
      };
      MetaScanner.metaScan(this, visitor, tableName);
      return available.get() && (regionCount.get() > 0);
    }

    @Override
    public boolean isTableAvailable(final byte[] tableName) throws IOException {
      return isTableAvailable(TableName.valueOf(tableName));
    }

    @Override
    public boolean isTableAvailable(final TableName tableName, final byte[][] splitKeys)
        throws IOException {
      final AtomicBoolean available = new AtomicBoolean(true);
      final AtomicInteger regionCount = new AtomicInteger(0);
      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
        @Override
        public boolean processRow(Result row) throws IOException {
          HRegionInfo info = MetaScanner.getHRegionInfo(row);
          if (info != null && !info.isSplitParent()) {
            if (tableName.equals(info.getTable())) {
              ServerName server = HRegionInfo.getServerName(row);
              if (server == null) {
                available.set(false);
                return false;
              }
              if (!Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
                for (byte[] splitKey : splitKeys) {
                  // Just check if the splitkey is available
                  if (Bytes.equals(info.getStartKey(), splitKey)) {
                    regionCount.incrementAndGet();
                    break;
                  }
                }
              } else {
                // Always empty start row should be counted
                regionCount.incrementAndGet();
              }
            } else if (tableName.compareTo(info.getTable()) < 0) {
              // Return if we are done with the current table
              return false;
            }
          }
          return true;
        }
      };
      MetaScanner.metaScan(this, visitor, tableName);
      // +1 needs to be added so that the empty start row is also taken into account
      return available.get() && (regionCount.get() == splitKeys.length + 1);
    }

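    // Worked example for the region count check above (hypothetical split keys, for
    // illustration): a table pre-split with splitKeys {b, c} has regions [ , b), [b, c)
    // and [c, ) once fully assigned -- that is, splitKeys.length + 1 = 3 regions, the
    // first of which is the one with the empty start key counted unconditionally above.
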
    @Override
    public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
        throws IOException {
      return isTableAvailable(TableName.valueOf(tableName), splitKeys);
    }

    @Override
    public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
      RegionLocations locations = locateRegion(HRegionInfo.getTable(regionName),
        HRegionInfo.getStartKey(regionName), false, true);
      return locations == null ? null : locations.getRegionLocation();
    }

    @Override
    public boolean isDeadServer(ServerName sn) {
      if (clusterStatusListener == null) {
        return false;
      } else {
        return clusterStatusListener.isDeadServer(sn);
      }
    }

    @Override
    public List<HRegionLocation> locateRegions(final TableName tableName)
    throws IOException {
      return locateRegions(tableName, false, true);
    }

    @Override
    public List<HRegionLocation> locateRegions(final byte[] tableName)
    throws IOException {
      return locateRegions(TableName.valueOf(tableName));
    }

    @Override
    public List<HRegionLocation> locateRegions(final TableName tableName,
        final boolean useCache, final boolean offlined) throws IOException {
      NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(this, tableName);
      final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
      for (HRegionInfo regionInfo : regions.keySet()) {
        RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
        if (list != null) {
          for (HRegionLocation loc : list.getRegionLocations()) {
            if (loc != null) {
              locations.add(loc);
            }
          }
        }
      }
      return locations;
    }

    @Override
    public List<HRegionLocation> locateRegions(final byte[] tableName,
        final boolean useCache, final boolean offlined) throws IOException {
      return locateRegions(TableName.valueOf(tableName), useCache, offlined);
    }

    @Override
    public HRegionLocation locateRegion(
        final TableName tableName, final byte[] row) throws IOException {
      RegionLocations locations = locateRegion(tableName, row, true, true);
      return locations == null ? null : locations.getRegionLocation();
    }

    @Override
    public HRegionLocation locateRegion(final byte[] tableName,
        final byte [] row)
    throws IOException {
      return locateRegion(TableName.valueOf(tableName), row);
    }

    @Override
    public HRegionLocation relocateRegion(final TableName tableName,
        final byte [] row) throws IOException {
      RegionLocations locations = relocateRegion(tableName, row,
        RegionReplicaUtil.DEFAULT_REPLICA_ID);
      return locations == null ? null :
        locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
    }

    @Override
    public RegionLocations relocateRegion(final TableName tableName,
        final byte [] row, int replicaId) throws IOException {
      // Since this is an explicit request not to use any caching, finding
      // disabled tables should not be desirable.  This will ensure that an exception is
      // thrown the first time a disabled table is interacted with.
      if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
        throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
      }

      return locateRegion(tableName, row, false, true, replicaId);
    }

    @Override
    public HRegionLocation relocateRegion(final byte[] tableName,
        final byte [] row) throws IOException {
      return relocateRegion(TableName.valueOf(tableName), row);
    }

    @Override
    public RegionLocations locateRegion(final TableName tableName,
      final byte [] row, boolean useCache, boolean retry)
    throws IOException {
      return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
    }

    @Override
    public RegionLocations locateRegion(final TableName tableName,
      final byte [] row, boolean useCache, boolean retry, int replicaId)
    throws IOException {
      if (this.closed) throw new DoNotRetryIOException(toString() + " closed");
      if (tableName == null || tableName.getName().length == 0) {
        throw new IllegalArgumentException(
            "table name cannot be null or zero length");
      }
      if (tableName.equals(TableName.META_TABLE_NAME)) {
        return locateMeta(tableName, useCache, replicaId);
      } else {
        // Region not in the cache - have to go to the meta RS
        return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
      }
    }

1196     private RegionLocations locateMeta(final TableName tableName,
1197         boolean useCache, int replicaId) throws IOException {
1198       // HBASE-10785: We cache the location of the META itself, so that we are not overloading
1199       // zookeeper with one request for every region lookup. We cache the META with empty row
1200       // key in MetaCache.
1201       byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
1202       RegionLocations locations = null;
1203       if (useCache) {
1204         locations = getCachedLocation(tableName, metaCacheKey);
1205         if (locations != null && locations.getRegionLocation(replicaId) != null) {
1206           return locations;
1207         }
1208       }
1209 
1210       // only one thread should do the lookup.
1211       synchronized (metaRegionLock) {
1212         // Check the cache again for a hit in case some other thread made the
1213         // same query while we were waiting on the lock.
1214         if (useCache) {
1215           locations = getCachedLocation(tableName, metaCacheKey);
1216           if (locations != null && locations.getRegionLocation(replicaId) != null) {
1217             return locations;
1218           }
1219         }
1220 
1221         // Look up from zookeeper
1222         locations = this.registry.getMetaRegionLocation();
1223         if (locations != null) {
1224           cacheLocation(tableName, locations);
1225         }
1226       }
1227       return locations;
1228     }
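    // locateMeta() above is an instance of the check/lock/re-check idiom: only one
    // thread pays for the ZooKeeper round trip while the others wait and then hit the
    // refreshed cache. A generic sketch of the same pattern (names hypothetical):
    //
    //   V v = cache.get(key);
    //   if (v != null) return v;
    //   synchronized (lock) {
    //     v = cache.get(key);         // another thread may have filled it while we waited
    //     if (v != null) return v;
    //     v = expensiveLookup(key);   // here: registry.getMetaRegionLocation()
    //     if (v != null) cache.put(key, v);
    //   }
    //   return v;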
1229 
1230     /*
1231      * Search the hbase:meta table for the HRegionLocation
1232      * info that contains the table and row we're seeking.
1233      */
1234     private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
1235                    boolean useCache, boolean retry, int replicaId) throws IOException {
1236 
1237       // If we are supposed to be using the cache, look in the cache to see if
1238       // we already have the region.
1239       if (useCache) {
1240         RegionLocations locations = getCachedLocation(tableName, row);
1241         if (locations != null && locations.getRegionLocation(replicaId) != null) {
1242           return locations;
1243         }
1244       }
1245 
1246       // build the key of the meta region we should be looking for.
1247       // the extra 9's on the end are necessary to allow "exact" matches
1248       // without knowing the precise region names.
1249       byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
1250 
1251       Scan s = new Scan();
1252       s.setReversed(true);
1253       s.setStartRow(metaKey);
1254       s.setSmall(true);
1255       s.setCaching(1);
1256       if (this.useMetaReplicas) {
1257         s.setConsistency(Consistency.TIMELINE);
1258       }
1259 
1260       int localNumRetries = (retry ? numTries : 1);
1261 
1262       for (int tries = 0; true; tries++) {
1263         if (tries >= localNumRetries) {
1264           throw new NoServerForRegionException("Unable to find region for " +
1265             Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
1266         }
1267         if (useCache) {
1268           RegionLocations locations = getCachedLocation(tableName, row);
1269           if (locations != null && locations.getRegionLocation(replicaId) != null) {
1270             return locations;
1271           }
1272         } else {
1273           // If we are not supposed to be using the cache, delete any existing cached location
1274           // so it won't interfere.
1275           metaCache.clearCache(tableName, row);
1276         }
1277 
1278         // Query the meta region
1279         long pauseBase = this.pause;
1280         userRegionLock.lock();
1281         try {
1282           if (useCache) {// re-check cache after get lock
1283             RegionLocations locations = getCachedLocation(tableName, row);
1284             if (locations != null && locations.getRegionLocation(replicaId) != null) {
1285               return locations;
1286             }
1287           }
1288           Result regionInfoRow = null;
1289           ReversedClientScanner rcs = null;
1290           try {
1291             rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
1292               rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
1293             regionInfoRow = rcs.next();
1294           } finally {
1295             if (rcs != null) {
1296               rcs.close();
1297             }
1298           }
1299 
1300           if (regionInfoRow == null) {
1301             throw new TableNotFoundException(tableName);
1302           }
1303 
1304           // convert the row result into the HRegionLocation we need!
1305           RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
1306           if (locations == null || locations.getRegionLocation(replicaId) == null) {
1307             throw new IOException("HRegionInfo was null in " +
1308               tableName + ", row=" + regionInfoRow);
1309           }
1310           HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
1311           if (regionInfo == null) {
1312             throw new IOException("HRegionInfo was null or empty in " +
1313               TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
1314           }
1315 
1316           // possible we got a region of a different table...
1317           if (!regionInfo.getTable().equals(tableName)) {
1318             throw new TableNotFoundException(
1319                   "Table '" + tableName + "' was not found, got: " +
1320                   regionInfo.getTable() + ".");
1321           }
1322           if (regionInfo.isSplit()) {
1323             throw new RegionOfflineException(
1324                 "the only available region for the required row is a split parent," +
1325                 " the daughters should be online soon: " + regionInfo.getRegionNameAsString());
1326           }
1327           if (regionInfo.isOffline()) {
1328             throw new RegionOfflineException("the region is offline, which could" +
1329               " be caused by a disable table call: " + regionInfo.getRegionNameAsString());
1330           }
1331 
1332           ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
1333           if (serverName == null) {
1334             throw new NoServerForRegionException("No server address listed in " +
1335               TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
1336               " containing row " + Bytes.toStringBinary(row));
1337           }
1338 
1339           if (isDeadServer(serverName)){
1340             throw new RegionServerStoppedException("hbase:meta says the region "+
1341                 regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
1342                 ", but it is dead.");
1343           }
1344           // Instantiate the location
1345           cacheLocation(tableName, locations);
1346           return locations;
1347         } catch (TableNotFoundException e) {
1348           // if we got this error, probably means the table just plain doesn't
1349           // exist. rethrow the error immediately. this should always be coming
1350           // from the HTable constructor.
1351           throw e;
1352         } catch (IOException e) {
1353           ExceptionUtil.rethrowIfInterrupt(e);
1354 
1355           if (e instanceof RemoteException) {
1356             e = ((RemoteException)e).unwrapRemoteException();
1357           }
1358           if (tries < localNumRetries - 1) {
1359             if (LOG.isDebugEnabled()) {
1360               LOG.debug("locateRegionInMeta parentTable=" + TableName.META_TABLE_NAME +
1361                   ", attempt=" + tries + " of " + localNumRetries +
1362                   " failed; retrying after sleep of " +
1363                   ConnectionUtils.getPauseTime(pauseBase, tries) + " because: " + e.getMessage());
1364             }
1365           } else {
1366             throw e;
1367           }
1368           // Only relocate the parent region if necessary
1369           if(!(e instanceof RegionOfflineException ||
1370               e instanceof NoServerForRegionException)) {
1371             relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
1372           }
1373         } finally {
1374           userRegionLock.unlock();
1375         }
1376         try{
1377           Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
1378         } catch (InterruptedException e) {
1379           throw new InterruptedIOException("Giving up trying to locate region in " +
1380             "meta: thread is interrupted.");
1381         }
1382       }
1383     }
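    // How the probe key used above works, sketched with hypothetical values: rows in
    // hbase:meta sort by region name, "<table>,<startKey>,<id>", so
    //
    //   byte[] probe = HRegionInfo.createRegionName(
    //       TableName.valueOf("t1"), Bytes.toBytes("row-42"), HConstants.NINES, false);
    //
    // yields "t1,row-42,99999999999999", which sorts after every region of "t1" whose
    // start key is <= "row-42" and before any region starting after it. The reversed
    // small scan seeded with that key therefore returns, as its first row, the meta
    // entry of the region containing the requested row.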
1384 
1385     /**
1386      * Put a newly discovered HRegionLocation into the cache.
1387      * @param tableName The table name.
1388      * @param location the new location
1389      */
1390     @Override
1391     public void cacheLocation(final TableName tableName, final RegionLocations location) {
1392       metaCache.cacheLocation(tableName, location);
1393     }
1394 
1395     /**
1396      * Search the cache for a location that fits our table and row key.
1397      * Return null if no suitable region is located.
1398      *
1399      * @param tableName
1400      * @param row
1401      * @return Null or region location found in cache.
1402      */
1403     RegionLocations getCachedLocation(final TableName tableName,
1404         final byte [] row) {
1405       return metaCache.getCachedLocation(tableName, row);
1406     }
1407 
1408     public void clearRegionCache(final TableName tableName, byte[] row) {
1409       metaCache.clearCache(tableName, row);
1410     }
1411 
1412     /*
1413      * Delete all cached entries that map to a specific server.
1414      */
1415     @Override
1416     public void clearCaches(final ServerName serverName) {
1417       metaCache.clearCache(serverName);
1418     }
1419 
1420     @Override
1421     public void clearRegionCache() {
1422       metaCache.clearCache();
1423     }
1424 
1425     @Override
1426     public void clearRegionCache(final TableName tableName) {
1427       metaCache.clearCache(tableName);
1428     }
1429 
1430     @Override
1431     public void clearRegionCache(final byte[] tableName) {
1432       clearRegionCache(TableName.valueOf(tableName));
1433     }
1434 
1435     /**
1436      * Put a newly discovered HRegionLocation into the cache.
1437      * @param tableName The table name.
1438      * @param source the source of the new location, if it's not coming from meta
1439      * @param location the new location
1440      */
1441     private void cacheLocation(final TableName tableName, final ServerName source,
1442         final HRegionLocation location) {
1443       metaCache.cacheLocation(tableName, source, location);
1444     }
1445 
1446     // Map keyed by service name + regionserver to service stub implementation
1447     private final ConcurrentHashMap<String, Object> stubs =
1448       new ConcurrentHashMap<String, Object>();
1449     // Map of locks used creating service stubs per regionserver.
1450     private final ConcurrentHashMap<String, String> connectionLock =
1451       new ConcurrentHashMap<String, String>();
1452 
1453     /**
1454      * State of the MasterService connection/setup.
1455      */
1456     static class MasterServiceState {
1457       HConnection connection;
1458       MasterService.BlockingInterface stub;
1459       int userCount;
1460 
1461       MasterServiceState(final HConnection connection) {
1462         super();
1463         this.connection = connection;
1464       }
1465 
1466       @Override
1467       public String toString() {
1468         return "MasterService";
1469       }
1470 
1471       Object getStub() {
1472         return this.stub;
1473       }
1474 
1475       void clearStub() {
1476         this.stub = null;
1477       }
1478 
1479       boolean isMasterRunning() throws ServiceException {
1480         IsMasterRunningResponse response =
1481           this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1482         return response != null ? response.getIsMasterRunning() : false;
1483       }
1484     }
1485 
1486     /**
1487      * Makes a client-side stub for master services. Sub-class to specialize.
1488      * Depends on hosting class so not static.  Exists so we avoid duplicating a bunch of code
1489      * when setting up the MasterMonitorService and MasterAdminService.
1490      */
1491     abstract class StubMaker {
1492       /**
1493        * Returns the name of the service stub being created.
1494        */
1495       protected abstract String getServiceName();
1496 
1497       /**
1498        * Make the stub and cache it internally so it can be used later for the isMasterRunning call.
1499        * @param channel
1500        */
1501       protected abstract Object makeStub(final BlockingRpcChannel channel);
1502 
1503       /**
1504        * Once setup, check it works by doing isMasterRunning check.
1505        * @throws ServiceException
1506        */
1507       protected abstract void isMasterRunning() throws ServiceException;
1508 
1509       /**
1510        * Create a stub. Try once only.  It is not typed because there is no common type across
1511        * protobuf services or their interfaces.  Let the caller do appropriate casting.
1512        * @return A stub for master services.
1513        * @throws IOException
1514        * @throws KeeperException
1515        * @throws ServiceException
1516        */
1517       private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
1518         ZooKeeperKeepAliveConnection zkw;
1519         try {
1520           zkw = getKeepAliveZooKeeperWatcher();
1521         } catch (IOException e) {
1522           ExceptionUtil.rethrowIfInterrupt(e);
1523           throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
1524         }
1525         try {
1526           checkIfBaseNodeAvailable(zkw);
1527           ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
1528           if (sn == null) {
1529             String msg = "ZooKeeper available but no active master location found";
1530             LOG.info(msg);
1531             throw new MasterNotRunningException(msg);
1532           }
1533           if (isDeadServer(sn)) {
1534             throw new MasterNotRunningException(sn + " is dead.");
1535           }
1536           // Use the security info interface name as our stub key
1537           String key = getStubKey(getServiceName(),
1538               sn.getHostname(), sn.getPort(), hostnamesCanChange);
1539           connectionLock.putIfAbsent(key, key);
1540           Object stub = null;
1541           synchronized (connectionLock.get(key)) {
1542             stub = stubs.get(key);
1543             if (stub == null) {
1544               BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1545               stub = makeStub(channel);
1546               isMasterRunning();
1547               stubs.put(key, stub);
1548             }
1549           }
1550           return stub;
1551         } finally {
1552           zkw.close();
1553         }
1554       }
1555 
1556       /**
1557        * Create a stub against the master.  Retry if necessary.
1558        * @return A stub for making <code>intf</code> calls against the master
1559        * @throws MasterNotRunningException
1560        */
1561       Object makeStub() throws IOException {
1562         // The lock must be at the beginning to prevent multiple master creations
1563         //  (and leaks) in a multithread context
1564         synchronized (masterAndZKLock) {
1565           Exception exceptionCaught = null;
1566           if (!closed) {
1567             try {
1568               return makeStubNoRetries();
1569             } catch (IOException e) {
1570               exceptionCaught = e;
1571             } catch (KeeperException e) {
1572               exceptionCaught = e;
1573             } catch (ServiceException e) {
1574               exceptionCaught = e;
1575             }
1576 
1577             throw new MasterNotRunningException(exceptionCaught);
1578           } else {
1579             throw new DoNotRetryIOException("Connection was closed while trying to get master");
1580           }
1581         }
1582       }
1583     }
1584 
1585     /**
1586      * Class to make a MasterServiceStubMaker stub.
1587      */
1588     class MasterServiceStubMaker extends StubMaker {
1589       private MasterService.BlockingInterface stub;
1590       @Override
1591       protected String getServiceName() {
1592         return MasterService.getDescriptor().getName();
1593       }
1594 
1595       @Override
1596       MasterService.BlockingInterface makeStub() throws IOException {
1597         return (MasterService.BlockingInterface)super.makeStub();
1598       }
1599 
1600       @Override
1601       protected Object makeStub(BlockingRpcChannel channel) {
1602         this.stub = MasterService.newBlockingStub(channel);
1603         return this.stub;
1604       }
1605 
1606       @Override
1607       protected void isMasterRunning() throws ServiceException {
1608         this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1609       }
1610     }
1611 
1612     @Override
1613     public AdminService.BlockingInterface getAdmin(final ServerName serverName)
1614         throws IOException {
1615       return getAdmin(serverName, false);
1616     }
1617 
1618     @Override
1619     // Nothing is done w/ the 'master' parameter.  It is ignored.
1620     public AdminService.BlockingInterface getAdmin(final ServerName serverName,
1621       final boolean master)
1622     throws IOException {
1623       if (isDeadServer(serverName)) {
1624         throw new RegionServerStoppedException(serverName + " is dead.");
1625       }
1626       String key = getStubKey(AdminService.BlockingInterface.class.getName(),
1627           serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange);
1628       this.connectionLock.putIfAbsent(key, key);
1629       AdminService.BlockingInterface stub = null;
1630       synchronized (this.connectionLock.get(key)) {
1631         stub = (AdminService.BlockingInterface)this.stubs.get(key);
1632         if (stub == null) {
1633           BlockingRpcChannel channel =
1634               this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1635           stub = AdminService.newBlockingStub(channel);
1636           this.stubs.put(key, stub);
1637         }
1638       }
1639       return stub;
1640     }
1641 
1642     @Override
1643     public ClientService.BlockingInterface getClient(final ServerName sn)
1644     throws IOException {
1645       if (isDeadServer(sn)) {
1646         throw new RegionServerStoppedException(sn + " is dead.");
1647       }
1648       String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostname(),
1649           sn.getPort(), this.hostnamesCanChange);
1650       this.connectionLock.putIfAbsent(key, key);
1651       ClientService.BlockingInterface stub = null;
1652       synchronized (this.connectionLock.get(key)) {
1653         stub = (ClientService.BlockingInterface)this.stubs.get(key);
1654         if (stub == null) {
1655           BlockingRpcChannel channel =
1656               this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1657           stub = ClientService.newBlockingStub(channel);
1658           // In old days, after getting stub/proxy, we'd make a call.  We are not doing that here.
1659           // Just fail on first actual call rather than in here on setup.
1660           this.stubs.put(key, stub);
1661         }
1662       }
1663       return stub;
1664     }
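    // getAdmin() and getClient() above share one idiom: the String stored via
    // putIfAbsent doubles as the per-key monitor, so threads creating stubs for
    // different servers never block each other. A generic sketch (names hypothetical):
    //
    //   locks.putIfAbsent(key, key);
    //   synchronized (locks.get(key)) {
    //     Object stub = stubs.get(key);
    //     if (stub == null) {
    //       stub = createStub(key);   // here: a blocking RPC channel wrapped in a stub
    //       stubs.put(key, stub);
    //     }
    //   }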
1665 
1666     static String getStubKey(final String serviceName,
1667                              final String rsHostname,
1668                              int port,
1669                              boolean resolveHostnames) {
1670 
1671       // Sometimes, servers go down and they come back up with the same hostname but a different
1672       // IP address. Force a resolution of the rsHostname by trying to instantiate an
1673       // InetSocketAddress, and this way we will rightfully get a new stubKey.
1674       // Also, include the hostname in the key so as to take care of those cases where the
1675       // DNS name is different but IP address remains the same.
1676       String address = rsHostname;
1677       if (resolveHostnames) {
1678         InetAddress i =  new InetSocketAddress(rsHostname, port).getAddress();
1679         if (i != null) {
1680           address = i.getHostAddress() + "-" + rsHostname;
1681         }
1682       }
1683       return serviceName + "@" + address + ":" + port;
1684     }
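    // For example (hypothetical host, IP and port), with resolveHostnames=true the key
    // could look like "ClientService@10.1.2.3-rs1.example.com:16020", and with
    // resolveHostnames=false simply "ClientService@rs1.example.com:16020". Keeping both
    // the IP and the hostname in the key distinguishes a server that came back with the
    // same hostname but a new address, and vice versa.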
1685 
1686     private ZooKeeperKeepAliveConnection keepAliveZookeeper;
1687     private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
1688     private boolean canCloseZKW = true;
1689 
1690     // keepAlive time, in ms. No reason to make it configurable.
1691     private static final long keepAlive = 5 * 60 * 1000;
1692 
1693     /**
1694      * Retrieve a shared ZooKeeperWatcher. You must close it once you have finished with it.
1695      * @return The shared instance. Never returns null.
1696      */
1697     ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
1698       throws IOException {
1699       synchronized (masterAndZKLock) {
1700         if (keepAliveZookeeper == null) {
1701           if (this.closed) {
1702             throw new IOException(toString() + " closed");
1703           }
1704           // We don't check that our link to ZooKeeper is still valid
1705           // But there is a retry mechanism in the ZooKeeperWatcher itself
1706           keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
1707         }
1708         keepAliveZookeeperUserCount.addAndGet(1);
1709         keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
1710         return keepAliveZookeeper;
1711       }
1712     }
1713 
1714     void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
1715       if (zkw == null){
1716         return;
1717       }
1718       if (keepAliveZookeeperUserCount.addAndGet(-1) <= 0) {
1719         keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
1720       }
1721     }
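    // Usage sketch: callers pair the acquire with a close, as makeStubNoRetries() above
    // does; closing the keep-alive connection is expected to route back to
    // releaseZooKeeperWatcher(), so real teardown is deferred by keepAlive ms once the
    // user count reaches zero.
    //
    //   ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
    //   try {
    //     // ... read the master address, check the base znode, etc. ...
    //   } finally {
    //     zkw.close();
    //   }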
1722 
1723     private void closeZooKeeperWatcher() {
1724       synchronized (masterAndZKLock) {
1725         if (keepAliveZookeeper != null) {
1726           LOG.info("Closing zookeeper sessionid=0x" +
1727             Long.toHexString(
1728               keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
1729           keepAliveZookeeper.internalClose();
1730           keepAliveZookeeper = null;
1731         }
1732         keepAliveZookeeperUserCount.set(0);
1733       }
1734     }
1735 
1736     final MasterServiceState masterServiceState = new MasterServiceState(this);
1737 
1738     @Override
1739     public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
1740       return getKeepAliveMasterService();
1741     }
1742 
1743     private void resetMasterServiceState(final MasterServiceState mss) {
1744       mss.userCount++;
1745     }
1746 
1747     @Override
1748     public MasterKeepAliveConnection getKeepAliveMasterService()
1749     throws MasterNotRunningException {
1750       synchronized (masterAndZKLock) {
1751         if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1752           MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1753           try {
1754             this.masterServiceState.stub = stubMaker.makeStub();
1755           } catch (MasterNotRunningException ex) {
1756             throw ex;
1757           } catch (IOException e) {
1758             // rethrow as MasterNotRunningException so that we can keep the method sig
1759             throw new MasterNotRunningException(e);
1760           }
1761         }
1762         resetMasterServiceState(this.masterServiceState);
1763       }
1764       // Ugly delegation just so we can add in a Close method.
1765       final MasterService.BlockingInterface stub = this.masterServiceState.stub;
1766       return new MasterKeepAliveConnection() {
1767         MasterServiceState mss = masterServiceState;
1768         @Override
1769         public MasterProtos.AbortProcedureResponse abortProcedure(
1770           RpcController controller,
1771           MasterProtos.AbortProcedureRequest request) throws ServiceException {
1772           return stub.abortProcedure(controller, request);
1773         }
1774         @Override
1775         public MasterProtos.ListProceduresResponse listProcedures(
1776             RpcController controller,
1777             MasterProtos.ListProceduresRequest request) throws ServiceException {
1778           return stub.listProcedures(controller, request);
1779         }
1780         @Override
1781         public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
1782         throws ServiceException {
1783           return stub.addColumn(controller, request);
1784         }
1785 
1786         @Override
1787         public DeleteColumnResponse deleteColumn(RpcController controller,
1788             DeleteColumnRequest request)
1789         throws ServiceException {
1790           return stub.deleteColumn(controller, request);
1791         }
1792 
1793         @Override
1794         public ModifyColumnResponse modifyColumn(RpcController controller,
1795             ModifyColumnRequest request)
1796         throws ServiceException {
1797           return stub.modifyColumn(controller, request);
1798         }
1799 
1800         @Override
1801         public MoveRegionResponse moveRegion(RpcController controller,
1802             MoveRegionRequest request) throws ServiceException {
1803           return stub.moveRegion(controller, request);
1804         }
1805 
1806         @Override
1807         public DispatchMergingRegionsResponse dispatchMergingRegions(
1808             RpcController controller, DispatchMergingRegionsRequest request)
1809             throws ServiceException {
1810           return stub.dispatchMergingRegions(controller, request);
1811         }
1812 
1813         @Override
1814         public AssignRegionResponse assignRegion(RpcController controller,
1815             AssignRegionRequest request) throws ServiceException {
1816           return stub.assignRegion(controller, request);
1817         }
1818 
1819         @Override
1820         public UnassignRegionResponse unassignRegion(RpcController controller,
1821             UnassignRegionRequest request) throws ServiceException {
1822           return stub.unassignRegion(controller, request);
1823         }
1824 
1825         @Override
1826         public OfflineRegionResponse offlineRegion(RpcController controller,
1827             OfflineRegionRequest request) throws ServiceException {
1828           return stub.offlineRegion(controller, request);
1829         }
1830 
1831         @Override
1832         public DeleteTableResponse deleteTable(RpcController controller,
1833             DeleteTableRequest request) throws ServiceException {
1834           return stub.deleteTable(controller, request);
1835         }
1836 
1837         @Override
1838         public TruncateTableResponse truncateTable(RpcController controller,
1839             TruncateTableRequest request) throws ServiceException {
1840           return stub.truncateTable(controller, request);
1841         }
1842 
1843         @Override
1844         public EnableTableResponse enableTable(RpcController controller,
1845             EnableTableRequest request) throws ServiceException {
1846           return stub.enableTable(controller, request);
1847         }
1848 
1849         @Override
1850         public DisableTableResponse disableTable(RpcController controller,
1851             DisableTableRequest request) throws ServiceException {
1852           return stub.disableTable(controller, request);
1853         }
1854 
1855         @Override
1856         public ModifyTableResponse modifyTable(RpcController controller,
1857             ModifyTableRequest request) throws ServiceException {
1858           return stub.modifyTable(controller, request);
1859         }
1860 
1861         @Override
1862         public CreateTableResponse createTable(RpcController controller,
1863             CreateTableRequest request) throws ServiceException {
1864           return stub.createTable(controller, request);
1865         }
1866 
1867         @Override
1868         public ShutdownResponse shutdown(RpcController controller,
1869             ShutdownRequest request) throws ServiceException {
1870           return stub.shutdown(controller, request);
1871         }
1872 
1873         @Override
1874         public StopMasterResponse stopMaster(RpcController controller,
1875             StopMasterRequest request) throws ServiceException {
1876           return stub.stopMaster(controller, request);
1877         }
1878 
1879         @Override
1880         public BalanceResponse balance(RpcController controller,
1881             BalanceRequest request) throws ServiceException {
1882           return stub.balance(controller, request);
1883         }
1884 
1885         @Override
1886         public SetBalancerRunningResponse setBalancerRunning(
1887             RpcController controller, SetBalancerRunningRequest request)
1888             throws ServiceException {
1889           return stub.setBalancerRunning(controller, request);
1890         }
1891 
1892         @Override
1893         public NormalizeResponse normalize(RpcController controller,
1894                                        NormalizeRequest request) throws ServiceException {
1895           return stub.normalize(controller, request);
1896         }
1897 
1898         @Override
1899         public SetNormalizerRunningResponse setNormalizerRunning(
1900           RpcController controller, SetNormalizerRunningRequest request)
1901           throws ServiceException {
1902           return stub.setNormalizerRunning(controller, request);
1903         }
1904 
1905         @Override
1906         public RunCatalogScanResponse runCatalogScan(RpcController controller,
1907             RunCatalogScanRequest request) throws ServiceException {
1908           return stub.runCatalogScan(controller, request);
1909         }
1910 
1911         @Override
1912         public EnableCatalogJanitorResponse enableCatalogJanitor(
1913             RpcController controller, EnableCatalogJanitorRequest request)
1914             throws ServiceException {
1915           return stub.enableCatalogJanitor(controller, request);
1916         }
1917 
1918         @Override
1919         public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1920             RpcController controller, IsCatalogJanitorEnabledRequest request)
1921             throws ServiceException {
1922           return stub.isCatalogJanitorEnabled(controller, request);
1923         }
1924 
1925         @Override
1926         public CoprocessorServiceResponse execMasterService(
1927             RpcController controller, CoprocessorServiceRequest request)
1928             throws ServiceException {
1929           return stub.execMasterService(controller, request);
1930         }
1931 
1932         @Override
1933         public SnapshotResponse snapshot(RpcController controller,
1934             SnapshotRequest request) throws ServiceException {
1935           return stub.snapshot(controller, request);
1936         }
1937 
1938         @Override
1939         public GetCompletedSnapshotsResponse getCompletedSnapshots(
1940             RpcController controller, GetCompletedSnapshotsRequest request)
1941             throws ServiceException {
1942           return stub.getCompletedSnapshots(controller, request);
1943         }
1944 
1945         @Override
1946         public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1947             DeleteSnapshotRequest request) throws ServiceException {
1948           return stub.deleteSnapshot(controller, request);
1949         }
1950 
1951         @Override
1952         public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1953             IsSnapshotDoneRequest request) throws ServiceException {
1954           return stub.isSnapshotDone(controller, request);
1955         }
1956 
1957         @Override
1958         public RestoreSnapshotResponse restoreSnapshot(
1959             RpcController controller, RestoreSnapshotRequest request)
1960             throws ServiceException {
1961           return stub.restoreSnapshot(controller, request);
1962         }
1963 
1964         @Override
1965         public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
1966             RpcController controller, IsRestoreSnapshotDoneRequest request)
1967             throws ServiceException {
1968           return stub.isRestoreSnapshotDone(controller, request);
1969         }
1970 
1971         @Override
1972         public ExecProcedureResponse execProcedure(
1973             RpcController controller, ExecProcedureRequest request)
1974             throws ServiceException {
1975           return stub.execProcedure(controller, request);
1976         }
1977 
1978         @Override
1979         public ExecProcedureResponse execProcedureWithRet(
1980             RpcController controller, ExecProcedureRequest request)
1981             throws ServiceException {
1982           return stub.execProcedureWithRet(controller, request);
1983         }
1984 
1985         @Override
1986         public IsProcedureDoneResponse isProcedureDone(RpcController controller,
1987             IsProcedureDoneRequest request) throws ServiceException {
1988           return stub.isProcedureDone(controller, request);
1989         }
1990 
1991         @Override
1992         public GetProcedureResultResponse getProcedureResult(RpcController controller,
1993             GetProcedureResultRequest request) throws ServiceException {
1994           return stub.getProcedureResult(controller, request);
1995         }
1996 
1997         @Override
1998         public IsMasterRunningResponse isMasterRunning(
1999             RpcController controller, IsMasterRunningRequest request)
2000             throws ServiceException {
2001           return stub.isMasterRunning(controller, request);
2002         }
2003 
2004         @Override
2005         public ModifyNamespaceResponse modifyNamespace(RpcController controller,
2006             ModifyNamespaceRequest request)
2007         throws ServiceException {
2008           return stub.modifyNamespace(controller, request);
2009         }
2010 
2011         @Override
2012         public CreateNamespaceResponse createNamespace(
2013             RpcController controller, CreateNamespaceRequest request) throws ServiceException {
2014           return stub.createNamespace(controller, request);
2015         }
2016 
2017         @Override
2018         public DeleteNamespaceResponse deleteNamespace(
2019             RpcController controller, DeleteNamespaceRequest request) throws ServiceException {
2020           return stub.deleteNamespace(controller, request);
2021         }
2022 
2023         @Override
2024         public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
2025             GetNamespaceDescriptorRequest request) throws ServiceException {
2026           return stub.getNamespaceDescriptor(controller, request);
2027         }
2028 
2029         @Override
2030         public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
2031             ListNamespaceDescriptorsRequest request) throws ServiceException {
2032           return stub.listNamespaceDescriptors(controller, request);
2033         }
2034 
2035         @Override
2036         public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
2037             RpcController controller, ListTableDescriptorsByNamespaceRequest request)
2038                 throws ServiceException {
2039           return stub.listTableDescriptorsByNamespace(controller, request);
2040         }
2041 
2042         @Override
2043         public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
2044             RpcController controller, ListTableNamesByNamespaceRequest request)
2045                 throws ServiceException {
2046           return stub.listTableNamesByNamespace(controller, request);
2047         }
2048 
2049         @Override
2050         public void close() {
2051           release(this.mss);
2052         }
2053 
2054         @Override
2055         public GetSchemaAlterStatusResponse getSchemaAlterStatus(
2056             RpcController controller, GetSchemaAlterStatusRequest request)
2057             throws ServiceException {
2058           return stub.getSchemaAlterStatus(controller, request);
2059         }
2060 
2061         @Override
2062         public GetTableDescriptorsResponse getTableDescriptors(
2063             RpcController controller, GetTableDescriptorsRequest request)
2064             throws ServiceException {
2065           return stub.getTableDescriptors(controller, request);
2066         }
2067 
2068         @Override
2069         public GetTableNamesResponse getTableNames(
2070             RpcController controller, GetTableNamesRequest request)
2071             throws ServiceException {
2072           return stub.getTableNames(controller, request);
2073         }
2074 
2075         @Override
2076         public GetClusterStatusResponse getClusterStatus(
2077             RpcController controller, GetClusterStatusRequest request)
2078             throws ServiceException {
2079           return stub.getClusterStatus(controller, request);
2080         }
2081 
2082         @Override
2083         public SetQuotaResponse setQuota(RpcController controller, SetQuotaRequest request)
2084             throws ServiceException {
2085           return stub.setQuota(controller, request);
2086         }
2087 
2088         @Override
2089         public MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
2090             RpcController controller, MajorCompactionTimestampRequest request)
2091             throws ServiceException {
2092           return stub.getLastMajorCompactionTimestamp(controller, request);
2093         }
2094 
2095         @Override
2096         public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
2097             RpcController controller, MajorCompactionTimestampForRegionRequest request)
2098             throws ServiceException {
2099           return stub.getLastMajorCompactionTimestampForRegion(controller, request);
2100         }
2101 
2102         @Override
2103         public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
2104             IsBalancerEnabledRequest request) throws ServiceException {
2105           return stub.isBalancerEnabled(controller, request);
2106         }
2107 
2108         @Override
2109         public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
2110             IsNormalizerEnabledRequest request) throws ServiceException {
2111           return stub.isNormalizerEnabled(controller, request);
2112         }
2113 
2114         @Override
2115         public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
2116             SecurityCapabilitiesRequest request) throws ServiceException {
2117           return stub.getSecurityCapabilities(controller, request);
2118         }
2119       };
2120     }
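    // Usage sketch, mirroring the deprecated listTables()/listTableNames() methods later
    // in this class: the keep-alive wrapper must be closed so the shared stub's user
    // count is decremented.
    //
    //   MasterKeepAliveConnection master = getKeepAliveMasterService();
    //   try {
    //     // ... issue master RPCs through the wrapper ...
    //   } finally {
    //     master.close();   // delegates to release(mss) below
    //   }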
2121 
2122 
2123     private static void release(MasterServiceState mss) {
2124       if (mss != null && mss.connection != null) {
2125         ((HConnectionImplementation)mss.connection).releaseMaster(mss);
2126       }
2127     }
2128 
2129     private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
2130       if (mss.getStub() == null){
2131         return false;
2132       }
2133       try {
2134         return mss.isMasterRunning();
2135       } catch (UndeclaredThrowableException e) {
2136         // It's somehow messy, but we can receive exceptions such as
2137         //  java.net.ConnectException but they're not declared. So we catch it...
2138         LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
2139         return false;
2140       } catch (ServiceException se) {
2141         LOG.warn("Checking master connection", se);
2142         return false;
2143       }
2144     }
2145 
2146     void releaseMaster(MasterServiceState mss) {
2147       if (mss.getStub() == null) return;
2148       synchronized (masterAndZKLock) {
2149         --mss.userCount;
2150       }
2151     }
2152 
2153     private void closeMasterService(MasterServiceState mss) {
2154       if (mss.getStub() != null) {
2155         LOG.info("Closing master protocol: " + mss);
2156         mss.clearStub();
2157       }
2158       mss.userCount = 0;
2159     }
2160 
2161     /**
2162      * Immediate close of the shared master. Can be triggered by the delayed close or when closing the
2163      * connection itself.
2164      */
2165     private void closeMaster() {
2166       synchronized (masterAndZKLock) {
2167         closeMasterService(masterServiceState);
2168       }
2169     }
2170 
2171     void updateCachedLocation(HRegionInfo hri, ServerName source,
2172                               ServerName serverName, long seqNum) {
2173       HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
2174       cacheLocation(hri.getTable(), source, newHrl);
2175     }
2176 
2177     @Override
2178     public void deleteCachedRegionLocation(final HRegionLocation location) {
2179       metaCache.clearCache(location);
2180     }
2181 
2182     @Override
2183     public void updateCachedLocations(final TableName tableName, byte[] rowkey,
2184         final Object exception, final HRegionLocation source) {
2185       assert source != null;
2186       updateCachedLocations(tableName, source.getRegionInfo().getRegionName(),
2187         rowkey, exception, source.getServerName());
2188     }
2189 
2190     /**
2191      * Update the location with the new value (if the exception is a RegionMovedException)
2192      * or delete it from the cache. Does nothing if we can be sure from the exception that
2193      * the location is still accurate, or if the cache has already been updated.
2194      * @param exception an object (to simplify user code) in which we will try to find a nested
2195      *                  or wrapped RegionMovedException
2196      * @param source server that is the source of the location update.
2197      */
2198     @Override
2199     public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
2200       final Object exception, final ServerName source) {
2201       if (rowkey == null || tableName == null) {
2202         LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : Bytes.toStringBinary(rowkey)) +
2203             ", tableName=" + (tableName == null ? "null" : tableName));
2204         return;
2205       }
2206 
2207       if (source == null) {
2208         // This should not happen, but let's secure ourselves.
2209         return;
2210       }
2211 
2212       if (regionName == null) {
2213         // we do not know which region, so just remove the cache entry for the row and server
2214         metaCache.clearCache(tableName, rowkey, source);
2215         return;
2216       }
2217 
2218       // Is it something we have already updated?
2219       final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
2220       HRegionLocation oldLocation = null;
2221       if (oldLocations != null) {
2222         oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
2223       }
2224       if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
2225         // There is no such location in the cache (it's been removed already) or
2226         // the cache has already been refreshed with a different location.  => nothing to do
2227         return;
2228       }
2229 
2230       HRegionInfo regionInfo = oldLocation.getRegionInfo();
2231       Throwable cause = ClientExceptionsUtil.findException(exception);
2232       if (cause != null) {
2233         if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
2234           // We know that the region is still on this region server
2235           return;
2236         }
2237 
2238         if (cause instanceof RegionMovedException) {
2239           RegionMovedException rme = (RegionMovedException) cause;
2240           if (LOG.isTraceEnabled()) {
2241             LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
2242                 rme.getHostname() + ":" + rme.getPort() +
2243                 " according to " + source.getHostAndPort());
2244           }
2245           // We know that the region is not anymore on this region server, but we know
2246           //  the new location.
2247           updateCachedLocation(
2248               regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2249           return;
2250         }
2251       }
2252 
2253       // If we're here, it means that we cannot be sure about the location, so we remove it from
2254       // the cache. Do not send the source because source can be a new server in the same host:port
2255       metaCache.clearCache(regionInfo);
2256     }
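    // Illustrative call site (hypothetical): after an RPC against 'source' fails for a
    // row, the raw exception is handed in here. A nested RegionMovedException repoints
    // the cache entry; any other meta-clearing exception simply evicts it.
    //
    //   try {
    //     // ... region server call for (tableName, row) ...
    //   } catch (IOException ioe) {
    //     updateCachedLocations(tableName, regionName, row, ioe, serverName);
    //     // the next locateRegion() uses the new location, or falls back to hbase:meta
    //   }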
2257 
2258     @Override
2259     public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
2260       final Object exception, final HRegionLocation source) {
2261       updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
2262     }
2263 
2264     @Override
2265     @Deprecated
2266     public void processBatch(List<? extends Row> list,
2267         final TableName tableName,
2268         ExecutorService pool,
2269         Object[] results) throws IOException, InterruptedException {
2270       // This belongs in HTable!!! Not in here.  St.Ack
2271 
2272       // results must be the same size as list
2273       if (results.length != list.size()) {
2274         throw new IllegalArgumentException(
2275           "argument results must be the same size as argument list");
2276       }
2277       processBatchCallback(list, tableName, pool, results, null);
2278     }
2279 
2280     @Override
2281     @Deprecated
2282     public void processBatch(List<? extends Row> list,
2283         final byte[] tableName,
2284         ExecutorService pool,
2285         Object[] results) throws IOException, InterruptedException {
2286       processBatch(list, TableName.valueOf(tableName), pool, results);
2287     }
2288 
2289     /**
2290      * Send the queries in parallel on the different region servers. Retries on failures.
2291      * If the method returns it means that there is no error, and the 'results' array will
2292      * contain no exception. On error, an exception is thrown, and the 'results' array will
2293      * contain results and exceptions.
2294      * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
2295      */
2296     @Override
2297     @Deprecated
2298     public <R> void processBatchCallback(
2299       List<? extends Row> list,
2300       TableName tableName,
2301       ExecutorService pool,
2302       Object[] results,
2303       Batch.Callback<R> callback)
2304       throws IOException, InterruptedException {
2305 
2306       AsyncRequestFuture ars = this.asyncProcess.submitAll(
2307           pool, tableName, list, callback, results);
2308       ars.waitUntilDone();
2309       if (ars.hasError()) {
2310         throw ars.getErrors();
2311       }
2312     }
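    // Usage sketch for the deprecated batch API above (pool and operations are
    // hypothetical):
    //
    //   List<Put> puts = ...;
    //   Object[] results = new Object[puts.size()];
    //   processBatchCallback(puts, TableName.valueOf("t1"), pool, results, null);
    //   // returning without an exception means no slot of 'results' holds an error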
2313 
2314     @Override
2315     @Deprecated
2316     public <R> void processBatchCallback(
2317       List<? extends Row> list,
2318       byte[] tableName,
2319       ExecutorService pool,
2320       Object[] results,
2321       Batch.Callback<R> callback)
2322       throws IOException, InterruptedException {
2323       processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
2324     }
2325 
2326     // For tests to override.
2327     protected AsyncProcess createAsyncProcess(Configuration conf) {
2328       // No default pool available.
2329       return new AsyncProcess(this, conf, this.batchPool,
2330           RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
2331           RpcControllerFactory.instantiate(conf));
2332     }
2333 
2334     @Override
2335     public AsyncProcess getAsyncProcess() {
2336       return asyncProcess;
2337     }
2338 
2339     @Override
2340     public ServerStatisticTracker getStatisticsTracker() {
2341       return this.stats;
2342     }
2343 
2344     @Override
2345     public ClientBackoffPolicy getBackoffPolicy() {
2346       return this.backoffPolicy;
2347     }
2348 
2349     /*
2350      * Return the number of cached regions for a table. It will only be called
2351      * from a unit test.
2352      */
2353     @VisibleForTesting
2354     int getNumberOfCachedRegionLocations(final TableName tableName) {
2355       return metaCache.getNumberOfCachedRegionLocations(tableName);
2356     }
2357 
2358     @Override
2359     @Deprecated
2360     public void setRegionCachePrefetch(final TableName tableName, final boolean enable) {
2361     }
2362 
2363     @Override
2364     @Deprecated
2365     public void setRegionCachePrefetch(final byte[] tableName,
2366         final boolean enable) {
2367     }
2368 
2369     @Override
2370     @Deprecated
2371     public boolean getRegionCachePrefetch(TableName tableName) {
2372       return false;
2373     }
2374 
2375     @Override
2376     @Deprecated
2377     public boolean getRegionCachePrefetch(byte[] tableName) {
2378       return false;
2379     }
2380 
2381     @Override
2382     public void abort(final String msg, Throwable t) {
2383       if (t instanceof KeeperException.SessionExpiredException
2384         && keepAliveZookeeper != null) {
2385         synchronized (masterAndZKLock) {
2386           if (keepAliveZookeeper != null) {
2387             LOG.warn("This client just lost its session with ZooKeeper," +
2388               " closing it." +
2389               " It will be recreated next time someone needs it", t);
2390             closeZooKeeperWatcher();
2391           }
2392         }
2393       } else {
2394         if (t != null) {
2395           LOG.fatal(msg, t);
2396         } else {
2397           LOG.fatal(msg);
2398         }
2399         this.aborted = true;
2400         close();
2401         this.closed = true;
2402       }
2403     }
2404 
2405     @Override
2406     public boolean isClosed() {
2407       return this.closed;
2408     }
2409 
2410     @Override
2411     public boolean isAborted(){
2412       return this.aborted;
2413     }
2414 
2415     @Override
2416     public int getCurrentNrHRS() throws IOException {
2417       return this.registry.getCurrentNrHRS();
2418     }
2419 
2420     /**
2421      * Increment this client's reference count.
2422      */
2423     void incCount() {
2424       ++refCount;
2425     }
2426 
2427     /**
2428      * Decrement this client's reference count.
2429      */
2430     void decCount() {
2431       if (refCount > 0) {
2432         --refCount;
2433       }
2434     }
2435 
2436     /**
2437      * Return if this client has no reference
2438      *
2439      * @return true if this client has no reference; false otherwise
2440      */
2441     boolean isZeroReference() {
2442       return refCount == 0;
2443     }
2444 
2445     void internalClose() {
2446       if (this.closed) {
2447         return;
2448       }
2449       closeMaster();
2450       shutdownPools();
2451       if (this.metrics != null) {
2452         this.metrics.shutdown();
2453       }
2454       this.closed = true;
2455       closeZooKeeperWatcher();
2456       this.stubs.clear();
2457       if (clusterStatusListener != null) {
2458         clusterStatusListener.close();
2459       }
2460       if (rpcClient != null) {
2461         rpcClient.close();
2462       }
2463     }
2464 
2465     @Override
2466     public void close() {
2467       if (managed) {
2468         if (aborted) {
2469           ConnectionManager.deleteStaleConnection(this);
2470         } else {
2471           ConnectionManager.deleteConnection(this, false);
2472         }
2473       } else {
2474         internalClose();
2475       }
2476     }
2477 
2478     /**
2479      * Close the connection for good, regardless of what the current value of
2480      * {@link #refCount} is. Ideally, {@link #refCount} should be zero at this
2481      * point, which would be the case if all of its consumers close the
2482      * connection. However, on the off chance that someone is unable to close
2483      * the connection, perhaps because it bailed out prematurely, the method
2484      * below will ensure that this {@link HConnection} instance is cleaned up.
2485      * Caveat: The JVM may take an unknown amount of time to call finalize on an
2486      * unreachable object, so our hope is that every consumer cleans up after
2487      * itself, like any good citizen.
2488      */
2489     @Override
2490     protected void finalize() throws Throwable {
2491       super.finalize();
2492       // Pretend as if we are about to release the last remaining reference
2493       refCount = 1;
2494       close();
2495     }
2496 
2497     /**
2498      * @deprecated Use {@link Admin#listTables()} instead
2499      */
2500     @Deprecated
2501     @Override
2502     public HTableDescriptor[] listTables() throws IOException {
2503       MasterKeepAliveConnection master = getKeepAliveMasterService();
2504       try {
2505         GetTableDescriptorsRequest req =
2506           RequestConverter.buildGetTableDescriptorsRequest((List<TableName>)null);
2507         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2508       } catch (ServiceException se) {
2509         throw ProtobufUtil.getRemoteException(se);
2510       } finally {
2511         master.close();
2512       }
2513     }
2514 
2515     /**
2516      * @deprecated Use {@link Admin#listTableNames()} instead
2517      */
2518     @Deprecated
2519     @Override
2520     public String[] getTableNames() throws IOException {
2521       TableName[] tableNames = listTableNames();
2522       String[] result = new String[tableNames.length];
2523       for (int i = 0; i < tableNames.length; i++) {
2524         result[i] = tableNames[i].getNameAsString();
2525       }
2526       return result;
2527     }
2528 
2529     /**
2530      * @deprecated Use {@link Admin#listTableNames()} instead
2531      */
2532     @Deprecated
2533     @Override
2534     public TableName[] listTableNames() throws IOException {
2535       MasterKeepAliveConnection master = getKeepAliveMasterService();
2536       try {
2537         return ProtobufUtil.getTableNameArray(master.getTableNames(null,
2538             GetTableNamesRequest.newBuilder().build())
2539           .getTableNamesList());
2540       } catch (ServiceException se) {
2541         throw ProtobufUtil.getRemoteException(se);
2542       } finally {
2543         master.close();
2544       }
2545     }
2546 
2547     /**
2548      * @deprecated Use {@link Admin#getTableDescriptorsByTableName(List)} instead
2549      */
2550     @Deprecated
2551     @Override
2552     public HTableDescriptor[] getHTableDescriptorsByTableName(
2553         List<TableName> tableNames) throws IOException {
2554       if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
2555       MasterKeepAliveConnection master = getKeepAliveMasterService();
2556       try {
2557         GetTableDescriptorsRequest req =
2558           RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2559         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2560       } catch (ServiceException se) {
2561         throw ProtobufUtil.getRemoteException(se);
2562       } finally {
2563         master.close();
2564       }
2565     }
2566 
2567     /**
2568      * @deprecated Use {@link Admin#getTableDescriptorsByTableName(List)} instead
2569      */
2570     @Deprecated
2571     @Override
2572     public HTableDescriptor[] getHTableDescriptors(
2573         List<String> names) throws IOException {
2574       List<TableName> tableNames = new ArrayList<TableName>(names.size());
2575       for(String name : names) {
2576         tableNames.add(TableName.valueOf(name));
2577       }
2578 
2579       return getHTableDescriptorsByTableName(tableNames);
2580     }
2581 
2582     @Override
2583     public NonceGenerator getNonceGenerator() {
2584       return this.nonceGenerator;
2585     }

    /**
     * Connects to the master to get the table descriptor.
     * @param tableName table name
     * @return the table descriptor, or {@code null} if {@code tableName} is {@code null}
     * @throws IOException if the connection to the master fails or if the table
     *  is not found
     * @deprecated Use {@link Admin#getTableDescriptor(TableName)} instead
     */
    @Deprecated
    @Override
    public HTableDescriptor getHTableDescriptor(final TableName tableName)
    throws IOException {
      if (tableName == null) return null;
      MasterKeepAliveConnection master = getKeepAliveMasterService();
      GetTableDescriptorsResponse htds;
      try {
        GetTableDescriptorsRequest req =
          RequestConverter.buildGetTableDescriptorsRequest(tableName);
        htds = master.getTableDescriptors(null, req);
      } catch (ServiceException se) {
        throw ProtobufUtil.getRemoteException(se);
      } finally {
        master.close();
      }
      if (!htds.getTableSchemaList().isEmpty()) {
        return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
      }
      throw new TableNotFoundException(tableName.getNameAsString());
    }
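
    // Usage sketch (illustrative): callers that cannot assume the table exists should
    // catch the TableNotFoundException thrown when the master returns no schema.
    //
    //   try (Admin admin = connection.getAdmin()) {
    //     HTableDescriptor htd = admin.getTableDescriptor(TableName.valueOf("mytable"));
    //     System.out.println(htd);
    //   } catch (TableNotFoundException tnfe) {
    //     // create the table, or surface the error to the caller
    //   }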

    /**
     * @deprecated Use {@link Admin#getTableDescriptor(TableName)} instead
     */
    @Deprecated
    @Override
    public HTableDescriptor getHTableDescriptor(final byte[] tableName)
    throws IOException {
      return getHTableDescriptor(TableName.valueOf(tableName));
    }

    @Override
    public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
      return RpcRetryingCallerFactory
          .instantiate(conf, this.interceptor, this.getStatisticsTracker());
    }

    @Override
    public boolean isManaged() {
      return managed;
    }

    @Override
    public boolean hasCellBlockSupport() {
      return this.rpcClient.hasCellBlockSupport();
    }

    @Override
    public ConnectionConfiguration getConnectionConfiguration() {
      return this.connectionConfig;
    }

    @Override
    public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
      return this.rpcCallerFactory;
    }

    @Override
    public RpcControllerFactory getRpcControllerFactory() {
      return this.rpcControllerFactory;
    }
  }

  /**
   * The record of errors for servers.
   */
  static class ServerErrorTracker {
    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
    private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
        new ConcurrentHashMap<ServerName, ServerErrors>();
    private final long canRetryUntil;
    private final int maxRetries;
    private final long startTrackingTime;

    public ServerErrorTracker(long timeout, int maxRetries) {
      this.maxRetries = maxRetries;
      this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
      // Wall-clock time at which tracking started; see getStartTrackingTime() below.
      this.startTrackingTime = new Date().getTime();
    }

    /**
     * We stop retrying when we have exhausted both the number of retries and the time allocated.
     */
    boolean canRetryMore(int numRetry) {
      // A single-try configuration must not be extended by the time budget.
      return numRetry < maxRetries || (maxRetries > 1 &&
          EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
    }
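
    // Worked example (illustrative): with maxRetries = 3 and timeout = 60000 ms,
    // canRetryMore(1) and canRetryMore(2) pass on the attempt count alone, and
    // canRetryMore(3) still passes while the 60 s budget has not elapsed, since the
    // time clause can extend retries beyond the count. With maxRetries = 1 the time
    // clause is disabled, so canRetryMore(1) is always false.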

    /**
     * Calculates the back-off time for a retrying request to a particular server.
     *
     * @param server    The server in question.
     * @param basePause The default client pause, in milliseconds.
     * @return The time to wait before sending the next request.
     */
    long calculateBackoffTime(ServerName server, long basePause) {
      long result;
      ServerErrors errorStats = errorsByServer.get(server);
      if (errorStats != null) {
        result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
      } else {
        result = 0; // yes, if the server is not in our list we don't wait before retrying
      }
      return result;
    }
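
    // Worked example (illustrative): ConnectionUtils.getPauseTime scales the base
    // pause by HConstants.RETRY_BACKOFF, which in this line of HBase starts
    // {1, 2, 3, 5, 10, 20, ...}, plus a small random jitter. With a 100 ms base pause,
    // a server whose error counter reads 3 waits roughly 100 * 5 = 500 ms, while a
    // server with no recorded errors is retried immediately (result = 0).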

    /**
     * Reports that an error was encountered on the given server, incrementing its error count.
     *
     * @param server The server in question.
     */
    void reportServerError(ServerName server) {
      ServerErrors errors = errorsByServer.get(server);
      if (errors != null) {
        errors.addError();
      } else {
        errors = errorsByServer.putIfAbsent(server, new ServerErrors());
        if (errors != null) {
          // Another thread inserted an entry concurrently; count the error against it.
          // If we inserted the fresh entry ourselves, it stays at zero recorded retries,
          // so the first back-off for this server is just the base pause.
          errors.addError();
        }
      }
    }

    long getStartTrackingTime() {
      return startTrackingTime;
    }

    /**
     * The record of errors for a server.
     */
    private static class ServerErrors {
      public final AtomicInteger retries = new AtomicInteger(0);

      public void addError() {
        retries.incrementAndGet();
      }
    }
  }
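
  // Usage sketch (illustrative) of the tracker's intended loop; "server" and "call"
  // are hypothetical stand-ins for a region server name and a retriable operation.
  //
  //   ServerErrorTracker tracker = new ServerErrorTracker(60000, 10);
  //   for (int tries = 1; tracker.canRetryMore(tries); tries++) {
  //     try {
  //       return call.run(server);
  //     } catch (IOException ioe) {
  //       tracker.reportServerError(server);
  //       Thread.sleep(tracker.calculateBackoffTime(server, 100)); // may throw InterruptedException
  //     }
  //   }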
}