/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

@InterfaceAudience.Private
public class SyncTable extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(SyncTable.class);

  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";

  Path sourceHashDir;
  String sourceTableName;
  String targetTableName;

  String sourceZkCluster;
  String targetZkCluster;
  boolean dryRun;

  Counters counters;

  public SyncTable(Configuration conf) {
    super(conf);
  }

  public Job createSubmittableJob(String[] args) throws IOException {
    FileSystem fs = sourceHashDir.getFileSystem(getConf());
    if (!fs.exists(sourceHashDir)) {
      throw new IOException("Source hash dir not found: " + sourceHashDir);
    }

    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
    LOG.info("Read source hash manifest: " + tableHash);
    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
    if (!tableHash.tableName.equals(sourceTableName)) {
      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
          + tableHash.tableName + " but job is reading from: " + sourceTableName);
    }
    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys. However, the manifest file"
          + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
    int dataSubdirCount = 0;
    for (FileStatus file : fs.listStatus(dataDir)) {
      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
        dataSubdirCount++;
      }
    }

    if (dataSubdirCount != tableHash.numHashFiles) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys. However, the number of data dirs"
          + " found is " + dataSubdirCount + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    Job job = Job.getInstance(getConf(), getConf().get("mapreduce.job.name",
        "syncTable_" + sourceTableName + "-" + targetTableName));
    Configuration jobConf = job.getConfiguration();
    job.setJarByClass(HashTable.class);
    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
    if (sourceZkCluster != null) {
      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
    }
    if (targetZkCluster != null) {
      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
    }
    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);

    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
        SyncMapper.class, null, null, job);

    job.setNumReduceTasks(0);

    if (dryRun) {
      job.setOutputFormatClass(NullOutputFormat.class);
    } else {
      // No reducers. Just write straight to table. Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
          targetZkCluster, null, null);

      // would be nice to add an option for bulk load instead
    }

    // Obtain an authentication token, for the specified cluster, on behalf of the current user
    if (sourceZkCluster != null) {
      Configuration peerConf =
          HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
    }
    return job;
  }

  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
    Path sourceHashDir;

    Connection sourceConnection;
    Connection targetConnection;
    Table sourceTable;
    Table targetTable;
    boolean dryRun;

    HashTable.TableHash sourceTableHash;
    HashTable.TableHash.Reader sourceHashReader;
    ImmutableBytesWritable currentSourceHash;
    ImmutableBytesWritable nextSourceKey;
    HashTable.ResultHasher targetHasher;

    Throwable mapperException;

    public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
      SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS,
      DIFFERENTCELLVALUES, MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED,
      RANGESNOTMATCHED};

    @Override
    protected void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
          TableOutputFormat.OUTPUT_CONF_PREFIX);
      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);

      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
      LOG.info("Read source hash manifest: " + sourceTableHash);
      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");

      TableSplit split = (TableSplit) context.getInputSplit();
      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());

      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
      findNextKeyHashPair();

      // create a hasher, but don't start it right away
      // instead, find the first hash batch at or after the start row
      // and skip any rows that come before. they will be caught by the previous task
      targetHasher = new HashTable.ResultHasher();
    }

    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
        String configPrefix) throws IOException {
      String zkCluster = conf.get(zkClusterConfKey);
      Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
          zkCluster, configPrefix);
      return ConnectionFactory.createConnection(clusterConf);
    }

    private static Table openTable(Connection connection, Configuration conf,
        String tableNameConfKey) throws IOException {
      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
    }

    /**
     * Attempt to read the next source key/hash pair.
     * If there are no more, set nextSourceKey to null.
     */
    private void findNextKeyHashPair() throws IOException {
      boolean hasNext = sourceHashReader.next();
      if (hasNext) {
        nextSourceKey = sourceHashReader.getCurrentKey();
      } else {
        // no more keys - last hash goes to the end
        nextSourceKey = null;
      }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      try {
        // first, finish any hash batches that end before the scanned row
        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
          moveToNextBatch(context);
        }

        // next, add the scanned row (as long as we've reached the first batch)
        if (targetHasher.isBatchStarted()) {
          targetHasher.hashResult(value);
        }
      } catch (Throwable t) {
        mapperException = t;
        Throwables.propagateIfInstanceOf(t, IOException.class);
        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
        Throwables.propagate(t);
      }
    }

    /**
     * If there is an open hash batch, complete it and sync if there are diffs.
     * Start a new batch, and seek to read the next key/hash pair.
     */
    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
      if (targetHasher.isBatchStarted()) {
        finishBatchAndCompareHashes(context);
      }
      targetHasher.startBatch(nextSourceKey);
      currentSourceHash = sourceHashReader.getCurrentHash();

      findNextKeyHashPair();
    }

    /**
     * Finish the currently open hash batch.
     * Compare the target hash to the given source hash.
     * If they do not match, then sync the covered key range.
     */
    private void finishBatchAndCompareHashes(Context context)
        throws IOException, InterruptedException {
      targetHasher.finishBatch();
      context.getCounter(Counter.BATCHES).increment(1);
      if (targetHasher.getBatchSize() == 0) {
        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
      }
      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
      if (targetHash.equals(currentSourceHash)) {
        context.getCounter(Counter.HASHES_MATCHED).increment(1);
      } else {
        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);

        ImmutableBytesWritable stopRow = nextSourceKey == null
            ? new ImmutableBytesWritable(sourceTableHash.stopRow)
            : nextSourceKey;

        if (LOG.isDebugEnabled()) {
          LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey())
              + " to " + toHex(stopRow)
              + " sourceHash: " + toHex(currentSourceHash)
              + " targetHash: " + toHex(targetHash));
        }

        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
      }
    }

    private static String toHex(ImmutableBytesWritable bytes) {
      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
    }

    private static final CellScanner EMPTY_CELL_SCANNER
        = new CellScanner(Collections.<Result>emptyIterator());

    /**
     * Rescan the given range directly from the source and target tables.
     * Count and log differences, and if this is not a dry run, output Puts and Deletes
     * to make the target table match the source table for this range.
     */
    private void syncRange(Context context, ImmutableBytesWritable startRow,
        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
      Scan scan = sourceTableHash.initScan();
      scan.setStartRow(startRow.copyBytes());
      scan.setStopRow(stopRow.copyBytes());

      ResultScanner sourceScanner = sourceTable.getScanner(scan);
      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());

      ResultScanner targetScanner = targetTable.getScanner(new Scan(scan));
      CellScanner targetCells = new CellScanner(targetScanner.iterator());

      boolean rangeMatched = true;
      byte[] nextSourceRow = sourceCells.nextRow();
      byte[] nextTargetRow = targetCells.nextRow();
      while (nextSourceRow != null || nextTargetRow != null) {
        boolean rowMatched;
        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
        if (rowComparison < 0) {
          if (LOG.isInfoEnabled()) {
            LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
          }
          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
          nextSourceRow = sourceCells.nextRow(); // advance only source to next row
        } else if (rowComparison > 0) {
          if (LOG.isInfoEnabled()) {
            LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
          }
          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
          nextTargetRow = targetCells.nextRow(); // advance only target to next row
        } else {
          // current row is the same on both sides, compare cell by cell
          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
          nextSourceRow = sourceCells.nextRow();
          nextTargetRow = targetCells.nextRow();
        }

        if (!rowMatched) {
          rangeMatched = false;
        }
      }

      sourceScanner.close();
      targetScanner.close();

      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
          .increment(1);
    }

    private static class CellScanner {
      private final Iterator<Result> results;

      private byte[] currentRow;
      private Result currentRowResult;
      private int nextCellInRow;

      private Result nextRowResult;

      public CellScanner(Iterator<Result> results) {
        this.results = results;
      }

      /**
       * Advance to the next row and return its row key.
       * Returns null iff there are no more rows.
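       * A single row may span multiple Results (e.g. when scan batching is enabled), so
       * consecutive Results with the same row key are treated as part of the same row.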
       */
      public byte[] nextRow() {
        if (nextRowResult == null) {
          // no cached row - check scanner for more
          while (results.hasNext()) {
            nextRowResult = results.next();
            Cell nextCell = nextRowResult.rawCells()[0];
            if (currentRow == null
                || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
                    nextCell.getRowOffset(), nextCell.getRowLength())) {
              // found next row
              break;
            } else {
              // found another result from current row, keep scanning
              nextRowResult = null;
            }
          }

          if (nextRowResult == null) {
            // end of data, no more rows
            currentRowResult = null;
            currentRow = null;
            return null;
          }
        }

        // advance to cached result for next row
        currentRowResult = nextRowResult;
        nextCellInRow = 0;
        currentRow = currentRowResult.getRow();
        nextRowResult = null;
        return currentRow;
      }

      /**
       * Returns the next Cell in the current row or null iff none remain.
       */
      public Cell nextCellInRow() {
        if (currentRowResult == null) {
          // nothing left in current row
          return null;
        }

        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
        nextCellInRow++;
        if (nextCellInRow == currentRowResult.size()) {
          if (results.hasNext()) {
            Result result = results.next();
            Cell cell = result.rawCells()[0];
            if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength())) {
              // result is part of current row
              currentRowResult = result;
              nextCellInRow = 0;
            } else {
              // result is part of next row, cache it
              nextRowResult = result;
              // current row is complete
              currentRowResult = null;
            }
          } else {
            // end of data
            currentRowResult = null;
          }
        }
        return nextCell;
      }
    }

    /**
     * Compare the cells for the given row from the source and target tables.
     * Count and log any differences.
     * If not a dry run, output a Put and/or Delete needed to sync the target table
     * to match the source table.
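     * @return true if the source and target cells for this row fully match, false otherwise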
     */
    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
        CellScanner targetCells) throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      long matchingCells = 0;
      boolean matchingRow = true;
      Cell sourceCell = sourceCells.nextCellInRow();
      Cell targetCell = targetCells.nextCellInRow();
      while (sourceCell != null || targetCell != null) {
        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
        if (cellKeyComparison < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Target missing cell: " + sourceCell);
          }
          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun) {
            if (put == null) {
              put = new Put(rowKey);
            }
            put.add(sourceCell);
          }

          sourceCell = sourceCells.nextCellInRow();
        } else if (cellKeyComparison > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Source missing cell: " + targetCell);
          }
          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun) {
            if (delete == null) {
              delete = new Delete(rowKey);
            }
            // add a tombstone to exactly match the target cell that is missing on the source
            delete.addColumn(CellUtil.cloneFamily(targetCell),
                CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
          }

          targetCell = targetCells.nextCellInRow();
        } else {
          // the cell keys are equal, now check values
          if (CellUtil.matchingValue(sourceCell, targetCell)) {
            matchingCells++;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Different values: ");
              LOG.debug("  source cell: " + sourceCell
                  + " value: " + Bytes.toHex(sourceCell.getValueArray(),
                      sourceCell.getValueOffset(), sourceCell.getValueLength()));
              LOG.debug("  target cell: " + targetCell
                  + " value: " + Bytes.toHex(targetCell.getValueArray(),
                      targetCell.getValueOffset(), targetCell.getValueLength()));
            }
            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
            matchingRow = false;

            if (!dryRun) {
              // overwrite target cell
              if (put == null) {
                put = new Put(rowKey);
              }
              put.add(sourceCell);
            }
          }
          sourceCell = sourceCells.nextCellInRow();
          targetCell = targetCells.nextCellInRow();
        }

        if (!dryRun && sourceTableHash.scanBatch > 0) {
          if (put != null && put.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), put);
            put = null;
          }
          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), delete);
            delete = null;
          }
        }
      }

      if (!dryRun) {
        if (put != null) {
          context.write(new ImmutableBytesWritable(rowKey), put);
        }
        if (delete != null) {
          context.write(new ImmutableBytesWritable(rowKey), delete);
        }
      }

      if (matchingCells > 0) {
        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
      }
      if (matchingRow) {
        context.getCounter(Counter.MATCHINGROWS).increment(1);
        return true;
      } else {
        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
        return false;
      }
    }

    /**
     * Compare row keys of the given Result objects.
     * Nulls are after non-nulls.
     */
    private static int compareRowKeys(byte[] r1, byte[] r2) {
      if (r1 == null) {
        return 1; // source missing row
      } else if (r2 == null) {
        return -1; // target missing row
      } else {
        // SyncTable only runs against non-META tables, so a plain byte comparison of the row
        // keys matches what CellComparator would do; MetaCellComparator is never needed here.
        return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
      }
    }

    /**
     * Compare families, qualifiers, and timestamps of the given Cells.
     * They are assumed to be of the same row.
     * Nulls are after non-nulls.
     */
    private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
      if (c1 == null) {
        return 1; // source missing cell
      }
      if (c2 == null) {
        return -1; // target missing cell
      }

      int result = CellComparator.getInstance().compareFamilies(c1, c2);
      if (result != 0) {
        return result;
      }

      result = CellComparator.getInstance().compareQualifiers(c1, c2);
      if (result != 0) {
        return result;
      }

      // note timestamp comparison is inverted - more recent cells first
      return CellComparator.getInstance().compareTimestamps(c1, c2);
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      if (mapperException == null) {
        try {
          finishRemainingHashRanges(context);
        } catch (Throwable t) {
          mapperException = t;
        }
      }

      try {
        sourceTable.close();
        targetTable.close();
        sourceConnection.close();
        targetConnection.close();
      } catch (Throwable t) {
        if (mapperException == null) {
          mapperException = t;
        } else {
          LOG.error("Suppressing exception from closing tables", t);
        }
      }

      // propagate first exception
      if (mapperException != null) {
        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
        Throwables.propagate(mapperException);
      }
    }

    private void finishRemainingHashRanges(Context context) throws IOException,
        InterruptedException {
      TableSplit split = (TableSplit) context.getInputSplit();
      byte[] splitEndRow = split.getEndRow();
      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);

      // if there are more hash batches that begin before the end of this split move to them
      while (nextSourceKey != null
          && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
        moveToNextBatch(context);
      }

      if (targetHasher.isBatchStarted()) {
        // need to complete the final open hash batch

        if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
            || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
          // the open hash range continues past the end of this region
          // add a scan to complete the current hash range
          Scan scan = sourceTableHash.initScan();
          scan.setStartRow(splitEndRow);
          if (nextSourceKey == null) {
            scan.setStopRow(sourceTableHash.stopRow);
          } else {
            scan.setStopRow(nextSourceKey.copyBytes());
          }

          ResultScanner targetScanner = null;
          try {
            targetScanner = targetTable.getScanner(scan);
            for (Result row : targetScanner) {
              targetHasher.hashResult(row);
            }
          } finally {
            if (targetScanner != null) {
              targetScanner.close();
            }
          }
        } // else current batch ends exactly at split end row

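        // the open batch now covers its full key range; compare it against the source hash
        // and sync the range if they differ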
        finishBatchAndCompareHashes(context);
      }
    }
  }

  private static final int NUM_ARGS = 3;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
    System.err.println();
    System.err.println("Options:");

    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" targetzkcluster  ZK cluster key of the target table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" dryrun           if true, output counters but no writes");
    System.err.println("                  (defaults to false)");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
    System.err.println(" sourcetable      Name of the source table to sync from");
    System.err.println(" targettable      Name of the target table to sync to");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
    System.err.println(" to a local target cluster:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
        + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
        + " hdfs://nn:9000/hashes/tableA tableA tableA");
  }

  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {
      sourceHashDir = new Path(args[args.length - 3]);
      sourceTableName = args[args.length - 2];
      targetTableName = args[args.length - 1];

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String sourceZkClusterKey = "--sourcezkcluster=";
        if (cmd.startsWith(sourceZkClusterKey)) {
          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
          continue;
        }

        final String targetZkClusterKey = "--targetzkcluster=";
        if (cmd.startsWith(targetZkClusterKey)) {
          targetZkCluster = cmd.substring(targetZkClusterKey.length());
          continue;
        }

        final String dryRunKey = "--dryrun=";
        if (cmd.startsWith(dryRunKey)) {
          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      return 1;
    }
    counters = job.getCounters();
    return 0;
  }

}