001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021
022import java.util.Arrays;
023import org.apache.commons.lang3.ArrayUtils;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Put;
032import org.apache.hadoop.hbase.client.Result;
033import org.apache.hadoop.hbase.client.ResultScanner;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
037import org.apache.hadoop.hbase.testclassification.LargeTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.apache.hadoop.mapreduce.Counters;
041import org.junit.AfterClass;
042import org.junit.Assert;
043import org.junit.BeforeClass;
044import org.junit.ClassRule;
045import org.junit.Rule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.junit.rules.TestName;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
053
054/**
055 * Basic test for the SyncTable M/R tool
056 */
057@Category(LargeTests.class)
058public class TestSyncTable {
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061    HBaseClassTestRule.forClass(TestSyncTable.class);
062
063  private static final Logger LOG = LoggerFactory.getLogger(TestSyncTable.class);
064
065  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
066
067  @Rule
068  public TestName name = new TestName();
069
070  @BeforeClass
071  public static void beforeClass() throws Exception {
072    TEST_UTIL.startMiniCluster(3);
073  }
074
075  @AfterClass
076  public static void afterClass() throws Exception {
077    TEST_UTIL.cleanupDataTestDirOnTestFS();
078    TEST_UTIL.shutdownMiniCluster();
079  }
080
081  private static byte[][] generateSplits(int numRows, int numRegions) {
082    byte[][] splitRows = new byte[numRegions - 1][];
083    for (int i = 1; i < numRegions; i++) {
084      splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions);
085    }
086    return splitRows;
087  }
088
089  @Test
090  public void testSyncTable() throws Exception {
091    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
092    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
093    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");
094
095    writeTestData(sourceTableName, targetTableName);
096    hashSourceTable(sourceTableName, testDir);
097    Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
098    assertEqualTables(90, sourceTableName, targetTableName, false);
099
100    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
101    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
102    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
103    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
104    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
105    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
106
107    TEST_UTIL.deleteTable(sourceTableName);
108    TEST_UTIL.deleteTable(targetTableName);
109  }
110
111  @Test
112  public void testSyncTableDoDeletesFalse() throws Exception {
113    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
114    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
115    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoDeletesFalse");
116
117    writeTestData(sourceTableName, targetTableName);
118    hashSourceTable(sourceTableName, testDir);
119    Counters syncCounters =
120      syncTables(sourceTableName, targetTableName, testDir, "--doDeletes=false");
121    assertTargetDoDeletesFalse(100, sourceTableName, targetTableName);
122
123    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
124    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
125    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
126    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
127    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
128    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
129
130    TEST_UTIL.deleteTable(sourceTableName);
131    TEST_UTIL.deleteTable(targetTableName);
132  }
133
134  @Test
135  public void testSyncTableDoPutsFalse() throws Exception {
136    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
137    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
138    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoPutsFalse");
139
140    writeTestData(sourceTableName, targetTableName);
141    hashSourceTable(sourceTableName, testDir);
142    Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir, "--doPuts=false");
143    assertTargetDoPutsFalse(70, sourceTableName, targetTableName);
144
145    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
146    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
147    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
148    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
149    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
150    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
151
152    TEST_UTIL.deleteTable(sourceTableName);
153    TEST_UTIL.deleteTable(targetTableName);
154  }
155
156  @Test
157  public void testSyncTableIgnoreTimestampsTrue() throws Exception {
158    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
159    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
160    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableIgnoreTimestampsTrue");
161    long current = EnvironmentEdgeManager.currentTime();
162    writeTestData(sourceTableName, targetTableName, current - 1000, current);
163    hashSourceTable(sourceTableName, testDir, "--ignoreTimestamps=true");
164    Counters syncCounters =
165      syncTables(sourceTableName, targetTableName, testDir, "--ignoreTimestamps=true");
166    assertEqualTables(90, sourceTableName, targetTableName, true);
167
168    assertEquals(50, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
169    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
170    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
171    assertEquals(30, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
172    assertEquals(30, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
173    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
174
175    TEST_UTIL.deleteTable(sourceTableName);
176    TEST_UTIL.deleteTable(targetTableName);
177  }
178
179  private void assertEqualTables(int expectedRows, TableName sourceTableName,
180    TableName targetTableName, boolean ignoreTimestamps) throws Exception {
181    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
182    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
183
184    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
185    ResultScanner targetScanner = targetTable.getScanner(new Scan());
186
187    for (int i = 0; i < expectedRows; i++) {
188      Result sourceRow = sourceScanner.next();
189      Result targetRow = targetScanner.next();
190
191      LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
192        + " cells:" + sourceRow);
193      LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
194        + " cells:" + targetRow);
195
196      if (sourceRow == null) {
197        Assert.fail("Expected " + expectedRows + " source rows but only found " + i);
198      }
199      if (targetRow == null) {
200        Assert.fail("Expected " + expectedRows + " target rows but only found " + i);
201      }
202      Cell[] sourceCells = sourceRow.rawCells();
203      Cell[] targetCells = targetRow.rawCells();
204      if (sourceCells.length != targetCells.length) {
205        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
206        LOG.debug("Target cells: " + Arrays.toString(targetCells));
207        Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length
208          + " cells in source table but " + targetCells.length + " cells in target table");
209      }
210      for (int j = 0; j < sourceCells.length; j++) {
211        Cell sourceCell = sourceCells[j];
212        Cell targetCell = targetCells[j];
213        try {
214          if (!CellUtil.matchingRows(sourceCell, targetCell)) {
215            Assert.fail("Rows don't match");
216          }
217          if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
218            Assert.fail("Families don't match");
219          }
220          if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
221            Assert.fail("Qualifiers don't match");
222          }
223          if (!ignoreTimestamps && !CellUtil.matchingTimestamp(sourceCell, targetCell)) {
224            Assert.fail("Timestamps don't match");
225          }
226          if (!CellUtil.matchingValue(sourceCell, targetCell)) {
227            Assert.fail("Values don't match");
228          }
229        } catch (Throwable t) {
230          LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
231          Throwables.propagate(t);
232        }
233      }
234    }
235    Result sourceRow = sourceScanner.next();
236    if (sourceRow != null) {
237      Assert.fail("Source table has more than " + expectedRows + " rows.  Next row: "
238        + Bytes.toInt(sourceRow.getRow()));
239    }
240    Result targetRow = targetScanner.next();
241    if (targetRow != null) {
242      Assert.fail("Target table has more than " + expectedRows + " rows.  Next row: "
243        + Bytes.toInt(targetRow.getRow()));
244    }
245    sourceScanner.close();
246    targetScanner.close();
247    sourceTable.close();
248    targetTable.close();
249  }
250
251  private void assertTargetDoDeletesFalse(int expectedRows, TableName sourceTableName,
252    TableName targetTableName) throws Exception {
253    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
254    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
255
256    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
257    ResultScanner targetScanner = targetTable.getScanner(new Scan());
258    Result targetRow = targetScanner.next();
259    Result sourceRow = sourceScanner.next();
260    int rowsCount = 0;
261    while (targetRow != null) {
262      rowsCount++;
263      // only compares values for existing rows, skipping rows existing on
264      // target only that were not deleted given --doDeletes=false
265      if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
266        targetRow = targetScanner.next();
267        continue;
268      }
269
270      LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
271        + " cells:" + sourceRow);
272      LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
273        + " cells:" + targetRow);
274
275      Cell[] sourceCells = sourceRow.rawCells();
276      Cell[] targetCells = targetRow.rawCells();
277      int targetRowKey = Bytes.toInt(targetRow.getRow());
278      if (targetRowKey >= 70 && targetRowKey < 80) {
279        if (sourceCells.length == targetCells.length) {
280          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
281          LOG.debug("Target cells: " + Arrays.toString(targetCells));
282          Assert
283            .fail("Row " + targetRowKey + " should have more cells in " + "target than in source");
284        }
285
286      } else {
287        if (sourceCells.length != targetCells.length) {
288          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
289          LOG.debug("Target cells: " + Arrays.toString(targetCells));
290          Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length
291            + " cells in source table but " + targetCells.length + " cells in target table");
292        }
293      }
294      for (int j = 0; j < sourceCells.length; j++) {
295        Cell sourceCell = sourceCells[j];
296        Cell targetCell = targetCells[j];
297        try {
298          if (!CellUtil.matchingRows(sourceCell, targetCell)) {
299            Assert.fail("Rows don't match");
300          }
301          if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
302            Assert.fail("Families don't match");
303          }
304          if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
305            Assert.fail("Qualifiers don't match");
306          }
307          if (targetRowKey < 80 && targetRowKey >= 90) {
308            if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
309              Assert.fail("Timestamps don't match");
310            }
311          }
312          if (!CellUtil.matchingValue(sourceCell, targetCell)) {
313            Assert.fail("Values don't match");
314          }
315        } catch (Throwable t) {
316          LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
317          Throwables.propagate(t);
318        }
319      }
320      targetRow = targetScanner.next();
321      sourceRow = sourceScanner.next();
322    }
323    assertEquals("Target expected rows does not match.", expectedRows, rowsCount);
324    sourceScanner.close();
325    targetScanner.close();
326    sourceTable.close();
327    targetTable.close();
328  }
329
330  private void assertTargetDoPutsFalse(int expectedRows, TableName sourceTableName,
331    TableName targetTableName) throws Exception {
332    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
333    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
334
335    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
336    ResultScanner targetScanner = targetTable.getScanner(new Scan());
337    Result targetRow = targetScanner.next();
338    Result sourceRow = sourceScanner.next();
339    int rowsCount = 0;
340
341    while (targetRow != null) {
342      // only compares values for existing rows, skipping rows existing on
343      // source only that were not added to target given --doPuts=false
344      if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
345        sourceRow = sourceScanner.next();
346        continue;
347      }
348
349      LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
350        + " cells:" + sourceRow);
351      LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
352        + " cells:" + targetRow);
353
354      LOG.debug("rowsCount: " + rowsCount);
355
356      Cell[] sourceCells = sourceRow.rawCells();
357      Cell[] targetCells = targetRow.rawCells();
358      int targetRowKey = Bytes.toInt(targetRow.getRow());
359      if (targetRowKey >= 40 && targetRowKey < 60) {
360        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
361        LOG.debug("Target cells: " + Arrays.toString(targetCells));
362        Assert.fail("There shouldn't exist any rows between 40 and 60, since "
363          + "Puts are disabled and Deletes are enabled.");
364      } else if (targetRowKey >= 60 && targetRowKey < 70) {
365        if (sourceCells.length == targetCells.length) {
366          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
367          LOG.debug("Target cells: " + Arrays.toString(targetCells));
368          Assert.fail(
369            "Row " + Bytes.toInt(sourceRow.getRow()) + " shouldn't have same number of cells.");
370        }
371      } else if (targetRowKey >= 80 && targetRowKey < 90) {
372        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
373        LOG.debug("Target cells: " + Arrays.toString(targetCells));
374        Assert.fail("There should be no rows between 80 and 90 on target, as "
375          + "these had different timestamps and should had been deleted.");
376      } else if (targetRowKey >= 90 && targetRowKey < 100) {
377        for (int j = 0; j < sourceCells.length; j++) {
378          Cell sourceCell = sourceCells[j];
379          Cell targetCell = targetCells[j];
380          if (CellUtil.matchingValue(sourceCell, targetCell)) {
381            Assert.fail("Cells values should not match for rows between "
382              + "90 and 100. Target row id: " + (Bytes.toInt(targetRow.getRow())));
383          }
384        }
385      } else {
386        for (int j = 0; j < sourceCells.length; j++) {
387          Cell sourceCell = sourceCells[j];
388          Cell targetCell = targetCells[j];
389          try {
390            if (!CellUtil.matchingRows(sourceCell, targetCell)) {
391              Assert.fail("Rows don't match");
392            }
393            if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
394              Assert.fail("Families don't match");
395            }
396            if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
397              Assert.fail("Qualifiers don't match");
398            }
399            if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
400              Assert.fail("Timestamps don't match");
401            }
402            if (!CellUtil.matchingValue(sourceCell, targetCell)) {
403              Assert.fail("Values don't match");
404            }
405          } catch (Throwable t) {
406            LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
407            Throwables.propagate(t);
408          }
409        }
410      }
411      rowsCount++;
412      targetRow = targetScanner.next();
413      sourceRow = sourceScanner.next();
414    }
415    assertEquals("Target expected rows does not match.", expectedRows, rowsCount);
416    sourceScanner.close();
417    targetScanner.close();
418    sourceTable.close();
419    targetTable.close();
420  }
421
422  private Counters syncTables(TableName sourceTableName, TableName targetTableName, Path testDir,
423    String... options) throws Exception {
424    SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
425    String[] args = Arrays.copyOf(options, options.length + 3);
426    args[options.length] = testDir.toString();
427    args[options.length + 1] = sourceTableName.getNameAsString();
428    args[options.length + 2] = targetTableName.getNameAsString();
429    int code = syncTable.run(args);
430    assertEquals("sync table job failed", 0, code);
431
432    LOG.info("Sync tables completed");
433    return syncTable.counters;
434  }
435
436  private void hashSourceTable(TableName sourceTableName, Path testDir, String... options)
437    throws Exception {
438    int numHashFiles = 3;
439    long batchSize = 100; // should be 2 batches per region
440    int scanBatch = 1;
441    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
442    String[] args = Arrays.copyOf(options, options.length + 5);
443    args[options.length] = "--batchsize=" + batchSize;
444    args[options.length + 1] = "--numhashfiles=" + numHashFiles;
445    args[options.length + 2] = "--scanbatch=" + scanBatch;
446    args[options.length + 3] = sourceTableName.getNameAsString();
447    args[options.length + 4] = testDir.toString();
448    int code = hashTable.run(args);
449    assertEquals("hash table job failed", 0, code);
450
451    FileSystem fs = TEST_UTIL.getTestFileSystem();
452
453    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
454    assertEquals(sourceTableName.getNameAsString(), tableHash.tableName);
455    assertEquals(batchSize, tableHash.batchSize);
456    assertEquals(numHashFiles, tableHash.numHashFiles);
457    assertEquals(numHashFiles - 1, tableHash.partitions.size());
458
459    LOG.info("Hash table completed");
460  }
461
462  private void writeTestData(TableName sourceTableName, TableName targetTableName,
463    long... timestamps) throws Exception {
464    final byte[] family = Bytes.toBytes("family");
465    final byte[] column1 = Bytes.toBytes("c1");
466    final byte[] column2 = Bytes.toBytes("c2");
467    final byte[] value1 = Bytes.toBytes("val1");
468    final byte[] value2 = Bytes.toBytes("val2");
469    final byte[] value3 = Bytes.toBytes("val3");
470
471    int numRows = 100;
472    int sourceRegions = 10;
473    int targetRegions = 6;
474    if (ArrayUtils.isEmpty(timestamps)) {
475      long current = EnvironmentEdgeManager.currentTime();
476      timestamps = new long[] { current, current };
477    }
478
479    Table sourceTable =
480      TEST_UTIL.createTable(sourceTableName, family, generateSplits(numRows, sourceRegions));
481
482    Table targetTable =
483      TEST_UTIL.createTable(targetTableName, family, generateSplits(numRows, targetRegions));
484
485    int rowIndex = 0;
486    // a bunch of identical rows
487    for (; rowIndex < 40; rowIndex++) {
488      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
489      sourcePut.addColumn(family, column1, timestamps[0], value1);
490      sourcePut.addColumn(family, column2, timestamps[0], value2);
491      sourceTable.put(sourcePut);
492
493      Put targetPut = new Put(Bytes.toBytes(rowIndex));
494      targetPut.addColumn(family, column1, timestamps[1], value1);
495      targetPut.addColumn(family, column2, timestamps[1], value2);
496      targetTable.put(targetPut);
497    }
498    // some rows only in the source table
499    // ROWSWITHDIFFS: 10
500    // TARGETMISSINGROWS: 10
501    // TARGETMISSINGCELLS: 20
502    for (; rowIndex < 50; rowIndex++) {
503      Put put = new Put(Bytes.toBytes(rowIndex));
504      put.addColumn(family, column1, timestamps[0], value1);
505      put.addColumn(family, column2, timestamps[0], value2);
506      sourceTable.put(put);
507    }
508    // some rows only in the target table
509    // ROWSWITHDIFFS: 10
510    // SOURCEMISSINGROWS: 10
511    // SOURCEMISSINGCELLS: 20
512    for (; rowIndex < 60; rowIndex++) {
513      Put put = new Put(Bytes.toBytes(rowIndex));
514      put.addColumn(family, column1, timestamps[1], value1);
515      put.addColumn(family, column2, timestamps[1], value2);
516      targetTable.put(put);
517    }
518    // some rows with 1 missing cell in target table
519    // ROWSWITHDIFFS: 10
520    // TARGETMISSINGCELLS: 10
521    for (; rowIndex < 70; rowIndex++) {
522      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
523      sourcePut.addColumn(family, column1, timestamps[0], value1);
524      sourcePut.addColumn(family, column2, timestamps[0], value2);
525      sourceTable.put(sourcePut);
526
527      Put targetPut = new Put(Bytes.toBytes(rowIndex));
528      targetPut.addColumn(family, column1, timestamps[1], value1);
529      targetTable.put(targetPut);
530    }
531    // some rows with 1 missing cell in source table
532    // ROWSWITHDIFFS: 10
533    // SOURCEMISSINGCELLS: 10
534    for (; rowIndex < 80; rowIndex++) {
535      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
536      sourcePut.addColumn(family, column1, timestamps[0], value1);
537      sourceTable.put(sourcePut);
538
539      Put targetPut = new Put(Bytes.toBytes(rowIndex));
540      targetPut.addColumn(family, column1, timestamps[1], value1);
541      targetPut.addColumn(family, column2, timestamps[1], value2);
542      targetTable.put(targetPut);
543    }
544    // some rows differing only in timestamp
545    // ROWSWITHDIFFS: 10
546    // SOURCEMISSINGCELLS: 20
547    // TARGETMISSINGCELLS: 20
548    for (; rowIndex < 90; rowIndex++) {
549      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
550      sourcePut.addColumn(family, column1, timestamps[0], column1);
551      sourcePut.addColumn(family, column2, timestamps[0], value2);
552      sourceTable.put(sourcePut);
553
554      Put targetPut = new Put(Bytes.toBytes(rowIndex));
555      targetPut.addColumn(family, column1, timestamps[1] + 1, column1);
556      targetPut.addColumn(family, column2, timestamps[1] - 1, value2);
557      targetTable.put(targetPut);
558    }
559    // some rows with different values
560    // ROWSWITHDIFFS: 10
561    // DIFFERENTCELLVALUES: 20
562    for (; rowIndex < numRows; rowIndex++) {
563      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
564      sourcePut.addColumn(family, column1, timestamps[0], value1);
565      sourcePut.addColumn(family, column2, timestamps[0], value2);
566      sourceTable.put(sourcePut);
567
568      Put targetPut = new Put(Bytes.toBytes(rowIndex));
569      targetPut.addColumn(family, column1, timestamps[1], value3);
570      targetPut.addColumn(family, column2, timestamps[1], value3);
571      targetTable.put(targetPut);
572    }
573
574    sourceTable.close();
575    targetTable.close();
576  }
577}