Backlogに大量に溜まった未読のお知らせを一括で既読に変更する方法

アプリケーション

とあるプロジェクトでBacklogを使っているのですが、直接関係ない課題でもメンションを付けられることが多く、全て確認できずにいたところ未読が1000以上…
一括で既読にしたいのですが、Backlogにはそのような機能がなかったので、ChatGTPにScriptを生成してもらいました。

ChatGPT+pythonで超便利に

仕事で使っているPCはWindowsですが、pythonのscriptをChatGPTで生成してもらうと、とても業務が楽になることが多いです。
今回もこの手法でScriptを生成しています。

生成の際のプロンプトに、

  • 仕様で不足している、あるいは矛盾している点が合った場合は些細な点でも必ず質問すること
  • 仕様で不足しているが、解決方法が予測できるものについては使い勝手を考慮し実装すること。その場合は、実装した仕様について簡潔に報告すること
  • スクリプトを生成する際は必ずスクリプト全文を表示すること
  • デバッグ可能なように適宜デバッグ用の仕組みを実装し、フラグで有効・無効を切り替えられること
  • pythonのライブラリを追加でインストールする必要がある場合はインストール方法を提示すること

を最初に指示しておくと、かなり実用的なスクリプトを生成してくれます。
また、何度も仕様追加・変更を繰り返しているとChatGPTにトラブルがあった際に更新できなくなると困りますので、時々「このチャットが失われた場合でも、完璧な状態でスクリプトを再生成できるよう、スクリプトの詳細設計をテキストで出力してください」としておけば、仕様を書いてくれるので、途中から作り直したいときに便利です。

Backlogの未読のお知らせ一括クリアスクリプト

Python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

# ---- 以下の値は環境に合わせて設定してください ----
# 例: 環境変数で管理している場合は os.environ.get('BACKLOG_API_KEY') 等に変更可
BACKLOG_API_KEY = "【ここにAPIキー】"
BACKLOG_API_ENDPOINT = "https://【ワークスペースのURL】.backlog.com/api/v2"

# 並列実行するスレッド数を減らす(例: 2)
MAX_WORKERS = 3

# mark_as_read 実行時のリトライ最大回数
MAX_RETRIES = 3

# リトライ間隔(429が返ってきたら何秒待つか)
RETRY_WAIT_SECONDS = 5.0

# リクエスト毎に追加で待つ秒数(連続リクエストを避けるため)
REQUEST_SLEEP = 0.5

def get_unread_notifications_count():
    url = f"{BACKLOG_API_ENDPOINT}/notifications/count"
    params = {
        'apiKey': BACKLOG_API_KEY,
        'resourceAlreadyRead': 'false'
    }
    resp = requests.get(url, params=params)
    resp.raise_for_status()
    data = resp.json()
    return data['count']

def get_notifications(max_id=None):
    url = f"{BACKLOG_API_ENDPOINT}/notifications"
    params = {
        'apiKey': BACKLOG_API_KEY
    }
    if max_id is not None:
        params['maxId'] = str(max_id)
    resp = requests.get(url, params=params)
    resp.raise_for_status()
    return resp.json()

def mark_as_read(ntf_id):
    """
    通知ID ntf_id を既読化する。
    - リクエスト毎に time.sleep(REQUEST_SLEEP) で負荷を下げる
    - 429 が返ってきたら一定時間待って再試行(最大 MAX_RETRIES 回)
    - レスポンスボディが空の場合でもエラーにしない
    """
    url = f"{BACKLOG_API_ENDPOINT}/notifications/{ntf_id}/markAsRead"
    params = {'apiKey': BACKLOG_API_KEY}

    attempt = 0
    while attempt < MAX_RETRIES:
        attempt += 1
        try:
            resp = requests.post(url, params=params)
            # リクエスト間隔をあける
            time.sleep(REQUEST_SLEEP)

            if resp.status_code == 429:
                # レートリミット超過 → 待って再試行
                print(f"WARN: 429 Too Many Requests (通知ID={ntf_id}), {RETRY_WAIT_SECONDS}秒待機してリトライ...")
                time.sleep(RETRY_WAIT_SECONDS)
                continue

            resp.raise_for_status()

            # レスポンスボディが空 or JSONでない場合に備えた処理
            if not resp.text.strip():
                return None
            try:
                return resp.json()
            except ValueError:
                return None

        except requests.RequestException as e:
            # ネットワークエラーや他のHTTPエラー(4xx,5xx)
            # 429以外はリトライしても無駄なケースもあるが、必要に応じて再試行
            print(f"WARN: mark_as_read (通知ID={ntf_id}) 失敗: {e}")
            time.sleep(RETRY_WAIT_SECONDS)

    # MAX_RETRIES 回試みても成功しなかった
    print(f"ERROR: 通知ID={ntf_id} を既読化できませんでした({MAX_RETRIES}回リトライ)")
    return None

def main():
    try:
        unread_count = get_unread_notifications_count()
    except requests.RequestException as e:
        print(f"ERROR: 未読通知数の取得に失敗しました: {e}")
        return

    print(f"未読通知は {unread_count} 件です")
    if unread_count == 0:
        return

    last_id = None
    while True:
        try:
            ns = get_notifications(last_id)
        except requests.RequestException as e:
            print(f"ERROR: 通知の取得に失敗しました: {e}")
            break

        if not ns:
            # 取得した通知が空なら終了
            break

        last_id = ns[-1]['id']

        # 未読のみ抽出
        unread_ids = [item['id'] for item in ns if item.get('resourceAlreadyRead') == False]

        if unread_ids:
            print(f"今回の取得分({len(ns)}件)のうち未読 {len(unread_ids)} 件 → 並列(最大{MAX_WORKERS})で既読化")
            with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                futures = [executor.submit(mark_as_read, uid) for uid in unread_ids]
                for future in as_completed(futures):
                    _ = future.result()
        else:
            print(f"今回の取得分({len(ns)}件)はすべて既読でした。")

    print("すべてのページングが完了しました。処理を終了します。")

if __name__ == "__main__":
    main()

BACKLOG_API_KEY = “【ここにAPIキー】”
BACKLOG_API_ENDPOINT = “https://【ワークスペースのURL】.backlog.com/api/v2”

の箇所にある、【】内の部分を、ご利用の環境に応じて書き換えてください。
APIキーの取得方法については下記に記載があります。

Just a moment...

実行方法はコマンドプロンプトから

上記のスクリプトを、適当なファイル名(script.py)で保存し、コマンドプロンプトで以下のコマンドを実行します。

python ./script.py

あとは処理が終わるのを待つだけです。
時々アクセス過多でエラーが返るときがありますが、その場合は再度実行するか、件数が少なければ手動で既読に変えてしまってもいいかもしれません。

コメント

タイトルとURLをコピーしました