1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.lang.reflect.UndeclaredThrowableException;
25 import java.net.InetAddress;
26 import java.net.InetSocketAddress;
27 import java.util.ArrayList;
28 import java.util.Date;
29 import java.util.HashSet;
30 import java.util.LinkedHashMap;
31 import java.util.List;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.NavigableMap;
36 import java.util.Set;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ConcurrentMap;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.LinkedBlockingQueue;
41 import java.util.concurrent.ThreadPoolExecutor;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicInteger;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.hbase.DoNotRetryIOException;
50 import org.apache.hadoop.hbase.HBaseConfiguration;
51 import org.apache.hadoop.hbase.HConstants;
52 import org.apache.hadoop.hbase.HRegionInfo;
53 import org.apache.hadoop.hbase.HRegionLocation;
54 import org.apache.hadoop.hbase.HTableDescriptor;
55 import org.apache.hadoop.hbase.MasterNotRunningException;
56 import org.apache.hadoop.hbase.MetaTableAccessor;
57 import org.apache.hadoop.hbase.RegionLocations;
58 import org.apache.hadoop.hbase.RegionTooBusyException;
59 import org.apache.hadoop.hbase.ServerName;
60 import org.apache.hadoop.hbase.TableName;
61 import org.apache.hadoop.hbase.TableNotEnabledException;
62 import org.apache.hadoop.hbase.TableNotFoundException;
63 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
64 import org.apache.hadoop.hbase.classification.InterfaceAudience;
65 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
66 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
67 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
68 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
69 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
70 import org.apache.hadoop.hbase.client.coprocessor.Batch;
71 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
72 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
73 import org.apache.hadoop.hbase.ipc.RpcClient;
74 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
75 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
76 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
77 import org.apache.hadoop.hbase.protobuf.RequestConverter;
78 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
79 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
80 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
81 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
82 import org.apache.hadoop.hbase.protobuf.generated.*;
83 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
84 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
85 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
86 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
87 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
88 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
89 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
90 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
91 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
92 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
93 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
94 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
95 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
96 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
97 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
98 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
99 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
100 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
101 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
102 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
104 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
105 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
106 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
107 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
108 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
109 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
110 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
111 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
113 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
119 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
121 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
161 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
162 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
163 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
164 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
165 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
166 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
167 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
168 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
169 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
170 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
171 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
172 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
173 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
174 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
175 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
176 import org.apache.hadoop.hbase.security.User;
177 import org.apache.hadoop.hbase.security.UserProvider;
178 import org.apache.hadoop.hbase.util.Bytes;
179 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
180 import org.apache.hadoop.hbase.util.ExceptionUtil;
181 import org.apache.hadoop.hbase.util.Threads;
182 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
183 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
184 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
185 import org.apache.hadoop.ipc.RemoteException;
186 import org.apache.zookeeper.KeeperException;
187
188 import com.google.common.annotations.VisibleForTesting;
189 import com.google.protobuf.BlockingRpcChannel;
190 import com.google.protobuf.RpcController;
191 import com.google.protobuf.ServiceException;
192
193
194
195
196 @SuppressWarnings("serial")
197 @InterfaceAudience.Private
198
199 class ConnectionManager {
200 static final Log LOG = LogFactory.getLog(ConnectionManager.class);
201
202 public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
203 private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
204
205
206
207
208 static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;
209
210 public static final int MAX_CACHED_CONNECTION_INSTANCES;
211
212
213
214
215
216 private static volatile NonceGenerator nonceGenerator = null;
217
218 private static Object nonceGeneratorCreateLock = new Object();
219
220 static {
221
222
223
224
225 MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
226 HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
227 CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
228 (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
229 @Override
230 protected boolean removeEldestEntry(
231 Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
232 return size() > MAX_CACHED_CONNECTION_INSTANCES;
233 }
234 };
235 }
236
237
238 static class NoNonceGenerator implements NonceGenerator {
239 @Override
240 public long getNonceGroup() {
241 return HConstants.NO_NONCE;
242 }
243 @Override
244 public long newNonce() {
245 return HConstants.NO_NONCE;
246 }
247 }
248
249
250
251
252 private ConnectionManager() {
253 super();
254 }
255
256
257
258
259
260
261 @VisibleForTesting
262 static NonceGenerator injectNonceGeneratorForTesting(
263 ClusterConnection conn, NonceGenerator cnm) {
264 HConnectionImplementation connImpl = (HConnectionImplementation)conn;
265 NonceGenerator ng = connImpl.getNonceGenerator();
266 LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
267 connImpl.nonceGenerator = cnm;
268 return ng;
269 }
270
271
272
273
274
275
276
277
278
279
280 @Deprecated
281 public static HConnection getConnection(final Configuration conf) throws IOException {
282 return getConnectionInternal(conf);
283 }
284
285
286 static ClusterConnection getConnectionInternal(final Configuration conf)
287 throws IOException {
288 HConnectionKey connectionKey = new HConnectionKey(conf);
289 synchronized (CONNECTION_INSTANCES) {
290 HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
291 if (connection == null) {
292 connection = (HConnectionImplementation)createConnection(conf, true);
293 CONNECTION_INSTANCES.put(connectionKey, connection);
294 } else if (connection.isClosed()) {
295 ConnectionManager.deleteConnection(connectionKey, true);
296 connection = (HConnectionImplementation)createConnection(conf, true);
297 CONNECTION_INSTANCES.put(connectionKey, connection);
298 }
299 connection.incCount();
300 return connection;
301 }
302 }
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324 public static HConnection createConnection(Configuration conf) throws IOException {
325 return createConnectionInternal(conf);
326 }
327
328 static ClusterConnection createConnectionInternal(Configuration conf) throws IOException {
329 UserProvider provider = UserProvider.instantiate(conf);
330 return createConnection(conf, false, null, provider.getCurrent());
331 }
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353 public static HConnection createConnection(Configuration conf, ExecutorService pool)
354 throws IOException {
355 UserProvider provider = UserProvider.instantiate(conf);
356 return createConnection(conf, false, pool, provider.getCurrent());
357 }
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379 public static HConnection createConnection(Configuration conf, User user)
380 throws IOException {
381 return createConnection(conf, false, null, user);
382 }
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405 public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
406 throws IOException {
407 return createConnection(conf, false, pool, user);
408 }
409
410 @Deprecated
411 static HConnection createConnection(final Configuration conf, final boolean managed)
412 throws IOException {
413 UserProvider provider = UserProvider.instantiate(conf);
414 return createConnection(conf, managed, null, provider.getCurrent());
415 }
416
417 @Deprecated
418 static ClusterConnection createConnection(final Configuration conf, final boolean managed,
419 final ExecutorService pool, final User user)
420 throws IOException {
421 return (ClusterConnection) ConnectionFactory.createConnection(conf, managed, pool, user);
422 }
423
424
425
426
427
428
429
430
431
432 @Deprecated
433 public static void deleteConnection(Configuration conf) {
434 deleteConnection(new HConnectionKey(conf), false);
435 }
436
437
438
439
440
441
442
443
444 @Deprecated
445 public static void deleteStaleConnection(HConnection connection) {
446 deleteConnection(connection, true);
447 }
448
449
450
451
452
453
454
455 @Deprecated
456 public static void deleteAllConnections(boolean staleConnection) {
457 synchronized (CONNECTION_INSTANCES) {
458 Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
459 connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
460 for (HConnectionKey connectionKey : connectionKeys) {
461 deleteConnection(connectionKey, staleConnection);
462 }
463 CONNECTION_INSTANCES.clear();
464 }
465 }
466
467
468
469
470
471 @Deprecated
472 public static void deleteAllConnections() {
473 deleteAllConnections(false);
474 }
475
476
477 @Deprecated
478 private static void deleteConnection(HConnection connection, boolean staleConnection) {
479 synchronized (CONNECTION_INSTANCES) {
480 for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
481 if (e.getValue() == connection) {
482 deleteConnection(e.getKey(), staleConnection);
483 break;
484 }
485 }
486 }
487 }
488
489 @Deprecated
490 private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
491 synchronized (CONNECTION_INSTANCES) {
492 HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
493 if (connection != null) {
494 connection.decCount();
495 if (connection.isZeroReference() || staleConnection) {
496 CONNECTION_INSTANCES.remove(connectionKey);
497 connection.internalClose();
498 }
499 } else {
500 LOG.error("Connection not found in the list, can't delete it "+
501 "(connection key=" + connectionKey + "). May be the key was modified?", new Exception());
502 }
503 }
504 }
505
506
507
508
509
510
511
512
513
514
515
516
517 @InterfaceAudience.Private
518 public static <T> T execute(HConnectable<T> connectable) throws IOException {
519 if (connectable == null || connectable.conf == null) {
520 return null;
521 }
522 Configuration conf = connectable.conf;
523 HConnection connection = getConnection(conf);
524 boolean connectSucceeded = false;
525 try {
526 T returnValue = connectable.connect(connection);
527 connectSucceeded = true;
528 return returnValue;
529 } finally {
530 try {
531 connection.close();
532 } catch (Exception e) {
533 ExceptionUtil.rethrowIfInterrupt(e);
534 if (connectSucceeded) {
535 throw new IOException("The connection to " + connection
536 + " could not be deleted.", e);
537 }
538 }
539 }
540 }
541
542
543 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
544 value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
545 justification="Access to the conncurrent hash map is under a lock so should be fine.")
546 static class HConnectionImplementation implements ClusterConnection, Closeable {
547 static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
548 private final long pause;
549 private final boolean useMetaReplicas;
550 private final int numTries;
551 final int rpcTimeout;
552 private NonceGenerator nonceGenerator = null;
553 private final AsyncProcess asyncProcess;
554
555 private final ServerStatisticTracker stats;
556
557 private volatile boolean closed;
558 private volatile boolean aborted;
559
560
561 ClusterStatusListener clusterStatusListener;
562
563
564 private final Object metaRegionLock = new Object();
565
566
567
568
569
570
571 private final Object masterAndZKLock = new Object();
572
573 private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
574
575
576
577 private volatile ExecutorService batchPool = null;
578
579
580 private volatile ExecutorService metaLookupPool = null;
581 private volatile boolean cleanupPool = false;
582
583 private final Configuration conf;
584
585
586
587 private final ConnectionConfiguration connectionConfig;
588
589
590 private RpcClient rpcClient;
591
592 private MetaCache metaCache = new MetaCache();
593
594 private int refCount;
595
596
597 private boolean managed;
598
599 private User user;
600
601 private RpcRetryingCallerFactory rpcCallerFactory;
602
603 private RpcControllerFactory rpcControllerFactory;
604
605 private final RetryingCallerInterceptor interceptor;
606
607
608
609
610 Registry registry;
611
612 private final ClientBackoffPolicy backoffPolicy;
613
614 HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
615 this(conf, managed, null, null);
616 }
617
618
619
620
621
622
623
624
625
626
627
628
629 HConnectionImplementation(Configuration conf, boolean managed,
630 ExecutorService pool, User user) throws IOException {
631 this(conf);
632 this.user = user;
633 this.batchPool = pool;
634 this.managed = managed;
635 this.registry = setupRegistry();
636 retrieveClusterId();
637
638 this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
639 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
640
641
642 boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
643 HConstants.STATUS_PUBLISHED_DEFAULT);
644 Class<? extends ClusterStatusListener.Listener> listenerClass =
645 conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
646 ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
647 ClusterStatusListener.Listener.class);
648 if (shouldListen) {
649 if (listenerClass == null) {
650 LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
651 ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
652 } else {
653 clusterStatusListener = new ClusterStatusListener(
654 new ClusterStatusListener.DeadServerHandler() {
655 @Override
656 public void newDead(ServerName sn) {
657 clearCaches(sn);
658 rpcClient.cancelConnections(sn);
659 }
660 }, conf, listenerClass);
661 }
662 }
663 }
664
665
666
667
668 protected HConnectionImplementation(Configuration conf) {
669 this.conf = conf;
670 this.connectionConfig = new ConnectionConfiguration(conf);
671 this.closed = false;
672 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
673 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
674 this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
675 HConstants.DEFAULT_USE_META_REPLICAS);
676 this.numTries = connectionConfig.getRetriesNumber();
677 this.rpcTimeout = conf.getInt(
678 HConstants.HBASE_RPC_TIMEOUT_KEY,
679 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
680 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
681 synchronized (nonceGeneratorCreateLock) {
682 if (ConnectionManager.nonceGenerator == null) {
683 ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
684 }
685 this.nonceGenerator = ConnectionManager.nonceGenerator;
686 }
687 } else {
688 this.nonceGenerator = new NoNonceGenerator();
689 }
690 stats = ServerStatisticTracker.create(conf);
691 this.asyncProcess = createAsyncProcess(this.conf);
692 this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
693 this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
694 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
695 }
696
697 @Override
698 public HTableInterface getTable(String tableName) throws IOException {
699 return getTable(TableName.valueOf(tableName));
700 }
701
702 @Override
703 public HTableInterface getTable(byte[] tableName) throws IOException {
704 return getTable(TableName.valueOf(tableName));
705 }
706
707 @Override
708 public HTableInterface getTable(TableName tableName) throws IOException {
709 return getTable(tableName, getBatchPool());
710 }
711
712 @Override
713 public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
714 return getTable(TableName.valueOf(tableName), pool);
715 }
716
717 @Override
718 public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
719 return getTable(TableName.valueOf(tableName), pool);
720 }
721
722 @Override
723 public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
724 if (managed) {
725 throw new NeedUnmanagedConnectionException();
726 }
727 return new HTable(tableName, this, connectionConfig, rpcCallerFactory, rpcControllerFactory, pool);
728 }
729
730 @Override
731 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
732 if (params.getTableName() == null) {
733 throw new IllegalArgumentException("TableName cannot be null.");
734 }
735 if (params.getPool() == null) {
736 params.pool(HTable.getDefaultExecutor(getConfiguration()));
737 }
738 if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
739 params.writeBufferSize(connectionConfig.getWriteBufferSize());
740 }
741 if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
742 params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
743 }
744 return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
745 }
746
747 @Override
748 public BufferedMutator getBufferedMutator(TableName tableName) {
749 return getBufferedMutator(new BufferedMutatorParams(tableName));
750 }
751
752 @Override
753 public RegionLocator getRegionLocator(TableName tableName) throws IOException {
754 return new HRegionLocator(tableName, this);
755 }
756
757 @Override
758 public Admin getAdmin() throws IOException {
759 if (managed) {
760 throw new NeedUnmanagedConnectionException();
761 }
762 return new HBaseAdmin(this);
763 }
764
765 private ExecutorService getBatchPool() {
766 if (batchPool == null) {
767 synchronized (this) {
768 if (batchPool == null) {
769 this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
770 conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
771 this.cleanupPool = true;
772 }
773 }
774 }
775 return this.batchPool;
776 }
777
778 private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
779 BlockingQueue<Runnable> passedWorkQueue) {
780
781 if (maxThreads == 0) {
782 maxThreads = Runtime.getRuntime().availableProcessors() * 8;
783 }
784 if (coreThreads == 0) {
785 coreThreads = Runtime.getRuntime().availableProcessors() * 8;
786 }
787 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
788 BlockingQueue<Runnable> workQueue = passedWorkQueue;
789 if (workQueue == null) {
790 workQueue =
791 new LinkedBlockingQueue<Runnable>(maxThreads *
792 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
793 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
794 }
795 ThreadPoolExecutor tpe = new ThreadPoolExecutor(
796 coreThreads,
797 maxThreads,
798 keepAliveTime,
799 TimeUnit.SECONDS,
800 workQueue,
801 Threads.newDaemonThreadFactory(toString() + nameHint));
802 tpe.allowCoreThreadTimeOut(true);
803 return tpe;
804 }
805
806 private ExecutorService getMetaLookupPool() {
807 if (this.metaLookupPool == null) {
808 synchronized (this) {
809 if (this.metaLookupPool == null) {
810
811
812
813
814 this.metaLookupPool = getThreadPool(
815 conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
816 conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
817 "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
818 }
819 }
820 }
821 return this.metaLookupPool;
822 }
823
824 protected ExecutorService getCurrentMetaLookupPool() {
825 return metaLookupPool;
826 }
827
828 protected ExecutorService getCurrentBatchPool() {
829 return batchPool;
830 }
831
832 private void shutdownPools() {
833 if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
834 shutdownBatchPool(this.batchPool);
835 }
836 if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
837 shutdownBatchPool(this.metaLookupPool);
838 }
839 }
840
841 private void shutdownBatchPool(ExecutorService pool) {
842 pool.shutdown();
843 try {
844 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
845 pool.shutdownNow();
846 }
847 } catch (InterruptedException e) {
848 pool.shutdownNow();
849 }
850 }
851
852
853
854
855
856 private Registry setupRegistry() throws IOException {
857 return RegistryFactory.getRegistry(this);
858 }
859
860
861
862
863 @VisibleForTesting
864 RpcClient getRpcClient() {
865 return rpcClient;
866 }
867
868
869
870
871 @Override
872 public String toString(){
873 return "hconnection-0x" + Integer.toHexString(hashCode());
874 }
875
876 protected String clusterId = null;
877
878 void retrieveClusterId() {
879 if (clusterId != null) return;
880 this.clusterId = this.registry.getClusterId();
881 if (clusterId == null) {
882 clusterId = HConstants.CLUSTER_ID_DEFAULT;
883 LOG.debug("clusterid came back null, using default " + clusterId);
884 }
885 }
886
887 @Override
888 public Configuration getConfiguration() {
889 return this.conf;
890 }
891
892 private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
893 throws MasterNotRunningException {
894 String errorMsg;
895 try {
896 if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
897 errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. "
898 + "It should have been written by the master. "
899 + "Check the value configured in 'zookeeper.znode.parent'. "
900 + "There could be a mismatch with the one configured in the master.";
901 LOG.error(errorMsg);
902 throw new MasterNotRunningException(errorMsg);
903 }
904 } catch (KeeperException e) {
905 errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
906 LOG.error(errorMsg);
907 throw new MasterNotRunningException(errorMsg, e);
908 }
909 }
910
911
912
913
914
915
916 @Deprecated
917 @Override
918 public boolean isMasterRunning()
919 throws MasterNotRunningException, ZooKeeperConnectionException {
920
921
922
923 MasterKeepAliveConnection m = getKeepAliveMasterService();
924 m.close();
925 return true;
926 }
927
928 @Override
929 public HRegionLocation getRegionLocation(final TableName tableName,
930 final byte [] row, boolean reload)
931 throws IOException {
932 return reload? relocateRegion(tableName, row): locateRegion(tableName, row);
933 }
934
935 @Override
936 public HRegionLocation getRegionLocation(final byte[] tableName,
937 final byte [] row, boolean reload)
938 throws IOException {
939 return getRegionLocation(TableName.valueOf(tableName), row, reload);
940 }
941
942 @Override
943 public boolean isTableEnabled(TableName tableName) throws IOException {
944 return this.registry.isTableOnlineState(tableName, true);
945 }
946
947 @Override
948 public boolean isTableEnabled(byte[] tableName) throws IOException {
949 return isTableEnabled(TableName.valueOf(tableName));
950 }
951
952 @Override
953 public boolean isTableDisabled(TableName tableName) throws IOException {
954 return this.registry.isTableOnlineState(tableName, false);
955 }
956
957 @Override
958 public boolean isTableDisabled(byte[] tableName) throws IOException {
959 return isTableDisabled(TableName.valueOf(tableName));
960 }
961
962 @Override
963 public boolean isTableAvailable(final TableName tableName) throws IOException {
964 final AtomicBoolean available = new AtomicBoolean(true);
965 final AtomicInteger regionCount = new AtomicInteger(0);
966 MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
967 @Override
968 public boolean processRow(Result row) throws IOException {
969 HRegionInfo info = MetaScanner.getHRegionInfo(row);
970 if (info != null && !info.isSplitParent()) {
971 if (tableName.equals(info.getTable())) {
972 ServerName server = HRegionInfo.getServerName(row);
973 if (server == null) {
974 available.set(false);
975 return false;
976 }
977 regionCount.incrementAndGet();
978 } else if (tableName.compareTo(info.getTable()) < 0) {
979
980 return false;
981 }
982 }
983 return true;
984 }
985 };
986 MetaScanner.metaScan(this, visitor, tableName);
987 return available.get() && (regionCount.get() > 0);
988 }
989
990 @Override
991 public boolean isTableAvailable(final byte[] tableName) throws IOException {
992 return isTableAvailable(TableName.valueOf(tableName));
993 }
994
995 @Override
996 public boolean isTableAvailable(final TableName tableName, final byte[][] splitKeys)
997 throws IOException {
998 final AtomicBoolean available = new AtomicBoolean(true);
999 final AtomicInteger regionCount = new AtomicInteger(0);
1000 MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
1001 @Override
1002 public boolean processRow(Result row) throws IOException {
1003 HRegionInfo info = MetaScanner.getHRegionInfo(row);
1004 if (info != null && !info.isSplitParent()) {
1005 if (tableName.equals(info.getTable())) {
1006 ServerName server = HRegionInfo.getServerName(row);
1007 if (server == null) {
1008 available.set(false);
1009 return false;
1010 }
1011 if (!Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
1012 for (byte[] splitKey : splitKeys) {
1013
1014 if (Bytes.equals(info.getStartKey(), splitKey)) {
1015 regionCount.incrementAndGet();
1016 break;
1017 }
1018 }
1019 } else {
1020
1021 regionCount.incrementAndGet();
1022 }
1023 } else if (tableName.compareTo(info.getTable()) < 0) {
1024
1025 return false;
1026 }
1027 }
1028 return true;
1029 }
1030 };
1031 MetaScanner.metaScan(this, visitor, tableName);
1032
1033 return available.get() && (regionCount.get() == splitKeys.length + 1);
1034 }
1035
1036 @Override
1037 public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
1038 throws IOException {
1039 return isTableAvailable(TableName.valueOf(tableName), splitKeys);
1040 }
1041
1042 @Override
1043 public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
1044 RegionLocations locations = locateRegion(HRegionInfo.getTable(regionName),
1045 HRegionInfo.getStartKey(regionName), false, true);
1046 return locations == null ? null : locations.getRegionLocation();
1047 }
1048
1049 @Override
1050 public boolean isDeadServer(ServerName sn) {
1051 if (clusterStatusListener == null) {
1052 return false;
1053 } else {
1054 return clusterStatusListener.isDeadServer(sn);
1055 }
1056 }
1057
1058 @Override
1059 public List<HRegionLocation> locateRegions(final TableName tableName)
1060 throws IOException {
1061 return locateRegions (tableName, false, true);
1062 }
1063
1064 @Override
1065 public List<HRegionLocation> locateRegions(final byte[] tableName)
1066 throws IOException {
1067 return locateRegions(TableName.valueOf(tableName));
1068 }
1069
1070 @Override
1071 public List<HRegionLocation> locateRegions(final TableName tableName,
1072 final boolean useCache, final boolean offlined) throws IOException {
1073 NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(this, tableName);
1074 final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
1075 for (HRegionInfo regionInfo : regions.keySet()) {
1076 RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
1077 if (list != null) {
1078 for (HRegionLocation loc : list.getRegionLocations()) {
1079 if (loc != null) {
1080 locations.add(loc);
1081 }
1082 }
1083 }
1084 }
1085 return locations;
1086 }
1087
1088 @Override
1089 public List<HRegionLocation> locateRegions(final byte[] tableName,
1090 final boolean useCache, final boolean offlined) throws IOException {
1091 return locateRegions(TableName.valueOf(tableName), useCache, offlined);
1092 }
1093
1094 @Override
1095 public HRegionLocation locateRegion(
1096 final TableName tableName, final byte[] row) throws IOException{
1097 RegionLocations locations = locateRegion(tableName, row, true, true);
1098 return locations == null ? null : locations.getRegionLocation();
1099 }
1100
1101 @Override
1102 public HRegionLocation locateRegion(final byte[] tableName,
1103 final byte [] row)
1104 throws IOException{
1105 return locateRegion(TableName.valueOf(tableName), row);
1106 }
1107
1108 @Override
1109 public HRegionLocation relocateRegion(final TableName tableName,
1110 final byte [] row) throws IOException{
1111 RegionLocations locations = relocateRegion(tableName, row,
1112 RegionReplicaUtil.DEFAULT_REPLICA_ID);
1113 return locations == null ? null :
1114 locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
1115 }
1116
1117 @Override
1118 public RegionLocations relocateRegion(final TableName tableName,
1119 final byte [] row, int replicaId) throws IOException{
1120
1121
1122
1123 if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
1124 throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
1125 }
1126
1127 return locateRegion(tableName, row, false, true, replicaId);
1128 }
1129
1130 @Override
1131 public HRegionLocation relocateRegion(final byte[] tableName,
1132 final byte [] row) throws IOException {
1133 return relocateRegion(TableName.valueOf(tableName), row);
1134 }
1135
1136 @Override
1137 public RegionLocations locateRegion(final TableName tableName,
1138 final byte [] row, boolean useCache, boolean retry)
1139 throws IOException {
1140 return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
1141 }
1142
1143 @Override
1144 public RegionLocations locateRegion(final TableName tableName,
1145 final byte [] row, boolean useCache, boolean retry, int replicaId)
1146 throws IOException {
1147 if (this.closed) throw new IOException(toString() + " closed");
1148 if (tableName== null || tableName.getName().length == 0) {
1149 throw new IllegalArgumentException(
1150 "table name cannot be null or zero length");
1151 }
1152 if (tableName.equals(TableName.META_TABLE_NAME)) {
1153 return locateMeta(tableName, useCache, replicaId);
1154 } else {
1155
1156 return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
1157 }
1158 }
1159
1160 private RegionLocations locateMeta(final TableName tableName,
1161 boolean useCache, int replicaId) throws IOException {
1162
1163
1164
1165 byte[] metaCacheKey = HConstants.EMPTY_START_ROW;
1166 RegionLocations locations = null;
1167 if (useCache) {
1168 locations = getCachedLocation(tableName, metaCacheKey);
1169 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1170 return locations;
1171 }
1172 }
1173
1174
1175 synchronized (metaRegionLock) {
1176
1177
1178 if (useCache) {
1179 locations = getCachedLocation(tableName, metaCacheKey);
1180 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1181 return locations;
1182 }
1183 }
1184
1185
1186 locations = this.registry.getMetaRegionLocation();
1187 if (locations != null) {
1188 cacheLocation(tableName, locations);
1189 }
1190 }
1191 return locations;
1192 }
1193
1194
1195
1196
1197
1198 private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
1199 boolean useCache, boolean retry, int replicaId) throws IOException {
1200
1201
1202
1203 if (useCache) {
1204 RegionLocations locations = getCachedLocation(tableName, row);
1205 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1206 return locations;
1207 }
1208 }
1209
1210
1211
1212
1213 byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
1214
1215 Scan s = new Scan();
1216 s.setReversed(true);
1217 s.setStartRow(metaKey);
1218 s.setSmall(true);
1219 s.setCaching(1);
1220 if (this.useMetaReplicas) {
1221 s.setConsistency(Consistency.TIMELINE);
1222 }
1223
1224 int localNumRetries = (retry ? numTries : 1);
1225
1226 for (int tries = 0; true; tries++) {
1227 if (tries >= localNumRetries) {
1228 throw new NoServerForRegionException("Unable to find region for "
1229 + Bytes.toStringBinary(row) + " in " + tableName +
1230 " after " + localNumRetries + " tries.");
1231 }
1232 if (useCache) {
1233 RegionLocations locations = getCachedLocation(tableName, row);
1234 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1235 return locations;
1236 }
1237 } else {
1238
1239
1240 metaCache.clearCache(tableName, row);
1241 }
1242
1243
1244 try {
1245 Result regionInfoRow = null;
1246 ReversedClientScanner rcs = null;
1247 try {
1248 rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
1249 rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
1250 regionInfoRow = rcs.next();
1251 } finally {
1252 if (rcs != null) {
1253 rcs.close();
1254 }
1255 }
1256
1257 if (regionInfoRow == null) {
1258 throw new TableNotFoundException(tableName);
1259 }
1260
1261
1262 RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
1263 if (locations == null || locations.getRegionLocation(replicaId) == null) {
1264 throw new IOException("HRegionInfo was null in " +
1265 tableName + ", row=" + regionInfoRow);
1266 }
1267 HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
1268 if (regionInfo == null) {
1269 throw new IOException("HRegionInfo was null or empty in " +
1270 TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
1271 }
1272
1273
1274 if (!regionInfo.getTable().equals(tableName)) {
1275 throw new TableNotFoundException(
1276 "Table '" + tableName + "' was not found, got: " +
1277 regionInfo.getTable() + ".");
1278 }
1279 if (regionInfo.isSplit()) {
1280 throw new RegionOfflineException("the only available region for" +
1281 " the required row is a split parent," +
1282 " the daughters should be online soon: " +
1283 regionInfo.getRegionNameAsString());
1284 }
1285 if (regionInfo.isOffline()) {
1286 throw new RegionOfflineException("the region is offline, could" +
1287 " be caused by a disable table call: " +
1288 regionInfo.getRegionNameAsString());
1289 }
1290
1291 ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
1292 if (serverName == null) {
1293 throw new NoServerForRegionException("No server address listed " +
1294 "in " + TableName.META_TABLE_NAME + " for region " +
1295 regionInfo.getRegionNameAsString() + " containing row " +
1296 Bytes.toStringBinary(row));
1297 }
1298
1299 if (isDeadServer(serverName)){
1300 throw new RegionServerStoppedException("hbase:meta says the region "+
1301 regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
1302 ", but it is dead.");
1303 }
1304
1305 cacheLocation(tableName, locations);
1306 return locations;
1307 } catch (TableNotFoundException e) {
1308
1309
1310
1311 throw e;
1312 } catch (IOException e) {
1313 ExceptionUtil.rethrowIfInterrupt(e);
1314
1315 if (e instanceof RemoteException) {
1316 e = ((RemoteException)e).unwrapRemoteException();
1317 }
1318 if (tries < localNumRetries - 1) {
1319 if (LOG.isDebugEnabled()) {
1320 LOG.debug("locateRegionInMeta parentTable=" +
1321 TableName.META_TABLE_NAME + ", metaLocation=" +
1322 ", attempt=" + tries + " of " +
1323 localNumRetries + " failed; retrying after sleep of " +
1324 ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
1325 }
1326 } else {
1327 throw e;
1328 }
1329
1330 if(!(e instanceof RegionOfflineException ||
1331 e instanceof NoServerForRegionException)) {
1332 relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
1333 }
1334 }
1335 try{
1336 Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
1337 } catch (InterruptedException e) {
1338 throw new InterruptedIOException("Giving up trying to location region in " +
1339 "meta: thread is interrupted.");
1340 }
1341 }
1342 }
1343
1344
1345
1346
1347
1348
1349 @Override
1350 public void cacheLocation(final TableName tableName, final RegionLocations location) {
1351 metaCache.cacheLocation(tableName, location);
1352 }
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362 RegionLocations getCachedLocation(final TableName tableName,
1363 final byte [] row) {
1364 return metaCache.getCachedLocation(tableName, row);
1365 }
1366
1367 public void clearRegionCache(final TableName tableName, byte[] row) {
1368 metaCache.clearCache(tableName, row);
1369 }
1370
1371
1372
1373
1374 @Override
1375 public void clearCaches(final ServerName serverName) {
1376 metaCache.clearCache(serverName);
1377 }
1378
1379 @Override
1380 public void clearRegionCache() {
1381 metaCache.clearCache();
1382 }
1383
1384 @Override
1385 public void clearRegionCache(final TableName tableName) {
1386 metaCache.clearCache(tableName);
1387 }
1388
1389 @Override
1390 public void clearRegionCache(final byte[] tableName) {
1391 clearRegionCache(TableName.valueOf(tableName));
1392 }
1393
1394
1395
1396
1397
1398
1399
1400 private void cacheLocation(final TableName tableName, final ServerName source,
1401 final HRegionLocation location) {
1402 metaCache.cacheLocation(tableName, source, location);
1403 }
1404
1405
1406 private final ConcurrentHashMap<String, Object> stubs =
1407 new ConcurrentHashMap<String, Object>();
1408
1409 private final ConcurrentHashMap<String, String> connectionLock =
1410 new ConcurrentHashMap<String, String>();
1411
1412
1413
1414
1415 static class MasterServiceState {
1416 HConnection connection;
1417 MasterService.BlockingInterface stub;
1418 int userCount;
1419
1420 MasterServiceState (final HConnection connection) {
1421 super();
1422 this.connection = connection;
1423 }
1424
1425 @Override
1426 public String toString() {
1427 return "MasterService";
1428 }
1429
1430 Object getStub() {
1431 return this.stub;
1432 }
1433
1434 void clearStub() {
1435 this.stub = null;
1436 }
1437
1438 boolean isMasterRunning() throws ServiceException {
1439 IsMasterRunningResponse response =
1440 this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1441 return response != null? response.getIsMasterRunning(): false;
1442 }
1443 }
1444
1445
1446
1447
1448
1449
1450 abstract class StubMaker {
1451
1452
1453
1454 protected abstract String getServiceName();
1455
1456
1457
1458
1459
1460 protected abstract Object makeStub(final BlockingRpcChannel channel);
1461
1462
1463
1464
1465
1466 protected abstract void isMasterRunning() throws ServiceException;
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476 private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
1477 ZooKeeperKeepAliveConnection zkw;
1478 try {
1479 zkw = getKeepAliveZooKeeperWatcher();
1480 } catch (IOException e) {
1481 ExceptionUtil.rethrowIfInterrupt(e);
1482 throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
1483 }
1484 try {
1485 checkIfBaseNodeAvailable(zkw);
1486 ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
1487 if (sn == null) {
1488 String msg = "ZooKeeper available but no active master location found";
1489 LOG.info(msg);
1490 throw new MasterNotRunningException(msg);
1491 }
1492 if (isDeadServer(sn)) {
1493 throw new MasterNotRunningException(sn + " is dead.");
1494 }
1495
1496 String key = getStubKey(getServiceName(), sn.getHostname(), sn.getPort());
1497 connectionLock.putIfAbsent(key, key);
1498 Object stub = null;
1499 synchronized (connectionLock.get(key)) {
1500 stub = stubs.get(key);
1501 if (stub == null) {
1502 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1503 stub = makeStub(channel);
1504 isMasterRunning();
1505 stubs.put(key, stub);
1506 }
1507 }
1508 return stub;
1509 } finally {
1510 zkw.close();
1511 }
1512 }
1513
1514
1515
1516
1517
1518
1519 Object makeStub() throws IOException {
1520
1521
1522 synchronized (masterAndZKLock) {
1523 Exception exceptionCaught = null;
1524 if (!closed) {
1525 try {
1526 return makeStubNoRetries();
1527 } catch (IOException e) {
1528 exceptionCaught = e;
1529 } catch (KeeperException e) {
1530 exceptionCaught = e;
1531 } catch (ServiceException e) {
1532 exceptionCaught = e;
1533 }
1534
1535 throw new MasterNotRunningException(exceptionCaught);
1536 } else {
1537 throw new DoNotRetryIOException("Connection was closed while trying to get master");
1538 }
1539 }
1540 }
1541 }
1542
1543
1544
1545
1546 class MasterServiceStubMaker extends StubMaker {
1547 private MasterService.BlockingInterface stub;
1548 @Override
1549 protected String getServiceName() {
1550 return MasterService.getDescriptor().getName();
1551 }
1552
1553 @Override
1554 MasterService.BlockingInterface makeStub() throws IOException {
1555 return (MasterService.BlockingInterface)super.makeStub();
1556 }
1557
1558 @Override
1559 protected Object makeStub(BlockingRpcChannel channel) {
1560 this.stub = MasterService.newBlockingStub(channel);
1561 return this.stub;
1562 }
1563
1564 @Override
1565 protected void isMasterRunning() throws ServiceException {
1566 this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1567 }
1568 }
1569
1570 @Override
1571 public AdminService.BlockingInterface getAdmin(final ServerName serverName)
1572 throws IOException {
1573 return getAdmin(serverName, false);
1574 }
1575
1576 @Override
1577
1578 public AdminService.BlockingInterface getAdmin(final ServerName serverName,
1579 final boolean master)
1580 throws IOException {
1581 if (isDeadServer(serverName)) {
1582 throw new RegionServerStoppedException(serverName + " is dead.");
1583 }
1584 String key = getStubKey(AdminService.BlockingInterface.class.getName(),
1585 serverName.getHostname(), serverName.getPort());
1586 this.connectionLock.putIfAbsent(key, key);
1587 AdminService.BlockingInterface stub = null;
1588 synchronized (this.connectionLock.get(key)) {
1589 stub = (AdminService.BlockingInterface)this.stubs.get(key);
1590 if (stub == null) {
1591 BlockingRpcChannel channel =
1592 this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1593 stub = AdminService.newBlockingStub(channel);
1594 this.stubs.put(key, stub);
1595 }
1596 }
1597 return stub;
1598 }
1599
1600 @Override
1601 public ClientService.BlockingInterface getClient(final ServerName sn)
1602 throws IOException {
1603 if (isDeadServer(sn)) {
1604 throw new RegionServerStoppedException(sn + " is dead.");
1605 }
1606 String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostname(),
1607 sn.getPort());
1608 this.connectionLock.putIfAbsent(key, key);
1609 ClientService.BlockingInterface stub = null;
1610 synchronized (this.connectionLock.get(key)) {
1611 stub = (ClientService.BlockingInterface)this.stubs.get(key);
1612 if (stub == null) {
1613 BlockingRpcChannel channel =
1614 this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1615 stub = ClientService.newBlockingStub(channel);
1616
1617
1618 this.stubs.put(key, stub);
1619 }
1620 }
1621 return stub;
1622 }
1623
1624 static String getStubKey(final String serviceName, final String rsHostname, int port) {
1625
1626
1627
1628
1629
1630 InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
1631 String address = rsHostname;
1632 if (i != null) {
1633 address = i.getHostAddress() + "-" + rsHostname;
1634 }
1635 return serviceName + "@" + address + ":" + port;
1636 }
1637
1638 private ZooKeeperKeepAliveConnection keepAliveZookeeper;
1639 private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
1640 private boolean canCloseZKW = true;
1641
1642
1643 private static final long keepAlive = 5 * 60 * 1000;
1644
1645
1646
1647
1648
1649 ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
1650 throws IOException {
1651 synchronized (masterAndZKLock) {
1652 if (keepAliveZookeeper == null) {
1653 if (this.closed) {
1654 throw new IOException(toString() + " closed");
1655 }
1656
1657
1658 keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
1659 }
1660 keepAliveZookeeperUserCount.addAndGet(1);
1661 keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
1662 return keepAliveZookeeper;
1663 }
1664 }
1665
1666 void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
1667 if (zkw == null){
1668 return;
1669 }
1670 if (keepAliveZookeeperUserCount.addAndGet(-1) <= 0 ){
1671 keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
1672 }
1673 }
1674
1675 private void closeZooKeeperWatcher() {
1676 synchronized (masterAndZKLock) {
1677 if (keepAliveZookeeper != null) {
1678 LOG.info("Closing zookeeper sessionid=0x" +
1679 Long.toHexString(
1680 keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
1681 keepAliveZookeeper.internalClose();
1682 keepAliveZookeeper = null;
1683 }
1684 keepAliveZookeeperUserCount.set(0);
1685 }
1686 }
1687
1688 final MasterServiceState masterServiceState = new MasterServiceState(this);
1689
1690 @Override
1691 public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
1692 return getKeepAliveMasterService();
1693 }
1694
1695 private void resetMasterServiceState(final MasterServiceState mss) {
1696 mss.userCount++;
1697 }
1698
1699 @Override
1700 public MasterKeepAliveConnection getKeepAliveMasterService()
1701 throws MasterNotRunningException {
1702 synchronized (masterAndZKLock) {
1703 if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1704 MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1705 try {
1706 this.masterServiceState.stub = stubMaker.makeStub();
1707 } catch (MasterNotRunningException ex) {
1708 throw ex;
1709 } catch (IOException e) {
1710
1711 throw new MasterNotRunningException(e);
1712 }
1713 }
1714 resetMasterServiceState(this.masterServiceState);
1715 }
1716
1717 final MasterService.BlockingInterface stub = this.masterServiceState.stub;
1718 return new MasterKeepAliveConnection() {
1719 MasterServiceState mss = masterServiceState;
1720 @Override
1721 public MasterProtos.AbortProcedureResponse abortProcedure(
1722 RpcController controller,
1723 MasterProtos.AbortProcedureRequest request) throws ServiceException {
1724 return stub.abortProcedure(controller, request);
1725 }
1726 @Override
1727 public MasterProtos.ListProceduresResponse listProcedures(
1728 RpcController controller,
1729 MasterProtos.ListProceduresRequest request) throws ServiceException {
1730 return stub.listProcedures(controller, request);
1731 }
1732 @Override
1733 public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
1734 throws ServiceException {
1735 return stub.addColumn(controller, request);
1736 }
1737
1738 @Override
1739 public DeleteColumnResponse deleteColumn(RpcController controller,
1740 DeleteColumnRequest request)
1741 throws ServiceException {
1742 return stub.deleteColumn(controller, request);
1743 }
1744
1745 @Override
1746 public ModifyColumnResponse modifyColumn(RpcController controller,
1747 ModifyColumnRequest request)
1748 throws ServiceException {
1749 return stub.modifyColumn(controller, request);
1750 }
1751
1752 @Override
1753 public MoveRegionResponse moveRegion(RpcController controller,
1754 MoveRegionRequest request) throws ServiceException {
1755 return stub.moveRegion(controller, request);
1756 }
1757
1758 @Override
1759 public DispatchMergingRegionsResponse dispatchMergingRegions(
1760 RpcController controller, DispatchMergingRegionsRequest request)
1761 throws ServiceException {
1762 return stub.dispatchMergingRegions(controller, request);
1763 }
1764
1765 @Override
1766 public AssignRegionResponse assignRegion(RpcController controller,
1767 AssignRegionRequest request) throws ServiceException {
1768 return stub.assignRegion(controller, request);
1769 }
1770
1771 @Override
1772 public UnassignRegionResponse unassignRegion(RpcController controller,
1773 UnassignRegionRequest request) throws ServiceException {
1774 return stub.unassignRegion(controller, request);
1775 }
1776
1777 @Override
1778 public OfflineRegionResponse offlineRegion(RpcController controller,
1779 OfflineRegionRequest request) throws ServiceException {
1780 return stub.offlineRegion(controller, request);
1781 }
1782
1783 @Override
1784 public DeleteTableResponse deleteTable(RpcController controller,
1785 DeleteTableRequest request) throws ServiceException {
1786 return stub.deleteTable(controller, request);
1787 }
1788
1789 @Override
1790 public TruncateTableResponse truncateTable(RpcController controller,
1791 TruncateTableRequest request) throws ServiceException {
1792 return stub.truncateTable(controller, request);
1793 }
1794
1795 @Override
1796 public EnableTableResponse enableTable(RpcController controller,
1797 EnableTableRequest request) throws ServiceException {
1798 return stub.enableTable(controller, request);
1799 }
1800
1801 @Override
1802 public DisableTableResponse disableTable(RpcController controller,
1803 DisableTableRequest request) throws ServiceException {
1804 return stub.disableTable(controller, request);
1805 }
1806
1807 @Override
1808 public ModifyTableResponse modifyTable(RpcController controller,
1809 ModifyTableRequest request) throws ServiceException {
1810 return stub.modifyTable(controller, request);
1811 }
1812
1813 @Override
1814 public CreateTableResponse createTable(RpcController controller,
1815 CreateTableRequest request) throws ServiceException {
1816 return stub.createTable(controller, request);
1817 }
1818
1819 @Override
1820 public ShutdownResponse shutdown(RpcController controller,
1821 ShutdownRequest request) throws ServiceException {
1822 return stub.shutdown(controller, request);
1823 }
1824
1825 @Override
1826 public StopMasterResponse stopMaster(RpcController controller,
1827 StopMasterRequest request) throws ServiceException {
1828 return stub.stopMaster(controller, request);
1829 }
1830
1831 @Override
1832 public BalanceResponse balance(RpcController controller,
1833 BalanceRequest request) throws ServiceException {
1834 return stub.balance(controller, request);
1835 }
1836
1837 @Override
1838 public SetBalancerRunningResponse setBalancerRunning(
1839 RpcController controller, SetBalancerRunningRequest request)
1840 throws ServiceException {
1841 return stub.setBalancerRunning(controller, request);
1842 }
1843
1844 @Override
1845 public RunCatalogScanResponse runCatalogScan(RpcController controller,
1846 RunCatalogScanRequest request) throws ServiceException {
1847 return stub.runCatalogScan(controller, request);
1848 }
1849
1850 @Override
1851 public EnableCatalogJanitorResponse enableCatalogJanitor(
1852 RpcController controller, EnableCatalogJanitorRequest request)
1853 throws ServiceException {
1854 return stub.enableCatalogJanitor(controller, request);
1855 }
1856
1857 @Override
1858 public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1859 RpcController controller, IsCatalogJanitorEnabledRequest request)
1860 throws ServiceException {
1861 return stub.isCatalogJanitorEnabled(controller, request);
1862 }
1863
1864 @Override
1865 public CoprocessorServiceResponse execMasterService(
1866 RpcController controller, CoprocessorServiceRequest request)
1867 throws ServiceException {
1868 return stub.execMasterService(controller, request);
1869 }
1870
1871 @Override
1872 public SnapshotResponse snapshot(RpcController controller,
1873 SnapshotRequest request) throws ServiceException {
1874 return stub.snapshot(controller, request);
1875 }
1876
1877 @Override
1878 public GetCompletedSnapshotsResponse getCompletedSnapshots(
1879 RpcController controller, GetCompletedSnapshotsRequest request)
1880 throws ServiceException {
1881 return stub.getCompletedSnapshots(controller, request);
1882 }
1883
1884 @Override
1885 public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1886 DeleteSnapshotRequest request) throws ServiceException {
1887 return stub.deleteSnapshot(controller, request);
1888 }
1889
1890 @Override
1891 public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1892 IsSnapshotDoneRequest request) throws ServiceException {
1893 return stub.isSnapshotDone(controller, request);
1894 }
1895
1896 @Override
1897 public RestoreSnapshotResponse restoreSnapshot(
1898 RpcController controller, RestoreSnapshotRequest request)
1899 throws ServiceException {
1900 return stub.restoreSnapshot(controller, request);
1901 }
1902
1903 @Override
1904 public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
1905 RpcController controller, IsRestoreSnapshotDoneRequest request)
1906 throws ServiceException {
1907 return stub.isRestoreSnapshotDone(controller, request);
1908 }
1909
1910 @Override
1911 public ExecProcedureResponse execProcedure(
1912 RpcController controller, ExecProcedureRequest request)
1913 throws ServiceException {
1914 return stub.execProcedure(controller, request);
1915 }
1916
1917 @Override
1918 public ExecProcedureResponse execProcedureWithRet(
1919 RpcController controller, ExecProcedureRequest request)
1920 throws ServiceException {
1921 return stub.execProcedureWithRet(controller, request);
1922 }
1923
1924 @Override
1925 public IsProcedureDoneResponse isProcedureDone(RpcController controller,
1926 IsProcedureDoneRequest request) throws ServiceException {
1927 return stub.isProcedureDone(controller, request);
1928 }
1929
1930 @Override
1931 public GetProcedureResultResponse getProcedureResult(RpcController controller,
1932 GetProcedureResultRequest request) throws ServiceException {
1933 return stub.getProcedureResult(controller, request);
1934 }
1935
1936 @Override
1937 public IsMasterRunningResponse isMasterRunning(
1938 RpcController controller, IsMasterRunningRequest request)
1939 throws ServiceException {
1940 return stub.isMasterRunning(controller, request);
1941 }
1942
1943 @Override
1944 public ModifyNamespaceResponse modifyNamespace(RpcController controller,
1945 ModifyNamespaceRequest request)
1946 throws ServiceException {
1947 return stub.modifyNamespace(controller, request);
1948 }
1949
1950 @Override
1951 public CreateNamespaceResponse createNamespace(
1952 RpcController controller, CreateNamespaceRequest request) throws ServiceException {
1953 return stub.createNamespace(controller, request);
1954 }
1955
1956 @Override
1957 public DeleteNamespaceResponse deleteNamespace(
1958 RpcController controller, DeleteNamespaceRequest request) throws ServiceException {
1959 return stub.deleteNamespace(controller, request);
1960 }
1961
1962 @Override
1963 public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
1964 GetNamespaceDescriptorRequest request) throws ServiceException {
1965 return stub.getNamespaceDescriptor(controller, request);
1966 }
1967
1968 @Override
1969 public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
1970 ListNamespaceDescriptorsRequest request) throws ServiceException {
1971 return stub.listNamespaceDescriptors(controller, request);
1972 }
1973
1974 @Override
1975 public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
1976 RpcController controller, ListTableDescriptorsByNamespaceRequest request)
1977 throws ServiceException {
1978 return stub.listTableDescriptorsByNamespace(controller, request);
1979 }
1980
1981 @Override
1982 public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
1983 RpcController controller, ListTableNamesByNamespaceRequest request)
1984 throws ServiceException {
1985 return stub.listTableNamesByNamespace(controller, request);
1986 }
1987
1988 @Override
1989 public void close() {
1990 release(this.mss);
1991 }
1992
1993 @Override
1994 public GetSchemaAlterStatusResponse getSchemaAlterStatus(
1995 RpcController controller, GetSchemaAlterStatusRequest request)
1996 throws ServiceException {
1997 return stub.getSchemaAlterStatus(controller, request);
1998 }
1999
2000 @Override
2001 public GetTableDescriptorsResponse getTableDescriptors(
2002 RpcController controller, GetTableDescriptorsRequest request)
2003 throws ServiceException {
2004 return stub.getTableDescriptors(controller, request);
2005 }
2006
2007 @Override
2008 public GetTableNamesResponse getTableNames(
2009 RpcController controller, GetTableNamesRequest request)
2010 throws ServiceException {
2011 return stub.getTableNames(controller, request);
2012 }
2013
2014 @Override
2015 public GetClusterStatusResponse getClusterStatus(
2016 RpcController controller, GetClusterStatusRequest request)
2017 throws ServiceException {
2018 return stub.getClusterStatus(controller, request);
2019 }
2020
2021 @Override
2022 public SetQuotaResponse setQuota(RpcController controller, SetQuotaRequest request)
2023 throws ServiceException {
2024 return stub.setQuota(controller, request);
2025 }
2026
2027 @Override
2028 public MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
2029 RpcController controller, MajorCompactionTimestampRequest request)
2030 throws ServiceException {
2031 return stub.getLastMajorCompactionTimestamp(controller, request);
2032 }
2033
2034 @Override
2035 public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
2036 RpcController controller, MajorCompactionTimestampForRegionRequest request)
2037 throws ServiceException {
2038 return stub.getLastMajorCompactionTimestampForRegion(controller, request);
2039 }
2040
2041 @Override
2042 public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
2043 IsBalancerEnabledRequest request) throws ServiceException {
2044 return stub.isBalancerEnabled(controller, request);
2045 }
2046 };
2047 }
2048
2049
2050 private static void release(MasterServiceState mss) {
2051 if (mss != null && mss.connection != null) {
2052 ((HConnectionImplementation)mss.connection).releaseMaster(mss);
2053 }
2054 }
2055
2056 private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
2057 if (mss.getStub() == null){
2058 return false;
2059 }
2060 try {
2061 return mss.isMasterRunning();
2062 } catch (UndeclaredThrowableException e) {
2063
2064
2065 LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
2066 return false;
2067 } catch (ServiceException se) {
2068 LOG.warn("Checking master connection", se);
2069 return false;
2070 }
2071 }
2072
2073 void releaseMaster(MasterServiceState mss) {
2074 if (mss.getStub() == null) return;
2075 synchronized (masterAndZKLock) {
2076 --mss.userCount;
2077 }
2078 }
2079
2080 private void closeMasterService(MasterServiceState mss) {
2081 if (mss.getStub() != null) {
2082 LOG.info("Closing master protocol: " + mss);
2083 mss.clearStub();
2084 }
2085 mss.userCount = 0;
2086 }
2087
2088
2089
2090
2091
2092 private void closeMaster() {
2093 synchronized (masterAndZKLock) {
2094 closeMasterService(masterServiceState);
2095 }
2096 }
2097
2098 void updateCachedLocation(HRegionInfo hri, ServerName source,
2099 ServerName serverName, long seqNum) {
2100 HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
2101 cacheLocation(hri.getTable(), source, newHrl);
2102 }
2103
2104 @Override
2105 public void deleteCachedRegionLocation(final HRegionLocation location) {
2106 metaCache.clearCache(location);
2107 }
2108
2109 @Override
2110 public void updateCachedLocations(final TableName tableName, byte[] rowkey,
2111 final Object exception, final HRegionLocation source) {
2112 assert source != null;
2113 updateCachedLocations(tableName, source.getRegionInfo().getRegionName()
2114 , rowkey, exception, source.getServerName());
2115 }
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125 @Override
2126 public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
2127 final Object exception, final ServerName source) {
2128 if (rowkey == null || tableName == null) {
2129 LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
2130 ", tableName=" + (tableName == null ? "null" : tableName));
2131 return;
2132 }
2133
2134 if (source == null) {
2135
2136 return;
2137 }
2138
2139 if (regionName == null) {
2140
2141 metaCache.clearCache(tableName, rowkey, source);
2142 return;
2143 }
2144
2145
2146 final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
2147 HRegionLocation oldLocation = null;
2148 if (oldLocations != null) {
2149 oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
2150 }
2151 if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
2152
2153
2154 return;
2155 }
2156
2157 HRegionInfo regionInfo = oldLocation.getRegionInfo();
2158 Throwable cause = findException(exception);
2159 if (cause != null) {
2160 if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException) {
2161
2162 return;
2163 }
2164
2165 if (cause instanceof RegionMovedException) {
2166 RegionMovedException rme = (RegionMovedException) cause;
2167 if (LOG.isTraceEnabled()) {
2168 LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
2169 rme.getHostname() + ":" + rme.getPort() +
2170 " according to " + source.getHostAndPort());
2171 }
2172
2173
2174 updateCachedLocation(
2175 regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2176 return;
2177 }
2178 }
2179
2180
2181
2182 metaCache.clearCache(regionInfo);
2183 }
2184
2185 @Override
2186 public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
2187 final Object exception, final HRegionLocation source) {
2188 updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
2189 }
2190
2191 @Override
2192 @Deprecated
2193 public void processBatch(List<? extends Row> list,
2194 final TableName tableName,
2195 ExecutorService pool,
2196 Object[] results) throws IOException, InterruptedException {
2197
2198
2199
2200 if (results.length != list.size()) {
2201 throw new IllegalArgumentException(
2202 "argument results must be the same size as argument list");
2203 }
2204 processBatchCallback(list, tableName, pool, results, null);
2205 }
2206
2207 @Override
2208 @Deprecated
2209 public void processBatch(List<? extends Row> list,
2210 final byte[] tableName,
2211 ExecutorService pool,
2212 Object[] results) throws IOException, InterruptedException {
2213 processBatch(list, TableName.valueOf(tableName), pool, results);
2214 }
2215
2216
2217
2218
2219
2220
2221
2222
2223 @Override
2224 @Deprecated
2225 public <R> void processBatchCallback(
2226 List<? extends Row> list,
2227 TableName tableName,
2228 ExecutorService pool,
2229 Object[] results,
2230 Batch.Callback<R> callback)
2231 throws IOException, InterruptedException {
2232
2233 AsyncRequestFuture ars = this.asyncProcess.submitAll(
2234 pool, tableName, list, callback, results);
2235 ars.waitUntilDone();
2236 if (ars.hasError()) {
2237 throw ars.getErrors();
2238 }
2239 }
2240
2241 @Override
2242 @Deprecated
2243 public <R> void processBatchCallback(
2244 List<? extends Row> list,
2245 byte[] tableName,
2246 ExecutorService pool,
2247 Object[] results,
2248 Batch.Callback<R> callback)
2249 throws IOException, InterruptedException {
2250 processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
2251 }
2252
2253
2254 protected AsyncProcess createAsyncProcess(Configuration conf) {
2255
2256 return new AsyncProcess(this, conf, this.batchPool,
2257 RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
2258 RpcControllerFactory.instantiate(conf));
2259 }
2260
2261 @Override
2262 public AsyncProcess getAsyncProcess() {
2263 return asyncProcess;
2264 }
2265
2266 @Override
2267 public ServerStatisticTracker getStatisticsTracker() {
2268 return this.stats;
2269 }
2270
2271 @Override
2272 public ClientBackoffPolicy getBackoffPolicy() {
2273 return this.backoffPolicy;
2274 }
2275
2276
2277
2278
2279
2280 @VisibleForTesting
2281 int getNumberOfCachedRegionLocations(final TableName tableName) {
2282 return metaCache.getNumberOfCachedRegionLocations(tableName);
2283 }
2284
2285 @Override
2286 @Deprecated
2287 public void setRegionCachePrefetch(final TableName tableName, final boolean enable) {
2288 }
2289
2290 @Override
2291 @Deprecated
2292 public void setRegionCachePrefetch(final byte[] tableName,
2293 final boolean enable) {
2294 }
2295
2296 @Override
2297 @Deprecated
2298 public boolean getRegionCachePrefetch(TableName tableName) {
2299 return false;
2300 }
2301
2302 @Override
2303 @Deprecated
2304 public boolean getRegionCachePrefetch(byte[] tableName) {
2305 return false;
2306 }
2307
2308 @Override
2309 public void abort(final String msg, Throwable t) {
2310 if (t instanceof KeeperException.SessionExpiredException
2311 && keepAliveZookeeper != null) {
2312 synchronized (masterAndZKLock) {
2313 if (keepAliveZookeeper != null) {
2314 LOG.warn("This client just lost it's session with ZooKeeper," +
2315 " closing it." +
2316 " It will be recreated next time someone needs it", t);
2317 closeZooKeeperWatcher();
2318 }
2319 }
2320 } else {
2321 if (t != null) {
2322 LOG.fatal(msg, t);
2323 } else {
2324 LOG.fatal(msg);
2325 }
2326 this.aborted = true;
2327 close();
2328 this.closed = true;
2329 }
2330 }
2331
2332 @Override
2333 public boolean isClosed() {
2334 return this.closed;
2335 }
2336
2337 @Override
2338 public boolean isAborted(){
2339 return this.aborted;
2340 }
2341
2342 @Override
2343 public int getCurrentNrHRS() throws IOException {
2344 return this.registry.getCurrentNrHRS();
2345 }
2346
2347
2348
2349
2350 void incCount() {
2351 ++refCount;
2352 }
2353
2354
2355
2356
2357 void decCount() {
2358 if (refCount > 0) {
2359 --refCount;
2360 }
2361 }
2362
2363
2364
2365
2366
2367
2368 boolean isZeroReference() {
2369 return refCount == 0;
2370 }
2371
2372 void internalClose() {
2373 if (this.closed) {
2374 return;
2375 }
2376 closeMaster();
2377 shutdownPools();
2378 this.closed = true;
2379 closeZooKeeperWatcher();
2380 this.stubs.clear();
2381 if (clusterStatusListener != null) {
2382 clusterStatusListener.close();
2383 }
2384 if (rpcClient != null) {
2385 rpcClient.close();
2386 }
2387 }
2388
2389 @Override
2390 public void close() {
2391 if (managed) {
2392 if (aborted) {
2393 ConnectionManager.deleteStaleConnection(this);
2394 } else {
2395 ConnectionManager.deleteConnection(this, false);
2396 }
2397 } else {
2398 internalClose();
2399 }
2400 }
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413 @Override
2414 protected void finalize() throws Throwable {
2415 super.finalize();
2416
2417 refCount = 1;
2418 close();
2419 }
2420
2421
2422
2423
2424 @Deprecated
2425 @Override
2426 public HTableDescriptor[] listTables() throws IOException {
2427 MasterKeepAliveConnection master = getKeepAliveMasterService();
2428 try {
2429 GetTableDescriptorsRequest req =
2430 RequestConverter.buildGetTableDescriptorsRequest((List<TableName>)null);
2431 return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2432 } catch (ServiceException se) {
2433 throw ProtobufUtil.getRemoteException(se);
2434 } finally {
2435 master.close();
2436 }
2437 }
2438
2439
2440
2441
2442 @Deprecated
2443 @Override
2444 public String[] getTableNames() throws IOException {
2445 TableName[] tableNames = listTableNames();
2446 String result[] = new String[tableNames.length];
2447 for (int i = 0; i < tableNames.length; i++) {
2448 result[i] = tableNames[i].getNameAsString();
2449 }
2450 return result;
2451 }
2452
2453
2454
2455
2456 @Deprecated
2457 @Override
2458 public TableName[] listTableNames() throws IOException {
2459 MasterKeepAliveConnection master = getKeepAliveMasterService();
2460 try {
2461 return ProtobufUtil.getTableNameArray(master.getTableNames(null,
2462 GetTableNamesRequest.newBuilder().build())
2463 .getTableNamesList());
2464 } catch (ServiceException se) {
2465 throw ProtobufUtil.getRemoteException(se);
2466 } finally {
2467 master.close();
2468 }
2469 }
2470
2471
2472
2473
2474 @Deprecated
2475 @Override
2476 public HTableDescriptor[] getHTableDescriptorsByTableName(
2477 List<TableName> tableNames) throws IOException {
2478 if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
2479 MasterKeepAliveConnection master = getKeepAliveMasterService();
2480 try {
2481 GetTableDescriptorsRequest req =
2482 RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2483 return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2484 } catch (ServiceException se) {
2485 throw ProtobufUtil.getRemoteException(se);
2486 } finally {
2487 master.close();
2488 }
2489 }
2490
2491
2492
2493
2494 @Deprecated
2495 @Override
2496 public HTableDescriptor[] getHTableDescriptors(
2497 List<String> names) throws IOException {
2498 List<TableName> tableNames = new ArrayList<TableName>(names.size());
2499 for(String name : names) {
2500 tableNames.add(TableName.valueOf(name));
2501 }
2502
2503 return getHTableDescriptorsByTableName(tableNames);
2504 }
2505
2506 @Override
2507 public NonceGenerator getNonceGenerator() {
2508 return this.nonceGenerator;
2509 }
2510
2511
2512
2513
2514
2515
2516
2517
2518 @Deprecated
2519 @Override
2520 public HTableDescriptor getHTableDescriptor(final TableName tableName)
2521 throws IOException {
2522 if (tableName == null) return null;
2523 MasterKeepAliveConnection master = getKeepAliveMasterService();
2524 GetTableDescriptorsResponse htds;
2525 try {
2526 GetTableDescriptorsRequest req =
2527 RequestConverter.buildGetTableDescriptorsRequest(tableName);
2528 htds = master.getTableDescriptors(null, req);
2529 } catch (ServiceException se) {
2530 throw ProtobufUtil.getRemoteException(se);
2531 } finally {
2532 master.close();
2533 }
2534 if (!htds.getTableSchemaList().isEmpty()) {
2535 return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
2536 }
2537 throw new TableNotFoundException(tableName.getNameAsString());
2538 }
2539
2540
2541
2542
2543 @Deprecated
2544 @Override
2545 public HTableDescriptor getHTableDescriptor(final byte[] tableName)
2546 throws IOException {
2547 return getHTableDescriptor(TableName.valueOf(tableName));
2548 }
2549
2550 @Override
2551 public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
2552 return RpcRetryingCallerFactory
2553 .instantiate(conf, this.interceptor, this.getStatisticsTracker());
2554 }
2555
2556 @Override
2557 public boolean isManaged() {
2558 return managed;
2559 }
2560
2561 @Override
2562 public boolean hasCellBlockSupport() {
2563 return this.rpcClient.hasCellBlockSupport();
2564 }
2565
2566 @Override
2567 public ConnectionConfiguration getConnectionConfiguration() {
2568 return this.connectionConfig;
2569 }
2570
2571 @Override
2572 public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
2573 return this.rpcCallerFactory;
2574 }
2575
2576 @Override
2577 public RpcControllerFactory getRpcControllerFactory() {
2578 return this.rpcControllerFactory;
2579 }
2580 }
2581
2582
2583
2584
2585 static class ServerErrorTracker {
2586
2587 private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
2588 new ConcurrentHashMap<ServerName, ServerErrors>();
2589 private final long canRetryUntil;
2590 private final int maxRetries;
2591 private final long startTrackingTime;
2592
2593 public ServerErrorTracker(long timeout, int maxRetries) {
2594 this.maxRetries = maxRetries;
2595 this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
2596 this.startTrackingTime = new Date().getTime();
2597 }
2598
2599
2600
2601
2602 boolean canRetryMore(int numRetry) {
2603
2604 return numRetry < maxRetries || (maxRetries > 1 &&
2605 EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
2606 }
2607
2608
2609
2610
2611
2612
2613
2614
2615 long calculateBackoffTime(ServerName server, long basePause) {
2616 long result;
2617 ServerErrors errorStats = errorsByServer.get(server);
2618 if (errorStats != null) {
2619 result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
2620 } else {
2621 result = 0;
2622 }
2623 return result;
2624 }
2625
2626
2627
2628
2629
2630
2631 void reportServerError(ServerName server) {
2632 ServerErrors errors = errorsByServer.get(server);
2633 if (errors != null) {
2634 errors.addError();
2635 } else {
2636 errors = errorsByServer.putIfAbsent(server, new ServerErrors());
2637 if (errors != null){
2638 errors.addError();
2639 }
2640 }
2641 }
2642
2643 long getStartTrackingTime() {
2644 return startTrackingTime;
2645 }
2646
2647
2648
2649
2650 private static class ServerErrors {
2651 public final AtomicInteger retries = new AtomicInteger(0);
2652
2653 public void addError() {
2654 retries.incrementAndGet();
2655 }
2656 }
2657 }
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667 public static Throwable findException(Object exception) {
2668 if (exception == null || !(exception instanceof Throwable)) {
2669 return null;
2670 }
2671 Throwable cur = (Throwable) exception;
2672 while (cur != null) {
2673 if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
2674 || cur instanceof RegionTooBusyException) {
2675 return cur;
2676 }
2677 if (cur instanceof RemoteException) {
2678 RemoteException re = (RemoteException) cur;
2679 cur = re.unwrapRemoteException(
2680 RegionOpeningException.class, RegionMovedException.class,
2681 RegionTooBusyException.class);
2682 if (cur == null) {
2683 cur = re.unwrapRemoteException();
2684 }
2685
2686
2687
2688 if (cur == re) {
2689 return null;
2690 }
2691 } else {
2692 cur = cur.getCause();
2693 }
2694 }
2695
2696 return null;
2697 }
2698 }