|
| 1 | +from atproto import Client, client_utils |
| 2 | +import networkx as nx |
| 3 | +import time |
| 4 | +import logging |
| 5 | +import pandas as pd |
| 6 | +from datetime import datetime, timedelta, timezone |
| 7 | + |
| 8 | +from atproto_client.exceptions import BadRequestError |
| 9 | + |
| 10 | +from spikexplore.NodeInfo import NodeInfo |
| 11 | +from spikexplore.graph import add_node_attributes, add_edges_attributes |
| 12 | + |
| 13 | +logger = logging.getLogger(__name__) |
| 14 | + |
| 15 | + |
class BlueskyCredentials:
    """Plain holder for the account identifier and password used to log in."""

    def __init__(self, handle: str, password: str) -> None:
        """Store ``handle`` and ``password`` unchanged as instance attributes."""
        self.handle, self.password = handle, password
| 20 | + |
| 21 | + |
class SkeetsGetter:
    """Fetch skeets (Bluesky posts) and author handles through a logged-in client.

    ``config`` is read for ``max_day_old`` (age limit in days; a falsy value
    disables the age filter) and ``max_skeets_per_user`` (passed as ``limit``
    to the author-feed request).
    """

    def __init__(self, credentials, config):
        """Create the client and log in with ``credentials.handle`` / ``credentials.password``."""
        self.config = config
        self.bsky_client = Client()
        self.bsky_client.login(credentials.handle, credentials.password)
        # DID -> handle, filled from fetched feeds and from profile lookups.
        self.profiles_cache = {}
        # Facet kind -> attribute of the facet feature that carries its value.
        self.features_attrs = {"mention": "did", "tag": "tag", "link": "uri"}

    @staticmethod
    def _parse_created_at(raw):
        """Parse an ISO-8601 timestamp string into a timezone-aware datetime.

        A trailing ``Z`` is rewritten to ``+00:00`` so the value also parses on
        interpreters whose ``fromisoformat`` does not accept ``Z``. A timestamp
        without any offset is taken to be UTC.
        """
        parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def _filter_old_skeets(self, skeets):
        """Return the feed items created within the last ``config.max_day_old`` days.

        The input is returned untouched when ``max_day_old`` is falsy. The
        comparison is done between timezone-aware datetimes so the result does
        not depend on the local timezone of the machine or on the UTC offset
        used in the post timestamp.
        """
        max_day_old = self.config.max_day_old
        if not max_day_old:
            return skeets

        oldest_allowed = datetime.now(timezone.utc) - timedelta(days=max_day_old)
        return [item for item in skeets if self._parse_created_at(item.post.record["created_at"]) >= oldest_allowed]

    def get_profile(self, did):
        """Return the handle for ``did``, or ``None`` when it cannot be resolved.

        Successful lookups are stored in ``profiles_cache``. Any error raised by
        the client is logged and reported as ``None`` (best effort).
        """
        cached = self.profiles_cache.get(did)
        if cached is not None:
            return cached
        try:
            profile = self.bsky_client.get_profile(did)
            handle = profile.handle if profile is not None else None
        except Exception as e:
            logger.error("Error in getting profile: %s", e)
            return None
        if handle is not None:
            self.profiles_cache[did] = handle
        return handle

    def facet_data(self, skeet, data):
        """Collect the values of all facet features of kind ``data`` in ``skeet``.

        ``data`` must be a key of ``features_attrs`` ("mention", "tag" or
        "link"). Returns an empty list when the skeet has no record or no
        facets. Every feature of every facet is inspected, so facets with an
        empty feature list are skipped instead of raising.
        """
        facets = getattr(getattr(skeet, "record", None), "facets", None)
        if not facets:
            return []
        wanted_type = f"app.bsky.richtext.facet#{data}"
        value_attr = self.features_attrs[data]
        return [
            getattr(feature, value_attr)
            for facet in facets
            for feature in (facet.features or [])
            if feature.py_type == wanted_type
        ]

    @staticmethod
    def _reply_handles(post):
        """Return ``[parent author handle]`` if ``post`` carries reply info, else ``[]``.

        Always a list so it can be concatenated with the mentions list.
        """
        # NOTE(review): the reply context may be exposed on the feed item rather
        # than on the post view itself - confirm against the atproto models.
        parent = getattr(getattr(post, "reply", None), "parent", None)
        handle = getattr(getattr(parent, "author", None), "handle", None)
        return [handle] if handle else []

    def _skeet_metadata(self, post):
        """Build the flat metadata dictionary describing one post."""
        author = post.author
        return {
            "user_did": author.did,
            "user": author.handle,
            "name": author.display_name,
            "mentions": self.facet_data(post, "mention"),
            "hashtags": self.facet_data(post, "tag"),
            "links": self.facet_data(post, "link"),
            "repost_count": post.repost_count,
            "favorite_count": post.like_count,
            "reply_to": self._reply_handles(post),
            "created_at": post.record.created_at,
            "account_creation": author.created_at,
        }

    def get_user_skeets(self, username):
        """Fetch the author feed of ``username`` (handle or DID).

        Returns a pair ``(posts, metadata)``, both keyed by post ``cid``:
        ``posts`` maps to the post objects and ``metadata`` to the dictionaries
        built by ``_skeet_metadata``. Authors seen in the feed are added to
        ``profiles_cache``. On any error the failure is logged and
        ``({}, {})`` is returned.
        """
        count = self.config.max_skeets_per_user

        try:
            feed = self.bsky_client.get_author_feed(actor=username, limit=count).feed
            recent_items = self._filter_old_skeets(feed)
            user_skeets = {item.post.cid: item.post for item in recent_items}

            # Remember the handle of every author seen, without overwriting known ones.
            for post in user_skeets.values():
                self.profiles_cache.setdefault(post.author.did, post.author.handle)

            skeets_metadata = {cid: self._skeet_metadata(post) for cid, post in user_skeets.items()}
            return user_skeets, skeets_metadata
        except BadRequestError as e:
            # Read the response defensively: raising from inside this handler
            # would escape both except clauses.
            response = getattr(e, "response", None)
            status_code = getattr(response, "status_code", None)
            message = getattr(getattr(response, "content", None), "message", None)
            logger.error("Error in getting user skeets: code %s - %s", status_code, message)
            return {}, {}
        except Exception as e:
            logger.error("Error in getting user skeets: %s", e)
            return {}, {}

    def reshape_node_data(self, node_df):
        """Reduce per-skeet rows to one row per user.

        Keeps the user-level columns, groups by ``user`` taking the maximum of
        each column, and exposes the maximum of the former index as
        ``last_skeet_id``.
        """
        kept_columns = [
            "user_did",
            "user",
            "name",
            "spikyball_hop",
            "account_creation",
        ]
        node_df = node_df[kept_columns]
        node_df = node_df.reset_index().groupby("user").max().rename(columns={"index": "last_skeet_id"})
        return node_df
