001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.fail; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.List; 026import java.util.concurrent.CountDownLatch; 027import java.util.stream.Stream; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 036import org.apache.hadoop.hbase.client.Increment; 037import org.apache.hadoop.hbase.client.Mutation; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.client.RegionInfoBuilder; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 044import org.apache.hadoop.hbase.testclassification.RegionServerTests; 045import org.apache.hadoop.hbase.testclassification.SmallTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.CommonFSUtils; 048import org.apache.hadoop.hbase.wal.WAL; 049import org.apache.hadoop.hbase.wal.WALEdit; 050import org.apache.hadoop.hbase.wal.WALFactory; 051import org.apache.hadoop.hbase.wal.WALStreamReader; 052import org.junit.jupiter.api.AfterAll; 053import org.junit.jupiter.api.AfterEach; 054import org.junit.jupiter.api.BeforeEach; 055import org.junit.jupiter.api.Tag; 056import org.junit.jupiter.api.TestInfo; 057import org.junit.jupiter.api.TestTemplate; 058import org.junit.jupiter.params.provider.Arguments; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062/** 063 * Test for HBASE-17471. 064 * <p> 065 * MVCCPreAssign is added by HBASE-16698, but pre-assign mvcc is only used in put/delete path. Other 066 * write paths like increment/append still assign mvcc in ringbuffer's consumer thread. If put and 067 * increment are used parallel. Then seqid in WAL may not increase monotonically Disorder in wals 068 * will lead to data loss. 069 * <p> 070 * This case use two thread to put and increment at the same time in a single region. Then check the 071 * seqid in WAL. If seqid is wal is not monotonically increasing, this case will fail 072 */ 073@Tag(RegionServerTests.TAG) 074@Tag(SmallTests.TAG) 075@HBaseParameterizedTestTemplate(name = "{index}: wal={0}") 076public class TestWALMonotonicallyIncreasingSeqId { 077 078 private final Logger LOG = LoggerFactory.getLogger(getClass()); 079 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 080 private static Path testDir = TEST_UTIL.getDataTestDir("TestWALMonotonicallyIncreasingSeqId"); 081 private WALFactory wals; 082 private FileSystem fileSystem; 083 private Configuration walConf; 084 private HRegion region; 085 086 public String walProvider; 087 088 public TestWALMonotonicallyIncreasingSeqId(String walProvider) { 089 this.walProvider = walProvider; 090 } 091 092 public static Stream<Arguments> parameters() { 093 return Stream.of(Arguments.of("asyncfs"), Arguments.of("filesystem")); 094 } 095 096 private TableDescriptor getTableDesc(TableName tableName, byte[]... families) { 097 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 098 Arrays.stream(families) 099 .map( 100 f -> ColumnFamilyDescriptorBuilder.newBuilder(f).setMaxVersions(Integer.MAX_VALUE).build()) 101 .forEachOrdered(builder::setColumnFamily); 102 return builder.build(); 103 } 104 105 private HRegion initHRegion(TableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId) 106 throws IOException { 107 Configuration conf = TEST_UTIL.getConfiguration(); 108 conf.set("hbase.wal.provider", walProvider); 109 conf.setBoolean("hbase.hregion.mvcc.preassign", false); 110 Path tableDir = CommonFSUtils.getTableDir(testDir, htd.getTableName()); 111 112 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKey) 113 .setEndKey(stopKey).setReplicaId(replicaId).setRegionId(0).build(); 114 fileSystem = tableDir.getFileSystem(conf); 115 final Configuration walConf = new Configuration(conf); 116 CommonFSUtils.setRootDir(walConf, tableDir); 117 this.walConf = walConf; 118 wals = new WALFactory(walConf, "log_" + replicaId); 119 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 120 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 121 HRegion region = 122 HRegion.createHRegion(info, TEST_UTIL.getDefaultRootDirPath(), conf, htd, wals.getWAL(info)); 123 return region; 124 } 125 126 CountDownLatch latch = new CountDownLatch(1); 127 128 public class PutThread extends Thread { 129 HRegion region; 130 131 public PutThread(HRegion region) { 132 this.region = region; 133 } 134 135 @Override 136 public void run() { 137 try { 138 for (int i = 0; i < 100; i++) { 139 byte[] row = Bytes.toBytes("putRow" + i); 140 Put put = new Put(row); 141 put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(0), new byte[0]); 142 latch.await(); 143 region.batchMutate(new Mutation[] { put }); 144 Thread.sleep(10); 145 } 146 147 } catch (Throwable t) { 148 LOG.warn("Error happend when Increment: ", t); 149 } 150 } 151 } 152 153 public class IncThread extends Thread { 154 HRegion region; 155 156 public IncThread(HRegion region) { 157 this.region = region; 158 } 159 160 @Override 161 public void run() { 162 try { 163 for (int i = 0; i < 100; i++) { 164 byte[] row = Bytes.toBytes("incrementRow" + i); 165 Increment inc = new Increment(row); 166 inc.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(0), 1); 167 // inc.setDurability(Durability.ASYNC_WAL); 168 region.increment(inc); 169 latch.countDown(); 170 Thread.sleep(10); 171 } 172 173 } catch (Throwable t) { 174 LOG.warn("Error happend when Put: ", t); 175 } 176 } 177 } 178 179 @BeforeEach 180 public void setUp(TestInfo testInfo) throws IOException { 181 String name = testInfo.getTestMethod().get().getName() + "_" + walProvider; 182 byte[][] families = new byte[][] { Bytes.toBytes("cf") }; 183 TableDescriptor htd = 184 getTableDesc(TableName.valueOf(name.replaceAll("[^0-9A-Za-z_]", "_")), families); 185 region = initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 0); 186 } 187 188 @AfterEach 189 public void tearDown() throws IOException { 190 if (region != null) { 191 region.close(); 192 } 193 if (wals != null) { 194 wals.close(); 195 wals = null; 196 } 197 } 198 199 @AfterAll 200 public static void tearDownAfterClass() throws IOException { 201 TEST_UTIL.cleanupTestDir(); 202 } 203 204 private WALStreamReader createReader(Path logPath, Path oldWalsDir) throws IOException { 205 try { 206 return wals.createStreamReader(fileSystem, logPath); 207 } catch (IOException e) { 208 return wals.createStreamReader(fileSystem, new Path(oldWalsDir, logPath.getName())); 209 } 210 } 211 212 @TestTemplate 213 public void testWALMonotonicallyIncreasingSeqId() throws Exception { 214 List<Thread> putThreads = new ArrayList<>(); 215 for (int i = 0; i < 1; i++) { 216 putThreads.add(new PutThread(region)); 217 } 218 IncThread incThread = new IncThread(region); 219 for (int i = 0; i < 1; i++) { 220 putThreads.get(i).start(); 221 } 222 incThread.start(); 223 incThread.join(); 224 225 Path logPath = ((AbstractFSWAL<?>) region.getWAL()).getCurrentFileName(); 226 region.getWAL().rollWriter(); 227 Thread.sleep(10); 228 Path hbaseDir = new Path(walConf.get(HConstants.HBASE_DIR)); 229 Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); 230 try (WALStreamReader reader = createReader(logPath, oldWalsDir)) { 231 long currentMaxSeqid = 0; 232 for (WAL.Entry e; (e = reader.next()) != null;) { 233 if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) { 234 long currentSeqid = e.getKey().getSequenceId(); 235 if (currentSeqid > currentMaxSeqid) { 236 currentMaxSeqid = currentSeqid; 237 } else { 238 fail("Current max Seqid is " + currentMaxSeqid 239 + ", but the next seqid in wal is smaller:" + currentSeqid); 240 } 241 } 242 } 243 } 244 } 245}