/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

@Category({RegionServerTests.class, MediumTests.class})
public class TestSecureBulkLoadManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestSecureBulkLoadManager.class);

  private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
  private static byte[] FAMILY = Bytes.toBytes("family");
  private static byte[] COLUMN = Bytes.toBytes("column");
  private static byte[] key1 = Bytes.toBytes("row1");
  private static byte[] key2 = Bytes.toBytes("row2");
  private static byte[] key3 = Bytes.toBytes("row3");
  private static byte[] value1 = Bytes.toBytes("t1");
  private static byte[] value3 = Bytes.toBytes("t3");
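  // key2 is used as the table's split key, so key1 and key3 fall into two different regions.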
  private static byte[] SPLIT_ROWKEY = key2;

  private Thread earlierBulkload;
  private Thread laterBulkload;

  protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility();
  private static Configuration conf = testUtil.getConfiguration();
  @BeforeClass
  public static void setUp() throws Exception {
    testUtil.startMiniCluster();
  }

  @AfterClass
  public static void tearDown() throws Exception {
    testUtil.shutdownMiniCluster();
    testUtil.cleanupTestDir();
  }

  /**
   * After a secure bulkload finishes, there is a clean-up of the FileSystems used during the
   * bulkload. Sometimes, FileSystems used in the finished bulkload are also in use by other
   * ongoing bulkload calls, or there are other FileSystems created by the same user; all of
   * them could be closed by a single FileSystem.closeAllForUGI call. So during the clean-up,
   * FileSystems that are still needed later must not be closed, or else a race condition
   * occurs.
   *
   * testForRaceCondition covers the case where two secure bulkload calls from the same UGI go
   * into two different regions and one bulkload finishes earlier while the other still needs
   * its FileSystems; it checks that both bulkloads succeed.
   */
  @Test
  public void testForRaceCondition() throws Exception {
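    // The fsCreatedListener is invoked by the SecureBulkLoadManager once it has created the
    // FileSystem for a bulkload; for the region containing key3 (the later bulkload) it waits
    // until the earlier bulkload has finished, opening the window for the race being tested.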
    Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() {
      @Override
      public void accept(HRegion hRegion) {
        if (hRegion.getRegionInfo().containsRow(key3)) {
          Threads.shutdown(earlierBulkload); // wait until the other bulkload finishes
        }
      }
    };
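    // Register the listener on the first region server so it fires during the bulkloads below.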
    testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
        .getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener);
    // create table
    testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY));

    // prepare files
    Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
        .getRegionServer().getDataRootDir();
    Path dir1 = new Path(rootdir, "dir1");
    prepareHFile(dir1, key1, value1);
    Path dir2 = new Path(rootdir, "dir2");
    prepareHFile(dir2, key3, value3);

    // do bulkload
    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
    earlierBulkload = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doBulkloadWithoutRetry(dir1);
        } catch (Exception e) {
          LOG.error("Bulk load failed.", e);
          t1Exception.set(e);
        }
      }
    });
    laterBulkload = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doBulkloadWithoutRetry(dir2);
        } catch (Exception e) {
          LOG.error("Bulk load failed.", e);
          t2Exception.set(e);
        }
      }
    });
    earlierBulkload.start();
    laterBulkload.start();
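    // Threads.shutdown waits for the thread to die, so both bulkloads have completed before
    // the results are verified.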
    Threads.shutdown(earlierBulkload);
    Threads.shutdown(laterBulkload);
    Assert.assertNull(t1Exception.get());
    Assert.assertNull(t2Exception.get());

    // check bulkload ok
    Get get1 = new Get(key1);
    Get get3 = new Get(key3);
    Table t = testUtil.getConnection().getTable(TABLE);
    Result r = t.get(get1);
    Assert.assertArrayEquals(value1, r.getValue(FAMILY, COLUMN));
    r = t.get(get3);
    Assert.assertArrayEquals(value3, r.getValue(FAMILY, COLUMN));
  }

  /**
   * A trick is used to make sure server-side failures (if any) are not covered up by a client
   * retry. Since LoadIncrementalHFiles.doBulkLoad keeps performing bulkload calls as long as
   * the HFile queue is not empty, while server-side exceptions in the doAs block do not lead
   * to a client exception, a bulkload would always appear to succeed by default, and the
   * client would never be aware that failures have happened. To avoid this kind of retry, a
   * MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finishes and is caught
   * silently outside the doBulkLoad call, so that bulkLoadPhase is called exactly once and
   * server-side failures, if any, can be checked via the loaded data.
   */
  class MyExceptionToAvoidRetry extends DoNotRetryIOException {
  }

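  /**
   * Performs a single-attempt bulkload of the HFiles under {@code dir} into the test table.
   * The overridden bulkLoadPhase throws MyExceptionToAvoidRetry after its first pass so that
   * LoadIncrementalHFiles cannot retry; the expected exception is then swallowed here.
   */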
  private void doBulkloadWithoutRetry(Path dir) throws Exception {
    Connection connection = testUtil.getConnection();
    LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) {
      @Override
      protected void bulkLoadPhase(final Table htable, final Connection conn,
          ExecutorService pool, Deque<LoadQueueItem> queue,
          final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
          Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
        throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
      }
    };
    try {
      h.doBulkLoad(dir, testUtil.getAdmin(), connection.getTable(TABLE),
          connection.getRegionLocator(TABLE));
      Assert.fail("MyExceptionToAvoidRetry is expected");
    } catch (MyExceptionToAvoidRetry e) { // expected
    }
  }

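  /**
   * Writes an HFile containing a single cell (key, FAMILY:COLUMN, value) into a column-family
   * subdirectory of {@code dir}, using the table's column family settings, so it can be picked
   * up by LoadIncrementalHFiles.
   */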
  private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
    TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
    ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
    Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;

    CacheConfig writerCacheConf = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP);
    writerCacheConf.setCacheDataOnWrite(false);
    HFileContext hFileContext = new HFileContextBuilder()
        .withIncludesMvcc(false)
        .withIncludesTags(true)
        .withCompression(compression)
        .withCompressTags(family.isCompressTags())
        .withChecksumType(HStore.getChecksumType(conf))
        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
        .withBlockSize(family.getBlocksize())
        .withHBaseCheckSum(true)
        .withDataBlockEncoding(family.getDataBlockEncoding())
        .withEncryptionContext(Encryption.Context.NONE)
        .withCreateTime(EnvironmentEdgeManager.currentTime())
        .build();
    StoreFileWriter.Builder builder =
        new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
        .withOutputDir(new Path(dir, family.getNameAsString()))
        .withBloomType(family.getBloomFilterType())
        .withMaxKeyCount(Integer.MAX_VALUE)
        .withFileContext(hFileContext);
    StoreFileWriter writer = builder.build();

    Put put = new Put(key);
    put.addColumn(FAMILY, COLUMN, value);
    for (Cell c : put.get(FAMILY, COLUMN)) {
      writer.append(c);
    }

    writer.close();
  }
}