Skip to content

Commit

Permalink
add dropPartitionsByNames api in client
Browse files Browse the repository at this point in the history
  • Loading branch information
wecharyu committed Jan 15, 2024
1 parent 41bf6f6 commit ea8ab97
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.PartitionDropOptions;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
Expand Down Expand Up @@ -1085,6 +1087,23 @@ public void preDropPartitions(org.apache.hadoop.hive.metastore.api.Table hmsTabl
context.putToProperties(HiveMetaStoreClient.SKIP_DROP_PARTITION, "true");
}

/**
 * Pre-drop hook for partition drops that are described by a {@link RequestPartsSpec}.
 * <ul>
 *   <li>Expression spec: every entry is turned into an (archive level, expression) pair and the
 *       pair-list overload of {@code preDropPartitions} is invoked.</li>
 *   <li>Name spec: the names are handed to {@code preTruncateTable} and the context is then tagged
 *       with {@code HiveMetaStoreClient.SKIP_DROP_PARTITION = "true"}.</li>
 *   <li>Neither set: nothing happens.</li>
 * </ul>
 *
 * @param hmsTable table whose partitions are about to be dropped
 * @param context environment context of the operation; may receive the skip property
 * @param partsSpec partition specification (expressions or names)
 * @throws MetaException propagated from the delegated hook methods
 */
@Override
public void preDropPartitions(org.apache.hadoop.hive.metastore.api.Table hmsTable,
    EnvironmentContext context, RequestPartsSpec partsSpec) throws MetaException {
  if (partsSpec.isSetExprs()) {
    List<org.apache.commons.lang3.tuple.Pair<Integer, byte[]>> levelAndExprPairs = Lists.newArrayList();
    partsSpec.getExprs().forEach(dropExpr -> levelAndExprPairs.add(
        org.apache.commons.lang3.tuple.Pair.of(dropExpr.getPartArchiveLevel(), dropExpr.getExpr())));
    preDropPartitions(hmsTable, context, levelAndExprPairs);
    return;
  }

  if (partsSpec.isSetNames()) {
    preTruncateTable(hmsTable, context, partsSpec.getNames());
    context.putToProperties(HiveMetaStoreClient.SKIP_DROP_PARTITION, "true");
  }
}

