View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.tool;
21  
22  import java.io.Closeable;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.LinkedList;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Random;
32  import java.util.Set;
33  import java.util.TreeSet;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.ScheduledThreadPoolExecutor;
39  import java.util.regex.Matcher;
40  import java.util.regex.Pattern;
41  
42  import org.apache.commons.lang.time.StopWatch;
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.hbase.AuthUtil;
47  import org.apache.hadoop.hbase.ChoreService;
48  import org.apache.hadoop.hbase.DoNotRetryIOException;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HColumnDescriptor;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HRegionInfo;
53  import org.apache.hadoop.hbase.HRegionLocation;
54  import org.apache.hadoop.hbase.HTableDescriptor;
55  import org.apache.hadoop.hbase.MetaTableAccessor;
56  import org.apache.hadoop.hbase.NamespaceDescriptor;
57  import org.apache.hadoop.hbase.ScheduledChore;
58  import org.apache.hadoop.hbase.ServerName;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.TableNotEnabledException;
61  import org.apache.hadoop.hbase.TableNotFoundException;
62  import org.apache.hadoop.hbase.client.Admin;
63  import org.apache.hadoop.hbase.client.Connection;
64  import org.apache.hadoop.hbase.client.ConnectionFactory;
65  import org.apache.hadoop.hbase.client.Get;
66  import org.apache.hadoop.hbase.client.Put;
67  import org.apache.hadoop.hbase.client.RegionLocator;
68  import org.apache.hadoop.hbase.client.ResultScanner;
69  import org.apache.hadoop.hbase.client.Scan;
70  import org.apache.hadoop.hbase.client.Table;
71  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
72  import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
73  import org.apache.hadoop.hbase.util.Bytes;
74  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
75  import org.apache.hadoop.hbase.util.Pair;
76  import org.apache.hadoop.hbase.util.ReflectionUtils;
77  import org.apache.hadoop.hbase.util.RegionSplitter;
78  import org.apache.hadoop.util.Tool;
79  import org.apache.hadoop.util.ToolRunner;
80  
/**
 * HBase Canary Tool that can be used to do
 * "canary monitoring" of a running HBase cluster.
 *
 * There are two modes:
 * 1. region mode - for each region, tries to get one row per column family
 * and outputs some information about failure or latency.
 *
 * 2. regionserver mode - for each regionserver, tries to get one row from one table
 * selected randomly and outputs some information about failure or latency.
 */
