001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.region;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.junit.jupiter.api.Assertions.fail;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.when;
026
027import java.io.FileNotFoundException;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.List;
032import java.util.concurrent.TimeUnit;
033import java.util.stream.Collectors;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileStatus;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.master.MasterServices;
051import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
052import org.apache.hadoop.hbase.regionserver.RegionScanner;
053import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
054import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
055import org.apache.hadoop.hbase.testclassification.MasterTests;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.CommonFSUtils;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.HFileArchiveUtil;
061import org.junit.jupiter.api.AfterAll;
062import org.junit.jupiter.api.AfterEach;
063import org.junit.jupiter.api.BeforeAll;
064import org.junit.jupiter.api.BeforeEach;
065import org.junit.jupiter.api.Tag;
066import org.junit.jupiter.api.Test;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
071
072@Tag(MasterTests.TAG)
073@Tag(MediumTests.TAG)
074public class TestMasterRegionOnTwoFileSystems {
075  private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegionOnTwoFileSystems.class);
076
077  private static final HBaseCommonTestingUtil HFILE_UTIL = new HBaseCommonTestingUtil();
078
079  private static final HBaseTestingUtil WAL_UTIL = new HBaseTestingUtil();
080
081  private static byte[] CF = Bytes.toBytes("f");
082
083  private static byte[] CQ = Bytes.toBytes("q");
084
085  private static TableDescriptor TD = TableDescriptorBuilder
086    .newBuilder(TableName.valueOf("test:local"))
087    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF))
088    .setValue(StoreFileTrackerFactory.TRACKER_IMPL, StoreFileTrackerFactory.Trackers.DEFAULT.name())
089    .build();
090
091  private static int COMPACT_MIN = 4;
092
093  private MasterRegion region;
094
095  @BeforeAll
096  public static void setUp() throws Exception {
097    WAL_UTIL.startMiniDFSCluster(3);
098    Configuration conf = HFILE_UTIL.getConfiguration();
099    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
100    Path rootDir = HFILE_UTIL.getDataTestDir();
101    CommonFSUtils.setRootDir(conf, rootDir);
102    Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
103    FileSystem walFs = WAL_UTIL.getTestFileSystem();
104    CommonFSUtils.setWALRootDir(conf,
105      walRootDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()));
106
107  }
108
109  @AfterAll
110  public static void tearDown() throws IOException {
111    WAL_UTIL.shutdownMiniDFSCluster();
112    WAL_UTIL.cleanupTestDir();
113    HFILE_UTIL.cleanupTestDir();
114  }
115
116  private MasterRegion createMasterRegion(ServerName serverName) throws IOException {
117    MasterServices server = mock(MasterServices.class);
118    when(server.getConfiguration()).thenReturn(HFILE_UTIL.getConfiguration());
119    when(server.getServerName()).thenReturn(serverName);
120    MasterRegionParams params = new MasterRegionParams();
121    params.server(server).regionDirName("local").tableDescriptor(TD)
122      .flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
123      .flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(COMPACT_MIN).maxWals(32)
124      .useHsync(false).ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
125      .archivedWalSuffix(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
126      .archivedHFileSuffix(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
127    return MasterRegion.create(params);
128  }
129
130  @BeforeEach
131  public void setUpBeforeTest() throws IOException {
132    Path rootDir = HFILE_UTIL.getDataTestDir();
133    FileSystem fs = rootDir.getFileSystem(HFILE_UTIL.getConfiguration());
134    fs.delete(rootDir, true);
135    Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
136    FileSystem walFs = WAL_UTIL.getTestFileSystem();
137    walFs.delete(walRootDir, true);
138    region = createMasterRegion(
139      ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()));
140  }
141
142  @AfterEach
143  public void tearDownAfterTest() {
144    region.close(true);
145  }
146
147  private int getStorefilesCount() {
148    return Iterables.getOnlyElement(region.region.getStores()).getStorefilesCount();
149  }
150
151  @Test
152  public void testFlushAndCompact() throws Exception {
153    int compactMinMinusOne = COMPACT_MIN - 1;
154    for (int i = 0; i < compactMinMinusOne; i++) {
155      final int index = i;
156      region
157        .update(r -> r.put(new Put(Bytes.toBytes(index)).addColumn(CF, CQ, Bytes.toBytes(index))));
158      region.flush(true);
159    }
160    byte[] bytes = Bytes.toBytes(compactMinMinusOne);
161    region.update(r -> r.put(new Put(bytes).addColumn(CF, CQ, bytes)));
162    region.flusherAndCompactor.requestFlush();
163
164    HFILE_UTIL.waitFor(15000, () -> getStorefilesCount() == 1);
165
166    // make sure the archived hfiles are on the root fs
167    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
168      HFILE_UTIL.getDataTestDir(), region.region.getRegionInfo(), CF);
169    FileSystem rootFs = storeArchiveDir.getFileSystem(HFILE_UTIL.getConfiguration());
170    HFILE_UTIL.waitFor(15000, () -> {
171      try {
172        FileStatus[] fses = rootFs.listStatus(storeArchiveDir);
173        return fses != null && fses.length == COMPACT_MIN;
174      } catch (FileNotFoundException e) {
175        return false;
176      }
177    });
178    LOG.info("hfile archive content {}", Arrays.stream(rootFs.listStatus(storeArchiveDir))
179      .map(f -> f.getPath().toString()).collect(Collectors.joining(",")));
180
181    // make sure the archived wal files are on the wal fs
182    Path walArchiveDir = new Path(CommonFSUtils.getWALRootDir(HFILE_UTIL.getConfiguration()),
183      HConstants.HREGION_OLDLOGDIR_NAME);
184    LOG.info("wal archive dir {}", walArchiveDir);
185    AbstractFSWAL<?> wal = (AbstractFSWAL<?>) region.region.getWAL();
186    Path currentWALFile = wal.getCurrentFileName();
187    for (int i = 0;; i++) {
188      region.requestRollAll();
189      region.waitUntilWalRollFinished();
190      Path newWALFile = wal.getCurrentFileName();
191      // make sure we actually rolled the wal
192      if (!newWALFile.equals(currentWALFile)) {
193        break;
194      }
195      if (i == 10) {
196        fail("Can not roll wal after " + i + " times");
197      }
198      Thread.sleep(1000);
199    }
200    HFILE_UTIL.waitFor(15000, () -> {
201      try {
202        FileStatus[] fses = WAL_UTIL.getTestFileSystem().listStatus(walArchiveDir);
203        if (fses != null && fses.length > 0) {
204          LOG.info("wal archive dir content {}",
205            Arrays.stream(fses).map(f -> f.getPath().toString()).collect(Collectors.joining(",")));
206        } else {
207          LOG.info("none found");
208        }
209        return fses != null && fses.length >= 1;
210      } catch (FileNotFoundException e) {
211        return false;
212      }
213    });
214  }
215
216  @Test
217  public void testRecovery() throws IOException {
218    int countPerRound = 100;
219    for (int round = 0; round < 5; round++) {
220      for (int i = 0; i < countPerRound; i++) {
221        int row = round * countPerRound + i;
222        Put put = new Put(Bytes.toBytes(row)).addColumn(CF, CQ, Bytes.toBytes(row));
223        region.update(r -> r.put(put));
224      }
225      region.close(true);
226      region = createMasterRegion(
227        ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime() + round + 1));
228      try (RegionScanner scanner = region.getRegionScanner(new Scan())) {
229        List<Cell> cells = new ArrayList<>();
230        boolean moreValues = true;
231        for (int i = 0; i < (round + 1) * countPerRound; i++) {
232          assertTrue(moreValues);
233          moreValues = scanner.next(cells);
234          assertEquals(1, cells.size());
235          Result result = Result.create(cells);
236          cells.clear();
237          assertEquals(i, Bytes.toInt(result.getRow()));
238          assertEquals(i, Bytes.toInt(result.getValue(CF, CQ)));
239        }
240        assertFalse(moreValues);
241      }
242    }
243  }
244}