001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.core.Is.is;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.concurrent.atomic.AtomicLong;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HColumnDescriptor;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HTableDescriptor;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.KeyValueUtil;
044import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
045import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
046import org.apache.hadoop.hbase.StartMiniClusterOption;
047import org.apache.hadoop.hbase.TableExistsException;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.ClientServiceCallable;
050import org.apache.hadoop.hbase.client.ClusterConnection;
051import org.apache.hadoop.hbase.client.RegionInfo;
052import org.apache.hadoop.hbase.client.Result;
053import org.apache.hadoop.hbase.client.ResultScanner;
054import org.apache.hadoop.hbase.client.RpcRetryingCaller;
055import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
056import org.apache.hadoop.hbase.client.Scan;
057import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.coprocessor.ObserverContext;
060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
062import org.apache.hadoop.hbase.coprocessor.RegionObserver;
063import org.apache.hadoop.hbase.io.compress.Compression;
064import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
065import org.apache.hadoop.hbase.io.hfile.CacheConfig;
066import org.apache.hadoop.hbase.io.hfile.HFile;
067import org.apache.hadoop.hbase.io.hfile.HFileContext;
068import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
069import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
070import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
071import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
072import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
073import org.apache.hadoop.hbase.testclassification.LargeTests;
074import org.apache.hadoop.hbase.testclassification.RegionServerTests;
075import org.apache.hadoop.hbase.util.Bytes;
076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
077import org.apache.hadoop.hbase.util.Pair;
078import org.apache.hadoop.hbase.wal.WAL;
079import org.apache.hadoop.hbase.wal.WALEdit;
080import org.apache.hadoop.hbase.wal.WALKey;
081import org.junit.BeforeClass;
082import org.junit.ClassRule;
083import org.junit.Test;
084import org.junit.experimental.categories.Category;
085import org.junit.runner.RunWith;
086import org.junit.runners.Parameterized;
087import org.junit.runners.Parameterized.Parameters;
088import org.slf4j.Logger;
089import org.slf4j.LoggerFactory;
090
091import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
092
093import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
096
097/**
098 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of the region server's
099 * bullkLoad functionality.
100 */
101@RunWith(Parameterized.class)
102@Category({ RegionServerTests.class, LargeTests.class })
103public class TestHRegionServerBulkLoad {
104
105  @ClassRule
106  public static final HBaseClassTestRule CLASS_RULE =
107    HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);
108
109  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
110  protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
111  protected final static Configuration conf = UTIL.getConfiguration();
112  protected final static byte[] QUAL = Bytes.toBytes("qual");
113  protected final static int NUM_CFS = 10;
114  private int sleepDuration;
115  public static int BLOCKSIZE = 64 * 1024;
116  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
117
118  protected final static byte[][] families = new byte[NUM_CFS][];
119  static {
120    for (int i = 0; i < NUM_CFS; i++) {
121      families[i] = Bytes.toBytes(family(i));
122    }
123  }
124
125  @Parameters
126  public static final Collection<Object[]> parameters() {
127    int[] sleepDurations = new int[] { 0, 30000 };
128    List<Object[]> configurations = new ArrayList<>();
129    for (int i : sleepDurations) {
130      configurations.add(new Object[] { i });
131    }
132    return configurations;
133  }
134
135  public TestHRegionServerBulkLoad(int duration) {
136    this.sleepDuration = duration;
137  }
138
139  @BeforeClass
140  public static void setUpBeforeClass() throws Exception {
141    conf.setInt("hbase.rpc.timeout", 10 * 1000);
142  }
143
144  /**
145   * Create a rowkey compatible with
146   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
147   */
148  public static byte[] rowkey(int i) {
149    return Bytes.toBytes(String.format("row_%08d", i));
150  }
151
152  static String family(int i) {
153    return String.format("family_%04d", i);
154  }
155
156  /**
157   * Create an HFile with the given number of rows with a specified value.
158   */
159  public static void createHFile(FileSystem fs, Path path, byte[] family, byte[] qualifier,
160    byte[] value, int numRows) throws IOException {
161    HFileContext context =
162      new HFileContextBuilder().withBlockSize(BLOCKSIZE).withCompression(COMPRESSION).build();
163    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
164      .withFileContext(context).create();
165    long now = EnvironmentEdgeManager.currentTime();
166    try {
167      // subtract 2 since iterateOnSplits doesn't include boundary keys
168      for (int i = 0; i < numRows; i++) {
169        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
170        writer.append(kv);
171      }
172      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
173    } finally {
174      writer.close();
175    }
176  }
177
178  /**
179   * Thread that does full scans of the table looking for any partially completed rows. Each
180   * iteration of this loads 10 hdfs files, which occupies 5 file open file handles. So every 10
181   * iterations (500 file handles) it does a region compaction to reduce the number of open file
182   * handles.
183   */
184  public static class AtomicHFileLoader extends RepeatingTestThread {
185    final AtomicLong numBulkLoads = new AtomicLong();
186    final AtomicLong numCompactions = new AtomicLong();
187    private TableName tableName;
188
189    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte targetFamilies[][])
190      throws IOException {
191      super(ctx);
192      this.tableName = tableName;
193    }
194
195    @Override
196    public void doAnAction() throws Exception {
197      long iteration = numBulkLoads.getAndIncrement();
198      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));
199
200      // create HFiles for different column families
201      FileSystem fs = UTIL.getTestFileSystem();
202      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
203      final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
204      for (int i = 0; i < NUM_CFS; i++) {
205        Path hfile = new Path(dir, family(i));
206        byte[] fam = Bytes.toBytes(family(i));
207        createHFile(fs, hfile, fam, QUAL, val, 1000);
208        famPaths.add(new Pair<>(fam, hfile.toString()));
209      }
210
211      // bulk load HFiles
212      final ClusterConnection conn = (ClusterConnection) UTIL.getConnection();
213      Table table = conn.getTable(tableName);
214      final String bulkToken =
215        new SecureBulkLoadClient(UTIL.getConfiguration(), table).prepareBulkLoad(conn);
216      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, tableName,
217        Bytes.toBytes("aaa"), new RpcControllerFactory(UTIL.getConfiguration()).newController(),
218        HConstants.PRIORITY_UNSET) {
219        @Override
220        public Void rpcCall() throws Exception {
221          LOG.debug("Going to connect to server " + getLocation() + " for row "
222            + Bytes.toStringBinary(getRow()));
223          SecureBulkLoadClient secureClient = null;
224          byte[] regionName = getLocation().getRegionInfo().getRegionName();
225          try (Table table = conn.getTable(getTableName())) {
226            secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table);
227            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, true, null,
228              bulkToken);
229          }
230          return null;
231        }
232      };
233      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
234      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
235      caller.callWithRetries(callable, Integer.MAX_VALUE);
236
237      // Periodically do compaction to reduce the number of open file handles.
238      if (numBulkLoads.get() % 5 == 0) {
239        // 5 * 50 = 250 open file handles!
240        callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
241          new RpcControllerFactory(UTIL.getConfiguration()).newController(),
242          HConstants.PRIORITY_UNSET) {
243          @Override
244          protected Void rpcCall() throws Exception {
245            LOG.debug("compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow()));
246            AdminProtos.AdminService.BlockingInterface server =
247              conn.getAdmin(getLocation().getServerName());
248            CompactRegionRequest request = RequestConverter
249              .buildCompactRegionRequest(getLocation().getRegionInfo().getRegionName(), true, null);
250            server.compactRegion(null, request);
251            numCompactions.incrementAndGet();
252            return null;
253          }
254        };
255        caller.callWithRetries(callable, Integer.MAX_VALUE);
256      }
257    }
258  }
259
260  public static class MyObserver implements RegionCoprocessor, RegionObserver {
261    static int sleepDuration;
262
263    @Override
264    public Optional<RegionObserver> getRegionObserver() {
265      return Optional.of(this);
266    }
267
268    @Override
269    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
270      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
271      CompactionRequest request) throws IOException {
272      try {
273        Thread.sleep(sleepDuration);
274      } catch (InterruptedException ie) {
275        IOException ioe = new InterruptedIOException();
276        ioe.initCause(ie);
277        throw ioe;
278      }
279      return scanner;
280    }
281  }
282
283  /**
284   * Thread that does full scans of the table looking for any partially completed rows.
285   */
286  public static class AtomicScanReader extends RepeatingTestThread {
287    byte targetFamilies[][];
288    Table table;
289    AtomicLong numScans = new AtomicLong();
290    AtomicLong numRowsScanned = new AtomicLong();
291    TableName TABLE_NAME;
292
293    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx, byte targetFamilies[][])
294      throws IOException {
295      super(ctx);
296      this.TABLE_NAME = TABLE_NAME;
297      this.targetFamilies = targetFamilies;
298      table = UTIL.getConnection().getTable(TABLE_NAME);
299    }
300
301    @Override
302    public void doAnAction() throws Exception {
303      Scan s = new Scan();
304      for (byte[] family : targetFamilies) {
305        s.addFamily(family);
306      }
307      ResultScanner scanner = table.getScanner(s);
308
309      for (Result res : scanner) {
310        byte[] lastRow = null, lastFam = null, lastQual = null;
311        byte[] gotValue = null;
312        for (byte[] family : targetFamilies) {
313          byte qualifier[] = QUAL;
314          byte thisValue[] = res.getValue(family, qualifier);
315          if (gotValue != null && thisValue != null && !Bytes.equals(gotValue, thisValue)) {
316
317            StringBuilder msg = new StringBuilder();
318            msg.append("Failed on scan ").append(numScans).append(" after scanning ")
319              .append(numRowsScanned).append(" rows!\n");
320            msg.append("Current  was " + Bytes.toString(res.getRow()) + "/" + Bytes.toString(family)
321              + ":" + Bytes.toString(qualifier) + " = " + Bytes.toString(thisValue) + "\n");
322            msg.append("Previous  was " + Bytes.toString(lastRow) + "/" + Bytes.toString(lastFam)
323              + ":" + Bytes.toString(lastQual) + " = " + Bytes.toString(gotValue));
324            throw new RuntimeException(msg.toString());
325          }
326
327          lastFam = family;
328          lastQual = qualifier;
329          lastRow = res.getRow();
330          gotValue = thisValue;
331        }
332        numRowsScanned.getAndIncrement();
333      }
334      numScans.getAndIncrement();
335    }
336  }
337
338  /**
339   * Creates a table with given table name and specified number of column families if the table does
340   * not already exist.
341   */
342  public void setupTable(TableName table, int cfs) throws IOException {
343    try {
344      LOG.info("Creating table " + table);
345      HTableDescriptor htd = new HTableDescriptor(table);
346      htd.addCoprocessor(MyObserver.class.getName());
347      MyObserver.sleepDuration = this.sleepDuration;
348      for (int i = 0; i < 10; i++) {
349        htd.addFamily(new HColumnDescriptor(family(i)));
350      }
351
352      UTIL.getAdmin().createTable(htd);
353    } catch (TableExistsException tee) {
354      LOG.info("Table " + table + " already exists");
355    }
356  }
357
358  /**
359   * Atomic bulk load.
360   */
361  @Test
362  public void testAtomicBulkLoad() throws Exception {
363    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");
364
365    int millisToRun = 30000;
366    int numScanners = 50;
367
368    // Set createWALDir to true and use default values for other options.
369    UTIL.startMiniCluster(StartMiniClusterOption.builder().createWALDir(true).build());
370    try {
371      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
372      FindBulkHBaseListener listener = new FindBulkHBaseListener();
373      log.registerWALActionsListener(listener);
374      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
375      assertThat(listener.isFound(), is(true));
376    } finally {
377      UTIL.shutdownMiniCluster();
378    }
379  }
380
381  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
382    throws Exception {
383    setupTable(tableName, 10);
384
385    TestContext ctx = new TestContext(UTIL.getConfiguration());
386
387    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
388    ctx.addThread(loader);
389
390    List<AtomicScanReader> scanners = Lists.newArrayList();
391    for (int i = 0; i < numScanners; i++) {
392      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
393      scanners.add(scanner);
394      ctx.addThread(scanner);
395    }
396
397    ctx.startThreads();
398    ctx.waitFor(millisToRun);
399    ctx.stop();
400
401    LOG.info("Loaders:");
402    LOG.info("  loaded " + loader.numBulkLoads.get());
403    LOG.info("  compations " + loader.numCompactions.get());
404
405    LOG.info("Scanners:");
406    for (AtomicScanReader scanner : scanners) {
407      LOG.info("  scanned " + scanner.numScans.get());
408      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
409    }
410  }
411
412  /**
413   * Run test on an HBase instance for 5 minutes. This assumes that the table under test only has a
414   * single region.
415   */
416  public static void main(String args[]) throws Exception {
417    try {
418      Configuration c = HBaseConfiguration.create();
419      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
420      test.setConf(c);
421      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
422    } finally {
423      System.exit(0); // something hangs (believe it is lru threadpool)
424    }
425  }
426
427  private void setConf(Configuration c) {
428    UTIL = new HBaseTestingUtility(c);
429  }
430
431  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
432    private boolean found = false;
433
434    @Override
435    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
436      for (Cell cell : logEdit.getCells()) {
437        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
438        for (Map.Entry entry : kv.toStringMap().entrySet()) {
439          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
440            found = true;
441          }
442        }
443      }
444    }
445
446    public boolean isFound() {
447      return found;
448    }
449  }
450}