001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.io.IOException; 023import java.lang.reflect.Field; 024import java.util.List; 025import java.util.NavigableMap; 026import java.util.TreeMap; 027import java.util.concurrent.CountDownLatch; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.atomic.AtomicBoolean; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.RegionInfo; 039import org.apache.hadoop.hbase.client.RegionInfoBuilder; 040import org.apache.hadoop.hbase.client.TableDescriptor; 041import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 042import org.apache.hadoop.hbase.regionserver.ChunkCreator; 043import org.apache.hadoop.hbase.regionserver.HRegion; 044import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 045import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 046import org.apache.hadoop.hbase.testclassification.MediumTests; 047import org.apache.hadoop.hbase.testclassification.RegionServerTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.CommonFSUtils; 050import org.apache.hadoop.hbase.util.Threads; 051import org.apache.hadoop.hbase.wal.WALEdit; 052import org.apache.hadoop.hbase.wal.WALKey; 053import org.junit.jupiter.api.BeforeEach; 054import org.junit.jupiter.api.Tag; 055import org.junit.jupiter.api.Test; 056import org.junit.jupiter.api.TestInfo; 057 058/** 059 * Provides FSHLog test cases. 060 */ 061@Tag(RegionServerTests.TAG) 062@Tag(MediumTests.TAG) 063public class TestFSHLog extends AbstractTestFSWAL { 064 065 private static final long TEST_TIMEOUT_MS = 10000; 066 067 private String name; 068 069 @BeforeEach 070 public void initTestName(TestInfo testInfo) { 071 name = testInfo.getTestMethod().get().getName(); 072 } 073 074 @Override 075 protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir, 076 Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, 077 String suffix) throws IOException { 078 FSHLog wal = 079 new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); 080 wal.init(); 081 return wal; 082 } 083 084 @Override 085 protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir, 086 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 087 boolean failIfWALExists, String prefix, String suffix, final Runnable action) 088 throws IOException { 089 FSHLog wal = new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, 090 prefix, suffix) { 091 092 @Override 093 protected void atHeadOfRingBufferEventHandlerAppend() { 094 action.run(); 095 super.atHeadOfRingBufferEventHandlerAppend(); 096 } 097 }; 098 wal.init(); 099 return wal; 100 } 101 102 @Test 103 public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException, 104 SecurityException, IllegalArgumentException, IllegalAccessException { 105 final String name = this.name; 106 FS.mkdirs(new Path(CommonFSUtils.getRootDir(CONF), name)); 107 FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name, 108 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); 109 log.init(); 110 try { 111 Field syncRunnerIndexField = FSHLog.class.getDeclaredField("syncRunnerIndex"); 112 syncRunnerIndexField.setAccessible(true); 113 syncRunnerIndexField.set(log, Integer.MAX_VALUE - 1); 114 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name)) 115 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 116 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 117 for (byte[] fam : htd.getColumnFamilyNames()) { 118 scopes.put(fam, 0); 119 } 120 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 121 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 122 for (int i = 0; i < 10; i++) { 123 addEdits(log, hri, htd, 1, mvcc, scopes, "row"); 124 } 125 } finally { 126 log.close(); 127 } 128 } 129 130 /** 131 * Test case for https://issues.apache.org/jira/browse/HBASE-16721 132 */ 133 @Test 134 public void testUnflushedSeqIdTracking() throws IOException, InterruptedException { 135 final String name = this.name; 136 final byte[] b = Bytes.toBytes("b"); 137 138 final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false); 139 final CountDownLatch holdAppend = new CountDownLatch(1); 140 final CountDownLatch flushFinished = new CountDownLatch(1); 141 final CountDownLatch putFinished = new CountDownLatch(1); 142 143 FS.mkdirs(new Path(CommonFSUtils.getRootDir(CONF), name)); 144 try (FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name, 145 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) { 146 log.init(); 147 log.registerWALActionsListener(new WALActionsListener() { 148 @Override 149 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 150 if (startHoldingForAppend.get()) { 151 try { 152 holdAppend.await(); 153 } catch (InterruptedException e) { 154 LOG.error(e.toString(), e); 155 } 156 } 157 } 158 }); 159 160 // open a new region which uses this WAL 161 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name)) 162 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build(); 163 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 164 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 165 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 166 final HRegion region = TEST_UTIL.createLocalHRegion(hri, CONF, htd, log); 167 ExecutorService exec = Executors.newFixedThreadPool(2); 168 169 // do a regular write first because of memstore size calculation. 170 region.put(new Put(b).addColumn(b, b, b)); 171 172 startHoldingForAppend.set(true); 173 exec.submit(new Runnable() { 174 @Override 175 public void run() { 176 try { 177 region.put(new Put(b).addColumn(b, b, b)); 178 putFinished.countDown(); 179 } catch (IOException e) { 180 LOG.error(e.toString(), e); 181 } 182 } 183 }); 184 185 // give the put a chance to start 186 Threads.sleep(3000); 187 188 exec.submit(new Runnable() { 189 @Override 190 public void run() { 191 try { 192 HRegion.FlushResult flushResult = region.flush(true); 193 LOG.info("Flush result:" + flushResult.getResult()); 194 LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded()); 195 flushFinished.countDown(); 196 } catch (IOException e) { 197 LOG.error(e.toString(), e); 198 } 199 } 200 }); 201 202 // give the flush a chance to start. Flush should have got the region lock, and 203 // should have been waiting on the mvcc complete after this. 204 Threads.sleep(3000); 205 206 // let the append to WAL go through now that the flush already started 207 holdAppend.countDown(); 208 putFinished.await(); 209 flushFinished.await(); 210 211 // check whether flush went through 212 assertEquals(1, region.getStoreFileList(new byte[][] { b }).size(), "Region did not flush?"); 213 214 // now check the region's unflushed seqIds. 215 long seqId = AbstractTestFSWAL.getEarliestMemStoreSeqNum(log, hri.getEncodedNameAsBytes()); 216 assertEquals(HConstants.NO_SEQNUM, seqId, 217 "Found seqId for the region which is already flushed"); 218 219 region.close(); 220 } 221 } 222}