1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.lang.management.ManagementFactory;
25 import java.lang.management.MemoryUsage;
26 import java.lang.reflect.Constructor;
27 import java.net.BindException;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.Comparator;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Map.Entry;
40 import java.util.Set;
41 import java.util.SortedMap;
42 import java.util.TreeMap;
43 import java.util.TreeSet;
44 import java.util.concurrent.ConcurrentHashMap;
45 import java.util.concurrent.ConcurrentMap;
46 import java.util.concurrent.ConcurrentSkipListMap;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.atomic.AtomicReference;
49 import java.util.concurrent.locks.ReentrantReadWriteLock;
50
51 import javax.management.MalformedObjectNameException;
52 import javax.management.ObjectName;
53 import javax.servlet.http.HttpServlet;
54
55 import org.apache.commons.lang.math.RandomUtils;
56 import org.apache.commons.logging.Log;
57 import org.apache.commons.logging.LogFactory;
58 import org.apache.hadoop.conf.Configuration;
59 import org.apache.hadoop.fs.FileSystem;
60 import org.apache.hadoop.fs.Path;
61 import org.apache.hadoop.hbase.ChoreService;
62 import org.apache.hadoop.hbase.ClockOutOfSyncException;
63 import org.apache.hadoop.hbase.CoordinatedStateManager;
64 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
65 import org.apache.hadoop.hbase.HBaseConfiguration;
66 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
67 import org.apache.hadoop.hbase.HConstants;
68 import org.apache.hadoop.hbase.HRegionInfo;
69 import org.apache.hadoop.hbase.HealthCheckChore;
70 import org.apache.hadoop.hbase.MetaTableAccessor;
71 import org.apache.hadoop.hbase.NotServingRegionException;
72 import org.apache.hadoop.hbase.RemoteExceptionHandler;
73 import org.apache.hadoop.hbase.ScheduledChore;
74 import org.apache.hadoop.hbase.ServerName;
75 import org.apache.hadoop.hbase.Stoppable;
76 import org.apache.hadoop.hbase.TableDescriptors;
77 import org.apache.hadoop.hbase.TableName;
78 import org.apache.hadoop.hbase.YouAreDeadException;
79 import org.apache.hadoop.hbase.ZNodeClearer;
80 import org.apache.hadoop.hbase.classification.InterfaceAudience;
81 import org.apache.hadoop.hbase.client.ClusterConnection;
82 import org.apache.hadoop.hbase.client.ConnectionUtils;
83 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
84 import org.apache.hadoop.hbase.conf.ConfigurationManager;
85 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
86 import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
87 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
88 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
89 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
90 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
91 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
92 import org.apache.hadoop.hbase.executor.ExecutorService;
93 import org.apache.hadoop.hbase.executor.ExecutorType;
94 import org.apache.hadoop.hbase.fs.HFileSystem;
95 import org.apache.hadoop.hbase.http.InfoServer;
96 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
97 import org.apache.hadoop.hbase.ipc.RpcClient;
98 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
99 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
100 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
101 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
102 import org.apache.hadoop.hbase.ipc.ServerRpcController;
103 import org.apache.hadoop.hbase.master.HMaster;
104 import org.apache.hadoop.hbase.master.RegionState.State;
105 import org.apache.hadoop.hbase.master.TableLockManager;
106 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
107 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
108 import org.apache.hadoop.hbase.protobuf.RequestConverter;
109 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
110 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
111 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
113 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
114 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
115 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
116 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
117 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor.Builder;
118 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
119 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
121 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
122 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
128 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
129 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
130 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
133 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
134 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
135 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
136 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
137 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
138 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
139 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
140 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
141 import org.apache.hadoop.hbase.security.Superusers;
142 import org.apache.hadoop.hbase.security.UserProvider;
143 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
144 import org.apache.hadoop.hbase.util.Addressing;
145 import org.apache.hadoop.hbase.util.ByteStringer;
146 import org.apache.hadoop.hbase.util.Bytes;
147 import org.apache.hadoop.hbase.util.CompressionTest;
148 import org.apache.hadoop.hbase.util.ConfigUtil;
149 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
150 import org.apache.hadoop.hbase.util.FSTableDescriptors;
151 import org.apache.hadoop.hbase.util.FSUtils;
152 import org.apache.hadoop.hbase.util.HasThread;
153 import org.apache.hadoop.hbase.util.JSONBean;
154 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
155 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
156 import org.apache.hadoop.hbase.util.Sleeper;
157 import org.apache.hadoop.hbase.util.Threads;
158 import org.apache.hadoop.hbase.util.VersionInfo;
159 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
160 import org.apache.hadoop.hbase.wal.WAL;
161 import org.apache.hadoop.hbase.wal.WALFactory;
162 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
163 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
164 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
165 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
166 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
167 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
168 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
169 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
170 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
171 import org.apache.hadoop.ipc.RemoteException;
172 import org.apache.hadoop.metrics.util.MBeanUtil;
173 import org.apache.hadoop.util.ReflectionUtils;
174 import org.apache.hadoop.util.StringUtils;
175 import org.apache.zookeeper.KeeperException;
176 import org.apache.zookeeper.KeeperException.NoNodeException;
177 import org.apache.zookeeper.data.Stat;
178
179 import com.google.common.annotations.VisibleForTesting;
180 import com.google.common.base.Preconditions;
181 import com.google.common.collect.Maps;
182 import com.google.protobuf.BlockingRpcChannel;
183 import com.google.protobuf.Descriptors;
184 import com.google.protobuf.Message;
185 import com.google.protobuf.RpcCallback;
186 import com.google.protobuf.RpcController;
187 import com.google.protobuf.Service;
188 import com.google.protobuf.ServiceException;
189
190
191
192
193
194 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
195 @SuppressWarnings("deprecation")
196 public class HRegionServer extends HasThread implements
197 RegionServerServices, LastSequenceId {
198
/** Logger for this class. */
public static final Log LOG = LogFactory.getLog(HRegionServer.class);

// NOTE(review): OPEN and CLOSE are not referenced in the visible part of this file;
// presumably used as operation labels elsewhere — confirm.
protected static final String OPEN = "OPEN";
protected static final String CLOSE = "CLOSE";

/**
 * Regions in transition on this server, keyed by byte[] and ordered with
 * Bytes.BYTES_COMPARATOR (a comparator-based map is required because arrays use identity
 * equals/hashCode). NOTE(review): the meaning of the Boolean value is not visible here
 * (presumably open vs. close) — confirm against its writers.
 */
protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);

/** Memstore flusher; created in initializeThreads(), interrupted during shutdown in run(). */
protected MemStoreFlusher cacheFlusher;

/** Heap memory manager; stopped during shutdown in run() when non-null. */
protected HeapMemoryManager hMemManager;

/**
 * Cluster connection; created lazily by setupClusterConnection() and closed near the end
 * of run().
 */
protected ClusterConnection clusterConnection;

/** Meta table locator; created together with clusterConnection and stopped in run(). */
protected MetaTableLocator metaTableLocator;

// Assigned in initializeZooKeeper(); not read in the visible code, hence the suppression.
@SuppressWarnings("unused")
private RecoveringRegionWatcher recoveringRegionWatcher;

/** Table descriptors backed by the filesystem (FSTableDescriptors built in the constructor). */
protected TableDescriptors tableDescriptors;

// Replication source/sink services; not initialized in the visible part of this file.
protected ReplicationSourceService replicationSourceHandler;
protected ReplicationSinkService replicationSinkHandler;

/**
 * Compaction/split worker; created in initializeThreads(), registered as a configuration
 * observer, and joined then cleared in run() before meta regions are closed.
 */
public CompactSplitThread compactSplitThread;

/**
 * Regions online on this server, keyed by encoded region name (containsMetaTableRegions()
 * looks up the meta region by its encoded name).
 */
protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<String, Region>();

/**
 * Favored-node addresses per region.
 * NOTE(review): key is presumably the encoded region name, as in onlineRegions — confirm.
 */
protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
    new ConcurrentHashMap<String, InetSocketAddress[]>();

/** Regions under recovery; a synchronized wrapper around a HashMap. */
protected final Map<String, Region> recoveringRegions = Collections
    .synchronizedMap(new HashMap<String, Region>());

/** Lease manager; created in initializeThreads() and closed during shutdown in run(). */
protected Leases leases;

/** Executor service named after this server's short name (created in the constructor). */
protected ExecutorService service;

/**
 * Filesystem health flag, initialized to true in the constructor. When false, shutdown in
 * run() skips closing user regions on abort, waiting for regions to close and shutting down
 * the WAL.
 */
protected volatile boolean fsOk;
/** HBase filesystem wrapper created in the constructor. */
protected HFileSystem fs;

/** Stop flag; polled by blockAndCheckIfStopped() while waiting on ZooKeeper trackers. */
private volatile boolean stopped = false;

/** Selects the abort path (rather than a clean stop) of the shutdown sequence in run(). */
private volatile boolean abortRequested;

// NOTE(review): not referenced in the visible part of this file.
ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();

/** Set by run() once it starts closing user regions after cluster shutdown is detected. */
private boolean stopping = false;

/** When true, shutdown in run() skips closing regions and stopping service threads. */
private volatile boolean killed = false;

/** Server configuration. */
protected final Configuration conf;

/** HBase root directory resolved from the configuration in the constructor. */
private Path rootDir;

// NOTE(review): not used in the visible part of this file.
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

/** Client retry count (HConstants.HBASE_CLIENT_RETRIES_NUMBER). */
final int numRetries;
/** Chore/lease period from HConstants.THREAD_WAKE_FREQUENCY (default 10 * 1000). */
protected final int threadWakeFrequency;
/** Interval between load reports to the master in ms ("hbase.regionserver.msginterval"). */
protected final int msgInterval;

/** Value of "hbase.regionserver.numregionstoreport" (default 10). */
protected final int numRegionsToReport;

/** Stub to the master's RegionServerStatusService; cleared after a failed report. */
private volatile RegionServerStatusService.BlockingInterface rssStub;

/** RPC client; created in initializeThreads() and closed during shutdown in run(). */
RpcClient rpcClient;

private RpcRetryingCallerFactory rpcRetryingCallerFactory;
private RpcControllerFactory rpcControllerFactory;

/** Handler that aborts this server on an uncaught exception in a service thread. */
private UncaughtExceptionHandler uncaughtExceptionHandler;

/** Embedded info (web) server; stopped during shutdown in run(). */
protected InfoServer infoServer;
/** JVM pause monitor started in preRegistrationInitialization(), stopped in run(). */
private JvmPauseMonitor pauseMonitor;

/** Process name (used in the ZooKeeper watcher id) and info server attribute key. */
public static final String REGIONSERVER = "regionserver";

/** Region server metrics; its wrapper feeds buildServerLoad(). */
MetricsRegionServer metricsRegionServer;
private SpanReceiverHost spanReceiverHost;

/** Chore scheduler created at the end of the constructor. */
private final ChoreService choreService;

/** Periodic compaction check chore (CompactionChecker), created in initializeThreads(). */
ScheduledChore compactionChecker;

/** Periodic memstore flush chore (PeriodicMemstoreFlusher), created in initializeThreads(). */
ScheduledChore periodicFlusher;

protected volatile WALFactory walFactory;

/** WAL roller created in the constructor. */
final LogRoller walRoller;
/** Roller for the meta WAL; the reference starts out empty (null). */
final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();

// NOTE(review): starts false; presumably set once the server is fully online — not set in
// the visible code.
final AtomicBoolean online = new AtomicBoolean(false);

/** ZooKeeper watcher; stays null when "hbase.testing.nocluster" is true. */
protected ZooKeeperWatcher zooKeeper;

/** Tracks the master address in ZooKeeper; awaited in initializeZooKeeper(). */
private MasterAddressTracker masterAddressTracker;

