@@ -72,35 +72,43 @@ def _sync_status(self):
72
72
DataSource .objects .filter (pk = self .pk ).update (status = self .status , last_synced = self .last_synced )
73
73
post_sync .send (sender = self .__class__ , instance = self )
74
74
75
def partial_sync(self, device_filter: Q, batch_size: int = 1000) -> set[str]:
    """
    Fetch data from the backend and reconcile this source's DataFile rows with it.

    Existing files whose on-disk content changed are bulk-updated; paths that
    have no DataFile row yet are bulk-created. Nothing is deleted here.

    :param device_filter: passed to ``backend.fetch`` only when ``self.type`` is
        "device_polling"; other types call ``backend.fetch()`` without arguments.
    :param batch_size: chunk size for the path lookups and for the bulk operations.
    :return: every path found by ``self._walk`` in the fetched tree.
    """

    def refresh_existing(path_batch):
        # Yield only the rows that actually changed, but remember *every* row
        # that already exists so unchanged files are not inserted a second time.
        for datafile in self.datafiles.filter(path__in=path_batch).iterator():
            if datafile.refresh_from_disk(local_path):
                yield datafile
            existing_paths.add(datafile.path)

    def build_datafile(path):
        datafile = DataFile(source=self, path=path)
        datafile.refresh_from_disk(local_path)
        datafile.full_clean()
        return datafile

    backend = self.get_backend()
    if self.type == "device_polling":
        fetch = backend.fetch(device_filter)
    else:
        fetch = backend.fetch()

    with fetch as local_path, self._sync_status():
        all_new_paths = self._walk(local_path)
        # Kept separate from all_new_paths so the set being batched below is
        # never mutated while it is iterated.
        existing_paths = set()
        changed_datafiles = chain.from_iterable(
            refresh_existing(path_batch) for path_batch in batched(all_new_paths, batch_size)
        )
        updated = DataFile.objects.bulk_update(
            changed_datafiles, batch_size=batch_size, fields=("last_updated", "size", "hash", "data")
        )
        # NOTE(review): relies on bulk_update having exhausted the generator
        # above, so existing_paths is complete before the difference is taken.
        missing_paths = all_new_paths - existing_paths
        new_datafiles = (build_datafile(path) for path in missing_paths)
        created = len(DataFile.objects.bulk_create(new_datafiles, batch_size=batch_size))
        logger.debug("%s new files were created and %s existing files were updated during sync", created, updated)
    return all_new_paths
102
103
103
104
def sync(self, device_filter: Q | None = None):
    """
    Synchronise this data source.

    A partial sync is run only when a ``device_filter`` is supplied and the
    source type is "device_polling"; every other combination is delegated to
    the parent implementation and its result is returned. The partial path
    returns ``None``.
    """
    wants_partial = device_filter is not None and self.type == "device_polling"
    if not wants_partial:
        return super().sync()
    self.partial_sync(device_filter)
108
+
109
def sync_in_migration(self, datafile_model: type):
    """
    Sync this data source from inside a migration.

    Runs ``partial_sync`` with an empty filter and then removes stale files
    through ``datafile_model`` instead of the concrete DataFile class, which
    avoids problems with historical models that hold references to DataFile.

    :param datafile_model: model class used for the cleanup query (typically
        the historical DataFile model obtained from the migration's app registry).
    """
    new_paths = self.partial_sync(Q())
    # Restrict the cleanup to this source's own files; without the filter,
    # files belonging to every other data source would be deleted as well.
    # The raw id is used because a historical model may not accept a
    # concrete-model instance in a lookup.
    # NOTE(review): assumes datafile_model has the same `source` foreign key
    # as DataFile - confirm against the migration state.
    stale_files = datafile_model.objects.filter(source_id=self.pk).exclude(path__in=new_paths)
    stale_files.delete()
0 commit comments