1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
22
23 import java.io.Closeable;
24 import java.io.IOException;
25 import java.io.InterruptedIOException;
26 import java.lang.reflect.UndeclaredThrowableException;
27 import java.net.InetAddress;
28 import java.net.InetSocketAddress;
29 import java.util.ArrayList;
30 import java.util.Date;
31 import java.util.HashSet;
32 import java.util.LinkedHashMap;
33 import java.util.List;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.Map;
36 import java.util.Map.Entry;
37 import java.util.NavigableMap;
38 import java.util.Set;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentMap;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.LinkedBlockingQueue;
43 import java.util.concurrent.ThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicInteger;
47 import java.util.concurrent.locks.ReentrantLock;
48
49 import org.apache.commons.logging.Log;
50 import org.apache.commons.logging.LogFactory;
51 import org.apache.hadoop.conf.Configuration;
52 import org.apache.hadoop.hbase.DoNotRetryIOException;
53 import org.apache.hadoop.hbase.HBaseConfiguration;
54 import org.apache.hadoop.hbase.HConstants;
55 import org.apache.hadoop.hbase.HRegionInfo;
56 import org.apache.hadoop.hbase.HRegionLocation;
57 import org.apache.hadoop.hbase.HTableDescriptor;
58 import org.apache.hadoop.hbase.MasterNotRunningException;
59 import org.apache.hadoop.hbase.MetaTableAccessor;
60 import org.apache.hadoop.hbase.RegionLocations;
61 import org.apache.hadoop.hbase.ServerName;
62 import org.apache.hadoop.hbase.TableName;
63 import org.apache.hadoop.hbase.TableNotEnabledException;
64 import org.apache.hadoop.hbase.TableNotFoundException;
65 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
66 import org.apache.hadoop.hbase.classification.InterfaceAudience;
67 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
68 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
69 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
70 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
71 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
72 import org.apache.hadoop.hbase.client.coprocessor.Batch;
73 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
74 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
75 import org.apache.hadoop.hbase.ipc.RpcClient;
76 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
77 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
78 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
79 import org.apache.hadoop.hbase.protobuf.RequestConverter;
80 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
81 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
82 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
83 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
84 import org.apache.hadoop.hbase.protobuf.generated.*;
85 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
86 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
87 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
88 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
89 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
90 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
91 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
92 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
93 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
94 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
95 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
96 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
97 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
98 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
99 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
100 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
101 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
102 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
104 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
105 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
106 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
107 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
108 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
109 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
110 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
111 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
113 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
119 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
121 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeRequest;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeResponse;
161 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
162 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
163 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
164 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
165 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
166 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
167 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
168 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
169 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
170 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
171 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
172 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
173 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
174 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
175 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
176 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
177 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
178 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
179 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
180 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
181 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
182 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
183 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
184 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
185 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
186 import org.apache.hadoop.hbase.security.User;
187 import org.apache.hadoop.hbase.security.UserProvider;
188 import org.apache.hadoop.hbase.util.Bytes;
189 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
190 import org.apache.hadoop.hbase.util.ExceptionUtil;
191 import org.apache.hadoop.hbase.util.Threads;
192 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
193 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
194 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
195 import org.apache.hadoop.ipc.RemoteException;
196 import org.apache.zookeeper.KeeperException;
197
198 import com.google.common.annotations.VisibleForTesting;
199 import com.google.protobuf.BlockingRpcChannel;
200 import com.google.protobuf.RpcController;
201 import com.google.protobuf.ServiceException;
202
203
204
205
206 @SuppressWarnings("serial")
207 @InterfaceAudience.Private
208
209 class ConnectionManager {
210 static final Log LOG = LogFactory.getLog(ConnectionManager.class);
211
212 public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
213 private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
214 private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
215
216
217
218
219 static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;
220
221 public static final int MAX_CACHED_CONNECTION_INSTANCES;
222
223
224
225
226
227 private static volatile NonceGenerator nonceGenerator = null;
228
229 private static Object nonceGeneratorCreateLock = new Object();
230
231 static {
232
233
234
235
236 MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
237 HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
238 CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
239 (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
240 @Override
241 protected boolean removeEldestEntry(
242 Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
243 return size() > MAX_CACHED_CONNECTION_INSTANCES;
244 }
245 };
246 }
247
248
249 static class NoNonceGenerator implements NonceGenerator {
250 @Override
251 public long getNonceGroup() {
252 return HConstants.NO_NONCE;
253 }
254 @Override
255 public long newNonce() {
256 return HConstants.NO_NONCE;
257 }
258 }
259
260
261
262
263 private ConnectionManager() {
264 super();
265 }
266
267
268
269
270
271
272 @VisibleForTesting
273 static NonceGenerator injectNonceGeneratorForTesting(
274 ClusterConnection conn, NonceGenerator cnm) {
275 HConnectionImplementation connImpl = (HConnectionImplementation)conn;
276 NonceGenerator ng = connImpl.getNonceGenerator();
277 LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
278 connImpl.nonceGenerator = cnm;
279 return ng;
280 }
281
282
283
284
285
286
287
288
289
290
291 @Deprecated
292 public static HConnection getConnection(final Configuration conf) throws IOException {
293 return getConnectionInternal(conf);
294 }
295
296
297 static ClusterConnection getConnectionInternal(final Configuration conf)
298 throws IOException {
299 HConnectionKey connectionKey = new HConnectionKey(conf);
300 synchronized (CONNECTION_INSTANCES) {
301 HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
302 if (connection == null) {
303 connection = (HConnectionImplementation)createConnection(conf, true);
304 CONNECTION_INSTANCES.put(connectionKey, connection);
305 } else if (connection.isClosed()) {
306 ConnectionManager.deleteConnection(connectionKey, true);
307 connection = (HConnectionImplementation)createConnection(conf, true);
308 CONNECTION_INSTANCES.put(connectionKey, connection);
309 }
310 connection.incCount();
311 return connection;
312 }
313 }
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335 public static HConnection createConnection(Configuration conf) throws IOException {
336 return createConnectionInternal(conf);
337 }
338
339 static ClusterConnection createConnectionInternal(Configuration conf) throws IOException {
340 UserProvider provider = UserProvider.instantiate(conf);
341 return createConnection(conf, false, null, provider.getCurrent());
342 }
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364 public static HConnection createConnection(Configuration conf, ExecutorService pool)
365 throws IOException {
366 UserProvider provider = UserProvider.instantiate(conf);
367 return createConnection(conf, false, pool, provider.getCurrent());
368 }
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390 public static HConnection createConnection(Configuration conf, User user)
391 throws IOException {
392 return createConnection(conf, false, null, user);
393 }
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416 public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
417 throws IOException {
418 return createConnection(conf, false, pool, user);
419 }
420
421 @Deprecated
422 static HConnection createConnection(final Configuration conf, final boolean managed)
423 throws IOException {
424 UserProvider provider = UserProvider.instantiate(conf);
425 return createConnection(conf, managed, null, provider.getCurrent());
426 }
427
428 @Deprecated
429 static ClusterConnection createConnection(final Configuration conf, final boolean managed,
430 final ExecutorService pool, final User user)
431 throws IOException {
432 return (ClusterConnection) ConnectionFactory.createConnection(conf, managed, pool, user);
433 }
434
435
436
437
438
439
440
441
442
443 @Deprecated
444 public static void deleteConnection(Configuration conf) {
445 deleteConnection(new HConnectionKey(conf), false);
446 }
447
448
449
450
451
452
453
454
455 @Deprecated
456 public static void deleteStaleConnection(HConnection connection) {
457 deleteConnection(connection, true);
458 }
459
460
461
462
463
464
465
466 @Deprecated
467 public static void deleteAllConnections(boolean staleConnection) {
468 synchronized (CONNECTION_INSTANCES) {
469 Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
470 connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
471 for (HConnectionKey connectionKey : connectionKeys) {
472 deleteConnection(connectionKey, staleConnection);
473 }
474 CONNECTION_INSTANCES.clear();
475 }
476 }
477
478
479
480
481
482 @Deprecated
483 public static void deleteAllConnections() {
484 deleteAllConnections(false);
485 }
486
487
488 @Deprecated
489 private static void deleteConnection(HConnection connection, boolean staleConnection) {
490 synchronized (CONNECTION_INSTANCES) {
491 for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
492 if (e.getValue() == connection) {
493 deleteConnection(e.getKey(), staleConnection);
494 break;
495 }
496 }
497 }
498 }
499
500 @Deprecated
501 private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
502 synchronized (CONNECTION_INSTANCES) {
503 HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
504 if (connection != null) {
505 connection.decCount();
506 if (connection.isZeroReference() || staleConnection) {
507 CONNECTION_INSTANCES.remove(connectionKey);
508 connection.internalClose();
509 }
510 } else {
511 LOG.error("Connection not found in the list, can't delete it "+
512 "(connection key=" + connectionKey + "). May be the key was modified?", new Exception());
513 }
514 }
515 }
516
517
518
519
520
521
522
523
524
525
526
527
528 @InterfaceAudience.Private
529 public static <T> T execute(HConnectable<T> connectable) throws IOException {
530 if (connectable == null || connectable.conf == null) {
531 return null;
532 }
533 Configuration conf = connectable.conf;
534 HConnection connection = getConnection(conf);
535 boolean connectSucceeded = false;
536 try {
537 T returnValue = connectable.connect(connection);
538 connectSucceeded = true;
539 return returnValue;
540 } finally {
541 try {
542 connection.close();
543 } catch (Exception e) {
544 ExceptionUtil.rethrowIfInterrupt(e);
545 if (connectSucceeded) {
546 throw new IOException("The connection to " + connection
547 + " could not be deleted.", e);
548 }
549 }
550 }
551 }
552
553
554 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
555 value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
556 justification="Access to the conncurrent hash map is under a lock so should be fine.")
557 static class HConnectionImplementation implements ClusterConnection, Closeable {
558 static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
559 private final boolean hostnamesCanChange;
560 private final long pause;
561 private final boolean useMetaReplicas;
562 private final int numTries;
563 final int rpcTimeout;
564 private NonceGenerator nonceGenerator = null;
565 private final AsyncProcess asyncProcess;
566
567 private final ServerStatisticTracker stats;
568
569 private volatile boolean closed;
570 private volatile boolean aborted;
571
572
573 ClusterStatusListener clusterStatusListener;
574
575
576 private final Object metaRegionLock = new Object();
577
578
579
580
581
582
583 private final Object masterAndZKLock = new Object();
584
585 private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
586
587
588
589 private volatile ExecutorService batchPool = null;
590
591
592 private volatile ExecutorService metaLookupPool = null;
593 private volatile boolean cleanupPool = false;
594
595 private final Configuration conf;
596
597
598
599 private final ConnectionConfiguration connectionConfig;
600
601
602 private RpcClient rpcClient;
603
604 private final MetaCache metaCache;
605 private final MetricsConnection metrics;
606
607 private int refCount;
608
609
610 private boolean managed;
611
612 protected User user;
613
614 private RpcRetryingCallerFactory rpcCallerFactory;
615
616 private RpcControllerFactory rpcControllerFactory;
617
618 private final RetryingCallerInterceptor interceptor;
619
620
621
622
623 Registry registry;
624
625 private final ClientBackoffPolicy backoffPolicy;
626
627
628 private final ReentrantLock userRegionLock = new ReentrantLock();
629
630
/**
 * Creates a connection with no executor, no user and no preset cluster id.
 *
 * @param conf configuration for the connection
 * @param managed managed flag stored on the connection
 * @throws IOException if the full constructor fails
 */
HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
  this(conf, managed, null, null, null);
}
634
/**
 * Creates a connection without a preset cluster id.
 *
 * @param conf configuration for the connection
 * @param managed managed flag stored on the connection
 * @param pool executor stored as the batch pool; may be null
 * @param user user stored on the connection
 * @throws IOException if the full constructor fails
 */
HConnectionImplementation(
    Configuration conf, boolean managed, ExecutorService pool, User user) throws IOException {
  this(conf, managed, pool, user, null);
}
639
640
641
642
643
644
645
646
647
648
649
650
/**
 * Full constructor: runs the configuration-only constructor, stores the arguments, sets up the
 * registry, resolves the cluster id, creates the RPC client and controller factory, and - when
 * status publishing is enabled and a listener class is configured - starts a
 * {@link ClusterStatusListener} that clears caches and cancels RPC connections for dead servers.
 *
 * @param conf configuration for the connection
 * @param managed managed flag stored on the connection
 * @param pool executor stored as the batch pool; may be null
 * @param user user stored on the connection
 * @param clusterId cluster id to use; when null it is fetched from the registry
 * @throws IOException if any of the setup steps fails
 */
HConnectionImplementation(Configuration conf, boolean managed,
    ExecutorService pool, User user, String clusterId) throws IOException {
  this(conf);
  this.clusterId = clusterId;
  this.user = user;
  this.batchPool = pool;
  this.managed = managed;
  this.registry = setupRegistry();
  retrieveClusterId();

  this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
  this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);

  final boolean statusPublished = conf.getBoolean(HConstants.STATUS_PUBLISHED,
      HConstants.STATUS_PUBLISHED_DEFAULT);
  final Class<? extends ClusterStatusListener.Listener> listenerType =
      conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
          ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
          ClusterStatusListener.Listener.class);
  if (!statusPublished) {
    return;
  }
  if (listenerType == null) {
    LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
        ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
    return;
  }
  final ClusterStatusListener.DeadServerHandler onDeadServer =
      new ClusterStatusListener.DeadServerHandler() {
        @Override
        public void newDead(ServerName sn) {
          clearCaches(sn);
          rpcClient.cancelConnections(sn);
        }
      };
  clusterStatusListener = new ClusterStatusListener(onDeadServer, conf, listenerType);
}
687
688
689
690
691 protected HConnectionImplementation(Configuration conf) {
692 this.conf = conf;
693 this.connectionConfig = new ConnectionConfiguration(conf);
694 this.closed = false;
695 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
696 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
697 this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
698 HConstants.DEFAULT_USE_META_REPLICAS);
699 this.numTries = connectionConfig.getRetriesNumber();
700 this.rpcTimeout = conf.getInt(
701 HConstants.HBASE_RPC_TIMEOUT_KEY,
702 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
703 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
704 synchronized (nonceGeneratorCreateLock) {
705 if (ConnectionManager.nonceGenerator == null) {
706 ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
707 }
708 this.nonceGenerator = ConnectionManager.nonceGenerator;
709 }
710 } else {
711 this.nonceGenerator = new NoNonceGenerator();
712 }
713 stats = ServerStatisticTracker.create(conf);
714 this.asyncProcess = createAsyncProcess(this.conf);
715 this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
716 this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
717 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
718 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
719 this.metrics = new MetricsConnection(this);
720 } else {
721 this.metrics = null;
722 }
723
724 this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
725 this.metaCache = new MetaCache(this.metrics);
726 }
727
728 @Override
729 public HTableInterface getTable(String tableName) throws IOException {
730 return getTable(TableName.valueOf(tableName));
731 }
732
733 @Override
734 public HTableInterface getTable(byte[] tableName) throws IOException {
735 return getTable(TableName.valueOf(tableName));
736 }
737
738 @Override
739 public HTableInterface getTable(TableName tableName) throws IOException {
740 return getTable(tableName, getBatchPool());
741 }
742
743 @Override
744 public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
745 return getTable(TableName.valueOf(tableName), pool);
746 }
747
748 @Override
749 public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
750 return getTable(TableName.valueOf(tableName), pool);
751 }
752
753 @Override
754 public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
755 if (managed) {
756 throw new NeedUnmanagedConnectionException();
757 }
758 return new HTable(tableName, this, connectionConfig, rpcCallerFactory, rpcControllerFactory, pool);
759 }
760
761 @Override
762 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
763 if (params.getTableName() == null) {
764 throw new IllegalArgumentException("TableName cannot be null.");
765 }
766 if (params.getPool() == null) {
767 params.pool(HTable.getDefaultExecutor(getConfiguration()));
768 }
769 if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
770 params.writeBufferSize(connectionConfig.getWriteBufferSize());
771 }
772 if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
773 params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
774 }
775 return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
776 }
777
778 @Override
779 public BufferedMutator getBufferedMutator(TableName tableName) {
780 return getBufferedMutator(new BufferedMutatorParams(tableName));
781 }
782
783 @Override
784 public RegionLocator getRegionLocator(TableName tableName) throws IOException {
785 return new HRegionLocator(tableName, this);
786 }
787
788 @Override
789 public Admin getAdmin() throws IOException {
790 if (managed) {
791 throw new NeedUnmanagedConnectionException();
792 }
793 return new HBaseAdmin(this);
794 }
795
796 @Override
797 public MetricsConnection getConnectionMetrics() {
798 return this.metrics;
799 }
800
801 private ExecutorService getBatchPool() {
802 if (batchPool == null) {
803 synchronized (this) {
804 if (batchPool == null) {
805 this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
806 conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
807 this.cleanupPool = true;
808 }
809 }
810 }
811 return this.batchPool;
812 }
813
814 private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
815 BlockingQueue<Runnable> passedWorkQueue) {
816
817 if (maxThreads == 0) {
818 maxThreads = Runtime.getRuntime().availableProcessors() * 8;
819 }
820 if (coreThreads == 0) {
821 coreThreads = Runtime.getRuntime().availableProcessors() * 8;
822 }
823 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
824 BlockingQueue<Runnable> workQueue = passedWorkQueue;
825 if (workQueue == null) {
826 workQueue =
827 new LinkedBlockingQueue<Runnable>(maxThreads *
828 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
829 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
830 }
831 ThreadPoolExecutor tpe = new ThreadPoolExecutor(
832 coreThreads,
833 maxThreads,
834 keepAliveTime,
835 TimeUnit.SECONDS,
836 workQueue,
837 Threads.newDaemonThreadFactory(toString() + nameHint));
838 tpe.allowCoreThreadTimeOut(true);
839 return tpe;
840 }
841
842 private ExecutorService getMetaLookupPool() {
843 if (this.metaLookupPool == null) {
844 synchronized (this) {
845 if (this.metaLookupPool == null) {
846
847
848
849
850 this.metaLookupPool = getThreadPool(
851 conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
852 conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
853 "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
854 }
855 }
856 }
857 return this.metaLookupPool;
858 }
859
860 protected ExecutorService getCurrentMetaLookupPool() {
861 return metaLookupPool;
862 }
863
864 protected ExecutorService getCurrentBatchPool() {
865 return batchPool;
866 }
867
868 private void shutdownPools() {
869 if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
870 shutdownBatchPool(this.batchPool);
871 }
872 if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
873 shutdownBatchPool(this.metaLookupPool);
874 }
875 }
876
877 private void shutdownBatchPool(ExecutorService pool) {
878 pool.shutdown();
879 try {
880 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
881 pool.shutdownNow();
882 }
883 } catch (InterruptedException e) {
884 pool.shutdownNow();
885 }
886 }
887
888
889
890
891
892 private Registry setupRegistry() throws IOException {
893 return RegistryFactory.getRegistry(this);
894 }
895
896
897
898
899 @VisibleForTesting
900 RpcClient getRpcClient() {
901 return rpcClient;
902 }
903
904
905
906
907 @Override
908 public String toString(){
909 return "hconnection-0x" + Integer.toHexString(hashCode());
910 }
911
912 protected String clusterId = null;
913
914 void retrieveClusterId() {
915 if (clusterId != null) return;
916 this.clusterId = this.registry.getClusterId();
917 if (clusterId == null) {
918 clusterId = HConstants.CLUSTER_ID_DEFAULT;
919 LOG.debug("clusterid came back null, using default " + clusterId);
920 }
921 }
922
/**
 * Returns the configuration held by this connection.
 *
 * @return the {@code conf} field of this connection
 */