/** Tracks the cluster-up flag in ZooKeeper; consulted by isClusterUp(). */
protected ClusterStatusTracker clusterStatusTracker;

/** Split-log worker; stopped during shutdown in run() when non-null. */
private SplitLogWorker splitLogWorker;

/** Sleeper for the main loop; built with msgInterval in the constructor. */
protected final Sleeper sleeper;

private final int operationTimeout;
private final int shortOperationTimeout;

private final RegionServerAccounting regionServerAccounting;

/** Cache configuration; its block cache (if enabled) is shut down in run(). */
protected CacheConfig cacheConfig;

/** Health check chore; only created when a health checker is configured. */
private HealthCheckChore healthCheckChore;

/** Nonce cleanup chore; only created when nonceManager is non-null. */
private ScheduledChore nonceManagerChore;

/** Coprocessor services keyed by service full name; a plain (unsynchronized) HashMap. */
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

/** This server's name: hostname, RPC port and startcode (set in the constructor). */
protected ServerName serverName;

/** Hostname override from RS_HOSTNAME_KEY, or MASTER_HOSTNAME_KEY when this is an HMaster. */
private String useThisHostnameInstead;

/** Configuration key overriding the hostname used in a region server's ServerName. */
final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
/** Configuration key overriding the hostname used in a master's ServerName. */
final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";

/** Start time from System.currentTimeMillis() at construction; part of serverName. */
protected final long startcode;

/** Cluster id read from ZooKeeper in initializeZooKeeper(). */
private String clusterId;

/** Registered MBean name, if any; unregistered in run() after the main loop ends. */
private ObjectName mxBean = null;

/** Cleaner created in initializeThreads(); stopped during shutdown in run(). */
private MovedRegionsCleaner movedRegionsCleaner;

/** Store-file refresher; only created when the configured refresh period is positive. */
private StorefileRefresherChore storefileRefresher;

/** Coprocessor host created in run() before reporting for duty. */
private RegionServerCoprocessorHost rsHost;

/** Procedure manager host created in initializeZooKeeper() and started in run(). */
private RegionServerProcedureManagerHost rspmHost;

/** Quota manager created in initializeThreads(); started and stopped in run(). */
private RegionServerQuotaManager rsQuotaManager;

/** Table lock manager; only created when "hbase.testing.nocluster" is false. */
protected TableLockManager tableLockManager;

/** Nonce manager; null when HConstants.HBASE_RS_NONCES_ENABLED is false. */
final ServerNonceManager nonceManager;

private UserProvider userProvider;

/** RPC services created through createRpcServices() in the constructor. */
protected final RSRpcServices rpcServices;

/** Coordinated state manager; only assigned when "hbase.testing.nocluster" is false. */
protected BaseCoordinatedStateManager csm;

/** Result of ConfigUtil.useZKForAssignment(conf), read in the constructor. */
private final boolean useZKForAssignment;

/** Manager with which configuration observers (e.g. compactSplitThread) are registered. */
protected final ConfigurationManager configurationManager;
477
478
479
480
481
482
483
/**
 * Builds a region server whose coordinated state manager is chosen from {@code conf}
 * via CoordinatedStateManagerFactory.
 *
 * @param conf server configuration
 * @throws IOException propagated from the two-argument constructor
 * @throws InterruptedException propagated from the two-argument constructor
 */
public HRegionServer(Configuration conf) throws IOException, InterruptedException {
  this(
      conf,
      CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
}
487
488
489
490
491
492
493
494
/**
 * Constructs the region server and brings up everything that does not need a running cluster
 * session: configuration-derived settings, RPC services, server name, logins, filesystem,
 * table descriptors and executor service. Unless "hbase.testing.nocluster" is true, it also
 * connects to ZooKeeper, starts the coordinated state manager and starts the master-address
 * and cluster-status trackers. It finishes by starting RPC services, calling putUpWebUI(), and
 * creating the WAL roller and chore service.
 *
 * <p>Several overridable methods are called from here (createRpcServices, login,
 * getProcessName, canCreateBaseZNode, canUpdateTableDescriptor, putUpWebUI). Subclass
 * overrides run before the subclass's own fields are initialized.
 *
 * @param conf server configuration; this constructor mutates it (disables meta replicas and
 *     sets the default filesystem from the HBase root dir)
 * @param csm coordinated state manager; cast to BaseCoordinatedStateManager when a cluster
 *     is used
 * @throws IOException if a configured codec is unsupported or setup fails
 * @throws InterruptedException declared; propagated from setup steps
 */
public HRegionServer(Configuration conf, CoordinatedStateManager csm)
    throws IOException, InterruptedException {
  this.fsOk = true;
  this.conf = conf;
  // Fail fast if any codec listed in "hbase.regionserver.codecs" cannot be loaded.
  checkCodecs(this.conf);
  this.userProvider = UserProvider.instantiate(conf);
  FSUtils.setupShortCircuitRead(this.conf);
  // Disable meta replicas in this server's configuration.
  this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

  // Config-derived settings.
  this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
  this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

  this.sleeper = new Sleeper(this.msgInterval, this);

  // No nonce manager at all when nonces are disabled.
  boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
  this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

  this.numRegionsToReport = conf.getInt(
      "hbase.regionserver.numregionstoreport", 10);

  this.operationTimeout = conf.getInt(
      HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

  this.shortOperationTimeout = conf.getInt(
      HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

  this.abortRequested = false;
  this.stopped = false;

  // RPC services must exist first: their bound address supplies the host/port of serverName.
  rpcServices = createRpcServices();
  this.startcode = System.currentTimeMillis();
  // Masters and region servers read different hostname-override keys.
  if (this instanceof HMaster) {
    useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
  } else {
    useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
  }
  String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
      rpcServices.isa.getHostName();
  serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);

  rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
  rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

  // Log in as the ZooKeeper client principal, then as the server, both for hostName.
  ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
      HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);

  login(userProvider, hostName);

  // Superuser list is initialized after login.
  Superusers.initialize(conf);

  regionServerAccounting = new RegionServerAccounting();
  // Any uncaught exception in a service thread aborts the whole server.
  uncaughtExceptionHandler = new UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
      abort("Uncaught exception in service thread " + t.getName(), e);
    }
  };

  useZKForAssignment = ConfigUtil.useZKForAssignment(conf);

  // Set the default filesystem from the HBase root dir (see FSUtils.setFsDefault) before
  // creating the HFileSystem below.
  FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));

  // HBase-level checksum verification is on unless explicitly disabled.
  boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
  this.fs = new HFileSystem(this.conf, useHBaseChecksum);
  this.rootDir = FSUtils.getRootDir(this.conf);
  this.tableDescriptors = new FSTableDescriptors(
      this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);

  service = new ExecutorService(getServerName().toShortString());
  spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

  // ZooKeeper-dependent setup is skipped entirely for "no cluster" testing.
  if (!conf.getBoolean("hbase.testing.nocluster", false)) {
    // Watcher id is "<processName>:<rpcPort>".
    zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
        rpcServices.isa.getPort(), this, canCreateBaseZNode());

    this.csm = (BaseCoordinatedStateManager) csm;
    this.csm.initialize(this);
    this.csm.start();

    tableLockManager = TableLockManager.createTableLockManager(
        conf, zooKeeper, serverName);

    masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
    masterAddressTracker.start();

    clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
    clusterStatusTracker.start();
  }
  this.configurationManager = new ConfigurationManager();

  rpcServices.start();
  putUpWebUI();
  this.walRoller = new LogRoller(this, this);
  this.choreService = new ChoreService(getServerName().toString());
}
604
605
606
607
608 protected boolean shouldUseThisHostnameInstead() {
609 return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
610 }
611
612 protected void login(UserProvider user, String host) throws IOException {
613 user.login("hbase.regionserver.keytab.file",
614 "hbase.regionserver.kerberos.principal", host);
615 }
616
617 protected void waitForMasterActive(){
618 }
619
620 protected String getProcessName() {
621 return REGIONSERVER;
622 }
623
624 protected boolean canCreateBaseZNode() {
625 return false;
626 }
627
628 protected boolean canUpdateTableDescriptor() {
629 return false;
630 }
631
632 protected RSRpcServices createRpcServices() throws IOException {
633 return new RSRpcServices(this);
634 }
635
636 protected void configureInfoServer() {
637 infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
638 infoServer.setAttribute(REGIONSERVER, this);
639 }
640
641 protected Class<? extends HttpServlet> getDumpServlet() {
642 return RSDumpServlet.class;
643 }
644
645 protected void doMetrics() {
646 }
647
648 @Override
649 public boolean registerService(Service instance) {
650
651
652
653 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
654 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
655 LOG.error("Coprocessor service " + serviceDesc.getFullName()
656 + " already registered, rejecting request from " + instance);
657 return false;
658 }
659
660 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
661 if (LOG.isDebugEnabled()) {
662 LOG.debug("Registered regionserver coprocessor service: service="+serviceDesc.getFullName());
663 }
664 return true;
665 }
666
667
668
669
670
671
672
673 @VisibleForTesting
674 protected ClusterConnection createClusterConnection() throws IOException {
675
676
677
678 return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
679 serverName, rpcServices, rpcServices);
680 }
681
682
683
684
685
686
687 private static void checkCodecs(final Configuration c) throws IOException {
688
689 String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
690 if (codecs == null) return;
691 for (String codec : codecs) {
692 if (!CompressionTest.testCompression(codec)) {
693 throw new IOException("Compression codec " + codec +
694 " not supported, aborting RS construction");
695 }
696 }
697 }
698
699 public String getClusterId() {
700 return this.clusterId;
701 }
702
703
704
705
706
707 protected synchronized void setupClusterConnection() throws IOException {
708 if (clusterConnection == null) {
709 clusterConnection = createClusterConnection();
710 metaTableLocator = new MetaTableLocator();
711 }
712 }
713
714
715
716
717
718
719
720 private void preRegistrationInitialization(){
721 try {
722 setupClusterConnection();
723
724
725 if (isHealthCheckerConfigured()) {
726 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
727 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
728 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
729 }
730 this.pauseMonitor = new JvmPauseMonitor(conf);
731 pauseMonitor.start();
732
733 initializeZooKeeper();
734 if (!isStopped() && !isAborted()) {
735 initializeThreads();
736 }
737 } catch (Throwable t) {
738
739
740 this.rpcServices.stop();
741 abort("Initialization of RS failed. Hence aborting RS.", t);
742 }
743 }
744
745
746
747
748
749
750
751
752
753 private void initializeZooKeeper() throws IOException, InterruptedException {
754
755
756
757 blockAndCheckIfStopped(this.masterAddressTracker);
758
759
760
761 blockAndCheckIfStopped(this.clusterStatusTracker);
762
763
764
765
766 try {
767 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
768 if (clusterId == null) {
769 this.abort("Cluster ID has not been set");
770 }
771 LOG.info("ClusterId : "+clusterId);
772 } catch (KeeperException e) {
773 this.abort("Failed to retrieve Cluster ID",e);
774 }
775
776
777
778
779
780 waitForMasterActive();
781 if (isStopped() || isAborted()) {
782 return;
783 }
784
785
786 try {
787 rspmHost = new RegionServerProcedureManagerHost();
788 rspmHost.loadProcedures(conf);
789 rspmHost.initialize(this);
790 } catch (KeeperException e) {
791 this.abort("Failed to reach zk cluster when creating procedure handler.", e);
792 }
793
794 this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
795 }
796
797
798
799
800
801
802
803
804 private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
805 throws IOException, InterruptedException {
806 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
807 if (this.stopped) {
808 throw new IOException("Received the shutdown message while waiting.");
809 }
810 }
811 }
812
813
814
815
816 private boolean isClusterUp() {
817 return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
818 }
819
820 private void initializeThreads() throws IOException {
821
822 this.cacheFlusher = new MemStoreFlusher(conf, this);
823
824
825 this.compactSplitThread = new CompactSplitThread(this);
826
827
828
829 this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
830 this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
831 this.leases = new Leases(this.threadWakeFrequency);
832
833
834 movedRegionsCleaner = MovedRegionsCleaner.create(this);
835
836 if (this.nonceManager != null) {
837
838 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
839 }
840
841
842 rsQuotaManager = new RegionServerQuotaManager(this);
843
844
845 rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
846 rpcServices.isa.getAddress(), 0));
847
848 boolean onlyMetaRefresh = false;
849 int storefileRefreshPeriod = conf.getInt(
850 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
851 , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
852 if (storefileRefreshPeriod == 0) {
853 storefileRefreshPeriod = conf.getInt(
854 StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
855 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
856 onlyMetaRefresh = true;
857 }
858 if (storefileRefreshPeriod > 0) {
859 this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
860 onlyMetaRefresh, this, this);
861 }
862 registerConfigurationObservers();
863 }
864
865 private void registerConfigurationObservers() {
866
867 configurationManager.registerObserver(this.compactSplitThread);
868 }
869
870
871
872
/**
 * Main body of the region server thread. It runs in four phases:
 * <ol>
 *   <li>preRegistrationInitialization(); any throwable aborts the server.</li>
 *   <li>If still running:
 *     <ul>
 *       <li>install the shutdown hook;</li>
 *       <li>create this server's ephemeral znode;</li>
 *       <li>create the coprocessor host;</li>
 *       <li>call reportForDuty() repeatedly until it returns a response or keepLooping()
 *           turns false.</li>
 *     </ul></li>
 *   <li>Main loop, while not stopped and healthy:
 *     <ul>
 *       <li>once the cluster is no longer up, close user regions and stop when only meta
 *           regions remain and no writes occurred between two checks;</li>
 *       <li>every {@code msgInterval} ms, send a load report and call doMetrics().</li>
 *     </ul></li>
 *   <li>Shutdown:
 *     <ul>
 *       <li>stop chores and threads;</li>
 *       <li>close user and meta regions (skipped when killed, and partly skipped on abort
 *           when the filesystem is not ok);</li>
 *       <li>shut down the WAL, RPC client, leases and RPC services;</li>
 *       <li>delete the ephemeral znode and close ZooKeeper.</li>
 *     </ul></li>
 * </ol>
 * A throwable from phases 2-3 aborts the server unless rpcServices.checkOOME(t) returns true.
 */