92  public final class Canary implements Tool {
  // Sink interface used by the canary to outputs information
  public interface Sink {
    /** Reports a read probe that failed before a specific column family was selected. */
    public void publishReadFailure(HRegionInfo region, Exception e);
    /** Reports a failed read probe against one column family of the region. */
    public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
    /** Reports the latency, in milliseconds, of a successful read probe. */
    public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
    /** Reports a write probe that failed before a specific column family was selected. */
    public void publishWriteFailure(HRegionInfo region, Exception e);
    /** Reports a failed write probe against one column family of the region. */
    public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
    /** Reports the latency, in milliseconds, of a successful write probe. */
    public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
  }
  // new extended sink for output regionserver mode info
  // do not change the Sink interface directly due to maintaining the API
  public interface ExtendedSink extends Sink {
    /** Reports a failed read probe for the given table on the given region server. */
    public void publishReadFailure(String table, String server);
    /** Reports the latency, in milliseconds, of a successful per-regionserver read probe. */
    public void publishReadTiming(String table, String server, long msTime);
  }
108 
109   // Simple implementation of canary sink that allows to plot on
110   // file or standard output timings or failures.
111   public static class StdOutSink implements Sink {
112     @Override
113     public void publishReadFailure(HRegionInfo region, Exception e) {
114       LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
115     }
116 
117     @Override
118     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
119       LOG.error(String.format("read from region %s column family %s failed",
120                 region.getRegionNameAsString(), column.getNameAsString()), e);
121     }
122 
123     @Override
124     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
125       LOG.info(String.format("read from region %s column family %s in %dms",
126                region.getRegionNameAsString(), column.getNameAsString(), msTime));
127     }
128 
129     @Override
130     public void publishWriteFailure(HRegionInfo region, Exception e) {
131       LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
132     }
133 
134     @Override
135     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
136       LOG.error(String.format("write to region %s column family %s failed",
137         region.getRegionNameAsString(), column.getNameAsString()), e);
138     }
139 
140     @Override
141     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
142       LOG.info(String.format("write to region %s column family %s in %dms",
143         region.getRegionNameAsString(), column.getNameAsString(), msTime));
144     }
145   }
146   // a ExtendedSink implementation
147   public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
148 
149     @Override
150     public void publishReadFailure(String table, String server) {
151       LOG.error(String.format("Read from table:%s on region server:%s", table, server));
152     }
153 
154     @Override
155     public void publishReadTiming(String table, String server, long msTime) {
156       LOG.info(String.format("Read from table:%s on region server:%s in %dms",
157           table, server, msTime));
158     }
159   }
160 
161   /**
162    * For each column family of the region tries to get one row and outputs the latency, or the
163    * failure.
164    */
165   static class RegionTask implements Callable<Void> {
166     public enum TaskType{
167       READ, WRITE
168     }
169     private Connection connection;
170     private HRegionInfo region;
171     private Sink sink;
172     private TaskType taskType;
173 
174     RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType) {
175       this.connection = connection;
176       this.region = region;
177       this.sink = sink;
178       this.taskType = taskType;
179     }
180 
181     @Override
182     public Void call() {
183       switch (taskType) {
184       case READ:
185         return read();
186       case WRITE:
187         return write();
188       default:
189         return read();
190       }
191     }
192 
193     public Void read() {
194       Table table = null;
195       HTableDescriptor tableDesc = null;
196       try {
197         table = connection.getTable(region.getTable());
198         tableDesc = table.getTableDescriptor();
199       } catch (IOException e) {
200         LOG.debug("sniffRegion failed", e);
201         sink.publishReadFailure(region, e);
202         if (table != null) {
203           try {
204             table.close();
205           } catch (IOException ioe) {
206             LOG.error("Close table failed", e);
207           }
208         }
209         return null;
210       }
211 
212       byte[] startKey = null;
213       Get get = null;
214       Scan scan = null;
215       ResultScanner rs = null;
216       StopWatch stopWatch = new StopWatch();
217       for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
218         stopWatch.reset();
219         startKey = region.getStartKey();
220         // Can't do a get on empty start row so do a Scan of first element if any instead.
221         if (startKey.length > 0) {
222           get = new Get(startKey);
223           get.setCacheBlocks(false);
224           get.setFilter(new FirstKeyOnlyFilter());
225           get.addFamily(column.getName());
226         } else {
227           scan = new Scan();
228           scan.setRaw(true);
229           scan.setCaching(1);
230           scan.setCacheBlocks(false);
231           scan.setFilter(new FirstKeyOnlyFilter());
232           scan.addFamily(column.getName());
233           scan.setMaxResultSize(1L);
234         }
235 
236         try {
237           if (startKey.length > 0) {
238             stopWatch.start();
239             table.get(get);
240             stopWatch.stop();
241             sink.publishReadTiming(region, column, stopWatch.getTime());
242           } else {
243             stopWatch.start();
244             rs = table.getScanner(scan);
245             stopWatch.stop();
246             sink.publishReadTiming(region, column, stopWatch.getTime());
247           }
248         } catch (Exception e) {
249           sink.publishReadFailure(region, column, e);
250         } finally {
251           if (rs != null) {
252             rs.close();
253           }
254           scan = null;
255           get = null;
256           startKey = null;
257         }
258       }
259       try {
260         table.close();
261       } catch (IOException e) {
262         LOG.error("Close table failed", e);
263       }
264       return null;
265     }
266 
267     /**
268      * Check writes for the canary table
269      * @return
270      */
271     private Void write() {
272       Table table = null;
273       HTableDescriptor tableDesc = null;
274       try {
275         table = connection.getTable(region.getTable());
276         tableDesc = table.getTableDescriptor();
277         byte[] rowToCheck = region.getStartKey();
278         if (rowToCheck.length == 0) {
279           rowToCheck = new byte[]{0x0};
280         }
281         int writeValueSize =
282             connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
283         for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
284           Put put = new Put(rowToCheck);
285           byte[] value = new byte[writeValueSize];
286           Bytes.random(value);
287           put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
288           try {
289             long startTime = System.currentTimeMillis();
290             table.put(put);
291             long time = System.currentTimeMillis() - startTime;
292             sink.publishWriteTiming(region, column, time);
293           } catch (Exception e) {
294             sink.publishWriteFailure(region, column, e);
295           }
296         }
297         table.close();
298       } catch (IOException e) {
299         sink.publishWriteFailure(region, e);
300       }
301       return null;
302     }
303   }
304 
305   /**
306    * Get one row from a region on the regionserver and outputs the latency, or the failure.
307    */
308   static class RegionServerTask implements Callable<Void> {
309     private Connection connection;
310     private String serverName;
311     private HRegionInfo region;
312     private ExtendedSink sink;
313 
314     RegionServerTask(Connection connection, String serverName, HRegionInfo region,
315         ExtendedSink sink) {
316       this.connection = connection;
317       this.serverName = serverName;
318       this.region = region;
319       this.sink = sink;
320     }
321 
322     @Override
323     public Void call() {
324       TableName tableName = null;
325       Table table = null;
326       Get get = null;
327       byte[] startKey = null;
328       Scan scan = null;
329       StopWatch stopWatch = new StopWatch();
330       // monitor one region on every region server
331       stopWatch.reset();
332       try {
333         tableName = region.getTable();
334         table = connection.getTable(tableName);
335         startKey = region.getStartKey();
336         // Can't do a get on empty start row so do a Scan of first element if any instead.
337         if (startKey.length > 0) {
338           get = new Get(startKey);
339           get.setCacheBlocks(false);
340           get.setFilter(new FirstKeyOnlyFilter());
341           stopWatch.start();
342           table.get(get);
343           stopWatch.stop();
344         } else {
345           scan = new Scan();
346           scan.setCacheBlocks(false);
347           scan.setFilter(new FirstKeyOnlyFilter());
348           scan.setCaching(1);
349           scan.setMaxResultSize(1L);
350           stopWatch.start();
351           ResultScanner s = table.getScanner(scan);
352           s.close();
353           stopWatch.stop();
354         }
355         sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
356       } catch (TableNotFoundException tnfe) {
357         LOG.error("Table may be deleted", tnfe);
358         // This is ignored because it doesn't imply that the regionserver is dead
359       } catch (TableNotEnabledException tnee) {
360         // This is considered a success since we got a response.
361         LOG.debug("The targeted table was disabled.  Assuming success.");
362       } catch (DoNotRetryIOException dnrioe) {
363         sink.publishReadFailure(tableName.getNameAsString(), serverName);
364         LOG.error(dnrioe);
365       } catch (IOException e) {
366         sink.publishReadFailure(tableName.getNameAsString(), serverName);
367         LOG.error(e);
368       } finally {
369         if (table != null) {
370           try {
371             table.close();
372           } catch (IOException e) {/* DO NOTHING */
373             LOG.error("Close table failed", e);
374           }
375         }
376         scan = null;
377         get = null;
378         startKey = null;
379       }
380       return null;
381     }
382   }
383 
  /** Exit code used when command line arguments are invalid. */
  private static final int USAGE_EXIT_CODE = 1;
  /** Exit code used when initialization (e.g. creating the Admin) fails. */
  private static final int INIT_ERROR_EXIT_CODE = 2;
  /** Exit code used when a monitor run exceeds the configured timeout. */
  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
  /** Exit code used for any other monitoring error. */
  private static final int ERROR_EXIT_CODE = 4;

  // Default sleep between daemon-mode iterations, in milliseconds (6 seconds).
  private static final long DEFAULT_INTERVAL = 6000;

  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions

  private static final Log LOG = LogFactory.getLog(Canary.class);

  // Table targeted by the write-sniffing probes when no -writeTable is given.
  public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
    NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");

  // Column family name used in the canary write table.
  private static final String CANARY_TABLE_FAMILY_NAME = "Test";

  private Configuration conf = null;
  private long interval = 0; // ms between daemon iterations; 0 means run once
  private Sink sink = null;

  private boolean useRegExp; // treat monitor targets as regular expressions (-e)
  private long timeout = DEFAULT_TIMEOUT; // ms allowed for one monitor run (-t)
  private boolean failOnError = true; // exit on the first error (-f)
  private boolean regionServerMode = false; // probe per regionserver instead of per region
  private boolean writeSniffing = false; // also probe writes against the canary table
  private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME; // table used by write probes

  private ExecutorService executor; // threads to retrieve data from regionservers
