@@ -338,16 +338,18 @@ impl<'a> std::iter::Iterator for Iterator<'a> {
338
338
}
339
339
}
340
340
341
+ /// Most Raft tests are Goldenscripts under src/raft/testscripts.
341
342
#[ cfg( test) ]
342
343
mod tests {
343
344
use super :: * ;
344
345
use crate :: encoding:: format:: { self , Formatter as _} ;
346
+ use crate :: storage:: engine:: test as testengine;
347
+
345
348
use crossbeam:: channel:: Receiver ;
346
349
use itertools:: Itertools as _;
347
350
use regex:: Regex ;
348
351
use std:: fmt:: Write as _;
349
352
use std:: { error:: Error , result:: Result } ;
350
- use storage:: engine:: test:: { self as testengine, Operation } ;
351
353
use test_each_file:: test_each_path;
352
354
353
355
// Run goldenscript tests in src/raft/testscripts/log.
@@ -362,12 +364,58 @@ mod tests {
362
364
log : Log ,
363
365
op_rx : Receiver < testengine:: Operation > ,
364
366
#[ allow( dead_code) ]
365
- tempdir : tempfile:: TempDir , // deleted when dropped
367
+ tempdir : tempfile:: TempDir ,
368
+ }
369
+
370
+ impl TestRunner {
371
+ fn new ( ) -> Self {
372
+ // Use both a BitCask and a Memory engine, and mirror operations
373
+ // across them. Emit write events to op_tx.
374
+ let ( op_tx, op_rx) = crossbeam:: channel:: unbounded ( ) ;
375
+ let tempdir = tempfile:: TempDir :: with_prefix ( "toydb" ) . expect ( "tempdir failed" ) ;
376
+ let bitcask =
377
+ storage:: BitCask :: new ( tempdir. path ( ) . join ( "bitcask" ) ) . expect ( "bitcask failed" ) ;
378
+ let memory = storage:: Memory :: new ( ) ;
379
+ let engine = testengine:: Emit :: new ( testengine:: Mirror :: new ( bitcask, memory) , op_tx) ;
380
+ let log = Log :: new ( Box :: new ( engine) ) . expect ( "log failed" ) ;
381
+ Self { log, op_rx, tempdir }
382
+ }
383
+
384
+ /// Parses an index@term pair.
385
+ fn parse_index_term ( s : & str ) -> Result < ( Index , Term ) , Box < dyn Error > > {
386
+ let re = Regex :: new ( r"^(\d+)@(\d+)$" ) . expect ( "invalid regex" ) ;
387
+ let groups = re. captures ( s) . ok_or_else ( || format ! ( "invalid index/term {s}" ) ) ?;
388
+ let index = groups. get ( 1 ) . unwrap ( ) . as_str ( ) . parse ( ) ?;
389
+ let term = groups. get ( 2 ) . unwrap ( ) . as_str ( ) . parse ( ) ?;
390
+ Ok ( ( index, term) )
391
+ }
392
+
393
+ /// Parses an index range, in Rust range syntax.
394
+ fn parse_index_range ( s : & str ) -> Result < impl std:: ops:: RangeBounds < Index > , Box < dyn Error > > {
395
+ use std:: ops:: Bound ;
396
+ let mut bound = ( Bound :: < Index > :: Unbounded , Bound :: < Index > :: Unbounded ) ;
397
+ let re = Regex :: new ( r"^(\d+)?\.\.(=)?(\d+)?" ) . expect ( "invalid regex" ) ;
398
+ let groups = re. captures ( s) . ok_or_else ( || format ! ( "invalid range {s}" ) ) ?;
399
+ if let Some ( start) = groups. get ( 1 ) {
400
+ bound. 0 = Bound :: Included ( start. as_str ( ) . parse ( ) ?) ;
401
+ }
402
+ if let Some ( end) = groups. get ( 3 ) {
403
+ let end = end. as_str ( ) . parse ( ) ?;
404
+ if groups. get ( 2 ) . is_some ( ) {
405
+ bound. 1 = Bound :: Included ( end)
406
+ } else {
407
+ bound. 1 = Bound :: Excluded ( end)
408
+ }
409
+ }
410
+ Ok ( bound)
411
+ }
366
412
}
367
413
368
414
impl goldenscript:: Runner for TestRunner {
369
415
fn run ( & mut self , command : & goldenscript:: Command ) -> Result < String , Box < dyn Error > > {
370
416
let mut output = String :: new ( ) ;
417
+ let mut tags = command. tags . clone ( ) ;
418
+
371
419
match command. name . as_str ( ) {
372
420
// append [COMMAND]
373
421
"append" => {
@@ -376,10 +424,8 @@ mod tests {
376
424
args. reject_rest ( ) ?;
377
425
let index = self . log . append ( command) ?;
378
426
let entry = self . log . get ( index) ?. expect ( "entry not found" ) ;
379
- output. push_str ( & format ! (
380
- "append → {}\n " ,
381
- format:: Raft :: <format:: Raw >:: entry( & entry)
382
- ) ) ;
427
+ let fmtentry = format:: Raft :: < format:: Raw > :: entry ( & entry) ;
428
+ writeln ! ( output, "append → {fmtentry}" ) ?;
383
429
}
384
430
385
431
// commit INDEX
@@ -389,10 +435,8 @@ mod tests {
389
435
args. reject_rest ( ) ?;
390
436
let index = self . log . commit ( index) ?;
391
437
let entry = self . log . get ( index) ?. expect ( "entry not found" ) ;
392
- output. push_str ( & format ! (
393
- "commit → {}\n " ,
394
- format:: Raft :: <format:: Raw >:: entry( & entry)
395
- ) ) ;
438
+ let fmtentry = format:: Raft :: < format:: Raw > :: entry ( & entry) ;
439
+ writeln ! ( output, "commit → {fmtentry}" ) ?;
396
440
}
397
441
398
442
// dump
@@ -401,12 +445,9 @@ mod tests {
401
445
let range = ( std:: ops:: Bound :: Unbounded , std:: ops:: Bound :: Unbounded ) ;
402
446
let mut scan = self . log . engine . scan_dyn ( range) ;
403
447
while let Some ( ( key, value) ) = scan. next ( ) . transpose ( ) ? {
404
- writeln ! (
405
- output,
406
- "{} [{}]" ,
407
- format:: Raft :: <format:: Raw >:: key_value( & key, & value) ,
408
- format:: Raw :: key_value( & key, & value)
409
- ) ?;
448
+ let fmtkv = format:: Raft :: < format:: Raw > :: key_value ( & key, & value) ;
449
+ let rawkv = format:: Raw :: key_value ( & key, & value) ;
450
+ writeln ! ( output, "{fmtkv} [{rawkv}]" ) ?;
410
451
}
411
452
}
412
453
@@ -417,24 +458,21 @@ mod tests {
417
458
args. rest_pos ( ) . iter ( ) . map ( |a| a. parse ( ) ) . try_collect ( ) ?;
418
459
args. reject_rest ( ) ?;
419
460
for index in indexes {
420
- let entry = self
421
- . log
422
- . get ( index) ?
461
+ let entry = self . log . get ( index) ?;
462
+ let fmtentry = entry
423
463
. as_ref ( )
424
464
. map ( format:: Raft :: < format:: Raw > :: entry)
425
465
. unwrap_or ( "None" . to_string ( ) ) ;
426
- output . push_str ( & format ! ( "{entry} \n " ) ) ;
466
+ writeln ! ( output , "{fmtentry}" ) ? ;
427
467
}
428
468
}
429
469
430
470
// get_term
431
471
"get_term" => {
432
472
command. consume_args ( ) . reject_rest ( ) ?;
433
473
let ( term, vote) = self . log . get_term ( ) ;
434
- output. push_str ( & format ! (
435
- "term={term} vote={}\n " ,
436
- vote. map( |v| v. to_string( ) ) . unwrap_or( "None" . to_string( ) )
437
- ) ) ;
474
+ let vote = vote. map ( |v| v. to_string ( ) ) . unwrap_or ( "None" . to_string ( ) ) ;
475
+ writeln ! ( output, "term={term} vote={vote}" ) ?;
438
476
}
439
477
440
478
// has INDEX@TERM...
@@ -448,7 +486,7 @@ mod tests {
448
486
args. reject_rest ( ) ?;
449
487
for ( index, term) in indexes {
450
488
let has = self . log . has ( index, term) ?;
451
- output . push_str ( & format ! ( "{has}\n " ) ) ;
489
+ writeln ! ( output , "{has}" ) ? ;
452
490
}
453
491
}
454
492
@@ -471,8 +509,8 @@ mod tests {
471
509
args. reject_rest ( ) ?;
472
510
let mut scan = self . log . scan ( range) ;
473
511
while let Some ( entry) = scan. next ( ) . transpose ( ) ? {
474
- output
475
- . push_str ( & format ! ( "{} \n " , format :: Raft :: <format :: Raw > :: entry ( & entry ) ) ) ;
512
+ let fmtentry = format :: Raft :: < format :: Raw > :: entry ( & entry ) ;
513
+ writeln ! ( output , "{fmtentry}" ) ? ;
476
514
}
477
515
}
478
516
@@ -484,8 +522,8 @@ mod tests {
484
522
args. reject_rest ( ) ?;
485
523
let mut scan = self . log . scan_apply ( applied_index) ;
486
524
while let Some ( entry) = scan. next ( ) . transpose ( ) ? {
487
- output
488
- . push_str ( & format ! ( "{} \n " , format :: Raft :: <format :: Raw > :: entry ( & entry ) ) ) ;
525
+ let fmtentry = format :: Raft :: < format :: Raw > :: entry ( & entry ) ;
526
+ writeln ! ( output , "{fmtentry}" ) ? ;
489
527
}
490
528
}
491
529
@@ -513,10 +551,8 @@ mod tests {
513
551
args. reject_rest ( ) ?;
514
552
let index = self . log . splice ( entries) ?;
515
553
let entry = self . log . get ( index) ?. expect ( "entry not found" ) ;
516
- output. push_str ( & format ! (
517
- "splice → {}\n " ,
518
- format:: Raft :: <format:: Raw >:: entry( & entry)
519
- ) ) ;
554
+ let fmtentry = format:: Raft :: < format:: Raw > :: entry ( & entry) ;
555
+ writeln ! ( output, "splice → {fmtentry}" ) ?;
520
556
}
521
557
522
558
// status [engine=BOOL]
@@ -527,100 +563,51 @@ mod tests {
527
563
let ( term, vote) = self . log . get_term ( ) ;
528
564
let ( last_index, last_term) = self . log . get_last_index ( ) ;
529
565
let ( commit_index, commit_term) = self . log . get_commit_index ( ) ;
530
- output. push_str ( & format ! (
531
- "term={term} last={last_index}@{last_term} commit={commit_index}@{commit_term} vote={}" ,
532
- vote. map( |id| id. to_string( ) ) . unwrap_or( "None" . to_string( ) )
533
- ) ) ;
566
+ let vote = vote. map ( |id| id. to_string ( ) ) . unwrap_or ( "None" . to_string ( ) ) ;
567
+ write ! (
568
+ output,
569
+ "term={term} last={last_index}@{last_term} commit={commit_index}@{commit_term} vote={vote}" ,
570
+ ) ?;
534
571
if engine {
535
- output . push_str ( & format ! ( " engine={:#?}" , self . log. status( ) ?) ) ;
572
+ write ! ( output , " engine={:#?}" , self . log. status( ) ?) ? ;
536
573
}
537
- output . push ( '\n' ) ;
574
+ writeln ! ( output ) ? ;
538
575
}
539
576
540
577
name => return Err ( format ! ( "unknown command {name}" ) . into ( ) ) ,
541
578
}
542
- Ok ( output)
543
- }
544
579
545
- /// If requested via [ops] tag, output engine operations for the command.
546
- fn end_command (
547
- & mut self ,
548
- command : & goldenscript:: Command ,
549
- ) -> Result < String , Box < dyn Error > > {
550
- // Parse tags.
551
- let mut show_ops = false ;
552
- for tag in & command. tags {
553
- match tag. as_str ( ) {
554
- "ops" => show_ops = true ,
555
- tag => return Err ( format ! ( "invalid tag {tag}" ) . into ( ) ) ,
580
+ // If requested, output engine operations.
581
+ if tags. remove ( "ops" ) {
582
+ while let Ok ( op) = self . op_rx . try_recv ( ) {
583
+ match op {
584
+ testengine:: Operation :: Delete { key } => {
585
+ let fmtkey = format:: Raft :: < format:: Raw > :: key ( & key) ;
586
+ let rawkey = format:: Raw :: key ( & key) ;
587
+ writeln ! ( output, "engine delete {fmtkey} [{rawkey}]" ) ?
588
+ }
589
+ testengine:: Operation :: Flush => writeln ! ( output, "engine flush" ) ?,
590
+ testengine:: Operation :: Set { key, value } => {
591
+ let fmtkv = format:: Raft :: < format:: Raw > :: key_value ( & key, & value) ;
592
+ let rawkv = format:: Raw :: key_value ( & key, & value) ;
593
+ writeln ! ( output, "engine set {fmtkv} [{rawkv}]" ) ?
594
+ }
595
+ }
556
596
}
557
597
}
558
598
559
- // Process engine operations, either output or drain.
560
- let mut output = String :: new ( ) ;
561
- while let Ok ( op) = self . op_rx . try_recv ( ) {
562
- match op {
563
- _ if !show_ops => { }
564
- Operation :: Delete { key } => writeln ! (
565
- output,
566
- "engine delete {} [{}]" ,
567
- format:: Raft :: <format:: Raw >:: key( & key) ,
568
- format:: Raw :: key( & key)
569
- ) ?,
570
- Operation :: Flush => writeln ! ( output, "engine flush" ) ?,
571
- Operation :: Set { key, value } => writeln ! (
572
- output,
573
- "engine set {} [{}]" ,
574
- format:: Raft :: <format:: Raw >:: key_value( & key, & value) ,
575
- format:: Raw :: key_value( & key, & value)
576
- ) ?,
577
- }
599
+ if let Some ( tag) = tags. iter ( ) . next ( ) {
600
+ return Err ( format ! ( "unknown tag {tag}" ) . into ( ) ) ;
578
601
}
579
- Ok ( output)
580
- }
581
- }
582
602
583
- impl TestRunner {
584
- fn new ( ) -> Self {
585
- // Use both a BitCask and a Memory engine, and mirror operations
586
- // across them. Emit write events to op_tx.
587
- let ( op_tx, op_rx) = crossbeam:: channel:: unbounded ( ) ;
588
- let tempdir = tempfile:: TempDir :: with_prefix ( "toydb" ) . expect ( "tempdir failed" ) ;
589
- let bitcask =
590
- storage:: BitCask :: new ( tempdir. path ( ) . join ( "bitcask" ) ) . expect ( "bitcask failed" ) ;
591
- let memory = storage:: Memory :: new ( ) ;
592
- let engine = testengine:: Emit :: new ( testengine:: Mirror :: new ( bitcask, memory) , op_tx) ;
593
- let log = Log :: new ( Box :: new ( engine) ) . expect ( "log init failed" ) ;
594
- Self { log, op_rx, tempdir }
595
- }
596
-
597
- /// Parses an index@term pair.
598
- fn parse_index_term ( s : & str ) -> Result < ( Index , Term ) , Box < dyn Error > > {
599
- let re = Regex :: new ( r"^(\d+)@(\d+)$" ) . expect ( "invalid regex" ) ;
600
- let groups = re. captures ( s) . ok_or_else ( || format ! ( "invalid index/term {s}" ) ) ?;
601
- let index = groups. get ( 1 ) . unwrap ( ) . as_str ( ) . parse ( ) ?;
602
- let term = groups. get ( 2 ) . unwrap ( ) . as_str ( ) . parse ( ) ?;
603
- Ok ( ( index, term) )
603
+ Ok ( output)
604
604
}
605
605
606
- /// Parses an index range, in Rust range syntax.
607
- fn parse_index_range ( s : & str ) -> Result < impl std:: ops:: RangeBounds < Index > , Box < dyn Error > > {
608
- use std:: ops:: Bound ;
609
- let mut bound = ( Bound :: < Index > :: Unbounded , Bound :: < Index > :: Unbounded ) ;
610
- let re = Regex :: new ( r"^(\d+)?\.\.(=)?(\d+)?" ) . expect ( "invalid regex" ) ;
611
- let groups = re. captures ( s) . ok_or_else ( || format ! ( "invalid range {s}" ) ) ?;
612
- if let Some ( start) = groups. get ( 1 ) {
613
- bound. 0 = Bound :: Included ( start. as_str ( ) . parse ( ) ?) ;
614
- }
615
- if let Some ( end) = groups. get ( 3 ) {
616
- let end = end. as_str ( ) . parse ( ) ?;
617
- if groups. get ( 2 ) . is_some ( ) {
618
- bound. 1 = Bound :: Included ( end)
619
- } else {
620
- bound. 1 = Bound :: Excluded ( end)
621
- }
622
- }
623
- Ok ( bound)
606
+ /// If requested via [ops] tag, output engine operations for the command.
607
+ fn end_command ( & mut self , _: & goldenscript:: Command ) -> Result < String , Box < dyn Error > > {
608
+ // Drain any remaining engine operations.
609
+ while self . op_rx . try_recv ( ) . is_ok ( ) { }
610
+ Ok ( String :: new ( ) )
624
611
}
625
612
}
626
613
}
0 commit comments