001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.core.Is.is;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.TreeMap;
032import java.util.concurrent.atomic.AtomicLong;
033import java.util.stream.Stream;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.ExtendedCell;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
041import org.apache.hadoop.hbase.HBaseTestingUtil;
042import org.apache.hadoop.hbase.HRegionLocation;
043import org.apache.hadoop.hbase.KeyValue;
044import org.apache.hadoop.hbase.KeyValueUtil;
045import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
046import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
047import org.apache.hadoop.hbase.StartTestingClusterOption;
048import org.apache.hadoop.hbase.TableExistsException;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
052import org.apache.hadoop.hbase.client.Connection;
053import org.apache.hadoop.hbase.client.RegionInfo;
054import org.apache.hadoop.hbase.client.RegionLocator;
055import org.apache.hadoop.hbase.client.Result;
056import org.apache.hadoop.hbase.client.ResultScanner;
057import org.apache.hadoop.hbase.client.Scan;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
060import org.apache.hadoop.hbase.coprocessor.ObserverContext;
061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
062import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
063import org.apache.hadoop.hbase.coprocessor.RegionObserver;
064import org.apache.hadoop.hbase.io.compress.Compression;
065import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
066import org.apache.hadoop.hbase.io.hfile.CacheConfig;
067import org.apache.hadoop.hbase.io.hfile.HFile;
068import org.apache.hadoop.hbase.io.hfile.HFileContext;
069import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
070import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
071import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
072import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
073import org.apache.hadoop.hbase.testclassification.LargeTests;
074import org.apache.hadoop.hbase.testclassification.RegionServerTests;
075import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
076import org.apache.hadoop.hbase.util.Bytes;
077import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
078import org.apache.hadoop.hbase.wal.WAL;
079import org.apache.hadoop.hbase.wal.WALEdit;
080import org.apache.hadoop.hbase.wal.WALKey;
081import org.junit.jupiter.api.BeforeAll;
082import org.junit.jupiter.api.Tag;
083import org.junit.jupiter.api.TestTemplate;
084import org.junit.jupiter.params.provider.Arguments;
085import org.slf4j.Logger;
086import org.slf4j.LoggerFactory;
087
088import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
089
/**
 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of the region server's
 * bulkLoad functionality.
 */
