Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[WIP] Additional protocols for remote resources #408

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions konduit-serving-pipeline/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<repositories>
    <!--repository> <id>mapr-releases</id>
    <url>http://repository.mapr.com/maven/</url>
    <snapshots><enabled>false</enabled></snapshots>
    <releases><enabled>true</enabled></releases>
    </repository-->
    <repository>
        <id>oss.sonatype.org</id>
        <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
    </repository>
</repositories>

<dependencies>

Expand All @@ -36,6 +40,22 @@
<artifactId>konduit-serving-annotation</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.jcabi</groupId>
<artifactId>jcabi-aspects</artifactId>
<version>0.22.6</version>
<exclusions>
<exclusion>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.4</version>
</dependency>
<!--dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ai.konduit.serving.pipeline.api.protocol;

import lombok.AllArgsConstructor;
import lombok.Data;

@AllArgsConstructor
@Data
public class Credentials {
private String accessKey;
private String secretKey;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@

package ai.konduit.serving.pipeline.api.protocol;

import com.jcabi.aspects.RetryOnFailure;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.nd4j.common.primitives.Pair;

import java.io.*;
import java.net.URI;
Expand All @@ -40,39 +46,140 @@ public static boolean isUrl(String input) {
return false;
}

/**
 * Rewrites the cache metadata file, dropping entries (and deleting their cached
 * files) whose last-accessed timestamp is older than the configured lifetime.
 * The lifetime in days is read from the {@code konduit.serving.cache.lifetime}
 * system property; if unset or non-positive, this is a no-op.
 *
 * @param metaFile the CSV metadata file (columns: path, length, modified, accessed)
 * @throws IOException if the metadata file cannot be read or replaced
 */
public static void removeOutdatedCacheEntries(File metaFile) throws IOException {
    String lifeTimeProp = System.getProperty("konduit.serving.cache.lifetime");
    if (StringUtils.isEmpty(lifeTimeProp))
        return;
    final int daysTimeout = Integer.parseInt(lifeTimeProp);
    if (daysTimeout <= 0)
        return;
    File tempFile = new File(cacheDirectory, "metafile.temp");

    try (BufferedReader in = new BufferedReader(new FileReader(metaFile));
         PrintWriter writer = new PrintWriter(new OutputStreamWriter(new FileOutputStream(tempFile)))) {
        String line;
        while ((line = in.readLine()) != null) {
            for (CSVRecord record : CSVFormat.DEFAULT.parse(new StringReader(line))) {
                long accessedTimestamp = Long.parseLong(record.get(3));
                long elapsedMillis = System.currentTimeMillis() - accessedTimestamp;
                long elapsedDays = elapsedMillis / (1000L * 60 * 60 * 24);
                if (elapsedDays <= daysTimeout) {
                    // BUG FIX: the original used writer.write(line), which dropped the
                    // newline and concatenated every surviving entry onto one line.
                    writer.println(line);
                } else {
                    log.info("Removing outdated cached file " + record.get(0));
                    new File(record.get(0)).delete();
                }
            }
        }
    }
    // BUG FIX: the original called moveFile() inside the try-with-resources while
    // tempFile was still open (unflushed / locked on Windows), and moveFile fails
    // if the destination exists — delete the old metadata file first.
    if (metaFile.exists() && !metaFile.delete()) {
        throw new IOException("Could not replace metadata file " + metaFile.getAbsolutePath());
    }
    FileUtils.moveFile(tempFile, metaFile);
}

private static File cacheDirectory;
private static File metaFile;
static {
    // Cache root defaults to ~/.konduit_cache/ and can be relocated via the
    // konduit.serving.cache.location system property.
    File f = new File(System.getProperty("user.home"), StringUtils.defaultIfEmpty(
            System.getProperty("konduit.serving.cache.location"),
            ".konduit_cache/"));
    if (!f.exists())
        f.mkdirs();
    cacheDirectory = f;
    metaFile = new File(cacheDirectory, ".metadata");
    try {
        if (!metaFile.exists()) {
            metaFile.createNewFile();
        }
        // Purge entries older than the configured cache lifetime at startup.
        removeOutdatedCacheEntries(metaFile);
    } catch (IOException e) {
        // Never fail class loading over a broken cache; resolution still works uncached.
        log.error("Cache initialization failed", e);
    }
}


/**
 * Maps a remote URI to its location in the local cache, mirroring the remote
 * layout as {@code <cacheRoot>/<scheme>/<path dirs>/<file name>}.
 *
 * @param uri the remote resource URI (must be parseable by {@link URI#create})
 * @return the cache file location (may not exist yet)
 */
public static File getCachedFile(String uri) {
    URI u = URI.create(uri);
    // StringBuilder instead of += concatenation in the loop; the stray
    // System.out.println debug statement from the original has been removed.
    StringBuilder fullPath = new StringBuilder(
            StringUtils.defaultIfEmpty(u.getScheme(), StringUtils.EMPTY));
    String[] dirs = u.getPath().split("/");
    for (int i = 0; i < dirs.length - 1; ++i) {
        fullPath.append(File.separator).append(dirs[i]);
    }
    fullPath.append(File.separator).append(FilenameUtils.getName(uri));
    return new File(cacheDirectory, fullPath.toString());
}

public static File getFile(String uri) throws IOException {
/**
 * Fetches the last-modified timestamp and content length of a remote resource
 * in a single connection; retried up to 3 times on transient failures.
 *
 * @param url remote resource to probe
 * @return pair of (lastModified millis, contentLength bytes)
 * @throws IOException if the connection cannot be opened
 */
@RetryOnFailure(attempts = 3)
public static Pair<Long,Integer> getUrlProperties(URL url) throws IOException {
    val connection = url.openConnection();
    Pair<Long,Integer> props = new Pair<>();
    props.setFirst(connection.getLastModified());
    props.setSecond(connection.getContentLength());
    return props;
}

/**
 * Validates a cached file against the remote resource's size and timestamp.
 * Returns the cached file if it is consistent (or if warn-only mode is on);
 * otherwise removes it, records fresh metadata, and returns {@code null} so
 * the caller re-downloads.
 *
 * @param url        remote resource the cache entry mirrors
 * @param cachedFile the existing local cache file
 * @return the usable cached file, or {@code null} if it must be re-fetched
 * @throws IOException on metadata read/write or connection failure
 */
private static File load(URL url, File cachedFile) throws IOException {

    val urlProps = getUrlProperties(url);

    int contentLength = 0;
    long lastModified = 0;

    // BUG FIX: the original never closed this reader (resource leak).
    try (BufferedReader in = new BufferedReader(new FileReader(metaFile))) {
        String line;
        while ((line = in.readLine()) != null) {
            Iterable<CSVRecord> records = CSVFormat.DEFAULT
                    .parse(new StringReader(line));
            for (CSVRecord record : records) {
                if (record.get(0).equals(cachedFile.getAbsolutePath())) {
                    contentLength = Integer.parseInt(record.get(1));
                    lastModified = Long.parseLong(record.get(2));
                    break;
                }
            }
        }
    }
    if (lastModified > 0 && urlProps.getFirst() == lastModified && urlProps.getSecond() == contentLength) {
        // File is in cache and its timestamps are the same as of remote resource
        return cachedFile;
    }
    else {
        String warnOnly = StringUtils.defaultIfEmpty(System.getProperty("konduit.serving.cache.validation.warnonly"), "true");
        if (warnOnly.equals("true")) {
            log.error("Cached file " + cachedFile.getAbsolutePath() + " has inconsistent state.");
            return cachedFile;
        }
        else {
            log.error("Cached file " + cachedFile.getAbsolutePath() + " has inconsistent state and will be removed");
            cachedFile.delete();
        }
    }
    // File was either just deleted or didn't exist, so writing metadata here and caching in
    // the calling method.
    // BUG FIX: the original wrote getFirst() (lastModified) into column 1 and
    // getSecond() (contentLength) into column 2 — swapped relative to the reader
    // above, so validation could never match. Column order is now
    // path, contentLength, lastModified, accessedTimestamp.
    String metaData = cachedFile.getAbsolutePath() + "," + urlProps.getSecond() + "," +
            urlProps.getFirst() + "," + System.currentTimeMillis() + System.lineSeparator();
    FileUtils.writeStringToFile(metaFile, metaData, "UTF-8", true);
    return null;
}

/**
 * Resolves a URI string to a local file, downloading and caching remote
 * resources as needed.
 *
 * @param uri the resource location as a string
 * @return the local (possibly cached) file
 * @throws IOException if the resource cannot be fetched
 */
public static File getFile(String uri) throws IOException {
    // Delegate to the URI overload; the unused local `scheme` from the
    // original has been removed.
    return getFile(URI.create(uri));
}

/**
 * Resolves a URI to a local file. {@code file:} URIs are returned directly;
 * other schemes are served from the local cache when the cached copy validates
 * against the remote resource, and re-downloaded otherwise.
 *
 * @param uri the resource location
 * @return the local (possibly cached) file
 * @throws IOException if the resource cannot be fetched
 */
public static File getFile(URI uri) throws IOException {
    String scheme = uri.getScheme();
    // Constant-first equals: a relative URI has a null scheme, and the
    // original scheme.equals("file") would throw an NPE.
    if ("file".equals(scheme)) {
        return new File(uri.getPath());
    }
    File cachedFile = getCachedFile(uri.getPath());

    URL url = uri.toURL();
    if (cachedFile.exists()) {
        File verifiedFile = load(url, cachedFile);
        if (verifiedFile != null) {
            return verifiedFile;
        }
    }

    // Removed the original's unused url.openConnection() call, which opened a
    // second connection that was never read or closed.
    FileUtils.copyURLToFile(url, cachedFile);

    return cachedFile;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ai.konduit.serving.pipeline.api.protocol;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URL;

/**
 * SPI for opening streams to URLs whose scheme requires protocol-specific
 * handling or authentication (e.g. cloud or distributed file systems).
 */
public interface URLAccessProvider {

/**
 * Returns the credentials this provider uses to authenticate.
 * NOTE(review): nullability for anonymous access is not specified here — confirm
 * whether implementations may return null.
 */
Credentials getCredentials();

/**
 * Opens an input stream to the given URL using the supplied credentials.
 * Callers are responsible for closing the returned stream.
 */
InputStream connect(URL url, Credentials credentials) throws IOException, URISyntaxException;
}
128 changes: 128 additions & 0 deletions konduit-serving-protocol/konduit-serving-protocol-hdfs/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>konduit-serving</artifactId>
        <groupId>ai.konduit.serving</groupId>
        <version>0.1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <!-- Fixed: this module lives at konduit-serving-protocol-hdfs but the artifactId
         said konduit-serving-protocol-s3 (copy-paste error), which would collide with
         the s3 module's coordinates. -->
    <artifactId>konduit-serving-protocol-hdfs</artifactId>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <repositories>
        <repository>
            <id>mapr-releases</id>
            <!-- https: Maven 3.8+ blocks plain-http repositories by default. -->
            <url>https://repository.mapr.com/maven/</url>
            <snapshots><enabled>false</enabled></snapshots>
            <releases><enabled>true</enabled></releases>
        </repository>
        <!--repository>
            <id>oss.sonatype.org</id>
            <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
        </repository-->
    </repositories>

    <dependencies>
        <dependency>
            <groupId>ai.konduit.serving</groupId>
            <artifactId>konduit-serving-annotation</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>2.4.1-mapr-1408</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.4.1-mapr-1408</version>
        </dependency>

        <!-- TODO - do we replace this with non-shaded Jackson? May need OSGi to avoid dependency conflicts however -->
        <dependency>
            <groupId>org.nd4j</groupId>
            <artifactId>jackson</artifactId>
            <version>${dl4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>ai.konduit.serving</groupId>
            <artifactId>konduit-serving-pipeline</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!-- Test dependencies -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-minicluster</artifactId>
            <scope>test</scope>
            <version>3.2.1</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>ai.konduit.serving</groupId>
            <artifactId>konduit-serving-common-tests</artifactId>
            <scope>test</scope>
            <version>${project.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.6</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${basedir}/target/resources</outputDirectory>
                            <resources>
                                <resource>
                                    <directory>resources</directory>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>

                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Loading