@Override
public void run() {
  try {
    // Connection, ZooKeeper and worker-thread setup.
    preRegistrationInitialization();
  } catch (Throwable e) {
    abort("Fatal exception during initialization", e);
  }

  try {
    if (!isStopped() && !isAborted()) {
      ShutdownHook.install(conf, fs, this, Thread.currentThread());
      // Publish this server's ephemeral znode before reporting for duty.
      createMyEphemeralNode();
      // Coprocessor host is created before registration with the master.
      this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
    }

    // Register with the master; retry after a sleep until a response is received.
    while (keepLooping()) {
      RegionServerStartupResponse w = reportForDuty();
      if (w == null) {
        LOG.warn("reportForDuty failed; sleeping and then retrying.");
        this.sleeper.sleep();
      } else {
        handleReportForDutyResponse(w);
        break;
      }
    }

    if (!isStopped() && isHealthy()){
      // Procedure members are started only after registration.
      rspmHost.start();
    }

    // Quota manager needs the RPC scheduler.
    if (this.rsQuotaManager != null) {
      rsQuotaManager.start(getRpcServer().getScheduler());
    }

    // Main loop.
    long lastMsg = System.currentTimeMillis();
    long oldRequestCount = -1;
    while (!isStopped() && isHealthy()) {
      if (!isClusterUp()) {
        // Cluster shutdown in progress.
        if (isOnlineRegionsEmpty()) {
          stop("Exiting; cluster shutdown set and not carrying any regions");
        } else if (!this.stopping) {
          // First iteration after cluster shutdown: start closing user regions.
          this.stopping = true;
          LOG.info("Closing user regions");
          closeUserRegions(this.abortRequested);
        } else if (this.stopping) {
          boolean allUserRegionsOffline = areAllUserRegionsOffline();
          if (allUserRegionsOffline) {
            // Only meta regions remain. Stop once the total write count is unchanged
            // between two consecutive iterations.
            if (oldRequestCount == getWriteRequestCount()) {
              stop("Stopped; only catalog regions remaining online");
              break;
            }
            oldRequestCount = getWriteRequestCount();
          } else {
            // Some user regions are still open; ask again to close them.
            closeUserRegions(this.abortRequested);
          }
          LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
        }
      }
      // Periodic load report to the master.
      long now = System.currentTimeMillis();
      if ((now - lastMsg) >= msgInterval) {
        tryRegionServerReport(lastMsg, now);
        lastMsg = System.currentTimeMillis();
        doMetrics();
      }
      if (!isStopped() && !isAborted()) {
        this.sleeper.sleep();
      }
    }
  } catch (Throwable t) {
    if (!rpcServices.checkOOME(t)) {
      // YouAreDeadException is reported without the "Unhandled: " prefix.
      String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
      abort(prefix + t.getMessage(), t);
    }
  }

  // ---- Shutdown sequence ----
  if (mxBean != null) {
    MBeanUtil.unregisterMBean(mxBean);
    mxBean = null;
  }
  if (this.leases != null) this.leases.closeAfterLeasesExpire();
  if (this.splitLogWorker != null) {
    splitLogWorker.stop();
  }
  if (this.infoServer != null) {
    LOG.info("Stopping infoServer");
    try {
      this.infoServer.stop();
    } catch (Exception e) {
      LOG.error("Failed to stop infoServer", e);
    }
  }

  if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
    cacheConfig.getBlockCache().shutdown();
  }

  if (movedRegionsCleaner != null) {
    movedRegionsCleaner.stop("Region Server stopping");
  }

  // Stop memory management, interrupt flush/compaction workers and cancel chores.
  if (this.hMemManager != null) this.hMemManager.stop();
  if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
  if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
  if (this.compactionChecker != null) this.compactionChecker.cancel(true);
  if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
  if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
  if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
  sendShutdownInterrupt();

  if (rsQuotaManager != null) {
    rsQuotaManager.stop();
  }

  // The stop flag tells the procedure host whether this is an abort/kill.
  if (rspmHost != null) {
    rspmHost.stop(this.abortRequested || this.killed);
  }

  if (this.killed) {
    // Killed: do not close any user regions.
  } else if (abortRequested) {
    // On abort, user regions are closed only if the filesystem is still ok.
    if (this.fsOk) {
      closeUserRegions(abortRequested);
    }
    LOG.info("aborting server " + this.serverName);
  } else {
    closeUserRegions(abortRequested);
    LOG.info("stopping server " + this.serverName);
  }

  // Release the cluster connection; a close failure is logged, not rethrown.
  if (this.metaTableLocator != null) this.metaTableLocator.stop();
  if (this.clusterConnection != null && !clusterConnection.isClosed()) {
    try {
      this.clusterConnection.close();
    } catch (IOException e) {
      LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
    }
  }

  // Close meta regions after waiting for the compaction/split thread to finish.
  if (!this.killed && containsMetaTableRegions()) {
    if (!abortRequested || this.fsOk) {
      if (this.compactSplitThread != null) {
        this.compactSplitThread.join();
        this.compactSplitThread = null;
      }
      closeMetaTableRegions(abortRequested);
    }
  }

  if (!this.killed && this.fsOk) {
    waitOnAllRegionsToClose(abortRequested);
    LOG.info("stopping server " + this.serverName +
        "; all regions closed.");
  }

  // WAL shutdown requires a healthy filesystem; the argument is false on abort.
  if (this.fsOk) {
    shutdownWAL(!abortRequested);
  }

  // Drop the master stub and close remaining clients/resources.
  if (this.rssStub != null) {
    this.rssStub = null;
  }
  if (this.rpcClient != null) {
    this.rpcClient.close();
  }
  if (this.leases != null) {
    this.leases.close();
  }
  if (this.pauseMonitor != null) {
    this.pauseMonitor.stop();
  }

  if (!killed) {
    stopServiceThreads();
  }

  if (this.rpcServices != null) {
    this.rpcServices.stop();
  }

  try {
    deleteMyEphemeralNode();
  } catch (KeeperException.NoNodeException nn) {
    // Ephemeral node is already gone; nothing to delete.
  } catch (KeeperException e) {
    LOG.warn("Failed deleting my ephemeral node", e);
  }

  // Also remove the on-disk record kept by ZNodeClearer.
  ZNodeClearer.deleteMyEphemeralNodeOnDisk();

  if (this.zooKeeper != null) {
    this.zooKeeper.close();
  }
  LOG.info("stopping server " + this.serverName +
      "; zookeeper connection closed.");

  LOG.info(Thread.currentThread().getName() + " exiting");
}
1097
1098 private boolean containsMetaTableRegions() {
1099 return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1100 }
1101
1102 private boolean areAllUserRegionsOffline() {
1103 if (getNumberOfOnlineRegions() > 2) return false;
1104 boolean allUserRegionsOffline = true;
1105 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1106 if (!e.getValue().getRegionInfo().isMetaTable()) {
1107 allUserRegionsOffline = false;
1108 break;
1109 }
1110 }
1111 return allUserRegionsOffline;
1112 }
1113
1114
1115
1116
1117 private long getWriteRequestCount() {
1118 long writeCount = 0;
1119 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1120 writeCount += e.getValue().getWriteRequestsCount();
1121 }
1122 return writeCount;
1123 }
1124
1125 @VisibleForTesting
1126 protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1127 throws IOException {
1128 RegionServerStatusService.BlockingInterface rss = rssStub;
1129 if (rss == null) {
1130
1131 return;
1132 }
1133 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1134 try {
1135 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1136 ServerName sn = ServerName.parseVersionedServerName(
1137 this.serverName.getVersionedBytes());
1138 request.setServer(ProtobufUtil.toServerName(sn));
1139 request.setLoad(sl);
1140 rss.regionServerReport(null, request.build());
1141 } catch (ServiceException se) {
1142 IOException ioe = ProtobufUtil.getRemoteException(se);
1143 if (ioe instanceof YouAreDeadException) {
1144
1145 throw ioe;
1146 }
1147 if (rssStub == rss) {
1148 rssStub = null;
1149 }
1150
1151
1152 createRegionServerStatusStub();
1153 }
1154 }
1155
1156 ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1157 throws IOException {
1158
1159
1160
1161
1162
1163
1164
1165 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1166 Collection<Region> regions = getOnlineRegionsLocalContext();
1167 MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
1168
1169 ClusterStatusProtos.ServerLoad.Builder serverLoad =
1170 ClusterStatusProtos.ServerLoad.newBuilder();
1171 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1172 serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1173 serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1174 serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1175 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1176 Builder coprocessorBuilder = Coprocessor.newBuilder();
1177 for (String coprocessor : coprocessors) {
1178 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1179 }
1180 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1181 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1182 for (Region region : regions) {
1183 if (region.getCoprocessorHost() != null) {
1184 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1185 Iterator<String> iterator = regionCoprocessors.iterator();
1186 while (iterator.hasNext()) {
1187 serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
1188 }
1189 }
1190 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1191 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1192 .getCoprocessors()) {
1193 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1194 }
1195 }
1196 serverLoad.setReportStartTime(reportStartTime);
1197 serverLoad.setReportEndTime(reportEndTime);
1198 if (this.infoServer != null) {
1199 serverLoad.setInfoServerPort(this.infoServer.getPort());
1200 } else {
1201 serverLoad.setInfoServerPort(-1);
1202 }
1203
1204
1205
1206 ReplicationSourceService rsources = getReplicationSourceService();
1207
1208 if (rsources != null) {
1209
1210 ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1211 if (rLoad != null) {
1212 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1213 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1214 serverLoad.addReplLoadSource(rLS);
1215 }
1216 }
1217 }
1218
1219 return serverLoad.build();
1220 }
1221
1222 String getOnlineRegionsAsPrintableString() {
1223 StringBuilder sb = new StringBuilder();
1224 for (Region r: this.onlineRegions.values()) {
1225 if (sb.length() > 0) sb.append(", ");
1226 sb.append(r.getRegionInfo().getEncodedName());
1227 }
1228 return sb.toString();
1229 }
1230
1231
1232
1233
/**
 * Waits until all online regions are closed, asking each region to close at most once.
 *
 * <p>Each pass:
 * <ul>
 *   <li>logs the remaining region count when it has changed, at most about once per second
 *       (and the region map itself at debug level when fewer than 10 remain);</li>
 *   <li>calls {@code closeRegionIgnoreErrors} for every online region that is neither in
 *       transition on this server nor already asked to close.</li>
 * </ul>
 *
 * <p>The loop exits once no regions are in transition, even if some regions are still online
 * (their close failed). Between passes it sleeps 200ms. An interrupt during that sleep does not
 * end the loop; it is remembered, and the thread's interrupt flag is restored on exit.
 *
 * @param abort passed through to {@code closeRegionIgnoreErrors} for each region
 */
