from collaboration import (
    ChangeSession,
    Comment,
    InMemoryCollaborationStore,
    Ownership,
    Task,
    User,
)


def _seeded_store(with_task: bool = False) -> InMemoryCollaborationStore:
    """Build a fresh in-memory store holding the single test user.

    When *with_task* is true, the demo task ``task.1`` is registered as well
    (after the user, matching the order the tests rely on).
    """
    fresh = InMemoryCollaborationStore()
    fresh.upsert_user(User(user_id="user.1", display_name="Tester"))
    if with_task:
        fresh.upsert_task(
            Task(task_id="task.1", project_id="demo", title="Index project")
        )
    return fresh


def test_start_session_requires_known_task_and_user() -> None:
    """Starting a session for a registered task and user returns that session."""
    # NOTE(review): only the happy path is asserted here; the rejection of an
    # unknown task or user implied by the test name is not exercised.
    store = _seeded_store(with_task=True)
    requested = ChangeSession(
        session_id="session.1", task_id="task.1", user_id="user.1"
    )

    session = store.start_session(requested)

    assert session.session_id == "session.1"


def test_finish_session_sets_finished_at_once() -> None:
    """Finishing the same session twice keeps the first ``finished_at`` value."""
    store = _seeded_store(with_task=True)
    store.start_session(
        ChangeSession(session_id="session.1", task_id="task.1", user_id="user.1")
    )

    # Both calls happen before any assertion so the second call's effect (if
    # any) on the stored session is already in place when we compare.
    first_result = store.finish_session("session.1")
    second_result = store.finish_session("session.1")

    assert first_result.finished_at is not None
    assert second_result.finished_at == first_result.finished_at


def test_assign_owner_is_project_and_target_scoped() -> None:
    """An assigned owner is listed for both its project and its target."""
    store = _seeded_store()
    requested = Ownership(
        owner_user_id="user.1",
        project_id="demo",
        target_id="lineage.document.order",
        role="RESPONSIBLE",
    )

    ownership = store.assign_owner(requested)

    assert ownership.owner_user_id == "user.1"
    by_project = store.owners_for_project("demo")
    assert by_project == [ownership]
    by_target = store.owners_for_target("demo", "lineage.document.order")
    assert by_target == [ownership]


def test_add_comment_is_project_and_target_scoped() -> None:
    """An added comment is listed for both its project and its target."""
    store = _seeded_store()
    requested = Comment(
        comment_id="comment.1",
        project_id="demo",
        target_id="lineage.document.order",
        user_id="user.1",
        body="Check posting rules.",
    )

    comment = store.add_comment(requested)

    by_project = store.comments_for_project("demo")
    assert by_project == [comment]
    by_target = store.comments_for_target("demo", "lineage.document.order")
    assert by_target == [comment]