Working the test suite, and fixing formatting

This commit is contained in:
Thomas Sileo 2018-05-21 11:21:11 +02:00
parent 3496aee23f
commit c9ba124bdd
5 changed files with 62 additions and 39 deletions

View file

@ -13,7 +13,7 @@ install:
- pip install -r dev-requirements.txt
script:
- mypy --ignore-missing-imports .
# - flake8
- flake8 activitypub.py
- cp -r tests/me.yml config/me.yml
- docker-compose up -d
- docker-compose ps

View file

@ -1,5 +1,3 @@
import typing
import re
import json
import binascii
import os
@ -7,16 +5,12 @@ from datetime import datetime
from enum import Enum
import requests
from bleach.linkifier import Linker
from bson.objectid import ObjectId
from html2text import html2text
from feedgen.feed import FeedGenerator
from markdown import markdown
from utils.linked_data_sig import generate_signature
from utils.actor_service import NotAnActorError
from utils.webfinger import get_actor_url
from utils.content_helper import parse_markdown
from config import USERNAME, BASE_URL, ID
from config import CTX_AS, CTX_SECURITY, AS_PUBLIC
from config import KEY, DB, ME, ACTOR_SERVICE
@ -24,7 +18,7 @@ from config import OBJECT_SERVICE
from config import PUBLIC_INSTANCES
import tasks
from typing import List, Optional, Tuple, Dict, Any, Union, Type
from typing import List, Optional, Dict, Any, Union
from typing import TypeVar
A = TypeVar('A', bound='BaseActivity')
@ -32,10 +26,6 @@ ObjectType = Dict[str, Any]
ObjectOrIDType = Union[str, ObjectType]
# Pleroma sample
# {'@context': ['https://www.w3.org/ns/activitystreams', 'https://w3id.org/security/v1', {'Emoji': 'toot:Emoji', 'Hashtag': 'as:Hashtag', 'atomUri': 'ostatus:atomUri', 'conversation': 'ostatus:conversation', 'inReplyToAtomUri': 'ostatus:inReplyToAtomUri', 'manuallyApprovesFollowers': 'as:manuallyApprovesFollowers', 'ostatus': 'http://ostatus.org#', 'sensitive': 'as:sensitive', 'toot': 'http://joinmastodon.org/ns#'}], 'actor': 'https://soc.freedombone.net/users/bob', 'attachment': [{'mediaType': 'image/jpeg', 'name': 'stallmanlemote.jpg', 'type': 'Document', 'url': 'https://soc.freedombone.net/media/e1a3ca6f-df73-4f2d-a931-c389a221b008/stallmanlemote.jpg'}], 'attributedTo': 'https://soc.freedombone.net/users/bob', 'cc': ['https://cybre.space/users/vantablack', 'https://soc.freedombone.net/users/bob/followers'], 'content': '<span><a href=\'https://cybre.space/users/vantablack\'>@<span>vantablack</span></a></span><br><a href="https://soc.freedombone.net/media/e1a3ca6f-df73-4f2d-a931-c389a221b008/stallmanlemote.jpg" class=\'attachment\'>stallmanlemote.jpg</a>', 'context': 'tag:cybre.space,2018-04-05:objectId=5756519:objectType=Conversation', 'conversation': 'tag:cybre.space,2018-04-05:objectId=5756519:objectType=Conversation', 'emoji': {}, 'id': 'https://soc.freedombone.net/objects/3f0faeca-4d37-4acf-b990-6a50146d23cc', 'inReplyTo': 'https://cybre.space/users/vantablack/statuses/99808953472969467', 'inReplyToStatusId': 300713, 'like_count': 1, 'likes': ['https://cybre.space/users/vantablack'], 'published': '2018-04-05T21:30:52.658817Z', 'sensitive': False, 'summary': None, 'tag': [{'href': 'https://cybre.space/users/vantablack', 'name': '@vantablack@cybre.space', 'type': 'Mention'}], 'to': ['https://www.w3.org/ns/activitystreams#Public'], 'type': 'Note'}
class ActivityTypes(Enum):
ANNOUNCE = 'Announce'
BLOCK = 'Block'
@ -142,7 +132,7 @@ class BaseActivity(object):
if not self.NO_CONTEXT:
if not isinstance(self._data['@context'], list):
self._data['@context'] = [self._data['@context']]
if not CTX_SECURITY in self._data['@context']:
if CTX_SECURITY not in self._data['@context']:
self._data['@context'].append(CTX_SECURITY)
if isinstance(self._data['@context'][-1], dict):
self._data['@context'][-1]['Hashtag'] = 'as:Hashtag'
@ -326,7 +316,6 @@ class BaseActivity(object):
except NotImplementedError:
pass
#return
generate_signature(activity, KEY.privkey)
payload = json.dumps(activity)
print('will post')
@ -409,7 +398,6 @@ class Person(BaseActivity):
def _to_dict(self, data):
# if 'icon' in data:
# data['icon'] = data['icon'].to_dict()
#
return data
@ -512,6 +500,7 @@ class Undo(BaseActivity):
except NotImplementedError:
pass
class Like(BaseActivity):
ACTIVITY_TYPE = ActivityTypes.LIKE
ALLOWED_OBJECT_TYPES = [ActivityTypes.NOTE]
@ -567,7 +556,12 @@ class Announce(BaseActivity):
return
# Save/cache the object, and make it part of the stream so we can fetch it
if isinstance(self._data['object'], str):
raw_obj = OBJECT_SERVICE.get(self._data['object'], reload_cache=True, part_of_stream=True, announce_published=self._data['published'])
raw_obj = OBJECT_SERVICE.get(
self._data['object'],
reload_cache=True,
part_of_stream=True,
announce_published=self._data['published'],
)
obj = parse_activity(raw_obj)
else:
obj = self.get_object()
@ -581,7 +575,12 @@ class Announce(BaseActivity):
def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None:
if isinstance(self._data['object'], str):
# Put the object in the cache
OBJECT_SERVICE.get(self._data['object'], reload_cache=True, part_of_stream=True, announce_published=self._data['published'])
OBJECT_SERVICE.get(
self._data['object'],
reload_cache=True,
part_of_stream=True,
announce_published=self._data['published'],
)
obj = self.get_object()
DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.boosted': obj_id}})
@ -702,7 +701,7 @@ class Create(BaseActivity):
parent = DB.inbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to})
if parent is None:
# The reply is a note from the outbox
data = DB.outbox.update_one(
DB.outbox.update_one(
{'activity.object.id': in_reply_to},
{'$inc': {'meta.count_reply': 1}},
)
@ -772,6 +771,7 @@ class Note(BaseActivity):
published=datetime.utcnow().replace(microsecond=0).isoformat() + 'Z',
)
_ACTIVITY_TYPE_TO_CLS = {
ActivityTypes.IMAGE: Image,
ActivityTypes.PERSON: Person,
@ -789,6 +789,7 @@ _ACTIVITY_TYPE_TO_CLS = {
ActivityTypes.TOMBSTONE: Tombstone,
}
def parse_activity(payload: ObjectType) -> BaseActivity:
t = ActivityTypes(payload['type'])
if t not in _ACTIVITY_TYPE_TO_CLS:
@ -805,7 +806,6 @@ def gen_feed():
fg.link(href=ID, rel='alternate')
fg.description(f'{USERNAME} notes')
fg.logo(ME.get('icon', {}).get('url'))
#fg.link( href='http://larskiesow.de/test.atom', rel='self' )
fg.language('en')
for item in DB.outbox.find({'type': 'Create'}, limit=50):
fe = fg.add_entry()
@ -829,7 +829,8 @@ def json_feed(path: str) -> Dict[str, Any]:
})
return {
"version": "https://jsonfeed.org/version/1",
"user_comment": "This is a microblog feed. You can add this to your feed reader using the following URL: " + ID + path,
"user_comment": ("This is a microblog feed. You can add this to your feed reader using the following URL: "
+ ID + path),
"title": USERNAME,
"home_page_url": ID,
"feed_url": ID + path,
@ -959,16 +960,16 @@ def build_ordered_collection(col, q=None, cursor=None, map_func=None, limit=50,
if not cursor:
resp = {
'@context': CTX_AS,
'id': f'{BASE_URL}/{col_name}',
'totalItems': total_items,
'type': 'OrderedCollection',
'first': {
'id': BASE_URL + '/' + col_name + '?cursor=' + start_cursor,
'id': f'{BASE_URL}/{col_name}?cursor={start_cursor}',
'orderedItems': data,
'partOf': BASE_URL + '/' + col_name,
'partOf': f'{BASE_URL}/{col_name}',
'totalItems': total_items,
'type': 'OrderedCollectionPage'
},
'id': BASE_URL + '/' + col_name,
'totalItems': total_items,
'type': 'OrderedCollection'
}
if len(data) == limit:

2
app.py
View file

@ -34,7 +34,7 @@ import activitypub
import config
from activitypub import ActivityTypes
from activitypub import clean_activity
from activitypub import parse_markdown
from utils.content_helper import parse_markdown
from config import KEY
from config import DB
from config import ME

View file

@ -1,4 +1,5 @@
pytest
requests
html2text
flake8
mypy

View file

@ -1,7 +1,28 @@
import requests
import os
def test_ping_homepage():
import pytest
import requests
from html2text import html2text
@pytest.fixture
def config():
"""Return the current config as a dict."""
import yaml
with open(os.path.join(os.path.dirname(__file__), '..', 'config/me.yml'), 'rb') as f:
yield yaml.load(f)
def resp2plaintext(resp):
"""Convert the body of a requests reponse to plain text in order to make basic assertions."""
return html2text(resp.text)
def test_ping_homepage(config):
"""Ensure the homepage is accessible."""
resp = requests.get('http://localhost:5005')
resp.raise_for_status()
assert resp.status_code == 200
body = resp2plaintext(resp)
assert config['name'] in body
assert f"@{config['username']}@{config['domain']}" in body