001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.LinkedBlockingQueue;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.LongAdder;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.thrift.generated.TIncrement;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.hadoop.metrics2.util.MBeans;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
042
043/**
044 * This class will coalesce increments from a thift server if
045 * hbase.regionserver.thrift.coalesceIncrement is set to true. Turning this config to true will
046 * cause the thrift server to queue increments into an instance of this class. The thread pool
047 * associated with this class will drain the coalesced increments as the thread is able. This can
048 * cause data loss if the thrift server dies or is shut down before everything in the queue is
049 * drained.
050 */
051@InterfaceAudience.Private
052public class IncrementCoalescer implements IncrementCoalescerMBean {
053  /**
054   * Used to identify a cell that will be incremented.
055   */
056  static class FullyQualifiedRow {
057    private byte[] table;
058    private byte[] rowKey;
059    private byte[] family;
060    private byte[] qualifier;
061
062    public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) {
063      super();
064      this.table = table;
065      this.rowKey = rowKey;
066      this.family = fam;
067      this.qualifier = qual;
068    }
069
070    public byte[] getTable() {
071      return table;
072    }
073
074    public void setTable(byte[] table) {
075      this.table = table;
076    }
077
078    public byte[] getRowKey() {
079      return rowKey;
080    }
081
082    public byte[] getFamily() {
083      return family;
084    }
085
086    public void setFamily(byte[] fam) {
087      this.family = fam;
088    }
089
090    public byte[] getQualifier() {
091      return qualifier;
092    }
093
094    public void setQualifier(byte[] qual) {
095      this.qualifier = qual;
096    }
097
098    @Override
099    public int hashCode() {
100      final int prime = 31;
101      int result = 1;
102      result = prime * result + Arrays.hashCode(family);
103      result = prime * result + Arrays.hashCode(qualifier);
104      result = prime * result + Arrays.hashCode(rowKey);
105      result = prime * result + Arrays.hashCode(table);
106      return result;
107    }
108
109    @Override
110    public boolean equals(Object obj) {
111      if (this == obj) {
112        return true;
113      }
114      if (obj == null) {
115        return false;
116      }
117      if (getClass() != obj.getClass()) {
118        return false;
119      }
120
121      FullyQualifiedRow other = (FullyQualifiedRow) obj;
122
123      if (!Arrays.equals(family, other.family)) {
124        return false;
125      }
126      if (!Arrays.equals(qualifier, other.qualifier)) {
127        return false;
128      }
129      if (!Arrays.equals(rowKey, other.rowKey)) {
130        return false;
131      }
132
133      return Arrays.equals(table, other.table);
134    }
135  }
136
137  private final LongAdder failedIncrements = new LongAdder();
138  private final LongAdder successfulCoalescings = new LongAdder();
139  private final LongAdder totalIncrements = new LongAdder();
140  private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
141    new ConcurrentHashMap<>(100000, 0.75f, 1500);
142  private final ThreadPoolExecutor pool;
143  private final ThriftHBaseServiceHandler handler;
144
145  private int maxQueueSize = 500000;
146  private static final int CORE_POOL_SIZE = 1;
147
148  private static final Logger LOG = LoggerFactory.getLogger(IncrementCoalescer.class);
149
150  public IncrementCoalescer(ThriftHBaseServiceHandler hand) {
151    this.handler = hand;
152    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
153    pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
154      new ThreadFactoryBuilder().setNameFormat("IncrementCoalescer-pool-%d").setDaemon(true)
155        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
156    MBeans.register("thrift", "Thrift", this);
157  }
158
159  public boolean queueIncrement(TIncrement inc) {
160    if (!canQueue()) {
161      failedIncrements.increment();
162      return false;
163    }
164    return internalQueueTincrement(inc);
165  }
166
167  public boolean queueIncrements(List<TIncrement> incs) {
168    if (!canQueue()) {
169      failedIncrements.increment();
170      return false;
171    }
172
173    for (TIncrement tinc : incs) {
174      internalQueueTincrement(tinc);
175    }
176
177    return true;
178  }
179
180  private boolean internalQueueTincrement(TIncrement inc) {
181    byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn());
182
183    if (famAndQf.length != 2) {
184      return false;
185    }
186
187    return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
188      inc.getAmmount());
189  }
190
191  @SuppressWarnings("FutureReturnValueIgnored")
192  private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam, byte[] qual,
193    long ammount) {
194    int countersMapSize = countersMap.size();
195
196    // Make sure that the number of threads is scaled.
197    dynamicallySetCoreSize(countersMapSize);
198
199    totalIncrements.increment();
200
201    FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual);
202
203    long currentAmount = ammount;
204    // Spin until able to insert the value back without collisions
205    while (true) {
206      Long value = countersMap.remove(key);
207      if (value == null) {
208        // There was nothing there, create a new value
209        value = currentAmount;
210      } else {
211        value += currentAmount;
212        successfulCoalescings.increment();
213      }
214      // Try to put the value, only if there was none
215      Long oldValue = countersMap.putIfAbsent(key, value);
216      if (oldValue == null) {
217        // We were able to put it in, we're done
218        break;
219      }
220      // Someone else was able to put a value in, so let's remember our
221      // current value (plus what we picked up) and retry to add it in
222      currentAmount = value;
223    }
224
225    // We limit the size of the queue simply because all we need is a
226    // notification that something needs to be incremented. No need
227    // for millions of callables that mean the same thing.
228    if (pool.getQueue().size() <= 1000) {
229      // queue it up
230      Callable<Integer> callable = createIncCallable();
231      pool.submit(callable);
232    }
233
234    return true;
235  }
236
237  public boolean canQueue() {
238    return countersMap.size() < maxQueueSize;
239  }
240
241  private Callable<Integer> createIncCallable() {
242    return () -> {
243      int failures = 0;
244      Set<FullyQualifiedRow> keys = countersMap.keySet();
245      for (FullyQualifiedRow row : keys) {
246        Long counter = countersMap.remove(row);
247        if (counter == null) {
248          continue;
249        }
250        Table table = null;
251        try {
252          table = handler.getTable(row.getTable());
253          if (failures > 2) {
254            throw new IOException("Auto-Fail rest of ICVs");
255          }
256          table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(), counter);
257        } catch (IOException e) {
258          // log failure of increment
259          failures++;
260          LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", "
261            + Bytes.toStringBinary(row.getRowKey()) + ", " + Bytes.toStringBinary(row.getFamily())
262            + ", " + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
263        } finally {
264          if (table != null) {
265            table.close();
266          }
267        }
268      }
269      return failures;
270    };
271  }
272
273  /**
274   * This method samples the incoming requests and, if selected, will check if the corePoolSize
275   * should be changed.
276   * @param countersMapSize the size of the counters map
277   */
278  private void dynamicallySetCoreSize(int countersMapSize) {
279    // Here we are using countersMapSize as a random number, meaning this
280    // could be a Random object
281    if (countersMapSize % 10 != 0) {
282      return;
283    }
284    double currentRatio = (double) countersMapSize / (double) maxQueueSize;
285    int newValue;
286
287    if (currentRatio < 0.1) {
288      newValue = 1;
289    } else if (currentRatio < 0.3) {
290      newValue = 2;
291    } else if (currentRatio < 0.5) {
292      newValue = 4;
293    } else if (currentRatio < 0.7) {
294      newValue = 8;
295    } else if (currentRatio < 0.9) {
296      newValue = 14;
297    } else {
298      newValue = 22;
299    }
300
301    if (pool.getCorePoolSize() != newValue) {
302      pool.setCorePoolSize(newValue);
303    }
304  }
305
306  // MBean get/set methods
307  @Override
308  public int getQueueSize() {
309    return pool.getQueue().size();
310  }
311
312  @Override
313  public int getMaxQueueSize() {
314    return this.maxQueueSize;
315  }
316
317  @Override
318  public void setMaxQueueSize(int newSize) {
319    this.maxQueueSize = newSize;
320  }
321
322  @Override
323  public long getPoolCompletedTaskCount() {
324    return pool.getCompletedTaskCount();
325  }
326
327  @Override
328  public long getPoolTaskCount() {
329    return pool.getTaskCount();
330  }
331
332  @Override
333  public int getPoolLargestPoolSize() {
334    return pool.getLargestPoolSize();
335  }
336
337  @Override
338  public int getCorePoolSize() {
339    return pool.getCorePoolSize();
340  }
341
342  @Override
343  public void setCorePoolSize(int newCoreSize) {
344    pool.setCorePoolSize(newCoreSize);
345  }
346
347  @Override
348  public int getMaxPoolSize() {
349    return pool.getMaximumPoolSize();
350  }
351
352  @Override
353  public void setMaxPoolSize(int newMaxSize) {
354    pool.setMaximumPoolSize(newMaxSize);
355  }
356
357  @Override
358  public long getFailedIncrements() {
359    return failedIncrements.sum();
360  }
361
362  @Override
363  public long getSuccessfulCoalescings() {
364    return successfulCoalescings.sum();
365  }
366
367  @Override
368  public long getTotalIncrements() {
369    return totalIncrements.sum();
370  }
371
372  @Override
373  public long getCountersMapSize() {
374    return countersMap.size();
375  }
376}