
Commit 1c42790

Merge pull request #238 from yh-0/RDBC-670-1
RDBC-670 Streaming
2 parents: f7d960f + facfdfb

File tree

16 files changed: +715 -32 lines changed


ravendb/documents/commands/stream.py

Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@ (new file)

import requests
from typing import TypeVar, Generic, Iterator

from ravendb.documents.queries.index_query import IndexQuery
from ravendb.documents.conventions import DocumentConventions
from ravendb.extensions.json_extensions import JsonExtensions
from ravendb.http.http_cache import HttpCache
from ravendb.http.misc import ResponseDisposeHandling
from ravendb.http.server_node import ServerNode
from ravendb.http.raven_command import RavenCommand, RavenCommandResponseType
from ravendb.json.metadata_as_dictionary import MetadataAsDictionary


_T = TypeVar("_T")


class StreamResult(Generic[_T]):
    def __init__(
        self, key: str = None, change_vector: str = None, metadata: MetadataAsDictionary = None, document: _T = None
    ):
        self.key = key
        self.change_vector = change_vector
        self.metadata = metadata
        self.document = document


class StreamResultResponse:
    def __init__(self, response: requests.Response, stream_generator: Iterator):
        self.response = response
        self.stream_iterator = stream_generator


class StreamCommand(RavenCommand[StreamResultResponse]):
    def __init__(self, url: str):
        super(StreamCommand, self).__init__(StreamResultResponse)
        if not url or url.isspace():
            raise ValueError("Url cannot be None or empty")

        self._url = url
        self._response_type = RavenCommandResponseType.EMPTY

    def create_request(self, node: ServerNode) -> requests.Request:
        return requests.Request("GET", f"{node.url}/databases/{node.database}/{self._url}")

    def process_response(self, cache: HttpCache, response: requests.Response, url) -> ResponseDisposeHandling:
        try:
            result = StreamResultResponse(response, response.iter_lines())
            self.result = result

            return ResponseDisposeHandling.MANUALLY
        except Exception as e:
            raise RuntimeError("Unable to process stream response", e)

    def send(self, session: requests.Session, request: requests.Request) -> requests.Response:
        return session.request(
            request.method,
            url=request.url,
            data=request.data,
            files=request.files,
            cert=session.cert,
            headers=request.headers,
            stream=True,
        )

    def is_read_request(self) -> bool:
        return True


class QueryStreamCommand(RavenCommand[StreamResultResponse]):
    def __init__(self, conventions: DocumentConventions, query: IndexQuery):
        super(QueryStreamCommand, self).__init__(StreamResultResponse)

        if conventions is None:
            raise ValueError("Conventions cannot be None")

        if query is None:
            raise ValueError("Query cannot be None")

        self._conventions = conventions
        self._index_query = query

        self._response_type = RavenCommandResponseType.EMPTY

    def create_request(self, node: ServerNode) -> requests.Request:
        request = requests.Request("POST")
        request.data = JsonExtensions.write_index_query(self._conventions, self._index_query)
        request.url = f"{node.url}/databases/{node.database}/streams/queries?format=jsonl"
        return request

    def process_response(self, cache: HttpCache, response: requests.Response, url) -> ResponseDisposeHandling:
        try:
            stream_response = StreamResultResponse(response, response.iter_lines())
            self.result = stream_response

            return ResponseDisposeHandling.MANUALLY
        except Exception as e:
            raise RuntimeError("Unable to process stream response: " + e.args[0], e)

    def send(self, session: requests.Session, request: requests.Request) -> requests.Response:
        return session.request(
            request.method,
            url=request.url,
            data=request.data,
            files=request.files,
            cert=session.cert,
            headers=request.headers,
            stream=True,
        )

    def is_read_request(self) -> bool:
        return True
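
The two commands above only wrap the HTTP layer: they send the request with stream=True and expose the raw requests response plus its iter_lines() iterator through StreamResultResponse. A minimal sketch of driving StreamCommand directly follows; the store URL, database name and the "streams/docs" endpoint suffix are illustrative assumptions rather than part of this commit, and the session-level API added later in this diff is the intended entry point:

import json

from ravendb import DocumentStore  # import path assumed
from ravendb.documents.commands.stream import StreamCommand

# Sketch only: server URL, database name and the endpoint suffix are illustrative.
store = DocumentStore(urls=["http://localhost:8080"], database="Demo")
store.initialize()

command = StreamCommand("streams/docs?startsWith=orders%2F&format=jsonl")  # hypothetical suffix
store.get_request_executor().execute_command(command, None)

try:
    for line in command.result.stream_iterator:  # wraps response.iter_lines()
        if line:  # iter_lines() can yield empty keep-alive lines
            print(json.loads(line))
finally:
    # process_response() returns ResponseDisposeHandling.MANUALLY,
    # so the caller is responsible for disposing of the response.
    command.result.response.close()
    store.close()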

ravendb/documents/operations/executor.py

Lines changed: 4 additions & 4 deletions
@@ -25,7 +25,7 @@


 _T = TypeVar("_T")
-_Operation_T = TypeVar("_Operation_T")
+_T_Operation_Return = TypeVar("_T_Operation_Return")


 class OperationExecutor:
@@ -41,7 +41,7 @@ def for_database(self, database_name: str) -> OperationExecutor:
             return self
         return OperationExecutor(self._store, database_name)

-    def send(self, operation: IOperation[_Operation_T], session_info: SessionInfo = None) -> _Operation_T:
+    def send(self, operation: IOperation[_T_Operation_Return], session_info: SessionInfo = None) -> _T_Operation_Return:
         command = operation.get_command(self._store, self._request_executor.conventions, self._request_executor.cache)
         self._request_executor.execute_command(command, session_info)
         return None if isinstance(operation, VoidOperation) else command.result
@@ -136,8 +136,8 @@ def for_database(self, database_name: str) -> MaintenanceOperationExecutor:
         return MaintenanceOperationExecutor(self._store, database_name)

     def send(
-        self, operation: Union[VoidMaintenanceOperation, MaintenanceOperation[_Operation_T]]
-    ) -> Optional[_Operation_T]:
+        self, operation: Union[VoidMaintenanceOperation, MaintenanceOperation[_T_Operation_Return]]
+    ) -> Optional[_T_Operation_Return]:
         self._assert_database_name_set()
         command = operation.get_command(self.request_executor.conventions)
         self.request_executor.execute_command(command)
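
Nothing functional changes here; the type variable is renamed from _Operation_T to _T_Operation_Return to make clear that it denotes the operation's result type. A self-contained toy sketch of the typing pattern this annotation expresses (stand-in classes, not client code):

from typing import Generic, Optional, TypeVar

_T_Operation_Return = TypeVar("_T_Operation_Return")


class IOperation(Generic[_T_Operation_Return]):
    """Stand-in for ravendb's IOperation: parametrized by its result type."""


def send(operation: IOperation[_T_Operation_Return]) -> Optional[_T_Operation_Return]:
    ...  # a type checker infers the return type from the operation argument


class CountDocs(IOperation[int]):
    pass


result = send(CountDocs())  # inferred as Optional[int]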

ravendb/documents/session/document_session.py

Lines changed: 127 additions & 14 deletions
@@ -8,7 +8,21 @@
 import os
 import time
 import uuid
-from typing import Union, Callable, TYPE_CHECKING, Optional, Dict, List, Type, TypeVar, Tuple, Generic, Set
+from typing import (
+    Union,
+    Callable,
+    TYPE_CHECKING,
+    Optional,
+    Dict,
+    List,
+    Type,
+    TypeVar,
+    Iterator,
+    Iterable,
+    Tuple,
+    Generic,
+    Set,
+)

 from ravendb.documents.session.document_session_revisions import DocumentSessionRevisions
 from ravendb.primitives import constants
@@ -22,7 +36,11 @@
     GetMultipleTimeSeriesOperation,
     TimeSeriesConfiguration,
 )
+from ravendb.documents.commands.stream import StreamResult
 from ravendb.documents.session.conditional_load import ConditionalLoadResult
+from ravendb.documents.session.operations.stream import StreamOperation
+from ravendb.documents.session.stream_statistics import StreamQueryStatistics
+from ravendb.documents.session.tokens.query_tokens.definitions import FieldsToFetchToken
 from ravendb.documents.session.time_series import (
     TimeSeriesEntry,
     TypedTimeSeriesEntry,
@@ -58,6 +76,7 @@
 from ravendb.documents.session.loaders.loaders import LoaderWithInclude, MultiLoaderWithInclude
 from ravendb.documents.session.operations.lazy import LazyLoadOperation, LazySessionOperations
 from ravendb.documents.session.operations.operations import MultiGetOperation, LoadStartingWithOperation
+from ravendb.documents.session.operations.query import QueryOperation
 from ravendb.documents.session.misc import (
     SessionOptions,
     ResponseTimeInformation,
@@ -67,7 +86,7 @@
     SessionInfo,
     TransactionMode,
 )
-from ravendb.documents.session.query import DocumentQuery, RawDocumentQuery
+from ravendb.documents.session.query import DocumentQuery, RawDocumentQuery, AbstractDocumentQuery
 from ravendb.json.metadata_as_dictionary import MetadataAsDictionary
 from ravendb.documents.session.operations.load_operation import LoadOperation
 from ravendb.tools.time_series import TSRangeHelper
@@ -1030,23 +1049,117 @@ def add_or_increment(self, key: str, entity: object, path_to_object: str, val_to
             patch_command_data.create_if_missing = new_instance
             self.defer(patch_command_data)

-        def stream(
-            self,
-            query_or_raw_query,  # : Union[RawDocumentQuery, DocumentQuery],
-            stream_query_stats,  # : Optional[StreamQueryStatistics] = None,
-        ) -> iter:
-            pass
+        def stream(self, query_or_raw_query: AbstractDocumentQuery[_T]) -> Iterator[StreamResult[_T]]:
+            stream_operation = StreamOperation(self._session)
+            command = stream_operation.create_request(query_or_raw_query.index_query)
+
+            self.request_executor.execute_command(command, self.session_info)
+
+            result = stream_operation.set_result(command.result)
+
+            return self._yield_result(query_or_raw_query, result)
+
+        def stream_with_statistics(
+            self, query_or_raw_query: AbstractDocumentQuery[_T], stats_callback: Callable[[StreamQueryStatistics], None]
+        ) -> Iterator[StreamResult[_T]]:
+            stats = StreamQueryStatistics()
+            stream_operation = StreamOperation(self._session, stats)
+            command = stream_operation.create_request(query_or_raw_query.index_query)
+
+            self.request_executor.execute_command(command, self.session_info)
+
+            result = stream_operation.set_result(command.result)
+
+            stats_callback(stats)
+
+            return self._yield_result(query_or_raw_query, result)

         def stream_starting_with(
             self,
-            object_type: type,
             starts_with: str,
-            matches: str = None,
+            matches: Optional[str] = None,
             start: int = 0,
-            page_size: int = 25,
-            starting_after: str = None,
-        ) -> iter:
-            pass
+            page_size: int = constants.int_max,
+            start_after: Optional[str] = None,
+            object_type: Optional[Type[_T]] = None,
+        ) -> Iterable[StreamResult[_T]]:
+            stream_operation = StreamOperation(self._session)
+
+            command = stream_operation.create_request_for_starts_with(
+                starts_with, matches, start, page_size, None, start_after
+            )
+            self.request_executor.execute_command(command, self.session_info)
+
+            result = stream_operation.set_result(command.result)
+            return DocumentSession._Advanced._StreamIterator(self, result, None, False, None, object_type)
+
+        class _StreamIterator(Iterator):
+            def __init__(
+                self,
+                session_advanced_reference: DocumentSession._Advanced,
+                inner_iterator: Iterator[Dict],
+                fields_to_fetch_token: Optional[FieldsToFetchToken],
+                is_project_into: bool,
+                on_next_item: Optional[Callable[[Dict], None]] = None,
+                object_type: Optional[Type[_T]] = None,
+            ):
+                self._session_advanced_reference = session_advanced_reference
+                self._inner_iterator = inner_iterator
+                self._fields_to_fetch_token = fields_to_fetch_token
+                self._is_project_into = is_project_into
+                self._on_next_item = on_next_item
+                self._object_type = object_type
+
+            def __iter__(self):
+                return self
+
+            def __next__(self):
+                next_value = self._inner_iterator.__next__()
+                try:
+                    if self._on_next_item is not None:
+                        self._on_next_item(next_value)
+                    return DocumentSession._Advanced._create_stream_result(
+                        self._session_advanced_reference,
+                        next_value,
+                        self._fields_to_fetch_token,
+                        self._is_project_into,
+                        self._object_type,
+                    )
+                except Exception as e:
+                    raise RuntimeError(f"Unable to parse stream result: {e.args[0]}", e)
+
+            def __exit__(self, exc_type, exc_val, exc_tb):
+                pass
+
+        def _create_stream_result(
+            self,
+            json_dict: Dict,
+            fields_to_fetch: FieldsToFetchToken,
+            is_project_into: bool,
+            object_type: Optional[Type[_T]],
+        ) -> StreamResult[_T]:
+            metadata = json_dict.get(constants.Documents.Metadata.KEY)
+            change_vector = metadata.get(constants.Documents.Metadata.CHANGE_VECTOR)
+
+            # MapReduce indexes return reduce results that don't have @id property
+            key = metadata.get(constants.Documents.Metadata.ID, None)
+
+            entity = QueryOperation.deserialize(
+                object_type, key, json_dict, metadata, fields_to_fetch, True, self._session, is_project_into
+            )
+
+            stream_result = StreamResult(key, change_vector, MetadataAsDictionary(metadata), entity)
+            return stream_result
+
+        def _yield_result(self, query: AbstractDocumentQuery, enumerator: Iterator[Dict]) -> Iterator[StreamResult[_T]]:
+            return DocumentSession._Advanced._StreamIterator(
+                self,
+                enumerator,
+                query._fields_to_fetch_token,
+                query.is_project_into,
+                query.invoke_after_stream_executed,
+                object_type=query.query_class,
+            )

         def stream_into(self):  # query: Union[DocumentQuery, RawDocumentQuery], output: iter):
             pass
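
Taken together, the new _Advanced methods are the public entry points for streaming. A hedged usage sketch, assuming an already initialized DocumentStore named store, an illustrative Order document class, and the usual session.query(object_type=...) entry point (none of these are defined in this diff):

from ravendb.documents.session.stream_statistics import StreamQueryStatistics


class Order:
    def __init__(self, company: str = None):
        self.company = company


with store.open_session() as session:
    query = session.query(object_type=Order)  # query entry point assumed

    # Plain streaming: each item is a StreamResult wrapping the deserialized entity.
    for item in session.advanced.stream(query):
        print(item.key, item.change_vector, item.document)

    # Streaming with statistics: the callback fires once, after the stream is
    # opened and before iteration of the results begins.
    def on_stats(stats: StreamQueryStatistics) -> None:
        print(vars(stats))  # attribute names intentionally not asserted here

    for item in session.advanced.stream_with_statistics(query, on_stats):
        print(item.key)

    # Prefix streaming, no query involved:
    for item in session.advanced.stream_starting_with("orders/", object_type=Order):
        print(item.key)

Note that stream_starting_with builds its _StreamIterator without a FieldsToFetchToken and with is_project_into set to False, so prefix-streamed results are always materialized as full documents of object_type.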

ravendb/documents/session/operations/query.py

Lines changed: 11 additions & 11 deletions
@@ -150,25 +150,25 @@ def deserialize(
         key: str,
         document: dict,
         metadata: dict,
-        fields_to_fetch: FieldsToFetchToken,
+        fields_to_fetch_token: FieldsToFetchToken,
         disable_entities_tracking: bool,
         session: "InMemoryDocumentSessionOperations",
         is_project_into: bool,
-    ):
+    ) -> _T:
         projection = metadata.get("@projection")
         if not projection:
             return session.track_entity(object_type, key, document, metadata, disable_entities_tracking)

         if (
-            fields_to_fetch is not None
-            and fields_to_fetch.projections is not None
-            and len(fields_to_fetch.projections) == 1
+            fields_to_fetch_token is not None
+            and fields_to_fetch_token.projections is not None
+            and len(fields_to_fetch_token.projections) == 1
         ):
-            projection_field = fields_to_fetch.projections[0]
+            projection_field = fields_to_fetch_token.projections[0]

-            if fields_to_fetch.source_alias is not None:
-                if projection_field.startswith(fields_to_fetch.source_alias):
-                    projection_field = projection_field[len(fields_to_fetch.source_alias) + 1 :]
+            if fields_to_fetch_token.source_alias is not None:
+                if projection_field.startswith(fields_to_fetch_token.source_alias):
+                    projection_field = projection_field[len(fields_to_fetch_token.source_alias) + 1 :]

             if projection_field.startswith("'"):
                 projection_field = projection_field[1:-1]
@@ -188,8 +188,8 @@ def deserialize(
                 return Utils.get_default_value(object_type)

             if (
-                fields_to_fetch.fields_to_fetch is not None
-                and fields_to_fetch.fields_to_fetch[0] == fields_to_fetch.projections[0]
+                fields_to_fetch_token.fields_to_fetch is not None
+                and fields_to_fetch_token.fields_to_fetch[0] == fields_to_fetch_token.projections[0]
             ):
                 document = inner
