1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.lang.management.ManagementFactory;
25 import java.lang.management.MemoryUsage;
26 import java.lang.reflect.Constructor;
27 import java.net.BindException;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.security.PrivilegedExceptionAction;
31 import java.util.ArrayList;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.Comparator;
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Set;
42 import java.util.SortedMap;
43 import java.util.TreeMap;
44 import java.util.TreeSet;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.ConcurrentMap;
47 import java.util.concurrent.ConcurrentSkipListMap;
48 import java.util.concurrent.atomic.AtomicBoolean;
49 import java.util.concurrent.atomic.AtomicReference;
50 import java.util.concurrent.locks.ReentrantReadWriteLock;
51
52 import javax.management.MalformedObjectNameException;
53 import javax.management.ObjectName;
54 import javax.servlet.http.HttpServlet;
55
56 import org.apache.commons.lang.SystemUtils;
57 import org.apache.commons.lang.math.RandomUtils;
58 import org.apache.commons.logging.Log;
59 import org.apache.commons.logging.LogFactory;
60 import org.apache.hadoop.conf.Configuration;
61 import org.apache.hadoop.fs.FileSystem;
62 import org.apache.hadoop.fs.Path;
63 import org.apache.hadoop.hbase.ChoreService;
64 import org.apache.hadoop.hbase.ClockOutOfSyncException;
65 import org.apache.hadoop.hbase.CoordinatedStateManager;
66 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
67 import org.apache.hadoop.hbase.HBaseConfiguration;
68 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
69 import org.apache.hadoop.hbase.HConstants;
70 import org.apache.hadoop.hbase.HRegionInfo;
71 import org.apache.hadoop.hbase.HealthCheckChore;
72 import org.apache.hadoop.hbase.MetaTableAccessor;
73 import org.apache.hadoop.hbase.NotServingRegionException;
74 import org.apache.hadoop.hbase.RemoteExceptionHandler;
75 import org.apache.hadoop.hbase.ScheduledChore;
76 import org.apache.hadoop.hbase.ServerName;
77 import org.apache.hadoop.hbase.Stoppable;
78 import org.apache.hadoop.hbase.TableDescriptors;
79 import org.apache.hadoop.hbase.TableName;
80 import org.apache.hadoop.hbase.YouAreDeadException;
81 import org.apache.hadoop.hbase.ZNodeClearer;
82 import org.apache.hadoop.hbase.classification.InterfaceAudience;
83 import org.apache.hadoop.hbase.client.ClusterConnection;
84 import org.apache.hadoop.hbase.client.ConnectionUtils;
85 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
86 import org.apache.hadoop.hbase.conf.ConfigurationManager;
87 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
88 import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
89 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
90 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
91 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
92 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
93 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
94 import org.apache.hadoop.hbase.executor.ExecutorService;
95 import org.apache.hadoop.hbase.executor.ExecutorType;
96 import org.apache.hadoop.hbase.fs.HFileSystem;
97 import org.apache.hadoop.hbase.http.InfoServer;
98 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
99 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
100 import org.apache.hadoop.hbase.ipc.RpcClient;
101 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
102 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
103 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
104 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
105 import org.apache.hadoop.hbase.ipc.ServerRpcController;
106 import org.apache.hadoop.hbase.master.HMaster;
107 import org.apache.hadoop.hbase.master.RegionState.State;
108 import org.apache.hadoop.hbase.master.TableLockManager;
109 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
110 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
111 import org.apache.hadoop.hbase.protobuf.RequestConverter;
112 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
113 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
114 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
115 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
118 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
119 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
121 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor.Builder;
122 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
123 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
124 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
125 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
126 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
132 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
133 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
134 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
135 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
137 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
138 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
139 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
140 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
141 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
142 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
143 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
144 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
145 import org.apache.hadoop.hbase.security.Superusers;
146 import org.apache.hadoop.hbase.security.User;
147 import org.apache.hadoop.hbase.security.UserProvider;
148 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
149 import org.apache.hadoop.hbase.util.Addressing;
150 import org.apache.hadoop.hbase.util.ByteStringer;
151 import org.apache.hadoop.hbase.util.Bytes;
152 import org.apache.hadoop.hbase.util.CompressionTest;
153 import org.apache.hadoop.hbase.util.ConfigUtil;
154 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
155 import org.apache.hadoop.hbase.util.FSTableDescriptors;
156 import org.apache.hadoop.hbase.util.FSUtils;
157 import org.apache.hadoop.hbase.util.HasThread;
158 import org.apache.hadoop.hbase.util.JSONBean;
159 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
160 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
161 import org.apache.hadoop.hbase.util.Sleeper;
162 import org.apache.hadoop.hbase.util.Threads;
163 import org.apache.hadoop.hbase.util.VersionInfo;
164 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
165 import org.apache.hadoop.hbase.wal.WAL;
166 import org.apache.hadoop.hbase.wal.WALFactory;
167 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
168 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
169 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
170 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
171 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
172 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
173 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
174 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
175 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
176 import org.apache.hadoop.ipc.RemoteException;
177 import org.apache.hadoop.metrics.util.MBeanUtil;
178 import org.apache.hadoop.util.ReflectionUtils;
179 import org.apache.hadoop.util.StringUtils;
180 import org.apache.zookeeper.KeeperException;
181 import org.apache.zookeeper.KeeperException.NoNodeException;
182 import org.apache.zookeeper.data.Stat;
183
184 import com.google.common.annotations.VisibleForTesting;
185 import com.google.common.base.Preconditions;
186 import com.google.common.collect.Maps;
187 import com.google.protobuf.BlockingRpcChannel;
188 import com.google.protobuf.Descriptors;
189 import com.google.protobuf.Message;
190 import com.google.protobuf.RpcCallback;
191 import com.google.protobuf.RpcController;
192 import com.google.protobuf.Service;
193 import com.google.protobuf.ServiceException;
194 import sun.misc.Signal;
195 import sun.misc.SignalHandler;
196
197
198
199
200
201 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
202 @SuppressWarnings("deprecation")
203 public class HRegionServer extends HasThread implements
204 RegionServerServices, LastSequenceId {
205
206 private static final Log LOG = LogFactory.getLog(HRegionServer.class);
207
208
209
210
211
212 protected static final String OPEN = "OPEN";
213 protected static final String CLOSE = "CLOSE";
214
215
216
217
218 protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
219 new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
220
221
222 protected MemStoreFlusher cacheFlusher;
223
224 protected HeapMemoryManager hMemManager;
225
226
227
228
229
230
231 protected ClusterConnection clusterConnection;
232
233
234
235
236
237
238
239 protected MetaTableLocator metaTableLocator;
240
241
242 @SuppressWarnings("unused")
243 private RecoveringRegionWatcher recoveringRegionWatcher;
244
245
246
247
248 protected TableDescriptors tableDescriptors;
249
250
251 protected ReplicationSourceService replicationSourceHandler;
252 protected ReplicationSinkService replicationSinkHandler;
253
254
255 public CompactSplitThread compactSplitThread;
256
257
258
259
260
261 protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<String, Region>();
262
263
264
265
266
267
268
269
270
271
272 protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
273 new ConcurrentHashMap<String, InetSocketAddress[]>();
274
275
276
277
278
279 protected final Map<String, Region> recoveringRegions = Collections
280 .synchronizedMap(new HashMap<String, Region>());
281
282
283 protected Leases leases;
284
285
286 protected ExecutorService service;
287
288
289 protected volatile boolean fsOk;
290 protected HFileSystem fs;
291
292
293
294
295 private volatile boolean stopped = false;
296
297
298
299 private volatile boolean abortRequested;
300
301 ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
302
303
304
305 private boolean stopping = false;
306
307 private volatile boolean killed = false;
308
309 protected final Configuration conf;
310
311 private Path rootDir;
312
313 protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
314
315 final int numRetries;
316 protected final int threadWakeFrequency;
317 protected final int msgInterval;
318
319 protected final int numRegionsToReport;
320
321
322 private volatile RegionServerStatusService.BlockingInterface rssStub;
323
324 RpcClient rpcClient;
325
326 private RpcRetryingCallerFactory rpcRetryingCallerFactory;
327 private RpcControllerFactory rpcControllerFactory;
328
329 private UncaughtExceptionHandler uncaughtExceptionHandler;
330
331
332
333
334 protected InfoServer infoServer;
335 private JvmPauseMonitor pauseMonitor;
336
337
338 public static final String REGIONSERVER = "regionserver";
339
340 MetricsRegionServer metricsRegionServer;
341 private SpanReceiverHost spanReceiverHost;
342
343
344
345
346 private final ChoreService choreService;
347
348
349
350
351 ScheduledChore compactionChecker;
352
353
354
355
356 ScheduledChore periodicFlusher;
357
358 protected volatile WALFactory walFactory;
359
360
361
362 final LogRoller walRoller;
363
364 final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
365
366
367 final AtomicBoolean online = new AtomicBoolean(false);
368
369
370 protected ZooKeeperWatcher zooKeeper;
371
372
373 private MasterAddressTracker masterAddressTracker;
374
375
376 protected ClusterStatusTracker clusterStatusTracker;
377
378
379 private SplitLogWorker splitLogWorker;
380
381
382 protected final Sleeper sleeper;
383
384 private final int operationTimeout;
385 private final int shortOperationTimeout;
386
387 private final RegionServerAccounting regionServerAccounting;
388
389
390 protected CacheConfig cacheConfig;
391
392
393 private HealthCheckChore healthCheckChore;
394
395
396 private ScheduledChore nonceManagerChore;
397
398 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
399
400
401
402
403
404
405 protected ServerName serverName;
406
407
408
409
410 protected String useThisHostnameInstead;
411
412
413
414
415 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
416 final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
417 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
418 protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";
419
420
421
422
423 protected final long startcode;
424
425
426
427
428 private String clusterId;
429
430
431
432
433 private ObjectName mxBean = null;
434
435
436
437
438 private MovedRegionsCleaner movedRegionsCleaner;
439
440
441 private StorefileRefresherChore storefileRefresher;
442
443 private RegionServerCoprocessorHost rsHost;
444
445 private RegionServerProcedureManagerHost rspmHost;
446
447 private RegionServerQuotaManager rsQuotaManager;
448
449
450 protected TableLockManager tableLockManager;
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470 final ServerNonceManager nonceManager;
471
472 private UserProvider userProvider;
473
474 protected final RSRpcServices rpcServices;
475
476 protected BaseCoordinatedStateManager csm;
477
478 private final boolean useZKForAssignment;
479
480
481
482
483
484 protected final ConfigurationManager configurationManager;
485
486
487
488
489 public HRegionServer(Configuration conf) throws IOException, InterruptedException {
490 this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
491 }
492
493
494
495
496
497 public HRegionServer(Configuration conf, CoordinatedStateManager csm)
498 throws IOException, InterruptedException {
499 super("RegionServer");
500 this.fsOk = true;
501 this.conf = conf;
502 checkCodecs(this.conf);
503 this.userProvider = UserProvider.instantiate(conf);
504 FSUtils.setupShortCircuitRead(this.conf);
505
506 this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
507
508
509 this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
510 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
511 this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
512 this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
513
514 this.sleeper = new Sleeper(this.msgInterval, this);
515
516 boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
517 this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
518
519 this.numRegionsToReport = conf.getInt(
520 "hbase.regionserver.numregionstoreport", 10);
521
522 this.operationTimeout = conf.getInt(
523 HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
524 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
525
526 this.shortOperationTimeout = conf.getInt(
527 HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
528 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
529
530 this.abortRequested = false;
531 this.stopped = false;
532
533 rpcServices = createRpcServices();
534 this.startcode = System.currentTimeMillis();
535 if (this instanceof HMaster) {
536 useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
537 } else {
538 useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
539 }
540 String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
541 rpcServices.isa.getHostName();
542 serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
543
544 rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
545 rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
546
547
548 ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
549 HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
550
551 login(userProvider, hostName);
552
553
554 Superusers.initialize(conf);
555
556 regionServerAccounting = new RegionServerAccounting();
557 uncaughtExceptionHandler = new UncaughtExceptionHandler() {
558 @Override
559 public void uncaughtException(Thread t, Throwable e) {
560 abort("Uncaught exception in service thread " + t.getName(), e);
561 }
562 };
563
564 useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
565
566 initializeFileSystem();
567
568 service = new ExecutorService(getServerName().toShortString());
569 spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
570
571
572 if (!conf.getBoolean("hbase.testing.nocluster", false)) {
573
574 zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
575 rpcServices.isa.getPort(), this, canCreateBaseZNode());
576
577 this.csm = (BaseCoordinatedStateManager) csm;
578 this.csm.initialize(this);
579 this.csm.start();
580
581 tableLockManager = TableLockManager.createTableLockManager(
582 conf, zooKeeper, serverName);
583
584 masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
585 masterAddressTracker.start();
586
587 clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
588 clusterStatusTracker.start();
589 }
590 this.configurationManager = new ConfigurationManager();
591
592 rpcServices.start();
593 putUpWebUI();
594 this.walRoller = new LogRoller(this, this);
595 this.choreService = new ChoreService(getServerName().toString(), true);
596
597 if (!SystemUtils.IS_OS_WINDOWS) {
598 Signal.handle(new Signal("HUP"), new SignalHandler() {
599 public void handle(Signal signal) {
600 getConfiguration().reloadConfiguration();
601 configurationManager.notifyAllObservers(getConfiguration());
602 }
603 });
604 }
605 }
606
607 private void initializeFileSystem() throws IOException {
608
609
610
611 FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
612
613
614 boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
615 this.fs = new HFileSystem(this.conf, useHBaseChecksum);
616 this.rootDir = FSUtils.getRootDir(this.conf);
617 this.tableDescriptors = new FSTableDescriptors(
618 this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
619 }
620
621
622
623
624 protected boolean shouldUseThisHostnameInstead() {
625 return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
626 }
627
628 protected void login(UserProvider user, String host) throws IOException {
629 user.login("hbase.regionserver.keytab.file",
630 "hbase.regionserver.kerberos.principal", host);
631 }
632
633 protected void waitForMasterActive(){
634 }
635
636 protected String getProcessName() {
637 return REGIONSERVER;
638 }
639
640 protected boolean canCreateBaseZNode() {
641 return false;
642 }
643
644 protected boolean canUpdateTableDescriptor() {
645 return false;
646 }
647
648 protected RSRpcServices createRpcServices() throws IOException {
649 return new RSRpcServices(this);
650 }
651
652 protected void configureInfoServer() {
653 infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
654 infoServer.setAttribute(REGIONSERVER, this);
655 }
656
657 protected Class<? extends HttpServlet> getDumpServlet() {
658 return RSDumpServlet.class;
659 }
660
661 @Override
662 public boolean registerService(Service instance) {
663
664
665
666 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
667 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
668 LOG.error("Coprocessor service " + serviceDesc.getFullName()
669 + " already registered, rejecting request from " + instance);
670 return false;
671 }
672
673 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
674 if (LOG.isDebugEnabled()) {
675 LOG.debug("Registered regionserver coprocessor service: service="+serviceDesc.getFullName());
676 }
677 return true;
678 }
679
680
681
682
683
684
685
686 @VisibleForTesting
687 protected ClusterConnection createClusterConnection() throws IOException {
688
689
690
691 return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
692 serverName, rpcServices, rpcServices);
693 }
694
695
696
697
698
699
700 private static void checkCodecs(final Configuration c) throws IOException {
701
702 String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
703 if (codecs == null) return;
704 for (String codec : codecs) {
705 if (!CompressionTest.testCompression(codec)) {
706 throw new IOException("Compression codec " + codec +
707 " not supported, aborting RS construction");
708 }
709 }
710 }
711
712 public String getClusterId() {
713 return this.clusterId;
714 }
715
716
717
718
719
720 protected synchronized void setupClusterConnection() throws IOException {
721 if (clusterConnection == null) {
722 clusterConnection = createClusterConnection();
723 metaTableLocator = new MetaTableLocator();
724 }
725 }
726
727
728
729
730
731
732
733 private void preRegistrationInitialization(){
734 try {
735 setupClusterConnection();
736
737
738 if (isHealthCheckerConfigured()) {
739 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
740 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
741 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
742 }
743 this.pauseMonitor = new JvmPauseMonitor(conf);
744 pauseMonitor.start();
745
746 initializeZooKeeper();
747 if (!isStopped() && !isAborted()) {
748 initializeThreads();
749 }
750 } catch (Throwable t) {
751
752
753 this.rpcServices.stop();
754 abort("Initialization of RS failed. Hence aborting RS.", t);
755 }
756 }
757
758
759
760
761
762
763
764
765
766 private void initializeZooKeeper() throws IOException, InterruptedException {
767
768
769
770 blockAndCheckIfStopped(this.masterAddressTracker);
771
772
773
774 blockAndCheckIfStopped(this.clusterStatusTracker);
775
776
777
778
779 try {
780 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
781 if (clusterId == null) {
782 this.abort("Cluster ID has not been set");
783 }
784 LOG.info("ClusterId : "+clusterId);
785 } catch (KeeperException e) {
786 this.abort("Failed to retrieve Cluster ID",e);
787 }
788
789
790
791
792
793 waitForMasterActive();
794 if (isStopped() || isAborted()) {
795 return;
796 }
797
798
799 try {
800 rspmHost = new RegionServerProcedureManagerHost();
801 rspmHost.loadProcedures(conf);
802 rspmHost.initialize(this);
803 } catch (KeeperException e) {
804 this.abort("Failed to reach zk cluster when creating procedure handler.", e);
805 }
806
807 this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
808 }
809
810
811
812
813
814
815
816
817 private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
818 throws IOException, InterruptedException {
819 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
820 if (this.stopped) {
821 throw new IOException("Received the shutdown message while waiting.");
822 }
823 }
824 }
825
826
827
828
829 private boolean isClusterUp() {
830 return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
831 }
832
833 private void initializeThreads() throws IOException {
834
835 this.cacheFlusher = new MemStoreFlusher(conf, this);
836
837
838 this.compactSplitThread = new CompactSplitThread(this);
839
840
841
842 this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
843 this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
844 this.leases = new Leases(this.threadWakeFrequency);
845
846
847 movedRegionsCleaner = MovedRegionsCleaner.create(this);
848
849 if (this.nonceManager != null) {
850
851 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
852 }
853
854
855 rsQuotaManager = new RegionServerQuotaManager(this);
856
857
858 rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
859 rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
860
861 boolean onlyMetaRefresh = false;
862 int storefileRefreshPeriod = conf.getInt(
863 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
864 , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
865 if (storefileRefreshPeriod == 0) {
866 storefileRefreshPeriod = conf.getInt(
867 StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
868 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
869 onlyMetaRefresh = true;
870 }
871 if (storefileRefreshPeriod > 0) {
872 this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
873 onlyMetaRefresh, this, this);
874 }
875 registerConfigurationObservers();
876 }
877
878 private void registerConfigurationObservers() {
879
880 configurationManager.registerObserver(this.compactSplitThread);
881 configurationManager.registerObserver(this.rpcServices);
882 }
883
884
885
886
887 @Override
888 public void run() {
889 try {
890
891 preRegistrationInitialization();
892 } catch (Throwable e) {
893 abort("Fatal exception during initialization", e);
894 }
895
896 try {
897 if (!isStopped() && !isAborted()) {
898 ShutdownHook.install(conf, fs, this, Thread.currentThread());
899
900 createMyEphemeralNode();
901
902
903 this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
904 }
905
906
907
908 while (keepLooping()) {
909 RegionServerStartupResponse w = reportForDuty();
910 if (w == null) {
911 LOG.warn("reportForDuty failed; sleeping and then retrying.");
912 this.sleeper.sleep();
913 } else {
914 handleReportForDutyResponse(w);
915 break;
916 }
917 }
918
919 if (!isStopped() && isHealthy()){
920
921
922 rspmHost.start();
923 }
924
925
926 if (this.rsQuotaManager != null) {
927 rsQuotaManager.start(getRpcServer().getScheduler());
928 }
929
930
931 long lastMsg = System.currentTimeMillis();
932 long oldRequestCount = -1;
933
934 while (!isStopped() && isHealthy()) {
935 if (!isClusterUp()) {
936 if (isOnlineRegionsEmpty()) {
937 stop("Exiting; cluster shutdown set and not carrying any regions");
938 } else if (!this.stopping) {
939 this.stopping = true;
940 LOG.info("Closing user regions");
941 closeUserRegions(this.abortRequested);
942 } else if (this.stopping) {
943 boolean allUserRegionsOffline = areAllUserRegionsOffline();
944 if (allUserRegionsOffline) {
945
946
947
948 if (oldRequestCount == getWriteRequestCount()) {
949 stop("Stopped; only catalog regions remaining online");
950 break;
951 }
952 oldRequestCount = getWriteRequestCount();
953 } else {
954
955
956
957 closeUserRegions(this.abortRequested);
958 }
959 LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
960 }
961 }
962 long now = System.currentTimeMillis();
963 if ((now - lastMsg) >= msgInterval) {
964 tryRegionServerReport(lastMsg, now);
965 lastMsg = System.currentTimeMillis();
966 }
967 if (!isStopped() && !isAborted()) {
968 this.sleeper.sleep();
969 }
970 }
971 } catch (Throwable t) {
972 if (!rpcServices.checkOOME(t)) {
973 String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
974 abort(prefix + t.getMessage(), t);
975 }
976 }
977
978 if (mxBean != null) {
979 MBeanUtil.unregisterMBean(mxBean);
980 mxBean = null;
981 }
982 if (this.leases != null) this.leases.closeAfterLeasesExpire();
983 if (this.splitLogWorker != null) {
984 splitLogWorker.stop();
985 }
986 if (this.infoServer != null) {
987 LOG.info("Stopping infoServer");
988 try {
989 this.infoServer.stop();
990 } catch (Exception e) {
991 LOG.error("Failed to stop infoServer", e);
992 }
993 }
994
995 if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
996 cacheConfig.getBlockCache().shutdown();
997 }
998
999 if (movedRegionsCleaner != null) {
1000 movedRegionsCleaner.stop("Region Server stopping");
1001 }
1002
1003
1004
1005 if (this.hMemManager != null) this.hMemManager.stop();
1006 if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
1007 if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
1008 if (this.compactionChecker != null) this.compactionChecker.cancel(true);
1009 if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
1010 if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
1011 if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
1012 sendShutdownInterrupt();
1013
1014
1015 if (rsQuotaManager != null) {
1016 rsQuotaManager.stop();
1017 }
1018
1019
1020 if (rspmHost != null) {
1021 rspmHost.stop(this.abortRequested || this.killed);
1022 }
1023
1024 if (this.killed) {
1025
1026 } else if (abortRequested) {
1027 if (this.fsOk) {
1028 closeUserRegions(abortRequested);
1029 }
1030 LOG.info("aborting server " + this.serverName);
1031 } else {
1032 closeUserRegions(abortRequested);
1033 LOG.info("stopping server " + this.serverName);
1034 }
1035
1036
1037 if (this.metaTableLocator != null) this.metaTableLocator.stop();
1038 if (this.clusterConnection != null && !clusterConnection.isClosed()) {
1039 try {
1040 this.clusterConnection.close();
1041 } catch (IOException e) {
1042
1043
1044 LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
1045 }
1046 }
1047
1048
1049 if (!this.killed && containsMetaTableRegions()) {
1050 if (!abortRequested || this.fsOk) {
1051 if (this.compactSplitThread != null) {
1052 this.compactSplitThread.join();
1053 this.compactSplitThread = null;
1054 }
1055 closeMetaTableRegions(abortRequested);
1056 }
1057 }
1058
1059 if (!this.killed && this.fsOk) {
1060 waitOnAllRegionsToClose(abortRequested);
1061 LOG.info("stopping server " + this.serverName +
1062 "; all regions closed.");
1063 }
1064
1065
1066 if (this.fsOk) {
1067 shutdownWAL(!abortRequested);
1068 }
1069
1070
1071 if (this.rssStub != null) {
1072 this.rssStub = null;
1073 }
1074 if (this.rpcClient != null) {
1075 this.rpcClient.close();
1076 }
1077 if (this.leases != null) {
1078 this.leases.close();
1079 }
1080 if (this.pauseMonitor != null) {
1081 this.pauseMonitor.stop();
1082 }
1083
1084 if (!killed) {
1085 stopServiceThreads();
1086 }
1087
1088 if (this.rpcServices != null) {
1089 this.rpcServices.stop();
1090 }
1091
1092 try {
1093 deleteMyEphemeralNode();
1094 } catch (KeeperException.NoNodeException nn) {
1095 } catch (KeeperException e) {
1096 LOG.warn("Failed deleting my ephemeral node", e);
1097 }
1098
1099
1100 ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1101
1102 if (this.zooKeeper != null) {
1103 this.zooKeeper.close();
1104 }
1105 LOG.info("stopping server " + this.serverName +
1106 "; zookeeper connection closed.");
1107
1108 LOG.info(Thread.currentThread().getName() + " exiting");
1109 }
1110
1111 private boolean containsMetaTableRegions() {
1112 return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1113 }
1114
1115 private boolean areAllUserRegionsOffline() {
1116 if (getNumberOfOnlineRegions() > 2) return false;
1117 boolean allUserRegionsOffline = true;
1118 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1119 if (!e.getValue().getRegionInfo().isMetaTable()) {
1120 allUserRegionsOffline = false;
1121 break;
1122 }
1123 }
1124 return allUserRegionsOffline;
1125 }
1126
1127
1128
1129
1130 private long getWriteRequestCount() {
1131 long writeCount = 0;
1132 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1133 writeCount += e.getValue().getWriteRequestsCount();
1134 }
1135 return writeCount;
1136 }
1137
1138 @VisibleForTesting
1139 protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1140 throws IOException {
1141 RegionServerStatusService.BlockingInterface rss = rssStub;
1142 if (rss == null) {
1143
1144 return;
1145 }
1146 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1147 try {
1148 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1149 ServerName sn = ServerName.parseVersionedServerName(
1150 this.serverName.getVersionedBytes());
1151 request.setServer(ProtobufUtil.toServerName(sn));
1152 request.setLoad(sl);
1153 rss.regionServerReport(null, request.build());
1154 } catch (ServiceException se) {
1155 IOException ioe = ProtobufUtil.getRemoteException(se);
1156 if (ioe instanceof YouAreDeadException) {
1157
1158 throw ioe;
1159 }
1160 if (rssStub == rss) {
1161 rssStub = null;
1162 }
1163
1164
1165 createRegionServerStatusStub(true);
1166 }
1167 }
1168
1169 ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1170 throws IOException {
1171
1172
1173
1174
1175
1176
1177
1178 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1179 Collection<Region> regions = getOnlineRegionsLocalContext();
1180 long usedMemory = -1L;
1181 long maxMemory = -1L;
1182 final MemoryUsage usage = HeapMemorySizeUtil.safeGetHeapMemoryUsage();
1183 if (usage != null) {
1184 usedMemory = usage.getUsed();
1185 maxMemory = usage.getMax();
1186 }
1187
1188 ClusterStatusProtos.ServerLoad.Builder serverLoad =
1189 ClusterStatusProtos.ServerLoad.newBuilder();
1190 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1191 serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1192 serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
1193 serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1194 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1195 Builder coprocessorBuilder = Coprocessor.newBuilder();
1196 for (String coprocessor : coprocessors) {
1197 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1198 }
1199 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1200 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1201 for (Region region : regions) {
1202 if (region.getCoprocessorHost() != null) {
1203 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1204 Iterator<String> iterator = regionCoprocessors.iterator();
1205 while (iterator.hasNext()) {
1206 serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
1207 }
1208 }
1209 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1210 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1211 .getCoprocessors()) {
1212 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1213 }
1214 }
1215 serverLoad.setReportStartTime(reportStartTime);
1216 serverLoad.setReportEndTime(reportEndTime);
1217 if (this.infoServer != null) {
1218 serverLoad.setInfoServerPort(this.infoServer.getPort());
1219 } else {
1220 serverLoad.setInfoServerPort(-1);
1221 }
1222
1223
1224
1225 ReplicationSourceService rsources = getReplicationSourceService();
1226
1227 if (rsources != null) {
1228
1229 ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1230 if (rLoad != null) {
1231 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1232 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1233 serverLoad.addReplLoadSource(rLS);
1234 }
1235 }
1236 }
1237
1238 return serverLoad.build();
1239 }
1240
1241 String getOnlineRegionsAsPrintableString() {
1242 StringBuilder sb = new StringBuilder();
1243 for (Region r: this.onlineRegions.values()) {
1244 if (sb.length() > 0) sb.append(", ");
1245 sb.append(r.getRegionInfo().getEncodedName());
1246 }
1247 return sb.toString();
1248 }
1249
1250
1251
1252
1253 private void waitOnAllRegionsToClose(final boolean abort) {
1254
1255 int lastCount = -1;
1256 long previousLogTime = 0;
1257 Set<String> closedRegions = new HashSet<String>();
1258 boolean interrupted = false;
1259 try {
1260 while (!isOnlineRegionsEmpty()) {
1261 int count = getNumberOfOnlineRegions();
1262
1263 if (count != lastCount) {
1264
1265 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1266 previousLogTime = System.currentTimeMillis();
1267 lastCount = count;
1268 LOG.info("Waiting on " + count + " regions to close");
1269
1270
1271 if (count < 10 && LOG.isDebugEnabled()) {
1272 LOG.debug(this.onlineRegions);
1273 }
1274 }
1275 }
1276
1277
1278
1279 for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
1280 HRegionInfo hri = e.getValue().getRegionInfo();
1281 if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1282 && !closedRegions.contains(hri.getEncodedName())) {
1283 closedRegions.add(hri.getEncodedName());
1284
1285 closeRegionIgnoreErrors(hri, abort);
1286 }
1287 }
1288
1289 if (this.regionsInTransitionInRS.isEmpty()) {
1290 if (!isOnlineRegionsEmpty()) {
1291 LOG.info("We were exiting though online regions are not empty," +
1292 " because some regions failed closing");
1293 }
1294 break;
1295 }
1296 if (sleep(200)) {
1297 interrupted = true;
1298 }
1299 }
1300 } finally {
1301 if (interrupted) {
1302 Thread.currentThread().interrupt();
1303 }
1304 }
1305 }
1306
1307 private boolean sleep(long millis) {
1308 boolean interrupted = false;
1309 try {
1310 Thread.sleep(millis);
1311 } catch (InterruptedException e) {
1312 LOG.warn("Interrupted while sleeping");
1313 interrupted = true;
1314 }
1315 return interrupted;
1316 }
1317
1318 private void shutdownWAL(final boolean close) {
1319 if (this.walFactory != null) {
1320 try {
1321 if (close) {
1322 walFactory.close();
1323 } else {
1324 walFactory.shutdown();
1325 }
1326 } catch (Throwable e) {
1327 e = RemoteExceptionHandler.checkThrowable(e);
1328 LOG.error("Shutdown / close of WAL failed: " + e);
1329 LOG.debug("Shutdown / close exception details:", e);
1330 }
1331 }
1332 }
1333
1334
1335
1336
1337
1338
1339 protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1340 throws IOException {
1341 try {
1342 boolean updateRootDir = false;
1343 for (NameStringPair e : c.getMapEntriesList()) {
1344 String key = e.getName();
1345
1346 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1347 String hostnameFromMasterPOV = e.getValue();
1348 this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1349 rpcServices.isa.getPort(), this.startcode);
1350 if (shouldUseThisHostnameInstead() &&
1351 !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1352 String msg = "Master passed us a different hostname to use; was=" +
1353 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1354 LOG.error(msg);
1355 throw new IOException(msg);
1356 }
1357 if (!shouldUseThisHostnameInstead() &&
1358 !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1359 String msg = "Master passed us a different hostname to use; was=" +
1360 rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1361 LOG.error(msg);
1362 }
1363 continue;
1364 }
1365
1366 String value = e.getValue();
1367 if (key.equals(HConstants.HBASE_DIR)) {
1368 if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1369 updateRootDir = true;
1370 }
1371 }
1372
1373 if (LOG.isDebugEnabled()) {
1374 LOG.debug("Config from master: " + key + "=" + value);
1375 }
1376 this.conf.set(key, value);
1377 }
1378
1379 if (updateRootDir) {
1380
1381 initializeFileSystem();
1382 }
1383
1384
1385
1386 if (this.conf.get("mapreduce.task.attempt.id") == null) {
1387 this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1388 this.serverName.toString());
1389 }
1390
1391
1392 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1393
1394 this.cacheConfig = new CacheConfig(conf);
1395 this.walFactory = setupWALAndReplication();
1396
1397 this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1398
1399 startServiceThreads();
1400 startHeapMemoryManager();
1401 LOG.info("Serving as " + this.serverName +
1402 ", RpcServer on " + rpcServices.isa +
1403 ", sessionid=0x" +
1404 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1405
1406
1407 synchronized (online) {
1408 online.set(true);
1409 online.notifyAll();
1410 }
1411 } catch (Throwable e) {
1412 stop("Failed initialization");
1413 throw convertThrowableToIOE(cleanup(e, "Failed init"),
1414 "Region server startup failed");
1415 } finally {
1416 sleeper.skipSleepCycle();
1417 }
1418 }
1419
1420 private void startHeapMemoryManager() {
1421 this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher,
1422 this, this.regionServerAccounting);
1423 if (this.hMemManager != null) {
1424 this.hMemManager.start(getChoreService());
1425 }
1426 }
1427
1428 private void createMyEphemeralNode() throws KeeperException, IOException {
1429 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1430 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1431 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1432 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1433 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1434 getMyEphemeralNodePath(), data);
1435 }
1436
1437 private void deleteMyEphemeralNode() throws KeeperException {
1438 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1439 }
1440
1441 @Override
1442 public RegionServerAccounting getRegionServerAccounting() {
1443 return regionServerAccounting;
1444 }
1445
1446 @Override
1447 public TableLockManager getTableLockManager() {
1448 return tableLockManager;
1449 }
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459 private RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr,
1460 RegionSpecifier.Builder regionSpecifier) throws IOException {
1461 byte[] name = r.getRegionInfo().getRegionName();
1462 int stores = 0;
1463 int storefiles = 0;
1464 int storeUncompressedSizeMB = 0;
1465 int storefileSizeMB = 0;
1466 int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024);
1467 int storefileIndexSizeMB = 0;
1468 int rootIndexSizeKB = 0;
1469 int totalStaticIndexSizeKB = 0;
1470 int totalStaticBloomSizeKB = 0;
1471 long totalCompactingKVs = 0;
1472 long currentCompactedKVs = 0;
1473 List<Store> storeList = r.getStores();
1474 stores += storeList.size();
1475 for (Store store : storeList) {
1476 storefiles += store.getStorefilesCount();
1477 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1478 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1479 storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1480 CompactionProgress progress = store.getCompactionProgress();
1481 if (progress != null) {
1482 totalCompactingKVs += progress.totalCompactingKVs;
1483 currentCompactedKVs += progress.currentCompactedKVs;
1484 }
1485 rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024);
1486 totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1487 totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1488 }
1489
1490 float dataLocality =
1491 r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1492 if (regionLoadBldr == null) {
1493 regionLoadBldr = RegionLoad.newBuilder();
1494 }
1495 if (regionSpecifier == null) {
1496 regionSpecifier = RegionSpecifier.newBuilder();
1497 }
1498 regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1499 regionSpecifier.setValue(ByteStringer.wrap(name));
1500 regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1501 .setStores(stores)
1502 .setStorefiles(storefiles)
1503 .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1504 .setStorefileSizeMB(storefileSizeMB)
1505 .setMemstoreSizeMB(memstoreSizeMB)
1506 .setStorefileIndexSizeMB(storefileIndexSizeMB)
1507 .setRootIndexSizeKB(rootIndexSizeKB)
1508 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1509 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1510 .setReadRequestsCount(r.getReadRequestsCount())
1511 .setWriteRequestsCount(r.getWriteRequestsCount())
1512 .setTotalCompactingKVs(totalCompactingKVs)
1513 .setCurrentCompactedKVs(currentCompactedKVs)
1514 .setDataLocality(dataLocality)
1515 .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1516 ((HRegion)r).setCompleteSequenceId(regionLoadBldr);
1517
1518 return regionLoadBldr.build();
1519 }
1520
1521
1522
1523
1524
1525 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1526 Region r = onlineRegions.get(encodedRegionName);
1527 return r != null ? createRegionLoad(r, null, null) : null;
1528 }
1529
1530
1531
1532
1533 private static class CompactionChecker extends ScheduledChore {
1534 private final HRegionServer instance;
1535 private final int majorCompactPriority;
1536 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1537 private long iteration = 0;
1538
1539 CompactionChecker(final HRegionServer h, final int sleepTime,
1540 final Stoppable stopper) {
1541 super("CompactionChecker", stopper, sleepTime);
1542 this.instance = h;
1543 LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1544
1545
1546
1547
1548 this.majorCompactPriority = this.instance.conf.
1549 getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1550 DEFAULT_PRIORITY);
1551 }
1552
1553 @Override
1554 protected void chore() {
1555 for (Region r : this.instance.onlineRegions.values()) {
1556 if (r == null)
1557 continue;
1558 for (Store s : r.getStores()) {
1559 try {
1560 long multiplier = s.getCompactionCheckMultiplier();
1561 assert multiplier > 0;
1562 if (iteration % multiplier != 0) continue;
1563 if (s.needsCompaction()) {
1564
1565 this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1566 + " requests compaction");
1567 } else if (s.isMajorCompaction()) {
1568 if (majorCompactPriority == DEFAULT_PRIORITY
1569 || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
1570 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1571 + " requests major compaction; use default priority", null);
1572 } else {
1573 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1574 + " requests major compaction; use configured priority",
1575 this.majorCompactPriority, null, null);
1576 }
1577 }
1578 } catch (IOException e) {
1579 LOG.warn("Failed major compaction check on " + r, e);
1580 }
1581 }
1582 }
1583 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1584 }
1585 }
1586
1587 static class PeriodicMemstoreFlusher extends ScheduledChore {
1588 final HRegionServer server;
1589 final static int RANGE_OF_DELAY = 5 * 60 * 1000;
1590 final static int MIN_DELAY_TIME = 0;
1591 public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1592 super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
1593 this.server = server;
1594 }
1595
1596 @Override
1597 protected void chore() {
1598 final StringBuffer whyFlush = new StringBuffer();
1599 for (Region r : this.server.onlineRegions.values()) {
1600 if (r == null) continue;
1601 if (((HRegion)r).shouldFlush(whyFlush)) {
1602 FlushRequester requester = server.getFlushRequester();
1603 if (requester != null) {
1604 long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1605 LOG.info(getName() + " requesting flush of " +
1606 r.getRegionInfo().getRegionNameAsString() + " because " +
1607 whyFlush.toString() +
1608 " after random delay " + randomDelay + "ms");
1609
1610
1611
1612 requester.requestDelayedFlush(r, randomDelay, false);
1613 }
1614 }
1615 }
1616 }
1617 }
1618
1619
1620
1621
1622
1623
1624
1625
1626 public boolean isOnline() {
1627 return online.get();
1628 }
1629
1630
1631
1632
1633
1634
1635
1636 private WALFactory setupWALAndReplication() throws IOException {
1637
1638 final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1639 final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1640
1641 Path logdir = new Path(rootDir, logName);
1642 if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1643 if (this.fs.exists(logdir)) {
1644 throw new RegionServerRunningException("Region server has already " +
1645 "created directory at " + this.serverName.toString());
1646 }
1647
1648
1649
1650 createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1651
1652
1653 final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1654 listeners.add(new MetricsWAL());
1655 if (this.replicationSourceHandler != null &&
1656 this.replicationSourceHandler.getWALActionsListener() != null) {
1657
1658 listeners.add(this.replicationSourceHandler.getWALActionsListener());
1659 }
1660
1661 return new WALFactory(conf, listeners, serverName.toString());
1662 }
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672 protected LogRoller ensureMetaWALRoller() {
1673
1674
1675 LogRoller roller = metawalRoller.get();
1676 if (null == roller) {
1677 LogRoller tmpLogRoller = new LogRoller(this, this);
1678 String n = Thread.currentThread().getName();
1679 Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1680 n + "-MetaLogRoller", uncaughtExceptionHandler);
1681 if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1682 roller = tmpLogRoller;
1683 } else {
1684
1685 Threads.shutdown(tmpLogRoller.getThread());
1686 roller = metawalRoller.get();
1687 }
1688 }
1689 return roller;
1690 }
1691
1692 public MetricsRegionServer getRegionServerMetrics() {
1693 return this.metricsRegionServer;
1694 }
1695
1696
1697
1698
1699 public MasterAddressTracker getMasterAddressTracker() {
1700 return this.masterAddressTracker;
1701 }
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715 private void startServiceThreads() throws IOException {
1716
1717 this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1718 conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1719 this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1720 conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1721 this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1722 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1723 this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1724 conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1725 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1726 this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1727 conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1728 }
1729 this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1730 "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1731
1732 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1733 this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
1734 conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1735 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
1736 }
1737
1738 Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
1739 uncaughtExceptionHandler);
1740 this.cacheFlusher.start(uncaughtExceptionHandler);
1741
1742 if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
1743 if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
1744 if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
1745 if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
1746 if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
1747 if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
1748
1749
1750
1751 Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
1752 uncaughtExceptionHandler);
1753
1754 if (this.replicationSourceHandler == this.replicationSinkHandler &&
1755 this.replicationSourceHandler != null) {
1756 this.replicationSourceHandler.startReplicationService();
1757 } else {
1758 if (this.replicationSourceHandler != null) {
1759 this.replicationSourceHandler.startReplicationService();
1760 }
1761 if (this.replicationSinkHandler != null) {
1762 this.replicationSinkHandler.startReplicationService();
1763 }
1764 }
1765
1766
1767
1768
1769
1770 Configuration sinkConf = HBaseConfiguration.create(conf);
1771 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1772 conf.getInt("hbase.log.replay.retries.number", 8));
1773 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1774 conf.getInt("hbase.log.replay.rpc.timeout", 30000));
1775 sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1776 this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
1777 splitLogWorker.start();
1778 }
1779
1780
1781
1782
1783
1784
1785 private int putUpWebUI() throws IOException {
1786 int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1787 HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1788 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1789
1790 if(this instanceof HMaster) {
1791 port = conf.getInt(HConstants.MASTER_INFO_PORT,
1792 HConstants.DEFAULT_MASTER_INFOPORT);
1793 addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1794 }
1795
1796 if (port < 0) return port;
1797
1798 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1799 String msg =
1800 "Failed to start http info server. Address " + addr
1801 + " does not belong to this host. Correct configuration parameter: "
1802 + "hbase.regionserver.info.bindAddress";
1803 LOG.error(msg);
1804 throw new IOException(msg);
1805 }
1806
1807 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1808 false);
1809 while (true) {
1810 try {
1811 this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1812 infoServer.addServlet("dump", "/dump", getDumpServlet());
1813 configureInfoServer();
1814 this.infoServer.start();
1815 break;
1816 } catch (BindException e) {
1817 if (!auto) {
1818
1819 LOG.error("Failed binding http info server to port: " + port);
1820 throw e;
1821 }
1822
1823 LOG.info("Failed binding http info server to port: " + port);
1824 port++;
1825 }
1826 }
1827 port = this.infoServer.getPort();
1828 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1829 int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1830 HConstants.DEFAULT_MASTER_INFOPORT);
1831 conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1832 conf.setInt(HConstants.MASTER_INFO_PORT, port);
1833 return port;
1834 }
1835
1836
1837
1838
1839 private boolean isHealthy() {
1840 if (!fsOk) {
1841
1842 return false;
1843 }
1844
1845 if (!(leases.isAlive()
1846 && cacheFlusher.isAlive() && walRoller.isAlive()
1847 && this.compactionChecker.isScheduled()
1848 && this.periodicFlusher.isScheduled())) {
1849 stop("One or more threads are no longer alive -- stop");
1850 return false;
1851 }
1852 final LogRoller metawalRoller = this.metawalRoller.get();
1853 if (metawalRoller != null && !metawalRoller.isAlive()) {
1854 stop("Meta WAL roller thread is no longer alive -- stop");
1855 return false;
1856 }
1857 return true;
1858 }
1859
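// Sentinel handed to the WAL factory when a caller asks for a WAL without naming a region.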
1860 private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1861
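/**
 * Returns the WAL to use for the given region: the dedicated meta WAL for the default
 * replica of hbase:meta, otherwise the WAL the factory keeps for the region (or the
 * unspecified-region WAL when no region is passed). The WAL is registered with the matching
 * roller before it is returned.
 */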
1862 @Override
1863 public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1864 WAL wal;
1865 LogRoller roller = walRoller;
1866
1867 if (regionInfo != null && regionInfo.isMetaTable() &&
1868 regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1869 roller = ensureMetaWALRoller();
1870 wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1871 } else if (regionInfo == null) {
1872 wal = walFactory.getWAL(UNSPECIFIED_REGION);
1873 } else {
1874 wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
1875 }
1876 roller.addWAL(wal);
1877 return wal;
1878 }
1879
1880 @Override
1881 public ClusterConnection getConnection() {
1882 return this.clusterConnection;
1883 }
1884
1885 @Override
1886 public MetaTableLocator getMetaTableLocator() {
1887 return this.metaTableLocator;
1888 }
1889
1890 @Override
1891 public void stop(final String msg) {
1892 stop(msg, false);
1893 }
1894
1895
1896
1897
1898
1899
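/**
 * Stops the region server.
 * @param msg   the reason the server is stopping
 * @param force if true, stop even when a coprocessor's preStop() hook throws; otherwise an
 *              exception from preStop() vetoes the stop
 */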
1900 public void stop(final String msg, final boolean force) {
1901 if (!this.stopped) {
1902 if (this.rsHost != null) {
1903
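// Give the coprocessor host a chance to veto the stop unless a forced shutdown was requested.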
1904 try {
1905 this.rsHost.preStop(msg);
1906 } catch (IOException ioe) {
1907 if (!force) {
1908 LOG.warn("The region server did not stop", ioe);
1909 return;
1910 }
1911 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
1912 }
1913 }
1914 this.stopped = true;
1915 LOG.info("STOPPED: " + msg);
1916
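// Wake the main run() loop if it is sleeping so the stop is noticed promptly.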
1917 sleeper.skipSleepCycle();
1918 }
1919 }
1920
1921 public void waitForServerOnline(){
1922 while (!isStopped() && !isOnline()) {
1923 synchronized (online) {
1924 try {
1925 online.wait(msgInterval);
1926 } catch (InterruptedException ie) {
1927 Thread.currentThread().interrupt();
1928 break;
1929 }
1930 }
1931 }
1932 }
1933
1934 @Override
1935 public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
1936 postOpenDeployTasks(new PostOpenDeployContext(r, -1));
1937 }
1938
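/**
 * Tasks to run after a region has been opened on this server: request system compactions for
 * stores that have references or need compacting, publish the last flushed sequence id for
 * recovering regions, record the new location (in ZooKeeper for hbase:meta, otherwise in
 * hbase:meta or via a transition report to the master), and finally trigger a flush in the
 * primary if the region is a secondary replica.
 */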
1939 @Override
1940 public void postOpenDeployTasks(final PostOpenDeployContext context)
1941 throws KeeperException, IOException {
1942 Region r = context.getRegion();
1943 long masterSystemTime = context.getMasterSystemTime();
1944 Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
1945 rpcServices.checkOpen();
1946 LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
1947
1948 for (Store s : r.getStores()) {
1949 if (s.hasReferences() || s.needsCompaction()) {
1950 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1951 }
1952 }
1953 long openSeqNum = r.getOpenSeqNum();
1954 if (openSeqNum == HConstants.NO_SEQNUM) {
1955
1956 LOG.error("No sequence number found when opening " +
1957 r.getRegionInfo().getRegionNameAsString());
1958 openSeqNum = 0;
1959 }
1960
1961
1962 updateRecoveringRegionLastFlushedSequenceId(r);
1963
1964
1965 if (r.getRegionInfo().isMetaRegion()) {
1966 MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, r.getRegionInfo().getReplicaId(),
1967 State.OPEN);
1968 } else if (useZKForAssignment) {
1969 MetaTableAccessor.updateRegionLocation(getConnection(), r.getRegionInfo(),
1970 this.serverName, openSeqNum, masterSystemTime);
1971 }
1972 if (!useZKForAssignment && !reportRegionStateTransition(new RegionStateTransitionContext(
1973 TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
1974 throw new IOException("Failed to report opened region to master: "
1975 + r.getRegionInfo().getRegionNameAsString());
1976 }
1977
1978 triggerFlushInPrimaryRegion((HRegion)r);
1979
1980 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
1981 }
1982
1983 @Override
1984 public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1985 return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1986 }
1987
1988 @Override
1989 public boolean reportRegionStateTransition(
1990 TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1991 return reportRegionStateTransition(
1992 new RegionStateTransitionContext(code, openSeqNum, -1, hris));
1993 }
1994
1995 @Override
1996 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
1997 TransitionCode code = context.getCode();
1998 long openSeqNum = context.getOpenSeqNum();
1999 HRegionInfo[] hris = context.getHris();
2000
2001 ReportRegionStateTransitionRequest.Builder builder =
2002 ReportRegionStateTransitionRequest.newBuilder();
2003 builder.setServer(ProtobufUtil.toServerName(serverName));
2004 RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2005 transition.setTransitionCode(code);
2006 if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2007 transition.setOpenSeqNum(openSeqNum);
2008 }
2009 for (HRegionInfo hri: hris) {
2010 transition.addRegionInfo(HRegionInfo.convert(hri));
2011 }
2012 ReportRegionStateTransitionRequest request = builder.build();
2013 while (keepLooping()) {
2014 RegionServerStatusService.BlockingInterface rss = rssStub;
2015 try {
2016 if (rss == null) {
2017 createRegionServerStatusStub();
2018 continue;
2019 }
2020 ReportRegionStateTransitionResponse response =
2021 rss.reportRegionStateTransition(null, request);
2022 if (response.hasErrorMessage()) {
2023 LOG.info("Failed to transition " + hris[0]
2024 + " to " + code + ": " + response.getErrorMessage());
2025 return false;
2026 }
2027 return true;
2028 } catch (ServiceException se) {
2029 IOException ioe = ProtobufUtil.getRemoteException(se);
2030 LOG.info("Failed to report region transition, will retry", ioe);
2031 if (rssStub == rss) {
2032 rssStub = null;
2033 }
2034 }
2035 }
2036 return false;
2037 }
2038
2039
2040
2041
2042
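/**
 * Decides whether a freshly opened secondary region replica may serve reads. If region
 * replica replication or wait-for-primary-flush is disabled, reads are enabled immediately;
 * otherwise reads stay disabled and a RegionReplicaFlushHandler is submitted to coordinate a
 * flush with the primary before reads are allowed. No-op for primary (default) replicas.
 */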
2043 void triggerFlushInPrimaryRegion(final HRegion region) {
2044 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2045 return;
2046 }
2047 if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2048 !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2049 region.conf)) {
2050 region.setReadsEnabled(true);
2051 return;
2052 }
2053
2054 region.setReadsEnabled(false);
2055
2056
2057
2058 this.service.submit(
2059 new RegionReplicaFlushHandler(this, clusterConnection,
2060 rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2061 }
2062
2063 @Override
2064 public RpcServerInterface getRpcServer() {
2065 return rpcServices.rpcServer;
2066 }
2067
2068 @VisibleForTesting
2069 public RSRpcServices getRSRpcServices() {
2070 return rpcServices;
2071 }
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
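/**
 * Aborts the region server: logs the reason and the loaded coprocessors at FATAL, dumps the
 * server metrics, makes a best-effort report of the fatal error to the master, and then
 * performs a forced stop.
 */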
2083 @Override
2084 public void abort(final String reason, Throwable cause) {
2085 String msg = "ABORTING region server " + this + ": " + reason;
2086 if (cause != null) {
2087 LOG.fatal(msg, cause);
2088 } else {
2089 LOG.fatal(msg);
2090 }
2091 this.abortRequested = true;
2092
2093
2094
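// Log the loaded coprocessors to help diagnose aborts caused by coprocessor bugs.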
2095 LOG.fatal("RegionServer abort: loaded coprocessors are: " +
2096 CoprocessorHost.getLoadedCoprocessors());
2097
2098 try {
2099 LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
2100 } catch (MalformedObjectNameException | IOException e) {
2101 LOG.warn("Failed dumping metrics", e);
2102 }
2103
2104
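// Best effort only: try to tell the master about the fatal error before shutting down.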
2105 try {
2106 if (cause != null) {
2107 msg += "\nCause:\n" + StringUtils.stringifyException(cause);
2108 }
2109
2110 if (rssStub != null && this.serverName != null) {
2111 ReportRSFatalErrorRequest.Builder builder =
2112 ReportRSFatalErrorRequest.newBuilder();
2113 ServerName sn =
2114 ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
2115 builder.setServer(ProtobufUtil.toServerName(sn));
2116 builder.setErrorMessage(msg);
2117 rssStub.reportRSFatalError(null, builder.build());
2118 }
2119 } catch (Throwable t) {
2120 LOG.warn("Unable to report fatal error to master", t);
2121 }
2122
2123 if (User.isHBaseSecurityEnabled(conf)) {
2124 try {
2125 User.runAsLoginUser(new PrivilegedExceptionAction<Object>() {
2126 @Override
2127 public Object run() throws Exception {
2128 stop(reason, true);
2129 return null;
2130 }
2131 });
2132 } catch (IOException neverThrown) {
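// stop() above does not throw IOException, so there is nothing to handle here.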
2133 }
2134 } else {
2135 stop(reason, true);
2136 }
2137 }
2138
2139
2140
2141
2142 public void abort(String reason) {
2143 abort(reason, null);
2144 }
2145
2146 @Override
2147 public boolean isAborted() {
2148 return this.abortRequested;
2149 }
2150
2151
2152
2153
2154
2155
2156 protected void kill() {
2157 this.killed = true;
2158 abort("Simulated kill");
2159 }
2160
2161
2162
2163
2164 protected void sendShutdownInterrupt() {
2165 }
2166
2167
2168
2169
2170
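/**
 * Cancels the scheduled chores and waits for the service threads to finish: cache flusher,
 * WAL rollers, compaction/split thread, the executor service, and the replication source and
 * sink services.
 */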
2171 protected void stopServiceThreads() {
2172
2173 if (this.choreService != null) choreService.shutdown();
2174 if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2175 if (this.compactionChecker != null) compactionChecker.cancel(true);
2176 if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2177 if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2178 if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2179 if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2180
2181 if (this.cacheFlusher != null) {
2182 this.cacheFlusher.join();
2183 }
2184
2185 if (this.spanReceiverHost != null) {
2186 this.spanReceiverHost.closeReceivers();
2187 }
2188 if (this.walRoller != null) {
2189 Threads.shutdown(this.walRoller.getThread());
2190 }
2191 final LogRoller metawalRoller = this.metawalRoller.get();
2192 if (metawalRoller != null) {
2193 Threads.shutdown(metawalRoller.getThread());
2194 }
2195 if (this.compactSplitThread != null) {
2196 this.compactSplitThread.join();
2197 }
2198 if (this.service != null) this.service.shutdown();
2199 if (this.replicationSourceHandler != null &&
2200 this.replicationSourceHandler == this.replicationSinkHandler) {
2201 this.replicationSourceHandler.stopReplicationService();
2202 } else {
2203 if (this.replicationSourceHandler != null) {
2204 this.replicationSourceHandler.stopReplicationService();
2205 }
2206 if (this.replicationSinkHandler != null) {
2207 this.replicationSinkHandler.stopReplicationService();
2208 }
2209 }
2210 }
2211
2212
2213
2214
2215
2216 @VisibleForTesting
2217 public ReplicationSourceService getReplicationSourceService() {
2218 return replicationSourceHandler;
2219 }
2220
2221
2222
2223
2224
2225 ReplicationSinkService getReplicationSinkService() {
2226 return replicationSinkHandler;
2227 }
2228
2229
2230
2231
2232
2233
2234
2235
2236
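/**
 * Creates the RegionServerStatusService stub to the active master using the cached master
 * address (no forced refresh from ZooKeeper).
 * @return the master's server name, or null if no master could be found before stopping
 */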
2237 @VisibleForTesting
2238 protected synchronized ServerName createRegionServerStatusStub() {
2239
2240 return createRegionServerStatusStub(false);
2241 }
2242
2243
2244
2245
2246
2247
2248
2249
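/**
 * Creates the RegionServerStatusService stub to the active master, retrying until a master
 * address is available or keepLooping() says to give up.
 * @param refresh if true, re-read the master address from ZooKeeper instead of trusting the
 *                cached value
 * @return the master's server name, or null if none could be found
 */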
2250 @VisibleForTesting
2251 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2252 if (rssStub != null) {
2253 return masterAddressTracker.getMasterAddress();
2254 }
2255 ServerName sn = null;
2256 long previousLogTime = 0;
2257 RegionServerStatusService.BlockingInterface intf = null;
2258 boolean interrupted = false;
2259 try {
2260 while (keepLooping()) {
2261 sn = this.masterAddressTracker.getMasterAddress(refresh);
2262 if (sn == null) {
2263 if (!keepLooping()) {
2264
2265 LOG.debug("No master found and cluster is stopped; bailing out");
2266 return null;
2267 }
2268 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2269 LOG.debug("No master found; retry");
2270 previousLogTime = System.currentTimeMillis();
2271 }
2272 refresh = true;
2273 if (sleep(200)) {
2274 interrupted = true;
2275 }
2276 continue;
2277 }
2278
2279
2280 if (this instanceof HMaster && sn.equals(getServerName())) {
2281 intf = ((HMaster)this).getMasterRpcServices();
2282 break;
2283 }
2284 try {
2285 BlockingRpcChannel channel =
2286 this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2287 shortOperationTimeout);
2288 intf = RegionServerStatusService.newBlockingStub(channel);
2289 break;
2290 } catch (IOException e) {
2291 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2292 e = e instanceof RemoteException ?
2293 ((RemoteException)e).unwrapRemoteException() : e;
2294 if (e instanceof ServerNotRunningYetException) {
2295 LOG.info("Master isn't available yet, retrying");
2296 } else {
2297 LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2298 }
2299 previousLogTime = System.currentTimeMillis();
2300 }
2301 if (sleep(200)) {
2302 interrupted = true;
2303 }
2304 }
2305 }
2306 } finally {
2307 if (interrupted) {
2308 Thread.currentThread().interrupt();
2309 }
2310 }
2311 rssStub = intf;
2312 return sn;
2313 }
2314
2315
2316
2317
2318
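/**
 * @return true while retry loops should continue, i.e. the server has not been stopped and
 *         the cluster is still up
 */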
2319 private boolean keepLooping() {
2320 return !this.stopped && isClusterUp();
2321 }
2322
2323
2324
2325
2326
2327
2328
2329
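/*
 * Let the master know we are here. Reports this server's port, start code and current time
 * and returns the master's startup response, or null if no master could be reached.
 */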
2330 private RegionServerStartupResponse reportForDuty() throws IOException {
2331 ServerName masterServerName = createRegionServerStatusStub(true);
2332 if (masterServerName == null) return null;
2333 RegionServerStartupResponse result = null;
2334 try {
2335 rpcServices.requestCount.set(0);
2336 LOG.info("reportForDuty to master=" + masterServerName + " with port="
2337 + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2338 long now = EnvironmentEdgeManager.currentTime();
2339 int port = rpcServices.isa.getPort();
2340 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2341 if (shouldUseThisHostnameInstead()) {
2342 request.setUseThisHostnameInstead(useThisHostnameInstead);
2343 }
2344 request.setPort(port);
2345 request.setServerStartCode(this.startcode);
2346 request.setServerCurrentTime(now);
2347 result = this.rssStub.regionServerStartup(null, request.build());
2348 } catch (ServiceException se) {
2349 IOException ioe = ProtobufUtil.getRemoteException(se);
2350 if (ioe instanceof ClockOutOfSyncException) {
2351 LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2352
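// Re-throw: a clock that is out of sync with the master is fatal for this server.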
2353 throw ioe;
2354 } else if (ioe instanceof ServerNotRunningYetException) {
2355 LOG.debug("Master is not running yet");
2356 } else {
2357 LOG.warn("error telling master we are up", se);
2358 }
2359 rssStub = null;
2360 }
2361 return result;
2362 }
2363
2364 @Override
2365 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2366 try {
2367 GetLastFlushedSequenceIdRequest req =
2368 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2369 RegionServerStatusService.BlockingInterface rss = rssStub;
2370 if (rss == null) {
2371 createRegionServerStatusStub();
2372 rss = rssStub;
2373 if (rss == null) {
2374
2375 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2376 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2377 .build();
2378 }
2379 }
2380 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2381 return RegionStoreSequenceIds.newBuilder()
2382 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2383 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2384 } catch (ServiceException e) {
2385 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2386 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2387 .build();
2388 }
2389 }
2390
2391
2392
2393
2394
2395
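/**
 * Closes all regions carried by this server: user regions first, then hbase:meta.
 * @param abort whether the server is aborting
 */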
2396 protected void closeAllRegions(final boolean abort) {
2397 closeUserRegions(abort);
2398 closeMetaTableRegions(abort);
2399 }
2400
2401
2402
2403
2404
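/**
 * Schedules a close of the hbase:meta region if this server is currently carrying it.
 * @param abort whether the server is aborting
 */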
2405 void closeMetaTableRegions(final boolean abort) {
2406 Region meta = null;
2407 this.lock.writeLock().lock();
2408 try {
2409 for (Map.Entry<String, Region> e: onlineRegions.entrySet()) {
2410 HRegionInfo hri = e.getValue().getRegionInfo();
2411 if (hri.isMetaRegion()) {
2412 meta = e.getValue();
2413 }
2414 if (meta != null) break;
2415 }
2416 } finally {
2417 this.lock.writeLock().unlock();
2418 }
2419 if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2420 }
2421
2422
2423
2424
2425
2426
2427
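/**
 * Schedules closes for all user regions: every online region that is not hbase:meta and is
 * currently available.
 * @param abort whether the server is aborting
 */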
2428 void closeUserRegions(final boolean abort) {
2429 this.lock.writeLock().lock();
2430 try {
2431 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
2432 Region r = e.getValue();
2433 if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2434
2435 closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2436 }
2437 }
2438 } finally {
2439 this.lock.writeLock().unlock();
2440 }
2441 }
2442
2443
2444 public InfoServer getInfoServer() {
2445 return infoServer;
2446 }
2447
2448
2449
2450
2451 @Override
2452 public boolean isStopped() {
2453 return this.stopped;
2454 }
2455
2456 @Override
2457 public boolean isStopping() {
2458 return this.stopping;
2459 }
2460
2461 @Override
2462 public Map<String, Region> getRecoveringRegions() {
2463 return this.recoveringRegions;
2464 }
2465
2466
2467
2468
2469
2470 @Override
2471 public Configuration getConfiguration() {
2472 return conf;
2473 }
2474
2475
2476 ReentrantReadWriteLock.WriteLock getWriteLock() {
2477 return lock.writeLock();
2478 }
2479
2480 public int getNumberOfOnlineRegions() {
2481 return this.onlineRegions.size();
2482 }
2483
2484 boolean isOnlineRegionsEmpty() {
2485 return this.onlineRegions.isEmpty();
2486 }
2487
2488
2489
2490
2491
2492
2493 public Collection<Region> getOnlineRegionsLocalContext() {
2494 Collection<Region> regions = this.onlineRegions.values();
2495 return Collections.unmodifiableCollection(regions);
2496 }
2497
2498 @Override
2499 public void addToOnlineRegions(Region region) {
2500 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2501 configurationManager.registerObserver(region);
2502 }
2503
2504
2505
2506
2507
2508
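/**
 * @return a copy of the online regions keyed by memstore size, largest first
 */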
2509 SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() {
2510
2511 SortedMap<Long, Region> sortedRegions = new TreeMap<Long, Region>(
2512 new Comparator<Long>() {
2513 @Override
2514 public int compare(Long a, Long b) {
2515 return -1 * a.compareTo(b);
2516 }
2517 });
2518
2519 for (Region region : this.onlineRegions.values()) {
2520 sortedRegions.put(region.getMemstoreSize(), region);
2521 }
2522 return sortedRegions;
2523 }
2524
2525
2526
2527
2528 public long getStartcode() {
2529 return this.startcode;
2530 }
2531
2532
2533 @Override
2534 public FlushRequester getFlushRequester() {
2535 return this.cacheFlusher;
2536 }
2537
2538
2539
2540
2541
2542
2543
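/*
 * Get regions to report to the master. Despite the name, this does not compute load yet: it
 * simply returns up to numRegionsToReport of the currently available regions.
 */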
2544 protected HRegionInfo[] getMostLoadedRegions() {
2545 ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2546 for (Region r : onlineRegions.values()) {
2547 if (!r.isAvailable()) {
2548 continue;
2549 }
2550 if (regions.size() < numRegionsToReport) {
2551 regions.add(r.getRegionInfo());
2552 } else {
2553 break;
2554 }
2555 }
2556 return regions.toArray(new HRegionInfo[regions.size()]);
2557 }
2558
2559 @Override
2560 public Leases getLeases() {
2561 return leases;
2562 }
2563
2564
2565
2566
2567 protected Path getRootDir() {
2568 return rootDir;
2569 }
2570
2571
2572
2573
2574 @Override
2575 public FileSystem getFileSystem() {
2576 return fs;
2577 }
2578
2579 @Override
2580 public String toString() {
2581 return getServerName().toString();
2582 }
2583
2584
2585
2586
2587
2588
2589 public int getThreadWakeFrequency() {
2590 return threadWakeFrequency;
2591 }
2592
2593 @Override
2594 public ZooKeeperWatcher getZooKeeper() {
2595 return zooKeeper;
2596 }
2597
2598 @Override
2599 public BaseCoordinatedStateManager getCoordinatedStateManager() {
2600 return csm;
2601 }
2602
2603 @Override
2604 public ServerName getServerName() {
2605 return serverName;
2606 }
2607
2608 @Override
2609 public CompactionRequestor getCompactionRequester() {
2610 return this.compactSplitThread;
2611 }
2612
2613 public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2614 return this.rsHost;
2615 }
2616
2617 @Override
2618 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2619 return this.regionsInTransitionInRS;
2620 }
2621
2622 @Override
2623 public ExecutorService getExecutorService() {
2624 return service;
2625 }
2626
2627 @Override
2628 public ChoreService getChoreService() {
2629 return choreService;
2630 }
2631
2632 @Override
2633 public RegionServerQuotaManager getRegionServerQuotaManager() {
2634 return rsQuotaManager;
2635 }
2636
2637
2638
2639
2640
2641
2642
2643
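/*
 * Sets up the replication source and sink services on the passed server, unless replication
 * is disabled in the configuration. If the configured source and sink class names are the
 * same, a single instance is created and used for both roles.
 */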
2644 static private void createNewReplicationInstance(Configuration conf,
2645 HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2646
2647
2648 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2649 HConstants.REPLICATION_ENABLE_DEFAULT)) {
2650 return;
2651 }
2652
2653
2654 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2655 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2656
2657
2658 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2659 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2660
2661
2662
2663 if (sourceClassname.equals(sinkClassname)) {
2664 server.replicationSourceHandler = (ReplicationSourceService)
2665 newReplicationInstance(sourceClassname,
2666 conf, server, fs, logDir, oldLogDir);
2667 server.replicationSinkHandler = (ReplicationSinkService)
2668 server.replicationSourceHandler;
2669 } else {
2670 server.replicationSourceHandler = (ReplicationSourceService)
2671 newReplicationInstance(sourceClassname,
2672 conf, server, fs, logDir, oldLogDir);
2673 server.replicationSinkHandler = (ReplicationSinkService)
2674 newReplicationInstance(sinkClassname,
2675 conf, server, fs, logDir, oldLogDir);
2676 }
2677 }
2678
2679 static private ReplicationService newReplicationInstance(String classname,
2680 Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2681 Path oldLogDir) throws IOException{
2682
2683 Class<?> clazz = null;
2684 try {
2685 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2686 clazz = Class.forName(classname, true, classLoader);
2687 } catch (java.lang.ClassNotFoundException nfe) {
2688 throw new IOException("Could not find class for " + classname);
2689 }
2690
2691
2692 ReplicationService service = (ReplicationService)
2693 ReflectionUtils.newInstance(clazz, conf);
2694 service.initialize(server, fs, logDir, oldLogDir);
2695 return service;
2696 }
2697
2698
2699
2700
2701
2702
2703
2704
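/**
 * Utility for constructing an instance of the configured HRegionServer subclass through its
 * (Configuration, CoordinatedStateManager) constructor.
 */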
2705 public static HRegionServer constructRegionServer(
2706 Class<? extends HRegionServer> regionServerClass,
2707 final Configuration conf2, CoordinatedStateManager cp) {
2708 try {
2709 Constructor<? extends HRegionServer> c = regionServerClass
2710 .getConstructor(Configuration.class, CoordinatedStateManager.class);
2711 return c.newInstance(conf2, cp);
2712 } catch (Exception e) {
2713 throw new RuntimeException("Failed construction of RegionServer: "
2714 + regionServerClass.toString(), e);
2715 }
2716 }
2717
2718
2719
2720
2721 public static void main(String[] args) throws Exception {
2722 VersionInfo.logVersion();
2723 Configuration conf = HBaseConfiguration.create();
2724 @SuppressWarnings("unchecked")
2725 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2726 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2727
2728 new HRegionServerCommandLine(regionServerClass).doMain(args);
2729 }
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741 @Override
2742 public List<Region> getOnlineRegions(TableName tableName) {
2743 List<Region> tableRegions = new ArrayList<Region>();
2744 synchronized (this.onlineRegions) {
2745 for (Region region: this.onlineRegions.values()) {
2746 HRegionInfo regionInfo = region.getRegionInfo();
2747 if (regionInfo.getTable().equals(tableName)) {
2748 tableRegions.add(region);
2749 }
2750 }
2751 }
2752 return tableRegions;
2753 }
2754
2755
2756
2757
2758
2759
2760 @Override
2761 public Set<TableName> getOnlineTables() {
2762 Set<TableName> tables = new HashSet<TableName>();
2763 synchronized (this.onlineRegions) {
2764 for (Region region: this.onlineRegions.values()) {
2765 tables.add(region.getTableDesc().getTableName());
2766 }
2767 }
2768 return tables;
2769 }
2770
2771
2772 public String[] getRegionServerCoprocessors() {
2773 TreeSet<String> coprocessors = new TreeSet<String>();
2774 try {
2775 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2776 } catch (IOException exception) {
2777 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2778 "skipping.");
2779 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2780 }
2781 Collection<Region> regions = getOnlineRegionsLocalContext();
2782 for (Region region: regions) {
2783 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2784 try {
2785 coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2786 } catch (IOException exception) {
2787 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2788 "; skipping.");
2789 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2790 }
2791 }
2792 return coprocessors.toArray(new String[coprocessors.size()]);
2793 }
2794
2795
2796
2797
2798
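/**
 * Tries to close the given region, logging a warning instead of propagating any failure.
 */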
2799 private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2800 try {
2801 CloseRegionCoordination.CloseRegionDetails details =
2802 csm.getCloseRegionCoordination().getDetaultDetails();
2803 if (!closeRegion(region.getEncodedName(), abort, details, null)) {
2804 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2805 " - ignoring and continuing");
2806 }
2807 } catch (IOException e) {
2808 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2809 " - ignoring and continuing", e);
2810 }
2811 }
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
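/**
 * Closes a region asynchronously by submitting a close handler. Uses regionsInTransitionInRS
 * to coordinate with in-flight opens and closes: Boolean.TRUE marks a pending OPEN, which is
 * cancelled here when possible, and Boolean.FALSE marks a pending CLOSE, in which case a
 * duplicate CLOSE is rejected with RegionAlreadyInTransitionException.
 * @param encodedName encoded name of the region to close
 * @param abort       whether the server is aborting
 * @param crd         details from the close-region coordination
 * @param sn          the server the region is being moved to, or null
 * @return true if a close was scheduled, false if a coprocessor rejected the close
 * @throws NotServingRegionException if the region is neither online nor opening here
 */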
2833 protected boolean closeRegion(String encodedName, final boolean abort,
2834 CloseRegionCoordination.CloseRegionDetails crd, final ServerName sn)
2835 throws NotServingRegionException, RegionAlreadyInTransitionException {
2836
2837 Region actualRegion = this.getFromOnlineRegions(encodedName);
2838
2839 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2840 try {
2841 actualRegion.getCoprocessorHost().preClose(false);
2842 } catch (IOException exp) {
2843 LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2844 return false;
2845 }
2846 }
2847
2848 final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2849 Boolean.FALSE);
2850
2851 if (Boolean.TRUE.equals(previous)) {
2852 LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
2853 "trying to OPEN. Cancelling OPENING.");
2854 if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2855
2856
2857 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2858 " Doing a standard close now");
2859 return closeRegion(encodedName, abort, crd, sn);
2860 }
2861
2862 actualRegion = this.getFromOnlineRegions(encodedName);
2863 if (actualRegion == null) {
2864 LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2865
2866 throw new RegionAlreadyInTransitionException("The region " + encodedName +
2867 " was opening but not yet served. Opening is cancelled.");
2868 }
2869 } else if (Boolean.FALSE.equals(previous)) {
2870 LOG.info("Received CLOSE for the region: " + encodedName +
2871 ", which we are already trying to CLOSE, but not completed yet");
2872
2873
2874
2875
2876
2877
2878 throw new RegionAlreadyInTransitionException("The region " + encodedName +
2879 " was already closing. New CLOSE request is ignored.");
2880 }
2881
2882 if (actualRegion == null) {
2883 LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2884 this.regionsInTransitionInRS.remove(encodedName.getBytes());
2885
2886 throw new NotServingRegionException("The region " + encodedName +
2887 " is not online, and is not opening.");
2888 }
2889
2890 CloseRegionHandler crh;
2891 final HRegionInfo hri = actualRegion.getRegionInfo();
2892 if (hri.isMetaRegion()) {
2893 crh = new CloseMetaHandler(this, this, hri, abort,
2894 csm.getCloseRegionCoordination(), crd);
2895 } else {
2896 crh = new CloseRegionHandler(this, this, hri, abort,
2897 csm.getCloseRegionCoordination(), crd, sn);
2898 }
2899 this.service.submit(crh);
2900 return true;
2901 }
2902
2903
2904
2905
2906
2907
2908 public Region getOnlineRegion(final byte[] regionName) {
2909 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2910 return this.onlineRegions.get(encodedRegionName);
2911 }
2912
2913 public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2914 return this.regionFavoredNodesMap.get(encodedRegionName);
2915 }
2916
2917 @Override
2918 public Region getFromOnlineRegions(final String encodedRegionName) {
2919 return this.onlineRegions.get(encodedRegionName);
2920 }
2921
2922
2923 @Override
2924 public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
2925 Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2926 if (destination != null) {
2927 long closeSeqNum = r.getMaxFlushedSeqId();
2928 if (closeSeqNum == HConstants.NO_SEQNUM) {
2929
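// No edits were ever flushed for this region; fall back to its open sequence id.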
2930 closeSeqNum = r.getOpenSeqNum();
2931 if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
2932 }
2933 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2934 }
2935 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2936 return toReturn != null;
2937 }
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947 protected Region getRegion(final byte[] regionName)
2948 throws NotServingRegionException {
2949 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2950 return getRegionByEncodedName(regionName, encodedRegionName);
2951 }
2952
2953 public Region getRegionByEncodedName(String encodedRegionName)
2954 throws NotServingRegionException {
2955 return getRegionByEncodedName(null, encodedRegionName);
2956 }
2957
2958 protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2959 throws NotServingRegionException {
2960 Region region = this.onlineRegions.get(encodedRegionName);
2961 if (region == null) {
2962 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2963 if (moveInfo != null) {
2964 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2965 }
2966 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2967 String regionNameStr = regionName == null?
2968 encodedRegionName: Bytes.toStringBinary(regionName);
2969 if (isOpening != null && isOpening.booleanValue()) {
2970 throw new RegionOpeningException("Region " + regionNameStr +
2971 " is opening on " + this.serverName);
2972 }
2973 throw new NotServingRegionException("Region " + regionNameStr +
2974 " is not online on " + this.serverName);
2975 }
2976 return region;
2977 }
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
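/*
 * Cleanup after a Throwable caught while servicing a request: log it (quietly when it is a
 * NotServingRegionException), run the out-of-memory check, and verify the file system is
 * still available. Returns the passed Throwable so callers can rethrow or convert it.
 */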
2989 private Throwable cleanup(final Throwable t, final String msg) {
2990
2991 if (t instanceof NotServingRegionException) {
2992 LOG.debug("NotServingRegionException; " + t.getMessage());
2993 return t;
2994 }
2995 if (msg == null) {
2996 LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2997 } else {
2998 LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2999 }
3000 if (!rpcServices.checkOOME(t)) {
3001 checkFileSystem();
3002 }
3003 return t;
3004 }
3005
3006
3007
3008
3009
3010
3011
3012
3013 protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
3014 return (t instanceof IOException ? (IOException) t : msg == null
3015 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
3016 }
3017
3018
3019
3020
3021
3022
3023
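/**
 * Checks that the file system is still available, aborting the server and marking it
 * unhealthy if it is not.
 * @return true if the file system is available
 */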
3024 public boolean checkFileSystem() {
3025 if (this.fsOk && this.fs != null) {
3026 try {
3027 FSUtils.checkFileSystemAvailable(this.fs);
3028 } catch (IOException e) {
3029 abort("File System not available", e);
3030 this.fsOk = false;
3031 }
3032 }
3033 return this.fsOk;
3034 }
3035
3036 @Override
3037 public void updateRegionFavoredNodesMapping(String encodedRegionName,
3038 List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3039 InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3040
3041
3042 for (int i = 0; i < favoredNodes.size(); i++) {
3043 addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3044 favoredNodes.get(i).getPort());
3045 }
3046 regionFavoredNodesMap.put(encodedRegionName, addr);
3047 }
3048
3049
3050
3051
3052
3053
3054
3055 @Override
3056 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3057 return regionFavoredNodesMap.get(encodedRegionName);
3058 }
3059
3060 @Override
3061 public ServerNonceManager getNonceManager() {
3062 return this.nonceManager;
3063 }
3064
3065 private static class MovedRegionInfo {
3066 private final ServerName serverName;
3067 private final long seqNum;
3068 private final long ts;
3069
3070 public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3071 this.serverName = serverName;
3072 this.seqNum = closeSeqNum;
3073 ts = EnvironmentEdgeManager.currentTime();
3074 }
3075
3076 public ServerName getServerName() {
3077 return serverName;
3078 }
3079
3080 public long getSeqNum() {
3081 return seqNum;
3082 }
3083
3084 public long getMoveTime() {
3085 return ts;
3086 }
3087 }
3088
3089
3090
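// Regions recently moved off this server, kept so clients asking for one of them can be
// answered with a RegionMovedException pointing at the new location.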
3091 protected Map<String, MovedRegionInfo> movedRegions =
3092 new ConcurrentHashMap<String, MovedRegionInfo>(3000);
3093
3094
3095
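// How long a moved-region record is kept before it expires (two minutes).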
3096 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3097
3098 protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
3099 if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
3100 LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3101 return;
3102 }
3103 LOG.info("Adding moved region record: "
3104 + encodedName + " to " + destination + " as of " + closeSeqNum);
3105 movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3106 }
3107
3108 void removeFromMovedRegions(String encodedName) {
3109 movedRegions.remove(encodedName);
3110 }
3111
3112 private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3113 MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3114
3115 long now = EnvironmentEdgeManager.currentTime();
3116 if (dest != null) {
3117 if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3118 return dest;
3119 } else {
3120 movedRegions.remove(encodedRegionName);
3121 }
3122 }
3123
3124 return null;
3125 }
3126
3127
3128
3129
3130 protected void cleanMovedRegions() {
3131 final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
3132 Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
3133
3134 while (it.hasNext()){
3135 Map.Entry<String, MovedRegionInfo> e = it.next();
3136 if (e.getValue().getMoveTime() < cutOff) {
3137 it.remove();
3138 }
3139 }
3140 }
3141
3142
3143
3144
3145
3146 protected int movedRegionCleanerPeriod() {
3147 return TIMEOUT_REGION_MOVED;
3148 }
3149
3150
3151
3152
3153
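/**
 * Chore that periodically purges expired entries from the moved-regions map.
 */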
3154 protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3155 private HRegionServer regionServer;
3156 Stoppable stoppable;
3157
3158 private MovedRegionsCleaner(
3159 HRegionServer regionServer, Stoppable stoppable){
3160 super("MovedRegionsCleaner for region " + regionServer, stoppable,
3161 regionServer.movedRegionCleanerPeriod());
3162 this.regionServer = regionServer;
3163 this.stoppable = stoppable;
3164 }
3165
3166 static MovedRegionsCleaner create(HRegionServer rs){
3167 Stoppable stoppable = new Stoppable() {
3168 private volatile boolean isStopped = false;
3169 @Override public void stop(String why) { isStopped = true;}
3170 @Override public boolean isStopped() {return isStopped;}
3171 };
3172
3173 return new MovedRegionsCleaner(rs, stoppable);
3174 }
3175
3176 @Override
3177 protected void chore() {
3178 regionServer.cleanMovedRegions();
3179 }
3180
3181 @Override
3182 public void stop(String why) {
3183 stoppable.stop(why);
3184 }
3185
3186 @Override
3187 public boolean isStopped() {
3188 return stoppable.isStopped();
3189 }
3190 }
3191
3192 private String getMyEphemeralNodePath() {
3193 return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
3194 }
3195
3196 private boolean isHealthCheckerConfigured() {
3197 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3198 return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3199 }
3200
3201
3202
3203
3204 public CompactSplitThread getCompactSplitThread() {
3205 return this.compactSplitThread;
3206 }
3207
3208
3209
3210
3211
3212
3213
3214
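/*
 * For a region that is still recovering, publish the smallest last-flushed sequence id across
 * its stores to the region's recovering znode (and to the znode of the last failed region
 * server, when one is known) so that distributed log replay can skip edits that were already
 * flushed. No-op for regions that are not recovering.
 */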
3215 private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException,
3216 IOException {
3217 if (!r.isRecovering()) {
3218
3219 return;
3220 }
3221
3222 HRegionInfo regionInfo = r.getRegionInfo();
3223 ZooKeeperWatcher zkw = getZooKeeper();
3224 String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName());
3225 Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqId();
3226 long minSeqIdForLogReplay = -1;
3227 for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3228 if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3229 minSeqIdForLogReplay = storeSeqIdForReplay;
3230 }
3231 }
3232
3233 try {
3234 long lastRecordedFlushedSequenceId = -1;
3235 String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3236 regionInfo.getEncodedName());
3237
3238 byte[] data;
3239 try {
3240 data = ZKUtil.getData(zkw, nodePath);
3241 } catch (InterruptedException e) {
3242 throw new InterruptedIOException();
3243 }
3244 if (data != null) {
3245 lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3246 }
3247 if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3248 ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3249 }
3250 if (previousRSName != null) {
3251
3252 nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3253 ZKUtil.setData(zkw, nodePath,
3254 ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3255 LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() +
3256 " for " + previousRSName);
3257 } else {
3258 LOG.warn("Can't find failed region server for recovering region " +
3259 regionInfo.getEncodedName());
3260 }
3261 } catch (NoNodeException ignore) {
3262 LOG.debug("Region " + regionInfo.getEncodedName() +
3263 " must have completed recovery because its recovery znode has been removed", ignore);
3264 }
3265 }
3266
3267
3268
3269
3270
3271
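/*
 * Returns the region server that most recently failed while carrying the given region, taken
 * from the children of the region's recovering znode (the child with the highest creation
 * zxid wins), or null if there are none.
 */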
3272 private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3273 String result = null;
3274 long maxZxid = 0;
3275 ZooKeeperWatcher zkw = this.getZooKeeper();
3276 String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3277 List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3278 if (failedServers == null || failedServers.isEmpty()) {
3279 return result;
3280 }
3281 for (String failedServer : failedServers) {
3282 String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3283 Stat stat = new Stat();
3284 ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3285 if (maxZxid < stat.getCzxid()) {
3286 maxZxid = stat.getCzxid();
3287 result = failedServer;
3288 }
3289 }
3290 return result;
3291 }
3292
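/**
 * Executes a coprocessor endpoint call that was registered at the region-server level rather
 * than against a single region, and returns the serialized response.
 */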
3293 public CoprocessorServiceResponse execRegionServerService(
3294 @SuppressWarnings("UnusedParameters") final RpcController controller,
3295 final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3296 try {
3297 ServerRpcController serviceController = new ServerRpcController();
3298 CoprocessorServiceCall call = serviceRequest.getCall();
3299 String serviceName = call.getServiceName();
3300 String methodName = call.getMethodName();
3301 if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3302 throw new UnknownProtocolException(null,
3303 "No registered coprocessor service found for name " + serviceName);
3304 }
3305 Service service = coprocessorServiceHandlers.get(serviceName);
3306 Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3307 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3308 if (methodDesc == null) {
3309 throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3310 + " called on service " + serviceName);
3311 }
3312 Message.Builder builderForType = service.getRequestPrototype(methodDesc).newBuilderForType();
3313 ProtobufUtil.mergeFrom(builderForType, call.getRequest());
3314 Message request = builderForType.build();
3315 final Message.Builder responseBuilder =
3316 service.getResponsePrototype(methodDesc).newBuilderForType();
3317 service.callMethod(methodDesc, serviceController, request, new RpcCallback<Message>() {
3318 @Override
3319 public void run(Message message) {
3320 if (message != null) {
3321 responseBuilder.mergeFrom(message);
3322 }
3323 }
3324 });
3325 IOException exception = ResponseConverter.getControllerException(serviceController);
3326 if (exception != null) {
3327 throw exception;
3328 }
3329 Message execResult = responseBuilder.build();
3330 ClientProtos.CoprocessorServiceResponse.Builder builder =
3331 ClientProtos.CoprocessorServiceResponse.newBuilder();
3332 builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3333 HConstants.EMPTY_BYTE_ARRAY));
3334 builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3335 .setValue(execResult.toByteString()));
3336 return builder.build();
3337 } catch (IOException ie) {
3338 throw new ServiceException(ie);
3339 }
3340 }
3341
3342
3343
3344
3345 public CacheConfig getCacheConfig() {
3346 return this.cacheConfig;
3347 }
3348
3349
3350
3351
3352 protected ConfigurationManager getConfigurationManager() {
3353 return configurationManager;
3354 }
3355
3356
3357
3358
3359 public TableDescriptors getTableDescriptors() {
3360 return this.tableDescriptors;
3361 }
3362
3363
3364
3365
3366 public void updateConfiguration() {
3367 LOG.info("Reloading the configuration from disk.");
3368
3369 conf.reloadConfiguration();
3370 configurationManager.notifyAllObservers(conf);
3371 }
3372
3373 @Override
3374 public HeapMemoryManager getHeapMemoryManager() {
3375 return hMemManager;
3376 }
3377
3378 @Override
3379 public double getCompactionPressure() {
3380 double max = 0;
3381 for (Region region : onlineRegions.values()) {
3382 for (Store store : region.getStores()) {
3383 double normCount = store.getCompactionPressure();
3384 if (normCount > max) {
3385 max = normCount;
3386 }
3387 }
3388 }
3389 return max;
3390 }
3391
3392
3393
3394
3395
3396 @VisibleForTesting
3397 public boolean walRollRequestFinished() {
3398 return this.walRoller.walRollFinished();
3399 }
3400 }