001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.Optional;
022import java.util.concurrent.atomic.AtomicLong;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HRegionLocation;
026import org.apache.hadoop.hbase.RegionLocations;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.ZooKeeperConnectionException;
030import org.apache.hadoop.hbase.coprocessor.ObserverContext;
031import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
032import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
033import org.apache.hadoop.hbase.coprocessor.RegionObserver;
034import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
035import org.apache.hadoop.hbase.util.Threads;
036import org.mockito.Mockito;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
040
041/**
042 * {@link Connection} testing utility.
043 */
044public class HConnectionTestingUtility {
045  /*
046   * Not part of {@link HBaseTestingUtility} because this class is not
047   * in same package as {@link ClusterConnection}.  Would have to reveal ugly
048   * {@link ConnectionImplementation} innards to HBaseTestingUtility to give it access.
049   */
050  /**
051<<<<<<< HEAD
052   * Get a Mocked {@link ClusterConnection} that goes with the passed <code>conf</code>
053   * configuration instance.  Minimally the mock will return
054   * &lt;code>conf&lt;/conf> when {@link ClusterConnection#getConfiguration()} is invoked.
055   * Be sure to shutdown the connection when done by calling
056   * {@link Connection#close()} else it will stick around; this is probably not what you want.
057=======
058   * Get a Mocked {@link Connection} that goes with the passed <code>conf</code>
059   * configuration instance. Minimally the mock will return &lt;code>conf&lt;/conf> when
060   * {@link Connection#getConfiguration()} is invoked. Be sure to shutdown the
061   * connection when done by calling {@link Connection#close()} else it will stick around; this is
062   * probably not what you want.
063>>>>>>> fabf2b8282... HBASE-22572 Javadoc Warnings: @link reference not found (#306)
064   * @param conf configuration
065   * @return ClusterConnection object for <code>conf</code>
066   * @throws ZooKeeperConnectionException
067   */
068  public static ClusterConnection getMockedConnection(final Configuration conf)
069  throws ZooKeeperConnectionException {
070    ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class);
071    Mockito.when(connection.getConfiguration()).thenReturn(conf);
072    Mockito.when(connection.getRpcControllerFactory()).thenReturn(
073      Mockito.mock(RpcControllerFactory.class));
074    // we need a real retrying caller
075    RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf);
076    Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
077    return connection;
078  }
079
080  /**
081   * Calls {@link #getMockedConnection(Configuration)} and then mocks a few
082   * more of the popular {@link ClusterConnection} methods so they do 'normal'
083   * operation (see return doc below for list). Be sure to shutdown the
084   * connection when done by calling {@link Connection#close()} else it will stick around;
085   * this is probably not what you want.
086   *
087   * @param conf Configuration to use
088   * @param admin An AdminProtocol; can be null but is usually
089   * itself a mock.
090   * @param client A ClientProtocol; can be null but is usually
091   * itself a mock.
092   * @param sn ServerName to include in the region location returned by this
093   * <code>connection</code>
094   * @param hri RegionInfo to include in the location returned when
095   * getRegionLocator is called on the mocked connection
096   * @return Mock up a connection that returns a {@link Configuration} when
097   * {@link ClusterConnection#getConfiguration()} is called, a 'location' when
098   * {@link ClusterConnection#getRegionLocation(org.apache.hadoop.hbase.TableName, byte[], boolean)}
099   * is called,
100   * and that returns the passed {@link AdminProtos.AdminService.BlockingInterface} instance when
101   * {@link ClusterConnection#getAdmin(ServerName)} is called, returns the passed
102   * {@link ClientProtos.ClientService.BlockingInterface} instance when
103   * {@link ClusterConnection#getClient(ServerName)} is called (Be sure to call
104   * {@link Connection#close()} when done with this mocked Connection.
105   * @throws IOException
106   */
107  public static ClusterConnection getMockedConnectionAndDecorate(final Configuration conf,
108      final AdminProtos.AdminService.BlockingInterface admin,
109      final ClientProtos.ClientService.BlockingInterface client,
110      final ServerName sn, final RegionInfo hri)
111  throws IOException {
112    ConnectionImplementation c = Mockito.mock(ConnectionImplementation.class);
113    Mockito.when(c.getConfiguration()).thenReturn(conf);
114    Mockito.doNothing().when(c).close();
115    // Make it so we return a particular location when asked.
116    final HRegionLocation loc = new HRegionLocation(hri, sn);
117    Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
118        (byte[]) Mockito.any(), Mockito.anyBoolean())).
119      thenReturn(loc);
120    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
121      thenReturn(loc);
122    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any(),
123        Mockito.anyBoolean(), Mockito.anyBoolean(),  Mockito.anyInt()))
124        .thenReturn(new RegionLocations(loc));
125    if (admin != null) {
126      // If a call to getAdmin, return this implementation.
127      Mockito.when(c.getAdmin(Mockito.any())).
128        thenReturn(admin);
129    }
130    if (client != null) {
131      // If a call to getClient, return this client.
132      Mockito.when(c.getClient(Mockito.any())).
133        thenReturn(client);
134    }
135    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
136    Mockito.when(c.getNonceGenerator()).thenReturn(ng);
137    Mockito.when(c.getAsyncProcess()).thenReturn(
138      new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf),
139          RpcControllerFactory.instantiate(conf)));
140    Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
141        RpcRetryingCallerFactory.instantiate(conf,
142            RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
143    Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class));
144    Table t = Mockito.mock(Table.class);
145    Mockito.when(c.getTable((TableName)Mockito.any())).thenReturn(t);
146    ResultScanner rs = Mockito.mock(ResultScanner.class);
147    Mockito.when(t.getScanner((Scan)Mockito.any())).thenReturn(rs);
148    return c;
149  }
150
151  /**
152   * Get a Mockito spied-upon {@link ClusterConnection} that goes with the passed
153   * <code>conf</code> configuration instance.
154   * Be sure to shutdown the connection when done by calling
155   * {@link Connection#close()} else it will stick around; this is probably not what you want.
156   * @param conf configuration
157   * @return ClusterConnection object for <code>conf</code>
158   * @throws ZooKeeperConnectionException
159   * [Dead link]: See also
160   * {http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)}
161   */
162  public static ClusterConnection getSpiedConnection(final Configuration conf)
163  throws IOException {
164    ConnectionImplementation connection =
165      Mockito.spy(new ConnectionImplementation(conf, null, null));
166    return connection;
167  }
168
169  /**
170   * This coproceesor sleep 2s at first increment/append rpc call.
171   */
172  public static class SleepAtFirstRpcCall implements RegionCoprocessor, RegionObserver {
173    static final AtomicLong ct = new AtomicLong(0);
174    static final String SLEEP_TIME_CONF_KEY =
175        "hbase.coprocessor.SleepAtFirstRpcCall.sleepTime";
176    static final long DEFAULT_SLEEP_TIME = 2000;
177    static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
178
179    @Override
180    public Optional<RegionObserver> getRegionObserver() {
181      return Optional.of(this);
182    }
183
184    public SleepAtFirstRpcCall() {
185    }
186
187    @Override
188    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
189      RegionCoprocessorEnvironment env = c.getEnvironment();
190      Configuration conf = env.getConfiguration();
191      sleepTime.set(conf.getLong(SLEEP_TIME_CONF_KEY, DEFAULT_SLEEP_TIME));
192    }
193
194    @Override
195    public Result postIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
196        final Increment increment, final Result result) throws IOException {
197      if (ct.incrementAndGet() == 1) {
198        Threads.sleep(sleepTime.get());
199      }
200      return result;
201    }
202
203    @Override
204    public Result postAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
205        final Append append, final Result result) throws IOException {
206      if (ct.incrementAndGet() == 1) {
207        Threads.sleep(sleepTime.get());
208      }
209      return result;
210    }
211  }
212}