/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

/**
 * Tests secure bulkload through {@code SecureBulkLoadManager}. The test runs once with the default
 * store file tracker and once with the file-based store file tracker.
 */
@Tag(RegionServerTests.TAG)
@Tag(MediumTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: useFileBasedSFT={0}")
public class TestSecureBulkLoadManager {

  private static final Logger LOG = LoggerFactory.getLogger(TestSecureBulkLoadManager.class);

  private static final TableName TABLE =
    TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
  private static final byte[] FAMILY = Bytes.toBytes("family");
  private static final byte[] COLUMN = Bytes.toBytes("column");
  private static final byte[] key1 = Bytes.toBytes("row1");
  private static final byte[] key2 = Bytes.toBytes("row2");
  private static final byte[] key3 = Bytes.toBytes("row3");
  private static final byte[] value1 = Bytes.toBytes("t1");
  private static final byte[] value3 = Bytes.toBytes("t3");
  // The table is split at key2, so key1 and key3 land in two different regions.
  private static final byte[] SPLIT_ROWKEY = key2;

  private Thread earlierBulkload;
  private Thread laterBulkload;

  protected Boolean useFileBasedSFT;

  protected final static HBaseTestingUtil testUtil = new HBaseTestingUtil();
  private static final Configuration conf = testUtil.getConfiguration();

  /**
   * @param useFileBasedSFT whether to configure the FileBasedStoreFileTracker for this run
   */
  public TestSecureBulkLoadManager(Boolean useFileBasedSFT) {
    this.useFileBasedSFT = useFileBasedSFT;
  }

  /** Parameter source for the test template: run without, then with, the file-based tracker. */
  public static Stream<Arguments> parameters() {
    return Stream.of(Arguments.of(false), Arguments.of(true));
  }

  /**
   * Configures the store file tracker implementation for this run and starts a mini cluster.
   * {@code conf} is shared across runs, so the setting is explicitly unset when it is not wanted.
   */
  @BeforeEach
  public void setUp() throws Exception {
    if (useFileBasedSFT) {
      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
    } else {
      conf.unset(StoreFileTrackerFactory.TRACKER_IMPL);
    }
    testUtil.startMiniCluster();
  }

  /** Shuts down the mini cluster and removes the test directory. */
  @AfterEach
  public void tearDown() throws Exception {
    testUtil.shutdownMiniCluster();
    testUtil.cleanupTestDir();
  }

  /**
   * Tests a race between two secure bulkloads from the same UGI.
   * <p>
   * After a secure bulkload finishes, the FileSystems it used are cleaned up. Those FileSystems
   * might also be used by other bulkload calls, or other FileSystems might have been created by
   * the same user, and a {@code FileSystem.closeAllForUGI} call could close them. The clean-up must
   * therefore not close FileSystems that are still needed, otherwise a race condition occurs.
   * <p>
   * In this test the two bulkloads target two different regions. The earlier one finishes while the
   * later one still needs its FileSystems. The test checks that both bulkloads succeed.
   */
  @TestTemplate
  public void testForRaceCondition() throws Exception {
    // When the listener fires for the region holding key3 (the later bulkload), block until the
    // earlier bulkload thread has finished, so that its clean-up runs while the later bulkload is
    // still in progress.
    Consumer<HRegion> fsCreatedListener = region -> {
      if (region.getRegionInfo().containsRow(key3)) {
        Threads.shutdown(earlierBulkload); // wait until the other bulkload has finished
      }
    };
    testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
      .getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener);

    // create table
    testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY));

    // prepare files
    Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
      .getDataRootDir();
    Path dir1 = new Path(rootdir, "dir1");
    prepareHFile(dir1, key1, value1);
    Path dir2 = new Path(rootdir, "dir2");
    prepareHFile(dir2, key3, value3);

    // do bulkload
    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
    earlierBulkload = newBulkloadThread(dir1, t1Exception);
    laterBulkload = newBulkloadThread(dir2, t2Exception);
    earlierBulkload.start();
    laterBulkload.start();
    Threads.shutdown(earlierBulkload);
    Threads.shutdown(laterBulkload);
    assertNull(t1Exception.get(), "earlier bulkload failed");
    assertNull(t2Exception.get(), "later bulkload failed");

    // check bulkload ok
    try (Table t = testUtil.getConnection().getTable(TABLE)) {
      Result r = t.get(new Get(key1));
      assertArrayEquals(value1, r.getValue(FAMILY, COLUMN));
      r = t.get(new Get(key3));
      assertArrayEquals(value3, r.getValue(FAMILY, COLUMN));
    }
  }

  /**
   * Creates (but does not start) a thread that bulkloads the HFiles under {@code dir} once.
   * Any failure, including an {@link AssertionError} raised by {@code assertThrows}, is logged and
   * recorded in {@code failure}. Catching only {@code Exception} would let assertion errors
   * escape, kill the thread silently and leave {@code failure} unset.
   * @param dir     directory holding the family sub-directory with the HFiles to load
   * @param failure receives the first throwable raised by the bulkload, if any
   * @return the unstarted bulkload thread
   */
  private Thread newBulkloadThread(Path dir, AtomicReference<Throwable> failure) {
    return new Thread(() -> {
      try {
        doBulkloadWithoutRetry(dir);
      } catch (Throwable e) {
        LOG.error("bulk load failed .", e);
        failure.set(e);
      }
    });
  }

  /**
   * A trick to make sure server-side failures (if any) are not covered up by a client retry.
   * {@code BulkLoadHFilesTool.bulkLoad} keeps performing bulkload calls as long as the HFile queue
   * is not empty, and server-side exceptions in the doAs block do not lead to a client exception.
   * By default a bulkload would therefore always succeed, and the client would never be aware that
   * a failure had happened. To avoid this retry, a {@code MyExceptionToAvoidRetry} is thrown after
   * {@code bulkLoadPhase} finishes and is expected by the caller of {@code bulkLoad}. As a result,
   * {@code bulkLoadPhase} is called exactly once, and server-side failures, if any, can be detected
   * by checking the data.
   */
  static class MyExceptionToAvoidRetry extends DoNotRetryIOException {

    private static final long serialVersionUID = -6802760664998771151L;
  }

  /**
   * Bulkloads the HFiles under {@code dir} into {@link #TABLE}, running the bulk load phase
   * exactly once (see {@link MyExceptionToAvoidRetry}).
   * @param dir directory holding the family sub-directory with the HFiles to load
   * @throws Exception if the tool does not end with {@link MyExceptionToAvoidRetry}; this surfaces
   *                   as an {@link AssertionError} from {@code assertThrows}
   */
  private void doBulkloadWithoutRetry(Path dir) throws Exception {
    BulkLoadHFilesTool h = new BulkLoadHFilesTool(conf) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
        throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
      }
    };
    assertThrows(MyExceptionToAvoidRetry.class, () -> h.bulkLoad(TABLE, dir),
      "MyExceptionToAvoidRetry is expected");
  }

  /**
   * Writes a single-cell store file ({@code FAMILY:COLUMN = value} at row {@code key}) into
   * {@code dir/<family name>}, using the table's column family settings.
   * @param dir   parent directory; the file is written to its family-named sub-directory
   * @param key   row key of the cell
   * @param value value of the cell
   */
  private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
    TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
    ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
    Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;

    CacheConfig writerCacheConf = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP);
    writerCacheConf.setCacheDataOnWrite(false);
    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(false)
      .withIncludesTags(true).withCompression(compression).withCompressTags(family.isCompressTags())
      .withChecksumType(StoreUtils.getChecksumType(conf))
      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
      .withDataBlockEncoding(family.getDataBlockEncoding())
      .withEncryptionContext(Encryption.Context.NONE)
      .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
    StoreFileWriter.Builder builder =
      new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
        .withOutputDir(new Path(dir, family.getNameAsString()))
        .withBloomType(family.getBloomFilterType()).withMaxKeyCount(Integer.MAX_VALUE)
        .withFileContext(hFileContext);
    StoreFileWriter writer = builder.build();
    try {
      Put put = new Put(key);
      put.addColumn(FAMILY, COLUMN, value);
      for (Cell c : put.get(FAMILY, COLUMN)) {
        writer.append((ExtendedCell) c);
      }
    } finally {
      // always release the writer, even if an append fails
      writer.close();
    }
  }
}