private void waitOnAllRegionsToClose(final boolean abort) {
  int lastCount = -1;
  long previousLogTime = 0;
  // Encoded names of regions this method has already asked to close.
  Set<String> closedRegions = new HashSet<String>();
  boolean interrupted = false;
  try {
    while (!isOnlineRegionsEmpty()) {
      int count = getNumberOfOnlineRegions();
      // Only log when the count changes, and rate-limit to roughly once per second.
      if (count != lastCount) {
        if (System.currentTimeMillis() > (previousLogTime + 1000)) {
          previousLogTime = System.currentTimeMillis();
          lastCount = count;
          LOG.info("Waiting on " + count + " regions to close");
          // Dump the remaining regions only when the list is short.
          if (count < 10 && LOG.isDebugEnabled()) {
            LOG.debug(this.onlineRegions);
          }
        }
      }
      // Ask each region not already in transition (and not already asked) to close.
      for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
        HRegionInfo hri = e.getValue().getRegionInfo();
        if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
            && !closedRegions.contains(hri.getEncodedName())) {
          closedRegions.add(hri.getEncodedName());
          closeRegionIgnoreErrors(hri, abort);
        }
      }
      // Nothing is in transition any more: leftover online regions failed to close.
      if (this.regionsInTransitionInRS.isEmpty()) {
        if (!isOnlineRegionsEmpty()) {
          LOG.info("We were exiting though online regions are not empty," +
              " because some regions failed closing");
        }
        break;
      }
      // Keep looping on interrupt; restore the flag once done (see finally).
      if (sleep(200)) {
        interrupted = true;
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
1287
1288 private boolean sleep(long millis) {
1289 boolean interrupted = false;
1290 try {
1291 Thread.sleep(millis);
1292 } catch (InterruptedException e) {
1293 LOG.warn("Interrupted while sleeping");
1294 interrupted = true;
1295 }
1296 return interrupted;
1297 }
1298
1299 private void shutdownWAL(final boolean close) {
1300 if (this.walFactory != null) {
1301 try {
1302 if (close) {
1303 walFactory.close();
1304 } else {
1305 walFactory.shutdown();
1306 }
1307 } catch (Throwable e) {
1308 e = RemoteExceptionHandler.checkThrowable(e);
1309 LOG.error("Shutdown / close of WAL failed: " + e);
1310 LOG.debug("Shutdown / close exception details:", e);
1311 }
1312 }
1313 }
1314
1315
1316
1317
1318
1319
1320 protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1321 throws IOException {
1322 try {
1323 for (NameStringPair e : c.getMapEntriesList()) {
1324 String key = e.getName();
1325
1326 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1327 String hostnameFromMasterPOV = e.getValue();
1328 this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1329 rpcServices.isa.getPort(), this.startcode);
1330 if (shouldUseThisHostnameInstead() &&
1331 !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1332 String msg = "Master passed us a different hostname to use; was=" +
1333 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1334 LOG.error(msg);
1335 throw new IOException(msg);
1336 }
1337 if (!shouldUseThisHostnameInstead() &&
1338 !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1339 String msg = "Master passed us a different hostname to use; was=" +
1340 rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1341 LOG.error(msg);
1342 }
1343 continue;
1344 }
1345 String value = e.getValue();
1346 if (LOG.isDebugEnabled()) {
1347 LOG.info("Config from master: " + key + "=" + value);
1348 }
1349 this.conf.set(key, value);
1350 }
1351
1352
1353
1354 if (this.conf.get("mapreduce.task.attempt.id") == null) {
1355 this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1356 this.serverName.toString());
1357 }
1358
1359
1360 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1361
1362 this.cacheConfig = new CacheConfig(conf);
1363 this.walFactory = setupWALAndReplication();
1364
1365 this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1366
1367 startServiceThreads();
1368 startHeapMemoryManager();
1369 LOG.info("Serving as " + this.serverName +
1370 ", RpcServer on " + rpcServices.isa +
1371 ", sessionid=0x" +
1372 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1373
1374
1375 synchronized (online) {
1376 online.set(true);
1377 online.notifyAll();
1378 }
1379 } catch (Throwable e) {
1380 stop("Failed initialization");
1381 throw convertThrowableToIOE(cleanup(e, "Failed init"),
1382 "Region server startup failed");
1383 } finally {
1384 sleeper.skipSleepCycle();
1385 }
1386 }
1387
1388 private void startHeapMemoryManager() {
1389 this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
1390 if (this.hMemManager != null) {
1391 this.hMemManager.start(getChoreService());
1392 }
1393 }
1394
1395 private void createMyEphemeralNode() throws KeeperException, IOException {
1396 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1397 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1398 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1399 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1400 getMyEphemeralNodePath(), data);
1401 }
1402
1403 private void deleteMyEphemeralNode() throws KeeperException {
1404 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1405 }
1406
1407 @Override
1408 public RegionServerAccounting getRegionServerAccounting() {
1409 return regionServerAccounting;
1410 }
1411
1412 @Override
1413 public TableLockManager getTableLockManager() {
1414 return tableLockManager;
1415 }
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425 private RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr,
1426 RegionSpecifier.Builder regionSpecifier) throws IOException {
1427 byte[] name = r.getRegionInfo().getRegionName();
1428 int stores = 0;
1429 int storefiles = 0;
1430 int storeUncompressedSizeMB = 0;
1431 int storefileSizeMB = 0;
1432 int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024);
1433 int storefileIndexSizeMB = 0;
1434 int rootIndexSizeKB = 0;
1435 int totalStaticIndexSizeKB = 0;
1436 int totalStaticBloomSizeKB = 0;
1437 long totalCompactingKVs = 0;
1438 long currentCompactedKVs = 0;
1439 List<Store> storeList = r.getStores();
1440 stores += storeList.size();
1441 for (Store store : storeList) {
1442 storefiles += store.getStorefilesCount();
1443 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1444 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1445 storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1446 CompactionProgress progress = store.getCompactionProgress();
1447 if (progress != null) {
1448 totalCompactingKVs += progress.totalCompactingKVs;
1449 currentCompactedKVs += progress.currentCompactedKVs;
1450 }
1451 rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024);
1452 totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1453 totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1454 }
1455
1456 float dataLocality =
1457 r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1458 if (regionLoadBldr == null) {
1459 regionLoadBldr = RegionLoad.newBuilder();
1460 }
1461 if (regionSpecifier == null) {
1462 regionSpecifier = RegionSpecifier.newBuilder();
1463 }
1464 regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1465 regionSpecifier.setValue(ByteStringer.wrap(name));
1466 regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1467 .setStores(stores)
1468 .setStorefiles(storefiles)
1469 .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1470 .setStorefileSizeMB(storefileSizeMB)
1471 .setMemstoreSizeMB(memstoreSizeMB)
1472 .setStorefileIndexSizeMB(storefileIndexSizeMB)
1473 .setRootIndexSizeKB(rootIndexSizeKB)
1474 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1475 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1476 .setReadRequestsCount(r.getReadRequestsCount())
1477 .setWriteRequestsCount(r.getWriteRequestsCount())
1478 .setTotalCompactingKVs(totalCompactingKVs)
1479 .setCurrentCompactedKVs(currentCompactedKVs)
1480 .setDataLocality(dataLocality)
1481 .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1482 ((HRegion)r).setCompleteSequenceId(regionLoadBldr);
1483
1484 return regionLoadBldr.build();
1485 }
1486
1487
1488
1489
1490
1491 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1492 Region r = onlineRegions.get(encodedRegionName);
1493 return r != null ? createRegionLoad(r, null, null) : null;
1494 }
1495
1496
1497
1498
1499 private static class CompactionChecker extends ScheduledChore {
1500 private final HRegionServer instance;
1501 private final int majorCompactPriority;
1502 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1503 private long iteration = 0;
1504
1505 CompactionChecker(final HRegionServer h, final int sleepTime,
1506 final Stoppable stopper) {
1507 super("CompactionChecker", stopper, sleepTime);
1508 this.instance = h;
1509 LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1510
1511
1512
1513
1514 this.majorCompactPriority = this.instance.conf.
1515 getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1516 DEFAULT_PRIORITY);
1517 }
1518
1519 @Override
1520 protected void chore() {
1521 for (Region r : this.instance.onlineRegions.values()) {
1522 if (r == null)
1523 continue;
1524 for (Store s : r.getStores()) {
1525 try {
1526 long multiplier = s.getCompactionCheckMultiplier();
1527 assert multiplier > 0;
1528 if (iteration % multiplier != 0) continue;
1529 if (s.needsCompaction()) {
1530
1531 this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1532 + " requests compaction");
1533 } else if (s.isMajorCompaction()) {
1534 if (majorCompactPriority == DEFAULT_PRIORITY
1535 || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
1536 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1537 + " requests major compaction; use default priority", null);
1538 } else {
1539 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1540 + " requests major compaction; use configured priority",
1541 this.majorCompactPriority, null, null);
1542 }
1543 }
1544 } catch (IOException e) {
1545 LOG.warn("Failed major compaction check on " + r, e);
1546 }
1547 }
1548 }
1549 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1550 }
1551 }
1552
1553 static class PeriodicMemstoreFlusher extends ScheduledChore {
1554 final HRegionServer server;
1555 final static int RANGE_OF_DELAY = 20000;
1556 final static int MIN_DELAY_TIME = 3000;
1557 public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1558 super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
1559 this.server = server;
1560 }
1561
1562 @Override
1563 protected void chore() {
1564 for (Region r : this.server.onlineRegions.values()) {
1565 if (r == null)
1566 continue;
1567 if (((HRegion)r).shouldFlush()) {
1568 FlushRequester requester = server.getFlushRequester();
1569 if (requester != null) {
1570 long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1571 LOG.info(getName() + " requesting flush for region " +
1572 r.getRegionInfo().getRegionNameAsString() + " after a delay of " + randomDelay);
1573
1574
1575
1576 requester.requestDelayedFlush(r, randomDelay, false);
1577 }
1578 }
1579 }
1580 }
1581 }
1582
1583
1584
1585
1586
1587
1588
1589
1590 public boolean isOnline() {
1591 return online.get();
1592 }
1593
1594
1595
1596
1597
1598
1599
1600 private WALFactory setupWALAndReplication() throws IOException {
1601
1602 final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1603 final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1604
1605 Path logdir = new Path(rootDir, logName);
1606 if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1607 if (this.fs.exists(logdir)) {
1608 throw new RegionServerRunningException("Region server has already " +
1609 "created directory at " + this.serverName.toString());
1610 }
1611
1612
1613
1614 createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1615
1616
1617 final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1618 listeners.add(new MetricsWAL());
1619 if (this.replicationSourceHandler != null &&
1620 this.replicationSourceHandler.getWALActionsListener() != null) {
1621
1622 listeners.add(this.replicationSourceHandler.getWALActionsListener());
1623 }
1624
1625 return new WALFactory(conf, listeners, serverName.toString());
1626 }
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636 protected LogRoller ensureMetaWALRoller() {
1637
1638
1639 LogRoller roller = metawalRoller.get();
1640 if (null == roller) {
1641 LogRoller tmpLogRoller = new LogRoller(this, this);
1642 String n = Thread.currentThread().getName();
1643 Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1644 n + "-MetaLogRoller", uncaughtExceptionHandler);
1645 if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1646 roller = tmpLogRoller;
1647 } else {
1648
1649 Threads.shutdown(tmpLogRoller.getThread());
1650 roller = metawalRoller.get();
1651 }
1652 }
1653 return roller;
1654 }
1655
1656 public MetricsRegionServer getRegionServerMetrics() {
1657 return this.metricsRegionServer;
1658 }
1659
1660
1661
1662
1663 public MasterAddressTracker getMasterAddressTracker() {
1664 return this.masterAddressTracker;
1665 }
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
/**
 * Starts this region server's service threads.
 *
 * <p>In order, it starts:
 * <ol>
 *   <li>the executor pools;</li>
 *   <li>the WAL roller and cache flusher threads;</li>
 *   <li>the non-null chores;</li>
 *   <li>the lease checker;</li>
 *   <li>the replication services;</li>
 *   <li>the split-log worker.</li>
 * </ol>
 *
 * <p>Pool sizes come from configuration, with the defaults shown in the calls below. The
 * parallel-seek pool only starts when {@code StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE} is
 * true. The region-replica flush pool only starts when waiting for a primary flush is enabled.
 *
 * @throws IOException as declared; which of the calls below can throw it is not visible here
 */
