phantasmobile/app/signalbot/rpc.py
2025-02-25 22:55:10 +01:00

80 lines
2.3 KiB
Python

import json
import logging
import subprocess
import requests
from django.conf import settings
logger = logging.getLogger(__name__)
rpc_id = 1
def api(method, params):
global rpc_id
rpc_id += 1
url = "http://127.0.0.1:8080/api/v1/rpc"
msg = {
"jsonrpc": "2.0",
"id": str(rpc_id),
"method": method,
}
if params:
msg["params"] = params
if settings.DEBUG:
print("POST signal rpc:", msg)
response = requests.post(url, json=msg).json()
if "result" in response:
return response["result"]
else:
raise Exception("Error: %s", response)
def send(msg, to=None, group=None, preview_url=None, preview_title=None, preview_description=None, preview_image=None):
params = {
"message": msg
}
if group:
params["groupId"] = group
verifyGroupMembers(group)
else:
params["recipient"] = to
if preview_url:
params['previewUrl'] = preview_url
if preview_title:
params['previewTitle'] = preview_title
if preview_description:
params['previewDescription'] = preview_description
if preview_image:
params['previewImage'] = preview_image
return api("send", params)
def send_reaction(target_address, target_ts, emoji, to=None, group=None, remove=False):
params = {
"emoji": emoji,
"targetTimestamp": target_ts,
"targetAuthor": target_address,
}
if group:
params["groupId"] = group
else:
params["recipient"] = to
return api("sendReaction", params)
def verifyGroupMembers(group_id):
for group in api("listGroups", {}):
if group["id"] == group_id:
for member in group["members"]:
if member["number"] != settings.SIGNAL_ACCOUNT:
for detail in api("listIdentities", {
"number": member["number"]
}):
if detail['trustLevel'] != 'TRUSTED_VERIFIED':
logger.error("%s verification numbers changed (accepting new numbers)", member["number"])
r = api("trust", {
"recipient": member["number"],
"verifiedSafetyNumber": detail['fingerprint'],
})