001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertTrue;
021import static org.junit.Assert.fail;
022
023import java.io.IOException;
024import java.net.SocketTimeoutException;
025import java.util.Comparator;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.Objects;
029import java.util.Random;
030import java.util.SortedMap;
031import java.util.concurrent.CompletableFuture;
032import java.util.concurrent.ConcurrentSkipListMap;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicLong;
037import org.apache.commons.lang3.NotImplementedException;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.conf.Configured;
040import org.apache.hadoop.hbase.CellComparatorImpl;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.HRegionInfo;
046import org.apache.hadoop.hbase.HRegionLocation;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.MetaTableAccessor;
049import org.apache.hadoop.hbase.RegionLocations;
050import org.apache.hadoop.hbase.RegionTooBusyException;
051import org.apache.hadoop.hbase.ServerName;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
054import org.apache.hadoop.hbase.security.User;
055import org.apache.hadoop.hbase.testclassification.ClientTests;
056import org.apache.hadoop.hbase.testclassification.SmallTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.Pair;
059import org.apache.hadoop.hbase.util.Threads;
060import org.apache.hadoop.util.Tool;
061import org.apache.hadoop.util.ToolRunner;
062import org.junit.Before;
063import org.junit.ClassRule;
064import org.junit.Ignore;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.mockito.Mockito;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch;
072import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
073import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
074import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
075import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
076
077import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
101
102/**
103 * Test client behavior w/o setting up a cluster.
104 * Mock up cluster emissions.
105 */
106@Category({ClientTests.class, SmallTests.class})
107public class TestClientNoCluster extends Configured implements Tool {
108
109  @ClassRule
110  public static final HBaseClassTestRule CLASS_RULE =
111      HBaseClassTestRule.forClass(TestClientNoCluster.class);
112
113  private static final Logger LOG = LoggerFactory.getLogger(TestClientNoCluster.class);
114  private Configuration conf;
115  public static final ServerName META_SERVERNAME =
116      ServerName.valueOf("meta.example.org", 16010, 12345);
117
118  @Before
119  public void setUp() throws Exception {
120    this.conf = HBaseConfiguration.create();
121    // Run my Connection overrides.  Use my little ConnectionImplementation below which
122    // allows me insert mocks and also use my Registry below rather than the default zk based
123    // one so tests run faster and don't have zk dependency.
124    this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
125  }
126
127  /**
128   * Simple cluster registry inserted in place of our usual zookeeper based one.
129   */
130  static class SimpleRegistry extends DoNothingConnectionRegistry {
131    final ServerName META_HOST = META_SERVERNAME;
132
133    public SimpleRegistry(Configuration conf) {
134      super(conf);
135    }
136
137    @Override
138    public CompletableFuture<RegionLocations> getMetaRegionLocations() {
139      return CompletableFuture.completedFuture(new RegionLocations(
140          new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, META_HOST)));
141    }
142
143    @Override
144    public CompletableFuture<String> getClusterId() {
145      return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT);
146    }
147  }
148
149  /**
150   * Remove the @Ignore to try out timeout and retry asettings
151   * @throws IOException
152   */
153  @Ignore
154  @Test
155  public void testTimeoutAndRetries() throws IOException {
156    Configuration localConfig = HBaseConfiguration.create(this.conf);
157    // This override mocks up our exists/get call to throw a RegionServerStoppedException.
158    localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
159    Connection connection = ConnectionFactory.createConnection(localConfig);
160    Table table = connection.getTable(TableName.META_TABLE_NAME);
161    Throwable t = null;
162    LOG.info("Start");
163    try {
164      // An exists call turns into a get w/ a flag.
165      table.exists(new Get(Bytes.toBytes("abc")));
166    } catch (SocketTimeoutException e) {
167      // I expect this exception.
168      LOG.info("Got expected exception", e);
169      t = e;
170    } catch (RetriesExhaustedException e) {
171      // This is the old, unwanted behavior.  If we get here FAIL!!!
172      fail();
173    } finally {
174      table.close();
175    }
176    connection.close();
177    LOG.info("Stop");
178    assertTrue(t != null);
179  }
180
181  /**
182   * Test that operation timeout prevails over rpc default timeout and retries, etc.
183   * @throws IOException
184   */
185  @Test
186  public void testRpcTimeout() throws IOException {
187    Configuration localConfig = HBaseConfiguration.create(this.conf);
188    // This override mocks up our exists/get call to throw a RegionServerStoppedException.
189    localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
190    int pause = 10;
191    localConfig.setInt("hbase.client.pause", pause);
192    localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10);
193    // Set the operation timeout to be < the pause.  Expectation is that after first pause, we will
194    // fail out of the rpc because the rpc timeout will have been set to the operation tiemout
195    // and it has expired.  Otherwise, if this functionality is broke, all retries will be run --
196    // all ten of them -- and we'll get the RetriesExhaustedException exception.
197    localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1);
198    Connection connection = ConnectionFactory.createConnection(localConfig);
199    Table table = connection.getTable(TableName.META_TABLE_NAME);
200    Throwable t = null;
201    try {
202      // An exists call turns into a get w/ a flag.
203      table.exists(new Get(Bytes.toBytes("abc")));
204    } catch (SocketTimeoutException e) {
205      // I expect this exception.
206      LOG.info("Got expected exception", e);
207      t = e;
208    } catch (RetriesExhaustedException e) {
209      // This is the old, unwanted behavior.  If we get here FAIL!!!
210      fail();
211    } finally {
212      table.close();
213      connection.close();
214    }
215    assertTrue(t != null);
216  }
217
218  @Test
219  public void testDoNotRetryMetaTableAccessor() throws IOException {
220    this.conf.set("hbase.client.connection.impl",
221      RegionServerStoppedOnScannerOpenConnection.class.getName());
222    try (Connection connection = ConnectionFactory.createConnection(conf)) {
223      MetaTableAccessor.fullScanRegions(connection);
224    }
225  }
226
227  @Test
228  public void testDoNotRetryOnScanNext() throws IOException {
229    this.conf.set("hbase.client.connection.impl",
230      RegionServerStoppedOnScannerOpenConnection.class.getName());
231    // Go against meta else we will try to find first region for the table on construction which
232    // means we'll have to do a bunch more mocking.  Tests that go against meta only should be
233    // good for a bit of testing.
234    Connection connection = ConnectionFactory.createConnection(this.conf);
235    Table table = connection.getTable(TableName.META_TABLE_NAME);
236    ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
237    try {
238      Result result = null;
239      while ((result = scanner.next()) != null) {
240        LOG.info(Objects.toString(result));
241      }
242    } finally {
243      scanner.close();
244      table.close();
245      connection.close();
246    }
247  }
248
249  @Test
250  public void testRegionServerStoppedOnScannerOpen() throws IOException {
251    this.conf.set("hbase.client.connection.impl",
252      RegionServerStoppedOnScannerOpenConnection.class.getName());
253    // Go against meta else we will try to find first region for the table on construction which
254    // means we'll have to do a bunch more mocking.  Tests that go against meta only should be
255    // good for a bit of testing.
256    Connection connection = ConnectionFactory.createConnection(conf);
257    Table table = connection.getTable(TableName.META_TABLE_NAME);
258    ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
259    try {
260      Result result = null;
261      while ((result = scanner.next()) != null) {
262        LOG.info(Objects.toString(result));
263      }
264    } finally {
265      scanner.close();
266      table.close();
267      connection.close();
268    }
269  }
270
271  @Test
272  public void testConnectionClosedOnRegionLocate() throws IOException {
273    Configuration testConf = new Configuration(this.conf);
274    testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
275    // Go against meta else we will try to find first region for the table on construction which
276    // means we'll have to do a bunch more mocking. Tests that go against meta only should be
277    // good for a bit of testing.
278    Connection connection = ConnectionFactory.createConnection(testConf);
279    Table table = connection.getTable(TableName.META_TABLE_NAME);
280    connection.close();
281    try {
282      Get get = new Get(Bytes.toBytes("dummyRow"));
283      table.get(get);
284      fail("Should have thrown DoNotRetryException but no exception thrown");
285    } catch (Exception e) {
286      if (!(e instanceof DoNotRetryIOException)) {
287        String errMsg =
288            "Should have thrown DoNotRetryException but actually " + e.getClass().getSimpleName();
289        LOG.error(errMsg, e);
290        fail(errMsg);
291      }
292    } finally {
293      table.close();
294    }
295  }
296
297  /**
298   * Override to shutdown going to zookeeper for cluster id and meta location.
299   */
300  static class RegionServerStoppedOnScannerOpenConnection
301  extends ConnectionImplementation {
302    final ClientService.BlockingInterface stub;
303
304    RegionServerStoppedOnScannerOpenConnection(Configuration conf,
305        ExecutorService pool, User user) throws IOException {
306      super(conf, pool, user);
307      // Mock up my stub so open scanner returns a scanner id and then on next, we throw
308      // exceptions for three times and then after that, we return no more to scan.
309      this.stub = Mockito.mock(ClientService.BlockingInterface.class);
310      long sid = 12345L;
311      try {
312        Mockito.when(stub.scan((RpcController)Mockito.any(),
313            (ClientProtos.ScanRequest)Mockito.any())).
314          thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
315          thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
316          thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
317              setMoreResults(false).build());
318      } catch (ServiceException e) {
319        throw new IOException(e);
320      }
321    }
322
323    @Override
324    public BlockingInterface getClient(ServerName sn) throws IOException {
325      return this.stub;
326    }
327  }
328
329  /**
330   * Override to check we are setting rpc timeout right.
331   */
332  static class RpcTimeoutConnection
333  extends ConnectionImplementation {
334    final ClientService.BlockingInterface stub;
335
336    RpcTimeoutConnection(Configuration conf, ExecutorService pool, User user)
337    throws IOException {
338      super(conf, pool, user);
339      // Mock up my stub so an exists call -- which turns into a get -- throws an exception
340      this.stub = Mockito.mock(ClientService.BlockingInterface.class);
341      try {
342        Mockito.when(stub.get((RpcController)Mockito.any(),
343            (ClientProtos.GetRequest)Mockito.any())).
344          thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito")));
345      } catch (ServiceException e) {
346        throw new IOException(e);
347      }
348    }
349
350    @Override
351    public BlockingInterface getClient(ServerName sn) throws IOException {
352      return this.stub;
353    }
354  }
355
356  /**
357   * Fake many regionservers and many regions on a connection implementation.
358   */
359  static class ManyServersManyRegionsConnection
360  extends ConnectionImplementation {
361    // All access should be synchronized
362    final Map<ServerName, ClientService.BlockingInterface> serversByClient;
363
364    /**
365     * Map of faked-up rows of a 'meta table'.
366     */
367    final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
368    final AtomicLong sequenceids = new AtomicLong(0);
369    private final Configuration conf;
370
371    ManyServersManyRegionsConnection(Configuration conf,
372        ExecutorService pool, User user)
373    throws IOException {
374      super(conf, pool, user);
375      int serverCount = conf.getInt("hbase.test.servers", 10);
376      this.serversByClient = new HashMap<>(serverCount);
377      this.meta = makeMeta(Bytes.toBytes(
378        conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))),
379        conf.getInt("hbase.test.regions", 100),
380        conf.getLong("hbase.test.namespace.span", 1000),
381        serverCount);
382      this.conf = conf;
383    }
384
385    @Override
386    public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
387      // if (!sn.toString().startsWith("meta")) LOG.info(sn);
388      ClientService.BlockingInterface stub = null;
389      synchronized (this.serversByClient) {
390        stub = this.serversByClient.get(sn);
391        if (stub == null) {
392          stub = new FakeServer(this.conf, meta, sequenceids);
393          this.serversByClient.put(sn, stub);
394        }
395      }
396      return stub;
397    }
398  }
399
400  static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
401      final AtomicLong sequenceids, final MultiRequest request) {
402    // Make a response to match the request.  Act like there were no failures.
403    ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
404    // Per Region.
405    RegionActionResult.Builder regionActionResultBuilder =
406        RegionActionResult.newBuilder();
407    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
408    for (RegionAction regionAction: request.getRegionActionList()) {
409      regionActionResultBuilder.clear();
410      // Per Action in a Region.
411      for (ClientProtos.Action action: regionAction.getActionList()) {
412        roeBuilder.clear();
413        // Return empty Result and proper index as result.
414        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
415        roeBuilder.setIndex(action.getIndex());
416        regionActionResultBuilder.addResultOrException(roeBuilder.build());
417      }
418      builder.addRegionActionResult(regionActionResultBuilder.build());
419    }
420    return builder.build();
421  }
422
423  /**
424   * Fake 'server'.
425   * Implements the ClientService responding as though it were a 'server' (presumes a new
426   * ClientService.BlockingInterface made per server).
427   */
428  static class FakeServer implements ClientService.BlockingInterface {
429    private AtomicInteger multiInvocationsCount = new AtomicInteger(0);
430    private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
431    private final AtomicLong sequenceids;
432    private final long multiPause;
433    private final int tooManyMultiRequests;
434
435    FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
436        final AtomicLong sequenceids) {
437      this.meta = meta;
438      this.sequenceids = sequenceids;
439
440      // Pause to simulate the server taking time applying the edits.  This will drive up the
441      // number of threads used over in client.
442      this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0);
443      this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3);
444    }
445
446    @Override
447    public GetResponse get(RpcController controller, GetRequest request)
448    throws ServiceException {
449      boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(),
450        request.getRegion().getType());
451      if (!metaRegion) {
452        return doGetResponse(request);
453      }
454      return doMetaGetResponse(meta, request);
455    }
456
457    private GetResponse doGetResponse(GetRequest request) {
458      ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
459      ByteString row = request.getGet().getRow();
460      resultBuilder.addCell(getStartCode(row));
461      GetResponse.Builder builder = GetResponse.newBuilder();
462      builder.setResult(resultBuilder.build());
463      return builder.build();
464    }
465
466    @Override
467    public MutateResponse mutate(RpcController controller,
468        MutateRequest request) throws ServiceException {
469      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
470    }
471
472    @Override
473    public ScanResponse scan(RpcController controller,
474        ScanRequest request) throws ServiceException {
475      // Presume it is a scan of meta for now. Not all scans provide a region spec expecting
476      // the server to keep reference by scannerid.  TODO.
477      return doMetaScanResponse(meta, sequenceids, request);
478    }
479
480    @Override
481    public BulkLoadHFileResponse bulkLoadHFile(
482        RpcController controller, BulkLoadHFileRequest request)
483        throws ServiceException {
484      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
485    }
486
487    @Override
488    public CoprocessorServiceResponse execService(
489        RpcController controller, CoprocessorServiceRequest request)
490        throws ServiceException {
491      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
492    }
493
494    @Override
495    public MultiResponse multi(RpcController controller, MultiRequest request)
496    throws ServiceException {
497      int concurrentInvocations = this.multiInvocationsCount.incrementAndGet();
498      try {
499        if (concurrentInvocations >= tooManyMultiRequests) {
500          throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" +
501           concurrentInvocations));
502        }
503        Threads.sleep(multiPause);
504        return doMultiResponse(meta, sequenceids, request);
505      } finally {
506        this.multiInvocationsCount.decrementAndGet();
507      }
508    }
509
510    @Override
511    public CoprocessorServiceResponse execRegionServerService(RpcController controller,
512        CoprocessorServiceRequest request) throws ServiceException {
513      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
514    }
515
516    @Override
517    public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
518        PrepareBulkLoadRequest request) throws ServiceException {
519      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
520    }
521
522    @Override
523    public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
524        CleanupBulkLoadRequest request) throws ServiceException {
525      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
526    }
527  }
528
529  static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
530      final AtomicLong sequenceids, final ScanRequest request) {
531    ScanResponse.Builder builder = ScanResponse.newBuilder();
532    int max = request.getNumberOfRows();
533    int count = 0;
534    Map<byte [], Pair<HRegionInfo, ServerName>> tail =
535      request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta;
536      ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
537    for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) {
538      // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only.
539      if (max <= 0) break;
540      if (++count > max) break;
541      HRegionInfo hri = e.getValue().getFirst();
542      ByteString row = UnsafeByteOperations.unsafeWrap(hri.getRegionName());
543      resultBuilder.clear();
544      resultBuilder.addCell(getRegionInfo(row, hri));
545      resultBuilder.addCell(getServer(row, e.getValue().getSecond()));
546      resultBuilder.addCell(getStartCode(row));
547      builder.addResults(resultBuilder.build());
548      // Set more to false if we are on the last region in table.
549      if (hri.getEndKey().length <= 0) builder.setMoreResults(false);
550      else builder.setMoreResults(true);
551    }
552    // If no scannerid, set one.
553    builder.setScannerId(request.hasScannerId()?
554      request.getScannerId(): sequenceids.incrementAndGet());
555    return builder.build();
556  }
557
558  static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
559      final GetRequest request) {
560    ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
561    ByteString row = request.getGet().getRow();
562    Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray());
563    if (p != null) {
564      resultBuilder.addCell(getRegionInfo(row, p.getFirst()));
565      resultBuilder.addCell(getServer(row, p.getSecond()));
566    }
567    resultBuilder.addCell(getStartCode(row));
568    GetResponse.Builder builder = GetResponse.newBuilder();
569    builder.setResult(resultBuilder.build());
570    return builder.build();
571  }
572
573  /**
574   * @param name region name or encoded region name.
575   * @param type
576   * @return True if we are dealing with a hbase:meta region.
577   */
578  static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) {
579    switch (type) {
580    case REGION_NAME:
581      return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name);
582    case ENCODED_REGION_NAME:
583      return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name);
584    default: throw new UnsupportedOperationException();
585    }
586  }
587
588  private final static ByteString CATALOG_FAMILY_BYTESTRING =
589      UnsafeByteOperations.unsafeWrap(HConstants.CATALOG_FAMILY);
590  private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING =
591      UnsafeByteOperations.unsafeWrap(HConstants.REGIONINFO_QUALIFIER);
592  private final static ByteString SERVER_QUALIFIER_BYTESTRING =
593      UnsafeByteOperations.unsafeWrap(HConstants.SERVER_QUALIFIER);
594
595  static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) {
596    CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder();
597    cellBuilder.setRow(row);
598    cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING);
599    cellBuilder.setTimestamp(System.currentTimeMillis());
600    return cellBuilder;
601  }
602
603  static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) {
604    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
605    cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING);
606    cellBuilder.setValue(UnsafeByteOperations.unsafeWrap(hri.toByteArray()));
607    return cellBuilder.build();
608  }
609
610  static CellProtos.Cell getServer(final ByteString row, final ServerName sn) {
611    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
612    cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING);
613    cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort()));
614    return cellBuilder.build();
615  }
616
617  static CellProtos.Cell getStartCode(final ByteString row) {
618    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
619    cellBuilder.setQualifier(UnsafeByteOperations.unsafeWrap(HConstants.STARTCODE_QUALIFIER));
620    // TODO:
621    cellBuilder.setValue(UnsafeByteOperations.unsafeWrap(
622        Bytes.toBytes(META_SERVERNAME.getStartcode())));
623    return cellBuilder.build();
624  }
625
  /**
   * Name of the fake user table: the default for "hbase.test.tablename" when the fake meta is
   * built, and the table that {@code cycle} reads from or writes to.
   */
  private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t");
