diff --git a/notion/block.py b/notion/block.py index f19784f..cc99ed0 100644 --- a/notion/block.py +++ b/notion/block.py @@ -425,6 +425,36 @@ def move_to(self, target_block, position="last-child"): ] ) + def duplicate(self): + + # Inspired by https://github.com/jamalex/notion-py/pull/266/ + + duplicate_target_id = self._client.create_record( + "block", self.parent, type="copy_indicator" + ) + + data = self._client.post( + "enqueueTask", + { + "task": { + "eventName": "duplicateBlock", + "request": { + "sourceBlockId": self.id, + "targetBlockId": duplicate_target_id, + "addCopyName": True, + }, + } + }, + ).json() + task_id = data.get("taskId") + + if task_id: + # Wait until the duplication task finishes + self._client.wait_for_task(task_id) + + self._client.refresh_records(block=[duplicate_target_id]) + + return self.__class__(self._client, duplicate_target_id) class DividerBlock(Block): diff --git a/notion/client.py b/notion/client.py index b49cd77..3c5a01f 100644 --- a/notion/client.py +++ b/notion/client.py @@ -2,6 +2,7 @@ import json import re import uuid +import time from requests import Session, HTTPError from requests.cookies import cookiejar_from_dict @@ -403,6 +404,46 @@ def create_record(self, table, parent, **kwargs): return record_id + # https://github.com/jamalex/notion-py/blob/391ac0ed311f68d83d70d411d00659a3e1a658a5/notion/client.py + def get_task_status(self, task_id): + """ + Get a status of a single task + """ + data = self.post( + "getTasks", + { + "taskIds": [task_id] + } + ).json() + + results = data.get("results") + + if not results: + # Notion does not know about such a task + print("Invalid task ID.") + return None + + try: + task = results.pop() + return task.get("state", None) + except IndexError: + logger.error("There is no task {}".format(results)) + return None + + def wait_for_task(self, task_id, interval=0.5, tries=10): + """ + Wait for a task by looping 'tries' times ever 'interval' seconds. + The 'interval' parameter can be used to specify milliseconds using double (e.g 0.75). + """ + for i in range(tries): + state = self.get_task_status(task_id) + if state in ["not_started", "in_progress"]: + time.sleep(interval) + elif state == "success": + return state + + logger.debug("Task takes more time than expected. Specify 'interval' or 'tries' to wait more.") + class Transaction(object):