/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Sends responses of RPC back to clients.
 * <p>
 * This is a daemon thread that owns a write {@link Selector}. Callers hand responses to
 * {@link #doRespond(SimpleServerRpcConnection, RpcResponse)}, which first attempts a direct
 * write on the calling thread; whatever cannot be written immediately is queued on the
 * connection, and the connection is added to {@link #writingCons} so that this thread registers
 * it for {@link SelectionKey#OP_WRITE} and finishes the write once the channel is selected.
 */
@InterfaceAudience.Private
class SimpleRpcServerResponder extends Thread {

  private final SimpleRpcServer simpleRpcServer;
  private final Selector writeSelector;
  // Connections waiting to be registered with the selector. Filled by registerForWrite() and
  // drained by the responder thread in registerWrites().
  private final Set<SimpleServerRpcConnection> writingCons =
    Collections.newSetFromMap(new ConcurrentHashMap<>());

  /**
   * Creates the responder thread (named "RpcServer.responder", daemon) and opens its write
   * selector. The thread is not started here.
   * @param simpleRpcServer the server whose state (running flag, purge timeout, error handler)
   *                        this responder reads
   * @throws IOException if the selector cannot be opened
   */
  SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException {
    this.simpleRpcServer = simpleRpcServer;
    this.setName("RpcServer.responder");
    this.setDaemon(true);
    this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
    writeSelector = Selector.open(); // create a selector
  }

  /**
   * Runs the responder loop and closes the write selector when the loop exits, however it exits.
   */
  @Override
  public void run() {
    SimpleRpcServer.LOG.debug(getName() + ": starting");
    try {
      doRunLoop();
    } finally {
      SimpleRpcServer.LOG.info(getName() + ": stopping");
      try {
        writeSelector.close();
      } catch (IOException ioe) {
        SimpleRpcServer.LOG.error(getName() + ": couldn't close write selector", ioe);
      }
    }
  }

  /**
   * Take the list of the connections that want to write, and register them in the selector.
   * <p>
   * A channel with no key on the write selector is registered with the connection as attachment;
   * a channel that already has a key just gets its interest set switched back to
   * {@link SelectionKey#OP_WRITE}.
   */
  private void registerWrites() {
    Iterator<SimpleServerRpcConnection> it = writingCons.iterator();
    while (it.hasNext()) {
      SimpleServerRpcConnection c = it.next();
      it.remove();
      SelectionKey sk = c.channel.keyFor(writeSelector);
      try {
        if (sk == null) {
          try {
            c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
          } catch (ClosedChannelException e) {
            // ignore: the client went away.
            if (SimpleRpcServer.LOG.isTraceEnabled()) {
              SimpleRpcServer.LOG.trace("ignored", e);
            }
          }
        } else {
          sk.interestOps(SelectionKey.OP_WRITE);
        }
      } catch (CancelledKeyException e) {
        // ignore: the client went away.
        if (SimpleRpcServer.LOG.isTraceEnabled()) {
          SimpleRpcServer.LOG.trace("ignored", e);
        }
      }
    }
  }

  /**
   * Add a connection to the list of connections that want to write.
   * <p>
   * The selector is woken up only when the connection was not already pending, so that the
   * responder thread picks it up in {@link #registerWrites()}.
   * @param c the connection that has queued responses to send
   */
  public void registerForWrite(SimpleServerRpcConnection c) {
    if (writingCons.add(c)) {
      writeSelector.wakeup();
    }
  }

  /**
   * Main loop: registers pending connections, waits on the selector for at most the purge
   * timeout, writes to every selected writable key and then runs {@link #purge(long)}. The loop
   * ends when the server's running flag is cleared, when the error handler asks to exit on an
   * {@link OutOfMemoryError}, or when the thread is interrupted during the out-of-memory back-off
   * sleep.
   */
  private void doRunLoop() {
    long lastPurgeTime = 0; // last check for old calls.
    while (this.simpleRpcServer.running) {
      try {
        registerWrites();
        int keyCt = writeSelector.select(this.simpleRpcServer.purgeTimeout);
        if (keyCt == 0) {
          // NOTE(review): this also skips purge() below, so purging only happens on iterations
          // where at least one key was selected -- confirm this is intended.
          continue;
        }

        Set<SelectionKey> keys = writeSelector.selectedKeys();
        Iterator<SelectionKey> iter = keys.iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          iter.remove();
          try {
            if (key.isValid() && key.isWritable()) {
              doAsyncWrite(key);
            }
          } catch (IOException e) {
            SimpleRpcServer.LOG.debug(getName() + ": asyncWrite", e);
          }
        }

        lastPurgeTime = purge(lastPurgeTime);

      } catch (OutOfMemoryError e) {
        if (this.simpleRpcServer.errorHandler != null) {
          if (this.simpleRpcServer.errorHandler.checkOOME(e)) {
            SimpleRpcServer.LOG.info(getName() + ": exiting on OutOfMemoryError");
            return;
          }
        } else {
          //
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give
          // some thread(s) a chance to finish
          //
          SimpleRpcServer.LOG.warn(getName() + ": OutOfMemoryError in server select", e);
          try {
            Thread.sleep(60000);
          } catch (InterruptedException ex) {
            SimpleRpcServer.LOG.debug("Interrupted while sleeping");
            // Restore the interrupt status that Thread.sleep() cleared, so the interruption
            // stays observable after this method returns.
            Thread.currentThread().interrupt();
            return;
          }
        }
      } catch (Exception e) {
        SimpleRpcServer.LOG
          .warn(getName() + ": exception in Responder " + StringUtils.stringifyException(e), e);
      }
    }
    SimpleRpcServer.LOG.info(getName() + ": stopped");
  }

  /**
   * If there were some calls that have not been sent out for a long time, we close the
   * connection.
   * <p>
   * Does nothing (and returns {@code lastPurgeTime} unchanged) when less than the purge timeout
   * has elapsed since the previous purge.
   * <p>
   * NOTE(review): nothing in this class resets {@code lastSentTime} once a delayed response has
   * been fully written; confirm it is reset elsewhere, otherwise a connection that was slow once
   * remains a purge candidate.
   * @param lastPurgeTime the time of the previous purge
   * @return the time of the purge.
   * @throws IllegalStateException if a key registered on the write selector has no attachment
   */
  private long purge(long lastPurgeTime) {
    long now = EnvironmentEdgeManager.currentTime();
    if (now < lastPurgeTime + this.simpleRpcServer.purgeTimeout) {
      return lastPurgeTime;
    }

    ArrayList<SimpleServerRpcConnection> conWithOldCalls = new ArrayList<>();
    // get the list of channels from list of keys.
    synchronized (writeSelector.keys()) {
      for (SelectionKey key : writeSelector.keys()) {
        SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
        if (connection == null) {
          throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
        }
        if (
          connection.lastSentTime > 0
            && now > connection.lastSentTime + this.simpleRpcServer.purgeTimeout
        ) {
          conWithOldCalls.add(connection);
        }
      }
    }

    // Seems safer to close the connection outside of the synchronized loop...
    for (SimpleServerRpcConnection connection : conWithOldCalls) {
      this.simpleRpcServer.closeConnection(connection);
    }

    return now;
  }

  /**
   * Writes queued responses for the connection attached to a selected key and, once nothing is
   * left to write, clears the key's interest set.
   * @param key a selected key of the write selector
   * @throws IOException if the key has no attached connection, if the key's channel is not the
   *                     connection's channel, or if writing a response fails
   */
  private void doAsyncWrite(SelectionKey key) throws IOException {
    SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
    if (connection == null) {
      throw new IOException("doAsyncWrite: no connection");
    }
    if (key.channel() != connection.channel) {
      throw new IOException("doAsyncWrite: bad channel");
    }

    if (processAllResponses(connection)) {
      try {
        // We wrote everything, so we don't need to be told when the socket is ready for
        // write anymore.
        key.interestOps(0);
      } catch (CancelledKeyException e) {
        /*
         * The Listener/reader might have closed the socket. We don't explicitly cancel the key, so
         * not sure if this will ever fire. This warning could be removed.
         */
        SimpleRpcServer.LOG.warn("Exception while changing ops : " + e);
      }
    }
  }

  /**
   * Process the response for this call. You need to have the lock on
   * {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock}
   * <p>
   * If the write fails, the response is marked done and the connection is closed before the
   * exception propagates to the caller.
   * @param conn the connection to write to
   * @param resp the response to send
   * @return true if we processed the call fully, false otherwise.
   * @throws IOException if the channel write throws or reports a negative byte count
   */
  private boolean processResponse(SimpleServerRpcConnection conn, RpcResponse resp)
    throws IOException {
    boolean error = true;
    BufferChain buf = resp.getResponse();
    try {
      // Send as much data as we can in the non-blocking fashion
      long numBytes = this.simpleRpcServer.channelWrite(conn.channel, buf);
      if (numBytes < 0) {
        throw new HBaseIOException("Error writing on the socket " + conn);
      }
      error = false;
    } finally {
      if (error) {
        SimpleRpcServer.LOG.debug(conn + ": output error -- closing");
        // We will be closing this connection itself. Mark this call as done so that all the
        // buffer(s) it got from pool can get released
        resp.done();
        this.simpleRpcServer.closeConnection(conn);
      }
    }

    if (!buf.hasRemaining()) {
      resp.done();
      return true;
    } else {
      // set the serve time when the response has to be sent later
      conn.lastSentTime = EnvironmentEdgeManager.currentTime();
      return false; // Socket can't take more, we will have to come back.
    }
  }

  /**
   * Process all the responses for this connection
   * <p>
   * At most 20 responses are written per invocation. A response that could only be partially
   * written is put back at the head of the queue.
   * @param connection the connection whose response queue is drained
   * @return true if all the calls were processed or that someone else is doing it. false if there
   *         is still some work to do. In this case, we expect the caller to delay us.
   * @throws IOException if writing a response fails
   */
  private boolean processAllResponses(final SimpleServerRpcConnection connection)
    throws IOException {
    // We want only one writer on the channel for a connection at a time.
    connection.responseWriteLock.lock();
    try {
      for (int i = 0; i < 20; i++) {
        // protection if some handlers manage to need all the responder
        RpcResponse resp = connection.responseQueue.pollFirst();
        if (resp == null) {
          return true;
        }
        if (!processResponse(connection, resp)) {
          connection.responseQueue.addFirst(resp);
          return false;
        }
      }
    } finally {
      connection.responseWriteLock.unlock();
    }

    // The per-invocation limit was reached; report whether anything is still queued.
    return connection.responseQueue.isEmpty();
  }

  /**
   * Enqueue a response from the application.
   * <p>
   * When the connection's queue is empty and the write lock is free, the response is written
   * directly on the calling thread. If it is sent completely the method returns at once;
   * otherwise the response is queued (at the head if it was partially written here, at the tail
   * otherwise) and the connection is registered with the responder thread.
   * @param conn the connection the response belongs to
   * @param resp the response to send
   * @throws IOException if the direct write fails
   */
  void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOException {
    boolean added = false;
    // If there is already a write in progress, we don't wait. This allows to free the handlers
    // immediately for other tasks.
    if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) {
      try {
        if (conn.responseQueue.isEmpty()) {
          // If we're alone, we can try to do a direct call to the socket. It's
          // an optimization to save on context switches and data transfer between cores..
          if (processResponse(conn, resp)) {
            return; // we're done.
          }
          // Too big to fit, putting ahead.
          conn.responseQueue.addFirst(resp);
          added = true; // We will register to the selector later, outside of the lock.
        }
      } finally {
        conn.responseWriteLock.unlock();
      }
    }

    if (!added) {
      conn.responseQueue.addLast(resp);
    }
    registerForWrite(conn);
  }
}