001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.region;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.io.FileNotFoundException;
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.List;
031import java.util.concurrent.TimeUnit;
032import java.util.stream.Collectors;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.Server;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.TableDescriptor;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
052import org.apache.hadoop.hbase.regionserver.RegionScanner;
053import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
054import org.apache.hadoop.hbase.testclassification.MasterTests;
055import org.apache.hadoop.hbase.testclassification.MediumTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.CommonFSUtils;
058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
059import org.apache.hadoop.hbase.util.HFileArchiveUtil;
060import org.junit.After;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
071
072@Category({ MasterTests.class, MediumTests.class })
073public class TestMasterRegionOnTwoFileSystems {
074  private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegionOnTwoFileSystems.class);
075
076  @ClassRule
077  public static final HBaseClassTestRule CLASS_RULE =
078    HBaseClassTestRule.forClass(TestMasterRegionOnTwoFileSystems.class);
079
080  private static final HBaseCommonTestingUtil HFILE_UTIL = new HBaseCommonTestingUtil();
081
082  private static final HBaseTestingUtil WAL_UTIL = new HBaseTestingUtil();
083
084  private static byte[] CF = Bytes.toBytes("f");
085
086  private static byte[] CQ = Bytes.toBytes("q");
087
088  private static TableDescriptor TD =
089    TableDescriptorBuilder.newBuilder(TableName.valueOf("test:local"))
090      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
091
092  private static int COMPACT_MIN = 4;
093
094  private MasterRegion region;
095
096  @BeforeClass
097  public static void setUp() throws Exception {
098    WAL_UTIL.startMiniDFSCluster(3);
099    Configuration conf = HFILE_UTIL.getConfiguration();
100    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
101    Path rootDir = HFILE_UTIL.getDataTestDir();
102    CommonFSUtils.setRootDir(conf, rootDir);
103    Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
104    FileSystem walFs = WAL_UTIL.getTestFileSystem();
105    CommonFSUtils.setWALRootDir(conf,
106      walRootDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()));
107
108  }
109
110  @AfterClass
111  public static void tearDown() throws IOException {
112    WAL_UTIL.shutdownMiniDFSCluster();
113    WAL_UTIL.cleanupTestDir();
114    HFILE_UTIL.cleanupTestDir();
115  }
116
117  private MasterRegion createMasterRegion(ServerName serverName) throws IOException {
118    Server server = mock(Server.class);
119    when(server.getConfiguration()).thenReturn(HFILE_UTIL.getConfiguration());
120    when(server.getServerName()).thenReturn(serverName);
121    MasterRegionParams params = new MasterRegionParams();
122    params.server(server).regionDirName("local").tableDescriptor(TD)
123      .flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
124      .flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(COMPACT_MIN).maxWals(32)
125      .useHsync(false).ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
126      .archivedWalSuffix(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
127      .archivedHFileSuffix(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
128    return MasterRegion.create(params);
129  }
130
131  @Before
132  public void setUpBeforeTest() throws IOException {
133    Path rootDir = HFILE_UTIL.getDataTestDir();
134    FileSystem fs = rootDir.getFileSystem(HFILE_UTIL.getConfiguration());
135    fs.delete(rootDir, true);
136    Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
137    FileSystem walFs = WAL_UTIL.getTestFileSystem();
138    walFs.delete(walRootDir, true);
139    region = createMasterRegion(ServerName.valueOf("localhost", 12345,
140      EnvironmentEdgeManager.currentTime()));
141  }
142
143  @After
144  public void tearDownAfterTest() {
145    region.close(true);
146  }
147
148  private int getStorefilesCount() {
149    return Iterables.getOnlyElement(region.region.getStores()).getStorefilesCount();
150  }
151
152  @Test
153  public void testFlushAndCompact() throws Exception {
154    int compactMinMinusOne = COMPACT_MIN - 1;
155    for (int i = 0; i < compactMinMinusOne; i++) {
156      final int index = i;
157      region
158        .update(r -> r.put(new Put(Bytes.toBytes(index)).addColumn(CF, CQ, Bytes.toBytes(index))));
159      region.flush(true);
160    }
161    byte[] bytes = Bytes.toBytes(compactMinMinusOne);
162    region.update(r -> r.put(new Put(bytes).addColumn(CF, CQ, bytes)));
163    region.flusherAndCompactor.requestFlush();
164
165    HFILE_UTIL.waitFor(15000, () -> getStorefilesCount() == 1);
166
167    // make sure the archived hfiles are on the root fs
168    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
169      HFILE_UTIL.getDataTestDir(), region.region.getRegionInfo(), CF);
170    FileSystem rootFs = storeArchiveDir.getFileSystem(HFILE_UTIL.getConfiguration());
171    HFILE_UTIL.waitFor(15000, () -> {
172      try {
173        FileStatus[] fses = rootFs.listStatus(storeArchiveDir);
174        return fses != null && fses.length == COMPACT_MIN;
175      } catch (FileNotFoundException e) {
176        return false;
177      }
178    });
179    LOG.info("hfile archive content {}", Arrays.stream(rootFs.listStatus(storeArchiveDir))
180      .map(f -> f.getPath().toString()).collect(Collectors.joining(",")));
181
182    // make sure the archived wal files are on the wal fs
183    Path walArchiveDir = new Path(CommonFSUtils.getWALRootDir(HFILE_UTIL.getConfiguration()),
184      HConstants.HREGION_OLDLOGDIR_NAME);
185    LOG.info("wal archive dir {}", walArchiveDir);
186    AbstractFSWAL<?> wal = (AbstractFSWAL<?>) region.region.getWAL();
187    Path currentWALFile = wal.getCurrentFileName();
188    for (;;) {
189      region.requestRollAll();
190      region.waitUntilWalRollFinished();
191      Path newWALFile = wal.getCurrentFileName();
192      // make sure we actually rolled the wal
193      if (!newWALFile.equals(currentWALFile)) {
194        break;
195      }
196    }
197    HFILE_UTIL.waitFor(15000, () -> {
198      try {
199        FileStatus[] fses = WAL_UTIL.getTestFileSystem().listStatus(walArchiveDir);
200        if (fses != null && fses.length > 0) {
201          LOG.info("wal archive dir content {}",
202            Arrays.stream(fses).map(f -> f.getPath().toString()).collect(Collectors.joining(",")));
203        } else {
204          LOG.info("none found");
205        }
206        return fses != null && fses.length >= 1;
207      } catch (FileNotFoundException e) {
208        return false;
209      }
210    });
211  }
212
213  @Test
214  public void testRecovery() throws IOException {
215    int countPerRound = 100;
216    for (int round = 0; round < 5; round++) {
217      for (int i = 0; i < countPerRound; i++) {
218        int row = round * countPerRound + i;
219        Put put = new Put(Bytes.toBytes(row)).addColumn(CF, CQ, Bytes.toBytes(row));
220        region.update(r -> r.put(put));
221      }
222      region.close(true);
223      region = createMasterRegion(
224        ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime() + round + 1));
225      try (RegionScanner scanner = region.getRegionScanner(new Scan())) {
226        List<Cell> cells = new ArrayList<>();
227        boolean moreValues = true;
228        for (int i = 0; i < (round + 1) * countPerRound; i++) {
229          assertTrue(moreValues);
230          moreValues = scanner.next(cells);
231          assertEquals(1, cells.size());
232          Result result = Result.create(cells);
233          cells.clear();
234          assertEquals(i, Bytes.toInt(result.getRow()));
235          assertEquals(i, Bytes.toInt(result.getValue(CF, CQ)));
236        }
237        assertFalse(moreValues);
238      }
239    }
240  }
241}