001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.region; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.when; 026 027import java.io.FileNotFoundException; 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.List; 032import java.util.concurrent.TimeUnit; 033import java.util.stream.Collectors; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileStatus; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 041import org.apache.hadoop.hbase.HBaseTestingUtility; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.Server; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 047import org.apache.hadoop.hbase.client.Put; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.Scan; 050import org.apache.hadoop.hbase.client.TableDescriptor; 051import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 052import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 053import org.apache.hadoop.hbase.regionserver.RegionScanner; 054import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 055import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 056import org.apache.hadoop.hbase.testclassification.MasterTests; 057import org.apache.hadoop.hbase.testclassification.MediumTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.CommonFSUtils; 060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 061import org.apache.hadoop.hbase.util.HFileArchiveUtil; 062import org.junit.After; 063import org.junit.AfterClass; 064import org.junit.Before; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071 072import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 073 074@Category({ MasterTests.class, MediumTests.class }) 075public class TestMasterRegionOnTwoFileSystems { 076 private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegionOnTwoFileSystems.class); 077 078 @ClassRule 079 public static final HBaseClassTestRule CLASS_RULE = 080 HBaseClassTestRule.forClass(TestMasterRegionOnTwoFileSystems.class); 081 082 private static final HBaseCommonTestingUtility HFILE_UTIL = new HBaseCommonTestingUtility(); 083 084 private static final HBaseTestingUtility WAL_UTIL = new HBaseTestingUtility(); 085 086 private static byte[] CF = Bytes.toBytes("f"); 087 088 private static byte[] CQ = Bytes.toBytes("q"); 089 090 private static TableDescriptor TD = TableDescriptorBuilder 091 .newBuilder(TableName.valueOf("test:local")) 092 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)) 093 .setValue(StoreFileTrackerFactory.TRACKER_IMPL, StoreFileTrackerFactory.Trackers.DEFAULT.name()) 094 .build(); 095 096 private static int COMPACT_MIN = 4; 097 098 private MasterRegion region; 099 100 @BeforeClass 101 public static void setUp() throws Exception { 102 WAL_UTIL.startMiniDFSCluster(3); 103 Configuration conf = HFILE_UTIL.getConfiguration(); 104 conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false); 105 Path rootDir = HFILE_UTIL.getDataTestDir(); 106 CommonFSUtils.setRootDir(conf, rootDir); 107 Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS(); 108 FileSystem walFs = WAL_UTIL.getTestFileSystem(); 109 CommonFSUtils.setWALRootDir(conf, 110 walRootDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory())); 111 112 } 113 114 @AfterClass 115 public static void tearDown() throws IOException { 116 WAL_UTIL.shutdownMiniDFSCluster(); 117 WAL_UTIL.cleanupTestDir(); 118 HFILE_UTIL.cleanupTestDir(); 119 } 120 121 private MasterRegion createMasterRegion(ServerName serverName) throws IOException { 122 Server server = mock(Server.class); 123 when(server.getConfiguration()).thenReturn(HFILE_UTIL.getConfiguration()); 124 when(server.getServerName()).thenReturn(serverName); 125 MasterRegionParams params = new MasterRegionParams(); 126 params.server(server).regionDirName("local").tableDescriptor(TD) 127 .flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000) 128 .flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(COMPACT_MIN).maxWals(32) 129 .useHsync(false).ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15)) 130 .archivedWalSuffix(MasterRegionFactory.ARCHIVED_WAL_SUFFIX) 131 .archivedHFileSuffix(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX); 132 return MasterRegion.create(params); 133 } 134 135 @Before 136 public void setUpBeforeTest() throws IOException { 137 Path rootDir = HFILE_UTIL.getDataTestDir(); 138 FileSystem fs = rootDir.getFileSystem(HFILE_UTIL.getConfiguration()); 139 fs.delete(rootDir, true); 140 Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS(); 141 FileSystem walFs = WAL_UTIL.getTestFileSystem(); 142 walFs.delete(walRootDir, true); 143 region = createMasterRegion( 144 ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime())); 145 } 146 147 @After 148 public void tearDownAfterTest() { 149 region.close(true); 150 } 151 152 private int getStorefilesCount() { 153 return Iterables.getOnlyElement(region.region.getStores()).getStorefilesCount(); 154 } 155 156 @Test 157 public void testFlushAndCompact() throws Exception { 158 int compactMinMinusOne = COMPACT_MIN - 1; 159 for (int i = 0; i < compactMinMinusOne; i++) { 160 final int index = i; 161 region 162 .update(r -> r.put(new Put(Bytes.toBytes(index)).addColumn(CF, CQ, Bytes.toBytes(index)))); 163 region.flush(true); 164 } 165 byte[] bytes = Bytes.toBytes(compactMinMinusOne); 166 region.update(r -> r.put(new Put(bytes).addColumn(CF, CQ, bytes))); 167 region.flusherAndCompactor.requestFlush(); 168 169 HFILE_UTIL.waitFor(15000, () -> getStorefilesCount() == 1); 170 171 // make sure the archived hfiles are on the root fs 172 Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir( 173 HFILE_UTIL.getDataTestDir(), region.region.getRegionInfo(), CF); 174 FileSystem rootFs = storeArchiveDir.getFileSystem(HFILE_UTIL.getConfiguration()); 175 HFILE_UTIL.waitFor(15000, () -> { 176 try { 177 FileStatus[] fses = rootFs.listStatus(storeArchiveDir); 178 return fses != null && fses.length == COMPACT_MIN; 179 } catch (FileNotFoundException e) { 180 return false; 181 } 182 }); 183 LOG.info("hfile archive content {}", Arrays.stream(rootFs.listStatus(storeArchiveDir)) 184 .map(f -> f.getPath().toString()).collect(Collectors.joining(","))); 185 186 // make sure the archived wal files are on the wal fs 187 Path walArchiveDir = new Path(CommonFSUtils.getWALRootDir(HFILE_UTIL.getConfiguration()), 188 HConstants.HREGION_OLDLOGDIR_NAME); 189 LOG.info("wal archive dir {}", walArchiveDir); 190 AbstractFSWAL<?> wal = (AbstractFSWAL<?>) region.region.getWAL(); 191 Path currentWALFile = wal.getCurrentFileName(); 192 for (int i = 0;; i++) { 193 region.requestRollAll(); 194 region.waitUntilWalRollFinished(); 195 Path newWALFile = wal.getCurrentFileName(); 196 // make sure we actually rolled the wal 197 if (!newWALFile.equals(currentWALFile)) { 198 break; 199 } 200 if (i == 10) { 201 fail("Can not roll wal after " + i + " times"); 202 } 203 Thread.sleep(1000); 204 } 205 HFILE_UTIL.waitFor(15000, () -> { 206 try { 207 FileStatus[] fses = WAL_UTIL.getTestFileSystem().listStatus(walArchiveDir); 208 if (fses != null && fses.length > 0) { 209 LOG.info("wal archive dir content {}", 210 Arrays.stream(fses).map(f -> f.getPath().toString()).collect(Collectors.joining(","))); 211 } else { 212 LOG.info("none found"); 213 } 214 return fses != null && fses.length >= 1; 215 } catch (FileNotFoundException e) { 216 return false; 217 } 218 }); 219 } 220 221 @Test 222 public void testRecovery() throws IOException { 223 int countPerRound = 100; 224 for (int round = 0; round < 5; round++) { 225 for (int i = 0; i < countPerRound; i++) { 226 int row = round * countPerRound + i; 227 Put put = new Put(Bytes.toBytes(row)).addColumn(CF, CQ, Bytes.toBytes(row)); 228 region.update(r -> r.put(put)); 229 } 230 region.close(true); 231 region = createMasterRegion( 232 ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime() + round + 1)); 233 try (RegionScanner scanner = region.getRegionScanner(new Scan())) { 234 List<Cell> cells = new ArrayList<>(); 235 boolean moreValues = true; 236 for (int i = 0; i < (round + 1) * countPerRound; i++) { 237 assertTrue(moreValues); 238 moreValues = scanner.next(cells); 239 assertEquals(1, cells.size()); 240 Result result = Result.create(cells); 241 cells.clear(); 242 assertEquals(i, Bytes.toInt(result.getRow())); 243 assertEquals(i, Bytes.toInt(result.getValue(CF, CQ))); 244 } 245 assertFalse(moreValues); 246 } 247 } 248 } 249}