Add support for forwarding activities

This commit is contained in:
Thomas Sileo 2022-07-06 19:04:38 +02:00
parent 9594cd3108
commit 8fbb48f671
7 changed files with 171 additions and 8 deletions

View file

@ -0,0 +1,65 @@
"""Allow activity forwarding
Revision ID: 93e36ff5c691
Revises: ba131b14c3a1
Create Date: 2022-07-06 09:03:57.656539
"""
import sqlalchemy as sa
from sqlalchemy.schema import CreateTable
from alembic import op
from app.database import engine
from app.models import OutgoingActivity
# revision identifiers, used by Alembic.
revision = '93e36ff5c691'
down_revision = 'ba131b14c3a1'
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_index(op.f('ix_inbox_activity_object_ap_id'), 'inbox', ['activity_object_ap_id'], unique=False)
op.create_index(op.f('ix_inbox_ap_type'), 'inbox', ['ap_type'], unique=False)
op.create_index(op.f('ix_outbox_activity_object_ap_id'), 'outbox', ['activity_object_ap_id'], unique=False)
op.create_index(op.f('ix_outbox_ap_type'), 'outbox', ['ap_type'], unique=False)
# ### end Alembic commands ###
# XXX: cannot remove alter to make a column nullable, we have to drop/recreate it
create_statement = CreateTable(OutgoingActivity.__table__).compile(engine)
op.execute("DROP TABLE IF EXISTS outgoing_activity;")
op.execute(f"{create_statement};")
# Instead of this:
# op.add_column('outgoing_activity', sa.Column('inbox_object_id', sa.Integer(), nullable=True))
# op.alter_column('outgoing_activity', 'outbox_object_id',
# existing_type=sa.INTEGER(),
# nullable=True)
# op.create_foreign_key(None, 'outgoing_activity', 'inbox', ['inbox_object_id'], ['id'])
# op.create_foreign_key(None, 'outgoing_activity', 'outbox', ['outbox_object_id'], ['id'])
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("outgoing_activity")
op.create_table('outgoing_activity',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('recipient', sa.String(), nullable=False),
sa.Column('outbox_object_id', sa.Integer(), nullable=False),
sa.Column('tries', sa.Integer(), nullable=False),
sa.Column('next_try', sa.DateTime(timezone=True), nullable=True),
sa.Column('last_try', sa.DateTime(timezone=True), nullable=True),
sa.Column('last_status_code', sa.Integer(), nullable=True),
sa.Column('last_response', sa.String(), nullable=True),
sa.Column('is_sent', sa.Boolean(), nullable=False),
sa.Column('is_errored', sa.Boolean(), nullable=False),
sa.Column('error', sa.String(), nullable=True),
sa.ForeignKeyConstraint(['outbox_object_id'], ['outbox.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.drop_index(op.f('ix_outbox_ap_type'), table_name='outbox')
op.drop_index(op.f('ix_outbox_activity_object_ap_id'), table_name='outbox')
op.drop_index(op.f('ix_inbox_ap_type'), table_name='inbox')
op.drop_index(op.f('ix_inbox_activity_object_ap_id'), table_name='inbox')
# ### end Alembic commands ###

View file

@ -155,6 +155,10 @@ class Object:
def in_reply_to(self) -> str | None: def in_reply_to(self) -> str | None:
return self.ap_object.get("inReplyTo") return self.ap_object.get("inReplyTo")
@property
def has_ld_signature(self) -> bool:
return bool(self.ap_object.get("signature"))
def _to_camel(string: str) -> str: def _to_camel(string: str) -> str:
cased = "".join(word.capitalize() for word in string.split("_")) cased = "".join(word.capitalize() for word in string.split("_"))

View file

@ -417,6 +417,23 @@ async def _compute_recipients(
return recipients return recipients
async def _get_followers_recipients(db_session: AsyncSession) -> set[str]:
"""Returns all the recipients from the local follower collection."""
followers = (
(
await db_session.scalars(
select(models.Follower).options(joinedload(models.Follower.actor))
)
)
.unique()
.all()
)
return {
follower.actor.shared_inbox_url or follower.actor.inbox_url
for follower in followers
}
async def get_inbox_object_by_ap_id( async def get_inbox_object_by_ap_id(
db_session: AsyncSession, ap_id: str db_session: AsyncSession, ap_id: str
) -> models.InboxObject | None: ) -> models.InboxObject | None:
@ -455,6 +472,7 @@ async def get_anybox_object_by_ap_id(
async def _handle_delete_activity( async def _handle_delete_activity(
db_session: AsyncSession, db_session: AsyncSession,
from_actor: models.Actor, from_actor: models.Actor,
delete_activity: models.InboxObject,
ap_object_to_delete: models.InboxObject, ap_object_to_delete: models.InboxObject,
) -> None: ) -> None:
if from_actor.ap_id != ap_object_to_delete.actor.ap_id: if from_actor.ap_id != ap_object_to_delete.actor.ap_id:
@ -464,6 +482,23 @@ async def _handle_delete_activity(
) )
return return
# If it's a local replies, it was forwarded, so we also need to forward
# the Delete activity if possible
if (
delete_activity.has_ld_signature
and ap_object_to_delete.in_reply_to
and ap_object_to_delete.in_reply_to.startswith(BASE_URL)
):
logger.info("Forwarding Delete activity as it's a local reply")
recipients = await _get_followers_recipients(db_session)
for rcp in recipients:
await new_outgoing_activity(
db_session,
rcp,
outbox_object_id=None,
inbox_object_id=delete_activity.id,
)
# TODO(ts): do we need to delete related activities? should we keep # TODO(ts): do we need to delete related activities? should we keep
# bookmarked objects with a deleted flag? # bookmarked objects with a deleted flag?
logger.info(f"Deleting {ap_object_to_delete.ap_type}/{ap_object_to_delete.ap_id}") logger.info(f"Deleting {ap_object_to_delete.ap_type}/{ap_object_to_delete.ap_id}")
@ -680,6 +715,19 @@ async def _handle_create_activity(
.values(replies_count=models.OutboxObject.replies_count + 1) .values(replies_count=models.OutboxObject.replies_count + 1)
) )
# This object is a reply of a local object, we may need to forward it
# to our followers (we can only forward JSON-LD signed activities)
if create_activity.has_ld_signature:
logger.info("Forwarding Create activity as it's a local reply")
recipients = await _get_followers_recipients(db_session)
for rcp in recipients:
await new_outgoing_activity(
db_session,
rcp,
outbox_object_id=None,
inbox_object_id=create_activity.id,
)
for tag in tags: for tag in tags:
if tag.get("name") == LOCAL_ACTOR.handle or tag.get("href") == LOCAL_ACTOR.url: if tag.get("name") == LOCAL_ACTOR.handle or tag.get("href") == LOCAL_ACTOR.url:
notif = models.Notification( notif = models.Notification(
@ -773,7 +821,12 @@ async def save_to_inbox(
await _handle_update_activity(db_session, actor, inbox_object) await _handle_update_activity(db_session, actor, inbox_object)
elif activity_ro.ap_type == "Delete": elif activity_ro.ap_type == "Delete":
if relates_to_inbox_object: if relates_to_inbox_object:
await _handle_delete_activity(db_session, actor, relates_to_inbox_object) await _handle_delete_activity(
db_session,
actor,
inbox_object,
relates_to_inbox_object,
)
else: else:
# TODO(ts): handle delete actor # TODO(ts): handle delete actor
logger.info( logger.info(

View file

@ -113,6 +113,7 @@ async def _get_public_key(db_session: AsyncSession, key_id: str) -> Key:
class HTTPSigInfo: class HTTPSigInfo:
has_valid_signature: bool has_valid_signature: bool
signed_by_ap_actor_id: str | None = None signed_by_ap_actor_id: str | None = None
is_ap_actor_gone: bool = False
async def httpsig_checker( async def httpsig_checker(
@ -139,7 +140,7 @@ async def httpsig_checker(
k = await _get_public_key(db_session, hsig["keyId"]) k = await _get_public_key(db_session, hsig["keyId"])
except ap.ObjectIsGoneError: except ap.ObjectIsGoneError:
logger.info("Actor is gone") logger.info("Actor is gone")
return HTTPSigInfo(has_valid_signature=False) return HTTPSigInfo(has_valid_signature=False, is_ap_actor_gone=True)
except Exception: except Exception:
logger.exception(f'Failed to fetch HTTP sig key {hsig["keyId"]}') logger.exception(f'Failed to fetch HTTP sig key {hsig["keyId"]}')
return HTTPSigInfo(has_valid_signature=False) return HTTPSigInfo(has_valid_signature=False)
@ -162,6 +163,13 @@ async def enforce_httpsig(
logger.warning(f"Invalid HTTP sig {httpsig_info=}") logger.warning(f"Invalid HTTP sig {httpsig_info=}")
body = await request.body() body = await request.body()
logger.info(f"{body=}") logger.info(f"{body=}")
# Special case for Mastoodon instance that keep resending Delete
# activities for actor we don't know about if we raise a 401
if httpsig_info.is_ap_actor_gone:
logger.info("Let's make Mastodon happy, returning a 204")
raise fastapi.HTTPException(status_code=204)
raise fastapi.HTTPException(status_code=401, detail="Invalid HTTP sig") raise fastapi.HTTPException(status_code=401, detail="Invalid HTTP sig")
return httpsig_info return httpsig_info

View file

@ -6,6 +6,7 @@ from datetime import datetime
import pyld # type: ignore import pyld # type: ignore
from Crypto.Hash import SHA256 from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5 from Crypto.Signature import PKCS1_v1_5
from loguru import logger
from pyld import jsonld # type: ignore from pyld import jsonld # type: ignore
from app import activitypub as ap from app import activitypub as ap
@ -59,7 +60,8 @@ async def verify_signature(
doc: ap.RawObject, doc: ap.RawObject,
) -> bool: ) -> bool:
if "signature" not in doc: if "signature" not in doc:
raise ValueError("No embedded signature") logger.warning("The object does contain a signature")
return False
key_id = doc["signature"]["creator"] key_id = doc["signature"]["creator"]
key = await _get_public_key(db_session, key_id) key = await _get_public_key(db_session, key_id)

View file

@ -321,9 +321,14 @@ class OutgoingActivity(Base):
created_at = Column(DateTime(timezone=True), nullable=False, default=now) created_at = Column(DateTime(timezone=True), nullable=False, default=now)
recipient = Column(String, nullable=False) recipient = Column(String, nullable=False)
outbox_object_id = Column(Integer, ForeignKey("outbox.id"), nullable=False)
outbox_object_id = Column(Integer, ForeignKey("outbox.id"), nullable=True)
outbox_object = relationship(OutboxObject, uselist=False) outbox_object = relationship(OutboxObject, uselist=False)
# Can also reference an inbox object if it needds to be forwarded
inbox_object_id = Column(Integer, ForeignKey("inbox.id"), nullable=True)
inbox_object = relationship(InboxObject, uselist=False)
tries = Column(Integer, nullable=False, default=0) tries = Column(Integer, nullable=False, default=0)
next_try = Column(DateTime(timezone=True), nullable=True, default=now) next_try = Column(DateTime(timezone=True), nullable=True, default=now)
@ -335,6 +340,15 @@ class OutgoingActivity(Base):
is_errored = Column(Boolean, nullable=False, default=False) is_errored = Column(Boolean, nullable=False, default=False)
error = Column(String, nullable=True) error = Column(String, nullable=True)
@property
def anybox_object(self) -> OutboxObject | InboxObject:
if self.outbox_object_id:
return self.outbox_object # type: ignore
elif self.inbox_object_id:
return self.inbox_object # type: ignore
else:
raise ValueError("Should never happen")
class TaggedOutboxObject(Base): class TaggedOutboxObject(Base):
__tablename__ = "tagged_outbox_object" __tablename__ = "tagged_outbox_object"

View file

@ -9,6 +9,7 @@ from loguru import logger
from sqlalchemy import func from sqlalchemy import func
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy.orm import joinedload
from app import activitypub as ap from app import activitypub as ap
from app import config from app import config
@ -29,11 +30,18 @@ k.load(KEY_PATH.read_text())
async def new_outgoing_activity( async def new_outgoing_activity(
db_session: AsyncSession, db_session: AsyncSession,
recipient: str, recipient: str,
outbox_object_id: int, outbox_object_id: int | None,
inbox_object_id: int | None = None,
) -> models.OutgoingActivity: ) -> models.OutgoingActivity:
if outbox_object_id is None and inbox_object_id is None:
raise ValueError("Must reference at least one inbox/outbox activity")
elif outbox_object_id and inbox_object_id:
raise ValueError("Cannot reference both inbox/outbox activities")
outgoing_activity = models.OutgoingActivity( outgoing_activity = models.OutgoingActivity(
recipient=recipient, recipient=recipient,
outbox_object_id=outbox_object_id, outbox_object_id=outbox_object_id,
inbox_object_id=inbox_object_id,
) )
db_session.add(outgoing_activity) db_session.add(outgoing_activity)
@ -93,17 +101,26 @@ def process_next_outgoing_activity(db: Session) -> bool:
select(models.OutgoingActivity) select(models.OutgoingActivity)
.where(*where) .where(*where)
.limit(1) .limit(1)
.options(
joinedload(models.OutgoingActivity.inbox_object),
joinedload(models.OutgoingActivity.outbox_object),
)
.order_by(models.OutgoingActivity.next_try) .order_by(models.OutgoingActivity.next_try)
).scalar_one() ).scalar_one()
next_activity.tries = next_activity.tries + 1 next_activity.tries = next_activity.tries + 1
next_activity.last_try = now() next_activity.last_try = now()
payload = ap.wrap_object_if_needed(next_activity.outbox_object.ap_object) payload = ap.wrap_object_if_needed(next_activity.anybox_object.ap_object)
# Use LD sig if the activity may need to be forwarded by recipients # Use LD sig if the activity may need to be forwarded by recipients
if payload["type"] in ["Create", "Delete"]: if next_activity.anybox_object.is_from_outbox and payload["type"] in [
ldsig.generate_signature(payload, k) "Create",
"Delete",
]:
# But only if the object is public (to help with deniability/privacy)
if next_activity.outbox_object.visibility == ap.VisibilityEnum.PUBLIC:
ldsig.generate_signature(payload, k)
logger.info(f"{payload=}") logger.info(f"{payload=}")
try: try: