001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertFalse; 021 022import java.io.IOException; 023import java.util.NavigableMap; 024import java.util.TreeMap; 025import java.util.concurrent.ThreadLocalRandom; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.TableDescriptors; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.RegionInfo; 035import org.apache.hadoop.hbase.client.RegionInfoBuilder; 036import org.apache.hadoop.hbase.client.TableDescriptor; 037import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 038import org.apache.hadoop.hbase.testclassification.MediumTests; 039import org.apache.hadoop.hbase.testclassification.RegionServerTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.CommonFSUtils; 042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 043import org.apache.hadoop.hbase.util.FSTableDescriptors; 044import org.apache.hadoop.hbase.util.Threads; 045import org.apache.hadoop.hbase.wal.WAL; 046import org.apache.hadoop.hbase.wal.WALEdit; 047import org.apache.hadoop.hbase.wal.WALFactory; 048import org.apache.hadoop.hbase.wal.WALKeyImpl; 049import org.junit.ClassRule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * Test many concurrent appenders to an WAL while rolling the log. 057 */ 058@Category({ RegionServerTests.class, MediumTests.class }) 059public class TestLogRollingNoCluster { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestLogRollingNoCluster.class); 064 065 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 066 private final static byte[] EMPTY_1K_ARRAY = new byte[1024]; 067 private static final int NUM_THREADS = 100; // Spin up this many threads 068 private static final int NUM_ENTRIES = 100; // How many entries to write 069 070 /** ProtobufLogWriter that simulates higher latencies in sync() call */ 071 public static class HighLatencySyncWriter extends ProtobufLogWriter { 072 @Override 073 public void sync(boolean forceSync) throws IOException { 074 Threads.sleep(ThreadLocalRandom.current().nextInt(10)); 075 super.sync(forceSync); 076 Threads.sleep(ThreadLocalRandom.current().nextInt(10)); 077 } 078 } 079 080 /** 081 * Spin up a bunch of threads and have them all append to a WAL. Roll the WAL frequently to try 082 * and trigger NPE. nn 083 */ 084 @Test 085 public void testContendedLogRolling() throws Exception { 086 TEST_UTIL.startMiniDFSCluster(3); 087 Path dir = TEST_UTIL.getDataTestDirOnTestFS(); 088 089 // The implementation needs to know the 'handler' count. 090 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS); 091 final Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 092 conf.set(WALFactory.WAL_PROVIDER, "filesystem"); 093 CommonFSUtils.setRootDir(conf, dir); 094 FSTableDescriptors fsTableDescriptors = new FSTableDescriptors(TEST_UTIL.getConfiguration()); 095 FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration()); 096 TableDescriptor metaTableDescriptor = fsTableDescriptors.get(TableName.META_TABLE_NAME); 097 conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName()); 098 final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName()); 099 final WAL wal = wals.getWAL(null); 100 101 Appender[] appenders = null; 102 103 final int numThreads = NUM_THREADS; 104 appenders = new Appender[numThreads]; 105 try { 106 for (int i = 0; i < numThreads; i++) { 107 // Have each appending thread write 'count' entries 108 appenders[i] = new Appender(metaTableDescriptor, wal, i, NUM_ENTRIES); 109 } 110 for (int i = 0; i < numThreads; i++) { 111 appenders[i].start(); 112 } 113 for (int i = 0; i < numThreads; i++) { 114 // ensure that all threads are joined before closing the wal 115 appenders[i].join(); 116 } 117 } finally { 118 wals.close(); 119 } 120 for (int i = 0; i < numThreads; i++) { 121 assertFalse("Error: " + appenders[i].getException(), appenders[i].isException()); 122 } 123 TEST_UTIL.shutdownMiniDFSCluster(); 124 } 125 126 /** 127 * Appender thread. Appends to passed wal file. 128 */ 129 static class Appender extends Thread { 130 private final Logger log; 131 private final WAL wal; 132 private final int count; 133 private Exception e = null; 134 private final TableDescriptor metaTableDescriptor; 135 136 Appender(TableDescriptor metaTableDescriptor, final WAL wal, final int index, final int count) { 137 super("" + index); 138 this.wal = wal; 139 this.count = count; 140 this.metaTableDescriptor = metaTableDescriptor; 141 this.log = LoggerFactory.getLogger("Appender:" + getName()); 142 } 143 144 /** Returns Call when the thread is done. */ 145 boolean isException() { 146 return !isAlive() && this.e != null; 147 } 148 149 Exception getException() { 150 return this.e; 151 } 152 153 @Override 154 public void run() { 155 this.log.info(getName() + " started"); 156 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 157 try { 158 TableDescriptors tds = new FSTableDescriptors(TEST_UTIL.getConfiguration()); 159 TableDescriptor htd = tds.get(TableName.META_TABLE_NAME); 160 for (int i = 0; i < this.count; i++) { 161 long now = EnvironmentEdgeManager.currentTime(); 162 // Roll every ten edits 163 if (i % 10 == 0) { 164 this.wal.rollWriter(); 165 } 166 WALEdit edit = new WALEdit(); 167 byte[] bytes = Bytes.toBytes(i); 168 edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY)); 169 RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO; 170 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 171 for (byte[] fam : this.metaTableDescriptor.getColumnFamilyNames()) { 172 scopes.put(fam, 0); 173 } 174 final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), 175 TableName.META_TABLE_NAME, now, mvcc, scopes), edit); 176 Threads.sleep(ThreadLocalRandom.current().nextInt(5)); 177 wal.sync(txid); 178 } 179 String msg = getName() + " finished"; 180 if (isException()) this.log.info(msg, getException()); 181 else this.log.info(msg); 182 } catch (Exception e) { 183 this.e = e; 184 log.info("Caught exception from Appender:" + getName(), e); 185 } finally { 186 // Call sync on our log.else threads just hang out. 187 try { 188 this.wal.sync(); 189 } catch (IOException e) { 190 throw new RuntimeException(e); 191 } 192 } 193 } 194 } 195 196 // @org.junit.Rule 197 // public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = 198 // new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); 199}