413 
  /** Creates a canary with a single-threaded executor and a stdout sink. */
  public Canary() {
    this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
  }

  /**
   * @param executor thread pool used to run the per-region/per-server probe tasks
   * @param sink destination for probe timings and failures
   */
  public Canary(ExecutorService executor, Sink sink) {
    this.executor = executor;
    this.sink = sink;
  }
422 
  @Override
  public Configuration getConf() {
    // May be null until ToolRunner (or a caller) invokes setConf.
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
432 
  /**
   * Parses the command line and records the chosen options on this instance.
   * On any invalid option this prints usage and terminates the JVM via
   * {@link #printUsageAndExit()}.
   * @param args raw command line arguments
   * @return index of the first non-option argument (table/regionserver name), or -1 if none
   */
  private int parseArgs(String[] args) {
    int index = -1;
    // Process command line args
    for (int i = 0; i < args.length; i++) {
      String cmd = args[i];

      if (cmd.startsWith("-")) {
        if (index >= 0) {
          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
          System.err.println("Invalid command line options");
          printUsageAndExit();
        }

        if (cmd.equals("-help")) {
          // user asked for help, print the help and quit.
          printUsageAndExit();
        } else if (cmd.equals("-daemon") && interval == 0) {
          // user asked for daemon mode, set a default interval between checks
          interval = DEFAULT_INTERVAL;
        } else if (cmd.equals("-interval")) {
          // user has specified an interval for canary breaths (-interval N)
          // the option's value is consumed by advancing i
          i++;

          if (i == args.length) {
            System.err.println("-interval needs a numeric value argument.");
            printUsageAndExit();
          }

          try {
            // value given in seconds; stored in milliseconds
            interval = Long.parseLong(args[i]) * 1000;
          } catch (NumberFormatException e) {
            System.err.println("-interval needs a numeric value argument.");
            printUsageAndExit();
          }
        } else if(cmd.equals("-regionserver")) {
          this.regionServerMode = true;
        } else if(cmd.equals("-writeSniffing")) {
          this.writeSniffing = true;
        } else if (cmd.equals("-e")) {
          this.useRegExp = true;
        } else if (cmd.equals("-t")) {
          // per-run timeout, in milliseconds
          i++;

          if (i == args.length) {
            System.err.println("-t needs a numeric value argument.");
            printUsageAndExit();
          }

          try {
            this.timeout = Long.parseLong(args[i]);
          } catch (NumberFormatException e) {
            System.err.println("-t needs a numeric value argument.");
            printUsageAndExit();
          }
        } else if (cmd.equals("-writeTable")) {
          i++;

          if (i == args.length) {
            System.err.println("-writeTable needs a string value argument.");
            printUsageAndExit();
          }
          this.writeTableName = TableName.valueOf(args[i]);
        } else if (cmd.equals("-f")) {
          // whether to stop the whole run on the first error
          i++;

          if (i == args.length) {
            System.err
                .println("-f needs a boolean value argument (true|false).");
            printUsageAndExit();
          }

          this.failOnError = Boolean.parseBoolean(args[i]);
        } else {
          // no options match
          System.err.println(cmd + " options is invalid.");
          printUsageAndExit();
        }
      } else if (index < 0) {
        // keep track of first table name specified by the user
        index = i;
      }
    }
    return index;
  }
517 
  /**
   * Entry point invoked by ToolRunner. Runs a {@link Monitor} once, or repeatedly when
   * daemon mode (-daemon / -interval) is enabled, enforcing the -t timeout and the -f
   * fail-fast policy. Note: on error or timeout this method may terminate the JVM via
   * {@code System.exit} rather than returning.
   * @return the last monitor's error code (0 on success)
   */
  @Override
  public int run(String[] args) throws Exception {
    int index = parseArgs(args);
    ChoreService choreService = null;

    // Launches chore for refreshing kerberos credentials if security is enabled.
    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
    // for more details.
    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
    if (authChore != null) {
      choreService = new ChoreService("CANARY_TOOL");
      choreService.scheduleChore(authChore);
    }

    // Start to prepare the stuffs
    Monitor monitor = null;
    Thread monitorThread = null;
    long startTime = 0;
    long currentTimeLength = 0;
    // Get a connection to use in below.
    // try-with-resources jdk7 construct. See
    // http://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
    try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
      do {
        // Do monitor !!
        try {
          monitor = this.newMonitor(connection, index, args);
          monitorThread = new Thread(monitor);
          startTime = System.currentTimeMillis();
          monitorThread.start();
          while (!monitor.isDone()) {
            // wait for 1 sec
            Thread.sleep(1000);
            // exit if any error occurs
            if (this.failOnError && monitor.hasError()) {
              monitorThread.interrupt();
              if (monitor.initialized) {
                System.exit(monitor.errorCode);
              } else {
                System.exit(INIT_ERROR_EXIT_CODE);
              }
            }
            currentTimeLength = System.currentTimeMillis() - startTime;
            if (currentTimeLength > this.timeout) {
              LOG.error("The monitor is running too long (" + currentTimeLength
                  + ") after timeout limit:" + this.timeout
                  + " will be killed itself !!");
              if (monitor.initialized) {
                System.exit(TIMEOUT_ERROR_EXIT_CODE);
              } else {
                System.exit(INIT_ERROR_EXIT_CODE);
              }
              break;
            }
          }

          if (this.failOnError && monitor.hasError()) {
            monitorThread.interrupt();
            System.exit(monitor.errorCode);
          }
        } finally {
          if (monitor != null) monitor.close();
        }

        Thread.sleep(interval);
      } while (interval > 0);
    } // try-with-resources close

    if (choreService != null) {
      choreService.shutdown();
    }
    return(monitor.errorCode);
  }
