/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

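/**
 * MapReduce tool that brings a target table in line with a source table, using the hash
 * manifest previously written for the source table by {@link HashTable}. Each mapper scans a
 * slice of the target table, re-hashes it batch by batch, and compares the result against the
 * stored source hashes; only key ranges whose hashes differ are re-scanned from both clusters
 * and reconciled with Puts and/or Deletes written to the target. Behaviour is controlled by
 * the {@code --dryrun}, {@code --doDeletes}, {@code --doPuts}, {@code --sourcezkcluster} and
 * {@code --targetzkcluster} options described in {@code printUsage}.
 */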
@InterfaceAudience.Private
public class SyncTable extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(SyncTable.class);

  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
  static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
  static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";

  Path sourceHashDir;
  String sourceTableName;
  String targetTableName;

  String sourceZkCluster;
  String targetZkCluster;
  boolean dryRun;
  boolean doDeletes = true;
  boolean doPuts = true;

  Counters counters;

  public SyncTable(Configuration conf) {
    super(conf);
  }

  private void initCredentialsForHBase(String zookeeper, Job job) throws IOException {
    Configuration peerConf =
        HBaseConfiguration.createClusterConf(job.getConfiguration(), zookeeper);
    if ("kerberos".equals(peerConf.get("hbase.security.authentication"))) {
      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
    }
  }

  public Job createSubmittableJob(String[] args) throws IOException {
    FileSystem fs = sourceHashDir.getFileSystem(getConf());
    if (!fs.exists(sourceHashDir)) {
      throw new IOException("Source hash dir not found: " + sourceHashDir);
    }

    Job job = Job.getInstance(getConf(), getConf().get("mapreduce.job.name",
        "syncTable_" + sourceTableName + "-" + targetTableName));
    Configuration jobConf = job.getConfiguration();
    if ("kerberos".equals(jobConf.get("hadoop.security.authentication"))) {
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { sourceHashDir }, getConf());
    }

    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
    LOG.info("Read source hash manifest: " + tableHash);
    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
    if (!tableHash.tableName.equals(sourceTableName)) {
      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
          + tableHash.tableName + " but job is reading from: " + sourceTableName);
    }
    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys. However, the manifest file"
          + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
    int dataSubdirCount = 0;
    for (FileStatus file : fs.listStatus(dataDir)) {
      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
        dataSubdirCount++;
      }
    }

    if (dataSubdirCount != tableHash.numHashFiles) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys. However, the number of data dirs"
          + " found is " + dataSubdirCount + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    job.setJarByClass(HashTable.class);
    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
    if (sourceZkCluster != null) {
      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
      initCredentialsForHBase(sourceZkCluster, job);
    }
    if (targetZkCluster != null) {
      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
      initCredentialsForHBase(targetZkCluster, job);
    }
    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
    jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
    jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);

    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
        SyncMapper.class, null, null, job);

    job.setNumReduceTasks(0);

    if (dryRun) {
      job.setOutputFormatClass(NullOutputFormat.class);
    } else {
      // No reducers. Just write straight to table. Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
          targetZkCluster, null, null);

      // would be nice to add an option for bulk load instead
    }

    // Obtain an authentication token, for the specified cluster, on behalf of the current user
    if (sourceZkCluster != null) {
      Configuration peerConf =
          HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
    }
    return job;
  }

  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
    Path sourceHashDir;

    Connection sourceConnection;
    Connection targetConnection;
    Table sourceTable;
    Table targetTable;
    boolean dryRun;
    boolean doDeletes = true;
    boolean doPuts = true;

    HashTable.TableHash sourceTableHash;
    HashTable.TableHash.Reader sourceHashReader;
    ImmutableBytesWritable currentSourceHash;
    ImmutableBytesWritable nextSourceKey;
    HashTable.ResultHasher targetHasher;

    Throwable mapperException;

    public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
      SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS,
      DIFFERENTCELLVALUES, MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED,
      RANGESNOTMATCHED};

    @Override
    protected void setup(Context context) throws IOException {

      Configuration conf = context.getConfiguration();
      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
          TableOutputFormat.OUTPUT_CONF_PREFIX);
      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
      doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
      doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);

      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
      LOG.info("Read source hash manifest: " + sourceTableHash);
      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");

      TableSplit split = (TableSplit) context.getInputSplit();
      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());

      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
      findNextKeyHashPair();

      // create a hasher, but don't start it right away
      // instead, find the first hash batch at or after the start row
      // and skip any rows that come before. they will be caught by the previous task
      targetHasher = new HashTable.ResultHasher();
    }

    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
        String configPrefix) throws IOException {
      String zkCluster = conf.get(zkClusterConfKey);
      Configuration clusterConf =
          HBaseConfiguration.createClusterConf(conf, zkCluster, configPrefix);
      return ConnectionFactory.createConnection(clusterConf);
    }

    private static Table openTable(Connection connection, Configuration conf,
        String tableNameConfKey) throws IOException {
      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
    }

    /**
     * Attempt to read the next source key/hash pair.
     * If there are no more, set nextSourceKey to null.
     */
    private void findNextKeyHashPair() throws IOException {
      boolean hasNext = sourceHashReader.next();
      if (hasNext) {
        nextSourceKey = sourceHashReader.getCurrentKey();
      } else {
        // no more keys - last hash goes to the end
        nextSourceKey = null;
      }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      try {
        // first, finish any hash batches that end before the scanned row
        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
          moveToNextBatch(context);
        }

        // next, add the scanned row (as long as we've reached the first batch)
        if (targetHasher.isBatchStarted()) {
          targetHasher.hashResult(value);
        }
      } catch (Throwable t) {
        mapperException = t;
        Throwables.propagateIfInstanceOf(t, IOException.class);
        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
        Throwables.propagate(t);
      }
    }

    /**
     * If there is an open hash batch, complete it and sync if there are diffs.
     * Start a new batch, and seek to read the next source key/hash pair.
     */
    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
      if (targetHasher.isBatchStarted()) {
        finishBatchAndCompareHashes(context);
      }
      targetHasher.startBatch(nextSourceKey);
      currentSourceHash = sourceHashReader.getCurrentHash();

      findNextKeyHashPair();
    }

    /**
     * Finish the currently open hash batch.
     * Compare the target hash to the given source hash.
     * If they do not match, then sync the covered key range.
     */
    private void finishBatchAndCompareHashes(Context context)
        throws IOException, InterruptedException {
      targetHasher.finishBatch();
      context.getCounter(Counter.BATCHES).increment(1);
      if (targetHasher.getBatchSize() == 0) {
        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
      }
      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
      if (targetHash.equals(currentSourceHash)) {
        context.getCounter(Counter.HASHES_MATCHED).increment(1);
      } else {
        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);

        ImmutableBytesWritable stopRow = nextSourceKey == null
            ? new ImmutableBytesWritable(sourceTableHash.stopRow)
            : nextSourceKey;

        if (LOG.isDebugEnabled()) {
          LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey())
              + " to " + toHex(stopRow)
              + " sourceHash: " + toHex(currentSourceHash)
              + " targetHash: " + toHex(targetHash));
        }

        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
      }
    }

    private static String toHex(ImmutableBytesWritable bytes) {
      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
    }

    private static final CellScanner EMPTY_CELL_SCANNER =
        new CellScanner(Collections.<Result>emptyIterator());

    /**
     * Rescan the given range directly from the source and target tables.
     * Count and log differences, and if this is not a dry run, output Puts and Deletes
     * to make the target table match the source table for this range.
     */
    private void syncRange(Context context, ImmutableBytesWritable startRow,
        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
      Scan scan = sourceTableHash.initScan();
      scan.setStartRow(startRow.copyBytes());
      scan.setStopRow(stopRow.copyBytes());

      ResultScanner sourceScanner = sourceTable.getScanner(scan);
      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());

      ResultScanner targetScanner = targetTable.getScanner(new Scan(scan));
      CellScanner targetCells = new CellScanner(targetScanner.iterator());

      boolean rangeMatched = true;
      byte[] nextSourceRow = sourceCells.nextRow();
      byte[] nextTargetRow = targetCells.nextRow();
      while (nextSourceRow != null || nextTargetRow != null) {
        boolean rowMatched;
        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
        if (rowComparison < 0) {
          if (LOG.isInfoEnabled()) {
            LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
          }
          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
          nextSourceRow = sourceCells.nextRow(); // advance only source to next row
        } else if (rowComparison > 0) {
          if (LOG.isInfoEnabled()) {
            LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
          }
          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
          nextTargetRow = targetCells.nextRow(); // advance only target to next row
        } else {
          // current row is the same on both sides, compare cell by cell
          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
          nextSourceRow = sourceCells.nextRow();
          nextTargetRow = targetCells.nextRow();
        }

        if (!rowMatched) {
          rangeMatched = false;
        }
      }

      sourceScanner.close();
      targetScanner.close();

      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
          .increment(1);
    }

    private static class CellScanner {
      private final Iterator<Result> results;

      private byte[] currentRow;
      private Result currentRowResult;
      private int nextCellInRow;

      private Result nextRowResult;

      public CellScanner(Iterator<Result> results) {
        this.results = results;
      }

      /**
       * Advance to the next row and return its row key.
       * Returns null iff there are no more rows.
       */
      public byte[] nextRow() {
        if (nextRowResult == null) {
          // no cached row - check scanner for more
          while (results.hasNext()) {
            nextRowResult = results.next();
            Cell nextCell = nextRowResult.rawCells()[0];
            if (currentRow == null
                || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
                    nextCell.getRowOffset(), nextCell.getRowLength())) {
              // found next row
              break;
            } else {
              // found another result from current row, keep scanning
              nextRowResult = null;
            }
          }

          if (nextRowResult == null) {
            // end of data, no more rows
            currentRowResult = null;
            currentRow = null;
            return null;
          }
        }

        // advance to cached result for next row
        currentRowResult = nextRowResult;
        nextCellInRow = 0;
        currentRow = currentRowResult.getRow();
        nextRowResult = null;
        return currentRow;
      }

      /**
       * Returns the next Cell in the current row or null iff none remain.
       */
      public Cell nextCellInRow() {
        if (currentRowResult == null) {
          // nothing left in current row
          return null;
        }

        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
        nextCellInRow++;
        if (nextCellInRow == currentRowResult.size()) {
          if (results.hasNext()) {
            Result result = results.next();
            Cell cell = result.rawCells()[0];
            if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength())) {
              // result is part of current row
              currentRowResult = result;
              nextCellInRow = 0;
            } else {
              // result is part of next row, cache it
              nextRowResult = result;
              // current row is complete
              currentRowResult = null;
            }
          } else {
            // end of data
            currentRowResult = null;
          }
        }
        return nextCell;
      }
    }

    /**
     * Compare the cells for the given row from the source and target tables.
     * Count and log any differences.
     * If not a dry run, output a Put and/or Delete needed to sync the target table
     * to match the source table.
     */
    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
        CellScanner targetCells) throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      long matchingCells = 0;
      boolean matchingRow = true;
      Cell sourceCell = sourceCells.nextCellInRow();
      Cell targetCell = targetCells.nextCellInRow();
      while (sourceCell != null || targetCell != null) {

        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
        if (cellKeyComparison < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Target missing cell: " + sourceCell);
          }
          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun && doPuts) {
            if (put == null) {
              put = new Put(rowKey);
            }
            put.add(sourceCell);
          }

          sourceCell = sourceCells.nextCellInRow();
        } else if (cellKeyComparison > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Source missing cell: " + targetCell);
          }
          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun && doDeletes) {
            if (delete == null) {
              delete = new Delete(rowKey);
            }
            // add a tombstone to exactly match the target cell that is missing on the source
            delete.addColumn(CellUtil.cloneFamily(targetCell),
                CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
          }

          targetCell = targetCells.nextCellInRow();
        } else {
          // the cell keys are equal, now check values
          if (CellUtil.matchingValue(sourceCell, targetCell)) {
            matchingCells++;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Different values: ");
              LOG.debug(" source cell: " + sourceCell
                  + " value: " + Bytes.toHex(sourceCell.getValueArray(),
                      sourceCell.getValueOffset(), sourceCell.getValueLength()));
              LOG.debug(" target cell: " + targetCell
                  + " value: " + Bytes.toHex(targetCell.getValueArray(),
                      targetCell.getValueOffset(), targetCell.getValueLength()));
            }
            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
            matchingRow = false;

            if (!dryRun && doPuts) {
              // overwrite target cell
              if (put == null) {
                put = new Put(rowKey);
              }
              put.add(sourceCell);
            }
          }
          sourceCell = sourceCells.nextCellInRow();
          targetCell = targetCells.nextCellInRow();
        }

        if (!dryRun && sourceTableHash.scanBatch > 0) {
          if (put != null && put.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), put);
            put = null;
          }
          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), delete);
            delete = null;
          }
        }
      }

      if (!dryRun) {
        if (put != null) {
          context.write(new ImmutableBytesWritable(rowKey), put);
        }
        if (delete != null) {
          context.write(new ImmutableBytesWritable(rowKey), delete);
        }
      }

      if (matchingCells > 0) {
        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
      }
      if (matchingRow) {
        context.getCounter(Counter.MATCHINGROWS).increment(1);
        return true;
      } else {
        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
        return false;
      }
    }

    /**
     * Compare row keys of the given Result objects.
     * Nulls are after non-nulls.
     */
    private static int compareRowKeys(byte[] r1, byte[] r2) {
      if (r1 == null) {
        return 1; // source missing row
      } else if (r2 == null) {
        return -1; // target missing row
      } else {
        // SyncTable only runs against non-META tables, so a plain byte comparison of the row
        // keys matches what CellComparator does for user tables; the MetaCellComparator code
        // path is never needed here.
        return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
      }
    }

    /**
     * Compare families, qualifiers, and timestamps of the given Cells.
     * They are assumed to be of the same row.
     * Nulls are after non-nulls.
     */
    private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
      if (c1 == null) {
        return 1; // source missing cell
      }
      if (c2 == null) {
        return -1; // target missing cell
      }

      int result = CellComparator.getInstance().compareFamilies(c1, c2);
      if (result != 0) {
        return result;
      }

      result = CellComparator.getInstance().compareQualifiers(c1, c2);
      if (result != 0) {
        return result;
      }

      // note timestamp comparison is inverted - more recent cells first
      return CellComparator.getInstance().compareTimestamps(c1, c2);
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      if (mapperException == null) {
        try {
          finishRemainingHashRanges(context);
        } catch (Throwable t) {
          mapperException = t;
        }
      }

      try {
        sourceTable.close();
        targetTable.close();
        sourceConnection.close();
        targetConnection.close();
      } catch (Throwable t) {
        if (mapperException == null) {
          mapperException = t;
        } else {
          LOG.error("Suppressing exception from closing tables", t);
        }
      }

      // propagate first exception
      if (mapperException != null) {
        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
        Throwables.propagate(mapperException);
      }
    }

    private void finishRemainingHashRanges(Context context)
        throws IOException, InterruptedException {
      TableSplit split = (TableSplit) context.getInputSplit();
      byte[] splitEndRow = split.getEndRow();
      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);

      // if there are more hash batches that begin before the end of this split move to them
      while (nextSourceKey != null
          && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
        moveToNextBatch(context);
      }

      if (targetHasher.isBatchStarted()) {
        // need to complete the final open hash batch

        if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
            || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
          // the open hash range continues past the end of this region
          // add a scan to complete the current hash range
          Scan scan = sourceTableHash.initScan();
          scan.setStartRow(splitEndRow);
          if (nextSourceKey == null) {
            scan.setStopRow(sourceTableHash.stopRow);
          } else {
            scan.setStopRow(nextSourceKey.copyBytes());
          }

          ResultScanner targetScanner = null;
          try {
            targetScanner = targetTable.getScanner(scan);
            for (Result row : targetScanner) {
              targetHasher.hashResult(row);
            }
          } finally {
            if (targetScanner != null) {
              targetScanner.close();
            }
          }
        } // else current batch ends exactly at split end row

        finishBatchAndCompareHashes(context);
      }
    }
  }

  private static final int NUM_ARGS = 3;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
    System.err.println();
    System.err.println("Options:");

    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" targetzkcluster  ZK cluster key of the target table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" dryrun           if true, output counters but no writes");
    System.err.println("                  (defaults to false)");
    System.err.println(" doDeletes        if false, does not perform deletes");
    System.err.println("                  (defaults to true)");
    System.err.println(" doPuts           if false, does not perform puts");
    System.err.println("                  (defaults to true)");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
    System.err.println(" sourcetable      Name of the source table to sync from");
    System.err.println(" targettable      Name of the target table to sync to");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
    System.err.println(" to a local target cluster:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
        + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
        + " hdfs://nn:9000/hashes/tableA tableA tableA");
  }

  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {
      sourceHashDir = new Path(args[args.length - 3]);
      sourceTableName = args[args.length - 2];
      targetTableName = args[args.length - 1];

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String sourceZkClusterKey = "--sourcezkcluster=";
        if (cmd.startsWith(sourceZkClusterKey)) {
          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
          continue;
        }

        final String targetZkClusterKey = "--targetzkcluster=";
        if (cmd.startsWith(targetZkClusterKey)) {
          targetZkCluster = cmd.substring(targetZkClusterKey.length());
          continue;
        }

        final String dryRunKey = "--dryrun=";
        if (cmd.startsWith(dryRunKey)) {
          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
          continue;
        }

        final String doDeletesKey = "--doDeletes=";
        if (cmd.startsWith(doDeletesKey)) {
          doDeletes = Boolean.parseBoolean(cmd.substring(doDeletesKey.length()));
          continue;
        }

        final String doPutsKey = "--doPuts=";
        if (cmd.startsWith(doPutsKey)) {
          doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length()));
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }

    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      return 1;
    }
    counters = job.getCounters();
    return 0;
  }

}