from playwright.sync_api import sync_playwright, Locator
from dataclasses import dataclass
import pandas as pd
import re
import time
import argparse
import os

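# NOTE: Playwright needs its Chromium build installed once before this script can run
# (e.g. `playwright install chromium`); pandas is used for the CSV output.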
@dataclass
class Business:
    """Holds business data; '-' marks a value that was not found."""
    name: str = '-'
    address: str = '-'
    website: str = '-'
    phone_number: str = '-'
    reviews_count: int = '-'
    ratings: float = '-'
    industry: str = '-'
    google_link: str = '-'
    latitude: float = '-'
    longitude: float = '-'

    def __repr__(self) -> str:
        return (
            f"\n Company: {self.name}"
            f"\n Stars: {self.ratings}"
            f"\n Website: {self.website}"
            f"\n Industry: {self.industry}"
            f"\n Phone: {self.phone_number}"
            f"\n Google Link: {self.google_link}"
        )

@dataclass
class ElementAttributes:
    """CSS class names copied from the Google Maps DOM; they need updating whenever Google changes its markup."""
    COMPANY_TILE = 'hfpxzc'
    FOCUS_REGION = 'hfpxzc'
    LIST_END = 'HlvSq'  # the element that appears once no more results can be loaded
    COMPANY_NAME = '.DUwDvf.lfPIob'
    COMPANY_WEBSITE = '.rogA2c.ITvuef'
    COMPANY_RATINGS = '.ceNzKf'
    COMPANY_INDUSTRY = '.DkEaL'
    COMPANY_DETAILS = '.Io6YTe.fontBodyMedium.kR99db'

def scrape_google_links(query: str):
    """Opens a Google Maps search URL, scrolls the result list to its end and collects every business page link."""
    businesses: list[Business] = []

    with sync_playwright() as p:
        try:
            # DECLARATION
            browser = p.chromium.launch(headless=True)
            page = browser.new_page()

            # INITIATE - SEARCH AND LOCATE SCRAPING REGION
            page.goto(query)
            page.locator(f'.{ElementAttributes.FOCUS_REGION}').first.focus()

            # SCROLL THE LIST TO LOAD EACH ELEMENT
            for _ in range(100):
                page.keyboard.press("End")
                print(f"\n{'-' * 10}Scrolling{'-' * 10}\n")
                if page.locator(f'.{ElementAttributes.LIST_END}').is_visible():
                    break
                time.sleep(1)

            # FETCH ALL THE COMPANY TILE / BUSINESS PROFILE ELEMENTS
            companies: list[Locator] = page.locator(f'.{ElementAttributes.COMPANY_TILE}').all()
            total = len(companies)

            # EXTRACT THE GOOGLE PAGE LINK FROM EACH RESULT
            for i, company in enumerate(companies, start=1):
                biz = Business()
                biz.google_link = company.get_attribute('href')
                businesses.append(biz)

                print(f"\n{'-' * 10}\n{i}/{total}\n{biz}\n{'-' * 10}\n")

            print("out of loop now")
            browser.close()

            # COLLECT THE RESULT SO IT CAN BE SAVED AS A CSV
            df = make_dataframe_for_links(businesses)

            return df

        except Exception as e:
            print(f"{'-' * 10}x{'-' * 10}")
            print("Scraping the result list failed (most likely a locator timed out).")
            print(e)
            print(f"{'-' * 10}x{'-' * 10}")

def scrape_google_page(page_link) -> dict:
    """Opens one business page on Google Maps and scrapes its name, website, rating, industry and phone number."""
    with sync_playwright() as p:
        try:
            # DECLARATION
            browser = p.chromium.launch(headless=True)
            page = browser.new_page()

            # INITIATE - OPEN THE BUSINESS PAGE
            page.goto(page_link)
            biz = Business()

            # SCRAPING DIFFERENT DATA POINTS
            biz.name = page.locator(ElementAttributes.COMPANY_NAME).text_content()

            # Many listings have no website row, so wait only briefly and keep the '-' placeholder otherwise.
            try:
                biz.website = page.locator(ElementAttributes.COMPANY_WEBSITE).text_content(timeout=2500)
            except Exception:
                pass

            biz.ratings = page.locator(ElementAttributes.COMPANY_RATINGS).get_attribute('aria-label')

            biz.industry = page.locator(ElementAttributes.COMPANY_INDUSTRY).first.text_content()

            # SCRAPE ALL THE COMPANY DETAIL ROWS AND FILTER OUT THE PHONE NUMBER WITH A REGEX
            # (matches formats such as "+1 (415) 555-0132"; long rows such as full addresses are skipped)
            detail_elements = page.locator(ElementAttributes.COMPANY_DETAILS).all()
            pattern = re.compile(r"(\+\d{1,3})?\s?\(?\d{1,4}\)?[\s.-]?\d{3}[\s.-]?\d{4}")
            for detail in detail_elements:
                text = detail.all_text_contents()[0]
                if len(text) > 20:
                    continue
                if re.search(pattern, text):
                    biz.phone_number = text
                    break

            biz.google_link = page_link

            print(f"\n{'-' * 10}\n{biz}\n{'-' * 10}\n")

            browser.close()

            df = make_dataframe_for_pages(biz)

            return df

        except Exception as e:
            print(f"{'-' * 10}x{'-' * 10}")
            print("Scraping the business page failed (most likely a locator timed out).")
            print(e)
            print(f"{'-' * 10}x{'-' * 10}")

# BUILD A COLUMN DICT OF THE BUSINESS PAGE LINKS COLLECTED FROM THE SEARCH RESULTS
def make_dataframe_for_links(bizlist: list[Business]):
    data = {
        "google_link": [],
    }
    for biz in bizlist:
        data['google_link'].append(biz.google_link)

    return data

# BUILD A COLUMN DICT OF THE INFORMATION TAKEN FROM A BUSINESS PAGE
def make_dataframe_for_pages(biz: Business) -> dict:
    data = {"company_name": [],
            "company_website": [],
            "ratings": [],
            "industry": [],
            "phone": [],
            "google_link": []}

    data['company_name'].append(biz.name)
    data['company_website'].append(biz.website)
    data['ratings'].append(biz.ratings)
    data['industry'].append(biz.industry)
    data['phone'].append(biz.phone_number)
    data['google_link'].append(biz.google_link)

    return data

# CREATE GOOGLE MAPS SEARCH URLS FOR THE KEYWORD, ONE PER US STATE LISTED IN maps.txt
def create_urls(keyword: str):
    slug = keyword.replace(" ", "+")
    queries = []
    with open('maps.txt', 'r') as f:
        locations = f.read().splitlines()
    for loc in locations:
        query = f"https://www.google.com/maps/search/{slug}+near+{loc.replace(' ', '+')}"
        queries.append(query)
    return queries

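# A sketch of the maps.txt layout assumed by create_urls (one location per line);
# the file itself is not part of this script:
#
#   New York
#   California
#   Texas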
# SCRAPE THE URLS OF BUSINESS PAGES AND STORE THEM IN 'data/links/{keyword}.csv'
def scrape_business_urls(keyword: str):
    urls = create_urls(keyword)
    os.makedirs('data/links', exist_ok=True)

    for url in urls:
        result_df = scrape_google_links(url)

        if result_df and result_df['google_link']:
            df = pd.DataFrame(result_df)
            file_name = f'data/links/{keyword}.csv'
            if os.path.isfile(file_name):
                df.to_csv(file_name, index=False, header=False, mode='a')
            else:
                df.to_csv(file_name, index=False, header=True, mode='x')

# SCRAPE DATA USING THE BUSINESS PAGE LINKS IN 'data/links/{keyword}.csv'
# AND STORE IT IN 'data/{keyword}.csv'
def scrape_business_pages(urls_csv, keyword):
    df = pd.read_csv(urls_csv)
    links = df['google_link']
    os.makedirs('data', exist_ok=True)

    for page_link in links.tolist():
        result_df: dict = scrape_google_page(page_link)

        if result_df is not None and len(result_df) > 0:
            row_df = pd.DataFrame(result_df)
            file_name = f'data/{keyword}.csv'
            if os.path.isfile(file_name):
                row_df.to_csv(file_name, index=False, header=False, mode='a')
            else:
                row_df.to_csv(file_name, index=False, header=True, mode='x')

# DROP DUPLICATE COMPANIES FROM A SCRAPED CSV AND WRITE THE RESULT TO 'cleaned.csv'
def clean_data(filename: str):
    df = pd.read_csv(filename)
    df = df.drop_duplicates(subset=['company_name'], keep='first')

    df.to_csv('cleaned.csv', index=False)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--keyword', type=str, help='Search keyword, e.g. "plumber"', required=True)
    parser.add_argument('-l', action='store_true', help='Scrape the business page links')
    parser.add_argument('-r', action='store_true', help='Scrape records from the collected links')

    args = parser.parse_args()

    keyword = args.keyword

    # Links have to be collected (-l) before records (-r) can be scraped from them.
    if args.l:
        scrape_business_urls(keyword)

    if args.r:
        scrape_business_pages(f"data/links/{keyword}.csv", keyword)
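# A minimal usage sketch (the file name scraper.py is an assumption, not part of this code):
#   python scraper.py --keyword "coffee shop" -l   # collect page links into 'data/links/coffee shop.csv'
#   python scraper.py --keyword "coffee shop" -r   # scrape each linked page into 'data/coffee shop.csv'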