/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, MediumTests.class })
public class TestSecureBulkLoadManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestSecureBulkLoadManager.class);

  private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
  private static byte[] FAMILY = Bytes.toBytes("family");
  private static byte[] COLUMN = Bytes.toBytes("column");
  private static byte[] key1 = Bytes.toBytes("row1");
  private static byte[] key2 = Bytes.toBytes("row2");
  private static byte[] key3 = Bytes.toBytes("row3");
  private static byte[] value1 = Bytes.toBytes("t1");
  private static byte[] value3 = Bytes.toBytes("t3");
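  // The table is pre-split at key2 so that key1 and key3 land in two different regions, letting
  // the two bulkloads below go to separate regions of the same table.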
  private static byte[] SPLIT_ROWKEY = key2;

  private Thread earlierBulkload;
  private Thread laterBulkload;

  protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility();
  protected Boolean useFileBasedSFT;

  private static Configuration conf = testUtil.getConfiguration();

  public TestSecureBulkLoadManager(Boolean useFileBasedSFT) {
    this.useFileBasedSFT = useFileBasedSFT;
  }

  @Parameterized.Parameters
  public static Collection<Boolean> data() {
    Boolean[] data = { false, true };
    return Arrays.asList(data);
  }

  @Before
  public void setUp() throws Exception {
    if (useFileBasedSFT) {
      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
    } else {
      conf.unset(StoreFileTrackerFactory.TRACKER_IMPL);
    }
    testUtil.startMiniCluster();
  }

  @After
  public void tearDown() throws Exception {
    testUtil.shutdownMiniCluster();
    testUtil.cleanupTestDir();
  }

  /**
   * After a secure bulkload finishes, there is a clean-up of the FileSystems used in the bulkload.
   * Sometimes, FileSystems used in the finished bulkload might also be used by other ongoing
   * bulkload calls, or there are other FileSystems created by the same user; all of them could be
   * closed by a FileSystem.closeAllForUGI call. So during the clean-up, FileSystems that are still
   * needed later must not get closed, or else a race condition occurs. testForRaceCondition tests
   * the case where two secure bulkload calls from the same UGI go into two different regions and
   * one bulkload finishes earlier while the other bulkload still needs its FileSystems, and checks
   * that both bulkloads succeed.
   */
  @Test
  public void testForRaceCondition() throws Exception {
    Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() {
      @Override
      public void accept(HRegion hRegion) {
        if (hRegion.getRegionInfo().containsRow(key3)) {
          Threads.shutdown(earlierBulkload); /// wait until the other bulkload finishes
        }
      }
    };
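    // Install the listener on the mini cluster's region server: when the region holding key3
    // creates its bulkload FileSystem, the listener waits for the earlier bulkload to finish
    // (and run its FileSystem clean-up), reproducing the race described in the javadoc above.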
    testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
      .getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener);
    /// create table
    testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY));

    /// prepare files
    Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
      .getDataRootDir();
    Path dir1 = new Path(rootdir, "dir1");
    prepareHFile(dir1, key1, value1);
    Path dir2 = new Path(rootdir, "dir2");
    prepareHFile(dir2, key3, value3);

    /// do bulkload
    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
    earlierBulkload = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doBulkloadWithoutRetry(dir1);
        } catch (Exception e) {
          LOG.error("bulk load failed.", e);
          t1Exception.set(e);
        }
      }
    });
    laterBulkload = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doBulkloadWithoutRetry(dir2);
        } catch (Exception e) {
          LOG.error("bulk load failed.", e);
          t2Exception.set(e);
        }
      }
    });
    earlierBulkload.start();
    laterBulkload.start();
    // Threads.shutdown joins each thread, i.e. waits for both bulkloads to complete.
    Threads.shutdown(earlierBulkload);
    Threads.shutdown(laterBulkload);
    Assert.assertNull(t1Exception.get());
    Assert.assertNull(t2Exception.get());

    /// check bulkload ok
    Get get1 = new Get(key1);
    Get get3 = new Get(key3);
    Table t = testUtil.getConnection().getTable(TABLE);
    Result r = t.get(get1);
    Assert.assertArrayEquals(value1, r.getValue(FAMILY, COLUMN));
    r = t.get(get3);
    Assert.assertArrayEquals(value3, r.getValue(FAMILY, COLUMN));
  }

  /**
   * A trick is used to make sure server-side failures (if any) are not covered up by a client
   * retry. Since LoadIncrementalHFiles.doBulkLoad keeps performing bulkload calls as long as the
   * HFile queue is not empty, while server-side exceptions in the doAs block do not lead to a
   * client exception, a bulkload will always succeed in this case by default, so the client would
   * never become aware that failures have ever happened. To avoid this kind of retry, a
   * MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finishes and caught silently
   * outside the doBulkLoad call, so that bulkLoadPhase is called exactly once and server-side
   * failures, if any, can be checked via the data.
   */
  class MyExceptionToAvoidRetry extends DoNotRetryIOException {
  }

  private void doBulkloadWithoutRetry(Path dir) throws Exception {
    Connection connection = testUtil.getConnection();
    LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) {
      @Override
      protected void bulkLoadPhase(final Table htable, final Connection conn, ExecutorService pool,
        Deque<LoadQueueItem> queue, final Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
        throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
      }
    };
    try {
      h.doBulkLoad(dir, testUtil.getAdmin(), connection.getTable(TABLE),
        connection.getRegionLocator(TABLE));
      Assert.fail("MyExceptionToAvoidRetry is expected");
    } catch (MyExceptionToAvoidRetry e) { // expected
    }
  }

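  /**
   * Writes a single cell (key/value under FAMILY:COLUMN) into an HFile in {@code dir/<family>}
   * using a StoreFileWriter, so the directory can be bulkloaded by LoadIncrementalHFiles.
   */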
  private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
    TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
    ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
    Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;

    CacheConfig writerCacheConf = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP);
    writerCacheConf.setCacheDataOnWrite(false);
    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(false)
      .withIncludesTags(true).withCompression(compression).withCompressTags(family.isCompressTags())
      .withChecksumType(StoreUtils.getChecksumType(conf))
      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
      .withDataBlockEncoding(family.getDataBlockEncoding())
      .withEncryptionContext(Encryption.Context.NONE)
      .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
    StoreFileWriter.Builder builder =
      new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
        .withOutputDir(new Path(dir, family.getNameAsString()))
        .withBloomType(family.getBloomFilterType()).withMaxKeyCount(Integer.MAX_VALUE)
        .withFileContext(hFileContext);
    StoreFileWriter writer = builder.build();

    Put put = new Put(key);
    put.addColumn(FAMILY, COLUMN, value);
    for (Cell c : put.get(FAMILY, COLUMN)) {
      writer.append(c);
    }

    writer.close();
  }
}