001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
021import static org.hamcrest.core.Is.is;
022import static org.junit.Assert.assertThat;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.concurrent.atomic.AtomicLong;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HColumnDescriptor;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HTableDescriptor;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.KeyValueUtil;
044import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
045import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
046import org.apache.hadoop.hbase.StartMiniClusterOption;
047import org.apache.hadoop.hbase.TableExistsException;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.ClientServiceCallable;
050import org.apache.hadoop.hbase.client.ClusterConnection;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.ResultScanner;
053import org.apache.hadoop.hbase.client.RpcRetryingCaller;
054import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
055import org.apache.hadoop.hbase.client.Scan;
056import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
057import org.apache.hadoop.hbase.client.Table;
058import org.apache.hadoop.hbase.coprocessor.ObserverContext;
059import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
061import org.apache.hadoop.hbase.coprocessor.RegionObserver;
062import org.apache.hadoop.hbase.io.compress.Compression;
063import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
064import org.apache.hadoop.hbase.io.hfile.CacheConfig;
065import org.apache.hadoop.hbase.io.hfile.HFile;
066import org.apache.hadoop.hbase.io.hfile.HFileContext;
067import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
068import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
069import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
070import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
071import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
072import org.apache.hadoop.hbase.testclassification.LargeTests;
073import org.apache.hadoop.hbase.testclassification.RegionServerTests;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.Pair;
076import org.apache.hadoop.hbase.wal.WAL;
077import org.apache.hadoop.hbase.wal.WALEdit;
078import org.apache.hadoop.hbase.wal.WALKey;
079import org.junit.BeforeClass;
080import org.junit.ClassRule;
081import org.junit.Test;
082import org.junit.experimental.categories.Category;
083import org.junit.runner.RunWith;
084import org.junit.runners.Parameterized;
085import org.junit.runners.Parameterized.Parameters;
086import org.slf4j.Logger;
087import org.slf4j.LoggerFactory;
088
089import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
090
091import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
094
/**
 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
 * the region server's bulkLoad functionality.
 */
