001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021 022import java.util.Arrays; 023 024import org.apache.commons.lang3.ArrayUtils; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.CellUtil; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.Result; 034import org.apache.hadoop.hbase.client.ResultScanner; 035import org.apache.hadoop.hbase.client.Scan; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.mapreduce.Counters; 041import org.junit.AfterClass; 042import org.junit.Assert; 043import org.junit.BeforeClass; 044import org.junit.ClassRule; 045import org.junit.Rule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.junit.rules.TestName; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 053 054/** 055 * Basic test for the SyncTable M/R tool 056 */ 057@Category(LargeTests.class) 058public class TestSyncTable { 059 @ClassRule 060 public static final HBaseClassTestRule CLASS_RULE = 061 HBaseClassTestRule.forClass(TestSyncTable.class); 062 063 private static final Logger LOG = LoggerFactory.getLogger(TestSyncTable.class); 064 065 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 066 067 @Rule 068 public TestName name = new TestName(); 069 070 @BeforeClass 071 public static void beforeClass() throws Exception { 072 TEST_UTIL.startMiniCluster(3); 073 } 074 075 @AfterClass 076 public static void afterClass() throws Exception { 077 TEST_UTIL.cleanupDataTestDirOnTestFS(); 078 TEST_UTIL.shutdownMiniCluster(); 079 } 080 081 private static byte[][] generateSplits(int numRows, int numRegions) { 082 byte[][] splitRows = new byte[numRegions-1][]; 083 for (int i = 1; i < numRegions; i++) { 084 splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions); 085 } 086 return splitRows; 087 } 088 089 @Test 090 public void testSyncTable() throws Exception { 091 final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); 092 final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); 093 Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable"); 094 095 writeTestData(sourceTableName, targetTableName); 096 hashSourceTable(sourceTableName, testDir); 097 Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir); 098 assertEqualTables(90, sourceTableName, targetTableName, false); 099 100 assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); 101 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); 102 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); 103 assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); 104 assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); 105 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); 106 107 TEST_UTIL.deleteTable(sourceTableName); 108 TEST_UTIL.deleteTable(targetTableName); 109 } 110 111 @Test 112 public void testSyncTableDoDeletesFalse() throws Exception { 113 final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); 114 final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); 115 Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoDeletesFalse"); 116 117 writeTestData(sourceTableName, targetTableName); 118 hashSourceTable(sourceTableName, testDir); 119 Counters syncCounters = syncTables(sourceTableName, targetTableName, 120 testDir, "--doDeletes=false"); 121 assertTargetDoDeletesFalse(100, sourceTableName, targetTableName); 122 123 assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); 124 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); 125 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); 126 assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); 127 assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); 128 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); 129 130 TEST_UTIL.deleteTable(sourceTableName); 131 TEST_UTIL.deleteTable(targetTableName); 132 } 133 134 @Test 135 public void testSyncTableDoPutsFalse() throws Exception { 136 final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); 137 final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); 138 Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoPutsFalse"); 139 140 writeTestData(sourceTableName, targetTableName); 141 hashSourceTable(sourceTableName, testDir); 142 Counters syncCounters = syncTables(sourceTableName, targetTableName, 143 testDir, "--doPuts=false"); 144 assertTargetDoPutsFalse(70, sourceTableName, targetTableName); 145 146 assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); 147 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); 148 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); 149 assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); 150 assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); 151 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); 152 153 TEST_UTIL.deleteTable(sourceTableName); 154 TEST_UTIL.deleteTable(targetTableName); 155 } 156 157 @Test 158 public void testSyncTableIgnoreTimestampsTrue() throws Exception { 159 final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); 160 final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); 161 Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableIgnoreTimestampsTrue"); 162 long current = System.currentTimeMillis(); 163 writeTestData(sourceTableName, targetTableName, current - 1000, current); 164 hashSourceTable(sourceTableName, testDir, "--ignoreTimestamps=true"); 165 Counters syncCounters = syncTables(sourceTableName, targetTableName, 166 testDir, "--ignoreTimestamps=true"); 167 assertEqualTables(90, sourceTableName, targetTableName, true); 168 169 assertEquals(50, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); 170 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); 171 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); 172 assertEquals(30, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); 173 assertEquals(30, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); 174 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); 175 176 TEST_UTIL.deleteTable(sourceTableName); 177 TEST_UTIL.deleteTable(targetTableName); 178 } 179 180 private void assertEqualTables(int expectedRows, TableName sourceTableName, 181 TableName targetTableName, boolean ignoreTimestamps) throws Exception { 182 Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName); 183 Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName); 184 185 ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); 186 ResultScanner targetScanner = targetTable.getScanner(new Scan()); 187 188 for (int i = 0; i < expectedRows; i++) { 189 Result sourceRow = sourceScanner.next(); 190 Result targetRow = targetScanner.next(); 191 192 LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) 193 + " cells:" + sourceRow); 194 LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) 195 + " cells:" + targetRow); 196 197 if (sourceRow == null) { 198 Assert.fail("Expected " + expectedRows 199 + " source rows but only found " + i); 200 } 201 if (targetRow == null) { 202 Assert.fail("Expected " + expectedRows 203 + " target rows but only found " + i); 204 } 205 Cell[] sourceCells = sourceRow.rawCells(); 206 Cell[] targetCells = targetRow.rawCells(); 207 if (sourceCells.length != targetCells.length) { 208 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 209 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 210 Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) 211 + " has " + sourceCells.length 212 + " cells in source table but " + targetCells.length 213 + " cells in target table"); 214 } 215 for (int j = 0; j < sourceCells.length; j++) { 216 Cell sourceCell = sourceCells[j]; 217 Cell targetCell = targetCells[j]; 218 try { 219 if (!CellUtil.matchingRows(sourceCell, targetCell)) { 220 Assert.fail("Rows don't match"); 221 } 222 if (!CellUtil.matchingFamily(sourceCell, targetCell)) { 223 Assert.fail("Families don't match"); 224 } 225 if (!CellUtil.matchingQualifier(sourceCell, targetCell)) { 226 Assert.fail("Qualifiers don't match"); 227 } 228 if (!ignoreTimestamps && !CellUtil.matchingTimestamp(sourceCell, targetCell)) { 229 Assert.fail("Timestamps don't match"); 230 } 231 if (!CellUtil.matchingValue(sourceCell, targetCell)) { 232 Assert.fail("Values don't match"); 233 } 234 } catch (Throwable t) { 235 LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell); 236 Throwables.propagate(t); 237 } 238 } 239 } 240 Result sourceRow = sourceScanner.next(); 241 if (sourceRow != null) { 242 Assert.fail("Source table has more than " + expectedRows 243 + " rows. Next row: " + Bytes.toInt(sourceRow.getRow())); 244 } 245 Result targetRow = targetScanner.next(); 246 if (targetRow != null) { 247 Assert.fail("Target table has more than " + expectedRows 248 + " rows. Next row: " + Bytes.toInt(targetRow.getRow())); 249 } 250 sourceScanner.close(); 251 targetScanner.close(); 252 sourceTable.close(); 253 targetTable.close(); 254 } 255 256 private void assertTargetDoDeletesFalse(int expectedRows, TableName sourceTableName, 257 TableName targetTableName) throws Exception { 258 Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName); 259 Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName); 260 261 ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); 262 ResultScanner targetScanner = targetTable.getScanner(new Scan()); 263 Result targetRow = targetScanner.next(); 264 Result sourceRow = sourceScanner.next(); 265 int rowsCount = 0; 266 while (targetRow != null) { 267 rowsCount++; 268 //only compares values for existing rows, skipping rows existing on 269 //target only that were not deleted given --doDeletes=false 270 if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) { 271 targetRow = targetScanner.next(); 272 continue; 273 } 274 275 LOG.debug("SOURCE row: " + (sourceRow == null ? "null" 276 : Bytes.toInt(sourceRow.getRow())) 277 + " cells:" + sourceRow); 278 LOG.debug("TARGET row: " + (targetRow == null ? "null" 279 : Bytes.toInt(targetRow.getRow())) 280 + " cells:" + targetRow); 281 282 Cell[] sourceCells = sourceRow.rawCells(); 283 Cell[] targetCells = targetRow.rawCells(); 284 int targetRowKey = Bytes.toInt(targetRow.getRow()); 285 if (targetRowKey >= 70 && targetRowKey < 80) { 286 if (sourceCells.length == targetCells.length) { 287 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 288 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 289 Assert.fail("Row " + targetRowKey + " should have more cells in " 290 + "target than in source"); 291 } 292 293 } else { 294 if (sourceCells.length != targetCells.length) { 295 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 296 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 297 Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) 298 + " has " + sourceCells.length 299 + " cells in source table but " + targetCells.length 300 + " cells in target table"); 301 } 302 } 303 for (int j = 0; j < sourceCells.length; j++) { 304 Cell sourceCell = sourceCells[j]; 305 Cell targetCell = targetCells[j]; 306 try { 307 if (!CellUtil.matchingRow(sourceCell, targetCell)) { 308 Assert.fail("Rows don't match"); 309 } 310 if (!CellUtil.matchingFamily(sourceCell, targetCell)) { 311 Assert.fail("Families don't match"); 312 } 313 if (!CellUtil.matchingQualifier(sourceCell, targetCell)) { 314 Assert.fail("Qualifiers don't match"); 315 } 316 if (targetRowKey < 80 && targetRowKey >= 90){ 317 if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) { 318 Assert.fail("Timestamps don't match"); 319 } 320 } 321 if (!CellUtil.matchingValue(sourceCell, targetCell)) { 322 Assert.fail("Values don't match"); 323 } 324 } catch (Throwable t) { 325 LOG.debug("Source cell: " + sourceCell + " target cell: " 326 + targetCell); 327 Throwables.propagate(t); 328 } 329 } 330 targetRow = targetScanner.next(); 331 sourceRow = sourceScanner.next(); 332 } 333 assertEquals("Target expected rows does not match.",expectedRows, 334 rowsCount); 335 sourceScanner.close(); 336 targetScanner.close(); 337 sourceTable.close(); 338 targetTable.close(); 339 } 340 341 private void assertTargetDoPutsFalse(int expectedRows, TableName sourceTableName, 342 TableName targetTableName) throws Exception { 343 Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName); 344 Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName); 345 346 ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); 347 ResultScanner targetScanner = targetTable.getScanner(new Scan()); 348 Result targetRow = targetScanner.next(); 349 Result sourceRow = sourceScanner.next(); 350 int rowsCount = 0; 351 352 while (targetRow!=null) { 353 //only compares values for existing rows, skipping rows existing on 354 //source only that were not added to target given --doPuts=false 355 if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) { 356 sourceRow = sourceScanner.next(); 357 continue; 358 } 359 360 LOG.debug("SOURCE row: " + (sourceRow == null ? 361 "null" : 362 Bytes.toInt(sourceRow.getRow())) 363 + " cells:" + sourceRow); 364 LOG.debug("TARGET row: " + (targetRow == null ? 365 "null" : 366 Bytes.toInt(targetRow.getRow())) 367 + " cells:" + targetRow); 368 369 LOG.debug("rowsCount: " + rowsCount); 370 371 Cell[] sourceCells = sourceRow.rawCells(); 372 Cell[] targetCells = targetRow.rawCells(); 373 int targetRowKey = Bytes.toInt(targetRow.getRow()); 374 if (targetRowKey >= 40 && targetRowKey < 60) { 375 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 376 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 377 Assert.fail("There shouldn't exist any rows between 40 and 60, since " 378 + "Puts are disabled and Deletes are enabled."); 379 } else if (targetRowKey >= 60 && targetRowKey < 70) { 380 if (sourceCells.length == targetCells.length) { 381 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 382 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 383 Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) 384 + " shouldn't have same number of cells."); 385 } 386 } else if (targetRowKey >= 80 && targetRowKey < 90) { 387 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 388 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 389 Assert.fail("There should be no rows between 80 and 90 on target, as " 390 + "these had different timestamps and should had been deleted."); 391 } else if (targetRowKey >= 90 && targetRowKey < 100) { 392 for (int j = 0; j < sourceCells.length; j++) { 393 Cell sourceCell = sourceCells[j]; 394 Cell targetCell = targetCells[j]; 395 if (CellUtil.matchingValue(sourceCell, targetCell)) { 396 Assert.fail("Cells values should not match for rows between " 397 + "90 and 100. Target row id: " + (Bytes.toInt(targetRow 398 .getRow()))); 399 } 400 } 401 } else { 402 for (int j = 0; j < sourceCells.length; j++) { 403 Cell sourceCell = sourceCells[j]; 404 Cell targetCell = targetCells[j]; 405 try { 406 if (!CellUtil.matchingRow(sourceCell, targetCell)) { 407 Assert.fail("Rows don't match"); 408 } 409 if (!CellUtil.matchingFamily(sourceCell, targetCell)) { 410 Assert.fail("Families don't match"); 411 } 412 if (!CellUtil.matchingQualifier(sourceCell, targetCell)) { 413 Assert.fail("Qualifiers don't match"); 414 } 415 if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) { 416 Assert.fail("Timestamps don't match"); 417 } 418 if (!CellUtil.matchingValue(sourceCell, targetCell)) { 419 Assert.fail("Values don't match"); 420 } 421 } catch (Throwable t) { 422 LOG.debug( 423 "Source cell: " + sourceCell + " target cell: " + targetCell); 424 Throwables.propagate(t); 425 } 426 } 427 } 428 rowsCount++; 429 targetRow = targetScanner.next(); 430 sourceRow = sourceScanner.next(); 431 } 432 assertEquals("Target expected rows does not match.",expectedRows, 433 rowsCount); 434 sourceScanner.close(); 435 targetScanner.close(); 436 sourceTable.close(); 437 targetTable.close(); 438 } 439 440 private Counters syncTables(TableName sourceTableName, TableName targetTableName, 441 Path testDir, String... options) throws Exception { 442 SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration()); 443 String[] args = Arrays.copyOf(options, options.length+3); 444 args[options.length] = testDir.toString(); 445 args[options.length+1] = sourceTableName.getNameAsString(); 446 args[options.length+2] = targetTableName.getNameAsString(); 447 int code = syncTable.run(args); 448 assertEquals("sync table job failed", 0, code); 449 450 LOG.info("Sync tables completed"); 451 return syncTable.counters; 452 } 453 454 private void hashSourceTable(TableName sourceTableName, Path testDir, String... options) 455 throws Exception { 456 int numHashFiles = 3; 457 long batchSize = 100; // should be 2 batches per region 458 int scanBatch = 1; 459 HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration()); 460 String[] args = Arrays.copyOf(options, options.length+5); 461 args[options.length] = "--batchsize=" + batchSize; 462 args[options.length + 1] = "--numhashfiles=" + numHashFiles; 463 args[options.length + 2] = "--scanbatch=" + scanBatch; 464 args[options.length + 3] = sourceTableName.getNameAsString(); 465 args[options.length + 4] = testDir.toString(); 466 int code = hashTable.run(args); 467 assertEquals("hash table job failed", 0, code); 468 469 FileSystem fs = TEST_UTIL.getTestFileSystem(); 470 471 HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir); 472 assertEquals(sourceTableName.getNameAsString(), tableHash.tableName); 473 assertEquals(batchSize, tableHash.batchSize); 474 assertEquals(numHashFiles, tableHash.numHashFiles); 475 assertEquals(numHashFiles - 1, tableHash.partitions.size()); 476 477 LOG.info("Hash table completed"); 478 } 479 480 private void writeTestData(TableName sourceTableName, TableName targetTableName, 481 long... timestamps) throws Exception { 482 final byte[] family = Bytes.toBytes("family"); 483 final byte[] column1 = Bytes.toBytes("c1"); 484 final byte[] column2 = Bytes.toBytes("c2"); 485 final byte[] value1 = Bytes.toBytes("val1"); 486 final byte[] value2 = Bytes.toBytes("val2"); 487 final byte[] value3 = Bytes.toBytes("val3"); 488 489 int numRows = 100; 490 int sourceRegions = 10; 491 int targetRegions = 6; 492 if (ArrayUtils.isEmpty(timestamps)) { 493 long current = System.currentTimeMillis(); 494 timestamps = new long[]{current,current}; 495 } 496 497 Table sourceTable = TEST_UTIL.createTable(sourceTableName, 498 family, generateSplits(numRows, sourceRegions)); 499 500 Table targetTable = TEST_UTIL.createTable(targetTableName, 501 family, generateSplits(numRows, targetRegions)); 502 503 int rowIndex = 0; 504 // a bunch of identical rows 505 for (; rowIndex < 40; rowIndex++) { 506 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 507 sourcePut.addColumn(family, column1, timestamps[0], value1); 508 sourcePut.addColumn(family, column2, timestamps[0], value2); 509 sourceTable.put(sourcePut); 510 511 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 512 targetPut.addColumn(family, column1, timestamps[1], value1); 513 targetPut.addColumn(family, column2, timestamps[1], value2); 514 targetTable.put(targetPut); 515 } 516 // some rows only in the source table 517 // ROWSWITHDIFFS: 10 518 // TARGETMISSINGROWS: 10 519 // TARGETMISSINGCELLS: 20 520 for (; rowIndex < 50; rowIndex++) { 521 Put put = new Put(Bytes.toBytes(rowIndex)); 522 put.addColumn(family, column1, timestamps[0], value1); 523 put.addColumn(family, column2, timestamps[0], value2); 524 sourceTable.put(put); 525 } 526 // some rows only in the target table 527 // ROWSWITHDIFFS: 10 528 // SOURCEMISSINGROWS: 10 529 // SOURCEMISSINGCELLS: 20 530 for (; rowIndex < 60; rowIndex++) { 531 Put put = new Put(Bytes.toBytes(rowIndex)); 532 put.addColumn(family, column1, timestamps[1], value1); 533 put.addColumn(family, column2, timestamps[1], value2); 534 targetTable.put(put); 535 } 536 // some rows with 1 missing cell in target table 537 // ROWSWITHDIFFS: 10 538 // TARGETMISSINGCELLS: 10 539 for (; rowIndex < 70; rowIndex++) { 540 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 541 sourcePut.addColumn(family, column1, timestamps[0], value1); 542 sourcePut.addColumn(family, column2, timestamps[0], value2); 543 sourceTable.put(sourcePut); 544 545 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 546 targetPut.addColumn(family, column1, timestamps[1], value1); 547 targetTable.put(targetPut); 548 } 549 // some rows with 1 missing cell in source table 550 // ROWSWITHDIFFS: 10 551 // SOURCEMISSINGCELLS: 10 552 for (; rowIndex < 80; rowIndex++) { 553 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 554 sourcePut.addColumn(family, column1, timestamps[0], value1); 555 sourceTable.put(sourcePut); 556 557 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 558 targetPut.addColumn(family, column1, timestamps[1], value1); 559 targetPut.addColumn(family, column2, timestamps[1], value2); 560 targetTable.put(targetPut); 561 } 562 // some rows differing only in timestamp 563 // ROWSWITHDIFFS: 10 564 // SOURCEMISSINGCELLS: 20 565 // TARGETMISSINGCELLS: 20 566 for (; rowIndex < 90; rowIndex++) { 567 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 568 sourcePut.addColumn(family, column1, timestamps[0], column1); 569 sourcePut.addColumn(family, column2, timestamps[0], value2); 570 sourceTable.put(sourcePut); 571 572 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 573 targetPut.addColumn(family, column1, timestamps[1]+1, column1); 574 targetPut.addColumn(family, column2, timestamps[1]-1, value2); 575 targetTable.put(targetPut); 576 } 577 // some rows with different values 578 // ROWSWITHDIFFS: 10 579 // DIFFERENTCELLVALUES: 20 580 for (; rowIndex < numRows; rowIndex++) { 581 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 582 sourcePut.addColumn(family, column1, timestamps[0], value1); 583 sourcePut.addColumn(family, column2, timestamps[0], value2); 584 sourceTable.put(sourcePut); 585 586 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 587 targetPut.addColumn(family, column1, timestamps[1], value3); 588 targetPut.addColumn(family, column2, timestamps[1], value3); 589 targetTable.put(targetPut); 590 } 591 592 sourceTable.close(); 593 targetTable.close(); 594 } 595}