001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021
022import java.util.Arrays;
023
024import org.apache.commons.lang3.ArrayUtils;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellUtil;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.ResultScanner;
035import org.apache.hadoop.hbase.client.Scan;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.mapreduce.Counters;
041import org.junit.AfterClass;
042import org.junit.Assert;
043import org.junit.BeforeClass;
044import org.junit.ClassRule;
045import org.junit.Rule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.junit.rules.TestName;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
053
054/**
055 * Basic test for the SyncTable M/R tool
056 */
057@Category(LargeTests.class)
058public class TestSyncTable {
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061      HBaseClassTestRule.forClass(TestSyncTable.class);
062
063  private static final Logger LOG = LoggerFactory.getLogger(TestSyncTable.class);
064
065  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
066
067  @Rule
068  public TestName name = new TestName();
069
070  @BeforeClass
071  public static void beforeClass() throws Exception {
072    TEST_UTIL.startMiniCluster(3);
073  }
074
075  @AfterClass
076  public static void afterClass() throws Exception {
077    TEST_UTIL.cleanupDataTestDirOnTestFS();
078    TEST_UTIL.shutdownMiniCluster();
079  }
080
081  private static byte[][] generateSplits(int numRows, int numRegions) {
082    byte[][] splitRows = new byte[numRegions-1][];
083    for (int i = 1; i < numRegions; i++) {
084      splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
085    }
086    return splitRows;
087  }
088
089  @Test
090  public void testSyncTable() throws Exception {
091    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
092    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
093    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");
094
095    writeTestData(sourceTableName, targetTableName);
096    hashSourceTable(sourceTableName, testDir);
097    Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
098    assertEqualTables(90, sourceTableName, targetTableName, false);
099
100    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
101    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
102    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
103    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
104    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
105    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
106
107    TEST_UTIL.deleteTable(sourceTableName);
108    TEST_UTIL.deleteTable(targetTableName);
109  }
110
111  @Test
112  public void testSyncTableDoDeletesFalse() throws Exception {
113    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
114    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
115    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoDeletesFalse");
116
117    writeTestData(sourceTableName, targetTableName);
118    hashSourceTable(sourceTableName, testDir);
119    Counters syncCounters = syncTables(sourceTableName, targetTableName,
120        testDir, "--doDeletes=false");
121    assertTargetDoDeletesFalse(100, sourceTableName, targetTableName);
122
123    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
124    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
125    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
126    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
127    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
128    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
129
130    TEST_UTIL.deleteTable(sourceTableName);
131    TEST_UTIL.deleteTable(targetTableName);
132  }
133
134  @Test
135  public void testSyncTableDoPutsFalse() throws Exception {
136    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
137    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
138    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoPutsFalse");
139
140    writeTestData(sourceTableName, targetTableName);
141    hashSourceTable(sourceTableName, testDir);
142    Counters syncCounters = syncTables(sourceTableName, targetTableName,
143        testDir, "--doPuts=false");
144    assertTargetDoPutsFalse(70, sourceTableName, targetTableName);
145
146    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
147    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
148    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
149    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
150    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
151    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
152
153    TEST_UTIL.deleteTable(sourceTableName);
154    TEST_UTIL.deleteTable(targetTableName);
155  }
156
157  @Test
158  public void testSyncTableIgnoreTimestampsTrue() throws Exception {
159    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
160    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
161    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableIgnoreTimestampsTrue");
162    long current = System.currentTimeMillis();
163    writeTestData(sourceTableName, targetTableName, current - 1000, current);
164    hashSourceTable(sourceTableName, testDir, "--ignoreTimestamps=true");
165    Counters syncCounters = syncTables(sourceTableName, targetTableName,
166      testDir, "--ignoreTimestamps=true");
167    assertEqualTables(90, sourceTableName, targetTableName, true);
168
169    assertEquals(50, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
170    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
171    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
172    assertEquals(30, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
173    assertEquals(30, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
174    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
175
176    TEST_UTIL.deleteTable(sourceTableName);
177    TEST_UTIL.deleteTable(targetTableName);
178  }
179
180  private void assertEqualTables(int expectedRows, TableName sourceTableName,
181      TableName targetTableName, boolean ignoreTimestamps) throws Exception {
182    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
183    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
184
185    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
186    ResultScanner targetScanner = targetTable.getScanner(new Scan());
187
188    for (int i = 0; i < expectedRows; i++) {
189      Result sourceRow = sourceScanner.next();
190      Result targetRow = targetScanner.next();
191
192      LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
193          + " cells:" + sourceRow);
194      LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
195          + " cells:" + targetRow);
196
197      if (sourceRow == null) {
198        Assert.fail("Expected " + expectedRows
199            + " source rows but only found " + i);
200      }
201      if (targetRow == null) {
202        Assert.fail("Expected " + expectedRows
203            + " target rows but only found " + i);
204      }
205      Cell[] sourceCells = sourceRow.rawCells();
206      Cell[] targetCells = targetRow.rawCells();
207      if (sourceCells.length != targetCells.length) {
208        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
209        LOG.debug("Target cells: " + Arrays.toString(targetCells));
210        Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
211            + " has " + sourceCells.length
212            + " cells in source table but " + targetCells.length
213            + " cells in target table");
214      }
215      for (int j = 0; j < sourceCells.length; j++) {
216        Cell sourceCell = sourceCells[j];
217        Cell targetCell = targetCells[j];
218        try {
219          if (!CellUtil.matchingRows(sourceCell, targetCell)) {
220            Assert.fail("Rows don't match");
221          }
222          if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
223            Assert.fail("Families don't match");
224          }
225          if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
226            Assert.fail("Qualifiers don't match");
227          }
228          if (!ignoreTimestamps && !CellUtil.matchingTimestamp(sourceCell, targetCell)) {
229            Assert.fail("Timestamps don't match");
230          }
231          if (!CellUtil.matchingValue(sourceCell, targetCell)) {
232            Assert.fail("Values don't match");
233          }
234        } catch (Throwable t) {
235          LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
236          Throwables.propagate(t);
237        }
238      }
239    }
240    Result sourceRow = sourceScanner.next();
241    if (sourceRow != null) {
242      Assert.fail("Source table has more than " + expectedRows
243          + " rows.  Next row: " + Bytes.toInt(sourceRow.getRow()));
244    }
245    Result targetRow = targetScanner.next();
246    if (targetRow != null) {
247      Assert.fail("Target table has more than " + expectedRows
248          + " rows.  Next row: " + Bytes.toInt(targetRow.getRow()));
249    }
250    sourceScanner.close();
251    targetScanner.close();
252    sourceTable.close();
253    targetTable.close();
254  }
255
256  private void assertTargetDoDeletesFalse(int expectedRows, TableName sourceTableName,
257      TableName targetTableName) throws Exception {
258    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
259    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
260
261    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
262    ResultScanner targetScanner = targetTable.getScanner(new Scan());
263    Result targetRow = targetScanner.next();
264    Result sourceRow = sourceScanner.next();
265    int rowsCount = 0;
266    while (targetRow != null) {
267      rowsCount++;
268      //only compares values for existing rows, skipping rows existing on
269      //target only that were not deleted given --doDeletes=false
270      if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
271        targetRow = targetScanner.next();
272        continue;
273      }
274
275      LOG.debug("SOURCE row: " + (sourceRow == null ? "null"
276          : Bytes.toInt(sourceRow.getRow()))
277          + " cells:" + sourceRow);
278      LOG.debug("TARGET row: " + (targetRow == null ? "null"
279          : Bytes.toInt(targetRow.getRow()))
280          + " cells:" + targetRow);
281
282      Cell[] sourceCells = sourceRow.rawCells();
283      Cell[] targetCells = targetRow.rawCells();
284      int targetRowKey = Bytes.toInt(targetRow.getRow());
285      if (targetRowKey >= 70 && targetRowKey < 80) {
286        if (sourceCells.length == targetCells.length) {
287          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
288          LOG.debug("Target cells: " + Arrays.toString(targetCells));
289          Assert.fail("Row " + targetRowKey + " should have more cells in "
290              + "target than in source");
291        }
292
293      } else {
294        if (sourceCells.length != targetCells.length) {
295          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
296          LOG.debug("Target cells: " + Arrays.toString(targetCells));
297          Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
298              + " has " + sourceCells.length
299              + " cells in source table but " + targetCells.length
300              + " cells in target table");
301        }
302      }
303      for (int j = 0; j < sourceCells.length; j++) {
304        Cell sourceCell = sourceCells[j];
305        Cell targetCell = targetCells[j];
306        try {
307          if (!CellUtil.matchingRow(sourceCell, targetCell)) {
308            Assert.fail("Rows don't match");
309          }
310          if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
311            Assert.fail("Families don't match");
312          }
313          if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
314            Assert.fail("Qualifiers don't match");
315          }
316          if (targetRowKey < 80 && targetRowKey >= 90){
317            if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
318              Assert.fail("Timestamps don't match");
319            }
320          }
321          if (!CellUtil.matchingValue(sourceCell, targetCell)) {
322            Assert.fail("Values don't match");
323          }
324        } catch (Throwable t) {
325          LOG.debug("Source cell: " + sourceCell + " target cell: "
326              + targetCell);
327          Throwables.propagate(t);
328        }
329      }
330      targetRow = targetScanner.next();
331      sourceRow = sourceScanner.next();
332    }
333    assertEquals("Target expected rows does not match.",expectedRows,
334        rowsCount);
335    sourceScanner.close();
336    targetScanner.close();
337    sourceTable.close();
338    targetTable.close();
339  }
340
341  private void assertTargetDoPutsFalse(int expectedRows, TableName sourceTableName,
342      TableName targetTableName) throws Exception {
343    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
344    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
345
346    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
347    ResultScanner targetScanner = targetTable.getScanner(new Scan());
348    Result targetRow = targetScanner.next();
349    Result sourceRow = sourceScanner.next();
350    int rowsCount = 0;
351
352    while (targetRow!=null) {
353      //only compares values for existing rows, skipping rows existing on
354      //source only that were not added to target given --doPuts=false
355      if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
356        sourceRow = sourceScanner.next();
357        continue;
358      }
359
360      LOG.debug("SOURCE row: " + (sourceRow == null ?
361          "null" :
362          Bytes.toInt(sourceRow.getRow()))
363          + " cells:" + sourceRow);
364      LOG.debug("TARGET row: " + (targetRow == null ?
365          "null" :
366          Bytes.toInt(targetRow.getRow()))
367          + " cells:" + targetRow);
368
369      LOG.debug("rowsCount: " + rowsCount);
370
371      Cell[] sourceCells = sourceRow.rawCells();
372      Cell[] targetCells = targetRow.rawCells();
373      int targetRowKey = Bytes.toInt(targetRow.getRow());
374      if (targetRowKey >= 40 && targetRowKey < 60) {
375        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
376        LOG.debug("Target cells: " + Arrays.toString(targetCells));
377        Assert.fail("There shouldn't exist any rows between 40 and 60, since "
378            + "Puts are disabled and Deletes are enabled.");
379      } else if (targetRowKey >= 60 && targetRowKey < 70) {
380        if (sourceCells.length == targetCells.length) {
381          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
382          LOG.debug("Target cells: " + Arrays.toString(targetCells));
383          Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
384              + " shouldn't have same number of cells.");
385        }
386      } else if (targetRowKey >= 80 && targetRowKey < 90) {
387        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
388        LOG.debug("Target cells: " + Arrays.toString(targetCells));
389        Assert.fail("There should be no rows between 80 and 90 on target, as "
390            + "these had different timestamps and should had been deleted.");
391      } else if (targetRowKey >= 90 && targetRowKey < 100) {
392        for (int j = 0; j < sourceCells.length; j++) {
393          Cell sourceCell = sourceCells[j];
394          Cell targetCell = targetCells[j];
395          if (CellUtil.matchingValue(sourceCell, targetCell)) {
396            Assert.fail("Cells values should not match for rows between "
397                + "90 and 100. Target row id: " + (Bytes.toInt(targetRow
398                .getRow())));
399          }
400        }
401      } else {
402        for (int j = 0; j < sourceCells.length; j++) {
403          Cell sourceCell = sourceCells[j];
404          Cell targetCell = targetCells[j];
405          try {
406            if (!CellUtil.matchingRow(sourceCell, targetCell)) {
407              Assert.fail("Rows don't match");
408            }
409            if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
410              Assert.fail("Families don't match");
411            }
412            if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
413              Assert.fail("Qualifiers don't match");
414            }
415            if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
416              Assert.fail("Timestamps don't match");
417            }
418            if (!CellUtil.matchingValue(sourceCell, targetCell)) {
419              Assert.fail("Values don't match");
420            }
421          } catch (Throwable t) {
422            LOG.debug(
423                "Source cell: " + sourceCell + " target cell: " + targetCell);
424            Throwables.propagate(t);
425          }
426        }
427      }
428      rowsCount++;
429      targetRow = targetScanner.next();
430      sourceRow = sourceScanner.next();
431    }
432    assertEquals("Target expected rows does not match.",expectedRows,
433        rowsCount);
434    sourceScanner.close();
435    targetScanner.close();
436    sourceTable.close();
437    targetTable.close();
438  }
439
440  private Counters syncTables(TableName sourceTableName, TableName targetTableName,
441      Path testDir, String... options) throws Exception {
442    SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
443    String[] args = Arrays.copyOf(options, options.length+3);
444    args[options.length] = testDir.toString();
445    args[options.length+1] = sourceTableName.getNameAsString();
446    args[options.length+2] = targetTableName.getNameAsString();
447    int code = syncTable.run(args);
448    assertEquals("sync table job failed", 0, code);
449
450    LOG.info("Sync tables completed");
451    return syncTable.counters;
452  }
453
454  private void hashSourceTable(TableName sourceTableName, Path testDir, String... options)
455      throws Exception {
456    int numHashFiles = 3;
457    long batchSize = 100;  // should be 2 batches per region
458    int scanBatch = 1;
459    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
460    String[] args = Arrays.copyOf(options, options.length+5);
461    args[options.length] = "--batchsize=" + batchSize;
462    args[options.length + 1] = "--numhashfiles=" + numHashFiles;
463    args[options.length + 2] = "--scanbatch=" + scanBatch;
464    args[options.length + 3] = sourceTableName.getNameAsString();
465    args[options.length + 4] = testDir.toString();
466    int code = hashTable.run(args);
467    assertEquals("hash table job failed", 0, code);
468
469    FileSystem fs = TEST_UTIL.getTestFileSystem();
470
471    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
472    assertEquals(sourceTableName.getNameAsString(), tableHash.tableName);
473    assertEquals(batchSize, tableHash.batchSize);
474    assertEquals(numHashFiles, tableHash.numHashFiles);
475    assertEquals(numHashFiles - 1, tableHash.partitions.size());
476
477    LOG.info("Hash table completed");
478  }
479
480  private void writeTestData(TableName sourceTableName, TableName targetTableName,
481      long... timestamps) throws Exception {
482    final byte[] family = Bytes.toBytes("family");
483    final byte[] column1 = Bytes.toBytes("c1");
484    final byte[] column2 = Bytes.toBytes("c2");
485    final byte[] value1 = Bytes.toBytes("val1");
486    final byte[] value2 = Bytes.toBytes("val2");
487    final byte[] value3 = Bytes.toBytes("val3");
488
489    int numRows = 100;
490    int sourceRegions = 10;
491    int targetRegions = 6;
492    if (ArrayUtils.isEmpty(timestamps)) {
493      long current = System.currentTimeMillis();
494      timestamps = new long[]{current,current};
495    }
496
497    Table sourceTable = TEST_UTIL.createTable(sourceTableName,
498        family, generateSplits(numRows, sourceRegions));
499
500    Table targetTable = TEST_UTIL.createTable(targetTableName,
501        family, generateSplits(numRows, targetRegions));
502
503    int rowIndex = 0;
504    // a bunch of identical rows
505    for (; rowIndex < 40; rowIndex++) {
506      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
507      sourcePut.addColumn(family, column1, timestamps[0], value1);
508      sourcePut.addColumn(family, column2, timestamps[0], value2);
509      sourceTable.put(sourcePut);
510
511      Put targetPut = new Put(Bytes.toBytes(rowIndex));
512      targetPut.addColumn(family, column1, timestamps[1], value1);
513      targetPut.addColumn(family, column2, timestamps[1], value2);
514      targetTable.put(targetPut);
515    }
516    // some rows only in the source table
517    // ROWSWITHDIFFS: 10
518    // TARGETMISSINGROWS: 10
519    // TARGETMISSINGCELLS: 20
520    for (; rowIndex < 50; rowIndex++) {
521      Put put = new Put(Bytes.toBytes(rowIndex));
522      put.addColumn(family, column1, timestamps[0], value1);
523      put.addColumn(family, column2, timestamps[0], value2);
524      sourceTable.put(put);
525    }
526    // some rows only in the target table
527    // ROWSWITHDIFFS: 10
528    // SOURCEMISSINGROWS: 10
529    // SOURCEMISSINGCELLS: 20
530    for (; rowIndex < 60; rowIndex++) {
531      Put put = new Put(Bytes.toBytes(rowIndex));
532      put.addColumn(family, column1, timestamps[1], value1);
533      put.addColumn(family, column2, timestamps[1], value2);
534      targetTable.put(put);
535    }
536    // some rows with 1 missing cell in target table
537    // ROWSWITHDIFFS: 10
538    // TARGETMISSINGCELLS: 10
539    for (; rowIndex < 70; rowIndex++) {
540      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
541      sourcePut.addColumn(family, column1, timestamps[0], value1);
542      sourcePut.addColumn(family, column2, timestamps[0], value2);
543      sourceTable.put(sourcePut);
544
545      Put targetPut = new Put(Bytes.toBytes(rowIndex));
546      targetPut.addColumn(family, column1, timestamps[1], value1);
547      targetTable.put(targetPut);
548    }
549    // some rows with 1 missing cell in source table
550    // ROWSWITHDIFFS: 10
551    // SOURCEMISSINGCELLS: 10
552    for (; rowIndex < 80; rowIndex++) {
553      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
554      sourcePut.addColumn(family, column1, timestamps[0], value1);
555      sourceTable.put(sourcePut);
556
557      Put targetPut = new Put(Bytes.toBytes(rowIndex));
558      targetPut.addColumn(family, column1, timestamps[1], value1);
559      targetPut.addColumn(family, column2, timestamps[1], value2);
560      targetTable.put(targetPut);
561    }
562    // some rows differing only in timestamp
563    // ROWSWITHDIFFS: 10
564    // SOURCEMISSINGCELLS: 20
565    // TARGETMISSINGCELLS: 20
566    for (; rowIndex < 90; rowIndex++) {
567      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
568      sourcePut.addColumn(family, column1, timestamps[0], column1);
569      sourcePut.addColumn(family, column2, timestamps[0], value2);
570      sourceTable.put(sourcePut);
571
572      Put targetPut = new Put(Bytes.toBytes(rowIndex));
573      targetPut.addColumn(family, column1, timestamps[1]+1, column1);
574      targetPut.addColumn(family, column2, timestamps[1]-1, value2);
575      targetTable.put(targetPut);
576    }
577    // some rows with different values
578    // ROWSWITHDIFFS: 10
579    // DIFFERENTCELLVALUES: 20
580    for (; rowIndex < numRows; rowIndex++) {
581      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
582      sourcePut.addColumn(family, column1, timestamps[0], value1);
583      sourcePut.addColumn(family, column2, timestamps[0], value2);
584      sourceTable.put(sourcePut);
585
586      Put targetPut = new Put(Bytes.toBytes(rowIndex));
587      targetPut.addColumn(family, column1, timestamps[1], value3);
588      targetPut.addColumn(family, column2, timestamps[1], value3);
589      targetTable.put(targetPut);
590    }
591
592    sourceTable.close();
593    targetTable.close();
594  }
595}