099@RunWith(Parameterized.class)
100@Category({RegionServerTests.class, LargeTests.class})
101public class TestHRegionServerBulkLoad {
102
103  @ClassRule
104  public static final HBaseClassTestRule CLASS_RULE =
105      HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);
106
107  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
108  protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
109  protected final static Configuration conf = UTIL.getConfiguration();
110  protected final static byte[] QUAL = Bytes.toBytes("qual");
111  protected final static int NUM_CFS = 10;
112  private int sleepDuration;
113  public static int BLOCKSIZE = 64 * 1024;
114  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
115
116  protected final static byte[][] families = new byte[NUM_CFS][];
117  static {
118    for (int i = 0; i < NUM_CFS; i++) {
119      families[i] = Bytes.toBytes(family(i));
120    }
121  }
122  @Parameters
123  public static final Collection<Object[]> parameters() {
124    int[] sleepDurations = new int[] { 0, 30000 };
125    List<Object[]> configurations = new ArrayList<>();
126    for (int i : sleepDurations) {
127      configurations.add(new Object[] { i });
128    }
129    return configurations;
130  }
131
132  public TestHRegionServerBulkLoad(int duration) {
133    this.sleepDuration = duration;
134  }
135
136  @BeforeClass
137  public static void setUpBeforeClass() throws Exception {
138    conf.setInt("hbase.rpc.timeout", 10 * 1000);
139  }
140
141  /**
142   * Create a rowkey compatible with
143   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
144   */
145  public static byte[] rowkey(int i) {
146    return Bytes.toBytes(String.format("row_%08d", i));
147  }
148
149  static String family(int i) {
150    return String.format("family_%04d", i);
151  }
152
153  /**
154   * Create an HFile with the given number of rows with a specified value.
155   */
156  public static void createHFile(FileSystem fs, Path path, byte[] family,
157      byte[] qualifier, byte[] value, int numRows) throws IOException {
158    HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
159                            .withCompression(COMPRESSION)
160                            .build();
161    HFile.Writer writer = HFile
162        .getWriterFactory(conf, new CacheConfig(conf))
163        .withPath(fs, path)
164        .withFileContext(context)
165        .create();
166    long now = System.currentTimeMillis();
167    try {
168      // subtract 2 since iterateOnSplits doesn't include boundary keys
169      for (int i = 0; i < numRows; i++) {
170        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
171        writer.append(kv);
172      }
173      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
174    } finally {
175      writer.close();
176    }
177  }
178
179  /**
180   * Thread that does full scans of the table looking for any partially
181   * completed rows.
182   *
183   * Each iteration of this loads 10 hdfs files, which occupies 5 file open file
184   * handles. So every 10 iterations (500 file handles) it does a region
185   * compaction to reduce the number of open file handles.
186   */
187  public static class AtomicHFileLoader extends RepeatingTestThread {
188    final AtomicLong numBulkLoads = new AtomicLong();
189    final AtomicLong numCompactions = new AtomicLong();
190    private TableName tableName;
191
192    public AtomicHFileLoader(TableName tableName, TestContext ctx,
193        byte targetFamilies[][]) throws IOException {
194      super(ctx);
195      this.tableName = tableName;
196    }
197
198    @Override
199    public void doAnAction() throws Exception {
200      long iteration = numBulkLoads.getAndIncrement();
201      Path dir =  UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
202          iteration));
203
204      // create HFiles for different column families
205      FileSystem fs = UTIL.getTestFileSystem();
206      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
207      final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
208      for (int i = 0; i < NUM_CFS; i++) {
209        Path hfile = new Path(dir, family(i));
210        byte[] fam = Bytes.toBytes(family(i));
211        createHFile(fs, hfile, fam, QUAL, val, 1000);
212        famPaths.add(new Pair<>(fam, hfile.toString()));
213      }
214
215      // bulk load HFiles
216      final ClusterConnection conn = (ClusterConnection)UTIL.getConnection();
217      Table table = conn.getTable(tableName);
218      final String bulkToken = new SecureBulkLoadClient(UTIL.getConfiguration(), table).
219          prepareBulkLoad(conn);
220      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
221          tableName, Bytes.toBytes("aaa"),
222          new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
223        @Override
224        public Void rpcCall() throws Exception {
225          LOG.debug("Going to connect to server " + getLocation() + " for row "
226              + Bytes.toStringBinary(getRow()));
227          SecureBulkLoadClient secureClient = null;
228          byte[] regionName = getLocation().getRegionInfo().getRegionName();
229          try (Table table = conn.getTable(getTableName())) {
230            secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table);
231            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
232                  true, null, bulkToken);
233          }
234          return null;
235        }
236      };
237      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
238      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
239      caller.callWithRetries(callable, Integer.MAX_VALUE);
240
241      // Periodically do compaction to reduce the number of open file handles.
242      if (numBulkLoads.get() % 5 == 0) {
243        // 5 * 50 = 250 open file handles!
244        callable = new ClientServiceCallable<Void>(conn,
245            tableName, Bytes.toBytes("aaa"),
246            new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
247          @Override
248          protected Void rpcCall() throws Exception {
249            LOG.debug("compacting " + getLocation() + " for row "
250                + Bytes.toStringBinary(getRow()));
251            AdminProtos.AdminService.BlockingInterface server =
252              conn.getAdmin(getLocation().getServerName());
253            CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
254                getLocation().getRegionInfo().getRegionName(), true, null);
255            server.compactRegion(null, request);
256            numCompactions.incrementAndGet();
257            return null;
258          }
259        };
260        caller.callWithRetries(callable, Integer.MAX_VALUE);
261      }
262    }
263  }
264
265  public static class MyObserver implements RegionCoprocessor, RegionObserver {
266    static int sleepDuration;
267
268    @Override
269    public Optional<RegionObserver> getRegionObserver() {
270      return Optional.of(this);
271    }
272
273    @Override
274    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
275        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
276        CompactionRequest request)
277        throws IOException {
278      try {
279        Thread.sleep(sleepDuration);
280      } catch (InterruptedException ie) {
281        IOException ioe = new InterruptedIOException();
282        ioe.initCause(ie);
283        throw ioe;
284      }
285      return scanner;
286    }
287  }
288
289  /**
290   * Thread that does full scans of the table looking for any partially
291   * completed rows.
292   */
293  public static class AtomicScanReader extends RepeatingTestThread {
294    byte targetFamilies[][];
295    Table table;
296    AtomicLong numScans = new AtomicLong();
297    AtomicLong numRowsScanned = new AtomicLong();
298    TableName TABLE_NAME;
299
300    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
301        byte targetFamilies[][]) throws IOException {
302      super(ctx);
303      this.TABLE_NAME = TABLE_NAME;
304      this.targetFamilies = targetFamilies;
305      table = UTIL.getConnection().getTable(TABLE_NAME);
306    }
307
308    @Override
309    public void doAnAction() throws Exception {
310      Scan s = new Scan();
311      for (byte[] family : targetFamilies) {
312        s.addFamily(family);
313      }
314      ResultScanner scanner = table.getScanner(s);
315
316      for (Result res : scanner) {
317        byte[] lastRow = null, lastFam = null, lastQual = null;
318        byte[] gotValue = null;
319        for (byte[] family : targetFamilies) {
320          byte qualifier[] = QUAL;
321          byte thisValue[] = res.getValue(family, qualifier);
322          if (gotValue != null && thisValue != null
323              && !Bytes.equals(gotValue, thisValue)) {
324
325            StringBuilder msg = new StringBuilder();
326            msg.append("Failed on scan ").append(numScans)
327                .append(" after scanning ").append(numRowsScanned)
328                .append(" rows!\n");
329            msg.append("Current  was " + Bytes.toString(res.getRow()) + "/"
330                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
331                + " = " + Bytes.toString(thisValue) + "\n");
332            msg.append("Previous  was " + Bytes.toString(lastRow) + "/"
333                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
334                + " = " + Bytes.toString(gotValue));
335            throw new RuntimeException(msg.toString());
336          }
337
338          lastFam = family;
339          lastQual = qualifier;
340          lastRow = res.getRow();
341          gotValue = thisValue;
342        }
343        numRowsScanned.getAndIncrement();
344      }
345      numScans.getAndIncrement();
346    }
347  }
348
349  /**
350   * Creates a table with given table name and specified number of column
351   * families if the table does not already exist.
352   */
353  public void setupTable(TableName table, int cfs) throws IOException {
354    try {
355      LOG.info("Creating table " + table);
356      HTableDescriptor htd = new HTableDescriptor(table);
357      htd.addCoprocessor(MyObserver.class.getName());
358      MyObserver.sleepDuration = this.sleepDuration;
359      for (int i = 0; i < 10; i++) {
360        htd.addFamily(new HColumnDescriptor(family(i)));
361      }
362
363      UTIL.getAdmin().createTable(htd);
364    } catch (TableExistsException tee) {
365      LOG.info("Table " + table + " already exists");
366    }
367  }
368
369  /**
370   * Atomic bulk load.
371   */
372  @Test
373  public void testAtomicBulkLoad() throws Exception {
374    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");
375
376    int millisToRun = 30000;
377    int numScanners = 50;
378
379    // Set createWALDir to true and use default values for other options.
380    UTIL.startMiniCluster(StartMiniClusterOption.builder().createWALDir(true).build());
381    try {
382      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
383      FindBulkHBaseListener listener = new FindBulkHBaseListener();
384      log.registerWALActionsListener(listener);
385      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
386      assertThat(listener.isFound(), is(true));
387    } finally {
388      UTIL.shutdownMiniCluster();
389    }
390  }
391
392  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
393      throws Exception {
394    setupTable(tableName, 10);
395
396    TestContext ctx = new TestContext(UTIL.getConfiguration());
397
398    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
399    ctx.addThread(loader);
400
401    List<AtomicScanReader> scanners = Lists.newArrayList();
402    for (int i = 0; i < numScanners; i++) {
403      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
404      scanners.add(scanner);
405      ctx.addThread(scanner);
406    }
407
408    ctx.startThreads();
409    ctx.waitFor(millisToRun);
410    ctx.stop();
411
412    LOG.info("Loaders:");
413    LOG.info("  loaded " + loader.numBulkLoads.get());
414    LOG.info("  compations " + loader.numCompactions.get());
415
416    LOG.info("Scanners:");
417    for (AtomicScanReader scanner : scanners) {
418      LOG.info("  scanned " + scanner.numScans.get());
419      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
420    }
421  }
422
423  /**
424   * Run test on an HBase instance for 5 minutes. This assumes that the table
425   * under test only has a single region.
426   */
427  public static void main(String args[]) throws Exception {
428    try {
429      Configuration c = HBaseConfiguration.create();
430      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
431      test.setConf(c);
432      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
433    } finally {
434      System.exit(0); // something hangs (believe it is lru threadpool)
435    }
436  }
437
438  private void setConf(Configuration c) {
439    UTIL = new HBaseTestingUtility(c);
440  }
441
442  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
443    private boolean found = false;
444
445    @Override
446    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
447      for (Cell cell : logEdit.getCells()) {
448        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
449        for (Map.Entry entry : kv.toStringMap().entrySet()) {
450          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
451            found = true;
452          }
453        }
454      }
455    }
456
457    public boolean isFound() {
458      return found;
459    }
460  }
461}
462
463