001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
091
092/**
093 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
094 * the region server's bullkLoad functionality.
095 */
096@RunWith(Parameterized.class)
097@Category({RegionServerTests.class, LargeTests.class})
098public class TestHRegionServerBulkLoad {
099
100  @ClassRule
101  public static final HBaseClassTestRule CLASS_RULE =
102      HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);
103
104  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
105  protected static HBaseTestingUtil UTIL = new HBaseTestingUtil();
106  protected final static Configuration conf = UTIL.getConfiguration();
107  protected final static byte[] QUAL = Bytes.toBytes("qual");
108  protected final static int NUM_CFS = 10;
109  private int sleepDuration;
110  public static int BLOCKSIZE = 64 * 1024;
111  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
112
113  protected final static byte[][] families = new byte[NUM_CFS][];
114  static {
115    for (int i = 0; i < NUM_CFS; i++) {
116      families[i] = Bytes.toBytes(family(i));
117    }
118  }
119  @Parameters
120  public static final Collection<Object[]> parameters() {
121    int[] sleepDurations = new int[] { 0, 30000 };
122    List<Object[]> configurations = new ArrayList<>();
123    for (int i : sleepDurations) {
124      configurations.add(new Object[] { i });
125    }
126    return configurations;
127  }
128
129  public TestHRegionServerBulkLoad(int duration) {
130    this.sleepDuration = duration;
131  }
132
133  @BeforeClass
134  public static void setUpBeforeClass() throws Exception {
135    conf.setInt("hbase.rpc.timeout", 10 * 1000);
136  }
137
138  /**
139   * Create a rowkey compatible with
140   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
141   */
142  public static byte[] rowkey(int i) {
143    return Bytes.toBytes(String.format("row_%08d", i));
144  }
145
146  static String family(int i) {
147    return String.format("family_%04d", i);
148  }
149
150  /**
151   * Create an HFile with the given number of rows with a specified value.
152   */
153  public static void createHFile(FileSystem fs, Path path, byte[] family,
154      byte[] qualifier, byte[] value, int numRows) throws IOException {
155    HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
156                            .withCompression(COMPRESSION)
157                            .build();
158    HFile.Writer writer = HFile
159        .getWriterFactory(conf, new CacheConfig(conf))
160        .withPath(fs, path)
161        .withFileContext(context)
162        .create();
163    long now = EnvironmentEdgeManager.currentTime();
164    try {
165      // subtract 2 since iterateOnSplits doesn't include boundary keys
166      for (int i = 0; i < numRows; i++) {
167        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
168        writer.append(kv);
169      }
170      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
171    } finally {
172      writer.close();
173    }
174  }
175
176  /**
177   * Thread that does full scans of the table looking for any partially
178   * completed rows.
179   *
180   * Each iteration of this loads 10 hdfs files, which occupies 5 file open file
181   * handles. So every 10 iterations (500 file handles) it does a region
182   * compaction to reduce the number of open file handles.
183   */
184  public static class AtomicHFileLoader extends RepeatingTestThread {
185    final AtomicLong numBulkLoads = new AtomicLong();
186    final AtomicLong numCompactions = new AtomicLong();
187    private TableName tableName;
188
189    public AtomicHFileLoader(TableName tableName, TestContext ctx,
190        byte targetFamilies[][]) throws IOException {
191      super(ctx);
192      this.tableName = tableName;
193    }
194
195    @Override
196    public void doAnAction() throws Exception {
197      long iteration = numBulkLoads.getAndIncrement();
198      Path dir =  UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
199          iteration));
200
201      // create HFiles for different column families
202      FileSystem fs = UTIL.getTestFileSystem();
203      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
204      Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
205      for (int i = 0; i < NUM_CFS; i++) {
206        Path hfile = new Path(dir, family(i));
207        byte[] fam = Bytes.toBytes(family(i));
208        createHFile(fs, hfile, fam, QUAL, val, 1000);
209        family2Files.put(fam, Collections.singletonList(hfile));
210      }
211      // bulk load HFiles
212      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tableName, family2Files);
213      final Connection conn = UTIL.getConnection();
214      // Periodically do compaction to reduce the number of open file handles.
215      if (numBulkLoads.get() % 5 == 0) {
216        // 5 * 50 = 250 open file handles!
217        try (RegionLocator locator = conn.getRegionLocator(tableName)) {
218          HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("aaa"), true);
219          conn.getAdmin().compactRegion(loc.getRegion().getRegionName());
220          numCompactions.incrementAndGet();
221        }
222      }
223    }
224  }
225
226  public static class MyObserver implements RegionCoprocessor, RegionObserver {
227    static int sleepDuration;
228
229    @Override
230    public Optional<RegionObserver> getRegionObserver() {
231      return Optional.of(this);
232    }
233
234    @Override
235    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
236        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
237        CompactionRequest request)
238        throws IOException {
239      try {
240        Thread.sleep(sleepDuration);
241      } catch (InterruptedException ie) {
242        IOException ioe = new InterruptedIOException();
243        ioe.initCause(ie);
244        throw ioe;
245      }
246      return scanner;
247    }
248  }
249
250  /**
251   * Thread that does full scans of the table looking for any partially
252   * completed rows.
253   */
254  public static class AtomicScanReader extends RepeatingTestThread {
255    byte targetFamilies[][];
256    Table table;
257    AtomicLong numScans = new AtomicLong();
258    AtomicLong numRowsScanned = new AtomicLong();
259    TableName TABLE_NAME;
260
261    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
262        byte targetFamilies[][]) throws IOException {
263      super(ctx);
264      this.TABLE_NAME = TABLE_NAME;
265      this.targetFamilies = targetFamilies;
266      table = UTIL.getConnection().getTable(TABLE_NAME);
267    }
268
269    @Override
270    public void doAnAction() throws Exception {
271      Scan s = new Scan();
272      for (byte[] family : targetFamilies) {
273        s.addFamily(family);
274      }
275      ResultScanner scanner = table.getScanner(s);
276
277      for (Result res : scanner) {
278        byte[] lastRow = null, lastFam = null, lastQual = null;
279        byte[] gotValue = null;
280        for (byte[] family : targetFamilies) {
281          byte qualifier[] = QUAL;
282          byte thisValue[] = res.getValue(family, qualifier);
283          if (gotValue != null && thisValue != null
284              && !Bytes.equals(gotValue, thisValue)) {
285
286            StringBuilder msg = new StringBuilder();
287            msg.append("Failed on scan ").append(numScans)
288                .append(" after scanning ").append(numRowsScanned)
289                .append(" rows!\n");
290            msg.append("Current  was " + Bytes.toString(res.getRow()) + "/"
291                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
292                + " = " + Bytes.toString(thisValue) + "\n");
293            msg.append("Previous  was " + Bytes.toString(lastRow) + "/"
294                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
295                + " = " + Bytes.toString(gotValue));
296            throw new RuntimeException(msg.toString());
297          }
298
299          lastFam = family;
300          lastQual = qualifier;
301          lastRow = res.getRow();
302          gotValue = thisValue;
303        }
304        numRowsScanned.getAndIncrement();
305      }
306      numScans.getAndIncrement();
307    }
308  }
309
310  /**
311   * Creates a table with given table name and specified number of column
312   * families if the table does not already exist.
313   */
314  public void setupTable(TableName table, int cfs) throws IOException {
315    try {
316      LOG.info("Creating table " + table);
317      TableDescriptorBuilder tableDescriptorBuilder =
318        TableDescriptorBuilder.newBuilder(table);
319
320      tableDescriptorBuilder.setCoprocessor(MyObserver.class.getName());
321      MyObserver.sleepDuration = this.sleepDuration;
322      for (int i = 0; i < 10; i++) {
323        ColumnFamilyDescriptor columnFamilyDescriptor =
324          ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family(i))).build();
325        tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
326      }
327
328      UTIL.getAdmin().createTable(tableDescriptorBuilder.build());
329    } catch (TableExistsException tee) {
330      LOG.info("Table " + table + " already exists");
331    }
332  }
333
334  /**
335   * Atomic bulk load.
336   */
337  @Test
338  public void testAtomicBulkLoad() throws Exception {
339    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");
340
341    int millisToRun = 30000;
342    int numScanners = 50;
343
344    // Set createWALDir to true and use default values for other options.
345    UTIL.startMiniCluster(StartTestingClusterOption.builder().createWALDir(true).build());
346    try {
347      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
348      FindBulkHBaseListener listener = new FindBulkHBaseListener();
349      log.registerWALActionsListener(listener);
350      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
351      assertThat(listener.isFound(), is(true));
352    } finally {
353      UTIL.shutdownMiniCluster();
354    }
355  }
356
357  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
358      throws Exception {
359    setupTable(tableName, 10);
360
361    TestContext ctx = new TestContext(UTIL.getConfiguration());
362
363    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
364    ctx.addThread(loader);
365
366    List<AtomicScanReader> scanners = Lists.newArrayList();
367    for (int i = 0; i < numScanners; i++) {
368      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
369      scanners.add(scanner);
370      ctx.addThread(scanner);
371    }
372
373    ctx.startThreads();
374    ctx.waitFor(millisToRun);
375    ctx.stop();
376
377    LOG.info("Loaders:");
378    LOG.info("  loaded " + loader.numBulkLoads.get());
379    LOG.info("  compations " + loader.numCompactions.get());
380
381    LOG.info("Scanners:");
382    for (AtomicScanReader scanner : scanners) {
383      LOG.info("  scanned " + scanner.numScans.get());
384      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
385    }
386  }
387
388  /**
389   * Run test on an HBase instance for 5 minutes. This assumes that the table
390   * under test only has a single region.
391   */
392  public static void main(String args[]) throws Exception {
393    try {
394      Configuration c = HBaseConfiguration.create();
395      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
396      test.setConf(c);
397      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
398    } finally {
399      System.exit(0); // something hangs (believe it is lru threadpool)
400    }
401  }
402
403  private void setConf(Configuration c) {
404    UTIL = new HBaseTestingUtil(c);
405  }
406
407  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
408    private boolean found = false;
409
410    @Override
411    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
412      for (Cell cell : logEdit.getCells()) {
413        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
414        for (Map.Entry entry : kv.toStringMap().entrySet()) {
415          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
416            found = true;
417          }
418        }
419      }
420    }
421
422    public boolean isFound() {
423      return found;
424    }
425  }
426}
427
428