private class PreAlterTableProperties {
private String tableLocation;
private String format;
Expand Down
29 changes: 26 additions & 3 deletions ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@
import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr;
import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
import org.apache.hadoop.hive.metastore.api.SourceTable;
import org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
Expand Down Expand Up @@ -3994,6 +3996,27 @@ public boolean dropPartition(String dbName, String tableName, List<String> parti
/**
 * Drops the partitions selected by the given partition expressions.
 * Each (archive level, expression bytes) pair is wrapped in a {@link DropPartitionsExpr}; the
 * resulting list is placed into a {@link RequestPartsSpec} and the call is forwarded to the
 * {@code RequestPartsSpec}-based {@code dropPartitions} overload.
 *
 * @param dbName database name
 * @param tableName table name
 * @param partitionExpressions pairs of partition archive level (left) and expression bytes (right)
 * @param dropOptions options controlling the drop
 * @return the partitions returned by the delegated overload
 * @throws HiveException propagated from the delegated overload
 */
public List<Partition> dropPartitions(String dbName, String tableName,
    List<Pair<Integer, byte[]>> partitionExpressions,
    PartitionDropOptions dropOptions) throws HiveException {
  List<DropPartitionsExpr> dropExprs = new ArrayList<>(partitionExpressions.size());
  partitionExpressions.forEach(levelAndExpr -> {
    DropPartitionsExpr dropExpr = new DropPartitionsExpr();
    dropExpr.setPartArchiveLevel(levelAndExpr.getLeft());
    dropExpr.setExpr(levelAndExpr.getRight());
    dropExprs.add(dropExpr);
  });

  RequestPartsSpec exprSpec = new RequestPartsSpec();
  exprSpec.setExprs(dropExprs);
  return dropPartitions(dbName, tableName, exprSpec, dropOptions);
}

/**
 * Drops the partitions identified by their names (for example {@code ds=20231129/hr=10}).
 * The names are placed into a {@link RequestPartsSpec} and the call is forwarded to the
 * {@code RequestPartsSpec}-based {@code dropPartitions} overload.
 *
 * @param dbName database name
 * @param tableName table name
 * @param partitionNames names of the partitions to drop
 * @param dropOptions options controlling the drop
 * @return the partitions returned by the delegated overload
 * @throws HiveException propagated from the delegated overload
 */
public List<Partition> dropPartitionsByNames(String dbName, String tableName,
    List<String> partitionNames, PartitionDropOptions dropOptions) throws HiveException {
  RequestPartsSpec namesSpec = new RequestPartsSpec();
  namesSpec.setNames(partitionNames);
  List<Partition> dropped = dropPartitions(dbName, tableName, namesSpec, dropOptions);
  return dropped;
}

public List<Partition> dropPartitions(String dbName, String tableName,
RequestPartsSpec partsSpec, PartitionDropOptions dropOptions) throws HiveException {
try {
Table table = getTable(dbName, tableName);
if (!dropOptions.deleteData) {
Expand All @@ -4002,11 +4025,11 @@ public List<Partition> dropPartitions(String dbName, String tableName,
dropOptions.setWriteId(snapshot.getWriteId());
}
long txnId = Optional.ofNullable(SessionState.get())
.map(ss -> ss.getTxnMgr().getCurrentTxnId()).orElse(0L);
.map(ss -> ss.getTxnMgr().getCurrentTxnId()).orElse(0L);
dropOptions.setTxnId(txnId);
}
List<org.apache.hadoop.hive.metastore.api.Partition> partitions = getMSC().dropPartitions(dbName, tableName,
partitionExpressions, dropOptions);
List<org.apache.hadoop.hive.metastore.api.Partition> partitions = getMSC().dropPartitions(
getDefaultCatalog(conf), dbName, tableName, partsSpec, dropOptions);
return convertFromMetastore(table, partitions);
} catch (NoSuchObjectException e) {
throw new HiveException("Partition or table doesn't exist.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
Expand Down Expand Up @@ -98,6 +99,7 @@
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysResponse;
import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.TableMeta;
import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
Expand All @@ -107,6 +109,7 @@
import org.apache.hadoop.hive.metastore.api.UniqueConstraintsResponse;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.metastore.cache.CachedStore;
import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
Expand Down Expand Up @@ -1437,26 +1440,38 @@ public boolean dropPartition(String catName, String dbName, String tableName, St

@Override
public List<Partition> dropPartitions(String catName, String dbName, String tblName,
List<Pair<Integer, byte[]>> partExprs, PartitionDropOptions options) throws TException {
RequestPartsSpec partsSpec, PartitionDropOptions options)
throws NoSuchObjectException, MetaException, TException {
org.apache.hadoop.hive.metastore.api.Table table = getTempTable(dbName, tblName);
if (table == null) {
return super.dropPartitions(catName, dbName, tblName, partExprs, options);
return super.dropPartitions(catName, dbName, tblName, partsSpec, options);
}
TempTable tt = getPartitionedTempTable(table);
List<List<String>> partValues = new ArrayList<>();
if (partsSpec.isSetExprs()) {
List<DropPartitionsExpr> exprs = partsSpec.getExprs();
for (DropPartitionsExpr expr : exprs) {
String filter = generateJDOFilter(table, expr.getExpr(), conf.get(HiveConf.ConfVars.DEFAULT_PARTITION_NAME.varname));
List<Partition> partitions = tt.listPartitionsByFilter(filter);
for (Partition p : partitions) {
partValues.add(p.getValues());
}
}
} else if (partsSpec.isSetNames()) {
List<String> partNames = partsSpec.getNames();
for (String partName : partNames) {
partValues.add(CachedStore.partNameToVals(partName));
}
}
List<Partition> result = new ArrayList<>();
for (Pair<Integer, byte[]> pair : partExprs) {
byte[] expr = pair.getRight();
String filter = generateJDOFilter(table, expr, conf.get(HiveConf.ConfVars.DEFAULT_PARTITION_NAME.varname));
List<Partition> partitions = tt.listPartitionsByFilter(filter);
for (Partition p : partitions) {
Partition droppedPartition = tt.dropPartition(p.getValues());
if (droppedPartition != null) {
result.add(droppedPartition);
boolean purgeData = options != null ? options.purgeData : true;
boolean deleteData = options != null ? options.deleteData : true;
if (deleteData && !tt.isExternal()) {
deletePartitionLocation(droppedPartition, purgeData);
}
for (List<String> partValue : partValues) {
Partition droppedPartition = tt.dropPartition(partValue);
if (droppedPartition != null) {
result.add(droppedPartition);
boolean purgeData = options != null ? options.purgeData : true;
boolean deleteData = options != null ? options.deleteData : true;
if (deleteData && !tt.isExternal()) {
deletePartitionLocation(droppedPartition, purgeData);
}
}
}
Expand Down
40 changes: 38 additions & 2 deletions ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,7 @@ public void testDropMissingPartitionsByFilter() throws Throwable {
hm.createPartition(table, partitionSpec);
}

List<Partition> partitions = hm.getPartitions(table);
assertEquals(3, partitions.size());
assertEquals(3, hm.getPartitions(table).size());

// drop partitions by filter with missing predicate
try {
Expand All @@ -725,6 +724,43 @@ public void testDropMissingPartitionsByFilter() throws Throwable {
}
}

/**
 * Exercises {@code dropPartitionsByNames}: dropping one existing partition, rejecting a request
 * that contains an already-dropped partition name, and dropping all remaining partitions.
 */
@Test
public void testDropPartitionsByNames() throws Throwable {
  final String dbName = Warehouse.DEFAULT_DATABASE_NAME;
  final String tableName = "table_for_testDropPartitionsByNames";

  // Three partitions: ds=20231129 with hr=10, 11 and 12.
  Table table = createPartitionedTable(dbName, tableName);
  for (int hour = 10; hour <= 12; hour++) {
    hm.createPartition(table, ImmutableMap.of("ds", "20231129", "hr", String.valueOf(hour)));
  }
  assertEquals(3, hm.getPartitions(table).size());

  // Dropping a single existing partition leaves two behind.
  List<String> firstOnly = Arrays.asList("ds=20231129/hr=10");
  hm.dropPartitionsByNames(dbName, tableName, firstOnly, PartitionDropOptions.instance());
  assertEquals(2, hm.getPartitions(table).size());

  // hr=10 no longer exists, so the whole request must fail and nothing may be dropped.
  List<String> withMissing = Arrays.asList("ds=20231129/hr=10", "ds=20231129/hr=11");
  try {
    hm.dropPartitionsByNames(dbName, tableName, withMissing, PartitionDropOptions.instance());
    fail("Expected exception");
  } catch (HiveException e) {
    assertEquals("Some partitions to drop are missing", e.getCause().getMessage());
    assertEquals(2, hm.getPartitions(table).size());
  }

  // Dropping the two remaining partitions empties the table.
  List<String> remaining = Arrays.asList("ds=20231129/hr=12", "ds=20231129/hr=11");
  hm.dropPartitionsByNames(dbName, tableName, remaining, PartitionDropOptions.instance());
  assertEquals(0, hm.getPartitions(table).size());
}

/**
* Test that tables set up with auto-purge skip trash-directory when tables/partitions are dropped.
* @throws Throwable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
import org.apache.hadoop.hive.metastore.api.Table;
import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -213,4 +214,16 @@ default void preDropPartitions(Table table,
EnvironmentContext context, List<Pair<Integer, byte[]>> partExprs) throws MetaException {
// Do nothing
}

/**
 * Called before dropping the partitions from the table in the metastore during ALTER TABLE DROP PARTITION.
 * <p>
 * The default implementation preserves backward compatibility for hooks that only override
 * {@link #preDropPartitions(Table, EnvironmentContext, List)}: when the specification carries
 * partition expressions, they are converted to (archive level, expression) pairs and forwarded
 * to that overload. A specification that carries partition names, or a {@code null}
 * specification, is ignored by default.
 * @param table table whose partition needs to be dropped
 * @param context context of the operation
 * @param partsSpec request partition specification
 * @throws MetaException propagated from the expression-based overload
 */
default void preDropPartitions(Table table,
    EnvironmentContext context, RequestPartsSpec partsSpec) throws MetaException {
  if (partsSpec == null || !partsSpec.isSetExprs()) {
    // Nothing the expression-based overload could act on.
    return;
  }
  List<Pair<Integer, byte[]>> partExprs = new java.util.ArrayList<>();
  partsSpec.getExprs().forEach(
      expr -> partExprs.add(Pair.of(expr.getPartArchiveLevel(), expr.getExpr())));
  // Keep invoking the older overload so existing hook implementations are not silently bypassed.
  preDropPartitions(table, context, partExprs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1933,24 +1933,32 @@ public List<Partition> dropPartitions(String catName, String dbName, String tblN
PartitionDropOptions options) throws TException {
RequestPartsSpec rps = new RequestPartsSpec();
List<DropPartitionsExpr> exprs = new ArrayList<>(partExprs.size());
for (Pair<Integer, byte[]> partExpr : partExprs) {
DropPartitionsExpr dpe = new DropPartitionsExpr();
dpe.setExpr(partExpr.getRight());
dpe.setPartArchiveLevel(partExpr.getLeft());
exprs.add(dpe);
}
rps.setExprs(exprs);
return dropPartitions(catName, dbName, tblName, rps, options);
}

@Override
public List<Partition> dropPartitions(String catName, String dbName, String tblName,
RequestPartsSpec partsSpec, PartitionDropOptions options)
throws TException {
EnvironmentContext context = new EnvironmentContext();
Table table = getTable(catName, dbName, tblName);
HiveMetaHook hook = getHook(table);
EnvironmentContext context = new EnvironmentContext();
if (hook != null) {
hook.preDropPartitions(table, context, partExprs);
hook.preDropPartitions(table, context, partsSpec);
}
if (context.getProperties() != null &&
Boolean.parseBoolean(context.getProperties().get(SKIP_DROP_PARTITION))) {
return Lists.newArrayList();
}
for (Pair<Integer, byte[]> partExpr : partExprs) {
DropPartitionsExpr dpe = new DropPartitionsExpr();
dpe.setExpr(partExpr.getRight());
dpe.setPartArchiveLevel(partExpr.getLeft());
exprs.add(dpe);
}
rps.setExprs(exprs);
DropPartitionsRequest req = new DropPartitionsRequest(dbName, tblName, rps);

DropPartitionsRequest req = new DropPartitionsRequest(dbName, tblName, partsSpec);
req.setCatName(catName);
req.setDeleteData(options.deleteData);
req.setNeedResult(options.returnResults);
Expand All @@ -1975,6 +1983,22 @@ public List<Partition> dropPartitions(String catName, String dbName, String tblN
return client.drop_partitions_req(req).getPartitions();
}

/**
 * Drops partitions by name without an explicit catalog: the catalog is obtained from
 * {@code getDefaultCatalog(conf)} and the call is forwarded to the catalog-aware overload.
 */
@Override
public List<Partition> dropPartitionsByNames(String dbName, String tblName,
    List<String> partitionNames, PartitionDropOptions options)
    throws NoSuchObjectException, MetaException, TException {
  String defaultCatalog = getDefaultCatalog(conf);
  return dropPartitionsByNames(defaultCatalog, dbName, tblName, partitionNames, options);
}

/**
 * Drops partitions by name in the given catalog: the names are placed into a
 * {@link RequestPartsSpec} and the call is forwarded to the {@code RequestPartsSpec}-based
 * {@code dropPartitions} overload.
 */
@Override
public List<Partition> dropPartitionsByNames(String catName, String dbName, String tblName,
    List<String> partitionNames, PartitionDropOptions options)
    throws NoSuchObjectException, MetaException, TException {
  RequestPartsSpec namesSpec = new RequestPartsSpec();
  namesSpec.setNames(partitionNames);
  List<Partition> dropped = dropPartitions(catName, dbName, tblName, namesSpec, options);
  return dropped;
}

@Override
public void dropTable(String dbname, String name, boolean deleteData,
boolean ignoreUnknownTab) throws MetaException, TException,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2174,6 +2174,53 @@ List<Partition> dropPartitions(String catName, String dbName, String tblName,
PartitionDropOptions options)
throws NoSuchObjectException, MetaException, TException;

/**
* Drops the partitions selected by a request partition specification.
* This generalizes the expression-based dropPartitions() overloads: the specification
* carries either partition expressions or partition names.
* @param catName catalog name
* @param dbName Name of the database
* @param tblName Name of the table
* @param partsSpec request partition specification
* @param options Boolean options for dropping partitions
* @return List of Partitions dropped
* @throws NoSuchObjectException No partition matches the specification, and ifExists was false.
* @throws MetaException error accessing the RDBMS or storage.
* @throws TException On failure
*/
List<Partition> dropPartitions(String catName, String dbName, String tblName,
RequestPartsSpec partsSpec, PartitionDropOptions options)
throws NoSuchObjectException, MetaException, TException;

/**
* Drops the partitions with the given names. This overload takes no catalog name.
* @param dbName Name of the database
* @param tblName Name of the table
* @param partitionNames List of partition names
* @param options Boolean options for dropping partitions
* @return List of Partitions dropped
* @throws NoSuchObjectException No partition matches the partition name(s), and ifExists was false.
* @throws MetaException error accessing the RDBMS or storage.
* @throws TException On failure
*/
List<Partition> dropPartitionsByNames(String dbName, String tblName,
List<String> partitionNames, PartitionDropOptions options)
throws NoSuchObjectException, MetaException, TException;

/**
* Drops the partitions with the given names in the given catalog.
* @param catName catalog name
* @param dbName Name of the database
* @param tblName Name of the table
* @param partitionNames List of partition names
* @param options Boolean options for dropping partitions
* @return List of Partitions dropped
* @throws NoSuchObjectException No partition matches the partition name(s), and ifExists was false.
* @throws MetaException error accessing the RDBMS or storage.
* @throws TException On failure
*/
List<Partition> dropPartitionsByNames(String catName, String dbName, String tblName,
List<String> partitionNames, PartitionDropOptions options)
throws NoSuchObjectException, MetaException, TException;

/**
* Drop a partition.
* @param db_name database name.
Expand Down
Loading

0 comments on commit ea8ab97

Please sign in to comment.