/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

@Category({RegionServerTests.class, MediumTests.class})
public class TestSecureBulkLoadManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestSecureBulkLoadManager.class);

  private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
  private static byte[] FAMILY = Bytes.toBytes("family");
  private static byte[] COLUMN = Bytes.toBytes("column");
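  // Row keys and values for the two bulkloads. The test table is split at SPLIT_ROWKEY (= key2),
  // so key1 and key3 fall into two different regions and each bulkload is served independently.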
  private static byte[] key1 = Bytes.toBytes("row1");
  private static byte[] key2 = Bytes.toBytes("row2");
  private static byte[] key3 = Bytes.toBytes("row3");
  private static byte[] value1 = Bytes.toBytes("t1");
  private static byte[] value3 = Bytes.toBytes("t3");
  private static byte[] SPLIT_ROWKEY = key2;

  private Thread ealierBulkload;
  private Thread laterBulkload;

  protected static final HBaseTestingUtility testUtil = new HBaseTestingUtility();
  private static Configuration conf = testUtil.getConfiguration();

  @BeforeClass
  public static void setUp() throws Exception {
    testUtil.startMiniCluster();
  }

  @AfterClass
  public static void tearDown() throws Exception {
    testUtil.shutdownMiniCluster();
    testUtil.cleanupTestDir();
  }

  /**
   * After a secure bulkload finishes, the FileSystems used by that bulkload are cleaned up.
   * Some of those FileSystems may still be needed by other bulkload calls, or other FileSystems
   * created by the same user may exist, and a FileSystem.closeAllForUGI call would close them
   * all. The clean-up therefore must not close FileSystems that are still needed later,
   * otherwise a race condition occurs.
   *
   * testForRaceCondition covers the case where two secure bulkload calls from the same UGI go
   * into two different regions and one bulkload finishes while the other still needs its
   * FileSystems, and checks that both bulkloads succeed.
   */
  @Test
  public void testForRaceCondition() throws Exception {
    // Called by the SecureBulkLoadManager once the FileSystem for a bulkload has been created;
    // for the region holding key3 (the later bulkload) it blocks until the earlier bulkload has
    // finished and run its clean-up.
    Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() {
      @Override
      public void accept(HRegion hRegion) {
        if (hRegion.getRegionInfo().containsRow(key3)) {
          Threads.shutdown(ealierBulkload); /// wait until the other bulkload has finished
        }
      }
    };
    testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
        .getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener);
    /// create table
    testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY));

    /// prepare files
    Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
        .getRegionServer().getDataRootDir();
    Path dir1 = new Path(rootdir, "dir1");
    prepareHFile(dir1, key1, value1);
    Path dir2 = new Path(rootdir, "dir2");
    prepareHFile(dir2, key3, value3);

    /// do bulkload
    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
    ealierBulkload = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doBulkloadWithoutRetry(dir1);
        } catch (Exception e) {
          LOG.error("bulk load failed.", e);
          t1Exception.set(e);
        }
      }
    });
    laterBulkload = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doBulkloadWithoutRetry(dir2);
        } catch (Exception e) {
          LOG.error("bulk load failed.", e);
          t2Exception.set(e);
        }
      }
    });
    ealierBulkload.start();
    laterBulkload.start();
    Threads.shutdown(ealierBulkload);
    Threads.shutdown(laterBulkload);
    Assert.assertNull(t1Exception.get());
    Assert.assertNull(t2Exception.get());

    /// check bulkload ok
    Get get1 = new Get(key1);
    Get get3 = new Get(key3);
    Table t = testUtil.getConnection().getTable(TABLE);
    Result r = t.get(get1);
    Assert.assertArrayEquals(value1, r.getValue(FAMILY, COLUMN));
    r = t.get(get3);
    Assert.assertArrayEquals(value3, r.getValue(FAMILY, COLUMN));
  }

  /**
   * A trick is used to make sure that server-side failures, if any, are not covered up by a
   * client retry. LoadIncrementalHFiles.doBulkLoad keeps issuing bulkload calls as long as the
   * HFile queue is not empty, and server-side exceptions thrown in the doAs block do not surface
   * as client exceptions, so by default the bulkload would eventually succeed and the client
   * would never notice that a failure happened. To avoid such retries, a MyExceptionToAvoidRetry
   * is thrown after bulkLoadPhase finishes and is caught silently outside the doBulkLoad call,
   * so that bulkLoadPhase is called exactly once and server-side failures, if any, can be
   * detected by checking the loaded data.
   */
  class MyExceptionToAvoidRetry extends DoNotRetryIOException {
  }

  private void doBulkloadWithoutRetry(Path dir) throws Exception {
    Connection connection = testUtil.getConnection();
    LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) {
      @Override
      protected void bulkLoadPhase(final Table htable, final Connection conn,
          ExecutorService pool, Deque<LoadQueueItem> queue,
          final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
          Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
        throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
      }
    };
    try {
      h.doBulkLoad(dir, testUtil.getAdmin(), connection.getTable(TABLE),
          connection.getRegionLocator(TABLE));
      Assert.fail("MyExceptionToAvoidRetry is expected");
    } catch (MyExceptionToAvoidRetry e) { // expected
    }
  }

  /**
   * Writes a single-cell HFile under dir/&lt;family name&gt; so that it can be bulk loaded by
   * LoadIncrementalHFiles.
   */
  private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
    TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
    ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
    Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;

    CacheConfig writerCacheConf = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP);
    writerCacheConf.setCacheDataOnWrite(false);
    HFileContext hFileContext = new HFileContextBuilder()
        .withIncludesMvcc(false)
        .withIncludesTags(true)
        .withCompression(compression)
        .withCompressTags(family.isCompressTags())
        .withChecksumType(HStore.getChecksumType(conf))
        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
        .withBlockSize(family.getBlocksize())
        .withHBaseCheckSum(true)
        .withDataBlockEncoding(family.getDataBlockEncoding())
        .withEncryptionContext(Encryption.Context.NONE)
        .withCreateTime(EnvironmentEdgeManager.currentTime())
        .build();
    StoreFileWriter.Builder builder =
        new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
            .withOutputDir(new Path(dir, family.getNameAsString()))
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(Integer.MAX_VALUE)
            .withFileContext(hFileContext);
    StoreFileWriter writer = builder.build();

    Put put = new Put(key);
    put.addColumn(FAMILY, COLUMN, value);
    for (Cell c : put.get(FAMILY, COLUMN)) {
      writer.append(c);
    }

    writer.close();
  }
}