/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.thrift;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class coalesces increments from a Thrift server if
 * hbase.regionserver.thrift.coalesceIncrement is set to true. Setting this
 * config to true causes the Thrift server to queue increments into an
 * instance of this class. The thread pool associated with this class drains
 * the coalesced increments as its threads are able. This can cause data loss if the
 * Thrift server dies or is shut down before everything in the queue is drained.
 *
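 * <p>
 * A minimal usage sketch (hypothetical table, row, and column values; assumes a
 * previously constructed handler and the standard Thrift-generated setters on
 * {@link TIncrement}):
 * </p>
 * <pre>
 * {@code
 * IncrementCoalescer coalescer = new IncrementCoalescer(handler);
 * TIncrement tinc = new TIncrement();
 * tinc.setTable(Bytes.toBytes("t"));
 * tinc.setRow(Bytes.toBytes("r"));
 * tinc.setColumn(Bytes.toBytes("f:q")); // family:qualifier, parsed by CellUtil.parseColumn
 * tinc.setAmmount(1);                   // "ammount" is the field name in the Thrift IDL
 * if (!coalescer.queueIncrement(tinc)) {
 *   // the counters map is full; the increment was rejected, not queued
 * }
 * }
 * </pre>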
 */
@InterfaceAudience.Private
public class IncrementCoalescer implements IncrementCoalescerMBean {
  /**
   * Used to identify a cell that will be incremented.
   */
  static class FullyQualifiedRow {
    private byte[] table;
    private byte[] rowKey;
    private byte[] family;
    private byte[] qualifier;

    public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) {
      this.table = table;
      this.rowKey = rowKey;
      this.family = fam;
      this.qualifier = qual;
    }

    public byte[] getTable() {
      return table;
    }

    public void setTable(byte[] table) {
      this.table = table;
    }

    public byte[] getRowKey() {
      return rowKey;
    }

    public byte[] getFamily() {
      return family;
    }

    public void setFamily(byte[] fam) {
      this.family = fam;
    }

    public byte[] getQualifier() {
      return qualifier;
    }

    public void setQualifier(byte[] qual) {
      this.qualifier = qual;
    }

    @Override
    public int hashCode() {
      final int prime = 31;
      int result = 1;
      result = prime * result + Arrays.hashCode(family);
      result = prime * result + Arrays.hashCode(qualifier);
      result = prime * result + Arrays.hashCode(rowKey);
      result = prime * result + Arrays.hashCode(table);
      return result;
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null) {
        return false;
      }
      if (getClass() != obj.getClass()) {
        return false;
      }
      FullyQualifiedRow other = (FullyQualifiedRow) obj;

      if (!Arrays.equals(family, other.family)) {
        return false;
      }
      if (!Arrays.equals(qualifier, other.qualifier)) {
        return false;
      }
      if (!Arrays.equals(rowKey, other.rowKey)) {
        return false;
      }

      return Arrays.equals(table, other.table);
    }
  }

  private final LongAdder failedIncrements = new LongAdder();
  private final LongAdder successfulCoalescings = new LongAdder();
  private final LongAdder totalIncrements = new LongAdder();
  private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
      new ConcurrentHashMap<>(100000, 0.75f, 1500);
  private final ThreadPoolExecutor pool;
  private final ThriftHBaseServiceHandler handler;

  private int maxQueueSize = 500000;
  private static final int CORE_POOL_SIZE = 1;

  private static final Logger LOG = LoggerFactory.getLogger(IncrementCoalescer.class);

  public IncrementCoalescer(ThriftHBaseServiceHandler hand) {
    this.handler = hand;
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50,
        TimeUnit.MILLISECONDS, queue,
        Threads.newDaemonThreadFactory("IncrementCoalescer"));
    MBeans.register("thrift", "Thrift", this);
  }

  public boolean queueIncrement(TIncrement inc) {
    if (!canQueue()) {
      failedIncrements.increment();
      return false;
    }
    return internalQueueTincrement(inc);
  }

  public boolean queueIncrements(List<TIncrement> incs) {
    if (!canQueue()) {
      failedIncrements.increment();
      return false;
    }

    for (TIncrement tinc : incs) {
      internalQueueTincrement(tinc);
    }
    return true;
  }

  private boolean internalQueueTincrement(TIncrement inc) {
    byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn());
    if (famAndQf.length != 2) {
      return false;
    }

    return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
      inc.getAmmount());
  }

  @SuppressWarnings("FutureReturnValueIgnored")
  private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam,
      byte[] qual, long amount) {
    int countersMapSize = countersMap.size();

    // Make sure that the number of threads is scaled.
    dynamicallySetCoreSize(countersMapSize);

    totalIncrements.increment();

    FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual);

    long currentAmount = amount;
    // Spin until able to insert the value back without collisions
    while (true) {
      Long value = countersMap.remove(key);
      if (value == null) {
        // There was nothing there, create a new value
        value = currentAmount;
      } else {
        value += currentAmount;
        successfulCoalescings.increment();
      }
      // Try to put the value, only if there was none
      Long oldValue = countersMap.putIfAbsent(key, value);
      if (oldValue == null) {
        // We were able to put it in, we're done
        break;
      }
      // Someone else was able to put a value in, so let's remember our
      // current value (plus what we picked up) and retry to add it in
      currentAmount = value;
    }

    // We limit the size of the queue simply because all we need is a
    // notification that something needs to be incremented. No need
    // for millions of callables that mean the same thing.
    if (pool.getQueue().size() <= 1000) {
      // queue it up
      Callable<Integer> callable = createIncCallable();
      pool.submit(callable);
    }

    return true;
  }

  public boolean canQueue() {
    return countersMap.size() < maxQueueSize;
  }

  private Callable<Integer> createIncCallable() {
    return () -> {
      int failures = 0;
      Set<FullyQualifiedRow> keys = countersMap.keySet();
      for (FullyQualifiedRow row : keys) {
        Long counter = countersMap.remove(row);
        if (counter == null) {
          continue;
        }
        Table table = null;
        try {
          table = handler.getTable(row.getTable());
          if (failures > 2) {
            throw new IOException("Auto-Fail rest of ICVs");
          }
          table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(),
            counter);
        } catch (IOException e) {
          // log failure of increment
          failures++;
          LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", "
              + Bytes.toStringBinary(row.getRowKey()) + ", "
              + Bytes.toStringBinary(row.getFamily()) + ", "
              + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
        } finally {
          if (table != null) {
            table.close();
          }
        }
      }
      return failures;
    };
  }

  /**
   * Samples the incoming requests and, when the sample is selected, checks whether
   * the pool's corePoolSize should be adjusted based on how full the counters map is.
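   * <p>
   * A worked example (purely illustrative, using the default maxQueueSize of 500,000): a
   * counters map holding 200,000 entries yields a ratio of 0.4, which falls into the
   * [0.3, 0.5) bucket below and therefore sets the core pool size to 4.
   * </p>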
   * @param countersMapSize the current size of the counters map, used both to sample
   *          incoming requests and to compute the fill ratio against maxQueueSize
   */
  private void dynamicallySetCoreSize(int countersMapSize) {
    // countersMapSize is used here as a cheap sampling trigger; a Random object
    // would serve the same purpose.
    if (countersMapSize % 10 != 0) {
      return;
    }
    double currentRatio = (double) countersMapSize / (double) maxQueueSize;
    int newValue = 1;
    if (currentRatio < 0.1) {
      // stay at 1
    } else if (currentRatio < 0.3) {
      newValue = 2;
    } else if (currentRatio < 0.5) {
      newValue = 4;
    } else if (currentRatio < 0.7) {
      newValue = 8;
    } else if (currentRatio < 0.9) {
      newValue = 14;
    } else {
      newValue = 22;
    }
    if (pool.getCorePoolSize() != newValue) {
      pool.setCorePoolSize(newValue);
    }
  }

  // MBean get/set methods
  @Override
  public int getQueueSize() {
    return pool.getQueue().size();
  }

  @Override
  public int getMaxQueueSize() {
    return this.maxQueueSize;
  }

  @Override
  public void setMaxQueueSize(int newSize) {
    this.maxQueueSize = newSize;
  }

  @Override
  public long getPoolCompletedTaskCount() {
    return pool.getCompletedTaskCount();
  }

  @Override
  public long getPoolTaskCount() {
    return pool.getTaskCount();
  }

  @Override
  public int getPoolLargestPoolSize() {
    return pool.getLargestPoolSize();
  }

  @Override
  public int getCorePoolSize() {
    return pool.getCorePoolSize();
  }

  @Override
  public void setCorePoolSize(int newCoreSize) {
    pool.setCorePoolSize(newCoreSize);
  }

  @Override
  public int getMaxPoolSize() {
    return pool.getMaximumPoolSize();
  }

  @Override
  public void setMaxPoolSize(int newMaxSize) {
    pool.setMaximumPoolSize(newMaxSize);
  }

  @Override
  public long getFailedIncrements() {
    return failedIncrements.sum();
  }

  @Override
  public long getSuccessfulCoalescings() {
    return successfulCoalescings.sum();
  }

  @Override
  public long getTotalIncrements() {
    return totalIncrements.sum();
  }

  @Override
  public long getCountersMapSize() {
    return countersMap.size();
  }
}