additional features #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account


Open · wants to merge 3 commits into base: main

Changes from all commits
17 changes: 12 additions & 5 deletions utils/comments.py
@@ -1,10 +1,11 @@
 import csv
 from datetime import datetime as dt
 
-comments = []
 today = dt.today().strftime('%d-%m-%Y')
+PATH = 'commentsFolder/'
 
 def process_comments(response_items, csv_output=False):
+    comments = []
 
     for res in response_items:

@@ -29,13 +30,19 @@ def process_comments(response_items, csv_output=False):
     return comments
 
 
-def make_csv(comments, channelID=None):
+def make_csv(comments, channelID=None, videoID=None):
+    # Handle 0 comments issue
+    if len(comments) == 0:
+        return
+
     header = comments[0].keys()
 
-    if channelID:
-        filename = f'comments_{channelID}_{today}.csv'
+    if channelID and videoID:
+        filename = f'{PATH}comments_{channelID}_{videoID}_{today}.csv'
+    elif channelID:
+        filename = f'{PATH}comments_{channelID}_{today}.csv'
     else:
-        filename = f'comments_{today}.csv'
+        filename = f'{PATH}comments_{today}.csv'
 
     with open(filename, 'w', encoding='utf8', newline='') as f:
         writer = csv.DictWriter(f, fieldnames=header)
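
For reference, a minimal usage sketch of the updated make_csv (not part of the diff). The dict keys and IDs below are placeholders, since the fields produced by process_comments are not shown here, and the commentsFolder/ directory is created up front because make_csv prepends PATH to every filename without creating the folder:

import os
from utils.comments import make_csv

# make_csv now writes into PATH ('commentsFolder/') but does not create the folder itself
os.makedirs('commentsFolder', exist_ok=True)

# placeholder rows standing in for the output of process_comments
comments = [
    {'author': 'someUser', 'comment': 'great video', 'likes': 3},
]

# channelID and videoID -> commentsFolder/comments_<channelID>_<videoID>_<DD-MM-YYYY>.csv
make_csv(comments, channelID='UC_exampleChannel', videoID='exampleVideoId')

# channelID only -> commentsFolder/comments_<channelID>_<DD-MM-YYYY>.csv
make_csv(comments, channelID='UC_exampleChannel')

# neither -> commentsFolder/comments_<DD-MM-YYYY>.csv
make_csv(comments)
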
207 changes: 150 additions & 57 deletions yt_public.py
@@ -1,15 +1,28 @@
 import os
+import csv
 from datetime import datetime as dt
 from urllib import response
 from dotenv import load_dotenv
 from googleapiclient.discovery import build
 
 from utils.comments import process_comments, make_csv
 
 load_dotenv()
-API_KEY = os.getenv("API_KEY")
+API_KEY_1 = os.getenv("API_KEY_1")
+API_KEY_2 = os.getenv("API_KEY_2")
+API_KEY_3 = os.getenv("API_KEY_3")
+API_KEY_4 = os.getenv("API_KEY_4")
+API_KEY_5 = os.getenv("API_KEY_5")
 
-youtube = build("youtube", "v3", developerKey=API_KEY)
+youtube_1 = build("youtube", "v3", developerKey=API_KEY_1)
+youtube_2 = build("youtube", "v3", developerKey=API_KEY_2)
+youtube_3 = build("youtube", "v3", developerKey=API_KEY_3)
+youtube_4 = build("youtube", "v3", developerKey=API_KEY_4)
+youtube_5 = build("youtube", "v3", developerKey=API_KEY_5)
 
-def search_result(query):
+scraped_videos = {}
+
+def search_result(youtube, query):
     """
     Refer to the documentation: https://googleapis.github.io/google-api-python-client/docs/dyn/youtube_v3.search.html
     """
@@ -21,87 +34,167 @@ def search_result(query):

     return request.execute()
 
-def channel_stats(channelID):
+def get_video_ids(youtube, channelId):
     """
-    Refer to the documentation: https://googleapis.github.io/google-api-python-client/docs/dyn/youtube_v3.channels.html
+    Refer to the documentation: https://googleapis.github.io/google-api-python-client/docs/dyn/youtube_v3.search.html
     """
+    videoIds = []
 
     request = youtube.channels().list(
-        part="statistics",
-        id=channelID
+        part="contentDetails",
+        id=channelId
     )
-    return request.execute()
-
-def comment_threads(channelID, to_csv=False):
-
-    comments_list = []
-
-    request = youtube.commentThreads().list(
-        part='id,replies,snippet',
-        videoId=channelID,
+    response = request.execute()
+
+    playlistId = response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
+
+    request = youtube.playlistItems().list(
+        part="contentDetails",
+        playlistId=playlistId,
+        maxResults=50
     )
+
     response = request.execute()
-    comments_list.extend(process_comments(response['items']))
+    responseItems = response['items']
+
+    videoIds.extend([item['contentDetails']['videoId'] for item in responseItems])
 
     # if there is nextPageToken, then keep calling the API
     while response.get('nextPageToken', None):
-        request = youtube.commentThreads().list(
-            part='id,replies,snippet',
-            videoId=channelID,
+        print(f'Fetching next page of videos for {channelId}_{playlistId}')
+        request = youtube.playlistItems().list(
+            part="contentDetails",
+            playlistId=playlistId,
+            maxResults=50,
            pageToken=response['nextPageToken']
         )
         response = request.execute()
-        comments_list.extend(process_comments(response['items']))
-
-    print(f"Finished fetching comments for {channelID}. {len(comments_list)} comments found.")
-
-    if to_csv:
-        make_csv(comments_list, channelID)
+        responseItems = response['items']
+
+        videoIds.extend([item['contentDetails']['videoId'] for item in responseItems])
 
-    return comments_list
+    print(f"Finished fetching videoIds for {channelId}. {len(videoIds)} videos found.")
+
+    return videoIds

-def get_video_ids(channelId):
+def channel_stats(youtube, channelIDs, to_csv=False):
     """
-    Refer to the documentation: https://googleapis.github.io/google-api-python-client/docs/dyn/youtube_v3.search.html
+    Refer to the documentation: https://googleapis.github.io/google-api-python-client/docs/dyn/youtube_v3.channels.html
     """
-    videoIds = []
 
-    request = youtube.search().list(
-        part="snippet",
-        channelId=channelId,
-        type="video",
-        maxResults=50,
-        order="date"
-    )
+    if type(channelIDs) == str:
+        channelIDs = [channelIDs]
 
-    response = request.execute()
-    responseItems = response['items']
-
-    videoIds.extend([item['id']['videoId'] for item in responseItems if item['id'].get('videoId', None) != None])
+    stats_list = []
 
-    # if there is nextPageToken, then keep calling the API
-    while response.get('nextPageToken', None):
-        request = youtube.search().list(
-            part="snippet",
-            channelId=channelId,
+    for channelId in channelIDs:
+        request = youtube.channels().list(
+            part="statistics",
+            id=channelId
         )
         response = request.execute()
-        responseItems = response['items']
+        response = response['items'][0]['statistics']
+        response['channelId'] = channelId
 
+        stats_list.append(response)
+
+    if to_csv:
+        header = stats_list[0].keys()
+        with open(f'channelStats.csv', 'w') as f:
+            writer = csv.DictWriter(f, fieldnames=header)
+            writer.writeheader()
+            writer.writerows(stats_list)
+
+    return stats_list
 
-        videoIds.extend([item['id']['videoId'] for item in responseItems if item['id'].get('videoId', None) != None])
+def video_stats(youtube, videoIDs, channelID, to_csv=False):
+    if type(videoIDs) == str:
+        videoIDs = [videoIDs]
 
-    print(f"Finished fetching videoIds for {channelId}. {len(videoIds)} videos found.")
+    stats_list = []
 
-    return videoIds
+    for videoId in videoIDs:
+        request = youtube.videos().list(
+            part="snippet, statistics, contentDetails",
+            id=videoId
+        )
+        response = request.execute()
+        statistics = response['items'][0]['statistics']
+        snippet = response['items'][0]['snippet']
+        statistics['videoId'] = videoId
+        statistics['title'] = snippet['title']
+        statistics['description'] = snippet['description']
+        statistics['publishedAt'] = snippet['publishedAt']
+        statistics['duration'] = response['items'][0]['contentDetails']['duration']
+        statistics['thumbnail'] = snippet['thumbnails']['high']['url']
+        statistics['channelId'] = channelID
+        statistics['likeCount'] = statistics.get('likeCount', 0)
+
+        print(f"Fetched stats for {videoId}")
+        stats_list.append(statistics)
+
+    if to_csv:
+        header = stats_list[0].keys()
+        with open(f'videosFolder/videoStats_{channelID}.csv', 'w', encoding='utf8', newline='') as f:
+            writer = csv.DictWriter(f, fieldnames=header)
+            writer.writeheader()
+            writer.writerows(stats_list)
+
+    print(f'Success in fetching video stats for {channelID}')
+
+    return stats_list
 
 
-if __name__ == '__main__':
-    pyscriptVidId = 'Qo8dXyKXyME'
-    channelId = 'UCzIxc8Vg53_ewaRIk3shBug'
+def comment_threads(youtube, videoID, channelID=None, to_csv=False):
+
+    comments_list = []
+
+    try:
+        request = youtube.commentThreads().list(
+            part='id,replies,snippet',
+            videoId=videoID,
+        )
+        response = request.execute()
+    except Exception as e:
+        print(f'Error fetching comments for {videoID} - error: {e}')
+        if scraped_videos.get('error_ids', None):
+            scraped_videos['error_ids'].append(videoID)
+        else:
+            scraped_videos['error_ids'] = [videoID]
+        return
 
-    # response = search_result("pyscript")
-    response = channel_stats(channelId)
-    # response = comment_threads(pyscriptVidId, to_csv=True)
+    comments_list.extend(process_comments(response['items']))
 
-    print(response)
+    # if there is nextPageToken, then keep calling the API
+    while response.get('nextPageToken', None):
+        request = youtube.commentThreads().list(
+            part='id,replies,snippet',
+            videoId=videoID,
+            pageToken=response['nextPageToken']
+        )
+        response = request.execute()
+        comments_list.extend(process_comments(response['items']))
+
+    print(f"Finished fetching comments for {videoID}. {len(comments_list)} comments found.")
+
+    if to_csv:
+        try:
+            make_csv(comments_list, channelID, videoID)
+        except Exception as e:
+            print(f'Error writing comments to csv for {videoID} - error: {e}')
+            if scraped_videos.get('error_csv_ids', None):
+                scraped_videos['error_csv_ids'].append(videoID)
+            else:
+                scraped_videos['error_csv_ids'] = [videoID]
+            return
+
+    if scraped_videos.get(channelID, None):
+        scraped_videos[channelID].append(videoID)
+    else:
+        scraped_videos[channelID] = [videoID]
+
+    return comments_list
+
+if __name__ == '__main__':
+    pyscriptVidId = 'Qo8dXyKXyME'
+    channelId = 'UCzIxc8Vg53_ewaRIk3shBug'
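
The new __main__ block is truncated in this view. Purely as an illustration (not part of the PR), here is one way the refactored functions could be wired together, assuming the API keys are set in .env and that the commentsFolder/ and videosFolder/ directories exist:

channel_ids = ['UCzIxc8Vg53_ewaRIk3shBug']  # placeholder list of channels to scrape

# one-off channel statistics, optionally written to channelStats.csv
channel_stats(youtube_1, channel_ids, to_csv=True)

for channel_id in channel_ids:
    # every upload on the channel, fetched via its 'uploads' playlist
    video_ids = get_video_ids(youtube_1, channel_id)

    # per-video statistics, written to videosFolder/videoStats_<channelId>.csv
    video_stats(youtube_2, video_ids, channel_id, to_csv=True)

    # comments per video, written to commentsFolder/ by make_csv;
    # failures are collected in the global scraped_videos dict
    for video_id in video_ids:
        comment_threads(youtube_3, video_id, channelID=channel_id, to_csv=True)

print(scraped_videos)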