You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
wubloader/zulip_bots/zulip_bots/tootbot.py

247 lines
7.3 KiB
Python

import argh
import logging
import time
import mastodon
from bs4 import BeautifulSoup
from . import zulip
from .config import get_config
cli = argh.EntryPoint()
def format_account(account):
return f"**[{account['display_name']}]({account['url']})**"
def format_status(status):
sender = format_account(status["account"])
url = status["url"]
visibility = status["visibility"]
reply = status["in_reply_to_id"]
reblog = status["reblog"]
# private messages should not show content.
if visibility not in ("public", "unlisted"):
kind = "message"
if reblog is not None:
kind = "boost"
if reply is not None:
kind = "reply"
return f"{sender} sent [a {visibility} {kind}]({url})"
if reblog is not None:
boostee = format_account(reblog["account"])
boost_url = reblog["url"]
content = format_content(reblog)
return f"{sender} reblogged {boostee}'s [post]({boost_url})\n{content}"
content = format_content(status)
if reply:
return f"{sender} [replied]({url}) to message {reply}:\n{content}"
return f"{sender} [posted]({url}):\n{content}"
def md_wrap(kind, inner):
"""
Wrap inner in a markdown block with given kind, eg. "quote" or "spoiler TEXT HERE"
"""
return "\n".join([
f"```{kind}",
inner,
"```",
])
def format_content(status):
# Main status text should be rendered into text and quoted
html = BeautifulSoup(status["content"], "html.parser")
text = html_to_md(html)
parts = [md_wrap("quote", text)]
# Append attachments, with the link name being the alt text.
# For images at least, zulip will auto-preview them.
for attachment in status["media_attachments"]:
type = attachment["type"]
description = attachment["description"] or "(no description provided)"
url = attachment["url"]
parts.append(f"Attached {type}: [{description}]({url})")
output = "\n".join(parts)
# If content warnings present, wrap in a spoiler block.
warning = status["spoiler_text"]
if warning:
output = md_wrap(f"spoiler {warning}", output)
return output
def html_to_md(html):
"""
Take a status HTML as a BeautifulSoup object
and make a best-effort attempt to render it in markdown:
* Convert each <p> section to text ending in a newline
* Convert <a> tags to links
* Convert formatting tags to the markdown equivalent
* Ignore other tags and just pass through their inner content
Note that zulip has no way of escaping markdown syntax, so if a message contains
characters that happen to be valid markdown, there's not much we can do about it.
The only thing that could cause problems is a malicious input that breaks out of
our surrounding quote block. And you know what? Fine, they can have that. We can always
just read the link to see the real deal.
"""
if html.name is None:
# Raw string, return as-is.
return html
if html.name == "br":
# <br> should never have any contents, and just become a newline
return "\n"
# Lists need to be handled specially as they should only contain <li> elements
# and the <li> elements should be rendered differently depending on the outer element.
if html.name in ("ul", "ol"):
prefix = {"ul": "*", "ol": "1."}[html.name]
items = []
for item in html.children:
if item.name != "li":
logging.warning(f"Ignoring non-<li> inside <{html.name}> element")
continue
inner = "".join(html_to_md(child) for child in item.children)
# Prepend two spaces to every line, to create a "nesting" effect
# for sub-lists. This may break things if mixed with blockquotes but oh well.
inner = "\n".join(" " + line for line in inner.split("\n"))
items.append(prefix + inner)
return "\n".join(items)
# Get contents recursively. Bail if empty as this might cause weird things if rendered naively,
# eg. <i></i> would be rendered ** which would actually make other text bold.
inner = "".join(html_to_md(child) for child in html.children)
if not inner:
return ""
# <p> should insert a newline at the end
if html.name == "p":
return f"{inner}\n"
# <a> should become a link
href = html.get("href")
if html.name == "a" and href:
return f"[{inner}]({href})"
# <blockquote> creates a nested quote block which is its own paragraph.
if html.name == "blockquote":
return md_wrap("quote", inner)
# The following tag types are simple character-based format wrappers.
# Note we don't handle <u> due to lack of underline formatting in markdown.
# We treat all headings as bold.
CHAR_FORMAT = {
"b": "**",
"strong": "**",
"h1": "**",
"h2": "**",
"h3": "**",
"h4": "**",
"h5": "**",
"h6": "**",
"i": "*",
"em": "*",
"del": "~~",
"pre": "`",
"code": "`",
}
if html.name in CHAR_FORMAT:
char = CHAR_FORMAT[html.name]
return f"{char}{inner}{char}"
# For any other types, most notably <span> but also anything that we don't recognize,
# just pass the inner text though unchanged.
return inner
LINE = "\n---"
class Listener(mastodon.StreamListener):
def __init__(self, zulip_client, stream, post_topic, notification_topic):
self.zulip_client = zulip_client
self.stream = stream
self.post_topic = post_topic
self.notification_topic = notification_topic
def send(self, topic, content):
logging.info(f"Sending message to {self.stream}/{topic}: {content!r}")
self.zulip_client.send_to_stream(self.stream, topic, content)
def on_update(self, status):
logging.info(f"Got update: {status!r}")
self.send(self.post_topic, format_status(status) + LINE)
def on_delete(self, status_id):
logging.info(f"Got delete: {status_id}")
self.send(self.post_topic, f"*Status with id {status_id} was deleted*")
def on_status_update(self, status):
logging.info(f"Got status update: {status!r}")
self.send(self.post_topic, f"*The following status has been updated*\n{format_status(status)}" + LINE)
def on_notification(self, notification):
logging.info(f"Got {notification['type']} notification: {notification!r}")
if notification["type"] != "mention":
return
self.send(self.notification_topic, format_status(notification["status"]) + LINE)
@cli
def main(conf_file, stream="bot-spam", post_topic="Toots from Desert Bus", notification_topic="Mastodon Notifications"):
"""
Run the actual bot.
Config, in json or yaml format:
zulip:
url
email
api_key
mastodon:
url
client_id # only required for get-access-token
client_secret # only required for get-access-token
access_token # only required for main
"""
logging.basicConfig(level='INFO')
conf = get_config(conf_file)
zc = conf["zulip"]
mc = conf["mastodon"]
zulip_client = zulip.Client(zc["url"], zc["email"], zc["api_key"])
mastodon_client = mastodon.Mastodon(api_base_url=mc["url"], access_token=mc["access_token"])
listener = Listener(zulip_client, stream, post_topic, notification_topic)
RETRY_INTERVAL = 1
while True:
logging.info("Starting")
try:
mastodon_client.stream_user(listener)
except mastodon.MastodonNetworkError:
logging.warning(f"Lost connection, reconnecting in {RETRY_INTERVAL}s")
time.sleep(RETRY_INTERVAL)
@cli
def get_access_token(conf_file):
"""Do OAuth login flow and obtain an access token."""
mc = get_config(conf_file)["mastodon"]
client = mastodon.Mastodon(client_id=mc["client_id"], client_secret=mc["client_secret"], api_base_url=mc["url"])
print("Go to the following URL to obtain an access token:")
print(client.auth_request_url(scopes=["read:notifications", "read:statuses"]))
if __name__ == '__main__':
cli()