/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

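/**
 * SyncTable is the second step of the HashTable/SyncTable tool pair. It reads the hash manifest
 * written by {@link HashTable} for the source table, re-hashes the corresponding key ranges of
 * the target table, and rescans only the ranges whose hashes differ, emitting the Puts and
 * Deletes needed to make the target table match the source table.
 */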
@InterfaceAudience.Private
public class SyncTable extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(SyncTable.class);

  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
  static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
  static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";
  static final String IGNORE_TIMESTAMPS = "sync.table.ignore.timestamps";

  Path sourceHashDir;
  String sourceTableName;
  String targetTableName;

  String sourceZkCluster;
  String targetZkCluster;
  boolean dryRun;
  boolean doDeletes = true;
  boolean doPuts = true;
  boolean ignoreTimestamps;

  Counters counters;

  public SyncTable(Configuration conf) {
    super(conf);
  }

  private void initCredentialsForHBase(String zookeeper, Job job) throws IOException {
    Configuration peerConf =
        HBaseConfiguration.createClusterConf(job.getConfiguration(), zookeeper);
    if ("kerberos".equalsIgnoreCase(peerConf.get("hbase.security.authentication"))) {
      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
    }
  }

  public Job createSubmittableJob(String[] args) throws IOException {
    FileSystem fs = sourceHashDir.getFileSystem(getConf());
    if (!fs.exists(sourceHashDir)) {
      throw new IOException("Source hash dir not found: " + sourceHashDir);
    }

    Job job = Job.getInstance(getConf(), getConf().get("mapreduce.job.name",
        "syncTable_" + sourceTableName + "-" + targetTableName));
    Configuration jobConf = job.getConfiguration();
    if ("kerberos".equalsIgnoreCase(jobConf.get("hadoop.security.authentication"))) {
      TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { sourceHashDir },
          getConf());
    }

    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
    LOG.info("Read source hash manifest: " + tableHash);
    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
    if (!tableHash.tableName.equals(sourceTableName)) {
      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
          + tableHash.tableName + " but job is reading from: " + sourceTableName);
    }
    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys. However, the manifest file"
          + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
    int dataSubdirCount = 0;
    for (FileStatus file : fs.listStatus(dataDir)) {
      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
        dataSubdirCount++;
      }
    }

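    // cross-check the number of hash data files found on disk against the count in the manifest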
    if (dataSubdirCount != tableHash.numHashFiles) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys. However, the number of data dirs"
          + " found is " + dataSubdirCount + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    job.setJarByClass(HashTable.class);
    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
    if (sourceZkCluster != null) {
      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
      initCredentialsForHBase(sourceZkCluster, job);
    }
    if (targetZkCluster != null) {
      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
      initCredentialsForHBase(targetZkCluster, job);
    }
    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
    jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
    jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);
    jobConf.setBoolean(IGNORE_TIMESTAMPS, ignoreTimestamps);

    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(), SyncMapper.class,
        null, null, job);

    job.setNumReduceTasks(0);

    if (dryRun) {
      job.setOutputFormatClass(NullOutputFormat.class);
    } else {
      // No reducers. Just write straight to table. Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetZkCluster,
          null, null);

      // would be nice to add an option for bulk load instead
    }

    // Obtain an authentication token, for the specified cluster, on behalf of the current user
    if (sourceZkCluster != null) {
      Configuration peerConf =
          HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
    }
    return job;
  }

  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
    Path sourceHashDir;

    Connection sourceConnection;
    Connection targetConnection;
    Table sourceTable;
    Table targetTable;
    boolean dryRun;
    boolean doDeletes = true;
    boolean doPuts = true;
    boolean ignoreTimestamp;

    HashTable.TableHash sourceTableHash;
    HashTable.TableHash.Reader sourceHashReader;
    ImmutableBytesWritable currentSourceHash;
    ImmutableBytesWritable nextSourceKey;
    HashTable.ResultHasher targetHasher;

    Throwable mapperException;

    public static enum Counter {
      BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS, SOURCEMISSINGCELLS,
      TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES, MATCHINGROWS,
      MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED
    }

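    /**
     * Open connections to the source and target clusters, read the source hash manifest, and
     * position the hash reader at the first hash batch at or after this split's start row.
     */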
    @Override
    protected void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
      targetConnection =
          openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY, TableOutputFormat.OUTPUT_CONF_PREFIX);
      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
      doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
      doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);
      ignoreTimestamp = conf.getBoolean(IGNORE_TIMESTAMPS, false);

      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
      LOG.info("Read source hash manifest: " + sourceTableHash);
      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");

      TableSplit split = (TableSplit) context.getInputSplit();
      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());

      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
      findNextKeyHashPair();

      // create a hasher, but don't start it right away
      // instead, find the first hash batch at or after the start row
      // and skip any rows that come before. they will be caught by the previous task
      targetHasher = new HashTable.ResultHasher();
      targetHasher.ignoreTimestamps = ignoreTimestamp;
    }

    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
        String configPrefix) throws IOException {
      String zkCluster = conf.get(zkClusterConfKey);
      Configuration clusterConf =
          HBaseConfiguration.createClusterConf(conf, zkCluster, configPrefix);
      return ConnectionFactory.createConnection(clusterConf);
    }

    private static Table openTable(Connection connection, Configuration conf,
        String tableNameConfKey) throws IOException {
      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
    }

    /**
     * Attempt to read the next source key/hash pair. If there are no more, set nextSourceKey to
     * null.
     */
    private void findNextKeyHashPair() throws IOException {
      boolean hasNext = sourceHashReader.next();
      if (hasNext) {
        nextSourceKey = sourceHashReader.getCurrentKey();
      } else {
        // no more keys - last hash goes to the end
        nextSourceKey = null;
      }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      try {
        // first, finish any hash batches that end before the scanned row
        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
          moveToNextBatch(context);
        }

        // next, add the scanned row (as long as we've reached the first batch)
        if (targetHasher.isBatchStarted()) {
          targetHasher.hashResult(value);
        }
      } catch (Throwable t) {
        mapperException = t;
        Throwables.propagateIfInstanceOf(t, IOException.class);
        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
        Throwables.propagate(t);
      }
    }

    /**
     * If there is an open hash batch, complete it and sync if there are diffs. Start a new batch,
     * and seek to read the next source key/hash pair.
     */
    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
      if (targetHasher.isBatchStarted()) {
        finishBatchAndCompareHashes(context);
      }
      targetHasher.startBatch(nextSourceKey);
      currentSourceHash = sourceHashReader.getCurrentHash();

      findNextKeyHashPair();
    }

    /**
     * Finish the currently open hash batch. Compare the target hash to the given source hash. If
     * they do not match, then sync the covered key range.
     */
    private void finishBatchAndCompareHashes(Context context)
        throws IOException, InterruptedException {
      targetHasher.finishBatch();
      context.getCounter(Counter.BATCHES).increment(1);
      if (targetHasher.getBatchSize() == 0) {
        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
      }
      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
      if (targetHash.equals(currentSourceHash)) {
        context.getCounter(Counter.HASHES_MATCHED).increment(1);
      } else {
        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);

        ImmutableBytesWritable stopRow = nextSourceKey == null
            ? new ImmutableBytesWritable(sourceTableHash.stopRow)
            : nextSourceKey;

        if (LOG.isDebugEnabled()) {
          LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey())
              + " to " + toHex(stopRow)
              + " sourceHash: " + toHex(currentSourceHash)
              + " targetHash: " + toHex(targetHash));
        }

        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
      }
    }

    private static String toHex(ImmutableBytesWritable bytes) {
      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
    }

    private static final CellScanner EMPTY_CELL_SCANNER =
        new CellScanner(Collections.<Result>emptyIterator());

    /**
     * Rescan the given range directly from the source and target tables. Count and log
     * differences, and if this is not a dry run, output Puts and Deletes to make the target table
     * match the source table for this range.
     */
    private void syncRange(Context context, ImmutableBytesWritable startRow,
        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
      Scan scan = sourceTableHash.initScan();
      scan.setStartRow(startRow.copyBytes());
      scan.setStopRow(stopRow.copyBytes());

      ResultScanner sourceScanner = sourceTable.getScanner(scan);
      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());

      ResultScanner targetScanner = targetTable.getScanner(new Scan(scan));
      CellScanner targetCells = new CellScanner(targetScanner.iterator());

      boolean rangeMatched = true;
      byte[] nextSourceRow = sourceCells.nextRow();
      byte[] nextTargetRow = targetCells.nextRow();
      while (nextSourceRow != null || nextTargetRow != null) {
        boolean rowMatched;
        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
        if (rowComparison < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Target missing row: " + Bytes.toString(nextSourceRow));
          }
          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
          nextSourceRow = sourceCells.nextRow(); // advance only source to next row
        } else if (rowComparison > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Source missing row: " + Bytes.toString(nextTargetRow));
          }
          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
          nextTargetRow = targetCells.nextRow(); // advance only target to next row
        } else {
          // current row is the same on both sides, compare cell by cell
          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
          nextSourceRow = sourceCells.nextRow();
          nextTargetRow = targetCells.nextRow();
        }

        if (!rowMatched) {
          rangeMatched = false;
        }
      }

      sourceScanner.close();
      targetScanner.close();

      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
          .increment(1);
    }

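    /**
     * Iterates the Cells returned by a scanner one row at a time, stitching together consecutive
     * Results that belong to the same row (as produced when scan batching is enabled).
     */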
    private static class CellScanner {
      private final Iterator<Result> results;

      private byte[] currentRow;
      private Result currentRowResult;
      private int nextCellInRow;

      private Result nextRowResult;

      public CellScanner(Iterator<Result> results) {
        this.results = results;
      }

      /**
       * Advance to the next row and return its row key. Returns null iff there are no more rows.
       */
      public byte[] nextRow() {
        if (nextRowResult == null) {
          // no cached row - check scanner for more
          while (results.hasNext()) {
            nextRowResult = results.next();
            Cell nextCell = nextRowResult.rawCells()[0];
            if (currentRow == null
                || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
                    nextCell.getRowOffset(), nextCell.getRowLength())) {
              // found next row
              break;
            } else {
              // found another result from current row, keep scanning
              nextRowResult = null;
            }
          }

          if (nextRowResult == null) {
            // end of data, no more rows
            currentRowResult = null;
            currentRow = null;
            return null;
          }
        }

        // advance to cached result for next row
        currentRowResult = nextRowResult;
        nextCellInRow = 0;
        currentRow = currentRowResult.getRow();
        nextRowResult = null;
        return currentRow;
      }

      /**
       * Returns the next Cell in the current row or null iff none remain.
       */
      public Cell nextCellInRow() {
        if (currentRowResult == null) {
          // nothing left in current row
          return null;
        }

        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
        nextCellInRow++;
        if (nextCellInRow == currentRowResult.size()) {
          if (results.hasNext()) {
            Result result = results.next();
            Cell cell = result.rawCells()[0];
            if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength())) {
              // result is part of current row
              currentRowResult = result;
              nextCellInRow = 0;
            } else {
              // result is part of next row, cache it
              nextRowResult = result;
              // current row is complete
              currentRowResult = null;
            }
          } else {
            // end of data
            currentRowResult = null;
          }
        }
        return nextCell;
      }
    }

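    /**
     * When ignoreTimestamp is set, rebuild the source cell with the current time so the Put
     * written to the target does not carry over the source timestamp.
     */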
    private Cell checkAndResetTimestamp(Cell sourceCell) {
      if (ignoreTimestamp) {
        sourceCell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
            .setType(sourceCell.getType())
            .setRow(sourceCell.getRowArray(), sourceCell.getRowOffset(), sourceCell.getRowLength())
            .setFamily(sourceCell.getFamilyArray(), sourceCell.getFamilyOffset(),
                sourceCell.getFamilyLength())
            .setQualifier(sourceCell.getQualifierArray(), sourceCell.getQualifierOffset(),
                sourceCell.getQualifierLength())
            .setTimestamp(System.currentTimeMillis())
            .setValue(sourceCell.getValueArray(), sourceCell.getValueOffset(),
                sourceCell.getValueLength())
            .build();
      }
      return sourceCell;
    }

    /**
     * Compare the cells for the given row from the source and target tables. Count and log any
     * differences. If not a dry run, output a Put and/or Delete needed to sync the target table
     * to match the source table.
     */
    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
        CellScanner targetCells) throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      long matchingCells = 0;
      boolean matchingRow = true;
      Cell sourceCell = sourceCells.nextCellInRow();
      Cell targetCell = targetCells.nextCellInRow();
      while (sourceCell != null || targetCell != null) {
        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
        if (cellKeyComparison < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Target missing cell: " + sourceCell);
          }
          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun && doPuts) {
            if (put == null) {
              put = new Put(rowKey);
            }
            sourceCell = checkAndResetTimestamp(sourceCell);
            put.add(sourceCell);
          }

          sourceCell = sourceCells.nextCellInRow();
        } else if (cellKeyComparison > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Source missing cell: " + targetCell);
          }
          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun && doDeletes) {
            if (delete == null) {
              delete = new Delete(rowKey);
            }
            // add a tombstone to exactly match the target cell that is missing on the source
            delete.addColumn(CellUtil.cloneFamily(targetCell), CellUtil.cloneQualifier(targetCell),
                targetCell.getTimestamp());
          }

          targetCell = targetCells.nextCellInRow();
        } else {
          // the cell keys are equal, now check values
          if (CellUtil.matchingValue(sourceCell, targetCell)) {
            matchingCells++;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Different values: ");
              LOG.debug(" source cell: " + sourceCell + " value: "
                  + Bytes.toString(sourceCell.getValueArray(), sourceCell.getValueOffset(),
                      sourceCell.getValueLength()));
              LOG.debug(" target cell: " + targetCell + " value: "
                  + Bytes.toString(targetCell.getValueArray(), targetCell.getValueOffset(),
                      targetCell.getValueLength()));
            }
            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
            matchingRow = false;

            if (!dryRun && doPuts) {
              // overwrite target cell
              if (put == null) {
                put = new Put(rowKey);
              }
              sourceCell = checkAndResetTimestamp(sourceCell);
              put.add(sourceCell);
            }
          }
          sourceCell = sourceCells.nextCellInRow();
          targetCell = targetCells.nextCellInRow();
        }

        if (!dryRun && sourceTableHash.scanBatch > 0) {
          if (put != null && put.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), put);
            put = null;
          }
          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), delete);
            delete = null;
          }
        }
      }

      if (!dryRun) {
        if (put != null) {
          context.write(new ImmutableBytesWritable(rowKey), put);
        }
        if (delete != null) {
          context.write(new ImmutableBytesWritable(rowKey), delete);
        }
      }

      if (matchingCells > 0) {
        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
      }
      if (matchingRow) {
        context.getCounter(Counter.MATCHINGROWS).increment(1);
        return true;
      } else {
        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
        return false;
      }
    }

    /**
     * Compare row keys of the given Result objects. Nulls are after non-nulls.
     */
    private static int compareRowKeys(byte[] r1, byte[] r2) {
      if (r1 == null) {
        return 1; // source missing row
      } else if (r2 == null) {
        return -1; // target missing row
      } else {
        // SyncTable is only run against non-META tables, so we can compare the row keys directly
        // with Bytes.compareTo, just as CellComparator does internally; the call never needs to
        // go through MetaCellComparator.
        return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
      }
    }

    /**
     * Compare families, qualifiers, and timestamps of the given Cells. They are assumed to be of
     * the same row. Nulls are after non-nulls.
     */
    private int compareCellKeysWithinRow(Cell c1, Cell c2) {
      if (c1 == null) {
        return 1; // source missing cell
      }
      if (c2 == null) {
        return -1; // target missing cell
      }

      int result = CellComparator.getInstance().compareFamilies(c1, c2);
      if (result != 0) {
        return result;
      }

      result = CellComparator.getInstance().compareQualifiers(c1, c2);
      if (result != 0) {
        return result;
      }

      if (this.ignoreTimestamp) {
        return 0;
      } else {
        // note timestamp comparison is inverted - more recent cells first
        return CellComparator.getInstance().compareTimestamps(c1, c2);
      }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      if (mapperException == null) {
        try {
          finishRemainingHashRanges(context);
        } catch (Throwable t) {
          mapperException = t;
        }
      }

      try {
        sourceTable.close();
        targetTable.close();
        sourceConnection.close();
        targetConnection.close();
      } catch (Throwable t) {
        if (mapperException == null) {
          mapperException = t;
        } else {
          LOG.error("Suppressing exception from closing tables", t);
        }
      }

      // propagate first exception
      if (mapperException != null) {
        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
        Throwables.propagate(mapperException);
      }
    }

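    /**
     * After the scan for this split is exhausted, complete any hash batches that start before the
     * end of the split. If the last open batch extends past the split's end row, scan the
     * remainder of that range from the target table so the batch hash covers the full range
     * before it is compared.
     */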
    private void finishRemainingHashRanges(Context context)
        throws IOException, InterruptedException {
      TableSplit split = (TableSplit) context.getInputSplit();
      byte[] splitEndRow = split.getEndRow();
      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);

      // if there are more hash batches that begin before the end of this split move to them
      while (nextSourceKey != null
          && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
        moveToNextBatch(context);
      }

      if (targetHasher.isBatchStarted()) {
        // need to complete the final open hash batch

        if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
            || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
          // the open hash range continues past the end of this region
          // add a scan to complete the current hash range
          Scan scan = sourceTableHash.initScan();
          scan.setStartRow(splitEndRow);
          if (nextSourceKey == null) {
            scan.setStopRow(sourceTableHash.stopRow);
          } else {
            scan.setStopRow(nextSourceKey.copyBytes());
          }

          ResultScanner targetScanner = null;
          try {
            targetScanner = targetTable.getScanner(scan);
            for (Result row : targetScanner) {
              targetHasher.hashResult(row);
            }
          } finally {
            if (targetScanner != null) {
              targetScanner.close();
            }
          }
        } // else current batch ends exactly at split end row

        finishBatchAndCompareHashes(context);
      }
    }
  }

  private static final int NUM_ARGS = 3;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
    System.err.println();
    System.err.println("Options:");

    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" targetzkcluster  ZK cluster key of the target table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" dryrun           if true, output counters but no writes");
    System.err.println("                  (defaults to false)");
    System.err.println(" doDeletes        if false, does not perform deletes");
    System.err.println("                  (defaults to true)");
    System.err.println(" doPuts           if false, does not perform puts");
    System.err.println("                  (defaults to true)");
    System.err.println(" ignoreTimestamps if true, ignores cell timestamps while comparing");
    System.err.println("                  cell values. Any missing cell on target then gets");
    System.err.println("                  added with current time as timestamp");
    System.err.println("                  (defaults to false)");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
    System.err.println(" sourcetable      Name of the source table to sync from");
    System.err.println(" targettable      Name of the target table to sync to");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
    System.err.println(" to a local target cluster:");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
        + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
        + " hdfs://nn:9000/hashes/tableA tableA tableA");
  }

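  /**
   * Parse the positional arguments and the --option flags. Returns false (after printing usage)
   * if the command line is invalid.
   */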
  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {
      sourceHashDir = new Path(args[args.length - 3]);
      sourceTableName = args[args.length - 2];
      targetTableName = args[args.length - 1];

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String sourceZkClusterKey = "--sourcezkcluster=";
        if (cmd.startsWith(sourceZkClusterKey)) {
          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
          continue;
        }

        final String targetZkClusterKey = "--targetzkcluster=";
        if (cmd.startsWith(targetZkClusterKey)) {
          targetZkCluster = cmd.substring(targetZkClusterKey.length());
          continue;
        }

        final String dryRunKey = "--dryrun=";
        if (cmd.startsWith(dryRunKey)) {
          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
          continue;
        }

        final String doDeletesKey = "--doDeletes=";
        if (cmd.startsWith(doDeletesKey)) {
          doDeletes = Boolean.parseBoolean(cmd.substring(doDeletesKey.length()));
          continue;
        }

        final String doPutsKey = "--doPuts=";
        if (cmd.startsWith(doPutsKey)) {
          doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length()));
          continue;
        }

        final String ignoreTimestampsKey = "--ignoreTimestamps=";
        if (cmd.startsWith(ignoreTimestampsKey)) {
          ignoreTimestamps = Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      return 1;
    }
    counters = job.getCounters();
    return 0;
  }
}