private void startServiceThreads() throws IOException {
  // Executor pools for opening and closing user and meta regions.
  this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
      conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
  this.service.startExecutorService(ExecutorType.RS_OPEN_META,
      conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
  this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
  this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
      conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
  // Optional pool for parallel store-scanner seeks.
  if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
    this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
        conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
  }
  // Log-replay pool, sized by the configured maximum number of WAL splitters.
  this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
      "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));

  // Region-replica flush pool; its default size is the open-region thread count.
  if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
    this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
            conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
  }

  Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
      uncaughtExceptionHandler);
  this.cacheFlusher.start(uncaughtExceptionHandler);

  // Schedule only the chores that were created.
  if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
  if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
  if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
  if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
  if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
  if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);

  // The lease checker runs as a daemon thread.
  Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
      uncaughtExceptionHandler);

  // If one object is both replication source and sink, start it only once.
  if (this.replicationSourceHandler == this.replicationSinkHandler &&
      this.replicationSourceHandler != null) {
    this.replicationSourceHandler.startReplicationService();
  } else {
    if (this.replicationSourceHandler != null) {
      this.replicationSourceHandler.startReplicationService();
    }
    if (this.replicationSinkHandler != null) {
      this.replicationSinkHandler.startReplicationService();
    }
  }

  // The split-log worker gets a copy of the configuration with log-replay-specific client
  // retries, RPC timeout and a server-side retries multiplier of 1.
  Configuration sinkConf = HBaseConfiguration.create(conf);
  sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      conf.getInt("hbase.log.replay.retries.number", 8));
  sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
      conf.getInt("hbase.log.replay.rpc.timeout", 30000));
  sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
  this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
  splitLogWorker.start();
}
1743
1744
1745
1746
1747
1748
1749 private int putUpWebUI() throws IOException {
1750 int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1751 HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1752 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1753
1754 if(this instanceof HMaster) {
1755 port = conf.getInt(HConstants.MASTER_INFO_PORT,
1756 HConstants.DEFAULT_MASTER_INFOPORT);
1757 addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1758 }
1759
1760 if (port < 0) return port;
1761
1762 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1763 String msg =
1764 "Failed to start http info server. Address " + addr
1765 + " does not belong to this host. Correct configuration parameter: "
1766 + "hbase.regionserver.info.bindAddress";
1767 LOG.error(msg);
1768 throw new IOException(msg);
1769 }
1770
1771 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1772 false);
1773 while (true) {
1774 try {
1775 this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1776 infoServer.addServlet("dump", "/dump", getDumpServlet());
1777 configureInfoServer();
1778 this.infoServer.start();
1779 break;
1780 } catch (BindException e) {
1781 if (!auto) {
1782
1783 LOG.error("Failed binding http info server to port: " + port);
1784 throw e;
1785 }
1786
1787 LOG.info("Failed binding http info server to port: " + port);
1788 port++;
1789 }
1790 }
1791 port = this.infoServer.getPort();
1792 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1793 int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1794 HConstants.DEFAULT_MASTER_INFOPORT);
1795 conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1796 conf.setInt(HConstants.MASTER_INFO_PORT, port);
1797 return port;
1798 }
1799
1800
1801
1802
1803 private boolean isHealthy() {
1804 if (!fsOk) {
1805
1806 return false;
1807 }
1808
1809 if (!(leases.isAlive()
1810 && cacheFlusher.isAlive() && walRoller.isAlive()
1811 && this.compactionChecker.isScheduled()
1812 && this.periodicFlusher.isScheduled())) {
1813 stop("One or more threads are no longer alive -- stop");
1814 return false;
1815 }
1816 final LogRoller metawalRoller = this.metawalRoller.get();
1817 if (metawalRoller != null && !metawalRoller.isAlive()) {
1818 stop("Meta WAL roller thread is no longer alive -- stop");
1819 return false;
1820 }
1821 return true;
1822 }
1823
1824 private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1825
1826 @Override
1827 public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1828 WAL wal;
1829 LogRoller roller = walRoller;
1830
1831 if (regionInfo != null && regionInfo.isMetaTable() &&
1832 regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1833 roller = ensureMetaWALRoller();
1834 wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1835 } else if (regionInfo == null) {
1836 wal = walFactory.getWAL(UNSPECIFIED_REGION);
1837 } else {
1838 wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
1839 }
1840 roller.addWAL(wal);
1841 return wal;
1842 }
1843
1844 @Override
1845 public ClusterConnection getConnection() {
1846 return this.clusterConnection;
1847 }
1848
1849 @Override
1850 public MetaTableLocator getMetaTableLocator() {
1851 return this.metaTableLocator;
1852 }
1853
1854 @Override
1855 public void stop(final String msg) {
1856 if (!this.stopped) {
1857 try {
1858 if (this.rsHost != null) {
1859 this.rsHost.preStop(msg);
1860 }
1861 this.stopped = true;
1862 LOG.info("STOPPED: " + msg);
1863
1864 sleeper.skipSleepCycle();
1865 } catch (IOException exp) {
1866 LOG.warn("The region server did not stop", exp);
1867 }
1868 }
1869 }
1870
1871 public void waitForServerOnline(){
1872 while (!isStopped() && !isOnline()) {
1873 synchronized (online) {
1874 try {
1875 online.wait(msgInterval);
1876 } catch (InterruptedException ie) {
1877 Thread.currentThread().interrupt();
1878 break;
1879 }
1880 }
1881 }
1882 }
1883
1884 @Override
1885 public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
1886 postOpenDeployTasks(new PostOpenDeployContext(r, -1));
1887 }
1888
/**
 * Performs the bookkeeping needed after a region has been opened on this server.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>request a system compaction for any store that has references or needs compaction;</li>
 *   <li>update the recovering region's last flushed sequence id;</li>
 *   <li>publish the location: meta regions in ZooKeeper (state OPEN), other regions in
 *       hbase:meta when ZK-based assignment is used;</li>
 *   <li>without ZK-based assignment, report OPENED to the master;</li>
 *   <li>trigger a flush in the primary region when this is a replica.</li>
 * </ol>
 *
 * @param context the opened region and the master's system time
 * @throws IllegalArgumentException if the region is not an {@code HRegion}
 * @throws KeeperException on ZooKeeper errors
 * @throws IOException if RPC services are not open or the master rejects the OPENED report
 */
