/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;

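/**
 * Map-only job that compares the data in a target table against the hash
 * manifest produced by {@link HashTable} over a source table, and (unless
 * running in dry-run mode) writes the Puts and Deletes needed to bring the
 * target table back in sync with the source.
 */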
public class SyncTable extends Configured implements Tool {

  private static final Log LOG = LogFactory.getLog(SyncTable.class);

  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";

  Path sourceHashDir;
  String sourceTableName;
  String targetTableName;

  String sourceZkCluster;
  String targetZkCluster;
  boolean dryRun;

  Counters counters;

  public SyncTable(Configuration conf) {
    super(conf);
  }

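  /**
   * Validate the HashTable output against its manifest, then configure a
   * map-only job that scans the target table and compares it to the source.
   */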
  public Job createSubmittableJob(String[] args) throws IOException {
    FileSystem fs = sourceHashDir.getFileSystem(getConf());
    if (!fs.exists(sourceHashDir)) {
      throw new IOException("Source hash dir not found: " + sourceHashDir);
    }

    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
    LOG.info("Read source hash manifest: " + tableHash);
    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
    if (!tableHash.tableName.equals(sourceTableName)) {
      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
          + tableHash.tableName + " but job is reading from: " + sourceTableName);
    }
    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys. However, the manifest file"
          + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
    int dataSubdirCount = 0;
    for (FileStatus file : fs.listStatus(dataDir)) {
      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
        dataSubdirCount++;
      }
    }

    if (dataSubdirCount != tableHash.numHashFiles) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys. However, the number of data dirs"
          + " found is " + dataSubdirCount + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    Job job = Job.getInstance(getConf(), getConf().get("mapreduce.job.name",
        "syncTable_" + sourceTableName + "-" + targetTableName));
    Configuration jobConf = job.getConfiguration();
    job.setJarByClass(HashTable.class);
    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
    if (sourceZkCluster != null) {
      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
    }
    if (targetZkCluster != null) {
      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
    }
    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);

    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
        SyncMapper.class, null, null, job);

    job.setNumReduceTasks(0);

    if (dryRun) {
      job.setOutputFormatClass(NullOutputFormat.class);
    } else {
      // No reducers.  Just write straight to table.  Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
          targetZkCluster, null, null);

      // would be nice to add an option for bulk load instead
    }

    // Obtain an authentication token, for the specified cluster, on behalf of the current user
    if (sourceZkCluster != null) {
      Configuration peerConf =
          HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
    }
    return job;
  }

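  /**
   * Mapper that re-hashes the target table in the same batches recorded in the
   * source hash manifest and, on a hash mismatch, rescans both tables over the
   * affected key range to find and repair the differing rows and cells.
   */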
  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
    Path sourceHashDir;

    Connection sourceConnection;
    Connection targetConnection;
    Table sourceTable;
    Table targetTable;
    boolean dryRun;

    HashTable.TableHash sourceTableHash;
    HashTable.TableHash.Reader sourceHashReader;
    ImmutableBytesWritable currentSourceHash;
    ImmutableBytesWritable nextSourceKey;
    HashTable.ResultHasher targetHasher;

    Throwable mapperException;

    public enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
      SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
      MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED}

    @Override
    protected void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
          TableOutputFormat.OUTPUT_CONF_PREFIX);
      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);

      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
      LOG.info("Read source hash manifest: " + sourceTableHash);
      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");

      TableSplit split = (TableSplit) context.getInputSplit();
      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());

      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
      findNextKeyHashPair();

      // create a hasher, but don't start it right away
      // instead, find the first hash batch at or after the start row
      // and skip any rows that come before.  they will be caught by the previous task
      targetHasher = new HashTable.ResultHasher();
    }

    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
        String configPrefix) throws IOException {
      String zkCluster = conf.get(zkClusterConfKey);
      Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
          zkCluster, configPrefix);
      return ConnectionFactory.createConnection(clusterConf);
    }

    private static Table openTable(Connection connection, Configuration conf,
        String tableNameConfKey) throws IOException {
      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
    }

    /**
     * Attempt to read the next source key/hash pair.
     * If there are no more, set nextSourceKey to null.
     */
    private void findNextKeyHashPair() throws IOException {
      boolean hasNext = sourceHashReader.next();
      if (hasNext) {
        nextSourceKey = sourceHashReader.getCurrentKey();
      } else {
        // no more keys - last hash goes to the end
        nextSourceKey = null;
      }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      try {
        // first, finish any hash batches that end before the scanned row
        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
          moveToNextBatch(context);
        }

        // next, add the scanned row (as long as we've reached the first batch)
        if (targetHasher.isBatchStarted()) {
          targetHasher.hashResult(value);
        }
      } catch (Throwable t) {
        mapperException = t;
        Throwables.propagateIfInstanceOf(t, IOException.class);
        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
        Throwables.propagate(t);
      }
    }

    /**
     * If there is an open hash batch, complete it and sync if there are diffs.
     * Start a new batch, and seek to read the next source key/hash pair.
     */
    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
      if (targetHasher.isBatchStarted()) {
        finishBatchAndCompareHashes(context);
      }
      targetHasher.startBatch(nextSourceKey);
      currentSourceHash = sourceHashReader.getCurrentHash();

      findNextKeyHashPair();
    }

    /**
     * Finish the currently open hash batch.
     * Compare the target hash to the given source hash.
     * If they do not match, then sync the covered key range.
     */
    private void finishBatchAndCompareHashes(Context context)
        throws IOException, InterruptedException {
      targetHasher.finishBatch();
      context.getCounter(Counter.BATCHES).increment(1);
      if (targetHasher.getBatchSize() == 0) {
        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
      }
      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
      if (targetHash.equals(currentSourceHash)) {
        context.getCounter(Counter.HASHES_MATCHED).increment(1);
      } else {
        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);

        ImmutableBytesWritable stopRow = nextSourceKey == null
                                          ? new ImmutableBytesWritable(sourceTableHash.stopRow)
                                          : nextSourceKey;

        if (LOG.isDebugEnabled()) {
          LOG.debug("Hash mismatch.  Key range: " + toHex(targetHasher.getBatchStartKey())
              + " to " + toHex(stopRow)
              + " sourceHash: " + toHex(currentSourceHash)
              + " targetHash: " + toHex(targetHash));
        }

        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
      }
    }

    private static String toHex(ImmutableBytesWritable bytes) {
      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
    }

    private static final CellScanner EMPTY_CELL_SCANNER
      = new CellScanner(Iterators.<Result>emptyIterator());

    /**
     * Rescan the given range directly from the source and target tables.
     * Count and log differences, and if this is not a dry run, output Puts and Deletes
     * to make the target table match the source table for this range.
     */
    private void syncRange(Context context, ImmutableBytesWritable startRow,
        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
      Scan scan = sourceTableHash.initScan();
      scan.setStartRow(startRow.copyBytes());
      scan.setStopRow(stopRow.copyBytes());

      ResultScanner sourceScanner = sourceTable.getScanner(scan);
      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());

      ResultScanner targetScanner = targetTable.getScanner(scan);
      CellScanner targetCells = new CellScanner(targetScanner.iterator());

      boolean rangeMatched = true;
      byte[] nextSourceRow = sourceCells.nextRow();
      byte[] nextTargetRow = targetCells.nextRow();
      while (nextSourceRow != null || nextTargetRow != null) {
        boolean rowMatched;
        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
        if (rowComparison < 0) {
          if (LOG.isInfoEnabled()) {
            LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
          }
          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
          nextSourceRow = sourceCells.nextRow();  // advance only source to next row
        } else if (rowComparison > 0) {
          if (LOG.isInfoEnabled()) {
            LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
          }
          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
          nextTargetRow = targetCells.nextRow();  // advance only target to next row
        } else {
          // current row is the same on both sides, compare cell by cell
          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
          nextSourceRow = sourceCells.nextRow();
          nextTargetRow = targetCells.nextRow();
        }

        if (!rowMatched) {
          rangeMatched = false;
        }
      }

      sourceScanner.close();
      targetScanner.close();

      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
        .increment(1);
    }

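    /**
     * Iterates over the cells of one row at a time, stitching together rows
     * that span multiple Results (as happens when the scan uses batching).
     */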
    private static class CellScanner {
      private final Iterator<Result> results;

      private byte[] currentRow;
      private Result currentRowResult;
      private int nextCellInRow;

      private Result nextRowResult;

      public CellScanner(Iterator<Result> results) {
        this.results = results;
      }

      /**
       * Advance to the next row and return its row key.
       * Returns null iff there are no more rows.
       */
      public byte[] nextRow() {
        if (nextRowResult == null) {
          // no cached row - check scanner for more
          while (results.hasNext()) {
            nextRowResult = results.next();
            Cell nextCell = nextRowResult.rawCells()[0];
            if (currentRow == null
                || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
                nextCell.getRowOffset(), nextCell.getRowLength())) {
              // found next row
              break;
            } else {
              // found another result from current row, keep scanning
              nextRowResult = null;
            }
          }

          if (nextRowResult == null) {
            // end of data, no more rows
            currentRowResult = null;
            currentRow = null;
            return null;
          }
        }

        // advance to cached result for next row
        currentRowResult = nextRowResult;
        nextCellInRow = 0;
        currentRow = currentRowResult.getRow();
        nextRowResult = null;
        return currentRow;
      }

      /**
       * Returns the next Cell in the current row or null iff none remain.
       */
      public Cell nextCellInRow() {
        if (currentRowResult == null) {
          // nothing left in current row
          return null;
        }

        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
        nextCellInRow++;
        if (nextCellInRow == currentRowResult.size()) {
          if (results.hasNext()) {
            Result result = results.next();
            Cell cell = result.rawCells()[0];
            if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength())) {
              // result is part of current row
              currentRowResult = result;
              nextCellInRow = 0;
            } else {
              // result is part of next row, cache it
              nextRowResult = result;
              // current row is complete
              currentRowResult = null;
            }
          } else {
            // end of data
            currentRowResult = null;
          }
        }
        return nextCell;
      }
    }

    /**
     * Compare the cells for the given row from the source and target tables.
     * Count and log any differences.
     * If not a dry run, output a Put and/or Delete needed to sync the target table
     * to match the source table.
     */
    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
        CellScanner targetCells) throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      long matchingCells = 0;
      boolean matchingRow = true;
      Cell sourceCell = sourceCells.nextCellInRow();
      Cell targetCell = targetCells.nextCellInRow();
      while (sourceCell != null || targetCell != null) {
        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
        if (cellKeyComparison < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Target missing cell: " + sourceCell);
          }
          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun) {
            if (put == null) {
              put = new Put(rowKey);
            }
            put.add(sourceCell);
          }

          sourceCell = sourceCells.nextCellInRow();
        } else if (cellKeyComparison > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Source missing cell: " + targetCell);
          }
          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun) {
            if (delete == null) {
              delete = new Delete(rowKey);
            }
            // add a tombstone to exactly match the target cell that is missing on the source
            delete.addColumn(CellUtil.cloneFamily(targetCell),
                CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
          }

          targetCell = targetCells.nextCellInRow();
        } else {
          // the cell keys are equal, now check values
          if (CellUtil.matchingValue(sourceCell, targetCell)) {
            matchingCells++;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Different values: ");
              LOG.debug("  source cell: " + sourceCell
                  + " value: " + Bytes.toHex(sourceCell.getValueArray(),
                      sourceCell.getValueOffset(), sourceCell.getValueLength()));
              LOG.debug("  target cell: " + targetCell
                  + " value: " + Bytes.toHex(targetCell.getValueArray(),
                      targetCell.getValueOffset(), targetCell.getValueLength()));
            }
            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
            matchingRow = false;

            if (!dryRun) {
              // overwrite target cell
              if (put == null) {
                put = new Put(rowKey);
              }
              put.add(sourceCell);
            }
          }
          sourceCell = sourceCells.nextCellInRow();
          targetCell = targetCells.nextCellInRow();
        }

        if (!dryRun && sourceTableHash.scanBatch > 0) {
          if (put != null && put.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), put);
            put = null;
          }
          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), delete);
            delete = null;
          }
        }
      }

      if (!dryRun) {
        if (put != null) {
          context.write(new ImmutableBytesWritable(rowKey), put);
        }
        if (delete != null) {
          context.write(new ImmutableBytesWritable(rowKey), delete);
        }
      }

      if (matchingCells > 0) {
        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
      }
      if (matchingRow) {
        context.getCounter(Counter.MATCHINGROWS).increment(1);
        return true;
      } else {
        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
        return false;
      }
    }

    private static final CellComparator cellComparator = new CellComparator();

    /**
     * Compare the given row keys.
     * Nulls are after non-nulls.
     */
    private static int compareRowKeys(byte[] r1, byte[] r2) {
      if (r1 == null) {
        return 1;  // source missing row
      } else if (r2 == null) {
        return -1; // target missing row
      } else {
        return cellComparator.compareRows(r1, 0, r1.length, r2, 0, r2.length);
      }
    }

    /**
     * Compare families, qualifiers, and timestamps of the given Cells.
     * They are assumed to be of the same row.
     * Nulls are after non-nulls.
     */
    private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
      if (c1 == null) {
        return 1; // source missing cell
      }
      if (c2 == null) {
        return -1; // target missing cell
      }

      int result = CellComparator.compareFamilies(c1, c2);
      if (result != 0) {
        return result;
      }

      result = CellComparator.compareQualifiers(c1, c2);
      if (result != 0) {
        return result;
      }

      // note timestamp comparison is inverted - more recent cells first
      return CellComparator.compareTimestamps(c1, c2);
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      if (mapperException == null) {
        try {
          finishRemainingHashRanges(context);
        } catch (Throwable t) {
          mapperException = t;
        }
      }

      try {
        sourceTable.close();
        targetTable.close();
        sourceConnection.close();
        targetConnection.close();
      } catch (Throwable t) {
        if (mapperException == null) {
          mapperException = t;
        } else {
          LOG.error("Suppressing exception from closing tables", t);
        }
      }

      // propagate first exception
      if (mapperException != null) {
        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
        Throwables.propagate(mapperException);
      }
    }

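    /**
     * The scan for this split stops at the split's end row, but the last open
     * hash batch may extend beyond it. Scan the remainder of that batch from
     * the target table so the final batch can be completed and compared.
     */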
    private void finishRemainingHashRanges(Context context) throws IOException,
        InterruptedException {
      TableSplit split = (TableSplit) context.getInputSplit();
      byte[] splitEndRow = split.getEndRow();
      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);

      // if there are more hash batches that begin before the end of this split move to them
      while (nextSourceKey != null
          && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
        moveToNextBatch(context);
      }

      if (targetHasher.isBatchStarted()) {
        // need to complete the final open hash batch

        if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
              || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
          // the open hash range continues past the end of this region
          // add a scan to complete the current hash range
          Scan scan = sourceTableHash.initScan();
          scan.setStartRow(splitEndRow);
          if (nextSourceKey == null) {
            scan.setStopRow(sourceTableHash.stopRow);
          } else {
            scan.setStopRow(nextSourceKey.copyBytes());
          }

          ResultScanner targetScanner = null;
          try {
            targetScanner = targetTable.getScanner(scan);
            for (Result row : targetScanner) {
              targetHasher.hashResult(row);
            }
          } finally {
            if (targetScanner != null) {
              targetScanner.close();
            }
          }
        } // else current batch ends exactly at split end row

        finishBatchAndCompareHashes(context);
      }
    }
  }

  private static final int NUM_ARGS = 3;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" targetzkcluster  ZK cluster key of the target table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" dryrun           if true, output counters but no writes");
    System.err.println("                  (defaults to false)");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
    System.err.println(" sourcetable      Name of the source table to sync from");
    System.err.println(" targettable      Name of the target table to sync to");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
    System.err.println(" to a local target cluster:");
    System.err.println(" $ bin/hbase " +
        "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
        + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
        + " hdfs://nn:9000/hashes/tableA tableA tableA");
  }

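  /**
   * Parse the positional arguments and the optional --sourcezkcluster,
   * --targetzkcluster, and --dryrun flags. Returns false if the arguments
   * are invalid or help was requested.
   */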
  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {
      sourceHashDir = new Path(args[args.length - 3]);
      sourceTableName = args[args.length - 2];
      targetTableName = args[args.length - 1];

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String sourceZkClusterKey = "--sourcezkcluster=";
        if (cmd.startsWith(sourceZkClusterKey)) {
          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
          continue;
        }

        final String targetZkClusterKey = "--targetzkcluster=";
        if (cmd.startsWith(targetZkClusterKey)) {
          targetZkCluster = cmd.substring(targetZkClusterKey.length());
          continue;
        }

        final String dryRunKey = "--dryrun=";
        if (cmd.startsWith(dryRunKey)) {
          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    if (!job.waitForCompletion(true)) {
      LOG.error("Map-reduce job failed!");
      return 1;
    }
    counters = job.getCounters();
    return 0;
  }
}