/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * HTableMultiplexer provides a thread-safe, non-blocking PUT API across all tables. Each put is
 * sharded into a buffer queue based on its destination region server, so each region server queue
 * only holds puts that share the same destination. Each queue has a flush worker thread that
 * flushes the buffered puts to the region server. If any queue is full, the HTableMultiplexer
 * drops further Put requests for that particular queue.
 * <p>
 * All puts are retried a configurable number of times before being dropped, and the
 * HTableMultiplexer can report the number of buffered requests and the number of failed (dropped)
 * requests, either in total or per region server.
 * <p>
 * This class is thread safe.
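 * <p>
 * A minimal usage sketch (the table, family, and values below are illustrative only; exception
 * handling is omitted for brevity):
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 1000);
 * Put put = new Put(Bytes.toBytes("row-1"));
 * put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
 * if (!multiplexer.put(TableName.valueOf("example_table"), put)) {
 *   // The queue for this region server was full, so the put was rejected.
 * }
 * HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
 * long dropped = status.getTotalFailedCounter();
 * multiplexer.close();
 * }</pre>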
 * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use
 *             {@link BufferedMutator} for batching mutations.
 */
@Deprecated
@InterfaceAudience.Public
public class HTableMultiplexer {
  private static final Logger LOG = LoggerFactory.getLogger(HTableMultiplexer.class.getName());

  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
    "hbase.tablemultiplexer.flush.period.ms";
  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
    "hbase.client.max.retries.in.queue";

  /** Maps each region server to its flush worker. */
  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
    new ConcurrentHashMap<>();

  private final Configuration conf;
  private final ClusterConnection conn;
  private final ExecutorService pool;
  private final int maxAttempts;
  private final int perRegionServerBufferQueueSize;
  private final int maxKeyValueSize;
  private final ScheduledExecutorService executor;
  private final long flushPeriod;

  /**
   * @param conf                           The HBaseConfiguration
   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
   *                                       each region server before dropping the request.
   */
  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
    throws IOException {
    this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
  }

  /**
   * @param conn                           The HBase connection.
   * @param conf                           The HBase configuration
   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
   *                                       each region server before dropping the request.
   */
  public HTableMultiplexer(Connection conn, Configuration conf,
    int perRegionServerBufferQueueSize) {
    this.conn = (ClusterConnection) conn;
    this.pool = HTable.getDefaultExecutor(conf);
    // how many times we could try in total, one more than retry number
    this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
    this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
    this.executor = Executors.newScheduledThreadPool(initThreads,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
    this.conf = conf;
  }

  /**
   * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already been
   * closed.
   * @throws IOException If there is an error closing the connection.
   */
  public synchronized void close() throws IOException {
    if (!getConnection().isClosed()) {
      getConnection().close();
    }
  }

  /**
   * The put request will be buffered by its corresponding buffer queue. Returns false if the
   * queue is already full.
   * @return true if the request can be accepted by its corresponding buffer queue
   */
  public boolean put(TableName tableName, final Put put) {
    return put(tableName, put, this.maxAttempts);
  }

  /**
   * The put requests will be buffered by their corresponding buffer queues. Returns the puts that
   * could not be queued.
   * @return the list of puts which could not be queued
   */
  public List<Put> put(TableName tableName, final List<Put> puts) {
    if (puts == null) return null;

    List<Put> failedPuts = null;
    boolean result;
    for (Put put : puts) {
      result = put(tableName, put, this.maxAttempts);
      if (!result) {
        // Create the failed puts list if necessary
        if (failedPuts == null) {
          failedPuts = new ArrayList<>();
        }
        // Add the put to the failed puts list
        failedPuts.add(put);
      }
    }
    return failedPuts;
  }

  /**
   * @deprecated Use {@link #put(TableName, List) } instead.
   */
  @Deprecated
  public List<Put> put(byte[] tableName, final List<Put> puts) {
    return put(TableName.valueOf(tableName), puts);
  }

  /**
   * The put request will be buffered by its corresponding buffer queue and retried up to the given
   * number of attempts before being dropped. Returns false if the queue is already full.
   * @return true if the request can be accepted by its corresponding buffer queue
   */
  public boolean put(final TableName tableName, final Put put, int maxAttempts) {
    if (maxAttempts <= 0) {
      return false;
    }

    try {
      ConnectionUtils.validatePut(put, maxKeyValueSize);
      // Allow mocking to get at the connection, but don't expose the connection to users.
      ClusterConnection conn = (ClusterConnection) getConnection();
      // AsyncProcess in the FlushWorker should take care of refreshing the location cache
      // as necessary. We shouldn't have to do that here.
      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
      if (loc != null) {
        // Add the put pair into its corresponding queue.
        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);

        // Generate a PutStatus object and offer it into the queue
        PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts);

        return queue.offer(s);
      }
    } catch (IOException e) {
      LOG.debug("Cannot process the put " + put, e);
    }
    return false;
  }

  /**
   * @deprecated Use {@link #put(TableName, Put) } instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, final Put put, int retry) {
    return put(TableName.valueOf(tableName), put, retry);
  }

  /**
   * @deprecated Use {@link #put(TableName, Put)} instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, Put put) {
    return put(TableName.valueOf(tableName), put);
  }

  /** Returns the current HTableMultiplexerStatus */
  public HTableMultiplexerStatus getHTableMultiplexerStatus() {
    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
  }

  @InterfaceAudience.Private
  @SuppressWarnings("FutureReturnValueIgnored")
  LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
    FlushWorker worker = serverToFlushWorkerMap.get(addr);
    if (worker == null) {
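      // Double-checked locking: re-check under the lock so that only one FlushWorker is created
      // and scheduled per region server location.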
      synchronized (this.serverToFlushWorkerMap) {
        worker = serverToFlushWorkerMap.get(addr);
        if (worker == null) {
          // Create the flush worker
          worker = new FlushWorker(conf, this.conn, addr, this, perRegionServerBufferQueueSize,
            pool, executor);
          this.serverToFlushWorkerMap.put(addr, worker);
          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
        }
      }
    }
    return worker.getQueue();
  }

  @InterfaceAudience.Private
  ClusterConnection getConnection() {
    return this.conn;
  }

  /**
   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. It reports
   * the number of buffered requests and the number of failed (dropped) requests, in total or on a
   * per region server basis.
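   * <p>
   * A reading sketch (the {@code multiplexer} variable below is illustrative):
   *
   * <pre>{@code
   * HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
   * long dropped = status.getTotalFailedCounter();
   * for (Map.Entry<String, Long> e : status.getBufferedCounterForEachRegionServer().entrySet()) {
   *   System.out.println(e.getKey() + " has " + e.getValue() + " buffered puts");
   * }
   * }</pre>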
   * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use
   *             {@link BufferedMutator} for batching mutations.
   */
  @Deprecated
  @InterfaceAudience.Public
  public static class HTableMultiplexerStatus {
    private long totalFailedPutCounter;
    private long totalBufferedPutCounter;
    private long maxLatency;
    private long overallAverageLatency;
    private Map<String, Long> serverToFailedCounterMap;
    private Map<String, Long> serverToBufferedCounterMap;
    private Map<String, Long> serverToAverageLatencyMap;
    private Map<String, Long> serverToMaxLatencyMap;

    public HTableMultiplexerStatus(Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      this.totalBufferedPutCounter = 0;
      this.totalFailedPutCounter = 0;
      this.maxLatency = 0;
      this.overallAverageLatency = 0;
      this.serverToBufferedCounterMap = new HashMap<>();
      this.serverToFailedCounterMap = new HashMap<>();
      this.serverToAverageLatencyMap = new HashMap<>();
      this.serverToMaxLatencyMap = new HashMap<>();
      this.initialize(serverToFlushWorkerMap);
    }

    private void initialize(Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      if (serverToFlushWorkerMap == null) {
        return;
      }

      long averageCalcSum = 0;
      int averageCalcCount = 0;
      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap.entrySet()) {
        HRegionLocation addr = entry.getKey();
        FlushWorker worker = entry.getValue();

        long bufferedCounter = worker.getTotalBufferedCount();
        long failedCounter = worker.getTotalFailedCount();
        long serverMaxLatency = worker.getMaxLatency();
        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
        // Get sum and count pieces separately to compute overall average
        SimpleEntry<Long, Integer> averageComponents = averageCounter.getComponents();
        long serverAvgLatency = averageCounter.getAndReset();

        this.totalBufferedPutCounter += bufferedCounter;
        this.totalFailedPutCounter += failedCounter;
        if (serverMaxLatency > this.maxLatency) {
          this.maxLatency = serverMaxLatency;
        }
        averageCalcSum += averageComponents.getKey();
        averageCalcCount += averageComponents.getValue();

        this.serverToBufferedCounterMap.put(addr.getHostnamePort(), bufferedCounter);
        this.serverToFailedCounterMap.put(addr.getHostnamePort(), failedCounter);
        this.serverToAverageLatencyMap.put(addr.getHostnamePort(), serverAvgLatency);
        this.serverToMaxLatencyMap.put(addr.getHostnamePort(), serverMaxLatency);
      }
      this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum / averageCalcCount : 0;
    }

    public long getTotalBufferedCounter() {
      return this.totalBufferedPutCounter;
    }

    public long getTotalFailedCounter() {
      return this.totalFailedPutCounter;
    }

    public long getMaxLatency() {
      return this.maxLatency;
    }

    public long getOverallAverageLatency() {
      return this.overallAverageLatency;
    }

    public Map<String, Long> getBufferedCounterForEachRegionServer() {
      return this.serverToBufferedCounterMap;
    }

    public Map<String, Long> getFailedCounterForEachRegionServer() {
      return this.serverToFailedCounterMap;
    }

    public Map<String, Long> getMaxLatencyForEachRegionServer() {
      return this.serverToMaxLatencyMap;
    }

    public Map<String, Long> getAverageLatencyForEachRegionServer() {
      return this.serverToAverageLatencyMap;
    }
  }

  @InterfaceAudience.Private
  static class PutStatus {
    final RegionInfo regionInfo;
    final Put put;
    final int maxAttempCount;

    public PutStatus(RegionInfo regionInfo, Put put, int maxAttempCount) {
      this.regionInfo = regionInfo;
      this.put = put;
      this.maxAttempCount = maxAttempCount;
    }
  }

  /**
   * Helper to count the average over an interval until reset.
   */
  private static class AtomicAverageCounter {
    private long sum;
    private int count;

    public AtomicAverageCounter() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized long getAndReset() {
      long result = this.get();
      this.reset();
      return result;
    }

    public synchronized long get() {
      if (this.count == 0) {
        return 0;
      }
      return this.sum / this.count;
    }

    public synchronized SimpleEntry<Long, Integer> getComponents() {
      return new SimpleEntry<>(sum, count);
    }

    public synchronized void reset() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized void add(long value) {
      this.sum += value;
      this.count++;
    }
  }

  @InterfaceAudience.Private
  static class FlushWorker implements Runnable {
    private final HRegionLocation addr;
    private final LinkedBlockingQueue<PutStatus> queue;
    private final HTableMultiplexer multiplexer;
    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
    private final AtomicLong maxLatency = new AtomicLong(0);

    private final AsyncProcess ap;
    private final List<PutStatus> processingList = new ArrayList<>();
    private final ScheduledExecutorService executor;
    private final int maxRetryInQueue;
    private final AtomicInteger retryInQueue = new AtomicInteger(0);
    private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
    private final int operationTimeout;
    private final ExecutorService pool;

    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
      HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, ExecutorService pool,
      ScheduledExecutorService executor) {
      this.addr = addr;
      this.multiplexer = htableMultiplexer;
      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
      this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
        conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
      // Specify 0 retries in AsyncProcess because we need to reassign puts to different queues
      // if regions are moved.
      this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory, 0);
      this.executor = executor;
      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
      this.pool = pool;
    }

    protected LinkedBlockingQueue<PutStatus> getQueue() {
      return this.queue;
    }

    public long getTotalFailedCount() {
      return totalFailedPutCount.get();
    }

    public long getTotalBufferedCount() {
      return (long) queue.size() + currentProcessingCount.get();
    }

    public AtomicAverageCounter getAverageLatencyCounter() {
      return this.averageLatency;
    }

    public long getMaxLatency() {
      return this.maxLatency.getAndSet(0);
    }

    @SuppressWarnings("FutureReturnValueIgnored")
    boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
      // Decrease the retry count
      final int retryCount = ps.maxAttempCount - 1;

      if (retryCount <= 0) {
        // Retries exhausted; give up. The caller accounts for this put in the failed counter.
        return false;
      }

      int cnt = getRetryInQueue().incrementAndGet();
      if (cnt > getMaxRetryInQueue()) {
        // Too many puts are already queued for resubmission; give up on this one.
        getRetryInQueue().decrementAndGet();
        return false;
      }

      final Put failedPut = ps.put;
      // The current put failed, so get its table name from the region info for the resubmit.
      final TableName tableName = ps.regionInfo.getTable();

      long delayMs = getNextDelay(retryCount);
      if (LOG.isDebugEnabled()) {
        LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
      }

      // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
      // the region location cache when the Put originally failed with some exception. If we keep
      // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff
      // that we expect it to.
      getExecutor().schedule(new Runnable() {
        @Override
        public void run() {
          boolean succ = false;
          try {
            succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
          } finally {
            FlushWorker.this.getRetryInQueue().decrementAndGet();
            if (!succ) {
              FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
            }
          }
        }
      }, delayMs, TimeUnit.MILLISECONDS);
      return true;
    }

    @InterfaceAudience.Private
    long getNextDelay(int retryCount) {
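      // Back off using the standard HBase client retry pause schedule, scaled by the flush period
      // and keyed on the number of attempts already made, so later retries wait longer.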
      return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
        multiplexer.maxAttempts - retryCount - 1);
    }

    @InterfaceAudience.Private
    AtomicInteger getRetryInQueue() {
      return this.retryInQueue;
    }

    @InterfaceAudience.Private
    int getMaxRetryInQueue() {
      return this.maxRetryInQueue;
    }

    @InterfaceAudience.Private
    AtomicLong getTotalFailedPutCount() {
      return this.totalFailedPutCount;
    }

    @InterfaceAudience.Private
    HTableMultiplexer getMultiplexer() {
      return this.multiplexer;
    }

    @InterfaceAudience.Private
    ScheduledExecutorService getExecutor() {
      return this.executor;
    }

    @Override
    public void run() {
      int failedCount = 0;
      try {
        long start = EnvironmentEdgeManager.currentTime();

        // drain all the queued puts into the tmp list
        processingList.clear();
        queue.drainTo(processingList);
        if (processingList.isEmpty()) {
          // Nothing to flush
          return;
        }

        currentProcessingCount.set(processingList.size());
        // failedCount is decreased whenever a Put succeeds or is resubmitted.
        failedCount = processingList.size();

        List<Action> retainedActions = new ArrayList<>(processingList.size());
        MultiAction actions = new MultiAction();
        for (int i = 0; i < processingList.size(); i++) {
          PutStatus putStatus = processingList.get(i);
          Action action = new Action(putStatus.put, i);
          actions.add(putStatus.regionInfo.getRegionName(), action);
          retainedActions.add(action);
        }

        // Process this multi-put request
        List<PutStatus> failed = null;
        Object[] results = new Object[actions.size()];
        ServerName server = addr.getServerName();
        Map<ServerName, MultiAction> actionsByServer = Collections.singletonMap(server, actions);
        try {
          AsyncProcessTask task = AsyncProcessTask.newBuilder().setResults(results).setPool(pool)
            .setRpcTimeout(writeRpcTimeout).setOperationTimeout(operationTimeout).build();
          AsyncRequestFuture arf =
            ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer);
          arf.waitUntilDone();
          if (arf.hasError()) {
            // We just log and ignore the exception here since failed Puts will be resubmitted.
            LOG.debug("Caught some exceptions when flushing puts to region server "
              + addr.getHostnamePort(), arf.getErrors());
          }
        } finally {
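          // A Result entry means the corresponding put succeeded; anything else is treated as a
          // failure and its PutStatus is collected so it can be resubmitted below.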
          for (int i = 0; i < results.length; i++) {
            if (results[i] instanceof Result) {
              failedCount--;
            } else {
              if (failed == null) {
                failed = new ArrayList<>();
              }
              failed.add(processingList.get(i));
            }
          }
        }

        if (failed != null) {
          // Resubmit failed puts
          for (PutStatus putStatus : failed) {
            if (resubmitFailedPut(putStatus, this.addr)) {
              failedCount--;
            }
          }
        }

        long elapsed = EnvironmentEdgeManager.currentTime() - start;
        // Update latency counters
        averageLatency.add(elapsed);
        if (elapsed > maxLatency.get()) {
          maxLatency.set(elapsed);
        }

        // Log some basic info
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Processed " + currentProcessingCount + " put requests for " + addr.getHostnamePort()
              + " and " + failedCount + " failed" + ", latency for this send: " + elapsed);
        }

        // Reset the current processing put count
        currentProcessingCount.set(0);
      } catch (RuntimeException e) {
        // To make findbugs happy
        // Log all the exceptions and move on
        LOG.debug("Caught some exceptions " + e + " when flushing puts to region server "
          + addr.getHostnamePort(), e);
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        // Log all the exceptions and move on
        LOG.debug("Caught some exceptions " + e + " when flushing puts to region server "
          + addr.getHostnamePort(), e);
      } finally {
        // Update the totalFailedCount
        this.totalFailedPutCount.addAndGet(failedCount);
      }
    }
  }
}