@Override
public void postOpenDeployTasks(final PostOpenDeployContext context)
    throws KeeperException, IOException {
  Region r = context.getRegion();
  long masterSystemTime = context.getMasterSystemTime();
  Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
  rpcServices.checkOpen();
  LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
  // Compact stores that still hold references or are over the compaction threshold.
  for (Store s : r.getStores()) {
    if (s.hasReferences() || s.needsCompaction()) {
      this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
    }
  }
  long openSeqNum = r.getOpenSeqNum();
  if (openSeqNum == HConstants.NO_SEQNUM) {
    // Should not happen; log it and report 0 instead.
    LOG.error("No sequence number found when opening " +
        r.getRegionInfo().getRegionNameAsString());
    openSeqNum = 0;
  }

  updateRecoveringRegionLastFlushedSequenceId(r);

  // Meta regions publish their location in ZooKeeper; others update hbase:meta under ZK assignment.
  if (r.getRegionInfo().isMetaRegion()) {
    MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, r.getRegionInfo().getReplicaId(),
        State.OPEN);
  } else if (useZKForAssignment) {
    MetaTableAccessor.updateRegionLocation(getConnection(), r.getRegionInfo(),
        this.serverName, openSeqNum, masterSystemTime);
  }
  // Without ZK-based assignment, the master must acknowledge the OPENED transition.
  if (!useZKForAssignment && !reportRegionStateTransition(new RegionStateTransitionContext(
      TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
    throw new IOException("Failed to report opened region to master: "
        + r.getRegionInfo().getRegionNameAsString());
  }

  triggerFlushInPrimaryRegion((HRegion)r);

  LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
}
1932
1933 @Override
1934 public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1935 return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1936 }
1937
1938 @Override
1939 public boolean reportRegionStateTransition(
1940 TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1941 return reportRegionStateTransition(
1942 new RegionStateTransitionContext(code, HConstants.NO_SEQNUM, -1, hris));
1943 }
1944
1945 @Override
1946 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
1947 TransitionCode code = context.getCode();
1948 long openSeqNum = context.getOpenSeqNum();
1949 long masterSystemTime = context.getMasterSystemTime();
1950 HRegionInfo[] hris = context.getHris();
1951
1952 ReportRegionStateTransitionRequest.Builder builder =
1953 ReportRegionStateTransitionRequest.newBuilder();
1954 builder.setServer(ProtobufUtil.toServerName(serverName));
1955 RegionStateTransition.Builder transition = builder.addTransitionBuilder();
1956 transition.setTransitionCode(code);
1957 if (code == TransitionCode.OPENED && openSeqNum >= 0) {
1958 transition.setOpenSeqNum(openSeqNum);
1959 }
1960 for (HRegionInfo hri: hris) {
1961 transition.addRegionInfo(HRegionInfo.convert(hri));
1962 }
1963 ReportRegionStateTransitionRequest request = builder.build();
1964 while (keepLooping()) {
1965 RegionServerStatusService.BlockingInterface rss = rssStub;
1966 try {
1967 if (rss == null) {
1968 createRegionServerStatusStub();
1969 continue;
1970 }
1971 ReportRegionStateTransitionResponse response =
1972 rss.reportRegionStateTransition(null, request);
1973 if (response.hasErrorMessage()) {
1974 LOG.info("Failed to transition " + hris[0]
1975 + " to " + code + ": " + response.getErrorMessage());
1976 return false;
1977 }
1978 return true;
1979 } catch (ServiceException se) {
1980 IOException ioe = ProtobufUtil.getRemoteException(se);
1981 LOG.info("Failed to report region transition, will retry", ioe);
1982 if (rssStub == rss) {
1983 rssStub = null;
1984 }
1985 }
1986 }
1987 return false;
1988 }
1989
1990
1991
1992
1993
1994 void triggerFlushInPrimaryRegion(final HRegion region) {
1995 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
1996 return;
1997 }
1998 if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
1999 !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2000 region.conf)) {
2001 region.setReadsEnabled(true);
2002 return;
2003 }
2004
2005 region.setReadsEnabled(false);
2006
2007
2008
2009 this.service.submit(
2010 new RegionReplicaFlushHandler(this, clusterConnection,
2011 rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2012 }
2013
2014 @Override
2015 public RpcServerInterface getRpcServer() {
2016 return rpcServices.rpcServer;
2017 }
2018
2019 @VisibleForTesting
2020 public RSRpcServices getRSRpcServices() {
2021 return rpcServices;
2022 }
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034 @Override
2035 public void abort(String reason, Throwable cause) {
2036 String msg = "ABORTING region server " + this + ": " + reason;
2037 if (cause != null) {
2038 LOG.fatal(msg, cause);
2039 } else {
2040 LOG.fatal(msg);
2041 }
2042 this.abortRequested = true;
2043
2044
2045
2046 LOG.fatal("RegionServer abort: loaded coprocessors are: " +
2047 CoprocessorHost.getLoadedCoprocessors());
2048
2049 try {
2050 LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
2051 } catch (MalformedObjectNameException | IOException e) {
2052 LOG.warn("Failed dumping metrics", e);
2053 }
2054
2055
2056 try {
2057 if (cause != null) {
2058 msg += "\nCause:\n" + StringUtils.stringifyException(cause);
2059 }
2060
2061 if (rssStub != null && this.serverName != null) {
2062 ReportRSFatalErrorRequest.Builder builder =
2063 ReportRSFatalErrorRequest.newBuilder();
2064 ServerName sn =
2065 ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
2066 builder.setServer(ProtobufUtil.toServerName(sn));
2067 builder.setErrorMessage(msg);
2068 rssStub.reportRSFatalError(null, builder.build());
2069 }
2070 } catch (Throwable t) {
2071 LOG.warn("Unable to report fatal error to master", t);
2072 }
2073 stop(reason);
2074 }
2075
2076
2077
2078
2079 public void abort(String reason) {
2080 abort(reason, null);
2081 }
2082
2083 @Override
2084 public boolean isAborted() {
2085 return this.abortRequested;
2086 }
2087
2088
2089
2090
2091
2092
2093 protected void kill() {
2094 this.killed = true;
2095 abort("Simulated kill");
2096 }
2097
2098
2099
2100
/**
 * Hook for interrupting threads during shutdown. It is a no-op here; presumably subclasses
 * override it (it is protected and empty) — confirm against callers.
 */
protected void sendShutdownInterrupt() {
}
2103
2104
2105
2106
2107
/**
 * Stops the background services started for this server, in order:
 * <ol>
 *   <li>shut down the chore service and cancel each chore that exists;</li>
 *   <li>join the cache flusher;</li>
 *   <li>close the tracing span receivers;</li>
 *   <li>shut down the WAL and meta-WAL roller threads;</li>
 *   <li>join the compact/split thread;</li>
 *   <li>shut down the executor service;</li>
 *   <li>stop the replication services (once if source and sink are the same object).</li>
 * </ol>
 */
protected void stopServiceThreads() {
  // Shut down the chore service, then cancel each chore that was created.
  if (this.choreService != null) choreService.shutdown();
  if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
  if (this.compactionChecker != null) compactionChecker.cancel(true);
  if (this.periodicFlusher != null) periodicFlusher.cancel(true);
  if (this.healthCheckChore != null) healthCheckChore.cancel(true);
  if (this.storefileRefresher != null) storefileRefresher.cancel(true);
  if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);

  if (this.cacheFlusher != null) {
    this.cacheFlusher.join();
  }

  if (this.spanReceiverHost != null) {
    this.spanReceiverHost.closeReceivers();
  }
  if (this.walRoller != null) {
    Threads.shutdown(this.walRoller.getThread());
  }
  // The meta WAL roller is created lazily, so it may not exist.
  final LogRoller metawalRoller = this.metawalRoller.get();
  if (metawalRoller != null) {
    Threads.shutdown(metawalRoller.getThread());
  }
  if (this.compactSplitThread != null) {
    this.compactSplitThread.join();
  }
  if (this.service != null) this.service.shutdown();
  // If one object is both replication source and sink, stop it only once.
  if (this.replicationSourceHandler != null &&
      this.replicationSourceHandler == this.replicationSinkHandler) {
    this.replicationSourceHandler.stopReplicationService();
  } else {
    if (this.replicationSourceHandler != null) {
      this.replicationSourceHandler.stopReplicationService();
    }
    if (this.replicationSinkHandler != null) {
      this.replicationSinkHandler.stopReplicationService();
    }
  }
}
2148
2149
2150
2151
2152
2153 ReplicationSourceService getReplicationSourceService() {
2154 return replicationSourceHandler;
2155 }
2156
2157
2158
2159
2160
2161 ReplicationSinkService getReplicationSinkService() {
2162 return replicationSinkHandler;
2163 }
2164
2165
2166
2167
2168
2169
2170
2171
2172
/**
 * Locates the active master and builds the RegionServerStatusService stub used to talk to it,
 * storing the stub in {@code rssStub}.
 *
 * <p>If a stub already exists, no reconnection happens and the currently tracked master address
 * is returned. Otherwise this loops while {@link #keepLooping()} is true, retrying every 200 ms
 * until a master address is known and a blocking RPC channel to it could be created. When this
 * server is itself an {@code HMaster} whose name matches the master address, the master's own RPC
 * services are used directly instead of an RPC channel.
 *
 * <p>Retry log messages are rate-limited to roughly one per second. A {@code sleep(200)}
 * returning true is treated as an interruption: it is remembered, the loop continues, and the
 * thread's interrupt status is restored on exit.
 *
 * @return the last master address seen, or null if no master was found before the loop ended.
 *         Note the result can be non-null while {@code rssStub} is left null, if the loop ended
 *         after a failed channel creation.
 */
@VisibleForTesting
protected synchronized ServerName createRegionServerStatusStub() {
  if (rssStub != null) {
    return masterAddressTracker.getMasterAddress();
  }
  ServerName sn = null;
  long previousLogTime = 0;
  boolean refresh = false;
  RegionServerStatusService.BlockingInterface intf = null;
  boolean interrupted = false;
  try {
    while (keepLooping()) {
      // After the first miss, ask the tracker to refresh its cached master address.
      sn = this.masterAddressTracker.getMasterAddress(refresh);
      if (sn == null) {
        if (!keepLooping()) {
          // Server stopped or cluster went down while waiting; give up.
          LOG.debug("No master found and cluster is stopped; bailing out");
          return null;
        }
        if (System.currentTimeMillis() > (previousLogTime + 1000)) {
          LOG.debug("No master found; retry");
          previousLogTime = System.currentTimeMillis();
        }
        refresh = true;
        if (sleep(200)) {
          interrupted = true;
        }
        continue;
      }

      // If we are the master ourselves, short-circuit to the in-process RPC services.
      if (this instanceof HMaster && sn.equals(getServerName())) {
        intf = ((HMaster)this).getMasterRpcServices();
        break;
      }
      try {
        BlockingRpcChannel channel =
          this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
            shortOperationTimeout);
        intf = RegionServerStatusService.newBlockingStub(channel);
        break;
      } catch (IOException e) {
        // Rate-limited logging of the (unwrapped) failure, then retry after a short sleep.
        if (System.currentTimeMillis() > (previousLogTime + 1000)) {
          e = e instanceof RemoteException ?
            ((RemoteException)e).unwrapRemoteException() : e;
          if (e instanceof ServerNotRunningYetException) {
            LOG.info("Master isn't available yet, retrying");
          } else {
            LOG.warn("Unable to connect to master. Retrying. Error was:", e);
          }
          previousLogTime = System.currentTimeMillis();
        }
        if (sleep(200)) {
          interrupted = true;
        }
      }
    }
  } finally {
    // Restore the interrupt status swallowed by the retry sleeps.
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
  rssStub = intf;
  return sn;
}
2238
2239
2240
2241
2242
2243 private boolean keepLooping() {
2244 return !this.stopped && isClusterUp();
2245 }
2246
2247
2248
2249
2250
2251
2252
2253
/**
 * Announces this region server to the master via {@code regionServerStartup}, sending the RPC
 * port, start code, current time and, if configured, the hostname the master should use.
 *
 * @return the master's startup response, or null if no master could be located or the call
 *         failed with a non-fatal error. On any ServiceException {@code rssStub} is cleared, so
 *         a later createRegionServerStatusStub() call builds a fresh stub.
 * @throws IOException if the master rejects the startup with a ClockOutOfSyncException
 */
