/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

@Category({RegionServerTests.class, MediumTests.class})
public class TestSecureBulkLoadManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestSecureBulkLoadManager.class);

  private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
  private static byte[] FAMILY = Bytes.toBytes("family");
  private static byte[] COLUMN = Bytes.toBytes("column");
  private static byte[] key1 = Bytes.toBytes("row1");
  private static byte[] key2 = Bytes.toBytes("row2");
  private static byte[] key3 = Bytes.toBytes("row3");
  private static byte[] value1 = Bytes.toBytes("t1");
  private static byte[] value3 = Bytes.toBytes("t3");
  private static byte[] SPLIT_ROWKEY = key2;

  private Thread ealierBulkload;
  private Thread laterBulkload;

  protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility();
  private static Configuration conf = testUtil.getConfiguration();

  @BeforeClass
  public static void setUp() throws Exception {
    testUtil.startMiniCluster();
  }

  @AfterClass
  public static void tearDown() throws Exception {
    testUtil.shutdownMiniCluster();
    testUtil.cleanupTestDir();
  }

  /**
   * After a secure bulkload finishes, there is a clean-up of the FileSystems used in the
   * bulkload. Sometimes the FileSystems used in the finished bulkload are also used in other
   * bulkload calls, or there are other FileSystems created by the same user; these could be
   * closed by a FileSystem.closeAllForUGI call. So during the clean-up, FileSystems that still
   * need to be used later must not get closed, or else a race condition occurs.
   *
   * testForRaceCondition tests the case where two secure bulkload calls from the same UGI go
   * into two different regions and one bulkload finishes earlier while the other bulkload still
   * needs its FileSystems, and checks that both bulkloads succeed.
   */
  @Test
  public void testForRaceCondition() throws Exception {
    Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() {
      @Override
      public void accept(HRegion hRegion) {
        if (hRegion.getRegionInfo().containsRow(key3)) {
          Threads.shutdown(ealierBulkload); /// wait until the other bulkload finishes
        }
      }
    };
    testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
        .secureBulkLoadManager.setFsCreatedListener(fsCreatedListener);
    /// create table
    testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY));

    /// prepare files
    Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
        .getRegionServer().getRootDir();
    Path dir1 = new Path(rootdir, "dir1");
    prepareHFile(dir1, key1, value1);
    Path dir2 = new Path(rootdir, "dir2");
    prepareHFile(dir2, key3, value3);

    /// do bulkload
    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
    ealierBulkload = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doBulkloadWithoutRetry(dir1);
        } catch (Exception e) {
          LOG.error("bulk load failed.", e);
          t1Exception.set(e);
        }
      }
    });
    laterBulkload = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doBulkloadWithoutRetry(dir2);
        } catch (Exception e) {
          LOG.error("bulk load failed.", e);
          t2Exception.set(e);
        }
      }
    });
    ealierBulkload.start();
    laterBulkload.start();
    Threads.shutdown(ealierBulkload);
    Threads.shutdown(laterBulkload);
    Assert.assertNull(t1Exception.get());
    Assert.assertNull(t2Exception.get());

    /// check bulkload ok
    Get get1 = new Get(key1);
    Get get3 = new Get(key3);
    Table t = testUtil.getConnection().getTable(TABLE);
    Result r = t.get(get1);
    Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value1);
    r = t.get(get3);
    Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value3);
  }

  /**
   * A trick is used to make sure server-side failures (if any) are not covered up by a client
   * retry. Since LoadIncrementalHFiles.doBulkLoad keeps performing bulkload calls as long as the
   * HFile queue is not empty, while server-side exceptions in the doAs block do not lead to a
   * client exception, a bulkload will always succeed in this case by default, so the client
   * will never be aware that failures have ever happened. To avoid this kind of retry, a
   * MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finishes and caught
   * silently outside the doBulkLoad call, so that bulkLoadPhase is called exactly once and
   * server-side failures, if any, can be checked via the data.
   */
  class MyExceptionToAvoidRetry extends DoNotRetryIOException {
  }

  private void doBulkloadWithoutRetry(Path dir) throws Exception {
    Connection connection = testUtil.getConnection();
    // Override bulkLoadPhase so it runs exactly once; the thrown MyExceptionToAvoidRetry
    // prevents doBulkLoad from retrying (see the class comment above).
    LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) {
      @Override
      protected void bulkLoadPhase(final Table htable, final Connection conn,
          ExecutorService pool, Deque<LoadQueueItem> queue,
          final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
          Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
        throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
      }
    };
    try {
      h.doBulkLoad(dir, testUtil.getAdmin(), connection.getTable(TABLE),
          connection.getRegionLocator(TABLE));
      Assert.fail("MyExceptionToAvoidRetry is expected");
    } catch (MyExceptionToAvoidRetry e) { // expected
    }
  }

  private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
    TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
    ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
    Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;

    CacheConfig writerCacheConf = new CacheConfig(conf, family, null);
    writerCacheConf.setCacheDataOnWrite(false);
    HFileContext hFileContext = new HFileContextBuilder()
        .withIncludesMvcc(false)
        .withIncludesTags(true)
        .withCompression(compression)
        .withCompressTags(family.isCompressTags())
        .withChecksumType(HStore.getChecksumType(conf))
        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
        .withBlockSize(family.getBlocksize())
        .withHBaseCheckSum(true)
        .withDataBlockEncoding(family.getDataBlockEncoding())
        .withEncryptionContext(Encryption.Context.NONE)
        .withCreateTime(EnvironmentEdgeManager.currentTime())
        .build();
    StoreFileWriter.Builder builder =
        new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
            .withOutputDir(new Path(dir, family.getNameAsString()))
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(Integer.MAX_VALUE)
            .withFileContext(hFileContext);
    StoreFileWriter writer = builder.build();

    // Write a single cell into an HFile under the column family directory so it can be
    // picked up by LoadIncrementalHFiles.
    Put put = new Put(key);
    put.addColumn(FAMILY, COLUMN, value);
    for (Cell c : put.get(FAMILY, COLUMN)) {
      writer.append(c);
    }

    writer.close();
  }
}