001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.region; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.junit.jupiter.api.Assertions.fail; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.when; 026 027import java.io.FileNotFoundException; 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.List; 032import java.util.concurrent.TimeUnit; 033import java.util.stream.Collectors; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileStatus; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 040import org.apache.hadoop.hbase.HBaseTestingUtil; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.Scan; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 050import org.apache.hadoop.hbase.master.MasterServices; 051import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 052import org.apache.hadoop.hbase.regionserver.RegionScanner; 053import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 054import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 055import org.apache.hadoop.hbase.testclassification.MasterTests; 056import org.apache.hadoop.hbase.testclassification.MediumTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.CommonFSUtils; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.hbase.util.HFileArchiveUtil; 061import org.junit.jupiter.api.AfterAll; 062import org.junit.jupiter.api.AfterEach; 063import org.junit.jupiter.api.BeforeAll; 064import org.junit.jupiter.api.BeforeEach; 065import org.junit.jupiter.api.Tag; 066import org.junit.jupiter.api.Test; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 071 072@Tag(MasterTests.TAG) 073@Tag(MediumTests.TAG) 074public class TestMasterRegionOnTwoFileSystems { 075 private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegionOnTwoFileSystems.class); 076 077 private static final HBaseCommonTestingUtil HFILE_UTIL = new HBaseCommonTestingUtil(); 078 079 private static final HBaseTestingUtil WAL_UTIL = new HBaseTestingUtil(); 080 081 private static byte[] CF = Bytes.toBytes("f"); 082 083 private static byte[] CQ = Bytes.toBytes("q"); 084 085 private static TableDescriptor TD = TableDescriptorBuilder 086 .newBuilder(TableName.valueOf("test:local")) 087 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)) 088 .setValue(StoreFileTrackerFactory.TRACKER_IMPL, StoreFileTrackerFactory.Trackers.DEFAULT.name()) 089 .build(); 090 091 private static int COMPACT_MIN = 4; 092 093 private MasterRegion region; 094 095 @BeforeAll 096 public static void setUp() throws Exception { 097 WAL_UTIL.startMiniDFSCluster(3); 098 Configuration conf = HFILE_UTIL.getConfiguration(); 099 conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false); 100 Path rootDir = HFILE_UTIL.getDataTestDir(); 101 CommonFSUtils.setRootDir(conf, rootDir); 102 Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS(); 103 FileSystem walFs = WAL_UTIL.getTestFileSystem(); 104 CommonFSUtils.setWALRootDir(conf, 105 walRootDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory())); 106 107 } 108 109 @AfterAll 110 public static void tearDown() throws IOException { 111 WAL_UTIL.shutdownMiniDFSCluster(); 112 WAL_UTIL.cleanupTestDir(); 113 HFILE_UTIL.cleanupTestDir(); 114 } 115 116 private MasterRegion createMasterRegion(ServerName serverName) throws IOException { 117 MasterServices server = mock(MasterServices.class); 118 when(server.getConfiguration()).thenReturn(HFILE_UTIL.getConfiguration()); 119 when(server.getServerName()).thenReturn(serverName); 120 MasterRegionParams params = new MasterRegionParams(); 121 params.server(server).regionDirName("local").tableDescriptor(TD) 122 .flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000) 123 .flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(COMPACT_MIN).maxWals(32) 124 .useHsync(false).ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15)) 125 .archivedWalSuffix(MasterRegionFactory.ARCHIVED_WAL_SUFFIX) 126 .archivedHFileSuffix(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX); 127 return MasterRegion.create(params); 128 } 129 130 @BeforeEach 131 public void setUpBeforeTest() throws IOException { 132 Path rootDir = HFILE_UTIL.getDataTestDir(); 133 FileSystem fs = rootDir.getFileSystem(HFILE_UTIL.getConfiguration()); 134 fs.delete(rootDir, true); 135 Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS(); 136 FileSystem walFs = WAL_UTIL.getTestFileSystem(); 137 walFs.delete(walRootDir, true); 138 region = createMasterRegion( 139 ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime())); 140 } 141 142 @AfterEach 143 public void tearDownAfterTest() { 144 region.close(true); 145 } 146 147 private int getStorefilesCount() { 148 return Iterables.getOnlyElement(region.region.getStores()).getStorefilesCount(); 149 } 150 151 @Test 152 public void testFlushAndCompact() throws Exception { 153 int compactMinMinusOne = COMPACT_MIN - 1; 154 for (int i = 0; i < compactMinMinusOne; i++) { 155 final int index = i; 156 region 157 .update(r -> r.put(new Put(Bytes.toBytes(index)).addColumn(CF, CQ, Bytes.toBytes(index)))); 158 region.flush(true); 159 } 160 byte[] bytes = Bytes.toBytes(compactMinMinusOne); 161 region.update(r -> r.put(new Put(bytes).addColumn(CF, CQ, bytes))); 162 region.flusherAndCompactor.requestFlush(); 163 164 HFILE_UTIL.waitFor(15000, () -> getStorefilesCount() == 1); 165 166 // make sure the archived hfiles are on the root fs 167 Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir( 168 HFILE_UTIL.getDataTestDir(), region.region.getRegionInfo(), CF); 169 FileSystem rootFs = storeArchiveDir.getFileSystem(HFILE_UTIL.getConfiguration()); 170 HFILE_UTIL.waitFor(15000, () -> { 171 try { 172 FileStatus[] fses = rootFs.listStatus(storeArchiveDir); 173 return fses != null && fses.length == COMPACT_MIN; 174 } catch (FileNotFoundException e) { 175 return false; 176 } 177 }); 178 LOG.info("hfile archive content {}", Arrays.stream(rootFs.listStatus(storeArchiveDir)) 179 .map(f -> f.getPath().toString()).collect(Collectors.joining(","))); 180 181 // make sure the archived wal files are on the wal fs 182 Path walArchiveDir = new Path(CommonFSUtils.getWALRootDir(HFILE_UTIL.getConfiguration()), 183 HConstants.HREGION_OLDLOGDIR_NAME); 184 LOG.info("wal archive dir {}", walArchiveDir); 185 AbstractFSWAL<?> wal = (AbstractFSWAL<?>) region.region.getWAL(); 186 Path currentWALFile = wal.getCurrentFileName(); 187 for (int i = 0;; i++) { 188 region.requestRollAll(); 189 region.waitUntilWalRollFinished(); 190 Path newWALFile = wal.getCurrentFileName(); 191 // make sure we actually rolled the wal 192 if (!newWALFile.equals(currentWALFile)) { 193 break; 194 } 195 if (i == 10) { 196 fail("Can not roll wal after " + i + " times"); 197 } 198 Thread.sleep(1000); 199 } 200 HFILE_UTIL.waitFor(15000, () -> { 201 try { 202 FileStatus[] fses = WAL_UTIL.getTestFileSystem().listStatus(walArchiveDir); 203 if (fses != null && fses.length > 0) { 204 LOG.info("wal archive dir content {}", 205 Arrays.stream(fses).map(f -> f.getPath().toString()).collect(Collectors.joining(","))); 206 } else { 207 LOG.info("none found"); 208 } 209 return fses != null && fses.length >= 1; 210 } catch (FileNotFoundException e) { 211 return false; 212 } 213 }); 214 } 215 216 @Test 217 public void testRecovery() throws IOException { 218 int countPerRound = 100; 219 for (int round = 0; round < 5; round++) { 220 for (int i = 0; i < countPerRound; i++) { 221 int row = round * countPerRound + i; 222 Put put = new Put(Bytes.toBytes(row)).addColumn(CF, CQ, Bytes.toBytes(row)); 223 region.update(r -> r.put(put)); 224 } 225 region.close(true); 226 region = createMasterRegion( 227 ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime() + round + 1)); 228 try (RegionScanner scanner = region.getRegionScanner(new Scan())) { 229 List<Cell> cells = new ArrayList<>(); 230 boolean moreValues = true; 231 for (int i = 0; i < (round + 1) * countPerRound; i++) { 232 assertTrue(moreValues); 233 moreValues = scanner.next(cells); 234 assertEquals(1, cells.size()); 235 Result result = Result.create(cells); 236 cells.clear(); 237 assertEquals(i, Bytes.toInt(result.getRow())); 238 assertEquals(i, Bytes.toInt(result.getValue(CF, CQ))); 239 } 240 assertFalse(moreValues); 241 } 242 } 243 } 244}