1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.tool;
21
22 import java.io.Closeable;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Random;
33 import java.util.Set;
34 import java.util.TreeSet;
35 import java.util.concurrent.atomic.AtomicLong;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.ScheduledThreadPoolExecutor;
41 import java.util.regex.Matcher;
42 import java.util.regex.Pattern;
43
44 import org.apache.commons.lang.time.StopWatch;
45 import org.apache.commons.logging.Log;
46 import org.apache.commons.logging.LogFactory;
47 import org.apache.hadoop.conf.Configuration;
48 import org.apache.hadoop.hbase.AuthUtil;
49 import org.apache.hadoop.hbase.ChoreService;
50 import org.apache.hadoop.hbase.DoNotRetryIOException;
51 import org.apache.hadoop.hbase.HBaseConfiguration;
52 import org.apache.hadoop.hbase.HColumnDescriptor;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.HRegionLocation;
56 import org.apache.hadoop.hbase.HTableDescriptor;
57 import org.apache.hadoop.hbase.NamespaceDescriptor;
58 import org.apache.hadoop.hbase.ScheduledChore;
59 import org.apache.hadoop.hbase.ServerName;
60 import org.apache.hadoop.hbase.TableName;
61 import org.apache.hadoop.hbase.TableNotEnabledException;
62 import org.apache.hadoop.hbase.TableNotFoundException;
63 import org.apache.hadoop.hbase.client.Admin;
64 import org.apache.hadoop.hbase.client.Connection;
65 import org.apache.hadoop.hbase.client.ConnectionFactory;
66 import org.apache.hadoop.hbase.client.Get;
67 import org.apache.hadoop.hbase.client.Put;
68 import org.apache.hadoop.hbase.client.RegionLocator;
69 import org.apache.hadoop.hbase.client.ResultScanner;
70 import org.apache.hadoop.hbase.client.Scan;
71 import org.apache.hadoop.hbase.client.Table;
72 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
73 import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
74 import org.apache.hadoop.hbase.util.Bytes;
75 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
76 import org.apache.hadoop.hbase.util.ReflectionUtils;
77 import org.apache.hadoop.hbase.util.RegionSplitter;
78 import org.apache.hadoop.util.GenericOptionsParser;
79 import org.apache.hadoop.util.Tool;
80 import org.apache.hadoop.util.ToolRunner;
81
82
83
84
85
86
87
88
89
90
91
92
93 public final class Canary implements Tool {
94
95 public interface Sink {
96 public long getReadFailureCount();
97 public long incReadFailureCount();
98 public void publishReadFailure(HRegionInfo region, Exception e);
99 public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
100 public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
101 public long getWriteFailureCount();
102 public void publishWriteFailure(HRegionInfo region, Exception e);
103 public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
104 public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
105 }
106
107
108 public interface ExtendedSink extends Sink {
109 public void publishReadFailure(String table, String server);
110 public void publishReadTiming(String table, String server, long msTime);
111 }
112
113
114
115 public static class StdOutSink implements Sink {
116 private AtomicLong readFailureCount = new AtomicLong(0),
117 writeFailureCount = new AtomicLong(0);
118
119 @Override
120 public long getReadFailureCount() {
121 return readFailureCount.get();
122 }
123
124 @Override
125 public long incReadFailureCount() {
126 return readFailureCount.incrementAndGet();
127 }
128
129 @Override
130 public void publishReadFailure(HRegionInfo region, Exception e) {
131 readFailureCount.incrementAndGet();
132 LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
133 }
134
135 @Override
136 public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
137 readFailureCount.incrementAndGet();
138 LOG.error(String.format("read from region %s column family %s failed",
139 region.getRegionNameAsString(), column.getNameAsString()), e);
140 }
141
142 @Override
143 public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
144 LOG.info(String.format("read from region %s column family %s in %dms",
145 region.getRegionNameAsString(), column.getNameAsString(), msTime));
146 }
147
148 @Override
149 public long getWriteFailureCount() {
150 return writeFailureCount.get();
151 }
152
153 @Override
154 public void publishWriteFailure(HRegionInfo region, Exception e) {
155 writeFailureCount.incrementAndGet();
156 LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
157 }
158
159 @Override
160 public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
161 writeFailureCount.incrementAndGet();
162 LOG.error(String.format("write to region %s column family %s failed",
163 region.getRegionNameAsString(), column.getNameAsString()), e);
164 }
165
166 @Override
167 public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
168 LOG.info(String.format("write to region %s column family %s in %dms",
169 region.getRegionNameAsString(), column.getNameAsString(), msTime));
170 }
171 }
172
173 public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
174
175 @Override
176 public void publishReadFailure(String table, String server) {
177 incReadFailureCount();
178 LOG.error(String.format("Read from table:%s on region server:%s", table, server));
179 }
180
181 @Override
182 public void publishReadTiming(String table, String server, long msTime) {
183 LOG.info(String.format("Read from table:%s on region server:%s in %dms",
184 table, server, msTime));
185 }
186 }
187
188
189
190
191
192 static class RegionTask implements Callable<Void> {
193 public enum TaskType{
194 READ, WRITE
195 }
196 private Connection connection;
197 private HRegionInfo region;
198 private Sink sink;
199 private TaskType taskType;
200
201 RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType) {
202 this.connection = connection;
203 this.region = region;
204 this.sink = sink;
205 this.taskType = taskType;
206 }
207
208 @Override
209 public Void call() {
210 switch (taskType) {
211 case READ:
212 return read();
213 case WRITE:
214 return write();
215 default:
216 return read();
217 }
218 }
219
220 public Void read() {
221 Table table = null;
222 HTableDescriptor tableDesc = null;
223 try {
224 table = connection.getTable(region.getTable());
225 tableDesc = table.getTableDescriptor();
226 } catch (IOException e) {
227 LOG.debug("sniffRegion failed", e);
228 sink.publishReadFailure(region, e);
229 if (table != null) {
230 try {
231 table.close();
232 } catch (IOException ioe) {
233 LOG.error("Close table failed", e);
234 }
235 }
236 return null;
237 }
238
239 byte[] startKey = null;
240 Get get = null;
241 Scan scan = null;
242 ResultScanner rs = null;
243 StopWatch stopWatch = new StopWatch();
244 for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
245 stopWatch.reset();
246 startKey = region.getStartKey();
247
248 if (startKey.length > 0) {
249 get = new Get(startKey);
250 get.setCacheBlocks(false);
251 get.setFilter(new FirstKeyOnlyFilter());
252 get.addFamily(column.getName());
253 } else {
254 scan = new Scan();
255 scan.setCaching(1);
256 scan.setCacheBlocks(false);
257 scan.setFilter(new FirstKeyOnlyFilter());
258 scan.addFamily(column.getName());
259 scan.setMaxResultSize(1L);
260 }
261
262 try {
263 if (startKey.length > 0) {
264 stopWatch.start();
265 table.get(get);
266 stopWatch.stop();
267 sink.publishReadTiming(region, column, stopWatch.getTime());
268 } else {
269 stopWatch.start();
270 rs = table.getScanner(scan);
271 stopWatch.stop();
272 sink.publishReadTiming(region, column, stopWatch.getTime());
273 }
274 } catch (Exception e) {
275 sink.publishReadFailure(region, column, e);
276 } finally {
277 if (rs != null) {
278 rs.close();
279 }
280 scan = null;
281 get = null;
282 startKey = null;
283 }
284 }
285 try {
286 table.close();
287 } catch (IOException e) {
288 LOG.error("Close table failed", e);
289 }
290 return null;
291 }
292
293
294
295
296
297 private Void write() {
298 Table table = null;
299 HTableDescriptor tableDesc = null;
300 try {
301 table = connection.getTable(region.getTable());
302 tableDesc = table.getTableDescriptor();
303 byte[] rowToCheck = region.getStartKey();
304 if (rowToCheck.length == 0) {
305 rowToCheck = new byte[]{0x0};
306 }
307 int writeValueSize =
308 connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
309 for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
310 Put put = new Put(rowToCheck);
311 byte[] value = new byte[writeValueSize];
312 Bytes.random(value);
313 put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
314 try {
315 long startTime = System.currentTimeMillis();
316 table.put(put);
317 long time = System.currentTimeMillis() - startTime;
318 sink.publishWriteTiming(region, column, time);
319 } catch (Exception e) {
320 sink.publishWriteFailure(region, column, e);
321 }
322 }
323 table.close();
324 } catch (IOException e) {
325 sink.publishWriteFailure(region, e);
326 }
327 return null;
328 }
329 }
330
331
332
333
334 static class RegionServerTask implements Callable<Void> {
335 private Connection connection;
336 private String serverName;
337 private HRegionInfo region;
338 private ExtendedSink sink;
339 private AtomicLong successes;
340
341 RegionServerTask(Connection connection, String serverName, HRegionInfo region,
342 ExtendedSink sink, AtomicLong successes) {
343 this.connection = connection;
344 this.serverName = serverName;
345 this.region = region;
346 this.sink = sink;
347 this.successes = successes;
348 }
349
350 @Override
351 public Void call() {
352 TableName tableName = null;
353 Table table = null;
354 Get get = null;
355 byte[] startKey = null;
356 Scan scan = null;
357 StopWatch stopWatch = new StopWatch();
358
359 stopWatch.reset();
360 try {
361 tableName = region.getTable();
362 table = connection.getTable(tableName);
363 startKey = region.getStartKey();
364
365 if (startKey.length > 0) {
366 get = new Get(startKey);
367 get.setCacheBlocks(false);
368 get.setFilter(new FirstKeyOnlyFilter());
369 stopWatch.start();
370 table.get(get);
371 stopWatch.stop();
372 } else {
373 scan = new Scan();
374 scan.setCacheBlocks(false);
375 scan.setFilter(new FirstKeyOnlyFilter());
376 scan.setCaching(1);
377 scan.setMaxResultSize(1L);
378 stopWatch.start();
379 ResultScanner s = table.getScanner(scan);
380 s.close();
381 stopWatch.stop();
382 }
383 successes.incrementAndGet();
384 sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
385 } catch (TableNotFoundException tnfe) {
386 LOG.error("Table may be deleted", tnfe);
387
388 } catch (TableNotEnabledException tnee) {
389
390 successes.incrementAndGet();
391 LOG.debug("The targeted table was disabled. Assuming success.");
392 } catch (DoNotRetryIOException dnrioe) {
393 sink.publishReadFailure(tableName.getNameAsString(), serverName);
394 LOG.error(dnrioe);
395 } catch (IOException e) {
396 sink.publishReadFailure(tableName.getNameAsString(), serverName);
397 LOG.error(e);
398 } finally {
399 if (table != null) {
400 try {
401 table.close();
402 } catch (IOException e) {
403 LOG.error("Close table failed", e);
404 }
405 }
406 scan = null;
407 get = null;
408 startKey = null;
409 }
410 return null;
411 }
412 }
413
414 private static final int USAGE_EXIT_CODE = 1;
415 private static final int INIT_ERROR_EXIT_CODE = 2;
416 private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
417 private static final int ERROR_EXIT_CODE = 4;
418 private static final int FAILURE_EXIT_CODE = 5;
419
420 private static final long DEFAULT_INTERVAL = 60000;
421
422 private static final long DEFAULT_TIMEOUT = 600000;
423 private static final int MAX_THREADS_NUM = 16;
424
425 private static final Log LOG = LogFactory.getLog(Canary.class);
426
427 public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
428 NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
429
430 private static final String CANARY_TABLE_FAMILY_NAME = "Test";
431
432 private Configuration conf = null;
433 private long interval = 0;
434 private Sink sink = null;
435
436 private boolean useRegExp;
437 private long timeout = DEFAULT_TIMEOUT;
438 private boolean failOnError = true;
439 private boolean regionServerMode = false;
440 private boolean regionServerAllRegions = false;
441 private boolean writeSniffing = false;
442 private boolean treatFailureAsError = false;
443 private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
444
445 private ExecutorService executor;
446
/** Creates a canary backed by a one-thread executor and a {@link RegionServerStdOutSink}. */
public Canary() {
  this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
}

