Skip to content

Commit 3f7bcbb

Browse files
committed
cluster: add application_info
Implement clustr.application_info to make driver send following startup options to server: 1. `APPLICATION_NAME` - ID what application is using driver, example: repo of the application 2. `APPLICATION_VERSION` - Version of the application, example: release version or commit id of the application 3. `CLIENT_ID` - unique id of the client instance, example: pod name All strings.
1 parent 4ad0566 commit 3f7bcbb

File tree

4 files changed

+176
-2
lines changed

4 files changed

+176
-2
lines changed

cassandra/application_info.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# Copyright 2025 ScyllaDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from typing import Optional
15+
16+
17+
class ApplicationInfoBase:
18+
"""
19+
A class that holds application information and adds it to startup message options
20+
"""
21+
def add_startup_options(self, options: dict[str, str]):
22+
raise NotImplementedError()
23+
24+
25+
class ApplicationInfo(ApplicationInfoBase):
26+
application_name: Optional[str]
27+
application_version: Optional[str]
28+
client_id: Optional[str]
29+
30+
def __init__(
31+
self,
32+
application_name: Optional[str] = None,
33+
application_version: Optional[str] = None,
34+
client_id: Optional[str] = None
35+
):
36+
if application_name and not isinstance(application_name, str):
37+
raise TypeError('application_name must be a string')
38+
if application_version and not isinstance(application_version, str):
39+
raise TypeError('application_version must be a string')
40+
if client_id and not isinstance(client_id, str):
41+
raise TypeError('client_id must be a string')
42+
43+
self.application_name = application_name
44+
self.application_version = application_version
45+
self.client_id = client_id
46+
47+
def add_startup_options(self, options: dict[str, str]):
48+
if self.application_name:
49+
options['APPLICATION_NAME'] = self.application_name
50+
if self.application_version:
51+
options['APPLICATION_VERSION'] = self.application_version
52+
if self.client_id:
53+
options['CLIENT_ID'] = self.client_id

cassandra/cluster.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from itertools import groupby, count, chain
3030
import json
3131
import logging
32+
from typing import Optional
3233
from warnings import warn
3334
from random import random
3435
import re
@@ -95,6 +96,7 @@
9596
from cassandra.datastax.graph.query import _request_timeout_key, _GraphSONContextRowFactory
9697
from cassandra.datastax import cloud as dscloud
9798
from cassandra.scylla.cloud import CloudConfiguration
99+
from cassandra.application_info import ApplicationInfoBase
98100

99101
try:
100102
from cassandra.io.twistedreactor import TwistedConnection
@@ -706,6 +708,19 @@ class Cluster(object):
706708
Setting this to :const:`False` disables compression.
707709
"""
708710

711+
_application_info: Optional[ApplicationInfoBase] = None
712+
713+
@property
714+
def application_info(self) -> Optional[ApplicationInfoBase]:
715+
"""
716+
An instance of any subclass of :class:`.application_info.ApplicationInfoBase`.
717+
718+
Defaults to None
719+
720+
When set makes driver sends information about application that uses driver in startup frame
721+
"""
722+
return self._application_info
723+
709724
_auth_provider = None
710725
_auth_provider_callable = None
711726

@@ -1204,6 +1219,7 @@ def __init__(self,
12041219
shard_aware_options=None,
12051220
metadata_request_timeout=None,
12061221
column_encryption_policy=None,
1222+
application_info:Optional[ApplicationInfoBase]=None
12071223
):
12081224
"""
12091225
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
@@ -1329,6 +1345,12 @@ def __init__(self,
13291345
raise TypeError("address_translator should not be a class, it should be an instance of that class")
13301346
self.address_translator = address_translator
13311347

1348+
if application_info is not None:
1349+
if not isinstance(application_info, ApplicationInfoBase):
1350+
raise TypeError(
1351+
"application_info should be an instance of any ApplicationInfoBase class")
1352+
self._application_info = application_info
1353+
13321354
if timestamp_generator is not None:
13331355
if not callable(timestamp_generator):
13341356
raise ValueError("timestamp_generator must be callable")
@@ -1779,6 +1801,7 @@ def _make_connection_kwargs(self, endpoint, kwargs_dict):
17791801
kwargs_dict.setdefault('user_type_map', self._user_types)
17801802
kwargs_dict.setdefault('allow_beta_protocol_version', self.allow_beta_protocol_version)
17811803
kwargs_dict.setdefault('no_compact', self.no_compact)
1804+
kwargs_dict.setdefault('application_info', self.application_info)
17821805

17831806
return kwargs_dict
17841807

cassandra/connection.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import weakref
2929
import random
3030
import itertools
31+
from typing import Optional
3132

33+
from cassandra.application_info import ApplicationInfoBase
3234
from cassandra.protocol_features import ProtocolFeatures
3335

3436
if 'gevent.monkey' in sys.modules:
@@ -761,8 +763,8 @@ class Connection(object):
761763
_is_checksumming_enabled = False
762764

763765
_on_orphaned_stream_released = None
764-
765766
features = None
767+
_application_info: Optional[ApplicationInfoBase] = None
766768

767769
@property
768770
def _iobuf(self):
@@ -774,7 +776,7 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
774776
cql_version=None, protocol_version=ProtocolVersion.MAX_SUPPORTED, is_control_connection=False,
775777
user_type_map=None, connect_timeout=None, allow_beta_protocol_version=False, no_compact=False,
776778
ssl_context=None, owning_pool=None, shard_id=None, total_shards=None,
777-
on_orphaned_stream_released=None):
779+
on_orphaned_stream_released=None, application_info: Optional[ApplicationInfoBase] = None):
778780
# TODO next major rename host to endpoint and remove port kwarg.
779781
self.endpoint = host if isinstance(host, EndPoint) else DefaultEndPoint(host, port)
780782

