001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.io.IOException; 023import java.lang.reflect.Field; 024import java.util.List; 025import java.util.NavigableMap; 026import java.util.TreeMap; 027import java.util.concurrent.CountDownLatch; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.atomic.AtomicBoolean; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.RegionInfo; 039import org.apache.hadoop.hbase.client.RegionInfoBuilder; 040import org.apache.hadoop.hbase.client.TableDescriptor; 041import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 042import org.apache.hadoop.hbase.regionserver.ChunkCreator; 043import org.apache.hadoop.hbase.regionserver.HRegion; 044import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 045import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 046import org.apache.hadoop.hbase.testclassification.MediumTests; 047import org.apache.hadoop.hbase.testclassification.RegionServerTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.CommonFSUtils; 050import org.apache.hadoop.hbase.util.Threads; 051import org.apache.hadoop.hbase.wal.WALEdit; 052import org.apache.hadoop.hbase.wal.WALKey; 053import org.junit.jupiter.api.BeforeEach; 054import org.junit.jupiter.api.Tag; 055import org.junit.jupiter.api.Test; 056import org.junit.jupiter.api.TestInfo; 057 058/** 059 * Provides FSHLog test cases. 060 */ 061@Tag(RegionServerTests.TAG) 062@Tag(MediumTests.TAG) 063public class TestFSHLog extends AbstractTestFSWAL { 064 065 private String name; 066 067 @BeforeEach 068 public void initTestName(TestInfo testInfo) { 069 name = testInfo.getTestMethod().get().getName(); 070 } 071 072 @Override 073 protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir, 074 Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, 075 String suffix) throws IOException { 076 FSHLog wal = 077 new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); 078 wal.init(); 079 return wal; 080 } 081 082 @Override 083 protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir, 084 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 085 boolean failIfWALExists, String prefix, String suffix, final Runnable action) 086 throws IOException { 087 FSHLog wal = new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, 088 prefix, suffix) { 089 090 @Override 091 protected void atHeadOfRingBufferEventHandlerAppend() { 092 action.run(); 093 super.atHeadOfRingBufferEventHandlerAppend(); 094 } 095 }; 096 wal.init(); 097 return wal; 098 } 099 100 @Test 101 public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException, 102 SecurityException, IllegalArgumentException, IllegalAccessException { 103 FS.mkdirs(new Path(CommonFSUtils.getRootDir(CONF), this.name)); 104 FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), this.name, 105 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); 106 log.init(); 107 try { 108 Field syncRunnerIndexField = FSHLog.class.getDeclaredField("syncRunnerIndex"); 109 syncRunnerIndexField.setAccessible(true); 110 syncRunnerIndexField.set(log, Integer.MAX_VALUE - 1); 111 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name)) 112 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 113 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 114 for (byte[] fam : htd.getColumnFamilyNames()) { 115 scopes.put(fam, 0); 116 } 117 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 118 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 119 for (int i = 0; i < 10; i++) { 120 addEdits(log, hri, htd, 1, mvcc, scopes, "row"); 121 } 122 } finally { 123 log.close(); 124 } 125 } 126 127 /** 128 * Test case for https://issues.apache.org/jira/browse/HBASE-16721 129 */ 130 @Test 131 public void testUnflushedSeqIdTracking() throws IOException, InterruptedException { 132 final byte[] b = Bytes.toBytes("b"); 133 134 final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false); 135 final CountDownLatch holdAppend = new CountDownLatch(1); 136 final CountDownLatch flushFinished = new CountDownLatch(1); 137 final CountDownLatch putFinished = new CountDownLatch(1); 138 139 FS.mkdirs(new Path(CommonFSUtils.getRootDir(CONF), this.name)); 140 try (FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), this.name, 141 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) { 142 log.init(); 143 log.registerWALActionsListener(new WALActionsListener() { 144 @Override 145 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 146 if (startHoldingForAppend.get()) { 147 try { 148 holdAppend.await(); 149 } catch (InterruptedException e) { 150 LOG.error(e.toString(), e); 151 } 152 } 153 } 154 }); 155 156 // open a new region which uses this WAL 157 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name)) 158 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build(); 159 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 160 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 161 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 162 final HRegion region = TEST_UTIL.createLocalHRegion(hri, CONF, htd, log); 163 ExecutorService exec = Executors.newFixedThreadPool(2); 164 165 // do a regular write first because of memstore size calculation. 166 region.put(new Put(b).addColumn(b, b, b)); 167 168 startHoldingForAppend.set(true); 169 exec.submit(new Runnable() { 170 @Override 171 public void run() { 172 try { 173 region.put(new Put(b).addColumn(b, b, b)); 174 putFinished.countDown(); 175 } catch (IOException e) { 176 LOG.error(e.toString(), e); 177 } 178 } 179 }); 180 181 // give the put a chance to start 182 Threads.sleep(3000); 183 184 exec.submit(new Runnable() { 185 @Override 186 public void run() { 187 try { 188 HRegion.FlushResult flushResult = region.flush(true); 189 LOG.info("Flush result:" + flushResult.getResult()); 190 LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded()); 191 flushFinished.countDown(); 192 } catch (IOException e) { 193 LOG.error(e.toString(), e); 194 } 195 } 196 }); 197 198 // give the flush a chance to start. Flush should have got the region lock, and 199 // should have been waiting on the mvcc complete after this. 200 Threads.sleep(3000); 201 202 // let the append to WAL go through now that the flush already started 203 holdAppend.countDown(); 204 putFinished.await(); 205 flushFinished.await(); 206 207 // check whether flush went through 208 assertEquals(1, region.getStoreFileList(new byte[][] { b }).size(), "Region did not flush?"); 209 210 // now check the region's unflushed seqIds. 211 long seqId = AbstractTestFSWAL.getEarliestMemStoreSeqNum(log, hri.getEncodedNameAsBytes()); 212 assertEquals(HConstants.NO_SEQNUM, seqId, 213 "Found seqId for the region which is already flushed"); 214 215 region.close(); 216 } 217 } 218}