Skip to content

Commit

Permalink
[MODINV-1162] Send userId in headers for modules clients requests (#810)
Browse files Browse the repository at this point in the history
  • Loading branch information
RomanChernetskyi authored Feb 7, 2025
1 parent 35d60dd commit 6463989
Show file tree
Hide file tree
Showing 22 changed files with 827 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.folio.inventory.client.util;

import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.WebClient;
import org.folio.rest.tools.ClientHelpers;
import io.vertx.core.http.HttpMethod;

import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_TENANT;
import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_TOKEN;
import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_URL;
import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_USER_ID;

/**
* Utility class for handling client wrapper operations.
*/
public final class ClientWrapperUtil {
public static final String CONTENT_TYPE = "Content-type";
public static final String APPLICATION_JSON = "application/json";
public static final String ACCEPT = "Accept";
public static final String APPLICATION_JSON_TEXT_PLAIN = "application/json,text/plain";

private ClientWrapperUtil() {
}

/**
* Creates an HTTP request with the specified method and URL, and populates it with Okapi headers.
*
* @param method the HTTP method to use (e.g., GET, POST, PUT)
* @param url the URL for the request
* @param okapiUrl the Okapi URL
* @param tenantId the tenant ID
* @param token the authentication token
* @param userId the user ID
* @param webClient the WebClient instance to use for creating the request
* @return the created HTTP request with populated headers
*/
public static HttpRequest<Buffer> createRequest(HttpMethod method, String url, String okapiUrl, String tenantId,
String token, String userId, WebClient webClient) {
HttpRequest<Buffer> request = webClient.requestAbs(method, url);
populateOkapiHeaders(request, okapiUrl, tenantId, token, userId);
return request;
}

/**
* Converts an object to a JSON buffer.
*
* @param object the object to convert
* @return the JSON buffer
*/
public static Buffer getBuffer(Object object) {
Buffer buffer = Buffer.buffer();
if (object != null) {
buffer.appendString(ClientHelpers.pojo2json(object));
}
return buffer;
}

/**
* Populates the Okapi headers for the given HTTP request.
*
* @param request the HTTP request to populate headers for
* @param okapiUrl the Okapi URL
* @param tenantId the tenant ID
* @param token the authentication token
* @param userId the user ID
*/
private static void populateOkapiHeaders(HttpRequest<Buffer> request, String okapiUrl, String tenantId, String token, String userId) {
request.putHeader(CONTENT_TYPE, APPLICATION_JSON);
request.putHeader(ACCEPT, APPLICATION_JSON_TEXT_PLAIN);

if (tenantId != null) {
request.putHeader(OKAPI_TOKEN, token);
request.putHeader(OKAPI_TENANT, tenantId);
}

if (userId != null) {
request.putHeader(OKAPI_USER_ID, userId);
}

if (okapiUrl != null) {
request.putHeader(OKAPI_URL, okapiUrl);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.folio.inventory.client.wrappers;

import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import org.folio.rest.client.ChangeManagerClient;
import org.folio.rest.jaxrs.model.InitJobExecutionsRqDto;
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobProfileInfo;
import org.folio.rest.jaxrs.model.ParsedRecordDto;
import org.folio.rest.jaxrs.model.RawRecordsDto;
import org.folio.rest.jaxrs.model.StatusDto;

import static org.folio.inventory.client.util.ClientWrapperUtil.createRequest;
import static org.folio.inventory.client.util.ClientWrapperUtil.getBuffer;

/**
* Wrapper class for ChangeManagerClient to handle POST and PUT HTTP requests with x-okapi-user-id header.
*/
public class ChangeManagerClientWrapper extends ChangeManagerClient {
private final String tenantId;
private final String token;
private final String okapiUrl;
private final String userId;
private final WebClient webClient;
public static final String CHANGE_MANAGER_JOB_EXECUTIONS = "/change-manager/jobExecutions/";
public static final String CHANGE_MANAGER_PARSED_RECORDS = "/change-manager/parsedRecords/";

public ChangeManagerClientWrapper(String okapiUrl, String tenantId, String token, String userId, HttpClient httpClient) {
super(okapiUrl, tenantId, token, httpClient);
this.okapiUrl = okapiUrl;
this.tenantId = tenantId;
this.token = token;
this.userId = userId;
this.webClient = WebClient.wrap(httpClient);
}

@Override
public Future<HttpResponse<Buffer>> postChangeManagerJobExecutions(InitJobExecutionsRqDto initJobExecutionsRqDto) {
return createRequest(HttpMethod.POST, okapiUrl + "/change-manager/jobExecutions", okapiUrl, tenantId, token, userId, webClient)
.sendBuffer(getBuffer(initJobExecutionsRqDto));
}

@Override
public Future<HttpResponse<Buffer>> postChangeManagerJobExecutionsRecordsById(String id, boolean acceptInstanceId, RawRecordsDto rawRecordsDto) {
StringBuilder queryParams = new StringBuilder("?");
queryParams.append("acceptInstanceId=");
queryParams.append(acceptInstanceId);

return createRequest(HttpMethod.POST, okapiUrl + CHANGE_MANAGER_JOB_EXECUTIONS + id + "/records" + queryParams,
okapiUrl, tenantId, token, userId, webClient)
.sendBuffer(getBuffer(rawRecordsDto));
}

@Override
public Future<HttpResponse<Buffer>> putChangeManagerJobExecutionsById(String id, JobExecution jobExecution) {
return createRequest(HttpMethod.PUT, okapiUrl + CHANGE_MANAGER_JOB_EXECUTIONS + id, okapiUrl, tenantId, token, userId, webClient)
.sendBuffer(getBuffer(jobExecution));
}

@Override
public Future<HttpResponse<Buffer>> putChangeManagerJobExecutionsJobProfileById(String id, JobProfileInfo jobProfileInfo) {
return createRequest(HttpMethod.PUT, okapiUrl + CHANGE_MANAGER_JOB_EXECUTIONS + id + "/jobProfile",
okapiUrl, tenantId, token, userId, webClient)
.sendBuffer(getBuffer(jobProfileInfo));
}

@Override
public Future<HttpResponse<Buffer>> putChangeManagerJobExecutionsStatusById(String id, StatusDto statusDto) {
return createRequest(HttpMethod.PUT, okapiUrl + CHANGE_MANAGER_JOB_EXECUTIONS + id + "/status",
okapiUrl, tenantId, token, userId, webClient)
.sendBuffer(getBuffer(statusDto));
}

@Override
public Future<HttpResponse<Buffer>> putChangeManagerParsedRecordsById(String id, ParsedRecordDto parsedRecordDto) {
return createRequest(HttpMethod.PUT, okapiUrl + CHANGE_MANAGER_PARSED_RECORDS + id,
okapiUrl, tenantId, token, userId, webClient)
.sendBuffer(getBuffer(parsedRecordDto));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.folio.inventory.client.wrappers;

import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import org.folio.rest.client.SourceStorageRecordsClient;
import org.folio.rest.jaxrs.model.Record;
import org.folio.util.PercentCodec;

import static org.folio.inventory.client.util.ClientWrapperUtil.createRequest;
import static org.folio.inventory.client.util.ClientWrapperUtil.getBuffer;

/**
* Wrapper class for SourceStorageRecordsClient to handle POST and PUT HTTP requests with x-okapi-user-id header.
*/
public class SourceStorageRecordsClientWrapper extends SourceStorageRecordsClient {
private final String tenantId;
private final String token;
private final String okapiUrl;
private final String userId;
private final WebClient webClient;
public static final String SOURCE_STORAGE_RECORDS = "/source-storage/records/";

public SourceStorageRecordsClientWrapper(String okapiUrl, String tenantId, String token, String userId, HttpClient httpClient) {
super(okapiUrl, tenantId, token, httpClient);
this.okapiUrl = okapiUrl;
this.tenantId = tenantId;
this.token = token;
this.userId = userId;
this.webClient = WebClient.wrap(httpClient);
}

@Override
public Future<HttpResponse<Buffer>> postSourceStorageRecords(Record aRecord) {
return createRequest(HttpMethod.POST, okapiUrl + "/source-storage/records", okapiUrl, tenantId, token, userId, webClient)
.sendBuffer(getBuffer(aRecord));
}

@Override
public Future<HttpResponse<Buffer>> putSourceStorageRecordsById(String id, Record aRecord) {
return createRequest(HttpMethod.PUT, okapiUrl + SOURCE_STORAGE_RECORDS + id, okapiUrl, tenantId, token, userId, webClient)
.sendBuffer(getBuffer(aRecord));
}

@Override
public Future<HttpResponse<Buffer>> putSourceStorageRecordsGenerationById(String id, Record aRecord) {
return createRequest(HttpMethod.PUT, okapiUrl + SOURCE_STORAGE_RECORDS + id + "/generation",
okapiUrl, tenantId, token, userId, webClient)
.sendBuffer(getBuffer(aRecord));
}

@Override
public Future<HttpResponse<Buffer>> putSourceStorageRecordsSuppressFromDiscoveryById(String id, String idType, boolean suppress) {
StringBuilder queryParams = new StringBuilder("?");
if (idType != null) {
queryParams.append("idType=");
queryParams.append(PercentCodec.encode(idType));
queryParams.append("&");
}

queryParams.append("suppress=");
queryParams.append(suppress);

return createRequest(HttpMethod.PUT, okapiUrl + SOURCE_STORAGE_RECORDS + id + "/suppress-from-discovery" + queryParams,
okapiUrl, tenantId, token, userId, webClient)
.send();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.folio.inventory.client.wrappers;

import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import org.folio.rest.client.SourceStorageSnapshotsClient;
import org.folio.rest.jaxrs.model.Snapshot;

import static org.folio.inventory.client.util.ClientWrapperUtil.createRequest;
import static org.folio.inventory.client.util.ClientWrapperUtil.getBuffer;

/**
* Wrapper class for SourceStorageSnapshotsClient to handle POST and PUT HTTP requests with x-okapi-user-id header.
*/
public class SourceStorageSnapshotsClientWrapper extends SourceStorageSnapshotsClient {
private final String tenantId;
private final String token;
private final String okapiUrl;
private final String userId;
private final WebClient webClient;

public SourceStorageSnapshotsClientWrapper(String okapiUrl, String tenantId, String token, String userId, HttpClient httpClient) {
super(okapiUrl, tenantId, token, httpClient);
this.okapiUrl = okapiUrl;
this.tenantId = tenantId;
this.token = token;
this.userId = userId;
this.webClient = WebClient.wrap(httpClient);
}

@Override
public Future<HttpResponse<Buffer>> postSourceStorageSnapshots(Snapshot snapshot) {
return createRequest(HttpMethod.POST, okapiUrl + "/source-storage/snapshots", okapiUrl, tenantId, token, userId, webClient)
.sendBuffer(getBuffer(snapshot));
}

@Override
public Future<HttpResponse<Buffer>> putSourceStorageSnapshotsByJobExecutionId(String jobExecutionId, Snapshot snapshot) {
return createRequest(HttpMethod.PUT, okapiUrl + "/source-storage/snapshots/" + jobExecutionId,
okapiUrl, tenantId, token, userId, webClient)
.sendBuffer(getBuffer(snapshot));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.folio.Link;
import org.folio.LinkingRuleDto;
import org.folio.Record;
import org.folio.inventory.client.wrappers.SourceStorageRecordsClientWrapper;
import org.folio.inventory.common.Context;
import org.folio.inventory.common.api.request.PagingParameters;
import org.folio.inventory.consortium.entities.SharingInstance;
Expand Down Expand Up @@ -282,10 +283,11 @@ Future<String> updateSourceRecordSuppressFromDiscoveryByInstanceId(String instan

public SourceStorageRecordsClient getSourceStorageRecordsClient(String tenant, Map<String, String> kafkaHeaders) {
LOGGER.info("getSourceStorageRecordsClient :: Creating SourceStorageRecordsClient for tenant={}", tenant);
return new SourceStorageRecordsClient(
return new SourceStorageRecordsClientWrapper(
kafkaHeaders.get(OKAPI_URL_HEADER),
tenant,
kafkaHeaders.get(OKAPI_TOKEN_HEADER),
kafkaHeaders.get(OKAPI_USER_ID),
vertx.createHttpClient());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.Record;
import org.folio.inventory.client.wrappers.ChangeManagerClientWrapper;
import org.folio.inventory.consortium.entities.SharingInstance;
import org.folio.kafka.SimpleConfigurationReader;
import org.folio.rest.client.ChangeManagerClient;
Expand Down Expand Up @@ -290,10 +291,11 @@ private RawRecordsDto buildDataChunk(boolean isLast, List<InitialRecord> data) {
}

public ChangeManagerClient getChangeManagerClient(Map<String, String> kafkaHeaders) {
return new ChangeManagerClient(
return new ChangeManagerClientWrapper(
kafkaHeaders.get(URL.toLowerCase()),
kafkaHeaders.get(TENANT.toLowerCase()),
kafkaHeaders.get(TOKEN.toLowerCase()),
kafkaHeaders.get(USER_ID.toLowerCase()),
vertx.createHttpClient());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.logging.log4j.Logger;
import org.folio.DataImportEventPayload;
import org.folio.HttpStatus;
import org.folio.inventory.client.wrappers.SourceStorageRecordsClientWrapper;
import org.folio.inventory.client.wrappers.SourceStorageSnapshotsClientWrapper;
import org.folio.inventory.common.Context;
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.util.AdditionalFieldsUtil;
Expand Down Expand Up @@ -94,9 +96,10 @@ protected org.folio.Instance defaultMapRecordToInstance(DataImportEventPayload d
}

protected Future<Instance> saveRecordInSrsAndHandleResponse(DataImportEventPayload payload, Record srcRecord,
Instance instance, InstanceCollection instanceCollection, String tenantId) {
Instance instance, InstanceCollection instanceCollection,
String tenantId, String userId) {
Promise<Instance> promise = Promise.promise();
getSourceStorageRecordsClient(payload.getOkapiUrl(), payload.getToken(), tenantId).postSourceStorageRecords(srcRecord)
getSourceStorageRecordsClient(payload.getOkapiUrl(), payload.getToken(), tenantId, userId).postSourceStorageRecords(srcRecord)
.onComplete(ar -> {
var result = ar.result();
if (ar.succeeded() && result.statusCode() == HttpStatus.HTTP_CREATED.toInt()) {
Expand All @@ -117,9 +120,10 @@ protected Future<Instance> saveRecordInSrsAndHandleResponse(DataImportEventPaylo
}

protected Future<Instance> putRecordInSrsAndHandleResponse(DataImportEventPayload payload, Record srcRecord,
Instance instance, String matchedId, String tenantId) {
Instance instance, String matchedId, String tenantId, String userId) {
Promise<Instance> promise = Promise.promise();
getSourceStorageRecordsClient(payload.getOkapiUrl(), payload.getToken(), tenantId).putSourceStorageRecordsGenerationById(matchedId ,srcRecord)
getSourceStorageRecordsClient(payload.getOkapiUrl(), payload.getToken(), tenantId, userId)
.putSourceStorageRecordsGenerationById(matchedId ,srcRecord)
.onComplete(ar -> {
var result = ar.result();
if (ar.succeeded() && result.statusCode() == HttpStatus.HTTP_OK.toInt()) {
Expand All @@ -140,7 +144,7 @@ protected Future<Instance> putRecordInSrsAndHandleResponse(DataImportEventPayloa

protected Future<Snapshot> postSnapshotInSrsAndHandleResponse(Context context, Snapshot snapshot) {
Promise<Snapshot> promise = Promise.promise();
getSourceStorageSnapshotsClient(context.getOkapiLocation(), context.getToken(), context.getTenantId()).postSourceStorageSnapshots(snapshot)
getSourceStorageSnapshotsClient(context.getOkapiLocation(), context.getToken(), context.getTenantId(), context.getUserId()).postSourceStorageSnapshots(snapshot)
.onComplete(ar -> {
var result = ar.result();
if (ar.succeeded() && result.statusCode() == HttpStatus.HTTP_CREATED.toInt()) {
Expand Down Expand Up @@ -202,12 +206,12 @@ protected void deleteInstance(String id, String jobExecutionId, InstanceCollecti
promise.future();
}

public SourceStorageRecordsClient getSourceStorageRecordsClient(String okapiUrl, String token, String tenantId) {
return new SourceStorageRecordsClient(okapiUrl, tenantId, token, getHttpClient());
public SourceStorageRecordsClient getSourceStorageRecordsClient(String okapiUrl, String token, String tenantId, String userId) {
return new SourceStorageRecordsClientWrapper(okapiUrl, tenantId, token, userId, getHttpClient());
}

public SourceStorageSnapshotsClient getSourceStorageSnapshotsClient(String okapiUrl, String token, String tenantId) {
return new SourceStorageSnapshotsClient(okapiUrl, tenantId, token, getHttpClient());
public SourceStorageSnapshotsClient getSourceStorageSnapshotsClient(String okapiUrl, String token, String tenantId, String userId) {
return new SourceStorageSnapshotsClientWrapper(okapiUrl, tenantId, token, userId, getHttpClient());
}

private Record encodeParsedRecordContent(Record srcRecord) {
Expand Down
Loading

0 comments on commit 6463989

Please sign in to comment.