591 
592   private void printUsageAndExit() {
593     System.err.printf(
594       "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
595         getClass().getName());
596     System.err.println(" where [opts] are:");
597     System.err.println("   -help          Show this help and exit.");
598     System.err.println("   -regionserver  replace the table argument to regionserver,");
599     System.err.println("      which means to enable regionserver mode");
600     System.err.println("   -daemon        Continuous check at defined intervals.");
601     System.err.println("   -interval <N>  Interval between checks (sec)");
602     System.err.println("   -e             Use region/regionserver as regular expression");
603     System.err.println("      which means the region/regionserver is regular expression pattern");
604     System.err.println("   -f <B>         stop whole program if first error occurs," +
605         " default is true");
606     System.err.println("   -t <N>         timeout for a check, default is 600000 (milisecs)");
607     System.err.println("   -writeSniffing enable the write sniffing in canary");
608     System.err.println("   -writeTable    The table used for write sniffing."
609         + " Default is hbase:canary");
610     System.exit(USAGE_EXIT_CODE);
611   }
612 
613   /**
614    * A Factory method for {@link Monitor}.
615    * Can be overridden by user.
616    * @param index a start index for monitor target
617    * @param args args passed from user
618    * @return a Monitor instance
619    */
620   public Monitor newMonitor(final Connection connection, int index, String[] args) {
621     Monitor monitor = null;
622     String[] monitorTargets = null;
623 
624     if(index >= 0) {
625       int length = args.length - index;
626       monitorTargets = new String[length];
627       System.arraycopy(args, index, monitorTargets, 0, length);
628     }
629 
630     if (this.regionServerMode) {
631       monitor =
632           new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
633               (ExtendedSink) this.sink, this.executor);
634     } else {
635       monitor =
636           new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
637               this.writeSniffing, this.writeTableName);
638     }
639     return monitor;
640   }
641 
  // a Monitor super-class can be extended by users
  public static abstract class Monitor implements Runnable, Closeable {

    protected Connection connection;
    protected Admin admin; // created lazily by initAdmin()
    protected String[] targets; // tables or regionservers to probe; null/empty means all
    protected boolean useRegExp; // whether targets are regular expressions
    protected boolean initialized = false; // set by subclasses once target resolution succeeded

    protected boolean done = false; // set when run() completes
    protected int errorCode = 0; // non-zero indicates failure; see *_EXIT_CODE constants
    protected Sink sink;
    protected ExecutorService executor;

    /** @return true once run() has finished, successfully or not. */
    public boolean isDone() {
      return done;
    }

    /** @return true if the monitor has recorded a non-zero error code. */
    public boolean hasError() {
      return errorCode != 0;
    }

    @Override
    public void close() throws IOException {
      if (this.admin != null) this.admin.close();
    }

    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
        ExecutorService executor) {
      if (null == connection) throw new IllegalArgumentException("connection shall not be null");

      this.connection = connection;
      this.targets = monitorTargets;
      this.useRegExp = useRegExp;
      this.sink = sink;
      this.executor = executor;
    }

    public abstract void run();

    /**
     * Lazily creates the Admin from the shared connection.
     * Records INIT_ERROR_EXIT_CODE if creation fails or an existing Admin has aborted.
     * @return true if the Admin is usable (i.e. no error has been recorded)
     */
    protected boolean initAdmin() {
      if (null == this.admin) {
        try {
          this.admin = this.connection.getAdmin();
        } catch (Exception e) {
          LOG.error("Initial HBaseAdmin failed...", e);
          this.errorCode = INIT_ERROR_EXIT_CODE;
        }
      } else if (admin.isAborted()) {
        LOG.error("HBaseAdmin aborted");
        this.errorCode = INIT_ERROR_EXIT_CODE;
      }
      return !this.hasError();
    }
  }
