001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.region;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.when;
026
027import java.io.FileNotFoundException;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.List;
032import java.util.concurrent.TimeUnit;
033import java.util.stream.Collectors;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileStatus;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
041import org.apache.hadoop.hbase.HBaseTestingUtility;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.Server;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
047import org.apache.hadoop.hbase.client.Put;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
053import org.apache.hadoop.hbase.regionserver.RegionScanner;
054import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
055import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
056import org.apache.hadoop.hbase.testclassification.MasterTests;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.CommonFSUtils;
060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
061import org.apache.hadoop.hbase.util.HFileArchiveUtil;
062import org.junit.After;
063import org.junit.AfterClass;
064import org.junit.Before;
065import org.junit.BeforeClass;
066import org.junit.ClassRule;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
073
074@Category({ MasterTests.class, MediumTests.class })
075public class TestMasterRegionOnTwoFileSystems {
076  private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegionOnTwoFileSystems.class);
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestMasterRegionOnTwoFileSystems.class);
081
082  private static final HBaseCommonTestingUtility HFILE_UTIL = new HBaseCommonTestingUtility();
083
084  private static final HBaseTestingUtility WAL_UTIL = new HBaseTestingUtility();
085
086  private static byte[] CF = Bytes.toBytes("f");
087
088  private static byte[] CQ = Bytes.toBytes("q");
089
090  private static TableDescriptor TD = TableDescriptorBuilder
091    .newBuilder(TableName.valueOf("test:local"))
092    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF))
093    .setValue(StoreFileTrackerFactory.TRACKER_IMPL, StoreFileTrackerFactory.Trackers.DEFAULT.name())
094    .build();
095
096  private static int COMPACT_MIN = 4;
097
098  private MasterRegion region;
099
100  @BeforeClass
101  public static void setUp() throws Exception {
102    WAL_UTIL.startMiniDFSCluster(3);
103    Configuration conf = HFILE_UTIL.getConfiguration();
104    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
105    Path rootDir = HFILE_UTIL.getDataTestDir();
106    CommonFSUtils.setRootDir(conf, rootDir);
107    Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
108    FileSystem walFs = WAL_UTIL.getTestFileSystem();
109    CommonFSUtils.setWALRootDir(conf,
110      walRootDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()));
111
112  }
113
114  @AfterClass
115  public static void tearDown() throws IOException {
116    WAL_UTIL.shutdownMiniDFSCluster();
117    WAL_UTIL.cleanupTestDir();
118    HFILE_UTIL.cleanupTestDir();
119  }
120
121  private MasterRegion createMasterRegion(ServerName serverName) throws IOException {
122    Server server = mock(Server.class);
123    when(server.getConfiguration()).thenReturn(HFILE_UTIL.getConfiguration());
124    when(server.getServerName()).thenReturn(serverName);
125    MasterRegionParams params = new MasterRegionParams();
126    params.server(server).regionDirName("local").tableDescriptor(TD)
127      .flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
128      .flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(COMPACT_MIN).maxWals(32)
129      .useHsync(false).ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
130      .archivedWalSuffix(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
131      .archivedHFileSuffix(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
132    return MasterRegion.create(params);
133  }
134
135  @Before
136  public void setUpBeforeTest() throws IOException {
137    Path rootDir = HFILE_UTIL.getDataTestDir();
138    FileSystem fs = rootDir.getFileSystem(HFILE_UTIL.getConfiguration());
139    fs.delete(rootDir, true);
140    Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
141    FileSystem walFs = WAL_UTIL.getTestFileSystem();
142    walFs.delete(walRootDir, true);
143    region = createMasterRegion(
144      ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()));
145  }
146
147  @After
148  public void tearDownAfterTest() {
149    region.close(true);
150  }
151
152  private int getStorefilesCount() {
153    return Iterables.getOnlyElement(region.region.getStores()).getStorefilesCount();
154  }
155
156  @Test
157  public void testFlushAndCompact() throws Exception {
158    int compactMinMinusOne = COMPACT_MIN - 1;
159    for (int i = 0; i < compactMinMinusOne; i++) {
160      final int index = i;
161      region
162        .update(r -> r.put(new Put(Bytes.toBytes(index)).addColumn(CF, CQ, Bytes.toBytes(index))));
163      region.flush(true);
164    }
165    byte[] bytes = Bytes.toBytes(compactMinMinusOne);
166    region.update(r -> r.put(new Put(bytes).addColumn(CF, CQ, bytes)));
167    region.flusherAndCompactor.requestFlush();
168
169    HFILE_UTIL.waitFor(15000, () -> getStorefilesCount() == 1);
170
171    // make sure the archived hfiles are on the root fs
172    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
173      HFILE_UTIL.getDataTestDir(), region.region.getRegionInfo(), CF);
174    FileSystem rootFs = storeArchiveDir.getFileSystem(HFILE_UTIL.getConfiguration());
175    HFILE_UTIL.waitFor(15000, () -> {
176      try {
177        FileStatus[] fses = rootFs.listStatus(storeArchiveDir);
178        return fses != null && fses.length == COMPACT_MIN;
179      } catch (FileNotFoundException e) {
180        return false;
181      }
182    });
183    LOG.info("hfile archive content {}", Arrays.stream(rootFs.listStatus(storeArchiveDir))
184      .map(f -> f.getPath().toString()).collect(Collectors.joining(",")));
185
186    // make sure the archived wal files are on the wal fs
187    Path walArchiveDir = new Path(CommonFSUtils.getWALRootDir(HFILE_UTIL.getConfiguration()),
188      HConstants.HREGION_OLDLOGDIR_NAME);
189    LOG.info("wal archive dir {}", walArchiveDir);
190    AbstractFSWAL<?> wal = (AbstractFSWAL<?>) region.region.getWAL();
191    Path currentWALFile = wal.getCurrentFileName();
192    for (int i = 0;; i++) {
193      region.requestRollAll();
194      region.waitUntilWalRollFinished();
195      Path newWALFile = wal.getCurrentFileName();
196      // make sure we actually rolled the wal
197      if (!newWALFile.equals(currentWALFile)) {
198        break;
199      }
200      if (i == 10) {
201        fail("Can not roll wal after " + i + " times");
202      }
203      Thread.sleep(1000);
204    }
205    HFILE_UTIL.waitFor(15000, () -> {
206      try {
207        FileStatus[] fses = WAL_UTIL.getTestFileSystem().listStatus(walArchiveDir);
208        if (fses != null && fses.length > 0) {
209          LOG.info("wal archive dir content {}",
210            Arrays.stream(fses).map(f -> f.getPath().toString()).collect(Collectors.joining(",")));
211        } else {
212          LOG.info("none found");
213        }
214        return fses != null && fses.length >= 1;
215      } catch (FileNotFoundException e) {
216        return false;
217      }
218    });
219  }
220
221  @Test
222  public void testRecovery() throws IOException {
223    int countPerRound = 100;
224    for (int round = 0; round < 5; round++) {
225      for (int i = 0; i < countPerRound; i++) {
226        int row = round * countPerRound + i;
227        Put put = new Put(Bytes.toBytes(row)).addColumn(CF, CQ, Bytes.toBytes(row));
228        region.update(r -> r.put(put));
229      }
230      region.close(true);
231      region = createMasterRegion(
232        ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime() + round + 1));
233      try (RegionScanner scanner = region.getRegionScanner(new Scan())) {
234        List<Cell> cells = new ArrayList<>();
235        boolean moreValues = true;
236        for (int i = 0; i < (round + 1) * countPerRound; i++) {
237          assertTrue(moreValues);
238          moreValues = scanner.next(cells);
239          assertEquals(1, cells.size());
240          Result result = Result.create(cells);
241          cells.clear();
242          assertEquals(i, Bytes.toInt(result.getRow()));
243          assertEquals(i, Bytes.toInt(result.getValue(CF, CQ)));
244        }
245        assertFalse(moreValues);
246      }
247    }
248  }
249}