@Override
public Configuration getConfiguration() {
  return this.conf;
}
927
928 private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
929 throws MasterNotRunningException {
930 String errorMsg;
931 try {
932 if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
933 errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. "
934 + "It should have been written by the master. "
935 + "Check the value configured in 'zookeeper.znode.parent'. "
936 + "There could be a mismatch with the one configured in the master.";
937 LOG.error(errorMsg);
938 throw new MasterNotRunningException(errorMsg);
939 }
940 } catch (KeeperException e) {
941 errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
942 LOG.error(errorMsg);
943 throw new MasterNotRunningException(errorMsg, e);
944 }
945 }
946
947
948
949
950
951
952 @Deprecated
953 @Override
954 public boolean isMasterRunning()
955 throws MasterNotRunningException, ZooKeeperConnectionException {
956
957
958
959 MasterKeepAliveConnection m = getKeepAliveMasterService();
960 m.close();
961 return true;
962 }
963
964 @Override
965 public HRegionLocation getRegionLocation(final TableName tableName,
966 final byte [] row, boolean reload)
967 throws IOException {
968 return reload? relocateRegion(tableName, row): locateRegion(tableName, row);
969 }
970
971 @Override
972 public HRegionLocation getRegionLocation(final byte[] tableName,
973 final byte [] row, boolean reload)
974 throws IOException {
975 return getRegionLocation(TableName.valueOf(tableName), row, reload);
976 }
977
978 @Override
979 public boolean isTableEnabled(TableName tableName) throws IOException {
980 return this.registry.isTableOnlineState(tableName, true);
981 }
982
983 @Override
984 public boolean isTableEnabled(byte[] tableName) throws IOException {
985 return isTableEnabled(TableName.valueOf(tableName));
986 }
987
988 @Override
989 public boolean isTableDisabled(TableName tableName) throws IOException {
990 return this.registry.isTableOnlineState(tableName, false);
991 }
992
993 @Override
994 public boolean isTableDisabled(byte[] tableName) throws IOException {
995 return isTableDisabled(TableName.valueOf(tableName));
996 }
997
998 @Override
999 public boolean isTableAvailable(final TableName tableName) throws IOException {
1000 final AtomicBoolean available = new AtomicBoolean(true);
1001 final AtomicInteger regionCount = new AtomicInteger(0);
1002 MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
1003 @Override
1004 public boolean processRow(Result row) throws IOException {
1005 HRegionInfo info = MetaScanner.getHRegionInfo(row);
1006 if (info != null && !info.isSplitParent()) {
1007 if (tableName.equals(info.getTable())) {
1008 ServerName server = HRegionInfo.getServerName(row);
1009 if (server == null) {
1010 available.set(false);
1011 return false;
1012 }
1013 regionCount.incrementAndGet();
1014 } else if (tableName.compareTo(info.getTable()) < 0) {
1015
1016 return false;
1017 }
1018 }
1019 return true;
1020 }
1021 };
1022 MetaScanner.metaScan(this, visitor, tableName);
1023 return available.get() && (regionCount.get() > 0);
1024 }
1025
1026 @Override
1027 public boolean isTableAvailable(final byte[] tableName) throws IOException {
1028 return isTableAvailable(TableName.valueOf(tableName));
1029 }
1030
1031 @Override
1032 public boolean isTableAvailable(final TableName tableName, final byte[][] splitKeys)
1033 throws IOException {
1034 final AtomicBoolean available = new AtomicBoolean(true);
1035 final AtomicInteger regionCount = new AtomicInteger(0);
1036 MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
1037 @Override
1038 public boolean processRow(Result row) throws IOException {
1039 HRegionInfo info = MetaScanner.getHRegionInfo(row);
1040 if (info != null && !info.isSplitParent()) {
1041 if (tableName.equals(info.getTable())) {
1042 ServerName server = HRegionInfo.getServerName(row);
1043 if (server == null) {
1044 available.set(false);
1045 return false;
1046 }
1047 if (!Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
1048 for (byte[] splitKey : splitKeys) {
1049
1050 if (Bytes.equals(info.getStartKey(), splitKey)) {
1051 regionCount.incrementAndGet();
1052 break;
1053 }
1054 }
1055 } else {
1056
1057 regionCount.incrementAndGet();
1058 }
1059 } else if (tableName.compareTo(info.getTable()) < 0) {
1060
1061 return false;
1062 }
1063 }
1064 return true;
1065 }
1066 };
1067 MetaScanner.metaScan(this, visitor, tableName);
1068
1069 return available.get() && (regionCount.get() == splitKeys.length + 1);
1070 }
1071
1072 @Override
1073 public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
1074 throws IOException {
1075 return isTableAvailable(TableName.valueOf(tableName), splitKeys);
1076 }
1077
1078 @Override
1079 public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
1080 RegionLocations locations = locateRegion(HRegionInfo.getTable(regionName),
1081 HRegionInfo.getStartKey(regionName), false, true);
1082 return locations == null ? null : locations.getRegionLocation();
1083 }
1084
1085 @Override
1086 public boolean isDeadServer(ServerName sn) {
1087 if (clusterStatusListener == null) {
1088 return false;
1089 } else {
1090 return clusterStatusListener.isDeadServer(sn);
1091 }
1092 }
1093
/**
 * Locates all regions of a table by delegating to
 * {@code locateRegions(tableName, false, true)}, i.e. without using the cache.
 *
 * @param tableName table whose regions are wanted
 * @return the locations returned by the three-argument overload
 * @throws IOException if the lookup fails
 */
@Override
public List<HRegionLocation> locateRegions(final TableName tableName)
    throws IOException {
  return locateRegions (tableName, false, true);
}
1099
1100 @Override
1101 public List<HRegionLocation> locateRegions(final byte[] tableName)
1102 throws IOException {
1103 return locateRegions(TableName.valueOf(tableName));
1104 }
1105
1106 @Override
1107 public List<HRegionLocation> locateRegions(final TableName tableName,
1108 final boolean useCache, final boolean offlined) throws IOException {
1109 NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(this, tableName);
1110 final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
1111 for (HRegionInfo regionInfo : regions.keySet()) {
1112 RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
1113 if (list != null) {
1114 for (HRegionLocation loc : list.getRegionLocations()) {
1115 if (loc != null) {
1116 locations.add(loc);
1117 }
1118 }
1119 }
1120 }
1121 return locations;
1122 }
1123
1124 @Override
1125 public List<HRegionLocation> locateRegions(final byte[] tableName,
1126 final boolean useCache, final boolean offlined) throws IOException {
1127 return locateRegions(TableName.valueOf(tableName), useCache, offlined);
1128 }
1129
1130 @Override
1131 public HRegionLocation locateRegion(
1132 final TableName tableName, final byte[] row) throws IOException{
1133 RegionLocations locations = locateRegion(tableName, row, true, true);
1134 return locations == null ? null : locations.getRegionLocation();
1135 }
1136
1137 @Override
1138 public HRegionLocation locateRegion(final byte[] tableName,
1139 final byte [] row)
1140 throws IOException{
1141 return locateRegion(TableName.valueOf(tableName), row);
1142 }
1143
1144 @Override
1145 public HRegionLocation relocateRegion(final TableName tableName,
1146 final byte [] row) throws IOException{
1147 RegionLocations locations = relocateRegion(tableName, row,
1148 RegionReplicaUtil.DEFAULT_REPLICA_ID);
1149 return locations == null ? null :
1150 locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
1151 }
1152
1153 @Override
1154 public RegionLocations relocateRegion(final TableName tableName,
1155 final byte [] row, int replicaId) throws IOException{
1156
1157
1158
1159 if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
1160 throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
1161 }
1162
1163 return locateRegion(tableName, row, false, true, replicaId);
1164 }
1165
1166 @Override
1167 public HRegionLocation relocateRegion(final byte[] tableName,
1168 final byte [] row) throws IOException {
1169 return relocateRegion(TableName.valueOf(tableName), row);
1170 }
1171
/**
 * Locates the region holding {@code row} for the default replica by delegating to the
 * five-argument overload with {@code RegionReplicaUtil.DEFAULT_REPLICA_ID}.
 *
 * @param tableName table to look in
 * @param row row whose hosting region is wanted
 * @param useCache forwarded unchanged
 * @param retry forwarded unchanged
 * @return whatever the five-argument overload returns
 * @throws IOException if the lookup fails
 */
