001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
022import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029import static org.mockito.Matchers.anyObject;
030import static org.mockito.Mockito.spy;
031import static org.mockito.Mockito.verify;
032import static org.mockito.internal.verification.VerificationModeFactory.times;
033
034import java.io.IOException;
035import java.net.InetSocketAddress;
036import java.util.ArrayList;
037import java.util.List;
038
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellScanner;
042import org.apache.hadoop.hbase.CellUtil;
043import org.apache.hadoop.hbase.DoNotRetryIOException;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.KeyValue;
046import org.apache.hadoop.hbase.Server;
047import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
048import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
049import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
050import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
051import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
052import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
053import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto;
054import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
055import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.io.compress.GzipCodec;
059import org.apache.hadoop.util.StringUtils;
060import org.junit.Test;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
064import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
065
066/**
067 * Some basic ipc tests.
068 */
069public abstract class AbstractTestIPC {
070
071  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestIPC.class);
072
073  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
074  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
075
076  protected static final Configuration CONF = HBaseConfiguration.create();
077  static {
078    // Set the default to be the old SimpleRpcServer. Subclasses test it and netty.
079    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
080  }
081
082  protected abstract RpcServer createRpcServer(final Server server, final String name,
083      final List<BlockingServiceAndInterface> services,
084      final InetSocketAddress bindAddress, Configuration conf,
085      RpcScheduler scheduler) throws IOException;
086
087  protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);
088
089  /**
090   * Ensure we do not HAVE TO HAVE a codec.
091   */
092  @Test
093  public void testNoCodec() throws IOException, ServiceException {
094    Configuration conf = HBaseConfiguration.create();
095    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
096        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
097            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
098        new FifoRpcScheduler(CONF, 1));
099    try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) {
100      rpcServer.start();
101      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
102      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
103      String message = "hello";
104      assertEquals(message,
105        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
106      assertNull(pcrc.cellScanner());
107    } finally {
108      rpcServer.stop();
109    }
110  }
111
112  protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf);
113
114  /**
115   * It is hard to verify the compression is actually happening under the wraps. Hope that if
116   * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
117   * confirm that compression is happening down in the client and server).
118   */
119  @Test
120  public void testCompressCellBlock() throws IOException, ServiceException {
121    Configuration conf = new Configuration(HBaseConfiguration.create());
122    conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
123    List<Cell> cells = new ArrayList<>();
124    int count = 3;
125    for (int i = 0; i < count; i++) {
126      cells.add(CELL);
127    }
128    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
129        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
130            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
131        new FifoRpcScheduler(CONF, 1));
132
133    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
134      rpcServer.start();
135      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
136      HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
137      String message = "hello";
138      assertEquals(message,
139        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
140      int index = 0;
141      CellScanner cellScanner = pcrc.cellScanner();
142      assertNotNull(cellScanner);
143      while (cellScanner.advance()) {
144        assertEquals(CELL, cellScanner.current());
145        index++;
146      }
147      assertEquals(count, index);
148    } finally {
149      rpcServer.stop();
150    }
151  }
152
153  protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup(
154      Configuration conf) throws IOException;
155
156  @Test
157  public void testRTEDuringConnectionSetup() throws Exception {
158    Configuration conf = HBaseConfiguration.create();
159    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
160        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
161            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
162        new FifoRpcScheduler(CONF, 1));
163    try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) {
164      rpcServer.start();
165      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
166      stub.ping(null, EmptyRequestProto.getDefaultInstance());
167      fail("Expected an exception to have been thrown!");
168    } catch (Exception e) {
169      LOG.info("Caught expected exception: " + e.toString());
170      assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
171    } finally {
172      rpcServer.stop();
173    }
174  }
175
176  /**
177   * Tests that the rpc scheduler is called when requests arrive.
178   */
179  @Test
180  public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
181    RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
182    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
183        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
184            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, scheduler);
185    verify(scheduler).init((RpcScheduler.Context) anyObject());
186    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
187      rpcServer.start();
188      verify(scheduler).start();
189      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
190      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
191      for (int i = 0; i < 10; i++) {
192        stub.echo(null, param);
193      }
194      verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
195    } finally {
196      rpcServer.stop();
197      verify(scheduler).stop();
198    }
199  }
200
201  /** Tests that the rpc scheduler is called when requests arrive. */
202  @Test
203  public void testRpcMaxRequestSize() throws IOException, ServiceException {
204    Configuration conf = new Configuration(CONF);
205    conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
206    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
207        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
208            SERVICE, null)), new InetSocketAddress("localhost", 0), conf,
209        new FifoRpcScheduler(conf, 1));
210    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
211      rpcServer.start();
212      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
213      StringBuilder message = new StringBuilder(1200);
214      for (int i = 0; i < 200; i++) {
215        message.append("hello.");
216      }
217      // set total RPC size bigger than 100 bytes
218      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build();
219      stub.echo(
220        new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))),
221        param);
222      fail("RPC should have failed because it exceeds max request size");
223    } catch (ServiceException e) {
224      LOG.info("Caught expected exception: " + e);
225      assertTrue(e.toString(),
226          StringUtils.stringifyException(e).contains("RequestTooBigException"));
227    } finally {
228      rpcServer.stop();
229    }
230  }
231
232  /**
233   * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
234   * remoteAddress set to its Call Object
235   * @throws ServiceException
236   */
237  @Test
238  public void testRpcServerForNotNullRemoteAddressInCallObject()
239      throws IOException, ServiceException {
240    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
241        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
242            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
243        new FifoRpcScheduler(CONF, 1));
244    InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
245    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
246      rpcServer.start();
247      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
248      assertEquals(localAddr.getAddress().getHostAddress(),
249        stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr());
250    } finally {
251      rpcServer.stop();
252    }
253  }
254
255  @Test
256  public void testRemoteError() throws IOException, ServiceException {
257    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
258        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
259            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
260        new FifoRpcScheduler(CONF, 1));
261    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
262      rpcServer.start();
263      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
264      stub.error(null, EmptyRequestProto.getDefaultInstance());
265    } catch (ServiceException e) {
266      LOG.info("Caught expected exception: " + e);
267      IOException ioe = ProtobufUtil.handleRemoteException(e);
268      assertTrue(ioe instanceof DoNotRetryIOException);
269      assertTrue(ioe.getMessage().contains("server error!"));
270    } finally {
271      rpcServer.stop();
272    }
273  }
274
275  @Test
276  public void testTimeout() throws IOException {
277    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
278        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
279            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
280        new FifoRpcScheduler(CONF, 1));
281    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
282      rpcServer.start();
283      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
284      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
285      int ms = 1000;
286      int timeout = 100;
287      for (int i = 0; i < 10; i++) {
288        pcrc.reset();
289        pcrc.setCallTimeout(timeout);
290        long startTime = System.nanoTime();
291        try {
292          stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build());
293        } catch (ServiceException e) {
294          long waitTime = (System.nanoTime() - startTime) / 1000000;
295          // expected
296          LOG.info("Caught expected exception: " + e);
297          IOException ioe = ProtobufUtil.handleRemoteException(e);
298          assertTrue(ioe.getCause() instanceof CallTimeoutException);
299          // confirm that we got exception before the actual pause.
300          assertTrue(waitTime < ms);
301        }
302      }
303    } finally {
304      rpcServer.stop();
305    }
306  }
307
308  protected abstract RpcServer createTestFailingRpcServer(final Server server, final String name,
309      final List<BlockingServiceAndInterface> services,
310      final InetSocketAddress bindAddress, Configuration conf,
311      RpcScheduler scheduler) throws IOException;
312
313  /** Tests that the connection closing is handled by the client with outstanding RPC calls */
314  @Test
315  public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
316    Configuration conf = new Configuration(CONF);
317    RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer",
318        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
319            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
320        new FifoRpcScheduler(CONF, 1));
321
322    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
323      rpcServer.start();
324      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
325      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
326      stub.echo(null, param);
327      fail("RPC should have failed because connection closed");
328    } catch (ServiceException e) {
329      LOG.info("Caught expected exception: " + e.toString());
330    } finally {
331      rpcServer.stop();
332    }
333  }
334
335  @Test
336  public void testAsyncEcho() throws IOException {
337    Configuration conf = HBaseConfiguration.create();
338    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
339        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
340            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
341        new FifoRpcScheduler(CONF, 1));
342    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
343      rpcServer.start();
344      Interface stub = newStub(client, rpcServer.getListenerAddress());
345      int num = 10;
346      List<HBaseRpcController> pcrcList = new ArrayList<>();
347      List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>();
348      for (int i = 0; i < num; i++) {
349        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
350        BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
351        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done);
352        pcrcList.add(pcrc);
353        callbackList.add(done);
354      }
355      for (int i = 0; i < num; i++) {
356        HBaseRpcController pcrc = pcrcList.get(i);
357        assertFalse(pcrc.failed());
358        assertNull(pcrc.cellScanner());
359        assertEquals("hello-" + i, callbackList.get(i).get().getMessage());
360      }
361    } finally {
362      rpcServer.stop();
363    }
364  }
365
366  @Test
367  public void testAsyncRemoteError() throws IOException {
368    AbstractRpcClient<?> client = createRpcClient(CONF);
369    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
370        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
371            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
372        new FifoRpcScheduler(CONF, 1));
373    try {
374      rpcServer.start();
375      Interface stub = newStub(client, rpcServer.getListenerAddress());
376      BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
377      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
378      stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback);
379      assertNull(callback.get());
380      assertTrue(pcrc.failed());
381      LOG.info("Caught expected exception: " + pcrc.getFailed());
382      IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
383      assertTrue(ioe instanceof DoNotRetryIOException);
384      assertTrue(ioe.getMessage().contains("server error!"));
385    } finally {
386      client.close();
387      rpcServer.stop();
388    }
389  }
390
391  @Test
392  public void testAsyncTimeout() throws IOException {
393    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
394        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
395            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
396        new FifoRpcScheduler(CONF, 1));
397    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
398      rpcServer.start();
399      Interface stub = newStub(client, rpcServer.getListenerAddress());
400      List<HBaseRpcController> pcrcList = new ArrayList<>();
401      List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>();
402      int ms = 1000;
403      int timeout = 100;
404      long startTime = System.nanoTime();
405      for (int i = 0; i < 10; i++) {
406        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
407        pcrc.setCallTimeout(timeout);
408        BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
409        stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback);
410        pcrcList.add(pcrc);
411        callbackList.add(callback);
412      }
413      for (BlockingRpcCallback<?> callback : callbackList) {
414        assertNull(callback.get());
415      }
416      long waitTime = (System.nanoTime() - startTime) / 1000000;
417      for (HBaseRpcController pcrc : pcrcList) {
418        assertTrue(pcrc.failed());
419        LOG.info("Caught expected exception: " + pcrc.getFailed());
420        IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
421        assertTrue(ioe.getCause() instanceof CallTimeoutException);
422      }
423      // confirm that we got exception before the actual pause.
424      assertTrue(waitTime < ms);
425    } finally {
426      rpcServer.stop();
427    }
428  }
429}