With this script, can you get the list of subcribers ?
I don’t think the Lemmy API exposes the subscriber list of a community, you’ll need access to the instance database.
What I did was checking every post for the last 365 days for user activity and store every user that have interacted with the community.
lemmy_session and lemmy_references are the same as for the LiveThreadBot (inside the src folder): https://gitlab.com/UlrikHD/lemmy-match-thread-bot
get_posts() may be missing from the lemmy_session.py file though
def get_posts(self, *, community: int | str | LemmyCommunity, sort: str = 'New', page: int = 1) -> dict[str, any]:
""" Gets the posts of a community.
:param community: The ID of the community to get the posts of, can also be a LemmyCommunity parseable
string/object.
:param sort: The sorting method of the posts, by default 'New'.
:param page: The page number of the posts, by default 1.
:return: The response JSON of the request as a dictionary.
"""
if isinstance(community, LemmyCommunity) or isinstance(community, str):
response: Final[requests.Response] = self.srv.get_posts(community_name=community, sort=sort, page=page,
limit=50)
else:
response: Final[requests.Response] = self.srv.get_posts(community_id=community, sort=sort, page=page,
limit=50)
if response.status_code != 200:
raise requests.exceptions.HTTPError(response.text)
return response.json()
Excuse the ugly code, it was written as a one-off
import os
import time
import datetime
from json import load, dump
import requests
from lemmy_references import LemmyCommunity, LemmyUser
from lemmy_session import LemmySession
session: LemmySession = LemmySession(website='https://lemmy.world/',
username='TestUlrikHD',
password='---',
end_script_signal=None)
posts: list[dict[[str, any]]] = []
cutoff_date: datetime.datetime = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=365)
page_count: int = 1
loop_break: bool = False
while True:
post_response: dict[str, any] = session.get_posts(community=LemmyCommunity('football', 'lemmy.world'),
page=page_count)
page_count += 1
for post in post_response['posts']:
if datetime.datetime.fromisoformat(post['post']['published']) > cutoff_date:
posts.append(post)
else:
loop_break = True
break
if loop_break:
break
user_dict: dict[str, dict[str, any]] = {}
for post in posts:
user_dict[str(LemmyUser(post['creator']['actor_id']))] = {'post': True, 'post_id': post['post']['id']}
comments = session.get_post_comments(post_id=post['post']['id'])
for comment in comments['comments']:
user: str = str(LemmyUser(comment['creator']['actor_id']))
if user not in user_dict:
user_dict[user] = {'post': False, 'post_id': comment['post']['id'], 'parent_id': comment['comment']['id']}
del user_dict[str(LemmyUser('FootballAutoMod@lemmy.world'))]
del user_dict[str(LemmyUser('LiveThreadBot@lemmy.world'))]
with open('user_dict', 'w', encoding='utf-8') as file:
dump(user_dict, file, ensure_ascii=False, indent=4)
def log_reply(usr: str) -> None:
user_list: list[str] = []
if os.path.isfile('reply_list.json'):
with open('reply_list.json', 'r', encoding='utf-8') as file:
user_list = load(file)
user_list.append(str(usr))
with open('reply_list.json', 'w', encoding='utf-8') as file:
dump(user_list, file, ensure_ascii=False, indent=4)
for username, user in user_dict.items():
time.sleep(1)
try:
#if user['post']:
# session.reply(content='migration message', post_id=user['post_id'], parent_id=None)
#else:
# session.reply(content='migration message', post_id=user['post_id'], parent_id=user['parent_id'])
log_reply(usr=LemmyUser(username).str_link())
except requests.HTTPError as e:
print(f'Failed to send message to {username} - {e}')
and this part creates txt for easy copy pasting for tagging.
from json import load
with open('reply_list.json', 'r', encoding='utf-8') as file:
user_list: list[str] = load(file)
loop_count: int = len(', '.join(user_list)) // 9500 + 1
for i in range(loop_count):
with open(f'reply_list_{i}.txt', 'w', encoding='utf-8') as file:
print(len(' '.join(user_list[i * len(user_list) // loop_count:(i + 1) * len(user_list) // loop_count])))
file.write(', '.join(user_list[i * len(user_list) // loop_count:(i + 1) * len(user_list) // loop_count]))
If you need any technical help, you can message me at @UlrikHD@programming.dev