@Override
public RegionLocations locateRegion(final TableName tableName,
    final byte [] row, boolean useCache, boolean retry)
    throws IOException {
  return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
1178
1179 @Override
1180 public RegionLocations locateRegion(final TableName tableName,
1181 final byte [] row, boolean useCache, boolean retry, int replicaId)
1182 throws IOException {
1183 if (this.closed) throw new DoNotRetryIOException(toString() + " closed");
1184 if (tableName== null || tableName.getName().length == 0) {
1185 throw new IllegalArgumentException(
1186 "table name cannot be null or zero length");
1187 }
1188 if (tableName.equals(TableName.META_TABLE_NAME)) {
1189 return locateMeta(tableName, useCache, replicaId);
1190 } else {
1191
1192 return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
1193 }
1194 }
1195
1196 private RegionLocations locateMeta(final TableName tableName,
1197 boolean useCache, int replicaId) throws IOException {
1198
1199
1200
1201 byte[] metaCacheKey = HConstants.EMPTY_START_ROW;
1202 RegionLocations locations = null;
1203 if (useCache) {
1204 locations = getCachedLocation(tableName, metaCacheKey);
1205 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1206 return locations;
1207 }
1208 }
1209
1210
1211 synchronized (metaRegionLock) {
1212
1213
1214 if (useCache) {
1215 locations = getCachedLocation(tableName, metaCacheKey);
1216 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1217 return locations;
1218 }
1219 }
1220
1221
1222 locations = this.registry.getMetaRegionLocation();
1223 if (locations != null) {
1224 cacheLocation(tableName, locations);
1225 }
1226 }
1227 return locations;
1228 }
1229
1230
1231
1232
1233
1234 private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
1235 boolean useCache, boolean retry, int replicaId) throws IOException {
1236
1237
1238
1239 if (useCache) {
1240 RegionLocations locations = getCachedLocation(tableName, row);
1241 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1242 return locations;
1243 }
1244 }
1245
1246
1247
1248
1249 byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
1250
1251 Scan s = new Scan();
1252 s.setReversed(true);
1253 s.setStartRow(metaKey);
1254 s.setSmall(true);
1255 s.setCaching(1);
1256 if (this.useMetaReplicas) {
1257 s.setConsistency(Consistency.TIMELINE);
1258 }
1259
1260 int localNumRetries = (retry ? numTries : 1);
1261
1262 for (int tries = 0; true; tries++) {
1263 if (tries >= localNumRetries) {
1264 throw new NoServerForRegionException("Unable to find region for " +
1265 Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
1266 }
1267 if (useCache) {
1268 RegionLocations locations = getCachedLocation(tableName, row);
1269 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1270 return locations;
1271 }
1272 } else {
1273
1274
1275 metaCache.clearCache(tableName, row);
1276 }
1277
1278
1279 long pauseBase = this.pause;
1280 userRegionLock.lock();
1281 try {
1282 if (useCache) {
1283 RegionLocations locations = getCachedLocation(tableName, row);
1284 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1285 return locations;
1286 }
1287 }
1288 Result regionInfoRow = null;
1289 ReversedClientScanner rcs = null;
1290 try {
1291 rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
1292 rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
1293 regionInfoRow = rcs.next();
1294 } finally {
1295 if (rcs != null) {
1296 rcs.close();
1297 }
1298 }
1299
1300 if (regionInfoRow == null) {
1301 throw new TableNotFoundException(tableName);
1302 }
1303
1304
1305 RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
1306 if (locations == null || locations.getRegionLocation(replicaId) == null) {
1307 throw new IOException("HRegionInfo was null in " +
1308 tableName + ", row=" + regionInfoRow);
1309 }
1310 HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
1311 if (regionInfo == null) {
1312 throw new IOException("HRegionInfo was null or empty in " +
1313 TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
1314 }
1315
1316
1317 if (!regionInfo.getTable().equals(tableName)) {
1318 throw new TableNotFoundException(
1319 "Table '" + tableName + "' was not found, got: " +
1320 regionInfo.getTable() + ".");
1321 }
1322 if (regionInfo.isSplit()) {
1323 throw new RegionOfflineException(
1324 "the only available region for the required row is a split parent," +
1325 " the daughters should be online soon: " + regionInfo.getRegionNameAsString());
1326 }
1327 if (regionInfo.isOffline()) {
1328 throw new RegionOfflineException("the region is offline, could" +
1329 " be caused by a disable table call: " + regionInfo.getRegionNameAsString());
1330 }
1331
1332 ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
1333 if (serverName == null) {
1334 throw new NoServerForRegionException("No server address listed in " +
1335 TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
1336 " containing row " + Bytes.toStringBinary(row));
1337 }
1338
1339 if (isDeadServer(serverName)){
1340 throw new RegionServerStoppedException("hbase:meta says the region "+
1341 regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
1342 ", but it is dead.");
1343 }
1344
1345 cacheLocation(tableName, locations);
1346 return locations;
1347 } catch (TableNotFoundException e) {
1348
1349
1350
1351 throw e;
1352 } catch (IOException e) {
1353 ExceptionUtil.rethrowIfInterrupt(e);
1354
1355 if (e instanceof RemoteException) {
1356 e = ((RemoteException)e).unwrapRemoteException();
1357 }
1358 if (tries < localNumRetries - 1) {
1359 if (LOG.isDebugEnabled()) {
1360 LOG.debug("locateRegionInMeta parentTable=" + TableName.META_TABLE_NAME +
1361 ", metaLocation=" + ", attempt=" + tries + " of " + localNumRetries +
1362 " failed; retrying after sleep of " +
1363 ConnectionUtils.getPauseTime(pauseBase, tries) + " because: " + e.getMessage());
1364 }
1365 } else {
1366 throw e;
1367 }
1368
1369 if(!(e instanceof RegionOfflineException ||
1370 e instanceof NoServerForRegionException)) {
1371 relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
1372 }
1373 } finally {
1374 userRegionLock.unlock();
1375 }
1376 try{
1377 Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
1378 } catch (InterruptedException e) {
1379 throw new InterruptedIOException("Giving up trying to location region in " +
1380 "meta: thread is interrupted.");
1381 }
1382 }
1383 }
1384
1385
1386
1387
1388
1389
/**
 * Stores the given region locations in the meta cache.
 *
 * @param tableName table the locations belong to
 * @param location locations to cache
 */
@Override
public void cacheLocation(final TableName tableName, final RegionLocations location) {
  metaCache.cacheLocation(tableName, location);
}
1394
1395
1396
1397
1398
1399
1400
1401
1402
/**
 * Looks up the meta cache for the locations of the region holding {@code row}.
 *
 * @param tableName table to look in
 * @param row row whose hosting region is wanted
 * @return whatever the meta cache returns for this table and row; callers in this class
 *         treat {@code null} as a cache miss
 */
RegionLocations getCachedLocation(final TableName tableName,
    final byte [] row) {
  return metaCache.getCachedLocation(tableName, row);
}
1407
/**
 * Clears the meta cache entry for the given table and row.
 *
 * @param tableName table the row belongs to
 * @param row row whose cached location should be dropped
 */
public void clearRegionCache(final TableName tableName, byte[] row) {
  metaCache.clearCache(tableName, row);
}
1411
1412
1413
1414
/**
 * Clears the meta cache entries associated with the given server.
 *
 * @param serverName server whose cached entries should be dropped
 */
@Override
public void clearCaches(final ServerName serverName) {
  metaCache.clearCache(serverName);
}
1419
/** Clears the whole meta cache. */
@Override
public void clearRegionCache() {
  metaCache.clearCache();
}
1424
/**
 * Clears the meta cache entries of the given table.
 *
 * @param tableName table whose cached locations should be dropped
 */
@Override
public void clearRegionCache(final TableName tableName) {
  metaCache.clearCache(tableName);
}
1429
1430 @Override
1431 public void clearRegionCache(final byte[] tableName) {
1432 clearRegionCache(TableName.valueOf(tableName));
1433 }
1434
1435
1436
1437
1438
1439
1440
/**
 * Stores a single region location in the meta cache, passing along the server the
 * information came from.
 *
 * @param tableName table the location belongs to
 * @param source server that supplied the location
 * @param location location to cache
 */
private void cacheLocation(final TableName tableName, final ServerName source,
    final HRegionLocation location) {
  metaCache.cacheLocation(tableName, source, location);
}
1445
1446
// RPC stubs created so far, keyed by the string built by getStubKey.
private final ConcurrentHashMap<String, Object> stubs =
    new ConcurrentHashMap<String, Object>();