697 
698   // a monitor for region mode
699   private static class RegionMonitor extends Monitor {
    // 10 minutes
    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
    // 1 day (24 * 60 * 60; presumably seconds, matching HColumnDescriptor TTL — TODO confirm)
    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;

    private long lastCheckTime = -1; // last time the write table layout was verified (ms)
    private boolean writeSniffing; // whether to also probe writes
    private TableName writeTableName; // table targeted by write probes
    private int writeDataTTL; // TTL for data written by the canary
    private float regionsLowerLimit; // minimum write-table regions per live server
    private float regionsUpperLimit; // maximum write-table regions per live server
    private int checkPeriod; // ms between write-table distribution checks
712 
713     public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
714         Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName) {
715       super(connection, monitorTargets, useRegExp, sink, executor);
716       Configuration conf = connection.getConfiguration();
717       this.writeSniffing = writeSniffing;
718       this.writeTableName = writeTableName;
719       this.writeDataTTL =
720           conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
721       this.regionsLowerLimit =
722           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
723       this.regionsUpperLimit =
724           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
725       this.checkPeriod =
726           conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
727             DEFAULT_WRITE_TABLE_CHECK_PERIOD);
728     }
729 
    /**
     * Resolves the monitor targets, submits one read task per region (plus write tasks
     * when write sniffing is enabled), and waits for all tasks to finish. Individual
     * sniff failures are logged; any other exception sets ERROR_EXIT_CODE.
     */
    @Override
    public void run() {
      if (this.initAdmin()) {
        try {
          List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
          if (this.targets != null && this.targets.length > 0) {
            // Explicit targets: resolve them (possibly via regex) and sniff each table.
            String[] tables = generateMonitorTables(this.targets);
            this.initialized = true;
            for (String table : tables) {
              taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ));
            }
          } else {
            // No targets given: sniff every enabled table except the write table.
            taskFutures.addAll(sniff(TaskType.READ));
          }

          if (writeSniffing) {
            // Periodically verify that the write table exists and is well distributed.
            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
              try {
                checkWriteTableDistribution();
              } catch (IOException e) {
                LOG.error("Check canary table distribution failed!", e);
              }
              lastCheckTime = EnvironmentEdgeManager.currentTime();
            }
            // sniff canary table with write operation
            taskFutures.addAll(Canary.sniff(admin, sink,
              admin.getTableDescriptor(writeTableName), executor, TaskType.WRITE));
          }

          // Wait for every probe task; per-region failures are logged, not fatal.
          for (Future<Void> future : taskFutures) {
            try {
              future.get();
            } catch (ExecutionException e) {
              LOG.error("Sniff region failed!", e);
            }
          }
        } catch (Exception e) {
          LOG.error("Run regionMonitor failed", e);
          this.errorCode = ERROR_EXIT_CODE;
        }
      }
      this.done = true;
    }