/**
 * Creates a canary that runs its probes on {@code executor} and reports them to {@code sink}.
 *
 * @param executor executor for probe tasks
 * @param sink receiver of probe results; must be an {@link ExtendedSink} for region server mode
 */
public Canary(ExecutorService executor, Sink sink) {
  this.sink = sink;
  this.executor = executor;
}

/** @return the configuration previously passed to {@link #setConf(Configuration)} */
@Override
public Configuration getConf() {
  return this.conf;
}

/** Sets the configuration used to connect to the cluster. */
@Override
public void setConf(Configuration conf) {
  this.conf = conf;
}
465
466 private int parseArgs(String[] args) {
467 int index = -1;
468
469 for (int i = 0; i < args.length; i++) {
470 String cmd = args[i];
471
472 if (cmd.startsWith("-")) {
473 if (index >= 0) {
474
475 System.err.println("Invalid command line options");
476 printUsageAndExit();
477 }
478
479 if (cmd.equals("-help")) {
480
481 printUsageAndExit();
482 } else if (cmd.equals("-daemon") && interval == 0) {
483
484 interval = DEFAULT_INTERVAL;
485 } else if (cmd.equals("-interval")) {
486
487 i++;
488
489 if (i == args.length) {
490 System.err.println("-interval needs a numeric value argument.");
491 printUsageAndExit();
492 }
493
494 try {
495 interval = Long.parseLong(args[i]) * 1000;
496 } catch (NumberFormatException e) {
497 System.err.println("-interval needs a numeric value argument.");
498 printUsageAndExit();
499 }
500 } else if(cmd.equals("-regionserver")) {
501 this.regionServerMode = true;
502 } else if(cmd.equals("-allRegions")) {
503 this.regionServerAllRegions = true;
504 } else if(cmd.equals("-writeSniffing")) {
505 this.writeSniffing = true;
506 } else if(cmd.equals("-treatFailureAsError")) {
507 this.treatFailureAsError = true;
508 } else if (cmd.equals("-e")) {
509 this.useRegExp = true;
510 } else if (cmd.equals("-t")) {
511 i++;
512
513 if (i == args.length) {
514 System.err.println("-t needs a numeric value argument.");
515 printUsageAndExit();
516 }
517
518 try {
519 this.timeout = Long.parseLong(args[i]);
520 } catch (NumberFormatException e) {
521 System.err.println("-t needs a numeric value argument.");
522 printUsageAndExit();
523 }
524 } else if (cmd.equals("-writeTable")) {
525 i++;
526
527 if (i == args.length) {
528 System.err.println("-writeTable needs a string value argument.");
529 printUsageAndExit();
530 }
531 this.writeTableName = TableName.valueOf(args[i]);
532 } else if (cmd.equals("-f")) {
533 i++;
534
535 if (i == args.length) {
536 System.err
537 .println("-f needs a boolean value argument (true|false).");
538 printUsageAndExit();
539 }
540
541 this.failOnError = Boolean.parseBoolean(args[i]);
542 } else {
543
544 System.err.println(cmd + " options is invalid.");
545 printUsageAndExit();
546 }
547 } else if (index < 0) {
548
549 index = i;
550 }
551 }
552 if (this.regionServerAllRegions && !this.regionServerMode) {
553 System.err.println("-allRegions can only be specified in regionserver mode.");
554 printUsageAndExit();
555 }
556 return index;
557 }
558
559 @Override
560 public int run(String[] args) throws Exception {
561 int index = parseArgs(args);
562 ChoreService choreService = null;
563
564
565
566
567 final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
568 if (authChore != null) {
569 choreService = new ChoreService("CANARY_TOOL");
570 choreService.scheduleChore(authChore);
571 }
572
573
574 Monitor monitor = null;
575 Thread monitorThread = null;
576 long startTime = 0;
577 long currentTimeLength = 0;
578
579
580
581 try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
582 do {
583
584 try {
585 monitor = this.newMonitor(connection, index, args);
586 monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis());
587 startTime = System.currentTimeMillis();
588 monitorThread.start();
589 while (!monitor.isDone()) {
590
591 Thread.sleep(1000);
592
593 if (this.failOnError && monitor.hasError()) {
594 monitorThread.interrupt();
595 if (monitor.initialized) {
596 return monitor.errorCode;
597 } else {
598 return INIT_ERROR_EXIT_CODE;
599 }
600 }
601 currentTimeLength = System.currentTimeMillis() - startTime;
602 if (currentTimeLength > this.timeout) {
603 LOG.error("The monitor is running too long (" + currentTimeLength
604 + ") after timeout limit:" + this.timeout
605 + " will be killed itself !!");
606 if (monitor.initialized) {
607 return TIMEOUT_ERROR_EXIT_CODE;
608 } else {
609 return INIT_ERROR_EXIT_CODE;
610 }
611 }
612 }
613
614 if (this.failOnError && monitor.finalCheckForErrors()) {
615 monitorThread.interrupt();
616 return monitor.errorCode;
617 }
618 } finally {
619 if (monitor != null) monitor.close();
620 }
621
622 Thread.sleep(interval);
623 } while (interval > 0);
624 }
625
626 if (choreService != null) {
627 choreService.shutdown();
628 }
629 return monitor.errorCode;
630 }
631
632 private void printUsageAndExit() {
633 System.err.printf(
634 "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
635 getClass().getName());
636 System.err.println(" where [opts] are:");
637 System.err.println(" -help Show this help and exit.");
638 System.err.println(" -regionserver replace the table argument to regionserver,");
639 System.err.println(" which means to enable regionserver mode");
640 System.err.println(" -allRegions Tries all regions on a regionserver,");
641 System.err.println(" only works in regionserver mode.");
642 System.err.println(" -daemon Continuous check at defined intervals.");
643 System.err.println(" -interval <N> Interval between checks (sec)");
644 System.err.println(" -e Use region/regionserver as regular expression");
645 System.err.println(" which means the region/regionserver is regular expression pattern");
646 System.err.println(" -f <B> stop whole program if first error occurs," +
647 " default is true");
648 System.err.println(" -t <N> timeout for a check, default is 600000 (milisecs)");
649 System.err.println(" -writeSniffing enable the write sniffing in canary");
650 System.err.println(" -treatFailureAsError treats read / write failure as error");
651 System.err.println(" -writeTable The table used for write sniffing."
652 + " Default is hbase:canary");
653 System.err
654 .println(" -D<configProperty>=<value> assigning or override the configuration params");
655 System.exit(USAGE_EXIT_CODE);
656 }
657
658
659
660
661
662
663
664
665 public Monitor newMonitor(final Connection connection, int index, String[] args) {
666 Monitor monitor = null;
667 String[] monitorTargets = null;
668
669 if(index >= 0) {
670 int length = args.length - index;
671 monitorTargets = new String[length];
672 System.arraycopy(args, index, monitorTargets, 0, length);
673 }
674
675 if (this.regionServerMode) {
676 monitor =
677 new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
678 (ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
679 this.treatFailureAsError);
680 } else {
681 monitor =
682 new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
683 this.writeSniffing, this.writeTableName, this.treatFailureAsError);
684 }
685 return monitor;
686 }
687
688
689 public static abstract class Monitor implements Runnable, Closeable {
690
691 protected Connection connection;
692 protected Admin admin;
693 protected String[] targets;
694 protected boolean useRegExp;
695 protected boolean treatFailureAsError;
696 protected boolean initialized = false;
697
698 protected boolean done = false;
699 protected int errorCode = 0;
700 protected Sink sink;
701 protected ExecutorService executor;
702
703 public boolean isDone() {
704 return done;
705 }
706
707 public boolean hasError() {
708 return errorCode != 0;
709 }
710
711 public boolean finalCheckForErrors() {
712 if (errorCode != 0) {
713 return true;
714 }
715 if (treatFailureAsError &&
716 (sink.getReadFailureCount() > 0 || sink.getWriteFailureCount() > 0)) {
717 errorCode = FAILURE_EXIT_CODE;
718 return true;
719 }
720 return false;
721 }
722
723 @Override
724 public void close() throws IOException {
725 if (this.admin != null) this.admin.close();
726 }
727
728 protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
729 ExecutorService executor, boolean treatFailureAsError) {
730 if (null == connection) throw new IllegalArgumentException("connection shall not be null");
731
732 this.connection = connection;
733 this.targets = monitorTargets;
734 this.useRegExp = useRegExp;
735 this.treatFailureAsError = treatFailureAsError;
736 this.sink = sink;
737 this.executor = executor;
738 }
739
740 public abstract void run();
741
742 protected boolean initAdmin() {
743 if (null == this.admin) {
744 try {
745 this.admin = this.connection.getAdmin();
746 } catch (Exception e) {
747 LOG.error("Initial HBaseAdmin failed...", e);
748 this.errorCode = INIT_ERROR_EXIT_CODE;
749 }
750 } else if (admin.isAborted()) {
751 LOG.error("HBaseAdmin aborted");
752 this.errorCode = INIT_ERROR_EXIT_CODE;
753 }
754 return !this.hasError();
755 }
756 }
757
758
759 private static class RegionMonitor extends Monitor {
760
761 private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
762
763 private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
764
765 private long lastCheckTime = -1;
766 private boolean writeSniffing;
767 private TableName writeTableName;
768 private int writeDataTTL;
769 private float regionsLowerLimit;
770 private float regionsUpperLimit;
771 private int checkPeriod;
772
773 public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
774 Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
775 boolean treatFailureAsError) {
776 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
777 Configuration conf = connection.getConfiguration();
778 this.writeSniffing = writeSniffing;
779 this.writeTableName = writeTableName;
780 this.writeDataTTL =
781 conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
782 this.regionsLowerLimit =
783 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
784 this.regionsUpperLimit =
785 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
786 this.checkPeriod =
787 conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
788 DEFAULT_WRITE_TABLE_CHECK_PERIOD);
789 }
790
791 @Override
792 public void run() {
793 if (this.initAdmin()) {
794 try {
795 List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
796 if (this.targets != null && this.targets.length > 0) {
797 String[] tables = generateMonitorTables(this.targets);
798 this.initialized = true;
799 for (String table : tables) {
800 taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ));
801 }
802 } else {
803 taskFutures.addAll(sniff(TaskType.READ));
804 }
805
806 if (writeSniffing) {
807 if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
808 try {
809 checkWriteTableDistribution();
810 } catch (IOException e) {
811 LOG.error("Check canary table distribution failed!", e);
812 }
813 lastCheckTime = EnvironmentEdgeManager.currentTime();
814 }
815
816 taskFutures.addAll(Canary.sniff(admin, sink,
817 admin.getTableDescriptor(writeTableName), executor, TaskType.WRITE));
818 }
819
820 for (Future<Void> future : taskFutures) {
821 try {
822 future.get();
823 } catch (ExecutionException e) {
824 LOG.error("Sniff region failed!", e);
825 }
826 }
827 } catch (Exception e) {
828 LOG.error("Run regionMonitor failed", e);
829 this.errorCode = ERROR_EXIT_CODE;
830 }
831 }
832 this.done = true;
833 }
834
835 private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
836 String[] returnTables = null;
837
838 if (this.useRegExp) {
839 Pattern pattern = null;
840 HTableDescriptor[] tds = null;
841 Set<String> tmpTables = new TreeSet<String>();
842 try {
843 for (String monitorTarget : monitorTargets) {
844 pattern = Pattern.compile(monitorTarget);
845 tds = this.admin.listTables(pattern);
846 if (tds != null) {
847 for (HTableDescriptor td : tds) {
848 tmpTables.add(td.getNameAsString());
849 }
850 }
851 }
852 } catch (IOException e) {
853 LOG.error("Communicate with admin failed", e);
854 throw e;
855 }
856
857 if (tmpTables.size() > 0) {
858 returnTables = tmpTables.toArray(new String[tmpTables.size()]);
859 } else {
860 String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
861 LOG.error(msg);
862 this.errorCode = INIT_ERROR_EXIT_CODE;
863 throw new TableNotFoundException(msg);
864 }
865 } else {
866 returnTables = monitorTargets;
867 }
868
869 return returnTables;
870 }
871
872
873
874
875 private List<Future<Void>> sniff(TaskType taskType) throws Exception {
876 List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
877 for (HTableDescriptor table : admin.listTables()) {
878 if (admin.isTableEnabled(table.getTableName())
879 && (!table.getTableName().equals(writeTableName))) {
880 taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType));
881 }
882 }
883 return taskFutures;
884 }
885
886 private void checkWriteTableDistribution() throws IOException {
887 if (!admin.tableExists(writeTableName)) {
888 int numberOfServers = admin.getClusterStatus().getServers().size();
889 if (numberOfServers == 0) {
890 throw new IllegalStateException("No live regionservers");
891 }
892 createWriteTable(numberOfServers);
893 }
894
895 if (!admin.isTableEnabled(writeTableName)) {
896 admin.enableTable(writeTableName);
897 }
898
899 int numberOfServers = admin.getClusterStatus().getServers().size();
900 List<HRegionLocation> locations;
901 RegionLocator locator = connection.getRegionLocator(writeTableName);
902 try {
903 locations = locator.getAllRegionLocations();
904 } finally {
905 locator.close();
906 }
907 int numberOfRegions = locations.size();
908 if (numberOfRegions < numberOfServers * regionsLowerLimit
909 || numberOfRegions > numberOfServers * regionsUpperLimit) {
910 admin.disableTable(writeTableName);
911 admin.deleteTable(writeTableName);
912 createWriteTable(numberOfServers);
913 }
914 HashSet<ServerName> serverSet = new HashSet<ServerName>();
915 for (HRegionLocation location: locations) {
916 serverSet.add(location.getServerName());
917 }
918 int numberOfCoveredServers = serverSet.size();
919 if (numberOfCoveredServers < numberOfServers) {
920 admin.balancer();
921 }
922 }
923
924 private void createWriteTable(int numberOfServers) throws IOException {
925 int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
926 LOG.info("Number of live regionservers: " + numberOfServers + ", "
927 + "pre-splitting the canary table into " + numberOfRegions + " regions "
928 + "(current lower limi of regions per server is " + regionsLowerLimit
929 + " and you can change it by config: "
930 + HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
931 HTableDescriptor desc = new HTableDescriptor(writeTableName);
932 HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
933 family.setMaxVersions(1);
934 family.setTimeToLive(writeDataTTL);
935
936 desc.addFamily(family);
937 byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
938 admin.createTable(desc, splits);
939 }
940 }
941
942
943
944
945
946 public static void sniff(final Admin admin, TableName tableName)
947 throws Exception {
948 sniff(admin, tableName, TaskType.READ);
949 }
950
951
952
953
954
955 public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
956 throws Exception {
957 List<Future<Void>> taskFutures =
958 Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
959 new ScheduledThreadPoolExecutor(1), taskType);
960 for (Future<Void> future : taskFutures) {
961 future.get();
962 }
963 }
964
965
966
967
968
969 private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
970 ExecutorService executor, TaskType taskType) throws Exception {
971 if (admin.isTableEnabled(TableName.valueOf(tableName))) {
972 return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
973 executor, taskType);
974 } else {
975 LOG.warn(String.format("Table %s is not enabled", tableName));
976 }
977 return new LinkedList<Future<Void>>();
978 }
979
980
981
982
983 private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
984 HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType) throws Exception {
985 Table table = null;
986 try {
987 table = admin.getConnection().getTable(tableDesc.getTableName());
988 } catch (TableNotFoundException e) {
989 return new ArrayList<Future<Void>>();
990 }
991 List<RegionTask> tasks = new ArrayList<RegionTask>();
992 try {
993 for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
994 tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType));
995 }
996 } finally {
997 table.close();
998 }
999 return executor.invokeAll(tasks);
1000 }
1001
1002
1003
1004
1005
1006 private static void sniffRegion(
1007 final Admin admin,
1008 final Sink sink,
1009 HRegionInfo region,
1010 Table table) throws Exception {
1011 HTableDescriptor tableDesc = table.getTableDescriptor();
1012 byte[] startKey = null;
1013 Get get = null;
1014 Scan scan = null;
1015 ResultScanner rs = null;
1016 StopWatch stopWatch = new StopWatch();
1017 for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
1018 stopWatch.reset();
1019 startKey = region.getStartKey();
1020
1021 if (startKey.length > 0) {
1022 get = new Get(startKey);
1023 get.setCacheBlocks(false);
1024 get.setFilter(new FirstKeyOnlyFilter());
1025 get.addFamily(column.getName());
1026 } else {
1027 scan = new Scan();
1028 scan.setRaw(true);
1029 scan.setCaching(1);
1030 scan.setCacheBlocks(false);
1031 scan.setFilter(new FirstKeyOnlyFilter());
1032 scan.addFamily(column.getName());
1033 scan.setMaxResultSize(1L);
1034 }
1035
1036 try {
1037 if (startKey.length > 0) {
1038 stopWatch.start();
1039 table.get(get);
1040 stopWatch.stop();
1041 sink.publishReadTiming(region, column, stopWatch.getTime());
1042 } else {
1043 stopWatch.start();
1044 rs = table.getScanner(scan);
1045 stopWatch.stop();
1046 sink.publishReadTiming(region, column, stopWatch.getTime());
1047 }
1048 } catch (Exception e) {
1049 sink.publishReadFailure(region, column, e);
1050 } finally {
1051 if (rs != null) {
1052 rs.close();
1053 }
1054 scan = null;
1055 get = null;
1056 startKey = null;
1057 }
1058 }
1059 }
1060
1061 private static class RegionServerMonitor extends Monitor {
1062
1063 private boolean allRegions;
1064
1065 public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1066 ExtendedSink sink, ExecutorService executor, boolean allRegions,
1067 boolean treatFailureAsError) {
1068 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
1069 this.allRegions = allRegions;
1070 }
1071
1072 private ExtendedSink getSink() {
1073 return (ExtendedSink) this.sink;
1074 }
1075
1076 @Override
1077 public void run() {
1078 if (this.initAdmin() && this.checkNoTableNames()) {
1079 Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
1080 this.initialized = true;
1081 this.monitorRegionServers(rsAndRMap);
1082 }
1083 this.done = true;
1084 }
1085
1086 private boolean checkNoTableNames() {
1087 List<String> foundTableNames = new ArrayList<String>();
1088 TableName[] tableNames = null;
1089
1090 try {
1091 tableNames = this.admin.listTableNames();
1092 } catch (IOException e) {
1093 LOG.error("Get listTableNames failed", e);
1094 this.errorCode = INIT_ERROR_EXIT_CODE;
1095 return false;
1096 }
1097
1098 if (this.targets == null || this.targets.length == 0) return true;
1099
1100 for (String target : this.targets) {
1101 for (TableName tableName : tableNames) {
1102 if (target.equals(tableName.getNameAsString())) {
1103 foundTableNames.add(target);
1104 }
1105 }
1106 }
1107
1108 if (foundTableNames.size() > 0) {
1109 System.err.println("Cannot pass a tablename when using the -regionserver " +
1110 "option, tablenames:" + foundTableNames.toString());
1111 this.errorCode = USAGE_EXIT_CODE;
1112 }
1113 return foundTableNames.size() == 0;
1114 }
1115
1116 private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
1117 List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
1118 Map<String, AtomicLong> successMap = new HashMap<String, AtomicLong>();
1119 Random rand = new Random();
1120 for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1121 String serverName = entry.getKey();
1122 AtomicLong successes = new AtomicLong(0);
1123 successMap.put(serverName, successes);
1124 if (entry.getValue().isEmpty()) {
1125 LOG.error(String.format("Regionserver not serving any regions - %s", serverName));
1126 } else if (this.allRegions) {
1127 for (HRegionInfo region : entry.getValue()) {
1128 tasks.add(new RegionServerTask(this.connection,
1129 serverName,
1130 region,
1131 getSink(),
1132 successes));
1133 }
1134 } else {
1135
1136 HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1137 tasks.add(new RegionServerTask(this.connection,
1138 serverName,
1139 region,
1140 getSink(),
1141 successes));
1142 }
1143 }
1144 try {
1145 for (Future<Void> future : this.executor.invokeAll(tasks)) {
1146 try {
1147 future.get();
1148 } catch (ExecutionException e) {
1149 LOG.error("Sniff regionserver failed!", e);
1150 this.errorCode = ERROR_EXIT_CODE;
1151 }
1152 }
1153 if (this.allRegions) {
1154 for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1155 String serverName = entry.getKey();
1156 LOG.info("Successfully read " + successMap.get(serverName) + " regions out of "
1157 + entry.getValue().size() + " on regionserver:" + serverName);
1158 }
1159 }
1160 } catch (InterruptedException e) {
1161 this.errorCode = ERROR_EXIT_CODE;
1162 LOG.error("Sniff regionserver interrupted!", e);
1163 }
1164 }
1165
1166 private Map<String, List<HRegionInfo>> filterRegionServerByName() {
1167 Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1168 regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1169 return regionServerAndRegionsMap;
1170 }
1171
1172 private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
1173 Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
1174 Table table = null;
1175 RegionLocator regionLocator = null;
1176 try {
1177 HTableDescriptor[] tableDescs = this.admin.listTables();
1178 List<HRegionInfo> regions = null;
1179 for (HTableDescriptor tableDesc : tableDescs) {
1180 table = this.admin.getConnection().getTable(tableDesc.getTableName());
1181 regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());
1182
1183 for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1184 ServerName rs = location.getServerName();
1185 String rsName = rs.getHostname();
1186 HRegionInfo r = location.getRegionInfo();
1187
1188 if (rsAndRMap.containsKey(rsName)) {
1189 regions = rsAndRMap.get(rsName);
1190 } else {
1191 regions = new ArrayList<HRegionInfo>();
1192 rsAndRMap.put(rsName, regions);
1193 }
1194 regions.add(r);
1195 }
1196 table.close();
1197 }
1198
1199
1200 for (ServerName rs : this.admin.getClusterStatus().getServers()) {
1201 String rsName = rs.getHostname();
1202 if (!rsAndRMap.containsKey(rsName)) {
1203 rsAndRMap.put(rsName, Collections.<HRegionInfo>emptyList());
1204 }
1205 }
1206 } catch (IOException e) {
1207 String msg = "Get HTables info failed";
1208 LOG.error(msg, e);
1209 this.errorCode = INIT_ERROR_EXIT_CODE;
1210 } finally {
1211 if (table != null) {
1212 try {
1213 table.close();
1214 } catch (IOException e) {
1215 LOG.warn("Close table failed", e);
1216 }
1217 }
1218 }
1219
1220 return rsAndRMap;
1221 }
1222
1223 private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
1224 Map<String, List<HRegionInfo>> fullRsAndRMap) {
1225
1226 Map<String, List<HRegionInfo>> filteredRsAndRMap = null;
1227
1228 if (this.targets != null && this.targets.length > 0) {
1229 filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
1230 Pattern pattern = null;
1231 Matcher matcher = null;
1232 boolean regExpFound = false;
1233 for (String rsName : this.targets) {
1234 if (this.useRegExp) {
1235 regExpFound = false;
1236 pattern = Pattern.compile(rsName);
1237 for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
1238 matcher = pattern.matcher(entry.getKey());
1239 if (matcher.matches()) {
1240 filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1241 regExpFound = true;
1242 }
1243 }
1244 if (!regExpFound) {
1245 LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
1246 }
1247 } else {
1248 if (fullRsAndRMap.containsKey(rsName)) {
1249 filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1250 } else {
1251 LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
1252 }
1253 }
1254 }
1255 } else {
1256 filteredRsAndRMap = fullRsAndRMap;
1257 }
1258 return filteredRsAndRMap;
1259 }
1260 }
1261
1262 public static void main(String[] args) throws Exception {
1263 final Configuration conf = HBaseConfiguration.create();
1264
1265
1266 new GenericOptionsParser(conf, args);
1267
1268 int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1269 LOG.info("Number of exection threads " + numThreads);
1270
1271 ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1272
1273 Class<? extends Sink> sinkClass =
1274 conf.getClass("hbase.canary.sink.class", RegionServerStdOutSink.class, Sink.class);
1275 Sink sink = ReflectionUtils.newInstance(sinkClass);
1276
1277 int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
1278 executor.shutdown();
1279 System.exit(exitCode);
1280 }
1281 }