001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertTrue;
021import static org.junit.Assert.fail;
022
023import java.io.IOException;
024import java.net.SocketTimeoutException;
025import java.util.Comparator;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.Objects;
029import java.util.Random;
030import java.util.SortedMap;
031import java.util.concurrent.CompletableFuture;
032import java.util.concurrent.ConcurrentSkipListMap;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicLong;
037import org.apache.commons.lang3.NotImplementedException;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.conf.Configured;
040import org.apache.hadoop.hbase.CellComparatorImpl;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.HRegionInfo;
046import org.apache.hadoop.hbase.HRegionLocation;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.MetaTableAccessor;
049import org.apache.hadoop.hbase.RegionLocations;
050import org.apache.hadoop.hbase.RegionTooBusyException;
051import org.apache.hadoop.hbase.ServerName;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
054import org.apache.hadoop.hbase.security.User;
055import org.apache.hadoop.hbase.testclassification.ClientTests;
056import org.apache.hadoop.hbase.testclassification.SmallTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.Pair;
059import org.apache.hadoop.hbase.util.Threads;
060import org.apache.hadoop.util.Tool;
061import org.apache.hadoop.util.ToolRunner;
062import org.junit.Before;
063import org.junit.ClassRule;
064import org.junit.Ignore;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.mockito.Mockito;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch;
072import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
073import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
074import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
075import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
076
077import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
101
102/**
103 * Test client behavior w/o setting up a cluster.
104 * Mock up cluster emissions.
105 */
106@Category({ClientTests.class, SmallTests.class})
107public class TestClientNoCluster extends Configured implements Tool {
108
109  @ClassRule
110  public static final HBaseClassTestRule CLASS_RULE =
111      HBaseClassTestRule.forClass(TestClientNoCluster.class);
112
113  private static final Logger LOG = LoggerFactory.getLogger(TestClientNoCluster.class);
114  private Configuration conf;
115  public static final ServerName META_SERVERNAME =
116      ServerName.valueOf("meta.example.org", 16010, 12345);
117
118  @Before
119  public void setUp() throws Exception {
120    this.conf = HBaseConfiguration.create();
121    // Run my Connection overrides.  Use my little ConnectionImplementation below which
122    // allows me insert mocks and also use my Registry below rather than the default zk based
123    // one so tests run faster and don't have zk dependency.
124    this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
125  }
126
127  /**
128   * Simple cluster registry inserted in place of our usual zookeeper based one.
129   */
130  static class SimpleRegistry extends DoNothingAsyncRegistry {
131    final ServerName META_HOST = META_SERVERNAME;
132
133    public SimpleRegistry(Configuration conf) {
134      super(conf);
135    }
136
137    @Override
138    public CompletableFuture<RegionLocations> getMetaRegionLocation() {
139      return CompletableFuture.completedFuture(new RegionLocations(
140          new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, META_HOST)));
141    }
142
143    @Override
144    public CompletableFuture<String> getClusterId() {
145      return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT);
146    }
147
148    @Override
149    public CompletableFuture<Integer> getCurrentNrHRS() {
150      return CompletableFuture.completedFuture(1);
151    }
152  }
153
154  /**
155   * Remove the @Ignore to try out timeout and retry asettings
156   * @throws IOException
157   */
158  @Ignore
159  @Test
160  public void testTimeoutAndRetries() throws IOException {
161    Configuration localConfig = HBaseConfiguration.create(this.conf);
162    // This override mocks up our exists/get call to throw a RegionServerStoppedException.
163    localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
164    Connection connection = ConnectionFactory.createConnection(localConfig);
165    Table table = connection.getTable(TableName.META_TABLE_NAME);
166    Throwable t = null;
167    LOG.info("Start");
168    try {
169      // An exists call turns into a get w/ a flag.
170      table.exists(new Get(Bytes.toBytes("abc")));
171    } catch (SocketTimeoutException e) {
172      // I expect this exception.
173      LOG.info("Got expected exception", e);
174      t = e;
175    } catch (RetriesExhaustedException e) {
176      // This is the old, unwanted behavior.  If we get here FAIL!!!
177      fail();
178    } finally {
179      table.close();
180    }
181    connection.close();
182    LOG.info("Stop");
183    assertTrue(t != null);
184  }
185
186  /**
187   * Test that operation timeout prevails over rpc default timeout and retries, etc.
188   * @throws IOException
189   */
190  @Test
191  public void testRpcTimeout() throws IOException {
192    Configuration localConfig = HBaseConfiguration.create(this.conf);
193    // This override mocks up our exists/get call to throw a RegionServerStoppedException.
194    localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
195    int pause = 10;
196    localConfig.setInt("hbase.client.pause", pause);
197    localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10);
198    // Set the operation timeout to be < the pause.  Expectation is that after first pause, we will
199    // fail out of the rpc because the rpc timeout will have been set to the operation tiemout
200    // and it has expired.  Otherwise, if this functionality is broke, all retries will be run --
201    // all ten of them -- and we'll get the RetriesExhaustedException exception.
202    localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1);
203    Connection connection = ConnectionFactory.createConnection(localConfig);
204    Table table = connection.getTable(TableName.META_TABLE_NAME);
205    Throwable t = null;
206    try {
207      // An exists call turns into a get w/ a flag.
208      table.exists(new Get(Bytes.toBytes("abc")));
209    } catch (SocketTimeoutException e) {
210      // I expect this exception.
211      LOG.info("Got expected exception", e);
212      t = e;
213    } catch (RetriesExhaustedException e) {
214      // This is the old, unwanted behavior.  If we get here FAIL!!!
215      fail();
216    } finally {
217      table.close();
218      connection.close();
219    }
220    assertTrue(t != null);
221  }
222
223  @Test
224  public void testDoNotRetryMetaTableAccessor() throws IOException {
225    this.conf.set("hbase.client.connection.impl",
226      RegionServerStoppedOnScannerOpenConnection.class.getName());
227    try (Connection connection = ConnectionFactory.createConnection(conf)) {
228      MetaTableAccessor.fullScanRegions(connection);
229    }
230  }
231
232  @Test
233  public void testDoNotRetryOnScanNext() throws IOException {
234    this.conf.set("hbase.client.connection.impl",
235      RegionServerStoppedOnScannerOpenConnection.class.getName());
236    // Go against meta else we will try to find first region for the table on construction which
237    // means we'll have to do a bunch more mocking.  Tests that go against meta only should be
238    // good for a bit of testing.
239    Connection connection = ConnectionFactory.createConnection(this.conf);
240    Table table = connection.getTable(TableName.META_TABLE_NAME);
241    ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
242    try {
243      Result result = null;
244      while ((result = scanner.next()) != null) {
245        LOG.info(Objects.toString(result));
246      }
247    } finally {
248      scanner.close();
249      table.close();
250      connection.close();
251    }
252  }
253
254  @Test
255  public void testRegionServerStoppedOnScannerOpen() throws IOException {
256    this.conf.set("hbase.client.connection.impl",
257      RegionServerStoppedOnScannerOpenConnection.class.getName());
258    // Go against meta else we will try to find first region for the table on construction which
259    // means we'll have to do a bunch more mocking.  Tests that go against meta only should be
260    // good for a bit of testing.
261    Connection connection = ConnectionFactory.createConnection(conf);
262    Table table = connection.getTable(TableName.META_TABLE_NAME);
263    ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
264    try {
265      Result result = null;
266      while ((result = scanner.next()) != null) {
267        LOG.info(Objects.toString(result));
268      }
269    } finally {
270      scanner.close();
271      table.close();
272      connection.close();
273    }
274  }
275
276  @Test
277  public void testConnectionClosedOnRegionLocate() throws IOException {
278    Configuration testConf = new Configuration(this.conf);
279    testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
280    // Go against meta else we will try to find first region for the table on construction which
281    // means we'll have to do a bunch more mocking. Tests that go against meta only should be
282    // good for a bit of testing.
283    Connection connection = ConnectionFactory.createConnection(testConf);
284    Table table = connection.getTable(TableName.META_TABLE_NAME);
285    connection.close();
286    try {
287      Get get = new Get(Bytes.toBytes("dummyRow"));
288      table.get(get);
289      fail("Should have thrown DoNotRetryException but no exception thrown");
290    } catch (Exception e) {
291      if (!(e instanceof DoNotRetryIOException)) {
292        String errMsg =
293            "Should have thrown DoNotRetryException but actually " + e.getClass().getSimpleName();
294        LOG.error(errMsg, e);
295        fail(errMsg);
296      }
297    } finally {
298      table.close();
299    }
300  }
301
302  /**
303   * Override to shutdown going to zookeeper for cluster id and meta location.
304   */
305  static class RegionServerStoppedOnScannerOpenConnection
306  extends ConnectionImplementation {
307    final ClientService.BlockingInterface stub;
308
309    RegionServerStoppedOnScannerOpenConnection(Configuration conf,
310        ExecutorService pool, User user) throws IOException {
311      super(conf, pool, user);
312      // Mock up my stub so open scanner returns a scanner id and then on next, we throw
313      // exceptions for three times and then after that, we return no more to scan.
314      this.stub = Mockito.mock(ClientService.BlockingInterface.class);
315      long sid = 12345L;
316      try {
317        Mockito.when(stub.scan((RpcController)Mockito.any(),
318            (ClientProtos.ScanRequest)Mockito.any())).
319          thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
320          thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
321          thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
322              setMoreResults(false).build());
323      } catch (ServiceException e) {
324        throw new IOException(e);
325      }
326    }
327
328    @Override
329    public BlockingInterface getClient(ServerName sn) throws IOException {
330      return this.stub;
331    }
332  }
333
334  /**
335   * Override to check we are setting rpc timeout right.
336   */
337  static class RpcTimeoutConnection
338  extends ConnectionImplementation {
339    final ClientService.BlockingInterface stub;
340
341    RpcTimeoutConnection(Configuration conf, ExecutorService pool, User user)
342    throws IOException {
343      super(conf, pool, user);
344      // Mock up my stub so an exists call -- which turns into a get -- throws an exception
345      this.stub = Mockito.mock(ClientService.BlockingInterface.class);
346      try {
347        Mockito.when(stub.get((RpcController)Mockito.any(),
348            (ClientProtos.GetRequest)Mockito.any())).
349          thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito")));
350      } catch (ServiceException e) {
351        throw new IOException(e);
352      }
353    }
354
355    @Override
356    public BlockingInterface getClient(ServerName sn) throws IOException {
357      return this.stub;
358    }
359  }
360
361  /**
362   * Fake many regionservers and many regions on a connection implementation.
363   */
364  static class ManyServersManyRegionsConnection
365  extends ConnectionImplementation {
366    // All access should be synchronized
367    final Map<ServerName, ClientService.BlockingInterface> serversByClient;
368
369    /**
370     * Map of faked-up rows of a 'meta table'.
371     */
372    final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
373    final AtomicLong sequenceids = new AtomicLong(0);
374    private final Configuration conf;
375
376    ManyServersManyRegionsConnection(Configuration conf,
377        ExecutorService pool, User user)
378    throws IOException {
379      super(conf, pool, user);
380      int serverCount = conf.getInt("hbase.test.servers", 10);
381      this.serversByClient = new HashMap<>(serverCount);
382      this.meta = makeMeta(Bytes.toBytes(
383        conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))),
384        conf.getInt("hbase.test.regions", 100),
385        conf.getLong("hbase.test.namespace.span", 1000),
386        serverCount);
387      this.conf = conf;
388    }
389
390    @Override
391    public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
392      // if (!sn.toString().startsWith("meta")) LOG.info(sn);
393      ClientService.BlockingInterface stub = null;
394      synchronized (this.serversByClient) {
395        stub = this.serversByClient.get(sn);
396        if (stub == null) {
397          stub = new FakeServer(this.conf, meta, sequenceids);
398          this.serversByClient.put(sn, stub);
399        }
400      }
401      return stub;
402    }
403  }
404
405  static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
406      final AtomicLong sequenceids, final MultiRequest request) {
407    // Make a response to match the request.  Act like there were no failures.
408    ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
409    // Per Region.
410    RegionActionResult.Builder regionActionResultBuilder =
411        RegionActionResult.newBuilder();
412    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
413    for (RegionAction regionAction: request.getRegionActionList()) {
414      regionActionResultBuilder.clear();
415      // Per Action in a Region.
416      for (ClientProtos.Action action: regionAction.getActionList()) {
417        roeBuilder.clear();
418        // Return empty Result and proper index as result.
419        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
420        roeBuilder.setIndex(action.getIndex());
421        regionActionResultBuilder.addResultOrException(roeBuilder.build());
422      }
423      builder.addRegionActionResult(regionActionResultBuilder.build());
424    }
425    return builder.build();
426  }
427
428  /**
429   * Fake 'server'.
430   * Implements the ClientService responding as though it were a 'server' (presumes a new
431   * ClientService.BlockingInterface made per server).
432   */
433  static class FakeServer implements ClientService.BlockingInterface {
434    private AtomicInteger multiInvocationsCount = new AtomicInteger(0);
435    private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
436    private final AtomicLong sequenceids;
437    private final long multiPause;
438    private final int tooManyMultiRequests;
439
440    FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
441        final AtomicLong sequenceids) {
442      this.meta = meta;
443      this.sequenceids = sequenceids;
444
445      // Pause to simulate the server taking time applying the edits.  This will drive up the
446      // number of threads used over in client.
447      this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0);
448      this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3);
449    }
450
451    @Override
452    public GetResponse get(RpcController controller, GetRequest request)
453    throws ServiceException {
454      boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(),
455        request.getRegion().getType());
456      if (!metaRegion) {
457        return doGetResponse(request);
458      }
459      return doMetaGetResponse(meta, request);
460    }
461
462    private GetResponse doGetResponse(GetRequest request) {
463      ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
464      ByteString row = request.getGet().getRow();
465      resultBuilder.addCell(getStartCode(row));
466      GetResponse.Builder builder = GetResponse.newBuilder();
467      builder.setResult(resultBuilder.build());
468      return builder.build();
469    }
470
471    @Override
472    public MutateResponse mutate(RpcController controller,
473        MutateRequest request) throws ServiceException {
474      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
475    }
476
477    @Override
478    public ScanResponse scan(RpcController controller,
479        ScanRequest request) throws ServiceException {
480      // Presume it is a scan of meta for now. Not all scans provide a region spec expecting
481      // the server to keep reference by scannerid.  TODO.
482      return doMetaScanResponse(meta, sequenceids, request);
483    }
484
485    @Override
486    public BulkLoadHFileResponse bulkLoadHFile(
487        RpcController controller, BulkLoadHFileRequest request)
488        throws ServiceException {
489      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
490    }
491
492    @Override
493    public CoprocessorServiceResponse execService(
494        RpcController controller, CoprocessorServiceRequest request)
495        throws ServiceException {
496      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
497    }
498
499    @Override
500    public MultiResponse multi(RpcController controller, MultiRequest request)
501    throws ServiceException {
502      int concurrentInvocations = this.multiInvocationsCount.incrementAndGet();
503      try {
504        if (concurrentInvocations >= tooManyMultiRequests) {
505          throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" +
506           concurrentInvocations));
507        }
508        Threads.sleep(multiPause);
509        return doMultiResponse(meta, sequenceids, request);
510      } finally {
511        this.multiInvocationsCount.decrementAndGet();
512      }
513    }
514
515    @Override
516    public CoprocessorServiceResponse execRegionServerService(RpcController controller,
517        CoprocessorServiceRequest request) throws ServiceException {
518      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
519    }
520
521    @Override
522    public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
523        PrepareBulkLoadRequest request) throws ServiceException {
524      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
525    }
526
527    @Override
528    public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
529        CleanupBulkLoadRequest request) throws ServiceException {
530      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
531    }
532  }
533
534  static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
535      final AtomicLong sequenceids, final ScanRequest request) {
536    ScanResponse.Builder builder = ScanResponse.newBuilder();
537    int max = request.getNumberOfRows();
538    int count = 0;
539    Map<byte [], Pair<HRegionInfo, ServerName>> tail =
540      request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta;
541      ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
542    for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) {
543      // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only.
544      if (max <= 0) break;
545      if (++count > max) break;
546      HRegionInfo hri = e.getValue().getFirst();
547      ByteString row = UnsafeByteOperations.unsafeWrap(hri.getRegionName());
548      resultBuilder.clear();
549      resultBuilder.addCell(getRegionInfo(row, hri));
550      resultBuilder.addCell(getServer(row, e.getValue().getSecond()));
551      resultBuilder.addCell(getStartCode(row));
552      builder.addResults(resultBuilder.build());
553      // Set more to false if we are on the last region in table.
554      if (hri.getEndKey().length <= 0) builder.setMoreResults(false);
555      else builder.setMoreResults(true);
556    }
557    // If no scannerid, set one.
558    builder.setScannerId(request.hasScannerId()?
559      request.getScannerId(): sequenceids.incrementAndGet());
560    return builder.build();
561  }
562
563  static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
564      final GetRequest request) {
565    ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
566    ByteString row = request.getGet().getRow();
567    Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray());
568    if (p != null) {
569      resultBuilder.addCell(getRegionInfo(row, p.getFirst()));
570      resultBuilder.addCell(getServer(row, p.getSecond()));
571    }
572    resultBuilder.addCell(getStartCode(row));
573    GetResponse.Builder builder = GetResponse.newBuilder();
574    builder.setResult(resultBuilder.build());
575    return builder.build();
576  }
577
578  /**
579   * @param name region name or encoded region name.
580   * @param type
581   * @return True if we are dealing with a hbase:meta region.
582   */
583  static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) {
584    switch (type) {
585    case REGION_NAME:
586      return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name);
587    case ENCODED_REGION_NAME:
588      return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name);
589    default: throw new UnsupportedOperationException();
590    }
591  }
592
593  private final static ByteString CATALOG_FAMILY_BYTESTRING =
594      UnsafeByteOperations.unsafeWrap(HConstants.CATALOG_FAMILY);
595  private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING =
596      UnsafeByteOperations.unsafeWrap(HConstants.REGIONINFO_QUALIFIER);
597  private final static ByteString SERVER_QUALIFIER_BYTESTRING =
598      UnsafeByteOperations.unsafeWrap(HConstants.SERVER_QUALIFIER);
599
600  static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) {
601    CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder();
602    cellBuilder.setRow(row);
603    cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING);
604    cellBuilder.setTimestamp(System.currentTimeMillis());
605    return cellBuilder;
606  }
607
608  static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) {
609    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
610    cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING);
611    cellBuilder.setValue(UnsafeByteOperations.unsafeWrap(hri.toByteArray()));
612    return cellBuilder.build();
613  }
614
615  static CellProtos.Cell getServer(final ByteString row, final ServerName sn) {
616    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
617    cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING);
618    cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort()));
619    return cellBuilder.build();
620  }
621
622  static CellProtos.Cell getStartCode(final ByteString row) {
623    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
624    cellBuilder.setQualifier(UnsafeByteOperations.unsafeWrap(HConstants.STARTCODE_QUALIFIER));
625    // TODO:
626    cellBuilder.setValue(UnsafeByteOperations.unsafeWrap(
627        Bytes.toBytes(META_SERVERNAME.getStartcode())));
628    return cellBuilder.build();
629  }
630
631  private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t");
632
633  /**
634   * Format passed integer.  Zero-pad.
635   * Copied from hbase-server PE class and small amendment.  Make them share.
636   * @param number
637   * @return Returns zero-prefixed 10-byte wide decimal version of passed
638   * number (Does absolute in case number is negative).
639   */
640  private static byte [] format(final long number) {
641    byte [] b = new byte[10];
642    long d = number;
643    for (int i = b.length - 1; i >= 0; i--) {
644      b[i] = (byte)((d % 10) + '0');
645      d /= 10;
646    }
647    return b;
648  }
649
650  /**
651   * @param count
652   * @param namespaceSpan
653   * @return <code>count</code> regions
654   */
655  private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count,
656      final long namespaceSpan) {
657    byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
658    byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
659    long interval = namespaceSpan / count;
660    HRegionInfo [] hris = new HRegionInfo[count];
661    for (int i = 0; i < count; i++) {
662      if (i == 0) {
663        endKey = format(interval);
664      } else {
665        startKey = endKey;
666        if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY;
667        else endKey = format((i + 1) * interval);
668      }
669      hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey);
670    }
671    return hris;
672  }
673
674  /**
675   * @param count
676   * @return Return <code>count</code> servernames.
677   */
678  private static ServerName [] makeServerNames(final int count) {
679    ServerName [] sns = new ServerName[count];
680    for (int i = 0; i < count; i++) {
681      sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i);
682    }
683    return sns;
684  }
685
686  /**
687   * Comparator for meta row keys.
688   */
689  private static class MetaRowsComparator implements Comparator<byte []> {
690    private final CellComparatorImpl delegate = CellComparatorImpl.META_COMPARATOR;
691    @Override
692    public int compare(byte[] left, byte[] right) {
693      return delegate.compareRows(new KeyValue.KeyOnlyKeyValue(left), right, 0, right.length);
694    }
695  }
696
697  /**
698   * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and
699   * ServerName to return for this row.
700   * @return Map with faked hbase:meta content in it.
701   */
702  static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName,
703      final int regionCount, final long namespaceSpan, final int serverCount) {
704    // I need a comparator for meta rows so we sort properly.
705    SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta =
706      new ConcurrentSkipListMap<>(new MetaRowsComparator());
707    HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan);
708    ServerName [] serverNames = makeServerNames(serverCount);
709    int per = regionCount / serverCount;
710    int count = 0;
711    for (HRegionInfo hri: hris) {
712      Pair<HRegionInfo, ServerName> p = new Pair<>(hri, serverNames[count++ / per]);
713      meta.put(hri.getRegionName(), p);
714    }
715    return meta;
716  }
717
718  /**
719   * Code for each 'client' to run.
720   *
721   * @param id
722   * @param c
723   * @param sharedConnection
724   * @throws IOException
725   */
726  static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException {
727    long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
728    long startTime = System.currentTimeMillis();
729    final int printInterval = 100000;
730    Random rd = new Random(id);
731    boolean get = c.getBoolean("hbase.test.do.gets", false);
732    TableName tableName = TableName.valueOf(BIG_USER_TABLE);
733    if (get) {
734      try (Table table = sharedConnection.getTable(tableName)){
735        Stopwatch stopWatch = Stopwatch.createStarted();
736        for (int i = 0; i < namespaceSpan; i++) {
737          byte [] b = format(rd.nextLong());
738          Get g = new Get(b);
739          table.get(g);
740          if (i % printInterval == 0) {
741            LOG.info("Get " + printInterval + "/" + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS));
742            stopWatch.reset();
743            stopWatch.start();
744          }
745        }
746        LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
747            (System.currentTimeMillis() - startTime) + "ms");
748      }
749    } else {
750      try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) {
751        Stopwatch stopWatch = Stopwatch.createStarted();
752        for (int i = 0; i < namespaceSpan; i++) {
753          byte [] b = format(rd.nextLong());
754          Put p = new Put(b);
755          p.addColumn(HConstants.CATALOG_FAMILY, b, b);
756          mutator.mutate(p);
757          if (i % printInterval == 0) {
758            LOG.info("Put " + printInterval + "/" + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS));
759            stopWatch.reset();
760            stopWatch.start();
761          }
762        }
763        LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
764            (System.currentTimeMillis() - startTime) + "ms");
765        }
766    }
767  }
768
769  @Override
770  public int run(String[] arg0) throws Exception {
771    int errCode = 0;
772    // TODO: Make command options.
773    // How many servers to fake.
774    final int servers = 1;
775    // How many regions to put on the faked servers.
776    final int regions = 100000;
777    // How many 'keys' in the faked regions.
778    final long namespaceSpan = 50000000;
779    // How long to take to pause after doing a put; make this long if you want to fake a struggling
780    // server.
781    final long multiPause = 0;
782    // Check args make basic sense.
783    if ((namespaceSpan < regions) || (regions < servers)) {
784      throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" +
785        regions + " which must be > servers=" + servers);
786    }
787
788    // Set my many servers and many regions faking connection in place.
789    getConf().set("hbase.client.connection.impl",
790      ManyServersManyRegionsConnection.class.getName());
791    // Use simple kv registry rather than zk
792    getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName());
793    // When to report fails.  Default is we report the 10th.  This means we'll see log everytime
794    // an exception is thrown -- usually RegionTooBusyException when we have more than
795    // hbase.test.multi.too.many requests outstanding at any time.
796    getConf().setInt("hbase.client.start.log.errors.counter", 0);
797
798    // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class.
799    getConf().setInt("hbase.test.regions", regions);
800    getConf().setLong("hbase.test.namespace.span", namespaceSpan);
801    getConf().setLong("hbase.test.servers", servers);
802    getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE));
803    getConf().setLong("hbase.test.multi.pause.when.done", multiPause);
804    // Let there be ten outstanding requests at a time before we throw RegionBusyException.
805    getConf().setInt("hbase.test.multi.too.many", 10);
806    final int clients = 2;
807
808    // Have them all share the same connection so they all share the same instance of
809    // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
810    final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p"));
811      // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
812    // Share a connection so I can keep counts in the 'server' on concurrency.
813    final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/);
814    try {
815      Thread [] ts = new Thread[clients];
816      for (int j = 0; j < ts.length; j++) {
817        final int id = j;
818        ts[j] = new Thread("" + j) {
819          final Configuration c = getConf();
820
821          @Override
822          public void run() {
823            try {
824              cycle(id, c, sharedConnection);
825            } catch (IOException e) {
826              e.printStackTrace();
827            }
828          }
829        };
830        ts[j].start();
831      }
832      for (int j = 0; j < ts.length; j++) {
833        ts[j].join();
834      }
835    } finally {
836      sharedConnection.close();
837    }
838    return errCode;
839  }
840
841  /**
842   * Run a client instance against a faked up server.
843   * @param args TODO
844   * @throws Exception
845   */
846  public static void main(String[] args) throws Exception {
847    System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args));
848  }
849}