View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce.replication;
20  
21  import java.io.IOException;
22  import java.util.Arrays;
23
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.conf.Configured;
28  import org.apache.hadoop.hbase.Abortable;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.Connection;
33  import org.apache.hadoop.hbase.client.ConnectionFactory;
34  import org.apache.hadoop.hbase.client.Put;
35  import org.apache.hadoop.hbase.client.Result;
36  import org.apache.hadoop.hbase.client.ResultScanner;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.client.Table;
39  import org.apache.hadoop.hbase.filter.Filter;
40  import org.apache.hadoop.hbase.filter.FilterList;
41  import org.apache.hadoop.hbase.filter.PrefixFilter;
42  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
43  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
44  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
45  import org.apache.hadoop.hbase.mapreduce.TableMapper;
46  import org.apache.hadoop.hbase.mapreduce.TableSplit;
47  import org.apache.hadoop.hbase.replication.ReplicationException;
48  import org.apache.hadoop.hbase.replication.ReplicationFactory;
49  import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
50  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
51  import org.apache.hadoop.hbase.replication.ReplicationPeers;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.util.Pair;
54  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
55  import org.apache.hadoop.mapreduce.Job;
56  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
57  import org.apache.hadoop.util.Tool;
58  import org.apache.hadoop.util.ToolRunner;
59
60  /**
61   * This map-only job compares the data from a local table with a remote one.
62   * Every cell is compared and must have exactly the same keys (even timestamp)
63   * as well as same value. It is possible to restrict the job by time range and
64   * families. The peer id that's provided must match the one given when the
65   * replication stream was setup.
66   * <p>
67   * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason
68   * why a row is different is shown in the map's log.
69   */
70  public class VerifyReplication extends Configured implements Tool {
71
72    private static final Log LOG =
73        LogFactory.getLog(VerifyReplication.class);
74
75    public final static String NAME = "verifyrep";
76    private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
77    static long startTime = 0;
78    static long endTime = Long.MAX_VALUE;
79    static int batch = Integer.MAX_VALUE;
80    static int versions = -1;
81    static String tableName = null;
82    static String families = null;
83    static String peerId = null;
84    static String rowPrefixes = null;
85
86    private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
87
88    /**
89     * Map-only comparator for 2 tables
90     */
91    public static class Verifier
92        extends TableMapper<ImmutableBytesWritable, Put> {
93  
94
95
96      public static enum Counters {
97        GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
98  
99      private Connection connection;
100     private Table replicatedTable;
101     private ResultScanner replicatedScanner;
102     private Result currentCompareRowInPeerTable;
103
104     /**
105      * Map method that compares every scanned row with the equivalent from
106      * a distant cluster.
107      * @param row  The current table row key.
108      * @param value  The columns.
109      * @param context  The current context.
110      * @throws IOException When something is broken with the data.
111      */
112     @Override
113     public void map(ImmutableBytesWritable row, final Result value,
114                     Context context)
115         throws IOException {
116       if (replicatedScanner == null) {
117         Configuration conf = context.getConfiguration();
118         final Scan scan = new Scan();
119         scan.setBatch(batch);
120         scan.setCacheBlocks(false);
121         scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
122         long startTime = conf.getLong(NAME + ".startTime", 0);
123         long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
124         String families = conf.get(NAME + ".families", null);
125         if(families != null) {
126           String[] fams = families.split(",");
127           for(String fam : fams) {
128             scan.addFamily(Bytes.toBytes(fam));
129           }
130         }
131         String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
132         setRowPrefixFilter(scan, rowPrefixes);
133         scan.setTimeRange(startTime, endTime);
134         int versions = conf.getInt(NAME+".versions", -1);
135         LOG.info("Setting number of version inside map as: " + versions);
136         if (versions >= 0) {
137           scan.setMaxVersions(versions);
138         }
139
140         final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
141
142         String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
143         Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
144             zkClusterKey, PEER_CONFIG_PREFIX);
145
146         TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
147         connection = ConnectionFactory.createConnection(peerConf);
148         replicatedTable = connection.getTable(tableName);
149         scan.setStartRow(value.getRow());
150         scan.setStopRow(tableSplit.getEndRow());
151         replicatedScanner = replicatedTable.getScanner(scan);
152         currentCompareRowInPeerTable = replicatedScanner.next();
153       }
154       while (true) {
155         if (currentCompareRowInPeerTable == null) {
156           // reach the region end of peer table, row only in source table
157           logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
158           break;
159         }
160         int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
161         if (rowCmpRet == 0) {
162           // rowkey is same, need to compare the content of the row
163           try {
164             Result.compareResults(value, currentCompareRowInPeerTable);
165             context.getCounter(Counters.GOODROWS).increment(1);
166           } catch (Exception e) {
167             logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
168             LOG.error("Exception while comparing row : " + e);
169           }
170           currentCompareRowInPeerTable = replicatedScanner.next();
171           break;
172         } else if (rowCmpRet < 0) {
173           // row only exists in source table
174           logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
175           break;
176         } else {
177           // row only exists in peer table
178           logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
179             currentCompareRowInPeerTable);
180           currentCompareRowInPeerTable = replicatedScanner.next();
181         }
182       }
183     }
184
185     private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
186       context.getCounter(counter).increment(1);
187       context.getCounter(Counters.BADROWS).increment(1);
188       LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow()));
189     }
190
191     @Override
192     protected void cleanup(Context context) {
193       if (replicatedScanner != null) {
194         try {
195           while (currentCompareRowInPeerTable != null) {
196             logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
197               currentCompareRowInPeerTable);
198             currentCompareRowInPeerTable = replicatedScanner.next();
199           }
200         } catch (Exception e) {
201           LOG.error("fail to scan peer table in cleanup", e);
202         } finally {
203           replicatedScanner.close();
204           replicatedScanner = null;
205         }
206       }
207       if(replicatedTable != null){
208         try{
209           replicatedTable.close();
210         } catch (Exception e) {
211           LOG.error("fail to close table in cleanup", e);
212         }
213       }
214       if(connection != null){
215         try {
216           connection.close();
217         } catch (Exception e) {
218           LOG.error("fail to close connection in cleanup", e);
219         }
220       }
221     }
222   }
223
224   private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
225       final Configuration conf) throws IOException {
226     ZooKeeperWatcher localZKW = null;
227     ReplicationPeerZKImpl peer = null;
228     try {
229       localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
230           new Abortable() {
231             @Override public void abort(String why, Throwable e) {}
232             @Override public boolean isAborted() {return false;}
233           });
234
235       ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
236       rp.init();
237
238       Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
239       if (pair == null) {
240         throw new IOException("Couldn't get peer conf!");
241       }
242
243       return pair;
244     } catch (ReplicationException e) {
245       throw new IOException(
246           "An error occured while trying to connect to the remove peer cluster", e);
247     } finally {
248       if (peer != null) {
249         peer.close();
250       }
251       if (localZKW != null) {
252         localZKW.close();
253       }
254     }
255   }
256
257   /**
258    * Sets up the actual job.
259    *
260    * @param conf  The current configuration.
261    * @param args  The command line parameters.
262    * @return The newly created job.
263    * @throws java.io.IOException When setting up the job fails.
264    */
265   public static Job createSubmittableJob(Configuration conf, String[] args)
266   throws IOException {
267     if (!doCommandLine(args)) {
268       return null;
269     }
270     conf.set(NAME+".peerId", peerId);
271     conf.set(NAME+".tableName", tableName);
272     conf.setLong(NAME+".startTime", startTime);
273     conf.setLong(NAME+".endTime", endTime);
274     if (families != null) {
275       conf.set(NAME+".families", families);
276     }
277     if (rowPrefixes != null){
278       conf.set(NAME+".rowPrefixes", rowPrefixes);
279     }
280
281     Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
282     ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
283     String peerQuorumAddress = peerConfig.getClusterKey();
284     LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
285         peerConfig.getConfiguration());
286     conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
287     HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
288         peerConfig.getConfiguration().entrySet());
289
290     conf.setInt(NAME + ".versions", versions);
291     LOG.info("Number of version: " + versions);
292
293     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
294     job.setJarByClass(VerifyReplication.class);
295
296     Scan scan = new Scan();
297     scan.setTimeRange(startTime, endTime);
298     if (versions >= 0) {
299       scan.setMaxVersions(versions);
300       LOG.info("Number of versions set to " + versions);
301     }
302     if(families != null) {
303       String[] fams = families.split(",");
304       for(String fam : fams) {
305         scan.addFamily(Bytes.toBytes(fam));
306       }
307     }
308
309     setRowPrefixFilter(scan, rowPrefixes);
310
311     TableMapReduceUtil.initTableMapperJob(tableName, scan,
312         Verifier.class, null, null, job);
313
314     Configuration peerClusterConf = peerConfigPair.getSecond();
315     // Obtain the auth token from peer cluster
316     TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
317
318     job.setOutputFormatClass(NullOutputFormat.class);
319     job.setNumReduceTasks(0);
320     return job;
321   }
322
323   private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
324     if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
325       String[] rowPrefixArray = rowPrefixes.split(",");
326       Arrays.sort(rowPrefixArray);
327       FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
328       for (String prefix : rowPrefixArray) {
329         Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
330         filterList.addFilter(filter);
331       }
332       scan.setFilter(filterList);
333       byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
334       byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]);
335       setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
336     }
337   }
338
339   private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
340     scan.setStartRow(startPrefixRow);
341     byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
342         new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)});
343     scan.setStopRow(stopRow);
344   }
345
346   private static boolean doCommandLine(final String[] args) {
347     if (args.length < 2) {
348       printUsage(null);
349       return false;
350     }
351     //in case we've been run before, restore all parameters to their initial states
352     //Otherwise, if our previous run included a parameter not in args this time,
353     //we might hold on to the old value.
354     restoreDefaults();
355     try {
356       for (int i = 0; i < args.length; i++) {
357         String cmd = args[i];
358         if (cmd.equals("-h") || cmd.startsWith("--h")) {
359           printUsage(null);
360           return false;
361         }
362
363         final String startTimeArgKey = "--starttime=";
364         if (cmd.startsWith(startTimeArgKey)) {
365           startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
366           continue;
367         }
368
369         final String endTimeArgKey = "--endtime=";
370         if (cmd.startsWith(endTimeArgKey)) {
371           endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
372           continue;
373         }
374
375         final String versionsArgKey = "--versions=";
376         if (cmd.startsWith(versionsArgKey)) {
377           versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
378           continue;
379         }
380
381         final String batchArgKey = "--batch=";
382         if (cmd.startsWith(batchArgKey)) {
383           batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
384           continue;
385         }
386
387         final String familiesArgKey = "--families=";
388         if (cmd.startsWith(familiesArgKey)) {
389           families = cmd.substring(familiesArgKey.length());
390           continue;
391         }
392
393         final String rowPrefixesKey = "--row-prefixes=";
394         if (cmd.startsWith(rowPrefixesKey)){
395           rowPrefixes = cmd.substring(rowPrefixesKey.length());
396           continue;
397         }
398
399         if (cmd.startsWith("--")) {
400           printUsage("Invalid argument '" + cmd + "'");
401         }
402
403         if (i == args.length-2) {
404           peerId = cmd;
405         }
406
407         if (i == args.length-1) {
408           tableName = cmd;
409         }
410       }
411     } catch (Exception e) {
412       e.printStackTrace();
413       printUsage("Can't start because " + e.getMessage());
414       return false;
415     }
416     return true;
417   }
418
  /**
   * Resets every static job parameter to its default so that values parsed by
   * a previous invocation in the same JVM cannot leak into this run.
   */
  private static void restoreDefaults() {
    startTime = 0;
    endTime = Long.MAX_VALUE;
    batch = Integer.MAX_VALUE;
    versions = -1;
    tableName = null;
    families = null;
    peerId = null;
    rowPrefixes = null;
  }
