1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.lang.Thread.UncaughtExceptionHandler;
23 import java.lang.annotation.Retention;
24 import java.lang.annotation.RetentionPolicy;
25 import java.lang.management.ManagementFactory;
26 import java.lang.management.MemoryUsage;
27 import java.lang.reflect.Constructor;
28 import java.net.BindException;
29 import java.net.InetSocketAddress;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.Comparator;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Map.Entry;
40 import java.util.Random;
41 import java.util.Set;
42 import java.util.SortedMap;
43 import java.util.TreeMap;
44 import java.util.TreeSet;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.ConcurrentMap;
47 import java.util.concurrent.ConcurrentSkipListMap;
48 import java.util.concurrent.locks.ReentrantReadWriteLock;
49
50 import javax.management.ObjectName;
51
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54 import org.apache.hadoop.classification.InterfaceAudience;
55 import org.apache.hadoop.conf.Configuration;
56 import org.apache.hadoop.fs.FileSystem;
57 import org.apache.hadoop.fs.Path;
58 import org.apache.hadoop.hbase.CellScannable;
59 import org.apache.hadoop.hbase.CellScanner;
60 import org.apache.hadoop.hbase.CellUtil;
61 import org.apache.hadoop.hbase.Chore;
62 import org.apache.hadoop.hbase.HBaseConfiguration;
63 import org.apache.hadoop.hbase.HConstants;
64 import org.apache.hadoop.hbase.HRegionInfo;
65 import org.apache.hadoop.hbase.HTableDescriptor;
66 import org.apache.hadoop.hbase.HealthCheckChore;
67 import org.apache.hadoop.hbase.KeyValue;
68 import org.apache.hadoop.hbase.RemoteExceptionHandler;
69 import org.apache.hadoop.hbase.ServerName;
70 import org.apache.hadoop.hbase.Stoppable;
71 import org.apache.hadoop.hbase.TableDescriptors;
72 import org.apache.hadoop.hbase.ZNodeClearer;
73 import org.apache.hadoop.hbase.catalog.CatalogTracker;
74 import org.apache.hadoop.hbase.catalog.MetaEditor;
75 import org.apache.hadoop.hbase.catalog.MetaReader;
76 import org.apache.hadoop.hbase.client.Append;
77 import org.apache.hadoop.hbase.client.Delete;
78 import org.apache.hadoop.hbase.client.Get;
79 import org.apache.hadoop.hbase.client.HConnectionManager;
80 import org.apache.hadoop.hbase.client.Increment;
81 import org.apache.hadoop.hbase.client.Mutation;
82 import org.apache.hadoop.hbase.client.Put;
83 import org.apache.hadoop.hbase.client.Result;
84 import org.apache.hadoop.hbase.client.RowMutations;
85 import org.apache.hadoop.hbase.client.Scan;
86 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
87 import org.apache.hadoop.hbase.exceptions.ClockOutOfSyncException;
88 import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
89 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
90 import org.apache.hadoop.hbase.exceptions.LeaseException;
91 import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
92 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
93 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
94 import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException;
95 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
96 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
97 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
98 import org.apache.hadoop.hbase.exceptions.RegionServerRunningException;
99 import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
100 import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
101 import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
102 import org.apache.hadoop.hbase.exceptions.YouAreDeadException;
103 import org.apache.hadoop.hbase.executor.ExecutorService;
104 import org.apache.hadoop.hbase.executor.ExecutorType;
105 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
106 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
107 import org.apache.hadoop.hbase.fs.HFileSystem;
108 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
109 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
110 import org.apache.hadoop.hbase.ipc.RpcClient;
111 import org.apache.hadoop.hbase.ipc.RpcServer;
112 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
113 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
114 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
115 import org.apache.hadoop.hbase.ipc.ServerRpcController;
116 import org.apache.hadoop.hbase.master.SplitLogManager;
117 import org.apache.hadoop.hbase.master.TableLockManager;
118 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
119 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
120 import org.apache.hadoop.hbase.protobuf.RequestConverter;
121 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
141 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
142 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
143 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
146 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
147 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
150 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
152 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
153 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
155 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
156 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
157 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
158 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
159 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
160 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
161 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
162 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
163 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
164 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
165 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
166 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
167 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
168 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
169 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
170 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
171 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
172 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
173 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
174 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
175 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
176 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
177 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
178 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
179 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
180 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
181 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
182 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
183 import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
184 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
185 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
186 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
187 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
188 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
189 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
190 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
191 import org.apache.hadoop.hbase.regionserver.wal.HLog;
192 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
193 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
194 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
195 import org.apache.hadoop.hbase.security.User;
196 import org.apache.hadoop.hbase.util.Bytes;
197 import org.apache.hadoop.hbase.util.CompressionTest;
198 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
199 import org.apache.hadoop.hbase.util.FSTableDescriptors;
200 import org.apache.hadoop.hbase.util.FSUtils;
201 import org.apache.hadoop.hbase.util.InfoServer;
202 import org.apache.hadoop.hbase.util.Pair;
203 import org.apache.hadoop.hbase.util.Sleeper;
204 import org.apache.hadoop.hbase.util.Strings;
205 import org.apache.hadoop.hbase.util.Threads;
206 import org.apache.hadoop.hbase.util.VersionInfo;
207 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
208 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
209 import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
210 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
211 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
212 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
213 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
214 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
215 import org.apache.hadoop.ipc.RemoteException;
216 import org.apache.hadoop.metrics.util.MBeanUtil;
217 import org.apache.hadoop.net.DNS;
218 import org.apache.hadoop.util.ReflectionUtils;
219 import org.apache.hadoop.util.StringUtils;
220 import org.apache.zookeeper.KeeperException;
221 import org.apache.zookeeper.data.Stat;
222 import org.cliffc.high_scale_lib.Counter;
223
224 import com.google.protobuf.BlockingRpcChannel;
225 import com.google.protobuf.ByteString;
226 import com.google.protobuf.Message;
227 import com.google.protobuf.RpcController;
228 import com.google.protobuf.ServiceException;
229 import com.google.protobuf.TextFormat;
230
231
232
233
234
235 @InterfaceAudience.Private
236 @SuppressWarnings("deprecation")
237 public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
238 AdminProtos.AdminService.BlockingInterface, Runnable, RegionServerServices,
239 HBaseRPCErrorHandler, LastSequenceId {
240
241 public static final Log LOG = LogFactory.getLog(HRegionServer.class);
242
243 private final Random rand;
244
245
246
247
248
249 protected static final String OPEN = "OPEN";
250 protected static final String CLOSE = "CLOSE";
251
252
253
254
255 protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
256 new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
257
258 protected long maxScannerResultSize;
259
260
261 protected MemStoreFlusher cacheFlusher;
262
263
264 protected CatalogTracker catalogTracker;
265
266
267 @SuppressWarnings("unused")
268 private RecoveringRegionWatcher recoveringRegionWatcher;
269
270
271
272
273 protected TableDescriptors tableDescriptors;
274
275
276 protected ReplicationSourceService replicationSourceHandler;
277 protected ReplicationSinkService replicationSinkHandler;
278
279
280 public CompactSplitThread compactSplitThread;
281
282 final ConcurrentHashMap<String, RegionScannerHolder> scanners =
283 new ConcurrentHashMap<String, RegionScannerHolder>();
284
285
286
287
288
289 protected final Map<String, HRegion> onlineRegions =
290 new ConcurrentHashMap<String, HRegion>();
291
292
293
294
295
296
297
298
299
300
301 protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
302 new ConcurrentHashMap<String, InetSocketAddress[]>();
303
304
305
306
307
308 protected final Map<String, HRegion> recoveringRegions = Collections
309 .synchronizedMap(new HashMap<String, HRegion>());
310
311
312 protected Leases leases;
313
314
315 protected ExecutorService service;
316
317
318 final Counter requestCount = new Counter();
319
320
321 protected volatile boolean fsOk;
322 protected HFileSystem fs;
323
324
325
326
327 protected volatile boolean stopped = false;
328
329
330
331 protected volatile boolean abortRequested;
332
333
334 protected int webuiport = -1;
335
336 ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
337
338
339
340 private boolean stopping = false;
341
342 private volatile boolean killed = false;
343
344 protected final Configuration conf;
345
346 private boolean useHBaseChecksum;
347 private Path rootDir;
348
349 protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
350
351 final int numRetries;
352 protected final int threadWakeFrequency;
353 private final int msgInterval;
354
355 protected final int numRegionsToReport;
356
357
358 private RegionServerStatusService.BlockingInterface rssStub;
359
360 RpcClient rpcClient;
361
362
363
364 RpcServerInterface rpcServer;
365
366 private final InetSocketAddress isa;
367 private UncaughtExceptionHandler uncaughtExceptionHandler;
368
369
370
371
372 InfoServer infoServer;
373
374
375 public static final String REGIONSERVER = "regionserver";
376
377
378 public static final String REGIONSERVER_CONF = "regionserver_conf";
379
380 private MetricsRegionServer metricsRegionServer;
381
382
383
384
385 Chore compactionChecker;
386
387
388
389
390 Chore periodicFlusher;
391
392
393
394 protected volatile HLog hlog;
395
396
397 protected volatile HLog hlogForMeta;
398
399 LogRoller hlogRoller;
400 LogRoller metaHLogRoller;
401
402
403 protected volatile boolean isOnline;
404
405
406 private ZooKeeperWatcher zooKeeper;
407
408
409 private MasterAddressTracker masterAddressManager;
410
411
412 private ClusterStatusTracker clusterStatusTracker;
413
414
415 private SplitLogWorker splitLogWorker;
416
417
418 private final Sleeper sleeper;
419
420 private final int rpcTimeout;
421
422 private final RegionServerAccounting regionServerAccounting;
423
424
425 final CacheConfig cacheConfig;
426
427
428 volatile private HRegionThriftServer thriftServer;
429
430
431 private HealthCheckChore healthCheckChore;
432
433
434
435
436
437
438
439 private ServerName serverNameFromMasterPOV;
440
441
442
443
444 private final long startcode;
445
446
447
448
449 private String clusterId;
450
451
452
453
454 private ObjectName mxBean = null;
455
456
457
458
459 private MovedRegionsCleaner movedRegionsCleaner;
460
461
462
463
464 private final int scannerLeaseTimeoutPeriod;
465
466
467
468
469 private final QosFunction qosFunction;
470
471 private RegionServerCoprocessorHost rsHost;
472
473
474 RegionServerSnapshotManager snapshotManager;
475
476
477 private final boolean distributedLogReplay;
478
479
480 private TableLockManager tableLockManager;
481
482
483
484
485
486
487
488
/**
 * Builds a region server from the supplied configuration: reads the tunables,
 * resolves the bind address, creates the RPC server, performs the ZooKeeper
 * client and region-server logins, and creates the coprocessor host.
 *
 * @param conf configuration this server reads all of its settings from
 * @throws IOException if a configured compression codec fails its check or a
 *         collaborator created here fails
 * @throws InterruptedException declared by the original signature
 */
public HRegionServer(Configuration conf)
    throws IOException, InterruptedException {
  this.fsOk = true;
  this.conf = conf;
  this.isOnline = false;
  // Fail before anything else is set up if a required codec is unusable.
  checkCodecs(this.conf);

  this.useHBaseChecksum =
      this.conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, false);

  // Retry count and timing knobs.
  this.numRetries = this.conf.getInt("hbase.client.retries.number", 10);
  this.threadWakeFrequency =
      this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
  this.msgInterval = this.conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

  this.sleeper = new Sleeper(this.msgInterval, this);

  this.maxScannerResultSize = this.conf.getLong(
      HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);

  this.numRegionsToReport =
      this.conf.getInt("hbase.regionserver.numregionstoreport", 10);

  this.rpcTimeout = this.conf.getInt(
      HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

  this.abortRequested = false;
  this.stopped = false;

  this.scannerLeaseTimeoutPeriod = this.conf.getInt(
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);

  // Work out the address to bind to. The DNS-derived default is computed
  // eagerly, exactly as when it was passed inline as the fallback value.
  final String dnsInterface = this.conf.get("hbase.regionserver.dns.interface", "default");
  final String dnsNameserver = this.conf.get("hbase.regionserver.dns.nameserver", "default");
  final String defaultHost =
      Strings.domainNamePointerToHostName(DNS.getDefaultHost(dnsInterface, dnsNameserver));
  final String bindHost = this.conf.get("hbase.regionserver.ipc.address", defaultHost);
  final int bindPort = this.conf.getInt(HConstants.REGIONSERVER_PORT,
      HConstants.DEFAULT_REGIONSERVER_PORT);

  final InetSocketAddress initialIsa = new InetSocketAddress(bindHost, bindPort);
  if (initialIsa.getAddress() == null) {
    throw new IllegalArgumentException("Failed resolve of " + initialIsa);
  }
  this.rand = new Random(initialIsa.hashCode());
  final String serverLabel = "regionserver/" + initialIsa.toString();

  HConnectionManager.setServerSideHConnectionRetries(this.conf, serverLabel, LOG);
  final List<BlockingServiceAndInterface> services = getServices();
  final int handlerCount = this.conf.getInt("hbase.regionserver.handler.count", 10);
  final int metaHandlerCount = this.conf.getInt("hbase.regionserver.metahandler.count", 10);
  this.rpcServer = new RpcServer(this, serverLabel, services, initialIsa,
      handlerCount, metaHandlerCount, this.conf, HConstants.QOS_THRESHOLD);

  // Use the address the RPC server reports it is listening on from here on.
  this.isa = this.rpcServer.getListenerAddress();

  this.rpcServer.setErrorHandler(this);
  this.qosFunction = new QosFunction(this);
  this.rpcServer.setQosFunction(this.qosFunction);
  this.startcode = System.currentTimeMillis();

  // ZooKeeper client login, then the region server's own login.
  final String listenHost = this.isa.getHostName();
  ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
      "hbase.zookeeper.client.kerberos.principal", listenHost);

  User.login(this.conf, "hbase.regionserver.keytab.file",
      "hbase.regionserver.kerberos.principal", this.isa.getHostName());
  this.regionServerAccounting = new RegionServerAccounting();
  this.cacheConfig = new CacheConfig(this.conf);
  // Any uncaught exception reported to this handler aborts the server.
  this.uncaughtExceptionHandler = new UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
      abort("Uncaught exception in service thread " + t.getName(), e);
    }
  };
  this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

  this.distributedLogReplay = this.conf.getBoolean(
      HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
}
573
574
575
576
577 private List<BlockingServiceAndInterface> getServices() {
578 List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
579 bssi.add(new BlockingServiceAndInterface(
580 ClientProtos.ClientService.newReflectiveBlockingService(this),
581 ClientProtos.ClientService.BlockingInterface.class));
582 bssi.add(new BlockingServiceAndInterface(
583 AdminProtos.AdminService.newReflectiveBlockingService(this),
584 AdminProtos.AdminService.BlockingInterface.class));
585 return bssi;
586 }
587
588
589
590
591
592
593 private static void checkCodecs(final Configuration c) throws IOException {
594
595 String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
596 if (codecs == null) return;
597 for (String codec : codecs) {
598 if (!CompressionTest.testCompression(codec)) {
599 throw new IOException("Compression codec " + codec +
600 " not supported, aborting RS construction");
601 }
602 }
603 }
604
605 String getClusterId() {
606 return this.clusterId;
607 }
608
609 @Retention(RetentionPolicy.RUNTIME)
610 protected @interface QosPriority {
611 int priority() default 0;
612 }
613
614 QosFunction getQosFunction() {
615 return qosFunction;
616 }
617
618 RegionScanner getScanner(long scannerId) {
619 String scannerIdString = Long.toString(scannerId);
620 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
621 if (scannerHolder != null) {
622 return scannerHolder.s;
623 }
624 return null;
625 }
626
627
628
629
630
631
632
633 private void preRegistrationInitialization(){
634 try {
635 initializeZooKeeper();
636 initializeThreads();
637 } catch (Throwable t) {
638
639
640 this.rpcServer.stop();
641 abort("Initialization of RS failed. Hence aborting RS.", t);
642 }
643 }
644
645
646
647
648
649
650
651
652
653 private void initializeZooKeeper() throws IOException, InterruptedException {
654
655 this.zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
656 this.isa.getPort(), this);
657
658
659
660
661 this.masterAddressManager = new MasterAddressTracker(this.zooKeeper, this);
662 this.masterAddressManager.start();
663 blockAndCheckIfStopped(this.masterAddressManager);
664
665
666
667 this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
668 this.clusterStatusTracker.start();
669 blockAndCheckIfStopped(this.clusterStatusTracker);
670
671
672 this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this);
673 catalogTracker.start();
674
675
676
677
678 try {
679 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
680 if (clusterId == null) {
681 this.abort("Cluster ID has not been set");
682 }
683 LOG.info("ClusterId : "+clusterId);
684 } catch (KeeperException e) {
685 this.abort("Failed to retrieve Cluster ID",e);
686 }
687
688
689 try {
690 this.snapshotManager = new RegionServerSnapshotManager(this);
691 } catch (KeeperException e) {
692 this.abort("Failed to reach zk cluster when creating snapshot handler.");
693 }
694 this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
695 new ServerName(isa.getHostName(), isa.getPort(), startcode));
696
697
698 if(this.distributedLogReplay) {
699 this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
700 }
701 }
702
703
704
705
706
707
708
709
710 private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
711 throws IOException, InterruptedException {
712 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
713 if (this.stopped) {
714 throw new IOException("Received the shutdown message while waiting.");
715 }
716 }
717 }
718
719
720
721
722 private boolean isClusterUp() {
723 return this.clusterStatusTracker.isClusterUp();
724 }
725
726 private void initializeThreads() throws IOException {
727
728 this.cacheFlusher = new MemStoreFlusher(conf, this);
729
730
731 this.compactSplitThread = new CompactSplitThread(this);
732
733
734
735 this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
736 this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
737
738 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
739 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
740 if (isHealthCheckerConfigured()) {
741 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
742 }
743
744 this.leases = new Leases(this.threadWakeFrequency);
745
746
747 if (conf.getBoolean("hbase.regionserver.export.thrift", false)) {
748 thriftServer = new HRegionThriftServer(this, conf);
749 thriftServer.start();
750 LOG.info("Started Thrift API from Region Server.");
751 }
752
753
754 movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
755
756
757 rpcClient = new RpcClient(conf, clusterId);
758 }
759
760
761
762
763 public void run() {
764 try {
765
766 preRegistrationInitialization();
767 } catch (Throwable e) {
768 abort("Fatal exception during initialization", e);
769 }
770
771 try {
772
773
774 while (keepLooping()) {
775 RegionServerStartupResponse w = reportForDuty();
776 if (w == null) {
777 LOG.warn("reportForDuty failed; sleeping and then retrying.");
778 this.sleeper.sleep();
779 } else {
780 handleReportForDutyResponse(w);
781 break;
782 }
783 }
784
785
786 this.snapshotManager.start();
787
788
789 long lastMsg = 0;
790 long oldRequestCount = -1;
791
792 while (!this.stopped && isHealthy()) {
793 if (!isClusterUp()) {
794 if (isOnlineRegionsEmpty()) {
795 stop("Exiting; cluster shutdown set and not carrying any regions");
796 } else if (!this.stopping) {
797 this.stopping = true;
798 LOG.info("Closing user regions");
799 closeUserRegions(this.abortRequested);
800 } else if (this.stopping) {
801 boolean allUserRegionsOffline = areAllUserRegionsOffline();
802 if (allUserRegionsOffline) {
803
804
805 if (oldRequestCount == this.requestCount.get()) {
806 stop("Stopped; only catalog regions remaining online");
807 break;
808 }
809 oldRequestCount = this.requestCount.get();
810 } else {
811
812
813
814 closeUserRegions(this.abortRequested);
815 }
816 LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
817 }
818 }
819 long now = System.currentTimeMillis();
820 if ((now - lastMsg) >= msgInterval) {
821 tryRegionServerReport(lastMsg, now);
822 lastMsg = System.currentTimeMillis();
823 }
824 if (!this.stopped) this.sleeper.sleep();
825 }
826 } catch (Throwable t) {
827 if (!checkOOME(t)) {
828 String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
829 abort(prefix + t.getMessage(), t);
830 }
831 }
832
833 if (mxBean != null) {
834 MBeanUtil.unregisterMBean(mxBean);
835 mxBean = null;
836 }
837 if (this.thriftServer != null) this.thriftServer.shutdown();
838 this.leases.closeAfterLeasesExpire();
839 this.rpcServer.stop();
840 if (this.splitLogWorker != null) {
841 splitLogWorker.stop();
842 }
843 if (this.infoServer != null) {
844 LOG.info("Stopping infoServer");
845 try {
846 this.infoServer.stop();
847 } catch (Exception e) {
848 e.printStackTrace();
849 }
850 }
851
852 if (cacheConfig.isBlockCacheEnabled()) {
853 cacheConfig.getBlockCache().shutdown();
854 }
855
856 movedRegionsCleaner.stop("Region Server stopping");
857
858
859
860 if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
861 if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
862 if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
863 if (this.metaHLogRoller != null) this.metaHLogRoller.interruptIfNecessary();
864 if (this.compactionChecker != null)
865 this.compactionChecker.interrupt();
866 if (this.healthCheckChore != null) {
867 this.healthCheckChore.interrupt();
868 }
869
870 try {
871 if (snapshotManager != null) snapshotManager.stop(this.abortRequested);
872 } catch (IOException e) {
873 LOG.warn("Failed to close snapshot handler cleanly", e);
874 }
875
876 if (this.killed) {
877
878 } else if (abortRequested) {
879 if (this.fsOk) {
880 closeUserRegions(abortRequested);
881 }
882 LOG.info("aborting server " + this.serverNameFromMasterPOV);
883 } else {
884 closeUserRegions(abortRequested);
885 closeAllScanners();
886 LOG.info("stopping server " + this.serverNameFromMasterPOV);
887 }
888
889
890 if (this.catalogTracker != null) this.catalogTracker.stop();
891
892
893 try {
894 if (snapshotManager != null) snapshotManager.stop(this.abortRequested || this.killed);
895 } catch (IOException e) {
896 LOG.warn("Failed to close snapshot handler cleanly", e);
897 }
898
899
900 if (!this.killed && containsMetaTableRegions()) {
901 if (!abortRequested || this.fsOk) {
902 if (this.compactSplitThread != null) {
903 this.compactSplitThread.join();
904 this.compactSplitThread = null;
905 }
906 closeMetaTableRegions(abortRequested);
907 }
908 }
909
910 if (!this.killed && this.fsOk) {
911 waitOnAllRegionsToClose(abortRequested);
912 LOG.info("stopping server " + this.serverNameFromMasterPOV +
913 "; all regions closed.");
914 }
915
916
917 if (!this.killed && this.fsOk) {
918 closeWAL(!abortRequested);
919 }
920
921
922 if (this.rssStub != null) {
923 this.rssStub = null;
924 }
925 this.rpcClient.stop();
926 this.leases.close();
927
928 if (!killed) {
929 join();
930 }
931
932 try {
933 deleteMyEphemeralNode();
934 } catch (KeeperException e) {
935 LOG.warn("Failed deleting my ephemeral node", e);
936 }
937
938
939 ZNodeClearer.deleteMyEphemeralNodeOnDisk();
940 this.zooKeeper.close();
941 LOG.info("stopping server " + this.serverNameFromMasterPOV +
942 "; zookeeper connection closed.");
943
944 LOG.info(Thread.currentThread().getName() + " exiting");
945 }
946
947 private boolean containsMetaTableRegions() {
948 return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
949 }
950
951 private boolean areAllUserRegionsOffline() {
952 if (getNumberOfOnlineRegions() > 2) return false;
953 boolean allUserRegionsOffline = true;
954 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
955 if (!e.getValue().getRegionInfo().isMetaTable()) {
956 allUserRegionsOffline = false;
957 break;
958 }
959 }
960 return allUserRegionsOffline;
961 }
962
963 void tryRegionServerReport(long reportStartTime, long reportEndTime)
964 throws IOException {
965 HBaseProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
966 try {
967 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
968 ServerName sn = ServerName.parseVersionedServerName(
969 this.serverNameFromMasterPOV.getVersionedBytes());
970 request.setServer(ProtobufUtil.toServerName(sn));
971 request.setLoad(sl);
972 this.rssStub.regionServerReport(null, request.build());
973 } catch (ServiceException se) {
974 IOException ioe = ProtobufUtil.getRemoteException(se);
975 if (ioe instanceof YouAreDeadException) {
976
977 throw ioe;
978 }
979
980
981 Pair<ServerName, RegionServerStatusService.BlockingInterface> p =
982 createRegionServerStatusStub();
983 this.rssStub = p.getSecond();
984 }
985 }
986
987 HBaseProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
988
989
990
991
992
993
994
995 MetricsRegionServerWrapper regionServerWrapper = this.metricsRegionServer.getRegionServerWrapper();
996 Collection<HRegion> regions = getOnlineRegionsLocalContext();
997 MemoryUsage memory =
998 ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
999
1000 HBaseProtos.ServerLoad.Builder serverLoad = HBaseProtos.ServerLoad.newBuilder();
1001 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1002 serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1003 serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1004 serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1005 Set<String> coprocessors = this.hlog.getCoprocessorHost().getCoprocessors();
1006 for (String coprocessor : coprocessors) {
1007 serverLoad.addCoprocessors(
1008 Coprocessor.newBuilder().setName(coprocessor).build());
1009 }
1010 for (HRegion region : regions) {
1011 serverLoad.addRegionLoads(createRegionLoad(region));
1012 }
1013 serverLoad.setReportStartTime(reportStartTime);
1014 serverLoad.setReportEndTime(reportEndTime);
1015 if (this.infoServer != null) {
1016 serverLoad.setInfoServerPort(this.infoServer.getPort());
1017 } else {
1018 serverLoad.setInfoServerPort(-1);
1019 }
1020 return serverLoad.build();
1021 }
1022
1023 String getOnlineRegionsAsPrintableString() {
1024 StringBuilder sb = new StringBuilder();
1025 for (HRegion r: this.onlineRegions.values()) {
1026 if (sb.length() > 0) sb.append(", ");
1027 sb.append(r.getRegionInfo().getEncodedName());
1028 }
1029 return sb.toString();
1030 }
1031
1032
1033
1034
1035 private void waitOnAllRegionsToClose(final boolean abort) {
1036
1037 int lastCount = -1;
1038 long previousLogTime = 0;
1039 Set<String> closedRegions = new HashSet<String>();
1040 while (!isOnlineRegionsEmpty()) {
1041 int count = getNumberOfOnlineRegions();
1042
1043 if (count != lastCount) {
1044
1045 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1046 previousLogTime = System.currentTimeMillis();
1047 lastCount = count;
1048 LOG.info("Waiting on " + count + " regions to close");
1049
1050
1051 if (count < 10 && LOG.isDebugEnabled()) {
1052 LOG.debug(this.onlineRegions);
1053 }
1054 }
1055 }
1056
1057
1058
1059 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1060 HRegionInfo hri = e.getValue().getRegionInfo();
1061 if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1062 && !closedRegions.contains(hri.getEncodedName())) {
1063 closedRegions.add(hri.getEncodedName());
1064
1065 closeRegionIgnoreErrors(hri, abort);
1066 }
1067 }
1068
1069 if (this.regionsInTransitionInRS.isEmpty()) {
1070 if (!isOnlineRegionsEmpty()) {
1071 LOG.info("We were exiting though online regions are not empty," +
1072 " because some regions failed closing");
1073 }
1074 break;
1075 }
1076 Threads.sleep(200);
1077 }
1078 }
1079
1080 private void closeWAL(final boolean delete) {
1081 if (this.hlogForMeta != null) {
1082
1083
1084
1085
1086 try {
1087 this.hlogForMeta.close();
1088 } catch (Throwable e) {
1089 LOG.error("Metalog close and delete failed", RemoteExceptionHandler.checkThrowable(e));
1090 }
1091 }
1092 if (this.hlog != null) {
1093 try {
1094 if (delete) {
1095 hlog.closeAndDelete();
1096 } else {
1097 hlog.close();
1098 }
1099 } catch (Throwable e) {
1100 LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e));
1101 }
1102 }
1103 }
1104
1105 private void closeAllScanners() {
1106
1107
1108 for (Map.Entry<String, RegionScannerHolder> e : this.scanners.entrySet()) {
1109 try {
1110 e.getValue().s.close();
1111 } catch (IOException ioe) {
1112 LOG.warn("Closing scanner " + e.getKey(), ioe);
1113 }
1114 }
1115 }
1116
1117
1118
1119
1120
1121
1122 protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1123 throws IOException {
1124 try {
1125 for (NameStringPair e : c.getMapEntriesList()) {
1126 String key = e.getName();
1127
1128 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1129 String hostnameFromMasterPOV = e.getValue();
1130 this.serverNameFromMasterPOV = new ServerName(hostnameFromMasterPOV,
1131 this.isa.getPort(), this.startcode);
1132 if (!this.serverNameFromMasterPOV.equals(this.isa.getHostName())) {
1133 LOG.info("Master passed us a different hostname to use; was=" +
1134 this.isa.getHostName() + ", but now=" +
1135 this.serverNameFromMasterPOV.getHostname());
1136 }
1137 continue;
1138 }
1139 String value = e.getValue();
1140 if (LOG.isDebugEnabled()) {
1141 LOG.debug("Config from master: " + key + "=" + value);
1142 }
1143 this.conf.set(key, value);
1144 }
1145
1146
1147
1148 if (this.conf.get("mapred.task.id") == null) {
1149 this.conf.set("mapred.task.id", "hb_rs_" +
1150 this.serverNameFromMasterPOV.toString());
1151 }
1152
1153 createMyEphemeralNode();
1154
1155
1156 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1157
1158
1159
1160
1161
1162
1163 FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
1164
1165 this.fs = new HFileSystem(this.conf, this.useHBaseChecksum);
1166 this.rootDir = FSUtils.getRootDir(this.conf);
1167 this.tableDescriptors = new FSTableDescriptors(this.fs, this.rootDir, true);
1168 this.hlog = setupWALAndReplication();
1169
1170 this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1171 startServiceThreads();
1172 LOG.info("Serving as " + this.serverNameFromMasterPOV +
1173 ", RPC listening on " + this.isa +
1174 ", sessionid=0x" +
1175 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1176 isOnline = true;
1177 } catch (Throwable e) {
1178 this.isOnline = false;
1179 stop("Failed initialization");
1180 throw convertThrowableToIOE(cleanup(e, "Failed init"),
1181 "Region server startup failed");
1182 } finally {
1183 sleeper.skipSleepCycle();
1184 }
1185 }
1186
1187 private void createMyEphemeralNode() throws KeeperException {
1188 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(),
1189 HConstants.EMPTY_BYTE_ARRAY);
1190 }
1191
1192 private void deleteMyEphemeralNode() throws KeeperException {
1193 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1194 }
1195
1196 public RegionServerAccounting getRegionServerAccounting() {
1197 return regionServerAccounting;
1198 }
1199
1200 @Override
1201 public TableLockManager getTableLockManager() {
1202 return tableLockManager;
1203 }
1204
1205
1206
1207
1208
1209
1210
1211
1212 private RegionLoad createRegionLoad(final HRegion r) {
1213 byte[] name = r.getRegionName();
1214 int stores = 0;
1215 int storefiles = 0;
1216 int storeUncompressedSizeMB = 0;
1217 int storefileSizeMB = 0;
1218 int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
1219 int storefileIndexSizeMB = 0;
1220 int rootIndexSizeKB = 0;
1221 int totalStaticIndexSizeKB = 0;
1222 int totalStaticBloomSizeKB = 0;
1223 long totalCompactingKVs = 0;
1224 long currentCompactedKVs = 0;
1225 synchronized (r.stores) {
1226 stores += r.stores.size();
1227 for (Store store : r.stores.values()) {
1228 storefiles += store.getStorefilesCount();
1229 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed()
1230 / 1024 / 1024);
1231 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1232 storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1233 CompactionProgress progress = store.getCompactionProgress();
1234 if (progress != null) {
1235 totalCompactingKVs += progress.totalCompactingKVs;
1236 currentCompactedKVs += progress.currentCompactedKVs;
1237 }
1238
1239 rootIndexSizeKB +=
1240 (int) (store.getStorefilesIndexSize() / 1024);
1241
1242 totalStaticIndexSizeKB +=
1243 (int) (store.getTotalStaticIndexSize() / 1024);
1244
1245 totalStaticBloomSizeKB +=
1246 (int) (store.getTotalStaticBloomSize() / 1024);
1247 }
1248 }
1249 RegionLoad.Builder regionLoad = RegionLoad.newBuilder();
1250 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1251 regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1252 regionSpecifier.setValue(ByteString.copyFrom(name));
1253 regionLoad.setRegionSpecifier(regionSpecifier.build())
1254 .setStores(stores)
1255 .setStorefiles(storefiles)
1256 .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1257 .setStorefileSizeMB(storefileSizeMB)
1258 .setMemstoreSizeMB(memstoreSizeMB)
1259 .setStorefileIndexSizeMB(storefileIndexSizeMB)
1260 .setRootIndexSizeKB(rootIndexSizeKB)
1261 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1262 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1263 .setReadRequestsCount((int) r.readRequestsCount.get())
1264 .setWriteRequestsCount((int) r.writeRequestsCount.get())
1265 .setTotalCompactingKVs(totalCompactingKVs)
1266 .setCurrentCompactedKVs(currentCompactedKVs)
1267 .setCompleteSequenceId(r.completeSequenceId);
1268
1269 return regionLoad.build();
1270 }
1271
1272
1273
1274
1275
1276 public RegionLoad createRegionLoad(final String encodedRegionName) {
1277 HRegion r = null;
1278 r = this.onlineRegions.get(encodedRegionName);
1279 return r != null ? createRegionLoad(r) : null;
1280 }
1281
1282
1283
1284
1285 private static class CompactionChecker extends Chore {
1286 private final HRegionServer instance;
1287 private final int majorCompactPriority;
1288 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1289 private long iteration = 0;
1290
1291 CompactionChecker(final HRegionServer h, final int sleepTime,
1292 final Stoppable stopper) {
1293 super("CompactionChecker", sleepTime, h);
1294 this.instance = h;
1295 LOG.info("Runs every " + StringUtils.formatTime(sleepTime));
1296
1297
1298
1299
1300 this.majorCompactPriority = this.instance.conf.
1301 getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1302 DEFAULT_PRIORITY);
1303 }
1304
1305 @Override
1306 protected void chore() {
1307 for (HRegion r : this.instance.onlineRegions.values()) {
1308 if (r == null)
1309 continue;
1310 for (Store s : r.getStores().values()) {
1311 try {
1312 long multiplier = s.getCompactionCheckMultiplier();
1313 assert multiplier > 0;
1314 if (iteration % multiplier != 0) continue;
1315 if (s.needsCompaction()) {
1316
1317 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1318 + " requests compaction", null);
1319 } else if (s.isMajorCompaction()) {
1320 if (majorCompactPriority == DEFAULT_PRIORITY
1321 || majorCompactPriority > r.getCompactPriority()) {
1322 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1323 + " requests major compaction; use default priority", null);
1324 } else {
1325 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1326 + " requests major compaction; use configured priority",
1327 this.majorCompactPriority, null);
1328 }
1329 }
1330 } catch (IOException e) {
1331 LOG.warn("Failed major compaction check on " + r, e);
1332 }
1333 }
1334 }
1335 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1336 }
1337 }
1338
1339 class PeriodicMemstoreFlusher extends Chore {
1340 final HRegionServer server;
1341 final static int RANGE_OF_DELAY = 20000;
1342 final static int MIN_DELAY_TIME = 3000;
1343 public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1344 super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
1345 this.server = server;
1346 }
1347
1348 @Override
1349 protected void chore() {
1350 for (HRegion r : this.server.onlineRegions.values()) {
1351 if (r == null)
1352 continue;
1353 if (r.shouldFlush()) {
1354 FlushRequester requester = server.getFlushRequester();
1355 if (requester != null) {
1356 long randomDelay = rand.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1357 LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() +
1358 " after a delay of " + randomDelay);
1359
1360
1361
1362 requester.requestDelayedFlush(r, randomDelay);
1363 }
1364 }
1365 }
1366 }
1367 }
1368
1369
1370
1371
1372
1373
1374
1375
1376 public boolean isOnline() {
1377 return isOnline;
1378 }
1379
1380
1381
1382
1383
1384
1385
1386 private HLog setupWALAndReplication() throws IOException {
1387 final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1388 final String logName
1389 = HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
1390
1391 Path logdir = new Path(rootDir, logName);
1392 if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1393 if (this.fs.exists(logdir)) {
1394 throw new RegionServerRunningException("Region server has already " +
1395 "created directory at " + this.serverNameFromMasterPOV.toString());
1396 }
1397
1398
1399
1400 createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1401
1402 return instantiateHLog(rootDir, logName);
1403 }
1404
1405 private HLog getMetaWAL() throws IOException {
1406 if (this.hlogForMeta == null) {
1407 final String logName
1408 = HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
1409
1410 Path logdir = new Path(rootDir, logName);
1411 if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1412
1413 this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(),
1414 rootDir, logName, this.conf, getMetaWALActionListeners(),
1415 this.serverNameFromMasterPOV.toString());
1416 }
1417 return this.hlogForMeta;
1418 }
1419
1420
1421
1422
1423
1424
1425
1426
1427 protected HLog instantiateHLog(Path rootdir, String logName) throws IOException {
1428 return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf,
1429 getWALActionListeners(), this.serverNameFromMasterPOV.toString());
1430 }
1431
1432
1433
1434
1435
1436
1437
1438 protected List<WALActionsListener> getWALActionListeners() {
1439 List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1440
1441 this.hlogRoller = new LogRoller(this, this);
1442 listeners.add(this.hlogRoller);
1443 if (this.replicationSourceHandler != null &&
1444 this.replicationSourceHandler.getWALActionsListener() != null) {
1445
1446 listeners.add(this.replicationSourceHandler.getWALActionsListener());
1447 }
1448 return listeners;
1449 }
1450
1451 protected List<WALActionsListener> getMetaWALActionListeners() {
1452 List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1453
1454
1455 MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this);
1456 String n = Thread.currentThread().getName();
1457 Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1458 n + "MetaLogRoller", uncaughtExceptionHandler);
1459 this.metaHLogRoller = tmpLogRoller;
1460 tmpLogRoller = null;
1461 listeners.add(this.metaHLogRoller);
1462 return listeners;
1463 }
1464
1465 protected LogRoller getLogRoller() {
1466 return hlogRoller;
1467 }
1468
1469 public MetricsRegionServer getMetrics() {
1470 return this.metricsRegionServer;
1471 }
1472
1473
1474
1475
1476 public MasterAddressTracker getMasterAddressManager() {
1477 return this.masterAddressManager;
1478 }
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492 private void startServiceThreads() throws IOException {
1493 String n = Thread.currentThread().getName();
1494
1495 this.service = new ExecutorService(getServerName().toString());
1496 this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1497 conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1498 this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1499 conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1500 this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1501 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1502 this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1503 conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1504 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1505 this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1506 conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1507 }
1508
1509 Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
1510 uncaughtExceptionHandler);
1511 this.cacheFlusher.start(uncaughtExceptionHandler);
1512 Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
1513 ".compactionChecker", uncaughtExceptionHandler);
1514 Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n +
1515 ".periodicFlusher", uncaughtExceptionHandler);
1516 if (this.healthCheckChore != null) {
1517 Threads
1518 .setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
1519 uncaughtExceptionHandler);
1520 }
1521
1522
1523
1524 this.leases.setName(n + ".leaseChecker");
1525 this.leases.start();
1526
1527
1528
1529 this.webuiport = putUpWebUI();
1530
1531 if (this.replicationSourceHandler == this.replicationSinkHandler &&
1532 this.replicationSourceHandler != null) {
1533 this.replicationSourceHandler.startReplicationService();
1534 } else if (this.replicationSourceHandler != null) {
1535 this.replicationSourceHandler.startReplicationService();
1536 } else if (this.replicationSinkHandler != null) {
1537 this.replicationSinkHandler.startReplicationService();
1538 }
1539
1540
1541
1542 this.rpcServer.start();
1543
1544
1545 this.splitLogWorker = new SplitLogWorker(this.zooKeeper, this.getConfiguration(), this, this);
1546 splitLogWorker.start();
1547 }
1548
1549
1550
1551
1552
1553
1554 private int putUpWebUI() throws IOException {
1555 int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 60030);
1556
1557 if (port < 0) return port;
1558 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1559
1560 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1561 false);
1562 while (true) {
1563 try {
1564 this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf);
1565 this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class);
1566 this.infoServer.addServlet("dump", "/dump", RSDumpServlet.class);
1567 this.infoServer.setAttribute(REGIONSERVER, this);
1568 this.infoServer.setAttribute(REGIONSERVER_CONF, conf);
1569 this.infoServer.start();
1570 break;
1571 } catch (BindException e) {
1572 if (!auto) {
1573
1574 throw e;
1575 }
1576
1577 LOG.info("Failed binding http info server to port: " + port);
1578 port++;
1579 }
1580 }
1581 return port;
1582 }
1583
1584
1585
1586
1587 private boolean isHealthy() {
1588 if (!fsOk) {
1589
1590 return false;
1591 }
1592
1593 if (!(leases.isAlive()
1594 && cacheFlusher.isAlive() && hlogRoller.isAlive()
1595 && this.compactionChecker.isAlive())
1596 && this.periodicFlusher.isAlive()) {
1597 stop("One or more threads are no longer alive -- stop");
1598 return false;
1599 }
1600 if (metaHLogRoller != null && !metaHLogRoller.isAlive()) {
1601 stop("Meta HLog roller thread is no longer alive -- stop");
1602 return false;
1603 }
1604 return true;
1605 }
1606
1607 public HLog getWAL() {
1608 try {
1609 return getWAL(null);
1610 } catch (IOException e) {
1611 LOG.warn("getWAL threw exception " + e);
1612 return null;
1613 }
1614 }
1615
1616 @Override
1617 public HLog getWAL(HRegionInfo regionInfo) throws IOException {
1618
1619
1620
1621
1622 if (regionInfo != null &&
1623 regionInfo.isMetaTable()) {
1624 return getMetaWAL();
1625 }
1626 return this.hlog;
1627 }
1628
1629 @Override
1630 public CatalogTracker getCatalogTracker() {
1631 return this.catalogTracker;
1632 }
1633
1634 @Override
1635 public void stop(final String msg) {
1636 try {
1637 this.rsHost.preStop(msg);
1638 this.stopped = true;
1639 LOG.info("STOPPED: " + msg);
1640
1641 sleeper.skipSleepCycle();
1642 } catch (IOException exp) {
1643 LOG.warn("The region server did not stop", exp);
1644 }
1645 }
1646
1647 public void waitForServerOnline(){
1648 while (!isOnline() && !isStopped()){
1649 sleeper.sleep();
1650 }
1651 }
1652
1653 @Override
1654 public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct)
1655 throws KeeperException, IOException {
1656 checkOpen();
1657 LOG.info("Post open deploy tasks for region=" + r.getRegionNameAsString());
1658
1659 for (Store s : r.getStores().values()) {
1660 if (s.hasReferences() || s.needsCompaction()) {
1661 getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
1662 }
1663 }
1664 long openSeqNum = r.getOpenSeqNum();
1665 if (openSeqNum == HConstants.NO_SEQNUM) {
1666
1667 LOG.error("No sequence number found when opening " + r.getRegionNameAsString());
1668 openSeqNum = 0;
1669 }
1670
1671
1672 updateRecoveringRegionLastFlushedSequenceId(r);
1673
1674
1675 if (r.getRegionInfo().isMetaRegion()) {
1676 MetaRegionTracker.setMetaLocation(getZooKeeper(),
1677 this.serverNameFromMasterPOV);
1678 } else {
1679 MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
1680 this.serverNameFromMasterPOV, openSeqNum);
1681 }
1682 LOG.info("Done with post open deploy task for region=" +
1683 r.getRegionNameAsString());
1684
1685 }
1686
1687 @Override
1688 public RpcServerInterface getRpcServer() {
1689 return rpcServer;
1690 }
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702 public void abort(String reason, Throwable cause) {
1703 String msg = "ABORTING region server " + this + ": " + reason;
1704 if (cause != null) {
1705 LOG.fatal(msg, cause);
1706 } else {
1707 LOG.fatal(msg);
1708 }
1709 this.abortRequested = true;
1710
1711
1712
1713 LOG.fatal("RegionServer abort: loaded coprocessors are: " +
1714 CoprocessorHost.getLoadedCoprocessors());
1715
1716 try {
1717 if (cause != null) {
1718 msg += "\nCause:\n" + StringUtils.stringifyException(cause);
1719 }
1720
1721 if (rssStub != null && this.serverNameFromMasterPOV != null) {
1722 ReportRSFatalErrorRequest.Builder builder =
1723 ReportRSFatalErrorRequest.newBuilder();
1724 ServerName sn =
1725 ServerName.parseVersionedServerName(this.serverNameFromMasterPOV.getVersionedBytes());
1726 builder.setServer(ProtobufUtil.toServerName(sn));
1727 builder.setErrorMessage(msg);
1728 rssStub.reportRSFatalError(null, builder.build());
1729 }
1730 } catch (Throwable t) {
1731 LOG.warn("Unable to report fatal error to master", t);
1732 }
1733 stop(reason);
1734 }
1735
1736
1737
1738
1739 public void abort(String reason) {
1740 abort(reason, null);
1741 }
1742
1743 public boolean isAborted() {
1744 return this.abortRequested;
1745 }
1746
1747
1748
1749
1750
1751
1752 protected void kill() {
1753 this.killed = true;
1754 abort("Simulated kill");
1755 }
1756
1757
1758
1759
1760
1761 protected void join() {
1762 Threads.shutdown(this.compactionChecker.getThread());
1763 Threads.shutdown(this.periodicFlusher.getThread());
1764 this.cacheFlusher.join();
1765 if (this.healthCheckChore != null) {
1766 Threads.shutdown(this.healthCheckChore.getThread());
1767 }
1768 if (this.hlogRoller != null) {
1769 Threads.shutdown(this.hlogRoller.getThread());
1770 }
1771 if (this.metaHLogRoller != null) {
1772 Threads.shutdown(this.metaHLogRoller.getThread());
1773 }
1774 if (this.compactSplitThread != null) {
1775 this.compactSplitThread.join();
1776 }
1777 if (this.service != null) this.service.shutdown();
1778 if (this.replicationSourceHandler != null &&
1779 this.replicationSourceHandler == this.replicationSinkHandler) {
1780 this.replicationSourceHandler.stopReplicationService();
1781 } else if (this.replicationSourceHandler != null) {
1782 this.replicationSourceHandler.stopReplicationService();
1783 } else if (this.replicationSinkHandler != null) {
1784 this.replicationSinkHandler.stopReplicationService();
1785 }
1786 }
1787
1788
1789
1790
1791
1792 ReplicationSourceService getReplicationSourceService() {
1793 return replicationSourceHandler;
1794 }
1795
1796
1797
1798
1799
1800 ReplicationSinkService getReplicationSinkService() {
1801 return replicationSinkHandler;
1802 }
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812 private Pair<ServerName, RegionServerStatusService.BlockingInterface>
1813 createRegionServerStatusStub() {
1814 ServerName sn = null;
1815 long previousLogTime = 0;
1816 RegionServerStatusService.BlockingInterface master = null;
1817 boolean refresh = false;
1818 RegionServerStatusService.BlockingInterface intf = null;
1819 while (keepLooping() && master == null) {
1820 sn = this.masterAddressManager.getMasterAddress(refresh);
1821 if (sn == null) {
1822 if (!keepLooping()) {
1823
1824 LOG.debug("No master found and cluster is stopped; bailing out");
1825 return null;
1826 }
1827 LOG.debug("No master found; retry");
1828 previousLogTime = System.currentTimeMillis();
1829 refresh = true;
1830 sleeper.sleep();
1831 continue;
1832 }
1833
1834 InetSocketAddress isa =
1835 new InetSocketAddress(sn.getHostname(), sn.getPort());
1836
1837 LOG.info("Attempting connect to Master server at " +
1838 this.masterAddressManager.getMasterAddress());
1839 try {
1840 BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
1841 User.getCurrent(), this.rpcTimeout);
1842 intf = RegionServerStatusService.newBlockingStub(channel);
1843 break;
1844 } catch (IOException e) {
1845 e = e instanceof RemoteException ?
1846 ((RemoteException)e).unwrapRemoteException() : e;
1847 if (e instanceof ServerNotRunningYetException) {
1848 if (System.currentTimeMillis() > (previousLogTime+1000)){
1849 LOG.info("Master isn't available yet, retrying");
1850 previousLogTime = System.currentTimeMillis();
1851 }
1852 } else {
1853 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1854 LOG.warn("Unable to connect to master. Retrying. Error was:", e);
1855 previousLogTime = System.currentTimeMillis();
1856 }
1857 }
1858 try {
1859 Thread.sleep(200);
1860 } catch (InterruptedException ignored) {
1861 }
1862 }
1863 }
1864 return new Pair<ServerName, RegionServerStatusService.BlockingInterface>(sn, intf);
1865 }
1866
1867
1868
1869
1870
1871 private boolean keepLooping() {
1872 return !this.stopped && isClusterUp();
1873 }
1874
1875
1876
1877
1878
1879
1880
1881
1882 private RegionServerStartupResponse reportForDuty() throws IOException {
1883 RegionServerStartupResponse result = null;
1884 Pair<ServerName, RegionServerStatusService.BlockingInterface> p =
1885 createRegionServerStatusStub();
1886 this.rssStub = p.getSecond();
1887 ServerName masterServerName = p.getFirst();
1888 if (masterServerName == null) return result;
1889 try {
1890 this.requestCount.set(0);
1891 LOG.info("Telling master at " + masterServerName + " that we are up " +
1892 "with port=" + this.isa.getPort() + ", startcode=" + this.startcode);
1893 long now = EnvironmentEdgeManager.currentTimeMillis();
1894 int port = this.isa.getPort();
1895 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
1896 request.setPort(port);
1897 request.setServerStartCode(this.startcode);
1898 request.setServerCurrentTime(now);
1899 result = this.rssStub.regionServerStartup(null, request.build());
1900 } catch (ServiceException se) {
1901 IOException ioe = ProtobufUtil.getRemoteException(se);
1902 if (ioe instanceof ClockOutOfSyncException) {
1903 LOG.fatal("Master rejected startup because clock is out of sync", ioe);
1904
1905 throw ioe;
1906 } else {
1907 LOG.warn("error telling master we are up", se);
1908 }
1909 }
1910 return result;
1911 }
1912
1913 @Override
1914 public long getLastSequenceId(byte[] region) {
1915 Long lastFlushedSequenceId = -1l;
1916 try {
1917 GetLastFlushedSequenceIdRequest req = RequestConverter
1918 .buildGetLastFlushedSequenceIdRequest(region);
1919 lastFlushedSequenceId = rssStub.getLastFlushedSequenceId(null, req)
1920 .getLastFlushedSequenceId();
1921 } catch (ServiceException e) {
1922 lastFlushedSequenceId = -1l;
1923 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id", e);
1924 }
1925 return lastFlushedSequenceId;
1926 }
1927
1928
1929
1930
1931
1932
1933 protected void closeAllRegions(final boolean abort) {
1934 closeUserRegions(abort);
1935 closeMetaTableRegions(abort);
1936 }
1937
1938
1939
1940
1941
1942 void closeMetaTableRegions(final boolean abort) {
1943 HRegion meta = null;
1944 this.lock.writeLock().lock();
1945 try {
1946 for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
1947 HRegionInfo hri = e.getValue().getRegionInfo();
1948 if (hri.isMetaRegion()) {
1949 meta = e.getValue();
1950 }
1951 if (meta != null) break;
1952 }
1953 } finally {
1954 this.lock.writeLock().unlock();
1955 }
1956 if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
1957 }
1958
1959
1960
1961
1962
1963
1964
1965 void closeUserRegions(final boolean abort) {
1966 this.lock.writeLock().lock();
1967 try {
1968 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1969 HRegion r = e.getValue();
1970 if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
1971
1972 closeRegionIgnoreErrors(r.getRegionInfo(), abort);
1973 }
1974 }
1975 } finally {
1976 this.lock.writeLock().unlock();
1977 }
1978 }
1979
1980
1981 public InfoServer getInfoServer() {
1982 return infoServer;
1983 }
1984
1985
1986
1987
1988 public boolean isStopped() {
1989 return this.stopped;
1990 }
1991
1992 @Override
1993 public boolean isStopping() {
1994 return this.stopping;
1995 }
1996
1997 public Map<String, HRegion> getRecoveringRegions() {
1998 return this.recoveringRegions;
1999 }
2000
2001
2002
2003
2004
2005 public Configuration getConfiguration() {
2006 return conf;
2007 }
2008
2009
2010 ReentrantReadWriteLock.WriteLock getWriteLock() {
2011 return lock.writeLock();
2012 }
2013
2014 public int getNumberOfOnlineRegions() {
2015 return this.onlineRegions.size();
2016 }
2017
2018 boolean isOnlineRegionsEmpty() {
2019 return this.onlineRegions.isEmpty();
2020 }
2021
2022
2023
2024
2025
2026
2027 public Collection<HRegion> getOnlineRegionsLocalContext() {
2028 Collection<HRegion> regions = this.onlineRegions.values();
2029 return Collections.unmodifiableCollection(regions);
2030 }
2031
2032 @Override
2033 public void addToOnlineRegions(HRegion region) {
2034 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2035 }
2036
2037
2038
2039
2040
2041 public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
2042
2043 SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
2044 new Comparator<Long>() {
2045 public int compare(Long a, Long b) {
2046 return -1 * a.compareTo(b);
2047 }
2048 });
2049
2050 for (HRegion region : this.onlineRegions.values()) {
2051 sortedRegions.put(region.memstoreSize.get(), region);
2052 }
2053 return sortedRegions;
2054 }
2055
2056
2057
2058
2059 public long getStartcode() {
2060 return this.startcode;
2061 }
2062
2063
2064 public FlushRequester getFlushRequester() {
2065 return this.cacheFlusher;
2066 }
2067
2068
2069
2070
2071
2072
2073
2074 protected HRegionInfo[] getMostLoadedRegions() {
2075 ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2076 for (HRegion r : onlineRegions.values()) {
2077 if (!r.isAvailable()) {
2078 continue;
2079 }
2080 if (regions.size() < numRegionsToReport) {
2081 regions.add(r.getRegionInfo());
2082 } else {
2083 break;
2084 }
2085 }
2086 return regions.toArray(new HRegionInfo[regions.size()]);
2087 }
2088
2089 @Override
2090 public Leases getLeases() {
2091 return leases;
2092 }
2093
2094
2095
2096
2097 protected Path getRootDir() {
2098 return rootDir;
2099 }
2100
2101
2102
2103
2104 public FileSystem getFileSystem() {
2105 return fs;
2106 }
2107
2108 public String toString() {
2109 return getServerName().toString();
2110 }
2111
2112
2113
2114
2115
2116
2117 public int getThreadWakeFrequency() {
2118 return threadWakeFrequency;
2119 }
2120
2121 @Override
2122 public ZooKeeperWatcher getZooKeeper() {
2123 return zooKeeper;
2124 }
2125
2126 @Override
2127 public ServerName getServerName() {
2128
2129 return this.serverNameFromMasterPOV == null?
2130 new ServerName(this.isa.getHostName(), this.isa.getPort(), this.startcode):
2131 this.serverNameFromMasterPOV;
2132 }
2133
2134 @Override
2135 public CompactionRequestor getCompactionRequester() {
2136 return this.compactSplitThread;
2137 }
2138
2139 public ZooKeeperWatcher getZooKeeperWatcher() {
2140 return this.zooKeeper;
2141 }
2142
2143 public RegionServerCoprocessorHost getCoprocessorHost(){
2144 return this.rsHost;
2145 }
2146
2147
2148 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2149 return this.regionsInTransitionInRS;
2150 }
2151
2152 public ExecutorService getExecutorService() {
2153 return service;
2154 }
2155
2156
2157
2158
2159
2160
2161
2162
2163 static private void createNewReplicationInstance(Configuration conf,
2164 HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2165
2166
2167 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
2168 return;
2169 }
2170
2171
2172 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2173 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2174
2175
2176 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2177 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2178
2179
2180
2181 if (sourceClassname.equals(sinkClassname)) {
2182 server.replicationSourceHandler = (ReplicationSourceService)
2183 newReplicationInstance(sourceClassname,
2184 conf, server, fs, logDir, oldLogDir);
2185 server.replicationSinkHandler = (ReplicationSinkService)
2186 server.replicationSourceHandler;
2187 }
2188 else {
2189 server.replicationSourceHandler = (ReplicationSourceService)
2190 newReplicationInstance(sourceClassname,
2191 conf, server, fs, logDir, oldLogDir);
2192 server.replicationSinkHandler = (ReplicationSinkService)
2193 newReplicationInstance(sinkClassname,
2194 conf, server, fs, logDir, oldLogDir);
2195 }
2196 }
2197
2198 static private ReplicationService newReplicationInstance(String classname,
2199 Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2200 Path oldLogDir) throws IOException{
2201
2202 Class<?> clazz = null;
2203 try {
2204 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2205 clazz = Class.forName(classname, true, classLoader);
2206 } catch (java.lang.ClassNotFoundException nfe) {
2207 throw new IOException("Cound not find class for " + classname);
2208 }
2209
2210
2211 ReplicationService service = (ReplicationService)
2212 ReflectionUtils.newInstance(clazz, conf);
2213 service.initialize(server, fs, logDir, oldLogDir);
2214 return service;
2215 }
2216
2217
2218
2219
2220
2221
2222 public static Thread startRegionServer(final HRegionServer hrs)
2223 throws IOException {
2224 return startRegionServer(hrs, "regionserver" + hrs.isa.getPort());
2225 }
2226
2227
2228
2229
2230
2231
2232
2233 public static Thread startRegionServer(final HRegionServer hrs,
2234 final String name) throws IOException {
2235 Thread t = new Thread(hrs);
2236 t.setName(name);
2237 t.start();
2238
2239
2240 ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
2241 .getConfiguration()), hrs, t);
2242 return t;
2243 }
2244
2245
2246
2247
2248
2249
2250
2251
2252 public static HRegionServer constructRegionServer(
2253 Class<? extends HRegionServer> regionServerClass,
2254 final Configuration conf2) {
2255 try {
2256 Constructor<? extends HRegionServer> c = regionServerClass
2257 .getConstructor(Configuration.class);
2258 return c.newInstance(conf2);
2259 } catch (Exception e) {
2260 throw new RuntimeException("Failed construction of " + "Regionserver: "
2261 + regionServerClass.toString(), e);
2262 }
2263 }
2264
2265
2266
2267
2268 public static void main(String[] args) throws Exception {
2269 VersionInfo.logVersion();
2270 Configuration conf = HBaseConfiguration.create();
2271 @SuppressWarnings("unchecked")
2272 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2273 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2274
2275 new HRegionServerCommandLine(regionServerClass).doMain(args);
2276 }
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288 public List<HRegion> getOnlineRegions(byte[] tableName) {
2289 List<HRegion> tableRegions = new ArrayList<HRegion>();
2290 synchronized (this.onlineRegions) {
2291 for (HRegion region: this.onlineRegions.values()) {
2292 HRegionInfo regionInfo = region.getRegionInfo();
2293 if(Bytes.equals(regionInfo.getTableName(), tableName)) {
2294 tableRegions.add(region);
2295 }
2296 }
2297 }
2298 return tableRegions;
2299 }
2300
2301
2302 public String[] getCoprocessors() {
2303 TreeSet<String> coprocessors = new TreeSet<String>(
2304 this.hlog.getCoprocessorHost().getCoprocessors());
2305 Collection<HRegion> regions = getOnlineRegionsLocalContext();
2306 for (HRegion region: regions) {
2307 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2308 }
2309 return coprocessors.toArray(new String[coprocessors.size()]);
2310 }
2311
2312
2313
2314
2315
2316 private class ScannerListener implements LeaseListener {
2317 private final String scannerName;
2318
2319 ScannerListener(final String n) {
2320 this.scannerName = n;
2321 }
2322
2323 public void leaseExpired() {
2324 RegionScannerHolder rsh = scanners.remove(this.scannerName);
2325 if (rsh != null) {
2326 RegionScanner s = rsh.s;
2327 LOG.info("Scanner " + this.scannerName + " lease expired on region "
2328 + s.getRegionInfo().getRegionNameAsString());
2329 try {
2330 HRegion region = getRegion(s.getRegionInfo().getRegionName());
2331 if (region != null && region.getCoprocessorHost() != null) {
2332 region.getCoprocessorHost().preScannerClose(s);
2333 }
2334
2335 s.close();
2336 if (region != null && region.getCoprocessorHost() != null) {
2337 region.getCoprocessorHost().postScannerClose(s);
2338 }
2339 } catch (IOException e) {
2340 LOG.error("Closing scanner for "
2341 + s.getRegionInfo().getRegionNameAsString(), e);
2342 }
2343 } else {
2344 LOG.info("Scanner " + this.scannerName + " lease expired");
2345 }
2346 }
2347 }
2348
2349
2350
2351
2352
2353
2354 protected void checkOpen() throws IOException {
2355 if (this.stopped || this.abortRequested) {
2356 throw new RegionServerStoppedException("Server " + getServerName() +
2357 " not running" + (this.abortRequested ? ", aborting" : ""));
2358 }
2359 if (!fsOk) {
2360 throw new RegionServerStoppedException("File system not available");
2361 }
2362 }
2363
2364
2365
2366
2367
2368
2369 private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2370 try {
2371 if (!closeRegion(region.getEncodedName(), abort, false, -1, null)) {
2372 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2373 " - ignoring and continuing");
2374 }
2375 } catch (NotServingRegionException e) {
2376 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2377 " - ignoring and continuing", e);
2378 }
2379 }
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405 protected boolean closeRegion(String encodedName, final boolean abort,
2406 final boolean zk, final int versionOfClosingNode, final ServerName sn)
2407 throws NotServingRegionException {
2408
2409 final HRegion actualRegion = this.getFromOnlineRegions(encodedName);
2410 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2411 try {
2412 actualRegion.getCoprocessorHost().preClose(false);
2413 } catch (IOException exp) {
2414 LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2415 return false;
2416 }
2417 }
2418
2419 final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2420 Boolean.FALSE);
2421
2422 if (Boolean.TRUE.equals(previous)) {
2423 LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
2424 "trying to OPEN. Cancelling OPENING.");
2425 if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2426
2427
2428 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2429 " Doing a standard close now");
2430 return closeRegion(encodedName, abort, zk, versionOfClosingNode, sn);
2431 } else {
2432 LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2433
2434 throw new NotServingRegionException("The region " + encodedName +
2435 " was opening but not yet served. Opening is cancelled.");
2436 }
2437 } else if (Boolean.FALSE.equals(previous)) {
2438 LOG.info("Received CLOSE for the region: " + encodedName +
2439 " ,which we are already trying to CLOSE");
2440
2441 throw new NotServingRegionException("The region " + encodedName +
2442 " was already closing. New CLOSE request is ignored.");
2443 }
2444
2445 if (actualRegion == null){
2446 LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2447 this.regionsInTransitionInRS.remove(encodedName.getBytes());
2448
2449 throw new NotServingRegionException("The region " + encodedName +
2450 " is not online, and is not opening.");
2451 }
2452
2453 CloseRegionHandler crh;
2454 final HRegionInfo hri = actualRegion.getRegionInfo();
2455 if (hri.isMetaRegion()) {
2456 crh = new CloseMetaHandler(this, this, hri, abort, zk, versionOfClosingNode);
2457 } else {
2458 crh = new CloseRegionHandler(this, this, hri, abort, zk, versionOfClosingNode, sn);
2459 }
2460 this.service.submit(crh);
2461 return true;
2462 }
2463
2464
2465
2466
2467
2468
2469 public HRegion getOnlineRegion(final byte[] regionName) {
2470 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2471 return this.onlineRegions.get(encodedRegionName);
2472 }
2473
2474 public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2475 return this.regionFavoredNodesMap.get(encodedRegionName);
2476 }
2477
2478 @Override
2479 public HRegion getFromOnlineRegions(final String encodedRegionName) {
2480 return this.onlineRegions.get(encodedRegionName);
2481 }
2482
2483
2484 @Override
2485 public boolean removeFromOnlineRegions(final HRegion r, ServerName destination) {
2486 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2487
2488 if (destination != null) {
2489 HLog wal = getWAL();
2490 long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
2491 if (closeSeqNum == HConstants.NO_SEQNUM) {
2492
2493 closeSeqNum = r.getOpenSeqNum();
2494 if (closeSeqNum == HConstants.NO_SEQNUM) {
2495 closeSeqNum = 0;
2496 }
2497 }
2498 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2499 }
2500 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2501 return toReturn != null;
2502 }
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512 protected HRegion getRegion(final byte[] regionName)
2513 throws NotServingRegionException {
2514 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2515 return getRegionByEncodedName(encodedRegionName);
2516 }
2517
2518 protected HRegion getRegionByEncodedName(String encodedRegionName)
2519 throws NotServingRegionException {
2520 HRegion region = this.onlineRegions.get(encodedRegionName);
2521 if (region == null) {
2522 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2523 if (moveInfo != null) {
2524 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2525 }
2526 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2527 if (isOpening != null && isOpening.booleanValue()) {
2528 throw new RegionOpeningException("Region is being opened: " + encodedRegionName);
2529 }
2530 throw new NotServingRegionException("Region is not online: " + encodedRegionName);
2531 }
2532 return region;
2533 }
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543 protected Throwable cleanup(final Throwable t) {
2544 return cleanup(t, null);
2545 }
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557 protected Throwable cleanup(final Throwable t, final String msg) {
2558
2559 if (t instanceof NotServingRegionException) {
2560 LOG.debug("NotServingRegionException; " + t.getMessage());
2561 return t;
2562 }
2563 if (msg == null) {
2564 LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2565 } else {
2566 LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2567 }
2568 if (!checkOOME(t)) {
2569 checkFileSystem();
2570 }
2571 return t;
2572 }
2573
2574
2575
2576
2577
2578
2579 protected IOException convertThrowableToIOE(final Throwable t) {
2580 return convertThrowableToIOE(t, null);
2581 }
2582
2583
2584
2585
2586
2587
2588
2589
2590 protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2591 return (t instanceof IOException ? (IOException) t : msg == null
2592 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2593 }
2594
2595
2596
2597
2598
2599
2600
2601
2602 public boolean checkOOME(final Throwable e) {
2603 boolean stop = false;
2604 try {
2605 if (e instanceof OutOfMemoryError
2606 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
2607 || (e.getMessage() != null && e.getMessage().contains(
2608 "java.lang.OutOfMemoryError"))) {
2609 stop = true;
2610 LOG.fatal(
2611 "Run out of memory; HRegionServer will abort itself immediately", e);
2612 }
2613 } finally {
2614 if (stop) {
2615 Runtime.getRuntime().halt(1);
2616 }
2617 }
2618 return stop;
2619 }
2620
2621
2622
2623
2624
2625
2626
2627 public boolean checkFileSystem() {
2628 if (this.fsOk && this.fs != null) {
2629 try {
2630 FSUtils.checkFileSystemAvailable(this.fs);
2631 } catch (IOException e) {
2632 abort("File System not available", e);
2633 this.fsOk = false;
2634 }
2635 }
2636 return this.fsOk;
2637 }
2638
2639 protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
2640 long scannerId = -1;
2641 while (true) {
2642 scannerId = rand.nextLong();
2643 if (scannerId == -1) continue;
2644 String scannerName = String.valueOf(scannerId);
2645 RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s));
2646 if (existing == null) {
2647 this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
2648 new ScannerListener(scannerName));
2649 break;
2650 }
2651 }
2652 return scannerId;
2653 }
2654
2655
2656
2657
2658
2659
2660 protected long nextLong() {
2661 long n = rand.nextLong();
2662 if (n == 0) {
2663 return nextLong();
2664 }
2665 if (n < 0) {
2666 n = -n;
2667 }
2668 return n;
2669 }
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680 @Override
2681 public GetResponse get(final RpcController controller,
2682 final GetRequest request) throws ServiceException {
2683 long before = EnvironmentEdgeManager.currentTimeMillis();
2684 try {
2685 requestCount.increment();
2686 HRegion region = getRegion(request.getRegion());
2687
2688 GetResponse.Builder builder = GetResponse.newBuilder();
2689 ClientProtos.Get get = request.getGet();
2690 Boolean existence = null;
2691 Result r = null;
2692
2693 if (request.getClosestRowBefore()) {
2694 if (get.getColumnCount() != 1) {
2695 throw new DoNotRetryIOException(
2696 "get ClosestRowBefore supports one and only one family now, not "
2697 + get.getColumnCount() + " families");
2698 }
2699 byte[] row = get.getRow().toByteArray();
2700 byte[] family = get.getColumn(0).getFamily().toByteArray();
2701 r = region.getClosestRowBefore(row, family);
2702 } else {
2703 Get clientGet = ProtobufUtil.toGet(get);
2704 if (request.getExistenceOnly() && region.getCoprocessorHost() != null) {
2705 existence = region.getCoprocessorHost().preExists(clientGet);
2706 }
2707 if (existence == null) {
2708 r = region.get(clientGet);
2709 if (request.getExistenceOnly()) {
2710 boolean exists = r != null && !r.isEmpty();
2711 if (region.getCoprocessorHost() != null) {
2712 exists = region.getCoprocessorHost().postExists(clientGet, exists);
2713 }
2714 existence = exists;
2715 }
2716 }
2717 }
2718 if (existence != null) {
2719 builder.setExists(existence.booleanValue());
2720 } else if (r != null) {
2721 builder.setResult(ProtobufUtil.toResult(r));
2722 }
2723 return builder.build();
2724 } catch (IOException ie) {
2725 throw new ServiceException(ie);
2726 } finally {
2727 metricsRegionServer.updateGet(EnvironmentEdgeManager.currentTimeMillis() - before);
2728 }
2729 }
2730
2731
2732
2733
2734
2735
2736
2737
2738 @Override
2739 public MultiGetResponse multiGet(final RpcController controller, final MultiGetRequest request)
2740 throws ServiceException {
2741 long before = EnvironmentEdgeManager.currentTimeMillis();
2742 try {
2743 requestCount.add(request.getGetCount());
2744 HRegion region = getRegion(request.getRegion());
2745 MultiGetResponse.Builder builder = MultiGetResponse.newBuilder();
2746 for (ClientProtos.Get get: request.getGetList())
2747 {
2748 Boolean existence = null;
2749 Result r = null;
2750 if (request.getClosestRowBefore()) {
2751 if (get.getColumnCount() != 1) {
2752 throw new DoNotRetryIOException(
2753 "get ClosestRowBefore supports one and only one family now, not "
2754 + get.getColumnCount() + " families");
2755 }
2756 byte[] row = get.getRow().toByteArray();
2757 byte[] family = get.getColumn(0).getFamily().toByteArray();
2758 r = region.getClosestRowBefore(row, family);
2759 } else {
2760 Get clientGet = ProtobufUtil.toGet(get);
2761 if (request.getExistenceOnly() && region.getCoprocessorHost() != null) {
2762 existence = region.getCoprocessorHost().preExists(clientGet);
2763 }
2764 if (existence == null) {
2765 r = region.get(clientGet);
2766 if (request.getExistenceOnly()) {
2767 boolean exists = r != null && !r.isEmpty();
2768 if (region.getCoprocessorHost() != null) {
2769 exists = region.getCoprocessorHost().postExists(clientGet, exists);
2770 }
2771 existence = exists;
2772 }
2773 }
2774 }
2775 if (existence != null) {
2776 builder.addExists(existence.booleanValue());
2777 } else if (r != null) {
2778 builder.addResult(ProtobufUtil.toResult(r));
2779 }
2780 }
2781 return builder.build();
2782 } catch (IOException ie) {
2783 throw new ServiceException(ie);
2784 } finally {
2785 metricsRegionServer.updateGet(EnvironmentEdgeManager.currentTimeMillis() - before);
2786 }
2787 }
2788
2789
2790
2791
2792
2793
2794
2795
2796 @Override
2797 public MutateResponse mutate(final RpcController rpcc,
2798 final MutateRequest request) throws ServiceException {
2799
2800
2801 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2802 CellScanner cellScanner = controller != null? controller.cellScanner(): null;
2803
2804 controller.setCellScanner(null);
2805 try {
2806 requestCount.increment();
2807 HRegion region = getRegion(request.getRegion());
2808 MutateResponse.Builder builder = MutateResponse.newBuilder();
2809 MutationProto mutation = request.getMutation();
2810 if (!region.getRegionInfo().isMetaTable()) {
2811 cacheFlusher.reclaimMemStoreMemory();
2812 }
2813 Result r = null;
2814 Boolean processed = null;
2815 MutationType type = mutation.getMutateType();
2816 switch (type) {
2817 case APPEND:
2818 r = append(region, mutation, cellScanner);
2819 break;
2820 case INCREMENT:
2821 r = increment(region, mutation, cellScanner);
2822 break;
2823 case PUT:
2824 Put put = ProtobufUtil.toPut(mutation, cellScanner);
2825 if (request.hasCondition()) {
2826 Condition condition = request.getCondition();
2827 byte[] row = condition.getRow().toByteArray();
2828 byte[] family = condition.getFamily().toByteArray();
2829 byte[] qualifier = condition.getQualifier().toByteArray();
2830 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2831 ByteArrayComparable comparator =
2832 ProtobufUtil.toComparator(condition.getComparator());
2833 if (region.getCoprocessorHost() != null) {
2834 processed = region.getCoprocessorHost().preCheckAndPut(
2835 row, family, qualifier, compareOp, comparator, put);
2836 }
2837 if (processed == null) {
2838 boolean result = region.checkAndMutate(row, family,
2839 qualifier, compareOp, comparator, put, true);
2840 if (region.getCoprocessorHost() != null) {
2841 result = region.getCoprocessorHost().postCheckAndPut(row, family,
2842 qualifier, compareOp, comparator, put, result);
2843 }
2844 processed = result;
2845 }
2846 } else {
2847 region.put(put);
2848 processed = Boolean.TRUE;
2849 }
2850 break;
2851 case DELETE:
2852 Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2853 if (request.hasCondition()) {
2854 Condition condition = request.getCondition();
2855 byte[] row = condition.getRow().toByteArray();
2856 byte[] family = condition.getFamily().toByteArray();
2857 byte[] qualifier = condition.getQualifier().toByteArray();
2858 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2859 ByteArrayComparable comparator =
2860 ProtobufUtil.toComparator(condition.getComparator());
2861 if (region.getCoprocessorHost() != null) {
2862 processed = region.getCoprocessorHost().preCheckAndDelete(
2863 row, family, qualifier, compareOp, comparator, delete);
2864 }
2865 if (processed == null) {
2866 boolean result = region.checkAndMutate(row, family,
2867 qualifier, compareOp, comparator, delete, true);
2868 if (region.getCoprocessorHost() != null) {
2869 result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2870 qualifier, compareOp, comparator, delete, result);
2871 }
2872 processed = result;
2873 }
2874 } else {
2875 region.delete(delete);
2876 processed = Boolean.TRUE;
2877 }
2878 break;
2879 default:
2880 throw new DoNotRetryIOException(
2881 "Unsupported mutate type: " + type.name());
2882 }
2883 CellScannable cellsToReturn = null;
2884 if (processed != null) {
2885 builder.setProcessed(processed.booleanValue());
2886 } else if (r != null) {
2887 builder.setResult(ProtobufUtil.toResultNoData(r));
2888 cellsToReturn = r;
2889 }
2890 if (cellsToReturn != null) {
2891 controller.setCellScanner(cellsToReturn.cellScanner());
2892 }
2893 return builder.build();
2894 } catch (IOException ie) {
2895 checkFileSystem();
2896 throw new ServiceException(ie);
2897 }
2898 }
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
/**
 * Scan RPC: depending on the request, opens a new scanner, fetches the next
 * batch of rows from a scanner, and/or closes the scanner.
 * <p>
 * A request must carry either a scanner id (continue or close an existing
 * scanner) or a scan specification (open a new scanner). The number of rows
 * to fetch defaults to 1; a value of 0 or less skips fetching. The scanner
 * is closed and unregistered when the request asks for it or when no more
 * results are available.
 *
 * @param controller the RPC controller (not used by this method)
 * @param request    the scan request
 * @return response with the scanner id, the more-results flag, a ttl when
 *         the scanner stays open, and any fetched results
 * @throws ServiceException wrapping any {@code IOException} raised
 */
