001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicLong;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
029import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.ClientServiceCallable;
032import org.apache.hadoop.hbase.client.ClusterConnection;
033import org.apache.hadoop.hbase.client.RpcRetryingCaller;
034import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
035import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.testclassification.RegionServerTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.Pair;
040import org.junit.ClassRule;
041import org.junit.experimental.categories.Category;
042import org.junit.runner.RunWith;
043import org.junit.runners.Parameterized;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
048
049import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
053
054/**
055 * Tests bulk loading of HFiles with old non-secure client for backward compatibility. Will be
056 * removed when old non-secure client for backward compatibility is not supported.
057 */
058@RunWith(Parameterized.class)
059@Category({ RegionServerTests.class, LargeTests.class })
060public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldClient.class);
065
066  private static final Logger LOG =
067    LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldClient.class);
068
069  public TestHRegionServerBulkLoadWithOldClient(int duration) {
070    super(duration);
071  }
072
073  public static class AtomicHFileLoader extends RepeatingTestThread {
074    final AtomicLong numBulkLoads = new AtomicLong();
075    final AtomicLong numCompactions = new AtomicLong();
076    private TableName tableName;
077
078    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte targetFamilies[][])
079      throws IOException {
080      super(ctx);
081      this.tableName = tableName;
082    }
083
084    @Override
085    public void doAnAction() throws Exception {
086      long iteration = numBulkLoads.getAndIncrement();
087      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));
088
089      // create HFiles for different column families
090      FileSystem fs = UTIL.getTestFileSystem();
091      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
092      final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
093      for (int i = 0; i < NUM_CFS; i++) {
094        Path hfile = new Path(dir, family(i));
095        byte[] fam = Bytes.toBytes(family(i));
096        createHFile(fs, hfile, fam, QUAL, val, 1000);
097        famPaths.add(new Pair<>(fam, hfile.toString()));
098      }
099
100      // bulk load HFiles
101      final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
102      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
103      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, tableName,
104        Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
105        @Override
106        protected Void rpcCall() throws Exception {
107          LOG.info("Non-secure old client");
108          byte[] regionName = getLocation().getRegionInfo().getRegionName();
109          BulkLoadHFileRequest request =
110            RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true, null, null);
111          getStub().bulkLoadHFile(null, request);
112          return null;
113        }
114      };
115      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
116      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
117      caller.callWithRetries(callable, Integer.MAX_VALUE);
118
119      // Periodically do compaction to reduce the number of open file handles.
120      if (numBulkLoads.get() % 5 == 0) {
121        // 5 * 50 = 250 open file handles!
122        callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
123          rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
124          @Override
125          protected Void rpcCall() throws Exception {
126            LOG.debug("compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow()));
127            AdminProtos.AdminService.BlockingInterface server =
128              conn.getAdmin(getLocation().getServerName());
129            CompactRegionRequest request = RequestConverter
130              .buildCompactRegionRequest(getLocation().getRegionInfo().getRegionName(), true, null);
131            server.compactRegion(null, request);
132            numCompactions.incrementAndGet();
133            return null;
134          }
135        };
136        caller.callWithRetries(callable, Integer.MAX_VALUE);
137      }
138    }
139  }
140
141  @Override
142  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
143    throws Exception {
144    setupTable(tableName, 10);
145
146    TestContext ctx = new TestContext(UTIL.getConfiguration());
147
148    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
149    ctx.addThread(loader);
150
151    List<AtomicScanReader> scanners = Lists.newArrayList();
152    for (int i = 0; i < numScanners; i++) {
153      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
154      scanners.add(scanner);
155      ctx.addThread(scanner);
156    }
157
158    ctx.startThreads();
159    ctx.waitFor(millisToRun);
160    ctx.stop();
161
162    LOG.info("Loaders:");
163    LOG.info("  loaded " + loader.numBulkLoads.get());
164    LOG.info("  compations " + loader.numCompactions.get());
165
166    LOG.info("Scanners:");
167    for (AtomicScanReader scanner : scanners) {
168      LOG.info("  scanned " + scanner.numScans.get());
169      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
170    }
171  }
172}