import json
import twint
import pandas as pd
from pathlib import Path
from lib.common.analyser import Analyser
from lib.common.etypes import Etype
from lib.util.twint import to_serializable, pythonize

from collections import namedtuple
from datetime import datetime


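# NB: `tzone` is accepted but currently unused; timestamps are emitted as
# "mm/dd/yy HH:MM:SS" with no timezone conversion, despite the "(UTC)" labels below.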
def fmt_timestamp(dstamp, tstamp, tzone):
    ds = datetime.strptime(dstamp, "%Y-%m-%d")
    fmtted_ds = ds.strftime("%m/%d/%y")
    return f"{fmtted_ds} {tstamp}"


TMP = Path("/tmp")
TweetEdge = namedtuple(
    "TweetEdge", "date tweet urls domains hashtags tweet_id inreplyto_id"
)


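# CsvGraph accumulates usernames (nodes) and tweet/reply rows (edges). The
# column labels mirror a NodeXL-style edge/vertex workbook, so the XLSX output
# can be imported into NodeXL or converted for Gephi.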
class CsvGraph:
    node_labels = [
        "Vertex",
        "Followed",
        "Followers",
        "Tweets",
        "Favorites",
        "Description",
        "Location",
        "Web",
        "Time Zone",
        "Joined Twitter Date (UTC)",
    ]
    edge_labels = [
        "Vertex 1",
        "Vertex 2",
        "Width",
        "Relationship",
        "Relationship Date (UTC)",
        "Tweet",
        "URLs in Tweet",
        "Domains in Tweet",
        "Hashtags in Tweet",
        "Tweet Date (UTC)",
        "Twitter Page for Tweet",
        "Imported ID",
        "In-Reply-To Tweet ID",
    ]

    def __init__(self):
        self.nodes = []
        self.edges = []

    def has_node(self, name: str):
        return name in self.nodes

    def add_node(self, name: str):
        if name not in self.nodes:
            self.nodes.append(name)

    def add_edge(self, _from: dict, _to: dict):
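        """Add an edge row for tweet `_from`, optionally replying to tweet `_to`.

        A plain tweet is stored as a self-loop (Vertex 1 == Vertex 2); a reply
        points from the replier to the author of the original tweet."""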
        is_reply = _to is not None

        self.add_node(_from["username"])
        if is_reply:
            self.add_node(_to["username"])

        edge = TweetEdge(
            date=fmt_timestamp(
                _from["datestamp"], _from["timestamp"], _from["timezone"]
            ),
            tweet=_from["tweet"],
            urls=_from["urls"],
            domains=[],  # NB: no domains provided in obj
            hashtags=_from["hashtags"],
            tweet_id=_from["id"],
            inreplyto_id=_to["id"] if _to is not None else None,
        )

        self.edges.append(
            [
                _from["username"],
                _to["username"] if is_reply else _from["username"],
                1,  # width defaults to 1
                "Tweet" if not is_reply else "Replies To",  # relationship
                edge.date,  # relationship date
                edge.tweet,
                "- ".join(edge.urls) if isinstance(edge.urls, list) else edge.urls,
                "- ".join(edge.domains)
                if isinstance(edge.domains, list)
                else edge.domains,
                "- ".join(edge.hashtags)
                if isinstance(edge.hashtags, list)
                else edge.hashtags,
                edge.date,  # tweet date
                f"https://twitter.com/{_from['username']}/status/{_from['id']}",
                edge.tweet_id,  # the tweet's id
                ""
                if not is_reply
                else edge.inreplyto_id,  # the id of the tweet to which this replies
            ]
        )

    def to_xlsx(self, path):
        """Save graph as an XLSX file. The default tab will be edges, with an
        extra tab for nodes."""
        edge_df = pd.DataFrame.from_records(self.edges)
        edge_df.columns = CsvGraph.edge_labels
        node_df = pd.DataFrame.from_records([[x] for x in self.nodes])
        node_df.columns = ["Vertex"]
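        # NB: only usernames are tracked for nodes, so `node_labels` goes
        # unused here and the vertex sheet gets a single "Vertex" column.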

        writer = pd.ExcelWriter(path, engine="xlsxwriter")
        edge_df.to_excel(writer, sheet_name="Edges")
        node_df.to_excel(writer, sheet_name="Vertices")
        # `close()` writes the workbook; `ExcelWriter.save()` is deprecated in
        # newer pandas.
        writer.close()


class TwintToGephi(Analyser):
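    """Scrapes replies to each tweet with Twint and exports the resulting
    conversation graph as an XLSX workbook.

    Assumes the Analyser base class calls `pre_analyse` once, then
    `analyse_element` per element, then `post_analyse` at the end."""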
    def pre_analyse(self, _):
        # keeps a record of which usernames have been indexed so that there's
        # no repeated work
        self.indexed_ids = []
        # the graph stores usernames, to easily check whether a user already
        # exists in it
        self.graph = CsvGraph()

    def analyse_element(self, element: Etype.Json, _) -> Etype.Any:
        with open(element.paths[0], "r") as f:
            orig_tweet = json.load(f)
            orig_tweet = pythonize(orig_tweet)

        tweet_with_replies = [orig_tweet]
        reply_count = orig_tweet["replies_count"]
        # retweet_count = orig_tweet["retweets_count"]
        usr = orig_tweet["username"]

        # TODO: get retweets, as they are mentions
        # if retweet_count > 0:
        #     retweets = self.get_all_retweets(usr)

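        # Twint gives every tweet in a thread the `conversation_id` of the
        # thread's first tweet, so filtering on it isolates this tweet's replies.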
        if reply_count > 0 and usr not in self.indexed_ids:
            # TODO: keep a record so that we don't need to rescrape
            # self.indexed_ids.append(usr)

            all_tweets = self.get_all_tweets_sent_to(usr)
            conv_tweets = [
                tweet
                for tweet in all_tweets
                if tweet["conversation_id"] == orig_tweet["conversation_id"]
            ]
            if len(conv_tweets) > 0:
                tweet_with_replies = tweet_with_replies + conv_tweets
                self.logger(f"{len(conv_tweets)} replies added to tweet {element.id}.")

        output = TMP / f"{element.id}.json"
        with open(output, "w+") as f:
            json.dump(tweet_with_replies, f)

        element.paths = [output]

        return element

    def get_all_retweets(self, username):
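        """Currently unused (see the retweets TODO above): runs a profile
        scrape with retweets enabled, but does not capture the results, as
        `Store_object` is not set."""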
        c = twint.Config()
        c.Username = username
        c.Retweets = True
        twint.run.Profile(c)

    def get_all_tweets_sent_to(self, username):
        """See https://github.com/twintproject/twint/issues/513"""
        c = twint.Config()
        c.To = f"@{username}"
        c.Retweets = True
        c.Since = self.config["uploaded_after"]
        c.Until = self.config["uploaded_before"]
        c.Store_object = True
        self.logger(f"Scraping tweets sent to {username}...")
        twint.run.Search(c)
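        # Twint accumulates results in the module-level `twint.output.tweets_list`,
        # so copy it out and reset it to avoid leaking tweets into the next search.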
        results = twint.output.tweets_list
        twint.output.tweets_list = []

        return to_serializable(results)

    def add_to_graph(self, t, inreplyto=None):
        """Add the relevant rows (for `nodes` and `edges`) to the graph from
        a Twint-formatted tweet (Python dictionary)."""
        self.graph.add_node(t["username"])
        self.graph.add_edge(t, inreplyto)

    def post_analyse(self, _):
        # TODO: a kind of hack... should maybe make available as a func, i.e. `self.get_analysed()`
        analysed_els = self.disk.read_elements([self.dest_q])
        for el in analysed_els:
            el_json = el.paths[0]
            with open(el_json) as f:
                tweets = json.load(f)

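            # `analyse_element` wrote the original tweet first, followed by its
            # replies, so tweets[0] is always the thread's root.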
            initial_tweet = tweets[0]
            self.logger(f"Adding tweet {initial_tweet['id']} to graph...")
            self.add_to_graph(initial_tweet)
            for tweet in tweets[1:]:
                self.logger(f"Adding reply {tweet['id']} to graph...")
                self.add_to_graph(tweet, inreplyto=initial_tweet)

        xlsx_path = TMP / "final.xlsx"
        self.graph.to_xlsx(xlsx_path)
        return Etype.Any("FINAL", xlsx_path)


module = TwintToGephi