@Override
public ScanResponse scan(final RpcController controller,
    final ScanRequest request) throws ServiceException {
  Leases.Lease lease = null;
  String scannerName = null;
  try {
    if (!request.hasScannerId() && !request.hasScan()) {
      throw new DoNotRetryIOException(
        "Missing required input: scannerId or scan");
    }
    long scannerId = -1;
    if (request.hasScannerId()) {
      scannerId = request.getScannerId();
      scannerName = String.valueOf(scannerId);
    }
    try {
      checkOpen();
    } catch (IOException e) {
      // The server cannot serve requests. If the client named a scanner,
      // cancel its lease before rethrowing; a missing lease is only logged.
      if (scannerName != null) {
        try {
          leases.cancelLease(scannerName);
        } catch (LeaseException le) {
          LOG.info("Server shutting down and client tried to access missing scanner " +
            scannerName);
        }
      }
      throw e;
    }
    requestCount.increment();

    try {
      // ttl stays 0 (and is omitted from the response) unless a scanner is
      // opened or its lease is put back after fetching.
      int ttl = 0;
      HRegion region = null;
      RegionScanner scanner = null;
      RegionScannerHolder rsh = null;
      boolean moreResults = true;
      boolean closeScanner = false;
      // Non-null only when the scan attributes enable metrics; accumulates
      // the serialized size of the returned results.
      Long resultsWireSize = null;
      ScanResponse.Builder builder = ScanResponse.newBuilder();
      if (request.hasCloseScanner()) {
        closeScanner = request.getCloseScanner();
      }
      int rows = 1;
      if (request.hasNumberOfRows()) {
        rows = request.getNumberOfRows();
      }
      if (request.hasScannerId()) {
        // Continue with a scanner that was registered by an earlier call.
        rsh = scanners.get(scannerName);
        if (rsh == null) {
          throw new UnknownScannerException(
            "Name: " + scannerName + ", already closed?");
        }
        scanner = rsh.s;
        region = getRegion(scanner.getRegionInfo().getRegionName());
      } else {
        // Open a new scanner from the scan specification in the request.
        region = getRegion(request.getRegion());
        ClientProtos.Scan protoScan = request.getScan();
        boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
        Scan scan = ProtobufUtil.toScan(protoScan);
        // The client did not set on-demand column family loading: use the region default.
        if (!isLoadingCfsOnDemandSet) {
          scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
        }
        byte[] hasMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
        resultsWireSize = (hasMetrics != null && Bytes.toBoolean(hasMetrics)) ? 0L : null;
        region.prepareScanner(scan);
        // The pre-open hook may supply the scanner; otherwise ask the region.
        if (region.getCoprocessorHost() != null) {
          scanner = region.getCoprocessorHost().preScannerOpen(scan);
        }
        if (scanner == null) {
          scanner = region.getScanner(scan);
        }
        if (region.getCoprocessorHost() != null) {
          scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
        }
        scannerId = addScanner(scanner);
        scannerName = String.valueOf(scannerId);
        ttl = this.scannerLeaseTimeoutPeriod;
      }

      if (rows > 0) {
        // When the client sends a call sequence number it must match the one
        // tracked for this scanner; on a match the tracked number is advanced
        // for the next call.
        if (request.hasNextCallSeq()) {
          if (rsh == null) {
            rsh = scanners.get(scannerName);
          }
          if (rsh != null) {
            if (request.getNextCallSeq() != rsh.nextCallSeq) {
              throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
                + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
                "; request=" + TextFormat.shortDebugString(request));
            }
            // Sequence numbers matched; expect the next one on the following call.
            rsh.nextCallSeq++;
          }
        }
        try {
          // Take the lease out while the batch is produced; it is added back
          // in the finally block below if the scanner is still registered.
          // NOTE(review): presumably so the lease cannot expire mid-call - confirm.
          lease = leases.removeLease(scannerName);
          List<Result> results = new ArrayList<Result>(rows);
          long currentScanResultSize = 0;

          boolean done = false;
          // The pre-next hook may add results itself and may request a bypass
          // of the regular fetch below.
          if (region != null && region.getCoprocessorHost() != null) {
            Boolean bypass = region.getCoprocessorHost().preScannerNext(
              scanner, results, rows);
            if (!results.isEmpty()) {
              for (Result r : results) {
                // Sizes are only tracked when a result size limit is configured.
                if (maxScannerResultSize < Long.MAX_VALUE){
                  for (KeyValue kv : r.raw()) {
                    currentScanResultSize += kv.heapSize();
                  }
                }
              }
            }
            if (bypass != null && bypass.booleanValue()) {
              done = true;
            }
          }

          if (!done) {
            // A scanner without its own positive limit falls back to the server-wide one.
            long maxResultSize = scanner.getMaxResultSize();
            if (maxResultSize <= 0) {
              maxResultSize = maxScannerResultSize;
            }
            List<KeyValue> values = new ArrayList<KeyValue>();
            MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
            region.startRegionOperation(Operation.SCAN);
            try {
              int i = 0;
              synchronized(scanner) {
                // Fetch until the row quota or the size limit is reached, or
                // the scanner reports no more rows.
                for (; i < rows
                    && currentScanResultSize < maxResultSize; i++) {
                  // Reads the next row's key values into the reused 'values' list.
                  boolean moreRows = scanner.nextRaw(values);
                  if (!values.isEmpty()) {
                    if (maxScannerResultSize < Long.MAX_VALUE){
                      for (KeyValue kv : values) {
                        currentScanResultSize += kv.heapSize();
                      }
                    }
                    results.add(new Result(values));
                  }
                  if (!moreRows) {
                    break;
                  }
                  values.clear();
                }
              }
              region.readRequestsCount.add(i);
            } finally {
              region.closeRegionOperation();
            }

            // The post-next hook only runs when the fetch was not bypassed.
            if (region != null && region.getCoprocessorHost() != null) {
              region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
            }
          }

          // The filter is done and nothing was fetched: report that there are
          // no more results. Otherwise convert whatever was fetched.
          if (scanner.isFilterDone() && results.isEmpty()) {
            moreResults = false;
            results = null;
          } else {
            for (Result result: results) {
              if (result != null) {
                ClientProtos.Result pbResult = ProtobufUtil.toResult(result);
                if (resultsWireSize != null) {
                  resultsWireSize += pbResult.getSerializedSize();
                }
                builder.addResult(pbResult);
              }
            }
            if (resultsWireSize != null) {
              builder.setResultSizeBytes(resultsWireSize.longValue());
            }
          }
        } finally {
          // Put the lease back only if the scanner is still registered (it is
          // removed from the registry when closed or when its lease expires).
          if (scanners.containsKey(scannerName)) {
            if (lease != null) leases.addLease(lease);
            ttl = this.scannerLeaseTimeoutPeriod;
          }
        }
      }

      if (!moreResults || closeScanner) {
        ttl = 0;
        moreResults = false;
        // A true return from the pre-close hook skips the close below and
        // returns the response as built so far.
        if (region != null && region.getCoprocessorHost() != null) {
          if (region.getCoprocessorHost().preScannerClose(scanner)) {
            return builder.build();
          }
        }
        rsh = scanners.remove(scannerName);
        if (rsh != null) {
          scanner = rsh.s;
          scanner.close();
          leases.cancelLease(scannerName);
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(scanner);
          }
        }
      }

      if (ttl > 0) {
        builder.setTtl(ttl);
      }
      builder.setScannerId(scannerId);
      builder.setMoreResults(moreResults);
      return builder.build();
    } catch (Throwable t) {
      // The region is not served here: unregister the scanner (it is not closed here).
      if (scannerName != null && t instanceof NotServingRegionException) {
        scanners.remove(scannerName);
      }
      throw convertThrowableToIOE(cleanup(t));
    }
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
3142
3143
3144
3145
3146
3147
3148 @Override
3149 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
3150 final BulkLoadHFileRequest request) throws ServiceException {
3151 try {
3152 requestCount.increment();
3153 HRegion region = getRegion(request.getRegion());
3154 List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
3155 for (FamilyPath familyPath: request.getFamilyPathList()) {
3156 familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
3157 familyPath.getPath()));
3158 }
3159 boolean bypass = false;
3160 if (region.getCoprocessorHost() != null) {
3161 bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
3162 }
3163 boolean loaded = false;
3164 if (!bypass) {
3165 loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum());
3166 }
3167 if (region.getCoprocessorHost() != null) {
3168 loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
3169 }
3170 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
3171 builder.setLoaded(loaded);
3172 return builder.build();
3173 } catch (IOException ie) {
3174 throw new ServiceException(ie);
3175 }
3176 }
3177
3178 @Override
3179 public CoprocessorServiceResponse execService(final RpcController controller,
3180 final CoprocessorServiceRequest request) throws ServiceException {
3181 try {
3182 requestCount.increment();
3183 HRegion region = getRegion(request.getRegion());
3184
3185 ServerRpcController execController = new ServerRpcController();
3186 Message result = region.execService(execController, request.getCall());
3187 if (execController.getFailedOn() != null) {
3188 throw execController.getFailedOn();
3189 }
3190 CoprocessorServiceResponse.Builder builder =
3191 CoprocessorServiceResponse.newBuilder();
3192 builder.setRegion(RequestConverter.buildRegionSpecifier(
3193 RegionSpecifierType.REGION_NAME, region.getRegionName()));
3194 builder.setValue(
3195 builder.getValueBuilder().setName(result.getClass().getName())
3196 .setValue(result.toByteString()));
3197 return builder.build();
3198 } catch (IOException ie) {
3199 throw new ServiceException(ie);
3200 }
3201 }
3202
3203
3204
3205
3206
3207
3208
3209
3210 @Override
3211 public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
3212 throws ServiceException {
3213
3214
3215 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
3216 CellScanner cellScanner = controller != null? controller.cellScanner(): null;
3217
3218 controller.setCellScanner(null);
3219 List<CellScannable> cellsToReturn = null;
3220 try {
3221 HRegion region = getRegion(request.getRegion());
3222 MultiResponse.Builder builder = MultiResponse.newBuilder();
3223 List<MutationProto> mutations = new ArrayList<MutationProto>(request.getActionCount());
3224
3225 if (request.hasAtomic() && request.getAtomic()) {
3226
3227 for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
3228 if (actionUnion.hasMutation()) {
3229 mutations.add(actionUnion.getMutation());
3230 } else {
3231 throw new DoNotRetryIOException("Unsupported atomic action type: " + actionUnion);
3232 }
3233 }
3234
3235 if (!mutations.isEmpty()) mutateRows(region, mutations, cellScanner);
3236 } else {
3237
3238 ActionResult.Builder resultBuilder = null;
3239 cellsToReturn = new ArrayList<CellScannable>(request.getActionCount());
3240 for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
3241 this.requestCount.increment();
3242 ClientProtos.Result result = null;
3243 try {
3244 if (actionUnion.hasGet()) {
3245 Get get = ProtobufUtil.toGet(actionUnion.getGet());
3246 Result r = region.get(get);
3247 if (r != null) {
3248
3249 result = ProtobufUtil.toResultNoData(r);
3250
3251
3252 cellsToReturn.add(r);
3253 }
3254 } else if (actionUnion.hasMutation()) {
3255 MutationProto mutation = actionUnion.getMutation();
3256 MutationType type = mutation.getMutateType();
3257 if (type != MutationType.PUT && type != MutationType.DELETE) {
3258 if (!mutations.isEmpty()) {
3259 doBatchOp(builder, region, mutations, cellScanner);
3260 mutations.clear();
3261 } else if (!region.getRegionInfo().isMetaTable()) {
3262 cacheFlusher.reclaimMemStoreMemory();
3263 }
3264 }
3265 Result r = null;
3266 switch (type) {
3267 case APPEND:
3268 r = append(region, mutation, cellScanner);
3269 break;
3270 case INCREMENT:
3271 r = increment(region, mutation, cellScanner);
3272 break;
3273 case PUT:
3274 case DELETE:
3275 mutations.add(mutation);
3276 break;
3277 default:
3278 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
3279 }
3280 if (r != null) {
3281
3282
3283 result = ProtobufUtil.toResultNoData(r);
3284 cellsToReturn.add(r);
3285 }
3286 } else {
3287 LOG.warn("Error: invalid action: " + actionUnion + ". "
3288 + "it must be a Get, Mutate, or Exec.");
3289 throw new DoNotRetryIOException("Invalid action, "
3290 + "it must be a Get, Mutate, or Exec.");
3291 }
3292 if (result != null) {
3293 if (resultBuilder == null) {
3294 resultBuilder = ActionResult.newBuilder();
3295 } else {
3296 resultBuilder.clear();
3297 }
3298 resultBuilder.setValue(result);
3299 builder.addResult(resultBuilder.build());
3300 }
3301 } catch (IOException ie) {
3302 builder.addResult(ResponseConverter.buildActionResult(ie));
3303 }
3304 }
3305 if (!mutations.isEmpty()) {
3306 doBatchOp(builder, region, mutations, cellScanner);
3307 }
3308 }
3309
3310 if (cellsToReturn != null && !cellsToReturn.isEmpty()) {
3311 controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
3312 }
3313 return builder.build();
3314 } catch (IOException ie) {
3315 throw new ServiceException(ie);
3316 }
3317 }
3318
3319
3320
3321
3322 @Override
3323 @QosPriority(priority=HConstants.HIGH_QOS)
3324 public GetRegionInfoResponse getRegionInfo(final RpcController controller,
3325 final GetRegionInfoRequest request) throws ServiceException {
3326 try {
3327 checkOpen();
3328 requestCount.increment();
3329 HRegion region = getRegion(request.getRegion());
3330 HRegionInfo info = region.getRegionInfo();
3331 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
3332 builder.setRegionInfo(HRegionInfo.convert(info));
3333 if (request.hasCompactionState() && request.getCompactionState()) {
3334 builder.setCompactionState(region.getCompactionState());
3335 }
3336 return builder.build();
3337 } catch (IOException ie) {
3338 throw new ServiceException(ie);
3339 }
3340 }
3341
3342 @Override
3343 public GetStoreFileResponse getStoreFile(final RpcController controller,
3344 final GetStoreFileRequest request) throws ServiceException {
3345 try {
3346 HRegion region = getRegion(request.getRegion());
3347 requestCount.increment();
3348 Set<byte[]> columnFamilies;
3349 if (request.getFamilyCount() == 0) {
3350 columnFamilies = region.getStores().keySet();
3351 } else {
3352 columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
3353 for (ByteString cf: request.getFamilyList()) {
3354 columnFamilies.add(cf.toByteArray());
3355 }
3356 }
3357 int nCF = columnFamilies.size();
3358 List<String> fileList = region.getStoreFileList(
3359 columnFamilies.toArray(new byte[nCF][]));
3360 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
3361 builder.addAllStoreFile(fileList);
3362 return builder.build();
3363 } catch (IOException ie) {
3364 throw new ServiceException(ie);
3365 }
3366 }
3367
3368 @Override
3369 @QosPriority(priority=HConstants.HIGH_QOS)
3370 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
3371 final GetOnlineRegionRequest request) throws ServiceException {
3372 try {
3373 checkOpen();
3374 requestCount.increment();
3375 List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
3376 for (HRegion region: this.onlineRegions.values()) {
3377 list.add(region.getRegionInfo());
3378 }
3379 Collections.sort(list);
3380 return ResponseConverter.buildGetOnlineRegionResponse(list);
3381 } catch (IOException ie) {
3382 throw new ServiceException(ie);
3383 }
3384 }
3385
3386
3387
3388
3389
3390
3391
3392
3393
3394
3395
3396
3397
3398
3399
3400
3401
3402
3403
3404
3405
3406
3407
3408
3409
3410
3411
3412 @Override
3413 @QosPriority(priority=HConstants.HIGH_QOS)
3414 public OpenRegionResponse openRegion(final RpcController controller,
3415 final OpenRegionRequest request) throws ServiceException {
3416 try {
3417 checkOpen();
3418 } catch (IOException ie) {
3419 throw new ServiceException(ie);
3420 }
3421 requestCount.increment();
3422 OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
3423 final int regionCount = request.getOpenInfoCount();
3424 final Map<String, HTableDescriptor> htds = new HashMap<String, HTableDescriptor>(regionCount);
3425 final boolean isBulkAssign = regionCount > 1;
3426 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3427 final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
3428
3429 int versionOfOfflineNode = -1;
3430 if (regionOpenInfo.hasVersionOfOfflineNode()) {
3431 versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
3432 }
3433 HTableDescriptor htd;
3434 try {
3435 final HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName());
3436 if (onlineRegion != null) {
3437
3438 if (onlineRegion.getCoprocessorHost() != null) {
3439 onlineRegion.getCoprocessorHost().preOpen();
3440 }
3441
3442
3443 Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(
3444 this.catalogTracker, region.getRegionName());
3445 if (this.getServerName().equals(p.getSecond())) {
3446 LOG.warn("Attempted open of " + region.getEncodedName()
3447 + " but already online on this server");
3448 builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
3449 continue;
3450 } else {
3451 LOG.warn("The region " + region.getEncodedName() + " is online on this server" +
3452 " but META does not have this server - continue opening.");
3453 removeFromOnlineRegions(onlineRegion, null);
3454 }
3455 }
3456 LOG.info("Received request to open region: " + region.getRegionNameAsString() + " on "
3457 + this.serverNameFromMasterPOV);
3458 htd = htds.get(region.getTableNameAsString());
3459 if (htd == null) {
3460 htd = this.tableDescriptors.get(region.getTableName());
3461 htds.put(region.getTableNameAsString(), htd);
3462 }
3463
3464 final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(
3465 region.getEncodedNameAsBytes(), Boolean.TRUE);
3466
3467 if (Boolean.FALSE.equals(previous)) {
3468
3469 OpenRegionHandler.
3470 tryTransitionFromOfflineToFailedOpen(this, region, versionOfOfflineNode);
3471
3472 throw new RegionAlreadyInTransitionException("Received OPEN for the region:" +
3473 region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
3474 }
3475
3476 if (Boolean.TRUE.equals(previous)) {
3477
3478 LOG.info("Receiving OPEN for the region:" +
3479 region.getRegionNameAsString() + " , which we are already trying to OPEN" +
3480 " - ignoring this new request for this region.");
3481 }
3482
3483
3484
3485 removeFromMovedRegions(region.getEncodedName());
3486
3487 if (previous == null) {
3488
3489 if (isRegionMarkedRecoveringInZK(region.getEncodedName())) {
3490 this.recoveringRegions.put(region.getEncodedName(), null);
3491 }
3492
3493
3494 if (region.isMetaRegion()) {
3495 this.service.submit(new OpenMetaHandler(this, this, region, htd,
3496 versionOfOfflineNode));
3497 } else {
3498 updateRegionFavoredNodesMapping(region.getEncodedName(),
3499 regionOpenInfo.getFavoredNodesList());
3500 this.service.submit(new OpenRegionHandler(this, this, region, htd,
3501 versionOfOfflineNode));
3502 }
3503 }
3504
3505 builder.addOpeningState(RegionOpeningState.OPENED);
3506
3507 } catch (KeeperException zooKeeperEx) {
3508 LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
3509 throw new ServiceException(zooKeeperEx);
3510 } catch (IOException ie) {
3511 LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
3512 if (isBulkAssign) {
3513 builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
3514 } else {
3515 throw new ServiceException(ie);
3516 }
3517 }
3518 }
3519
3520 return builder.build();
3521 }
3522
3523 private void updateRegionFavoredNodesMapping(String encodedRegionName,
3524 List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3525 InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3526
3527
3528 for (int i = 0; i < favoredNodes.size(); i++) {
3529 addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3530 favoredNodes.get(i).getPort());
3531 }
3532 regionFavoredNodesMap.put(encodedRegionName, addr);
3533 }
3534
3535
3536
3537
3538
3539
3540
3541 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3542 return regionFavoredNodesMap.get(encodedRegionName);
3543 }
3544
3545
3546
3547
3548
3549
3550
3551
3552 @Override
3553 @QosPriority(priority=HConstants.HIGH_QOS)
3554 public CloseRegionResponse closeRegion(final RpcController controller,
3555 final CloseRegionRequest request) throws ServiceException {
3556 int versionOfClosingNode = -1;
3557 if (request.hasVersionOfClosingNode()) {
3558 versionOfClosingNode = request.getVersionOfClosingNode();
3559 }
3560 boolean zk = request.getTransitionInZK();
3561 final ServerName sn = (request.hasDestinationServer() ?
3562 ProtobufUtil.toServerName(request.getDestinationServer()) : null);
3563
3564 try {
3565 checkOpen();
3566 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3567
3568
3569 final HRegion region = this.getFromOnlineRegions(encodedRegionName);
3570 if ((region != null) && (region .getCoprocessorHost() != null)) {
3571 region.getCoprocessorHost().preClose(false);
3572 }
3573
3574 requestCount.increment();
3575 LOG.info("Received close region: " + encodedRegionName +
3576 "Transitioning in ZK: " + (zk ? "yes" : "no") +
3577 ". Version of ZK closing node:" + versionOfClosingNode +
3578 ". Destination server:" + sn);
3579
3580 boolean closed = closeRegion(encodedRegionName, false, zk, versionOfClosingNode, sn);
3581 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
3582 return builder.build();
3583 } catch (IOException ie) {
3584 throw new ServiceException(ie);
3585 }
3586 }
3587
3588
3589
3590
3591
3592
3593
3594
3595 @Override
3596 @QosPriority(priority=HConstants.HIGH_QOS)
3597 public FlushRegionResponse flushRegion(final RpcController controller,
3598 final FlushRegionRequest request) throws ServiceException {
3599 try {
3600 checkOpen();
3601 requestCount.increment();
3602 HRegion region = getRegion(request.getRegion());
3603 LOG.info("Flushing " + region.getRegionNameAsString());
3604 boolean shouldFlush = true;
3605 if (request.hasIfOlderThanTs()) {
3606 shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
3607 }
3608 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
3609 if (shouldFlush) {
3610 builder.setFlushed(region.flushcache());
3611 }
3612 builder.setLastFlushTime(region.getLastFlushTime());
3613 return builder.build();
3614 } catch (IOException ie) {
3615 throw new ServiceException(ie);
3616 }
3617 }
3618
3619
3620
3621
3622
3623
3624
3625
3626 @Override
3627 @QosPriority(priority=HConstants.HIGH_QOS)
3628 public SplitRegionResponse splitRegion(final RpcController controller,
3629 final SplitRegionRequest request) throws ServiceException {
3630 try {
3631 checkOpen();
3632 requestCount.increment();
3633 HRegion region = getRegion(request.getRegion());
3634 region.startRegionOperation(Operation.SPLIT_REGION);
3635 LOG.info("Splitting " + region.getRegionNameAsString());
3636 region.flushcache();
3637 byte[] splitPoint = null;
3638 if (request.hasSplitPoint()) {
3639 splitPoint = request.getSplitPoint().toByteArray();
3640 }
3641 region.forceSplit(splitPoint);
3642 compactSplitThread.requestSplit(region, region.checkSplit());
3643 return SplitRegionResponse.newBuilder().build();
3644 } catch (IOException ie) {
3645 throw new ServiceException(ie);
3646 }
3647 }
3648
3649
3650
3651
3652
3653
3654
3655
3656
3657 @Override
3658 @QosPriority(priority = HConstants.HIGH_QOS)
3659 public MergeRegionsResponse mergeRegions(final RpcController controller,
3660 final MergeRegionsRequest request) throws ServiceException {
3661 try {
3662 checkOpen();
3663 requestCount.increment();
3664 HRegion regionA = getRegion(request.getRegionA());
3665 HRegion regionB = getRegion(request.getRegionB());
3666 boolean forcible = request.getForcible();
3667 regionA.startRegionOperation(Operation.MERGE_REGION);
3668 regionB.startRegionOperation(Operation.MERGE_REGION);
3669 LOG.info("Receiving merging request for " + regionA + ", " + regionB
3670 + ",forcible=" + forcible);
3671 regionA.flushcache();
3672 regionB.flushcache();
3673 compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
3674 return MergeRegionsResponse.newBuilder().build();
3675 } catch (IOException ie) {
3676 throw new ServiceException(ie);
3677 }
3678 }
3679
3680
3681
3682
3683
3684
3685
3686
3687 @Override
3688 @QosPriority(priority=HConstants.HIGH_QOS)
3689 public CompactRegionResponse compactRegion(final RpcController controller,
3690 final CompactRegionRequest request) throws ServiceException {
3691 try {
3692 checkOpen();
3693 requestCount.increment();
3694 HRegion region = getRegion(request.getRegion());
3695 LOG.info("Compacting " + region.getRegionNameAsString());
3696 boolean major = false;
3697 byte [] family = null;
3698 Store store = null;
3699 if (request.hasFamily()) {
3700 family = request.getFamily().toByteArray();
3701 store = region.getStore(family);
3702 if (store == null) {
3703 throw new ServiceException(new IOException("column family " + Bytes.toString(family) +
3704 " does not exist in region " + region.getRegionNameAsString()));
3705 }
3706 }
3707 if (request.hasMajor()) {
3708 major = request.getMajor();
3709 }
3710 if (major) {
3711 if (family != null) {
3712 store.triggerMajorCompaction();
3713 } else {
3714 region.triggerMajorCompaction();
3715 }
3716 }
3717
3718 String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
3719 LOG.trace("User-triggered compaction requested for region " +
3720 region.getRegionNameAsString() + familyLogMsg);
3721 String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
3722 if(family != null) {
3723 compactSplitThread.requestCompaction(region, store, log,
3724 Store.PRIORITY_USER, null);
3725 } else {
3726 compactSplitThread.requestCompaction(region, log,
3727 Store.PRIORITY_USER, null);
3728 }
3729 return CompactRegionResponse.newBuilder().build();
3730 } catch (IOException ie) {
3731 throw new ServiceException(ie);
3732 }
3733 }
3734
3735
3736
3737
3738
3739
3740
3741
3742 @Override
3743 @QosPriority(priority=HConstants.REPLICATION_QOS)
3744 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
3745 final ReplicateWALEntryRequest request) throws ServiceException {
3746 try {
3747 if (replicationSinkHandler != null) {
3748 checkOpen();
3749 requestCount.increment();
3750 HLog.Entry[] entries = ReplicationProtbufUtil.toHLogEntries(request.getEntryList());
3751 if (entries != null && entries.length > 0) {
3752 replicationSinkHandler.replicateLogEntries(entries);
3753 }
3754 }
3755 return ReplicateWALEntryResponse.newBuilder().build();
3756 } catch (IOException ie) {
3757 throw new ServiceException(ie);
3758 }
3759 }
3760
3761
3762
3763
3764
3765
3766
3767
3768
3769 @Override
3770 @QosPriority(priority = HConstants.REPLAY_QOS)
3771 public MultiResponse replay(final RpcController rpcc, final MultiRequest request)
3772 throws ServiceException {
3773 long before = EnvironmentEdgeManager.currentTimeMillis();
3774 PayloadCarryingRpcController controller = (PayloadCarryingRpcController) rpcc;
3775 CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
3776
3777 controller.setCellScanner(null);
3778 try {
3779 checkOpen();
3780 HRegion region = getRegion(request.getRegion());
3781 MultiResponse.Builder builder = MultiResponse.newBuilder();
3782 List<MutationProto> mutates = new ArrayList<MutationProto>();
3783 for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
3784 if (actionUnion.hasMutation()) {
3785 MutationProto mutate = actionUnion.getMutation();
3786 MutationType type = mutate.getMutateType();
3787 switch (type) {
3788 case PUT:
3789 case DELETE:
3790 mutates.add(mutate);
3791 break;
3792 default:
3793 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
3794 }
3795 } else {
3796 LOG.warn("Error: invalid action: " + actionUnion + ". " + "it must be a Mutation.");
3797 throw new DoNotRetryIOException("Invalid action, " + "it must be a Mutation.");
3798 }
3799 }
3800 if (!mutates.isEmpty()) {
3801 doBatchOp(builder, region, mutates, cellScanner, true);
3802 }
3803 return builder.build();
3804 } catch (IOException ie) {
3805 throw new ServiceException(ie);
3806 } finally {
3807 metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before);
3808 }
3809 }
3810
3811
3812
3813
3814
3815
3816
3817 @Override
3818 public RollWALWriterResponse rollWALWriter(final RpcController controller,
3819 final RollWALWriterRequest request) throws ServiceException {
3820 try {
3821 requestCount.increment();
3822 HLog wal = this.getWAL();
3823 byte[][] regionsToFlush = wal.rollWriter(true);
3824 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
3825 if (regionsToFlush != null) {
3826 for (byte[] region: regionsToFlush) {
3827 builder.addRegionToFlush(ByteString.copyFrom(region));
3828 }
3829 }
3830 return builder.build();
3831 } catch (IOException ie) {
3832 throw new ServiceException(ie);
3833 }
3834 }
3835
3836
3837
3838
3839
3840
3841
3842
3843 @Override
3844 public StopServerResponse stopServer(final RpcController controller,
3845 final StopServerRequest request) throws ServiceException {
3846 requestCount.increment();
3847 String reason = request.getReason();
3848 stop(reason);
3849 return StopServerResponse.newBuilder().build();
3850 }
3851
3852
3853
3854
3855
3856
3857
3858
3859 @Override
3860 public GetServerInfoResponse getServerInfo(final RpcController controller,
3861 final GetServerInfoRequest request) throws ServiceException {
3862 ServerName serverName = getServerName();
3863 requestCount.increment();
3864 return ResponseConverter.buildGetServerInfoResponse(serverName, webuiport);
3865 }
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
3877 protected HRegion getRegion(
3878 final RegionSpecifier regionSpecifier) throws IOException {
3879 return getRegionByEncodedName(
3880 ProtobufUtil.getRegionEncodedName(regionSpecifier));
3881 }
3882
3883
3884
3885
3886
3887
3888
3889
3890
3891
3892
3893 protected Result append(final HRegion region,
3894 final MutationProto m, final CellScanner cellScanner) throws IOException {
3895 long before = EnvironmentEdgeManager.currentTimeMillis();
3896 Append append = ProtobufUtil.toAppend(m, cellScanner);
3897 Result r = null;
3898 if (region.getCoprocessorHost() != null) {
3899 r = region.getCoprocessorHost().preAppend(append);
3900 }
3901 if (r == null) {
3902 r = region.append(append);
3903 if (region.getCoprocessorHost() != null) {
3904 region.getCoprocessorHost().postAppend(append, r);
3905 }
3906 }
3907 metricsRegionServer.updateAppend(EnvironmentEdgeManager.currentTimeMillis() - before);
3908 return r;
3909 }
3910
3911
3912
3913
3914
3915
3916
3917
3918
3919 protected Result increment(final HRegion region, final MutationProto mutation,
3920 final CellScanner cells)
3921 throws IOException {
3922 long before = EnvironmentEdgeManager.currentTimeMillis();
3923 Increment increment = ProtobufUtil.toIncrement(mutation, cells);
3924 Result r = null;
3925 if (region.getCoprocessorHost() != null) {
3926 r = region.getCoprocessorHost().preIncrement(increment);
3927 }
3928 if (r == null) {
3929 r = region.increment(increment);
3930 if (region.getCoprocessorHost() != null) {
3931 r = region.getCoprocessorHost().postIncrement(increment, r);
3932 }
3933 }
3934 metricsRegionServer.updateIncrement(EnvironmentEdgeManager.currentTimeMillis() - before);
3935 return r;
3936 }
3937
3938
3939
3940
3941 protected void doBatchOp(final MultiResponse.Builder builder,
3942 final HRegion region, final List<MutationProto> mutates, final CellScanner cells) {
3943 doBatchOp(builder, region, mutates, cells, false);
3944 }
3945
3946
3947
3948
3949
3950
3951
3952
3953 protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
3954 final List<MutationProto> mutations, final CellScanner cells, boolean isReplay) {
3955 @SuppressWarnings("unchecked")
3956 Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutations.size()];
3957 long before = EnvironmentEdgeManager.currentTimeMillis();
3958 boolean batchContainsPuts = false, batchContainsDelete = false;
3959 try {
3960 ActionResult.Builder resultBuilder = ActionResult.newBuilder();
3961 resultBuilder.setValue(ClientProtos.Result.newBuilder().build());
3962 ActionResult result = resultBuilder.build();
3963 int i = 0;
3964 for (MutationProto m : mutations) {
3965 Mutation mutation;
3966 if (m.getMutateType() == MutationType.PUT) {
3967 mutation = ProtobufUtil.toPut(m, cells);
3968 batchContainsPuts = true;
3969 } else {
3970 mutation = ProtobufUtil.toDelete(m, cells);
3971 batchContainsDelete = true;
3972 }
3973 mutationsWithLocks[i++] = new Pair<Mutation, Integer>(mutation, null);
3974 builder.addResult(result);
3975 }
3976
3977 requestCount.add(mutations.size());
3978 if (!region.getRegionInfo().isMetaTable()) {
3979 cacheFlusher.reclaimMemStoreMemory();
3980 }
3981
3982 OperationStatus codes[] = region.batchMutate(mutationsWithLocks, isReplay);
3983 for (i = 0; i < codes.length; i++) {
3984 switch (codes[i].getOperationStatusCode()) {
3985 case BAD_FAMILY:
3986 result = ResponseConverter.buildActionResult(
3987 new NoSuchColumnFamilyException(codes[i].getExceptionMsg()));
3988 builder.setResult(i, result);
3989 break;
3990
3991 case SANITY_CHECK_FAILURE:
3992 result = ResponseConverter.buildActionResult(
3993 new FailedSanityCheckException(codes[i].getExceptionMsg()));
3994 builder.setResult(i, result);
3995 break;
3996
3997 default:
3998 result = ResponseConverter.buildActionResult(
3999 new DoNotRetryIOException(codes[i].getExceptionMsg()));
4000 builder.setResult(i, result);
4001 break;
4002
4003 case SUCCESS:
4004 break;
4005 }
4006 }
4007 } catch (IOException ie) {
4008 ActionResult result = ResponseConverter.buildActionResult(ie);
4009 for (int i = 0; i < mutations.size(); i++) {
4010 builder.setResult(i, result);
4011 }
4012 }
4013 long after = EnvironmentEdgeManager.currentTimeMillis();
4014 if (batchContainsPuts) {
4015 metricsRegionServer.updatePut(after - before);
4016 }
4017 if (batchContainsDelete) {
4018 metricsRegionServer.updateDelete(after - before);
4019 }
4020 }
4021
4022
4023
4024
4025
4026
4027
4028
4029
4030 protected void mutateRows(final HRegion region, final List<MutationProto> mutations,
4031 final CellScanner cellScanner)
4032 throws IOException {
4033 MutationProto firstMutate = mutations.get(0);
4034 if (!region.getRegionInfo().isMetaTable()) {
4035 cacheFlusher.reclaimMemStoreMemory();
4036 }
4037 byte [] row = firstMutate.getRow().toByteArray();
4038 RowMutations rm = new RowMutations(row);
4039 for (MutationProto mutate: mutations) {
4040 MutationType type = mutate.getMutateType();
4041 switch (mutate.getMutateType()) {
4042 case PUT:
4043 rm.add(ProtobufUtil.toPut(mutate, cellScanner));
4044 break;
4045 case DELETE:
4046 rm.add(ProtobufUtil.toDelete(mutate, cellScanner));
4047 break;
4048 default:
4049 throw new DoNotRetryIOException(
4050 "mutate supports atomic put and/or delete, not "
4051 + type.name());
4052 }
4053 }
4054 region.mutateRow(rm);
4055 }
4056
4057 private static class MovedRegionInfo {
4058 private final ServerName serverName;
4059 private final long seqNum;
4060 private final long ts;
4061
4062 public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
4063 this.serverName = serverName;
4064 this.seqNum = closeSeqNum;
4065 ts = EnvironmentEdgeManager.currentTimeMillis();
4066 }
4067
4068 public ServerName getServerName() {
4069 return serverName;
4070 }
4071
4072 public long getSeqNum() {
4073 return seqNum;
4074 }
4075
4076 public long getMoveTime() {
4077 return ts;
4078 }
4079 }
4080
4081
4082
4083 protected Map<String, MovedRegionInfo> movedRegions =
4084 new ConcurrentHashMap<String, MovedRegionInfo>(3000);
4085
4086
4087
4088 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
4089
4090 protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
4091 if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
4092 LOG.warn("Not adding moved region record: " + encodedName + " to self.");
4093 return;
4094 }
4095 LOG.info("Adding moved region record: " + encodedName + " to "
4096 + destination.getServerName() + ":" + destination.getPort()
4097 + " as of " + closeSeqNum);
4098 movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
4099 }
4100
4101 private void removeFromMovedRegions(String encodedName) {
4102 movedRegions.remove(encodedName);
4103 }
4104
4105 private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
4106 MovedRegionInfo dest = movedRegions.get(encodedRegionName);
4107
4108 long now = EnvironmentEdgeManager.currentTimeMillis();
4109 if (dest != null) {
4110 if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
4111 return dest;
4112 } else {
4113 movedRegions.remove(encodedRegionName);
4114 }
4115 }
4116
4117 return null;
4118 }
4119
4120
4121
4122
4123 protected void cleanMovedRegions() {
4124 final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
4125 Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
4126
4127 while (it.hasNext()){
4128 Map.Entry<String, MovedRegionInfo> e = it.next();
4129 if (e.getValue().getMoveTime() < cutOff) {
4130 it.remove();
4131 }
4132 }
4133 }
4134
4135
4136
4137
4138 protected static class MovedRegionsCleaner extends Chore implements Stoppable {
4139 private HRegionServer regionServer;
4140 Stoppable stoppable;
4141
4142 private MovedRegionsCleaner(
4143 HRegionServer regionServer, Stoppable stoppable){
4144 super("MovedRegionsCleaner for region "+regionServer, TIMEOUT_REGION_MOVED, stoppable);
4145 this.regionServer = regionServer;
4146 this.stoppable = stoppable;
4147 }
4148
4149 static MovedRegionsCleaner createAndStart(HRegionServer rs){
4150 Stoppable stoppable = new Stoppable() {
4151 private volatile boolean isStopped = false;
4152 @Override public void stop(String why) { isStopped = true;}
4153 @Override public boolean isStopped() {return isStopped;}
4154 };
4155
4156 return new MovedRegionsCleaner(rs, stoppable);
4157 }
4158
4159 @Override
4160 protected void chore() {
4161 regionServer.cleanMovedRegions();
4162 }
4163
4164 @Override
4165 public void stop(String why) {
4166 stoppable.stop(why);
4167 }
4168
4169 @Override
4170 public boolean isStopped() {
4171 return stoppable.isStopped();
4172 }
4173 }
4174
4175 private String getMyEphemeralNodePath() {
4176 return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
4177 }
4178
4179
4180
4181
4182 private static class RegionScannerHolder {
4183 private RegionScanner s;
4184 private long nextCallSeq = 0L;
4185
4186 public RegionScannerHolder(RegionScanner s) {
4187 this.s = s;
4188 }
4189 }
4190
4191 private boolean isHealthCheckerConfigured() {
4192 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
4193 return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
4194 }
4195
4196
4197
4198
4199 public CompactSplitThread getCompactSplitThread() {
4200 return this.compactSplitThread;
4201 }
4202
4203
4204
4205
4206
4207
4208
4209
4210 private boolean isRegionMarkedRecoveringInZK(String regionEncodedName) throws KeeperException {
4211 boolean result = false;
4212 String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, regionEncodedName);
4213
4214 byte[] node = ZKUtil.getDataAndWatch(this.zooKeeper, nodePath);
4215 if (node != null) {
4216 result = true;
4217 }
4218
4219 return result;
4220 }
4221
4222
4223
4224
4225
4226
4227
4228
4229 private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException,
4230 IOException {
4231 if (!r.isRecovering()) {
4232
4233 return;
4234 }
4235
4236 HRegionInfo region = r.getRegionInfo();
4237 ZooKeeperWatcher zkw = getZooKeeper();
4238 String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName());
4239 long minSeqIdForLogReplay = r.getMinSeqIdForLogReplay();
4240 long lastRecordedFlushedSequenceId = -1;
4241 String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
4242 region.getEncodedName());
4243
4244 byte[] data = ZKUtil.getData(zkw, nodePath);
4245 if (data != null) {
4246 lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
4247 }
4248 if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
4249 ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
4250 }
4251 if (previousRSName != null) {
4252
4253 nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
4254 ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
4255 LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for "
4256 + previousRSName);
4257 } else {
4258 LOG.warn("Can't find failed region server for recovering region " + region.getEncodedName());
4259 }
4260 }
4261
4262
4263
4264
4265
4266
4267
4268
4269 private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
4270 String result = null;
4271 long maxZxid = 0;
4272 ZooKeeperWatcher zkw = this.getZooKeeper();
4273 String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
4274 List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
4275 if (failedServers == null || failedServers.isEmpty()) {
4276 return result;
4277 }
4278 for (String failedServer : failedServers) {
4279 String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
4280 Stat stat = new Stat();
4281 ZKUtil.getDataNoWatch(zkw, rsPath, stat);
4282 if (maxZxid < stat.getCzxid()) {
4283 maxZxid = stat.getCzxid();
4284 result = failedServer;
4285 }
4286 }
4287 return result;
4288 }
4289 }