001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.LinkedBlockingQueue;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.LongAdder;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.thrift.generated.TIncrement;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.hadoop.metrics2.util.MBeans;
037import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * This class will coalesce increments from a thift server if
044 * hbase.regionserver.thrift.coalesceIncrement is set to true. Turning this
045 * config to true will cause the thrift server to queue increments into an
046 * instance of this class. The thread pool associated with this class will drain
047 * the coalesced increments as the thread is able. This can cause data loss if the
048 * thrift server dies or is shut down before everything in the queue is drained.
049 *
050 */
051@InterfaceAudience.Private
052public class IncrementCoalescer implements IncrementCoalescerMBean {
053  /**
054   * Used to identify a cell that will be incremented.
055   *
056   */
057  static class FullyQualifiedRow {
058    private byte[] table;
059    private byte[] rowKey;
060    private byte[] family;
061    private byte[] qualifier;
062
063    public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) {
064      super();
065      this.table = table;
066      this.rowKey = rowKey;
067      this.family = fam;
068      this.qualifier = qual;
069    }
070
071    public byte[] getTable() {
072      return table;
073    }
074
075    public void setTable(byte[] table) {
076      this.table = table;
077    }
078
079    public byte[] getRowKey() {
080      return rowKey;
081    }
082
083    public byte[] getFamily() {
084      return family;
085    }
086
087    public void setFamily(byte[] fam) {
088      this.family = fam;
089    }
090
091    public byte[] getQualifier() {
092      return qualifier;
093    }
094
095    public void setQualifier(byte[] qual) {
096      this.qualifier = qual;
097    }
098
099    @Override
100    public int hashCode() {
101      final int prime = 31;
102      int result = 1;
103      result = prime * result + Arrays.hashCode(family);
104      result = prime * result + Arrays.hashCode(qualifier);
105      result = prime * result + Arrays.hashCode(rowKey);
106      result = prime * result + Arrays.hashCode(table);
107      return result;
108    }
109
110    @Override
111    public boolean equals(Object obj) {
112      if (this == obj) return true;
113      if (obj == null) return false;
114      if (getClass() != obj.getClass()) return false;
115      FullyQualifiedRow other = (FullyQualifiedRow) obj;
116
117      if (!Arrays.equals(family, other.family)) {
118        return false;
119      }
120      if (!Arrays.equals(qualifier, other.qualifier)) {
121        return false;
122      }
123      if (!Arrays.equals(rowKey, other.rowKey)) {
124        return false;
125      }
126
127      return Arrays.equals(table, other.table);
128    }
129  }
130
131  private final LongAdder failedIncrements = new LongAdder();
132  private final LongAdder successfulCoalescings = new LongAdder();
133  private final LongAdder totalIncrements = new LongAdder();
134  private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
135      new ConcurrentHashMap<>(100000, 0.75f, 1500);
136  private final ThreadPoolExecutor pool;
137  private final ThriftHBaseServiceHandler handler;
138
139  private int maxQueueSize = 500000;
140  private static final int CORE_POOL_SIZE = 1;
141
142  private static final Logger LOG = LoggerFactory.getLogger(IncrementCoalescer.class);
143
144  public IncrementCoalescer(ThriftHBaseServiceHandler hand) {
145    this.handler = hand;
146    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
147    pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
148      new ThreadFactoryBuilder().setNameFormat("IncrementCoalescer-pool-%d").setDaemon(true)
149        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
150    MBeans.register("thrift", "Thrift", this);
151  }
152
153  public boolean queueIncrement(TIncrement inc) {
154    if (!canQueue()) {
155      failedIncrements.increment();
156      return false;
157    }
158    return internalQueueTincrement(inc);
159  }
160
161  public boolean queueIncrements(List<TIncrement> incs) {
162    if (!canQueue()) {
163      failedIncrements.increment();
164      return false;
165    }
166
167    for (TIncrement tinc : incs) {
168      internalQueueTincrement(tinc);
169    }
170    return true;
171
172  }
173
174  private boolean internalQueueTincrement(TIncrement inc) {
175    byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn());
176    if (famAndQf.length != 2) return false;
177
178    return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
179      inc.getAmmount());
180  }
181
182  @SuppressWarnings("FutureReturnValueIgnored")
183  private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam,
184      byte[] qual, long ammount) {
185    int countersMapSize = countersMap.size();
186
187
188    //Make sure that the number of threads is scaled.
189    dynamicallySetCoreSize(countersMapSize);
190
191    totalIncrements.increment();
192
193    FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual);
194
195    long currentAmount = ammount;
196    // Spin until able to insert the value back without collisions
197    while (true) {
198      Long value = countersMap.remove(key);
199      if (value == null) {
200        // There was nothing there, create a new value
201        value = currentAmount;
202      } else {
203        value += currentAmount;
204        successfulCoalescings.increment();
205      }
206      // Try to put the value, only if there was none
207      Long oldValue = countersMap.putIfAbsent(key, value);
208      if (oldValue == null) {
209        // We were able to put it in, we're done
210        break;
211      }
212      // Someone else was able to put a value in, so let's remember our
213      // current value (plus what we picked up) and retry to add it in
214      currentAmount = value;
215    }
216
217    // We limit the size of the queue simply because all we need is a
218    // notification that something needs to be incremented. No need
219    // for millions of callables that mean the same thing.
220    if (pool.getQueue().size() <= 1000) {
221      // queue it up
222      Callable<Integer> callable = createIncCallable();
223      pool.submit(callable);
224    }
225
226    return true;
227  }
228
229  public boolean canQueue() {
230    return countersMap.size() < maxQueueSize;
231  }
232
233  private Callable<Integer> createIncCallable() {
234    return () -> {
235      int failures = 0;
236      Set<FullyQualifiedRow> keys = countersMap.keySet();
237      for (FullyQualifiedRow row : keys) {
238        Long counter = countersMap.remove(row);
239        if (counter == null) {
240          continue;
241        }
242        Table table = null;
243        try {
244          table = handler.getTable(row.getTable());
245          if (failures > 2) {
246            throw new IOException("Auto-Fail rest of ICVs");
247          }
248          table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(),
249            counter);
250        } catch (IOException e) {
251          // log failure of increment
252          failures++;
253          LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", "
254              + Bytes.toStringBinary(row.getRowKey()) + ", "
255              + Bytes.toStringBinary(row.getFamily()) + ", "
256              + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
257        } finally{
258          if(table != null){
259            table.close();
260          }
261        }
262      }
263      return failures;
264    };
265  }
266
267  /**
268   * This method samples the incoming requests and, if selected, will check if
269   * the corePoolSize should be changed.
270   * @param countersMapSize
271   */
272  private void dynamicallySetCoreSize(int countersMapSize) {
273    // Here we are using countersMapSize as a random number, meaning this
274    // could be a Random object
275    if (countersMapSize % 10 != 0) {
276      return;
277    }
278    double currentRatio = (double) countersMapSize / (double) maxQueueSize;
279    int newValue = 1;
280    if (currentRatio < 0.1) {
281      // it's 1
282    } else if (currentRatio < 0.3) {
283      newValue = 2;
284    } else if (currentRatio < 0.5) {
285      newValue = 4;
286    } else if (currentRatio < 0.7) {
287      newValue = 8;
288    } else if (currentRatio < 0.9) {
289      newValue = 14;
290    } else {
291      newValue = 22;
292    }
293    if (pool.getCorePoolSize() != newValue) {
294      pool.setCorePoolSize(newValue);
295    }
296  }
297
298  // MBean get/set methods
299  @Override
300  public int getQueueSize() {
301    return pool.getQueue().size();
302  }
303
304  @Override
305  public int getMaxQueueSize() {
306    return this.maxQueueSize;
307  }
308
309  @Override
310  public void setMaxQueueSize(int newSize) {
311    this.maxQueueSize = newSize;
312  }
313
314  @Override
315  public long getPoolCompletedTaskCount() {
316    return pool.getCompletedTaskCount();
317  }
318
319  @Override
320  public long getPoolTaskCount() {
321    return pool.getTaskCount();
322  }
323
324  @Override
325  public int getPoolLargestPoolSize() {
326    return pool.getLargestPoolSize();
327  }
328
329  @Override
330  public int getCorePoolSize() {
331    return pool.getCorePoolSize();
332  }
333
334  @Override
335  public void setCorePoolSize(int newCoreSize) {
336    pool.setCorePoolSize(newCoreSize);
337  }
338
339  @Override
340  public int getMaxPoolSize() {
341    return pool.getMaximumPoolSize();
342  }
343
344  @Override
345  public void setMaxPoolSize(int newMaxSize) {
346    pool.setMaximumPoolSize(newMaxSize);
347  }
348
349  @Override
350  public long getFailedIncrements() {
351    return failedIncrements.sum();
352  }
353
354  @Override
355  public long getSuccessfulCoalescings() {
356    return successfulCoalescings.sum();
357  }
358
359  @Override
360  public long getTotalIncrements() {
361    return totalIncrements.sum();
362  }
363
364  @Override
365  public long getCountersMapSize() {
366    return countersMap.size();
367  }
368}