001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertNull;
022import static org.junit.jupiter.api.Assertions.assertThrows;
023
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.Deque;
027import java.util.Map;
028import java.util.concurrent.atomic.AtomicReference;
029import java.util.function.Consumer;
030import java.util.stream.Stream;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.ExtendedCell;
036import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.AsyncClusterConnection;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
041import org.apache.hadoop.hbase.client.Get;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.Table;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.io.ByteBuffAllocator;
047import org.apache.hadoop.hbase.io.compress.Compression;
048import org.apache.hadoop.hbase.io.crypto.Encryption;
049import org.apache.hadoop.hbase.io.hfile.CacheConfig;
050import org.apache.hadoop.hbase.io.hfile.HFile;
051import org.apache.hadoop.hbase.io.hfile.HFileContext;
052import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
053import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
054import org.apache.hadoop.hbase.testclassification.MediumTests;
055import org.apache.hadoop.hbase.testclassification.RegionServerTests;
056import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
059import org.apache.hadoop.hbase.util.Threads;
060import org.junit.jupiter.api.AfterEach;
061import org.junit.jupiter.api.BeforeEach;
062import org.junit.jupiter.api.Tag;
063import org.junit.jupiter.api.TestTemplate;
064import org.junit.jupiter.params.provider.Arguments;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
069
070@Tag(RegionServerTests.TAG)
071@Tag(MediumTests.TAG)
072@HBaseParameterizedTestTemplate(name = "{index}: useFileBasedSFT={0}")
073public class TestSecureBulkLoadManager {
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestSecureBulkLoadManager.class);
076
077  private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
078  private static byte[] FAMILY = Bytes.toBytes("family");
079  private static byte[] COLUMN = Bytes.toBytes("column");
080  private static byte[] key1 = Bytes.toBytes("row1");
081  private static byte[] key2 = Bytes.toBytes("row2");
082  private static byte[] key3 = Bytes.toBytes("row3");
083  private static byte[] value1 = Bytes.toBytes("t1");
084  private static byte[] value3 = Bytes.toBytes("t3");
085  private static byte[] SPLIT_ROWKEY = key2;
086
087  private Thread ealierBulkload;
088  private Thread laterBulkload;
089
090  protected Boolean useFileBasedSFT;
091
092  protected final static HBaseTestingUtil testUtil = new HBaseTestingUtil();
093  private static Configuration conf = testUtil.getConfiguration();
094
095  public TestSecureBulkLoadManager(Boolean useFileBasedSFT) {
096    this.useFileBasedSFT = useFileBasedSFT;
097  }
098
099  public static Stream<Arguments> parameters() {
100    return Stream.of(Arguments.of(false), Arguments.of(true));
101  }
102
103  @BeforeEach
104  public void setUp() throws Exception {
105    if (useFileBasedSFT) {
106      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
107        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
108    } else {
109      conf.unset(StoreFileTrackerFactory.TRACKER_IMPL);
110    }
111    testUtil.startMiniCluster();
112  }
113
114  @AfterEach
115  public void tearDown() throws Exception {
116    testUtil.shutdownMiniCluster();
117    testUtil.cleanupTestDir();
118  }
119
120  /**
121   * After a secure bulkload finished , there is a clean-up for FileSystems used in the bulkload.
122   * Sometimes, FileSystems used in the finished bulkload might also be used in other bulkload
123   * calls, or there are other FileSystems created by the same user, they could be closed by a
124   * FileSystem.closeAllForUGI call. So during the clean-up, those FileSystems need to be used later
125   * can not get closed ,or else a race condition occurs. testForRaceCondition tests the case that
126   * two secure bulkload calls from the same UGI go into two different regions and one bulkload
127   * finishes earlier when the other bulkload still needs its FileSystems, checks that both
128   * bulkloads succeed.
129   */
130  @TestTemplate
131  public void testForRaceCondition() throws Exception {
132    Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() {
133      @Override
134      public void accept(HRegion hRegion) {
135        if (hRegion.getRegionInfo().containsRow(key3)) {
136          Threads.shutdown(ealierBulkload);/// wait util the other bulkload finished
137        }
138      }
139    };
140    testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
141      .getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener);
142    /// create table
143    testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY));
144
145    /// prepare files
146    Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
147      .getDataRootDir();
148    Path dir1 = new Path(rootdir, "dir1");
149    prepareHFile(dir1, key1, value1);
150    Path dir2 = new Path(rootdir, "dir2");
151    prepareHFile(dir2, key3, value3);
152
153    /// do bulkload
154    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
155    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
156    ealierBulkload = new Thread(new Runnable() {
157      @Override
158      public void run() {
159        try {
160          doBulkloadWithoutRetry(dir1);
161        } catch (Exception e) {
162          LOG.error("bulk load failed .", e);
163          t1Exception.set(e);
164        }
165      }
166    });
167    laterBulkload = new Thread(new Runnable() {
168      @Override
169      public void run() {
170        try {
171          doBulkloadWithoutRetry(dir2);
172        } catch (Exception e) {
173          LOG.error("bulk load failed .", e);
174          t2Exception.set(e);
175        }
176      }
177    });
178    ealierBulkload.start();
179    laterBulkload.start();
180    Threads.shutdown(ealierBulkload);
181    Threads.shutdown(laterBulkload);
182    assertNull(t1Exception.get());
183    assertNull(t2Exception.get());
184
185    /// check bulkload ok
186    Get get1 = new Get(key1);
187    Get get3 = new Get(key3);
188    Table t = testUtil.getConnection().getTable(TABLE);
189    Result r = t.get(get1);
190    assertArrayEquals(r.getValue(FAMILY, COLUMN), value1);
191    r = t.get(get3);
192    assertArrayEquals(r.getValue(FAMILY, COLUMN), value3);
193
194  }
195
196  /**
197   * A trick is used to make sure server-side failures( if any ) not being covered up by a client
198   * retry. Since BulkLoadHFilesTool.bulkLoad keeps performing bulkload calls as long as the HFile
199   * queue is not empty, while server-side exceptions in the doAs block do not lead to a client
200   * exception, a bulkload will always succeed in this case by default, thus client will never be
201   * aware that failures have ever happened . To avoid this kind of retry , a
202   * MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finished and caught silently
203   * outside the doBulkLoad call, so that the bulkLoadPhase would be called exactly once, and
204   * server-side failures, if any ,can be checked via data.
205   */
206  class MyExceptionToAvoidRetry extends DoNotRetryIOException {
207
208    private static final long serialVersionUID = -6802760664998771151L;
209  }
210
211  private void doBulkloadWithoutRetry(Path dir) throws Exception {
212    BulkLoadHFilesTool h = new BulkLoadHFilesTool(conf) {
213
214      @Override
215      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
216        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
217        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
218        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
219        throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
220      }
221    };
222    assertThrows(MyExceptionToAvoidRetry.class, () -> h.bulkLoad(TABLE, dir),
223      "MyExceptionToAvoidRetry is expected");
224  }
225
226  private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
227    TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
228    ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
229    Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
230
231    CacheConfig writerCacheConf = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP);
232    writerCacheConf.setCacheDataOnWrite(false);
233    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(false)
234      .withIncludesTags(true).withCompression(compression).withCompressTags(family.isCompressTags())
235      .withChecksumType(StoreUtils.getChecksumType(conf))
236      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
237      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
238      .withDataBlockEncoding(family.getDataBlockEncoding())
239      .withEncryptionContext(Encryption.Context.NONE)
240      .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
241    StoreFileWriter.Builder builder =
242      new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
243        .withOutputDir(new Path(dir, family.getNameAsString()))
244        .withBloomType(family.getBloomFilterType()).withMaxKeyCount(Integer.MAX_VALUE)
245        .withFileContext(hFileContext);
246    StoreFileWriter writer = builder.build();
247
248    Put put = new Put(key);
249    put.addColumn(FAMILY, COLUMN, value);
250    for (Cell c : put.get(FAMILY, COLUMN)) {
251      writer.append((ExtendedCell) c);
252    }
253
254    writer.close();
255  }
256}