File tree 4 files changed +55
-1
lines changed
4 files changed +55
-1
lines changed Original file line number Diff line number Diff line change @@ -127,3 +127,5 @@ available as the attribute ``.loop``.
127
127
.. autofunction :: fsspec.asyn.sync_wrapper
128
128
129
129
.. autofunction :: fsspec.asyn.get_loop
130
+
131
+ .. autofunction :: fsspec.asyn.fsspec_loop
Original file line number Diff line number Diff line change @@ -4,7 +4,9 @@ Changelog
4
4
Dev
5
5
---
6
6
7
+ Enhancements
7
8
9
+ - Introduce ``fsspec.asyn.fsspec_loop `` to temporarily switch to the fsspec loop. (#671)
8
10
9
11
10
12
2021.06.0
Original file line number Diff line number Diff line change @@ -105,6 +105,17 @@ def _selector_policy():
105
105
asyncio .set_event_loop_policy (original_policy )
106
106
107
107
108
+ def get_running_loop ():
109
+ if hasattr (asyncio , "get_running_loop" ):
110
+ return asyncio .get_running_loop ()
111
+ else :
112
+ loop = asyncio ._get_running_loop ()
113
+ if loop is None :
114
+ raise RuntimeError ("no running event loop" )
115
+ else :
116
+ return loop
117
+
118
+
108
119
def get_loop ():
109
120
"""Create or return the default fsspec IO loop
110
121
@@ -124,6 +135,25 @@ def get_loop():
124
135
return loop [0 ]
125
136
126
137
138
+ @contextmanager
139
+ def fsspec_loop ():
140
+ """Temporarily switch the current event loop to the fsspec's
141
+ own loop, and then revert it back after the context gets
142
+ terinated.
143
+ """
144
+ try :
145
+ original_loop = get_running_loop ()
146
+ except RuntimeError :
147
+ original_loop = None
148
+
149
+ fsspec_loop = get_loop ()
150
+ try :
151
+ asyncio ._set_running_loop (fsspec_loop )
152
+ yield fsspec_loop
153
+ finally :
154
+ asyncio ._set_running_loop (original_loop )
155
+
156
+
127
157
try :
128
158
import resource
129
159
except ImportError :
Original file line number Diff line number Diff line change 8
8
9
9
import fsspec
10
10
import fsspec .asyn
11
- from fsspec .asyn import _throttled_gather
11
+ from fsspec .asyn import _throttled_gather , get_running_loop
12
12
13
13
14
14
def test_sync_methods ():
@@ -132,3 +132,23 @@ def test_windows_policy():
132
132
# check ensures that we are restoring the old policy back
133
133
# after our change.
134
134
assert isinstance (policy , asyncio .DefaultEventLoopPolicy )
135
+
136
+
137
+ def test_fsspec_loop ():
138
+ asyncio ._set_running_loop (None )
139
+
140
+ with fsspec .asyn .fsspec_loop () as loop :
141
+ assert get_running_loop () is loop
142
+ assert get_running_loop () is fsspec .asyn .get_loop ()
143
+
144
+ with pytest .raises (RuntimeError ):
145
+ get_running_loop ()
146
+
147
+ original_loop = asyncio .new_event_loop ()
148
+ asyncio ._set_running_loop (original_loop )
149
+
150
+ with fsspec .asyn .fsspec_loop () as loop :
151
+ assert get_running_loop () is loop
152
+ assert get_running_loop () is fsspec .asyn .get_loop ()
153
+
154
+ assert get_running_loop () is original_loop
You can’t perform that action at this time.
0 commit comments