001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
088
089/**
090 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
091 * the region server's bullkLoad functionality.
092 */
093@RunWith(Parameterized.class)
094@Category({RegionServerTests.class, LargeTests.class})
095public class TestHRegionServerBulkLoad {
096
097  @ClassRule
098  public static final HBaseClassTestRule CLASS_RULE =
099      HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);
100
101  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
102  protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
103  protected final static Configuration conf = UTIL.getConfiguration();
104  protected final static byte[] QUAL = Bytes.toBytes("qual");
105  protected final static int NUM_CFS = 10;
106  private int sleepDuration;
107  public static int BLOCKSIZE = 64 * 1024;
108  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
109
110  protected final static byte[][] families = new byte[NUM_CFS][];
111  static {
112    for (int i = 0; i < NUM_CFS; i++) {
113      families[i] = Bytes.toBytes(family(i));
114    }
115  }
116  @Parameters
117  public static final Collection<Object[]> parameters() {
118    int[] sleepDurations = new int[] { 0, 30000 };
119    List<Object[]> configurations = new ArrayList<>();
120    for (int i : sleepDurations) {
121      configurations.add(new Object[] { i });
122    }
123    return configurations;
124  }
125
126  public TestHRegionServerBulkLoad(int duration) {
127    this.sleepDuration = duration;
128  }
129
130  @BeforeClass
131  public static void setUpBeforeClass() throws Exception {
132    conf.setInt("hbase.rpc.timeout", 10 * 1000);
133  }
134
135  /**
136   * Create a rowkey compatible with
137   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
138   */
139  public static byte[] rowkey(int i) {
140    return Bytes.toBytes(String.format("row_%08d", i));
141  }
142
143  static String family(int i) {
144    return String.format("family_%04d", i);
145  }
146
147  /**
148   * Create an HFile with the given number of rows with a specified value.
149   */
150  public static void createHFile(FileSystem fs, Path path, byte[] family,
151      byte[] qualifier, byte[] value, int numRows) throws IOException {
152    HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
153                            .withCompression(COMPRESSION)
154                            .build();
155    HFile.Writer writer = HFile
156        .getWriterFactory(conf, new CacheConfig(conf))
157        .withPath(fs, path)
158        .withFileContext(context)
159        .create();
160    long now = System.currentTimeMillis();
161    try {
162      // subtract 2 since iterateOnSplits doesn't include boundary keys
163      for (int i = 0; i < numRows; i++) {
164        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
165        writer.append(kv);
166      }
167      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
168    } finally {
169      writer.close();
170    }
171  }
172
173  /**
174   * Thread that does full scans of the table looking for any partially
175   * completed rows.
176   *
177   * Each iteration of this loads 10 hdfs files, which occupies 5 file open file
178   * handles. So every 10 iterations (500 file handles) it does a region
179   * compaction to reduce the number of open file handles.
180   */
181  public static class AtomicHFileLoader extends RepeatingTestThread {
182    final AtomicLong numBulkLoads = new AtomicLong();
183    final AtomicLong numCompactions = new AtomicLong();
184    private TableName tableName;
185
186    public AtomicHFileLoader(TableName tableName, TestContext ctx,
187        byte targetFamilies[][]) throws IOException {
188      super(ctx);
189      this.tableName = tableName;
190    }
191
192    @Override
193    public void doAnAction() throws Exception {
194      long iteration = numBulkLoads.getAndIncrement();
195      Path dir =  UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
196          iteration));
197
198      // create HFiles for different column families
199      FileSystem fs = UTIL.getTestFileSystem();
200      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
201      Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
202      for (int i = 0; i < NUM_CFS; i++) {
203        Path hfile = new Path(dir, family(i));
204        byte[] fam = Bytes.toBytes(family(i));
205        createHFile(fs, hfile, fam, QUAL, val, 1000);
206        family2Files.put(fam, Collections.singletonList(hfile));
207      }
208      // bulk load HFiles
209      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tableName, family2Files);
210      final Connection conn = UTIL.getConnection();
211      // Periodically do compaction to reduce the number of open file handles.
212      if (numBulkLoads.get() % 5 == 0) {
213        // 5 * 50 = 250 open file handles!
214        try (RegionLocator locator = conn.getRegionLocator(tableName)) {
215          HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("aaa"), true);
216          conn.getAdmin().compactRegion(loc.getRegion().getRegionName());
217          numCompactions.incrementAndGet();
218        }
219      }
220    }
221  }
222
223  public static class MyObserver implements RegionCoprocessor, RegionObserver {
224    static int sleepDuration;
225
226    @Override
227    public Optional<RegionObserver> getRegionObserver() {
228      return Optional.of(this);
229    }
230
231    @Override
232    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
233        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
234        CompactionRequest request)
235        throws IOException {
236      try {
237        Thread.sleep(sleepDuration);
238      } catch (InterruptedException ie) {
239        IOException ioe = new InterruptedIOException();
240        ioe.initCause(ie);
241        throw ioe;
242      }
243      return scanner;
244    }
245  }
246
247  /**
248   * Thread that does full scans of the table looking for any partially
249   * completed rows.
250   */
251  public static class AtomicScanReader extends RepeatingTestThread {
252    byte targetFamilies[][];
253    Table table;
254    AtomicLong numScans = new AtomicLong();
255    AtomicLong numRowsScanned = new AtomicLong();
256    TableName TABLE_NAME;
257
258    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
259        byte targetFamilies[][]) throws IOException {
260      super(ctx);
261      this.TABLE_NAME = TABLE_NAME;
262      this.targetFamilies = targetFamilies;
263      table = UTIL.getConnection().getTable(TABLE_NAME);
264    }
265
266    @Override
267    public void doAnAction() throws Exception {
268      Scan s = new Scan();
269      for (byte[] family : targetFamilies) {
270        s.addFamily(family);
271      }
272      ResultScanner scanner = table.getScanner(s);
273
274      for (Result res : scanner) {
275        byte[] lastRow = null, lastFam = null, lastQual = null;
276        byte[] gotValue = null;
277        for (byte[] family : targetFamilies) {
278          byte qualifier[] = QUAL;
279          byte thisValue[] = res.getValue(family, qualifier);
280          if (gotValue != null && thisValue != null
281              && !Bytes.equals(gotValue, thisValue)) {
282
283            StringBuilder msg = new StringBuilder();
284            msg.append("Failed on scan ").append(numScans)
285                .append(" after scanning ").append(numRowsScanned)
286                .append(" rows!\n");
287            msg.append("Current  was " + Bytes.toString(res.getRow()) + "/"
288                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
289                + " = " + Bytes.toString(thisValue) + "\n");
290            msg.append("Previous  was " + Bytes.toString(lastRow) + "/"
291                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
292                + " = " + Bytes.toString(gotValue));
293            throw new RuntimeException(msg.toString());
294          }
295
296          lastFam = family;
297          lastQual = qualifier;
298          lastRow = res.getRow();
299          gotValue = thisValue;
300        }
301        numRowsScanned.getAndIncrement();
302      }
303      numScans.getAndIncrement();
304    }
305  }
306
307  /**
308   * Creates a table with given table name and specified number of column
309   * families if the table does not already exist.
310   */
311  public void setupTable(TableName table, int cfs) throws IOException {
312    try {
313      LOG.info("Creating table " + table);
314      HTableDescriptor htd = new HTableDescriptor(table);
315      htd.addCoprocessor(MyObserver.class.getName());
316      MyObserver.sleepDuration = this.sleepDuration;
317      for (int i = 0; i < 10; i++) {
318        htd.addFamily(new HColumnDescriptor(family(i)));
319      }
320
321      UTIL.getAdmin().createTable(htd);
322    } catch (TableExistsException tee) {
323      LOG.info("Table " + table + " already exists");
324    }
325  }
326
327  /**
328   * Atomic bulk load.
329   */
330  @Test
331  public void testAtomicBulkLoad() throws Exception {
332    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");
333
334    int millisToRun = 30000;
335    int numScanners = 50;
336
337    // Set createWALDir to true and use default values for other options.
338    UTIL.startMiniCluster(StartMiniClusterOption.builder().createWALDir(true).build());
339    try {
340      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
341      FindBulkHBaseListener listener = new FindBulkHBaseListener();
342      log.registerWALActionsListener(listener);
343      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
344      assertThat(listener.isFound(), is(true));
345    } finally {
346      UTIL.shutdownMiniCluster();
347    }
348  }
349
350  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
351      throws Exception {
352    setupTable(tableName, 10);
353
354    TestContext ctx = new TestContext(UTIL.getConfiguration());
355
356    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
357    ctx.addThread(loader);
358
359    List<AtomicScanReader> scanners = Lists.newArrayList();
360    for (int i = 0; i < numScanners; i++) {
361      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
362      scanners.add(scanner);
363      ctx.addThread(scanner);
364    }
365
366    ctx.startThreads();
367    ctx.waitFor(millisToRun);
368    ctx.stop();
369
370    LOG.info("Loaders:");
371    LOG.info("  loaded " + loader.numBulkLoads.get());
372    LOG.info("  compations " + loader.numCompactions.get());
373
374    LOG.info("Scanners:");
375    for (AtomicScanReader scanner : scanners) {
376      LOG.info("  scanned " + scanner.numScans.get());
377      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
378    }
379  }
380
381  /**
382   * Run test on an HBase instance for 5 minutes. This assumes that the table
383   * under test only has a single region.
384   */
385  public static void main(String args[]) throws Exception {
386    try {
387      Configuration c = HBaseConfiguration.create();
388      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
389      test.setConf(c);
390      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
391    } finally {
392      System.exit(0); // something hangs (believe it is lru threadpool)
393    }
394  }
395
396  private void setConf(Configuration c) {
397    UTIL = new HBaseTestingUtility(c);
398  }
399
400  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
401    private boolean found = false;
402
403    @Override
404    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
405      for (Cell cell : logEdit.getCells()) {
406        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
407        for (Map.Entry entry : kv.toStringMap().entrySet()) {
408          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
409            found = true;
410          }
411        }
412      }
413    }
414
415    public boolean isFound() {
416      return found;
417    }
418  }
419}
420
421