001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Deque; 025import java.util.Map; 026import java.util.concurrent.atomic.AtomicReference; 027import java.util.function.Consumer; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.DoNotRetryIOException; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtil; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.AsyncClusterConnection; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 037import org.apache.hadoop.hbase.client.Get; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.Result; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.io.ByteBuffAllocator; 043import org.apache.hadoop.hbase.io.compress.Compression; 044import org.apache.hadoop.hbase.io.crypto.Encryption; 045import org.apache.hadoop.hbase.io.hfile.CacheConfig; 046import org.apache.hadoop.hbase.io.hfile.HFile; 047import org.apache.hadoop.hbase.io.hfile.HFileContext; 048import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 049import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 050import org.apache.hadoop.hbase.testclassification.MediumTests; 051import org.apache.hadoop.hbase.testclassification.RegionServerTests; 052import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 055import org.apache.hadoop.hbase.util.Threads; 056import org.junit.After; 057import org.junit.Assert; 058import org.junit.Before; 059import org.junit.ClassRule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.junit.runner.RunWith; 063import org.junit.runners.Parameterized; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 068 069@RunWith(Parameterized.class) 070@Category({ RegionServerTests.class, MediumTests.class }) 071public class TestSecureBulkLoadManager { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class); 076 077 private static final Logger LOG = LoggerFactory.getLogger(TestSecureBulkLoadManager.class); 078 079 private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager")); 080 private static byte[] FAMILY = Bytes.toBytes("family"); 081 private static byte[] COLUMN = Bytes.toBytes("column"); 082 private static byte[] key1 = Bytes.toBytes("row1"); 083 private static byte[] key2 = Bytes.toBytes("row2"); 084 private static byte[] key3 = Bytes.toBytes("row3"); 085 private static byte[] value1 = Bytes.toBytes("t1"); 086 private static byte[] value3 = Bytes.toBytes("t3"); 087 private static byte[] SPLIT_ROWKEY = key2; 088 089 private Thread ealierBulkload; 090 private Thread laterBulkload; 091 092 protected Boolean useFileBasedSFT; 093 094 protected final static HBaseTestingUtil testUtil = new HBaseTestingUtil(); 095 private static Configuration conf = testUtil.getConfiguration(); 096 097 public TestSecureBulkLoadManager(Boolean useFileBasedSFT) { 098 this.useFileBasedSFT = useFileBasedSFT; 099 } 100 101 @Parameterized.Parameters 102 public static Collection<Boolean> data() { 103 Boolean[] data = { false, true }; 104 return Arrays.asList(data); 105 } 106 107 @Before 108 public void setUp() throws Exception { 109 if (useFileBasedSFT) { 110 conf.set(StoreFileTrackerFactory.TRACKER_IMPL, 111 "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); 112 } else { 113 conf.unset(StoreFileTrackerFactory.TRACKER_IMPL); 114 } 115 testUtil.startMiniCluster(); 116 } 117 118 @After 119 public void tearDown() throws Exception { 120 testUtil.shutdownMiniCluster(); 121 testUtil.cleanupTestDir(); 122 } 123 124 /** 125 * After a secure bulkload finished , there is a clean-up for FileSystems used in the bulkload. 126 * Sometimes, FileSystems used in the finished bulkload might also be used in other bulkload 127 * calls, or there are other FileSystems created by the same user, they could be closed by a 128 * FileSystem.closeAllForUGI call. So during the clean-up, those FileSystems need to be used later 129 * can not get closed ,or else a race condition occurs. testForRaceCondition tests the case that 130 * two secure bulkload calls from the same UGI go into two different regions and one bulkload 131 * finishes earlier when the other bulkload still needs its FileSystems, checks that both 132 * bulkloads succeed. 133 */ 134 @Test 135 public void testForRaceCondition() throws Exception { 136 Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() { 137 @Override 138 public void accept(HRegion hRegion) { 139 if (hRegion.getRegionInfo().containsRow(key3)) { 140 Threads.shutdown(ealierBulkload);/// wait util the other bulkload finished 141 } 142 } 143 }; 144 testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer() 145 .getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener); 146 /// create table 147 testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY)); 148 149 /// prepare files 150 Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer() 151 .getDataRootDir(); 152 Path dir1 = new Path(rootdir, "dir1"); 153 prepareHFile(dir1, key1, value1); 154 Path dir2 = new Path(rootdir, "dir2"); 155 prepareHFile(dir2, key3, value3); 156 157 /// do bulkload 158 final AtomicReference<Throwable> t1Exception = new AtomicReference<>(); 159 final AtomicReference<Throwable> t2Exception = new AtomicReference<>(); 160 ealierBulkload = new Thread(new Runnable() { 161 @Override 162 public void run() { 163 try { 164 doBulkloadWithoutRetry(dir1); 165 } catch (Exception e) { 166 LOG.error("bulk load failed .", e); 167 t1Exception.set(e); 168 } 169 } 170 }); 171 laterBulkload = new Thread(new Runnable() { 172 @Override 173 public void run() { 174 try { 175 doBulkloadWithoutRetry(dir2); 176 } catch (Exception e) { 177 LOG.error("bulk load failed .", e); 178 t2Exception.set(e); 179 } 180 } 181 }); 182 ealierBulkload.start(); 183 laterBulkload.start(); 184 Threads.shutdown(ealierBulkload); 185 Threads.shutdown(laterBulkload); 186 Assert.assertNull(t1Exception.get()); 187 Assert.assertNull(t2Exception.get()); 188 189 /// check bulkload ok 190 Get get1 = new Get(key1); 191 Get get3 = new Get(key3); 192 Table t = testUtil.getConnection().getTable(TABLE); 193 Result r = t.get(get1); 194 Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value1); 195 r = t.get(get3); 196 Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value3); 197 198 } 199 200 /** 201 * A trick is used to make sure server-side failures( if any ) not being covered up by a client 202 * retry. Since BulkLoadHFilesTool.bulkLoad keeps performing bulkload calls as long as the HFile 203 * queue is not empty, while server-side exceptions in the doAs block do not lead to a client 204 * exception, a bulkload will always succeed in this case by default, thus client will never be 205 * aware that failures have ever happened . To avoid this kind of retry , a 206 * MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finished and caught silently 207 * outside the doBulkLoad call, so that the bulkLoadPhase would be called exactly once, and 208 * server-side failures, if any ,can be checked via data. 209 */ 210 class MyExceptionToAvoidRetry extends DoNotRetryIOException { 211 212 private static final long serialVersionUID = -6802760664998771151L; 213 } 214 215 private void doBulkloadWithoutRetry(Path dir) throws Exception { 216 BulkLoadHFilesTool h = new BulkLoadHFilesTool(conf) { 217 218 @Override 219 protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName, 220 Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, 221 boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 222 super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap); 223 throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry 224 } 225 }; 226 try { 227 h.bulkLoad(TABLE, dir); 228 Assert.fail("MyExceptionToAvoidRetry is expected"); 229 } catch (MyExceptionToAvoidRetry e) { // expected 230 } 231 } 232 233 private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception { 234 TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE); 235 ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY); 236 Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; 237 238 CacheConfig writerCacheConf = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP); 239 writerCacheConf.setCacheDataOnWrite(false); 240 HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(false) 241 .withIncludesTags(true).withCompression(compression).withCompressTags(family.isCompressTags()) 242 .withChecksumType(StoreUtils.getChecksumType(conf)) 243 .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)) 244 .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true) 245 .withDataBlockEncoding(family.getDataBlockEncoding()) 246 .withEncryptionContext(Encryption.Context.NONE) 247 .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); 248 StoreFileWriter.Builder builder = 249 new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf)) 250 .withOutputDir(new Path(dir, family.getNameAsString())) 251 .withBloomType(family.getBloomFilterType()).withMaxKeyCount(Integer.MAX_VALUE) 252 .withFileContext(hFileContext); 253 StoreFileWriter writer = builder.build(); 254 255 Put put = new Put(key); 256 put.addColumn(FAMILY, COLUMN, value); 257 for (Cell c : put.get(FAMILY, COLUMN)) { 258 writer.append(c); 259 } 260 261 writer.close(); 262 } 263}