# Reconstructed from the whitespace-collapsed patch 48d306a5 ("add
# flow_collect, a version of collect that respects flow control",
# Fixes #238) against src/treq/content.py.  Only the definitions the patch
# adds are reproduced here; ``collect`` and ``content`` appear in the patch
# only as truncated context and are not repeated.
import cgi
import json

from twisted.internet import defer
from twisted.internet.defer import Deferred, succeed, inlineCallbacks
from twisted.internet.protocol import Protocol
from twisted.web.client import ResponseDone
from twisted.web.http import PotentialDataLoss


class _FlowBodyCollector(Protocol):
    """
    Body protocol that hands data to ``collector`` one call at a time.

    While a ``collector`` call is outstanding the transport is paused and
    any data that still arrives is buffered, so ``collector`` is never
    re-entered and is always given the data in arrival order.

    :param Deferred finished: Fired with ``None`` once the connection has
        closed with ``ResponseDone`` or ``PotentialDataLoss`` and all
        buffered data has been collected; fired with a failure if the
        connection is lost for another reason or ``collector`` fails.
    :param collector: Single argument callable invoked with each chunk of
        body bytes.  It may return a Deferred, which is waited on before
        the next call.
    """

    def __init__(self, finished, collector):
        self.buffer = b''
        # False while idle; otherwise a Deferred that fires once the drain
        # loop running in dataReceived has emptied the buffer.
        self.writing = False
        self.finished = finished
        self.collector = collector

    @inlineCallbacks
    def dataReceived(self, data):
        """
        Buffer ``data`` and, unless a drain is already running, feed the
        buffer to ``collector`` until it is empty.
        """
        try:
            self.buffer += data
            if self.writing:
                # The running drain loop will pick up the bytes that were
                # just appended.
                return

            drained = Deferred()
            self.writing = drained
            self.transport.pauseProducing()
            while self.buffer:
                chunk, self.buffer = self.buffer, b''
                yield self.collector(chunk)
            self.writing = False
            self.transport.resumeProducing()
            drained.callback(None)
        except Exception:
            # errback() without an argument builds the Failure from the
            # exception being handled.  ``self.writing`` is left set, so
            # data arriving before the transport stops is only buffered
            # and never handed to the collector.
            if not self.finished.called:
                self.finished.errback()
            self.transport.stopProducing()

    @inlineCallbacks
    def connectionLost(self, reason):
        """
        Fire ``finished`` once any in-progress drain has completed.
        """
        if self.finished.called:
            return
        # Wait for a running drain; yielding False (idle) resumes at once.
        yield self.writing
        if self.finished.called:
            # Fired by someone else while the drain was being waited on.
            return

        # PotentialDataLoss is treated as success, see
        # http://twistedmatrix.com/trac/ticket/4840
        if reason.check(ResponseDone, PotentialDataLoss):
            self.finished.callback(None)
        else:
            self.finished.errback(reason)


def flow_collect(response, collector):
    """
    Incrementally collect the body of the response, respecting flow control.

    The response's transport is paused while ``collector`` is handling a
    chunk, and ``collector`` is not called again until the Deferred it
    returned (if any) has fired.

    This function may only be called **once** for a given response.

    :param IResponse response: The HTTP response to collect the body from.
    :param collector: A callable, optionally returning a Deferred, that is
        called each time data is available from the response body.
    :type collector: single argument callable

    :rtype: Deferred that fires with None when the entire body has been
        read, or fails if ``collector`` fails or the connection is lost
        for a reason other than ``ResponseDone``/``PotentialDataLoss``.
    """
    if response.length == 0:
        return succeed(None)

    d = Deferred()
    response.deliverBody(_FlowBodyCollector(d, collector))
    return d


@inlineCallbacks
def reduce(reducer, response, initializer):
    """
    Fold the body of the response into a single value, respecting flow
    control.

    This function may only be called **once** for a given response.

    :param reducer: Called as ``reducer(accumulator, data)`` for each chunk
        of the body; its result (or the result of the Deferred it returns)
        becomes the next accumulator.
    :type reducer: two argument callable
    :param IResponse response: The HTTP response to collect the body from.
    :param initializer: The initial accumulator; it is the result when the
        body is empty.

    :rtype: Deferred that fires with the accumulator when the entire body
        has been read.
    """
    ref = [initializer]  # boxed because py2 has no ``nonlocal``

    @inlineCallbacks
    def collector(data):
        ref[0] = yield reducer(ref[0], data)

    # flow_collect waits for each collector Deferred, so the accumulator is
    # up to date before the next chunk is reduced and before it is returned.
    yield flow_collect(response, collector)
    defer.returnValue(ref[0])