/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;

/**
 * Binds the factory hooks declared by {@link AbstractTestIPC} to the blocking RPC client
 * ({@link BlockingRpcClient}) and to servers obtained from {@link RpcServerFactory} or derived
 * from {@link SimpleRpcServer}.
 * <p>
 * This class declares no test methods of its own; it only supplies the client and server
 * instances requested through the overridden hooks.
 */
@Category({ RPCTests.class, MediumTests.class })
public class TestBlockingIPC extends AbstractTestIPC {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockingIPC.class);

  /**
   * Builds the RPC server under test by delegating to {@link RpcServerFactory}.
   * @throws IOException if the factory fails to create the server
   */
  @Override
  protected RpcServer createRpcServer(Server server, String name,
    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
    Configuration conf, RpcScheduler scheduler) throws IOException {
    RpcServer created =
      RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
    return created;
  }

  /**
   * Returns a plain {@link BlockingRpcClient} built from the supplied configuration.
   */
  @Override
  protected BlockingRpcClient createRpcClient(Configuration conf) {
    BlockingRpcClient client = new BlockingRpcClient(conf);
    return client;
  }

  /**
   * Returns a {@link BlockingRpcClient} whose {@code getCodec()} always answers {@code null}.
   */
  @Override
  protected BlockingRpcClient createRpcClientNoCodec(Configuration conf) {
    BlockingRpcClient codecLessClient = new BlockingRpcClient(conf) {
      @Override
      Codec getCodec() {
        // Deliberately report that no codec is available.
        return null;
      }
    };
    return codecLessClient;
  }

  /**
   * Returns a {@link BlockingRpcClient} whose {@code isTcpNoDelay()} throws a
   * {@link RuntimeException}.
   * <p>
   * NOTE(review): going by this hook's name, {@code isTcpNoDelay()} is presumably consulted
   * while a connection is being set up, which is where the fault would surface - confirm
   * against {@link BlockingRpcClient}.
   */
  @Override
  protected BlockingRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf)
    throws IOException {
    BlockingRpcClient faultyClient = new BlockingRpcClient(conf) {
      @Override
      boolean isTcpNoDelay() {
        throw new RuntimeException("Injected fault");
      }
    };
    return faultyClient;
  }

  /**
   * Builds a {@link TestFailingRpcServer}, i.e. a server whose connections reject every request.
   * @throws IOException if the server cannot be constructed
   */
  @Override
  protected RpcServer createTestFailingRpcServer(Server server, String name,
    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
    Configuration conf, RpcScheduler scheduler) throws IOException {
    TestFailingRpcServer failingServer =
      new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler);
    return failingServer;
  }

  /**
   * A {@link SimpleRpcServer} that hands out {@link FailingConnection} instances, so that every
   * request processed on one of its connections fails with a {@link DoNotRetryIOException}.
   */
  private static class TestFailingRpcServer extends SimpleRpcServer {

    /**
     * Forwards all arguments to {@link SimpleRpcServer}, passing {@code true} as the final
     * constructor argument (its meaning is defined by the superclass constructor).
     */
    TestFailingRpcServer(Server hostServer, String serverName,
      List<RpcServer.BlockingServiceAndInterface> serviceList, InetSocketAddress address,
      Configuration configuration, RpcScheduler rpcScheduler) throws IOException {
      super(hostServer, serverName, serviceList, address, configuration, rpcScheduler, true);
    }

    /**
     * Supplies a {@link FailingConnection} bound to this server for the given channel.
     */
    @Override
    protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
      FailingConnection connection = new FailingConnection(this, channel, time);
      return connection;
    }

    /**
     * Connection whose {@link #processRequest(ByteBuff)} unconditionally throws.
     */
    final class FailingConnection extends SimpleServerRpcConnection {

      private FailingConnection(TestFailingRpcServer owner, SocketChannel socketChannel,
        long lastContactTime) {
        super(owner, socketChannel, lastContactTime);
      }

      @Override
      public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
        // The failure is raised once the connection header has been read and the client has
        // sent an RPC.
        DoNotRetryIOException failure = new DoNotRetryIOException("Failing for test");
        throw failure;
      }
    }
  }
}