001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rest;
019
020import com.fasterxml.jackson.core.JsonParseException;
021import com.fasterxml.jackson.databind.JsonMappingException;
022import com.github.benmanes.caffeine.cache.Cache;
023import com.github.benmanes.caffeine.cache.Caffeine;
024import com.github.benmanes.caffeine.cache.RemovalCause;
025import java.io.IOException;
026import java.net.URI;
027import java.util.concurrent.TimeUnit;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.TableNotFoundException;
031import org.apache.hadoop.hbase.filter.Filter;
032import org.apache.hadoop.hbase.rest.model.ScannerModel;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import org.apache.hbase.thirdparty.javax.ws.rs.Consumes;
038import org.apache.hbase.thirdparty.javax.ws.rs.POST;
039import org.apache.hbase.thirdparty.javax.ws.rs.PUT;
040import org.apache.hbase.thirdparty.javax.ws.rs.Path;
041import org.apache.hbase.thirdparty.javax.ws.rs.PathParam;
042import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
043import org.apache.hbase.thirdparty.javax.ws.rs.core.Response;
044import org.apache.hbase.thirdparty.javax.ws.rs.core.UriBuilder;
045import org.apache.hbase.thirdparty.javax.ws.rs.core.UriInfo;
046
047@InterfaceAudience.Private
048public class ScannerResource extends ResourceBase {
049
050  private static final Logger LOG = LoggerFactory.getLogger(ScannerResource.class);
051
052  private static final Cache<String, ScannerInstanceResource> scanners = setupScanners();
053  TableResource tableResource;
054
055  /**
056   * Constructor
057   */
058  public ScannerResource(TableResource tableResource) throws IOException {
059    super();
060    this.tableResource = tableResource;
061  }
062
063  private static Cache<String, ScannerInstanceResource> setupScanners() {
064    final Configuration conf = HBaseConfiguration.create();
065
066    int size = conf.getInt(REST_SCANNERCACHE_SIZE, DEFAULT_REST_SCANNERCACHE_SIZE);
067    long evictTimeoutMs = conf.getTimeDuration(REST_SCANNERCACHE_EXPIRE_TIME,
068      DEFAULT_REST_SCANNERCACHE_EXPIRE_TIME_MS, TimeUnit.MILLISECONDS);
069
070    Cache<String, ScannerInstanceResource> cache =
071      Caffeine.newBuilder().removalListener(ScannerResource::removalListener).maximumSize(size)
072        .expireAfterAccess(evictTimeoutMs, TimeUnit.MILLISECONDS)
073        .<String, ScannerInstanceResource> build();
074
075    return cache;
076  }
077
078  static boolean delete(final String id) {
079    ScannerInstanceResource instance = scanners.asMap().remove(id);
080    if (instance != null) {
081      instance.generator.close();
082      return true;
083    } else {
084      return false;
085    }
086  }
087
088  static void removalListener(String key, ScannerInstanceResource value, RemovalCause cause) {
089    if (cause.wasEvicted()) {
090      delete(key);
091    }
092  }
093
094  Response update(final ScannerModel model, final boolean replace, final UriInfo uriInfo) {
095    servlet.getMetrics().incrementRequests(1);
096    if (servlet.isReadOnly()) {
097      return Response.status(Response.Status.FORBIDDEN).type(MIMETYPE_TEXT)
098        .entity("Forbidden" + CRLF).build();
099    }
100    byte[] endRow = model.hasEndRow() ? model.getEndRow() : null;
101    RowSpec spec = null;
102    if (model.getLabels() != null) {
103      spec = new RowSpec(model.getStartRow(), endRow, model.getColumns(), model.getStartTime(),
104        model.getEndTime(), model.getMaxVersions(), model.getLabels());
105    } else {
106      spec = new RowSpec(model.getStartRow(), endRow, model.getColumns(), model.getStartTime(),
107        model.getEndTime(), model.getMaxVersions());
108    }
109
110    try {
111      Filter filter = ScannerResultGenerator.buildFilterFromModel(model);
112      String tableName = tableResource.getName();
113      ScannerResultGenerator gen = new ScannerResultGenerator(tableName, spec, filter,
114        model.getCaching(), model.getCacheBlocks(), model.getLimit());
115      String id = gen.getID();
116      ScannerInstanceResource instance =
117        new ScannerInstanceResource(tableName, id, gen, model.getBatch());
118      scanners.put(id, instance);
119      if (LOG.isTraceEnabled()) {
120        LOG.trace("new scanner: " + id);
121      }
122      UriBuilder builder = uriInfo.getAbsolutePathBuilder();
123      URI uri = builder.path(id).build();
124      servlet.getMetrics().incrementSucessfulPutRequests(1);
125      return Response.created(uri).build();
126    } catch (Exception e) {
127      LOG.error("Exception occurred while processing " + uriInfo.getAbsolutePath() + " : ", e);
128      servlet.getMetrics().incrementFailedPutRequests(1);
129      if (e instanceof TableNotFoundException) {
130        return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT)
131          .entity("Not found" + CRLF).build();
132      } else if (
133        e instanceof RuntimeException
134          || e instanceof JsonMappingException | e instanceof JsonParseException
135      ) {
136        return Response.status(Response.Status.BAD_REQUEST).type(MIMETYPE_TEXT)
137          .entity("Bad request" + CRLF).build();
138      }
139      return Response.status(Response.Status.SERVICE_UNAVAILABLE).type(MIMETYPE_TEXT)
140        .entity("Unavailable" + CRLF).build();
141    }
142  }
143
144  @PUT
145  @Consumes({ MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF, MIMETYPE_PROTOBUF_IETF })
146  public Response put(final ScannerModel model, final @Context UriInfo uriInfo) {
147    if (LOG.isTraceEnabled()) {
148      LOG.trace("PUT " + uriInfo.getAbsolutePath());
149    }
150    return update(model, true, uriInfo);
151  }
152
153  @POST
154  @Consumes({ MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF, MIMETYPE_PROTOBUF_IETF })
155  public Response post(final ScannerModel model, final @Context UriInfo uriInfo) {
156    if (LOG.isTraceEnabled()) {
157      LOG.trace("POST " + uriInfo.getAbsolutePath());
158    }
159    return update(model, false, uriInfo);
160  }
161
162  @Path("{scanner: .+}")
163  public ScannerInstanceResource getScannerInstanceResource(final @PathParam("scanner") String id)
164    throws IOException {
165    ScannerInstanceResource instance = scanners.getIfPresent(id);
166    if (instance == null) {
167      servlet.getMetrics().incrementFailedGetRequests(1);
168      return new ScannerInstanceResource();
169    } else {
170      servlet.getMetrics().incrementSucessfulGetRequests(1);
171    }
172    return instance;
173  }
174}