773 
774     private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
775       String[] returnTables = null;
776 
777       if (this.useRegExp) {
778         Pattern pattern = null;
779         HTableDescriptor[] tds = null;
780         Set<String> tmpTables = new TreeSet<String>();
781         try {
782           for (String monitorTarget : monitorTargets) {
783             pattern = Pattern.compile(monitorTarget);
784             tds = this.admin.listTables(pattern);
785             if (tds != null) {
786               for (HTableDescriptor td : tds) {
787                 tmpTables.add(td.getNameAsString());
788               }
789             }
790           }
791         } catch (IOException e) {
792           LOG.error("Communicate with admin failed", e);
793           throw e;
794         }
795 
796         if (tmpTables.size() > 0) {
797           returnTables = tmpTables.toArray(new String[tmpTables.size()]);
798         } else {
799           String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
800           LOG.error(msg);
801           this.errorCode = INIT_ERROR_EXIT_CODE;
802           throw new TableNotFoundException(msg);
803         }
804       } else {
805         returnTables = monitorTargets;
806       }
807 
808       return returnTables;
809     }
810 
811     /*
812      * canary entry point to monitor all the tables.
813      */
814     private List<Future<Void>> sniff(TaskType taskType) throws Exception {
815       List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
816       for (HTableDescriptor table : admin.listTables()) {
817         if (admin.isTableEnabled(table.getTableName())
818             && (!table.getTableName().equals(writeTableName))) {
819           taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType));
820         }
821       }
822       return taskFutures;
823     }
824 
825     private void checkWriteTableDistribution() throws IOException {
826       if (!admin.tableExists(writeTableName)) {
827         int numberOfServers = admin.getClusterStatus().getServers().size();
828         if (numberOfServers == 0) {
829           throw new IllegalStateException("No live regionservers");
830         }
831         createWriteTable(numberOfServers);
832       }
833 
834       if (!admin.isTableEnabled(writeTableName)) {
835         admin.enableTable(writeTableName);
836       }
837 
838       int numberOfServers = admin.getClusterStatus().getServers().size();
839       List<Pair<HRegionInfo, ServerName>> pairs =
840           MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
841       int numberOfRegions = pairs.size();
842       if (numberOfRegions < numberOfServers * regionsLowerLimit
843           || numberOfRegions > numberOfServers * regionsUpperLimit) {
844         admin.disableTable(writeTableName);
845         admin.deleteTable(writeTableName);
846         createWriteTable(numberOfServers);
847       }
848       HashSet<ServerName> serverSet = new HashSet<ServerName>();
849       for (Pair<HRegionInfo, ServerName> pair : pairs) {
850         serverSet.add(pair.getSecond());
851       }
852       int numberOfCoveredServers = serverSet.size();
853       if (numberOfCoveredServers < numberOfServers) {
854         admin.balancer();
855       }
856     }
857 
858     private void createWriteTable(int numberOfServers) throws IOException {
859       int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
860       LOG.info("Number of live regionservers: " + numberOfServers + ", "
861           + "pre-splitting the canary table into " + numberOfRegions + " regions "
862           + "(current  lower limi of regions per server is " + regionsLowerLimit
863           + " and you can change it by config: "
864           + HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
865       HTableDescriptor desc = new HTableDescriptor(writeTableName);
866       HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
867       family.setMaxVersions(1);
868       family.setTimeToLive(writeDataTTL);
869 
870       desc.addFamily(family);
871       byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
872       admin.createTable(desc, splits);
873     }
874   }
875 
  /**
   * Canary entry point for the specified table. Delegates to
   * {@link #sniff(Admin, TableName, TaskType)} with {@code TaskType.READ} and
   * blocks until every region task has completed.
   * @param admin admin used to look up the table and its regions
   * @param tableName the table to sniff
   * @throws Exception if any region task fails
   */
  public static void sniff(final Admin admin, TableName tableName)
      throws Exception {
    sniff(admin, tableName, TaskType.READ);
  }
