001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.Optional;
022import java.util.concurrent.atomic.AtomicLong;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HRegionLocation;
026import org.apache.hadoop.hbase.RegionLocations;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.ZooKeeperConnectionException;
030import org.apache.hadoop.hbase.coprocessor.ObserverContext;
031import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
032import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
033import org.apache.hadoop.hbase.coprocessor.RegionObserver;
034import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
035import org.apache.hadoop.hbase.util.Threads;
036import org.mockito.Mockito;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
040
041/**
042 * {@link ClusterConnection} testing utility.
043 */
044public class HConnectionTestingUtility {
045  /*
046   * Not part of {@link HBaseTestingUtility} because this class is not
047   * in same package as {@link ClusterConnection}.  Would have to reveal ugly
048   * {@link ConnectionImplementation} innards to HBaseTestingUtility to give it access.
049   */
050  /**
051   * Get a Mocked {@link ClusterConnection} that goes with the passed <code>conf</code>
052   * configuration instance.  Minimally the mock will return
053   * <code>conf</conf> when {@link ClusterConnection#getConfiguration()} is invoked.
054   * Be sure to shutdown the connection when done by calling
055   * {@link Connection#close()} else it will stick around; this is probably not what you want.
056   * @param conf configuration
057   * @return ClusterConnection object for <code>conf</code>
058   * @throws ZooKeeperConnectionException
059   */
060  public static ClusterConnection getMockedConnection(final Configuration conf)
061  throws ZooKeeperConnectionException {
062    ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class);
063    Mockito.when(connection.getConfiguration()).thenReturn(conf);
064    Mockito.when(connection.getRpcControllerFactory()).thenReturn(
065      Mockito.mock(RpcControllerFactory.class));
066    // we need a real retrying caller
067    RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf);
068    Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
069    return connection;
070  }
071
072  /**
073   * Calls {@link #getMockedConnection(Configuration)} and then mocks a few
074   * more of the popular {@link ClusterConnection} methods so they do 'normal'
075   * operation (see return doc below for list). Be sure to shutdown the
076   * connection when done by calling {@link Connection#close()} else it will stick around;
077   * this is probably not what you want.
078   *
079   * @param conf Configuration to use
080   * @param admin An AdminProtocol; can be null but is usually
081   * itself a mock.
082   * @param client A ClientProtocol; can be null but is usually
083   * itself a mock.
084   * @param sn ServerName to include in the region location returned by this
085   * <code>connection</code>
086   * @param hri RegionInfo to include in the location returned when
087   * getRegionLocator is called on the mocked connection
088   * @return Mock up a connection that returns a {@link Configuration} when
089   * {@link ClusterConnection#getConfiguration()} is called, a 'location' when
090   * {@link ClusterConnection#getRegionLocation(org.apache.hadoop.hbase.TableName, byte[], boolean)}
091   * is called,
092   * and that returns the passed {@link AdminProtos.AdminService.BlockingInterface} instance when
093   * {@link ClusterConnection#getAdmin(ServerName)} is called, returns the passed
094   * {@link ClientProtos.ClientService.BlockingInterface} instance when
095   * {@link ClusterConnection#getClient(ServerName)} is called (Be sure to call
096   * {@link Connection#close()} when done with this mocked Connection.
097   * @throws IOException
098   */
099  public static ClusterConnection getMockedConnectionAndDecorate(final Configuration conf,
100      final AdminProtos.AdminService.BlockingInterface admin,
101      final ClientProtos.ClientService.BlockingInterface client,
102      final ServerName sn, final RegionInfo hri)
103  throws IOException {
104    ConnectionImplementation c = Mockito.mock(ConnectionImplementation.class);
105    Mockito.when(c.getConfiguration()).thenReturn(conf);
106    Mockito.doNothing().when(c).close();
107    // Make it so we return a particular location when asked.
108    final HRegionLocation loc = new HRegionLocation(hri, sn);
109    Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
110        (byte[]) Mockito.any(), Mockito.anyBoolean())).
111      thenReturn(loc);
112    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
113      thenReturn(loc);
114    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any(),
115        Mockito.anyBoolean(), Mockito.anyBoolean(),  Mockito.anyInt()))
116        .thenReturn(new RegionLocations(loc));
117    if (admin != null) {
118      // If a call to getAdmin, return this implementation.
119      Mockito.when(c.getAdmin(Mockito.any())).
120        thenReturn(admin);
121    }
122    if (client != null) {
123      // If a call to getClient, return this client.
124      Mockito.when(c.getClient(Mockito.any())).
125        thenReturn(client);
126    }
127    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
128    Mockito.when(c.getNonceGenerator()).thenReturn(ng);
129    Mockito.when(c.getAsyncProcess()).thenReturn(
130      new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf),
131          RpcControllerFactory.instantiate(conf)));
132    Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
133        RpcRetryingCallerFactory.instantiate(conf,
134            RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
135    Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class));
136    Table t = Mockito.mock(Table.class);
137    Mockito.when(c.getTable((TableName)Mockito.any())).thenReturn(t);
138    ResultScanner rs = Mockito.mock(ResultScanner.class);
139    Mockito.when(t.getScanner((Scan)Mockito.any())).thenReturn(rs);
140    return c;
141  }
142
143  /**
144   * Get a Mockito spied-upon {@link ClusterConnection} that goes with the passed
145   * <code>conf</code> configuration instance.
146   * Be sure to shutdown the connection when done by calling
147   * {@link Connection#close()} else it will stick around; this is probably not what you want.
148   * @param conf configuration
149   * @return ClusterConnection object for <code>conf</code>
150   * @throws ZooKeeperConnectionException
151   * @see @link
152   * {http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)}
153   */
154  public static ClusterConnection getSpiedConnection(final Configuration conf)
155  throws IOException {
156    ConnectionImplementation connection =
157      Mockito.spy(new ConnectionImplementation(conf, null, null));
158    return connection;
159  }
160
161  /**
162   * This coproceesor sleep 2s at first increment/append rpc call.
163   */
164  public static class SleepAtFirstRpcCall implements RegionCoprocessor, RegionObserver {
165    static final AtomicLong ct = new AtomicLong(0);
166    static final String SLEEP_TIME_CONF_KEY =
167        "hbase.coprocessor.SleepAtFirstRpcCall.sleepTime";
168    static final long DEFAULT_SLEEP_TIME = 2000;
169    static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
170
171    @Override
172    public Optional<RegionObserver> getRegionObserver() {
173      return Optional.of(this);
174    }
175
176    public SleepAtFirstRpcCall() {
177    }
178
179    @Override
180    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
181      RegionCoprocessorEnvironment env = c.getEnvironment();
182      Configuration conf = env.getConfiguration();
183      sleepTime.set(conf.getLong(SLEEP_TIME_CONF_KEY, DEFAULT_SLEEP_TIME));
184    }
185
186    @Override
187    public Result postIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
188        final Increment increment, final Result result) throws IOException {
189      if (ct.incrementAndGet() == 1) {
190        Threads.sleep(sleepTime.get());
191      }
192      return result;
193    }
194
195    @Override
196    public Result postAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
197        final Append append, final Result result) throws IOException {
198      if (ct.incrementAndGet() == 1) {
199        Threads.sleep(sleepTime.get());
200      }
201      return result;
202    }
203  }
204}