001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertTrue;
021import static org.junit.Assert.fail;
022
023import java.io.IOException;
024import java.net.SocketTimeoutException;
025import java.util.Comparator;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.Objects;
029import java.util.Random;
030import java.util.SortedMap;
031import java.util.concurrent.CompletableFuture;
032import java.util.concurrent.ConcurrentSkipListMap;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicLong;
037import org.apache.commons.lang3.NotImplementedException;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.conf.Configured;
040import org.apache.hadoop.hbase.CellComparatorImpl;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.HRegionInfo;
046import org.apache.hadoop.hbase.HRegionLocation;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.MetaCellComparator;
049import org.apache.hadoop.hbase.MetaTableAccessor;
050import org.apache.hadoop.hbase.RegionLocations;
051import org.apache.hadoop.hbase.RegionTooBusyException;
052import org.apache.hadoop.hbase.ServerName;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
055import org.apache.hadoop.hbase.security.User;
056import org.apache.hadoop.hbase.testclassification.ClientTests;
057import org.apache.hadoop.hbase.testclassification.SmallTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.Pair;
060import org.apache.hadoop.hbase.util.Threads;
061import org.apache.hadoop.util.Tool;
062import org.apache.hadoop.util.ToolRunner;
063import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
064import org.junit.Before;
065import org.junit.ClassRule;
066import org.junit.Ignore;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069import org.mockito.Mockito;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch;
074import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
075import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
076import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
077import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
078
079import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
103
104/**
105 * Test client behavior w/o setting up a cluster.
106 * Mock up cluster emissions.
107 */
108@Category({ClientTests.class, SmallTests.class})
109public class TestClientNoCluster extends Configured implements Tool {
110
111  @ClassRule
112  public static final HBaseClassTestRule CLASS_RULE =
113      HBaseClassTestRule.forClass(TestClientNoCluster.class);
114
115  private static final Logger LOG = LoggerFactory.getLogger(TestClientNoCluster.class);
116  private Configuration conf;
117  public static final ServerName META_SERVERNAME =
118      ServerName.valueOf("meta.example.org", 16010, 12345);
119
120  @Before
121  public void setUp() throws Exception {
122    this.conf = HBaseConfiguration.create();
123    // Run my Connection overrides.  Use my little ConnectionImplementation below which
124    // allows me insert mocks and also use my Registry below rather than the default zk based
125    // one so tests run faster and don't have zk dependency.
126    this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
127  }
128
129  /**
130   * Simple cluster registry inserted in place of our usual zookeeper based one.
131   */
132  static class SimpleRegistry extends DoNothingConnectionRegistry {
133    final ServerName META_HOST = META_SERVERNAME;
134
135    public SimpleRegistry(Configuration conf) {
136      super(conf);
137    }
138
139    @Override
140    public CompletableFuture<RegionLocations> getMetaRegionLocations() {
141      return CompletableFuture.completedFuture(new RegionLocations(
142          new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, META_HOST)));
143    }
144
145    @Override
146    public CompletableFuture<String> getClusterId() {
147      return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT);
148    }
149  }
150
151  /**
152   * Remove the @Ignore to try out timeout and retry asettings
153   * @throws IOException
154   */
155  @Ignore
156  @Test
157  public void testTimeoutAndRetries() throws IOException {
158    Configuration localConfig = HBaseConfiguration.create(this.conf);
159    // This override mocks up our exists/get call to throw a RegionServerStoppedException.
160    localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
161    Connection connection = ConnectionFactory.createConnection(localConfig);
162    Table table = connection.getTable(TableName.META_TABLE_NAME);
163    Throwable t = null;
164    LOG.info("Start");
165    try {
166      // An exists call turns into a get w/ a flag.
167      table.exists(new Get(Bytes.toBytes("abc")));
168    } catch (SocketTimeoutException e) {
169      // I expect this exception.
170      LOG.info("Got expected exception", e);
171      t = e;
172    } catch (RetriesExhaustedException e) {
173      // This is the old, unwanted behavior.  If we get here FAIL!!!
174      fail();
175    } finally {
176      table.close();
177    }
178    connection.close();
179    LOG.info("Stop");
180    assertTrue(t != null);
181  }
182
183  /**
184   * Test that operation timeout prevails over rpc default timeout and retries, etc.
185   * @throws IOException
186   */
187  @Test
188  public void testRpcTimeout() throws IOException {
189    Configuration localConfig = HBaseConfiguration.create(this.conf);
190    // This override mocks up our exists/get call to throw a RegionServerStoppedException.
191    localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
192    int pause = 10;
193    localConfig.setInt("hbase.client.pause", pause);
194    localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10);
195    // Set the operation timeout to be < the pause.  Expectation is that after first pause, we will
196    // fail out of the rpc because the rpc timeout will have been set to the operation tiemout
197    // and it has expired.  Otherwise, if this functionality is broke, all retries will be run --
198    // all ten of them -- and we'll get the RetriesExhaustedException exception.
199    localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1);
200    Connection connection = ConnectionFactory.createConnection(localConfig);
201    Table table = connection.getTable(TableName.META_TABLE_NAME);
202    Throwable t = null;
203    try {
204      // An exists call turns into a get w/ a flag.
205      table.exists(new Get(Bytes.toBytes("abc")));
206    } catch (SocketTimeoutException e) {
207      // I expect this exception.
208      LOG.info("Got expected exception", e);
209      t = e;
210    } catch (RetriesExhaustedException e) {
211      // This is the old, unwanted behavior.  If we get here FAIL!!!
212      fail();
213    } finally {
214      table.close();
215      connection.close();
216    }
217    assertTrue(t != null);
218  }
219
220  @Test
221  public void testDoNotRetryMetaTableAccessor() throws IOException {
222    this.conf.set("hbase.client.connection.impl",
223      RegionServerStoppedOnScannerOpenConnection.class.getName());
224    try (Connection connection = ConnectionFactory.createConnection(conf)) {
225      MetaTableAccessor.fullScanRegions(connection);
226    }
227  }
228
229  @Test
230  public void testDoNotRetryOnScanNext() throws IOException {
231    this.conf.set("hbase.client.connection.impl",
232      RegionServerStoppedOnScannerOpenConnection.class.getName());
233    // Go against meta else we will try to find first region for the table on construction which
234    // means we'll have to do a bunch more mocking.  Tests that go against meta only should be
235    // good for a bit of testing.
236    Connection connection = ConnectionFactory.createConnection(this.conf);
237    Table table = connection.getTable(TableName.META_TABLE_NAME);
238    ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
239    try {
240      Result result = null;
241      while ((result = scanner.next()) != null) {
242        LOG.info(Objects.toString(result));
243      }
244    } finally {
245      scanner.close();
246      table.close();
247      connection.close();
248    }
249  }
250
251  @Test
252  public void testRegionServerStoppedOnScannerOpen() throws IOException {
253    this.conf.set("hbase.client.connection.impl",
254      RegionServerStoppedOnScannerOpenConnection.class.getName());
255    // Go against meta else we will try to find first region for the table on construction which
256    // means we'll have to do a bunch more mocking.  Tests that go against meta only should be
257    // good for a bit of testing.
258    Connection connection = ConnectionFactory.createConnection(conf);
259    Table table = connection.getTable(TableName.META_TABLE_NAME);
260    ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
261    try {
262      Result result = null;
263      while ((result = scanner.next()) != null) {
264        LOG.info(Objects.toString(result));
265      }
266    } finally {
267      scanner.close();
268      table.close();
269      connection.close();
270    }
271  }
272
273  @Test
274  public void testConnectionClosedOnRegionLocate() throws IOException {
275    Configuration testConf = new Configuration(this.conf);
276    testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
277    // Go against meta else we will try to find first region for the table on construction which
278    // means we'll have to do a bunch more mocking. Tests that go against meta only should be
279    // good for a bit of testing.
280    Connection connection = ConnectionFactory.createConnection(testConf);
281    Table table = connection.getTable(TableName.META_TABLE_NAME);
282    connection.close();
283    try {
284      Get get = new Get(Bytes.toBytes("dummyRow"));
285      table.get(get);
286      fail("Should have thrown DoNotRetryException but no exception thrown");
287    } catch (Exception e) {
288      if (!(e instanceof DoNotRetryIOException)) {
289        String errMsg =
290            "Should have thrown DoNotRetryException but actually " + e.getClass().getSimpleName();
291        LOG.error(errMsg, e);
292        fail(errMsg);
293      }
294    } finally {
295      table.close();
296    }
297  }
298
299  /**
300   * Override to shutdown going to zookeeper for cluster id and meta location.
301   */
302  static class RegionServerStoppedOnScannerOpenConnection
303  extends ConnectionImplementation {
304    final ClientService.BlockingInterface stub;
305
306    RegionServerStoppedOnScannerOpenConnection(Configuration conf,
307        ExecutorService pool, User user) throws IOException {
308      super(conf, pool, user);
309      // Mock up my stub so open scanner returns a scanner id and then on next, we throw
310      // exceptions for three times and then after that, we return no more to scan.
311      this.stub = Mockito.mock(ClientService.BlockingInterface.class);
312      long sid = 12345L;
313      try {
314        Mockito.when(stub.scan((RpcController)Mockito.any(),
315            (ClientProtos.ScanRequest)Mockito.any())).
316          thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
317          thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
318          thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
319              setMoreResults(false).build());
320      } catch (ServiceException e) {
321        throw new IOException(e);
322      }
323    }
324
325    @Override
326    public BlockingInterface getClient(ServerName sn) throws IOException {
327      return this.stub;
328    }
329  }
330
331  /**
332   * Override to check we are setting rpc timeout right.
333   */
334  static class RpcTimeoutConnection
335  extends ConnectionImplementation {
336    final ClientService.BlockingInterface stub;
337
338    RpcTimeoutConnection(Configuration conf, ExecutorService pool, User user)
339    throws IOException {
340      super(conf, pool, user);
341      // Mock up my stub so an exists call -- which turns into a get -- throws an exception
342      this.stub = Mockito.mock(ClientService.BlockingInterface.class);
343      try {
344        Mockito.when(stub.get((RpcController)Mockito.any(),
345            (ClientProtos.GetRequest)Mockito.any())).
346          thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito")));
347      } catch (ServiceException e) {
348        throw new IOException(e);
349      }
350    }
351
352    @Override
353    public BlockingInterface getClient(ServerName sn) throws IOException {
354      return this.stub;
355    }
356  }
357
358  /**
359   * Fake many regionservers and many regions on a connection implementation.
360   */
361  static class ManyServersManyRegionsConnection
362  extends ConnectionImplementation {
363    // All access should be synchronized
364    final Map<ServerName, ClientService.BlockingInterface> serversByClient;
365
366    /**
367     * Map of faked-up rows of a 'meta table'.
368     */
369    final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
370    final AtomicLong sequenceids = new AtomicLong(0);
371    private final Configuration conf;
372
373    ManyServersManyRegionsConnection(Configuration conf,
374        ExecutorService pool, User user)
375    throws IOException {
376      super(conf, pool, user);
377      int serverCount = conf.getInt("hbase.test.servers", 10);
378      this.serversByClient = new HashMap<>(serverCount);
379      this.meta = makeMeta(Bytes.toBytes(
380        conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))),
381        conf.getInt("hbase.test.regions", 100),
382        conf.getLong("hbase.test.namespace.span", 1000),
383        serverCount);
384      this.conf = conf;
385    }
386
387    @Override
388    public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
389      // if (!sn.toString().startsWith("meta")) LOG.info(sn);
390      ClientService.BlockingInterface stub = null;
391      synchronized (this.serversByClient) {
392        stub = this.serversByClient.get(sn);
393        if (stub == null) {
394          stub = new FakeServer(this.conf, meta, sequenceids);
395          this.serversByClient.put(sn, stub);
396        }
397      }
398      return stub;
399    }
400  }
401
402  static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
403      final AtomicLong sequenceids, final MultiRequest request) {
404    // Make a response to match the request.  Act like there were no failures.
405    ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
406    // Per Region.
407    RegionActionResult.Builder regionActionResultBuilder =
408        RegionActionResult.newBuilder();
409    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
410    for (RegionAction regionAction: request.getRegionActionList()) {
411      regionActionResultBuilder.clear();
412      // Per Action in a Region.
413      for (ClientProtos.Action action: regionAction.getActionList()) {
414        roeBuilder.clear();
415        // Return empty Result and proper index as result.
416        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
417        roeBuilder.setIndex(action.getIndex());
418        regionActionResultBuilder.addResultOrException(roeBuilder.build());
419      }
420      builder.addRegionActionResult(regionActionResultBuilder.build());
421    }
422    return builder.build();
423  }
424
425  /**
426   * Fake 'server'.
427   * Implements the ClientService responding as though it were a 'server' (presumes a new
428   * ClientService.BlockingInterface made per server).
429   */
430  static class FakeServer implements ClientService.BlockingInterface {
431    private AtomicInteger multiInvocationsCount = new AtomicInteger(0);
432    private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
433    private final AtomicLong sequenceids;
434    private final long multiPause;
435    private final int tooManyMultiRequests;
436
437    FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
438        final AtomicLong sequenceids) {
439      this.meta = meta;
440      this.sequenceids = sequenceids;
441
442      // Pause to simulate the server taking time applying the edits.  This will drive up the
443      // number of threads used over in client.
444      this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0);
445      this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3);
446    }
447
448    @Override
449    public GetResponse get(RpcController controller, GetRequest request)
450    throws ServiceException {
451      boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(),
452        request.getRegion().getType());
453      if (!metaRegion) {
454        return doGetResponse(request);
455      }
456      return doMetaGetResponse(meta, request);
457    }
458
459    private GetResponse doGetResponse(GetRequest request) {
460      ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
461      ByteString row = request.getGet().getRow();
462      resultBuilder.addCell(getStartCode(row));
463      GetResponse.Builder builder = GetResponse.newBuilder();
464      builder.setResult(resultBuilder.build());
465      return builder.build();
466    }
467
468    @Override
469    public MutateResponse mutate(RpcController controller,
470        MutateRequest request) throws ServiceException {
471      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
472    }
473
474    @Override
475    public ScanResponse scan(RpcController controller,
476        ScanRequest request) throws ServiceException {
477      // Presume it is a scan of meta for now. Not all scans provide a region spec expecting
478      // the server to keep reference by scannerid.  TODO.
479      return doMetaScanResponse(meta, sequenceids, request);
480    }
481
482    @Override
483    public BulkLoadHFileResponse bulkLoadHFile(
484        RpcController controller, BulkLoadHFileRequest request)
485        throws ServiceException {
486      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
487    }
488
489    @Override
490    public CoprocessorServiceResponse execService(
491        RpcController controller, CoprocessorServiceRequest request)
492        throws ServiceException {
493      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
494    }
495
496    @Override
497    public MultiResponse multi(RpcController controller, MultiRequest request)
498    throws ServiceException {
499      int concurrentInvocations = this.multiInvocationsCount.incrementAndGet();
500      try {
501        if (concurrentInvocations >= tooManyMultiRequests) {
502          throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" +
503           concurrentInvocations));
504        }
505        Threads.sleep(multiPause);
506        return doMultiResponse(meta, sequenceids, request);
507      } finally {
508        this.multiInvocationsCount.decrementAndGet();
509      }
510    }
511
512    @Override
513    public CoprocessorServiceResponse execRegionServerService(RpcController controller,
514        CoprocessorServiceRequest request) throws ServiceException {
515      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
516    }
517
518    @Override
519    public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
520        PrepareBulkLoadRequest request) throws ServiceException {
521      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
522    }
523
524    @Override
525    public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
526        CleanupBulkLoadRequest request) throws ServiceException {
527      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
528    }
529  }
530
531  static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
532      final AtomicLong sequenceids, final ScanRequest request) {
533    ScanResponse.Builder builder = ScanResponse.newBuilder();
534    int max = request.getNumberOfRows();
535    int count = 0;
536    Map<byte [], Pair<HRegionInfo, ServerName>> tail =
537      request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta;
538      ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
539    for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) {
540      // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only.
541      if (max <= 0) break;
542      if (++count > max) break;
543      HRegionInfo hri = e.getValue().getFirst();
544      ByteString row = UnsafeByteOperations.unsafeWrap(hri.getRegionName());
545      resultBuilder.clear();
546      resultBuilder.addCell(getRegionInfo(row, hri));
547      resultBuilder.addCell(getServer(row, e.getValue().getSecond()));
548      resultBuilder.addCell(getStartCode(row));
549      builder.addResults(resultBuilder.build());
550      // Set more to false if we are on the last region in table.
551      if (hri.getEndKey().length <= 0) builder.setMoreResults(false);
552      else builder.setMoreResults(true);
553    }
554    // If no scannerid, set one.
555    builder.setScannerId(request.hasScannerId()?
556      request.getScannerId(): sequenceids.incrementAndGet());
557    return builder.build();
558  }
559
560  static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
561      final GetRequest request) {
562    ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
563    ByteString row = request.getGet().getRow();
564    Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray());
565    if (p != null) {
566      resultBuilder.addCell(getRegionInfo(row, p.getFirst()));
567      resultBuilder.addCell(getServer(row, p.getSecond()));
568    }
569    resultBuilder.addCell(getStartCode(row));
570    GetResponse.Builder builder = GetResponse.newBuilder();
571    builder.setResult(resultBuilder.build());
572    return builder.build();
573  }
574
575  /**
576   * @param name region name or encoded region name.
577   * @param type
578   * @return True if we are dealing with a hbase:meta region.
579   */
580  static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) {
581    switch (type) {
582    case REGION_NAME:
583      return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name);
584    case ENCODED_REGION_NAME:
585      return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name);
586    default: throw new UnsupportedOperationException();
587    }
588  }
589
590  private final static ByteString CATALOG_FAMILY_BYTESTRING =
591      UnsafeByteOperations.unsafeWrap(HConstants.CATALOG_FAMILY);
592  private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING =
593      UnsafeByteOperations.unsafeWrap(HConstants.REGIONINFO_QUALIFIER);
594  private final static ByteString SERVER_QUALIFIER_BYTESTRING =
595      UnsafeByteOperations.unsafeWrap(HConstants.SERVER_QUALIFIER);
596
597  static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) {
598    CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder();
599    cellBuilder.setRow(row);
600    cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING);
601    cellBuilder.setTimestamp(System.currentTimeMillis());
602    return cellBuilder;
603  }
604
605  static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) {
606    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
607    cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING);
608    cellBuilder.setValue(UnsafeByteOperations.unsafeWrap(hri.toByteArray()));
609    return cellBuilder.build();
610  }
611
612  static CellProtos.Cell getServer(final ByteString row, final ServerName sn) {
613    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
614    cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING);
615    cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort()));
616    return cellBuilder.build();
617  }
618
619  static CellProtos.Cell getStartCode(final ByteString row) {
620    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
621    cellBuilder.setQualifier(UnsafeByteOperations.unsafeWrap(HConstants.STARTCODE_QUALIFIER));
622    // TODO:
623    cellBuilder.setValue(UnsafeByteOperations.unsafeWrap(
624        Bytes.toBytes(META_SERVERNAME.getStartcode())));
625    return cellBuilder.build();
626  }
627
628  private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t");
629
630  /**
631   * Format passed integer.  Zero-pad.
632   * Copied from hbase-server PE class and small amendment.  Make them share.
633   * @param number
634   * @return Returns zero-prefixed 10-byte wide decimal version of passed
635   * number (Does absolute in case number is negative).
636   */
637  private static byte [] format(final long number) {
638    byte [] b = new byte[10];
639    long d = number;
640    for (int i = b.length - 1; i >= 0; i--) {
641      b[i] = (byte)((d % 10) + '0');
642      d /= 10;
643    }
644    return b;
645  }
646
647  /**
648   * @param count
649   * @param namespaceSpan
650   * @return <code>count</code> regions
651   */
652  private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count,
653      final long namespaceSpan) {
654    byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
655    byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
656    long interval = namespaceSpan / count;
657    HRegionInfo [] hris = new HRegionInfo[count];
658    for (int i = 0; i < count; i++) {
659      if (i == 0) {
660        endKey = format(interval);
661      } else {
662        startKey = endKey;
663        if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY;
664        else endKey = format((i + 1) * interval);
665      }
666      hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey);
667    }
668    return hris;
669  }
670
671  /**
672   * @param count
673   * @return Return <code>count</code> servernames.
674   */
675  private static ServerName [] makeServerNames(final int count) {
676    ServerName [] sns = new ServerName[count];
677    for (int i = 0; i < count; i++) {
678      sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i);
679    }
680    return sns;
681  }
682
683  /**
684   * Comparator for meta row keys.
685   */
686  private static class MetaRowsComparator implements Comparator<byte []> {
687    private final CellComparatorImpl delegate = MetaCellComparator.META_COMPARATOR;
688    @Override
689    public int compare(byte[] left, byte[] right) {
690      return delegate.compareRows(new KeyValue.KeyOnlyKeyValue(left), right, 0, right.length);
691    }
692  }
693
694  /**
695   * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and
696   * ServerName to return for this row.
697   * @return Map with faked hbase:meta content in it.
698   */
699  static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName,
700      final int regionCount, final long namespaceSpan, final int serverCount) {
701    // I need a comparator for meta rows so we sort properly.
702    SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta =
703      new ConcurrentSkipListMap<>(new MetaRowsComparator());
704    HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan);
705    ServerName [] serverNames = makeServerNames(serverCount);
706    int per = regionCount / serverCount;
707    int count = 0;
708    for (HRegionInfo hri: hris) {
709      Pair<HRegionInfo, ServerName> p = new Pair<>(hri, serverNames[count++ / per]);
710      meta.put(hri.getRegionName(), p);
711    }
712    return meta;
713  }
714
715  /**
716   * Code for each 'client' to run.
717   *
718   * @param id
719   * @param c
720   * @param sharedConnection
721   * @throws IOException
722   */
723  static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException {
724    long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
725    long startTime = System.currentTimeMillis();
726    final int printInterval = 100000;
727    Random rd = new Random(id);
728    boolean get = c.getBoolean("hbase.test.do.gets", false);
729    TableName tableName = TableName.valueOf(BIG_USER_TABLE);
730    if (get) {
731      try (Table table = sharedConnection.getTable(tableName)){
732        Stopwatch stopWatch = Stopwatch.createStarted();
733        for (int i = 0; i < namespaceSpan; i++) {
734          byte [] b = format(rd.nextLong());
735          Get g = new Get(b);
736          table.get(g);
737          if (i % printInterval == 0) {
738            LOG.info("Get " + printInterval + "/" + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS));
739            stopWatch.reset();
740            stopWatch.start();
741          }
742        }
743        LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
744            (System.currentTimeMillis() - startTime) + "ms");
745      }
746    } else {
747      try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) {
748        Stopwatch stopWatch = Stopwatch.createStarted();
749        for (int i = 0; i < namespaceSpan; i++) {
750          byte [] b = format(rd.nextLong());
751          Put p = new Put(b);
752          p.addColumn(HConstants.CATALOG_FAMILY, b, b);
753          mutator.mutate(p);
754          if (i % printInterval == 0) {
755            LOG.info("Put " + printInterval + "/" + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS));
756            stopWatch.reset();
757            stopWatch.start();
758          }
759        }
760        LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
761            (System.currentTimeMillis() - startTime) + "ms");
762        }
763    }
764  }
765
766  @Override
767  public int run(String[] arg0) throws Exception {
768    int errCode = 0;
769    // TODO: Make command options.
770    // How many servers to fake.
771    final int servers = 1;
772    // How many regions to put on the faked servers.
773    final int regions = 100000;
774    // How many 'keys' in the faked regions.
775    final long namespaceSpan = 50000000;
776    // How long to take to pause after doing a put; make this long if you want to fake a struggling
777    // server.
778    final long multiPause = 0;
779    // Check args make basic sense.
780    if ((namespaceSpan < regions) || (regions < servers)) {
781      throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" +
782        regions + " which must be > servers=" + servers);
783    }
784
785    // Set my many servers and many regions faking connection in place.
786    getConf().set("hbase.client.connection.impl",
787      ManyServersManyRegionsConnection.class.getName());
788    // Use simple kv registry rather than zk
789    getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName());
790    // When to report fails.  Default is we report the 10th.  This means we'll see log everytime
791    // an exception is thrown -- usually RegionTooBusyException when we have more than
792    // hbase.test.multi.too.many requests outstanding at any time.
793    getConf().setInt("hbase.client.start.log.errors.counter", 0);
794
795    // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class.
796    getConf().setInt("hbase.test.regions", regions);
797    getConf().setLong("hbase.test.namespace.span", namespaceSpan);
798    getConf().setLong("hbase.test.servers", servers);
799    getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE));
800    getConf().setLong("hbase.test.multi.pause.when.done", multiPause);
801    // Let there be ten outstanding requests at a time before we throw RegionBusyException.
802    getConf().setInt("hbase.test.multi.too.many", 10);
803    final int clients = 2;
804
805    // Have them all share the same connection so they all share the same instance of
806    // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
807    final ExecutorService pool = Executors.newCachedThreadPool(
808      new ThreadFactoryBuilder().setNameFormat("p-pool-%d")
809        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
810      // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
811    // Share a connection so I can keep counts in the 'server' on concurrency.
812    final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/);
813    try {
814      Thread [] ts = new Thread[clients];
815      for (int j = 0; j < ts.length; j++) {
816        final int id = j;
817        ts[j] = new Thread("" + j) {
818          final Configuration c = getConf();
819
820          @Override
821          public void run() {
822            try {
823              cycle(id, c, sharedConnection);
824            } catch (IOException e) {
825              e.printStackTrace();
826            }
827          }
828        };
829        ts[j].start();
830      }
831      for (int j = 0; j < ts.length; j++) {
832        ts[j].join();
833      }
834    } finally {
835      sharedConnection.close();
836    }
837    return errCode;
838  }
839
840  /**
841   * Run a client instance against a faked up server.
842   * @param args TODO
843   * @throws Exception
844   */
845  public static void main(String[] args) throws Exception {
846    System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args));
847  }
848}