/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;

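/**
 * A MapReduce tool that synchronizes a target HBase table with a source table, using
 * hashes of key ranges previously computed by {@link HashTable}. Ranges whose hashes
 * match are skipped; ranges that differ are rescanned from both tables and, unless
 * dry run is enabled, Puts and Deletes are written to bring the target in line with
 * the source.
 */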
public class SyncTable extends Configured implements Tool {

  private static final Log LOG = LogFactory.getLog(SyncTable.class);

  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";

  Path sourceHashDir;
  String sourceTableName;
  String targetTableName;

  String sourceZkCluster;
  String targetZkCluster;
  boolean dryRun;

  Counters counters;

  public SyncTable(Configuration conf) {
    super(conf);
  }

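  /**
   * Validate the HashTable output against its manifest, then configure and return the
   * map-only job that scans the target table and compares it to the source hashes.
   */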
  public Job createSubmittableJob(String[] args) throws IOException {
    FileSystem fs = sourceHashDir.getFileSystem(getConf());
    if (!fs.exists(sourceHashDir)) {
      throw new IOException("Source hash dir not found: " + sourceHashDir);
    }

    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
    LOG.info("Read source hash manifest: " + tableHash);
    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
    if (!tableHash.tableName.equals(sourceTableName)) {
      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
          + tableHash.tableName + " but job is reading from: " + sourceTableName);
    }
    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys. However, the manifest file"
          + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
    int dataSubdirCount = 0;
    for (FileStatus file : fs.listStatus(dataDir)) {
      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
        dataSubdirCount++;
      }
    }

    if (dataSubdirCount != tableHash.numHashFiles) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys. However, the number of data dirs"
          + " found is " + dataSubdirCount + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    Job job = Job.getInstance(getConf(), getConf().get("mapreduce.job.name",
        "syncTable_" + sourceTableName + "-" + targetTableName));
    Configuration jobConf = job.getConfiguration();
    job.setJarByClass(HashTable.class);
    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
    if (sourceZkCluster != null) {
      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
    }
    if (targetZkCluster != null) {
      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
    }
    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);

    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
        SyncMapper.class, null, null, job);

    job.setNumReduceTasks(0);

    if (dryRun) {
      job.setOutputFormatClass(NullOutputFormat.class);
    } else {
      // No reducers.  Just write straight to table.  Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
          targetZkCluster, null, null);
    }

    // Obtain an authentication token, for the specified cluster, on behalf of the current user
    if (sourceZkCluster != null) {
      Configuration peerConf =
          HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
    }
    return job;
  }

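  /**
   * Map-only task that re-hashes the target table over the same key ranges that
   * HashTable used on the source, compares each batch hash against the stored source
   * hash, and rescans and syncs any range whose hashes do not match.
   */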
  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
    Path sourceHashDir;

    Connection sourceConnection;
    Connection targetConnection;
    Table sourceTable;
    Table targetTable;
    boolean dryRun;

    HashTable.TableHash sourceTableHash;
    HashTable.TableHash.Reader sourceHashReader;
    ImmutableBytesWritable currentSourceHash;
    ImmutableBytesWritable nextSourceKey;
    HashTable.ResultHasher targetHasher;

    Throwable mapperException;

    public static enum Counter { BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
      SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS,
      DIFFERENTCELLVALUES, MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED,
      RANGESNOTMATCHED };

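    /**
     * Open connections and tables for both clusters, read the source hash manifest,
     * and position the hash reader at the first batch covering this task's split.
     */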
    @Override
    protected void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
          TableOutputFormat.OUTPUT_CONF_PREFIX);
      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);

      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
      LOG.info("Read source hash manifest: " + sourceTableHash);
      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");

      TableSplit split = (TableSplit) context.getInputSplit();
      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());

      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
      findNextKeyHashPair();

      // create a hasher, but don't start it right away
      // instead, find the first hash batch at or after the start row
      // and skip any rows that come before.  they will be caught by the previous task
      targetHasher = new HashTable.ResultHasher();
    }

204
205 private static Connection openConnection(Configuration conf, String zkClusterConfKey,
206 String configPrefix)
207 throws IOException {
208 String zkCluster = conf.get(zkClusterConfKey);
209 Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
210 zkCluster, configPrefix);
211 return ConnectionFactory.createConnection(clusterConf);
212 }
213
214 private static Table openTable(Connection connection, Configuration conf,
215 String tableNameConfKey) throws IOException {
216 return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
217 }

    /**
     * Attempt to read the next source key/hash pair.
     * If there are no more, set nextSourceKey to null.
     */
    private void findNextKeyHashPair() throws IOException {
      boolean hasNext = sourceHashReader.next();
      if (hasNext) {
        nextSourceKey = sourceHashReader.getCurrentKey();
      } else {
        // no more keys - the last hash covers everything up to the end
        nextSourceKey = null;
      }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      try {
        // first, finish any hash batches that end before the scanned row
        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
          moveToNextBatch(context);
        }

        // next, add the scanned row (as long as we've reached the first batch)
        if (targetHasher.isBatchStarted()) {
          targetHasher.hashResult(value);
        }
      } catch (Throwable t) {
        mapperException = t;
        Throwables.propagateIfInstanceOf(t, IOException.class);
        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
        Throwables.propagate(t);
      }
    }

    /**
     * If there is an open hash batch, complete it and sync if there are diffs.
     * Start a new batch if there are more checksums.
     */
    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
      if (targetHasher.isBatchStarted()) {
        finishBatchAndCompareHashes(context);
      }
      targetHasher.startBatch(nextSourceKey);
      currentSourceHash = sourceHashReader.getCurrentHash();

      findNextKeyHashPair();
    }

    /**
     * Finish the currently open hash batch.
     * Compare the target hash to the given source hash.
     * If they do not match, then sync the covered key range.
     */
    private void finishBatchAndCompareHashes(Context context)
        throws IOException, InterruptedException {
      targetHasher.finishBatch();
      context.getCounter(Counter.BATCHES).increment(1);
      if (targetHasher.getBatchSize() == 0) {
        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
      }
      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
      if (targetHash.equals(currentSourceHash)) {
        context.getCounter(Counter.HASHES_MATCHED).increment(1);
      } else {
        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);

        ImmutableBytesWritable stopRow = nextSourceKey == null
            ? new ImmutableBytesWritable(sourceTableHash.stopRow)
            : nextSourceKey;

        if (LOG.isDebugEnabled()) {
          LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey())
              + " to " + toHex(stopRow)
              + " sourceHash: " + toHex(currentSourceHash)
              + " targetHash: " + toHex(targetHash));
        }

        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
      }
    }

    private static String toHex(ImmutableBytesWritable bytes) {
      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
    }

    private static final CellScanner EMPTY_CELL_SCANNER
        = new CellScanner(Iterators.<Result>emptyIterator());

    /**
     * Rescan the given range directly from the source and target tables.
     * Count and log differences, and if this is not a dry run, output Puts and Deletes
     * to make the target table match the source table for this range.
     */
    private void syncRange(Context context, ImmutableBytesWritable startRow,
        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
      Scan scan = sourceTableHash.initScan();
      scan.setStartRow(startRow.copyBytes());
      scan.setStopRow(stopRow.copyBytes());

      ResultScanner sourceScanner = sourceTable.getScanner(scan);
      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());

      ResultScanner targetScanner = targetTable.getScanner(scan);
      CellScanner targetCells = new CellScanner(targetScanner.iterator());

      boolean rangeMatched = true;
      byte[] nextSourceRow = sourceCells.nextRow();
      byte[] nextTargetRow = targetCells.nextRow();
      while (nextSourceRow != null || nextTargetRow != null) {
        boolean rowMatched;
        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
        if (rowComparison < 0) {
          if (LOG.isInfoEnabled()) {
            LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
          }
          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
          nextSourceRow = sourceCells.nextRow();
        } else if (rowComparison > 0) {
          if (LOG.isInfoEnabled()) {
            LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
          }
          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
          nextTargetRow = targetCells.nextRow();
        } else {
          // row keys match; compare the rows cell by cell
          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
          nextSourceRow = sourceCells.nextRow();
          nextTargetRow = targetCells.nextRow();
        }

        if (!rowMatched) {
          rangeMatched = false;
        }
      }

      sourceScanner.close();
      targetScanner.close();

      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
          .increment(1);
    }

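    /**
     * Iterates over the cells returned by a scanner, one row at a time: nextRow()
     * advances to the next row key, and nextCellInRow() walks the cells of that row,
     * pulling additional Results from the scanner when a row spans more than one batch.
     */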
    private static class CellScanner {
      private final Iterator<Result> results;

      private byte[] currentRow;
      private Result currentRowResult;
      private int nextCellInRow;

      private Result nextRowResult;

      public CellScanner(Iterator<Result> results) {
        this.results = results;
      }

      /**
       * Advance to the next row and return its row key.
       * Returns null iff there are no more rows.
       */
      public byte[] nextRow() {
        if (nextRowResult == null) {
          // no cached row - check the scanner for more
          while (results.hasNext()) {
            nextRowResult = results.next();
            Cell nextCell = nextRowResult.rawCells()[0];
            if (currentRow == null
                || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
                    nextCell.getRowOffset(), nextCell.getRowLength())) {
              // found next row
              break;
            } else {
              // found another result from the current row, keep scanning
              nextRowResult = null;
            }
          }

          if (nextRowResult == null) {
            // end of data, no more rows
            currentRowResult = null;
            currentRow = null;
            return null;
          }
        }

        // advance to cached result for next row
        currentRowResult = nextRowResult;
        nextCellInRow = 0;
        currentRow = currentRowResult.getRow();
        nextRowResult = null;
        return currentRow;
      }

      /**
       * Returns the next Cell in the current row or null iff none remain.
       */
      public Cell nextCellInRow() {
        if (currentRowResult == null) {
          // nothing left in current row
          return null;
        }

        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
        nextCellInRow++;
        if (nextCellInRow == currentRowResult.size()) {
          if (results.hasNext()) {
            Result result = results.next();
            Cell cell = result.rawCells()[0];
            if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength())) {
              // result is part of current row
              currentRowResult = result;
              nextCellInRow = 0;
            } else {
              // result is part of next row, cache it
              nextRowResult = result;
              // current row is complete
              currentRowResult = null;
            }
          } else {
            // end of data
            currentRowResult = null;
          }
        }
        return nextCell;
      }
    }

    /**
     * Compare the cells for the given row from the source and target tables.
     * Count and log any differences.
     * If not a dry run, output a Put and/or Delete needed to sync the target table
     * to match the source table.
     */
    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
        CellScanner targetCells) throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      long matchingCells = 0;
      boolean matchingRow = true;
      Cell sourceCell = sourceCells.nextCellInRow();
      Cell targetCell = targetCells.nextCellInRow();
      while (sourceCell != null || targetCell != null) {
        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
        if (cellKeyComparison < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Target missing cell: " + sourceCell);
          }
          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun) {
            if (put == null) {
              put = new Put(rowKey);
            }
            put.add(sourceCell);
          }

          sourceCell = sourceCells.nextCellInRow();
        } else if (cellKeyComparison > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Source missing cell: " + targetCell);
          }
          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun) {
            if (delete == null) {
              delete = new Delete(rowKey);
            }
            // add a tombstone to exactly match the target cell that is missing on the source
            delete.addColumn(CellUtil.cloneFamily(targetCell),
                CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
          }

          targetCell = targetCells.nextCellInRow();
        } else {
          // the cell keys are equal, now check the values
          if (CellUtil.matchingValue(sourceCell, targetCell)) {
            matchingCells++;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Different values: ");
              LOG.debug("  source cell: " + sourceCell
                  + " value: " + Bytes.toHex(sourceCell.getValueArray(),
                      sourceCell.getValueOffset(), sourceCell.getValueLength()));
              LOG.debug("  target cell: " + targetCell
                  + " value: " + Bytes.toHex(targetCell.getValueArray(),
                      targetCell.getValueOffset(), targetCell.getValueLength()));
            }
            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
            matchingRow = false;

            if (!dryRun) {
              // overwrite the target cell with the source value
              if (put == null) {
                put = new Put(rowKey);
              }
              put.add(sourceCell);
            }
          }
          sourceCell = sourceCells.nextCellInRow();
          targetCell = targetCells.nextCellInRow();
        }

        // flush Puts and Deletes once they reach the configured batch size,
        // so very wide rows don't buffer an unbounded number of mutations
        if (!dryRun && sourceTableHash.scanBatch > 0) {
          if (put != null && put.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), put);
            put = null;
          }
          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), delete);
            delete = null;
          }
        }
      }

      if (!dryRun) {
        if (put != null) {
          context.write(new ImmutableBytesWritable(rowKey), put);
        }
        if (delete != null) {
          context.write(new ImmutableBytesWritable(rowKey), delete);
        }
      }

      if (matchingCells > 0) {
        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
      }
      if (matchingRow) {
        context.getCounter(Counter.MATCHINGROWS).increment(1);
        return true;
      } else {
        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
        return false;
      }
    }

    private static final CellComparator cellComparator = new CellComparator();

    /**
     * Compare the given row keys.
     * Nulls are after non-nulls.
     */
    private static int compareRowKeys(byte[] r1, byte[] r2) {
      if (r1 == null) {
        return 1; // source rows are exhausted
      } else if (r2 == null) {
        return -1; // target rows are exhausted
      } else {
        return cellComparator.compareRows(r1, 0, r1.length, r2, 0, r2.length);
      }
    }

    /**
     * Compare families, qualifiers, and timestamps of the given Cells.
     * They are assumed to be of the same row.
     * Nulls are after non-nulls.
     * Timestamps are compared in reverse so that cells with more recent timestamps are first.
     */
    private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
      if (c1 == null) {
        return 1; // source cells are exhausted
      }
      if (c2 == null) {
        return -1; // target cells are exhausted
      }

      int result = CellComparator.compareFamilies(c1, c2);
      if (result != 0) {
        return result;
      }

      result = CellComparator.compareQualifiers(c1, c2);
      if (result != 0) {
        return result;
      }

      // note timestamp comparison is inverted - more recent cells first
      return CellComparator.compareTimestamps(c1, c2);
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      if (mapperException == null) {
        try {
          finishRemainingHashRanges(context);
        } catch (Throwable t) {
          mapperException = t;
        }
      }

      try {
        sourceTable.close();
        targetTable.close();
        sourceConnection.close();
        targetConnection.close();
      } catch (Throwable t) {
        if (mapperException == null) {
          mapperException = t;
        } else {
          LOG.error("Suppressing exception from closing tables", t);
        }
      }

      // propagate the first exception seen, if any
      if (mapperException != null) {
        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
        Throwables.propagate(mapperException);
      }
    }

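    /**
     * Finish any hash batches that overlap the end of this task's split. If the final
     * open batch extends past the split end row, scan the remainder of the range from
     * the target table so the batch hash covers the same range as the source hash.
     */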
    private void finishRemainingHashRanges(Context context) throws IOException,
        InterruptedException {
      TableSplit split = (TableSplit) context.getInputSplit();
      byte[] splitEndRow = split.getEndRow();
      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);

      // if there are more hash batches that begin before the end of this split, move to them
      while (nextSourceKey != null
          && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
        moveToNextBatch(context);
      }

      if (targetHasher.isBatchStarted()) {
        // need to complete the final open hash batch
        if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
            || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
          // the open hash range continues past the end of this region;
          // scan the rest of the range from the target table to complete the batch hash
          Scan scan = sourceTableHash.initScan();
          scan.setStartRow(splitEndRow);
          if (nextSourceKey == null) {
            scan.setStopRow(sourceTableHash.stopRow);
          } else {
            scan.setStopRow(nextSourceKey.copyBytes());
          }

          ResultScanner targetScanner = null;
          try {
            targetScanner = targetTable.getScanner(scan);
            for (Result row : targetScanner) {
              targetHasher.hashResult(row);
            }
          } finally {
            if (targetScanner != null) {
              targetScanner.close();
            }
          }
        } // else the open batch ends exactly at the split end row

        finishBatchAndCompareHashes(context);
      }
    }
  }

  private static final int NUM_ARGS = 3;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" targetzkcluster  ZK cluster key of the target table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" dryrun           if true, output counters but no writes");
    System.err.println("                  (defaults to false)");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
    System.err.println(" sourcetable      Name of the source table to sync from");
    System.err.println(" targettable      Name of the target table to sync to");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
    System.err.println(" to a local target cluster:");
    System.err.println(" $ bin/hbase " +
        "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
        + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
        + " hdfs://nn:9000/hashes/tableA tableA tableA");
  }

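  /**
   * Parse the positional args and the --option=value flags from the command line.
   * Returns false (after printing usage) if the arguments are invalid.
   */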
  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {
      sourceHashDir = new Path(args[args.length - 3]);
      sourceTableName = args[args.length - 2];
      targetTableName = args[args.length - 1];

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String sourceZkClusterKey = "--sourcezkcluster=";
        if (cmd.startsWith(sourceZkClusterKey)) {
          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
          continue;
        }

        final String targetZkClusterKey = "--targetzkcluster=";
        if (cmd.startsWith(targetZkClusterKey)) {
          targetZkCluster = cmd.substring(targetZkClusterKey.length());
          continue;
        }

        final String dryRunKey = "--dryrun=";
        if (cmd.startsWith(dryRunKey)) {
          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    if (!job.waitForCompletion(true)) {
      LOG.error("Map-reduce job failed!");
      return 1;
    }
    counters = job.getCounters();
    return 0;
  }

}