Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
wecharyu committed Dec 23, 2023
1 parent 14de002 commit fe19a79
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -679,24 +679,6 @@ public List<String> createInsertValuesStmt(String tblColumns, List<String> rows,
}
}

public String createUpdatePreparedStmt(String tableName, List<String> columnNames,
List<String> conditionKeys) {
StringBuilder sb = new StringBuilder();
sb.append("update " + tableName + " set ");
sb.append(columnNames.stream().map(col -> col + "=?").collect(Collectors.joining(",")));
sb.append(" where " + conditionKeys.stream().map(cond -> cond + "=?").collect(Collectors.joining(" and ")));
return sb.toString();
}

public String createInsertPreparedStmt(String tableName, List<String> columnNames) {
StringBuilder sb = new StringBuilder();
sb.append("insert into " + tableName + "(");
sb.append(columnNames.stream().collect(Collectors.joining(",")));
String placeholder = columnNames.stream().map(col -> "?").collect(Collectors.joining(","));
sb.append(") values (" + placeholder + ")");
return sb.toString();
}

public String addEscapeCharacters(String s) {
if (isMYSQL()) {
return s.replaceAll("\\\\", "\\\\\\\\");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,21 @@
* the underlying database. It should use ANSI SQL and be compatible with common databases
* such as MySQL (note that MySQL doesn't use full ANSI mode by default), Postgres, etc.
*
* This class separates out the statistics update part from MetaStoreDirectSql class.
* This class separates out the update part from MetaStoreDirectSql class.
*/
class DirectSqlUpdate {
private static final Logger LOG = LoggerFactory.getLogger(DirectSqlUpdate.class.getName());
PersistenceManager pm;
Configuration conf;
DatabaseProduct dbType;
int maxBatchSize;
SQLGenerator sqlGenerator;
class DirectSqlUpdatePart {
private static final Logger LOG = LoggerFactory.getLogger(DirectSqlUpdatePart.class.getName());

private final PersistenceManager pm;
private final Configuration conf;
private final DatabaseProduct dbType;
private final int maxBatchSize;
private final SQLGenerator sqlGenerator;

private static final ReentrantLock derbyLock = new ReentrantLock(true);
public DirectSqlUpdate(PersistenceManager pm, Configuration conf,
DatabaseProduct dbType, int batchSize) {

public DirectSqlUpdatePart(PersistenceManager pm, Configuration conf,
DatabaseProduct dbType, int batchSize) {
this.pm = pm;
this.conf = conf;
this.dbType = dbType;
Expand Down Expand Up @@ -362,9 +364,6 @@ private Map<String, Map<String, String>> updatePartitionParamTable(Connection db
throws SQLException, MetaException {
Map<String, Map<String, String>> result = new HashMap<>();
boolean areTxnStatsSupported = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED);
PreparedStatement statementInsert = null;
PreparedStatement statementDelete = null;
PreparedStatement statementUpdate = null;
String insert = "INSERT INTO \"PARTITION_PARAMS\" (\"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\") "
+ "VALUES( ? , 'COLUMN_STATS_ACCURATE' , ? )";
String delete = "DELETE from \"PARTITION_PARAMS\" "
Expand All @@ -384,10 +383,9 @@ private Map<String, Map<String, String>> updatePartitionParamTable(Connection db
// get the old parameters from PARTITION_PARAMS table.
Map<Long, String> partIdToParaMap = getParamValues(dbConn, partIdList);

try {
statementInsert = dbConn.prepareStatement(insert);
statementDelete = dbConn.prepareStatement(delete);
statementUpdate = dbConn.prepareStatement(update);
try (PreparedStatement statementInsert = dbConn.prepareStatement(insert);
PreparedStatement statementDelete = dbConn.prepareStatement(delete);
PreparedStatement statementUpdate = dbConn.prepareStatement(update)) {
for (Map.Entry entry : partitionInfoMap.entrySet()) {
PartitionInfo partitionInfo = (PartitionInfo) entry.getKey();
ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
Expand Down Expand Up @@ -472,83 +470,9 @@ private Map<String, Map<String, String>> updatePartitionParamTable(Connection db
updateWriteIdForPartitions(dbConn, writeId, partIdList);
}
return result;
} finally {
closeStmt(statementInsert);
closeStmt(statementUpdate);
closeStmt(statementDelete);
}
}

private static class PartitionInfo {
long partitionId;
long writeId;
String partitionName;
public PartitionInfo(long partitionId, long writeId, String partitionName) {
this.partitionId = partitionId;
this.writeId = writeId;
this.partitionName = partitionName;
}

@Override
public int hashCode() {
return (int)partitionId;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null) {
return false;
}
if (!(o instanceof PartitionInfo)) {
return false;
}
PartitionInfo other = (PartitionInfo)o;
if (this.partitionId != other.partitionId) {
return false;
}
return true;
}
}

private static class PartColNameInfo {
long partitionId;
String colName;
public PartColNameInfo(long partitionId, String colName) {
this.partitionId = partitionId;
this.colName = colName;
}

@Override
public int hashCode() {
return (int)partitionId;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null) {
return false;
}
if (!(o instanceof PartColNameInfo)) {
return false;
}
PartColNameInfo other = (PartColNameInfo)o;
if (this.partitionId != other.partitionId) {
return false;
}
if (this.colName.equalsIgnoreCase(other.colName)) {
return true;
}
return false;
}
}

private Map<PartitionInfo, ColumnStatistics> getPartitionInfo(Connection dbConn, long tblId,
Map<String, ColumnStatistics> partColStatsMap)
Expand Down Expand Up @@ -792,7 +716,7 @@ private void updatePartitionsInBatch(Map<List<String>, Long> partValuesToId,
List<Partition> newParts) throws MetaException {
List<String> columns = Arrays.asList("\"CREATE_TIME\"", "\"LAST_ACCESS_TIME\"", "\"WRITE_ID\"");
List<String> conditionKeys = Arrays.asList("\"PART_ID\"");
String stmt = dbType.createUpdatePreparedStmt("\"PARTITIONS\"", columns, conditionKeys);
String stmt = TxnUtils.createUpdatePreparedStmt("\"PARTITIONS\"", columns, conditionKeys);
int maxRows = dbType.getMaxRows(maxBatchSize, 4);
updateWithStatement(statement -> Batchable.runBatched(maxRows, newParts, new Batchable<Partition, Void>() {
@Override
Expand Down Expand Up @@ -916,7 +840,7 @@ private void updateParams(String paramTable, String idColumn,
List<Pair<Long, Pair<String, String>>> updateIdAndParams) throws MetaException {
List<String> columns = Arrays.asList("\"PARAM_VALUE\"");
List<String> conditionKeys = Arrays.asList(idColumn, "\"PARAM_KEY\"");
String stmt = dbType.createUpdatePreparedStmt(paramTable, columns, conditionKeys);
String stmt = TxnUtils.createUpdatePreparedStmt(paramTable, columns, conditionKeys);
int maxRows = dbType.getMaxRows(maxBatchSize, 3);
updateWithStatement(statement -> Batchable.runBatched(maxRows, updateIdAndParams,
new Batchable<Pair<Long, Pair<String, String>>, Object>() {
Expand All @@ -938,7 +862,7 @@ public List<Object> run(List<Pair<Long, Pair<String, String>>> input) throws SQL
private void insertParams(String paramTable, String idColumn,
List<Pair<Long, Pair<String, String>>> addIdAndParams) throws MetaException {
List<String> columns = Arrays.asList(idColumn, "\"PARAM_KEY\"", "\"PARAM_VALUE\"");
String query = dbType.createInsertPreparedStmt(paramTable, columns);
String query = TxnUtils.createInsertPreparedStmt(paramTable, columns);
int maxRows = dbType.getMaxRows(maxBatchSize, 3);
updateWithStatement(statement -> Batchable.runBatched(maxRows, addIdAndParams,
new Batchable<Pair<Long, Pair<String, String>>, Void>() {
Expand Down Expand Up @@ -1046,7 +970,7 @@ private void updateSDInBatch(List<Long> ids, Map<Long, StorageDescriptor> idToSd
List<String> columns = Arrays.asList("\"CD_ID\"", "\"INPUT_FORMAT\"", "\"IS_COMPRESSED\"",
"\"IS_STOREDASSUBDIRECTORIES\"", "\"LOCATION\"", "\"NUM_BUCKETS\"", "\"OUTPUT_FORMAT\"");
List<String> conditionKeys = Arrays.asList("\"SD_ID\"");
String stmt = dbType.createUpdatePreparedStmt("\"SDS\"", columns, conditionKeys);
String stmt = TxnUtils.createUpdatePreparedStmt("\"SDS\"", columns, conditionKeys);
int maxRows = dbType.getMaxRows(maxBatchSize, 8);
updateWithStatement(statement -> Batchable.runBatched(maxRows, ids,
new Batchable<Long, Void>() {
Expand Down Expand Up @@ -1083,7 +1007,7 @@ public List<Void> run(List<Long> input) throws MetaException {
}
});
List<String> columns = Arrays.asList("\"SD_ID\"", "\"INTEGER_IDX\"", "\"BUCKET_COL_NAME\"");
String stmt = dbType.createInsertPreparedStmt("\"BUCKETING_COLS\"", columns);
String stmt = TxnUtils.createInsertPreparedStmt("\"BUCKETING_COLS\"", columns);
List<Long> idWithBucketCols = filterIdsByNonNullValue(sdIds, sdIdToBucketCols);
int maxRows = dbType.getMaxRows(maxBatchSize, 3);
updateWithStatement(statement -> Batchable.runBatched(maxRows, idWithBucketCols, new Batchable<Long, Object>() {
Expand Down Expand Up @@ -1117,7 +1041,7 @@ public List<Void> run(List<Long> input) throws MetaException {
});

List<String> columns = Arrays.asList("\"SD_ID\"", "\"INTEGER_IDX\"", "\"COLUMN_NAME\"", "\"ORDER\"");
String stmt = dbType.createInsertPreparedStmt("\"SORT_COLS\"", columns);
String stmt = TxnUtils.createInsertPreparedStmt("\"SORT_COLS\"", columns);
List<Long> idWithSortCols = filterIdsByNonNullValue(sdIds, sdIdToSortCols);
int maxRows = dbType.getMaxRows(maxBatchSize, 4);
updateWithStatement(statement -> Batchable.runBatched(maxRows, idWithSortCols, new Batchable<Long, Object>() {
Expand Down Expand Up @@ -1229,7 +1153,7 @@ private Long getDataStoreId(Class<?> modelClass) throws MetaException {
private void insertSkewedColNamesInBatch(Map<Long, List<String>> sdIdToSkewedColNames,
List<Long> sdIds) throws MetaException {
List<String> columns = Arrays.asList("\"SD_ID\"", "\"INTEGER_IDX\"", "\"SKEWED_COL_NAME\"");
String stmt = dbType.createInsertPreparedStmt("\"SKEWED_COL_NAMES\"", columns);
String stmt = TxnUtils.createInsertPreparedStmt("\"SKEWED_COL_NAMES\"", columns);
List<Long> idWithSkewedCols = filterIdsByNonNullValue(sdIds, sdIdToSkewedColNames);
int maxRows = dbType.getMaxRows(maxBatchSize, 3);
updateWithStatement(statement -> Batchable.runBatched(maxRows, idWithSkewedCols, new Batchable<Long, Object>() {
Expand All @@ -1252,7 +1176,7 @@ public List<Object> run(List<Long> input) throws SQLException {

private void insertStringListInBatch(List<Long> stringListIds) throws MetaException {
List<String> columns = Arrays.asList("\"STRING_LIST_ID\"");
String insertQuery = dbType.createInsertPreparedStmt("\"SKEWED_STRING_LIST\"", columns);
String insertQuery = TxnUtils.createInsertPreparedStmt("\"SKEWED_STRING_LIST\"", columns);
int maxRows = dbType.getMaxRows(maxBatchSize, 1);
updateWithStatement(statement -> Batchable.runBatched(maxRows, stringListIds,
new Batchable<Long, Void>() {
Expand All @@ -1272,7 +1196,7 @@ public List<Void> run(List<Long> input) throws SQLException {
private void insertStringListValuesInBatch(Map<Long, List<String>> stringListIdToValues,
List<Long> stringListIds) throws MetaException {
List<String> columns = Arrays.asList("\"STRING_LIST_ID\"", "\"INTEGER_IDX\"", "\"STRING_LIST_VALUE\"");
String insertQuery = dbType.createInsertPreparedStmt("\"SKEWED_STRING_LIST_VALUES\"", columns);
String insertQuery = TxnUtils.createInsertPreparedStmt("\"SKEWED_STRING_LIST_VALUES\"", columns);
List<Long> idWithStringList = filterIdsByNonNullValue(stringListIds, stringListIdToValues);
int maxRows = dbType.getMaxRows(maxBatchSize, 3);
updateWithStatement(statement -> Batchable.runBatched(maxRows, idWithStringList,
Expand All @@ -1298,7 +1222,7 @@ public List<Void> run(List<Long> input) throws SQLException {
private void insertSkewedValuesInBatch(Map<Long, List<Long>> sdIdToStringListId,
List<Long> sdIds) throws MetaException {
List<String> columns = Arrays.asList("\"SD_ID_OID\"", "\"INTEGER_IDX\"", "\"STRING_LIST_ID_EID\"");
String insertQuery = dbType.createInsertPreparedStmt("\"SKEWED_VALUES\"", columns);
String insertQuery = TxnUtils.createInsertPreparedStmt("\"SKEWED_VALUES\"", columns);
List<Long> idWithSkewedValues = filterIdsByNonNullValue(sdIds, sdIdToStringListId);
int maxRows = dbType.getMaxRows(maxBatchSize, 3);
updateWithStatement(statement -> Batchable.runBatched(maxRows, idWithSkewedValues,
Expand All @@ -1324,7 +1248,7 @@ public List<Void> run(List<Long> input) throws Exception {
private void insertSkewColValueLocInBatch(Map<Long, List<Pair<Long, String>>> sdIdToColValueLoc,
List<Long> sdIds) throws MetaException {
List<String> columns = Arrays.asList("\"SD_ID\"", "\"STRING_LIST_ID_KID\"", "\"LOCATION\"");
String insertQuery = dbType.createInsertPreparedStmt("\"SKEWED_COL_VALUE_LOC_MAP\"", columns);
String insertQuery = TxnUtils.createInsertPreparedStmt("\"SKEWED_COL_VALUE_LOC_MAP\"", columns);
List<Long> idWithColValueLoc = filterIdsByNonNullValue(sdIds, sdIdToColValueLoc);
int maxRows = dbType.getMaxRows(maxBatchSize, 3);
updateWithStatement(statement -> Batchable.runBatched(maxRows, idWithColValueLoc,
Expand Down Expand Up @@ -1415,7 +1339,7 @@ public List<Void> run(List<Long> input) throws Exception {

private void insertCDInBatch(List<Long> ids, Map<Long, List<FieldSchema>> idToCols)
throws MetaException {
String insertCds = dbType.createInsertPreparedStmt("\"CDS\"", Arrays.asList("\"CD_ID\""));
String insertCds = TxnUtils.createInsertPreparedStmt("\"CDS\"", Arrays.asList("\"CD_ID\""));
int maxRows = dbType.getMaxRows(maxBatchSize, 1);
updateWithStatement(statement -> Batchable.runBatched(maxRows, ids,
new Batchable<Long, Void>() {
Expand All @@ -1432,7 +1356,7 @@ public List<Void> run(List<Long> input) throws SQLException {

List<String> columns = Arrays.asList("\"CD_ID\"",
"\"COMMENT\"", "\"COLUMN_NAME\"", "\"TYPE_NAME\"", "\"INTEGER_IDX\"");
String insertColumns = dbType.createInsertPreparedStmt("\"COLUMNS_V2\"", columns);
String insertColumns = TxnUtils.createInsertPreparedStmt("\"COLUMNS_V2\"", columns);
int maxRowsForCDs = dbType.getMaxRows(maxBatchSize, 5);
updateWithStatement(statement -> Batchable.runBatched(maxRowsForCDs, ids,
new Batchable<Long, Void>() {
Expand Down Expand Up @@ -1463,8 +1387,8 @@ private void updateKeyConstraintsInBatch(Map<Long, Long> oldCdIdToNewCdId,
List<String> parentColumns = Arrays.asList("\"PARENT_CD_ID\"", "\"PARENT_INTEGER_IDX\"");
List<String> childColumns = Arrays.asList("\"CHILD_CD_ID\"", "\"CHILD_INTEGER_IDX\"");

String updateParent = dbType.createUpdatePreparedStmt(tableName, parentColumns, parentColumns);
String updateChild = dbType.createUpdatePreparedStmt(tableName, childColumns, childColumns);
String updateParent = TxnUtils.createUpdatePreparedStmt(tableName, parentColumns, parentColumns);
String updateChild = TxnUtils.createUpdatePreparedStmt(tableName, childColumns, childColumns);
for (String updateStmt : new String[]{updateParent, updateChild}) {
int maxRows = dbType.getMaxRows(maxBatchSize, 4);
updateWithStatement(statement -> Batchable.runBatched(maxRows, oldCdIds,
Expand Down Expand Up @@ -1519,7 +1443,7 @@ private void updateSerdeInBatch(List<Long> ids, Map<Long, SerDeInfo> idToSerde)
// Followed the jdo implement to update only NAME and SLIB of SERDES.
List<String> columns = Arrays.asList("\"NAME\"", "\"SLIB\"");
List<String> condKeys = Arrays.asList("\"SERDE_ID\"");
String updateStmt = dbType.createUpdatePreparedStmt("\"SERDES\"", columns, condKeys);
String updateStmt = TxnUtils.createUpdatePreparedStmt("\"SERDES\"", columns, condKeys);
List<Long> idWithSerde = filterIdsByNonNullValue(ids, idToSerde);
int maxRows = dbType.getMaxRows(maxBatchSize, 3);
updateWithStatement(statement -> Batchable.runBatched(maxRows, idWithSerde,
Expand All @@ -1538,4 +1462,75 @@ public List<Void> run(List<Long> input) throws SQLException {
}
}), updateStmt);
}

private static final class PartitionInfo {
long partitionId;
long writeId;
String partitionName;
public PartitionInfo(long partitionId, long writeId, String partitionName) {
this.partitionId = partitionId;
this.writeId = writeId;
this.partitionName = partitionName;
}

@Override
public int hashCode() {
return (int)partitionId;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null) {
return false;
}
if (!(o instanceof PartitionInfo)) {
return false;
}
PartitionInfo other = (PartitionInfo)o;
if (this.partitionId != other.partitionId) {
return false;
}
return true;
}
}

private static final class PartColNameInfo {
long partitionId;
String colName;
public PartColNameInfo(long partitionId, String colName) {
this.partitionId = partitionId;
this.colName = colName;
}

@Override
public int hashCode() {
return (int)partitionId;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null) {
return false;
}
if (!(o instanceof PartColNameInfo)) {
return false;
}
PartColNameInfo other = (PartColNameInfo)o;
if (this.partitionId != other.partitionId) {
return false;
}
if (this.colName.equalsIgnoreCase(other.colName)) {
return true;
}
return false;
}
}
}
Loading

0 comments on commit fe19a79

Please sign in to comment.