001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022import static org.junit.jupiter.api.Assertions.fail; 023 024import java.util.Arrays; 025import java.util.function.BooleanSupplier; 026import org.apache.commons.lang3.ArrayUtils; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.Result; 036import org.apache.hadoop.hbase.client.ResultScanner; 037import org.apache.hadoop.hbase.client.Scan; 038import org.apache.hadoop.hbase.client.Table; 039import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter; 040import org.apache.hadoop.hbase.testclassification.LargeTests; 041import org.apache.hadoop.hbase.testclassification.MapReduceTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 044import org.apache.hadoop.mapreduce.Counters; 045import org.junit.jupiter.api.AfterAll; 046import org.junit.jupiter.api.BeforeAll; 047import org.junit.jupiter.api.Tag; 048import org.junit.jupiter.api.Test; 049import org.junit.jupiter.api.TestInfo; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * Basic test for the SyncTable M/R tool 055 */ 056@Tag(MapReduceTests.TAG) 057@Tag(LargeTests.TAG) 058public class TestSyncTable { 059 060 private static final Logger LOG = LoggerFactory.getLogger(TestSyncTable.class); 061 062 private static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil(); 063 064 private static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil(); 065 066 @BeforeAll 067 public static void beforeClass() throws Exception { 068 UTIL1.startMiniCluster(3); 069 UTIL2.startMiniCluster(3); 070 } 071 072 @AfterAll 073 public static void afterClass() throws Exception { 074 UTIL2.shutdownMiniCluster(); 075 UTIL1.shutdownMiniCluster(); 076 } 077 078 private static byte[][] generateSplits(int numRows, int numRegions) { 079 byte[][] splitRows = new byte[numRegions - 1][]; 080 for (int i = 1; i < numRegions; i++) { 081 splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions); 082 } 083 return splitRows; 084 } 085 086 private void testSyncTable(TestInfo testInfo, HBaseTestingUtil source, HBaseTestingUtil target, 087 String... options) throws Exception { 088 final TableName sourceTableName = 089 TableName.valueOf(testInfo.getTestMethod().get().getName() + "_source"); 090 final TableName targetTableName = 091 TableName.valueOf(testInfo.getTestMethod().get().getName() + "_target"); 092 Path testDir = source.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName()); 093 094 writeTestData(source, sourceTableName, target, targetTableName); 095 hashSourceTable(source, sourceTableName, testDir); 096 Counters syncCounters = 097 syncTables(target.getConfiguration(), sourceTableName, targetTableName, testDir, options); 098 assertEqualTables(90, source, sourceTableName, target, targetTableName, false); 099 100 assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); 101 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); 102 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); 103 assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); 104 assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); 105 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); 106 107 source.deleteTable(sourceTableName); 108 target.deleteTable(targetTableName); 109 } 110 111 @Test 112 public void testSyncTable(TestInfo testInfo) throws Exception { 113 testSyncTable(testInfo, UTIL1, UTIL1); 114 } 115 116 @Test 117 public void testSyncTableToPeerCluster(TestInfo testInfo) throws Exception { 118 testSyncTable(testInfo, UTIL1, UTIL2, "--sourceuri=" + UTIL1.getRpcConnnectionURI()); 119 } 120 121 @Test 122 public void testSyncTableFromSourceToPeerCluster(TestInfo testInfo) throws Exception { 123 testSyncTable(testInfo, UTIL2, UTIL1, "--sourceuri=" + UTIL2.getRpcConnnectionURI(), 124 "--targeturi=" + UTIL1.getZkConnectionURI()); 125 } 126 127 @Test 128 public void testSyncTableFromSourceToPeerClusterWithClusterKey(TestInfo testInfo) 129 throws Exception { 130 testSyncTable(testInfo, UTIL2, UTIL1, "--sourcezkcluster=" + UTIL2.getClusterKey(), 131 "--targetzkcluster=" + UTIL1.getClusterKey()); 132 } 133 134 @Test 135 public void testSyncTableDoDeletesFalse(TestInfo testInfo) throws Exception { 136 final TableName sourceTableName = 137 TableName.valueOf(testInfo.getTestMethod().get().getName() + "_source"); 138 final TableName targetTableName = 139 TableName.valueOf(testInfo.getTestMethod().get().getName() + "_target"); 140 Path testDir = UTIL1.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName()); 141 142 writeTestData(UTIL1, sourceTableName, UTIL1, targetTableName); 143 hashSourceTable(UTIL1, sourceTableName, testDir); 144 Counters syncCounters = syncTables(UTIL1.getConfiguration(), sourceTableName, targetTableName, 145 testDir, "--doDeletes=false"); 146 assertTargetDoDeletesFalse(100, UTIL1, sourceTableName, UTIL1, targetTableName); 147 148 assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); 149 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); 150 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); 151 assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); 152 assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); 153 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); 154 155 UTIL1.deleteTable(sourceTableName); 156 UTIL1.deleteTable(targetTableName); 157 } 158 159 @Test 160 public void testSyncTableDoPutsFalse(TestInfo testInfo) throws Exception { 161 final TableName sourceTableName = 162 TableName.valueOf(testInfo.getTestMethod().get().getName() + "_source"); 163 final TableName targetTableName = 164 TableName.valueOf(testInfo.getTestMethod().get().getName() + "_target"); 165 Path testDir = UTIL2.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName()); 166 167 writeTestData(UTIL2, sourceTableName, UTIL2, targetTableName); 168 hashSourceTable(UTIL2, sourceTableName, testDir); 169 Counters syncCounters = syncTables(UTIL2.getConfiguration(), sourceTableName, targetTableName, 170 testDir, "--doPuts=false"); 171 assertTargetDoPutsFalse(70, UTIL2, sourceTableName, UTIL2, targetTableName); 172 173 assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); 174 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); 175 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); 176 assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); 177 assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); 178 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); 179 180 UTIL2.deleteTable(sourceTableName); 181 UTIL2.deleteTable(targetTableName); 182 } 183 184 @Test 185 public void testSyncTableIgnoreTimestampsTrue(TestInfo testInfo) throws Exception { 186 final TableName sourceTableName = 187 TableName.valueOf(testInfo.getTestMethod().get().getName() + "_source"); 188 final TableName targetTableName = 189 TableName.valueOf(testInfo.getTestMethod().get().getName() + "_target"); 190 Path testDir = UTIL1.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName()); 191 long current = EnvironmentEdgeManager.currentTime(); 192 writeTestData(UTIL1, sourceTableName, UTIL2, targetTableName, current - 1000, current); 193 hashSourceTable(UTIL1, sourceTableName, testDir, "--ignoreTimestamps=true"); 194 Counters syncCounters = syncTables(UTIL2.getConfiguration(), sourceTableName, targetTableName, 195 testDir, "--ignoreTimestamps=true", "--sourceuri=" + UTIL1.getRpcConnnectionURI()); 196 assertEqualTables(90, UTIL1, sourceTableName, UTIL2, targetTableName, true); 197 198 assertEquals(50, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); 199 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); 200 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); 201 assertEquals(30, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); 202 assertEquals(30, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); 203 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); 204 205 UTIL1.deleteTable(sourceTableName); 206 UTIL2.deleteTable(targetTableName); 207 } 208 209 private void assertCellEquals(Cell sourceCell, Cell targetCell, BooleanSupplier checkTimestamp) { 210 assertTrue(CellUtil.matchingRows(sourceCell, targetCell), 211 "Rows don't match, source: " + sourceCell + ", target: " + targetCell); 212 assertTrue(CellUtil.matchingFamily(sourceCell, targetCell), 213 "Families don't match, source: " + sourceCell + ", target: " + targetCell); 214 assertTrue(CellUtil.matchingQualifier(sourceCell, targetCell), 215 "Qualifiers don't match, source: " + sourceCell + ", target: " + targetCell); 216 if (checkTimestamp.getAsBoolean()) { 217 assertTrue(CellUtil.matchingTimestamp(sourceCell, targetCell), 218 "Timestamps don't match, source: " + sourceCell + ", target: " + targetCell); 219 } 220 assertTrue(CellUtil.matchingValue(sourceCell, targetCell), 221 "Values don't match, source: " + sourceCell + ", target: " + targetCell); 222 } 223 224 private void assertEqualTables(int expectedRows, HBaseTestingUtil sourceCluster, 225 TableName sourceTableName, HBaseTestingUtil targetCluster, TableName targetTableName, 226 boolean ignoreTimestamps) throws Exception { 227 try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName); 228 Table targetTable = targetCluster.getConnection().getTable(targetTableName); 229 ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); 230 ResultScanner targetScanner = targetTable.getScanner(new Scan())) { 231 for (int i = 0; i < expectedRows; i++) { 232 Result sourceRow = sourceScanner.next(); 233 Result targetRow = targetScanner.next(); 234 235 LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) 236 + " cells:" + sourceRow); 237 LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) 238 + " cells:" + targetRow); 239 240 if (sourceRow == null) { 241 fail("Expected " + expectedRows + " source rows but only found " + i); 242 } 243 if (targetRow == null) { 244 fail("Expected " + expectedRows + " target rows but only found " + i); 245 } 246 Cell[] sourceCells = sourceRow.rawCells(); 247 Cell[] targetCells = targetRow.rawCells(); 248 if (sourceCells.length != targetCells.length) { 249 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 250 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 251 fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length 252 + " cells in source table but " + targetCells.length + " cells in target table"); 253 } 254 for (int j = 0; j < sourceCells.length; j++) { 255 Cell sourceCell = sourceCells[j]; 256 Cell targetCell = targetCells[j]; 257 assertCellEquals(sourceCell, targetCell, () -> !ignoreTimestamps); 258 } 259 } 260 Result sourceRow = sourceScanner.next(); 261 if (sourceRow != null) { 262 fail("Source table has more than " + expectedRows + " rows. Next row: " 263 + Bytes.toInt(sourceRow.getRow())); 264 } 265 Result targetRow = targetScanner.next(); 266 if (targetRow != null) { 267 fail("Target table has more than " + expectedRows + " rows. Next row: " 268 + Bytes.toInt(targetRow.getRow())); 269 } 270 } 271 } 272 273 private void assertTargetDoDeletesFalse(int expectedRows, HBaseTestingUtil sourceCluster, 274 TableName sourceTableName, HBaseTestingUtil targetCluster, TableName targetTableName) 275 throws Exception { 276 try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName); 277 Table targetTable = targetCluster.getConnection().getTable(targetTableName); 278 279 ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); 280 ResultScanner targetScanner = targetTable.getScanner(new Scan())) { 281 Result targetRow = targetScanner.next(); 282 Result sourceRow = sourceScanner.next(); 283 int rowsCount = 0; 284 while (targetRow != null) { 285 rowsCount++; 286 // only compares values for existing rows, skipping rows existing on 287 // target only that were not deleted given --doDeletes=false 288 if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) { 289 targetRow = targetScanner.next(); 290 continue; 291 } 292 293 LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) 294 + " cells:" + sourceRow); 295 LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) 296 + " cells:" + targetRow); 297 298 Cell[] sourceCells = sourceRow.rawCells(); 299 Cell[] targetCells = targetRow.rawCells(); 300 int targetRowKey = Bytes.toInt(targetRow.getRow()); 301 if (targetRowKey >= 70 && targetRowKey < 80) { 302 if (sourceCells.length == targetCells.length) { 303 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 304 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 305 fail("Row " + targetRowKey + " should have more cells in " + "target than in source"); 306 } 307 308 } else { 309 if (sourceCells.length != targetCells.length) { 310 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 311 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 312 fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length 313 + " cells in source table but " + targetCells.length + " cells in target table"); 314 } 315 } 316 for (int j = 0; j < sourceCells.length; j++) { 317 Cell sourceCell = sourceCells[j]; 318 Cell targetCell = targetCells[j]; 319 assertCellEquals(sourceCell, targetCell, () -> targetRowKey < 80 && targetRowKey >= 90); 320 } 321 targetRow = targetScanner.next(); 322 sourceRow = sourceScanner.next(); 323 } 324 assertEquals(expectedRows, rowsCount, "Target expected rows does not match."); 325 } 326 } 327 328 private void assertTargetDoPutsFalse(int expectedRows, HBaseTestingUtil sourceCluster, 329 TableName sourceTableName, HBaseTestingUtil targetCluster, TableName targetTableName) 330 throws Exception { 331 try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName); 332 Table targetTable = targetCluster.getConnection().getTable(targetTableName); 333 ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); 334 ResultScanner targetScanner = targetTable.getScanner(new Scan())) { 335 Result targetRow = targetScanner.next(); 336 Result sourceRow = sourceScanner.next(); 337 int rowsCount = 0; 338 339 while (targetRow != null) { 340 // only compares values for existing rows, skipping rows existing on 341 // source only that were not added to target given --doPuts=false 342 if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) { 343 sourceRow = sourceScanner.next(); 344 continue; 345 } 346 347 LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) 348 + " cells:" + sourceRow); 349 LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) 350 + " cells:" + targetRow); 351 352 LOG.debug("rowsCount: " + rowsCount); 353 354 Cell[] sourceCells = sourceRow.rawCells(); 355 Cell[] targetCells = targetRow.rawCells(); 356 int targetRowKey = Bytes.toInt(targetRow.getRow()); 357 if (targetRowKey >= 40 && targetRowKey < 60) { 358 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 359 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 360 fail("There shouldn't exist any rows between 40 and 60, since " 361 + "Puts are disabled and Deletes are enabled."); 362 } else if (targetRowKey >= 60 && targetRowKey < 70) { 363 if (sourceCells.length == targetCells.length) { 364 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 365 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 366 fail( 367 "Row " + Bytes.toInt(sourceRow.getRow()) + " shouldn't have same number of cells."); 368 } 369 } else if (targetRowKey >= 80 && targetRowKey < 90) { 370 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 371 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 372 fail("There should be no rows between 80 and 90 on target, as " 373 + "these had different timestamps and should had been deleted."); 374 } else if (targetRowKey >= 90 && targetRowKey < 100) { 375 for (int j = 0; j < sourceCells.length; j++) { 376 Cell sourceCell = sourceCells[j]; 377 Cell targetCell = targetCells[j]; 378 if (CellUtil.matchingValue(sourceCell, targetCell)) { 379 fail("Cells values should not match for rows between " + "90 and 100. Target row id: " 380 + Bytes.toInt(targetRow.getRow())); 381 } 382 } 383 } else { 384 for (int j = 0; j < sourceCells.length; j++) { 385 Cell sourceCell = sourceCells[j]; 386 Cell targetCell = targetCells[j]; 387 assertCellEquals(sourceCell, targetCell, () -> true); 388 } 389 } 390 rowsCount++; 391 targetRow = targetScanner.next(); 392 sourceRow = sourceScanner.next(); 393 } 394 assertEquals(expectedRows, rowsCount, "Target expected rows does not match."); 395 } 396 } 397 398 private Counters syncTables(Configuration conf, TableName sourceTableName, 399 TableName targetTableName, Path testDir, String... options) throws Exception { 400 SyncTable syncTable = new SyncTable(conf); 401 String[] args = Arrays.copyOf(options, options.length + 3); 402 args[options.length] = testDir.toString(); 403 args[options.length + 1] = sourceTableName.getNameAsString(); 404 args[options.length + 2] = targetTableName.getNameAsString(); 405 int code = syncTable.run(args); 406 assertEquals(0, code, "sync table job failed"); 407 408 LOG.info("Sync tables completed"); 409 return syncTable.counters; 410 } 411 412 private void hashSourceTable(HBaseTestingUtil sourceCluster, TableName sourceTableName, 413 Path testDir, String... options) throws Exception { 414 int numHashFiles = 3; 415 long batchSize = 100; // should be 2 batches per region 416 int scanBatch = 1; 417 HashTable hashTable = new HashTable(sourceCluster.getConfiguration()); 418 String[] args = Arrays.copyOf(options, options.length + 5); 419 args[options.length] = "--batchsize=" + batchSize; 420 args[options.length + 1] = "--numhashfiles=" + numHashFiles; 421 args[options.length + 2] = "--scanbatch=" + scanBatch; 422 args[options.length + 3] = sourceTableName.getNameAsString(); 423 args[options.length + 4] = testDir.toString(); 424 int code = hashTable.run(args); 425 assertEquals(0, code, "hash table job failed"); 426 427 FileSystem fs = sourceCluster.getTestFileSystem(); 428 429 HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir); 430 assertEquals(sourceTableName.getNameAsString(), tableHash.tableName); 431 assertEquals(batchSize, tableHash.batchSize); 432 assertEquals(numHashFiles, tableHash.numHashFiles); 433 assertEquals(numHashFiles - 1, tableHash.partitions.size()); 434 435 LOG.info("Hash table completed"); 436 } 437 438 private void writeTestData(HBaseTestingUtil sourceCluster, TableName sourceTableName, 439 HBaseTestingUtil targetCluster, TableName targetTableName, long... timestamps) 440 throws Exception { 441 final byte[] family = Bytes.toBytes("family"); 442 final byte[] column1 = Bytes.toBytes("c1"); 443 final byte[] column2 = Bytes.toBytes("c2"); 444 final byte[] value1 = Bytes.toBytes("val1"); 445 final byte[] value2 = Bytes.toBytes("val2"); 446 final byte[] value3 = Bytes.toBytes("val3"); 447 448 int numRows = 100; 449 int sourceRegions = 10; 450 int targetRegions = 6; 451 if (ArrayUtils.isEmpty(timestamps)) { 452 long current = EnvironmentEdgeManager.currentTime(); 453 timestamps = new long[] { current, current }; 454 } 455 456 try ( 457 Table sourceTable = 458 sourceCluster.createTable(sourceTableName, family, generateSplits(numRows, sourceRegions)); 459 Table targetTable = targetCluster.createTable(targetTableName, family, 460 generateSplits(numRows, targetRegions))) { 461 462 int rowIndex = 0; 463 // a bunch of identical rows 464 for (; rowIndex < 40; rowIndex++) { 465 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 466 sourcePut.addColumn(family, column1, timestamps[0], value1); 467 sourcePut.addColumn(family, column2, timestamps[0], value2); 468 sourceTable.put(sourcePut); 469 470 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 471 targetPut.addColumn(family, column1, timestamps[1], value1); 472 targetPut.addColumn(family, column2, timestamps[1], value2); 473 targetTable.put(targetPut); 474 } 475 // some rows only in the source table 476 // ROWSWITHDIFFS: 10 477 // TARGETMISSINGROWS: 10 478 // TARGETMISSINGCELLS: 20 479 for (; rowIndex < 50; rowIndex++) { 480 Put put = new Put(Bytes.toBytes(rowIndex)); 481 put.addColumn(family, column1, timestamps[0], value1); 482 put.addColumn(family, column2, timestamps[0], value2); 483 sourceTable.put(put); 484 } 485 // some rows only in the target table 486 // ROWSWITHDIFFS: 10 487 // SOURCEMISSINGROWS: 10 488 // SOURCEMISSINGCELLS: 20 489 for (; rowIndex < 60; rowIndex++) { 490 Put put = new Put(Bytes.toBytes(rowIndex)); 491 put.addColumn(family, column1, timestamps[1], value1); 492 put.addColumn(family, column2, timestamps[1], value2); 493 targetTable.put(put); 494 } 495 // some rows with 1 missing cell in target table 496 // ROWSWITHDIFFS: 10 497 // TARGETMISSINGCELLS: 10 498 for (; rowIndex < 70; rowIndex++) { 499 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 500 sourcePut.addColumn(family, column1, timestamps[0], value1); 501 sourcePut.addColumn(family, column2, timestamps[0], value2); 502 sourceTable.put(sourcePut); 503 504 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 505 targetPut.addColumn(family, column1, timestamps[1], value1); 506 targetTable.put(targetPut); 507 } 508 // some rows with 1 missing cell in source table 509 // ROWSWITHDIFFS: 10 510 // SOURCEMISSINGCELLS: 10 511 for (; rowIndex < 80; rowIndex++) { 512 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 513 sourcePut.addColumn(family, column1, timestamps[0], value1); 514 sourceTable.put(sourcePut); 515 516 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 517 targetPut.addColumn(family, column1, timestamps[1], value1); 518 targetPut.addColumn(family, column2, timestamps[1], value2); 519 targetTable.put(targetPut); 520 } 521 // some rows differing only in timestamp 522 // ROWSWITHDIFFS: 10 523 // SOURCEMISSINGCELLS: 20 524 // TARGETMISSINGCELLS: 20 525 for (; rowIndex < 90; rowIndex++) { 526 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 527 sourcePut.addColumn(family, column1, timestamps[0], column1); 528 sourcePut.addColumn(family, column2, timestamps[0], value2); 529 sourceTable.put(sourcePut); 530 531 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 532 targetPut.addColumn(family, column1, timestamps[1] + 1, column1); 533 targetPut.addColumn(family, column2, timestamps[1] - 1, value2); 534 targetTable.put(targetPut); 535 } 536 // some rows with different values 537 // ROWSWITHDIFFS: 10 538 // DIFFERENTCELLVALUES: 20 539 for (; rowIndex < numRows; rowIndex++) { 540 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 541 sourcePut.addColumn(family, column1, timestamps[0], value1); 542 sourcePut.addColumn(family, column2, timestamps[0], value2); 543 sourceTable.put(sourcePut); 544 545 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 546 targetPut.addColumn(family, column1, timestamps[1], value3); 547 targetPut.addColumn(family, column2, timestamps[1], value3); 548 targetTable.put(targetPut); 549 } 550 } 551 } 552}