/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET;

import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * Used to communicate with a single HBase table similar to {@link Table}
 * but meant for batched, potentially asynchronous puts. Obtain an instance from
 * a {@link Connection} and call {@link #close()} afterwards. Provide an alternate
 * to this implementation by setting {@link BufferedMutatorParams#implementationClassName(String)}
 * or by setting an alternate class name in the {@link Configuration}.
 * </p>
 *
 * <p>
 * While this can be used across threads, great care should be used when doing so.
 * Errors are global to the buffered mutator and the Exceptions can be thrown on any
 * thread that causes the flush for requests.
 * </p>
 *
 * @see ConnectionFactory
 * @see Connection
 * @since 1.0.0
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BufferedMutatorImpl implements BufferedMutator {

  private static final Logger LOG = LoggerFactory.getLogger(BufferedMutatorImpl.class);

  /** Receives flush failures; when {@code null}, failures are thrown to the flushing caller. */
  private final ExceptionListener listener;

  private final TableName tableName;

  private final Configuration conf;
  /** Mutations accepted by {@code mutate} that have not yet been handed to the AsyncProcess. */
  private final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<>();
  /** Sum of {@code heapSize()} of the mutations currently accounted to the buffer. */
  private final AtomicLong currentWriteBufferSize = new AtomicLong(0);
  /**
   * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}.
   * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation.
   */
  private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
  /** Buffer size above which {@code mutate} triggers a flush. */
  private final long writeBufferSize;

  /** Maximum age of the oldest buffered record before the timer flushes; 0 means no timer. */
  private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(0);
  /** Delay and period, in milliseconds, with which the periodic flush timer is scheduled. */
  private final AtomicLong writeBufferPeriodicFlushTimerTickMs =
      new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
  /** The running periodic flush timer, or {@code null} when none is scheduled. */
  private Timer writeBufferPeriodicFlushTimer = null;

  private final int maxKeyValueSize;
  private final ExecutorService pool;
  private final AtomicInteger rpcTimeout;
  private final AtomicInteger operationTimeout;
  /** True when the pool was created here (not supplied by the caller) and must be shut down. */
  private final boolean cleanupPoolOnClose;
  private volatile boolean closed = false;
  private final AsyncProcess ap;

  /**
   * Creates a buffered mutator that submits its writes through the given {@link AsyncProcess}.
   * Every setting left at {@code UNSET} in {@code params} falls back to the value taken from the
   * connection's configuration.
   *
   * @param conn the connection to use; must be non-null and open
   * @param params table name, listener, optional pool and optional overrides
   * @param ap the process used to submit the buffered mutations
   * @throws IllegalArgumentException if {@code conn} is null or closed
   */
  BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) {
    if (conn == null || conn.isClosed()) {
      throw new IllegalArgumentException("Connection is null or closed.");
    }
    this.tableName = params.getTableName();
    this.conf = conn.getConfiguration();
    this.listener = params.getListener();
    if (params.getPool() == null) {
      // No pool supplied: create a default one and remember to shut it down in close().
      this.pool = HTable.getDefaultExecutor(conf);
      cleanupPoolOnClose = true;
    } else {
      this.pool = params.getPool();
      cleanupPoolOnClose = false;
    }
    ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
    this.writeBufferSize =
        params.getWriteBufferSize() != UNSET ?
            params.getWriteBufferSize() : tableConf.getWriteBufferSize();

    // Set via the setter because it does value validation and starts/stops the TimerTask
    long newWriteBufferPeriodicFlushTimeoutMs =
        params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET
            ? params.getWriteBufferPeriodicFlushTimeoutMs()
            : tableConf.getWriteBufferPeriodicFlushTimeoutMs();
    long newWriteBufferPeriodicFlushTimerTickMs =
        params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET
            ? params.getWriteBufferPeriodicFlushTimerTickMs()
            : tableConf.getWriteBufferPeriodicFlushTimerTickMs();
    this.setWriteBufferPeriodicFlush(
        newWriteBufferPeriodicFlushTimeoutMs,
        newWriteBufferPeriodicFlushTimerTickMs);

    this.maxKeyValueSize =
        params.getMaxKeyValueSize() != UNSET ?
            params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();

    this.rpcTimeout = new AtomicInteger(
        params.getRpcTimeout() != UNSET ?
            params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout());

    this.operationTimeout = new AtomicInteger(
        params.getOperationTimeout() != UNSET ?
            params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
    this.ap = ap;
  }

  /**
   * Creates a buffered mutator backed by a new {@link AsyncProcess} built from the connection and
   * the given factories.
   *
   * @param conn the connection to use; must be non-null and open
   * @param rpcCallerFactory factory passed to the new {@link AsyncProcess}
   * @param rpcFactory controller factory passed to the new {@link AsyncProcess}
   * @param params table name, listener, optional pool and optional overrides
   */
  BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
      RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
    this(conn, params,
      // puts need to track errors globally due to how the APIs currently work.
      new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, rpcFactory));
  }

  /**
   * Rejects use of a closed mutator.
   *
   * @throws IllegalStateException if {@link #close()} has already completed
   */
  private void checkClose() {
    if (closed) {
      throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
    }
  }

  /** @return the executor handed to every submitted task */
  ExecutorService getPool() {
    return pool;
  }

  /** @return the {@link AsyncProcess} used to submit buffered mutations */
  AsyncProcess getAsyncProcess() {
    return ap;
  }

  @Override
  public TableName getName() {
    return tableName;
  }

  @Override
  public Configuration getConfiguration() {
    return conf;
  }

  /**
   * Buffers a single mutation; equivalent to calling {@link #mutate(List)} with a one-element
   * list.
   */
  @Override
  public void mutate(Mutation m) throws InterruptedIOException,
      RetriesExhaustedWithDetailsException {
    mutate(Collections.singletonList(m));
  }

  /**
   * Validates and buffers the given mutations, then flushes for as long as the buffered size
   * exceeds the write buffer size.
   *
   * @param ms the mutations to buffer; each {@link Put} is validated against the maximum
   *          key-value size before anything is added to the buffer
   * @throws IllegalStateException if this mutator is closed
   */
  @Override
  public void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
      RetriesExhaustedWithDetailsException {
    checkClose();

    long toAddSize = 0;
    int toAddCount = 0;
    for (Mutation m : ms) {
      if (m instanceof Put) {
        ConnectionUtils.validatePut((Put) m, maxKeyValueSize);
      }
      toAddSize += m.heapSize();
      ++toAddCount;
    }

    // An empty buffer means these are its first records: start the age clock for periodic flush.
    if (currentWriteBufferSize.get() == 0) {
      firstRecordInBufferTimestamp.set(System.currentTimeMillis());
    }
    currentWriteBufferSize.addAndGet(toAddSize);
    writeAsyncBuffer.addAll(ms);
    undealtMutationCount.addAndGet(toAddCount);
    doFlush(false);
  }

  /** @return the number of flushes started by the periodic flush timer so far */
  protected long getExecutedWriteBufferPeriodicFlushes() {
    return executedWriteBufferPeriodicFlushes.get();
  }

  /** Wall-clock time (ms) at which a mutation was last added to an empty buffer. */
  private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0);
  /** Number of flushes started by the periodic flush timer. */
  private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0);

  /**
   * Body of the periodic flush timer task. Flushes when the buffer is non-empty and the first
   * record has been buffered for at least the configured timeout. Flush failures are logged and
   * not rethrown.
   */
  private void timerCallbackForWriteBufferPeriodicFlush() {
    if (currentWriteBufferSize.get() == 0) {
      return; // Nothing to flush
    }
    long now = System.currentTimeMillis();
    if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() > now) {
      return; // No need to flush yet
    }
    // The first record in the writebuffer has been in there too long --> flush
    try {
      executedWriteBufferPeriodicFlushes.incrementAndGet();
      flush();
    } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
      // Pass the exception itself so the stack trace and cause are logged, not just the message.
      LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage(),
          e);
    }
  }

  /**
   * Stops the periodic flush timer, flushes everything still buffered and, if the pool was created
   * by this instance, shuts the pool down and waits up to 600 seconds for it to terminate. Calling
   * this again after it has completed is a no-op.
   */
  @Override
  public synchronized void close() throws IOException {
    if (closed) {
      return;
    }
    // Stop any running Periodic Flush timer.
    disableWriteBufferPeriodicFlush();
    try {
      // As we can have an operation in progress even if the buffer is empty, we call
      // doFlush at least one time.
      doFlush(true);
    } finally {
      if (cleanupPoolOnClose) {
        this.pool.shutdown();
        try {
          if (!pool.awaitTermination(600, TimeUnit.SECONDS)) {
            LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
          }
        } catch (InterruptedException e) {
          LOG.warn("waitForTermination interrupted");
          // Preserve the interrupt for the caller.
          Thread.currentThread().interrupt();
        }
      }
      // Marked closed even when the final flush failed.
      closed = true;
    }
  }

  /**
   * Builds the task submitted to the {@link AsyncProcess} for the given rows.
   *
   * @param access the rows to submit
   * @return a task whose timeout getters read the current values of this mutator's timeouts each
   *         time they are invoked
   */
  private AsyncProcessTask createTask(QueueRowAccess access) {
    return new AsyncProcessTask(AsyncProcessTask.newBuilder()
        .setPool(pool)
        .setTableName(tableName)
        .setRowAccess(access)
        .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
        .build()) {
      @Override
      public int getRpcTimeout() {
        return rpcTimeout.get();
      }

      @Override
      public int getOperationTimeout() {
        return operationTimeout.get();
      }
    };
  }

  /**
   * Sends all buffered mutations and waits for them to finish.
   *
   * @throws IllegalStateException if this mutator is closed
   */
  @Override
  public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    checkClose();
    doFlush(true);
  }

  /**
   * Send the operations in the buffer to the servers.
   *
   * @param flushAll - if true, sends all the writes and wait for all of them to finish before
   *                 returning. Otherwise, flush until buffer size is smaller than threshold
   * @throws RetriesExhaustedWithDetailsException the combined errors of all submissions, thrown
   *           only when no listener is set; otherwise the listener receives them
   */
  private void doFlush(boolean flushAll) throws InterruptedIOException,
      RetriesExhaustedWithDetailsException {
    List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>();
    while (true) {
      if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) {
        // There is the room to accept more mutations.
        break;
      }
      AsyncRequestFuture asf;
      try (QueueRowAccess access = createQueueRowAccess()) {
        if (access.isEmpty()) {
          // It means someone has gotten the ticker to run the flush.
          break;
        }
        asf = ap.submit(createTask(access));
      }
      // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't
      // be released.
      asf.waitUntilDone();
      if (asf.hasError()) {
        errors.add(asf.getErrors());
      }
    }

    RetriesExhaustedWithDetailsException exception = makeException(errors);
    if (exception == null) {
      return;
    } else if (listener == null) {
      throw exception;
    } else {
      listener.onException(exception, this);
    }
  }

  /**
   * Combines the errors collected during one flush into a single exception.
   *
   * @param errors the errors of the individual submissions
   * @return {@code null} when there are no errors, the sole error when there is exactly one,
   *         otherwise a new exception holding the concatenated exceptions, actions and host names
   */
  private static RetriesExhaustedWithDetailsException makeException(
      List<RetriesExhaustedWithDetailsException> errors) {
    switch (errors.size()) {
      case 0:
        return null;
      case 1:
        return errors.get(0);
      default:
        List<Throwable> exceptions = new ArrayList<>();
        List<Row> actions = new ArrayList<>();
        List<String> hostnameAndPort = new ArrayList<>();
        errors.forEach(e -> {
          exceptions.addAll(e.exceptions);
          actions.addAll(e.actions);
          hostnameAndPort.addAll(e.hostnameAndPort);
        });
        return new RetriesExhaustedWithDetailsException(exceptions, actions, hostnameAndPort);
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public long getWriteBufferSize() {
    return this.writeBufferSize;
  }

  /**
   * Updates the periodic flush settings and (re)starts or stops the timer accordingly.
   *
   * @param timeoutMs maximum age of the oldest buffered record before a flush; values below 0 are
   *          treated as 0, and 0 leaves no timer running
   * @param timerTickMs delay and period of the timer; raised to
   *          {@code MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS} when smaller
   */
  @Override
  public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
    long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs.get();
    long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get();

    // Both parameters have minimal values.
    writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs));
    writeBufferPeriodicFlushTimerTickMs.set(
        Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs));

    // If something changed we stop the old Timer.
    if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs ||
        writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs) {
      if (writeBufferPeriodicFlushTimer != null) {
        writeBufferPeriodicFlushTimer.cancel();
        writeBufferPeriodicFlushTimer = null;
      }
    }

    // If we have the need for a timer and there is none we start it
    if (writeBufferPeriodicFlushTimer == null &&
        writeBufferPeriodicFlushTimeoutMs.get() > 0) {
      writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
      writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
        @Override
        public void run() {
          BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
        }
      }, writeBufferPeriodicFlushTimerTickMs.get(),
         writeBufferPeriodicFlushTimerTickMs.get());
    }
  }

  @Override
  public long getWriteBufferPeriodicFlushTimeoutMs() {
    return writeBufferPeriodicFlushTimeoutMs.get();
  }

  @Override
  public long getWriteBufferPeriodicFlushTimerTickMs() {
    return writeBufferPeriodicFlushTimerTickMs.get();
  }

  @Override
  public void setRpcTimeout(int rpcTimeout) {
    this.rpcTimeout.set(rpcTimeout);
  }

  @Override
  public void setOperationTimeout(int operationTimeout) {
    this.operationTimeout.set(operationTimeout);
  }

  /** @return the heap size currently accounted to the write buffer */
  long getCurrentWriteBufferSize() {
    return currentWriteBufferSize.get();
  }

  /**
   * Count the mutations which haven't been processed.
   * @return count of undealt mutation
   */
  int size() {
    return undealtMutationCount.get();
  }

  /**
   * Count the mutations which haven't been flushed
   * @return count of unflushed mutation
   */
  int getUnflushedSize() {
    return writeAsyncBuffer.size();
  }

  /** @return a new {@link QueueRowAccess} that claims all currently undealt mutations */
  QueueRowAccess createQueueRowAccess() {
    return new QueueRowAccess();
  }

  /**
   * A view over the write buffer handed to the {@link AsyncProcess}. On creation it claims the
   * whole undealt mutation count (resetting the shared counter to 0). On {@link #close()} it
   * gives back whatever it did not consume.
   */
  class QueueRowAccess implements RowAccess<Row>, Closeable {
    /** Number of claimed mutations that have not been consumed via {@code Iterator.remove()}. */
    private int remainder = undealtMutationCount.getAndSet(0);
    /** The mutation most recently returned by {@code next()} and not yet removed, if any. */
    private Mutation last = null;

    /** Puts {@code last} back on the queue, and its size back in the buffer total, if set. */
    private void restoreLastMutation() {
      // restore the last mutation since it isn't submitted
      if (last != null) {
        writeAsyncBuffer.add(last);
        currentWriteBufferSize.addAndGet(last.heapSize());
        last = null;
      }
    }

    /**
     * Returns a polled-but-not-removed mutation to the queue and adds the unconsumed count back
     * to the shared undealt counter.
     */
    @Override
    public void close() {
      restoreLastMutation();
      if (remainder > 0) {
        undealtMutationCount.addAndGet(remainder);
        remainder = 0;
      }
    }

    /**
     * Returns an iterator that polls mutations off the write buffer. A mutation returned by
     * {@code next()} counts as consumed only after {@code remove()} is called; otherwise it is
     * put back on the queue by the following {@code next()} or by {@link #close()}.
     */
    @Override
    public Iterator<Row> iterator() {
      return new Iterator<Row>() {
        // Upper bound on how many mutations this iterator may poll.
        private int countDown = remainder;
        @Override
        public boolean hasNext() {
          return countDown > 0;
        }
        @Override
        public Row next() {
          // The previous element was not removed, so it was not submitted: give it back first.
          restoreLastMutation();
          if (!hasNext()) {
            throw new NoSuchElementException();
          }
          last = writeAsyncBuffer.poll();
          if (last == null) {
            throw new NoSuchElementException();
          }
          currentWriteBufferSize.addAndGet(-last.heapSize());
          --countDown;
          return last;
        }
        @Override
        public void remove() {
          if (last == null) {
            throw new IllegalStateException();
          }
          --remainder;
          last = null;
        }
      };
    }

    /** @return the number of claimed mutations not yet consumed */
    @Override
    public int size() {
      return remainder;
    }

    @Override
    public boolean isEmpty() {
      return remainder <= 0;
    }
  }
}