// Per-key lock objects: each stub key is mapped to itself and the stored String is used
// as the monitor while a stub for that key is looked up or created.
private final ConcurrentHashMap<String, String> connectionLock =
    new ConcurrentHashMap<String, String>();
1452
1453
1454
1455
1456 static class MasterServiceState {
1457 HConnection connection;
1458 MasterService.BlockingInterface stub;
1459 int userCount;
1460
1461 MasterServiceState (final HConnection connection) {
1462 super();
1463 this.connection = connection;
1464 }
1465
1466 @Override
1467 public String toString() {
1468 return "MasterService";
1469 }
1470
1471 Object getStub() {
1472 return this.stub;
1473 }
1474
1475 void clearStub() {
1476 this.stub = null;
1477 }
1478
1479 boolean isMasterRunning() throws ServiceException {
1480 IsMasterRunningResponse response =
1481 this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1482 return response != null? response.getIsMasterRunning(): false;
1483 }
1484 }
1485
1486
1487
1488
1489
1490
1491 abstract class StubMaker {
1492
1493
1494
1495 protected abstract String getServiceName();
1496
1497
1498
1499
1500
1501 protected abstract Object makeStub(final BlockingRpcChannel channel);
1502
1503
1504
1505
1506
1507 protected abstract void isMasterRunning() throws ServiceException;
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517 private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
1518 ZooKeeperKeepAliveConnection zkw;
1519 try {
1520 zkw = getKeepAliveZooKeeperWatcher();
1521 } catch (IOException e) {
1522 ExceptionUtil.rethrowIfInterrupt(e);
1523 throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
1524 }
1525 try {
1526 checkIfBaseNodeAvailable(zkw);
1527 ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
1528 if (sn == null) {
1529 String msg = "ZooKeeper available but no active master location found";
1530 LOG.info(msg);
1531 throw new MasterNotRunningException(msg);
1532 }
1533 if (isDeadServer(sn)) {
1534 throw new MasterNotRunningException(sn + " is dead.");
1535 }
1536
1537 String key = getStubKey(getServiceName(),
1538 sn.getHostname(), sn.getPort(), hostnamesCanChange);
1539 connectionLock.putIfAbsent(key, key);
1540 Object stub = null;
1541 synchronized (connectionLock.get(key)) {
1542 stub = stubs.get(key);
1543 if (stub == null) {
1544 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1545 stub = makeStub(channel);
1546 isMasterRunning();
1547 stubs.put(key, stub);
1548 }
1549 }
1550 return stub;
1551 } finally {
1552 zkw.close();
1553 }
1554 }
1555
1556
1557
1558
1559
1560
1561 Object makeStub() throws IOException {
1562
1563
1564 synchronized (masterAndZKLock) {
1565 Exception exceptionCaught = null;
1566 if (!closed) {
1567 try {
1568 return makeStubNoRetries();
1569 } catch (IOException e) {
1570 exceptionCaught = e;
1571 } catch (KeeperException e) {
1572 exceptionCaught = e;
1573 } catch (ServiceException e) {
1574 exceptionCaught = e;
1575 }
1576
1577 throw new MasterNotRunningException(exceptionCaught);
1578 } else {
1579 throw new DoNotRetryIOException("Connection was closed while trying to get master");
1580 }
1581 }
1582 }
1583 }
1584
1585
1586
1587
1588 class MasterServiceStubMaker extends StubMaker {
1589 private MasterService.BlockingInterface stub;
1590 @Override
1591 protected String getServiceName() {
1592 return MasterService.getDescriptor().getName();
1593 }
1594
1595 @Override
1596 MasterService.BlockingInterface makeStub() throws IOException {
1597 return (MasterService.BlockingInterface)super.makeStub();
1598 }
1599
1600 @Override
1601 protected Object makeStub(BlockingRpcChannel channel) {
1602 this.stub = MasterService.newBlockingStub(channel);
1603 return this.stub;
1604 }
1605
1606 @Override
1607 protected void isMasterRunning() throws ServiceException {
1608 this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1609 }
1610 }
1611
/**
 * Returns an admin stub for the given server by delegating to
 * {@code getAdmin(serverName, false)}.
 *
 * @param serverName server to talk to
 * @return the admin stub
 * @throws IOException if the stub cannot be obtained
 */
@Override
public AdminService.BlockingInterface getAdmin(final ServerName serverName)
    throws IOException {
  return getAdmin(serverName, false);
}
1617
1618 @Override
1619
1620 public AdminService.BlockingInterface getAdmin(final ServerName serverName,
1621 final boolean master)
1622 throws IOException {
1623 if (isDeadServer(serverName)) {
1624 throw new RegionServerStoppedException(serverName + " is dead.");
1625 }
1626 String key = getStubKey(AdminService.BlockingInterface.class.getName(),
1627 serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange);
1628 this.connectionLock.putIfAbsent(key, key);
1629 AdminService.BlockingInterface stub = null;
1630 synchronized (this.connectionLock.get(key)) {
1631 stub = (AdminService.BlockingInterface)this.stubs.get(key);
1632 if (stub == null) {
1633 BlockingRpcChannel channel =
1634 this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1635 stub = AdminService.newBlockingStub(channel);
1636 this.stubs.put(key, stub);
1637 }
1638 }
1639 return stub;
1640 }
1641
1642 @Override
1643 public ClientService.BlockingInterface getClient(final ServerName sn)
1644 throws IOException {
1645 if (isDeadServer(sn)) {
1646 throw new RegionServerStoppedException(sn + " is dead.");
1647 }
1648 String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostname(),
1649 sn.getPort(), this.hostnamesCanChange);
1650 this.connectionLock.putIfAbsent(key, key);
1651 ClientService.BlockingInterface stub = null;
1652 synchronized (this.connectionLock.get(key)) {
1653 stub = (ClientService.BlockingInterface)this.stubs.get(key);
1654 if (stub == null) {
1655 BlockingRpcChannel channel =
1656 this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1657 stub = ClientService.newBlockingStub(channel);
1658
1659
1660 this.stubs.put(key, stub);
1661 }
1662 }
1663 return stub;
1664 }
1665
1666 static String getStubKey(final String serviceName,
1667 final String rsHostname,
1668 int port,
1669 boolean resolveHostnames) {
1670
1671
1672
1673
1674
1675
1676 String address = rsHostname;
1677 if (resolveHostnames) {
1678 InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
1679 if (i != null) {
1680 address = i.getHostAddress() + "-" + rsHostname;
1681 }
1682 }
1683 return serviceName + "@" + address + ":" + port;
1684 }
1685
// Shared ZooKeeper watcher handed out by getKeepAliveZooKeeperWatcher(); null until first
// requested and again after closeZooKeeperWatcher().
private ZooKeeperKeepAliveConnection keepAliveZookeeper;
// Number of outstanding users of the shared watcher.
private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
// NOTE(review): presumably controls whether the watcher may be closed - confirm at its use sites.
private boolean canCloseZKW = true;

