/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

@InterfaceAudience.Private
public class SyncTable extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(SyncTable.class);

  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
  static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
  static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";
  static final String IGNORE_TIMESTAMPS = "sync.table.ignore.timestamps";

  Path sourceHashDir;
  String sourceTableName;
  String targetTableName;

  String sourceZkCluster;
  String targetZkCluster;
  boolean dryRun;
  boolean doDeletes = true;
  boolean doPuts = true;
  boolean ignoreTimestamps;

  Counters counters;

  public SyncTable(Configuration conf) {
    super(conf);
  }

  private void initCredentialsForHBase(String zookeeper, Job job) throws IOException {
    Configuration peerConf =
      HBaseConfiguration.createClusterConf(job.getConfiguration(), zookeeper);
    if ("kerberos".equalsIgnoreCase(peerConf.get("hbase.security.authentication"))) {
      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
    }
  }

  public Job createSubmittableJob(String[] args) throws IOException {
    FileSystem fs = sourceHashDir.getFileSystem(getConf());
    if (!fs.exists(sourceHashDir)) {
      throw new IOException("Source hash dir not found: " + sourceHashDir);
    }

    Job job = Job.getInstance(getConf(),
      getConf().get("mapreduce.job.name", "syncTable_" + sourceTableName + "-" + targetTableName));
    Configuration jobConf = job.getConfiguration();
    if ("kerberos".equalsIgnoreCase(jobConf.get("hadoop.security.authentication"))) {
      TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { sourceHashDir },
        getConf());
    }

    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
    LOG.info("Read source hash manifest: " + tableHash);
    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
    if (!tableHash.tableName.equals(sourceTableName)) {
      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
        + tableHash.tableName + " but job is reading from: " + sourceTableName);
    }
    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
        + " should be 1 more than the number of partition keys. However, the manifest file"
        + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
        + " found in the partitions file is " + tableHash.partitions.size());
    }

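    // Cross-check the manifest against the hash data files actually present under the hash dir;
    // a mismatch means the HashTable output is incomplete or corrupt.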
    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
    int dataSubdirCount = 0;
    for (FileStatus file : fs.listStatus(dataDir)) {
      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
        dataSubdirCount++;
      }
    }

    if (dataSubdirCount != tableHash.numHashFiles) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
        + " should be 1 more than the number of partition keys. However, the number of data dirs"
        + " found is " + dataSubdirCount + " but the number of partition keys"
        + " found in the partitions file is " + tableHash.partitions.size());
    }

    job.setJarByClass(HashTable.class);
    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
    if (sourceZkCluster != null) {
      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
      initCredentialsForHBase(sourceZkCluster, job);
    }
    if (targetZkCluster != null) {
      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
      initCredentialsForHBase(targetZkCluster, job);
    }
    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
    jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
    jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);
    jobConf.setBoolean(IGNORE_TIMESTAMPS, ignoreTimestamps);

    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(), SyncMapper.class,
      null, null, job);

    job.setNumReduceTasks(0);

    if (dryRun) {
      job.setOutputFormatClass(NullOutputFormat.class);
    } else {
      // No reducers. Just write straight to table. Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetZkCluster,
        null, null);

      // would be nice to add an option for bulk load instead
    }

    // Obtain an authentication token, for the specified cluster, on behalf of the current user
    if (sourceZkCluster != null) {
      Configuration peerConf =
        HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
    }
    return job;
  }

  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
    Path sourceHashDir;

    Connection sourceConnection;
    Connection targetConnection;
    Table sourceTable;
    Table targetTable;
    boolean dryRun;
    boolean doDeletes = true;
    boolean doPuts = true;
    boolean ignoreTimestamp;

    HashTable.TableHash sourceTableHash;
    HashTable.TableHash.Reader sourceHashReader;
    ImmutableBytesWritable currentSourceHash;
    ImmutableBytesWritable nextSourceKey;
    HashTable.ResultHasher targetHasher;

    Throwable mapperException;

    public static enum Counter {
      BATCHES,
      HASHES_MATCHED,
      HASHES_NOT_MATCHED,
      SOURCEMISSINGROWS,
      SOURCEMISSINGCELLS,
      TARGETMISSINGROWS,
      TARGETMISSINGCELLS,
      ROWSWITHDIFFS,
      DIFFERENTCELLVALUES,
      MATCHINGROWS,
      MATCHINGCELLS,
      EMPTY_BATCHES,
      RANGESMATCHED,
      RANGESNOTMATCHED
    };

    @Override
    protected void setup(Context context) throws IOException {

      Configuration conf = context.getConfiguration();
      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
      targetConnection =
        openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY, TableOutputFormat.OUTPUT_CONF_PREFIX);
      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
      doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
      doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);
      ignoreTimestamp = conf.getBoolean(IGNORE_TIMESTAMPS, false);

      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
      LOG.info("Read source hash manifest: " + sourceTableHash);
      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");

      TableSplit split = (TableSplit) context.getInputSplit();
      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());

      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
      findNextKeyHashPair();

      // create a hasher, but don't start it right away
      // instead, find the first hash batch at or after the start row
      // and skip any rows that come before. they will be caught by the previous task
      targetHasher = new HashTable.ResultHasher();
      targetHasher.ignoreTimestamps = ignoreTimestamp;
    }

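    /**
     * Open a Connection to the cluster identified by the given ZK cluster key. When the key is
     * unset, the connection falls back to the cluster described by the job configuration itself.
     */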
    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
      String configPrefix) throws IOException {
      String zkCluster = conf.get(zkClusterConfKey);
      Configuration clusterConf =
        HBaseConfiguration.createClusterConf(conf, zkCluster, configPrefix);
      return ConnectionFactory.createConnection(clusterConf);
    }

    private static Table openTable(Connection connection, Configuration conf,
      String tableNameConfKey) throws IOException {
      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
    }

    /**
     * Attempt to read the next source key/hash pair. If there are no more, set nextSourceKey to
     * null.
     */
    private void findNextKeyHashPair() throws IOException {
      boolean hasNext = sourceHashReader.next();
      if (hasNext) {
        nextSourceKey = sourceHashReader.getCurrentKey();
      } else {
        // no more keys - last hash goes to the end
        nextSourceKey = null;
      }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      try {
        // first, finish any hash batches that end before the scanned row
        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
          moveToNextBatch(context);
        }

        // next, add the scanned row (as long as we've reached the first batch)
        if (targetHasher.isBatchStarted()) {
          targetHasher.hashResult(value);
        }
      } catch (Throwable t) {
        mapperException = t;
        Throwables.propagateIfInstanceOf(t, IOException.class);
        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
        Throwables.propagate(t);
      }
    }

    /**
     * If there is an open hash batch, complete it and sync if there are diffs. Start a new batch,
     * and seek to read the next source key/hash pair.
     */
    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
      if (targetHasher.isBatchStarted()) {
        finishBatchAndCompareHashes(context);
      }
      targetHasher.startBatch(nextSourceKey);
      currentSourceHash = sourceHashReader.getCurrentHash();

      findNextKeyHashPair();
    }

    /**
     * Finish the currently open hash batch. Compare the target hash to the given source hash. If
     * they do not match, then sync the covered key range.
     */
    private void finishBatchAndCompareHashes(Context context)
      throws IOException, InterruptedException {
      targetHasher.finishBatch();
      context.getCounter(Counter.BATCHES).increment(1);
      if (targetHasher.getBatchSize() == 0) {
        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
      }
      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
      if (targetHash.equals(currentSourceHash)) {
        context.getCounter(Counter.HASHES_MATCHED).increment(1);
      } else {
        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);

        ImmutableBytesWritable stopRow = nextSourceKey == null
          ? new ImmutableBytesWritable(sourceTableHash.stopRow)
          : nextSourceKey;

        if (LOG.isDebugEnabled()) {
          LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey()) + " to "
            + toHex(stopRow) + " sourceHash: " + toHex(currentSourceHash) + " targetHash: "
            + toHex(targetHash));
        }

        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
      }
    }

    private static String toHex(ImmutableBytesWritable bytes) {
      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
    }

    private static final CellScanner EMPTY_CELL_SCANNER =
      new CellScanner(Collections.<Result> emptyIterator());

    /**
     * Rescan the given range directly from the source and target tables. Count and log
     * differences, and if this is not a dry run, output Puts and Deletes to make the target table
     * match the source table for this range.
     */
    private void syncRange(Context context, ImmutableBytesWritable startRow,
      ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
      Scan scan = sourceTableHash.initScan();
      scan.setStartRow(startRow.copyBytes());
      scan.setStopRow(stopRow.copyBytes());

      ResultScanner sourceScanner = sourceTable.getScanner(scan);
      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());

      ResultScanner targetScanner = targetTable.getScanner(new Scan(scan));
      CellScanner targetCells = new CellScanner(targetScanner.iterator());

      boolean rangeMatched = true;
      byte[] nextSourceRow = sourceCells.nextRow();
      byte[] nextTargetRow = targetCells.nextRow();
      while (nextSourceRow != null || nextTargetRow != null) {
        boolean rowMatched;
        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
        if (rowComparison < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Target missing row: " + Bytes.toString(nextSourceRow));
          }
          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
          nextSourceRow = sourceCells.nextRow(); // advance only source to next row
        } else if (rowComparison > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Source missing row: " + Bytes.toString(nextTargetRow));
          }
          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
          nextTargetRow = targetCells.nextRow(); // advance only target to next row
        } else {
          // current row is the same on both sides, compare cell by cell
          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
          nextSourceRow = sourceCells.nextRow();
          nextTargetRow = targetCells.nextRow();
        }

        if (!rowMatched) {
          rangeMatched = false;
        }
      }

      sourceScanner.close();
      targetScanner.close();

      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
        .increment(1);
    }

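    /**
     * Iterates the Results of a scan one row at a time: nextRow() advances to the next row key,
     * and nextCellInRow() then steps through that row's cells, transparently crossing Result
     * boundaries when a single row is split across multiple Results by scan batching.
     */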
    private static class CellScanner {
      private final Iterator<Result> results;

      private byte[] currentRow;
      private Result currentRowResult;
      private int nextCellInRow;

      private Result nextRowResult;

      public CellScanner(Iterator<Result> results) {
        this.results = results;
      }

      /**
       * Advance to the next row and return its row key. Returns null iff there are no more rows.
       */
      public byte[] nextRow() {
        if (nextRowResult == null) {
          // no cached row - check scanner for more
          while (results.hasNext()) {
            nextRowResult = results.next();
            Cell nextCell = nextRowResult.rawCells()[0];
            if (
              currentRow == null || !Bytes.equals(currentRow, 0, currentRow.length,
                nextCell.getRowArray(), nextCell.getRowOffset(), nextCell.getRowLength())
            ) {
              // found next row
              break;
            } else {
              // found another result from current row, keep scanning
              nextRowResult = null;
            }
          }

          if (nextRowResult == null) {
            // end of data, no more rows
            currentRowResult = null;
            currentRow = null;
            return null;
          }
        }

        // advance to cached result for next row
        currentRowResult = nextRowResult;
        nextCellInRow = 0;
        currentRow = currentRowResult.getRow();
        nextRowResult = null;
        return currentRow;
      }

      /**
       * Returns the next Cell in the current row or null iff none remain.
       */
      public Cell nextCellInRow() {
        if (currentRowResult == null) {
          // nothing left in current row
          return null;
        }

        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
        nextCellInRow++;
        if (nextCellInRow == currentRowResult.size()) {
          if (results.hasNext()) {
            Result result = results.next();
            Cell cell = result.rawCells()[0];
            if (
              Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength())
            ) {
              // result is part of current row
              currentRowResult = result;
              nextCellInRow = 0;
            } else {
              // result is part of next row, cache it
              nextRowResult = result;
              // current row is complete
              currentRowResult = null;
            }
          } else {
            // end of data
            currentRowResult = null;
          }
        }
        return nextCell;
      }
    }

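    /**
     * If ignoreTimestamp is enabled, rebuild the source cell with the current time as its
     * timestamp before it is written to the target; otherwise return the cell unchanged.
     */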
    private Cell checkAndResetTimestamp(Cell sourceCell) {
      if (ignoreTimestamp) {
        sourceCell =
          CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(sourceCell.getType())
            .setRow(sourceCell.getRowArray(), sourceCell.getRowOffset(), sourceCell.getRowLength())
            .setFamily(sourceCell.getFamilyArray(), sourceCell.getFamilyOffset(),
              sourceCell.getFamilyLength())
            .setQualifier(sourceCell.getQualifierArray(), sourceCell.getQualifierOffset(),
              sourceCell.getQualifierLength())
            .setTimestamp(EnvironmentEdgeManager.currentTime()).setValue(sourceCell.getValueArray(),
              sourceCell.getValueOffset(), sourceCell.getValueLength())
            .build();
      }
      return sourceCell;
    }

    /**
     * Compare the cells for the given row from the source and target tables. Count and log any
     * differences. If not a dry run, output a Put and/or Delete needed to sync the target table to
     * match the source table.
     */
    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
      CellScanner targetCells) throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      long matchingCells = 0;
      boolean matchingRow = true;
      Cell sourceCell = sourceCells.nextCellInRow();
      Cell targetCell = targetCells.nextCellInRow();
      while (sourceCell != null || targetCell != null) {

        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
        if (cellKeyComparison < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Target missing cell: " + sourceCell);
          }
          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun && doPuts) {
            if (put == null) {
              put = new Put(rowKey);
            }
            sourceCell = checkAndResetTimestamp(sourceCell);
            put.add(sourceCell);
          }

          sourceCell = sourceCells.nextCellInRow();
        } else if (cellKeyComparison > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Source missing cell: " + targetCell);
          }
          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun && doDeletes) {
            if (delete == null) {
              delete = new Delete(rowKey);
            }
            // add a tombstone to exactly match the target cell that is missing on the source
            delete.addColumn(CellUtil.cloneFamily(targetCell), CellUtil.cloneQualifier(targetCell),
              targetCell.getTimestamp());
          }

          targetCell = targetCells.nextCellInRow();
        } else {
          // the cell keys are equal, now check values
          if (CellUtil.matchingValue(sourceCell, targetCell)) {
            matchingCells++;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Different values: ");
              LOG.debug(" source cell: " + sourceCell + " value: "
                + Bytes.toString(sourceCell.getValueArray(), sourceCell.getValueOffset(),
                  sourceCell.getValueLength()));
              LOG.debug(" target cell: " + targetCell + " value: "
                + Bytes.toString(targetCell.getValueArray(), targetCell.getValueOffset(),
                  targetCell.getValueLength()));
            }
            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
            matchingRow = false;

            if (!dryRun && doPuts) {
              // overwrite target cell
              if (put == null) {
                put = new Put(rowKey);
              }
              sourceCell = checkAndResetTimestamp(sourceCell);
              put.add(sourceCell);
            }
          }
          sourceCell = sourceCells.nextCellInRow();
          targetCell = targetCells.nextCellInRow();
        }

        if (!dryRun && sourceTableHash.scanBatch > 0) {
          if (put != null && put.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), put);
            put = null;
          }
          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), delete);
            delete = null;
          }
        }
      }

      if (!dryRun) {
        if (put != null) {
          context.write(new ImmutableBytesWritable(rowKey), put);
        }
        if (delete != null) {
          context.write(new ImmutableBytesWritable(rowKey), delete);
        }
      }

      if (matchingCells > 0) {
        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
      }
      if (matchingRow) {
        context.getCounter(Counter.MATCHINGROWS).increment(1);
        return true;
      } else {
        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
        return false;
      }
    }

    /**
     * Compare row keys of the given Result objects. Nulls are after non-nulls.
     */
    private static int compareRowKeys(byte[] r1, byte[] r2) {
      if (r1 == null) {
        return 1; // source missing row
      } else if (r2 == null) {
        return -1; // target missing row
      } else {
        // SyncTable is only run against non-META tables, so we can compare the row keys directly
        // with Bytes.compareTo, just as CellComparator does internally; MetaCellComparator is
        // never needed here.
        return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
      }
    }

    /**
     * Compare families, qualifiers, and timestamps of the given Cells. They are assumed to be of
     * the same row. Nulls are after non-nulls.
     */
    private int compareCellKeysWithinRow(Cell c1, Cell c2) {
      if (c1 == null) {
        return 1; // source missing cell
      }
      if (c2 == null) {
        return -1; // target missing cell
      }

      int result = CellComparator.getInstance().compareFamilies(c1, c2);
      if (result != 0) {
        return result;
      }

      result = CellComparator.getInstance().compareQualifiers(c1, c2);
      if (result != 0) {
        return result;
      }

      if (this.ignoreTimestamp) {
        return 0;
      } else {
        // note timestamp comparison is inverted - more recent cells first
        return CellComparator.getInstance().compareTimestamps(c1, c2);
      }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      if (mapperException == null) {
        try {
          finishRemainingHashRanges(context);
        } catch (Throwable t) {
          mapperException = t;
        }
      }

      try {
        sourceTable.close();
        targetTable.close();
        sourceConnection.close();
        targetConnection.close();
      } catch (Throwable t) {
        if (mapperException == null) {
          mapperException = t;
        } else {
          LOG.error("Suppressing exception from closing tables", t);
        }
      }

      // propagate first exception
      if (mapperException != null) {
        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
        Throwables.propagate(mapperException);
      }
    }

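    /**
     * After the scan for this split is exhausted, work through any remaining source hash batches.
     * If the last open batch extends beyond the end of this split, scan the rest of that range
     * directly from the target so its hash can still be compared against the source.
     */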
    private void finishRemainingHashRanges(Context context)
      throws IOException, InterruptedException {
      TableSplit split = (TableSplit) context.getInputSplit();
      byte[] splitEndRow = split.getEndRow();
      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);

      // if there are more hash batches that begin before the end of this split move to them
      while (
        nextSourceKey != null && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)
      ) {
        moveToNextBatch(context);
      }

      if (targetHasher.isBatchStarted()) {
        // need to complete the final open hash batch

        if (
          (nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
            || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))
        ) {
          // the open hash range continues past the end of this region
          // add a scan to complete the current hash range
          Scan scan = sourceTableHash.initScan();
          scan.setStartRow(splitEndRow);
          if (nextSourceKey == null) {
            scan.setStopRow(sourceTableHash.stopRow);
          } else {
            scan.setStopRow(nextSourceKey.copyBytes());
          }

          ResultScanner targetScanner = null;
          try {
            targetScanner = targetTable.getScanner(scan);
            for (Result row : targetScanner) {
              targetHasher.hashResult(row);
            }
          } finally {
            if (targetScanner != null) {
              targetScanner.close();
            }
          }
        } // else current batch ends exactly at split end row

        finishBatchAndCompareHashes(context);
      }
    }
  }

  private static final int NUM_ARGS = 3;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
    System.err.println();
    System.err.println("Options:");

    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" targetzkcluster  ZK cluster key of the target table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" dryrun           if true, output counters but no writes");
    System.err.println("                  (defaults to false)");
    System.err.println(" doDeletes        if false, does not perform deletes");
    System.err.println("                  (defaults to true)");
    System.err.println(" doPuts           if false, does not perform puts");
    System.err.println("                  (defaults to true)");
    System.err.println(" ignoreTimestamps if true, ignores cell timestamps while comparing");
    System.err.println("                  cell values. Any missing cell on target then gets");
    System.err.println("                  added with current time as timestamp");
    System.err.println("                  (defaults to false)");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
    System.err.println(" sourcetable      Name of the source table to sync from");
    System.err.println(" targettable      Name of the target table to sync to");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
    System.err.println(" to a local target cluster:");
    System.err.println(" $ hbase " + "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
      + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
      + " hdfs://nn:9000/hashes/tableA tableA tableA");
  }

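  /**
   * Parse positional arguments and --name=value options into this instance's fields. Returns
   * false, after printing usage, if the arguments are missing or not recognized.
   */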
  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {
      sourceHashDir = new Path(args[args.length - 3]);
      sourceTableName = args[args.length - 2];
      targetTableName = args[args.length - 1];

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String sourceZkClusterKey = "--sourcezkcluster=";
        if (cmd.startsWith(sourceZkClusterKey)) {
          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
          continue;
        }

        final String targetZkClusterKey = "--targetzkcluster=";
        if (cmd.startsWith(targetZkClusterKey)) {
          targetZkCluster = cmd.substring(targetZkClusterKey.length());
          continue;
        }

        final String dryRunKey = "--dryrun=";
        if (cmd.startsWith(dryRunKey)) {
          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
          continue;
        }

        final String doDeletesKey = "--doDeletes=";
        if (cmd.startsWith(doDeletesKey)) {
          doDeletes = Boolean.parseBoolean(cmd.substring(doDeletesKey.length()));
          continue;
        }

        final String doPutsKey = "--doPuts=";
        if (cmd.startsWith(doPutsKey)) {
          doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length()));
          continue;
        }

        final String ignoreTimestampsKey = "--ignoreTimestamps=";
        if (cmd.startsWith(ignoreTimestampsKey)) {
          ignoreTimestamps = Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }

    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      return 1;
    }
    counters = job.getCounters();
    return 0;
  }

}