/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.region;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

/**
 * Tests {@link MasterRegion} when the HBase root directory and the WAL root directory are
 * configured separately: the root directory is the data test dir of {@link #HFILE_UTIL}, while the
 * WAL root directory lives on the test file system of the mini DFS cluster started through
 * {@link #WAL_UTIL}.
 * <p>
 * The tests check that archived HFiles show up under the root directory, that archived WAL files
 * show up under the WAL root directory, and that data survives closing and re-creating the region.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestMasterRegionOnTwoFileSystems {
  private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegionOnTwoFileSystems.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMasterRegionOnTwoFileSystems.class);

  /** Supplies the shared configuration and the root (HFile) directory. */
  private static final HBaseCommonTestingUtility HFILE_UTIL = new HBaseCommonTestingUtility();

  /** Supplies the mini DFS cluster whose file system hosts the WAL root directory. */
  private static final HBaseTestingUtility WAL_UTIL = new HBaseTestingUtility();

  /** The single column family of the test table. */
  private static final byte[] CF = Bytes.toBytes("f");

  /** The column qualifier used by every cell written in these tests. */
  private static final byte[] CQ = Bytes.toBytes("q");

  /** Descriptor of the table backing the master region under test. */
  private static final TableDescriptor TD =
    TableDescriptorBuilder.newBuilder(TableName.valueOf("test:local"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();

  /** Value handed to {@code MasterRegionParams.compactMin}; also the number of flushes we do. */
  private static final int COMPACT_MIN = 4;

  /** The region under test; re-created before every test method. */
  private MasterRegion region;

  /**
   * Starts the mini DFS cluster and points the shared configuration at the two directories: the
   * root dir at {@code HFILE_UTIL}'s data test dir and the WAL root dir at a fully qualified path
   * on {@code WAL_UTIL}'s test file system.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    WAL_UTIL.startMiniDFSCluster(3);
    Configuration conf = HFILE_UTIL.getConfiguration();
    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
    Path rootDir = HFILE_UTIL.getDataTestDir();
    CommonFSUtils.setRootDir(conf, rootDir);
    Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
    FileSystem walFs = WAL_UTIL.getTestFileSystem();
    // Qualify the WAL root dir with the WAL file system's URI so it is not resolved against the
    // file system of the root dir.
    CommonFSUtils.setWALRootDir(conf,
      walRootDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()));
  }

  /** Shuts down the mini DFS cluster and removes the test directories of both utilities. */
  @AfterClass
  public static void tearDown() throws IOException {
    WAL_UTIL.shutdownMiniDFSCluster();
    WAL_UTIL.cleanupTestDir();
    HFILE_UTIL.cleanupTestDir();
  }

  /**
   * Creates a {@link MasterRegion} on top of a mocked {@link Server}.
   * @param serverName the name the mocked server reports from {@code getServerName()}
   * @return the region returned by {@code MasterRegion.create}
   * @throws IOException if creating the region fails
   */
  private MasterRegion createMasterRegion(ServerName serverName) throws IOException {
    Server server = mock(Server.class);
    when(server.getConfiguration()).thenReturn(HFILE_UTIL.getConfiguration());
    when(server.getServerName()).thenReturn(serverName);
    MasterRegionParams params = new MasterRegionParams();
    params.server(server).regionDirName("local").tableDescriptor(TD)
      .flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
      .flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(COMPACT_MIN).maxWals(32)
      .useHsync(false).ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
      .archivedWalSuffix(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
      .archivedHFileSuffix(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
    return MasterRegion.create(params);
  }

  /**
   * Recursively deletes the root dir and the WAL root dir, then creates a fresh region so that
   * every test starts from empty directories.
   */
  @Before
  public void setUpBeforeTest() throws IOException {
    Path rootDir = HFILE_UTIL.getDataTestDir();
    FileSystem fs = rootDir.getFileSystem(HFILE_UTIL.getConfiguration());
    fs.delete(rootDir, true);
    Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
    FileSystem walFs = WAL_UTIL.getTestFileSystem();
    walFs.delete(walRootDir, true);
    region = createMasterRegion(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
  }

  /** Closes the region created for the test, if there is one. */
  @After
  public void tearDownAfterTest() {
    // region stays null when setUpBeforeTest failed before assigning it; skip the close in that
    // case so the original failure is not accompanied by a NullPointerException.
    if (region != null) {
      region.close(true);
    }
  }

  /** Returns the store file count of the region's only store. */
  private int getStorefilesCount() {
    return Iterables.getOnlyElement(region.region.getStores()).getStorefilesCount();
  }

  /**
   * Produces {@link #COMPACT_MIN} flushes, waits until the store is down to a single store file,
   * and then verifies that the archived HFiles are found under the root dir and that at least one
   * archived WAL file is found under the WAL root dir.
   */
  @Test
  public void testFlushAndCompact() throws Exception {
    int compactMinMinusOne = COMPACT_MIN - 1;
    // Write one row and flush explicitly, COMPACT_MIN - 1 times.
    for (int i = 0; i < compactMinMinusOne; i++) {
      final int index = i;
      region
        .update(r -> r.put(new Put(Bytes.toBytes(index)).addColumn(CF, CQ, Bytes.toBytes(index))));
      region.flush(true);
    }
    // The last row is flushed through the flusher/compactor instead of a direct flush call.
    byte[] bytes = Bytes.toBytes(compactMinMinusOne);
    region.update(r -> r.put(new Put(bytes).addColumn(CF, CQ, bytes)));
    region.flusherAndCompactor.requestFlush();

    HFILE_UTIL.waitFor(15000, () -> getStorefilesCount() == 1);

    // make sure the archived hfiles are on the root fs
    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
      HFILE_UTIL.getDataTestDir(), region.region.getRegionInfo(), CF);
    FileSystem rootFs = storeArchiveDir.getFileSystem(HFILE_UTIL.getConfiguration());
    HFILE_UTIL.waitFor(15000, () -> {
      try {
        FileStatus[] fses = rootFs.listStatus(storeArchiveDir);
        return fses != null && fses.length == COMPACT_MIN;
      } catch (FileNotFoundException e) {
        // The archive directory does not exist yet; keep waiting.
        return false;
      }
    });
    LOG.info("hfile archive content {}", Arrays.stream(rootFs.listStatus(storeArchiveDir))
      .map(f -> f.getPath().toString()).collect(Collectors.joining(",")));

    // make sure the archived wal files are on the wal fs
    Path walArchiveDir = new Path(CommonFSUtils.getWALRootDir(HFILE_UTIL.getConfiguration()),
      HConstants.HREGION_OLDLOGDIR_NAME);
    LOG.info("wal archive dir {}", walArchiveDir);
    AbstractFSWAL<?> wal = (AbstractFSWAL<?>) region.region.getWAL();
    Path currentWALFile = wal.getCurrentFileName();
    // Keep requesting rolls until the WAL reports a different current file name.
    for (;;) {
      region.requestRollAll();
      region.waitUntilWalRollFinished();
      Path newWALFile = wal.getCurrentFileName();
      // make sure we actually rolled the wal
      if (!newWALFile.equals(currentWALFile)) {
        break;
      }
    }
    HFILE_UTIL.waitFor(15000, () -> {
      try {
        FileStatus[] fses = WAL_UTIL.getTestFileSystem().listStatus(walArchiveDir);
        boolean found = fses != null && fses.length > 0;
        if (found) {
          LOG.info("wal archive dir content {}",
            Arrays.stream(fses).map(f -> f.getPath().toString()).collect(Collectors.joining(",")));
        } else {
          LOG.info("none found");
        }
        return found;
      } catch (FileNotFoundException e) {
        // The WAL archive directory does not exist yet; keep waiting.
        return false;
      }
    });
  }

  /**
   * Runs five rounds of: write 100 rows, close the region, re-create it under a server name with a
   * different start code, and scan it to verify that every row written so far comes back in
   * ascending order with the expected value.
   */
  @Test
  public void testRecovery() throws IOException {
    int countPerRound = 100;
    for (int round = 0; round < 5; round++) {
      for (int i = 0; i < countPerRound; i++) {
        int row = round * countPerRound + i;
        Put put = new Put(Bytes.toBytes(row)).addColumn(CF, CQ, Bytes.toBytes(row));
        region.update(r -> r.put(put));
      }
      region.close(true);
      // Use a different start code each round so the re-created region runs under a new
      // server name.
      region = createMasterRegion(
        ServerName.valueOf("localhost", 12345, System.currentTimeMillis() + round + 1));
      try (RegionScanner scanner = region.getScanner(new Scan())) {
        List<Cell> cells = new ArrayList<>();
        boolean moreValues = true;
        for (int i = 0; i < (round + 1) * countPerRound; i++) {
          // The previous next() call must have announced this row.
          assertTrue(moreValues);
          moreValues = scanner.next(cells);
          assertEquals(1, cells.size());
          Result result = Result.create(cells);
          cells.clear();
          assertEquals(i, Bytes.toInt(result.getRow()));
          assertEquals(i, Bytes.toInt(result.getValue(CF, CQ)));
        }
        // After the last expected row the scanner must report that it is exhausted.
        assertFalse(moreValues);
      }
    }
  }
}