Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SLING-9017 ErrorQueueDispatchingStrategy ends up using an incorrect … #33

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,19 +253,25 @@ protected SimpleDistributionAgent createAgent(String agentName, BundleContext co

DistributionPackageExporter packageExporter = new LocalDistributionPackageExporter(packageBuilder);

DistributionQueueProvider queueProvider;
DistributionQueueProvider queueProvider, errorQueueProvider;
String queueProviderName = PropertiesUtil.toString(config.get(QUEUE_PROVIDER), JobHandlingDistributionQueueProvider.TYPE);
if (JobHandlingDistributionQueueProvider.TYPE.equals(queueProviderName)) {
queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context, configAdmin);
errorQueueProvider = queueProvider;
} else if (SimpleDistributionQueueProvider.TYPE.equals(queueProviderName)) {
queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName, false);
errorQueueProvider = queueProvider;
} else if (ResourceQueueProvider.TYPE.equals(queueProviderName)) {
queueProvider = new ResourceQueueProvider(context,
resourceResolverFactory, SimpleDistributionAgent.DEFAULT_AGENT_SERVICE, agentName, scheduler, true);
errorQueueProvider = new ResourceQueueProvider(context,
resourceResolverFactory, SimpleDistributionAgent.DEFAULT_AGENT_SERVICE, agentName, scheduler, false);
} else { // when SimpleDistributionQueueProvider.TYPE_CHECKPOINT is "queueProviderName"
queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName, true);
errorQueueProvider = queueProvider;
}
queueProvider = new MonitoringDistributionQueueProvider(queueProvider, context);
errorQueueProvider = new MonitoringDistributionQueueProvider(errorQueueProvider, context);

