|
| 1 | +#!/usr/bin/env perl |
| 2 | + |
| 3 | +use strict; |
| 4 | +use warnings; |
| 5 | +use Time::HiRes qw| usleep |; |
| 6 | +use Test::More; |
| 7 | +use Data::Dumper; |
| 8 | + |
| 9 | +local $Data::Dumper::Indent = 1; |
| 10 | +local $Data::Dumper::Terse = 1; |
| 11 | + |
| 12 | +$ENV{DBD_ORACLE_DUMP} = 0; |
| 13 | + |
| 14 | +our $VERSION = 0.1; |
| 15 | +our $VERBOSE = 0; |
| 16 | +our $ORACLE_HOME = $ENV{ORACLE_HOME}; |
| 17 | + |
| 18 | +my $TEST_START = Time::HiRes::time(); |
| 19 | + |
| 20 | +sub section |
| 21 | +{ |
| 22 | + my $msg = shift; |
| 23 | + note '+ --------------------------------------------- +'; |
| 24 | + note " $msg"; |
| 25 | + note '+ --------------------------------------------- +'; |
| 26 | + return; |
| 27 | +} |
| 28 | + |
| 29 | +sub abort |
| 30 | +{ |
| 31 | + my $msg = shift; |
| 32 | + printf STDERR "\n"; |
| 33 | + printf STDERR "# + --------------------------------------------- +\n"; |
| 34 | + printf STDERR "# %s\n", $msg; |
| 35 | + printf STDERR "# + --------------------------------------------- +\n"; |
| 36 | + printf STDERR "\n"; |
| 37 | + note sprintf 'Completed in %5.3fs', Time::HiRes::time() - $TEST_START; |
| 38 | + done_testing(); |
| 39 | + exit 1; |
| 40 | +} |
| 41 | + |
| 42 | +## Noise hides real issues (if there are any) |
| 43 | +local $SIG{__WARN__} = sub { warn $_[0] unless $_[0] =~ m/^Subroutine/xi }; |
| 44 | + |
| 45 | +PERL_NOTICE: |
| 46 | +{ |
| 47 | + note qx|perl -V| if $VERBOSE; |
| 48 | +} |
| 49 | + |
| 50 | +ORACLE_READY: |
| 51 | +{ |
| 52 | + Child::Queue->do_connect( { PrintError => 0 } ) or plan skip_all => "Unable to connect to oracle\n"; |
| 53 | +} |
| 54 | + |
| 55 | +QUEUE_BASICS: |
| 56 | +{ |
| 57 | + section 'QUEUE - BASICS'; |
| 58 | + |
| 59 | + my $queue = Child::Queue->new( -DEPTH => 8 ); |
| 60 | + |
| 61 | + is $queue->depth, 8, 'Queue depth'; |
| 62 | + is $queue->size, 0, 'Queue size'; |
| 63 | + is $queue->running, 0, 'Queue running'; |
| 64 | + ok $queue->isIdle, 'Queue is idle'; |
| 65 | + ok !$queue->isBusy, 'Queue is not busy'; |
| 66 | + ok $queue->hasSlots, 'Queue has slots'; |
| 67 | + ok !$queue->isFull, 'Queue is not full'; |
| 68 | + ok $queue->enqueue(1), 'Enqueue 1'; |
| 69 | + is $queue->size, 1, 'Queue size'; |
| 70 | + ok $queue->enqueue(2), 'Enqueue 2'; |
| 71 | + is $queue->size, 2, 'Queue size'; |
| 72 | + is $queue->running, 0, 'Queue running'; |
| 73 | + is $queue->dequeue, 1, 'Dequeue 1'; |
| 74 | + is $queue->size, 1, 'Queue size'; |
| 75 | + is $queue->dequeue, 2, 'Dequeue 2'; |
| 76 | + is $queue->size, 0, 'Queue size'; |
| 77 | + ok $queue->isIdle, 'Queue is idle'; |
| 78 | + ok !$queue->isBusy, 'Queue is not busy'; |
| 79 | + ok $queue->hasSlots, 'Queue has slots'; |
| 80 | +} |
| 81 | + |
| 82 | + |
| 83 | +FORK_SEGV: |
| 84 | +{ |
| 85 | +# last FORK_SEGV if 1; |
| 86 | + |
| 87 | + section 'FORK - SEGV'; |
| 88 | + |
| 89 | + my $queue = Child::Queue->new( -DEPTH => 8 ); |
| 90 | + my $jobs = 80; |
| 91 | + |
| 92 | + is $queue->depth, 8, 'Queue depth'; |
| 93 | + is $queue->size, 0, 'Queue size'; |
| 94 | + is $queue->running, 0, 'Queue running'; |
| 95 | + ok $queue->isIdle, 'Queue is idle'; |
| 96 | + ok !$queue->isBusy, 'Queue is not busy'; |
| 97 | + ok $queue->hasSlots, 'Queue has slots'; |
| 98 | + ok !$queue->isFull, 'Queue is not full'; |
| 99 | + |
| 100 | + |
| 101 | + for my $i ( 1 .. $jobs ) |
| 102 | + { |
| 103 | + my $job = sprintf 'JOB-%03d', $i; |
| 104 | + ok $queue->enqueue($job), 'Enqueue ' . $job; |
| 105 | + } |
| 106 | + |
| 107 | + is $queue->size, $jobs, 'Queue size'; |
| 108 | + is $queue->running, 0, 'Queue running - zero'; |
| 109 | + |
| 110 | + ok $queue->startone($queue->dequeue), 'Start one child ->> 1'; |
| 111 | + is $queue->size, $jobs-1, 'Queue size verified'; |
| 112 | + ok $queue->startone($queue->dequeue), 'Start one child ->> 2'; |
| 113 | + is $queue->size, $jobs-2, 'Queue size verified'; |
| 114 | + ok $queue->run, 'queue->run - start -DEPTH children'; |
| 115 | + is $queue->running, 8, 'Queue running - 8 children started'; |
| 116 | + ok $queue->isFull, 'Queue is full'; |
| 117 | + |
| 118 | + # note Dumper($Child::Queue::WORKSET); |
| 119 | + |
| 120 | + while ( $queue->isBusy ) |
| 121 | + { |
| 122 | + usleep(50000); |
| 123 | + $queue->run if $queue->hasSlots && $queue->size; |
| 124 | + usleep(15000); |
| 125 | + } |
| 126 | + |
| 127 | + is $queue->size, 0, 'Queue size - all jobs done'; |
| 128 | + is $queue->running, 0, 'Queue running - zero'; |
| 129 | + ok $queue->isIdle, 'Queue is idle'; |
| 130 | + ok !$queue->isBusy, 'Queue is not busy'; |
| 131 | + ok $queue->hasSlots, 'Queue has slots'; |
| 132 | + ok !$queue->isFull, 'Queue is not full'; |
| 133 | +} |
| 134 | + |
| 135 | + |
| 136 | +note sprintf 'Completed in %5.3fs', Time::HiRes::time() - $TEST_START; |
| 137 | +done_testing(); |
| 138 | + |
| 139 | + |
| 140 | +## Children QUEUE |
| 141 | + |
| 142 | +package Child::Queue; |
| 143 | + |
| 144 | +use strict; |
| 145 | +use warnings; |
| 146 | +use Data::Dumper; |
| 147 | +use POSIX ":sys_wait_h"; |
| 148 | + |
| 149 | +use lib 't/lib'; |
| 150 | +use DBDOracleTestLib qw/ db_handle /; |
| 151 | + |
| 152 | +our $VERSION; |
| 153 | +our $VERBOSE; |
| 154 | +our $QUEUE; |
| 155 | +our $WORKSET; |
| 156 | + |
| 157 | +sub _SIG_CHLD |
| 158 | +{ |
| 159 | + my $pid = waitpid(-1, WNOHANG); |
| 160 | + my $code = $? >> 8; |
| 161 | + |
| 162 | + return unless $pid > 0; |
| 163 | + |
| 164 | + if ( exists $WORKSET->{$pid} ) |
| 165 | + { |
| 166 | + my $child = delete $WORKSET->{$pid}; |
| 167 | + my $results = $child->finish( $code ); |
| 168 | + printf "# Child %d finished with code %d\n", $pid, $results->{CODE}; |
| 169 | + print Dumper($results); |
| 170 | + } |
| 171 | + else |
| 172 | + { |
| 173 | + printf "# Child %d finished but not in workset", $pid; |
| 174 | + } |
| 175 | +} |
| 176 | + |
| 177 | +BEGIN { |
| 178 | + $VERSION = 0.1; |
| 179 | + $VERBOSE = $main::VERBOSE || 0; |
| 180 | + $QUEUE = []; |
| 181 | + $WORKSET = {}; # PID => Child::Runner |
| 182 | + |
| 183 | + $SIG{CHLD} = \&_SIG_CHLD; |
| 184 | +} |
| 185 | + |
| 186 | +sub new |
| 187 | +{ |
| 188 | + my $self = shift; |
| 189 | + my $args = ref $_[0] ? shift : { @_ }; |
| 190 | + return bless $args, $self |
| 191 | +} |
| 192 | + |
| 193 | +sub depth { return $_[0]->{-DEPTH} } |
| 194 | +sub isBusy { return $_[0]->size > 0 || $_[0]->running > 0 } |
| 195 | +sub isIdle { return ! $_[0]->isBusy } |
| 196 | +sub enqueue { return push @ $QUEUE, pop } |
| 197 | +sub dequeue { return shift @ $QUEUE } |
| 198 | +sub size { return scalar @ $QUEUE } |
| 199 | +sub running { return scalar keys % $WORKSET } |
| 200 | +sub isFull { return $_[0]->running >= $_[0]->depth } |
| 201 | +sub hasSlots { return ! $_[0]->isFull } |
| 202 | + |
| 203 | +sub do_connect |
| 204 | +{ |
| 205 | + shift if $_[0] && ( ref($_[0]) eq __PACKAGE__ || $_[0] eq __PACKAGE__ ); |
| 206 | + return db_handle(@_); |
| 207 | +} |
| 208 | + |
| 209 | +sub startone |
| 210 | +{ |
| 211 | + my $self = shift; |
| 212 | + my $job = shift; |
| 213 | + my $child = Child::Runner->new($job); |
| 214 | + |
| 215 | + ## Make sure it stays set???? |
| 216 | + # $SIG{CHLD} = \&_SIG_CHLD; |
| 217 | + |
| 218 | + if ( ! defined $child->pid ) |
| 219 | + { |
| 220 | + warn "Unable to start child for job: $job"; |
| 221 | + return; |
| 222 | + } |
| 223 | + |
| 224 | + $WORKSET->{$child->pid} = $child; |
| 225 | +} |
| 226 | + |
| 227 | +sub run |
| 228 | +{ |
| 229 | + my $self = shift; |
| 230 | + |
| 231 | + while ( $self->hasSlots && $self->size ) |
| 232 | + { |
| 233 | + $self->startone( $self->dequeue ); |
| 234 | + |
| 235 | + # my $job = shift @ $QUEUE; |
| 236 | + # my $child = Child::Runner->new($job); |
| 237 | + |
| 238 | + # $WORKSET->{$child->pid} = $child; |
| 239 | + } |
| 240 | + |
| 241 | + return $self->isFull; |
| 242 | +} |
| 243 | + |
| 244 | + |
| 245 | +package Child::Runner; |
| 246 | + |
| 247 | +use strict; |
| 248 | +use warnings; |
| 249 | +use IPC::Open3 (); |
| 250 | +use Symbol 'gensym'; |
| 251 | + |
| 252 | +sub new |
| 253 | +{ |
| 254 | + my $self = bless {}, shift; |
| 255 | + my $job = $self->job(shift); |
| 256 | + my ( $in, $out, $err ) = (undef, undef, gensym); |
| 257 | + my $pid = IPC::Open3::open3( $in, $out, $err, $^X, 't/92-segv-fork.pl', $job ); |
| 258 | + |
| 259 | + if ( ! defined $pid ) |
| 260 | + { |
| 261 | + warn "Unable to fork: $!"; |
| 262 | + return; |
| 263 | + } |
| 264 | + |
| 265 | + $in->close or warn $! if $in; |
| 266 | + $self->pid($pid); |
| 267 | + $self->out($out); |
| 268 | + $self->err($err); |
| 269 | + |
| 270 | + return $self; |
| 271 | +} |
| 272 | + |
| 273 | +sub finish |
| 274 | +{ |
| 275 | + my $self = shift; |
| 276 | + my $code = shift; |
| 277 | + my $job = $self->job; |
| 278 | + my $pid = $self->pid; |
| 279 | + my $out = $self->out; |
| 280 | + my $err = $self->err; |
| 281 | + my $results = { -JOB => $job, -PID => $pid, -OUT => [], -ERR => [] }; |
| 282 | + |
| 283 | + if ( $self->pid ) |
| 284 | + { |
| 285 | + my $O = $results->{-OUT}; |
| 286 | + my $E = $results->{-ERR}; |
| 287 | + |
| 288 | + while ( my $l = <$out> ) { chomp $l; push @ $O, $l } |
| 289 | + while ( my $l = <$err> ) { chomp $l; push @ $E, $l } |
| 290 | + |
| 291 | + close $out or warn "Unable to close out: $!"; |
| 292 | + close $err or warn "Unable to close err: $!"; |
| 293 | + |
| 294 | + # waitpid( $pid, 0 ); |
| 295 | + # $results->{ CODE } = $? >> 8; |
| 296 | + $results->{ CODE } = $code; |
| 297 | + } |
| 298 | + |
| 299 | + return $results; |
| 300 | +} |
| 301 | + |
| 302 | +sub job { return defined $_[1] ? $_[0]->{_JOB______} = $_[1] : $_[0]->{_JOB______} } |
| 303 | +sub pid { return defined $_[1] ? $_[0]->{_PID______} = $_[1] : $_[0]->{_PID______} } |
| 304 | +sub out { return defined $_[1] ? $_[0]->{_OUT______} = $_[1] : $_[0]->{_OUT______} } |
| 305 | +sub err { return defined $_[1] ? $_[0]->{_ERR______} = $_[1] : $_[0]->{_ERR______} } |
| 306 | + |
| 307 | +1; |
| 308 | + |
| 309 | +## vim: number expandtab tabstop=2 shiftwidth=2 |
| 310 | +## END |
0 commit comments