/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.jupiter.api.Assertions.assertFalse;

import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test many concurrent appenders to a WAL while rolling the log.
 */
@Tag(RegionServerTests.TAG)
@Tag(MediumTests.TAG)
public class TestLogRollingNoCluster {

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final byte[] EMPTY_1K_ARRAY = new byte[1024];
  private static final int NUM_THREADS = 100; // Spin up this many appender threads
  private static final int NUM_ENTRIES = 100; // How many entries each appender writes

  /** ProtobufLogWriter that simulates higher latency in the sync() call */
  public static class HighLatencySyncWriter extends ProtobufLogWriter {
    @Override
    public void sync(boolean forceSync) throws IOException {
      // Sleep a random amount before and after the real sync so syncs from
      // concurrent appenders overlap with log rolls.
      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
      super.sync(forceSync);
      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
    }
  }

  /**
   * Spin up a bunch of threads and have them all append to a WAL. Roll the WAL frequently to try
   * to trigger an NPE.
   */
  @Test
  public void testContendedLogRolling() throws Exception {
    TEST_UTIL.startMiniDFSCluster(3);
    Path dir = TEST_UTIL.getDataTestDirOnTestFS();

    // The implementation needs to know the 'handler' count.
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
    final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    CommonFSUtils.setRootDir(conf, dir);
    FSTableDescriptors fsTableDescriptors = new FSTableDescriptors(TEST_UTIL.getConfiguration());
    FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration());
    TableDescriptor metaTableDescriptor = fsTableDescriptors.get(TableName.META_TABLE_NAME);
    // Use the high-latency writer so syncs overlap with log rolls.
    conf.set(FSHLogProvider.WRITER_IMPL, HighLatencySyncWriter.class.getName());
    final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName());
    final WAL wal = wals.getWAL(null);

    final Appender[] appenders = new Appender[NUM_THREADS];
    try {
      for (int i = 0; i < NUM_THREADS; i++) {
        // Have each appending thread write NUM_ENTRIES entries.
        appenders[i] = new Appender(metaTableDescriptor, wal, i, NUM_ENTRIES);
      }
      for (int i = 0; i < NUM_THREADS; i++) {
        appenders[i].start();
      }
      for (int i = 0; i < NUM_THREADS; i++) {
        // Ensure that all threads are joined before closing the wal.
        appenders[i].join();
      }
    } finally {
      wals.close();
    }
    for (int i = 0; i < NUM_THREADS; i++) {
      assertFalse(appenders[i].isException(), "Error: " + appenders[i].getException());
    }
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  /**
   * Appender thread. Appends edits to the passed WAL.
   */
  static class Appender extends Thread {
    private final Logger log;
    private final WAL wal;
    private final int count;
    private Exception e = null;
    private final TableDescriptor metaTableDescriptor;

    Appender(TableDescriptor metaTableDescriptor, final WAL wal, final int index, final int count) {
      super(String.valueOf(index));
      this.wal = wal;
      this.count = count;
      this.metaTableDescriptor = metaTableDescriptor;
      this.log = LoggerFactory.getLogger("Appender:" + getName());
    }

    /**
     * Returns true if this appender caught an exception. Only meaningful once the thread is done;
     * a still-running thread reports false.
     */
    boolean isException() {
      return !isAlive() && this.e != null;
    }

    Exception getException() {
      return this.e;
    }

    @Override
    public void run() {
      this.log.info(getName() + " started");
      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
      try {
        for (int i = 0; i < this.count; i++) {
          long now = EnvironmentEdgeManager.currentTime();
          // Roll every ten edits.
          if (i % 10 == 0) {
            this.wal.rollWriter();
          }
          WALEdit edit = new WALEdit();
          byte[] bytes = Bytes.toBytes(i);
          WALEditInternalHelper.addExtendedCell(edit,
            new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
          RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO;
          NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
          for (byte[] fam : this.metaTableDescriptor.getColumnFamilyNames()) {
            scopes.put(fam, 0);
          }
          final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
            TableName.META_TABLE_NAME, now, mvcc, scopes), edit);
          // Sleep a random amount before syncing so appends and syncs interleave across threads.
          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
          wal.sync(txid);
        }
        this.log.info(getName() + " finished");
      } catch (Exception e) {
        this.e = e;
        log.info("Caught exception from Appender:" + getName(), e);
      } finally {
        // Call sync on our log, else other appender threads just hang.
        try {
          this.wal.sync();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }
}