884 
885   /**
886    * Canary entry point for specified table with task type(read/write)
887    * @throws Exception
888    */
889   public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
890       throws Exception {
891     List<Future<Void>> taskFutures =
892         Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
893           new ScheduledThreadPoolExecutor(1), taskType);
894     for (Future<Void> future : taskFutures) {
895       future.get();
896     }
897   }
898 
899   /**
900    * Canary entry point for specified table.
901    * @throws Exception
902    */
903   private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
904       ExecutorService executor, TaskType taskType) throws Exception {
905     if (admin.isTableEnabled(TableName.valueOf(tableName))) {
906       return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
907         executor, taskType);
908     } else {
909       LOG.warn(String.format("Table %s is not enabled", tableName));
910     }
911     return new LinkedList<Future<Void>>();
912   }
913 
914   /*
915    * Loops over regions that owns this table, and output some information abouts the state.
916    */
917   private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
918       HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType) throws Exception {
919     Table table = null;
920     try {
921       table = admin.getConnection().getTable(tableDesc.getTableName());
922     } catch (TableNotFoundException e) {
923       return new ArrayList<Future<Void>>();
924     }
925     List<RegionTask> tasks = new ArrayList<RegionTask>();
926     try {
927       for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
928         tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType));
929       }
930     } finally {
931       table.close();
932     }
933     return executor.invokeAll(tasks);
934   }
  // a monitor for regionserver mode
  private static class RegionServerMonitor extends Monitor {

    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
        ExtendedSink sink, ExecutorService executor) {
      super(connection, monitorTargets, useRegExp, sink, executor);
    }

    // The constructor only accepts an ExtendedSink, so this downcast is safe.
    private ExtendedSink getSink() {
      return (ExtendedSink) this.sink;
    }

    @Override
    public void run() {
      // Regionserver mode accepts only server names/patterns as targets;
      // checkNoTableNames() rejects table names before any sniffing starts.
      if (this.initAdmin() && this.checkNoTableNames()) {
        Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
        this.initialized = true;
        this.monitorRegionServers(rsAndRMap);
      }
      this.done = true;
    }

    /*
     * Returns true when none of the configured targets is an existing table
     * name; otherwise prints the offending names and sets USAGE_EXIT_CODE.
     * Returns false (with INIT_ERROR_EXIT_CODE) if listing tables fails.
     */
    private boolean checkNoTableNames() {
      List<String> foundTableNames = new ArrayList<String>();
      TableName[] tableNames = null;

      try {
        tableNames = this.admin.listTableNames();
      } catch (IOException e) {
        LOG.error("Get listTableNames failed", e);
        this.errorCode = INIT_ERROR_EXIT_CODE;
        return false;
      }

      // No explicit targets means there is nothing to validate.
      if (this.targets == null || this.targets.length == 0) return true;

      for (String target : this.targets) {
        for (TableName tableName : tableNames) {
          if (target.equals(tableName.getNameAsString())) {
            foundTableNames.add(target);
          }
        }
      }

      if (foundTableNames.size() > 0) {
        System.err.println("Cannot pass a tablename when using the -regionserver " +
            "option, tablenames:" + foundTableNames.toString());
        this.errorCode = USAGE_EXIT_CODE;
      }
      return foundTableNames.size() == 0;
    }

    /*
     * Sniffs one randomly chosen region on every region server in the map and
     * waits for all tasks; any task failure sets ERROR_EXIT_CODE.
     */
    private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
      List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
      Random rand =new Random();
      // monitor one region on every region server
      for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
        String serverName = entry.getKey();
        // random select a region
        HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
        tasks.add(new RegionServerTask(this.connection, serverName, region, getSink()));
      }
      try {
        for (Future<Void> future : this.executor.invokeAll(tasks)) {
          try {
            future.get();
          } catch (ExecutionException e) {
            LOG.error("Sniff regionserver failed!", e);
            this.errorCode = ERROR_EXIT_CODE;
          }
        }
      } catch (InterruptedException e) {
        // NOTE(review): the interrupt status is not restored here
        // (Thread.currentThread().interrupt()); confirm callers don't rely on it.
        this.errorCode = ERROR_EXIT_CODE;
        LOG.error("Sniff regionserver failed!", e);
      }
    }

    /*
     * Builds the full server-to-regions map and narrows it to the configured
     * targets.
     */
    private Map<String, List<HRegionInfo>> filterRegionServerByName() {
      Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
      return regionServerAndRegionsMap;
    }

    /*
     * Maps each region server hostname to the regions it hosts, across every
     * table on the cluster. On IOException the partial map is returned and
     * INIT_ERROR_EXIT_CODE is set.
     */
    private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
      Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
      Table table = null;
      RegionLocator regionLocator = null;
      try {
        HTableDescriptor[] tableDescs = this.admin.listTables();
        List<HRegionInfo> regions = null;
        for (HTableDescriptor tableDesc : tableDescs) {
          table = this.admin.getConnection().getTable(tableDesc.getTableName());
          regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());

          for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
            ServerName rs = location.getServerName();
            // Keyed by hostname only (no port/startcode) — presumably to match
            // hostname-style -regionserver targets; verify against CLI parsing.
            String rsName = rs.getHostname();
            HRegionInfo r = location.getRegionInfo();

            if (rsAndRMap.containsKey(rsName)) {
              regions = rsAndRMap.get(rsName);
            } else {
              regions = new ArrayList<HRegionInfo>();
              rsAndRMap.put(rsName, regions);
            }
            regions.add(r);
          }
          table.close();
        }

      } catch (IOException e) {
        String msg = "Get HTables info failed";
        LOG.error(msg, e);
        this.errorCode = INIT_ERROR_EXIT_CODE;
      } finally {
        // Only the most recently opened table can still be open here; earlier
        // ones were closed at the end of each loop iteration.
        if (table != null) {
          try {
            table.close();
          } catch (IOException e) {
            LOG.warn("Close table failed", e);
          }
        }
      }

      return rsAndRMap;
    }

    /*
     * Keeps only the entries whose server name matches one of the configured
     * targets: regex match when -regexp was given, exact match otherwise.
     * With no targets the full map is returned unchanged.
     */
    private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
        Map<String, List<HRegionInfo>> fullRsAndRMap) {

      Map<String, List<HRegionInfo>> filteredRsAndRMap = null;

      if (this.targets != null && this.targets.length > 0) {
        filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
        Pattern pattern = null;
        Matcher matcher = null;
        boolean regExpFound = false;
        for (String rsName : this.targets) {
          if (this.useRegExp) {
            regExpFound = false;
            pattern = Pattern.compile(rsName);
            for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
              matcher = pattern.matcher(entry.getKey());
              if (matcher.matches()) {
                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
                regExpFound = true;
              }
            }
            if (!regExpFound) {
              LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
            }
          } else {
            if (fullRsAndRMap.containsKey(rsName)) {
              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
            } else {
              LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
            }
          }
        }
      } else {
        filteredRsAndRMap = fullRsAndRMap;
      }
      return filteredRsAndRMap;
    }
  }
1100 
1101   public static void main(String[] args) throws Exception {
1102     final Configuration conf = HBaseConfiguration.create();
1103     int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1104     ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1105 
1106     Class<? extends Sink> sinkClass =
1107         conf.getClass("hbase.canary.sink.class", StdOutSink.class, Sink.class);
1108     Sink sink = ReflectionUtils.newInstance(sinkClass);
1109 
1110     int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
1111     executor.shutdown();
1112     System.exit(exitCode);
1113   }
1114 }