001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.thrift;
020
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.List;
024import java.util.Set;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.LinkedBlockingQueue;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.LongAdder;
034import org.apache.hadoop.hbase.CellUtil;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
037import org.apache.hadoop.hbase.thrift.generated.TIncrement;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.Threads;
040import org.apache.hadoop.metrics2.util.MBeans;
041import org.apache.thrift.TException;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * This class will coalesce increments from a thift server if
048 * hbase.regionserver.thrift.coalesceIncrement is set to true. Turning this
049 * config to true will cause the thrift server to queue increments into an
050 * instance of this class. The thread pool associated with this class will drain
051 * the coalesced increments as the thread is able. This can cause data loss if the
052 * thrift server dies or is shut down before everything in the queue is drained.
053 *
054 */
055@InterfaceAudience.Private
056public class IncrementCoalescer implements IncrementCoalescerMBean {
057
058  /**
059   * Used to identify a cell that will be incremented.
060   *
061   */
062  static class FullyQualifiedRow {
063    private byte[] table;
064    private byte[] rowKey;
065    private byte[] family;
066    private byte[] qualifier;
067
068    public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) {
069      super();
070      this.table = table;
071      this.rowKey = rowKey;
072      this.family = fam;
073      this.qualifier = qual;
074    }
075
076    public byte[] getTable() {
077      return table;
078    }
079
080    public void setTable(byte[] table) {
081      this.table = table;
082    }
083
084    public byte[] getRowKey() {
085      return rowKey;
086    }
087
088    public void setRowKey(byte[] rowKey) {
089      this.rowKey = rowKey;
090    }
091
092    public byte[] getFamily() {
093      return family;
094    }
095
096    public void setFamily(byte[] fam) {
097      this.family = fam;
098    }
099
100    public byte[] getQualifier() {
101      return qualifier;
102    }
103
104    public void setQualifier(byte[] qual) {
105      this.qualifier = qual;
106    }
107
108    @Override
109    public int hashCode() {
110      final int prime = 31;
111      int result = 1;
112      result = prime * result + Arrays.hashCode(family);
113      result = prime * result + Arrays.hashCode(qualifier);
114      result = prime * result + Arrays.hashCode(rowKey);
115      result = prime * result + Arrays.hashCode(table);
116      return result;
117    }
118
119    @Override
120    public boolean equals(Object obj) {
121      if (this == obj) return true;
122      if (obj == null) return false;
123      if (getClass() != obj.getClass()) return false;
124      FullyQualifiedRow other = (FullyQualifiedRow) obj;
125      if (!Arrays.equals(family, other.family)) return false;
126      if (!Arrays.equals(qualifier, other.qualifier)) return false;
127      if (!Arrays.equals(rowKey, other.rowKey)) return false;
128      if (!Arrays.equals(table, other.table)) return false;
129      return true;
130    }
131
132  }
133
134  static class DaemonThreadFactory implements ThreadFactory {
135    static final AtomicInteger poolNumber = new AtomicInteger(1);
136    final ThreadGroup group;
137    final AtomicInteger threadNumber = new AtomicInteger(1);
138    final String namePrefix;
139
140    DaemonThreadFactory() {
141      SecurityManager s = System.getSecurityManager();
142      group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
143      namePrefix = "ICV-" + poolNumber.getAndIncrement() + "-thread-";
144    }
145
146    @Override
147    public Thread newThread(Runnable r) {
148      Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
149      if (!t.isDaemon()) t.setDaemon(true);
150      if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
151      return t;
152    }
153  }
154
155  private final LongAdder failedIncrements = new LongAdder();
156  private final LongAdder successfulCoalescings = new LongAdder();
157  private final LongAdder totalIncrements = new LongAdder();
158  private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
159      new ConcurrentHashMap<>(100000, 0.75f, 1500);
160  private final ThreadPoolExecutor pool;
161  private final HBaseHandler handler;
162
163  private int maxQueueSize = 500000;
164  private static final int CORE_POOL_SIZE = 1;
165
166  private static final Logger LOG = LoggerFactory.getLogger(FullyQualifiedRow.class);
167
168  @SuppressWarnings("deprecation")
169  public IncrementCoalescer(HBaseHandler hand) {
170    this.handler = hand;
171    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
172    pool =
173        new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
174            Threads.newDaemonThreadFactory("IncrementCoalescer"));
175
176    MBeans.register("thrift", "Thrift", this);
177  }
178
179  public boolean queueIncrement(TIncrement inc) throws TException {
180    if (!canQueue()) {
181      failedIncrements.increment();
182      return false;
183    }
184    return internalQueueTincrement(inc);
185  }
186
187  public boolean queueIncrements(List<TIncrement> incs) throws TException {
188    if (!canQueue()) {
189      failedIncrements.increment();
190      return false;
191    }
192
193    for (TIncrement tinc : incs) {
194      internalQueueTincrement(tinc);
195    }
196    return true;
197
198  }
199
200  private boolean internalQueueTincrement(TIncrement inc) throws TException {
201    byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn());
202    if (famAndQf.length != 2) return false;
203
204    return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
205      inc.getAmmount());
206  }
207
208  private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam,
209      byte[] qual, long ammount) throws TException {
210    int countersMapSize = countersMap.size();
211
212
213    //Make sure that the number of threads is scaled.
214    dynamicallySetCoreSize(countersMapSize);
215
216    totalIncrements.increment();
217
218    FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual);
219
220    long currentAmount = ammount;
221    // Spin until able to insert the value back without collisions
222    while (true) {
223      Long value = countersMap.remove(key);
224      if (value == null) {
225        // There was nothing there, create a new value
226        value = Long.valueOf(currentAmount);
227      } else {
228        value += currentAmount;
229        successfulCoalescings.increment();
230      }
231      // Try to put the value, only if there was none
232      Long oldValue = countersMap.putIfAbsent(key, value);
233      if (oldValue == null) {
234        // We were able to put it in, we're done
235        break;
236      }
237      // Someone else was able to put a value in, so let's remember our
238      // current value (plus what we picked up) and retry to add it in
239      currentAmount = value;
240    }
241
242    // We limit the size of the queue simply because all we need is a
243    // notification that something needs to be incremented. No need
244    // for millions of callables that mean the same thing.
245    if (pool.getQueue().size() <= 1000) {
246      // queue it up
247      Callable<Integer> callable = createIncCallable();
248      pool.submit(callable);
249    }
250
251    return true;
252  }
253
254  public boolean canQueue() {
255    return countersMap.size() < maxQueueSize;
256  }
257
258  private Callable<Integer> createIncCallable() {
259    return new Callable<Integer>() {
260      @Override
261      public Integer call() throws Exception {
262        int failures = 0;
263        Set<FullyQualifiedRow> keys = countersMap.keySet();
264        for (FullyQualifiedRow row : keys) {
265          Long counter = countersMap.remove(row);
266          if (counter == null) {
267            continue;
268          }
269          Table table = null;
270          try {
271            table = handler.getTable(row.getTable());
272            if (failures > 2) {
273              throw new IOException("Auto-Fail rest of ICVs");
274            }
275            table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(),
276              counter);
277          } catch (IOException e) {
278            // log failure of increment
279            failures++;
280            LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", "
281                + Bytes.toStringBinary(row.getRowKey()) + ", "
282                + Bytes.toStringBinary(row.getFamily()) + ", "
283                + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
284          } finally{
285            if(table != null){
286              table.close();
287            }
288          }
289        }
290        return failures;
291      }
292    };
293  }
294
295  /**
296   * This method samples the incoming requests and, if selected, will check if
297   * the corePoolSize should be changed.
298   * @param countersMapSize
299   */
300  private void dynamicallySetCoreSize(int countersMapSize) {
301    // Here we are using countersMapSize as a random number, meaning this
302    // could be a Random object
303    if (countersMapSize % 10 != 0) {
304      return;
305    }
306    double currentRatio = (double) countersMapSize / (double) maxQueueSize;
307    int newValue = 1;
308    if (currentRatio < 0.1) {
309      // it's 1
310    } else if (currentRatio < 0.3) {
311      newValue = 2;
312    } else if (currentRatio < 0.5) {
313      newValue = 4;
314    } else if (currentRatio < 0.7) {
315      newValue = 8;
316    } else if (currentRatio < 0.9) {
317      newValue = 14;
318    } else {
319      newValue = 22;
320    }
321    if (pool.getCorePoolSize() != newValue) {
322      pool.setCorePoolSize(newValue);
323    }
324  }
325
326  // MBean get/set methods
327  @Override
328  public int getQueueSize() {
329    return pool.getQueue().size();
330  }
331
332  @Override
333  public int getMaxQueueSize() {
334    return this.maxQueueSize;
335  }
336
337  @Override
338  public void setMaxQueueSize(int newSize) {
339    this.maxQueueSize = newSize;
340  }
341
342  @Override
343  public long getPoolCompletedTaskCount() {
344    return pool.getCompletedTaskCount();
345  }
346
347  @Override
348  public long getPoolTaskCount() {
349    return pool.getTaskCount();
350  }
351
352  @Override
353  public int getPoolLargestPoolSize() {
354    return pool.getLargestPoolSize();
355  }
356
357  @Override
358  public int getCorePoolSize() {
359    return pool.getCorePoolSize();
360  }
361
362  @Override
363  public void setCorePoolSize(int newCoreSize) {
364    pool.setCorePoolSize(newCoreSize);
365  }
366
367  @Override
368  public int getMaxPoolSize() {
369    return pool.getMaximumPoolSize();
370  }
371
372  @Override
373  public void setMaxPoolSize(int newMaxSize) {
374    pool.setMaximumPoolSize(newMaxSize);
375  }
376
377  @Override
378  public long getFailedIncrements() {
379    return failedIncrements.sum();
380  }
381
382  @Override
383  public long getSuccessfulCoalescings() {
384    return successfulCoalescings.sum();
385  }
386
387  @Override
388  public long getTotalIncrements() {
389    return totalIncrements.sum();
390  }
391
392  @Override
393  public long getCountersMapSize() {
394    return countersMap.size();
395  }
396
397}