private RegionServerStartupResponse reportForDuty() throws IOException {
  ServerName masterServerName = createRegionServerStatusStub();
  if (masterServerName == null) return null;
  RegionServerStartupResponse result = null;
  try {
    // Reset the request counter before announcing ourselves.
    rpcServices.requestCount.set(0);
    LOG.info("reportForDuty to master=" + masterServerName + " with port="
      + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
    long now = EnvironmentEdgeManager.currentTime();
    int port = rpcServices.isa.getPort();
    RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
    if (shouldUseThisHostnameInstead()) {
      request.setUseThisHostnameInstead(useThisHostnameInstead);
    }
    request.setPort(port);
    request.setServerStartCode(this.startcode);
    request.setServerCurrentTime(now);
    result = this.rssStub.regionServerStartup(null, request.build());
  } catch (ServiceException se) {
    IOException ioe = ProtobufUtil.getRemoteException(se);
    if (ioe instanceof ClockOutOfSyncException) {
      LOG.fatal("Master rejected startup because clock is out of sync", ioe);
      // Re-throw the clock skew error; it is the only failure propagated to the caller.
      throw ioe;
    } else if (ioe instanceof ServerNotRunningYetException) {
      LOG.debug("Master is not running yet");
    } else {
      LOG.warn("error telling master we are up", se);
    }
    // Drop the stub so the next attempt reconnects to whichever master is active.
    rssStub = null;
  }
  return result;
}
2287
2288 @Override
2289 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2290 try {
2291 GetLastFlushedSequenceIdRequest req =
2292 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2293 RegionServerStatusService.BlockingInterface rss = rssStub;
2294 if (rss == null) {
2295 createRegionServerStatusStub();
2296 rss = rssStub;
2297 if (rss == null) {
2298
2299 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2300 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2301 .build();
2302 }
2303 }
2304 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2305 return RegionStoreSequenceIds.newBuilder()
2306 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2307 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2308 } catch (ServiceException e) {
2309 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2310 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2311 .build();
2312 }
2313 }
2314
2315
2316
2317
2318
2319
2320 protected void closeAllRegions(final boolean abort) {
2321 closeUserRegions(abort);
2322 closeMetaTableRegions(abort);
2323 }
2324
2325
2326
2327
2328
2329 void closeMetaTableRegions(final boolean abort) {
2330 Region meta = null;
2331 this.lock.writeLock().lock();
2332 try {
2333 for (Map.Entry<String, Region> e: onlineRegions.entrySet()) {
2334 HRegionInfo hri = e.getValue().getRegionInfo();
2335 if (hri.isMetaRegion()) {
2336 meta = e.getValue();
2337 }
2338 if (meta != null) break;
2339 }
2340 } finally {
2341 this.lock.writeLock().unlock();
2342 }
2343 if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2344 }
2345
2346
2347
2348
2349
2350
2351
2352 void closeUserRegions(final boolean abort) {
2353 this.lock.writeLock().lock();
2354 try {
2355 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
2356 Region r = e.getValue();
2357 if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2358
2359 closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2360 }
2361 }
2362 } finally {
2363 this.lock.writeLock().unlock();
2364 }
2365 }
2366
2367
2368 public InfoServer getInfoServer() {
2369 return infoServer;
2370 }
2371
2372
2373
2374
2375 @Override
2376 public boolean isStopped() {
2377 return this.stopped;
2378 }
2379
2380 @Override
2381 public boolean isStopping() {
2382 return this.stopping;
2383 }
2384
2385 @Override
2386 public Map<String, Region> getRecoveringRegions() {
2387 return this.recoveringRegions;
2388 }
2389
2390
2391
2392
2393
2394 @Override
2395 public Configuration getConfiguration() {
2396 return conf;
2397 }
2398
2399
2400 ReentrantReadWriteLock.WriteLock getWriteLock() {
2401 return lock.writeLock();
2402 }
2403
2404 public int getNumberOfOnlineRegions() {
2405 return this.onlineRegions.size();
2406 }
2407
2408 boolean isOnlineRegionsEmpty() {
2409 return this.onlineRegions.isEmpty();
2410 }
2411
2412
2413
2414
2415
2416
2417 public Collection<Region> getOnlineRegionsLocalContext() {
2418 Collection<Region> regions = this.onlineRegions.values();
2419 return Collections.unmodifiableCollection(regions);
2420 }
2421
2422 @Override
2423 public void addToOnlineRegions(Region region) {
2424 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2425 configurationManager.registerObserver(region);
2426 }
2427
2428
2429
2430
2431
2432
2433 SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() {
2434
2435 SortedMap<Long, Region> sortedRegions = new TreeMap<Long, Region>(
2436 new Comparator<Long>() {
2437 @Override
2438 public int compare(Long a, Long b) {
2439 return -1 * a.compareTo(b);
2440 }
2441 });
2442
2443 for (Region region : this.onlineRegions.values()) {
2444 sortedRegions.put(region.getMemstoreSize(), region);
2445 }
2446 return sortedRegions;
2447 }
2448
2449
2450
2451
2452 public long getStartcode() {
2453 return this.startcode;
2454 }
2455
2456
2457 @Override
2458 public FlushRequester getFlushRequester() {
2459 return this.cacheFlusher;
2460 }
2461
2462
2463
2464
2465
2466
2467
2468 protected HRegionInfo[] getMostLoadedRegions() {
2469 ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2470 for (Region r : onlineRegions.values()) {
2471 if (!r.isAvailable()) {
2472 continue;
2473 }
2474 if (regions.size() < numRegionsToReport) {
2475 regions.add(r.getRegionInfo());
2476 } else {
2477 break;
2478 }
2479 }
2480 return regions.toArray(new HRegionInfo[regions.size()]);
2481 }
2482
2483 @Override
2484 public Leases getLeases() {
2485 return leases;
2486 }
2487
2488
2489
2490
2491 protected Path getRootDir() {
2492 return rootDir;
2493 }
2494
2495
2496
2497
2498 @Override
2499 public FileSystem getFileSystem() {
2500 return fs;
2501 }
2502
2503 @Override
2504 public String toString() {
2505 return getServerName().toString();
2506 }
2507
2508
2509
2510
2511
2512
2513 public int getThreadWakeFrequency() {
2514 return threadWakeFrequency;
2515 }
2516
2517 @Override
2518 public ZooKeeperWatcher getZooKeeper() {
2519 return zooKeeper;
2520 }
2521
2522 @Override
2523 public BaseCoordinatedStateManager getCoordinatedStateManager() {
2524 return csm;
2525 }
2526
2527 @Override
2528 public ServerName getServerName() {
2529 return serverName;
2530 }
2531
2532 @Override
2533 public CompactionRequestor getCompactionRequester() {
2534 return this.compactSplitThread;
2535 }
2536
2537 public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2538 return this.rsHost;
2539 }
2540
2541 @Override
2542 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2543 return this.regionsInTransitionInRS;
2544 }
2545
2546 @Override
2547 public ExecutorService getExecutorService() {
2548 return service;
2549 }
2550
2551 @Override
2552 public ChoreService getChoreService() {
2553 return choreService;
2554 }
2555
2556 @Override
2557 public RegionServerQuotaManager getRegionServerQuotaManager() {
2558 return rsQuotaManager;
2559 }
2560
2561
2562
2563
2564
2565
2566
2567
2568 static private void createNewReplicationInstance(Configuration conf,
2569 HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2570
2571
2572 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2573 HConstants.REPLICATION_ENABLE_DEFAULT)) {
2574 return;
2575 }
2576
2577
2578 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2579 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2580
2581
2582 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2583 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2584
2585
2586
2587 if (sourceClassname.equals(sinkClassname)) {
2588 server.replicationSourceHandler = (ReplicationSourceService)
2589 newReplicationInstance(sourceClassname,
2590 conf, server, fs, logDir, oldLogDir);
2591 server.replicationSinkHandler = (ReplicationSinkService)
2592 server.replicationSourceHandler;
2593 } else {
2594 server.replicationSourceHandler = (ReplicationSourceService)
2595 newReplicationInstance(sourceClassname,
2596 conf, server, fs, logDir, oldLogDir);
2597 server.replicationSinkHandler = (ReplicationSinkService)
2598 newReplicationInstance(sinkClassname,
2599 conf, server, fs, logDir, oldLogDir);
2600 }
2601 }
2602
2603 static private ReplicationService newReplicationInstance(String classname,
2604 Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2605 Path oldLogDir) throws IOException{
2606
2607 Class<?> clazz = null;
2608 try {
2609 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2610 clazz = Class.forName(classname, true, classLoader);
2611 } catch (java.lang.ClassNotFoundException nfe) {
2612 throw new IOException("Could not find class for " + classname);
2613 }
2614
2615
2616 ReplicationService service = (ReplicationService)
2617 ReflectionUtils.newInstance(clazz, conf);
2618 service.initialize(server, fs, logDir, oldLogDir);
2619 return service;
2620 }
2621
2622
2623
2624
2625
2626
2627
2628
2629 public static HRegionServer constructRegionServer(
2630 Class<? extends HRegionServer> regionServerClass,
2631 final Configuration conf2, CoordinatedStateManager cp) {
2632 try {
2633 Constructor<? extends HRegionServer> c = regionServerClass
2634 .getConstructor(Configuration.class, CoordinatedStateManager.class);
2635 return c.newInstance(conf2, cp);
2636 } catch (Exception e) {
2637 throw new RuntimeException("Failed construction of " + "Regionserver: "
2638 + regionServerClass.toString(), e);
2639 }
2640 }
2641
2642
2643
2644
2645 public static void main(String[] args) throws Exception {
2646 VersionInfo.logVersion();
2647 Configuration conf = HBaseConfiguration.create();
2648 @SuppressWarnings("unchecked")
2649 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2650 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2651
2652 new HRegionServerCommandLine(regionServerClass).doMain(args);
2653 }
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665 @Override
2666 public List<Region> getOnlineRegions(TableName tableName) {
2667 List<Region> tableRegions = new ArrayList<Region>();
2668 synchronized (this.onlineRegions) {
2669 for (Region region: this.onlineRegions.values()) {
2670 HRegionInfo regionInfo = region.getRegionInfo();
2671 if(regionInfo.getTable().equals(tableName)) {
2672 tableRegions.add(region);
2673 }
2674 }
2675 }
2676 return tableRegions;
2677 }
2678
2679
2680
2681
2682
2683
2684 @Override
2685 public Set<TableName> getOnlineTables() {
2686 Set<TableName> tables = new HashSet<TableName>();
2687 synchronized (this.onlineRegions) {
2688 for (Region region: this.onlineRegions.values()) {
2689 tables.add(region.getTableDesc().getTableName());
2690 }
2691 }
2692 return tables;
2693 }
2694
2695
2696 public String[] getRegionServerCoprocessors() {
2697 TreeSet<String> coprocessors = new TreeSet<String>();
2698 try {
2699 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2700 } catch (IOException exception) {
2701 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2702 "skipping.");
2703 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2704 }
2705 Collection<Region> regions = getOnlineRegionsLocalContext();
2706 for (Region region: regions) {
2707 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2708 try {
2709 coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2710 } catch (IOException exception) {
2711 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2712 "; skipping.");
2713 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2714 }
2715 }
2716 return coprocessors.toArray(new String[coprocessors.size()]);
2717 }
2718
2719
2720
2721
2722
2723 private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2724 try {
2725 CloseRegionCoordination.CloseRegionDetails details =
2726 csm.getCloseRegionCoordination().getDetaultDetails();
2727 if (!closeRegion(region.getEncodedName(), abort, details, null)) {
2728 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2729 " - ignoring and continuing");
2730 }
2731 } catch (IOException e) {
2732 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2733 " - ignoring and continuing", e);
2734 }
2735 }
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757 protected boolean closeRegion(String encodedName, final boolean abort,
2758 CloseRegionCoordination.CloseRegionDetails crd, final ServerName sn)
2759 throws NotServingRegionException, RegionAlreadyInTransitionException {
2760
2761 Region actualRegion = this.getFromOnlineRegions(encodedName);
2762
2763 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2764 try {
2765 actualRegion.getCoprocessorHost().preClose(false);
2766 } catch (IOException exp) {
2767 LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2768 return false;
2769 }
2770 }
2771
2772 final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2773 Boolean.FALSE);
2774
2775 if (Boolean.TRUE.equals(previous)) {
2776 LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
2777 "trying to OPEN. Cancelling OPENING.");
2778 if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2779
2780
2781 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2782 " Doing a standard close now");
2783 return closeRegion(encodedName, abort, crd, sn);
2784 }
2785
2786 actualRegion = this.getFromOnlineRegions(encodedName);
2787 if (actualRegion == null) {
2788 LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2789
2790 throw new RegionAlreadyInTransitionException("The region " + encodedName +
2791 " was opening but not yet served. Opening is cancelled.");
2792 }
2793 } else if (Boolean.FALSE.equals(previous)) {
2794 LOG.info("Received CLOSE for the region: " + encodedName +
2795 ", which we are already trying to CLOSE, but not completed yet");
2796
2797
2798
2799
2800
2801
2802 throw new RegionAlreadyInTransitionException("The region " + encodedName +
2803 " was already closing. New CLOSE request is ignored.");
2804 }
2805
2806 if (actualRegion == null) {
2807 LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2808 this.regionsInTransitionInRS.remove(encodedName.getBytes());
2809
2810 throw new NotServingRegionException("The region " + encodedName +
2811 " is not online, and is not opening.");
2812 }
2813
2814 CloseRegionHandler crh;
2815 final HRegionInfo hri = actualRegion.getRegionInfo();
2816 if (hri.isMetaRegion()) {
2817 crh = new CloseMetaHandler(this, this, hri, abort,
2818 csm.getCloseRegionCoordination(), crd);
2819 } else {
2820 crh = new CloseRegionHandler(this, this, hri, abort,
2821 csm.getCloseRegionCoordination(), crd, sn);
2822 }
2823 this.service.submit(crh);
2824 return true;
2825 }
2826
2827
2828
2829
2830
2831
2832 public Region getOnlineRegion(final byte[] regionName) {
2833 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2834 return this.onlineRegions.get(encodedRegionName);
2835 }
2836
2837 public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2838 return this.regionFavoredNodesMap.get(encodedRegionName);
2839 }
2840
2841 @Override
2842 public Region getFromOnlineRegions(final String encodedRegionName) {
2843 return this.onlineRegions.get(encodedRegionName);
2844 }
2845
2846
2847 @Override
2848 public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
2849 Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2850
2851 if (destination != null) {
2852 long closeSeqNum = r.getMaxFlushedSeqId();
2853 if (closeSeqNum == HConstants.NO_SEQNUM) {
2854
2855 closeSeqNum = r.getOpenSeqNum();
2856 if (closeSeqNum == HConstants.NO_SEQNUM) {
2857 closeSeqNum = 0;
2858 }
2859 }
2860 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2861 }
2862 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2863 return toReturn != null;
2864 }
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874 protected Region getRegion(final byte[] regionName)
2875 throws NotServingRegionException {
2876 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2877 return getRegionByEncodedName(regionName, encodedRegionName);
2878 }
2879
2880 public Region getRegionByEncodedName(String encodedRegionName)
2881 throws NotServingRegionException {
2882 return getRegionByEncodedName(null, encodedRegionName);
2883 }
2884
2885 protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2886 throws NotServingRegionException {
2887 Region region = this.onlineRegions.get(encodedRegionName);
2888 if (region == null) {
2889 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2890 if (moveInfo != null) {
2891 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2892 }
2893 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2894 String regionNameStr = regionName == null?
2895 encodedRegionName: Bytes.toStringBinary(regionName);
2896 if (isOpening != null && isOpening.booleanValue()) {
2897 throw new RegionOpeningException("Region " + regionNameStr +
2898 " is opening on " + this.serverName);
2899 }
2900 throw new NotServingRegionException("Region " + regionNameStr +
2901 " is not online on " + this.serverName);
2902 }
2903 return region;
2904 }
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916 private Throwable cleanup(final Throwable t, final String msg) {
2917
2918 if (t instanceof NotServingRegionException) {
2919 LOG.debug("NotServingRegionException; " + t.getMessage());
2920 return t;
2921 }
2922 if (msg == null) {
2923 LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2924 } else {
2925 LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2926 }
2927 if (!rpcServices.checkOOME(t)) {
2928 checkFileSystem();
2929 }
2930 return t;
2931 }
2932
2933
2934
2935
2936
2937
2938
2939
2940 protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2941 return (t instanceof IOException ? (IOException) t : msg == null
2942 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2943 }
2944
2945
2946
2947
2948
2949
2950
2951 public boolean checkFileSystem() {
2952 if (this.fsOk && this.fs != null) {
2953 try {
2954 FSUtils.checkFileSystemAvailable(this.fs);
2955 } catch (IOException e) {
2956 abort("File System not available", e);
2957 this.fsOk = false;
2958 }
2959 }
2960 return this.fsOk;
2961 }
2962
2963 @Override
2964 public void updateRegionFavoredNodesMapping(String encodedRegionName,
2965 List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
2966 InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
2967
2968
2969 for (int i = 0; i < favoredNodes.size(); i++) {
2970 addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
2971 favoredNodes.get(i).getPort());
2972 }
2973 regionFavoredNodesMap.put(encodedRegionName, addr);
2974 }
2975
2976
2977
2978
2979
2980
2981
2982 @Override
2983 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
2984 return regionFavoredNodesMap.get(encodedRegionName);
2985 }
2986
2987 @Override
2988 public ServerNonceManager getNonceManager() {
2989 return this.nonceManager;
2990 }
2991
2992 private static class MovedRegionInfo {
2993 private final ServerName serverName;
2994 private final long seqNum;
2995 private final long ts;
2996
2997 public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
2998 this.serverName = serverName;
2999 this.seqNum = closeSeqNum;
3000 ts = EnvironmentEdgeManager.currentTime();
3001 }
3002
3003 public ServerName getServerName() {
3004 return serverName;
3005 }
3006
3007 public long getSeqNum() {
3008 return seqNum;
3009 }
3010
3011 public long getMoveTime() {
3012 return ts;
3013 }
3014 }
3015
3016
3017
// Regions recently moved away from this server, keyed by encoded region name. Entries older
// than TIMEOUT_REGION_MOVED are treated as expired and dropped by getMovedRegion() and
// cleanMovedRegions().
protected Map<String, MovedRegionInfo> movedRegions =
  new ConcurrentHashMap<String, MovedRegionInfo>(3000);

