-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathbluesky_rss_zotero.py
executable file
·227 lines (205 loc) · 8.4 KB
/
bluesky_rss_zotero.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#!/Users/john_muccigrosso/.venv/bin/python3
# A script to check an RSS feed and share the latest new entry on Bluesky.
# Guts of it are from <https://sperea.es/blog/bot-bluesky-rss>, but now
# with significant upgrades, including the use of the atproto library.
# It will grab an image from the rss, defaulting to the feed icon.
# It logs both success and failure.
# I also pushed some constants into files for privacy.
from atproto import Client, client_utils
from datetime import datetime, timezone
from PIL import Image
import feedparser
import io
from io import BytesIO
import json
import os.path
import re
import sys
import time
import urllib3
#from urllib.request import urlopen
# Constants
# File names below are resolved against a per-user directory computed in main().
CHECK_FILE = "zotero_date.txt"  # timestamp written at the end of each successful run
BLUESKY_PW_FILE = "bluesky_app_password.txt"
BLUESKY_HANDLE_FILE = "bluesky_handle.txt"
# Fallback image used when the feed does not advertise its own icon.
ICON_FILE = "https://jmuccigr.github.io/images/zotero_icon.png"
MAX_POSTS = 3
MAX_IMAGE_SIZE = 1000000  # in bytes
# In this case I can limit the length of the returned feed to save processing time.
# Each query parameter must be separated by "&"; without it the API would
# receive "limit=3format=atom" as a single, malformed value.
FEED_URL = f"https://api.zotero.org/users/493397/items/top?limit={MAX_POSTS}&format=atom&v=3"
BLUESKY_API_ENDPOINT = "https://bsky.social/xrpc/com.atproto.repo.createRecord"
API_KEY_URL = "https://bsky.social/xrpc/com.atproto.server.createSession"
POST_DELAY = 5  # in seconds
def compare_post_dates(post_date):
    """Report whether a feed entry is newer than the stored reference date.

    The reference date is cached in the module-level ``pubdate``. While that
    cache is still the empty string it is loaded from the file named by the
    module-level ``check_file``. If that file does not exist, it is created
    holding an absurdly early date, so that every entry counts as new.

    Args:
        post_date: Entry timestamp formatted as ``%Y-%m-%dT%H:%M:%S%z``.

    Returns:
        The parsed entry ``datetime`` when it is strictly later than the
        reference date, otherwise ``False``.

    Raises:
        ValueError: If ``post_date`` does not match the expected format.
        SystemExit: If the stored reference date cannot be parsed (the
            problem is logged to stderr first).
    """
    global pubdate
    date_format = "%Y-%m-%dT%H:%M:%S%z"
    # If not already done, check the file for the latest published date.
    if pubdate == "":
        try:
            # Only the first line matters; strip() also copes with "\r\n".
            # An empty file yields "", which is reported by the parse below
            # instead of crashing with an IndexError.
            with open(check_file, "r") as file:
                pubdate = file.readline().strip()
        except FileNotFoundError:
            # Report the error & seed the file with an absurdly early date.
            pubdate = "1900-01-01T00:00:01+00:00"
            with open(check_file, 'x') as file:
                file.write(pubdate)
            print(timestamp + " Zotero item check file does not exist", file=sys.stderr)
    latest_post_date = datetime.strptime(post_date, date_format)
    try:
        last_published_date = datetime.strptime(pubdate, date_format)
    except (ValueError, TypeError) as e:
        print(timestamp + " Something wrong with last pub date in local file: " + str(e), file=sys.stderr)
        # Non-zero status so callers can tell that this run failed.
        sys.exit(1)
    if latest_post_date > last_published_date:
        return latest_post_date  # latest post is newer
    return False  # last published is newer (or the same)
def get_rss_content():
    """Collect the feed entries that have not been shared yet.

    Parses ``FEED_URL`` and walks its entries in feed order, keeping those
    that ``compare_post_dates`` reports as newer than the stored reference
    date, up to ``MAX_POSTS`` of them.

    Returns:
        A list of dicts with the keys ``title``, ``link``, ``image`` and
        ``image_desc``, or ``False`` when no entry qualifies.
    """
    parsed = feedparser.parse(FEED_URL)
    # Entries without a thumbnail fall back to the feed icon, or to the
    # bundled default when the feed does not declare one.
    fallback_image = getattr(parsed.feed, 'icon', ICON_FILE)
    quota = min(MAX_POSTS, len(parsed.entries))
    fresh = []
    for item in parsed.entries:
        # Stop as soon as we have enough; the rest would be ignored anyway.
        if len(fresh) >= quota:
            break
        headline = item.title
        url = item.link
        if hasattr(item, 'media_thumbnail'):
            picture = item.media_thumbnail[0]['url']
            picture_desc = "Image from the post"
        else:
            picture = fallback_image
            picture_desc = "blog icon"
        # The comparison uses the published date; swap in item.updated to
        # compare against the entry's last-modified date instead.
        if not compare_post_dates(item.published):
            continue
        fresh.append({
            "title": headline,
            "link": url,
            "image": picture,
            "image_desc": picture_desc,
        })
    return fresh or False
def prepare_post_for_bluesky(title, link):
    """Build the rich-text body of a Bluesky post for one feed item.

    Args:
        title: Item title; only its first 240 characters are used.
        link: URL that the "my Zotero library" link text points to.

    Returns:
        The ``client_utils.TextBuilder`` holding the composed post.
    """
    clipped = title[:240]
    lead_in = "Recently noted...\n\n" + clipped + "\n\nSee it in "
    # The builder methods return the builder itself, so the post can be
    # assembled as a single chained expression.
    return (
        client_utils.TextBuilder()
        .text(lead_in)
        .link("my Zotero library", link)
        .text(".")
    )
