/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counters;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

/**
 * Basic test for the SyncTable M/R tool.
 * <p>
 * The test writes two tables with a known set of differences, runs {@link HashTable} over the
 * source, runs {@link SyncTable} from source to target, and then checks both the resulting table
 * contents and the counters reported by the sync job.
 */
@Category(LargeTests.class)
public class TestSyncTable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestSyncTable.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestSyncTable.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();

  /** Starts the shared mini cluster (started with an argument of 3) once for the class. */
  @BeforeClass
  public static void beforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(3);
  }

  /** Shuts the shared mini cluster down after all tests in the class have run. */
  @AfterClass
  public static void afterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Builds evenly spaced split keys for a table whose row keys are the 4-byte encodings of the
   * integers {@code 0 .. numRows-1}.
   *
   * @param numRows    number of integer row keys the table will hold
   * @param numRegions number of regions wanted; must be at least 1
   * @return {@code numRegions - 1} split keys, the i-th being
   *         {@code Bytes.toBytes(numRows * i / numRegions)}
   */
  private static byte[][] generateSplits(int numRows, int numRegions) {
    byte[][] splitRows = new byte[numRegions - 1][];
    for (int i = 1; i < numRegions; i++) {
      splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions);
    }
    return splitRows;
  }

  /**
   * End-to-end check: populate source/target, hash the source, sync, then verify that both
   * tables hold the same 90 rows and that the sync counters match the totals of the
   * per-section expectations documented in {@link #writeTestData}.
   */
  @Test
  public void testSyncTable() throws Exception {
    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");

    writeTestData(sourceTableName, targetTableName);
    hashSourceTable(sourceTableName, testDir);
    Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
    // The source table holds 90 rows: rows 50-59 are written to the target only.
    assertEqualTables(90, sourceTableName, targetTableName);

    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());

    TEST_UTIL.deleteTable(sourceTableName);
    TEST_UTIL.deleteTable(targetTableName);
    TEST_UTIL.cleanupDataTestDirOnTestFS();
  }

  /**
   * Scans both tables in lock step and fails unless they contain exactly {@code expectedRows}
   * rows whose cells match pairwise in row, family, qualifier, timestamp and value.
   * <p>
   * Tables and scanners are opened in a try-with-resources block so they are released even when
   * an assertion fails part-way through the comparison.
   *
   * @param expectedRows    exact number of rows each table must contain
   * @param sourceTableName table treated as the reference
   * @param targetTableName table compared against the reference
   */
  private void assertEqualTables(int expectedRows, TableName sourceTableName,
      TableName targetTableName) throws Exception {
    try (Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
        Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
        ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
        ResultScanner targetScanner = targetTable.getScanner(new Scan())) {

      for (int i = 0; i < expectedRows; i++) {
        Result sourceRow = sourceScanner.next();
        Result targetRow = targetScanner.next();

        LOG.debug("SOURCE row: {} cells:{}", rowLabel(sourceRow), sourceRow);
        LOG.debug("TARGET row: {} cells:{}", rowLabel(targetRow), targetRow);

        if (sourceRow == null) {
          Assert.fail("Expected " + expectedRows
              + " source rows but only found " + i);
        }
        if (targetRow == null) {
          Assert.fail("Expected " + expectedRows
              + " target rows but only found " + i);
        }
        Cell[] sourceCells = sourceRow.rawCells();
        Cell[] targetCells = targetRow.rawCells();
        if (sourceCells.length != targetCells.length) {
          LOG.debug("Source cells: {}", Arrays.toString(sourceCells));
          LOG.debug("Target cells: {}", Arrays.toString(targetCells));
          Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
              + " has " + sourceCells.length
              + " cells in source table but " + targetCells.length
              + " cells in target table");
        }
        for (int j = 0; j < sourceCells.length; j++) {
          assertCellsMatch(sourceCells[j], targetCells[j]);
        }
      }

      // Neither scanner may have anything left once expectedRows rows have been consumed.
      Result sourceRow = sourceScanner.next();
      if (sourceRow != null) {
        Assert.fail("Source table has more than " + expectedRows
            + " rows.  Next row: " + Bytes.toInt(sourceRow.getRow()));
      }
      Result targetRow = targetScanner.next();
      if (targetRow != null) {
        Assert.fail("Target table has more than " + expectedRows
            + " rows.  Next row: " + Bytes.toInt(targetRow.getRow()));
      }
    }
  }

  /**
   * Renders a row key for debug output.
   *
   * @param row a scan result, possibly {@code null}
   * @return {@code "null"} for a {@code null} result, otherwise the row key decoded as an int
   */
  private static String rowLabel(Result row) {
    return row == null ? "null" : String.valueOf(Bytes.toInt(row.getRow()));
  }

  /**
   * Fails unless the two cells agree on row, family, qualifier, timestamp and value. On any
   * failure both cells are logged at debug level before the failure is propagated.
   */
  private static void assertCellsMatch(Cell sourceCell, Cell targetCell) {
    try {
      if (!CellUtil.matchingRows(sourceCell, targetCell)) {
        Assert.fail("Rows don't match");
      }
      if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
        Assert.fail("Families don't match");
      }
      if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
        Assert.fail("Qualifiers don't match");
      }
      if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
        Assert.fail("Timestamps don't match");
      }
      if (!CellUtil.matchingValue(sourceCell, targetCell)) {
        Assert.fail("Values don't match");
      }
    } catch (Throwable t) {
      LOG.debug("Source cell: {} target cell: {}", sourceCell, targetCell);
      Throwables.propagate(t);
    }
  }

  /**
   * Runs the {@link SyncTable} tool with the hash output directory, source table and target
   * table as its arguments and asserts that it exits with code 0.
   *
   * @return the counters exposed by the tool after the run
   */
  private Counters syncTables(TableName sourceTableName, TableName targetTableName,
      Path testDir) throws Exception {
    SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
    int code = syncTable.run(new String[] {
      testDir.toString(),
      sourceTableName.getNameAsString(),
      targetTableName.getNameAsString()
    });
    assertEquals("sync table job failed", 0, code);

    LOG.info("Sync tables completed");
    return syncTable.counters;
  }

  /**
   * Runs the {@link HashTable} tool over the source table, writing into {@code testDir}, then
   * reads the resulting {@link HashTable.TableHash} back and checks its table name, batch size,
   * hash-file count and partition count against the arguments that were passed in.
   */
  private void hashSourceTable(TableName sourceTableName, Path testDir)
      throws Exception, IOException {
    int numHashFiles = 3;
    long batchSize = 100;  // should be 2 batches per region
    int scanBatch = 1;
    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
    int code = hashTable.run(new String[] {
      "--batchsize=" + batchSize,
      "--numhashfiles=" + numHashFiles,
      "--scanbatch=" + scanBatch,
      sourceTableName.getNameAsString(),
      testDir.toString()});
    assertEquals("hash table job failed", 0, code);

    FileSystem fs = TEST_UTIL.getTestFileSystem();

    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
    assertEquals(sourceTableName.getNameAsString(), tableHash.tableName);
    assertEquals(batchSize, tableHash.batchSize);
    assertEquals(numHashFiles, tableHash.numHashFiles);
    assertEquals(numHashFiles - 1, tableHash.partitions.size());

    LOG.info("Hash table completed");
  }

  /**
   * Creates the source table (10 regions) and target table (6 regions) and fills them with
   * integer-keyed rows 0-99 arranged in seven bands, each band exercising one kind of
   * difference. The counter values noted on each band are what the test expects the sync job
   * to report for it.
   * <p>
   * Both tables are opened in a try-with-resources block so they are closed even if a put fails.
   */
  private void writeTestData(TableName sourceTableName, TableName targetTableName)
      throws Exception {
    final byte[] family = Bytes.toBytes("family");
    final byte[] column1 = Bytes.toBytes("c1");
    final byte[] column2 = Bytes.toBytes("c2");
    final byte[] value1 = Bytes.toBytes("val1");
    final byte[] value2 = Bytes.toBytes("val2");
    final byte[] value3 = Bytes.toBytes("val3");

    int numRows = 100;
    int sourceRegions = 10;
    int targetRegions = 6;

    try (Table sourceTable = TEST_UTIL.createTable(sourceTableName,
          family, generateSplits(numRows, sourceRegions));
        Table targetTable = TEST_UTIL.createTable(targetTableName,
          family, generateSplits(numRows, targetRegions))) {

      long timestamp = 1430764183454L;

      int rowIndex = 0;
      // a bunch of identical rows
      for (; rowIndex < 40; rowIndex++) {
        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
        sourcePut.addColumn(family, column1, timestamp, value1);
        sourcePut.addColumn(family, column2, timestamp, value2);
        sourceTable.put(sourcePut);

        Put targetPut = new Put(Bytes.toBytes(rowIndex));
        targetPut.addColumn(family, column1, timestamp, value1);
        targetPut.addColumn(family, column2, timestamp, value2);
        targetTable.put(targetPut);
      }
      // some rows only in the source table
      // ROWSWITHDIFFS: 10
      // TARGETMISSINGROWS: 10
      // TARGETMISSINGCELLS: 20
      for (; rowIndex < 50; rowIndex++) {
        Put put = new Put(Bytes.toBytes(rowIndex));
        put.addColumn(family, column1, timestamp, value1);
        put.addColumn(family, column2, timestamp, value2);
        sourceTable.put(put);
      }
      // some rows only in the target table
      // ROWSWITHDIFFS: 10
      // SOURCEMISSINGROWS: 10
      // SOURCEMISSINGCELLS: 20
      for (; rowIndex < 60; rowIndex++) {
        Put put = new Put(Bytes.toBytes(rowIndex));
        put.addColumn(family, column1, timestamp, value1);
        put.addColumn(family, column2, timestamp, value2);
        targetTable.put(put);
      }
      // some rows with 1 missing cell in target table
      // ROWSWITHDIFFS: 10
      // TARGETMISSINGCELLS: 10
      for (; rowIndex < 70; rowIndex++) {
        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
        sourcePut.addColumn(family, column1, timestamp, value1);
        sourcePut.addColumn(family, column2, timestamp, value2);
        sourceTable.put(sourcePut);

        Put targetPut = new Put(Bytes.toBytes(rowIndex));
        targetPut.addColumn(family, column1, timestamp, value1);
        targetTable.put(targetPut);
      }
      // some rows with 1 missing cell in source table
      // ROWSWITHDIFFS: 10
      // SOURCEMISSINGCELLS: 10
      for (; rowIndex < 80; rowIndex++) {
        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
        sourcePut.addColumn(family, column1, timestamp, value1);
        sourceTable.put(sourcePut);

        Put targetPut = new Put(Bytes.toBytes(rowIndex));
        targetPut.addColumn(family, column1, timestamp, value1);
        targetPut.addColumn(family, column2, timestamp, value2);
        targetTable.put(targetPut);
      }
      // some rows differing only in timestamp
      // ROWSWITHDIFFS: 10
      // SOURCEMISSINGCELLS: 20
      // TARGETMISSINGCELLS: 20
      // Note: the c1 cell stores the qualifier bytes (column1) as its value on both sides, so
      // only the timestamps differ between source and target in this band.
      for (; rowIndex < 90; rowIndex++) {
        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
        sourcePut.addColumn(family, column1, timestamp, column1);
        sourcePut.addColumn(family, column2, timestamp, value2);
        sourceTable.put(sourcePut);

        Put targetPut = new Put(Bytes.toBytes(rowIndex));
        targetPut.addColumn(family, column1, timestamp + 1, column1);
        targetPut.addColumn(family, column2, timestamp - 1, value2);
        targetTable.put(targetPut);
      }
      // some rows with different values
      // ROWSWITHDIFFS: 10
      // DIFFERENTCELLVALUES: 20
      for (; rowIndex < numRows; rowIndex++) {
        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
        sourcePut.addColumn(family, column1, timestamp, value1);
        sourcePut.addColumn(family, column2, timestamp, value2);
        sourceTable.put(sourcePut);

        Put targetPut = new Put(Bytes.toBytes(rowIndex));
        targetPut.addColumn(family, column1, timestamp, value3);
        targetPut.addColumn(family, column2, timestamp, value3);
        targetTable.put(targetPut);
      }
    }
  }
}