/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;

/**
 * Concrete binding of {@link AbstractTestIPC} that supplies {@link BlockingRpcClient} instances
 * as the client under test. This class contributes only the factory methods; the test cases
 * themselves are inherited from the abstract base class.
 */
@Category({ RPCTests.class, SmallTests.class })
public class TestBlockingIPC extends AbstractTestIPC {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockingIPC.class);

  /**
   * Creates the RPC server used by the inherited tests by delegating to
   * {@link RpcServerFactory}, forwarding every argument unchanged.
   * <p>
   * NOTE(review): which concrete server implementation the factory picks is decided outside this
   * file (presumably from {@code conf}) -- confirm if a specific implementation is expected.
   * @throws IOException if the factory fails to create the server
   */
  @Override
  protected RpcServer createRpcServer(Server server, String name,
    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
    Configuration conf, RpcScheduler scheduler) throws IOException {
    return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
  }

  /**
   * Creates a blocking client whose {@code getCodec()} always returns {@code null}, i.e. a
   * client configured without a codec.
   */
  @Override
  protected BlockingRpcClient createRpcClientNoCodec(Configuration conf) {
    return new BlockingRpcClient(conf) {
      @Override
      Codec getCodec() {
        return null;
      }
    };
  }

  /** Creates a plain, unmodified blocking client from {@code conf}. */
  @Override
  protected BlockingRpcClient createRpcClient(Configuration conf) {
    return new BlockingRpcClient(conf);
  }

  /**
   * Creates a blocking client with an injected fault: {@code isTcpNoDelay()} throws a
   * {@link RuntimeException} whenever it is called.
   * <p>
   * NOTE(review): going by the method name, {@code isTcpNoDelay()} is presumably consulted while
   * the connection is being set up -- the call site is not visible in this file.
   */
  @Override
  protected BlockingRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf)
    throws IOException {
    return new BlockingRpcClient(conf) {

      @Override
      boolean isTcpNoDelay() {
        throw new RuntimeException("Injected fault");
      }
    };
  }

  /**
   * A {@link SimpleRpcServer} whose connections fail every request they are asked to process.
   */
  private static class TestFailingRpcServer extends SimpleRpcServer {

    TestFailingRpcServer(Server server, String name,
      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
      Configuration conf, RpcScheduler scheduler) throws IOException {
      // NOTE(review): the meaning of the trailing 'true' flag is defined by the SimpleRpcServer
      // constructor, which is not visible here.
      super(server, name, services, bindAddress, conf, scheduler, true);
    }

    /**
     * Connection that rejects every request. Declared {@code static} because the owning server is
     * already handed in through the constructor, so no implicit reference to the enclosing
     * instance is needed.
     */
    private static final class FailingConnection extends SimpleServerRpcConnection {
      private FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel,
        long lastContact) {
        super(rpcServer, channel, lastContact);
      }

      @Override
      public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
        // this will throw exception after the connection header is read, and an RPC is sent
        // from client
        throw new DoNotRetryIOException("Failing for test");
      }
    }

    /** Hands out a {@link FailingConnection} for the given channel. */
    @Override
    protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
      return new FailingConnection(this, channel, time);
    }
  }

  /**
   * Creates a server whose connections throw {@link DoNotRetryIOException} from
   * {@code processRequest}.
   * @throws IOException if the server cannot be constructed
   */
  @Override
  protected RpcServer createTestFailingRpcServer(Server server, String name,
    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
    Configuration conf, RpcScheduler scheduler) throws IOException {
    return new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler);
  }
}