def prepare_image(image_url):
    """Download an image and shrink it if it exceeds ``MAX_IMAGE_SIZE``.

    Args:
        image_url: Address of the image to fetch.

    Returns:
        The image content as bytes, or the empty string ``""`` when the
        download or the resize failed (the failure is logged to stderr).
    """
    http = urllib3.PoolManager()
    try:
        response = http.request("GET", image_url)
        if response.status != 200:
            print(timestamp + " Unable to download image file. Error " + str(response.status) + ": " + image_url, file=sys.stderr)
            return ""
        img_data = response.data
        # Using a quick and dirty rule of thumb: images less than dim in size will be
        # below the size limit for Bluesky. If an image is too big, just shrink it
        # right away and don't sweat it. Alternative would be to iteratively shrink it
        # until it's small enough.
        # len() gives the payload size in bytes; sys.getsizeof() would also
        # count the Python object header.
        if len(img_data) > MAX_IMAGE_SIZE:
            img = Image.open(BytesIO(img_data))
            dim = 800 if img.format in ("JPEG", "GIF") else 400
            img.thumbnail((dim, dim))
            img_byte_arr = BytesIO()
            img.save(img_byte_arr, format=img.format)
            img_data = img_byte_arr.getvalue()
    except Exception:
        # Log error & return something small. Catching Exception rather than
        # everything lets Ctrl-C and sys.exit() still stop the script.
        print(timestamp + " Unable to get image file: " + image_url, file=sys.stderr)
        img_data = ""
    return img_data
def bluesky_rss_bot(app_password, client):
    """Post every new feed entry to Bluesky.

    Args:
        app_password: Not used by this function; kept so existing callers
            keep working.
        client: Logged-in client offering ``send_post`` and ``send_image``.

    Returns:
        ``"No need to post."`` when there is nothing new. Otherwise the
        replies combined with ``+`` when the reply objects support it, or
        else the reply to the most recent post.
    """
    # Fetch content from the RSS feed
    validEntries = get_rss_content()
    # Only do something if there are valid entries
    if not validEntries:
        print(timestamp + " Latest Zotero item already published", file=sys.stderr)
        return "No need to post."

    # Payloads shorter than this are treated as a failed download. It is the
    # same cut-off the former sys.getsizeof(...) < 100 test gave for bytes on
    # 64-bit CPython, but stated directly in bytes; it also covers the ""
    # that prepare_image returns on failure.
    min_image_bytes = 67
    reply = None
    for index, entry in enumerate(validEntries):
        # Wait a little if posting more than one entry
        if index > 0:
            time.sleep(POST_DELAY)
        post_structure = prepare_post_for_bluesky(entry["title"], entry["link"])
        if entry["image"] == "":
            bluesky_reply = client.send_post(post_structure)
        else:
            img_data = prepare_image(entry["image"])
            if len(img_data) < min_image_bytes:
                print(timestamp + " Bluesky post image couldn't be retrieved", file=sys.stderr)
                bluesky_reply = client.send_post(post_structure)
            else:
                bluesky_reply = client.send_image(text=post_structure, image=img_data, image_alt=entry["image_desc"])
        if reply is None:
            reply = bluesky_reply
        else:
            try:
                reply = reply + bluesky_reply
            except TypeError:
                # Replies that cannot be added together: keep the latest one.
                reply = bluesky_reply
    print(timestamp + " Published latest Zotero items to Bluesky", file=sys.stderr)
    return reply
def main():
    """Log into Bluesky, share new Zotero items and record the run time.

    Sets the module-level ``check_file``, ``handle``, ``timestamp``,
    ``pubdate`` and ``userpath`` that the other functions read. Missing
    credential files and login failures are logged to stderr and end the run
    without touching the check file. After a successful run the current UTC
    time is written to the check file and the posting result is printed.
    """
    global check_file
    global handle
    global timestamp
    global pubdate
    global userpath
    pubdate = ""
    # Get timestamps for log entries and comparison
    timestamp = f'{datetime.now():%Y-%m-%d %H:%M:%S%z}'
    check_date = datetime.now(timezone.utc).isoformat(sep="T", timespec="seconds")
    # Get needed info from files. Adjust userpath as needed.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    # Joining with "" guarantees a trailing separator even when the script
    # does not live below a "Documents/" directory and the pattern therefore
    # leaves the directory name unchanged.
    userpath = os.path.join(re.sub("^(.+/Documents/).*", r"\1", script_dir), "")
    check_file = userpath + CHECK_FILE
    pw_file = userpath + BLUESKY_PW_FILE
    handle_file = userpath + BLUESKY_HANDLE_FILE
    if not os.path.isfile(pw_file):
        print(timestamp + " Bluesky password file does not exist", file=sys.stderr)
        return
    if not os.path.isfile(handle_file):
        print(timestamp + " Bluesky handle file does not exist", file=sys.stderr)
        return
    # Get needed info from files: the first line of each, minus the line
    # ending. An empty file gives "" and is then rejected by the login.
    with open(pw_file, "r") as f:
        app_pw = f.readline().strip()
    with open(handle_file, "r") as f:
        handle = f.readline().strip()
    # Do the actual work
    client = Client()
    try:
        client.login(handle, app_pw)
    except Exception:
        print(timestamp + " Problem logging into Bluesky", file=sys.stderr)
        return
    response = bluesky_rss_bot(app_pw, client)
    # Finish by writing the date to file for next run
    with open(check_file, 'w') as f:
        f.write(check_date)
    print(response)
# Run the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()