001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileStatus;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.HDFSBlocksDistribution;
036import org.apache.hadoop.hbase.StartTestingClusterOption;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.RegionLocator;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.ResultScanner;
042import org.apache.hadoop.hbase.client.Scan;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.regionserver.HRegion;
045import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.junit.jupiter.api.AfterAll;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051public class MRIncrementalLoadTestBase extends HFileOutputFormat2TestBase {
052
053  private static final Logger LOG = LoggerFactory.getLogger(MRIncrementalLoadTestBase.class);
054
055  private static boolean SHOULD_KEEP_LOCALITY;
056
057  private static String[] HOSTNAMES;
058
059  public boolean shouldChangeRegions;
060
061  public boolean putSortReducer;
062
063  public List<String> tableStr;
064
065  private Map<String, Table> allTables;
066
067  private List<HFileOutputFormat2.TableInfo> tableInfo;
068
069  private Path testDir;
070
071  protected static void setupCluster(boolean shouldKeepLocality) throws Exception {
072    SHOULD_KEEP_LOCALITY = shouldKeepLocality;
073    Configuration conf = UTIL.getConfiguration();
074    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
075    // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
076    // explicit hostnames parameter just like MiniDFSCluster does.
077    int hostCount = shouldKeepLocality ? 3 : 1;
078
079    HOSTNAMES = new String[hostCount];
080    for (int i = 0; i < hostCount; ++i) {
081      HOSTNAMES[i] = "datanode_" + i;
082    }
083    StartTestingClusterOption option = StartTestingClusterOption.builder()
084      .numRegionServers(hostCount).dataNodeHosts(HOSTNAMES).build();
085    UTIL.getConfiguration().unset(HConstants.TEMPORARY_FS_DIRECTORY_KEY);
086    UTIL.startMiniCluster(option);
087
088  }
089
090  @AfterAll
091  public static void tearDownAfterClass() throws IOException {
092    UTIL.shutdownMiniCluster();
093  }
094
095  public void setUp() throws IOException {
096    int regionNum = SHOULD_KEEP_LOCALITY ? 20 : 5;
097    allTables = new HashMap<>(tableStr.size());
098    tableInfo = new ArrayList<>(tableStr.size());
099    for (String tableStrSingle : tableStr) {
100      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
101      TableName tableName = TableName.valueOf(tableStrSingle);
102      Table table = UTIL.createTable(tableName, FAMILIES, splitKeys);
103
104      RegionLocator r = UTIL.getConnection().getRegionLocator(tableName);
105      assertEquals(0, HBaseTestingUtil.countRows(table), "Should start with empty table");
106      int numRegions = r.getStartKeys().length;
107      assertEquals(numRegions, regionNum, "Should make " + regionNum + " regions");
108
109      allTables.put(tableStrSingle, table);
110      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
111    }
112    testDir = UTIL.getDataTestDirOnTestFS(tableStr.get(0));
113  }
114
115  public void tearDown() throws IOException {
116    for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
117      tableInfoSingle.getRegionLocator().close();
118    }
119    tableInfo.clear();
120    allTables.clear();
121    for (String tableStrSingle : tableStr) {
122      UTIL.deleteTable(TableName.valueOf(tableStrSingle));
123    }
124  }
125
126  protected void runTest(boolean shouldChangeRegions, boolean putSortReducer, List<String> tableStr)
127    throws Exception {
128    this.shouldChangeRegions = shouldChangeRegions;
129    this.putSortReducer = putSortReducer;
130    this.tableStr = tableStr;
131    setUp();
132    try {
133      doIncrementalLoadTest();
134    } finally {
135      tearDown();
136    }
137  }
138
139  public void doIncrementalLoadTest() throws Exception {
140    boolean writeMultipleTables = tableStr.size() > 1;
141    // Generate the bulk load files
142    runIncrementalPELoad(UTIL.getConfiguration(), tableInfo, testDir, putSortReducer);
143    if (writeMultipleTables) {
144      testDir = new Path(testDir, "default");
145    }
146
147    for (Table tableSingle : allTables.values()) {
148      // This doesn't write into the table, just makes files
149      assertEquals(0, HBaseTestingUtil.countRows(tableSingle),
150        "HFOF should not touch actual table");
151    }
152    int numTableDirs = 0;
153    FileStatus[] fss = testDir.getFileSystem(UTIL.getConfiguration()).listStatus(testDir);
154    for (FileStatus tf : fss) {
155      Path tablePath = testDir;
156      if (writeMultipleTables) {
157        if (allTables.containsKey(tf.getPath().getName())) {
158          ++numTableDirs;
159          tablePath = tf.getPath();
160        } else {
161          continue;
162        }
163      }
164
165      // Make sure that a directory was created for every CF
166      int dir = 0;
167      fss = tablePath.getFileSystem(UTIL.getConfiguration()).listStatus(tablePath);
168      for (FileStatus f : fss) {
169        for (byte[] family : FAMILIES) {
170          if (Bytes.toString(family).equals(f.getPath().getName())) {
171            ++dir;
172          }
173        }
174      }
175      assertEquals(FAMILIES.length, dir, "Column family not found in FS.");
176    }
177    if (writeMultipleTables) {
178      assertEquals(numTableDirs, allTables.size(), "Dir for all input tables not created");
179    }
180
181    Admin admin = UTIL.getAdmin();
182
183    // handle the split case
184    if (shouldChangeRegions) {
185      Table chosenTable = allTables.values().iterator().next();
186      // Choose a semi-random table if multiple tables are available
187      LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
188      admin.disableTable(chosenTable.getName());
189      UTIL.waitUntilNoRegionsInTransition();
190
191      UTIL.deleteTable(chosenTable.getName());
192      byte[][] newSplitKeys = generateRandomSplitKeys(14);
193      UTIL.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
194      UTIL.waitTableAvailable(chosenTable.getName());
195    }
196
197    // Perform the actual load
198    for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
199      Path tableDir = testDir;
200      String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
201      LOG.info("Running BulkLoadHFiles on table" + tableNameStr);
202      if (writeMultipleTables) {
203        tableDir = new Path(testDir, tableNameStr);
204      }
205      Table currentTable = allTables.get(tableNameStr);
206      TableName currentTableName = currentTable.getName();
207      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(currentTableName, tableDir);
208
209      // Ensure data shows up
210      int expectedRows = 0;
211      if (putSortReducer) {
212        // no rows should be extracted
213        assertEquals(expectedRows, HBaseTestingUtil.countRows(currentTable),
214          "BulkLoadHFiles should put expected data in table");
215      } else {
216        expectedRows = NMapInputFormat.getNumMapTasks(UTIL.getConfiguration()) * ROWSPERSPLIT;
217        assertEquals(expectedRows, HBaseTestingUtil.countRows(currentTable),
218          "BulkLoadHFiles should put expected data in table");
219        Scan scan = new Scan();
220        ResultScanner results = currentTable.getScanner(scan);
221        for (Result res : results) {
222          assertEquals(FAMILIES.length, res.rawCells().length);
223          Cell first = res.rawCells()[0];
224          for (Cell kv : res.rawCells()) {
225            assertTrue(CellUtil.matchingRows(first, kv));
226            assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
227          }
228        }
229        results.close();
230      }
231      String tableDigestBefore = UTIL.checksumRows(currentTable);
232      // Check region locality
233      HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
234      for (HRegion region : UTIL.getHBaseCluster().getRegions(currentTableName)) {
235        hbd.add(region.getHDFSBlocksDistribution());
236      }
237      for (String hostname : HOSTNAMES) {
238        float locality = hbd.getBlockLocalityIndex(hostname);
239        LOG.info("locality of [" + hostname + "]: " + locality);
240        assertEquals(100, (int) (locality * 100));
241      }
242
243      // Cause regions to reopen
244      admin.disableTable(currentTableName);
245      while (!admin.isTableDisabled(currentTableName)) {
246        Thread.sleep(200);
247        LOG.info("Waiting for table to disable");
248      }
249      admin.enableTable(currentTableName);
250      UTIL.waitTableAvailable(currentTableName);
251      assertEquals(tableDigestBefore, UTIL.checksumRows(currentTable),
252        "Data should remain after reopening of regions");
253    }
254  }
255}