001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
022import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029import static org.mockito.ArgumentMatchers.anyObject;
030import static org.mockito.Mockito.spy;
031import static org.mockito.Mockito.verify;
032import static org.mockito.internal.verification.VerificationModeFactory.times;
033
034import java.io.IOException;
035import java.net.InetSocketAddress;
036import java.util.ArrayList;
037import java.util.List;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.CellScanner;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.DoNotRetryIOException;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.KeyValue;
045import org.apache.hadoop.hbase.Server;
046import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.io.compress.GzipCodec;
049import org.apache.hadoop.util.StringUtils;
050import org.junit.Test;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
055import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
057
058import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
059import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
060import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
061import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
062import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto;
063import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
064import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
065import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
066
067/**
068 * Some basic ipc tests.
069 */
070public abstract class AbstractTestIPC {
071
072  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestIPC.class);
073
074  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
075  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
076
077  protected static final Configuration CONF = HBaseConfiguration.create();
078  static {
079    // Set the default to be the old SimpleRpcServer. Subclasses test it and netty.
080    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
081  }
082
083  protected abstract RpcServer createRpcServer(final Server server, final String name,
084      final List<BlockingServiceAndInterface> services,
085      final InetSocketAddress bindAddress, Configuration conf,
086      RpcScheduler scheduler) throws IOException;
087
088  protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);
089
090  /**
091   * Ensure we do not HAVE TO HAVE a codec.
092   */
093  @Test
094  public void testNoCodec() throws IOException, ServiceException {
095    Configuration conf = HBaseConfiguration.create();
096    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
097        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
098            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
099        new FifoRpcScheduler(CONF, 1));
100    try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) {
101      rpcServer.start();
102      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
103      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
104      String message = "hello";
105      assertEquals(message,
106        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
107      assertNull(pcrc.cellScanner());
108    } finally {
109      rpcServer.stop();
110    }
111  }
112
113  protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf);
114
115  /**
116   * It is hard to verify the compression is actually happening under the wraps. Hope that if
117   * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
118   * confirm that compression is happening down in the client and server).
119   */
120  @Test
121  public void testCompressCellBlock() throws IOException, ServiceException {
122    Configuration conf = new Configuration(HBaseConfiguration.create());
123    conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
124    List<Cell> cells = new ArrayList<>();
125    int count = 3;
126    for (int i = 0; i < count; i++) {
127      cells.add(CELL);
128    }
129    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
130        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
131            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
132        new FifoRpcScheduler(CONF, 1));
133
134    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
135      rpcServer.start();
136      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
137      HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
138      String message = "hello";
139      assertEquals(message,
140        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
141      int index = 0;
142      CellScanner cellScanner = pcrc.cellScanner();
143      assertNotNull(cellScanner);
144      while (cellScanner.advance()) {
145        assertEquals(CELL, cellScanner.current());
146        index++;
147      }
148      assertEquals(count, index);
149    } finally {
150      rpcServer.stop();
151    }
152  }
153
154  protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup(
155      Configuration conf) throws IOException;
156
157  @Test
158  public void testRTEDuringConnectionSetup() throws Exception {
159    Configuration conf = HBaseConfiguration.create();
160    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
161        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
162            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
163        new FifoRpcScheduler(CONF, 1));
164    try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) {
165      rpcServer.start();
166      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
167      stub.ping(null, EmptyRequestProto.getDefaultInstance());
168      fail("Expected an exception to have been thrown!");
169    } catch (Exception e) {
170      LOG.info("Caught expected exception: " + e.toString());
171      assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
172    } finally {
173      rpcServer.stop();
174    }
175  }
176
177  /**
178   * Tests that the rpc scheduler is called when requests arrive.
179   */
180  @Test
181  public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
182    RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
183    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
184        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
185            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, scheduler);
186    verify(scheduler).init((RpcScheduler.Context) anyObject());
187    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
188      rpcServer.start();
189      verify(scheduler).start();
190      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
191      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
192      for (int i = 0; i < 10; i++) {
193        stub.echo(null, param);
194      }
195      verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
196    } finally {
197      rpcServer.stop();
198      verify(scheduler).stop();
199    }
200  }
201
202  /** Tests that the rpc scheduler is called when requests arrive. */
203  @Test
204  public void testRpcMaxRequestSize() throws IOException, ServiceException {
205    Configuration conf = new Configuration(CONF);
206    conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
207    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
208        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
209            SERVICE, null)), new InetSocketAddress("localhost", 0), conf,
210        new FifoRpcScheduler(conf, 1));
211    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
212      rpcServer.start();
213      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
214      StringBuilder message = new StringBuilder(1200);
215      for (int i = 0; i < 200; i++) {
216        message.append("hello.");
217      }
218      // set total RPC size bigger than 100 bytes
219      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build();
220      stub.echo(
221        new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))),
222        param);
223      fail("RPC should have failed because it exceeds max request size");
224    } catch (ServiceException e) {
225      LOG.info("Caught expected exception: " + e);
226      assertTrue(e.toString(),
227          StringUtils.stringifyException(e).contains("RequestTooBigException"));
228    } finally {
229      rpcServer.stop();
230    }
231  }
232
233  /**
234   * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
235   * remoteAddress set to its Call Object
236   */
237  @Test
238  public void testRpcServerForNotNullRemoteAddressInCallObject()
239      throws IOException, ServiceException {
240    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
241        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
242            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
243        new FifoRpcScheduler(CONF, 1));
244    InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
245    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
246      rpcServer.start();
247      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
248      assertEquals(localAddr.getAddress().getHostAddress(),
249        stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr());
250    } finally {
251      rpcServer.stop();
252    }
253  }
254
255  @Test
256  public void testRemoteError() throws IOException, ServiceException {
257    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
258        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
259            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
260        new FifoRpcScheduler(CONF, 1));
261    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
262      rpcServer.start();
263      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
264      stub.error(null, EmptyRequestProto.getDefaultInstance());
265    } catch (ServiceException e) {
266      LOG.info("Caught expected exception: " + e);
267      IOException ioe = ProtobufUtil.handleRemoteException(e);
268      assertTrue(ioe instanceof DoNotRetryIOException);
269      assertTrue(ioe.getMessage().contains("server error!"));
270    } finally {
271      rpcServer.stop();
272    }
273  }
274
275  @Test
276  public void testTimeout() throws IOException {
277    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
278        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
279            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
280        new FifoRpcScheduler(CONF, 1));
281    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
282      rpcServer.start();
283      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
284      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
285      int ms = 1000;
286      int timeout = 100;
287      for (int i = 0; i < 10; i++) {
288        pcrc.reset();
289        pcrc.setCallTimeout(timeout);
290        long startTime = System.nanoTime();
291        try {
292          stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build());
293        } catch (ServiceException e) {
294          long waitTime = (System.nanoTime() - startTime) / 1000000;
295          // expected
296          LOG.info("Caught expected exception: " + e);
297          IOException ioe = ProtobufUtil.handleRemoteException(e);
298          assertTrue(ioe.getCause() instanceof CallTimeoutException);
299          // confirm that we got exception before the actual pause.
300          assertTrue(waitTime < ms);
301        }
302      }
303    } finally {
304      rpcServer.stop();
305    }
306  }
307
308  protected abstract RpcServer createTestFailingRpcServer(final Server server, final String name,
309      final List<BlockingServiceAndInterface> services,
310      final InetSocketAddress bindAddress, Configuration conf,
311      RpcScheduler scheduler) throws IOException;
312
313  /** Tests that the connection closing is handled by the client with outstanding RPC calls */
314  @Test
315  public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
316    Configuration conf = new Configuration(CONF);
317    RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer",
318        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
319            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
320        new FifoRpcScheduler(CONF, 1));
321
322    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
323      rpcServer.start();
324      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
325      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
326      stub.echo(null, param);
327      fail("RPC should have failed because connection closed");
328    } catch (ServiceException e) {
329      LOG.info("Caught expected exception: " + e.toString());
330    } finally {
331      rpcServer.stop();
332    }
333  }
334
335  @Test
336  public void testAsyncEcho() throws IOException {
337    Configuration conf = HBaseConfiguration.create();
338    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
339        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
340            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
341        new FifoRpcScheduler(CONF, 1));
342    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
343      rpcServer.start();
344      Interface stub = newStub(client, rpcServer.getListenerAddress());
345      int num = 10;
346      List<HBaseRpcController> pcrcList = new ArrayList<>();
347      List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>();
348      for (int i = 0; i < num; i++) {
349        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
350        BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
351        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done);
352        pcrcList.add(pcrc);
353        callbackList.add(done);
354      }
355      for (int i = 0; i < num; i++) {
356        HBaseRpcController pcrc = pcrcList.get(i);
357        assertFalse(pcrc.failed());
358        assertNull(pcrc.cellScanner());
359        assertEquals("hello-" + i, callbackList.get(i).get().getMessage());
360      }
361    } finally {
362      rpcServer.stop();
363    }
364  }
365
366  @Test
367  public void testAsyncRemoteError() throws IOException {
368    AbstractRpcClient<?> client = createRpcClient(CONF);
369    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
370        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
371            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
372        new FifoRpcScheduler(CONF, 1));
373    try {
374      rpcServer.start();
375      Interface stub = newStub(client, rpcServer.getListenerAddress());
376      BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
377      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
378      stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback);
379      assertNull(callback.get());
380      assertTrue(pcrc.failed());
381      LOG.info("Caught expected exception: " + pcrc.getFailed());
382      IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
383      assertTrue(ioe instanceof DoNotRetryIOException);
384      assertTrue(ioe.getMessage().contains("server error!"));
385    } finally {
386      client.close();
387      rpcServer.stop();
388    }
389  }
390
391  @Test
392  public void testAsyncTimeout() throws IOException {
393    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
394        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
395            SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
396        new FifoRpcScheduler(CONF, 1));
397    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
398      rpcServer.start();
399      Interface stub = newStub(client, rpcServer.getListenerAddress());
400      List<HBaseRpcController> pcrcList = new ArrayList<>();
401      List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>();
402      int ms = 1000;
403      int timeout = 100;
404      long startTime = System.nanoTime();
405      for (int i = 0; i < 10; i++) {
406        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
407        pcrc.setCallTimeout(timeout);
408        BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
409        stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback);
410        pcrcList.add(pcrc);
411        callbackList.add(callback);
412      }
413      for (BlockingRpcCallback<?> callback : callbackList) {
414        assertNull(callback.get());
415      }
416      long waitTime = (System.nanoTime() - startTime) / 1000000;
417      for (HBaseRpcController pcrc : pcrcList) {
418        assertTrue(pcrc.failed());
419        LOG.info("Caught expected exception: " + pcrc.getFailed());
420        IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
421        assertTrue(ioe.getCause() instanceof CallTimeoutException);
422      }
423      // confirm that we got exception before the actual pause.
424      assertTrue(waitTime < ms);
425    } finally {
426      rpcServer.stop();
427    }
428  }
429
430}