094@Tag(RegionServerTests.TAG)
095@Tag(LargeTests.TAG)
096@HBaseParameterizedTestTemplate(name = "{index}: duration={0}")
097public class TestHRegionServerBulkLoad {
098
099  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
100  protected static HBaseTestingUtil UTIL = new HBaseTestingUtil();
101  protected final static Configuration conf = UTIL.getConfiguration();
102  protected final static byte[] QUAL = Bytes.toBytes("qual");
103  protected final static int NUM_CFS = 10;
104  private int sleepDuration;
105  public static int BLOCKSIZE = 64 * 1024;
106  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
107
108  protected final static byte[][] families = new byte[NUM_CFS][];
109  static {
110    for (int i = 0; i < NUM_CFS; i++) {
111      families[i] = Bytes.toBytes(family(i));
112    }
113  }
114
115  public static final Stream<Arguments> parameters() {
116    int[] sleepDurations = new int[] { 0, 30000 };
117    List<Arguments> configurations = new ArrayList<>();
118    for (int i : sleepDurations) {
119      configurations.add(Arguments.of(i));
120    }
121    return configurations.stream();
122  }
123
124  public TestHRegionServerBulkLoad(int duration) {
125    this.sleepDuration = duration;
126  }
127
128  @BeforeAll
129  public static void setUpBeforeClass() throws Exception {
130    conf.setInt("hbase.rpc.timeout", 10 * 1000);
131  }
132
133  /**
134   * Create a rowkey compatible with
135   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
136   */
137  public static byte[] rowkey(int i) {
138    return Bytes.toBytes(String.format("row_%08d", i));
139  }
140
141  static String family(int i) {
142    return String.format("family_%04d", i);
143  }
144
145  /**
146   * Create an HFile with the given number of rows with a specified value.
147   */
148  public static void createHFile(FileSystem fs, Path path, byte[] family, byte[] qualifier,
149    byte[] value, int numRows) throws IOException {
150    HFileContext context =
151      new HFileContextBuilder().withBlockSize(BLOCKSIZE).withCompression(COMPRESSION).build();
152    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
153      .withFileContext(context).create();
154    long now = EnvironmentEdgeManager.currentTime();
155    try {
156      // subtract 2 since iterateOnSplits doesn't include boundary keys
157      for (int i = 0; i < numRows; i++) {
158        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
159        writer.append(kv);
160      }
161      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
162    } finally {
163      writer.close();
164    }
165  }
166
167  /**
168   * Thread that does full scans of the table looking for any partially completed rows. Each
169   * iteration of this loads 10 hdfs files, which occupies 5 file open file handles. So every 10
170   * iterations (500 file handles) it does a region compaction to reduce the number of open file
171   * handles.
172   */
173  public static class AtomicHFileLoader extends RepeatingTestThread {
174    final AtomicLong numBulkLoads = new AtomicLong();
175    final AtomicLong numCompactions = new AtomicLong();
176    private TableName tableName;
177
178    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte targetFamilies[][])
179      throws IOException {
180      super(ctx);
181      this.tableName = tableName;
182    }
183
184    @Override
185    public void doAnAction() throws Exception {
186      long iteration = numBulkLoads.getAndIncrement();
187      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));
188
189      // create HFiles for different column families
190      FileSystem fs = UTIL.getTestFileSystem();
191      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
192      Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
193      for (int i = 0; i < NUM_CFS; i++) {
194        Path hfile = new Path(dir, family(i));
195        byte[] fam = Bytes.toBytes(family(i));
196        createHFile(fs, hfile, fam, QUAL, val, 1000);
197        family2Files.put(fam, Collections.singletonList(hfile));
198      }
199      // bulk load HFiles
200      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tableName, family2Files);
201      final Connection conn = UTIL.getConnection();
202      // Periodically do compaction to reduce the number of open file handles.
203      if (numBulkLoads.get() % 5 == 0) {
204        // 5 * 50 = 250 open file handles!
205        try (RegionLocator locator = conn.getRegionLocator(tableName)) {
206          HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("aaa"), true);
207          conn.getAdmin().compactRegion(loc.getRegion().getRegionName());
208          numCompactions.incrementAndGet();
209        }
210      }
211    }
212  }
213
214  public static class MyObserver implements RegionCoprocessor, RegionObserver {
215    static int sleepDuration;
216
217    @Override
218    public Optional<RegionObserver> getRegionObserver() {
219      return Optional.of(this);
220    }
221
222    @Override
223    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> e,
224      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
225      CompactionRequest request) throws IOException {
226      try {
227        Thread.sleep(sleepDuration);
228      } catch (InterruptedException ie) {
229        IOException ioe = new InterruptedIOException();
230        ioe.initCause(ie);
231        throw ioe;
232      }
233      return scanner;
234    }
235  }
236
237  /**
238   * Thread that does full scans of the table looking for any partially completed rows.
239   */
240  public static class AtomicScanReader extends RepeatingTestThread {
241    byte targetFamilies[][];
242    Table table;
243    AtomicLong numScans = new AtomicLong();
244    AtomicLong numRowsScanned = new AtomicLong();
245    TableName TABLE_NAME;
246
247    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx, byte targetFamilies[][])
248      throws IOException {
249      super(ctx);
250      this.TABLE_NAME = TABLE_NAME;
251      this.targetFamilies = targetFamilies;
252      table = UTIL.getConnection().getTable(TABLE_NAME);
253    }
254
255    @Override
256    public void doAnAction() throws Exception {
257      Scan s = new Scan();
258      for (byte[] family : targetFamilies) {
259        s.addFamily(family);
260      }
261      ResultScanner scanner = table.getScanner(s);
262
263      for (Result res : scanner) {
264        byte[] lastRow = null, lastFam = null, lastQual = null;
265        byte[] gotValue = null;
266        for (byte[] family : targetFamilies) {
267          byte qualifier[] = QUAL;
268          byte thisValue[] = res.getValue(family, qualifier);
269          if (gotValue != null && thisValue != null && !Bytes.equals(gotValue, thisValue)) {
270
271            StringBuilder msg = new StringBuilder();
272            msg.append("Failed on scan ").append(numScans).append(" after scanning ")
273              .append(numRowsScanned).append(" rows!\n");
274            msg.append("Current  was " + Bytes.toString(res.getRow()) + "/" + Bytes.toString(family)
275              + ":" + Bytes.toString(qualifier) + " = " + Bytes.toString(thisValue) + "\n");
276            msg.append("Previous  was " + Bytes.toString(lastRow) + "/" + Bytes.toString(lastFam)
277              + ":" + Bytes.toString(lastQual) + " = " + Bytes.toString(gotValue));
278            throw new RuntimeException(msg.toString());
279          }
280
281          lastFam = family;
282          lastQual = qualifier;
283          lastRow = res.getRow();
284          gotValue = thisValue;
285        }
286        numRowsScanned.getAndIncrement();
287      }
288      numScans.getAndIncrement();
289    }
290  }
291
292  /**
293   * Creates a table with given table name and specified number of column families if the table does
294   * not already exist.
295   */
296  public void setupTable(TableName table, int cfs) throws IOException {
297    try {
298      LOG.info("Creating table " + table);
299      TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(table);
300
301      tableDescriptorBuilder.setCoprocessor(MyObserver.class.getName());
302      MyObserver.sleepDuration = this.sleepDuration;
303      for (int i = 0; i < 10; i++) {
304        ColumnFamilyDescriptor columnFamilyDescriptor =
305          ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family(i))).build();
306        tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
307      }
308
309      UTIL.getAdmin().createTable(tableDescriptorBuilder.build());
310    } catch (TableExistsException tee) {
311      LOG.info("Table " + table + " already exists");
312    }
313  }
314
315  /**
316   * Atomic bulk load.
317   */
318  @TestTemplate
319  public void testAtomicBulkLoad() throws Exception {
320    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");
321
322    int millisToRun = 30000;
323    int numScanners = 50;
324
325    // Set createWALDir to true and use default values for other options.
326    UTIL.startMiniCluster(StartTestingClusterOption.builder().createWALDir(true).build());
327    try {
328      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
329      FindBulkHBaseListener listener = new FindBulkHBaseListener();
330      log.registerWALActionsListener(listener);
331      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
332      assertThat(listener.isFound(), is(true));
333    } finally {
334      UTIL.shutdownMiniCluster();
335    }
336  }
337
338  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
339    throws Exception {
340    setupTable(tableName, 10);
341
342    TestContext ctx = new TestContext(UTIL.getConfiguration());
343
344    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
345    ctx.addThread(loader);
346
347    List<AtomicScanReader> scanners = Lists.newArrayList();
348    for (int i = 0; i < numScanners; i++) {
349      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
350      scanners.add(scanner);
351      ctx.addThread(scanner);
352    }
353
354    ctx.startThreads();
355    ctx.waitFor(millisToRun);
356    ctx.stop();
357
358    LOG.info("Loaders:");
359    LOG.info("  loaded " + loader.numBulkLoads.get());
360    LOG.info("  compations " + loader.numCompactions.get());
361
362    LOG.info("Scanners:");
363    for (AtomicScanReader scanner : scanners) {
364      LOG.info("  scanned " + scanner.numScans.get());
365      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
366    }
367  }
368
369  /**
370   * Run test on an HBase instance for 5 minutes. This assumes that the table under test only has a
371   * single region.
372   */
373  public static void main(String args[]) throws Exception {
374    try {
375      Configuration c = HBaseConfiguration.create();
376      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
377      test.setConf(c);
378      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
379    } finally {
380      System.exit(0); // something hangs (believe it is lru threadpool)
381    }
382  }
383
384  private void setConf(Configuration c) {
385    UTIL = new HBaseTestingUtil(c);
386  }
387
388  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
389    private boolean found = false;
390
391    @Override
392    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
393      for (Cell cell : logEdit.getCells()) {
394        KeyValue kv = KeyValueUtil.ensureKeyValue((ExtendedCell) cell);
395        for (Map.Entry<String, Object> entry : kv.toStringMap().entrySet()) {
396          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
397            found = true;
398          }
399        }
400      }
401    }
402
403    public boolean isFound() {
404      return found;
405    }
406  }
407}