Skip to content

Commit 2e67308

Browse files
danielcweeksFokko
andauthored
Hive locking (#405)
* Add hive locking for commit path * Add integration test * Fix identifier assignment * Fix identifer unpacking * Import properties * Use properties * Remove unnecessary fixture * Update `catalog_hive` * Separate lock client from commit client * Fix match expression * Use separate client for locking * Lint --------- Co-authored-by: Fokko Driesprong <[email protected]>
1 parent 33b555a commit 2e67308

File tree

2 files changed

+55
-5
lines changed

2 files changed

+55
-5
lines changed

pyiceberg/catalog/hive.py

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
import getpass
18+
import socket
1819
import time
1920
from types import TracebackType
2021
from typing import (
@@ -34,10 +35,17 @@
3435
AlreadyExistsException,
3536
FieldSchema,
3637
InvalidOperationException,
38+
LockComponent,
39+
LockLevel,
40+
LockRequest,
41+
LockResponse,
42+
LockState,
43+
LockType,
3744
MetaException,
3845
NoSuchObjectException,
3946
SerDeInfo,
4047
StorageDescriptor,
48+
UnlockRequest,
4149
)
4250
from hive_metastore.ttypes import Database as HiveDatabase
4351
from hive_metastore.ttypes import Table as HiveTable
@@ -56,6 +64,7 @@
5664
PropertiesUpdateSummary,
5765
)
5866
from pyiceberg.exceptions import (
67+
CommitFailedException,
5968
NamespaceAlreadyExistsError,
6069
NamespaceNotEmptyError,
6170
NoSuchIcebergTableError,
@@ -331,6 +340,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
331340
"""
332341
raise NotImplementedError
333342

343+
def _create_lock_request(self, database_name: str, table_name: str) -> LockRequest:
344+
lock_component: LockComponent = LockComponent(
345+
level=LockLevel.TABLE, type=LockType.EXCLUSIVE, dbname=database_name, tablename=table_name, isTransactional=True
346+
)
347+
348+
lock_request: LockRequest = LockRequest(component=[lock_component], user=getpass.getuser(), hostname=socket.gethostname())
349+
350+
return lock_request
351+
334352
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
335353
"""Update the table.
336354
@@ -363,15 +381,23 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
363381
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
364382

365383
# commit to hive
366-
try:
367-
with self._client as open_client:
384+
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
385+
with self._client as open_client:
386+
lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))
387+
388+
try:
389+
if lock.state != LockState.ACQUIRED:
390+
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
391+
368392
tbl = open_client.get_table(dbname=database_name, tbl_name=table_name)
369393
tbl.parameters = _construct_parameters(
370394
metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location
371395
)
372396
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=tbl)
373-
except NoSuchObjectException as e:
374-
raise NoSuchTableError(f"Table does not exist: {table_name}") from e
397+
except NoSuchObjectException as e:
398+
raise NoSuchTableError(f"Table does not exist: {table_name}") from e
399+
finally:
400+
open_client.unlock(UnlockRequest(lockid=lock.lockid))
375401

376402
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
377403

tests/integration/test_reads.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222

2323
import pyarrow.parquet as pq
2424
import pytest
25+
from hive_metastore.ttypes import LockRequest, LockResponse, LockState, UnlockRequest
2526
from pyarrow.fs import S3FileSystem
2627

2728
from pyiceberg.catalog import Catalog, load_catalog
28-
from pyiceberg.exceptions import NoSuchTableError
29+
from pyiceberg.catalog.hive import HiveCatalog, _HiveClient
30+
from pyiceberg.exceptions import CommitFailedException, NoSuchTableError
2931
from pyiceberg.expressions import (
3032
And,
3133
EqualTo,
@@ -467,3 +469,25 @@ def test_null_list_and_map(catalog: Catalog) -> None:
467469
# assert arrow_table["col_list_with_struct"].to_pylist() == [None, [{'test': 1}]]
468470
# Once https://github.com/apache/arrow/issues/38809 has been fixed
469471
assert arrow_table["col_list_with_struct"].to_pylist() == [[], [{'test': 1}]]
472+
473+
474+
@pytest.mark.integration
475+
def test_hive_locking(catalog_hive: HiveCatalog) -> None:
476+
table = create_table(catalog_hive)
477+
478+
database_name: str
479+
table_name: str
480+
_, database_name, table_name = table.identifier
481+
482+
hive_client: _HiveClient = _HiveClient(catalog_hive.properties["uri"])
483+
blocking_lock_request: LockRequest = catalog_hive._create_lock_request(database_name, table_name)
484+
485+
with hive_client as open_client:
486+
# Force a lock on the test table
487+
lock: LockResponse = open_client.lock(blocking_lock_request)
488+
assert lock.state == LockState.ACQUIRED
489+
try:
490+
with pytest.raises(CommitFailedException, match="(Failed to acquire lock for).*"):
491+
table.transaction().set_properties(lock="fail").commit_transaction()
492+
finally:
493+
open_client.unlock(UnlockRequest(lock.lockid))

0 commit comments

Comments
 (0)