001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.junit.jupiter.api.Assertions.fail;
023
024import java.util.Arrays;
025import java.util.function.BooleanSupplier;
026import org.apache.commons.lang3.ArrayUtils;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.ResultScanner;
037import org.apache.hadoop.hbase.client.Scan;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
040import org.apache.hadoop.hbase.testclassification.LargeTests;
041import org.apache.hadoop.hbase.testclassification.MapReduceTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.apache.hadoop.mapreduce.Counters;
045import org.junit.jupiter.api.AfterAll;
046import org.junit.jupiter.api.BeforeAll;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.Test;
049import org.junit.jupiter.api.TestInfo;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * Basic test for the SyncTable M/R tool
055 */
056@Tag(MapReduceTests.TAG)
057@Tag(LargeTests.TAG)
058public class TestSyncTable {
059
060  private static final Logger LOG = LoggerFactory.getLogger(TestSyncTable.class);
061
062  private static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
063
064  private static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
065
066  @BeforeAll
067  public static void beforeClass() throws Exception {
068    UTIL1.startMiniCluster(3);
069    UTIL2.startMiniCluster(3);
070  }
071
072  @AfterAll
073  public static void afterClass() throws Exception {
074    UTIL2.shutdownMiniCluster();
075    UTIL1.shutdownMiniCluster();
076  }
077
078  private static byte[][] generateSplits(int numRows, int numRegions) {
079    byte[][] splitRows = new byte[numRegions - 1][];
080    for (int i = 1; i < numRegions; i++) {
081      splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions);
082    }
083    return splitRows;
084  }
085
086  private void testSyncTable(TestInfo testInfo, HBaseTestingUtil source, HBaseTestingUtil target,
087    String... options) throws Exception {
088    final TableName sourceTableName =
089      TableName.valueOf(testInfo.getTestMethod().get().getName() + "_source");
090    final TableName targetTableName =
091      TableName.valueOf(testInfo.getTestMethod().get().getName() + "_target");
092    Path testDir = source.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName());
093
094    writeTestData(source, sourceTableName, target, targetTableName);
095    hashSourceTable(source, sourceTableName, testDir);
096    Counters syncCounters =
097      syncTables(target.getConfiguration(), sourceTableName, targetTableName, testDir, options);
098    assertEqualTables(90, source, sourceTableName, target, targetTableName, false);
099
100    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
101    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
102    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
103    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
104    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
105    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
106
107    source.deleteTable(sourceTableName);
108    target.deleteTable(targetTableName);
109  }
110
111  @Test
112  public void testSyncTable(TestInfo testInfo) throws Exception {
113    testSyncTable(testInfo, UTIL1, UTIL1);
114  }
115
116  @Test
117  public void testSyncTableToPeerCluster(TestInfo testInfo) throws Exception {
118    testSyncTable(testInfo, UTIL1, UTIL2, "--sourceuri=" + UTIL1.getRpcConnnectionURI());
119  }
120
121  @Test
122  public void testSyncTableFromSourceToPeerCluster(TestInfo testInfo) throws Exception {
123    testSyncTable(testInfo, UTIL2, UTIL1, "--sourceuri=" + UTIL2.getRpcConnnectionURI(),
124      "--targeturi=" + UTIL1.getZkConnectionURI());
125  }
126
127  @Test
128  public void testSyncTableFromSourceToPeerClusterWithClusterKey(TestInfo testInfo)
129    throws Exception {
130    testSyncTable(testInfo, UTIL2, UTIL1, "--sourcezkcluster=" + UTIL2.getClusterKey(),
131      "--targetzkcluster=" + UTIL1.getClusterKey());
132  }
133
134  @Test
135  public void testSyncTableDoDeletesFalse(TestInfo testInfo) throws Exception {
136    final TableName sourceTableName =
137      TableName.valueOf(testInfo.getTestMethod().get().getName() + "_source");
138    final TableName targetTableName =
139      TableName.valueOf(testInfo.getTestMethod().get().getName() + "_target");
140    Path testDir = UTIL1.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName());
141
142    writeTestData(UTIL1, sourceTableName, UTIL1, targetTableName);
143    hashSourceTable(UTIL1, sourceTableName, testDir);
144    Counters syncCounters = syncTables(UTIL1.getConfiguration(), sourceTableName, targetTableName,
145      testDir, "--doDeletes=false");
146    assertTargetDoDeletesFalse(100, UTIL1, sourceTableName, UTIL1, targetTableName);
147
148    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
149    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
150    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
151    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
152    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
153    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
154
155    UTIL1.deleteTable(sourceTableName);
156    UTIL1.deleteTable(targetTableName);
157  }
158
159  @Test
160  public void testSyncTableDoPutsFalse(TestInfo testInfo) throws Exception {
161    final TableName sourceTableName =
162      TableName.valueOf(testInfo.getTestMethod().get().getName() + "_source");
163    final TableName targetTableName =
164      TableName.valueOf(testInfo.getTestMethod().get().getName() + "_target");
165    Path testDir = UTIL2.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName());
166
167    writeTestData(UTIL2, sourceTableName, UTIL2, targetTableName);
168    hashSourceTable(UTIL2, sourceTableName, testDir);
169    Counters syncCounters = syncTables(UTIL2.getConfiguration(), sourceTableName, targetTableName,
170      testDir, "--doPuts=false");
171    assertTargetDoPutsFalse(70, UTIL2, sourceTableName, UTIL2, targetTableName);
172
173    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
174    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
175    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
176    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
177    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
178    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
179
180    UTIL2.deleteTable(sourceTableName);
181    UTIL2.deleteTable(targetTableName);
182  }
183
184  @Test
185  public void testSyncTableIgnoreTimestampsTrue(TestInfo testInfo) throws Exception {
186    final TableName sourceTableName =
187      TableName.valueOf(testInfo.getTestMethod().get().getName() + "_source");
188    final TableName targetTableName =
189      TableName.valueOf(testInfo.getTestMethod().get().getName() + "_target");
190    Path testDir = UTIL1.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName());
191    long current = EnvironmentEdgeManager.currentTime();
192    writeTestData(UTIL1, sourceTableName, UTIL2, targetTableName, current - 1000, current);
193    hashSourceTable(UTIL1, sourceTableName, testDir, "--ignoreTimestamps=true");
194    Counters syncCounters = syncTables(UTIL2.getConfiguration(), sourceTableName, targetTableName,
195      testDir, "--ignoreTimestamps=true", "--sourceuri=" + UTIL1.getRpcConnnectionURI());
196    assertEqualTables(90, UTIL1, sourceTableName, UTIL2, targetTableName, true);
197
198    assertEquals(50, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
199    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
200    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
201    assertEquals(30, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
202    assertEquals(30, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
203    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
204
205    UTIL1.deleteTable(sourceTableName);
206    UTIL2.deleteTable(targetTableName);
207  }
208
209  private void assertCellEquals(Cell sourceCell, Cell targetCell, BooleanSupplier checkTimestamp) {
210    assertTrue(CellUtil.matchingRows(sourceCell, targetCell),
211      "Rows don't match, source: " + sourceCell + ", target: " + targetCell);
212    assertTrue(CellUtil.matchingFamily(sourceCell, targetCell),
213      "Families don't match, source: " + sourceCell + ", target: " + targetCell);
214    assertTrue(CellUtil.matchingQualifier(sourceCell, targetCell),
215      "Qualifiers don't match, source: " + sourceCell + ", target: " + targetCell);
216    if (checkTimestamp.getAsBoolean()) {
217      assertTrue(CellUtil.matchingTimestamp(sourceCell, targetCell),
218        "Timestamps don't match, source: " + sourceCell + ", target: " + targetCell);
219    }
220    assertTrue(CellUtil.matchingValue(sourceCell, targetCell),
221      "Values don't match, source: " + sourceCell + ", target: " + targetCell);
222  }
223
224  private void assertEqualTables(int expectedRows, HBaseTestingUtil sourceCluster,
225    TableName sourceTableName, HBaseTestingUtil targetCluster, TableName targetTableName,
226    boolean ignoreTimestamps) throws Exception {
227    try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName);
228      Table targetTable = targetCluster.getConnection().getTable(targetTableName);
229      ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
230      ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
231      for (int i = 0; i < expectedRows; i++) {
232        Result sourceRow = sourceScanner.next();
233        Result targetRow = targetScanner.next();
234
235        LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
236          + " cells:" + sourceRow);
237        LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
238          + " cells:" + targetRow);
239
240        if (sourceRow == null) {
241          fail("Expected " + expectedRows + " source rows but only found " + i);
242        }
243        if (targetRow == null) {
244          fail("Expected " + expectedRows + " target rows but only found " + i);
245        }
246        Cell[] sourceCells = sourceRow.rawCells();
247        Cell[] targetCells = targetRow.rawCells();
248        if (sourceCells.length != targetCells.length) {
249          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
250          LOG.debug("Target cells: " + Arrays.toString(targetCells));
251          fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length
252            + " cells in source table but " + targetCells.length + " cells in target table");
253        }
254        for (int j = 0; j < sourceCells.length; j++) {
255          Cell sourceCell = sourceCells[j];
256          Cell targetCell = targetCells[j];
257          assertCellEquals(sourceCell, targetCell, () -> !ignoreTimestamps);
258        }
259      }
260      Result sourceRow = sourceScanner.next();
261      if (sourceRow != null) {
262        fail("Source table has more than " + expectedRows + " rows.  Next row: "
263          + Bytes.toInt(sourceRow.getRow()));
264      }
265      Result targetRow = targetScanner.next();
266      if (targetRow != null) {
267        fail("Target table has more than " + expectedRows + " rows.  Next row: "
268          + Bytes.toInt(targetRow.getRow()));
269      }
270    }
271  }
272
273  private void assertTargetDoDeletesFalse(int expectedRows, HBaseTestingUtil sourceCluster,
274    TableName sourceTableName, HBaseTestingUtil targetCluster, TableName targetTableName)
275    throws Exception {
276    try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName);
277      Table targetTable = targetCluster.getConnection().getTable(targetTableName);
278
279      ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
280      ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
281      Result targetRow = targetScanner.next();
282      Result sourceRow = sourceScanner.next();
283      int rowsCount = 0;
284      while (targetRow != null) {
285        rowsCount++;
286        // only compares values for existing rows, skipping rows existing on
287        // target only that were not deleted given --doDeletes=false
288        if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
289          targetRow = targetScanner.next();
290          continue;
291        }
292
293        LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
294          + " cells:" + sourceRow);
295        LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
296          + " cells:" + targetRow);
297
298        Cell[] sourceCells = sourceRow.rawCells();
299        Cell[] targetCells = targetRow.rawCells();
300        int targetRowKey = Bytes.toInt(targetRow.getRow());
301        if (targetRowKey >= 70 && targetRowKey < 80) {
302          if (sourceCells.length == targetCells.length) {
303            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
304            LOG.debug("Target cells: " + Arrays.toString(targetCells));
305            fail("Row " + targetRowKey + " should have more cells in " + "target than in source");
306          }
307
308        } else {
309          if (sourceCells.length != targetCells.length) {
310            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
311            LOG.debug("Target cells: " + Arrays.toString(targetCells));
312            fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length
313              + " cells in source table but " + targetCells.length + " cells in target table");
314          }
315        }
316        for (int j = 0; j < sourceCells.length; j++) {
317          Cell sourceCell = sourceCells[j];
318          Cell targetCell = targetCells[j];
319          assertCellEquals(sourceCell, targetCell, () -> targetRowKey < 80 && targetRowKey >= 90);
320        }
321        targetRow = targetScanner.next();
322        sourceRow = sourceScanner.next();
323      }
324      assertEquals(expectedRows, rowsCount, "Target expected rows does not match.");
325    }
326  }
327
328  private void assertTargetDoPutsFalse(int expectedRows, HBaseTestingUtil sourceCluster,
329    TableName sourceTableName, HBaseTestingUtil targetCluster, TableName targetTableName)
330    throws Exception {
331    try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName);
332      Table targetTable = targetCluster.getConnection().getTable(targetTableName);
333      ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
334      ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
335      Result targetRow = targetScanner.next();
336      Result sourceRow = sourceScanner.next();
337      int rowsCount = 0;
338
339      while (targetRow != null) {
340        // only compares values for existing rows, skipping rows existing on
341        // source only that were not added to target given --doPuts=false
342        if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
343          sourceRow = sourceScanner.next();
344          continue;
345        }
346
347        LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
348          + " cells:" + sourceRow);
349        LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
350          + " cells:" + targetRow);
351
352        LOG.debug("rowsCount: " + rowsCount);
353
354        Cell[] sourceCells = sourceRow.rawCells();
355        Cell[] targetCells = targetRow.rawCells();
356        int targetRowKey = Bytes.toInt(targetRow.getRow());
357        if (targetRowKey >= 40 && targetRowKey < 60) {
358          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
359          LOG.debug("Target cells: " + Arrays.toString(targetCells));
360          fail("There shouldn't exist any rows between 40 and 60, since "
361            + "Puts are disabled and Deletes are enabled.");
362        } else if (targetRowKey >= 60 && targetRowKey < 70) {
363          if (sourceCells.length == targetCells.length) {
364            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
365            LOG.debug("Target cells: " + Arrays.toString(targetCells));
366            fail(
367              "Row " + Bytes.toInt(sourceRow.getRow()) + " shouldn't have same number of cells.");
368          }
369        } else if (targetRowKey >= 80 && targetRowKey < 90) {
370          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
371          LOG.debug("Target cells: " + Arrays.toString(targetCells));
372          fail("There should be no rows between 80 and 90 on target, as "
373            + "these had different timestamps and should had been deleted.");
374        } else if (targetRowKey >= 90 && targetRowKey < 100) {
375          for (int j = 0; j < sourceCells.length; j++) {
376            Cell sourceCell = sourceCells[j];
377            Cell targetCell = targetCells[j];
378            if (CellUtil.matchingValue(sourceCell, targetCell)) {
379              fail("Cells values should not match for rows between " + "90 and 100. Target row id: "
380                + Bytes.toInt(targetRow.getRow()));
381            }
382          }
383        } else {
384          for (int j = 0; j < sourceCells.length; j++) {
385            Cell sourceCell = sourceCells[j];
386            Cell targetCell = targetCells[j];
387            assertCellEquals(sourceCell, targetCell, () -> true);
388          }
389        }
390        rowsCount++;
391        targetRow = targetScanner.next();
392        sourceRow = sourceScanner.next();
393      }
394      assertEquals(expectedRows, rowsCount, "Target expected rows does not match.");
395    }
396  }
397
398  private Counters syncTables(Configuration conf, TableName sourceTableName,
399    TableName targetTableName, Path testDir, String... options) throws Exception {
400    SyncTable syncTable = new SyncTable(conf);
401    String[] args = Arrays.copyOf(options, options.length + 3);
402    args[options.length] = testDir.toString();
403    args[options.length + 1] = sourceTableName.getNameAsString();
404    args[options.length + 2] = targetTableName.getNameAsString();
405    int code = syncTable.run(args);
406    assertEquals(0, code, "sync table job failed");
407
408    LOG.info("Sync tables completed");
409    return syncTable.counters;
410  }
411
412  private void hashSourceTable(HBaseTestingUtil sourceCluster, TableName sourceTableName,
413    Path testDir, String... options) throws Exception {
414    int numHashFiles = 3;
415    long batchSize = 100; // should be 2 batches per region
416    int scanBatch = 1;
417    HashTable hashTable = new HashTable(sourceCluster.getConfiguration());
418    String[] args = Arrays.copyOf(options, options.length + 5);
419    args[options.length] = "--batchsize=" + batchSize;
420    args[options.length + 1] = "--numhashfiles=" + numHashFiles;
421    args[options.length + 2] = "--scanbatch=" + scanBatch;
422    args[options.length + 3] = sourceTableName.getNameAsString();
423    args[options.length + 4] = testDir.toString();
424    int code = hashTable.run(args);
425    assertEquals(0, code, "hash table job failed");
426
427    FileSystem fs = sourceCluster.getTestFileSystem();
428
429    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
430    assertEquals(sourceTableName.getNameAsString(), tableHash.tableName);
431    assertEquals(batchSize, tableHash.batchSize);
432    assertEquals(numHashFiles, tableHash.numHashFiles);
433    assertEquals(numHashFiles - 1, tableHash.partitions.size());
434
435    LOG.info("Hash table completed");
436  }
437
438  private void writeTestData(HBaseTestingUtil sourceCluster, TableName sourceTableName,
439    HBaseTestingUtil targetCluster, TableName targetTableName, long... timestamps)
440    throws Exception {
441    final byte[] family = Bytes.toBytes("family");
442    final byte[] column1 = Bytes.toBytes("c1");
443    final byte[] column2 = Bytes.toBytes("c2");
444    final byte[] value1 = Bytes.toBytes("val1");
445    final byte[] value2 = Bytes.toBytes("val2");
446    final byte[] value3 = Bytes.toBytes("val3");
447
448    int numRows = 100;
449    int sourceRegions = 10;
450    int targetRegions = 6;
451    if (ArrayUtils.isEmpty(timestamps)) {
452      long current = EnvironmentEdgeManager.currentTime();
453      timestamps = new long[] { current, current };
454    }
455
456    try (
457      Table sourceTable =
458        sourceCluster.createTable(sourceTableName, family, generateSplits(numRows, sourceRegions));
459      Table targetTable = targetCluster.createTable(targetTableName, family,
460        generateSplits(numRows, targetRegions))) {
461
462      int rowIndex = 0;
463      // a bunch of identical rows
464      for (; rowIndex < 40; rowIndex++) {
465        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
466        sourcePut.addColumn(family, column1, timestamps[0], value1);
467        sourcePut.addColumn(family, column2, timestamps[0], value2);
468        sourceTable.put(sourcePut);
469
470        Put targetPut = new Put(Bytes.toBytes(rowIndex));
471        targetPut.addColumn(family, column1, timestamps[1], value1);
472        targetPut.addColumn(family, column2, timestamps[1], value2);
473        targetTable.put(targetPut);
474      }
475      // some rows only in the source table
476      // ROWSWITHDIFFS: 10
477      // TARGETMISSINGROWS: 10
478      // TARGETMISSINGCELLS: 20
479      for (; rowIndex < 50; rowIndex++) {
480        Put put = new Put(Bytes.toBytes(rowIndex));
481        put.addColumn(family, column1, timestamps[0], value1);
482        put.addColumn(family, column2, timestamps[0], value2);
483        sourceTable.put(put);
484      }
485      // some rows only in the target table
486      // ROWSWITHDIFFS: 10
487      // SOURCEMISSINGROWS: 10
488      // SOURCEMISSINGCELLS: 20
489      for (; rowIndex < 60; rowIndex++) {
490        Put put = new Put(Bytes.toBytes(rowIndex));
491        put.addColumn(family, column1, timestamps[1], value1);
492        put.addColumn(family, column2, timestamps[1], value2);
493        targetTable.put(put);
494      }
495      // some rows with 1 missing cell in target table
496      // ROWSWITHDIFFS: 10
497      // TARGETMISSINGCELLS: 10
498      for (; rowIndex < 70; rowIndex++) {
499        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
500        sourcePut.addColumn(family, column1, timestamps[0], value1);
501        sourcePut.addColumn(family, column2, timestamps[0], value2);
502        sourceTable.put(sourcePut);
503
504        Put targetPut = new Put(Bytes.toBytes(rowIndex));
505        targetPut.addColumn(family, column1, timestamps[1], value1);
506        targetTable.put(targetPut);
507      }
508      // some rows with 1 missing cell in source table
509      // ROWSWITHDIFFS: 10
510      // SOURCEMISSINGCELLS: 10
511      for (; rowIndex < 80; rowIndex++) {
512        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
513        sourcePut.addColumn(family, column1, timestamps[0], value1);
514        sourceTable.put(sourcePut);
515
516        Put targetPut = new Put(Bytes.toBytes(rowIndex));
517        targetPut.addColumn(family, column1, timestamps[1], value1);
518        targetPut.addColumn(family, column2, timestamps[1], value2);
519        targetTable.put(targetPut);
520      }
521      // some rows differing only in timestamp
522      // ROWSWITHDIFFS: 10
523      // SOURCEMISSINGCELLS: 20
524      // TARGETMISSINGCELLS: 20
525      for (; rowIndex < 90; rowIndex++) {
526        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
527        sourcePut.addColumn(family, column1, timestamps[0], column1);
528        sourcePut.addColumn(family, column2, timestamps[0], value2);
529        sourceTable.put(sourcePut);
530
531        Put targetPut = new Put(Bytes.toBytes(rowIndex));
532        targetPut.addColumn(family, column1, timestamps[1] + 1, column1);
533        targetPut.addColumn(family, column2, timestamps[1] - 1, value2);
534        targetTable.put(targetPut);
535      }
536      // some rows with different values
537      // ROWSWITHDIFFS: 10
538      // DIFFERENTCELLVALUES: 20
539      for (; rowIndex < numRows; rowIndex++) {
540        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
541        sourcePut.addColumn(family, column1, timestamps[0], value1);
542        sourcePut.addColumn(family, column2, timestamps[0], value2);
543        sourceTable.put(sourcePut);
544
545        Put targetPut = new Put(Bytes.toBytes(rowIndex));
546        targetPut.addColumn(family, column1, timestamps[1], value3);
547        targetPut.addColumn(family, column2, timestamps[1], value3);
548        targetTable.put(targetPut);
549      }
550    }
551  }
552}