|
| 1 | +from typing import List |
| 2 | + |
| 3 | +import requests |
| 4 | + |
| 5 | +from python_pik_api_wrapper.entity.block import Block |
| 6 | +from python_pik_api_wrapper.entity.bulk import Bulk |
| 7 | +from python_pik_api_wrapper.entity.item import Item |
| 8 | +from python_pik_api_wrapper.entity.location import Location |
| 9 | +from python_pik_api_wrapper.value_object.photo import Photo |
| 10 | + |
| 11 | + |
| 12 | +class PikWrapper: |
| 13 | + def __init__(self): |
| 14 | + self.base_url = 'https://api.pik.ru/v1/' |
| 15 | + self.headers = { |
| 16 | + 'accept': '*/*', |
| 17 | + 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_1) AppleWebKit/605.1.15 (KHTML, like Gecko) ' |
| 18 | + 'Version/13.0.3 Safari/605.1.15', |
| 19 | + 'wrapper': 'https://github.com/drillcoder/python_pik_api_wrapper' |
| 20 | + } |
| 21 | + |
| 22 | + def send_request(self, url:str) -> List: |
| 23 | + if type(url) is not str: |
| 24 | + raise ValueError |
| 25 | + response = requests.get(self.base_url + url, headers=self.headers) |
| 26 | + if response.status_code != requests.codes.ok: |
| 27 | + raise ValueError |
| 28 | + if response.status_code == requests.codes.unprocessable: |
| 29 | + return [] |
| 30 | + content = response.json() |
| 31 | + if type(content) is dict: |
| 32 | + if content.get('message') == 'ERR_NEWS_NOT_FOUND': |
| 33 | + raise [] |
| 34 | + if type(content.get('items')) is not None: |
| 35 | + content = content.get('items') |
| 36 | + if type(content) is not list: |
| 37 | + raise ValueError |
| 38 | + return content |
| 39 | + |
| 40 | + def get_locations(self) -> List[Location]: |
| 41 | + locations = self.send_request('location') |
| 42 | + locations_list = [] |
| 43 | + count = 0 |
| 44 | + for location in locations: |
| 45 | + if location['id'] == 2: |
| 46 | + locations_list.append(Location('2', 'Москва', count)) |
| 47 | + count += 1 |
| 48 | + locations_list.append(Location('3', 'Московская область', count)) |
| 49 | + continue |
| 50 | + locations_list.append(Location(str(location['id']), location['name'], count)) |
| 51 | + count += 1 |
| 52 | + return locations_list |
| 53 | + |
| 54 | + def get_blocks(self, location: Location) -> List[Block]: |
| 55 | + if type(location) is not Location: |
| 56 | + raise ValueError |
| 57 | + blocks = self.send_request(f"block?metadata=1&types=1,2&locations={location.id}") |
| 58 | + blocks_list = [] |
| 59 | + for block in blocks: |
| 60 | + blocks_list.append(Block(str(block['id']), block['name'], block['sort'])) |
| 61 | + return blocks_list |
| 62 | + |
| 63 | + def get_bulks(self, block: Block) -> List[Bulk]: |
| 64 | + if type(block) is not Block: |
| 65 | + raise ValueError |
| 66 | + bulks = self.send_request(f"bulk?block_id={block.id}") |
| 67 | + bulks_list = [] |
| 68 | + count = 0 |
| 69 | + for bulk in bulks: |
| 70 | + if bulk['type_id'] not in {1, 2, 103} or len(bulk['name']) == 0: |
| 71 | + continue |
| 72 | + bulks_list.append(Bulk(str(bulk['id']), bulk['name'], count, bulk['type_id'])) |
| 73 | + count += 1 |
| 74 | + return bulks_list |
| 75 | + |
| 76 | + def get_items(self, bulk: Bulk) -> List[Item]: |
| 77 | + if type(bulk) is not Bulk: |
| 78 | + raise ValueError |
| 79 | + items = self.send_request(f"news?limit=all&is_content=1&is_progress=1&bulk_id={bulk.id}") |
| 80 | + items_list = [] |
| 81 | + for item in items: |
| 82 | + url = 'https:' + item['preview'] |
| 83 | + images_path = [item['preview']] |
| 84 | + photos = [Photo(url, 0)] |
| 85 | + sort = 1 |
| 86 | + if type(item.get('gallery')) is list: |
| 87 | + for photo in item['gallery']: |
| 88 | + if photo['file_path'] in images_path: |
| 89 | + continue |
| 90 | + images_path.append(photo['file_path']) |
| 91 | + url = 'https:' + photo['file_path'] |
| 92 | + photos.append(Photo(url, sort)) |
| 93 | + sort += 1 |
| 94 | + items_list.append(Item(item['id'], item['public_date'], item['date'], photos)) |
| 95 | + return items_list |
0 commit comments