001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.thrift;
020
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.List;
024import java.util.Set;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.LinkedBlockingQueue;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.LongAdder;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
035import org.apache.hadoop.hbase.thrift.generated.TIncrement;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.Threads;
038import org.apache.hadoop.metrics2.util.MBeans;
039import org.apache.thrift.TException;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * This class will coalesce increments from a thift server if
046 * hbase.regionserver.thrift.coalesceIncrement is set to true. Turning this
047 * config to true will cause the thrift server to queue increments into an
048 * instance of this class. The thread pool associated with this class will drain
049 * the coalesced increments as the thread is able. This can cause data loss if the
050 * thrift server dies or is shut down before everything in the queue is drained.
051 *
052 */
053@InterfaceAudience.Private
054public class IncrementCoalescer implements IncrementCoalescerMBean {
055
056  /**
057   * Used to identify a cell that will be incremented.
058   *
059   */
060  static class FullyQualifiedRow {
061    private byte[] table;
062    private byte[] rowKey;
063    private byte[] family;
064    private byte[] qualifier;
065
066    public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) {
067      super();
068      this.table = table;
069      this.rowKey = rowKey;
070      this.family = fam;
071      this.qualifier = qual;
072    }
073
074    public byte[] getTable() {
075      return table;
076    }
077
078    public void setTable(byte[] table) {
079      this.table = table;
080    }
081
082    public byte[] getRowKey() {
083      return rowKey;
084    }
085
086    public void setRowKey(byte[] rowKey) {
087      this.rowKey = rowKey;
088    }
089
090    public byte[] getFamily() {
091      return family;
092    }
093
094    public void setFamily(byte[] fam) {
095      this.family = fam;
096    }
097
098    public byte[] getQualifier() {
099      return qualifier;
100    }
101
102    public void setQualifier(byte[] qual) {
103      this.qualifier = qual;
104    }
105
106    @Override
107    public int hashCode() {
108      final int prime = 31;
109      int result = 1;
110      result = prime * result + Arrays.hashCode(family);
111      result = prime * result + Arrays.hashCode(qualifier);
112      result = prime * result + Arrays.hashCode(rowKey);
113      result = prime * result + Arrays.hashCode(table);
114      return result;
115    }
116
117    @Override
118    public boolean equals(Object obj) {
119      if (this == obj) return true;
120      if (obj == null) return false;
121      if (getClass() != obj.getClass()) return false;
122      FullyQualifiedRow other = (FullyQualifiedRow) obj;
123      if (!Arrays.equals(family, other.family)) return false;
124      if (!Arrays.equals(qualifier, other.qualifier)) return false;
125      if (!Arrays.equals(rowKey, other.rowKey)) return false;
126      if (!Arrays.equals(table, other.table)) return false;
127      return true;
128    }
129
130  }
131
132  private final LongAdder failedIncrements = new LongAdder();
133  private final LongAdder successfulCoalescings = new LongAdder();
134  private final LongAdder totalIncrements = new LongAdder();
135  private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
136      new ConcurrentHashMap<>(100000, 0.75f, 1500);
137  private final ThreadPoolExecutor pool;
138  private final HBaseHandler handler;
139
140  private int maxQueueSize = 500000;
141  private static final int CORE_POOL_SIZE = 1;
142
143  private static final Logger LOG = LoggerFactory.getLogger(FullyQualifiedRow.class);
144
145  @SuppressWarnings("deprecation")
146  public IncrementCoalescer(HBaseHandler hand) {
147    this.handler = hand;
148    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
149    pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50,
150        TimeUnit.MILLISECONDS, queue,
151        Threads.newDaemonThreadFactory("IncrementCoalescer"));
152    MBeans.register("thrift", "Thrift", this);
153  }
154
155  public boolean queueIncrement(TIncrement inc) throws TException {
156    if (!canQueue()) {
157      failedIncrements.increment();
158      return false;
159    }
160    return internalQueueTincrement(inc);
161  }
162
163  public boolean queueIncrements(List<TIncrement> incs) throws TException {
164    if (!canQueue()) {
165      failedIncrements.increment();
166      return false;
167    }
168
169    for (TIncrement tinc : incs) {
170      internalQueueTincrement(tinc);
171    }
172    return true;
173
174  }
175
176  private boolean internalQueueTincrement(TIncrement inc) throws TException {
177    byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn());
178    if (famAndQf.length != 2) return false;
179
180    return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
181      inc.getAmmount());
182  }
183
184  private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam,
185      byte[] qual, long ammount) throws TException {
186    int countersMapSize = countersMap.size();
187
188
189    //Make sure that the number of threads is scaled.
190    dynamicallySetCoreSize(countersMapSize);
191
192    totalIncrements.increment();
193
194    FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual);
195
196    long currentAmount = ammount;
197    // Spin until able to insert the value back without collisions
198    while (true) {
199      Long value = countersMap.remove(key);
200      if (value == null) {
201        // There was nothing there, create a new value
202        value = Long.valueOf(currentAmount);
203      } else {
204        value += currentAmount;
205        successfulCoalescings.increment();
206      }
207      // Try to put the value, only if there was none
208      Long oldValue = countersMap.putIfAbsent(key, value);
209      if (oldValue == null) {
210        // We were able to put it in, we're done
211        break;
212      }
213      // Someone else was able to put a value in, so let's remember our
214      // current value (plus what we picked up) and retry to add it in
215      currentAmount = value;
216    }
217
218    // We limit the size of the queue simply because all we need is a
219    // notification that something needs to be incremented. No need
220    // for millions of callables that mean the same thing.
221    if (pool.getQueue().size() <= 1000) {
222      // queue it up
223      Callable<Integer> callable = createIncCallable();
224      pool.submit(callable);
225    }
226
227    return true;
228  }
229
230  public boolean canQueue() {
231    return countersMap.size() < maxQueueSize;
232  }
233
234  private Callable<Integer> createIncCallable() {
235    return new Callable<Integer>() {
236      @Override
237      public Integer call() throws Exception {
238        int failures = 0;
239        Set<FullyQualifiedRow> keys = countersMap.keySet();
240        for (FullyQualifiedRow row : keys) {
241          Long counter = countersMap.remove(row);
242          if (counter == null) {
243            continue;
244          }
245          Table table = null;
246          try {
247            table = handler.getTable(row.getTable());
248            if (failures > 2) {
249              throw new IOException("Auto-Fail rest of ICVs");
250            }
251            table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(),
252              counter);
253          } catch (IOException e) {
254            // log failure of increment
255            failures++;
256            LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", "
257                + Bytes.toStringBinary(row.getRowKey()) + ", "
258                + Bytes.toStringBinary(row.getFamily()) + ", "
259                + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
260          } finally{
261            if(table != null){
262              table.close();
263            }
264          }
265        }
266        return failures;
267      }
268    };
269  }
270
271  /**
272   * This method samples the incoming requests and, if selected, will check if
273   * the corePoolSize should be changed.
274   * @param countersMapSize
275   */
276  private void dynamicallySetCoreSize(int countersMapSize) {
277    // Here we are using countersMapSize as a random number, meaning this
278    // could be a Random object
279    if (countersMapSize % 10 != 0) {
280      return;
281    }
282    double currentRatio = (double) countersMapSize / (double) maxQueueSize;
283    int newValue = 1;
284    if (currentRatio < 0.1) {
285      // it's 1
286    } else if (currentRatio < 0.3) {
287      newValue = 2;
288    } else if (currentRatio < 0.5) {
289      newValue = 4;
290    } else if (currentRatio < 0.7) {
291      newValue = 8;
292    } else if (currentRatio < 0.9) {
293      newValue = 14;
294    } else {
295      newValue = 22;
296    }
297    if (pool.getCorePoolSize() != newValue) {
298      pool.setCorePoolSize(newValue);
299    }
300  }
301
302  // MBean get/set methods
303  @Override
304  public int getQueueSize() {
305    return pool.getQueue().size();
306  }
307
308  @Override
309  public int getMaxQueueSize() {
310    return this.maxQueueSize;
311  }
312
313  @Override
314  public void setMaxQueueSize(int newSize) {
315    this.maxQueueSize = newSize;
316  }
317
318  @Override
319  public long getPoolCompletedTaskCount() {
320    return pool.getCompletedTaskCount();
321  }
322
323  @Override
324  public long getPoolTaskCount() {
325    return pool.getTaskCount();
326  }
327
328  @Override
329  public int getPoolLargestPoolSize() {
330    return pool.getLargestPoolSize();
331  }
332
333  @Override
334  public int getCorePoolSize() {
335    return pool.getCorePoolSize();
336  }
337
338  @Override
339  public void setCorePoolSize(int newCoreSize) {
340    pool.setCorePoolSize(newCoreSize);
341  }
342
343  @Override
344  public int getMaxPoolSize() {
345    return pool.getMaximumPoolSize();
346  }
347
348  @Override
349  public void setMaxPoolSize(int newMaxSize) {
350    pool.setMaximumPoolSize(newMaxSize);
351  }
352
353  @Override
354  public long getFailedIncrements() {
355    return failedIncrements.sum();
356  }
357
358  @Override
359  public long getSuccessfulCoalescings() {
360    return successfulCoalescings.sum();
361  }
362
363  @Override
364  public long getTotalIncrements() {
365    return totalIncrements.sum();
366  }
367
368  @Override
369  public long getCountersMapSize() {
370    return countersMap.size();
371  }
372
373}