
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;

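/**
 * A map-only job that makes a target table match a source table, using the
 * hashes previously computed over the source by {@link HashTable}. The target
 * is scanned and hashed over the same key ranges; only ranges whose hashes
 * differ are re-read from both clusters and reconciled by writing Puts and
 * Deletes to the target (or, in dry run mode, by only updating counters).
 */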
public class SyncTable extends Configured implements Tool {

  private static final Log LOG = LogFactory.getLog(SyncTable.class);

  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";

  Path sourceHashDir;
  String sourceTableName;
  String targetTableName;

  String sourceZkCluster;
  String targetZkCluster;
  boolean dryRun;

  Counters counters;

  public SyncTable(Configuration conf) {
    super(conf);
  }

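  /**
   * Validate the HashTable output directory against its manifest, then
   * configure and return the map-only sync job. The manifest is expected to
   * list one more hash file than partition keys, since the partition keys
   * divide the table into that many ranges plus one.
   */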
  public Job createSubmittableJob(String[] args) throws IOException {
    FileSystem fs = sourceHashDir.getFileSystem(getConf());
    if (!fs.exists(sourceHashDir)) {
      throw new IOException("Source hash dir not found: " + sourceHashDir);
    }

    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
    LOG.info("Read source hash manifest: " + tableHash);
    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
    if (!tableHash.tableName.equals(sourceTableName)) {
      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
          + tableHash.tableName + " but job is reading from: " + sourceTableName);
    }
    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys.  However, the manifest file"
          + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
    int dataSubdirCount = 0;
    for (FileStatus file : fs.listStatus(dataDir)) {
      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
        dataSubdirCount++;
      }
    }

    if (dataSubdirCount != tableHash.numHashFiles) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys.  However, the number of data dirs"
          + " found is " + dataSubdirCount + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    Job job = Job.getInstance(getConf(), getConf().get("mapreduce.job.name",
        "syncTable_" + sourceTableName + "-" + targetTableName));
    Configuration jobConf = job.getConfiguration();
    job.setJarByClass(HashTable.class);
    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
    if (sourceZkCluster != null) {
      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
    }
    if (targetZkCluster != null) {
      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
    }
    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);

    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
        SyncMapper.class, null, null, job);

    job.setNumReduceTasks(0);

    if (dryRun) {
      job.setOutputFormatClass(NullOutputFormat.class);
    } else {
      // No reducers.  Just write straight to table.  Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
          targetZkCluster, null, null);

      // would be nice to add an option for bulk load instead
    }

    return job;
  }

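  /**
   * The mapper scans the target table. As rows stream by, they are hashed in
   * batches that line up with the key ranges of the source hash files; each
   * completed batch hash is compared against the corresponding source hash,
   * and only mismatched ranges are rescanned cell by cell from both clusters.
   */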
  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
    Path sourceHashDir;

    Connection sourceConnection;
    Connection targetConnection;
    Table sourceTable;
    Table targetTable;
    boolean dryRun;

    HashTable.TableHash sourceTableHash;
    HashTable.TableHash.Reader sourceHashReader;
    ImmutableBytesWritable currentSourceHash;
    ImmutableBytesWritable nextSourceKey;
    HashTable.ResultHasher targetHasher;

    Throwable mapperException;

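    /**
     * Counters published by the job. BATCHES and the HASHES_* counters track
     * whole hash batches; the missing-row, missing-cell, and
     * DIFFERENTCELLVALUES counters record the differences found while
     * rescanning mismatched ranges.
     */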
    public enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
      SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
      MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED}

    @Override
    protected void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
          TableOutputFormat.OUTPUT_CONF_PREFIX);
      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);

      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
      LOG.info("Read source hash manifest: " + sourceTableHash);
      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");

      TableSplit split = (TableSplit) context.getInputSplit();
      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());

      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
      findNextKeyHashPair();

      // create a hasher, but don't start it right away
      // instead, find the first hash batch at or after the start row
      // and skip any rows that come before.  they will be caught by the previous task
      targetHasher = new HashTable.ResultHasher();
    }

    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
        String configPrefix) throws IOException {
      String zkCluster = conf.get(zkClusterConfKey);
      Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
          zkCluster, configPrefix);
      return ConnectionFactory.createConnection(clusterConf);
    }

    private static Table openTable(Connection connection, Configuration conf,
        String tableNameConfKey) throws IOException {
      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
    }

    /**
     * Attempt to read the next source key/hash pair.
     * If there are no more, set nextSourceKey to null.
     */
    private void findNextKeyHashPair() throws IOException {
      boolean hasNext = sourceHashReader.next();
      if (hasNext) {
        nextSourceKey = sourceHashReader.getCurrentKey();
      } else {
        // no more keys - last hash goes to the end
        nextSourceKey = null;
      }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      try {
        // first, finish any hash batches that end before the scanned row
        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
          moveToNextBatch(context);
        }

        // next, add the scanned row (as long as we've reached the first batch)
        if (targetHasher.isBatchStarted()) {
          targetHasher.hashResult(value);
        }
      } catch (Throwable t) {
        mapperException = t;
        Throwables.propagateIfInstanceOf(t, IOException.class);
        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
        Throwables.propagate(t);
      }
    }

    /**
     * If there is an open hash batch, complete it and sync if there are diffs.
     * Start a new batch, and seek to read the next key/hash pair.
     */
    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
      if (targetHasher.isBatchStarted()) {
        finishBatchAndCompareHashes(context);
      }
      targetHasher.startBatch(nextSourceKey);
      currentSourceHash = sourceHashReader.getCurrentHash();

      findNextKeyHashPair();
    }

    /**
     * Finish the currently open hash batch.
     * Compare the target hash to the given source hash.
     * If they do not match, then sync the covered key range.
     */
    private void finishBatchAndCompareHashes(Context context)
        throws IOException, InterruptedException {
      targetHasher.finishBatch();
      context.getCounter(Counter.BATCHES).increment(1);
      if (targetHasher.getBatchSize() == 0) {
        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
      }
      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
      if (targetHash.equals(currentSourceHash)) {
        context.getCounter(Counter.HASHES_MATCHED).increment(1);
      } else {
        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);

        ImmutableBytesWritable stopRow = nextSourceKey == null
                                          ? new ImmutableBytesWritable(sourceTableHash.stopRow)
                                          : nextSourceKey;

        if (LOG.isDebugEnabled()) {
          LOG.debug("Hash mismatch.  Key range: " + toHex(targetHasher.getBatchStartKey())
              + " to " + toHex(stopRow)
              + " sourceHash: " + toHex(currentSourceHash)
              + " targetHash: " + toHex(targetHash));
        }

        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
      }
    }

    private static String toHex(ImmutableBytesWritable bytes) {
      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
    }

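    // a scanner over an empty iterator, used as the "other side" when one
    // table is missing an entire row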
    private static final CellScanner EMPTY_CELL_SCANNER
      = new CellScanner(Iterators.<Result>emptyIterator());

    /**
     * Rescan the given range directly from the source and target tables.
     * Count and log differences, and if this is not a dry run, output Puts and Deletes
     * to make the target table match the source table for this range.
     */
    private void syncRange(Context context, ImmutableBytesWritable startRow,
        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {

      Scan scan = sourceTableHash.initScan();
      scan.setStartRow(startRow.copyBytes());
      scan.setStopRow(stopRow.copyBytes());

      ResultScanner sourceScanner = sourceTable.getScanner(scan);
      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());

      ResultScanner targetScanner = targetTable.getScanner(scan);
      CellScanner targetCells = new CellScanner(targetScanner.iterator());

      boolean rangeMatched = true;
      byte[] nextSourceRow = sourceCells.nextRow();
      byte[] nextTargetRow = targetCells.nextRow();
      while (nextSourceRow != null || nextTargetRow != null) {
        boolean rowMatched;
        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
        if (rowComparison < 0) {
          if (LOG.isInfoEnabled()) {
            LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
          }
          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
          nextSourceRow = sourceCells.nextRow();  // advance only source to next row
        } else if (rowComparison > 0) {
          if (LOG.isInfoEnabled()) {
            LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
          }
          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
          nextTargetRow = targetCells.nextRow();  // advance only target to next row
        } else {
          // current row is the same on both sides, compare cell by cell
          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
          nextSourceRow = sourceCells.nextRow();
          nextTargetRow = targetCells.nextRow();
        }

        if (!rowMatched) {
          rangeMatched = false;
        }
      }

      sourceScanner.close();
      targetScanner.close();

      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
        .increment(1);
    }

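    /**
     * Iterates over the Results of a scanner one row at a time, presenting the
     * cells of each row in order. When scan batching splits a single row
     * across multiple Results, the pieces are stitched back together
     * transparently.
     */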
    private static class CellScanner {
      private final Iterator<Result> results;

      private byte[] currentRow;
      private Result currentRowResult;
      private int nextCellInRow;

      private Result nextRowResult;

      public CellScanner(Iterator<Result> results) {
        this.results = results;
      }

      /**
       * Advance to the next row and return its row key.
       * Returns null iff there are no more rows.
       */
      public byte[] nextRow() {
        if (nextRowResult == null) {
          // no cached row - check scanner for more
          while (results.hasNext()) {
            nextRowResult = results.next();
            Cell nextCell = nextRowResult.rawCells()[0];
            if (currentRow == null
                || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
                nextCell.getRowOffset(), nextCell.getRowLength())) {
              // found next row
              break;
            } else {
              // found another result from current row, keep scanning
              nextRowResult = null;
            }
          }

          if (nextRowResult == null) {
            // end of data, no more rows
            currentRowResult = null;
            currentRow = null;
            return null;
          }
        }

        // advance to cached result for next row
        currentRowResult = nextRowResult;
        nextCellInRow = 0;
        currentRow = currentRowResult.getRow();
        nextRowResult = null;
        return currentRow;
      }

      /**
       * Returns the next Cell in the current row or null iff none remain.
       */
      public Cell nextCellInRow() {
        if (currentRowResult == null) {
          // nothing left in current row
          return null;
        }

        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
        nextCellInRow++;
        if (nextCellInRow == currentRowResult.size()) {
          if (results.hasNext()) {
            Result result = results.next();
            Cell cell = result.rawCells()[0];
            if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength())) {
              // result is part of current row
              currentRowResult = result;
              nextCellInRow = 0;
            } else {
              // result is part of next row, cache it
              nextRowResult = result;
              // current row is complete
              currentRowResult = null;
            }
          } else {
            // end of data
            currentRowResult = null;
          }
        }
        return nextCell;
      }
    }

    /**
     * Compare the cells for the given row from the source and target tables.
     * Count and log any differences.
     * If not a dry run, output a Put and/or Delete needed to sync the target table
     * to match the source table.
     */
    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
        CellScanner targetCells) throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      long matchingCells = 0;
      boolean matchingRow = true;
      Cell sourceCell = sourceCells.nextCellInRow();
      Cell targetCell = targetCells.nextCellInRow();
      while (sourceCell != null || targetCell != null) {

        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
        if (cellKeyComparison < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Target missing cell: " + sourceCell);
          }
          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun) {
            if (put == null) {
              put = new Put(rowKey);
            }
            put.add(sourceCell);
          }

          sourceCell = sourceCells.nextCellInRow();
        } else if (cellKeyComparison > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Source missing cell: " + targetCell);
          }
          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun) {
            if (delete == null) {
              delete = new Delete(rowKey);
            }
            // add a tombstone to exactly match the target cell that is missing on the source
            delete.addColumn(CellUtil.cloneFamily(targetCell),
                CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
          }

          targetCell = targetCells.nextCellInRow();
        } else {
          // the cell keys are equal, now check values
          if (CellUtil.matchingValue(sourceCell, targetCell)) {
            matchingCells++;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Different values: ");
              LOG.debug("  source cell: " + sourceCell
                  + " value: " + Bytes.toHex(sourceCell.getValueArray(),
                      sourceCell.getValueOffset(), sourceCell.getValueLength()));
              LOG.debug("  target cell: " + targetCell
                  + " value: " + Bytes.toHex(targetCell.getValueArray(),
                      targetCell.getValueOffset(), targetCell.getValueLength()));
            }
            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
            matchingRow = false;

            if (!dryRun) {
              // overwrite target cell
              if (put == null) {
                put = new Put(rowKey);
              }
              put.add(sourceCell);
            }
          }
          sourceCell = sourceCells.nextCellInRow();
          targetCell = targetCells.nextCellInRow();
        }

        // flush accumulated mutations once they reach scanBatch cells,
        // so a single wide row doesn't produce an unbounded Put or Delete
        if (!dryRun && sourceTableHash.scanBatch > 0) {
          if (put != null && put.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), put);
            put = null;
          }
          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), delete);
            delete = null;
          }
        }
      }

      if (!dryRun) {
        if (put != null) {
          context.write(new ImmutableBytesWritable(rowKey), put);
        }
        if (delete != null) {
          context.write(new ImmutableBytesWritable(rowKey), delete);
        }
      }

      if (matchingCells > 0) {
        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
      }
      if (matchingRow) {
        context.getCounter(Counter.MATCHINGROWS).increment(1);
        return true;
      } else {
        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
        return false;
      }
    }

    private static final CellComparator cellComparator = new CellComparator();

    /**
     * Compare the given row keys.
     * Nulls sort after non-nulls.
     */
    private static int compareRowKeys(byte[] r1, byte[] r2) {
      if (r1 == null) {
        return 1;  // source missing row
      } else if (r2 == null) {
        return -1; // target missing row
      } else {
        return cellComparator.compareRows(r1, 0, r1.length, r2, 0, r2.length);
      }
    }

    /**
     * Compare families, qualifiers, and timestamps of the given Cells.
     * They are assumed to be of the same row.
     * Nulls are after non-nulls.
     */
    private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
      if (c1 == null) {
        return 1; // source missing cell
      }
      if (c2 == null) {
        return -1; // target missing cell
      }

      int result = CellComparator.compareFamilies(c1, c2);
      if (result != 0) {
        return result;
      }

      result = CellComparator.compareQualifiers(c1, c2);
      if (result != 0) {
        return result;
      }

      // note timestamp comparison is inverted - more recent cells first
      return CellComparator.compareTimestamps(c1, c2);
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      if (mapperException == null) {
        try {
          finishRemainingHashRanges(context);
        } catch (Throwable t) {
          mapperException = t;
        }
      }

      try {
        sourceTable.close();
        targetTable.close();
        sourceConnection.close();
        targetConnection.close();
      } catch (Throwable t) {
        if (mapperException == null) {
          mapperException = t;
        } else {
          LOG.error("Suppressing exception from closing tables", t);
        }
      }

      // propagate first exception
      if (mapperException != null) {
        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
        Throwables.propagate(mapperException);
      }
    }

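    /**
     * The map phase only hashes rows it actually receives, so after the last
     * input row, finish any hash batches that begin before the end of this
     * split. If the final open batch continues past the split boundary, scan
     * the remainder of its range from the target directly, so that the batch
     * hash covers the same key range as the source hash.
     */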
    private void finishRemainingHashRanges(Context context) throws IOException,
        InterruptedException {
      TableSplit split = (TableSplit) context.getInputSplit();
      byte[] splitEndRow = split.getEndRow();
      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);

      // if there are more hash batches that begin before the end of this split, move to them
      while (nextSourceKey != null
          && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
        moveToNextBatch(context);
      }

      if (targetHasher.isBatchStarted()) {
        // need to complete the final open hash batch

        if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
              || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
          // the open hash range continues past the end of this region
          // add a scan to complete the current hash range
          Scan scan = sourceTableHash.initScan();
          scan.setStartRow(splitEndRow);
          if (nextSourceKey == null) {
            scan.setStopRow(sourceTableHash.stopRow);
          } else {
            scan.setStopRow(nextSourceKey.copyBytes());
          }

          ResultScanner targetScanner = targetTable.getScanner(scan);
          for (Result row : targetScanner) {
            targetHasher.hashResult(row);
          }
        } // else current batch ends exactly at split end row

        finishBatchAndCompareHashes(context);
      }
    }
  }

  private static final int NUM_ARGS = 3;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
    System.err.println();
    System.err.println("Options:");

    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" targetzkcluster  ZK cluster key of the target table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" dryrun           if true, output counters but no writes");
    System.err.println("                  (defaults to false)");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
    System.err.println(" sourcetable      Name of the source table to sync from");
    System.err.println(" targettable      Name of the target table to sync to");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
    System.err.println(" to a local target cluster:");
    System.err.println(" $ bin/hbase " +
        "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
        + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
        + " hdfs://nn:9000/hashes/tableA tableA tableA");
  }

  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {
      sourceHashDir = new Path(args[args.length - 3]);
      sourceTableName = args[args.length - 2];
      targetTableName = args[args.length - 1];

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String sourceZkClusterKey = "--sourcezkcluster=";
        if (cmd.startsWith(sourceZkClusterKey)) {
          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
          continue;
        }

        final String targetZkClusterKey = "--targetzkcluster=";
        if (cmd.startsWith(targetZkClusterKey)) {
          targetZkCluster = cmd.substring(targetZkClusterKey.length());
          continue;
        }

        final String dryRunKey = "--dryrun=";
        if (cmd.startsWith(dryRunKey)) {
          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      return 1;
    }
    counters = job.getCounters();
    return 0;
  }

}