001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import static org.apache.hadoop.hbase.thrift.Constants.COALESCE_INC_KEY;
021import static org.apache.hadoop.hbase.util.Bytes.getBytes;
022
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.TreeMap;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.CatalogFamilyFormat;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellBuilder;
035import org.apache.hadoop.hbase.CellBuilderFactory;
036import org.apache.hadoop.hbase.CellBuilderType;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.DoNotRetryIOException;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.TableNotFoundException;
045import org.apache.hadoop.hbase.client.Append;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
047import org.apache.hadoop.hbase.client.Delete;
048import org.apache.hadoop.hbase.client.Durability;
049import org.apache.hadoop.hbase.client.Get;
050import org.apache.hadoop.hbase.client.Increment;
051import org.apache.hadoop.hbase.client.OperationWithAttributes;
052import org.apache.hadoop.hbase.client.Put;
053import org.apache.hadoop.hbase.client.RegionInfo;
054import org.apache.hadoop.hbase.client.RegionLocator;
055import org.apache.hadoop.hbase.client.Result;
056import org.apache.hadoop.hbase.client.ResultScanner;
057import org.apache.hadoop.hbase.client.Scan;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.client.TableDescriptor;
060import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
061import org.apache.hadoop.hbase.filter.Filter;
062import org.apache.hadoop.hbase.filter.ParseFilter;
063import org.apache.hadoop.hbase.filter.PrefixFilter;
064import org.apache.hadoop.hbase.filter.WhileMatchFilter;
065import org.apache.hadoop.hbase.security.UserProvider;
066import org.apache.hadoop.hbase.security.access.AccessControlClient;
067import org.apache.hadoop.hbase.security.access.Permission;
068import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
069import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
070import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
071import org.apache.hadoop.hbase.thrift.generated.Hbase;
072import org.apache.hadoop.hbase.thrift.generated.IOError;
073import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
074import org.apache.hadoop.hbase.thrift.generated.Mutation;
075import org.apache.hadoop.hbase.thrift.generated.TAccessControlEntity;
076import org.apache.hadoop.hbase.thrift.generated.TAppend;
077import org.apache.hadoop.hbase.thrift.generated.TCell;
078import org.apache.hadoop.hbase.thrift.generated.TIncrement;
079import org.apache.hadoop.hbase.thrift.generated.TPermissionScope;
080import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
081import org.apache.hadoop.hbase.thrift.generated.TRowResult;
082import org.apache.hadoop.hbase.thrift.generated.TScan;
083import org.apache.hadoop.hbase.thrift.generated.TThriftServerType;
084import org.apache.hadoop.hbase.util.Bytes;
085import org.apache.thrift.TException;
086import org.apache.yetus.audience.InterfaceAudience;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
091
092/**
 * The ThriftHBaseServiceHandler is a glue object that connects Thrift RPC calls to the HBase
 * client API primarily defined in the Admin and Table objects.
095 */
096@InterfaceAudience.Private
097@SuppressWarnings("deprecation")
098public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements Hbase.Iface {
099  private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);
100
101  public static final int HREGION_VERSION = 1;
102
103  IncrementCoalescer coalescer;
104
105  /**
106   * Returns a list of all the column families for a given Table.
107   */
108  byte[][] getAllColumns(Table table) throws IOException {
109    ColumnFamilyDescriptor[] cds = table.getDescriptor().getColumnFamilies();
110    byte[][] columns = new byte[cds.length][];
111    for (int i = 0; i < cds.length; i++) {
112      columns[i] = Bytes.add(cds[i].getName(), KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
113    }
114    return columns;
115  }
116
117  protected ThriftHBaseServiceHandler(final Configuration c, final UserProvider userProvider)
118    throws IOException {
119    super(c, userProvider);
120
121    this.coalescer = new IncrementCoalescer(this);
122  }
123
124  @Override
125  public void enableTable(ByteBuffer tableName) throws IOError {
126    try {
127      getAdmin().enableTable(getTableName(tableName));
128    } catch (IOException e) {
129      LOG.warn(e.getMessage(), e);
130      throw getIOError(e);
131    }
132  }
133
134  @Override
135  public void disableTable(ByteBuffer tableName) throws IOError {
136    try {
137      getAdmin().disableTable(getTableName(tableName));
138    } catch (IOException e) {
139      LOG.warn(e.getMessage(), e);
140      throw getIOError(e);
141    }
142  }
143
144  @Override
145  public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
146    try {
147      return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
148    } catch (IOException e) {
149      LOG.warn(e.getMessage(), e);
150      throw getIOError(e);
151    }
152  }
153
154  @Override
155  public Map<ByteBuffer, Boolean> getTableNamesWithIsTableEnabled() throws IOError {
156    try {
157      HashMap<ByteBuffer, Boolean> tables = new HashMap<>();
158      for (ByteBuffer tableName : this.getTableNames()) {
159        tables.put(tableName, this.isTableEnabled(tableName));
160      }
161      return tables;
162    } catch (IOError e) {
163      LOG.warn(e.getMessage(), e);
164      throw getIOError(e);
165    }
166  }
167
168  // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
169  // table and region.
170  @Override
171  public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
172    try {
173      try {
174        getAdmin().compactRegion(getBytes(tableNameOrRegionName));
175      } catch (IllegalArgumentException e) {
176        // Invalid region, try table
177        getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
178      }
179    } catch (IOException e) {
180      LOG.warn(e.getMessage(), e);
181      throw getIOError(e);
182    }
183  }
184
185  // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
186  // to table and region.
187  @Override
188  public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
189    try {
190      try {
191        getAdmin().compactRegion(getBytes(tableNameOrRegionName));
192      } catch (IllegalArgumentException e) {
193        // Invalid region, try table
194        getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
195      }
196    } catch (IOException e) {
197      LOG.warn(e.getMessage(), e);
198      throw getIOError(e);
199    }
200  }
201
202  @Override
203  public List<ByteBuffer> getTableNames() throws IOError {
204    try {
205      TableName[] tableNames = this.getAdmin().listTableNames();
206      ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length);
207      for (TableName tableName : tableNames) {
208        list.add(ByteBuffer.wrap(tableName.getName()));
209      }
210      return list;
211    } catch (IOException e) {
212      LOG.warn(e.getMessage(), e);
213      throw getIOError(e);
214    }
215  }
216
217  /**
218   * Returns the list of regions in the given table, or an empty list if the table does not exist
219   */
220  @Override
221  public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError {
222    try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
223      List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
224      List<TRegionInfo> results = new ArrayList<>(regionLocations.size());
225      for (HRegionLocation regionLocation : regionLocations) {
226        RegionInfo info = regionLocation.getRegion();
227        ServerName serverName = regionLocation.getServerName();
228        TRegionInfo region = new TRegionInfo();
229        region.serverName = ByteBuffer.wrap(Bytes.toBytes(serverName.getHostname()));
230        region.port = serverName.getPort();
231        region.startKey = ByteBuffer.wrap(info.getStartKey());
232        region.endKey = ByteBuffer.wrap(info.getEndKey());
233        region.id = info.getRegionId();
234        region.name = ByteBuffer.wrap(info.getRegionName());
235        region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
236        results.add(region);
237      }
238      return results;
239    } catch (TableNotFoundException e) {
240      // Return empty list for non-existing table
241      return Collections.emptyList();
242    } catch (IOException e) {
243      LOG.warn(e.getMessage(), e);
244      throw getIOError(e);
245    }
246  }
247
248  @Override
249  public List<TCell> get(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
250    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
251    byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
252    if (famAndQf.length == 1) {
253      return get(tableName, row, famAndQf[0], null, attributes);
254    }
255    if (famAndQf.length == 2) {
256      return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
257    }
258    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
259  }
260
261  /**
262   * Note: this internal interface is slightly different from public APIs in regard to handling of
263   * the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, we
264   * respect qual == null as a request for the entire column family. The caller (
265   * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
266   * column is parse like normal.
267   */
268  protected List<TCell> get(ByteBuffer tableName, ByteBuffer row, byte[] family, byte[] qualifier,
269    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
270    Table table = null;
271    try {
272      table = getTable(tableName);
273      Get get = new Get(getBytes(row));
274      addAttributes(get, attributes);
275      if (qualifier == null) {
276        get.addFamily(family);
277      } else {
278        get.addColumn(family, qualifier);
279      }
280      Result result = table.get(get);
281      return ThriftUtilities.cellFromHBase(result.rawCells());
282    } catch (IOException e) {
283      LOG.warn(e.getMessage(), e);
284      throw getIOError(e);
285    } finally {
286      closeTable(table);
287    }
288  }
289
290  @Override
291  public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
292    int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
293    byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
294    if (famAndQf.length == 1) {
295      return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
296    }
297    if (famAndQf.length == 2) {
298      return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
299    }
300    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
301
302  }
303
304  /**
305   * Note: this public interface is slightly different from public Java APIs in regard to handling
306   * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, we
307   * respect qual == null as a request for the entire column family. If you want to access the
308   * entire column family, use {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a
309   * {@code column} value that lacks a {@code ':'}.
310   */
311  public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family, byte[] qualifier,
312    int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
313
314    Table table = null;
315    try {
316      table = getTable(tableName);
317      Get get = new Get(getBytes(row));
318      addAttributes(get, attributes);
319      if (null == qualifier) {
320        get.addFamily(family);
321      } else {
322        get.addColumn(family, qualifier);
323      }
324      get.readVersions(numVersions);
325      Result result = table.get(get);
326      return ThriftUtilities.cellFromHBase(result.rawCells());
327    } catch (IOException e) {
328      LOG.warn(e.getMessage(), e);
329      throw getIOError(e);
330    } finally {
331      closeTable(table);
332    }
333  }
334
335  @Override
336  public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
337    long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
338    byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
339    if (famAndQf.length == 1) {
340      return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
341    }
342    if (famAndQf.length == 2) {
343      return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions, attributes);
344    }
345    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
346  }
347
348  /**
349   * Note: this internal interface is slightly different from public APIs in regard to handling of
350   * the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, we
351   * respect qual == null as a request for the entire column family. The caller (
352   * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS consistent
353   * in that the column is parse like normal.
354   */
355  protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
356    byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
357    throws IOError {
358
359    Table table = null;
360    try {
361      table = getTable(tableName);
362      Get get = new Get(getBytes(row));
363      addAttributes(get, attributes);
364      if (null == qualifier) {
365        get.addFamily(family);
366      } else {
367        get.addColumn(family, qualifier);
368      }
369      get.setTimeRange(0, timestamp);
370      get.readVersions(numVersions);
371      Result result = table.get(get);
372      return ThriftUtilities.cellFromHBase(result.rawCells());
373    } catch (IOException e) {
374      LOG.warn(e.getMessage(), e);
375      throw getIOError(e);
376    } finally {
377      closeTable(table);
378    }
379  }
380
381  @Override
382  public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
383    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
384    return getRowWithColumnsTs(tableName, row, null, HConstants.LATEST_TIMESTAMP, attributes);
385  }
386
387  @Override
388  public List<TRowResult> getRowWithColumns(ByteBuffer tableName, ByteBuffer row,
389    List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
390    return getRowWithColumnsTs(tableName, row, columns, HConstants.LATEST_TIMESTAMP, attributes);
391  }
392
393  @Override
394  public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp,
395    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
396    return getRowWithColumnsTs(tableName, row, null, timestamp, attributes);
397  }
398
399  @Override
400  public List<TRowResult> getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row,
401    List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer> attributes)
402    throws IOError {
403
404    Table table = null;
405    try {
406      table = getTable(tableName);
407      if (columns == null) {
408        Get get = new Get(getBytes(row));
409        addAttributes(get, attributes);
410        get.setTimeRange(0, timestamp);
411        Result result = table.get(get);
412        return ThriftUtilities.rowResultFromHBase(result);
413      }
414      Get get = new Get(getBytes(row));
415      addAttributes(get, attributes);
416      for (ByteBuffer column : columns) {
417        byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
418        if (famAndQf.length == 1) {
419          get.addFamily(famAndQf[0]);
420        } else {
421          get.addColumn(famAndQf[0], famAndQf[1]);
422        }
423      }
424      get.setTimeRange(0, timestamp);
425      Result result = table.get(get);
426      return ThriftUtilities.rowResultFromHBase(result);
427    } catch (IOException e) {
428      LOG.warn(e.getMessage(), e);
429      throw getIOError(e);
430    } finally {
431      closeTable(table);
432    }
433  }
434
435  @Override
436  public List<TRowResult> getRows(ByteBuffer tableName, List<ByteBuffer> rows,
437    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
438    return getRowsWithColumnsTs(tableName, rows, null, HConstants.LATEST_TIMESTAMP, attributes);
439  }
440
441  @Override
442  public List<TRowResult> getRowsWithColumns(ByteBuffer tableName, List<ByteBuffer> rows,
443    List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
444    return getRowsWithColumnsTs(tableName, rows, columns, HConstants.LATEST_TIMESTAMP, attributes);
445  }
446
447  @Override
448  public List<TRowResult> getRowsTs(ByteBuffer tableName, List<ByteBuffer> rows, long timestamp,
449    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
450    return getRowsWithColumnsTs(tableName, rows, null, timestamp, attributes);
451  }
452
453  @Override
454  public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName, List<ByteBuffer> rows,
455    List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer> attributes)
456    throws IOError {
457
458    Table table = null;
459    try {
460      List<Get> gets = new ArrayList<>(rows.size());
461      table = getTable(tableName);
462      if (metrics != null) {
463        metrics.incNumRowKeysInBatchGet(rows.size());
464      }
465      for (ByteBuffer row : rows) {
466        Get get = new Get(getBytes(row));
467        addAttributes(get, attributes);
468        if (columns != null) {
469
470          for (ByteBuffer column : columns) {
471            byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
472            if (famAndQf.length == 1) {
473              get.addFamily(famAndQf[0]);
474            } else {
475              get.addColumn(famAndQf[0], famAndQf[1]);
476            }
477          }
478        }
479        get.setTimeRange(0, timestamp);
480        gets.add(get);
481      }
482      Result[] result = table.get(gets);
483      return ThriftUtilities.rowResultFromHBase(result);
484    } catch (IOException e) {
485      LOG.warn(e.getMessage(), e);
486      throw getIOError(e);
487    } finally {
488      closeTable(table);
489    }
490  }
491
492  @Override
493  public void deleteAll(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
494    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
495    deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP, attributes);
496  }
497
498  @Override
499  public void deleteAllTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long timestamp,
500    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
501    Table table = null;
502    try {
503      table = getTable(tableName);
504      Delete delete = new Delete(getBytes(row));
505      addAttributes(delete, attributes);
506      byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
507      if (famAndQf.length == 1) {
508        delete.addFamily(famAndQf[0], timestamp);
509      } else {
510        delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
511      }
512      table.delete(delete);
513
514    } catch (IOException e) {
515      LOG.warn(e.getMessage(), e);
516      throw getIOError(e);
517    } finally {
518      closeTable(table);
519    }
520  }
521
522  @Override
523  public void deleteAllRow(ByteBuffer tableName, ByteBuffer row,
524    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
525    deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
526  }
527
528  @Override
529  public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp,
530    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
531    Table table = null;
532    try {
533      table = getTable(tableName);
534      Delete delete = new Delete(getBytes(row), timestamp);
535      addAttributes(delete, attributes);
536      table.delete(delete);
537    } catch (IOException e) {
538      LOG.warn(e.getMessage(), e);
539      throw getIOError(e);
540    } finally {
541      closeTable(table);
542    }
543  }
544
545  @Override
546  public void createTable(ByteBuffer in_tableName, List<ColumnDescriptor> columnFamilies)
547    throws IOError, IllegalArgument, AlreadyExists {
548    TableName tableName = getTableName(in_tableName);
549    try {
550      if (getAdmin().tableExists(tableName)) {
551        throw new AlreadyExists("table name already in use");
552      }
553      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
554      for (ColumnDescriptor col : columnFamilies) {
555        builder.setColumnFamily(ThriftUtilities.colDescFromThrift(col));
556      }
557      getAdmin().createTable(builder.build());
558    } catch (IOException e) {
559      LOG.warn(e.getMessage(), e);
560      throw getIOError(e);
561    } catch (IllegalArgumentException e) {
562      LOG.warn(e.getMessage(), e);
563      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
564    }
565  }
566
567  private static TableName getTableName(ByteBuffer buffer) {
568    return TableName.valueOf(getBytes(buffer));
569  }
570
571  @Override
572  public void deleteTable(ByteBuffer in_tableName) throws IOError {
573    TableName tableName = getTableName(in_tableName);
574    if (LOG.isDebugEnabled()) {
575      LOG.debug("deleteTable: table={}", tableName);
576    }
577    try {
578      if (!getAdmin().tableExists(tableName)) {
579        throw new IOException("table does not exist");
580      }
581      getAdmin().deleteTable(tableName);
582    } catch (IOException e) {
583      LOG.warn(e.getMessage(), e);
584      throw getIOError(e);
585    }
586  }
587
588  @Override
589  public void mutateRow(ByteBuffer tableName, ByteBuffer row, List<Mutation> mutations,
590    Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument {
591    mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes);
592  }
593
594  @Override
595  public void mutateRowTs(ByteBuffer tableName, ByteBuffer row, List<Mutation> mutations,
596    long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument {
597    Table table = null;
598    try {
599      table = getTable(tableName);
600      Put put = new Put(getBytes(row), timestamp);
601      addAttributes(put, attributes);
602
603      Delete delete = new Delete(getBytes(row));
604      addAttributes(delete, attributes);
605      if (metrics != null) {
606        metrics.incNumRowKeysInBatchMutate(mutations.size());
607      }
608
609      // I apologize for all this mess :)
610      CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
611      for (Mutation m : mutations) {
612        byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
613        if (m.isDelete) {
614          if (famAndQf.length == 1) {
615            delete.addFamily(famAndQf[0], timestamp);
616          } else {
617            delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
618          }
619          delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
620        } else {
621          if (famAndQf.length == 1) {
622            LOG.warn("No column qualifier specified. Delete is the only mutation supported "
623              + "over the whole column family.");
624          } else {
625            put.add(builder.clear().setRow(put.getRow()).setFamily(famAndQf[0])
626              .setQualifier(famAndQf[1]).setTimestamp(put.getTimestamp()).setType(Cell.Type.Put)
627              .setValue(m.value != null ? getBytes(m.value) : HConstants.EMPTY_BYTE_ARRAY).build());
628          }
629          put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
630        }
631      }
632      if (!delete.isEmpty()) {
633        table.delete(delete);
634      }
635      if (!put.isEmpty()) {
636        table.put(put);
637      }
638    } catch (IOException e) {
639      LOG.warn(e.getMessage(), e);
640      throw getIOError(e);
641    } catch (IllegalArgumentException e) {
642      LOG.warn(e.getMessage(), e);
643      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
644    } finally {
645      closeTable(table);
646    }
647  }
648
649  @Override
650  public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
651    Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument, TException {
652    mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
653  }
654
655  @Override
656  public void mutateRowsTs(ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
657    Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument, TException {
658    List<Put> puts = new ArrayList<>();
659    List<Delete> deletes = new ArrayList<>();
660    CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
661    for (BatchMutation batch : rowBatches) {
662      byte[] row = getBytes(batch.row);
663      List<Mutation> mutations = batch.mutations;
664      Delete delete = new Delete(row);
665      addAttributes(delete, attributes);
666      Put put = new Put(row, timestamp);
667      addAttributes(put, attributes);
668      for (Mutation m : mutations) {
669        byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
670        if (m.isDelete) {
671          // no qualifier, family only.
672          if (famAndQf.length == 1) {
673            delete.addFamily(famAndQf[0], timestamp);
674          } else {
675            delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
676          }
677          delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
678        } else {
679          if (famAndQf.length == 1) {
680            LOG.warn("No column qualifier specified. Delete is the only mutation supported "
681              + "over the whole column family.");
682          }
683          if (famAndQf.length == 2) {
684            try {
685              put.add(builder.clear().setRow(put.getRow()).setFamily(famAndQf[0])
686                .setQualifier(famAndQf[1]).setTimestamp(put.getTimestamp()).setType(Cell.Type.Put)
687                .setValue(m.value != null ? getBytes(m.value) : HConstants.EMPTY_BYTE_ARRAY)
688                .build());
689            } catch (IOException e) {
690              throw new IllegalArgumentException(e);
691            }
692          } else {
693            throw new IllegalArgumentException("Invalid famAndQf provided.");
694          }
695          put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
696        }
697      }
698      if (!delete.isEmpty()) {
699        deletes.add(delete);
700      }
701      if (!put.isEmpty()) {
702        puts.add(put);
703      }
704    }
705
706    Table table = null;
707    try {
708      table = getTable(tableName);
709      if (!puts.isEmpty()) {
710        table.put(puts);
711      }
712      if (!deletes.isEmpty()) {
713        table.delete(deletes);
714      }
715    } catch (IOException e) {
716      LOG.warn(e.getMessage(), e);
717      throw getIOError(e);
718    } catch (IllegalArgumentException e) {
719      LOG.warn(e.getMessage(), e);
720      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
721    } finally {
722      closeTable(table);
723    }
724  }
725
726  @Override
727  public long atomicIncrement(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
728    throws IOError, IllegalArgument, TException {
729    byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
730    if (famAndQf.length == 1) {
731      return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
732    }
733    return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
734  }
735
736  protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, byte[] family,
737    byte[] qualifier, long amount) throws IOError, IllegalArgument, TException {
738    Table table = null;
739    try {
740      table = getTable(tableName);
741      return table.incrementColumnValue(getBytes(row), family, qualifier, amount);
742    } catch (IOException e) {
743      LOG.warn(e.getMessage(), e);
744      throw getIOError(e);
745    } finally {
746      closeTable(table);
747    }
748  }
749
750  @Override
751  public void scannerClose(int id) throws IOError, IllegalArgument {
752    LOG.debug("scannerClose: id={}", id);
753    ResultScannerWrapper resultScannerWrapper;
754    try {
755      resultScannerWrapper = removeScanner(id);
756    } catch (IOException e) {
757      LOG.warn(e.getMessage(), e);
758      throw getIOError(e);
759    }
760    if (resultScannerWrapper == null) {
761      LOG.warn("scanner ID is invalid");
762      throw new IllegalArgument("scanner ID is invalid");
763    }
764    resultScannerWrapper.scanner.close();
765  }
766
767  @Override
768  public List<TRowResult> scannerGetList(int id, int nbRows) throws IllegalArgument, IOError {
769    LOG.debug("scannerGetList: id={}", id);
770    ResultScannerWrapper resultScannerWrapper;
771    try {
772      resultScannerWrapper = removeScanner(id);
773    } catch (IOException e) {
774      LOG.warn(e.getMessage(), e);
775      throw getIOError(e);
776    }
777    if (null == resultScannerWrapper) {
778      String message = "scanner ID is invalid";
779      LOG.warn(message);
780      throw new IllegalArgument("scanner ID is invalid");
781    }
782    try {
783      Result[] results;
784      try {
785        results = resultScannerWrapper.scanner.next(nbRows);
786        if (null == results) {
787          return new ArrayList<>();
788        }
789      } catch (IOException e) {
790        LOG.warn(e.getMessage(), e);
791        throw getIOError(e);
792      }
793      return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.sortColumns);
794    } finally {
795      addScannerBack(id, resultScannerWrapper);
796    }
797  }
798
799  @Override
800  public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
801    return scannerGetList(id, 1);
802  }
803
804  @Override
805  public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
806    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
807    Table table = null;
808    try {
809      table = getTable(tableName);
810      Scan scan = new Scan();
811      addAttributes(scan, attributes);
812      if (tScan.isSetStartRow()) {
813        scan.withStartRow(tScan.getStartRow());
814      }
815      if (tScan.isSetStopRow()) {
816        scan.withStopRow(tScan.getStopRow());
817      }
818      if (tScan.isSetTimestamp()) {
819        scan.setTimeRange(0, tScan.getTimestamp());
820      }
821      if (tScan.isSetCaching()) {
822        scan.setCaching(tScan.getCaching());
823      }
824      if (tScan.isSetBatchSize()) {
825        scan.setBatch(tScan.getBatchSize());
826      }
827      if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) {
828        for (ByteBuffer column : tScan.getColumns()) {
829          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
830          if (famQf.length == 1) {
831            scan.addFamily(famQf[0]);
832          } else {
833            scan.addColumn(famQf[0], famQf[1]);
834          }
835        }
836      }
837      if (tScan.isSetFilterString()) {
838        ParseFilter parseFilter = new ParseFilter();
839        scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString()));
840      }
841      if (tScan.isSetReversed()) {
842        scan.setReversed(tScan.isReversed());
843      }
844      if (tScan.isSetCacheBlocks()) {
845        scan.setCacheBlocks(tScan.isCacheBlocks());
846      }
847      return addScanner(table.getScanner(scan), tScan.sortColumns);
848    } catch (IOException e) {
849      LOG.warn(e.getMessage(), e);
850      throw getIOError(e);
851    } finally {
852      closeTable(table);
853    }
854  }
855
856  @Override
857  public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, List<ByteBuffer> columns,
858    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
859    Table table = null;
860    try {
861      table = getTable(tableName);
862      Scan scan = new Scan().withStartRow(getBytes(startRow));
863      addAttributes(scan, attributes);
864      if (columns != null && !columns.isEmpty()) {
865        for (ByteBuffer column : columns) {
866          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
867          if (famQf.length == 1) {
868            scan.addFamily(famQf[0]);
869          } else {
870            scan.addColumn(famQf[0], famQf[1]);
871          }
872        }
873      }
874      return addScanner(table.getScanner(scan), false);
875    } catch (IOException e) {
876      LOG.warn(e.getMessage(), e);
877      throw getIOError(e);
878    } finally {
879      closeTable(table);
880    }
881  }
882
883  @Override
884  public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow, ByteBuffer stopRow,
885    List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
886    Table table = null;
887    try {
888      table = getTable(tableName);
889      Scan scan = new Scan().withStartRow(getBytes(startRow)).withStopRow(getBytes(stopRow));
890      addAttributes(scan, attributes);
891      if (columns != null && !columns.isEmpty()) {
892        for (ByteBuffer column : columns) {
893          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
894          if (famQf.length == 1) {
895            scan.addFamily(famQf[0]);
896          } else {
897            scan.addColumn(famQf[0], famQf[1]);
898          }
899        }
900      }
901      return addScanner(table.getScanner(scan), false);
902    } catch (IOException e) {
903      LOG.warn(e.getMessage(), e);
904      throw getIOError(e);
905    } finally {
906      closeTable(table);
907    }
908  }
909
910  @Override
911  public int scannerOpenWithPrefix(ByteBuffer tableName, ByteBuffer startAndPrefix,
912    List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
913    Table table = null;
914    try {
915      table = getTable(tableName);
916      Scan scan = new Scan().withStartRow(getBytes(startAndPrefix));
917      addAttributes(scan, attributes);
918      Filter f = new WhileMatchFilter(new PrefixFilter(getBytes(startAndPrefix)));
919      scan.setFilter(f);
920      if (columns != null && !columns.isEmpty()) {
921        for (ByteBuffer column : columns) {
922          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
923          if (famQf.length == 1) {
924            scan.addFamily(famQf[0]);
925          } else {
926            scan.addColumn(famQf[0], famQf[1]);
927          }
928        }
929      }
930      return addScanner(table.getScanner(scan), false);
931    } catch (IOException e) {
932      LOG.warn(e.getMessage(), e);
933      throw getIOError(e);
934    } finally {
935      closeTable(table);
936    }
937  }
938
939  @Override
940  public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, List<ByteBuffer> columns,
941    long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
942    Table table = null;
943    try {
944      table = getTable(tableName);
945      Scan scan = new Scan().withStartRow(getBytes(startRow));
946      addAttributes(scan, attributes);
947      scan.setTimeRange(0, timestamp);
948      if (columns != null && !columns.isEmpty()) {
949        for (ByteBuffer column : columns) {
950          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
951          if (famQf.length == 1) {
952            scan.addFamily(famQf[0]);
953          } else {
954            scan.addColumn(famQf[0], famQf[1]);
955          }
956        }
957      }
958      return addScanner(table.getScanner(scan), false);
959    } catch (IOException e) {
960      LOG.warn(e.getMessage(), e);
961      throw getIOError(e);
962    } finally {
963      closeTable(table);
964    }
965  }
966
967  @Override
968  public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow, ByteBuffer stopRow,
969    List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer> attributes)
970    throws IOError, TException {
971    Table table = null;
972    try {
973      table = getTable(tableName);
974      Scan scan = new Scan().withStartRow(getBytes(startRow)).withStopRow(getBytes(stopRow));
975      addAttributes(scan, attributes);
976      scan.setTimeRange(0, timestamp);
977      if (columns != null && !columns.isEmpty()) {
978        for (ByteBuffer column : columns) {
979          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
980          if (famQf.length == 1) {
981            scan.addFamily(famQf[0]);
982          } else {
983            scan.addColumn(famQf[0], famQf[1]);
984          }
985        }
986      }
987      scan.setTimeRange(0, timestamp);
988      return addScanner(table.getScanner(scan), false);
989    } catch (IOException e) {
990      LOG.warn(e.getMessage(), e);
991      throw getIOError(e);
992    } finally {
993      closeTable(table);
994    }
995  }
996
997  @Override
998  public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(ByteBuffer tableName)
999    throws IOError, TException {
1000
1001    Table table = null;
1002    try {
1003      TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>();
1004
1005      table = getTable(tableName);
1006      TableDescriptor desc = table.getDescriptor();
1007
1008      for (ColumnFamilyDescriptor e : desc.getColumnFamilies()) {
1009        ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1010        columns.put(col.name, col);
1011      }
1012      return columns;
1013    } catch (IOException e) {
1014      LOG.warn(e.getMessage(), e);
1015      throw getIOError(e);
1016    } finally {
1017      closeTable(table);
1018    }
1019  }
1020
1021  private void closeTable(Table table) throws IOError {
1022    try {
1023      if (table != null) {
1024        table.close();
1025      }
1026    } catch (IOException e) {
1027      LOG.error(e.getMessage(), e);
1028      throw getIOError(e);
1029    }
1030  }
1031
1032  @Override
1033  public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1034    try {
1035      byte[] row = getBytes(searchRow);
1036      Result startRowResult =
1037        getReverseScanResult(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
1038
1039      if (startRowResult == null) {
1040        throw new IOException(
1041          "Cannot find row in " + TableName.META_TABLE_NAME + ", row=" + Bytes.toStringBinary(row));
1042      }
1043
1044      // find region start and end keys
1045      RegionInfo regionInfo = CatalogFamilyFormat.getRegionInfo(startRowResult);
1046      if (regionInfo == null) {
1047        throw new IOException("RegionInfo REGIONINFO was null or " + " empty in Meta for row="
1048          + Bytes.toStringBinary(row));
1049      }
1050      TRegionInfo region = new TRegionInfo();
1051      region.setStartKey(regionInfo.getStartKey());
1052      region.setEndKey(regionInfo.getEndKey());
1053      region.id = regionInfo.getRegionId();
1054      region.setName(regionInfo.getRegionName());
1055      region.version = HREGION_VERSION; // version not used anymore, PB encoding used.
1056
1057      // find region assignment to server
1058      ServerName serverName = CatalogFamilyFormat.getServerName(startRowResult, 0);
1059      if (serverName != null) {
1060        region.setServerName(Bytes.toBytes(serverName.getHostname()));
1061        region.port = serverName.getPort();
1062      }
1063      return region;
1064    } catch (IOException e) {
1065      LOG.warn(e.getMessage(), e);
1066      throw getIOError(e);
1067    }
1068  }
1069
1070  private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
1071    throws IOException {
1072    Scan scan = new Scan().withStartRow(row);
1073    scan.setReversed(true);
1074    scan.addFamily(family);
1075    scan.withStartRow(row);
1076    try (Table table = getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
1077      return scanner.next();
1078    }
1079  }
1080
1081  @Override
1082  public void increment(TIncrement tincrement) throws IOError, TException {
1083
1084    if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1085      throw new TException("Must supply a table and a row key; can't increment");
1086    }
1087
1088    if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1089      this.coalescer.queueIncrement(tincrement);
1090      return;
1091    }
1092
1093    Table table = null;
1094    try {
1095      table = getTable(tincrement.getTable());
1096      Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1097      table.increment(inc);
1098    } catch (IOException e) {
1099      LOG.warn(e.getMessage(), e);
1100      throw getIOError(e);
1101    } finally {
1102      closeTable(table);
1103    }
1104  }
1105
1106  @Override
1107  public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1108    if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1109      this.coalescer.queueIncrements(tincrements);
1110      return;
1111    }
1112    for (TIncrement tinc : tincrements) {
1113      increment(tinc);
1114    }
1115  }
1116
1117  @Override
1118  public List<TCell> append(TAppend tappend) throws IOError, TException {
1119    if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1120      throw new TException("Must supply a table and a row key; can't append");
1121    }
1122
1123    Table table = null;
1124    try {
1125      table = getTable(tappend.getTable());
1126      Append append = ThriftUtilities.appendFromThrift(tappend);
1127      Result result = table.append(append);
1128      return ThriftUtilities.cellFromHBase(result.rawCells());
1129    } catch (IOException e) {
1130      LOG.warn(e.getMessage(), e);
1131      throw getIOError(e);
1132    } finally {
1133      closeTable(table);
1134    }
1135  }
1136
1137  @Override
1138  public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1139    ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes)
1140    throws IOError, IllegalArgument, TException {
1141    Put put;
1142    try {
1143      put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1144      addAttributes(put, attributes);
1145
1146      byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column));
1147      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
1148        .setFamily(famAndQf[0]).setQualifier(famAndQf[1]).setTimestamp(put.getTimestamp())
1149        .setType(Cell.Type.Put)
1150        .setValue(mput.value != null ? getBytes(mput.value) : HConstants.EMPTY_BYTE_ARRAY).build());
1151      put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1152    } catch (IOException | IllegalArgumentException e) {
1153      LOG.warn(e.getMessage(), e);
1154      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1155    }
1156
1157    Table table = null;
1158    try {
1159      table = getTable(tableName);
1160      byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
1161      Table.CheckAndMutateBuilder mutateBuilder =
1162        table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]);
1163      if (value != null) {
1164        return mutateBuilder.ifEquals(getBytes(value)).thenPut(put);
1165      } else {
1166        return mutateBuilder.ifNotExists().thenPut(put);
1167      }
1168    } catch (IOException e) {
1169      LOG.warn(e.getMessage(), e);
1170      throw getIOError(e);
1171    } catch (IllegalArgumentException e) {
1172      LOG.warn(e.getMessage(), e);
1173      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1174    } finally {
1175      closeTable(table);
1176    }
1177  }
1178
1179  @Override
1180  public TThriftServerType getThriftServerType() {
1181    return TThriftServerType.ONE;
1182  }
1183
1184  @Override
1185  public String getClusterId() throws TException {
1186    return connectionCache.getClusterId();
1187  }
1188
1189  @Override
1190  public boolean grant(TAccessControlEntity info) throws IOError, TException {
1191    Permission.Action[] actions = ThriftUtilities.permissionActionsFromString(info.actions);
1192    try {
1193      if (info.scope == TPermissionScope.NAMESPACE) {
1194        AccessControlClient.grant(connectionCache.getAdmin().getConnection(), info.getNsName(),
1195          info.getUsername(), actions);
1196      } else if (info.scope == TPermissionScope.TABLE) {
1197        TableName tableName = TableName.valueOf(info.getTableName());
1198        AccessControlClient.grant(connectionCache.getAdmin().getConnection(), tableName,
1199          info.getUsername(), null, null, actions);
1200      }
1201    } catch (Throwable t) {
1202      if (t instanceof IOException) {
1203        throw getIOError(t);
1204      } else {
1205        throw getIOError(new DoNotRetryIOException(t.getMessage()));
1206      }
1207    }
1208    return true;
1209  }
1210
1211  @Override
1212  public boolean revoke(TAccessControlEntity info) throws IOError, TException {
1213    Permission.Action[] actions = ThriftUtilities.permissionActionsFromString(info.actions);
1214    try {
1215      if (info.scope == TPermissionScope.NAMESPACE) {
1216        AccessControlClient.revoke(connectionCache.getAdmin().getConnection(), info.getNsName(),
1217          info.getUsername(), actions);
1218      } else if (info.scope == TPermissionScope.TABLE) {
1219        TableName tableName = TableName.valueOf(info.getTableName());
1220        AccessControlClient.revoke(connectionCache.getAdmin().getConnection(), tableName,
1221          info.getUsername(), null, null, actions);
1222      }
1223    } catch (Throwable t) {
1224      if (t instanceof IOException) {
1225        throw getIOError(t);
1226      } else {
1227        throw getIOError(new DoNotRetryIOException(t.getMessage()));
1228      }
1229    }
1230    return true;
1231  }
1232
1233  private static IOError getIOError(Throwable throwable) {
1234    IOError error = new IOErrorWithCause(throwable);
1235    error.setCanRetry(!(throwable instanceof DoNotRetryIOException));
1236    error.setMessage(Throwables.getStackTraceAsString(throwable));
1237    return error;
1238  }
1239
1240  /**
1241   * Adds all the attributes into the Operation object
1242   */
1243  private static void addAttributes(OperationWithAttributes op,
1244    Map<ByteBuffer, ByteBuffer> attributes) {
1245    if (attributes == null || attributes.isEmpty()) {
1246      return;
1247    }
1248    for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1249      String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1250      byte[] value = getBytes(entry.getValue());
1251      op.setAttribute(name, value);
1252    }
1253  }
1254
1255  public static class IOErrorWithCause extends IOError {
1256    private final Throwable cause;
1257
1258    public IOErrorWithCause(Throwable cause) {
1259      this.cause = cause;
1260    }
1261
1262    @Override
1263    public synchronized Throwable getCause() {
1264      return cause;
1265    }
1266
1267    @Override
1268    public boolean equals(Object other) {
1269      if (super.equals(other) && other instanceof IOErrorWithCause) {
1270        Throwable otherCause = ((IOErrorWithCause) other).getCause();
1271        if (this.getCause() != null) {
1272          return otherCause != null && this.getCause().equals(otherCause);
1273        } else {
1274          return otherCause == null;
1275        }
1276      }
1277      return false;
1278    }
1279
1280    @Override
1281    public int hashCode() {
1282      int result = super.hashCode();
1283      result = 31 * result + (cause != null ? cause.hashCode() : 0);
1284      return result;
1285    }
1286  }
1287
1288}