@@ -797,6 +799,7 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
797799
self._socket_writable = True
798800
self.orphaned_request_ids = set()
799801
self._on_orphaned_stream_released = on_orphaned_stream_released
802+
self._application_info = application_info
800803

801804
if ssl_options:
802805
self.ssl_options.update(self.endpoint.ssl_options or {})
@@ -1379,6 +1382,8 @@ def _handle_options_response(self, options_response):
13791382
self._product_type = options_response.options.get('PRODUCT_TYPE', [None])[0]
13801383

13811384
options = {}
1385+
if self._application_info:
1386+
self._application_info.add_startup_options(options)
13821387
self.features.add_startup_options(options)
13831388

13841389
if self.cql_version:
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright 2025 ScyllaDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import unittest
16+
17+
from cassandra.application_info import ApplicationInfo
18+
from tests.integration import TestCluster, use_single_node, remove_cluster, xfail_scylla
19+
20+
21+
def setup_module():
22+
use_single_node()
23+
24+
25+
def teardown_module():
26+
remove_cluster()
27+
28+
29+
@xfail_scylla("#scylladb/scylla-enterprise#5467 - not released yet")
30+
class ApplicationInfoTest(unittest.TestCase):
31+
attribute_to_startup_key = {
32+
'application_name': 'APPLICATION_NAME',
33+
'application_version': 'APPLICATION_VERSION',
34+
'client_id': 'CLIENT_ID',
35+
}
36+
37+
def test_create_session_and_check_system_views_clients(self):
38+
"""
39+
Test to ensure that ApplicationInfo user provides endup in `client_options` of `system_views.clients` table
40+
"""
41+
42+
for application_info_args in [
43+
{
44+
'application_name': None,
45+
'application_version': None,
46+
'client_id': None,
47+
},
48+
{
49+
'application_name': 'some-application-name',
50+
'application_version': 'some-application-version',
51+
'client_id': 'some-client-id',
52+
},
53+
{
54+
'application_name': 'some-application-name',
55+
'application_version': None,
56+
'client_id': None,
57+
},
58+
{
59+
'application_name': None,
60+
'application_version': 'some-application-version',
61+
'client_id': None,
62+
},
63+
{
64+
'application_name': None,
65+
'application_version': None,
66+
'client_id': 'some-client-id',
67+
},
68+
]:
69+
with self.subTest(**application_info_args):
70+
try:
71+
cluster = TestCluster(
72+
application_info=ApplicationInfo(
73+
**application_info_args
74+
))
75+
76+
found = False
77+
for row in cluster.connect().execute("select client_options from system_views.clients"):
78+
if not row[0]:
79+
continue
80+
for attribute_key, startup_key in self.attribute_to_startup_key.items():
81+
expected_value = application_info_args.get(attribute_key)
82+
if expected_value:
83+
if row[0].get(startup_key) != expected_value:
84+
break
85+
else:
86+
# Check that it is absent
87+
if row[0].get(startup_key, None) is not None:
88+
break
89+
else:
90+
found = True
91+
assert found
92+
finally:
93+
cluster.shutdown()

0 commit comments

Comments
 (0)