429
430   /*
431    * @param errorMsg Error message.  Can be null.
432    */
433   private static void printUsage(final String errorMsg) {
434     if (errorMsg != null && errorMsg.length() > 0) {
435       System.err.println("ERROR: " + errorMsg);
436     }
437     System.err.println("Usage: verifyrep [--starttime=X]" +
438         " [--endtime=Y] [--families=A] [--row-prefixes=B] <peerid> <tablename>");
439     System.err.println();
440     System.err.println("Options:");
441     System.err.println(" starttime    beginning of the time range");
442     System.err.println("              without endtime means from starttime to forever");
443     System.err.println(" endtime      end of the time range");
444     System.err.println(" versions     number of cell versions to verify");
445     System.err.println(" families     comma-separated list of families to copy");
446     System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
447     System.err.println();
448     System.err.println("Args:");
449     System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
450     System.err.println(" tablename    Name of the table to verify");
451     System.err.println();
452     System.err.println("Examples:");
453     System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
454     System.err.println(" $ bin/hbase " +
455         "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
456         " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
457   }
458
459   @Override
460   public int run(String[] args) throws Exception {
461     Configuration conf = this.getConf();
462     Job job = createSubmittableJob(conf, args);
463     if (job != null) {
464       return job.waitForCompletion(true) ? 0 : 1;
465     }
466     return 1;
467   }
468
469   /**
470    * Main entry point.
471    *
472    * @param args  The command line parameters.
473    * @throws Exception When running the job fails.
474    */
475   public static void main(String[] args) throws Exception {
476     int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
477     System.exit(res);
478   }
479 }