Skip to content

Commit

Permalink
add flow_collect, a version of collect that respects flow control Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
graingert authored Apr 11, 2019
1 parent 2ed0d46 commit 48d306a
Showing 1 changed file with 96 additions and 1 deletion.
97 changes: 96 additions & 1 deletion src/treq/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import cgi
import json

from twisted.internet.defer import Deferred, succeed
from twisted.internet.defer import Deferred, succeed, inlineCallbacks

from twisted.internet.protocol import Protocol
from twisted.web.client import ResponseDone
Expand Down Expand Up @@ -65,6 +65,101 @@ def collect(response, collector):
return d


from twisted.internet import defer

from twisted.internet.protocol import Protocol
from twisted.web.client import ResponseDone
from twisted.web.http import PotentialDataLoss


class _FlowBodyCollector(Protocol):
    """
    Body-delivery protocol that hands received data to ``collector`` and
    keeps the transport paused while a collector call is being waited on.

    :param finished: Deferred that is fired with None once the connection
        has ended and any in-progress hand-off has completed, or errbacked
        if the collector raises or the connection ends for another reason.
    :param collector: single argument callable invoked with the bytes
        buffered so far; its result is yielded on before any further data
        is handed over.
    """

    def __init__(self, finished, collector):
        self.finished = finished
        self.collector = collector
        # Bytes received but not yet passed to the collector.
        self.buffer = b''
        # False while idle; otherwise the Deferred that fires when the
        # current drain loop in dataReceived has completed.
        self.writing = False

    @inlineCallbacks
    def dataReceived(self, data):
        """
        Buffer ``data`` and, unless a drain loop is already running, feed
        the buffer to the collector with the transport paused.

        Any exception raised along the way is reported through
        ``finished`` and the transport is told to stop producing.
        """
        try:
            self.buffer += data
            if self.writing:
                # The drain loop that is already running will pick up the
                # bytes just appended to the buffer.
                return

            drained = Deferred()
            self.writing = drained
            self.transport.pauseProducing()
            while self.buffer:
                # Take the whole buffer in one go so that data arriving
                # while we wait on the collector starts a fresh batch.
                pending, self.buffer = self.buffer, b''
                yield self.collector(pending)
            self.writing = False
            self.transport.resumeProducing()
            drained.callback(None)
        except Exception as error:
            self.finished.errback(error)
            self.transport.stopProducing()

    @inlineCallbacks
    def connectionLost(self, reason):
        """
        Fire ``finished`` according to ``reason`` once any in-progress
        drain loop has completed.

        Does nothing if ``finished`` has already been fired.
        """
        if self.finished.called:
            return
        # Either the Deferred of the running drain loop or False when idle;
        # inlineCallbacks resumes immediately for a non-Deferred value.
        yield self.writing

        # PotentialDataLoss is treated as a normal end of body, see
        # http://twistedmatrix.com/trac/ticket/4840
        if reason.check(ResponseDone, PotentialDataLoss):
            self.finished.callback(None)
        else:
            self.finished.errback(reason)


def flow_collect(response, collector):
    """
    Incrementally collect the body of the response, respecting flow control.

    This function may only be called **once** for a given response.

    :param IResponse response: The HTTP response to collect the body from.
    :param collector: A callable that returns a deferred, called each time
        data is available from the response body.
    :type collector: single argument callable

    :rtype: Deferred that fires with None when the entire body has been read.
    """
    if response.length != 0:
        finished = Deferred()
        response.deliverBody(_FlowBodyCollector(finished, collector))
        return finished

    # A zero-length response has no body to deliver.
    return succeed(None)


@inlineCallbacks
def reduce(reducer, response, initializer):
    """
    Incrementally fold the body of the response into a single value,
    respecting flow control.

    This function may only be called **once** for a given response.

    :param reducer: A reducer function called with the accumulator and the
        next piece of body data. It may return the new accumulator directly
        or a Deferred that fires with it.
    :type reducer: two argument callable
    :param IResponse response: The HTTP response to collect the body from.
    :param initializer: The initial value of the accumulator.

    :rtype: Deferred that fires with the accumulator when the entire body
        has been read.
    """
    ref = [initializer]  # one-element list stands in for py3 ``nonlocal``

    @inlineCallbacks
    def collector(data):
        ref[0] = yield reducer(ref[0], data)

    # Use flow_collect so that the result of each reducer call is waited on
    # before more body data is folded in, as the docstring promises.
    yield flow_collect(response, collector)
    defer.returnValue(ref[0])


def content(response):
"""
Read the contents of an HTTP response.
Expand Down

0 comments on commit 48d306a

Please sign in to comment.