/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example;

import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.IntStream;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.math.IntMath;

/**
 * An example of implementing a counter for write-heavy workloads, where reads are much less
 * frequent than writes.
 * <p>
 * We convert each increment into a put, and aggregate the stored values when reading. As a
 * consequence, the return value of the increment is meaningless.
 * <p>
 * Notice that this is only an example, so we do not handle most corner cases; for example, you
 * must provide a qualifier when doing a get.
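 * <p>
 * A minimal sketch of enabling this observer when creating a table, assuming an existing
 * {@code Admin} connection; the table and family names are illustrative only:
 *
 * <pre>
 * TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("counters"))
 *   .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
 *   .setCoprocessor(WriteHeavyIncrementObserver.class.getName()).build();
 * admin.createTable(td);
 * </pre>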
 */
@InterfaceAudience.Private
public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObserver {

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

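  // Flushes must see all raw versions of a column, not just the newest one, so that the
  // aggregating scanner below can sum every stored delta.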
  @Override
  public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
    options.readAllVersions();
  }

  private Cell createCell(byte[] row, byte[] family, byte[] qualifier, long ts, long value) {
    return CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
        .setType(Cell.Type.Put).setFamily(family).setQualifier(qualifier)
        .setTimestamp(ts).setValue(Bytes.toBytes(value)).build();
  }

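  /**
   * Wrap a flush or compaction scanner so that, for each column, all the put cells produced by
   * {@link #preIncrement} are folded into a single cell holding their sum. The emitted cell
   * carries the timestamp of the last cell consumed for that column.
   */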
  private InternalScanner wrap(byte[] family, InternalScanner scanner) {
    return new InternalScanner() {

      private List<Cell> srcResult = new ArrayList<>();

      private byte[] row;

      private byte[] qualifier;

      private long timestamp;

      private long sum;

      @Override
      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        boolean moreRows = scanner.next(srcResult, scannerContext);
        if (srcResult.isEmpty()) {
          if (!moreRows && row != null) {
            result.add(createCell(row, family, qualifier, timestamp, sum));
          }
          return moreRows;
        }
        Cell firstCell = srcResult.get(0);
        // Check if there is a row change first. All the cells come from the same row, so checking
        // the first one is enough.
        if (row == null) {
          row = CellUtil.cloneRow(firstCell);
          qualifier = CellUtil.cloneQualifier(firstCell);
        } else if (!CellUtil.matchingRows(firstCell, row)) {
          result.add(createCell(row, family, qualifier, timestamp, sum));
          row = CellUtil.cloneRow(firstCell);
          qualifier = CellUtil.cloneQualifier(firstCell);
          sum = 0;
        }
        srcResult.forEach(c -> {
          if (CellUtil.matchingQualifier(c, qualifier)) {
            sum += Bytes.toLong(c.getValueArray(), c.getValueOffset());
          } else {
            result.add(createCell(row, family, qualifier, timestamp, sum));
            qualifier = CellUtil.cloneQualifier(c);
            sum = Bytes.toLong(c.getValueArray(), c.getValueOffset());
          }
          timestamp = c.getTimestamp();
        });
        if (!moreRows) {
          result.add(createCell(row, family, qualifier, timestamp, sum));
        }
        srcResult.clear();
        return moreRows;
      }

      @Override
      public void close() throws IOException {
        scanner.close();
      }
    };
  }

  @Override
  public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
  }

  @Override
  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
    options.readAllVersions();
  }

  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
  }

  @Override
  public void preMemStoreCompactionCompactScannerOpen(
      ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
      throws IOException {
    options.readAllVersions();
  }

  @Override
  public InternalScanner preMemStoreCompactionCompact(
      ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
      throws IOException {
    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
  }

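  // Aggregate on read: scan every stored version of the requested columns, sum the deltas, and
  // return a single synthesized cell per column while bypassing the normal get path.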
  @Override
  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
      throws IOException {
    Scan scan =
        new Scan().withStartRow(get.getRow()).withStopRow(get.getRow(), true).readAllVersions();
    NavigableMap<byte[], NavigableMap<byte[], MutableLong>> sums =
        new TreeMap<>(Bytes.BYTES_COMPARATOR);
    get.getFamilyMap().forEach((cf, cqs) -> {
      NavigableMap<byte[], MutableLong> ss = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      sums.put(cf, ss);
      cqs.forEach(cq -> {
        ss.put(cq, new MutableLong(0));
        scan.addColumn(cf, cq);
      });
    });
    List<Cell> cells = new ArrayList<>();
    try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
      boolean moreRows;
      do {
        moreRows = scanner.next(cells);
        for (Cell cell : cells) {
          byte[] family = CellUtil.cloneFamily(cell);
          byte[] qualifier = CellUtil.cloneQualifier(cell);
          long value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
          sums.get(family).get(qualifier).add(value);
        }
        cells.clear();
      } while (moreRows);
    }
    sums.forEach((cf, m) -> m.forEach((cq, s) -> result
        .add(createCell(get.getRow(), cf, cq, HConstants.LATEST_TIMESTAMP, s.longValue()))));
    c.bypass();
  }

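  // The last-timestamp state is striped over a power-of-two number of slots (at least the number
  // of available processors) so that concurrent increments on different rows rarely contend on
  // the same lock.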
  private final int mask;
  private final MutableLong[] lastTimestamps;
  {
    int stripes =
        1 << IntMath.log2(Runtime.getRuntime().availableProcessors(), RoundingMode.CEILING);
    lastTimestamps =
        IntStream.range(0, stripes).mapToObj(i -> new MutableLong()).toArray(MutableLong[]::new);
    mask = stripes - 1;
  }

  // We need to make sure that different puts use different timestamps, otherwise we may lose some
  // increments. This is a known issue for HBase. Here we reserve the lowest 10 bits of the
  // timestamp as a per-millisecond counter, so up to 1024 puts within the same millisecond still
  // get distinct timestamps.
  private long getUniqueTimestamp(byte[] row) {
    int slot = Bytes.hashCode(row) & mask;
    MutableLong lastTimestamp = lastTimestamps[slot];
    long now = System.currentTimeMillis();
    synchronized (lastTimestamp) {
      long pt = lastTimestamp.longValue() >> 10;
      if (now > pt) {
        lastTimestamp.setValue(now << 10);
      } else {
        lastTimestamp.increment();
      }
      return lastTimestamp.longValue();
    }
  }

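  // Convert the increment into a plain put of the delta under a unique timestamp, then bypass
  // the normal read-modify-write increment path.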
  @Override
  public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
      throws IOException {
    byte[] row = increment.getRow();
    Put put = new Put(row);
    long ts = getUniqueTimestamp(row);
    for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
      for (Cell cell : entry.getValue()) {
        put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
            .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
            .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(),
              cell.getQualifierLength())
            .setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
            .setType(Cell.Type.Put).setTimestamp(ts).build());
      }
    }
    c.getEnvironment().getRegion().put(put);
    c.bypass();
    return Result.EMPTY_RESULT;
  }

  @Override
  public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
      ScanOptions options) throws IOException {
    options.readAllVersions();
  }
}