32
32
import org .reactivestreams .Publisher ;
33
33
34
34
import org .springframework .beans .BeansException ;
35
+ import org .springframework .beans .factory .BeanFactory ;
36
+ import org .springframework .beans .factory .BeanFactoryAware ;
35
37
import org .springframework .context .ApplicationContext ;
36
38
import org .springframework .context .ApplicationContextAware ;
37
39
import org .springframework .core .convert .ConversionService ;
77
79
* @author Bogdan Ilchyshyn
78
80
* @since 1.1
79
81
*/
80
- public class R2dbcEntityTemplate implements R2dbcEntityOperations , ApplicationContextAware {
82
+ public class R2dbcEntityTemplate implements R2dbcEntityOperations , BeanFactoryAware , ApplicationContextAware {
81
83
82
84
private final DatabaseClient databaseClient ;
83
85
@@ -123,6 +125,15 @@ public DatabaseClient getDatabaseClient() {
123
125
return this .databaseClient ;
124
126
}
125
127
128
+ /*
129
+ * (non-Javadoc)
130
+ * @see org.springframework.beans.factory.BeanFactoryAware#setBeanFactory(org.springframework.beans.factory.BeanFactory)
131
+ * @deprecated since 1.2 in favor of #setApplicationContext.
132
+ */
133
+ @ Override
134
+ @ Deprecated
135
+ public void setBeanFactory (BeanFactory beanFactory ) throws BeansException {}
136
+
126
137
/*
127
138
* (non-Javadoc)
128
139
* @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
@@ -431,33 +442,38 @@ <T> Mono<T> doInsert(T entity, SqlIdentifier tableName) {
431
442
432
443
RelationalPersistentEntity <T > persistentEntity = getRequiredEntity (entity );
433
444
434
- return Mono .defer (() -> maybeCallBeforeConvert (setVersionIfNecessary (persistentEntity , entity ), tableName )
445
+ T entityWithVersion = setVersionIfNecessary (persistentEntity , entity );
446
+
447
+ return maybeCallBeforeConvert (entityWithVersion , tableName )
435
448
.flatMap (beforeConvert -> {
436
449
437
450
OutboundRow outboundRow = dataAccessStrategy .getOutboundRow (beforeConvert );
438
451
439
- return maybeCallBeforeSave (beforeConvert , outboundRow , tableName ).flatMap (entityToSave -> {
452
+ return maybeCallBeforeSave (beforeConvert , outboundRow , tableName ) //
453
+ .flatMap (entityToSave -> doInsert (entityToSave , tableName , outboundRow ));
454
+ });
455
+ }
456
+
457
+ private <T > Mono <T > doInsert (T entity , SqlIdentifier tableName , OutboundRow outboundRow ) {
440
458
441
- StatementMapper mapper = dataAccessStrategy .getStatementMapper ();
442
- StatementMapper .InsertSpec insert = mapper .createInsert (tableName );
459
+ StatementMapper mapper = dataAccessStrategy .getStatementMapper ();
460
+ StatementMapper .InsertSpec insert = mapper .createInsert (tableName );
443
461
444
- for (SqlIdentifier column : outboundRow .keySet ()) {
445
- SettableValue settableValue = outboundRow .get (column );
446
- if (settableValue .hasValue ()) {
447
- insert = insert .withColumn (column , settableValue );
448
- }
449
- }
462
+ for (SqlIdentifier column : outboundRow .keySet ()) {
463
+ SettableValue settableValue = outboundRow .get (column );
464
+ if (settableValue .hasValue ()) {
465
+ insert = insert .withColumn (column , settableValue );
466
+ }
467
+ }
450
468
451
- PreparedOperation <?> operation = mapper .getMappedObject (insert );
469
+ PreparedOperation <?> operation = mapper .getMappedObject (insert );
452
470
453
- return this .databaseClient .execute (operation ) //
454
- .filter (statement -> statement .returnGeneratedValues ())
455
- .map (this .dataAccessStrategy .getConverter ().populateIdIfNecessary (entityToSave )) //
456
- .first () //
457
- .defaultIfEmpty (entityToSave ) //
458
- .flatMap (saved -> maybeCallAfterSave (saved , outboundRow , tableName ));
459
- });
460
- }));
471
+ return this .databaseClient .execute (operation ) //
472
+ .filter (statement -> statement .returnGeneratedValues ())
473
+ .map (this .dataAccessStrategy .getConverter ().populateIdIfNecessary (entity )) //
474
+ .first () //
475
+ .defaultIfEmpty (entity ) //
476
+ .flatMap (saved -> maybeCallAfterSave (saved , outboundRow , tableName ));
461
477
}
462
478
463
479
@ SuppressWarnings ("unchecked" )
@@ -493,9 +509,22 @@ private <T> Mono<T> doUpdate(T entity, SqlIdentifier tableName) {
493
509
494
510
RelationalPersistentEntity <T > persistentEntity = getRequiredEntity (entity );
495
511
496
- return maybeCallBeforeConvert (entity , tableName ).flatMap (beforeConvert -> {
512
+ T entityToUse ;
513
+ Criteria matchingVersionCriteria ;
514
+
515
+ if (persistentEntity .hasVersionProperty ()) {
497
516
498
- OutboundRow outboundRow = dataAccessStrategy .getOutboundRow (entity );
517
+ matchingVersionCriteria = createMatchingVersionCriteria (entity , persistentEntity );
518
+ entityToUse = incrementVersion (persistentEntity , entity );
519
+ } else {
520
+
521
+ entityToUse = entity ;
522
+ matchingVersionCriteria = null ;
523
+ }
524
+
525
+ return maybeCallBeforeConvert (entityToUse , tableName ).flatMap (beforeConvert -> {
526
+
527
+ OutboundRow outboundRow = dataAccessStrategy .getOutboundRow (beforeConvert );
499
528
500
529
return maybeCallBeforeSave (beforeConvert , outboundRow , tableName ) //
501
530
.flatMap (entityToSave -> {
@@ -504,43 +533,44 @@ private <T> Mono<T> doUpdate(T entity, SqlIdentifier tableName) {
504
533
SettableValue id = outboundRow .remove (idColumn );
505
534
Criteria criteria = Criteria .where (dataAccessStrategy .toSql (idColumn )).is (id );
506
535
507
- T saved ;
508
-
509
- if (persistentEntity .hasVersionProperty ()) {
510
- criteria = criteria .and (createMatchingVersionCriteria (entity , persistentEntity ));
511
- saved = incrementVersion (persistentEntity , entity , outboundRow );
512
- } else {
513
- saved = entityToSave ;
536
+ if (matchingVersionCriteria != null ) {
537
+ criteria = criteria .and (matchingVersionCriteria );
514
538
}
515
539
516
- Update update = Update .from ((Map ) outboundRow );
540
+ return doUpdate (entityToSave , tableName , persistentEntity , criteria , outboundRow );
541
+ });
542
+ });
543
+ }
544
+
545
+ @ SuppressWarnings ({ "unchecked" , "rawtypes" })
546
+ private <T > Mono <T > doUpdate (T entity , SqlIdentifier tableName , RelationalPersistentEntity <T > persistentEntity ,
547
+ Criteria criteria , OutboundRow outboundRow ) {
517
548
518
- StatementMapper mapper = dataAccessStrategy .getStatementMapper ();
519
- StatementMapper .UpdateSpec updateSpec = mapper .createUpdate (tableName , update ).withCriteria (criteria );
549
+ Update update = Update .from ((Map ) outboundRow );
520
550
521
- PreparedOperation <?> operation = mapper .getMappedObject (updateSpec );
551
+ StatementMapper mapper = dataAccessStrategy .getStatementMapper ();
552
+ StatementMapper .UpdateSpec updateSpec = mapper .createUpdate (tableName , update ).withCriteria (criteria );
522
553
523
- return this .databaseClient .execute (operation ) //
524
- .fetch () //
525
- .rowsUpdated () //
526
- .handle ((rowsUpdated , sink ) -> {
554
+ PreparedOperation <?> operation = mapper .getMappedObject (updateSpec );
527
555
528
- if (rowsUpdated != 0 ) {
529
- return ;
530
- }
556
+ return this .databaseClient .execute (operation ) //
557
+ .fetch () //
558
+ .rowsUpdated () //
559
+ .handle ((rowsUpdated , sink ) -> {
531
560
532
- if (persistentEntity .hasVersionProperty ()) {
533
- sink .error (new OptimisticLockingFailureException (
534
- formatOptimisticLockingExceptionMessage (saved , persistentEntity )));
535
- } else {
536
- sink .error (new TransientDataAccessResourceException (
537
- formatTransientEntityExceptionMessage (saved , persistentEntity )));
538
- }
539
- }).then (maybeCallAfterSave (saved , outboundRow , tableName ));
540
- });
541
- });
542
- }
561
+ if (rowsUpdated != 0 ) {
562
+ return ;
563
+ }
543
564
565
+ if (persistentEntity .hasVersionProperty ()) {
566
+ sink .error (new OptimisticLockingFailureException (
567
+ formatOptimisticLockingExceptionMessage (entity , persistentEntity )));
568
+ } else {
569
+ sink .error (new TransientDataAccessResourceException (
570
+ formatTransientEntityExceptionMessage (entity , persistentEntity )));
571
+ }
572
+ }).then (maybeCallAfterSave (entity , outboundRow , tableName ));
573
+ }
544
574
545
575
private <T > String formatOptimisticLockingExceptionMessage (T entity , RelationalPersistentEntity <T > persistentEntity ) {
546
576
@@ -555,7 +585,7 @@ private <T> String formatTransientEntityExceptionMessage(T entity, RelationalPer
555
585
}
556
586
557
587
@ SuppressWarnings ("unchecked" )
558
- private <T > T incrementVersion (RelationalPersistentEntity <T > persistentEntity , T entity , OutboundRow outboundRow ) {
588
+ private <T > T incrementVersion (RelationalPersistentEntity <T > persistentEntity , T entity ) {
559
589
560
590
PersistentPropertyAccessor <?> propertyAccessor = persistentEntity .getPropertyAccessor (entity );
561
591
RelationalPersistentProperty versionProperty = persistentEntity .getVersionProperty ();
@@ -569,8 +599,6 @@ private <T> T incrementVersion(RelationalPersistentEntity<T> persistentEntity, T
569
599
Class <?> versionPropertyType = versionProperty .getType ();
570
600
propertyAccessor .setProperty (versionProperty , conversionService .convert (newVersionValue , versionPropertyType ));
571
601
572
- outboundRow .put (versionProperty .getColumnName (), SettableValue .from (newVersionValue ));
573
-
574
602
return (T ) propertyAccessor .getBean ();
575
603
}
576
604
0 commit comments