// Lifetime of a moved-region record: 2 minutes, in milliseconds. Also used as the period of
// the MovedRegionsCleaner chore.
private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3024
3025 protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
3026 if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
3027 LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3028 return;
3029 }
3030 LOG.info("Adding moved region record: "
3031 + encodedName + " to " + destination + " as of " + closeSeqNum);
3032 movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3033 }
3034
3035 void removeFromMovedRegions(String encodedName) {
3036 movedRegions.remove(encodedName);
3037 }
3038
3039 private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3040 MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3041
3042 long now = EnvironmentEdgeManager.currentTime();
3043 if (dest != null) {
3044 if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3045 return dest;
3046 } else {
3047 movedRegions.remove(encodedRegionName);
3048 }
3049 }
3050
3051 return null;
3052 }
3053
3054
3055
3056
3057 protected void cleanMovedRegions() {
3058 final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
3059 Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
3060
3061 while (it.hasNext()){
3062 Map.Entry<String, MovedRegionInfo> e = it.next();
3063 if (e.getValue().getMoveTime() < cutOff) {
3064 it.remove();
3065 }
3066 }
3067 }
3068
3069
3070
3071
3072
3073 protected int movedRegionCleanerPeriod() {
3074 return TIMEOUT_REGION_MOVED;
3075 }
3076
3077
3078
3079
3080
3081 protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3082 private HRegionServer regionServer;
3083 Stoppable stoppable;
3084
3085 private MovedRegionsCleaner(
3086 HRegionServer regionServer, Stoppable stoppable){
3087 super("MovedRegionsCleaner for region " + regionServer, stoppable,
3088 regionServer.movedRegionCleanerPeriod());
3089 this.regionServer = regionServer;
3090 this.stoppable = stoppable;
3091 }
3092
3093 static MovedRegionsCleaner create(HRegionServer rs){
3094 Stoppable stoppable = new Stoppable() {
3095 private volatile boolean isStopped = false;
3096 @Override public void stop(String why) { isStopped = true;}
3097 @Override public boolean isStopped() {return isStopped;}
3098 };
3099
3100 return new MovedRegionsCleaner(rs, stoppable);
3101 }
3102
3103 @Override
3104 protected void chore() {
3105 regionServer.cleanMovedRegions();
3106 }
3107
3108 @Override
3109 public void stop(String why) {
3110 stoppable.stop(why);
3111 }
3112
3113 @Override
3114 public boolean isStopped() {
3115 return stoppable.isStopped();
3116 }
3117 }
3118
3119 private String getMyEphemeralNodePath() {
3120 return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
3121 }
3122
3123 private boolean isHealthCheckerConfigured() {
3124 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3125 return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3126 }
3127
3128
3129
3130
3131 public CompactSplitThread getCompactSplitThread() {
3132 return this.compactSplitThread;
3133 }
3134
3135
3136
3137
3138
3139
3140
3141
3142 private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException,
3143 IOException {
3144 if (!r.isRecovering()) {
3145
3146 return;
3147 }
3148
3149 HRegionInfo regionInfo = r.getRegionInfo();
3150 ZooKeeperWatcher zkw = getZooKeeper();
3151 String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName());
3152 Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqId();
3153 long minSeqIdForLogReplay = -1;
3154 for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3155 if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3156 minSeqIdForLogReplay = storeSeqIdForReplay;
3157 }
3158 }
3159
3160 try {
3161 long lastRecordedFlushedSequenceId = -1;
3162 String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3163 regionInfo.getEncodedName());
3164
3165 byte[] data;
3166 try {
3167 data = ZKUtil.getData(zkw, nodePath);
3168 } catch (InterruptedException e) {
3169 throw new InterruptedIOException();
3170 }
3171 if (data != null) {
3172 lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3173 }
3174 if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3175 ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3176 }
3177 if (previousRSName != null) {
3178
3179 nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3180 ZKUtil.setData(zkw, nodePath,
3181 ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3182 LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() +
3183 " for " + previousRSName);
3184 } else {
3185 LOG.warn("Can't find failed region server for recovering region " +
3186 regionInfo.getEncodedName());
3187 }
3188 } catch (NoNodeException ignore) {
3189 LOG.debug("Region " + regionInfo.getEncodedName() +
3190 " must have completed recovery because its recovery znode has been removed", ignore);
3191 }
3192 }
3193
3194
3195
3196
3197
3198
3199 private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3200 String result = null;
3201 long maxZxid = 0;
3202 ZooKeeperWatcher zkw = this.getZooKeeper();
3203 String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3204 List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3205 if (failedServers == null || failedServers.isEmpty()) {
3206 return result;
3207 }
3208 for (String failedServer : failedServers) {
3209 String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3210 Stat stat = new Stat();
3211 ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3212 if (maxZxid < stat.getCzxid()) {
3213 maxZxid = stat.getCzxid();
3214 result = failedServer;
3215 }
3216 }
3217 return result;
3218 }
3219
3220 public CoprocessorServiceResponse execRegionServerService(final RpcController controller,
3221 final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3222 try {
3223 ServerRpcController execController = new ServerRpcController();
3224 CoprocessorServiceCall call = serviceRequest.getCall();
3225 String serviceName = call.getServiceName();
3226 String methodName = call.getMethodName();
3227 if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3228 throw new UnknownProtocolException(null,
3229 "No registered coprocessor service found for name " + serviceName);
3230 }
3231 Service service = coprocessorServiceHandlers.get(serviceName);
3232 Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3233 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3234 if (methodDesc == null) {
3235 throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3236 + " called on service " + serviceName);
3237 }
3238 Message.Builder builderForType = service.getRequestPrototype(methodDesc).newBuilderForType();
3239 ProtobufUtil.mergeFrom(builderForType, call.getRequest());
3240 Message request = builderForType.build();
3241 final Message.Builder responseBuilder =
3242 service.getResponsePrototype(methodDesc).newBuilderForType();
3243 service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
3244 @Override
3245 public void run(Message message) {
3246 if (message != null) {
3247 responseBuilder.mergeFrom(message);
3248 }
3249 }
3250 });
3251 Message execResult = responseBuilder.build();
3252 if (execController.getFailedOn() != null) {
3253 throw execController.getFailedOn();
3254 }
3255 ClientProtos.CoprocessorServiceResponse.Builder builder =
3256 ClientProtos.CoprocessorServiceResponse.newBuilder();
3257 builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3258 HConstants.EMPTY_BYTE_ARRAY));
3259 builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3260 .setValue(execResult.toByteString()));
3261 return builder.build();
3262 } catch (IOException ie) {
3263 throw new ServiceException(ie);
3264 }
3265 }
3266
3267
3268
3269
3270 public CacheConfig getCacheConfig() {
3271 return this.cacheConfig;
3272 }
3273
3274
3275
3276
3277 protected ConfigurationManager getConfigurationManager() {
3278 return configurationManager;
3279 }
3280
3281
3282
3283
3284 public TableDescriptors getTableDescriptors() {
3285 return this.tableDescriptors;
3286 }
3287
3288
3289
3290
3291 public void updateConfiguration() {
3292 LOG.info("Reloading the configuration from disk.");
3293
3294 conf.reloadConfiguration();
3295 configurationManager.notifyAllObservers(conf);
3296 }
3297
3298 @Override
3299 public HeapMemoryManager getHeapMemoryManager() {
3300 return hMemManager;
3301 }
3302
3303 @Override
3304 public double getCompactionPressure() {
3305 double max = 0;
3306 for (Region region : onlineRegions.values()) {
3307 for (Store store : region.getStores()) {
3308 double normCount = store.getCompactionPressure();
3309 if (normCount > max) {
3310 max = normCount;
3311 }
3312 }
3313 }
3314 return max;
3315 }
3316 }