001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.rest.client;
021
022import com.google.protobuf.Descriptors;
023import com.google.protobuf.Message;
024import com.google.protobuf.Service;
025import com.google.protobuf.ServiceException;
026
027import java.io.IOException;
028import java.io.InterruptedIOException;
029import java.io.UnsupportedEncodingException;
030import java.net.URLEncoder;
031import java.util.ArrayList;
032import java.util.Collection;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.Set;
037import java.util.TreeMap;
038import java.util.concurrent.TimeUnit;
039import org.apache.commons.lang3.NotImplementedException;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.CellUtil;
043import org.apache.hadoop.hbase.CompareOperator;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HTableDescriptor;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.Durability;
050import org.apache.hadoop.hbase.filter.Filter;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054import org.apache.hadoop.hbase.client.Append;
055import org.apache.hadoop.hbase.client.CheckAndMutate;
056import org.apache.hadoop.hbase.client.CheckAndMutateResult;
057import org.apache.hadoop.hbase.client.Delete;
058import org.apache.hadoop.hbase.client.Get;
059import org.apache.hadoop.hbase.client.Increment;
060import org.apache.hadoop.hbase.client.Put;
061import org.apache.hadoop.hbase.client.RegionLocator;
062import org.apache.hadoop.hbase.client.Result;
063import org.apache.hadoop.hbase.client.ResultScanner;
064import org.apache.hadoop.hbase.client.Row;
065import org.apache.hadoop.hbase.client.RowMutations;
066import org.apache.hadoop.hbase.client.Scan;
067import org.apache.hadoop.hbase.client.Table;
068import org.apache.hadoop.hbase.client.TableDescriptor;
069import org.apache.hadoop.hbase.client.coprocessor.Batch;
070import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
071import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
072import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
073import org.apache.hadoop.hbase.io.TimeRange;
074import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
075import org.apache.hadoop.hbase.rest.Constants;
076import org.apache.hadoop.hbase.rest.model.CellModel;
077import org.apache.hadoop.hbase.rest.model.CellSetModel;
078import org.apache.hadoop.hbase.rest.model.RowModel;
079import org.apache.hadoop.hbase.rest.model.ScannerModel;
080import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
081import org.apache.hadoop.hbase.util.Bytes;
082import org.apache.hadoop.util.StringUtils;
083
084import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
085
086/**
087 * HTable interface to remote tables accessed via REST gateway
088 */
089@InterfaceAudience.Private
090public class RemoteHTable implements Table {
091
  /** Logger shared by this table and its inner classes. */
  private static final Logger LOG = LoggerFactory.getLogger(RemoteHTable.class);

  /** REST client through which every request of this table is issued. */
  final Client client;
  /** Configuration handed in at construction time. */
  final Configuration conf;
  /** Table name as raw bytes. */
  final byte[] name;
  /** Upper bound on attempts per request; read from "hbase.rest.client.max.retries" (default 10). */
  final int maxRetries;
  /** Value passed to Thread.sleep between attempts; read from "hbase.rest.client.sleep" (default 1000). */
  final long sleepTime;
099
100  @SuppressWarnings("rawtypes")
101  protected String buildRowSpec(final byte[] row, final Map familyMap,
102      final long startTime, final long endTime, final int maxVersions) {
103    StringBuffer sb = new StringBuffer();
104    sb.append('/');
105    sb.append(Bytes.toString(name));
106    sb.append('/');
107    sb.append(toURLEncodedBytes(row));
108    Set families = familyMap.entrySet();
109    if (families != null) {
110      Iterator i = familyMap.entrySet().iterator();
111      sb.append('/');
112      while (i.hasNext()) {
113        Map.Entry e = (Map.Entry)i.next();
114        Collection quals = (Collection)e.getValue();
115        if (quals == null || quals.isEmpty()) {
116          // this is an unqualified family. append the family name and NO ':'
117          sb.append(toURLEncodedBytes((byte[])e.getKey()));
118        } else {
119          Iterator ii = quals.iterator();
120          while (ii.hasNext()) {
121            sb.append(toURLEncodedBytes((byte[])e.getKey()));
122            Object o = ii.next();
123            // Puts use byte[] but Deletes use KeyValue
124            if (o instanceof byte[]) {
125              sb.append(':');
126              sb.append(toURLEncodedBytes((byte[]) o));
127            } else if (o instanceof KeyValue) {
128              if (((KeyValue) o).getQualifierLength() != 0) {
129                sb.append(':');
130                sb.append(toURLEncodedBytes(CellUtil.cloneQualifier((KeyValue) o)));
131              }
132            } else {
133              throw new RuntimeException("object type not handled");
134            }
135            if (ii.hasNext()) {
136              sb.append(',');
137            }
138          }
139        }
140        if (i.hasNext()) {
141          sb.append(',');
142        }
143      }
144    }
145    if (startTime >= 0 && endTime != Long.MAX_VALUE) {
146      sb.append('/');
147      sb.append(startTime);
148      if (startTime != endTime) {
149        sb.append(',');
150        sb.append(endTime);
151      }
152    } else if (endTime != Long.MAX_VALUE) {
153      sb.append('/');
154      sb.append(endTime);
155    }
156    if (maxVersions > 1) {
157      sb.append("?v=");
158      sb.append(maxVersions);
159    }
160    return sb.toString();
161  }
162
163  protected String buildMultiRowSpec(final byte[][] rows, int maxVersions) {
164    StringBuilder sb = new StringBuilder();
165    sb.append('/');
166    sb.append(Bytes.toString(name));
167    sb.append("/multiget/");
168    if (rows == null || rows.length == 0) {
169      return sb.toString();
170    }
171    sb.append("?");
172    for(int i=0; i<rows.length; i++) {
173      byte[] rk = rows[i];
174      if (i != 0) {
175        sb.append('&');
176      }
177      sb.append("row=");
178      sb.append(toURLEncodedBytes(rk));
179    }
180    sb.append("&v=");
181    sb.append(maxVersions);
182
183    return sb.toString();
184  }
185
186  protected Result[] buildResultFromModel(final CellSetModel model) {
187    List<Result> results = new ArrayList<>();
188    for (RowModel row: model.getRows()) {
189      List<Cell> kvs = new ArrayList<>(row.getCells().size());
190      for (CellModel cell: row.getCells()) {
191        byte[][] split = CellUtil.parseColumn(cell.getColumn());
192        byte[] column = split[0];
193        byte[] qualifier = null;
194        if (split.length == 1) {
195          qualifier = HConstants.EMPTY_BYTE_ARRAY;
196        } else if (split.length == 2) {
197          qualifier = split[1];
198        } else {
199          throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
200        }
201        kvs.add(new KeyValue(row.getKey(), column, qualifier,
202          cell.getTimestamp(), cell.getValue()));
203      }
204      results.add(Result.create(kvs));
205    }
206    return results.toArray(new Result[results.size()]);
207  }
208
209  protected CellSetModel buildModelFromPut(Put put) {
210    RowModel row = new RowModel(put.getRow());
211    long ts = put.getTimestamp();
212    for (List<Cell> cells: put.getFamilyCellMap().values()) {
213      for (Cell cell: cells) {
214        row.addCell(new CellModel(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
215          ts != HConstants.LATEST_TIMESTAMP ? ts : cell.getTimestamp(),
216          CellUtil.cloneValue(cell)));
217      }
218    }
219    CellSetModel model = new CellSetModel();
220    model.addRow(row);
221    return model;
222  }
223
224  /**
225   * Constructor
226   */
227  public RemoteHTable(Client client, String name) {
228    this(client, HBaseConfiguration.create(), Bytes.toBytes(name));
229  }
230
231  /**
232   * Constructor
233   */
234  public RemoteHTable(Client client, Configuration conf, String name) {
235    this(client, conf, Bytes.toBytes(name));
236  }
237
238  /**
239   * Constructor
240   */
241  public RemoteHTable(Client client, Configuration conf, byte[] name) {
242    this.client = client;
243    this.conf = conf;
244    this.name = name;
245    this.maxRetries = conf.getInt("hbase.rest.client.max.retries", 10);
246    this.sleepTime = conf.getLong("hbase.rest.client.sleep", 1000);
247  }
248
249  public byte[] getTableName() {
250    return name.clone();
251  }
252
253  @Override
254  public TableName getName() {
255    return TableName.valueOf(name);
256  }
257
258  @Override
259  public Configuration getConfiguration() {
260    return conf;
261  }
262
263  @Override
264  @Deprecated
265  public HTableDescriptor getTableDescriptor() throws IOException {
266    StringBuilder sb = new StringBuilder();
267    sb.append('/');
268    sb.append(Bytes.toString(name));
269    sb.append('/');
270    sb.append("schema");
271    for (int i = 0; i < maxRetries; i++) {
272      Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF);
273      int code = response.getCode();
274      switch (code) {
275      case 200:
276        TableSchemaModel schema = new TableSchemaModel();
277        schema.getObjectFromMessage(response.getBody());
278        return schema.getTableDescriptor();
279      case 509:
280        try {
281          Thread.sleep(sleepTime);
282        } catch (InterruptedException e) {
283          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
284        }
285        break;
286      default:
287        throw new IOException("schema request returned " + code);
288      }
289    }
290    throw new IOException("schema request timed out");
291  }
292
293  @Override
294  public void close() throws IOException {
295    client.shutdown();
296  }
297
298  @Override
299  public Result get(Get get) throws IOException {
300    TimeRange range = get.getTimeRange();
301    String spec = buildRowSpec(get.getRow(), get.getFamilyMap(),
302      range.getMin(), range.getMax(), get.getMaxVersions());
303    if (get.getFilter() != null) {
304      LOG.warn("filters not supported on gets");
305    }
306    Result[] results = getResults(spec);
307    if (results.length > 0) {
308      if (results.length > 1) {
309        LOG.warn("too many results for get (" + results.length + ")");
310      }
311      return results[0];
312    } else {
313      return new Result();
314    }
315  }
316
317  @Override
318  public Result[] get(List<Get> gets) throws IOException {
319    byte[][] rows = new byte[gets.size()][];
320    int maxVersions = 1;
321    int count = 0;
322
323    for(Get g:gets) {
324
325      if ( count == 0 ) {
326        maxVersions = g.getMaxVersions();
327      } else if (g.getMaxVersions() != maxVersions) {
328        LOG.warn("MaxVersions on Gets do not match, using the first in the list ("+maxVersions+")");
329      }
330
331      if (g.getFilter() != null) {
332        LOG.warn("filters not supported on gets");
333      }
334
335      rows[count] = g.getRow();
336      count ++;
337    }
338
339    String spec = buildMultiRowSpec(rows, maxVersions);
340
341    return getResults(spec);
342  }
343
344  private Result[] getResults(String spec) throws IOException {
345    for (int i = 0; i < maxRetries; i++) {
346      Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF);
347      int code = response.getCode();
348      switch (code) {
349        case 200:
350          CellSetModel model = new CellSetModel();
351          model.getObjectFromMessage(response.getBody());
352          Result[] results = buildResultFromModel(model);
353          if ( results.length > 0) {
354            return results;
355          }
356          // fall through
357        case 404:
358          return new Result[0];
359
360        case 509:
361          try {
362            Thread.sleep(sleepTime);
363          } catch (InterruptedException e) {
364            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
365          }
366          break;
367        default:
368          throw new IOException("get request returned " + code);
369      }
370    }
371    throw new IOException("get request timed out");
372  }
373
374  @Override
375  public boolean exists(Get get) throws IOException {
376    LOG.warn("exists() is really get(), just use get()");
377    Result result = get(get);
378    return (result != null && !(result.isEmpty()));
379  }
380
381  @Override
382  public boolean[] exists(List<Get> gets) throws IOException {
383    LOG.warn("exists(List<Get>) is really list of get() calls, just use get()");
384    boolean[] results = new boolean[gets.size()];
385    for (int i = 0; i < results.length; i++) {
386      results[i] = exists(gets.get(i));
387    }
388    return results;
389  }
390
391  @Override
392  public void put(Put put) throws IOException {
393    CellSetModel model = buildModelFromPut(put);
394    StringBuilder sb = new StringBuilder();
395    sb.append('/');
396    sb.append(Bytes.toString(name));
397    sb.append('/');
398    sb.append(toURLEncodedBytes(put.getRow()));
399    for (int i = 0; i < maxRetries; i++) {
400      Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
401        model.createProtobufOutput());
402      int code = response.getCode();
403      switch (code) {
404      case 200:
405        return;
406      case 509:
407        try {
408          Thread.sleep(sleepTime);
409        } catch (InterruptedException e) {
410          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
411        }
412        break;
413      default:
414        throw new IOException("put request failed with " + code);
415      }
416    }
417    throw new IOException("put request timed out");
418  }
419
420  @Override
421  public void put(List<Put> puts) throws IOException {
422    // this is a trick: The gateway accepts multiple rows in a cell set and
423    // ignores the row specification in the URI
424
425    // separate puts by row
426    TreeMap<byte[],List<Cell>> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
427    for (Put put: puts) {
428      byte[] row = put.getRow();
429      List<Cell> cells = map.get(row);
430      if (cells == null) {
431        cells = new ArrayList<>();
432        map.put(row, cells);
433      }
434      for (List<Cell> l: put.getFamilyCellMap().values()) {
435        cells.addAll(l);
436      }
437    }
438
439    // build the cell set
440    CellSetModel model = new CellSetModel();
441    for (Map.Entry<byte[], List<Cell>> e: map.entrySet()) {
442      RowModel row = new RowModel(e.getKey());
443      for (Cell cell: e.getValue()) {
444        row.addCell(new CellModel(cell));
445      }
446      model.addRow(row);
447    }
448
449    // build path for multiput
450    StringBuilder sb = new StringBuilder();
451    sb.append('/');
452    sb.append(Bytes.toString(name));
453    sb.append("/$multiput"); // can be any nonexistent row
454    for (int i = 0; i < maxRetries; i++) {
455      Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
456        model.createProtobufOutput());
457      int code = response.getCode();
458      switch (code) {
459      case 200:
460        return;
461      case 509:
462        try {
463          Thread.sleep(sleepTime);
464        } catch (InterruptedException e) {
465          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
466        }
467        break;
468      default:
469        throw new IOException("multiput request failed with " + code);
470      }
471    }
472    throw new IOException("multiput request timed out");
473  }
474
475  @Override
476  public void delete(Delete delete) throws IOException {
477    String spec = buildRowSpec(delete.getRow(), delete.getFamilyCellMap(),
478      delete.getTimestamp(), delete.getTimestamp(), 1);
479    for (int i = 0; i < maxRetries; i++) {
480      Response response = client.delete(spec);
481      int code = response.getCode();
482      switch (code) {
483      case 200:
484        return;
485      case 509:
486        try {
487          Thread.sleep(sleepTime);
488        } catch (InterruptedException e) {
489          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
490        }
491        break;
492      default:
493        throw new IOException("delete request failed with " + code);
494      }
495    }
496    throw new IOException("delete request timed out");
497  }
498
499  @Override
500  public void delete(List<Delete> deletes) throws IOException {
501    for (Delete delete: deletes) {
502      delete(delete);
503    }
504  }
505
506  public void flushCommits() throws IOException {
507    // no-op
508  }
509
510  @Override
511  public TableDescriptor getDescriptor() throws IOException {
512    return getTableDescriptor();
513  }
514
515  class Scanner implements ResultScanner {
516
517    String uri;
518
519    public Scanner(Scan scan) throws IOException {
520      ScannerModel model;
521      try {
522        model = ScannerModel.fromScan(scan);
523      } catch (Exception e) {
524        throw new IOException(e);
525      }
526      StringBuffer sb = new StringBuffer();
527      sb.append('/');
528      sb.append(Bytes.toString(name));
529      sb.append('/');
530      sb.append("scanner");
531      for (int i = 0; i < maxRetries; i++) {
532        Response response = client.post(sb.toString(),
533          Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
534        int code = response.getCode();
535        switch (code) {
536        case 201:
537          uri = response.getLocation();
538          return;
539        case 509:
540          try {
541            Thread.sleep(sleepTime);
542          } catch (InterruptedException e) {
543            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
544          }
545          break;
546        default:
547          throw new IOException("scan request failed with " + code);
548        }
549      }
550      throw new IOException("scan request timed out");
551    }
552
553    @Override
554    public Result[] next(int nbRows) throws IOException {
555      StringBuilder sb = new StringBuilder(uri);
556      sb.append("?n=");
557      sb.append(nbRows);
558      for (int i = 0; i < maxRetries; i++) {
559        Response response = client.get(sb.toString(),
560          Constants.MIMETYPE_PROTOBUF);
561        int code = response.getCode();
562        switch (code) {
563        case 200:
564          CellSetModel model = new CellSetModel();
565          model.getObjectFromMessage(response.getBody());
566          return buildResultFromModel(model);
567        case 204:
568        case 206:
569          return null;
570        case 509:
571          try {
572            Thread.sleep(sleepTime);
573          } catch (InterruptedException e) {
574            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
575          }
576          break;
577        default:
578          throw new IOException("scanner.next request failed with " + code);
579        }
580      }
581      throw new IOException("scanner.next request timed out");
582    }
583
584    @Override
585    public Result next() throws IOException {
586      Result[] results = next(1);
587      if (results == null || results.length < 1) {
588        return null;
589      }
590      return results[0];
591    }
592
593    class Iter implements Iterator<Result> {
594
595      Result cache;
596
597      public Iter() {
598        try {
599          cache = Scanner.this.next();
600        } catch (IOException e) {
601          LOG.warn(StringUtils.stringifyException(e));
602        }
603      }
604
605      @Override
606      public boolean hasNext() {
607        return cache != null;
608      }
609
610      @Override
611      public Result next() {
612        Result result = cache;
613        try {
614          cache = Scanner.this.next();
615        } catch (IOException e) {
616          LOG.warn(StringUtils.stringifyException(e));
617          cache = null;
618        }
619        return result;
620      }
621
622      @Override
623      public void remove() {
624        throw new RuntimeException("remove() not supported");
625      }
626
627    }
628
629    @Override
630    public Iterator<Result> iterator() {
631      return new Iter();
632    }
633
634    @Override
635    public void close() {
636      try {
637        client.delete(uri);
638      } catch (IOException e) {
639        LOG.warn(StringUtils.stringifyException(e));
640      }
641    }
642
643    @Override
644    public boolean renewLease() {
645      throw new RuntimeException("renewLease() not supported");
646    }
647
648    @Override
649    public ScanMetrics getScanMetrics() {
650      throw new RuntimeException("getScanMetrics() not supported");
651    }
652  }
653
654  @Override
655  public ResultScanner getScanner(Scan scan) throws IOException {
656    return new Scanner(scan);
657  }
658
659  @Override
660  public ResultScanner getScanner(byte[] family) throws IOException {
661    Scan scan = new Scan();
662    scan.addFamily(family);
663    return new Scanner(scan);
664  }
665
666  @Override
667  public ResultScanner getScanner(byte[] family, byte[] qualifier)
668      throws IOException {
669    Scan scan = new Scan();
670    scan.addColumn(family, qualifier);
671    return new Scanner(scan);
672  }
673
674  public boolean isAutoFlush() {
675    return true;
676  }
677
678  @Override
679  @Deprecated
680  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
681      byte[] value, Put put) throws IOException {
682    return doCheckAndPut(row, family, qualifier, value, put);
683  }
684
685  private boolean doCheckAndPut(byte[] row, byte[] family, byte[] qualifier,
686      byte[] value, Put put) throws IOException {
687    // column to check-the-value
688    put.add(new KeyValue(row, family, qualifier, value));
689
690    CellSetModel model = buildModelFromPut(put);
691    StringBuilder sb = new StringBuilder();
692    sb.append('/');
693    sb.append(Bytes.toString(name));
694    sb.append('/');
695    sb.append(toURLEncodedBytes(put.getRow()));
696    sb.append("?check=put");
697
698    for (int i = 0; i < maxRetries; i++) {
699      Response response = client.put(sb.toString(),
700          Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
701      int code = response.getCode();
702      switch (code) {
703      case 200:
704        return true;
705      case 304: // NOT-MODIFIED
706        return false;
707      case 509:
708        try {
709          Thread.sleep(sleepTime);
710        } catch (final InterruptedException e) {
711          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
712        }
713        break;
714      default:
715        throw new IOException("checkAndPut request failed with " + code);
716      }
717    }
718    throw new IOException("checkAndPut request timed out");
719  }
720
721  @Override
722  @Deprecated
723  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
724      CompareOp compareOp, byte[] value, Put put) throws IOException {
725    throw new IOException("checkAndPut for non-equal comparison not implemented");
726  }
727
728  @Override
729  @Deprecated
730  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
731                             CompareOperator compareOp, byte[] value, Put put) throws IOException {
732    throw new IOException("checkAndPut for non-equal comparison not implemented");
733  }
734
735  @Override
736  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
737      byte[] value, Delete delete) throws IOException {
738    return doCheckAndDelete(row, family, qualifier, value, delete);
739  }
740
741  private boolean doCheckAndDelete(byte[] row, byte[] family, byte[] qualifier,
742      byte[] value, Delete delete) throws IOException {
743    Put put = new Put(row);
744    put.setFamilyCellMap(delete.getFamilyCellMap());
745    // column to check-the-value
746    put.add(new KeyValue(row, family, qualifier, value));
747    CellSetModel model = buildModelFromPut(put);
748    StringBuilder sb = new StringBuilder();
749    sb.append('/');
750    sb.append(Bytes.toString(name));
751    sb.append('/');
752    sb.append(toURLEncodedBytes(row));
753    sb.append("?check=delete");
754
755    for (int i = 0; i < maxRetries; i++) {
756      Response response = client.put(sb.toString(),
757          Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
758      int code = response.getCode();
759      switch (code) {
760      case 200:
761        return true;
762      case 304: // NOT-MODIFIED
763        return false;
764      case 509:
765        try {
766          Thread.sleep(sleepTime);
767        } catch (final InterruptedException e) {
768          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
769        }
770        break;
771      default:
772        throw new IOException("checkAndDelete request failed with " + code);
773      }
774    }
775    throw new IOException("checkAndDelete request timed out");
776  }
777
778  @Override
779  @Deprecated
780  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
781      CompareOp compareOp, byte[] value, Delete delete) throws IOException {
782    throw new IOException("checkAndDelete for non-equal comparison not implemented");
783  }
784
785  @Override
786  @Deprecated
787  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
788                                CompareOperator compareOp, byte[] value, Delete delete) throws IOException {
789    throw new IOException("checkAndDelete for non-equal comparison not implemented");
790  }
791
792  @Override
793  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
794    return new CheckAndMutateBuilderImpl(row, family);
795  }
796
797  @Override
798  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
799    throw new NotImplementedException("Implement later");
800  }
801
802  @Override
803  @Deprecated
804  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
805      CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
806    throw new UnsupportedOperationException("checkAndMutate not implemented");
807  }
808
809  @Override
810  @Deprecated
811  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
812      CompareOperator compareOp, byte[] value, RowMutations rm) throws IOException {
813    throw new UnsupportedOperationException("checkAndMutate not implemented");
814  }
815
816  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) {
817    throw new NotImplementedException("Implement later");
818  }
819
820  @Override
821  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates) {
822    throw new NotImplementedException("Implement later");
823  }
824
825  @Override
826  public Result increment(Increment increment) throws IOException {
827    throw new IOException("Increment not supported");
828  }
829
830  @Override
831  public Result append(Append append) throws IOException {
832    throw new IOException("Append not supported");
833  }
834
835  @Override
836  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
837      long amount) throws IOException {
838    throw new IOException("incrementColumnValue not supported");
839  }
840
841  @Override
842  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
843      long amount, Durability durability) throws IOException {
844    throw new IOException("incrementColumnValue not supported");
845  }
846
847  @Override
848  public void batch(List<? extends Row> actions, Object[] results) throws IOException {
849    throw new IOException("batch not supported");
850  }
851
852  @Override
853  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
854      Batch.Callback<R> callback) throws IOException, InterruptedException {
855    throw new IOException("batchCallback not supported");
856  }
857
858  @Override
859  public CoprocessorRpcChannel coprocessorService(byte[] row) {
860    throw new UnsupportedOperationException("coprocessorService not implemented");
861  }
862
863  @Override
864  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
865      byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
866      throws ServiceException, Throwable {
867    throw new UnsupportedOperationException("coprocessorService not implemented");
868  }
869
870  @Override
871  public <T extends Service, R> void coprocessorService(Class<T> service,
872      byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
873      throws ServiceException, Throwable {
874    throw new UnsupportedOperationException("coprocessorService not implemented");
875  }
876
877  @Override
878  public Result mutateRow(RowMutations rm) throws IOException {
879    throw new IOException("atomicMutation not supported");
880  }
881
882  @Override
883  public <R extends Message> Map<byte[], R> batchCoprocessorService(
884      Descriptors.MethodDescriptor method, Message request,
885      byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
886    throw new UnsupportedOperationException("batchCoprocessorService not implemented");
887  }
888
889  @Override
890  public <R extends Message> void batchCoprocessorService(
891      Descriptors.MethodDescriptor method, Message request,
892      byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
893      throws ServiceException, Throwable {
894    throw new UnsupportedOperationException("batchCoprocessorService not implemented");
895  }
896
897  @Override
898  @Deprecated
899  public void setOperationTimeout(int operationTimeout) {
900    throw new UnsupportedOperationException();
901  }
902
903  @Override
904  @Deprecated
905  public int getOperationTimeout() {
906    throw new UnsupportedOperationException();
907  }
908
909  @Override
910  @Deprecated
911  public void setRpcTimeout(int rpcTimeout) {
912    throw new UnsupportedOperationException();
913  }
914
915  @Override
916  public long getReadRpcTimeout(TimeUnit unit) {
917    throw new UnsupportedOperationException();
918  }
919
920  @Override
921  @Deprecated
922  public int getRpcTimeout() {
923    throw new UnsupportedOperationException();
924  }
925
926  @Override
927  public long getRpcTimeout(TimeUnit unit) {
928    throw new UnsupportedOperationException();
929  }
930
931  @Override
932  @Deprecated
933  public int getReadRpcTimeout() {
934    throw new UnsupportedOperationException();
935  }
936
937  @Override
938  @Deprecated
939  public void setReadRpcTimeout(int readRpcTimeout) {
940    throw new UnsupportedOperationException();
941  }
942
943  @Override
944  public long getWriteRpcTimeout(TimeUnit unit) {
945    throw new UnsupportedOperationException();
946  }
947
948  @Override
949  @Deprecated
950  public int getWriteRpcTimeout() {
951    throw new UnsupportedOperationException();
952  }
953
954  @Override
955  @Deprecated
956  public void setWriteRpcTimeout(int writeRpcTimeout) {
957    throw new UnsupportedOperationException();
958  }
959
960  @Override
961  public long getOperationTimeout(TimeUnit unit) {
962    throw new UnsupportedOperationException();
963  }
964
965  /*
966   * Only a small subset of characters are valid in URLs.
967   *
968   * Row keys, column families, and qualifiers cannot be appended to URLs without first URL
969   * escaping. Table names are ok because they can only contain alphanumeric, ".","_", and "-"
970   * which are valid characters in URLs.
971   */
972  private static String toURLEncodedBytes(byte[] row) {
973    try {
974      return URLEncoder.encode(new String(row, "UTF-8"), "UTF-8");
975    } catch (UnsupportedEncodingException e) {
976      throw new IllegalStateException("URLEncoder doesn't support UTF-8", e);
977    }
978  }
979
980  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
981
982    private final byte[] row;
983    private final byte[] family;
984    private byte[] qualifier;
985    private byte[] value;
986
987    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
988      this.row = Preconditions.checkNotNull(row, "row is null");
989      this.family = Preconditions.checkNotNull(family, "family is null");
990    }
991
992    @Override
993    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
994      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
995          " an empty byte array, or just do not call this method if you want a null qualifier");
996      return this;
997    }
998
999    @Override
1000    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
1001      throw new UnsupportedOperationException("timeRange not implemented");
1002    }
1003
1004    @Override
1005    public CheckAndMutateBuilder ifNotExists() {
1006      throw new UnsupportedOperationException("CheckAndMutate for non-equal comparison "
1007          + "not implemented");
1008    }
1009
1010    @Override
1011    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
1012      if (compareOp == CompareOperator.EQUAL) {
1013        this.value = Preconditions.checkNotNull(value, "value is null");
1014        return this;
1015      } else {
1016        throw new UnsupportedOperationException("CheckAndMutate for non-equal comparison " +
1017            "not implemented");
1018      }
1019    }
1020
1021    @Override
1022    public CheckAndMutateBuilder ifEquals(byte[] value) {
1023      this.value = Preconditions.checkNotNull(value, "value is null");
1024      return this;
1025    }
1026
1027    @Override
1028    public boolean thenPut(Put put) throws IOException {
1029      return doCheckAndPut(row, family, qualifier, value, put);
1030    }
1031
1032    @Override
1033    public boolean thenDelete(Delete delete) throws IOException {
1034      return doCheckAndDelete(row, family, qualifier, value, delete);
1035    }
1036
1037    @Override
1038    public boolean thenMutate(RowMutations mutation) throws IOException {
1039      throw new UnsupportedOperationException("thenMutate not implemented");
1040    }
1041  }
1042
1043  @Override
1044  public RegionLocator getRegionLocator() throws IOException {
1045    throw new UnsupportedOperationException();
1046  }
1047}