| 124 | + |
| 125 | + |
class BlueskyNetwork:
    """Bluesky backend: turns the feed of a user into mention/repost edges and node info.

    ``config`` is read for ``min_mentions`` (minimum edge weight kept by
    ``filter_edges``), ``users_to_remove`` (mention targets to ignore) and
    ``nb_popular_skeets`` (number of most-reposted skeets kept per user); it is
    also handed to the underlying ``SkeetsGetter``.
    """

    class BlueskyNodeInfo(NodeInfo):
        """Per-user hashtags, links and skeets gathered while exploring."""

        def __init__(self, user_hashtags=None, user_skeets=None, user_links=None, skeets_meta=None):
            """Store the given collections; missing ones become fresh empty containers."""
            self.user_hashtags = user_hashtags if user_hashtags else {}
            self.user_links = user_links if user_links else {}
            self.user_skeets = user_skeets if user_skeets else {}
            # A new frame per instance: a DataFrame default argument would be
            # shared by every instance created without ``skeets_meta``.
            self.skeets_meta = pd.DataFrame() if skeets_meta is None else skeets_meta

        def update(self, new_info):
            """Merge the hashtags, skeets and links of ``new_info`` into this object."""
            self.user_hashtags.update(new_info.user_hashtags)
            self.user_skeets.update(new_info.user_skeets)
            self.user_links.update(new_info.user_links)

        def get_nodes(self):
            """Return the skeet metadata frame."""
            return self.skeets_meta

    def __init__(self, credentials, config):
        """Create the ``SkeetsGetter`` (which logs in) and keep ``config``."""
        self.skeets_getter = SkeetsGetter(credentials, config)
        self.config = config

    def create_node_info(self):
        """Return an empty ``BlueskyNodeInfo``."""
        return self.BlueskyNodeInfo()

    def get_neighbors(self, user):
        """Return ``(node_info, edges_df)`` for ``user``.

        A non-string ``user`` yields an empty node info and an empty frame.
        When edges exist, ``edges_df`` has ``source`` and ``target`` as regular
        columns next to ``cid`` and ``weight``.
        """
        if not isinstance(user, str):
            return self.BlueskyNodeInfo(), pd.DataFrame()
        skeets_dic, skeets_meta = self.skeets_getter.get_user_skeets(user)
        edges_df, node_info = self.edges_nodes_from_user(user, skeets_meta, skeets_dic)

        # Move the (source, target) index levels into columns.
        if not edges_df.empty:
            edges_df.index.names = ["source", "target"]
            edges_df.reset_index(level=["source", "target"], inplace=True)

        return node_info, edges_df

    def filter(self, node_info, edges_df):
        """Apply the edge filter; ``node_info`` is passed through unchanged."""
        edges_df = self.filter_edges(edges_df)
        return node_info, edges_df

    def filter_edges(self, edges_df):
        """Keep the edges whose ``weight`` is at least ``config.min_mentions``."""
        if edges_df.empty:
            return edges_df
        return edges_df[edges_df["weight"] >= self.config.min_mentions]

    def neighbors_list(self, edges_df):
        """Return the ``target`` column as a list (empty list for an empty frame)."""
        if edges_df.empty:
            return []
        return edges_df["target"].tolist()

    def neighbors_with_weights(self, edges_df):
        """Return ``{target: 1}`` for every target in ``edges_df``."""
        if edges_df.empty:
            return {}
        user_list = self.neighbors_list(edges_df)
        return dict.fromkeys(user_list, 1)

    ###############################################################
    # Functions for extracting skeet info from the bluesky API
    ###############################################################

    def edges_nodes_from_user(self, user, skeets_meta, skeets_dic):
        """Build the edge frame and the node info from the skeets of ``user``."""
        edges_df = self.get_edges(user, skeets_meta)
        user_info = self.get_nodes_properties(skeets_meta, skeets_dic, user=user)
        return edges_df, user_info

    def did_to_handle(self, did):
        """Resolve a DID to a handle through the skeets getter (``None`` on failure)."""
        return self.skeets_getter.get_profile(did)

    def match_usernames(self, meta_df):
        """Replace DID values in the ``mentions`` column by handles.

        ``meta_df`` is modified in place; rows whose mention is missing or could
        not be resolved are absent from the returned frame.
        """
        # Assign positionally: after ``explode`` the index holds duplicate
        # labels, so label-aligned assignment is avoided on purpose.
        meta_df["mentions"] = [
            self.did_to_handle(m) if isinstance(m, str) and m.startswith("did:") else m for m in meta_df["mentions"]
        ]
        return meta_df.dropna(subset=["mentions"])

    @staticmethod
    def _as_list(value):
        """Normalize a mention-like cell (list, single string or missing) to a list."""
        if isinstance(value, (list, tuple)):
            return list(value)
        if isinstance(value, str):
            return [value]
        return []

    @staticmethod
    def _empty_edges():
        """Return an empty edge frame indexed by (source, target) with cid/weight columns."""
        index = pd.MultiIndex.from_arrays([[], []], names=["source", "target"])
        return pd.DataFrame(columns=["cid", "weight"], index=index)

    def _mention_pairs(self, meta_df):
        """Return one (source, target, cid) row per mention or reply found in ``meta_df``."""
        # A reply is treated as a kind of mention.
        targets = pd.Series(
            [self._as_list(m) + self._as_list(r) for m, r in zip(meta_df["mentions"], meta_df["reply_to"])],
            index=meta_df.index,
            dtype=object,
        )
        mentions_df = meta_df.drop(columns=["reply_to", "mentions"]).assign(mentions=targets)
        # Drop only rows without a mention; unrelated missing metadata (such as
        # an absent display name) must not discard an edge.
        mentions_df = mentions_df.explode("mentions").dropna(subset=["mentions"])

        # Mentions can be DIDs, so translate them into handles first.
        mentions_df = self.match_usernames(mentions_df)

        # Some accounts (e.g. bots) are excluded, as are mentions of feed authors.
        users_to_remove = self.config.users_to_remove or []
        mentioned = mentions_df["mentions"]
        keep = ~mentioned.isin(users_to_remove) & ~mentioned.isin(mentions_df["user"])
        kept = mentions_df[keep]
        return pd.DataFrame(
            {
                "source": kept["user"].to_numpy(),
                "target": kept["mentions"].to_numpy(),
                "cid": kept.index.to_numpy(),
            }
        )

    @staticmethod
    def _repost_pairs(user, meta_df):
        """Return one (source, target, cid) row per skeet in the feed authored by someone else."""
        # Skeets authored by ``user`` would be self edges and are left out.
        reposted = meta_df[meta_df["user"] != user]
        return pd.DataFrame(
            {
                "source": [user] * len(reposted),
                "target": reposted["user"].to_numpy(),
                "cid": reposted.index.to_numpy(),
            }
        )

    def get_edges(self, user, skeets_meta):
        """Build the weighted edge frame from the skeet metadata of ``user``.

        The result is indexed by ``(source, target)`` and has a ``cid`` column
        (list of skeet ids supporting the edge, mentions first, then reposts)
        and a ``weight`` column (length of that list). An empty
        ``skeets_meta`` gives an empty ``DataFrame``.
        """
        if not skeets_meta:
            return pd.DataFrame()
        meta_df = pd.DataFrame.from_dict(skeets_meta, orient="index")

        # Gather raw pairs from both origins and aggregate once, so a missing
        # origin cannot change the shape of the result.
        candidates = (self._mention_pairs(meta_df), self._repost_pairs(user, meta_df))
        parts = [part for part in candidates if not part.empty]
        if not parts:
            return self._empty_edges()
        pairs = pd.concat(parts, ignore_index=True)

        edges_df = pairs.groupby(["source", "target"])["cid"].agg(list).to_frame()
        edges_df["weight"] = edges_df["cid"].map(len)
        return edges_df

    @staticmethod
    def _value_counts(meta_df, column):
        """Count the items of the list-valued ``column`` as ``{item: count}``."""
        return meta_df[column].explode().dropna().value_counts().to_dict()

    @staticmethod
    def _owner_handle(meta_df, user):
        """Return the handle under which the feed statistics are stored.

        Without ``user`` the author of the first row is used. Otherwise the
        handle of a row authored by ``user`` (matched by handle or DID) is
        used, falling back to ``user`` itself when the feed holds no such row.
        """
        if user is None:
            return meta_df["user"].iloc[0]
        own = meta_df["user"] == user
        if "user_did" in meta_df.columns:
            own = own | (meta_df["user_did"] == user)
        if own.any():
            return meta_df.loc[own, "user"].iloc[0]
        return user

    def get_nodes_properties(self, skeets_meta, skeets_dic, user=None):
        """Build the ``BlueskyNodeInfo`` for one fetched feed.

        Hashtag and link counts are computed over every skeet of the feed and
        stored under the handle returned by ``_owner_handle``; passing ``user``
        prevents them from being filed under the author of a reposted skeet
        that happens to have the highest repost count. Only the
        ``config.nb_popular_skeets`` most reposted skeets are kept.
        """
        if not skeets_meta:
            return self.BlueskyNodeInfo({}, {}, {}, pd.DataFrame())
        nb_popular_skeets = self.config.nb_popular_skeets
        meta_df = pd.DataFrame.from_dict(skeets_meta, orient="index").sort_values("repost_count", ascending=False)

        hashtag_counts = self._value_counts(meta_df, "hashtags")
        link_counts = self._value_counts(meta_df, "links")
        user_name = self._owner_handle(meta_df, user)

        # Most popular skeets of the feed.
        skeets_meta_kept = meta_df.head(nb_popular_skeets)
        skeets_kept = {cid: skeets_dic[cid] for cid in skeets_meta_kept.index.to_list()}
        return self.BlueskyNodeInfo(
            user_hashtags={user_name: hashtag_counts},
            user_skeets=skeets_kept,
            user_links={user_name: link_counts},
            skeets_meta=skeets_meta_kept,
        )

    #####################################################
    ## Utils functions for the graph
    #####################################################

    def add_graph_attributes(self, g, nodes_df, edges_df, nodes_info):
        """Attach edge attributes (without ``cid``) and per-user node attributes to ``g``."""
        g = add_edges_attributes(g, edges_df, drop_cols=["cid"])
        g = add_node_attributes(g, self.skeets_getter.reshape_node_data(nodes_df), attr_dic=nodes_info.user_hashtags, attr_name="all_hashtags")
        return g
0 commit comments