001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Deque; 025import java.util.Map; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.atomic.AtomicReference; 028import java.util.function.Consumer; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.DoNotRetryIOException; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.Get; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.Result; 041import org.apache.hadoop.hbase.client.Table; 042import org.apache.hadoop.hbase.client.TableDescriptor; 043import org.apache.hadoop.hbase.io.ByteBuffAllocator; 044import org.apache.hadoop.hbase.io.compress.Compression; 045import org.apache.hadoop.hbase.io.crypto.Encryption; 046import org.apache.hadoop.hbase.io.hfile.CacheConfig; 047import org.apache.hadoop.hbase.io.hfile.HFile; 048import org.apache.hadoop.hbase.io.hfile.HFileContext; 049import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 050import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.testclassification.RegionServerTests; 053import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 056import org.apache.hadoop.hbase.util.Threads; 057import org.junit.After; 058import org.junit.Assert; 059import org.junit.Before; 060import org.junit.ClassRule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.junit.runner.RunWith; 064import org.junit.runners.Parameterized; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 069 070@RunWith(Parameterized.class) 071@Category({ RegionServerTests.class, MediumTests.class }) 072public class TestSecureBulkLoadManager { 073 074 @ClassRule 075 public static final HBaseClassTestRule CLASS_RULE = 076 HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class); 077 078 private static final Logger LOG = LoggerFactory.getLogger(TestSecureBulkLoadManager.class); 079 080 private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager")); 081 private static byte[] FAMILY = Bytes.toBytes("family"); 082 private static byte[] COLUMN = Bytes.toBytes("column"); 083 private static byte[] key1 = Bytes.toBytes("row1"); 084 private static byte[] key2 = Bytes.toBytes("row2"); 085 private static byte[] key3 = Bytes.toBytes("row3"); 086 private static byte[] value1 = Bytes.toBytes("t1"); 087 private static byte[] value3 = Bytes.toBytes("t3"); 088 private static byte[] SPLIT_ROWKEY = key2; 089 090 private Thread ealierBulkload; 091 private Thread laterBulkload; 092 093 protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility(); 094 protected Boolean useFileBasedSFT; 095 096 private static Configuration conf = testUtil.getConfiguration(); 097 098 public TestSecureBulkLoadManager(Boolean useFileBasedSFT) { 099 this.useFileBasedSFT = useFileBasedSFT; 100 } 101 102 @Parameterized.Parameters 103 public static Collection<Boolean> data() { 104 Boolean[] data = { false, true }; 105 return Arrays.asList(data); 106 } 107 108 @Before 109 public void setUp() throws Exception { 110 if (useFileBasedSFT) { 111 conf.set(StoreFileTrackerFactory.TRACKER_IMPL, 112 "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); 113 } else { 114 conf.unset(StoreFileTrackerFactory.TRACKER_IMPL); 115 } 116 testUtil.startMiniCluster(); 117 } 118 119 @After 120 public void tearDown() throws Exception { 121 testUtil.shutdownMiniCluster(); 122 testUtil.cleanupTestDir(); 123 } 124 125 /** 126 * After a secure bulkload finished , there is a clean-up for FileSystems used in the bulkload. 127 * Sometimes, FileSystems used in the finished bulkload might also be used in other bulkload 128 * calls, or there are other FileSystems created by the same user, they could be closed by a 129 * FileSystem.closeAllForUGI call. So during the clean-up, those FileSystems need to be used later 130 * can not get closed ,or else a race condition occurs. testForRaceCondition tests the case that 131 * two secure bulkload calls from the same UGI go into two different regions and one bulkload 132 * finishes earlier when the other bulkload still needs its FileSystems, checks that both 133 * bulkloads succeed. 134 */ 135 @Test 136 public void testForRaceCondition() throws Exception { 137 Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() { 138 @Override 139 public void accept(HRegion hRegion) { 140 if (hRegion.getRegionInfo().containsRow(key3)) { 141 Threads.shutdown(ealierBulkload);/// wait util the other bulkload finished 142 } 143 } 144 }; 145 testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer() 146 .getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener); 147 /// create table 148 testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY)); 149 150 /// prepare files 151 Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer() 152 .getDataRootDir(); 153 Path dir1 = new Path(rootdir, "dir1"); 154 prepareHFile(dir1, key1, value1); 155 Path dir2 = new Path(rootdir, "dir2"); 156 prepareHFile(dir2, key3, value3); 157 158 /// do bulkload 159 final AtomicReference<Throwable> t1Exception = new AtomicReference<>(); 160 final AtomicReference<Throwable> t2Exception = new AtomicReference<>(); 161 ealierBulkload = new Thread(new Runnable() { 162 @Override 163 public void run() { 164 try { 165 doBulkloadWithoutRetry(dir1); 166 } catch (Exception e) { 167 LOG.error("bulk load failed .", e); 168 t1Exception.set(e); 169 } 170 } 171 }); 172 laterBulkload = new Thread(new Runnable() { 173 @Override 174 public void run() { 175 try { 176 doBulkloadWithoutRetry(dir2); 177 } catch (Exception e) { 178 LOG.error("bulk load failed .", e); 179 t2Exception.set(e); 180 } 181 } 182 }); 183 ealierBulkload.start(); 184 laterBulkload.start(); 185 Threads.shutdown(ealierBulkload); 186 Threads.shutdown(laterBulkload); 187 Assert.assertNull(t1Exception.get()); 188 Assert.assertNull(t2Exception.get()); 189 190 /// check bulkload ok 191 Get get1 = new Get(key1); 192 Get get3 = new Get(key3); 193 Table t = testUtil.getConnection().getTable(TABLE); 194 Result r = t.get(get1); 195 Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value1); 196 r = t.get(get3); 197 Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value3); 198 199 } 200 201 /** 202 * A trick is used to make sure server-side failures( if any ) not being covered up by a client 203 * retry. Since LoadIncrementalHFiles.doBulkLoad keeps performing bulkload calls as long as the 204 * HFile queue is not empty, while server-side exceptions in the doAs block do not lead to a 205 * client exception, a bulkload will always succeed in this case by default, thus client will 206 * never be aware that failures have ever happened . To avoid this kind of retry , a 207 * MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finished and caught silently 208 * outside the doBulkLoad call, so that the bulkLoadPhase would be called exactly once, and 209 * server-side failures, if any ,can be checked via data. 210 */ 211 class MyExceptionToAvoidRetry extends DoNotRetryIOException { 212 } 213 214 private void doBulkloadWithoutRetry(Path dir) throws Exception { 215 Connection connection = testUtil.getConnection(); 216 LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) { 217 @Override 218 protected void bulkLoadPhase(final Table htable, final Connection conn, ExecutorService pool, 219 Deque<LoadQueueItem> queue, final Multimap<ByteBuffer, LoadQueueItem> regionGroups, 220 boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 221 super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap); 222 throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry 223 } 224 }; 225 try { 226 h.doBulkLoad(dir, testUtil.getAdmin(), connection.getTable(TABLE), 227 connection.getRegionLocator(TABLE)); 228 Assert.fail("MyExceptionToAvoidRetry is expected"); 229 } catch (MyExceptionToAvoidRetry e) { // expected 230 } 231 } 232 233 private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception { 234 TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE); 235 ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY); 236 Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; 237 238 CacheConfig writerCacheConf = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP); 239 writerCacheConf.setCacheDataOnWrite(false); 240 HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(false) 241 .withIncludesTags(true).withCompression(compression).withCompressTags(family.isCompressTags()) 242 .withChecksumType(StoreUtils.getChecksumType(conf)) 243 .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)) 244 .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true) 245 .withDataBlockEncoding(family.getDataBlockEncoding()) 246 .withEncryptionContext(Encryption.Context.NONE) 247 .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); 248 StoreFileWriter.Builder builder = 249 new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf)) 250 .withOutputDir(new Path(dir, family.getNameAsString())) 251 .withBloomType(family.getBloomFilterType()).withMaxKeyCount(Integer.MAX_VALUE) 252 .withFileContext(hFileContext); 253 StoreFileWriter writer = builder.build(); 254 255 Put put = new Put(key); 256 put.addColumn(FAMILY, COLUMN, value); 257 for (Cell c : put.get(FAMILY, COLUMN)) { 258 writer.append(c); 259 } 260 261 writer.close(); 262 } 263}