/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce.replication;

import java.io.IOException;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * This map-only job compares the data from a local table with a remote one.
 * Every cell is compared and must have exactly the same keys (even timestamp)
 * as well as the same value. It is possible to restrict the job by time range
 * and families. The peer id that's provided must match the one given when the
 * replication stream was set up.
 * <p>
 * Counters are exposed in Verifier.Counters: GOODROWS counts matching rows,
 * and BADROWS aggregates all mismatches. The reason why a row is different
 * is shown in the map's log.
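 * <p>
 * A minimal example invocation (the same one this tool prints in its usage
 * text), verifying one hour of data replicated to peer 5:
 * <pre>
 * $ bin/hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \
 *     --starttime=1265875194289 --endtime=1265878794289 5 TestTable
 * </pre>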
 */
public class VerifyReplication extends Configured implements Tool {

  private static final Log LOG =
      LogFactory.getLog(VerifyReplication.class);

  public final static String NAME = "verifyrep";
  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
  static long startTime = 0;
  static long endTime = Long.MAX_VALUE;
  static int batch = Integer.MAX_VALUE;
  static int versions = -1;
  static String tableName = null;
  static String families = null;
  static String delimiter = "";
  static String peerId = null;
  static String rowPrefixes = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  /**
   * Map-only comparator for 2 tables
   */
  public static class Verifier
      extends TableMapper<ImmutableBytesWritable, Put> {

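    /**
     * GOODROWS counts rows that matched the peer cell-for-cell; each of the
     * other counters marks one kind of mismatch, and every mismatch also
     * increments BADROWS.
     */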
    public enum Counters {
      GOODROWS,
      BADROWS,
      ONLY_IN_SOURCE_TABLE_ROWS,
      ONLY_IN_PEER_TABLE_ROWS,
      CONTENT_DIFFERENT_ROWS
    }

    private Connection connection;
    private Table replicatedTable;
    private ResultScanner replicatedScanner;
    private Result currentCompareRowInPeerTable;

    /**
     * Map method that compares every scanned row with the equivalent from
     * a distant cluster.
     * @param row  The current table row key.
     * @param value  The columns.
     * @param context  The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, final Result value,
                    Context context)
        throws IOException {
      if (replicatedScanner == null) {
        Configuration conf = context.getConfiguration();
        final Scan scan = new Scan();
        // The map task runs in its own JVM, so read the batch size and the
        // log delimiter from the job configuration like the other parameters;
        // the driver-side static fields are not set here.
        scan.setBatch(conf.getInt(NAME + ".batch", Integer.MAX_VALUE));
        delimiter = conf.get(NAME + ".delimiter", "");
        scan.setCacheBlocks(false);
        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
        long startTime = conf.getLong(NAME + ".startTime", 0);
        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
        String families = conf.get(NAME + ".families", null);
        if (families != null) {
          String[] fams = families.split(",");
          for (String fam : fams) {
            scan.addFamily(Bytes.toBytes(fam));
          }
        }
        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
        setRowPrefixFilter(scan, rowPrefixes);
        scan.setTimeRange(startTime, endTime);
        int versions = conf.getInt(NAME + ".versions", -1);
        LOG.info("Setting number of versions inside map as: " + versions);
        if (versions >= 0) {
          scan.setMaxVersions(versions);
        }

        final TableSplit tableSplit = (TableSplit)(context.getInputSplit());

        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
            zkClusterKey, PEER_CONFIG_PREFIX);

        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        connection = ConnectionFactory.createConnection(peerConf);
        replicatedTable = connection.getTable(tableName);
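        // Restrict the peer-side scan to this mapper's split so both sides
        // walk the same slice of the keyspace in sorted row order.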
        scan.setStartRow(value.getRow());
        scan.setStopRow(tableSplit.getEndRow());
        replicatedScanner = replicatedTable.getScanner(scan);
        currentCompareRowInPeerTable = replicatedScanner.next();
      }
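      // Merge-join of two sorted streams: source rows arrive one per map()
      // call, peer rows come from replicatedScanner. Equal keys are compared
      // cell-by-cell; a key present on only one side is a mismatch.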
      while (true) {
        if (currentCompareRowInPeerTable == null) {
          // Reached the region end of the peer table; row only in source table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
          break;
        }
        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
        if (rowCmpRet == 0) {
          // rowkey is same, need to compare the content of the row
          try {
            Result.compareResults(value, currentCompareRowInPeerTable);
            context.getCounter(Counters.GOODROWS).increment(1);
          } catch (Exception e) {
            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
            LOG.error("Exception while comparing row", e);
          }
          currentCompareRowInPeerTable = replicatedScanner.next();
          break;
        } else if (rowCmpRet < 0) {
          // row only exists in source table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
          break;
        } else {
          // row only exists in peer table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
            currentCompareRowInPeerTable);
          currentCompareRowInPeerTable = replicatedScanner.next();
        }
      }
    }

    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
      context.getCounter(counter).increment(1);
      context.getCounter(Counters.BADROWS).increment(1);
      LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toString(row.getRow()) +
          delimiter);
    }

    @Override
    protected void cleanup(Context context) {
      if (replicatedScanner != null) {
        try {
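          // The source side is exhausted, so anything left in the peer
          // scanner exists only in the peer table.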
          while (currentCompareRowInPeerTable != null) {
            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
              currentCompareRowInPeerTable);
            currentCompareRowInPeerTable = replicatedScanner.next();
          }
        } catch (Exception e) {
          LOG.error("Failed to scan peer table in cleanup", e);
        } finally {
          replicatedScanner.close();
          replicatedScanner = null;
        }
      }
      if (replicatedTable != null) {
        try {
          replicatedTable.close();
        } catch (Exception e) {
          LOG.error("Failed to close table in cleanup", e);
        }
      }
      if (connection != null) {
        try {
          connection.close();
        } catch (Exception e) {
          LOG.error("Failed to close connection in cleanup", e);
        }
      }
    }
  }

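  /**
   * Reads the configured peer's cluster key and configuration from the local
   * cluster's ZooKeeper, so the job knows how to reach the remote cluster.
   */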
  private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
      final Configuration conf) throws IOException {
    ZooKeeperWatcher localZKW = null;
    ReplicationPeerZKImpl peer = null;
    try {
      localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
          new Abortable() {
            @Override public void abort(String why, Throwable e) {}
            @Override public boolean isAborted() {return false;}
          });

      ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
      rp.init();

      Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
      if (pair == null) {
        throw new IOException("Couldn't get peer conf!");
      }

      return pair;
    } catch (ReplicationException e) {
      throw new IOException(
          "An error occurred while trying to connect to the remote peer cluster", e);
    } finally {
      if (peer != null) {
        peer.close();
      }
      if (localZKW != null) {
        localZKW.close();
      }
    }
  }

  /**
   * Sets up the actual job.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters.
   * @return The newly created job.
   * @throws java.io.IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }
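    // The map tasks run in separate JVMs, so every parameter they need is
    // passed through the job configuration rather than the static fields.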
    conf.set(NAME + ".peerId", peerId);
    conf.set(NAME + ".tableName", tableName);
    conf.setLong(NAME + ".startTime", startTime);
    conf.setLong(NAME + ".endTime", endTime);
    conf.setInt(NAME + ".batch", batch);
    conf.set(NAME + ".delimiter", delimiter);
    if (families != null) {
      conf.set(NAME + ".families", families);
    }
    if (rowPrefixes != null) {
      conf.set(NAME + ".rowPrefixes", rowPrefixes);
    }

    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
    ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
    String peerQuorumAddress = peerConfig.getClusterKey();
    LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
        peerConfig.getConfiguration());
    conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
    HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
        peerConfig.getConfiguration().entrySet());

    conf.setInt(NAME + ".versions", versions);
    LOG.info("Number of versions: " + versions);

    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(VerifyReplication.class);

    Scan scan = new Scan();
    scan.setTimeRange(startTime, endTime);
    if (versions >= 0) {
      scan.setMaxVersions(versions);
      LOG.info("Number of versions set to " + versions);
    }
    if (families != null) {
      String[] fams = families.split(",");
      for (String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }

    setRowPrefixFilter(scan, rowPrefixes);

    TableMapReduceUtil.initTableMapperJob(tableName, scan,
        Verifier.class, null, null, job);

    Configuration peerClusterConf = peerConfigPair.getSecond();
    // Obtain the auth token from peer cluster
    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    return job;
  }

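  /**
   * ORs one PrefixFilter per comma-separated prefix onto the scan and, with
   * the prefixes sorted, narrows the scanned range to start at the first
   * prefix and stop just past the last one.
   */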
  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
      String[] rowPrefixArray = rowPrefixes.split(",");
      Arrays.sort(rowPrefixArray);
      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
      for (String prefix : rowPrefixArray) {
        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
        filterList.addFilter(filter);
      }
      scan.setFilter(filterList);
      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
    }
  }

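  // The stop row is the last prefix with its final byte incremented by one:
  // the smallest key strictly greater than any key carrying that prefix.
  // Note this assumes the prefix's final byte is not 0xFF, where the
  // increment would wrap to 0x00.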
  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
    scan.setStartRow(startPrefixRow);
    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
        new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)});
    scan.setStopRow(stopRow);
  }

  private static boolean doCommandLine(final String[] args) {
    if (args.length < 2) {
      printUsage(null);
      return false;
    }
    // In case we've been run before, restore all parameters to their initial states.
    // Otherwise, if our previous run included a parameter not in args this time,
    // we might hold on to the old value.
    restoreDefaults();
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        final String rowPrefixesKey = "--row-prefixes=";
        if (cmd.startsWith(rowPrefixesKey)) {
          rowPrefixes = cmd.substring(rowPrefixesKey.length());
          continue;
        }

        final String delimiterArgKey = "--delimiter=";
        if (cmd.startsWith(delimiterArgKey)) {
          delimiter = cmd.substring(delimiterArgKey.length());
          continue;
        }

        // Any remaining "--" argument is unknown; this check must come after
        // all known options so that it does not swallow them.
        if (cmd.startsWith("--")) {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }

        if (i == args.length - 2) {
          peerId = cmd;
        }

        if (i == args.length - 1) {
          tableName = cmd;
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  private static void restoreDefaults() {
    startTime = 0;
    endTime = Long.MAX_VALUE;
    batch = Integer.MAX_VALUE;
    versions = -1;
    tableName = null;
    families = null;
    delimiter = "";
    peerId = null;
    rowPrefixes = null;
  }

  /*
   * @param errorMsg Error message.  Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: verifyrep [--starttime=X]" +
        " [--endtime=Y] [--versions=N] [--batch=M] [--families=A]" +
        " [--row-prefixes=B] [--delimiter=] <peerid> <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" starttime    beginning of the time range");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range");
    System.err.println(" versions     number of cell versions to verify");
    System.err.println(" batch        maximum number of cells returned per scan batch");
    System.err.println(" families     comma-separated list of families to verify");
    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on");
    System.err.println(" delimiter    the delimiter used in display around rowkey");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" peerid       Id of the peer used for verification," +
        " must match the one given for replication");
    System.err.println(" tablename    Name of the table to verify");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5");
    System.err.println(" $ bin/hbase " +
        "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
        " --starttime=1265875194289 --endtime=1265878794289 5 TestTable");
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = createSubmittableJob(conf, args);
    if (job != null) {
      return job.waitForCompletion(true) ? 0 : 1;
    }
    return 1;
  }

  /**
   * Main entry point.
   *
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
    System.exit(res);
  }
}