Uc #1193 (Merged)

common/client/src/main/java/zingg/common/client/Client.java (3 changes: 2 additions & 1 deletion)
@@ -214,7 +214,7 @@ else if (options.get(ClientOptions.CONF).value.endsWith("env")) {
client.init();
// after setting arguments etc. as some of the listeners need it
client.execute();
client.postMetrics();

LOG.warn("Zingg processing has completed");
}
catch(ZinggClientException e) {
@@ -279,6 +279,7 @@ public IZArgs getArguments() {

public void execute() throws ZinggClientException {
zingg.execute();
postMetrics();
}

public void postMetrics() throws ZinggClientException {
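
Net effect of this file's hunks: `client.postMetrics()` is dropped from the call site in `main`, and `postMetrics()` now runs inside `execute()`, so metrics are posted for every caller of `execute()`, not just the CLI path. A minimal sketch of the resulting flow, using a simplified stand-in for the real Client class:

// Simplified stand-in for the real Client: shows the reordered call flow only.
public class ClientFlowSketch {

    public void execute() throws Exception {
        runJob();       // stand-in for zingg.execute()
        postMetrics();  // now invoked here, so it follows every execute()
    }

    public void postMetrics() {
        System.out.println("posting metrics"); // stand-in for the Analytics calls
    }

    private void runJob() {
        System.out.println("running job");
    }

    public static void main(String[] args) throws Exception {
        ClientFlowSketch client = new ClientFlowSketch();
        client.execute(); // before this PR the caller also had to call client.postMetrics()
    }
}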

common/client/src/main/java/zingg/common/client/pipe/FilePipe.java
@@ -5,7 +5,6 @@ public class FilePipe {
public static final String LOCATION = "location";
public static final String HEADER = "header";
public static final String DELIMITER = "delimiter";


public static final String TABLE = "table";

}

common/client/src/main/java/zingg/common/client/pipe/Pipe.java (14 changes: 0 additions & 14 deletions)
@@ -33,7 +33,6 @@ public class Pipe<D,R,C> implements Serializable{ // St:StructType, Sv:SaveMode
public static final String FORMAT_ELASTIC = "org.elasticsearch.spark.sql";
public static final String FORMAT_EXASOL = "com.exasol.spark";
public static final String FORMAT_BIGQUERY = "bigquery";
public static final String FORMAT_INMEMORY = "inMemory";

String name;
String format;
@@ -44,25 +43,18 @@ public class Pipe<D,R,C> implements Serializable{ // St:StructType, Sv:SaveMode
String schema;
String mode;





public String getSchema() {
return schema;
}


public void setSchema(String schema) {
this.schema = schema;
}


public String getName() {
return name;
}


@JsonValue
public void setName(String name) {
this.name = name;
Expand All @@ -76,7 +68,6 @@ public String getFormat() {
public void setFormat(String sinkType) {
this.format = sinkType;
}


@JsonValue
public void setProps(Map<String, String> props) {
@@ -102,23 +93,18 @@ public String get(String key) {
return props.get(key);
}


public String getPreprocessors() {
return preprocessors;
}


public void setPreprocessors(String preprocessors) {
this.preprocessors = preprocessors;
}



public int getId() {
return id;
}


public void setId(int recId) {
this.id = recId;
}

common/client/src/main/java/zingg/common/client/util/PipeUtil.java (123 changes: 27 additions & 96 deletions)
@@ -9,7 +9,6 @@
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.pipe.FilePipe;
//import zingg.common.client.pipe.InMemoryPipe;
import zingg.common.client.pipe.Pipe;

//import com.datastax.spark.connector.cql.*;
@@ -50,25 +49,27 @@ public DFReader<D,R,C> getReader(Pipe<D,R,C> p) {
return reader;
}

protected ZFrame<D,R,C> read(DFReader<D,R,C> reader, Pipe<D,R,C> p, boolean addSource) throws ZinggClientException{
public ZFrame<D,R,C> getInput(Pipe<D,R,C> p, DFReader<D,R,C> reader) throws ZinggClientException{
ZFrame<D,R,C> input = null;
LOG.warn("Reading " + p);
try {

if (p.getFormat().equals(Pipe.FORMAT_INMEMORY)) {
input = p.getDataset(); //.df();
}
else {
if (p.getProps().containsKey(FilePipe.LOCATION)) {
input = reader.load(p.get(FilePipe.LOCATION));
}
else {
input = reader.load();
}
}
return input;
}

protected ZFrame<D,R,C> read(DFReader<D,R,C> reader, Pipe<D,R,C> p, boolean addSource) throws ZinggClientException{
ZFrame<D,R,C> input = null;
LOG.warn("Reading " + p);
try {
input = getInput(p, reader);

if (addSource) {
input = input.withColumn(ColName.SOURCE_COL, p.getName());
}

p.setDataset(input);
} catch (Exception ex) {
LOG.warn(ex.getMessage());
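
The refactor above splits frame acquisition out of `read` into a public `getInput(Pipe, DFReader)`, and the `FORMAT_INMEMORY` branch disappears from the base implementation. One plausible reading, and it is only an assumption here, is that in-memory pipes are now meant to be handled by overriding this seam in a subclass:

// Illustrative only: the override seam that getInput() creates.
// Frame, Reader and PipeLike are stand-ins for ZFrame, DFReader and Pipe.
interface Frame {}

interface Reader {
    Frame load(String location);
    Frame load();
}

class PipeLike {
    Frame dataset; // a frame previously attached via setDataset()
    java.util.Map<String, String> props = new java.util.HashMap<>();
}

class PipeUtilSketch {
    public Frame getInput(PipeLike p, Reader reader) {
        if (p.props.containsKey("location")) {           // FilePipe.LOCATION
            return reader.load(p.props.get("location"));
        }
        return reader.load();
    }
}

// Hypothetical subclass: serves the frame already held by the pipe,
// replacing the FORMAT_INMEMORY branch that this PR deletes from read().
class InMemoryPipeUtilSketch extends PipeUtilSketch {
    @Override
    public Frame getInput(PipeLike p, Reader reader) {
        return p.dataset;
    }
}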

@@ -191,113 +192,43 @@ public void write(ZFrame<D,R,C> toWriteOrig,
for (Pipe<D,R,C> p: pipes) {
//Dataset<Row> toWrite = toWriteOrig.df();
//DataFrameWriter writer = toWrite.write();
DFWriter writer = getWriter(toWriteOrig);
DFWriter<D,R,C> writer = getWriter(toWriteOrig);

LOG.warn("Writing output " + p);

if (p.getFormat().equals(Pipe.FORMAT_INMEMORY)) {
p.setDataset(toWriteOrig);
return;
}
//SparkPipe sPipe = (SparkPipe) p;
if (p.getMode() != null) {
writer.setMode(p.getMode()); //SaveMode.valueOf(p.getMode()));
}
else {
writer.setMode("Append"); //SaveMode.valueOf("Append"));
}
/*
if (p.getFormat().equals(Pipe.FORMAT_ELASTIC)) {
ctx.getConf().set(ElasticPipe.NODE, p.getProps().get(ElasticPipe.NODE));
ctx.getConf().set(ElasticPipe.PORT, p.getProps().get(ElasticPipe.PORT));
ctx.getConf().set(ElasticPipe.ID, ColName.ID_COL);
ctx.getConf().set(ElasticPipe.RESOURCE, p.getName());
}
*/
writer = writer.format(p.getFormat());
writer = getWriterWithFormat(writer, p);

for (String key: p.getProps().keySet()) {
writer = writer.option(key, p.get(key));
}
if (p.getFormat() == Pipe.FORMAT_CASSANDRA) {
/*
ctx.getConf().set(CassandraPipe.HOST, p.getProps().get(CassandraPipe.HOST));
toWrite.sparkSession().conf().set(CassandraPipe.HOST, p.getProps().get(CassandraPipe.HOST));
//df.createCassandraTable(p.get("keyspace"), p.get("table"), opPk, opCl, CassandraConnector.apply(ctx.getConf()));

CassandraConnector connector = CassandraConnector.apply(ctx.getConf());
try (Session session = connector.openSession()) {
ResultSet rs = session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name='"
+ p.get(CassandraPipe.KEYSPACE) + "' AND table_name='" + p.get(CassandraPipe.TABLE) + "'");
if (rs.all().size() == 0) {
List<String> pk = new ArrayList<String>();
if (p.get(CassandraPipe.PRIMARY_KEY) != null) {
//pk.add(p.get(CassandraPipe.PRIMARY_KEY));
pk = Arrays.asList(p.get(CassandraPipe.PRIMARY_KEY).split(","));
}
Option<Seq<String>> opPk = Option.apply(JavaConverters.asScalaIteratorConverter(pk.iterator()).asScala().toSeq());
List<String> cl = new ArrayList<String>();

if (p.getAddProps()!= null && p.getAddProps().containsKey("clusterBy")) {
cl=Arrays.asList(p.getAddProps().get("clusterBy").split(","));
}
Option<Seq<String>> opCl = Option.apply(JavaConverters.asScalaIteratorConverter(cl.iterator()).asScala().toSeq());

DataFrameFunctions df = new DataFrameFunctions(toWrite);
LOG.warn("received cassandra table - " + p.get(CassandraPipe.KEYSPACE) + " and " + p.get(CassandraPipe.TABLE));
df.createCassandraTable(p.get(CassandraPipe.KEYSPACE), p.get(CassandraPipe.TABLE), opPk, opCl, CassandraConnector.apply(ctx.getConf()));
if (p.getAddProps()!= null && p.getAddProps().containsKey("indexBy")) {
LOG.warn("creating index on cassandra");

session.execute("CREATE INDEX " + p.getAddProps().get("indexBy") + p.get(CassandraPipe.KEYSPACE) + "_" +
p.get(CassandraPipe.TABLE) + "_idx ON " + p.get(CassandraPipe.KEYSPACE) + "." +
p.get(CassandraPipe.TABLE) + "(" + p.getAddProps().get("indexBy") +
")");
}
}
else {
LOG.warn("existing cassandra table - " + p.get(CassandraPipe.KEYSPACE) + " and " + p.get(CassandraPipe.TABLE));

}

}
catch(Exception e) {
e.printStackTrace();
LOG.warn("Writing issue");
}*/
save(p, writer, toWriteOrig);
}
else if (p.getProps().containsKey("location")) {
} catch (Exception ex) {
throw new ZinggClientException(ex.getMessage());
}
}

public DFWriter<D,R,C> getWriterWithFormat(DFWriter<D,R,C> writer, Pipe<D,R,C> p) {
writer = writer.format(p.getFormat());
return writer;
}

public void save(Pipe<D,R,C> p, DFWriter<D,R,C> writer, ZFrame<D,R,C> toWriteOrig){
if (p.getProps().containsKey("location")) {
LOG.warn("Writing file");
writer.save(p.get(FilePipe.LOCATION));
}
else if (p.getFormat().equals(Pipe.FORMAT_JDBC)){
writer = getWriter(toWriteOrig);
writer = writer.format(p.getFormat());

//SparkPipe sPipe = (SparkPipe) p;
if (p.getMode() != null) {
writer.setMode(p.getMode()); //SaveMode.valueOf(p.getMode()));
}
else {
writer.setMode("Append") ;//SaveMode.valueOf("Append"));
}
for (String key: p.getProps().keySet()) {
writer = writer.option(key, p.get(key));
}
writer.save();
}
else {
else{
writer.save();

}


}
} catch (Exception ex) {
throw new ZinggClientException(ex.getMessage());
}
}
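
Taken together, the `write` loop now delegates to two small hooks: `getWriterWithFormat` picks the sink format and `save` decides between a location-based save and a plain `save()`; the commented-out Cassandra block and the duplicated JDBC branch are gone. A condensed sketch of the refactored path, using stand-ins for the generic `DFWriter`/`Pipe`/`ZFrame` types:

import java.util.Map;

// Stand-in for DFWriter; only the calls used by the refactored path.
interface WriterSketch {
    void setMode(String mode);
    WriterSketch format(String fmt);
    WriterSketch option(String key, String value);
    void save();
    void save(String location);
}

class WritePathSketch {

    void write(WriterSketch writer, String mode, String format, Map<String, String> props) {
        writer.setMode(mode != null ? mode : "Append"); // same default as the diff
        writer = getWriterWithFormat(writer, format);   // hook 1: format selection
        for (Map.Entry<String, String> e : props.entrySet()) {
            writer = writer.option(e.getKey(), e.getValue());
        }
        save(writer, props);                            // hook 2: location-aware save
    }

    WriterSketch getWriterWithFormat(WriterSketch writer, String format) {
        return writer.format(format);
    }

    void save(WriterSketch writer, Map<String, String> props) {
        if (props.containsKey("location")) {
            writer.save(props.get("location")); // file-style sinks
        } else {
            writer.save();                      // JDBC and other catalog-style sinks
        }
    }
}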

/*
public void writePerSource(Dataset<Row> toWrite, Arguments args, JavaSparkContext ctx, Pipe[] pipes ) throws ZinggClientException {
List<Row> sources = toWrite.select(ColName.SOURCE_COL).distinct().collectAsList();

common/core/src/main/java/zingg/common/core/block/Block.java (11 changes: 7 additions & 4 deletions)
@@ -22,6 +22,7 @@ public abstract class Block<D,R,C,T> implements Serializable {

public static final Log LOG = LogFactory.getLog(Block.class);
private final IHashFunctionUtility<D, R, C, T> hashFunctionUtility;
private FieldDefinitionStrategy<R> fieldDefinitionStrategy;

protected ZFrame<D,R,C> dupes;
// Class[] types;
@@ -46,11 +47,12 @@ public Block(ZFrame<D,R,C> training, ZFrame<D,R,C> dupes) {
}

public Block(ZFrame<D,R,C> training, ZFrame<D,R,C> dupes,
ListMap<T, HashFunction<D, R, C, T>> functionsMap, long maxSize) {
ListMap<T, HashFunction<D, R, C, T>> functionsMap, long maxSize, FieldDefinitionStrategy<R> fieldDefinitionStrategy) {
this(training, dupes);
this.functionsMap = functionsMap;
// functionsMap.prettyPrint();
this.maxSize = maxSize;
this.fieldDefinitionStrategy = fieldDefinitionStrategy;
}

/**
@@ -374,12 +376,13 @@ public void printTree(Tree<Canopy<R>> tree,
}

public List<FieldDefinition> getFieldOfInterestList(List<FieldDefinition> fieldDefinitions, Canopy<R> node) {
FieldDefinitionStrategy<R> fieldDefinitionStrategy = new DefaultFieldDefinitionStrategy<R>();
return fieldDefinitionStrategy.getAdjustedFieldDefinitions(fieldDefinitions, node);
}

public abstract FeatureFactory<T> getFeatureFactory();



public void setFieldDefinitionStrategy(FieldDefinitionStrategy<R> fieldDefinitionStrategy) {
this.fieldDefinitionStrategy = fieldDefinitionStrategy;
}
}
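
With this change `Block` receives its `FieldDefinitionStrategy` through the constructor (plus a setter), instead of `getFieldOfInterestList` constructing a `DefaultFieldDefinitionStrategy` on every call. A sketch of the injection shape; the strategy interface body is inferred from its single use above, and the element type is simplified to `String` in place of `FieldDefinition`:

import java.util.List;

// Inferred from its call site: a strategy adjusts the field
// definitions considered at a canopy node.
interface FieldDefinitionStrategySketch<R> {
    List<String> getAdjustedFieldDefinitions(List<String> fieldDefinitions, R node);
}

class BlockSketch<R> {
    private FieldDefinitionStrategySketch<R> strategy;

    // Constructor injection, mirroring the widened Block(...) constructor.
    BlockSketch(FieldDefinitionStrategySketch<R> strategy) {
        this.strategy = strategy;
    }

    // Setter kept for callers that construct first and configure later.
    void setFieldDefinitionStrategy(FieldDefinitionStrategySketch<R> strategy) {
        this.strategy = strategy;
    }

    List<String> getFieldOfInterestList(List<String> fields, R node) {
        // Before: new DefaultFieldDefinitionStrategy<R>() was created on each call.
        return strategy.getAdjustedFieldDefinitions(fields, node);
    }
}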

@@ -80,7 +80,6 @@ public void track(boolean collectMetrics){
Analytics.track(Metric.OUTPUT_FORMAT, getPipeUtil().getPipesAsString(args.getOutput()), collectMetrics);
Analytics.track(Metric.MODEL_ID, args.getModelId(), collectMetrics);
Analytics.track(Metric.STOPWORDS,new StopWordUtility().getFieldDefinitionNamesWithStopwords(args), collectMetrics);

}



@@ -70,11 +70,11 @@ public void postMetrics() {
Analytics.track(Metric.EXEC_TIME, (System.currentTimeMillis() - startTime) / 1000, true);
Analytics.track(Metric.MODEL_ID, getArgs().getModelId(), true);
Analytics.track(Metric.ZINGG_VERSION, "0.5.0", true);
Analytics.trackEnv(Metric.ZINGG_HOME, true);
Analytics.trackEnvValue(Metric.DATABRICKS_RUNTIME_VERSION, true);
Analytics.track(Metric.COUNTRY, Locale.getDefault().getCountry(), true);

Analytics.trackEnvValue(Metric.DB_INSTANCE_TYPE, collectMetrics);
Analytics.trackEnv(Metric.ZINGG_HOME, collectMetrics);
Analytics.trackPropValue(Metric.JAVA_VERSION, collectMetrics);
Analytics.trackPropValue(Metric.OS_ARCH, collectMetrics);
Analytics.trackPropValue(Metric.OS_NAME, collectMetrics);

@@ -55,7 +55,7 @@ public boolean isApplicable() {
public ZFrame<D, R, C> preprocess(ZFrame<D, R, C> df) {
try {
if(isApplicable()){
LOG.info("Applying preprocessor on input dataframe");
LOG.debug("Applying preprocessor on input dataframe");
return applyPreprocessor(df, relevantFields);
}
} catch (Exception exception) {
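
Beyond the log-level change, the hunk shows the guard pattern this preprocessor uses: apply only when `isApplicable()` holds. A sketch under the assumption that the non-applicable path returns the frame unchanged (that fall-through is not visible in the hunk):

// Stand-ins for ZFrame and the preprocessor; shows only the guard pattern.
interface FrameSketch {}

abstract class PreprocessorSketch {
    abstract boolean isApplicable();
    abstract FrameSketch applyPreprocessor(FrameSketch df);

    FrameSketch preprocess(FrameSketch df) {
        if (isApplicable()) {
            return applyPreprocessor(df); // logged at debug level after this change
        }
        return df; // assumed fall-through: hand the frame back untouched
    }
}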

@@ -45,6 +45,7 @@ private static Map<String, String> getMetrics() {
public static void track(String metricName, String metricValue, boolean collectMetrics) {
if (collectMetrics) {
String metricNameToSend = metricName.replace(".", "_");
if (metricValue == null) metricValue = "";
getMetrics().put(metricNameToSend, metricValue);
}
}
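
The added guard keeps `null` metric values out of the payload by normalizing them to an empty string, alongside the existing dot-to-underscore rewrite of the metric name. A runnable sketch of just that map behavior:

import java.util.HashMap;
import java.util.Map;

class TrackSketch {
    static final Map<String, String> METRICS = new HashMap<>();

    static void track(String name, String value, boolean collect) {
        if (collect) {
            String key = name.replace(".", "_"); // existing rewrite: dots to underscores
            if (value == null) value = "";       // the new guard: no nulls in the payload
            METRICS.put(key, value);
        }
    }

    public static void main(String[] args) {
        track("zingg.version", null, true);
        System.out.println(METRICS); // prints {zingg_version=}
    }
}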

@@ -123,7 +124,7 @@ public static void postEvent(String phase, boolean collectMetrics) {
rootNode.set("events", eventList);
rootNode.put("user_id", getUserId());
String metricEvent = rootNode.toString();
LOG.warn("event is " + metricEvent);
LOG.debug("event is " + metricEvent);
Analytics.sendEvents(metricEvent);
}

@@ -142,7 +143,7 @@ private static void sendEvents(String param) {
uri = builder.build();
URL url = uri.toURL();
String response = executePostRequest(url.toString(), param);
LOG.warn("Analytics event " + response);
LOG.debug("Analytics event " + response);
} catch (IOException | URISyntaxException e) {
if(LOG.isDebugEnabled()) e.printStackTrace();
}

@@ -37,7 +37,7 @@ public void setPipeUtil(PipeUtilBase<S, D, R, C> pipeUtil) {


public abstract Block<D,R,C,T> getBlock(ZFrame<D,R,C> sample, ZFrame<D,R,C> positives,
ListMap<T, HashFunction<D,R,C,T>>hashFunctions, long blockSize);
ListMap<T, HashFunction<D,R,C,T>>hashFunctions, long blockSize, IArguments arguments);


public Tree<Canopy<R>> createBlockingTree(ZFrame<D,R,C> testData,
@@ -55,7 +55,7 @@ public Tree<Canopy<R>> createBlockingTree(ZFrame<D,R,C> testData,
LOG.info("Learning indexing rules for block size " + blockSize);

positives = positives.coalesce(1);
Block<D,R,C,T> cblock = getBlock(sample, positives, hashFunctions, blockSize);
Block<D,R,C,T> cblock = getBlock(sample, positives, hashFunctions, blockSize, args);
Canopy<R> root = new Canopy<R>(sample.collectAsList(), positives.collectAsList());

List<FieldDefinition> fd = new ArrayList<FieldDefinition> ();
@@ -107,7 +107,7 @@ public Tree<Canopy<R>> readBlockingTree(IArguments args, IModelHelper mu) throws
//byte [] byteArrayBack = (byte[]) tree.df().head().get(0);
byte[] byteArrayBack = getTreeFromDF(tree);
Tree<Canopy<R>> blockingTree = null;
LOG.warn("byte array back is " + byteArrayBack);
LOG.debug("byte array back is " + byteArrayBack);
blockingTree = (Tree<Canopy<R>>) Util.revertObjectFromByteArray(byteArrayBack);
return blockingTree;
}
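
The abstract `getBlock` factory gains an `IArguments` parameter, and `createBlockingTree` now passes its `args` through. Presumably this lets implementations build the `Block` with argument-derived collaborators such as the `FieldDefinitionStrategy` added in Block.java; that linkage is an inference, and the subclass below is purely illustrative:

// Stand-ins for IArguments and Block; the subclass is purely illustrative.
interface ArgsSketch {
    String getFieldDefStrategyName(); // hypothetical accessor, not from this PR
}

class BlockStub {
    BlockStub(long maxSize, String strategyName) { /* ... */ }
}

abstract class BlockingTreeUtilSketch {
    // Widened factory: the run's arguments now ride along to implementations.
    abstract BlockStub getBlock(Object sample, Object positives,
                                long blockSize, ArgsSketch args);
}

class SparkBlockingTreeUtilSketch extends BlockingTreeUtilSketch {
    @Override
    BlockStub getBlock(Object sample, Object positives, long blockSize, ArgsSketch args) {
        // Hypothetical use: choose collaborators based on the arguments.
        return new BlockStub(blockSize, args.getFieldDefStrategyName());
    }
}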

config/zingg.conf (1 change: 1 addition & 0 deletions)
@@ -13,6 +13,7 @@
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.default.parallelism=8
spark.debug.maxToStringFields=200
spark.sql.debug.maxToStringFields=200
spark.driver.memory=8g
spark.executor.memory=8g
#spark.jars=/home/zingg/pathto.jar
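
spark.sql.debug.maxToStringFields is the SQL-engine counterpart of the spark.debug.maxToStringFields line above it; both raise the cutoff before Spark truncates field lists in plan and row string dumps. The same pair can be set programmatically when building the session yourself (a sketch, not part of this PR):

import org.apache.spark.sql.SparkSession;

public class SessionConfSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("zingg")
                .master("local[*]")
                // mirrors the two lines in config/zingg.conf
                .config("spark.debug.maxToStringFields", "200")
                .config("spark.sql.debug.maxToStringFields", "200")
                .getOrCreate();
        spark.stop();
    }
}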