22
33namespace craft \feedme \queue \jobs ;
44
5+ use Cake \Utility \Hash ;
56use Craft ;
7+ use craft \base \Batchable ;
8+ use craft \feedme \datatypes \DataBatcher ;
9+ use craft \feedme \events \FeedProcessEvent ;
610use craft \feedme \models \FeedModel ;
711use craft \feedme \Plugin ;
8- use craft \queue \BaseJob ;
12+ use craft \feedme \services \Process ;
13+ use craft \helpers \Queue ;
14+ use craft \queue \BaseBatchedJob ;
915use Throwable ;
1016use yii \queue \RetryableJobInterface ;
1117
1218/**
1319 *
1420 * @property-read mixed $ttr
1521 */
16- class FeedImport extends BaseJob implements RetryableJobInterface
22+ class FeedImport extends BaseBatchedJob implements RetryableJobInterface
1723{
1824 // Properties
1925 // =========================================================================
@@ -44,6 +50,22 @@ class FeedImport extends BaseJob implements RetryableJobInterface
4450 */
4551 public bool $ continueOnError = true ;
4652
53+ /**
54+ * @var mixed The Unix timestamp with microseconds of when the feed import started being processed
55+ * @since 5.11.0
56+ */
57+ public mixed $ startTime = null ;
58+
59+ /**
60+ * @var array The Feed's settings as prepared by beforeProcessFeed()
61+ */
62+ private array $ _feedSettings = [];
63+
64+ /**
65+ * @var int The index of currently processed item in current batch
66+ */
67+ private int $ _index = 0 ;
68+
4769 // Public Methods
4870 // =========================================================================
4971
@@ -65,73 +87,108 @@ public function canRetry($attempt, $error): bool
6587 }
6688
6789 /**
68- * @inheritDoc
90+ * @inheritdoc
6991 */
70- public function execute ( $ queue ): void
92+ protected function loadData ( ): Batchable
7193 {
72- try {
73- $ feedData = $ this ->feed ->getFeedData ();
94+ $ feedData = $ this ->feed ->getFeedData ();
7495
75- if ($ this ->offset ) {
76- $ feedData = array_slice ($ feedData , $ this ->offset );
77- }
96+ if ($ this ->offset ) {
97+ $ feedData = array_slice ($ feedData , $ this ->offset );
98+ }
7899
79- if ($ this ->limit ) {
80- $ feedData = array_slice ($ feedData , 0 , $ this ->limit );
81- }
100+ if ($ this ->limit ) {
101+ $ feedData = array_slice ($ feedData , 0 , $ this ->limit );
102+ }
103+
104+ $ data = $ feedData ;
82105
83- // Do we even have any data to process?
84- if (! $ feedData ) {
85- Plugin:: info ( ' No feed items to process. ' );
86- return ;
106+ // Our main data-parsing function. Handles the actual data values, defaults and field options
107+ foreach ( $ feedData as $ key => $ nodeData ) {
108+ if (! is_array ( $ nodeData )) {
109+ $ nodeData = [ $ nodeData ] ;
87110 }
88111
89- $ feedSettings = Plugin::$ plugin ->process ->beforeProcessFeed ($ this ->feed , $ feedData );
112+ $ data [$ key ] = Hash::flatten ($ nodeData , '/ ' );
113+ }
114+
115+ $ data = array_values ($ data );
90116
91- $ feedData = $ feedSettings ['feedData ' ];
117+ // Fire an 'onBeforeProcessFeed' event
118+ $ event = new FeedProcessEvent ([
119+ 'feed ' => $ this ->feed ,
120+ 'feedData ' => $ data ,
121+ ]);
92122
93- $ totalSteps = count ( $ feedData );
123+ Plugin:: $ plugin -> process -> trigger (Process:: EVENT_BEFORE_PROCESS_FEED , $ event );
94124
95- $ index = 0 ;
125+ if (!$ event ->isValid ) {
126+ return new DataBatcher ([]);
127+ }
96128
97- foreach ($ feedData as $ data ) {
98- try {
99- Plugin::$ plugin ->process ->processFeed ($ index , $ feedSettings , $ this ->processedElementIds );
100- } catch (Throwable $ e ) {
101- if (!$ this ->continueOnError ) {
102- throw $ e ;
103- }
129+ // Allow event to modify the feed data
130+ $ data = $ event ->feedData ;
104131
105- // We want to catch any issues in each iteration of the loop (and log them), but this allows the
106- // rest of the feed to continue processing.
107- Plugin::error ('`{e} - {f}: {l}`. ' , ['e ' => $ e ->getMessage (), 'f ' => basename ($ e ->getFile ()), 'l ' => $ e ->getLine ()]);
108- Craft::$ app ->getErrorHandler ()->logException ($ e );
109- }
132+ return new DataBatcher ($ data );
133+ }
110134
111- $ this ->setProgress ($ queue , $ index ++ / $ totalSteps );
135+ /**
136+ * @inheritdoc
137+ */
138+ protected function processItem (mixed $ item ): void
139+ {
140+ try {
141+ Plugin::$ plugin ->process ->processFeed ($ this ->_index , $ this ->_feedSettings , $ this ->processedElementIds , $ item , $ this ->batchIndex );
142+ } catch (Throwable $ e ) {
143+ if (!$ this ->continueOnError ) {
144+ throw $ e ;
112145 }
113146
114- // Check if we need to paginate the feed to run again
147+ // We want to catch any issues in each iteration of the loop (and log them), but this allows the
148+ // rest of the feed to continue processing.
149+ Plugin::error ('`{e} - {f}: {l}`. ' , ['e ' => $ e ->getMessage (), 'f ' => basename ($ e ->getFile ()), 'l ' => $ e ->getLine ()]);
150+ Craft::$ app ->getErrorHandler ()->logException ($ e );
151+ }
152+
153+ $ this ->_index ++;
154+ }
155+ /**
156+ * @inheritDoc
157+ */
158+ public function execute ($ queue ): void
159+ {
160+ $ processService = Plugin::$ plugin ->getProcess ();
161+ if ($ this ->itemOffset == 0 ) {
162+ $ processService ->beforeProcessFeed ($ this ->feed , (array )$ this ->data ());
163+ }
164+
165+ if (!$ this ->startTime ) {
166+ $ this ->startTime = $ processService ->time_start ;
167+ }
168+
169+ if (empty ($ this ->_feedSettings )) {
170+ $ this ->_feedSettings = $ processService ->getFeedSettings ($ this ->feed , (array )$ this ->data ());
171+ }
172+
173+ parent ::execute ($ queue );
174+
175+ // Check if we need to paginate the feed to run again
176+ if ($ this ->itemOffset == $ this ->totalItems ()) {
115177 if ($ this ->feed ->getNextPagination ()) {
116- Plugin:: getInstance ()-> queue -> push (new self ([
178+ Queue:: push (new self ([
117179 'feed ' => $ this ->feed ,
118180 'limit ' => $ this ->limit ,
119181 'offset ' => $ this ->offset ,
120182 'processedElementIds ' => $ this ->processedElementIds ,
183+ 'startTime ' => $ this ->startTime ,
121184 ]));
122185 } else {
123186 // Only perform the afterProcessFeed function after any/all pagination is done
124- Plugin:: $ plugin -> process -> afterProcessFeed ($ feedSettings , $ this ->feed , $ this ->processedElementIds );
187+ $ processService -> afterProcessFeed ($ this -> _feedSettings , $ this ->feed , $ this ->processedElementIds , $ this -> startTime );
125188 }
126- } catch (Throwable $ e ) {
127- // Even though we catch errors on each step of the loop, make sure to catch errors that can be anywhere
128- // else in this function, just to be super-safe and not cause the queue job to die.
129- Plugin::error ('`{e} - {f}: {l}`. ' , ['e ' => $ e ->getMessage (), 'f ' => basename ($ e ->getFile ()), 'l ' => $ e ->getLine ()]);
130- Craft::$ app ->getErrorHandler ()->logException ($ e );
131189 }
132190 }
133191
134-
135192 // Protected Methods
136193 // =========================================================================
137194
0 commit comments