001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.Arrays;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Put;
032import org.apache.hadoop.hbase.client.Result;
033import org.apache.hadoop.hbase.client.ResultScanner;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
037import org.apache.hadoop.hbase.testclassification.LargeTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.mapreduce.Counters;
040import org.junit.AfterClass;
041import org.junit.Assert;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Rule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.junit.rules.TestName;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
052
053/**
054 * Basic test for the SyncTable M/R tool
055 */
056@Category(LargeTests.class)
057public class TestSyncTable {
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061      HBaseClassTestRule.forClass(TestSyncTable.class);
062
063  private static final Logger LOG = LoggerFactory.getLogger(TestSyncTable.class);
064
065  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
066
067  @Rule
068  public TestName name = new TestName();
069
070  @BeforeClass
071  public static void beforeClass() throws Exception {
072    TEST_UTIL.startMiniCluster(3);
073  }
074
075  @AfterClass
076  public static void afterClass() throws Exception {
077    TEST_UTIL.shutdownMiniCluster();
078  }
079
080  private static byte[][] generateSplits(int numRows, int numRegions) {
081    byte[][] splitRows = new byte[numRegions-1][];
082    for (int i = 1; i < numRegions; i++) {
083      splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
084    }
085    return splitRows;
086  }
087
088  @Test
089  public void testSyncTable() throws Exception {
090    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
091    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
092    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");
093
094    writeTestData(sourceTableName, targetTableName);
095    hashSourceTable(sourceTableName, testDir);
096    Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
097    assertEqualTables(90, sourceTableName, targetTableName);
098
099    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
100    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
101    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
102    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
103    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
104    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
105
106    TEST_UTIL.deleteTable(sourceTableName);
107    TEST_UTIL.deleteTable(targetTableName);
108    TEST_UTIL.cleanupDataTestDirOnTestFS();
109  }
110
111  private void assertEqualTables(int expectedRows, TableName sourceTableName,
112      TableName targetTableName) throws Exception {
113    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
114    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
115
116    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
117    ResultScanner targetScanner = targetTable.getScanner(new Scan());
118
119    for (int i = 0; i < expectedRows; i++) {
120      Result sourceRow = sourceScanner.next();
121      Result targetRow = targetScanner.next();
122
123      LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
124          + " cells:" + sourceRow);
125      LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
126          + " cells:" + targetRow);
127
128      if (sourceRow == null) {
129        Assert.fail("Expected " + expectedRows
130            + " source rows but only found " + i);
131      }
132      if (targetRow == null) {
133        Assert.fail("Expected " + expectedRows
134            + " target rows but only found " + i);
135      }
136      Cell[] sourceCells = sourceRow.rawCells();
137      Cell[] targetCells = targetRow.rawCells();
138      if (sourceCells.length != targetCells.length) {
139        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
140        LOG.debug("Target cells: " + Arrays.toString(targetCells));
141        Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
142            + " has " + sourceCells.length
143            + " cells in source table but " + targetCells.length
144            + " cells in target table");
145      }
146      for (int j = 0; j < sourceCells.length; j++) {
147        Cell sourceCell = sourceCells[j];
148        Cell targetCell = targetCells[j];
149        try {
150          if (!CellUtil.matchingRows(sourceCell, targetCell)) {
151            Assert.fail("Rows don't match");
152          }
153          if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
154            Assert.fail("Families don't match");
155          }
156          if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
157            Assert.fail("Qualifiers don't match");
158          }
159          if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
160            Assert.fail("Timestamps don't match");
161          }
162          if (!CellUtil.matchingValue(sourceCell, targetCell)) {
163            Assert.fail("Values don't match");
164          }
165        } catch (Throwable t) {
166          LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
167          Throwables.propagate(t);
168        }
169      }
170    }
171    Result sourceRow = sourceScanner.next();
172    if (sourceRow != null) {
173      Assert.fail("Source table has more than " + expectedRows
174          + " rows.  Next row: " + Bytes.toInt(sourceRow.getRow()));
175    }
176    Result targetRow = targetScanner.next();
177    if (targetRow != null) {
178      Assert.fail("Target table has more than " + expectedRows
179          + " rows.  Next row: " + Bytes.toInt(targetRow.getRow()));
180    }
181    sourceScanner.close();
182    targetScanner.close();
183    sourceTable.close();
184    targetTable.close();
185  }
186
187  private Counters syncTables(TableName sourceTableName, TableName targetTableName,
188      Path testDir) throws Exception {
189    SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
190    int code = syncTable.run(new String[] {
191        testDir.toString(),
192        sourceTableName.getNameAsString(),
193        targetTableName.getNameAsString()
194        });
195    assertEquals("sync table job failed", 0, code);
196
197    LOG.info("Sync tables completed");
198    return syncTable.counters;
199  }
200
201  private void hashSourceTable(TableName sourceTableName, Path testDir)
202      throws Exception, IOException {
203    int numHashFiles = 3;
204    long batchSize = 100;  // should be 2 batches per region
205    int scanBatch = 1;
206    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
207    int code = hashTable.run(new String[] {
208        "--batchsize=" + batchSize,
209        "--numhashfiles=" + numHashFiles,
210        "--scanbatch=" + scanBatch,
211        sourceTableName.getNameAsString(),
212        testDir.toString()});
213    assertEquals("hash table job failed", 0, code);
214
215    FileSystem fs = TEST_UTIL.getTestFileSystem();
216
217    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
218    assertEquals(sourceTableName.getNameAsString(), tableHash.tableName);
219    assertEquals(batchSize, tableHash.batchSize);
220    assertEquals(numHashFiles, tableHash.numHashFiles);
221    assertEquals(numHashFiles - 1, tableHash.partitions.size());
222
223    LOG.info("Hash table completed");
224  }
225
226  private void writeTestData(TableName sourceTableName, TableName targetTableName)
227      throws Exception {
228    final byte[] family = Bytes.toBytes("family");
229    final byte[] column1 = Bytes.toBytes("c1");
230    final byte[] column2 = Bytes.toBytes("c2");
231    final byte[] value1 = Bytes.toBytes("val1");
232    final byte[] value2 = Bytes.toBytes("val2");
233    final byte[] value3 = Bytes.toBytes("val3");
234
235    int numRows = 100;
236    int sourceRegions = 10;
237    int targetRegions = 6;
238
239    Table sourceTable = TEST_UTIL.createTable(sourceTableName,
240        family, generateSplits(numRows, sourceRegions));
241
242    Table targetTable = TEST_UTIL.createTable(targetTableName,
243        family, generateSplits(numRows, targetRegions));
244
245    long timestamp = 1430764183454L;
246
247    int rowIndex = 0;
248    // a bunch of identical rows
249    for (; rowIndex < 40; rowIndex++) {
250      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
251      sourcePut.addColumn(family, column1, timestamp, value1);
252      sourcePut.addColumn(family, column2, timestamp, value2);
253      sourceTable.put(sourcePut);
254
255      Put targetPut = new Put(Bytes.toBytes(rowIndex));
256      targetPut.addColumn(family, column1, timestamp, value1);
257      targetPut.addColumn(family, column2, timestamp, value2);
258      targetTable.put(targetPut);
259    }
260    // some rows only in the source table
261    // ROWSWITHDIFFS: 10
262    // TARGETMISSINGROWS: 10
263    // TARGETMISSINGCELLS: 20
264    for (; rowIndex < 50; rowIndex++) {
265      Put put = new Put(Bytes.toBytes(rowIndex));
266      put.addColumn(family, column1, timestamp, value1);
267      put.addColumn(family, column2, timestamp, value2);
268      sourceTable.put(put);
269    }
270    // some rows only in the target table
271    // ROWSWITHDIFFS: 10
272    // SOURCEMISSINGROWS: 10
273    // SOURCEMISSINGCELLS: 20
274    for (; rowIndex < 60; rowIndex++) {
275      Put put = new Put(Bytes.toBytes(rowIndex));
276      put.addColumn(family, column1, timestamp, value1);
277      put.addColumn(family, column2, timestamp, value2);
278      targetTable.put(put);
279    }
280    // some rows with 1 missing cell in target table
281    // ROWSWITHDIFFS: 10
282    // TARGETMISSINGCELLS: 10
283    for (; rowIndex < 70; rowIndex++) {
284      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
285      sourcePut.addColumn(family, column1, timestamp, value1);
286      sourcePut.addColumn(family, column2, timestamp, value2);
287      sourceTable.put(sourcePut);
288
289      Put targetPut = new Put(Bytes.toBytes(rowIndex));
290      targetPut.addColumn(family, column1, timestamp, value1);
291      targetTable.put(targetPut);
292    }
293    // some rows with 1 missing cell in source table
294    // ROWSWITHDIFFS: 10
295    // SOURCEMISSINGCELLS: 10
296    for (; rowIndex < 80; rowIndex++) {
297      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
298      sourcePut.addColumn(family, column1, timestamp, value1);
299      sourceTable.put(sourcePut);
300
301      Put targetPut = new Put(Bytes.toBytes(rowIndex));
302      targetPut.addColumn(family, column1, timestamp, value1);
303      targetPut.addColumn(family, column2, timestamp, value2);
304      targetTable.put(targetPut);
305    }
306    // some rows differing only in timestamp
307    // ROWSWITHDIFFS: 10
308    // SOURCEMISSINGCELLS: 20
309    // TARGETMISSINGCELLS: 20
310    for (; rowIndex < 90; rowIndex++) {
311      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
312      sourcePut.addColumn(family, column1, timestamp, column1);
313      sourcePut.addColumn(family, column2, timestamp, value2);
314      sourceTable.put(sourcePut);
315
316      Put targetPut = new Put(Bytes.toBytes(rowIndex));
317      targetPut.addColumn(family, column1, timestamp+1, column1);
318      targetPut.addColumn(family, column2, timestamp-1, value2);
319      targetTable.put(targetPut);
320    }
321    // some rows with different values
322    // ROWSWITHDIFFS: 10
323    // DIFFERENTCELLVALUES: 20
324    for (; rowIndex < numRows; rowIndex++) {
325      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
326      sourcePut.addColumn(family, column1, timestamp, value1);
327      sourcePut.addColumn(family, column2, timestamp, value2);
328      sourceTable.put(sourcePut);
329
330      Put targetPut = new Put(Bytes.toBytes(rowIndex));
331      targetPut.addColumn(family, column1, timestamp, value3);
332      targetPut.addColumn(family, column2, timestamp, value3);
333      targetTable.put(targetPut);
334    }
335
336    sourceTable.close();
337    targetTable.close();
338  }
339
340
341}