Skip to content

Commit

Permalink
Track time spent in the FetcherBolt queues, fixes #535
Browse files Browse the repository at this point in the history
  • Loading branch information
jnioche committed Feb 27, 2018
1 parent d2455c4 commit 0a690e5
Showing 1 changed file with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ private static class FetchItem {

String queueID;
String url;
URL u;
Tuple t;
long creationTime;

public FetchItem(String url, URL u, Tuple t, String queueID) {
public FetchItem(String url, Tuple t, String queueID) {
this.url = url;
this.u = u;
this.queueID = queueID;
this.t = t;
this.creationTime = System.currentTimeMillis();
}

/**
Expand All @@ -135,7 +135,7 @@ public static FetchItem create(URL u, Tuple t, String queueMode) {
}
if (StringUtils.isNotBlank(key)) {
queueID = key.toLowerCase(Locale.ROOT);
return new FetchItem(url, u, t, queueID);
return new FetchItem(url, t, queueID);
}

if (FetchItemQueues.QUEUE_MODE_IP.equalsIgnoreCase(queueMode)) {
Expand Down Expand Up @@ -168,7 +168,7 @@ public static FetchItem create(URL u, Tuple t, String queueMode) {
}

queueID = key.toLowerCase(Locale.ROOT);
return new FetchItem(url, u, t, queueID);
return new FetchItem(url, t, queueID);
}

}
Expand Down Expand Up @@ -493,7 +493,7 @@ taskID, getName(), activeThreads, spinWaiting,
}
}

if (!rules.isAllowed(fit.u.toString())) {
if (!rules.isAllowed(fit.url)) {
LOG.info("Denied by robots.txt: {}", fit.url);
// pass the info about denied by robots
metadata.setValue(Constants.STATUS_ERROR_CAUSE,
Expand Down Expand Up @@ -535,13 +535,18 @@ fit.t, new Values(fit.url,
}

long start = System.currentTimeMillis();
long timeInQueues = start - fit.creationTime;

ProtocolResponse response = protocol.getProtocolOutput(
fit.url, metadata);

long timeFetching = System.currentTimeMillis() - start;

final int byteLength = response.getContent().length;

averagedMetrics.scope("fetch_time").update(timeFetching);
averagedMetrics.scope("time_in_queues")
.update(timeInQueues);
averagedMetrics.scope("bytes_fetched").update(byteLength);
perSecMetrics.scope("bytes_fetched_perSec").update(
byteLength);
Expand All @@ -563,6 +568,9 @@ fit.t, new Values(fit.url,
response.getMetadata().setValue("fetch.loadingTime",
Long.toString(timeFetching));

response.getMetadata().setValue("fetch.timeInQueues",
Long.toString(timeInQueues));

// determine the status based on the status code
final Status status = Status.fromHTTPCode(response
.getStatusCode());
Expand Down

0 comments on commit 0a690e5

Please sign in to comment.