View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.protobuf;
21  
22  
23  import com.google.protobuf.ServiceException;
24
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.UUID;
32
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellScanner;
36  import org.apache.hadoop.hbase.CellUtil;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.io.SizedCellScanner;
40  import org.apache.hadoop.hbase.ipc.HBaseRpcController;
41  import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
42  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
43  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
44  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
45  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
46  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
47  import org.apache.hadoop.hbase.util.ByteStringer;
48  import org.apache.hadoop.hbase.util.Pair;
49  import org.apache.hadoop.hbase.wal.WAL.Entry;
50  import org.apache.hadoop.hbase.wal.WALKey;
51
52  @InterfaceAudience.Private
53  public class ReplicationProtbufUtil {
54    /**
55     * A helper to replicate a list of WAL entries using admin protocol.
56     * @param admin Admin service
57     * @param entries Array of WAL entries to be replicated
58     * @param replicationClusterId Id which will uniquely identify source cluster FS client
59     *          configurations in the replication configuration directory
60     * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
61     * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
62     * @throws java.io.IOException
63     */
64    public static void replicateWALEntry(final AdminService.BlockingInterface admin,
65        final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
66        Path sourceHFileArchiveDir) throws IOException {
67      Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
68          buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
69            sourceHFileArchiveDir);
70      HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
71      try {
72        admin.replicateWALEntry(controller, p.getFirst());
73      } catch (ServiceException se) {
74        throw ProtobufUtil.getRemoteException(se);
75      }
76    }
77
78    /**
79     * Create a new ReplicateWALEntryRequest from a list of WAL entries
80     *
81     * @param entries the WAL entries to be replicated
82     * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
83     * found.
84     */
85    public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
86        buildReplicateWALEntryRequest(final Entry[] entries) {
87      return buildReplicateWALEntryRequest(entries, null, null, null, null);
88    }
89
90    /**
91     * Create a new ReplicateWALEntryRequest from a list of WAL entries
92     * @param entries the WAL entries to be replicated
93     * @param encodedRegionName alternative region name to use if not null
94     * @param replicationClusterId Id which will uniquely identify source cluster FS client
95     *          configurations in the replication configuration directory
96     * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
97     * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
98     * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
99     */
100   public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
101       buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName,
102           String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
103     // Accumulate all the Cells seen in here.
104     List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
105     int size = 0;
106     WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
107     AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
108     AdminProtos.ReplicateWALEntryRequest.Builder builder =
109       AdminProtos.ReplicateWALEntryRequest.newBuilder();
110     HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
111     for (Entry entry: entries) {
112       entryBuilder.clear();
113       // TODO: this duplicates a lot in WALKey#getBuilder
114       WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
115       WALKey key = entry.getKey();
116       keyBuilder.setEncodedRegionName(
117         ByteStringer.wrap(encodedRegionName == null
118             ? key.getEncodedRegionName()
119             : encodedRegionName));
120       keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
121       keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
122       keyBuilder.setWriteTime(key.getWriteTime());
123       if (key.getNonce() != HConstants.NO_NONCE) {
124         keyBuilder.setNonce(key.getNonce());
125       }
126       if (key.getNonceGroup() != HConstants.NO_NONCE) {
127         keyBuilder.setNonceGroup(key.getNonceGroup());
128       }
129       for(UUID clusterId : key.getClusterIds()) {
130         uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
131         uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
132         keyBuilder.addClusterIds(uuidBuilder.build());
133       }
134       if(key.getOrigLogSeqNum() > 0) {
135         keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
136       }
137       WALEdit edit = entry.getEdit();
138       NavigableMap<byte[], Integer> scopes = key.getReplicationScopes();
139       if (scopes != null && !scopes.isEmpty()) {
140         for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
141           scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey()));
142           WALProtos.ScopeType scopeType =
143               WALProtos.ScopeType.valueOf(scope.getValue().intValue());
144           scopeBuilder.setScopeType(scopeType);
145           keyBuilder.addScopes(scopeBuilder.build());
146         }
147       }
148       List<Cell> cells = edit.getCells();
149       // Add up the size.  It is used later serializing out the kvs.
150       for (Cell cell: cells) {
151         size += CellUtil.estimatedSerializedSizeOf(cell);
152       }
153       // Collect up the cells
154       allCells.add(cells);
155       // Write out how many cells associated with this entry.
156       entryBuilder.setAssociatedCellCount(cells.size());
157       builder.addEntry(entryBuilder.build());
158     }
159
160     if (replicationClusterId != null) {
161       builder.setReplicationClusterId(replicationClusterId);
162     }
163     if (sourceBaseNamespaceDir != null) {
164       builder.setSourceBaseNamespaceDirPath(sourceBaseNamespaceDir.toString());
165     }
166     if (sourceHFileArchiveDir != null) {
167       builder.setSourceHFileArchiveDirPath(sourceHFileArchiveDir.toString());
168     }
169
170     return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
171       getCellScanner(allCells, size));
172   }
173
174   /**
175    * @param cells
176    * @return <code>cells</code> packaged as a CellScanner
177    */
178   static CellScanner getCellScanner(final List<List<? extends Cell>> cells, final int size) {
179     return new SizedCellScanner() {
180       private final Iterator<List<? extends Cell>> entries = cells.iterator();
181       private Iterator<? extends Cell> currentIterator = null;
182       private Cell currentCell;
183
184       @Override
185       public Cell current() {
186         return this.currentCell;
187       }
188
189       @Override
190       public boolean advance() {
191         if (this.currentIterator == null) {
192           if (!this.entries.hasNext()) return false;
193           this.currentIterator = this.entries.next().iterator();
194         }
195         if (this.currentIterator.hasNext()) {
196           this.currentCell = this.currentIterator.next();
197           return true;
198         }
199         this.currentCell = null;
200         this.currentIterator = null;
201         return advance();
202       }
203
204       @Override
205       public long heapSize() {
206         return size;
207       }
208     };
209   }
210 }