|
| 1 | +<?php |
| 2 | + |
| 3 | +namespace App\Console\Commands; |
| 4 | + |
| 5 | +use Illuminate\Console\Command; |
| 6 | + |
| 7 | +use App\Stream; |
| 8 | +use App\Pmid; |
| 9 | +use App\Curation; |
| 10 | + |
| 11 | +class QueryKafka extends Command |
| 12 | +{ |
| 13 | + /** |
| 14 | + * The name and signature of the console command. |
| 15 | + * |
| 16 | + * @var string |
| 17 | + */ |
| 18 | + protected $signature = 'query:kafka {topic}'; |
| 19 | + |
| 20 | + /** |
| 21 | + * The console command description. |
| 22 | + * |
| 23 | + * @var string |
| 24 | + */ |
| 25 | + protected $description = 'Command description'; |
| 26 | + |
| 27 | + /** |
| 28 | + * Create a new command instance. |
| 29 | + * |
| 30 | + * @return void |
| 31 | + */ |
| 32 | + public function __construct() |
| 33 | + { |
| 34 | + parent::__construct(); |
| 35 | + } |
| 36 | + |
| 37 | + // gene_dosage, gene_dosage_raw, gene_dosage_sepio_in. |
| 38 | + |
| 39 | + /** |
| 40 | + * Execute the console command. |
| 41 | + * |
| 42 | + * @return mixed |
| 43 | + */ |
| 44 | + public function handle() |
| 45 | + { |
| 46 | + $topic = $this->argument('topic'); |
| 47 | + |
| 48 | + if ($topic === null) |
| 49 | + { |
| 50 | + echo "Topic not found \n"; |
| 51 | + exit; |
| 52 | + } |
| 53 | + |
| 54 | + $stream = Stream::name($topic)->first(); |
| 55 | + |
| 56 | + if ($stream === null) |
| 57 | + { |
| 58 | + echo "Topic not found \n"; |
| 59 | + exit; |
| 60 | + } |
| 61 | + |
| 62 | + $offset = $stream->offset; |
| 63 | + |
| 64 | + $conf = new \RdKafka\Conf(); |
| 65 | + |
| 66 | + // Set a rebalance callback to log partition assignments (optional) |
| 67 | + $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) use ($offset) { |
| 68 | + switch ($err) { |
| 69 | + case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: |
| 70 | + //echo "Assign: "; |
| 71 | + //var_dump($partitions); |
| 72 | + $kafka->assign($partitions); |
| 73 | + |
| 74 | + foreach ($partitions as $tp) { |
| 75 | + $tp->setOffset($offset); |
| 76 | + $kafka->commit([$tp]); |
| 77 | + } |
| 78 | + break; |
| 79 | + |
| 80 | + case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: |
| 81 | + // echo "Revoke: "; |
| 82 | + //var_dump($partitions); |
| 83 | + $kafka->assign(NULL); |
| 84 | + break; |
| 85 | + default: |
| 86 | + throw new \Exception($err); |
| 87 | + } |
| 88 | + }); |
| 89 | + |
| 90 | + // Configure the group.id. All consumer with the same group.id will consume |
| 91 | + // different partitions. |
| 92 | + $conf->set('group.id', 'web_prod'); |
| 93 | + |
| 94 | + $conf->set('security.protocol', 'sasl_ssl'); |
| 95 | + $conf->set('sasl.mechanism', 'PLAIN'); |
| 96 | + $conf->set('sasl.username', $stream->username); |
| 97 | + $conf->set('sasl.password', $stream->password); |
| 98 | + |
| 99 | + // Initial list of Kafka brokers |
| 100 | + $conf->set('metadata.broker.list', $stream->endpoint); |
| 101 | + // Set where to start consuming messages when there is no initial offset in |
| 102 | + // offset store or the desired offset is out of range. |
| 103 | + // 'earliest': start from the beginning |
| 104 | + $conf->set('auto.offset.reset', 'earliest'); |
| 105 | + //$conf->set('enable.auto.commit', 'false'); |
| 106 | + |
| 107 | + |
| 108 | + $consumer = new \RdKafka\KafkaConsumer($conf); |
| 109 | + |
| 110 | + /*$availableTopics = $consumer->getMetadata(true, null, 60e3)->getTopics() |
| 111 | + ; |
| 112 | + echo "Available Topics: \n"; |
| 113 | + foreach ($availableTopics as $idx => $avlTopic) { |
| 114 | + echo $idx.': '.$avlTopic->getTopic()."\n"; |
| 115 | + }*/ |
| 116 | + |
| 117 | + //$a = $consumer->getCommittedOffsets([new \RdKafka\TopicPartition('gene_dosage', 0)], 60*1000); |
| 118 | + //$low = $high = 0; |
| 119 | + |
| 120 | + //$consumer->queryWatermarkOffsets('web-group-events', 0, $low, $high, 60*1000); |
| 121 | + //dd($high); |
| 122 | + |
| 123 | + // Subscribe to topic 'test' |
| 124 | + $consumer->subscribe([$stream->topic]); |
| 125 | + |
| 126 | + //echo "Waiting for partition assignment... (make take some time when\n"; |
| 127 | + //echo "quickly re-joining the group after leaving it.)\n"; |
| 128 | + |
| 129 | + echo "Reading...\n"; |
| 130 | + while (true) { |
| 131 | + $message = $consumer->consume(120*1000); |
| 132 | + //echo $message->err . "\n"; |
| 133 | + |
| 134 | + switch ($message->err) { |
| 135 | + case 0: |
| 136 | + case RD_KAFKA_RESP_ERR_NO_ERROR: |
| 137 | + $payload = json_decode($message->payload); |
| 138 | + //dd($payload); |
| 139 | + $a = $stream->parser; |
| 140 | + $a($payload); |
| 141 | + $stream->update(['offset' => $message->offset + 1]); |
| 142 | + break; |
| 143 | + case RD_KAFKA_RESP_ERR__PARTITION_EOF: |
| 144 | + echo "No more messages; will wait for more\n"; |
| 145 | + break 2; |
| 146 | + case RD_KAFKA_RESP_ERR__TIMED_OUT: |
| 147 | + echo "Timed out\n"; |
| 148 | + break 2; |
| 149 | + default: |
| 150 | + throw new \Exception($message->errstr(), $message->err); |
| 151 | + break; |
| 152 | + } |
| 153 | + } |
| 154 | + |
| 155 | + echo "Update Complete\n"; |
| 156 | + } |
| 157 | +} |
0 commit comments