627
628  /**
629   * Format passed integer.  Zero-pad.
630   * Copied from hbase-server PE class and small amendment.  Make them share.
631   * @param number
632   * @return Returns zero-prefixed 10-byte wide decimal version of passed
633   * number (Does absolute in case number is negative).
634   */
635  private static byte [] format(final long number) {
636    byte [] b = new byte[10];
637    long d = number;
638    for (int i = b.length - 1; i >= 0; i--) {
639      b[i] = (byte)((d % 10) + '0');
640      d /= 10;
641    }
642    return b;
643  }
644
645  /**
646   * @param count
647   * @param namespaceSpan
648   * @return <code>count</code> regions
649   */
650  private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count,
651      final long namespaceSpan) {
652    byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
653    byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
654    long interval = namespaceSpan / count;
655    HRegionInfo [] hris = new HRegionInfo[count];
656    for (int i = 0; i < count; i++) {
657      if (i == 0) {
658        endKey = format(interval);
659      } else {
660        startKey = endKey;
661        if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY;
662        else endKey = format((i + 1) * interval);
663      }
664      hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey);
665    }
666    return hris;
667  }
668
669  /**
670   * @param count
671   * @return Return <code>count</code> servernames.
672   */
673  private static ServerName [] makeServerNames(final int count) {
674    ServerName [] sns = new ServerName[count];
675    for (int i = 0; i < count; i++) {
676      sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i);
677    }
678    return sns;
679  }
680
681  /**
682   * Comparator for meta row keys.
683   */
684  private static class MetaRowsComparator implements Comparator<byte []> {
685    private final CellComparatorImpl delegate = CellComparatorImpl.META_COMPARATOR;
686    @Override
687    public int compare(byte[] left, byte[] right) {
688      return delegate.compareRows(new KeyValue.KeyOnlyKeyValue(left), right, 0, right.length);
689    }
690  }
691
692  /**
693   * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and
694   * ServerName to return for this row.
695   * @return Map with faked hbase:meta content in it.
696   */
697  static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName,
698      final int regionCount, final long namespaceSpan, final int serverCount) {
699    // I need a comparator for meta rows so we sort properly.
700    SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta =
701      new ConcurrentSkipListMap<>(new MetaRowsComparator());
702    HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan);
703    ServerName [] serverNames = makeServerNames(serverCount);
704    int per = regionCount / serverCount;
705    int count = 0;
706    for (HRegionInfo hri: hris) {
707      Pair<HRegionInfo, ServerName> p = new Pair<>(hri, serverNames[count++ / per]);
708      meta.put(hri.getRegionName(), p);
709    }
710    return meta;
711  }
712
713  /**
714   * Code for each 'client' to run.
715   *
716   * @param id
717   * @param c
718   * @param sharedConnection
719   * @throws IOException
720   */
721  static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException {
722    long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
723    long startTime = System.currentTimeMillis();
724    final int printInterval = 100000;
725    Random rd = new Random(id);
726    boolean get = c.getBoolean("hbase.test.do.gets", false);
727    TableName tableName = TableName.valueOf(BIG_USER_TABLE);
728    if (get) {
729      try (Table table = sharedConnection.getTable(tableName)){
730        Stopwatch stopWatch = Stopwatch.createStarted();
731        for (int i = 0; i < namespaceSpan; i++) {
732          byte [] b = format(rd.nextLong());
733          Get g = new Get(b);
734          table.get(g);
735          if (i % printInterval == 0) {
736            LOG.info("Get " + printInterval + "/" + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS));
737            stopWatch.reset();
738            stopWatch.start();
739          }
740        }
741        LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
742            (System.currentTimeMillis() - startTime) + "ms");
743      }
744    } else {
745      try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) {
746        Stopwatch stopWatch = Stopwatch.createStarted();
747        for (int i = 0; i < namespaceSpan; i++) {
748          byte [] b = format(rd.nextLong());
749          Put p = new Put(b);
750          p.addColumn(HConstants.CATALOG_FAMILY, b, b);
751          mutator.mutate(p);
752          if (i % printInterval == 0) {
753            LOG.info("Put " + printInterval + "/" + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS));
754            stopWatch.reset();
755            stopWatch.start();
756          }
757        }
758        LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
759            (System.currentTimeMillis() - startTime) + "ms");
760        }
761    }
762  }
763
764  @Override
765  public int run(String[] arg0) throws Exception {
766    int errCode = 0;
767    // TODO: Make command options.
768    // How many servers to fake.
769    final int servers = 1;
770    // How many regions to put on the faked servers.
771    final int regions = 100000;
772    // How many 'keys' in the faked regions.
773    final long namespaceSpan = 50000000;
774    // How long to take to pause after doing a put; make this long if you want to fake a struggling
775    // server.
776    final long multiPause = 0;
777    // Check args make basic sense.
778    if ((namespaceSpan < regions) || (regions < servers)) {
779      throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" +
780        regions + " which must be > servers=" + servers);
781    }
782
783    // Set my many servers and many regions faking connection in place.
784    getConf().set("hbase.client.connection.impl",
785      ManyServersManyRegionsConnection.class.getName());
786    // Use simple kv registry rather than zk
787    getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName());
788    // When to report fails.  Default is we report the 10th.  This means we'll see log everytime
789    // an exception is thrown -- usually RegionTooBusyException when we have more than
790    // hbase.test.multi.too.many requests outstanding at any time.
791    getConf().setInt("hbase.client.start.log.errors.counter", 0);
792
793    // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class.
794    getConf().setInt("hbase.test.regions", regions);
795    getConf().setLong("hbase.test.namespace.span", namespaceSpan);
796    getConf().setLong("hbase.test.servers", servers);
797    getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE));
798    getConf().setLong("hbase.test.multi.pause.when.done", multiPause);
799    // Let there be ten outstanding requests at a time before we throw RegionBusyException.
800    getConf().setInt("hbase.test.multi.too.many", 10);
801    final int clients = 2;
802
803    // Have them all share the same connection so they all share the same instance of
804    // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
805    final ExecutorService pool = Executors.newCachedThreadPool(
806        Threads.newDaemonThreadFactory("p"));
807      // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
808    // Share a connection so I can keep counts in the 'server' on concurrency.
809    final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/);
810    try {
811      Thread [] ts = new Thread[clients];
812      for (int j = 0; j < ts.length; j++) {
813        final int id = j;
814        ts[j] = new Thread("" + j) {
815          final Configuration c = getConf();
816
817          @Override
818          public void run() {
819            try {
820              cycle(id, c, sharedConnection);
821            } catch (IOException e) {
822              e.printStackTrace();
823            }
824          }
825        };
826        ts[j].start();
827      }
828      for (int j = 0; j < ts.length; j++) {
829        ts[j].join();
830      }
831    } finally {
832      sharedConnection.close();
833    }
834    return errCode;
835  }
836
837  /**
838   * Run a client instance against a faked up server.
839   * @param args TODO
840   * @throws Exception
841   */
842  public static void main(String[] args) throws Exception {
843    System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args));
844  }
845}