001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.nio.channels.CancelledKeyException; 022import java.nio.channels.ClosedChannelException; 023import java.nio.channels.SelectionKey; 024import java.nio.channels.Selector; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.Iterator; 028import java.util.Set; 029import java.util.concurrent.ConcurrentHashMap; 030import org.apache.hadoop.hbase.HBaseIOException; 031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 032import org.apache.hadoop.hbase.util.Threads; 033import org.apache.hadoop.util.StringUtils; 034import org.apache.yetus.audience.InterfaceAudience; 035 036/** 037 * Sends responses of RPC back to clients. 038 */ 039@Deprecated 040@InterfaceAudience.Private 041class SimpleRpcServerResponder extends Thread { 042 043 private final SimpleRpcServer simpleRpcServer; 044 private final Selector writeSelector; 045 private final Set<SimpleServerRpcConnection> writingCons = 046 Collections.newSetFromMap(new ConcurrentHashMap<>()); 047 048 SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException { 049 this.simpleRpcServer = simpleRpcServer; 050 this.setName("RpcServer.responder"); 051 this.setDaemon(true); 052 this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER); 053 writeSelector = Selector.open(); // create a selector 054 } 055 056 @Override 057 public void run() { 058 SimpleRpcServer.LOG.debug(getName() + ": starting"); 059 try { 060 doRunLoop(); 061 } finally { 062 SimpleRpcServer.LOG.info(getName() + ": stopping"); 063 try { 064 writeSelector.close(); 065 } catch (IOException ioe) { 066 SimpleRpcServer.LOG.error(getName() + ": couldn't close write selector", ioe); 067 } 068 } 069 } 070 071 /** 072 * Take the list of the connections that want to write, and register them in the selector. 073 */ 074 private void registerWrites() { 075 Iterator<SimpleServerRpcConnection> it = writingCons.iterator(); 076 while (it.hasNext()) { 077 SimpleServerRpcConnection c = it.next(); 078 it.remove(); 079 SelectionKey sk = c.channel.keyFor(writeSelector); 080 try { 081 if (sk == null) { 082 try { 083 c.channel.register(writeSelector, SelectionKey.OP_WRITE, c); 084 } catch (ClosedChannelException e) { 085 // ignore: the client went away. 086 if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e); 087 } 088 } else { 089 sk.interestOps(SelectionKey.OP_WRITE); 090 } 091 } catch (CancelledKeyException e) { 092 // ignore: the client went away. 093 if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e); 094 } 095 } 096 } 097 098 /** 099 * Add a connection to the list that want to write, 100 */ 101 public void registerForWrite(SimpleServerRpcConnection c) { 102 if (writingCons.add(c)) { 103 writeSelector.wakeup(); 104 } 105 } 106 107 private void doRunLoop() { 108 long lastPurgeTime = 0; // last check for old calls. 109 while (this.simpleRpcServer.running) { 110 try { 111 registerWrites(); 112 int keyCt = writeSelector.select(this.simpleRpcServer.purgeTimeout); 113 if (keyCt == 0) { 114 continue; 115 } 116 117 Set<SelectionKey> keys = writeSelector.selectedKeys(); 118 Iterator<SelectionKey> iter = keys.iterator(); 119 while (iter.hasNext()) { 120 SelectionKey key = iter.next(); 121 iter.remove(); 122 try { 123 if (key.isValid() && key.isWritable()) { 124 doAsyncWrite(key); 125 } 126 } catch (IOException e) { 127 SimpleRpcServer.LOG.debug(getName() + ": asyncWrite", e); 128 } 129 } 130 131 lastPurgeTime = purge(lastPurgeTime); 132 133 } catch (OutOfMemoryError e) { 134 if (this.simpleRpcServer.errorHandler != null) { 135 if (this.simpleRpcServer.errorHandler.checkOOME(e)) { 136 SimpleRpcServer.LOG.info(getName() + ": exiting on OutOfMemoryError"); 137 return; 138 } 139 } else { 140 // 141 // we can run out of memory if we have too many threads 142 // log the event and sleep for a minute and give 143 // some thread(s) a chance to finish 144 // 145 SimpleRpcServer.LOG.warn(getName() + ": OutOfMemoryError in server select", e); 146 try { 147 Thread.sleep(60000); 148 } catch (InterruptedException ex) { 149 SimpleRpcServer.LOG.debug("Interrupted while sleeping"); 150 return; 151 } 152 } 153 } catch (Exception e) { 154 SimpleRpcServer.LOG 155 .warn(getName() + ": exception in Responder " + StringUtils.stringifyException(e), e); 156 } 157 } 158 SimpleRpcServer.LOG.info(getName() + ": stopped"); 159 } 160 161 /** 162 * If there were some calls that have not been sent out for a long time, we close the connection. 163 * @return the time of the purge. 164 */ 165 private long purge(long lastPurgeTime) { 166 long now = EnvironmentEdgeManager.currentTime(); 167 if (now < lastPurgeTime + this.simpleRpcServer.purgeTimeout) { 168 return lastPurgeTime; 169 } 170 171 ArrayList<SimpleServerRpcConnection> conWithOldCalls = new ArrayList<>(); 172 // get the list of channels from list of keys. 173 synchronized (writeSelector.keys()) { 174 for (SelectionKey key : writeSelector.keys()) { 175 SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment(); 176 if (connection == null) { 177 throw new IllegalStateException("Coding error: SelectionKey key without attachment."); 178 } 179 if ( 180 connection.lastSentTime > 0 181 && now > connection.lastSentTime + this.simpleRpcServer.purgeTimeout 182 ) { 183 conWithOldCalls.add(connection); 184 } 185 } 186 } 187 188 // Seems safer to close the connection outside of the synchronized loop... 189 for (SimpleServerRpcConnection connection : conWithOldCalls) { 190 this.simpleRpcServer.closeConnection(connection); 191 } 192 193 return now; 194 } 195 196 private void doAsyncWrite(SelectionKey key) throws IOException { 197 SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment(); 198 if (connection == null) { 199 throw new IOException("doAsyncWrite: no connection"); 200 } 201 if (key.channel() != connection.channel) { 202 throw new IOException("doAsyncWrite: bad channel"); 203 } 204 205 if (processAllResponses(connection)) { 206 try { 207 // We wrote everything, so we don't need to be told when the socket is ready for 208 // write anymore. 209 key.interestOps(0); 210 } catch (CancelledKeyException e) { 211 /* 212 * The Listener/reader might have closed the socket. We don't explicitly cancel the key, so 213 * not sure if this will ever fire. This warning could be removed. 214 */ 215 SimpleRpcServer.LOG.warn("Exception while changing ops : " + e); 216 } 217 } 218 } 219 220 /** 221 * Process the response for this call. You need to have the lock on 222 * {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock} 223 * @return true if we proceed the call fully, false otherwise. n 224 */ 225 private boolean processResponse(SimpleServerRpcConnection conn, RpcResponse resp) 226 throws IOException { 227 boolean error = true; 228 BufferChain buf = resp.getResponse(); 229 try { 230 // Send as much data as we can in the non-blocking fashion 231 long numBytes = this.simpleRpcServer.channelWrite(conn.channel, buf); 232 if (numBytes < 0) { 233 throw new HBaseIOException("Error writing on the socket " + conn); 234 } 235 error = false; 236 } finally { 237 if (error) { 238 SimpleRpcServer.LOG.debug(conn + ": output error -- closing"); 239 // We will be closing this connection itself. Mark this call as done so that all the 240 // buffer(s) it got from pool can get released 241 resp.done(); 242 this.simpleRpcServer.closeConnection(conn); 243 } 244 } 245 246 if (!buf.hasRemaining()) { 247 resp.done(); 248 return true; 249 } else { 250 // set the serve time when the response has to be sent later 251 conn.lastSentTime = EnvironmentEdgeManager.currentTime(); 252 return false; // Socket can't take more, we will have to come back. 253 } 254 } 255 256 /** 257 * Process all the responses for this connection 258 * @return true if all the calls were processed or that someone else is doing it. false if there * 259 * is still some work to do. In this case, we expect the caller to delay us. n 260 */ 261 private boolean processAllResponses(final SimpleServerRpcConnection connection) 262 throws IOException { 263 // We want only one writer on the channel for a connection at a time. 264 connection.responseWriteLock.lock(); 265 try { 266 for (int i = 0; i < 20; i++) { 267 // protection if some handlers manage to need all the responder 268 RpcResponse resp = connection.responseQueue.pollFirst(); 269 if (resp == null) { 270 return true; 271 } 272 if (!processResponse(connection, resp)) { 273 connection.responseQueue.addFirst(resp); 274 return false; 275 } 276 } 277 } finally { 278 connection.responseWriteLock.unlock(); 279 } 280 281 return connection.responseQueue.isEmpty(); 282 } 283 284 // 285 // Enqueue a response from the application. 286 // 287 void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOException { 288 boolean added = false; 289 // If there is already a write in progress, we don't wait. This allows to free the handlers 290 // immediately for other tasks. 291 if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) { 292 try { 293 if (conn.responseQueue.isEmpty()) { 294 // If we're alone, we can try to do a direct call to the socket. It's 295 // an optimization to save on context switches and data transfer between cores.. 296 if (processResponse(conn, resp)) { 297 return; // we're done. 298 } 299 // Too big to fit, putting ahead. 300 conn.responseQueue.addFirst(resp); 301 added = true; // We will register to the selector later, outside of the lock. 302 } 303 } finally { 304 conn.responseWriteLock.unlock(); 305 } 306 } 307 308 if (!added) { 309 conn.responseQueue.addLast(resp); 310 } 311 registerForWrite(conn); 312 } 313}