// Time, in milliseconds (5 minutes), added to the current time when the last user
// releases the shared watcher.
private static final long keepAlive = 5 * 60 * 1000;
1692
1693
1694
1695
1696
1697 ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
1698 throws IOException {
1699 synchronized (masterAndZKLock) {
1700 if (keepAliveZookeeper == null) {
1701 if (this.closed) {
1702 throw new IOException(toString() + " closed");
1703 }
1704
1705
1706 keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
1707 }
1708 keepAliveZookeeperUserCount.addAndGet(1);
1709 keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
1710 return keepAliveZookeeper;
1711 }
1712 }
1713
1714 void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
1715 if (zkw == null){
1716 return;
1717 }
1718 if (keepAliveZookeeperUserCount.addAndGet(-1) <= 0 ){
1719 keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
1720 }
1721 }
1722
1723 private void closeZooKeeperWatcher() {
1724 synchronized (masterAndZKLock) {
1725 if (keepAliveZookeeper != null) {
1726 LOG.info("Closing zookeeper sessionid=0x" +
1727 Long.toHexString(
1728 keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
1729 keepAliveZookeeper.internalClose();
1730 keepAliveZookeeper = null;
1731 }
1732 keepAliveZookeeperUserCount.set(0);
1733 }
1734 }
1735
// State (stub and user count) of this connection's master service.
final MasterServiceState masterServiceState = new MasterServiceState(this);
1737
/**
 * Returns a master service interface by delegating to
 * {@code getKeepAliveMasterService()}.
 *
 * @return the keep-alive master service
 * @throws MasterNotRunningException if the master service cannot be obtained
 */
@Override
public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
  return getKeepAliveMasterService();
}
1742
/**
 * Registers one more user on the given master service state. Despite the name, the only
 * effect is to increment {@code userCount}.
 *
 * @param mss the state to update
 */
private void resetMasterServiceState(final MasterServiceState mss) {
  mss.userCount++;
}
1746
1747 @Override
1748 public MasterKeepAliveConnection getKeepAliveMasterService()
1749 throws MasterNotRunningException {
1750 synchronized (masterAndZKLock) {
1751 if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1752 MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1753 try {
1754 this.masterServiceState.stub = stubMaker.makeStub();
1755 } catch (MasterNotRunningException ex) {
1756 throw ex;
1757 } catch (IOException e) {
1758
1759 throw new MasterNotRunningException(e);
1760 }
1761 }
1762 resetMasterServiceState(this.masterServiceState);
1763 }
1764
1765 final MasterService.BlockingInterface stub = this.masterServiceState.stub;
1766 return new MasterKeepAliveConnection() {
1767 MasterServiceState mss = masterServiceState;
1768 @Override
1769 public MasterProtos.AbortProcedureResponse abortProcedure(
1770 RpcController controller,
1771 MasterProtos.AbortProcedureRequest request) throws ServiceException {
1772 return stub.abortProcedure(controller, request);
1773 }
1774 @Override
1775 public MasterProtos.ListProceduresResponse listProcedures(
1776 RpcController controller,
1777 MasterProtos.ListProceduresRequest request) throws ServiceException {
1778 return stub.listProcedures(controller, request);
1779 }
1780 @Override
1781 public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
1782 throws ServiceException {
1783 return stub.addColumn(controller, request);
1784 }
1785
1786 @Override
1787 public DeleteColumnResponse deleteColumn(RpcController controller,
1788 DeleteColumnRequest request)
1789 throws ServiceException {
1790 return stub.deleteColumn(controller, request);
1791 }
1792
1793 @Override
1794 public ModifyColumnResponse modifyColumn(RpcController controller,
1795 ModifyColumnRequest request)
1796 throws ServiceException {
1797 return stub.modifyColumn(controller, request);
1798 }
1799
1800 @Override
1801 public MoveRegionResponse moveRegion(RpcController controller,
1802 MoveRegionRequest request) throws ServiceException {
1803 return stub.moveRegion(controller, request);
1804 }
1805
1806 @Override
1807 public DispatchMergingRegionsResponse dispatchMergingRegions(
1808 RpcController controller, DispatchMergingRegionsRequest request)
1809 throws ServiceException {
1810 return stub.dispatchMergingRegions(controller, request);
1811 }
1812
1813 @Override
1814 public AssignRegionResponse assignRegion(RpcController controller,
1815 AssignRegionRequest request) throws ServiceException {
1816 return stub.assignRegion(controller, request);
1817 }
1818
1819 @Override
1820 public UnassignRegionResponse unassignRegion(RpcController controller,
1821 UnassignRegionRequest request) throws ServiceException {
1822 return stub.unassignRegion(controller, request);
1823 }
1824
1825 @Override
1826 public OfflineRegionResponse offlineRegion(RpcController controller,
1827 OfflineRegionRequest request) throws ServiceException {
1828 return stub.offlineRegion(controller, request);
1829 }
1830
1831 @Override
1832 public DeleteTableResponse deleteTable(RpcController controller,
1833 DeleteTableRequest request) throws ServiceException {
1834 return stub.deleteTable(controller, request);
1835 }
1836
1837 @Override
1838 public TruncateTableResponse truncateTable(RpcController controller,
1839 TruncateTableRequest request) throws ServiceException {
1840 return stub.truncateTable(controller, request);
1841 }
1842
1843 @Override
1844 public EnableTableResponse enableTable(RpcController controller,
1845 EnableTableRequest request) throws ServiceException {
1846 return stub.enableTable(controller, request);
1847 }
1848
1849 @Override
1850 public DisableTableResponse disableTable(RpcController controller,
1851 DisableTableRequest request) throws ServiceException {
1852 return stub.disableTable(controller, request);
1853 }
1854
1855 @Override
1856 public ModifyTableResponse modifyTable(RpcController controller,
1857 ModifyTableRequest request) throws ServiceException {
1858 return stub.modifyTable(controller, request);
1859 }
1860
1861 @Override
1862 public CreateTableResponse createTable(RpcController controller,
1863 CreateTableRequest request) throws ServiceException {
1864 return stub.createTable(controller, request);
1865 }
1866
1867 @Override
1868 public ShutdownResponse shutdown(RpcController controller,
1869 ShutdownRequest request) throws ServiceException {
1870 return stub.shutdown(controller, request);
1871 }
1872
1873 @Override
1874 public StopMasterResponse stopMaster(RpcController controller,
1875 StopMasterRequest request) throws ServiceException {
1876 return stub.stopMaster(controller, request);
1877 }
1878
1879 @Override
1880 public BalanceResponse balance(RpcController controller,
1881 BalanceRequest request) throws ServiceException {
1882 return stub.balance(controller, request);
1883 }
1884
1885 @Override
1886 public SetBalancerRunningResponse setBalancerRunning(
1887 RpcController controller, SetBalancerRunningRequest request)
1888 throws ServiceException {
1889 return stub.setBalancerRunning(controller, request);
1890 }
1891
1892 @Override
1893 public NormalizeResponse normalize(RpcController controller,
1894 NormalizeRequest request) throws ServiceException {
1895 return stub.normalize(controller, request);
1896 }
1897
1898 @Override
1899 public SetNormalizerRunningResponse setNormalizerRunning(
1900 RpcController controller, SetNormalizerRunningRequest request)
1901 throws ServiceException {
1902 return stub.setNormalizerRunning(controller, request);
1903 }
1904
1905 @Override
1906 public RunCatalogScanResponse runCatalogScan(RpcController controller,
1907 RunCatalogScanRequest request) throws ServiceException {
1908 return stub.runCatalogScan(controller, request);
1909 }
1910
1911 @Override
1912 public EnableCatalogJanitorResponse enableCatalogJanitor(
1913 RpcController controller, EnableCatalogJanitorRequest request)
1914 throws ServiceException {
1915 return stub.enableCatalogJanitor(controller, request);
1916 }
1917
1918 @Override
1919 public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1920 RpcController controller, IsCatalogJanitorEnabledRequest request)
1921 throws ServiceException {
1922 return stub.isCatalogJanitorEnabled(controller, request);
1923 }
1924
1925 @Override
1926 public CoprocessorServiceResponse execMasterService(
1927 RpcController controller, CoprocessorServiceRequest request)
1928 throws ServiceException {
1929 return stub.execMasterService(controller, request);
1930 }
1931
1932 @Override
1933 public SnapshotResponse snapshot(RpcController controller,
1934 SnapshotRequest request) throws ServiceException {
1935 return stub.snapshot(controller, request);
1936 }
1937
1938 @Override
1939 public GetCompletedSnapshotsResponse getCompletedSnapshots(
1940 RpcController controller, GetCompletedSnapshotsRequest request)
1941 throws ServiceException {
1942 return stub.getCompletedSnapshots(controller, request);
1943 }
1944
1945 @Override
1946 public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1947 DeleteSnapshotRequest request) throws ServiceException {
1948 return stub.deleteSnapshot(controller, request);
1949 }
1950
1951 @Override
1952 public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1953 IsSnapshotDoneRequest request) throws ServiceException {
1954 return stub.isSnapshotDone(controller, request);
1955 }
1956
1957 @Override
1958 public RestoreSnapshotResponse restoreSnapshot(
1959 RpcController controller, RestoreSnapshotRequest request)
1960 throws ServiceException {
1961 return stub.restoreSnapshot(controller, request);
1962 }
1963
1964 @Override
1965 public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
1966 RpcController controller, IsRestoreSnapshotDoneRequest request)
1967 throws ServiceException {
1968 return stub.isRestoreSnapshotDone(controller, request);
1969 }
1970
1971 @Override
1972 public ExecProcedureResponse execProcedure(
1973 RpcController controller, ExecProcedureRequest request)
1974 throws ServiceException {
1975 return stub.execProcedure(controller, request);
1976 }
1977
1978 @Override
1979 public ExecProcedureResponse execProcedureWithRet(
1980 RpcController controller, ExecProcedureRequest request)
1981 throws ServiceException {
1982 return stub.execProcedureWithRet(controller, request);
1983 }
1984
1985 @Override
1986 public IsProcedureDoneResponse isProcedureDone(RpcController controller,
1987 IsProcedureDoneRequest request) throws ServiceException {
1988 return stub.isProcedureDone(controller, request);
1989 }
1990
1991 @Override
1992 public GetProcedureResultResponse getProcedureResult(RpcController controller,
1993 GetProcedureResultRequest request) throws ServiceException {
1994 return stub.getProcedureResult(controller, request);
1995 }
1996
1997 @Override
1998 public IsMasterRunningResponse isMasterRunning(
1999 RpcController controller, IsMasterRunningRequest request)
2000 throws ServiceException {
2001 return stub.isMasterRunning(controller, request);
2002 }
2003
2004 @Override
2005 public ModifyNamespaceResponse modifyNamespace(RpcController controller,
2006 ModifyNamespaceRequest request)
2007 throws ServiceException {
2008 return stub.modifyNamespace(controller, request);
2009 }
2010
2011 @Override
2012 public CreateNamespaceResponse createNamespace(
2013 RpcController controller, CreateNamespaceRequest request) throws ServiceException {
2014 return stub.createNamespace(controller, request);
2015 }
2016
2017 @Override
2018 public DeleteNamespaceResponse deleteNamespace(
2019 RpcController controller, DeleteNamespaceRequest request) throws ServiceException {
2020 return stub.deleteNamespace(controller, request);
2021 }
2022
2023 @Override
2024 public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
2025 GetNamespaceDescriptorRequest request) throws ServiceException {
2026 return stub.getNamespaceDescriptor(controller, request);
2027 }
2028
2029 @Override
2030 public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
2031 ListNamespaceDescriptorsRequest request) throws ServiceException {
2032 return stub.listNamespaceDescriptors(controller, request);
2033 }
2034
2035 @Override
2036 public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
2037 RpcController controller, ListTableDescriptorsByNamespaceRequest request)
2038 throws ServiceException {
2039 return stub.listTableDescriptorsByNamespace(controller, request);
2040 }
2041
2042 @Override
2043 public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
2044 RpcController controller, ListTableNamesByNamespaceRequest request)
2045 throws ServiceException {
2046 return stub.listTableNamesByNamespace(controller, request);
2047 }
2048
2049 @Override
2050 public void close() {
2051 release(this.mss);
2052 }
2053
2054 @Override
2055 public GetSchemaAlterStatusResponse getSchemaAlterStatus(
2056 RpcController controller, GetSchemaAlterStatusRequest request)
2057 throws ServiceException {
2058 return stub.getSchemaAlterStatus(controller, request);
2059 }
2060
2061 @Override
2062 public GetTableDescriptorsResponse getTableDescriptors(
2063 RpcController controller, GetTableDescriptorsRequest request)
2064 throws ServiceException {
2065 return stub.getTableDescriptors(controller, request);
2066 }
2067
2068 @Override
2069 public GetTableNamesResponse getTableNames(
2070 RpcController controller, GetTableNamesRequest request)
2071 throws ServiceException {
2072 return stub.getTableNames(controller, request);
2073 }
2074
2075 @Override
2076 public GetClusterStatusResponse getClusterStatus(
2077 RpcController controller, GetClusterStatusRequest request)
2078 throws ServiceException {
2079 return stub.getClusterStatus(controller, request);
2080 }
2081
2082 @Override
2083 public SetQuotaResponse setQuota(RpcController controller, SetQuotaRequest request)
2084 throws ServiceException {
2085 return stub.setQuota(controller, request);
2086 }
2087
2088 @Override
2089 public MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
2090 RpcController controller, MajorCompactionTimestampRequest request)
2091 throws ServiceException {
2092 return stub.getLastMajorCompactionTimestamp(controller, request);
2093 }
2094
2095 @Override
2096 public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
2097 RpcController controller, MajorCompactionTimestampForRegionRequest request)
2098 throws ServiceException {
2099 return stub.getLastMajorCompactionTimestampForRegion(controller, request);
2100 }
2101
2102 @Override
2103 public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
2104 IsBalancerEnabledRequest request) throws ServiceException {
2105 return stub.isBalancerEnabled(controller, request);
2106 }
2107
2108 @Override
2109 public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
2110 IsNormalizerEnabledRequest request) throws ServiceException {
2111 return stub.isNormalizerEnabled(controller, request);
2112 }
2113
2114 @Override
2115 public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
2116 SecurityCapabilitiesRequest request) throws ServiceException {
2117 return stub.getSecurityCapabilities(controller, request);
2118 }
2119 };
2120 }
2121
2122
2123 private static void release(MasterServiceState mss) {
2124 if (mss != null && mss.connection != null) {
2125 ((HConnectionImplementation)mss.connection).releaseMaster(mss);
2126 }
2127 }
2128
2129 private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
2130 if (mss.getStub() == null){
2131 return false;
2132 }
2133 try {
2134 return mss.isMasterRunning();
2135 } catch (UndeclaredThrowableException e) {
2136
2137
2138 LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
2139 return false;
2140 } catch (ServiceException se) {
2141 LOG.warn("Checking master connection", se);
2142 return false;
2143 }
2144 }
2145
2146 void releaseMaster(MasterServiceState mss) {
2147 if (mss.getStub() == null) return;
2148 synchronized (masterAndZKLock) {
2149 --mss.userCount;
2150 }
2151 }
2152
2153 private void closeMasterService(MasterServiceState mss) {
2154 if (mss.getStub() != null) {
2155 LOG.info("Closing master protocol: " + mss);
2156 mss.clearStub();
2157 }
2158 mss.userCount = 0;
2159 }
2160
2161
2162
2163
2164
2165 private void closeMaster() {
2166 synchronized (masterAndZKLock) {
2167 closeMasterService(masterServiceState);
2168 }
2169 }
2170
2171 void updateCachedLocation(HRegionInfo hri, ServerName source,
2172 ServerName serverName, long seqNum) {
2173 HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
2174 cacheLocation(hri.getTable(), source, newHrl);
2175 }
2176
2177 @Override
2178 public void deleteCachedRegionLocation(final HRegionLocation location) {
2179 metaCache.clearCache(location);
2180 }
2181
2182 @Override
2183 public void updateCachedLocations(final TableName tableName, byte[] rowkey,
2184 final Object exception, final HRegionLocation source) {
2185 assert source != null;
2186 updateCachedLocations(tableName, source.getRegionInfo().getRegionName()
2187 , rowkey, exception, source.getServerName());
2188 }
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198 @Override
2199 public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
2200 final Object exception, final ServerName source) {
2201 if (rowkey == null || tableName == null) {
2202 LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
2203 ", tableName=" + (tableName == null ? "null" : tableName));
2204 return;
2205 }
2206
2207 if (source == null) {
2208
2209 return;
2210 }
2211
2212 if (regionName == null) {
2213
2214 metaCache.clearCache(tableName, rowkey, source);
2215 return;
2216 }
2217
2218
2219 final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
2220 HRegionLocation oldLocation = null;
2221 if (oldLocations != null) {
2222 oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
2223 }
2224 if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
2225
2226
2227 return;
2228 }
2229
2230 HRegionInfo regionInfo = oldLocation.getRegionInfo();
2231 Throwable cause = ClientExceptionsUtil.findException(exception);
2232 if (cause != null) {
2233 if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
2234
2235 return;
2236 }
2237
2238 if (cause instanceof RegionMovedException) {
2239 RegionMovedException rme = (RegionMovedException) cause;
2240 if (LOG.isTraceEnabled()) {
2241 LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
2242 rme.getHostname() + ":" + rme.getPort() +
2243 " according to " + source.getHostAndPort());
2244 }
2245
2246
2247 updateCachedLocation(
2248 regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2249 return;
2250 }
2251 }
2252
2253
2254
2255 metaCache.clearCache(regionInfo);
2256 }
2257
2258 @Override
2259 public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
2260 final Object exception, final HRegionLocation source) {
2261 updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
2262 }
2263
2264 @Override
2265 @Deprecated
2266 public void processBatch(List<? extends Row> list,
2267 final TableName tableName,
2268 ExecutorService pool,
2269 Object[] results) throws IOException, InterruptedException {
2270
2271
2272
2273 if (results.length != list.size()) {
2274 throw new IllegalArgumentException(
2275 "argument results must be the same size as argument list");
2276 }
2277 processBatchCallback(list, tableName, pool, results, null);
2278 }
2279
2280 @Override
2281 @Deprecated
2282 public void processBatch(List<? extends Row> list,
2283 final byte[] tableName,
2284 ExecutorService pool,
2285 Object[] results) throws IOException, InterruptedException {
2286 processBatch(list, TableName.valueOf(tableName), pool, results);
2287 }
2288
2289
2290
2291
2292
2293
2294
2295
2296 @Override
2297 @Deprecated
2298 public <R> void processBatchCallback(
2299 List<? extends Row> list,
2300 TableName tableName,
2301 ExecutorService pool,
2302 Object[] results,
2303 Batch.Callback<R> callback)
2304 throws IOException, InterruptedException {
2305
2306 AsyncRequestFuture ars = this.asyncProcess.submitAll(
2307 pool, tableName, list, callback, results);
2308 ars.waitUntilDone();
2309 if (ars.hasError()) {
2310 throw ars.getErrors();
2311 }
2312 }
2313
2314 @Override
2315 @Deprecated
2316 public <R> void processBatchCallback(
2317 List<? extends Row> list,
2318 byte[] tableName,
2319 ExecutorService pool,
2320 Object[] results,
2321 Batch.Callback<R> callback)
2322 throws IOException, InterruptedException {
2323 processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
2324 }
2325
2326
2327 protected AsyncProcess createAsyncProcess(Configuration conf) {
2328
2329 return new AsyncProcess(this, conf, this.batchPool,
2330 RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
2331 RpcControllerFactory.instantiate(conf));
2332 }
2333
2334 @Override
2335 public AsyncProcess getAsyncProcess() {
2336 return asyncProcess;
2337 }
2338
2339 @Override
2340 public ServerStatisticTracker getStatisticsTracker() {
2341 return this.stats;
2342 }
2343
2344 @Override
2345 public ClientBackoffPolicy getBackoffPolicy() {
2346 return this.backoffPolicy;
2347 }
2348
2349
2350
2351
2352
2353 @VisibleForTesting
2354 int getNumberOfCachedRegionLocations(final TableName tableName) {
2355 return metaCache.getNumberOfCachedRegionLocations(tableName);
2356 }
2357
2358 @Override
2359 @Deprecated
2360 public void setRegionCachePrefetch(final TableName tableName, final boolean enable) {
2361 }
2362
2363 @Override
2364 @Deprecated
2365 public void setRegionCachePrefetch(final byte[] tableName,
2366 final boolean enable) {
2367 }
2368
2369 @Override
2370 @Deprecated
2371 public boolean getRegionCachePrefetch(TableName tableName) {
2372 return false;
2373 }
2374
2375 @Override
2376 @Deprecated
2377 public boolean getRegionCachePrefetch(byte[] tableName) {
2378 return false;
2379 }
2380
2381 @Override
2382 public void abort(final String msg, Throwable t) {
2383 if (t instanceof KeeperException.SessionExpiredException
2384 && keepAliveZookeeper != null) {
2385 synchronized (masterAndZKLock) {
2386 if (keepAliveZookeeper != null) {
2387 LOG.warn("This client just lost it's session with ZooKeeper," +
2388 " closing it." +
2389 " It will be recreated next time someone needs it", t);
2390 closeZooKeeperWatcher();
2391 }
2392 }
2393 } else {
2394 if (t != null) {
2395 LOG.fatal(msg, t);
2396 } else {
2397 LOG.fatal(msg);
2398 }
2399 this.aborted = true;
2400 close();
2401 this.closed = true;
2402 }
2403 }
2404
2405 @Override
2406 public boolean isClosed() {
2407 return this.closed;
2408 }
2409
2410 @Override
2411 public boolean isAborted(){
2412 return this.aborted;
2413 }
2414
2415 @Override
2416 public int getCurrentNrHRS() throws IOException {
2417 return this.registry.getCurrentNrHRS();
2418 }
2419
2420
2421
2422
2423 void incCount() {
2424 ++refCount;
2425 }
2426
2427
2428
2429
2430 void decCount() {
2431 if (refCount > 0) {
2432 --refCount;
2433 }
2434 }
2435
2436
2437
2438
2439
2440
2441 boolean isZeroReference() {
2442 return refCount == 0;
2443 }
2444
2445 void internalClose() {
2446 if (this.closed) {
2447 return;
2448 }
2449 closeMaster();
2450 shutdownPools();
2451 if (this.metrics != null) {
2452 this.metrics.shutdown();
2453 }
2454 this.closed = true;
2455 closeZooKeeperWatcher();
2456 this.stubs.clear();
2457 if (clusterStatusListener != null) {
2458 clusterStatusListener.close();
2459 }
2460 if (rpcClient != null) {
2461 rpcClient.close();
2462 }
2463 }
2464
2465 @Override
2466 public void close() {
2467 if (managed) {
2468 if (aborted) {
2469 ConnectionManager.deleteStaleConnection(this);
2470 } else {
2471 ConnectionManager.deleteConnection(this, false);
2472 }
2473 } else {
2474 internalClose();
2475 }
2476 }
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489 @Override
2490 protected void finalize() throws Throwable {
2491 super.finalize();
2492
2493 refCount = 1;
2494 close();
2495 }
2496
2497
2498
2499
2500 @Deprecated
2501 @Override
2502 public HTableDescriptor[] listTables() throws IOException {
2503 MasterKeepAliveConnection master = getKeepAliveMasterService();
2504 try {
2505 GetTableDescriptorsRequest req =
2506 RequestConverter.buildGetTableDescriptorsRequest((List<TableName>)null);
2507 return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2508 } catch (ServiceException se) {
2509 throw ProtobufUtil.getRemoteException(se);
2510 } finally {
2511 master.close();
2512 }
2513 }
2514
2515
2516
2517
2518 @Deprecated
2519 @Override
2520 public String[] getTableNames() throws IOException {
2521 TableName[] tableNames = listTableNames();
2522 String result[] = new String[tableNames.length];
2523 for (int i = 0; i < tableNames.length; i++) {
2524 result[i] = tableNames[i].getNameAsString();
2525 }
2526 return result;
2527 }
2528
2529
2530
2531
2532 @Deprecated
2533 @Override
2534 public TableName[] listTableNames() throws IOException {
2535 MasterKeepAliveConnection master = getKeepAliveMasterService();
2536 try {
2537 return ProtobufUtil.getTableNameArray(master.getTableNames(null,
2538 GetTableNamesRequest.newBuilder().build())
2539 .getTableNamesList());
2540 } catch (ServiceException se) {
2541 throw ProtobufUtil.getRemoteException(se);
2542 } finally {
2543 master.close();
2544 }
2545 }
2546
2547
2548
2549
2550 @Deprecated
2551 @Override
2552 public HTableDescriptor[] getHTableDescriptorsByTableName(
2553 List<TableName> tableNames) throws IOException {
2554 if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
2555 MasterKeepAliveConnection master = getKeepAliveMasterService();
2556 try {
2557 GetTableDescriptorsRequest req =
2558 RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2559 return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2560 } catch (ServiceException se) {
2561 throw ProtobufUtil.getRemoteException(se);
2562 } finally {
2563 master.close();
2564 }
2565 }
2566
2567
2568
2569
2570 @Deprecated
2571 @Override
2572 public HTableDescriptor[] getHTableDescriptors(
2573 List<String> names) throws IOException {
2574 List<TableName> tableNames = new ArrayList<TableName>(names.size());
2575 for(String name : names) {
2576 tableNames.add(TableName.valueOf(name));
2577 }
2578
2579 return getHTableDescriptorsByTableName(tableNames);
2580 }
2581
2582 @Override
2583 public NonceGenerator getNonceGenerator() {
2584 return this.nonceGenerator;
2585 }
2586
2587
2588
2589
2590
2591
2592
2593
2594 @Deprecated
2595 @Override
2596 public HTableDescriptor getHTableDescriptor(final TableName tableName)
2597 throws IOException {
2598 if (tableName == null) return null;
2599 MasterKeepAliveConnection master = getKeepAliveMasterService();
2600 GetTableDescriptorsResponse htds;
2601 try {
2602 GetTableDescriptorsRequest req =
2603 RequestConverter.buildGetTableDescriptorsRequest(tableName);
2604 htds = master.getTableDescriptors(null, req);
2605 } catch (ServiceException se) {
2606 throw ProtobufUtil.getRemoteException(se);
2607 } finally {
2608 master.close();
2609 }
2610 if (!htds.getTableSchemaList().isEmpty()) {
2611 return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
2612 }
2613 throw new TableNotFoundException(tableName.getNameAsString());
2614 }
2615
2616
2617
2618
2619 @Deprecated
2620 @Override
2621 public HTableDescriptor getHTableDescriptor(final byte[] tableName)
2622 throws IOException {
2623 return getHTableDescriptor(TableName.valueOf(tableName));
2624 }
2625
2626 @Override
2627 public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
2628 return RpcRetryingCallerFactory
2629 .instantiate(conf, this.interceptor, this.getStatisticsTracker());
2630 }
2631
2632 @Override
2633 public boolean isManaged() {
2634 return managed;
2635 }
2636
2637 @Override
2638 public boolean hasCellBlockSupport() {
2639 return this.rpcClient.hasCellBlockSupport();
2640 }
2641
2642 @Override
2643 public ConnectionConfiguration getConnectionConfiguration() {
2644 return this.connectionConfig;
2645 }
2646
2647 @Override
2648 public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
2649 return this.rpcCallerFactory;
2650 }
2651
2652 @Override
2653 public RpcControllerFactory getRpcControllerFactory() {
2654 return this.rpcControllerFactory;
2655 }
2656 }
2657
2658
2659
2660
2661 static class ServerErrorTracker {
2662
2663 private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
2664 new ConcurrentHashMap<ServerName, ServerErrors>();
2665 private final long canRetryUntil;
2666 private final int maxRetries;
2667 private final long startTrackingTime;
2668
2669 public ServerErrorTracker(long timeout, int maxRetries) {
2670 this.maxRetries = maxRetries;
2671 this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
2672 this.startTrackingTime = new Date().getTime();
2673 }
2674
2675
2676
2677
2678 boolean canRetryMore(int numRetry) {
2679
2680 return numRetry < maxRetries || (maxRetries > 1 &&
2681 EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
2682 }
2683
2684
2685
2686
2687
2688
2689
2690
2691 long calculateBackoffTime(ServerName server, long basePause) {
2692 long result;
2693 ServerErrors errorStats = errorsByServer.get(server);
2694 if (errorStats != null) {
2695 result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
2696 } else {
2697 result = 0;
2698 }
2699 return result;
2700 }
2701
2702
2703
2704
2705
2706
2707 void reportServerError(ServerName server) {
2708 ServerErrors errors = errorsByServer.get(server);
2709 if (errors != null) {
2710 errors.addError();
2711 } else {
2712 errors = errorsByServer.putIfAbsent(server, new ServerErrors());
2713 if (errors != null){
2714 errors.addError();
2715 }
2716 }
2717 }
2718
2719 long getStartTrackingTime() {
2720 return startTrackingTime;
2721 }
2722
2723
2724
2725
2726 private static class ServerErrors {
2727 public final AtomicInteger retries = new AtomicInteger(0);
2728
2729 public void addError() {
2730 retries.incrementAndGet();
2731 }
2732 }
2733 }
2734 }