Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: extract metadata for resources from entities and align attributes names with dcat #34

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ public CKANBackend(String apiKey, String[] ckanHost, String ckanPort,
cache = new CKANCache(ckanHost, ckanPort, ssl, apiKey);
} // CKANBackendImpl

public void persist(String orgName, String pkgName, String pkgTitle, String resName, String records, DCATMetadata dcatMetadata, boolean createDataStore)
public void persist(String orgName, String pkgName, String resName, String records, DCATMetadata dcatMetadata, boolean createDataStore)
throws Exception {

logger.info("Going to lookup for the resource id, the cache may be updated during the process (orgName=\"{}\", " +
"pkgName=\"{}\", resName=\"{}\"" , orgName, pkgName, resName);

String resId = resourceLookupOrCreateDynamicFields(orgName, pkgName, pkgTitle, resName,records, dcatMetadata,createDataStore);
String resId = resourceLookupOrCreateDynamicFields(orgName, pkgName, resName,records, dcatMetadata,createDataStore);
if (resId == null) {
throw new Exception("Cannot persist the data (orgName=" + orgName + ", pkgName=" + pkgName
+ ", resName=" + resName + ")");
Expand All @@ -72,7 +72,7 @@ public void persist(String orgName, String pkgName, String pkgTitle, String resN
* @param resName The resource name to be created or lookup to
* @param records Te records to be inserted and used to create the datastore fields
*/
private String resourceLookupOrCreateDynamicFields(String orgName, String pkgName, String pkgTitle, String resName, String records,DCATMetadata dcatMetadata, boolean createDataStore)
private String resourceLookupOrCreateDynamicFields(String orgName, String pkgName, String resName, String records,DCATMetadata dcatMetadata, boolean createDataStore)
throws Exception {
if (!cache.isCachedOrg(orgName)) {
logger.info("The organization was not cached nor existed in CKAN (orgName=\"{}\")", orgName);
Expand All @@ -81,7 +81,7 @@ private String resourceLookupOrCreateDynamicFields(String orgName, String pkgNam
cache.setOrgId(orgName, orgId);
logger.info("Created new organization in CKAN (orgName=\"{}\", orgId=\"{}\")", orgName, orgId);

String pkgId = createPackage(pkgName, pkgTitle, orgId, dcatMetadata);
String pkgId = createPackage(pkgName, orgId, dcatMetadata);
cache.addPkg(orgName, pkgName);
cache.setPkgId(orgName, pkgName, pkgId);
String resId = createResource(resName, pkgId, dcatMetadata);
Expand All @@ -100,7 +100,7 @@ private String resourceLookupOrCreateDynamicFields(String orgName, String pkgNam
if (!cache.isCachedPkg(orgName, pkgName)) {
logger.info("The package was not cached nor existed in CKAN (orgName=\"{}\", pkgName=\"{}\")", orgName, pkgName);

String pkgId = createPackage(pkgName, pkgTitle, cache.getOrgId(orgName), dcatMetadata);
String pkgId = createPackage(pkgName, cache.getOrgId(orgName), dcatMetadata);
cache.addPkg(orgName, pkgName);
cache.setPkgId(orgName, pkgName, pkgId);
String resId = createResource(resName, pkgId, dcatMetadata);
Expand Down Expand Up @@ -200,11 +200,10 @@ private String createOrganization(String orgName, DCATMetadata dcatMetadata) thr
/**
* Creates a dataset/package within a given organization in CKAN.
* @param pkgName Package to be created
* @param pkgTitle Package title
* @param orgId Organization the package belongs to
* @return A package identifier if the package was created or an exception if something went wrong
*/
private String createPackage(String pkgName, String pkgTitle, String orgId, DCATMetadata dcatMetadata) throws Exception {
private String createPackage(String pkgName, String orgId, DCATMetadata dcatMetadata) throws Exception {
// create the CKAN request JSON
JsonArray extrasJsonArray = new JsonArray();
JsonArray tagsJsonArray = new JsonArray();
Expand All @@ -215,68 +214,64 @@ private String createPackage(String pkgName, String pkgTitle, String orgId, DCAT
JsonObject dataJson = new JsonObject();
dataJson.addProperty("name",pkgName);
dataJson.addProperty("owner_org",orgId);
dataJson.addProperty("title", pkgTitle);
if (dcatMetadata!=null){
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove the null check?

dataJson.addProperty("notes",dcatMetadata.getPackageDescription());
dataJson.addProperty("version",dcatMetadata.getVersion());
dataJson.addProperty("url",dcatMetadata.getLandingPage());
dataJson.addProperty("visibility",dcatMetadata.getVisibility());
dataJson.addProperty("url",dcatMetadata.getLandingPage());


extrasJson.addProperty("key","publisher_type");
extrasJson.addProperty("value",dcatMetadata.getOrganizationType());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","contact_uri");
extrasJson.addProperty("value",dcatMetadata.getContactPoint());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","contact_name");
extrasJson.addProperty("value",dcatMetadata.getContactName());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","contact_email");
extrasJson.addProperty("value",dcatMetadata.getContactEmail());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","spatial_uri");
extrasJson.addProperty("value",dcatMetadata.getSpatialUri());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","spatial");
extrasJson.addProperty("value",dcatMetadata.getSpatialCoverage());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","temporal_start");
extrasJson.addProperty("value",dcatMetadata.getTemporalStart());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","temporal_end");
extrasJson.addProperty("value",dcatMetadata.getTemporalEnd());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","theme");
extrasJson.addProperty("value",dcatMetadata.getThemes());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","access_rights");
extrasJson.addProperty("value",dcatMetadata.getDatasetRights());
extrasJsonArray.add(extrasJson);


//espacio para tags
keywords=dcatMetadata.getKeywords();
for (String tag: keywords){
tags=new JsonObject();
//tags.addProperty("vocabulary_id","null");
tags.addProperty("name",tag);
tagsJsonArray.add(tags);
}
//extrasJsonArray.add(extrasJson);}
dataJson.add("extras",extrasJsonArray);
dataJson.add("tags",tagsJsonArray);
dataJson.addProperty("title", dcatMetadata.getPackageName());
dataJson.addProperty("notes", dcatMetadata.getPackageDescription());
dataJson.addProperty("version",dcatMetadata.getVersion());
dataJson.addProperty("url",dcatMetadata.getLandingPage());
dataJson.addProperty("visibility",dcatMetadata.getVisibility());

extrasJson.addProperty("key","publisher_type");
extrasJson.addProperty("value",dcatMetadata.getOrganizationType());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","contact_uri");
extrasJson.addProperty("value",dcatMetadata.getContactPoint());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","contact_name");
extrasJson.addProperty("value",dcatMetadata.getContactName());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","contact_email");
extrasJson.addProperty("value",dcatMetadata.getContactEmail());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","spatial_uri");
extrasJson.addProperty("value",dcatMetadata.getSpatialUri());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","spatial");
extrasJson.addProperty("value",dcatMetadata.getSpatialCoverage());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","temporal_start");
extrasJson.addProperty("value",dcatMetadata.getTemporalStart());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","temporal_end");
extrasJson.addProperty("value",dcatMetadata.getTemporalEnd());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","theme");
extrasJson.addProperty("value",dcatMetadata.getThemes());
extrasJsonArray.add(extrasJson);
extrasJson=new JsonObject();
extrasJson.addProperty("key","access_rights");
extrasJson.addProperty("value",dcatMetadata.getDatasetRights());
extrasJsonArray.add(extrasJson);


//espacio para tags
keywords=dcatMetadata.getKeywords();
for (String tag: keywords){
tags=new JsonObject();
//tags.addProperty("vocabulary_id","null");
tags.addProperty("name",tag);
tagsJsonArray.add(tags);
}
//extrasJsonArray.add(extrasJson);}
dataJson.add("extras",extrasJsonArray);
dataJson.add("tags",tagsJsonArray);
logger.debug("dataJson: {}",dataJson);
// create the CKAN request URL
String urlPath = "/api/3/action/package_create";
Expand Down Expand Up @@ -495,14 +490,14 @@ public String buildOrgName(String organizationName, DCATMetadata dcatMetadata) t
* conventions are violated.
* @return Package name
*/
public String buildPkgName(String pkgTitle, DCATMetadata dcatMetadata) throws Exception {
public String buildPkgName(DCATMetadata dcatMetadata) throws Exception {
String pkgName;
String finalPackageName;

if (dcatMetadata != null && dcatMetadata.getPackageName() != null) {
if (dcatMetadata.getPackageName() != null) {
finalPackageName = dcatMetadata.getPackageName().toLowerCase(Locale.ENGLISH);
} else {
finalPackageName = pkgTitle.toLowerCase(Locale.ENGLISH);
throw new Exception("No package name found in the metadata!");
}

pkgName = NGSICharsets.encodeCKAN(finalPackageName);
Expand All @@ -528,8 +523,7 @@ public String buildResName(Entity entity, DCATMetadata dcatMetadata) throws Exce
if (dcatMetadata != null && dcatMetadata.getResourceName() != null) {
resName = dcatMetadata.getResourceName();
} else {
String entityTitle = ngsiUtils.getSpecificAttributeValue(entity, "title");
resName = entityTitle != null ? entityTitle : entity.getEntityId();
resName = entity.getEntityId();
}

if (resName.length() > NGSIConstants.CKAN_MAX_NAME_LEN) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,11 @@ public void aggregate(Entity entity, long creationTime) {
getLogger().debug("DCAT metadata: {}" , dcatMetadata);

for (Entity entity : entities) {
final String pkgTitle = flowFile.getAttribute("datasetTitle");
final String pkgName = ckanBackend.buildPkgName(pkgTitle, dcatMetadata);

// Update DCATMetadata with Distribution entity attributes
buildDCATMetadata.addMetadataFromEntity(entity, dcatMetadata);

final String pkgName = ckanBackend.buildPkgName(dcatMetadata);
final String resName = ckanBackend.buildResName(entity, dcatMetadata);
aggregator.initialize(entity);
aggregator.aggregate(entity, creationTime);
Expand All @@ -189,7 +192,7 @@ public void aggregate(Entity entity, long creationTime) {
getLogger().info("Persisting data at NGSICKANSink: orgName=" + orgName
+ ", pkgName=" + pkgName + ", resName=" + resName + ", data=" + aggregation);

ckanBackend.persist(orgName, pkgName, pkgTitle, resName, aggregation, dcatMetadata,createDataStore);
ckanBackend.persist(orgName, pkgName, resName, aggregation, dcatMetadata, createDataStore);
} // for

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ public String getResourceName() {
return resourceName;
}

public void setResourceName(String resourceName) {this.resourceName = resourceName;}

public String getResourceRights() {
return resourceRights;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,14 @@ private void addAttributeIfValid(List<AttributesLD> attributes, AttributesLD att
attributes.add(attribute);
}

public String getSpecificAttributeValue(Entity entity, String attributeName) {
public static String getSpecificAttributeValue(Entity entity, String attributeName) {
ArrayList<AttributesLD> entityAttributes = entity.getEntityAttrsLD();
for(AttributesLD attr : entityAttributes) {
if(attr.getAttrName().toLowerCase().equals(attributeName.toLowerCase())) {
return attr.getAttrValue();
}
for (AttributesLD attr : entityAttributes) {
if (attr.getAttrName().equalsIgnoreCase(attributeName))
return attr.getAttrValue();
}
logger.info("Did not find attribute " + attributeName + " in entity " + entity.getEntityId());
logger.info("Did not find attribute {} in entity {}", attributeName, entity.getEntityId());

return null;

}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package egm.io.nifi.processors.ckan.utils;

import egm.io.nifi.processors.ckan.model.DCATMetadata;
import egm.io.nifi.processors.ckan.ngsild.Entity;
import egm.io.nifi.processors.ckan.ngsild.NGSIUtils;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
Expand All @@ -19,13 +21,13 @@ public DCATMetadata getMetadataFromFlowFile(FlowFile flowFile, final ProcessSess
// Create the PreparedStatement to use for this FlowFile.
Map<String, String> flowFileAttributes = flowFile.getAttributes();
Map<String, String> newFlowFileAttributes = new CaseInsensitiveMap(flowFileAttributes);
String [] keywords = newFlowFileAttributes.get("keywords").replace("[", "").replace("]","").replaceAll("\"","").split(",");
String [] keywords = newFlowFileAttributes.get("keyword").replace("[", "").replace("]","").replaceAll("\"","").split(",");

return new DCATMetadata(
newFlowFileAttributes.get("organizationName"),
newFlowFileAttributes.get("organizationType"),
newFlowFileAttributes.get("packageDescription"),
newFlowFileAttributes.get("packageName"),
newFlowFileAttributes.get("datasetTitle"),
newFlowFileAttributes.get("contactPoint"),
newFlowFileAttributes.get("contactName"),
newFlowFileAttributes.get("contactEmail"),
Expand All @@ -40,17 +42,30 @@ public DCATMetadata getMetadataFromFlowFile(FlowFile flowFile, final ProcessSess
newFlowFileAttributes.get("landingPage"),
newFlowFileAttributes.get("visibility"),
newFlowFileAttributes.get("datasetRights"),
newFlowFileAttributes.get("accessURL"),
newFlowFileAttributes.get("availability"),
newFlowFileAttributes.get("resourceDescription"),
null,
null,
null,
JSON_LD_FORMAT,
newFlowFileAttributes.get("mimeType"),
newFlowFileAttributes.get("license"),
newFlowFileAttributes.get("licenseType"),
newFlowFileAttributes.get("downloadURL"),
newFlowFileAttributes.get("byteSize"),
newFlowFileAttributes.get("resourceName"),
newFlowFileAttributes.get("resourceRights")
null,
null,
null,
null,
null,
null,
null
);
}

public void addMetadataFromEntity(Entity entity, DCATMetadata dcatMetadata) {
dcatMetadata.setAccessURL(NGSIUtils.getSpecificAttributeValue(entity, "accessURL"));
dcatMetadata.setAvailability(NGSIUtils.getSpecificAttributeValue(entity, "availability"));
dcatMetadata.setMimeType(NGSIUtils.getSpecificAttributeValue(entity, "mediaType"));
dcatMetadata.setLicense(NGSIUtils.getSpecificAttributeValue(entity, "license"));
dcatMetadata.setDownloadURL(NGSIUtils.getSpecificAttributeValue(entity, "downloadURL"));
dcatMetadata.setByteSize(NGSIUtils.getSpecificAttributeValue(entity, "byteSize"));
dcatMetadata.setResourceRights(NGSIUtils.getSpecificAttributeValue(entity, "rights"));
dcatMetadata.setResourceDescription(NGSIUtils.getSpecificAttributeValue(entity, "description"));
dcatMetadata.setResourceName(NGSIUtils.getSpecificAttributeValue(entity, "title"));
dcatMetadata.setLicenseType(NGSIUtils.getSpecificAttributeValue(entity, "licenseType"));
}
}
Loading