3434import com .linkedin .metadata .query .ListResultMetadata ;
3535import com .linkedin .metadata .query .SortOrder ;
3636import io .ebean .DuplicateKeyException ;
37+ import io .ebean .Ebean ;
3738import io .ebean .EbeanServer ;
3839import io .ebean .ExpressionList ;
3940import io .ebean .PagedList ;
4445import java .lang .reflect .InvocationTargetException ;
4546import java .lang .reflect .Method ;
4647import java .net .URISyntaxException ;
48+ import java .sql .PreparedStatement ;
49+ import java .sql .ResultSet ;
50+ import java .sql .SQLException ;
4751import java .sql .Timestamp ;
4852import java .util .ArrayList ;
4953import java .util .Arrays ;
@@ -439,54 +443,73 @@ public <ASPECT extends RecordTemplate> void backfillLocalRelationshipsFromEntity
439443 }
440444
441445 private <ASPECT extends RecordTemplate > EbeanMetadataAspect queryLatest (@ Nonnull URN urn , @ Nonnull Class <ASPECT > aspectClass ) {
446+ final String aspectName = ModelUtils .getAspectName (aspectClass );
447+
442448 if (_findMethodology == FindMethodology .UNIQUE_ID ) {
443- final PrimaryKey key = new PrimaryKey (urn .toString (), ModelUtils . getAspectName ( aspectClass ) , 0L );
449+ final PrimaryKey key = new PrimaryKey (urn .toString (), aspectName , 0L );
444450 return _server .find (EbeanMetadataAspect .class , key );
445451 }
446452
453+ // TODO(@jphui) added for job-gms duplicity debug, throwaway afterwards
454+
455+ // JDBC sanity check: should MATCH Ebean's results
456+ if (log .isDebugEnabled () && "AzkabanFlowInfo" .equals (aspectClass .getSimpleName ())) {
457+ final String sqlQuery = "SELECT * FROM metadata_aspect "
458+ + "WHERE urn = ? and aspect = ? and version = 0" ;
459+
460+ try (Transaction transaction = _server .beginTransaction ()) {
461+
462+ // use PreparedStatement
463+ try (PreparedStatement stmt = transaction .getConnection ().prepareStatement (sqlQuery )) {
464+ stmt .setString (1 , urn .toString ());
465+ stmt .setString (2 , aspectName );
466+
467+ try (ResultSet rset = stmt .executeQuery ()) {
468+ rset .last (); // go to the last returned record
469+ log .debug ("JDBC found {} existing records" , rset .getRow ());
470+ }
471+ }
472+
473+ transaction .commit ();
474+ } catch (SQLException e ) {
475+ log .debug ("JDBC ran into a SQLException: {}" , e .getMessage ());
476+ }
477+ }
478+
447479 List <EbeanMetadataAspect > results = Collections .emptyList ();
448- // TODO (@jphui): remove following pathway(s) that are not used
480+ Query < EbeanMetadataAspect > query = Ebean . find ( EbeanMetadataAspect . class ); // non-null placeholder to be overridden
449481
450482 if (_findMethodology == FindMethodology .DIRECT_SQL ) {
451483 final String selectQuery = "SELECT * FROM metadata_aspect "
452484 + "WHERE urn = :urn and aspect = :aspect and version = 0 "
453485 + "ORDER BY createdOn DESC" ;
454486
455- Query <EbeanMetadataAspect > query = _server .findNative (EbeanMetadataAspect .class , selectQuery )
456- .setParameter ("urn" , urn .toString ())
457- .setParameter ("aspect" , ModelUtils .getAspectName (aspectClass ));
458-
459- results = query .findList ();
460-
461- // TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
462- if (log .isDebugEnabled ()) {
463- if ("AzkabanFlowInfo" .equals (aspectClass .getSimpleName ())) {
464- log .debug ("Using DIRECT_SQL retrieval." );
465- log .debug ("queryLatest SQL: " + query .getGeneratedSql ());
466- log .debug ("queryLatest Result: " + results );
467- }
468- }
487+ query = _server .findNative (EbeanMetadataAspect .class , selectQuery )
488+ .setParameter ("urn" , urn .toString ())
489+ .setParameter ("aspect" , aspectName );
469490 }
470491
471492 if (_findMethodology == FindMethodology .QUERY_BUILDER ) {
472- Query < EbeanMetadataAspect > query = _server .find (EbeanMetadataAspect .class )
493+ query = _server .find (EbeanMetadataAspect .class )
473494 .where ()
474495 .eq (URN_COLUMN , urn .toString ())
475- .eq (ASPECT_COLUMN , ModelUtils . getAspectName ( aspectClass ) )
496+ .eq (ASPECT_COLUMN , aspectName )
476497 .eq (VERSION_COLUMN , 0L )
477498 .orderBy ()
478499 .desc (CREATED_ON_COLUMN );
500+ }
479501
480- results = query .findList ();
502+ results = query .findList ();
481503
482- // TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
483- if (log .isDebugEnabled ()) {
484- if ("AzkabanFlowInfo" .equals (aspectClass .getSimpleName ())) {
485- log .debug ("Using QUERY_BUILDER retrieval." );
486- log .debug ("queryLatest SQL: " + query .getGeneratedSql ());
487- log .debug ("queryLatest Result: " + results );
488- }
489- }
504+ // Encouraged to run AFTER query execution based on getGeneratedSql() documentation
505+ if (log .isDebugEnabled () && "AzkabanFlowInfo" .equals (aspectClass .getSimpleName ())) {
506+ log .debug ("Using {} retrieval; " + "Generated SQL: {}; urn: {}, aspect: {}, version: {}" ,
507+ _findMethodology .toString (),
508+ query .getGeneratedSql (),
509+ urn .toString (),
510+ aspectName ,
511+ 0L
512+ );
490513 }
491514
492515 if (results .isEmpty ()) {
@@ -495,7 +518,7 @@ private <ASPECT extends RecordTemplate> EbeanMetadataAspect queryLatest(@Nonnul
495518
496519 // don't crash if find duplicates, but log an error
497520 if (results .size () > 1 ) {
498- log .error ("Two version=0 records found for {}, {}" , urn , aspectClass . getSimpleName () );
521+ log .error ("Two version=0 records found for {}, {}" , urn , aspectName );
499522 }
500523
501524 // return value at the top of the list (latest createdOn)
@@ -572,13 +595,6 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
572595 update .setParameter ("createdBy" , aspect .getCreatedBy ());
573596 update .setParameter ("oldTimestamp" , oldTimestamp );
574597
575- // TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
576- if (log .isDebugEnabled ()) {
577- if ("AzkabanFlowInfo" .equals (aspectClass .getSimpleName ())) {
578- log .debug ("updateWithOptimisticLocking SQL: " + update .getGeneratedSql ());
579- }
580- }
581-
582598 int numOfUpdatedRows ;
583599 if (_schemaConfig == SchemaConfig .NEW_SCHEMA_ONLY || _schemaConfig == SchemaConfig .DUAL_SCHEMA ) {
584600 // ensure atomicity by running old schema update + new schema update in a transaction
0 commit comments