001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021
022import java.util.Arrays;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseTestingUtility;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Put;
031import org.apache.hadoop.hbase.client.Result;
032import org.apache.hadoop.hbase.client.ResultScanner;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.mapreduce.Counters;
039import org.junit.AfterClass;
040import org.junit.Assert;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Rule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.junit.rules.TestName;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
051
052/**
053 * Basic test for the SyncTable M/R tool
054 */
055@Category(LargeTests.class)
056public class TestSyncTable {
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059      HBaseClassTestRule.forClass(TestSyncTable.class);
060
061  private static final Logger LOG = LoggerFactory.getLogger(TestSyncTable.class);
062
063  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
064
065  @Rule
066  public TestName name = new TestName();
067
068  @BeforeClass
069  public static void beforeClass() throws Exception {
070    TEST_UTIL.startMiniCluster(3);
071  }
072
073  @AfterClass
074  public static void afterClass() throws Exception {
075    TEST_UTIL.cleanupDataTestDirOnTestFS();
076    TEST_UTIL.shutdownMiniCluster();
077  }
078
079  private static byte[][] generateSplits(int numRows, int numRegions) {
080    byte[][] splitRows = new byte[numRegions-1][];
081    for (int i = 1; i < numRegions; i++) {
082      splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
083    }
084    return splitRows;
085  }
086
087  @Test
088  public void testSyncTable() throws Exception {
089    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
090    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
091    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");
092
093    writeTestData(sourceTableName, targetTableName);
094    hashSourceTable(sourceTableName, testDir);
095    Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
096    assertEqualTables(90, sourceTableName, targetTableName);
097
098    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
099    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
100    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
101    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
102    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
103    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
104
105    TEST_UTIL.deleteTable(sourceTableName);
106    TEST_UTIL.deleteTable(targetTableName);
107  }
108
109  @Test
110  public void testSyncTableDoDeletesFalse() throws Exception {
111    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
112    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
113    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoDeletesFalse");
114
115    writeTestData(sourceTableName, targetTableName);
116    hashSourceTable(sourceTableName, testDir);
117    Counters syncCounters = syncTables(sourceTableName, targetTableName,
118        testDir, "--doDeletes=false");
119    assertTargetDoDeletesFalse(100, sourceTableName, targetTableName);
120
121    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
122    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
123    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
124    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
125    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
126    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
127
128    TEST_UTIL.deleteTable(sourceTableName);
129    TEST_UTIL.deleteTable(targetTableName);
130  }
131
132  @Test
133  public void testSyncTableDoPutsFalse() throws Exception {
134    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
135    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
136    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoPutsFalse");
137
138    writeTestData(sourceTableName, targetTableName);
139    hashSourceTable(sourceTableName, testDir);
140    Counters syncCounters = syncTables(sourceTableName, targetTableName,
141        testDir, "--doPuts=false");
142    assertTargetDoPutsFalse(70, sourceTableName, targetTableName);
143
144    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
145    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
146    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
147    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
148    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
149    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
150
151    TEST_UTIL.deleteTable(sourceTableName);
152    TEST_UTIL.deleteTable(targetTableName);
153  }
154
155  private void assertEqualTables(int expectedRows, TableName sourceTableName,
156      TableName targetTableName) throws Exception {
157    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
158    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
159
160    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
161    ResultScanner targetScanner = targetTable.getScanner(new Scan());
162
163    for (int i = 0; i < expectedRows; i++) {
164      Result sourceRow = sourceScanner.next();
165      Result targetRow = targetScanner.next();
166
167      LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
168          + " cells:" + sourceRow);
169      LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
170          + " cells:" + targetRow);
171
172      if (sourceRow == null) {
173        Assert.fail("Expected " + expectedRows
174            + " source rows but only found " + i);
175      }
176      if (targetRow == null) {
177        Assert.fail("Expected " + expectedRows
178            + " target rows but only found " + i);
179      }
180      Cell[] sourceCells = sourceRow.rawCells();
181      Cell[] targetCells = targetRow.rawCells();
182      if (sourceCells.length != targetCells.length) {
183        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
184        LOG.debug("Target cells: " + Arrays.toString(targetCells));
185        Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
186            + " has " + sourceCells.length
187            + " cells in source table but " + targetCells.length
188            + " cells in target table");
189      }
190      for (int j = 0; j < sourceCells.length; j++) {
191        Cell sourceCell = sourceCells[j];
192        Cell targetCell = targetCells[j];
193        try {
194          if (!CellUtil.matchingRows(sourceCell, targetCell)) {
195            Assert.fail("Rows don't match");
196          }
197          if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
198            Assert.fail("Families don't match");
199          }
200          if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
201            Assert.fail("Qualifiers don't match");
202          }
203          if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
204            Assert.fail("Timestamps don't match");
205          }
206          if (!CellUtil.matchingValue(sourceCell, targetCell)) {
207            Assert.fail("Values don't match");
208          }
209        } catch (Throwable t) {
210          LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
211          Throwables.propagate(t);
212        }
213      }
214    }
215    Result sourceRow = sourceScanner.next();
216    if (sourceRow != null) {
217      Assert.fail("Source table has more than " + expectedRows
218          + " rows.  Next row: " + Bytes.toInt(sourceRow.getRow()));
219    }
220    Result targetRow = targetScanner.next();
221    if (targetRow != null) {
222      Assert.fail("Target table has more than " + expectedRows
223          + " rows.  Next row: " + Bytes.toInt(targetRow.getRow()));
224    }
225    sourceScanner.close();
226    targetScanner.close();
227    sourceTable.close();
228    targetTable.close();
229  }
230
231  private void assertTargetDoDeletesFalse(int expectedRows, TableName sourceTableName,
232      TableName targetTableName) throws Exception {
233    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
234    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
235
236    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
237    ResultScanner targetScanner = targetTable.getScanner(new Scan());
238    Result targetRow = targetScanner.next();
239    Result sourceRow = sourceScanner.next();
240    int rowsCount = 0;
241    while (targetRow != null) {
242      rowsCount++;
243      //only compares values for existing rows, skipping rows existing on
244      //target only that were not deleted given --doDeletes=false
245      if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
246        targetRow = targetScanner.next();
247        continue;
248      }
249
250      LOG.debug("SOURCE row: " + (sourceRow == null ? "null"
251          : Bytes.toInt(sourceRow.getRow()))
252          + " cells:" + sourceRow);
253      LOG.debug("TARGET row: " + (targetRow == null ? "null"
254          : Bytes.toInt(targetRow.getRow()))
255          + " cells:" + targetRow);
256
257      Cell[] sourceCells = sourceRow.rawCells();
258      Cell[] targetCells = targetRow.rawCells();
259      int targetRowKey = Bytes.toInt(targetRow.getRow());
260      if (targetRowKey >= 70 && targetRowKey < 80) {
261        if (sourceCells.length == targetCells.length) {
262          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
263          LOG.debug("Target cells: " + Arrays.toString(targetCells));
264          Assert.fail("Row " + targetRowKey + " should have more cells in "
265              + "target than in source");
266        }
267
268      } else {
269        if (sourceCells.length != targetCells.length) {
270          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
271          LOG.debug("Target cells: " + Arrays.toString(targetCells));
272          Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
273              + " has " + sourceCells.length
274              + " cells in source table but " + targetCells.length
275              + " cells in target table");
276        }
277      }
278      for (int j = 0; j < sourceCells.length; j++) {
279        Cell sourceCell = sourceCells[j];
280        Cell targetCell = targetCells[j];
281        try {
282          if (!CellUtil.matchingRow(sourceCell, targetCell)) {
283            Assert.fail("Rows don't match");
284          }
285          if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
286            Assert.fail("Families don't match");
287          }
288          if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
289            Assert.fail("Qualifiers don't match");
290          }
291          if (targetRowKey < 80 && targetRowKey >= 90){
292            if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
293              Assert.fail("Timestamps don't match");
294            }
295          }
296          if (!CellUtil.matchingValue(sourceCell, targetCell)) {
297            Assert.fail("Values don't match");
298          }
299        } catch (Throwable t) {
300          LOG.debug("Source cell: " + sourceCell + " target cell: "
301              + targetCell);
302          Throwables.propagate(t);
303        }
304      }
305      targetRow = targetScanner.next();
306      sourceRow = sourceScanner.next();
307    }
308    assertEquals("Target expected rows does not match.",expectedRows,
309        rowsCount);
310    sourceScanner.close();
311    targetScanner.close();
312    sourceTable.close();
313    targetTable.close();
314  }
315
316  private void assertTargetDoPutsFalse(int expectedRows, TableName sourceTableName,
317      TableName targetTableName) throws Exception {
318    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
319    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
320
321    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
322    ResultScanner targetScanner = targetTable.getScanner(new Scan());
323    Result targetRow = targetScanner.next();
324    Result sourceRow = sourceScanner.next();
325    int rowsCount = 0;
326
327    while (targetRow!=null) {
328      //only compares values for existing rows, skipping rows existing on
329      //source only that were not added to target given --doPuts=false
330      if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
331        sourceRow = sourceScanner.next();
332        continue;
333      }
334
335      LOG.debug("SOURCE row: " + (sourceRow == null ?
336          "null" :
337          Bytes.toInt(sourceRow.getRow()))
338          + " cells:" + sourceRow);
339      LOG.debug("TARGET row: " + (targetRow == null ?
340          "null" :
341          Bytes.toInt(targetRow.getRow()))
342          + " cells:" + targetRow);
343
344      LOG.debug("rowsCount: " + rowsCount);
345
346      Cell[] sourceCells = sourceRow.rawCells();
347      Cell[] targetCells = targetRow.rawCells();
348      int targetRowKey = Bytes.toInt(targetRow.getRow());
349      if (targetRowKey >= 40 && targetRowKey < 60) {
350        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
351        LOG.debug("Target cells: " + Arrays.toString(targetCells));
352        Assert.fail("There shouldn't exist any rows between 40 and 60, since "
353            + "Puts are disabled and Deletes are enabled.");
354      } else if (targetRowKey >= 60 && targetRowKey < 70) {
355        if (sourceCells.length == targetCells.length) {
356          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
357          LOG.debug("Target cells: " + Arrays.toString(targetCells));
358          Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
359              + " shouldn't have same number of cells.");
360        }
361      } else if (targetRowKey >= 80 && targetRowKey < 90) {
362        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
363        LOG.debug("Target cells: " + Arrays.toString(targetCells));
364        Assert.fail("There should be no rows between 80 and 90 on target, as "
365            + "these had different timestamps and should had been deleted.");
366      } else if (targetRowKey >= 90 && targetRowKey < 100) {
367        for (int j = 0; j < sourceCells.length; j++) {
368          Cell sourceCell = sourceCells[j];
369          Cell targetCell = targetCells[j];
370          if (CellUtil.matchingValue(sourceCell, targetCell)) {
371            Assert.fail("Cells values should not match for rows between "
372                + "90 and 100. Target row id: " + (Bytes.toInt(targetRow
373                .getRow())));
374          }
375        }
376      } else {
377        for (int j = 0; j < sourceCells.length; j++) {
378          Cell sourceCell = sourceCells[j];
379          Cell targetCell = targetCells[j];
380          try {
381            if (!CellUtil.matchingRow(sourceCell, targetCell)) {
382              Assert.fail("Rows don't match");
383            }
384            if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
385              Assert.fail("Families don't match");
386            }
387            if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
388              Assert.fail("Qualifiers don't match");
389            }
390            if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
391              Assert.fail("Timestamps don't match");
392            }
393            if (!CellUtil.matchingValue(sourceCell, targetCell)) {
394              Assert.fail("Values don't match");
395            }
396          } catch (Throwable t) {
397            LOG.debug(
398                "Source cell: " + sourceCell + " target cell: " + targetCell);
399            Throwables.propagate(t);
400          }
401        }
402      }
403      rowsCount++;
404      targetRow = targetScanner.next();
405      sourceRow = sourceScanner.next();
406    }
407    assertEquals("Target expected rows does not match.",expectedRows,
408        rowsCount);
409    sourceScanner.close();
410    targetScanner.close();
411    sourceTable.close();
412    targetTable.close();
413  }
414
415  private Counters syncTables(TableName sourceTableName, TableName targetTableName,
416      Path testDir, String... options) throws Exception {
417    SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
418    String[] args = Arrays.copyOf(options, options.length+3);
419    args[options.length] = testDir.toString();
420    args[options.length+1] = sourceTableName.getNameAsString();
421    args[options.length+2] = targetTableName.getNameAsString();
422    int code = syncTable.run(args);
423    assertEquals("sync table job failed", 0, code);
424
425    LOG.info("Sync tables completed");
426    return syncTable.counters;
427  }
428
429  private void hashSourceTable(TableName sourceTableName, Path testDir) throws Exception {
430    int numHashFiles = 3;
431    long batchSize = 100;  // should be 2 batches per region
432    int scanBatch = 1;
433    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
434    int code = hashTable.run(new String[] {
435      "--batchsize=" + batchSize,
436      "--numhashfiles=" + numHashFiles,
437      "--scanbatch=" + scanBatch,
438      sourceTableName.getNameAsString(),
439      testDir.toString()
440    });
441    assertEquals("hash table job failed", 0, code);
442
443    FileSystem fs = TEST_UTIL.getTestFileSystem();
444
445    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
446    assertEquals(sourceTableName.getNameAsString(), tableHash.tableName);
447    assertEquals(batchSize, tableHash.batchSize);
448    assertEquals(numHashFiles, tableHash.numHashFiles);
449    assertEquals(numHashFiles - 1, tableHash.partitions.size());
450
451    LOG.info("Hash table completed");
452  }
453
454  private void writeTestData(TableName sourceTableName, TableName targetTableName)
455      throws Exception {
456    final byte[] family = Bytes.toBytes("family");
457    final byte[] column1 = Bytes.toBytes("c1");
458    final byte[] column2 = Bytes.toBytes("c2");
459    final byte[] value1 = Bytes.toBytes("val1");
460    final byte[] value2 = Bytes.toBytes("val2");
461    final byte[] value3 = Bytes.toBytes("val3");
462
463    int numRows = 100;
464    int sourceRegions = 10;
465    int targetRegions = 6;
466
467    Table sourceTable = TEST_UTIL.createTable(sourceTableName,
468        family, generateSplits(numRows, sourceRegions));
469
470    Table targetTable = TEST_UTIL.createTable(targetTableName,
471        family, generateSplits(numRows, targetRegions));
472
473    long timestamp = 1430764183454L;
474
475    int rowIndex = 0;
476    // a bunch of identical rows
477    for (; rowIndex < 40; rowIndex++) {
478      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
479      sourcePut.addColumn(family, column1, timestamp, value1);
480      sourcePut.addColumn(family, column2, timestamp, value2);
481      sourceTable.put(sourcePut);
482
483      Put targetPut = new Put(Bytes.toBytes(rowIndex));
484      targetPut.addColumn(family, column1, timestamp, value1);
485      targetPut.addColumn(family, column2, timestamp, value2);
486      targetTable.put(targetPut);
487    }
488    // some rows only in the source table
489    // ROWSWITHDIFFS: 10
490    // TARGETMISSINGROWS: 10
491    // TARGETMISSINGCELLS: 20
492    for (; rowIndex < 50; rowIndex++) {
493      Put put = new Put(Bytes.toBytes(rowIndex));
494      put.addColumn(family, column1, timestamp, value1);
495      put.addColumn(family, column2, timestamp, value2);
496      sourceTable.put(put);
497    }
498    // some rows only in the target table
499    // ROWSWITHDIFFS: 10
500    // SOURCEMISSINGROWS: 10
501    // SOURCEMISSINGCELLS: 20
502    for (; rowIndex < 60; rowIndex++) {
503      Put put = new Put(Bytes.toBytes(rowIndex));
504      put.addColumn(family, column1, timestamp, value1);
505      put.addColumn(family, column2, timestamp, value2);
506      targetTable.put(put);
507    }
508    // some rows with 1 missing cell in target table
509    // ROWSWITHDIFFS: 10
510    // TARGETMISSINGCELLS: 10
511    for (; rowIndex < 70; rowIndex++) {
512      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
513      sourcePut.addColumn(family, column1, timestamp, value1);
514      sourcePut.addColumn(family, column2, timestamp, value2);
515      sourceTable.put(sourcePut);
516
517      Put targetPut = new Put(Bytes.toBytes(rowIndex));
518      targetPut.addColumn(family, column1, timestamp, value1);
519      targetTable.put(targetPut);
520    }
521    // some rows with 1 missing cell in source table
522    // ROWSWITHDIFFS: 10
523    // SOURCEMISSINGCELLS: 10
524    for (; rowIndex < 80; rowIndex++) {
525      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
526      sourcePut.addColumn(family, column1, timestamp, value1);
527      sourceTable.put(sourcePut);
528
529      Put targetPut = new Put(Bytes.toBytes(rowIndex));
530      targetPut.addColumn(family, column1, timestamp, value1);
531      targetPut.addColumn(family, column2, timestamp, value2);
532      targetTable.put(targetPut);
533    }
534    // some rows differing only in timestamp
535    // ROWSWITHDIFFS: 10
536    // SOURCEMISSINGCELLS: 20
537    // TARGETMISSINGCELLS: 20
538    for (; rowIndex < 90; rowIndex++) {
539      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
540      sourcePut.addColumn(family, column1, timestamp, column1);
541      sourcePut.addColumn(family, column2, timestamp, value2);
542      sourceTable.put(sourcePut);
543
544      Put targetPut = new Put(Bytes.toBytes(rowIndex));
545      targetPut.addColumn(family, column1, timestamp+1, column1);
546      targetPut.addColumn(family, column2, timestamp-1, value2);
547      targetTable.put(targetPut);
548    }
549    // some rows with different values
550    // ROWSWITHDIFFS: 10
551    // DIFFERENTCELLVALUES: 20
552    for (; rowIndex < numRows; rowIndex++) {
553      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
554      sourcePut.addColumn(family, column1, timestamp, value1);
555      sourcePut.addColumn(family, column2, timestamp, value2);
556      sourceTable.put(sourcePut);
557
558      Put targetPut = new Put(Bytes.toBytes(rowIndex));
559      targetPut.addColumn(family, column1, timestamp, value3);
560      targetPut.addColumn(family, column2, timestamp, value3);
561      targetTable.put(targetPut);
562    }
563
564    sourceTable.close();
565    targetTable.close();
566  }
567}