001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileStatus;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.HDFSBlocksDistribution;
036import org.apache.hadoop.hbase.StartTestingClusterOption;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.RegionLocator;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.ResultScanner;
042import org.apache.hadoop.hbase.client.Scan;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.regionserver.HRegion;
045import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.junit.After;
048import org.junit.AfterClass;
049import org.junit.Before;
050import org.junit.Test;
051import org.junit.runners.Parameterized.Parameter;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055public class MRIncrementalLoadTestBase extends HFileOutputFormat2TestBase {
056
057  private static final Logger LOG = LoggerFactory.getLogger(MRIncrementalLoadTestBase.class);
058
059  private static boolean SHOULD_KEEP_LOCALITY;
060
061  private static String[] HOSTNAMES;
062
063  @Parameter(0)
064  public boolean shouldChangeRegions;
065
066  @Parameter(1)
067  public boolean putSortReducer;
068
069  @Parameter(2)
070  public List<String> tableStr;
071
072  private Map<String, Table> allTables;
073
074  private List<HFileOutputFormat2.TableInfo> tableInfo;
075
076  private Path testDir;
077
078  protected static void setupCluster(boolean shouldKeepLocality) throws Exception {
079    SHOULD_KEEP_LOCALITY = shouldKeepLocality;
080    Configuration conf = UTIL.getConfiguration();
081    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
082    // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
083    // explicit hostnames parameter just like MiniDFSCluster does.
084    int hostCount = shouldKeepLocality ? 3 : 1;
085
086    HOSTNAMES = new String[hostCount];
087    for (int i = 0; i < hostCount; ++i) {
088      HOSTNAMES[i] = "datanode_" + i;
089    }
090    StartTestingClusterOption option = StartTestingClusterOption.builder()
091      .numRegionServers(hostCount).dataNodeHosts(HOSTNAMES).build();
092    UTIL.getConfiguration().unset(HConstants.TEMPORARY_FS_DIRECTORY_KEY);
093    UTIL.startMiniCluster(option);
094
095  }
096
097  @AfterClass
098  public static void tearDownAfterClass() throws IOException {
099    UTIL.shutdownMiniCluster();
100  }
101
102  @Before
103  public void setUp() throws IOException {
104    int regionNum = SHOULD_KEEP_LOCALITY ? 20 : 5;
105    allTables = new HashMap<>(tableStr.size());
106    tableInfo = new ArrayList<>(tableStr.size());
107    for (String tableStrSingle : tableStr) {
108      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
109      TableName tableName = TableName.valueOf(tableStrSingle);
110      Table table = UTIL.createTable(tableName, FAMILIES, splitKeys);
111
112      RegionLocator r = UTIL.getConnection().getRegionLocator(tableName);
113      assertEquals("Should start with empty table", 0, HBaseTestingUtil.countRows(table));
114      int numRegions = r.getStartKeys().length;
115      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
116
117      allTables.put(tableStrSingle, table);
118      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
119    }
120    testDir = UTIL.getDataTestDirOnTestFS(tableStr.get(0));
121  }
122
123  @After
124  public void tearDown() throws IOException {
125    for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
126      tableInfoSingle.getRegionLocator().close();
127    }
128    tableInfo.clear();
129    allTables.clear();
130    for (String tableStrSingle : tableStr) {
131      UTIL.deleteTable(TableName.valueOf(tableStrSingle));
132    }
133  }
134
135  @Test
136  public void doIncrementalLoadTest() throws Exception {
137    boolean writeMultipleTables = tableStr.size() > 1;
138    // Generate the bulk load files
139    runIncrementalPELoad(UTIL.getConfiguration(), tableInfo, testDir, putSortReducer);
140    if (writeMultipleTables) {
141      testDir = new Path(testDir, "default");
142    }
143
144    for (Table tableSingle : allTables.values()) {
145      // This doesn't write into the table, just makes files
146      assertEquals("HFOF should not touch actual table", 0,
147        HBaseTestingUtil.countRows(tableSingle));
148    }
149    int numTableDirs = 0;
150    FileStatus[] fss = testDir.getFileSystem(UTIL.getConfiguration()).listStatus(testDir);
151    for (FileStatus tf : fss) {
152      Path tablePath = testDir;
153      if (writeMultipleTables) {
154        if (allTables.containsKey(tf.getPath().getName())) {
155          ++numTableDirs;
156          tablePath = tf.getPath();
157        } else {
158          continue;
159        }
160      }
161
162      // Make sure that a directory was created for every CF
163      int dir = 0;
164      fss = tablePath.getFileSystem(UTIL.getConfiguration()).listStatus(tablePath);
165      for (FileStatus f : fss) {
166        for (byte[] family : FAMILIES) {
167          if (Bytes.toString(family).equals(f.getPath().getName())) {
168            ++dir;
169          }
170        }
171      }
172      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
173    }
174    if (writeMultipleTables) {
175      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
176    }
177
178    Admin admin = UTIL.getAdmin();
179
180    // handle the split case
181    if (shouldChangeRegions) {
182      Table chosenTable = allTables.values().iterator().next();
183      // Choose a semi-random table if multiple tables are available
184      LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
185      admin.disableTable(chosenTable.getName());
186      UTIL.waitUntilNoRegionsInTransition();
187
188      UTIL.deleteTable(chosenTable.getName());
189      byte[][] newSplitKeys = generateRandomSplitKeys(14);
190      UTIL.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
191      UTIL.waitTableAvailable(chosenTable.getName());
192    }
193
194    // Perform the actual load
195    for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
196      Path tableDir = testDir;
197      String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
198      LOG.info("Running BulkLoadHFiles on table" + tableNameStr);
199      if (writeMultipleTables) {
200        tableDir = new Path(testDir, tableNameStr);
201      }
202      Table currentTable = allTables.get(tableNameStr);
203      TableName currentTableName = currentTable.getName();
204      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(currentTableName, tableDir);
205
206      // Ensure data shows up
207      int expectedRows = 0;
208      if (putSortReducer) {
209        // no rows should be extracted
210        assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
211          HBaseTestingUtil.countRows(currentTable));
212      } else {
213        expectedRows = NMapInputFormat.getNumMapTasks(UTIL.getConfiguration()) * ROWSPERSPLIT;
214        assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
215          HBaseTestingUtil.countRows(currentTable));
216        Scan scan = new Scan();
217        ResultScanner results = currentTable.getScanner(scan);
218        for (Result res : results) {
219          assertEquals(FAMILIES.length, res.rawCells().length);
220          Cell first = res.rawCells()[0];
221          for (Cell kv : res.rawCells()) {
222            assertTrue(CellUtil.matchingRows(first, kv));
223            assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
224          }
225        }
226        results.close();
227      }
228      String tableDigestBefore = UTIL.checksumRows(currentTable);
229      // Check region locality
230      HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
231      for (HRegion region : UTIL.getHBaseCluster().getRegions(currentTableName)) {
232        hbd.add(region.getHDFSBlocksDistribution());
233      }
234      for (String hostname : HOSTNAMES) {
235        float locality = hbd.getBlockLocalityIndex(hostname);
236        LOG.info("locality of [" + hostname + "]: " + locality);
237        assertEquals(100, (int) (locality * 100));
238      }
239
240      // Cause regions to reopen
241      admin.disableTable(currentTableName);
242      while (!admin.isTableDisabled(currentTableName)) {
243        Thread.sleep(200);
244        LOG.info("Waiting for table to disable");
245      }
246      admin.enableTable(currentTableName);
247      UTIL.waitTableAvailable(currentTableName);
248      assertEquals("Data should remain after reopening of regions", tableDigestBefore,
249        UTIL.checksumRows(currentTable));
250    }
251  }
252}