DistributionQueueDispatchingStrategy exportQueueStrategy;
DistributionQueueDispatchingStrategy errorQueueStrategy = null;
Expand Down Expand Up @@ -317,12 +323,14 @@ protected SimpleDistributionAgent createAgent(String agentName, BundleContext co
int retryAttepts = PropertiesUtil.toInteger(config.get(RETRY_ATTEMPTS), 100);

if ("errorQueue".equals(retryStrategy)) {
errorQueueStrategy = new ErrorQueueDispatchingStrategy(processingQueues.toArray(new String[processingQueues.size()]));
errorQueueStrategy = new ErrorQueueDispatchingStrategy(processingQueues.toArray(new String[processingQueues.size()]),
errorQueueProvider);
}

return new SimpleDistributionAgent(agentName, queueProcessingEnabled, processingQueues,
serviceName, packageImporter, packageExporter, requestAuthorizationStrategy,
queueProvider, exportQueueStrategy, errorQueueStrategy, distributionEventFactory, resourceResolverFactory, slingRepository,
queueProvider, exportQueueStrategy, errorQueueStrategy, errorQueueProvider,
distributionEventFactory, resourceResolverFactory, slingRepository,
distributionLog, allowedRequests, allowedRoots, retryAttepts);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ protected SimpleDistributionAgent createAgent(String agentName, BundleContext co

return new SimpleDistributionAgent(agentName, false, null,
serviceName, null, packageExporter, requestAuthorizationStrategy,
monitoringQueueProvider, exportQueueStrategy, null, distributionEventFactory, resourceResolverFactory, slingRepository,
monitoringQueueProvider, exportQueueStrategy, null, null,
distributionEventFactory, resourceResolverFactory, slingRepository,
distributionLog, allowedRequests, allowedRoots, 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,14 @@ protected SimpleDistributionAgent createAgent(String agentName, BundleContext co
DistributionQueueProvider queueProvider = new MonitoringDistributionQueueProvider(new JobHandlingDistributionQueueProvider(agentName, jobManager, context), context);

DistributionQueueDispatchingStrategy exportQueueStrategy = new SingleQueueDispatchingStrategy();
DistributionQueueDispatchingStrategy importQueueStrategy = null;

DistributionRequestType[] allowedRequests = new DistributionRequestType[]{DistributionRequestType.PULL};
Set<String> processingQueues = new HashSet<String>();
processingQueues.addAll(exportQueueStrategy.getQueueNames());


return new SimpleDistributionAgent(agentName, queueProcessingEnabled, processingQueues,
serviceName, packageImporter, packageExporter, requestAuthorizationStrategy,
queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, slingRepository, distributionLog, allowedRequests, null, 0);
queueProvider, exportQueueStrategy, null, null, distributionEventFactory, resourceResolverFactory, slingRepository, distributionLog, allowedRequests, null, 0);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class SimpleDistributionAgent implements DistributionAgent {
final static String DEFAULT_AGENT_SERVICE = "defaultAgentService";

private final DistributionQueueProvider queueProvider;
private final DistributionQueueProvider errorQueueProvider;
private final DistributionPackageImporter distributionPackageImporter;
private final DistributionPackageExporter distributionPackageExporter;
private final DistributionQueueDispatchingStrategy scheduleQueueStrategy;
Expand Down Expand Up @@ -92,6 +93,7 @@ public SimpleDistributionAgent(String name,
DistributionQueueProvider queueProvider,
DistributionQueueDispatchingStrategy scheduleQueueStrategy,
DistributionQueueDispatchingStrategy errorQueueStrategy,
DistributionQueueProvider errorQueueProvider,
DistributionEventFactory distributionEventFactory,
ResourceResolverFactory resourceResolverFactory,
SlingRepository slingRepository,
Expand All @@ -113,6 +115,7 @@ public SimpleDistributionAgent(String name,
this.distributionPackageExporter = distributionPackageExporter;
this.queueProvider = queueProvider;
this.scheduleQueueStrategy = scheduleQueueStrategy;
this.errorQueueProvider = errorQueueProvider;
this.errorQueueStrategy = errorQueueStrategy;
this.distributionEventFactory = distributionEventFactory;
this.agentAuthenticationInfo = new SimpleDistributionAgentAuthenticationInfo(slingRepository, DEFAULT_AGENT_SERVICE, resourceResolverFactory, subServiceName);
Expand Down Expand Up @@ -229,26 +232,41 @@ private CompositeDistributionResponse exportPackages(ResourceResolver agentResou
return new CompositeDistributionResponse(distributionResponses, packagesCount, packagesSize, endTime - startTime);
}

@NotNull
public Set<String> getQueueNames() {
private Set<String> getScheduledQueueNames() {
Set<String> queueNames = new TreeSet<String>();
queueNames.addAll(scheduleQueueStrategy.getQueueNames());
return queueNames;
}

private Set<String> getErrorQueueNames() {
Set<String> queueNames = new TreeSet<String>();
if (errorQueueStrategy != null) {
queueNames.addAll(errorQueueStrategy.getQueueNames());
}
return queueNames;
}

@NotNull
public Set<String> getQueueNames() {
Set<String> queueNames = getScheduledQueueNames();
queueNames.addAll(getErrorQueueNames());
return queueNames;
}

public DistributionQueue getQueue(@NotNull final String queueName) {
Set<String> queues = getQueueNames();
if (!queues.contains(queueName)) {
Set<String> scheduledQueues = getScheduledQueueNames();
Set<String> errorQueues = getErrorQueueNames();
boolean isErrorQueue = errorQueues.contains(queueName);
boolean isScheduledQueue = scheduledQueues.contains(queueName);
if (!isErrorQueue && !isScheduledQueue) {
return null;
}

DistributionQueue queue = null;

try {
queue = queueProvider.getQueue(queueName);
queue = isErrorQueue? errorQueueProvider.getQueue(queueName)
: queueProvider.getQueue(queueName);
} catch (DistributionException e) {
log.error("cannot get queue", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,12 @@ protected SimpleDistributionAgent createAgent(String agentName, BundleContext co

DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
DistributionQueueDispatchingStrategy exportQueueStrategy = new SingleQueueDispatchingStrategy();
DistributionQueueDispatchingStrategy importQueueStrategy = null;

Set<String> processingQueues = new HashSet<String>();
processingQueues.addAll(exportQueueStrategy.getQueueNames());

return new SimpleDistributionAgent(agentName, queueProcessingEnabled, processingQueues,
serviceName, packageImporter, packageExporter, requestAuthorizationStrategy,
queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, slingRepository,
queueProvider, exportQueueStrategy, null, null, distributionEventFactory, resourceResolverFactory, slingRepository,
distributionLog, null, null, 0);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,21 +239,23 @@ protected SimpleDistributionAgent createAgent(String agentName, BundleContext co

DistributionPackageExporter packageExporter = new RemoteDistributionPackageExporter(distributionLog, packageBuilder,
transportSecretProvider, exporterEndpoints, pullItems, httpConfiguration);
DistributionQueueProvider queueProvider = new MonitoringDistributionQueueProvider(new JobHandlingDistributionQueueProvider(agentName, jobManager, context), context);
DistributionQueueProvider exportQueueProvider = new MonitoringDistributionQueueProvider(new JobHandlingDistributionQueueProvider(agentName, jobManager, context), context);
DistributionRequestType[] allowedRequests = new DistributionRequestType[]{DistributionRequestType.PULL};

String retryStrategy = SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(RETRY_STRATEGY), null));
int retryAttepts = PropertiesUtil.toInteger(config.get(RETRY_ATTEMPTS), 100);


DistributionQueueProvider importQueueProvider = null;
if ("errorQueue".equals(retryStrategy)) {
importQueueStrategy = new ErrorQueueDispatchingStrategy(processingQueues.toArray(new String[processingQueues.size()]));
importQueueProvider = new MonitoringDistributionQueueProvider(new JobHandlingDistributionQueueProvider(agentName, jobManager, context), context);
}


return new SimpleDistributionAgent(agentName, queueProcessingEnabled, processingQueues,
serviceName, packageImporter, packageExporter, requestAuthorizationStrategy,
queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, slingRepository,
exportQueueProvider, exportQueueStrategy, importQueueStrategy, importQueueProvider,
distributionEventFactory, resourceResolverFactory, slingRepository,
distributionLog, allowedRequests, null, retryAttepts);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import org.apache.sling.distribution.common.DistributionException;
Expand All @@ -47,13 +48,24 @@ public class ErrorQueueDispatchingStrategy implements DistributionQueueDispatchi

public final static String ERROR_PREFIX = "error-";
private final Set<String> queueNames = new TreeSet<String>();
private DistributionQueueProvider queueProvider;

public ErrorQueueDispatchingStrategy(String[] queueNames, @NotNull DistributionQueueProvider queueProvider) {
this(queueNames);
this.queueProvider = queueProvider;
}

public ErrorQueueDispatchingStrategy(String[] queueNames) {
this.queueNames.addAll(Arrays.asList(queueNames));
this.queueProvider = null;
}

@Override
public Iterable<DistributionQueueItemStatus> add(@NotNull DistributionPackage distributionPackage, @NotNull DistributionQueueProvider queueProvider) throws DistributionException {
public Iterable<DistributionQueueItemStatus> add(@NotNull DistributionPackage distributionPackage,
@NotNull DistributionQueueProvider suppliedQueueProvider) throws DistributionException {

DistributionQueueProvider queueProvider = Optional.ofNullable(this.queueProvider)
.orElse(suppliedQueueProvider);

List<DistributionQueueItemStatus> result = new ArrayList<DistributionQueueItemStatus>();
String originQueue = DistributionPackageUtils.getQueueName(distributionPackage.getInfo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testDistributionEnable() throws Exception {
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, packageExporterStrategy,
queueProvider, distributionHandler, null,
queueProvider, distributionHandler, null, null,
distributionEventFactory, resolverFactory, mock(SlingRepository.class), mock(DefaultDistributionLog.class), null, null, 0);

TestDistributionTrigger trigger = new TestDistributionTrigger();
Expand All @@ -106,7 +106,7 @@ public void testDistributionWithFailingDistributionStrategy() throws Exception {
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, packageExporterStrategy,
queueProvider, distributionHandler, null,
queueProvider, distributionHandler, null, null,
distributionEventFactory, resolverFactory, mock(SlingRepository.class), mock(DefaultDistributionLog.class), null, null, 0);
DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
final DistributionPackage distributionPackage = mock(DistributionPackage.class);
Expand Down Expand Up @@ -144,8 +144,8 @@ public void testDistributionWithWorkingDistributionStrategy() throws Exception {
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "subServiceName", packageImporter,
packageExporter, packageExporterStrategy,
queueProvider,
distributionHandler, null, distributionEventFactory, resolverFactory, mock(SlingRepository.class),
queueProvider, distributionHandler, null, null,
distributionEventFactory, resolverFactory, mock(SlingRepository.class),
mock(DefaultDistributionLog.class), null, null, 0);
DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
final DistributionPackage distributionPackage = mock(DistributionPackage.class);
Expand Down Expand Up @@ -187,7 +187,7 @@ public void testDistribution() throws Exception {
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, requestAuthorizationStrategy,
queueProvider, dispatchingStrategy, null,
queueProvider, dispatchingStrategy, null, null,
distributionEventFactory, resolverFactory, mock(SlingRepository.class),
mock(DefaultDistributionLog.class), null, null, 0);
DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
Expand Down Expand Up @@ -225,7 +225,7 @@ public void testGetExistingNamedQueue() throws Exception {
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, authorizationStrategy,
queueProvider, dispatchingStrategy, null,
queueProvider, dispatchingStrategy, null, null,
distributionEventFactory, resolverFactory, mock(SlingRepository.class),
mock(DefaultDistributionLog.class), null, null, 0);
DistributionQueue queue = mock(DistributionQueue.class);
Expand All @@ -248,7 +248,7 @@ public void testGetNonExistingNamedQueue() throws Exception {
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, packageExporterStrategy,
queueProvider, distributionHandler, null,
queueProvider, distributionHandler, null, null,
distributionEventFactory, resolverFactory, mock(SlingRepository.class),
mock(DefaultDistributionLog.class), null, null, 0);
DistributionQueue queue = mock(DistributionQueue.class);
Expand Down Expand Up @@ -277,7 +277,7 @@ public void testDistributionWithAllowedRoot() throws Exception {
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, packageExporterStrategy,
queueProvider, queueDistributionStrategy, null,
queueProvider, queueDistributionStrategy, null, null,
distributionEventFactory, resolverFactory, mock(SlingRepository.class),
mock(DefaultDistributionLog.class), null, new String[] { "/content" }, 0);

Expand Down Expand Up @@ -323,7 +323,7 @@ public void testDistributionWithDisallowedRoot() throws Exception {
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, packageExporterStrategy,
queueProvider, queueDistributionStrategy, null,
queueProvider, queueDistributionStrategy, null, null,
distributionEventFactory, resolverFactory, mock(SlingRepository.class),
mock(DefaultDistributionLog.class), null, new String[] { "/content" }, 0);

Expand Down