1
1
use std:: num:: NonZeroI32 ;
2
- use std:: sync:: { Mutex , Arc } ;
2
+ use std:: sync:: { Arc , Mutex } ;
3
3
4
4
// use mimalloc::MiMalloc;
5
+ use crossbeam:: queue:: ArrayQueue ;
6
+ use fffx:: { Fasta , Fastq } ;
5
7
use minimap2:: * ;
6
8
use minimap2_sys:: { mm_set_opt, MM_F_CIGAR } ;
9
+ use polars:: { df, prelude:: * } ;
7
10
use pyo3:: prelude:: * ;
8
11
use pyo3_polars:: { error:: PyPolarsErr , PyDataFrame } ;
9
- use polars:: { prelude:: * , df} ;
10
- use crossbeam:: queue:: ArrayQueue ;
11
- use fffx:: { Fasta , Fastq } ;
12
12
13
13
// #[global_allocator]
14
14
// static GLOBAL: MiMalloc = MiMalloc;
@@ -47,13 +47,15 @@ unsafe impl Send for Aligner {}
47
47
48
48
#[ pymethods]
49
49
impl Aligner {
50
-
51
50
// Mapping functions
52
51
/// Map a single sequence
53
52
fn map1 ( & self , seq : & Sequence ) -> PyResult < PyDataFrame > {
54
53
let mut mappings = Mappings :: default ( ) ;
55
54
56
- let results = self . aligner . map ( & seq. sequence , true , true , None , None ) . unwrap ( ) ;
55
+ let results = self
56
+ . aligner
57
+ . map ( & seq. sequence , true , true , None , None )
58
+ . unwrap ( ) ;
57
59
results. into_iter ( ) . for_each ( |mut r| {
58
60
r. query_name = Some ( seq. id . clone ( ) ) ;
59
61
mappings. push ( r)
@@ -73,7 +75,10 @@ impl Aligner {
73
75
let mut mappings = Mappings :: default ( ) ;
74
76
75
77
for seq in seqs {
76
- let results = self . aligner . map ( & seq. sequence , true , true , None , None ) . unwrap ( ) ;
78
+ let results = self
79
+ . aligner
80
+ . map ( & seq. sequence , true , true , None , None )
81
+ . unwrap ( ) ;
77
82
results. into_iter ( ) . for_each ( |mut r| {
78
83
r. query_name = Some ( seq. id . clone ( ) ) ;
79
84
mappings. push ( r)
@@ -86,7 +91,7 @@ impl Aligner {
86
91
let work_queue = Arc :: new ( Mutex :: new ( seqs) ) ;
87
92
let results_queue = Arc :: new ( ArrayQueue :: < WorkQueue < Vec < Mapping > > > :: new ( 128 ) ) ;
88
93
let mut thread_handles = Vec :: new ( ) ;
89
- for i in 0 ..( self . aligner . threads - 1 ) {
94
+ for i in 0 ..( self . aligner . threads - 1 ) {
90
95
let work_queue = Arc :: clone ( & work_queue) ;
91
96
let results_queue = Arc :: clone ( & results_queue) ;
92
97
@@ -127,13 +132,13 @@ impl Aligner {
127
132
match result {
128
133
Some ( WorkQueue :: Work ( result) ) => {
129
134
result. into_iter ( ) . for_each ( |r| mappings. push ( r) ) ;
130
- } ,
135
+ }
131
136
Some ( WorkQueue :: Done ) => {
132
137
finished_count += 1 ;
133
138
if finished_count == ( self . aligner . threads - 1 ) {
134
139
break ;
135
140
}
136
- } ,
141
+ }
137
142
None => {
138
143
// Probably should be backoff, but let's try this for now...
139
144
std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 100 ) ) ;
@@ -203,7 +208,7 @@ impl Aligner {
203
208
self . preset ( Preset :: AvaPb ) ;
204
209
}
205
210
206
- /// Configure aligner for Asm
211
+ /// Configure aligner for Asm
207
212
fn asm ( & mut self ) {
208
213
self . preset ( Preset :: Asm ) ;
209
214
}
@@ -245,26 +250,26 @@ impl Aligner {
245
250
}
246
251
247
252
impl Aligner {
248
- /// Create an aligner using a preset.
249
- fn preset ( & mut self , preset : Preset ) {
250
- let mut idxopt = IdxOpt :: default ( ) ;
251
- let mut mapopt = MapOpt :: default ( ) ;
252
-
253
- unsafe {
254
- // Set preset
255
- mm_set_opt ( preset. into ( ) , & mut idxopt, & mut mapopt)
256
- } ;
257
-
258
- self . aligner . idxopt = idxopt;
259
- self . aligner . mapopt = mapopt;
260
- }
253
+ /// Create an aligner using a preset.
254
+ fn preset ( & mut self , preset : Preset ) {
255
+ let mut idxopt = IdxOpt :: default ( ) ;
256
+ let mut mapopt = MapOpt :: default ( ) ;
257
+
258
+ unsafe {
259
+ // Set preset
260
+ mm_set_opt ( preset. into ( ) , & mut idxopt, & mut mapopt)
261
+ } ;
262
+
263
+ self . aligner . idxopt = idxopt;
264
+ self . aligner . mapopt = mapopt;
265
+ }
261
266
}
262
267
263
268
/*
264
269
TODO - Destroy index when aligner is dropped or when new index is created
265
270
impl Drop for Aligner {
266
271
fn drop(&mut self) {
267
-
272
+
268
273
}
269
274
} */
270
275
@@ -431,66 +436,77 @@ impl Mappings {
431
436
}
432
437
433
438
pub fn to_df ( self ) -> Result < DataFrame , PolarsError > {
434
-
435
439
// Convert strand to string + or -
436
440
let strand: Vec < String > = self . strand . iter ( ) . map ( |x| x. to_string ( ) ) . collect ( ) ;
437
441
438
442
// Convert query len to Option<u32>
439
443
// let query_len: Vec<Option<u32>> = self.query_len.iter().map(|x| x.map(|y| y as u32.into())).collect();
440
- let query_len: Vec < Option < u32 > > = self . query_len . iter ( ) . map ( |x|
441
- match x {
444
+ let query_len: Vec < Option < u32 > > = self
445
+ . query_len
446
+ . iter ( )
447
+ . map ( |x| match x {
442
448
Some ( y) => Some ( y. get ( ) as u32 ) ,
443
449
None => None ,
444
- }
445
- ) . collect ( ) ;
450
+ } )
451
+ . collect ( ) ;
446
452
447
- let nm: Vec < Option < i32 > > = self . alignment . iter ( ) . map ( |x|
448
- match x {
453
+ let nm: Vec < Option < i32 > > = self
454
+ . alignment
455
+ . iter ( )
456
+ . map ( |x| match x {
449
457
// These are ugly but it's early in the morning...
450
458
Some ( y) => Some ( y. nm ) ,
451
459
None => None ,
452
- }
453
- ) . collect ( ) ;
460
+ } )
461
+ . collect ( ) ;
454
462
455
- let cigar: Vec < Option < Vec < ( u32 , u8 ) > > > = self . alignment . iter ( ) . map ( |x|
456
- match x {
463
+ let cigar: Vec < Option < Vec < ( u32 , u8 ) > > > = self
464
+ . alignment
465
+ . iter ( )
466
+ . map ( |x| match x {
457
467
Some ( y) => match & y. cigar {
458
468
Some ( z) => Some ( z. clone ( ) ) ,
459
469
None => None ,
460
470
} ,
461
471
None => None ,
462
- }
463
- ) . collect ( ) ;
472
+ } )
473
+ . collect ( ) ;
464
474
465
- let cigar_str: Vec < Option < String > > = self . alignment . iter ( ) . map ( |x|
466
- match x {
475
+ let cigar_str: Vec < Option < String > > = self
476
+ . alignment
477
+ . iter ( )
478
+ . map ( |x| match x {
467
479
Some ( y) => match & y. cigar_str {
468
480
Some ( z) => Some ( z. clone ( ) ) ,
469
481
None => None ,
470
482
} ,
471
483
None => None ,
472
- }
473
- ) . collect ( ) ;
484
+ } )
485
+ . collect ( ) ;
474
486
475
- let md: Vec < Option < String > > = self . alignment . iter ( ) . map ( |x|
476
- match x {
487
+ let md: Vec < Option < String > > = self
488
+ . alignment
489
+ . iter ( )
490
+ . map ( |x| match x {
477
491
Some ( y) => match & y. md {
478
492
Some ( z) => Some ( z. clone ( ) ) ,
479
493
None => None ,
480
494
} ,
481
495
None => None ,
482
- }
483
- ) . collect ( ) ;
496
+ } )
497
+ . collect ( ) ;
484
498
485
- let cs: Vec < Option < String > > = self . alignment . iter ( ) . map ( |x|
486
- match x {
499
+ let cs: Vec < Option < String > > = self
500
+ . alignment
501
+ . iter ( )
502
+ . map ( |x| match x {
487
503
Some ( y) => match & y. cs {
488
504
Some ( z) => Some ( z. clone ( ) ) ,
489
505
None => None ,
490
506
} ,
491
507
None => None ,
492
- }
493
- ) . collect ( ) ;
508
+ } )
509
+ . collect ( ) ;
494
510
495
511
let query_name = Series :: new ( "query_name" , self . query_name ) ;
496
512
let query_len = Series :: new ( "query_len" , query_len) ;
@@ -533,14 +549,3 @@ impl Mappings {
533
549
] )
534
550
}
535
551
}
536
-
537
- #[ cfg( test) ]
538
- mod tests {
539
- use super :: * ;
540
-
541
- #[ test]
542
- fn it_works ( ) {
543
- let result = add ( 2 , 2 ) ;
544
- assert_eq ! ( result, 4 ) ;
545
- }
546
- }
0 commit comments