001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor.example;
019
020import java.io.IOException;
021import java.math.RoundingMode;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.NavigableMap;
026import java.util.Optional;
027import java.util.TreeMap;
028import java.util.stream.IntStream;
029import org.apache.commons.lang3.mutable.MutableLong;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellBuilderFactory;
032import org.apache.hadoop.hbase.CellBuilderType;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.ExtendedCell;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.client.Get;
037import org.apache.hadoop.hbase.client.Increment;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.coprocessor.ObserverContext;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
044import org.apache.hadoop.hbase.coprocessor.RegionObserver;
045import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
046import org.apache.hadoop.hbase.regionserver.InternalScanner;
047import org.apache.hadoop.hbase.regionserver.RegionScanner;
048import org.apache.hadoop.hbase.regionserver.ScanOptions;
049import org.apache.hadoop.hbase.regionserver.ScanType;
050import org.apache.hadoop.hbase.regionserver.ScannerContext;
051import org.apache.hadoop.hbase.regionserver.Store;
052import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
053import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.yetus.audience.InterfaceAudience;
057
058import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
059
060/**
061 * An example for implementing a counter that reads is much less than writes, i.e, write heavy.
062 * <p>
063 * We will convert increment to put, and do aggregating when get. And of course the return value of
064 * increment is useless then.
065 * <p>
066 * Notice that this is only an example so we do not handle most corner cases, for example, you must
067 * provide a qualifier when doing a get.
068 */
069@InterfaceAudience.Private
070public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObserver {
071
072  @Override
073  public Optional<RegionObserver> getRegionObserver() {
074    return Optional.of(this);
075  }
076
077  @Override
078  public void preFlushScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c,
079    Store store, ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
080    options.readAllVersions();
081  }
082
083  private Cell createCell(byte[] row, byte[] family, byte[] qualifier, long ts, long value) {
084    return CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
085      .setType(Cell.Type.Put).setFamily(family).setQualifier(qualifier).setTimestamp(ts)
086      .setValue(Bytes.toBytes(value)).build();
087  }
088
089  private InternalScanner wrap(byte[] family, InternalScanner scanner) {
090    return new InternalScanner() {
091
092      private List<Cell> srcResult = new ArrayList<>();
093
094      private byte[] row;
095
096      private byte[] qualifier;
097
098      private long timestamp;
099
100      private long sum;
101
102      @Override
103      public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
104        throws IOException {
105        boolean moreRows = scanner.next(srcResult, scannerContext);
106        if (srcResult.isEmpty()) {
107          if (!moreRows && row != null) {
108            // all cells in HBase are ExtendedCells, so you are fine to cast it to ExtendedCell,
109            // just do not use its methods since it may change without any deprecation cycle
110            result.add((ExtendedCell) createCell(row, family, qualifier, timestamp, sum));
111          }
112          return moreRows;
113        }
114        Cell firstCell = srcResult.get(0);
115        // Check if there is a row change first. All the cells will come from the same row so just
116        // check the first one once is enough.
117        if (row == null) {
118          row = CellUtil.cloneRow(firstCell);
119          qualifier = CellUtil.cloneQualifier(firstCell);
120        } else if (!CellUtil.matchingRows(firstCell, row)) {
121          // all cells in HBase are ExtendedCells, so you are fine to cast it to ExtendedCell,
122          // just do not use its methods since it may change without any deprecation cycle
123          result.add((ExtendedCell) createCell(row, family, qualifier, timestamp, sum));
124          row = CellUtil.cloneRow(firstCell);
125          qualifier = CellUtil.cloneQualifier(firstCell);
126          sum = 0;
127        }
128        srcResult.forEach(c -> {
129          if (CellUtil.matchingQualifier(c, qualifier)) {
130            sum += Bytes.toLong(c.getValueArray(), c.getValueOffset());
131          } else {
132            // all cells in HBase are ExtendedCells, so you are fine to cast it to ExtendedCell,
133            // just do not use its methods since it may change without any deprecation cycle
134            result.add((ExtendedCell) createCell(row, family, qualifier, timestamp, sum));
135            qualifier = CellUtil.cloneQualifier(c);
136            sum = Bytes.toLong(c.getValueArray(), c.getValueOffset());
137          }
138          timestamp = c.getTimestamp();
139        });
140        if (!moreRows) {
141          // all cells in HBase are ExtendedCells, so you are fine to cast it to ExtendedCell,
142          // just do not use its methods since it may change without any deprecation cycle
143          result.add((ExtendedCell) createCell(row, family, qualifier, timestamp, sum));
144        }
145        srcResult.clear();
146        return moreRows;
147      }
148
149      @Override
150      public void close() throws IOException {
151        scanner.close();
152      }
153    };
154  }
155
156  @Override
157  public InternalScanner preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c,
158    Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
159    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
160  }
161
162  @Override
163  public void preCompactScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c,
164    Store store, ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
165    CompactionRequest request) throws IOException {
166    options.readAllVersions();
167  }
168
169  @Override
170  public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c,
171    Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
172    CompactionRequest request) throws IOException {
173    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
174  }
175
176  @Override
177  public void preMemStoreCompactionCompactScannerOpen(
178    ObserverContext<? extends RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
179    throws IOException {
180    options.readAllVersions();
181  }
182
183  @Override
184  public InternalScanner preMemStoreCompactionCompact(
185    ObserverContext<? extends RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
186    throws IOException {
187    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
188  }
189
190  @Override
191  public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
192    List<Cell> result) throws IOException {
193    Scan scan =
194      new Scan().withStartRow(get.getRow()).withStopRow(get.getRow(), true).readAllVersions();
195    NavigableMap<byte[], NavigableMap<byte[], MutableLong>> sums =
196      new TreeMap<>(Bytes.BYTES_COMPARATOR);
197    get.getFamilyMap().forEach((cf, cqs) -> {
198      NavigableMap<byte[], MutableLong> ss = new TreeMap<>(Bytes.BYTES_COMPARATOR);
199      sums.put(cf, ss);
200      cqs.forEach(cq -> {
201        ss.put(cq, new MutableLong(0));
202        scan.addColumn(cf, cq);
203      });
204    });
205    List<Cell> cells = new ArrayList<>();
206    try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
207      boolean moreRows;
208      do {
209        moreRows = scanner.next(cells);
210        for (Cell cell : cells) {
211          byte[] family = CellUtil.cloneFamily(cell);
212          byte[] qualifier = CellUtil.cloneQualifier(cell);
213          long value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
214          sums.get(family).get(qualifier).add(value);
215        }
216        cells.clear();
217      } while (moreRows);
218    }
219    sums.forEach((cf, m) -> m.forEach((cq, s) -> result
220      .add(createCell(get.getRow(), cf, cq, HConstants.LATEST_TIMESTAMP, s.longValue()))));
221    c.bypass();
222  }
223
224  private final int mask;
225  private final MutableLong[] lastTimestamps;
226  {
227    int stripes =
228      1 << IntMath.log2(Runtime.getRuntime().availableProcessors(), RoundingMode.CEILING);
229    lastTimestamps =
230      IntStream.range(0, stripes).mapToObj(i -> new MutableLong()).toArray(MutableLong[]::new);
231    mask = stripes - 1;
232  }
233
234  // We need make sure the different put uses different timestamp otherwise we may lost some
235  // increments. This is a known issue for HBase.
236  private long getUniqueTimestamp(byte[] row) {
237    int slot = Bytes.hashCode(row) & mask;
238    MutableLong lastTimestamp = lastTimestamps[slot];
239    long now = EnvironmentEdgeManager.currentTime();
240    synchronized (lastTimestamp) {
241      long pt = lastTimestamp.longValue() >> 10;
242      if (now > pt) {
243        lastTimestamp.setValue(now << 10);
244      } else {
245        lastTimestamp.increment();
246      }
247      return lastTimestamp.longValue();
248    }
249  }
250
251  @Override
252  public Result preIncrement(ObserverContext<? extends RegionCoprocessorEnvironment> c,
253    Increment increment) throws IOException {
254    byte[] row = increment.getRow();
255    Put put = new Put(row);
256    long ts = getUniqueTimestamp(row);
257    for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
258      for (Cell cell : entry.getValue()) {
259        put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
260          .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
261          .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(),
262            cell.getQualifierLength())
263          .setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
264          .setType(Cell.Type.Put).setTimestamp(ts).build());
265      }
266    }
267    c.getEnvironment().getRegion().put(put);
268    c.bypass();
269    return Result.EMPTY_RESULT;
270  }
271
272  @Override
273  public void preStoreScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
274    Store store, ScanOptions options) throws IOException {
275    options.readAllVersions();
276  }
277}