001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.net.URI; 022import java.net.URISyntaxException; 023import java.util.Collections; 024import java.util.Iterator; 025import org.apache.commons.lang3.StringUtils; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.conf.Configured; 028import org.apache.hadoop.fs.FileStatus; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellBuilderFactory; 033import org.apache.hadoop.hbase.CellBuilderType; 034import org.apache.hadoop.hbase.CellComparator; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Connection; 039import org.apache.hadoop.hbase.client.ConnectionFactory; 040import org.apache.hadoop.hbase.client.Delete; 041import org.apache.hadoop.hbase.client.Mutation; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.ResultScanner; 045import org.apache.hadoop.hbase.client.Scan; 046import org.apache.hadoop.hbase.client.Table; 047import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.apache.hadoop.mapreduce.Counters; 051import org.apache.hadoop.mapreduce.Job; 052import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 053import org.apache.hadoop.mapreduce.security.TokenCache; 054import org.apache.hadoop.util.GenericOptionsParser; 055import org.apache.hadoop.util.Tool; 056import org.apache.hadoop.util.ToolRunner; 057import org.apache.yetus.audience.InterfaceAudience; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 062 063@InterfaceAudience.Private 064public class SyncTable extends Configured implements Tool { 065 066 private static final Logger LOG = LoggerFactory.getLogger(SyncTable.class); 067 068 static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir"; 069 static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name"; 070 static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name"; 071 static final String SOURCE_URI_CONF_KEY = "sync.table.source.uri"; 072 /** 073 * @deprecated Since 3.0.0, will be removed in 4.0.0 Use {@link #SOURCE_URI_CONF_KEY} instead. 074 */ 075 @Deprecated 076 static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster"; 077 static final String TARGET_URI_CONF_KEY = "sync.table.target.uri"; 078 /** 079 * @deprecated Since 3.0.0, will be removed in 4.0.0 Use {@link #TARGET_URI_CONF_KEY} instead. 080 */ 081 @Deprecated 082 static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster"; 083 static final String DRY_RUN_CONF_KEY = "sync.table.dry.run"; 084 static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes"; 085 static final String DO_PUTS_CONF_KEY = "sync.table.do.puts"; 086 static final String IGNORE_TIMESTAMPS = "sync.table.ignore.timestamps"; 087 088 Path sourceHashDir; 089 String sourceTableName; 090 String targetTableName; 091 092 URI sourceUri; 093 /** 094 * @deprecated Since 3.0.0, will be removed in 4.0.0 Use {@link #sourceUri} instead. 095 */ 096 @Deprecated 097 String sourceZkCluster; 098 URI targetUri; 099 /** 100 * @deprecated Since 3.0.0, will be removed in 4.0.0 Use {@link #targetUri} instead. 101 */ 102 @Deprecated 103 String targetZkCluster; 104 boolean dryRun; 105 boolean doDeletes = true; 106 boolean doPuts = true; 107 boolean ignoreTimestamps; 108 109 Counters counters; 110 111 public SyncTable(Configuration conf) { 112 super(conf); 113 } 114 115 private void initCredentialsForHBase(String clusterKey, Job job) throws IOException { 116 Configuration peerConf = 117 HBaseConfiguration.createClusterConf(job.getConfiguration(), clusterKey); 118 TableMapReduceUtil.initCredentialsForCluster(job, peerConf); 119 } 120 121 public Job createSubmittableJob(String[] args) throws IOException { 122 FileSystem fs = sourceHashDir.getFileSystem(getConf()); 123 if (!fs.exists(sourceHashDir)) { 124 throw new IOException("Source hash dir not found: " + sourceHashDir); 125 } 126 127 Job job = Job.getInstance(getConf(), 128 getConf().get("mapreduce.job.name", "syncTable_" + sourceTableName + "-" + targetTableName)); 129 Configuration jobConf = job.getConfiguration(); 130 if ("kerberos".equalsIgnoreCase(jobConf.get("hadoop.security.authentication"))) { 131 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { sourceHashDir }, 132 getConf()); 133 } 134 135 HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir); 136 LOG.info("Read source hash manifest: " + tableHash); 137 LOG.info("Read " + tableHash.partitions.size() + " partition keys"); 138 if (!tableHash.tableName.equals(sourceTableName)) { 139 LOG.warn("Table name mismatch - manifest indicates hash was taken from: " 140 + tableHash.tableName + " but job is reading from: " + sourceTableName); 141 } 142 if (tableHash.numHashFiles != tableHash.partitions.size() + 1) { 143 throw new RuntimeException("Hash data appears corrupt. The number of of hash files created" 144 + " should be 1 more than the number of partition keys. However, the manifest file " 145 + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys" 146 + " found in the partitions file is " + tableHash.partitions.size()); 147 } 148 149 Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR); 150 int dataSubdirCount = 0; 151 for (FileStatus file : fs.listStatus(dataDir)) { 152 if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) { 153 dataSubdirCount++; 154 } 155 } 156 157 if (dataSubdirCount != tableHash.numHashFiles) { 158 throw new RuntimeException("Hash data appears corrupt. The number of of hash files created" 159 + " should be 1 more than the number of partition keys. However, the number of data dirs" 160 + " found is " + dataSubdirCount + " but the number of partition keys" 161 + " found in the partitions file is " + tableHash.partitions.size()); 162 } 163 164 job.setJarByClass(HashTable.class); 165 jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString()); 166 jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName); 167 jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName); 168 if (sourceUri != null) { 169 jobConf.set(SOURCE_URI_CONF_KEY, sourceUri.toString()); 170 TableMapReduceUtil.initCredentialsForCluster(job, jobConf, sourceUri); 171 } else if (sourceZkCluster != null) { 172 jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster); 173 initCredentialsForHBase(sourceZkCluster, job); 174 } 175 if (targetUri != null) { 176 jobConf.set(TARGET_URI_CONF_KEY, targetUri.toString()); 177 TableMapReduceUtil.initCredentialsForCluster(job, jobConf, targetUri); 178 } else if (targetZkCluster != null) { 179 jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster); 180 initCredentialsForHBase(targetZkCluster, job); 181 } 182 jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun); 183 jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes); 184 jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts); 185 jobConf.setBoolean(IGNORE_TIMESTAMPS, ignoreTimestamps); 186 187 TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(), SyncMapper.class, 188 null, null, job); 189 190 job.setNumReduceTasks(0); 191 192 if (dryRun) { 193 job.setOutputFormatClass(NullOutputFormat.class); 194 } else { 195 // No reducers. Just write straight to table. Call initTableReducerJob 196 // because it sets up the TableOutputFormat. 197 if (targetUri != null) { 198 TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetUri); 199 } else { 200 TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetZkCluster); 201 } 202 // would be nice to add an option for bulk load instead 203 } 204 205 return job; 206 } 207 208 public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> { 209 Path sourceHashDir; 210 211 Connection sourceConnection; 212 Connection targetConnection; 213 Table sourceTable; 214 Table targetTable; 215 boolean dryRun; 216 boolean doDeletes = true; 217 boolean doPuts = true; 218 boolean ignoreTimestamp; 219 220 HashTable.TableHash sourceTableHash; 221 HashTable.TableHash.Reader sourceHashReader; 222 ImmutableBytesWritable currentSourceHash; 223 ImmutableBytesWritable nextSourceKey; 224 HashTable.ResultHasher targetHasher; 225 226 Throwable mapperException; 227 228 public static enum Counter { 229 BATCHES, 230 HASHES_MATCHED, 231 HASHES_NOT_MATCHED, 232 SOURCEMISSINGROWS, 233 SOURCEMISSINGCELLS, 234 TARGETMISSINGROWS, 235 TARGETMISSINGCELLS, 236 ROWSWITHDIFFS, 237 DIFFERENTCELLVALUES, 238 MATCHINGROWS, 239 MATCHINGCELLS, 240 EMPTY_BATCHES, 241 RANGESMATCHED, 242 RANGESNOTMATCHED 243 } 244 245 @Override 246 protected void setup(Context context) throws IOException { 247 Configuration conf = context.getConfiguration(); 248 sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY)); 249 sourceConnection = 250 openConnection(conf, SOURCE_URI_CONF_KEY, SOURCE_ZK_CLUSTER_CONF_KEY, null); 251 targetConnection = openConnection(conf, TARGET_URI_CONF_KEY, TARGET_ZK_CLUSTER_CONF_KEY, 252 TableOutputFormat.OUTPUT_CONF_PREFIX); 253 sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY); 254 targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY); 255 dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false); 256 doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true); 257 doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true); 258 ignoreTimestamp = conf.getBoolean(IGNORE_TIMESTAMPS, false); 259 260 sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir); 261 LOG.info("Read source hash manifest: " + sourceTableHash); 262 LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys"); 263 264 TableSplit split = (TableSplit) context.getInputSplit(); 265 ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow()); 266 267 sourceHashReader = sourceTableHash.newReader(conf, splitStartKey); 268 findNextKeyHashPair(); 269 270 // create a hasher, but don't start it right away 271 // instead, find the first hash batch at or after the start row 272 // and skip any rows that come before. they will be caught by the previous task 273 targetHasher = new HashTable.ResultHasher(); 274 targetHasher.ignoreTimestamps = ignoreTimestamp; 275 } 276 277 private static Connection openConnection(Configuration conf, String uriConfKey, 278 String zkClusterConfKey, String configPrefix) throws IOException { 279 String uri = conf.get(uriConfKey); 280 if (!StringUtils.isBlank(uri)) { 281 try { 282 return ConnectionFactory.createConnection(new URI(uri), conf); 283 } catch (URISyntaxException e) { 284 throw new IOException( 285 "malformed connection uri: " + uri + ", please check config " + uriConfKey, e); 286 } 287 } else { 288 String zkCluster = conf.get(zkClusterConfKey); 289 Configuration clusterConf = 290 HBaseConfiguration.createClusterConf(conf, zkCluster, configPrefix); 291 return ConnectionFactory.createConnection(clusterConf); 292 } 293 } 294 295 private static Table openTable(Connection connection, Configuration conf, 296 String tableNameConfKey) throws IOException { 297 return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey))); 298 } 299 300 /** 301 * Attempt to read the next source key/hash pair. If there are no more, set nextSourceKey to 302 * null 303 */ 304 private void findNextKeyHashPair() throws IOException { 305 boolean hasNext = sourceHashReader.next(); 306 if (hasNext) { 307 nextSourceKey = sourceHashReader.getCurrentKey(); 308 } else { 309 // no more keys - last hash goes to the end 310 nextSourceKey = null; 311 } 312 } 313 314 @Override 315 protected void map(ImmutableBytesWritable key, Result value, Context context) 316 throws IOException, InterruptedException { 317 try { 318 // first, finish any hash batches that end before the scanned row 319 while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) { 320 moveToNextBatch(context); 321 } 322 323 // next, add the scanned row (as long as we've reached the first batch) 324 if (targetHasher.isBatchStarted()) { 325 targetHasher.hashResult(value); 326 } 327 } catch (Throwable t) { 328 mapperException = t; 329 throw t; 330 } 331 } 332 333 /** 334 * If there is an open hash batch, complete it and sync if there are diffs. Start a new batch, 335 * and seek to read the 336 */ 337 private void moveToNextBatch(Context context) throws IOException, InterruptedException { 338 if (targetHasher.isBatchStarted()) { 339 finishBatchAndCompareHashes(context); 340 } 341 targetHasher.startBatch(nextSourceKey); 342 currentSourceHash = sourceHashReader.getCurrentHash(); 343 344 findNextKeyHashPair(); 345 } 346 347 /** 348 * Finish the currently open hash batch. Compare the target hash to the given source hash. If 349 * they do not match, then sync the covered key range. 350 */ 351 private void finishBatchAndCompareHashes(Context context) 352 throws IOException, InterruptedException { 353 targetHasher.finishBatch(); 354 context.getCounter(Counter.BATCHES).increment(1); 355 if (targetHasher.getBatchSize() == 0) { 356 context.getCounter(Counter.EMPTY_BATCHES).increment(1); 357 } 358 ImmutableBytesWritable targetHash = targetHasher.getBatchHash(); 359 if (targetHash.equals(currentSourceHash)) { 360 context.getCounter(Counter.HASHES_MATCHED).increment(1); 361 } else { 362 context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1); 363 364 ImmutableBytesWritable stopRow = nextSourceKey == null 365 ? new ImmutableBytesWritable(sourceTableHash.stopRow) 366 : nextSourceKey; 367 368 if (LOG.isDebugEnabled()) { 369 LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey()) + " to " 370 + toHex(stopRow) + " sourceHash: " + toHex(currentSourceHash) + " targetHash: " 371 + toHex(targetHash)); 372 } 373 374 syncRange(context, targetHasher.getBatchStartKey(), stopRow); 375 } 376 } 377 378 private static String toHex(ImmutableBytesWritable bytes) { 379 return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength()); 380 } 381 382 private static final CellScanner EMPTY_CELL_SCANNER = 383 new CellScanner(Collections.<Result> emptyIterator()); 384 385 /** 386 * Rescan the given range directly from the source and target tables. Count and log differences, 387 * and if this is not a dry run, output Puts and Deletes to make the target table match the 388 * source table for this range 389 */ 390 private void syncRange(Context context, ImmutableBytesWritable startRow, 391 ImmutableBytesWritable stopRow) throws IOException, InterruptedException { 392 Scan scan = sourceTableHash.initScan(); 393 scan.withStartRow(startRow.copyBytes()); 394 scan.withStopRow(stopRow.copyBytes()); 395 396 ResultScanner sourceScanner = sourceTable.getScanner(scan); 397 CellScanner sourceCells = new CellScanner(sourceScanner.iterator()); 398 399 ResultScanner targetScanner = targetTable.getScanner(new Scan(scan)); 400 CellScanner targetCells = new CellScanner(targetScanner.iterator()); 401 402 boolean rangeMatched = true; 403 byte[] nextSourceRow = sourceCells.nextRow(); 404 byte[] nextTargetRow = targetCells.nextRow(); 405 while (nextSourceRow != null || nextTargetRow != null) { 406 boolean rowMatched; 407 int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow); 408 if (rowComparison < 0) { 409 if (LOG.isDebugEnabled()) { 410 LOG.debug("Target missing row: " + Bytes.toString(nextSourceRow)); 411 } 412 context.getCounter(Counter.TARGETMISSINGROWS).increment(1); 413 414 rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER); 415 nextSourceRow = sourceCells.nextRow(); // advance only source to next row 416 } else if (rowComparison > 0) { 417 if (LOG.isDebugEnabled()) { 418 LOG.debug("Source missing row: " + Bytes.toString(nextTargetRow)); 419 } 420 context.getCounter(Counter.SOURCEMISSINGROWS).increment(1); 421 422 rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells); 423 nextTargetRow = targetCells.nextRow(); // advance only target to next row 424 } else { 425 // current row is the same on both sides, compare cell by cell 426 rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells); 427 nextSourceRow = sourceCells.nextRow(); 428 nextTargetRow = targetCells.nextRow(); 429 } 430 431 if (!rowMatched) { 432 rangeMatched = false; 433 } 434 } 435 436 sourceScanner.close(); 437 targetScanner.close(); 438 439 context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED) 440 .increment(1); 441 } 442 443 private static class CellScanner { 444 private final Iterator<Result> results; 445 446 private byte[] currentRow; 447 private Result currentRowResult; 448 private int nextCellInRow; 449 450 private Result nextRowResult; 451 452 public CellScanner(Iterator<Result> results) { 453 this.results = results; 454 } 455 456 /** 457 * Advance to the next row and return its row key. Returns null iff there are no more rows. 458 */ 459 public byte[] nextRow() { 460 if (nextRowResult == null) { 461 // no cached row - check scanner for more 462 while (results.hasNext()) { 463 nextRowResult = results.next(); 464 Cell nextCell = nextRowResult.rawCells()[0]; 465 if ( 466 currentRow == null || !Bytes.equals(currentRow, 0, currentRow.length, 467 nextCell.getRowArray(), nextCell.getRowOffset(), nextCell.getRowLength()) 468 ) { 469 // found next row 470 break; 471 } else { 472 // found another result from current row, keep scanning 473 nextRowResult = null; 474 } 475 } 476 477 if (nextRowResult == null) { 478 // end of data, no more rows 479 currentRowResult = null; 480 currentRow = null; 481 return null; 482 } 483 } 484 485 // advance to cached result for next row 486 currentRowResult = nextRowResult; 487 nextCellInRow = 0; 488 currentRow = currentRowResult.getRow(); 489 nextRowResult = null; 490 return currentRow; 491 } 492 493 /** 494 * Returns the next Cell in the current row or null iff none remain. 495 */ 496 public Cell nextCellInRow() { 497 if (currentRowResult == null) { 498 // nothing left in current row 499 return null; 500 } 501 502 Cell nextCell = currentRowResult.rawCells()[nextCellInRow]; 503 nextCellInRow++; 504 if (nextCellInRow == currentRowResult.size()) { 505 if (results.hasNext()) { 506 Result result = results.next(); 507 Cell cell = result.rawCells()[0]; 508 if ( 509 Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(), 510 cell.getRowOffset(), cell.getRowLength()) 511 ) { 512 // result is part of current row 513 currentRowResult = result; 514 nextCellInRow = 0; 515 } else { 516 // result is part of next row, cache it 517 nextRowResult = result; 518 // current row is complete 519 currentRowResult = null; 520 } 521 } else { 522 // end of data 523 currentRowResult = null; 524 } 525 } 526 return nextCell; 527 } 528 } 529 530 private Cell checkAndResetTimestamp(Cell sourceCell) { 531 if (ignoreTimestamp) { 532 sourceCell = 533 CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(sourceCell.getType()) 534 .setRow(sourceCell.getRowArray(), sourceCell.getRowOffset(), sourceCell.getRowLength()) 535 .setFamily(sourceCell.getFamilyArray(), sourceCell.getFamilyOffset(), 536 sourceCell.getFamilyLength()) 537 .setQualifier(sourceCell.getQualifierArray(), sourceCell.getQualifierOffset(), 538 sourceCell.getQualifierLength()) 539 .setTimestamp(EnvironmentEdgeManager.currentTime()).setValue(sourceCell.getValueArray(), 540 sourceCell.getValueOffset(), sourceCell.getValueLength()) 541 .build(); 542 } 543 return sourceCell; 544 } 545 546 /** 547 * Compare the cells for the given row from the source and target tables. Count and log any 548 * differences. If not a dry run, output a Put and/or Delete needed to sync the target table to 549 * match the source table. 550 */ 551 private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells, 552 CellScanner targetCells) throws IOException, InterruptedException { 553 Put put = null; 554 Delete delete = null; 555 long matchingCells = 0; 556 boolean matchingRow = true; 557 Cell sourceCell = sourceCells.nextCellInRow(); 558 Cell targetCell = targetCells.nextCellInRow(); 559 while (sourceCell != null || targetCell != null) { 560 561 int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell); 562 if (cellKeyComparison < 0) { 563 if (LOG.isDebugEnabled()) { 564 LOG.debug("Target missing cell: " + sourceCell); 565 } 566 context.getCounter(Counter.TARGETMISSINGCELLS).increment(1); 567 matchingRow = false; 568 569 if (!dryRun && doPuts) { 570 if (put == null) { 571 put = new Put(rowKey); 572 } 573 sourceCell = checkAndResetTimestamp(sourceCell); 574 put.add(sourceCell); 575 } 576 577 sourceCell = sourceCells.nextCellInRow(); 578 } else if (cellKeyComparison > 0) { 579 if (LOG.isDebugEnabled()) { 580 LOG.debug("Source missing cell: " + targetCell); 581 } 582 context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1); 583 matchingRow = false; 584 585 if (!dryRun && doDeletes) { 586 if (delete == null) { 587 delete = new Delete(rowKey); 588 } 589 // add a tombstone to exactly match the target cell that is missing on the source 590 delete.addColumn(CellUtil.cloneFamily(targetCell), CellUtil.cloneQualifier(targetCell), 591 targetCell.getTimestamp()); 592 } 593 594 targetCell = targetCells.nextCellInRow(); 595 } else { 596 // the cell keys are equal, now check values 597 if (CellUtil.matchingValue(sourceCell, targetCell)) { 598 matchingCells++; 599 } else { 600 if (LOG.isDebugEnabled()) { 601 LOG.debug("Different values: "); 602 LOG.debug(" source cell: " + sourceCell + " value: " 603 + Bytes.toString(sourceCell.getValueArray(), sourceCell.getValueOffset(), 604 sourceCell.getValueLength())); 605 LOG.debug(" target cell: " + targetCell + " value: " 606 + Bytes.toString(targetCell.getValueArray(), targetCell.getValueOffset(), 607 targetCell.getValueLength())); 608 } 609 context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1); 610 matchingRow = false; 611 612 if (!dryRun && doPuts) { 613 // overwrite target cell 614 if (put == null) { 615 put = new Put(rowKey); 616 } 617 sourceCell = checkAndResetTimestamp(sourceCell); 618 put.add(sourceCell); 619 } 620 } 621 sourceCell = sourceCells.nextCellInRow(); 622 targetCell = targetCells.nextCellInRow(); 623 } 624 625 if (!dryRun && sourceTableHash.scanBatch > 0) { 626 if (put != null && put.size() >= sourceTableHash.scanBatch) { 627 context.write(new ImmutableBytesWritable(rowKey), put); 628 put = null; 629 } 630 if (delete != null && delete.size() >= sourceTableHash.scanBatch) { 631 context.write(new ImmutableBytesWritable(rowKey), delete); 632 delete = null; 633 } 634 } 635 } 636 637 if (!dryRun) { 638 if (put != null) { 639 context.write(new ImmutableBytesWritable(rowKey), put); 640 } 641 if (delete != null) { 642 context.write(new ImmutableBytesWritable(rowKey), delete); 643 } 644 } 645 646 if (matchingCells > 0) { 647 context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells); 648 } 649 if (matchingRow) { 650 context.getCounter(Counter.MATCHINGROWS).increment(1); 651 return true; 652 } else { 653 context.getCounter(Counter.ROWSWITHDIFFS).increment(1); 654 return false; 655 } 656 } 657 658 /** 659 * Compare row keys of the given Result objects. Nulls are after non-nulls 660 */ 661 private static int compareRowKeys(byte[] r1, byte[] r2) { 662 if (r1 == null) { 663 return 1; // source missing row 664 } else if (r2 == null) { 665 return -1; // target missing row 666 } else { 667 // Sync on no META tables only. We can directly do what CellComparator is doing inside. 668 // Never the call going to MetaCellComparator. 669 return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length); 670 } 671 } 672 673 /** 674 * Compare families, qualifiers, and timestamps of the given Cells. They are assumed to be of 675 * the same row. Nulls are after non-nulls. 676 */ 677 private int compareCellKeysWithinRow(Cell c1, Cell c2) { 678 if (c1 == null) { 679 return 1; // source missing cell 680 } 681 if (c2 == null) { 682 return -1; // target missing cell 683 } 684 685 int result = CellComparator.getInstance().compareFamilies(c1, c2); 686 if (result != 0) { 687 return result; 688 } 689 690 result = CellComparator.getInstance().compareQualifiers(c1, c2); 691 if (result != 0) { 692 return result; 693 } 694 695 if (this.ignoreTimestamp) { 696 return 0; 697 } else { 698 // note timestamp comparison is inverted - more recent cells first 699 return CellComparator.getInstance().compareTimestamps(c1, c2); 700 } 701 } 702 703 @Override 704 protected void cleanup(Context context) throws IOException, InterruptedException { 705 if (mapperException == null) { 706 try { 707 finishRemainingHashRanges(context); 708 } catch (Throwable t) { 709 mapperException = t; 710 } 711 } 712 713 try { 714 sourceTable.close(); 715 targetTable.close(); 716 sourceConnection.close(); 717 targetConnection.close(); 718 } catch (Throwable t) { 719 if (mapperException == null) { 720 mapperException = t; 721 } else { 722 LOG.error("Suppressing exception from closing tables", t); 723 } 724 } 725 726 // propagate first exception 727 if (mapperException != null) { 728 Throwables.throwIfInstanceOf(mapperException, IOException.class); 729 Throwables.throwIfInstanceOf(mapperException, InterruptedException.class); 730 Throwables.throwIfUnchecked(mapperException); 731 } 732 } 733 734 private void finishRemainingHashRanges(Context context) 735 throws IOException, InterruptedException { 736 TableSplit split = (TableSplit) context.getInputSplit(); 737 byte[] splitEndRow = split.getEndRow(); 738 boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow); 739 740 // if there are more hash batches that begin before the end of this split move to them 741 while ( 742 nextSourceKey != null && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable) 743 ) { 744 moveToNextBatch(context); 745 } 746 747 if (targetHasher.isBatchStarted()) { 748 // need to complete the final open hash batch 749 750 if ( 751 (nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0) 752 || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow)) 753 ) { 754 // the open hash range continues past the end of this region 755 // add a scan to complete the current hash range 756 Scan scan = sourceTableHash.initScan(); 757 scan.withStartRow(splitEndRow); 758 if (nextSourceKey == null) { 759 scan.withStopRow(sourceTableHash.stopRow); 760 } else { 761 scan.withStopRow(nextSourceKey.copyBytes()); 762 } 763 764 ResultScanner targetScanner = null; 765 try { 766 targetScanner = targetTable.getScanner(scan); 767 for (Result row : targetScanner) { 768 targetHasher.hashResult(row); 769 } 770 } finally { 771 if (targetScanner != null) { 772 targetScanner.close(); 773 } 774 } 775 } // else current batch ends exactly at split end row 776 777 finishBatchAndCompareHashes(context); 778 } 779 } 780 } 781 782 private static final int NUM_ARGS = 3; 783 784 private static void printUsage(final String errorMsg) { 785 if (errorMsg != null && errorMsg.length() > 0) { 786 System.err.println("ERROR: " + errorMsg); 787 System.err.println(); 788 } 789 System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>"); 790 System.err.println(); 791 System.err.println("Options:"); 792 793 System.err.println(" sourceuri Cluster connection uri of the source table"); 794 System.err.println(" (defaults to cluster in classpath's config)"); 795 System.err.println(" sourcezkcluster ZK cluster key of the source table"); 796 System.err.println(" (defaults to cluster in classpath's config)"); 797 System.err.println(" Do not take effect if sourceuri is specified"); 798 System.err.println(" Deprecated, please use sourceuri instead"); 799 System.err.println(" targeturi Cluster connection uri of the target table"); 800 System.err.println(" (defaults to cluster in classpath's config)"); 801 System.err.println(" targetzkcluster ZK cluster key of the target table"); 802 System.err.println(" (defaults to cluster in classpath's config)"); 803 System.err.println(" Do not take effect if targeturi is specified"); 804 System.err.println(" Deprecated, please use targeturi instead"); 805 System.err.println(" dryrun if true, output counters but no writes"); 806 System.err.println(" (defaults to false)"); 807 System.err.println(" doDeletes if false, does not perform deletes"); 808 System.err.println(" (defaults to true)"); 809 System.err.println(" doPuts if false, does not perform puts"); 810 System.err.println(" (defaults to true)"); 811 System.err.println(" ignoreTimestamps if true, ignores cells timestamps while comparing "); 812 System.err.println(" cell values. Any missing cell on target then gets"); 813 System.err.println(" added with current time as timestamp "); 814 System.err.println(" (defaults to false)"); 815 System.err.println(); 816 System.err.println("Args:"); 817 System.err.println(" sourcehashdir path to HashTable output dir for source table"); 818 System.err.println(" (see org.apache.hadoop.hbase.mapreduce.HashTable)"); 819 System.err.println(" sourcetable Name of the source table to sync from"); 820 System.err.println(" targettable Name of the target table to sync to"); 821 System.err.println(); 822 System.err.println("Examples:"); 823 System.err.println(" For a dry run SyncTable of tableA from a remote source cluster"); 824 System.err.println(" to a local target cluster:"); 825 System.err.println(" $ hbase " + "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true" 826 + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase" 827 + " hdfs://nn:9000/hashes/tableA tableA tableA"); 828 } 829 830 private boolean doCommandLine(final String[] args) { 831 if (args.length < NUM_ARGS) { 832 printUsage(null); 833 return false; 834 } 835 try { 836 sourceHashDir = new Path(args[args.length - 3]); 837 sourceTableName = args[args.length - 2]; 838 targetTableName = args[args.length - 1]; 839 840 for (int i = 0; i < args.length - NUM_ARGS; i++) { 841 String cmd = args[i]; 842 if (cmd.equals("-h") || cmd.startsWith("--h")) { 843 printUsage(null); 844 return false; 845 } 846 final String sourceUriKey = "--sourceuri="; 847 if (cmd.startsWith(sourceUriKey)) { 848 sourceUri = new URI(cmd.substring(sourceUriKey.length())); 849 continue; 850 } 851 852 final String sourceZkClusterKey = "--sourcezkcluster="; 853 if (cmd.startsWith(sourceZkClusterKey)) { 854 sourceZkCluster = cmd.substring(sourceZkClusterKey.length()); 855 continue; 856 } 857 858 final String targetUriKey = "--targeturi="; 859 if (cmd.startsWith(targetUriKey)) { 860 targetUri = new URI(cmd.substring(targetUriKey.length())); 861 continue; 862 } 863 864 final String targetZkClusterKey = "--targetzkcluster="; 865 if (cmd.startsWith(targetZkClusterKey)) { 866 targetZkCluster = cmd.substring(targetZkClusterKey.length()); 867 continue; 868 } 869 870 final String dryRunKey = "--dryrun="; 871 if (cmd.startsWith(dryRunKey)) { 872 dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length())); 873 continue; 874 } 875 876 final String doDeletesKey = "--doDeletes="; 877 if (cmd.startsWith(doDeletesKey)) { 878 doDeletes = Boolean.parseBoolean(cmd.substring(doDeletesKey.length())); 879 continue; 880 } 881 882 final String doPutsKey = "--doPuts="; 883 if (cmd.startsWith(doPutsKey)) { 884 doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length())); 885 continue; 886 } 887 888 final String ignoreTimestampsKey = "--ignoreTimestamps="; 889 if (cmd.startsWith(ignoreTimestampsKey)) { 890 ignoreTimestamps = Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length())); 891 continue; 892 } 893 894 printUsage("Invalid argument '" + cmd + "'"); 895 return false; 896 } 897 898 } catch (Exception e) { 899 LOG.error("Failed to parse commandLine arguments", e); 900 printUsage("Can't start because " + e.getMessage()); 901 return false; 902 } 903 return true; 904 } 905 906 /** 907 * Main entry point. 908 */ 909 public static void main(String[] args) throws Exception { 910 int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args); 911 System.exit(ret); 912 } 913 914 @Override 915 public int run(String[] args) throws Exception { 916 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); 917 if (!doCommandLine(otherArgs)) { 918 return 1; 919 } 920 921 Job job = createSubmittableJob(otherArgs); 922 if (!job.waitForCompletion(true)) { 923 LOG.info("Map-reduce job failed!"); 924 return 1; 925 } 926 counters = job.getCounters(); 927 return 0; 928 } 929 930}