Files
WatchLink/server.js
ToxicCrzay270 abee76c9b1
All checks were successful
Template Compliance / compliance (push) Successful in 6s
Template Compliance / compliance (pull_request) Successful in 6s
Add admin moderation and sync acknowledgements
2026-06-11 21:00:57 +02:00

519 lines
19 KiB
JavaScript

const { createHmac, timingSafeEqual, randomBytes } = require("node:crypto");
const { createServer } = require("node:http");
const { PrismaClient } = require("@prisma/client");
const next = require("next");
const { Server } = require("socket.io");
const dev = process.env.NODE_ENV !== "production";
const hostname = process.env.HOSTNAME || "0.0.0.0";
const port = Number(process.env.PORT || 3000);
const sessionCookie = "watchlink_session";
const prisma = new PrismaClient();
const app = next({ dev, hostname, port });
const handle = app.getRequestHandler();
const presence = new Map();
app.prepare().then(() => {
const httpServer = createServer((req, res) => handle(req, res));
const io = new Server(httpServer, {
path: "/api/socket",
cors: {
origin: process.env.NEXTAUTH_URL || "http://localhost:3000",
credentials: true
}
});
io.on("connection", (socket) => {
socket.on("room:join", async ({ roomSlug } = {}, acknowledge) => {
await safeSocket(socket, acknowledge, async () => {
const session = await getSocketSession(socket);
if (!session || !roomSlug) return reject(socket, "Sign in to join this room.", acknowledge);
const context = await getRoomContext(roomSlug, session.user.id);
if (!context.allowed) return reject(socket, "You do not have access to this room.", acknowledge);
socket.data.user = session.user;
socket.data.roomSlug = context.room.slug;
socket.data.roomId = context.room.id;
socket.join(context.room.slug);
addPresence(context.room.slug, session.user, socket.id);
socket.emit("room:state", await buildRoomSnapshot(context.room.id));
io.to(context.room.slug).emit("presence:list", getPresence(context.room.slug));
ok(acknowledge);
});
});
socket.on("queue:add", (payload, acknowledge) => safeRoomAction(socket, acknowledge, async ({ room, user }) => {
const sourceUrl = String(payload?.sourceUrl || "").trim();
if (!sourceUrl) return;
const settings = await getAppSettings();
const media = normalizeMediaUrl(sourceUrl);
if (!settings.allowedProviders.includes(media.provider)) return reject(socket, "This media provider is disabled.", acknowledge);
const nextPosition = await prisma.mediaSource.count({ where: { roomId: room.id } });
const created = await prisma.mediaSource.create({
data: {
roomId: room.id,
submitterId: user.id,
provider: media.provider,
originalUrl: media.originalUrl,
playbackUrl: media.playbackUrl,
thumbnailUrl: media.thumbnailUrl,
queuePosition: nextPosition + 1,
title: media.title || media.originalUrl
}
});
if (!room.currentState) {
await persistPlaybackState(room.id, user, {
mediaSourceId: created.id,
status: "PAUSED",
position: 0,
rate: 1
});
}
await audit("room.queue.add", user.id, room.id, { mediaSourceId: created.id, provider: media.provider });
await broadcastRoom(io, room.slug, room.id);
}));
socket.on("queue:play", (payload, acknowledge) => safeRoomAction(socket, acknowledge, async ({ room, user }) => {
const mediaSourceId = String(payload?.mediaSourceId || "");
const media = await prisma.mediaSource.findFirst({ where: { id: mediaSourceId, roomId: room.id } });
if (!media) return;
await persistPlaybackState(room.id, user, { mediaSourceId: media.id, status: "PLAYING", position: 0, rate: 1 });
await audit("room.queue.play", user.id, room.id, { mediaSourceId: media.id });
await broadcastRoom(io, room.slug, room.id);
}));
socket.on("queue:remove", (payload, acknowledge) => safeRoomAction(socket, acknowledge, async ({ room, user }) => {
const mediaSourceId = String(payload?.mediaSourceId || "");
const media = await prisma.mediaSource.findFirst({ where: { id: mediaSourceId, roomId: room.id } });
if (!media) return;
await prisma.mediaSource.delete({ where: { id: media.id } });
await normalizeQueue(room.id);
const current = parseState(room.currentState);
if (current?.mediaSourceId === media.id) {
const nextMedia = await prisma.mediaSource.findFirst({
where: { roomId: room.id },
orderBy: [{ queuePosition: "asc" }, { createdAt: "asc" }]
});
await prisma.room.update({
where: { id: room.id },
data: {
currentState: nextMedia
? playbackState(user, { mediaSourceId: nextMedia.id, status: "PAUSED", position: 0, rate: 1 })
: null
}
});
}
await audit("room.queue.remove", user.id, room.id, { mediaSourceId: media.id });
await broadcastRoom(io, room.slug, room.id);
}));
socket.on("queue:move", (payload, acknowledge) => safeRoomAction(socket, acknowledge, async ({ room, user }) => {
const mediaSourceId = String(payload?.mediaSourceId || "");
const direction = payload?.direction === "down" ? 1 : -1;
await moveMedia(room.id, mediaSourceId, direction);
await audit("room.queue.move", user.id, room.id, { mediaSourceId, direction });
await broadcastRoom(io, room.slug, room.id);
}));
socket.on("playback:play", (payload, acknowledge) => safeRoomAction(socket, acknowledge, async ({ room, user }) => {
await persistPlaybackState(room.id, user, {
mediaSourceId: String(payload?.mediaSourceId || parseState(room.currentState)?.mediaSourceId || ""),
status: "PLAYING",
position: Number(payload?.position || 0),
rate: 1
});
await broadcastRoom(io, room.slug, room.id);
}));
socket.on("playback:pause", (payload, acknowledge) => safeRoomAction(socket, acknowledge, async ({ room, user }) => {
await persistPlaybackState(room.id, user, {
mediaSourceId: String(payload?.mediaSourceId || parseState(room.currentState)?.mediaSourceId || ""),
status: "PAUSED",
position: Number(payload?.position || 0),
rate: 1
});
await broadcastRoom(io, room.slug, room.id);
}));
socket.on("playback:seek", (payload, acknowledge) => safeRoomAction(socket, acknowledge, async ({ room, user }) => {
const previous = parseState(room.currentState);
await persistPlaybackState(room.id, user, {
mediaSourceId: String(payload?.mediaSourceId || previous?.mediaSourceId || ""),
status: previous?.status || "PAUSED",
position: Number(payload?.position || 0),
rate: 1
});
await broadcastRoom(io, room.slug, room.id);
}));
socket.on("chat:message", (payload, acknowledge) => safeRoomAction(socket, acknowledge, async ({ room, user }) => {
const body = String(payload?.body || "").trim().slice(0, 1000);
if (!body) return;
await prisma.roomMessage.create({ data: { roomId: room.id, userId: user.id, body } });
await audit("room.chat.message", user.id, room.id, {});
await broadcastRoom(io, room.slug, room.id);
}));
socket.on("chat:delete", (payload, acknowledge) => safeRoomAction(socket, acknowledge, async ({ room, user }) => {
if (!canManageRoom(room, user.id, user)) return reject(socket, "Only room managers can moderate chat.", acknowledge);
const messageId = String(payload?.messageId || "");
if (!messageId) return;
const result = await prisma.roomMessage.deleteMany({ where: { id: messageId, roomId: room.id } });
if (result.count === 0) return;
await audit("room.chat.delete", user.id, room.id, { messageId });
await broadcastRoom(io, room.slug, room.id);
}));
socket.on("disconnect", () => {
const roomSlug = socket.data.roomSlug;
const user = socket.data.user;
if (roomSlug && user) {
removePresence(roomSlug, socket.id);
io.to(roomSlug).emit("presence:list", getPresence(roomSlug));
}
});
});
httpServer.listen(port, hostname, () => {
console.log(`WatchLink ready on http://${hostname}:${port}`);
});
});
async function safeSocket(socket, acknowledge, action) {
try {
await action();
} catch (error) {
console.error(error);
reject(socket, "Realtime action failed.", acknowledge);
}
}
async function safeRoomAction(socket, acknowledge, action) {
await safeSocket(socket, acknowledge, async () => {
const user = socket.data.user || (await getSocketSession(socket))?.user;
const roomSlug = socket.data.roomSlug;
if (!user || !roomSlug) return reject(socket, "Join a room before sending actions.", acknowledge);
const context = await getRoomContext(roomSlug, user.id);
if (!context.allowed) return reject(socket, "You do not have access to this room.", acknowledge);
const result = await action({ room: context.room, user });
if (result !== false) ok(acknowledge);
});
}
function ok(callback) {
if (typeof callback === "function") callback({ ok: true, at: Date.now() });
}
function reject(socket, message, callback) {
socket.emit("room:error", { message });
if (typeof callback === "function") callback({ ok: false, message, at: Date.now() });
return false;
}
async function broadcastRoom(io, roomSlug, roomId) {
io.to(roomSlug).emit("room:state", await buildRoomSnapshot(roomId));
}
async function getSocketSession(socket) {
const cookies = parseCookies(socket.handshake.headers.cookie || "");
const raw = cookies[sessionCookie];
if (!raw) return null;
const [userId, signature] = raw.split(".");
if (!userId || !signature || !verifySignature(userId, signature)) return null;
const user = await prisma.user.findUnique({
where: { id: userId },
include: { roles: { include: { role: true } } }
});
if (!user || user.disabledAt) return null;
return { user };
}
function parseCookies(header) {
return Object.fromEntries(
header
.split(";")
.map((part) => part.trim())
.filter(Boolean)
.map((part) => {
const index = part.indexOf("=");
return [decodeURIComponent(part.slice(0, index)), decodeURIComponent(part.slice(index + 1))];
})
);
}
function verifySignature(value, signature) {
const expected = createHmac("sha256", process.env.NEXTAUTH_SECRET || "development-only-change-me")
.update(value)
.digest("base64url");
const expectedBuffer = Buffer.from(expected);
const actualBuffer = Buffer.from(signature);
return expectedBuffer.length === actualBuffer.length && timingSafeEqual(expectedBuffer, actualBuffer);
}
async function getRoomContext(slug, userId) {
const room = await prisma.room.findUnique({
where: { slug },
include: {
owner: true,
members: { where: { userId }, select: { userId: true, canManage: true } }
}
});
if (!room) return { allowed: false, room: null };
const user = await prisma.user.findUnique({ where: { id: userId }, include: { roles: { include: { role: true } } } });
const isAdmin = Boolean(user?.roles.some((userRole) => userRole.role.name === "admin"));
const isOwner = room.ownerId === userId;
const explicitMember = room.members.length > 0;
const isFriend = room.ownerId
? Boolean(
await prisma.friendship.findFirst({
where: {
status: "ACCEPTED",
OR: [
{ requesterId: userId, receiverId: room.ownerId },
{ requesterId: room.ownerId, receiverId: userId }
]
},
select: { id: true }
})
)
: false;
const allowed =
isAdmin ||
isOwner ||
room.visibility === "PUBLIC" ||
(room.visibility === "FRIENDS" && (isFriend || explicitMember)) ||
(room.visibility === "EXPLICIT" && explicitMember) ||
(room.visibility === "ROLE_RESTRICTED" && explicitMember);
return { allowed, room };
}
function canManageRoom(room, userId, user) {
const isAdmin = Boolean(user?.roles?.some((userRole) => userRole.role.name === "admin"));
return room.ownerId === userId || isAdmin || room.members.some((member) => member.userId === userId && member.canManage);
}
async function buildRoomSnapshot(roomId) {
const room = await prisma.room.findUnique({
where: { id: roomId },
include: {
mediaSources: {
include: { submitter: true },
orderBy: [{ queuePosition: "asc" }, { createdAt: "asc" }, { id: "asc" }],
take: 60
},
messages: {
include: { user: true },
orderBy: { createdAt: "desc" },
take: 50
}
}
});
if (!room) return null;
const queue = room.mediaSources.map((item) => serializeMedia(item));
const state = hydrateState(parseState(room.currentState), queue);
return {
roomId: room.id,
roomSlug: room.slug,
queue,
playback: state,
messages: room.messages
.slice()
.reverse()
.map((message) => ({
id: message.id,
body: message.body,
createdAt: message.createdAt.toISOString(),
user: message.user?.displayName || message.user?.username || "Deleted user",
avatarUrl: message.user?.avatarUrl || null
}))
};
}
function hydrateState(state, queue) {
const current = queue.find((item) => item.id === state?.mediaSourceId) || queue[0] || null;
if (!current) return null;
const updatedAt = Number(state?.updatedAt || Date.now());
const basePosition = Number(state?.position || 0);
const status = state?.status === "PLAYING" ? "PLAYING" : "PAUSED";
const livePosition = status === "PLAYING" ? basePosition + Math.max(0, Date.now() - updatedAt) / 1000 : basePosition;
return {
...state,
mediaSourceId: current.id,
media: current,
status,
position: livePosition,
rate: Number(state?.rate || 1),
updatedAt
};
}
function serializeMedia(item) {
return {
id: item.id,
title: item.title || item.originalUrl,
provider: item.provider,
originalUrl: item.originalUrl,
playbackUrl: item.playbackUrl,
thumbnailUrl: item.thumbnailUrl,
by: item.submitter?.displayName || item.submitter?.username || "Unknown",
createdAt: new Intl.DateTimeFormat("en", { month: "short", day: "numeric" }).format(item.createdAt)
};
}
async function persistPlaybackState(roomId, user, input) {
if (!input.mediaSourceId) return;
const media = await prisma.mediaSource.findFirst({ where: { id: input.mediaSourceId, roomId } });
if (!media) return;
await prisma.room.update({
where: { id: roomId },
data: {
currentState: playbackState(user, input)
}
});
}
function playbackState(user, input) {
return {
mediaSourceId: input.mediaSourceId,
status: input.status === "PLAYING" ? "PLAYING" : "PAUSED",
position: Math.max(0, Number(input.position || 0)),
rate: Number(input.rate || 1),
updatedBy: user.username,
updatedById: user.id,
updatedAt: Date.now()
};
}
function parseState(value) {
return value && typeof value === "object" ? value : null;
}
async function moveMedia(roomId, mediaSourceId, direction) {
const queue = await prisma.mediaSource.findMany({
where: { roomId },
orderBy: [{ queuePosition: "asc" }, { createdAt: "asc" }, { id: "asc" }],
select: { id: true }
});
const index = queue.findIndex((item) => item.id === mediaSourceId);
const target = index + direction;
if (index < 0 || target < 0 || target >= queue.length) return;
const reordered = [...queue];
[reordered[index], reordered[target]] = [reordered[target], reordered[index]];
await prisma.$transaction(
reordered.map((item, itemIndex) =>
prisma.mediaSource.update({ where: { id: item.id }, data: { queuePosition: itemIndex + 1 } })
)
);
}
async function normalizeQueue(roomId) {
const queue = await prisma.mediaSource.findMany({
where: { roomId },
orderBy: [{ queuePosition: "asc" }, { createdAt: "asc" }, { id: "asc" }],
select: { id: true }
});
await prisma.$transaction(
queue.map((item, index) =>
prisma.mediaSource.update({ where: { id: item.id }, data: { queuePosition: index + 1 } })
)
);
}
function addPresence(roomSlug, user, socketId) {
const rows = presence.get(roomSlug) || new Map();
const existing = rows.get(user.id);
rows.set(user.id, {
id: user.id,
name: user.displayName || user.username,
avatarUrl: user.avatarUrl || null,
status: "Online",
sockets: new Set([...(existing?.sockets || []), socketId])
});
presence.set(roomSlug, rows);
}
function removePresence(roomSlug, socketId) {
const rows = presence.get(roomSlug);
if (!rows) return;
for (const user of rows.values()) {
user.sockets.delete(socketId);
if (user.sockets.size === 0) rows.delete(user.id);
}
}
function getPresence(roomSlug) {
return [...(presence.get(roomSlug)?.values() || [])].map(({ sockets, ...user }) => user);
}
async function getAppSettings() {
const rows = await prisma.appSetting.findMany();
const values = new Map(rows.map((row) => [row.key, row.value]));
return {
allowedProviders: (values.get("allowedProviders") || "YOUTUBE,TWITCH,DIRECT")
.split(",")
.map((item) => item.trim())
.filter(Boolean)
};
}
async function audit(action, actorId, roomId, metadata) {
await prisma.auditEvent.create({ data: { action, actorId, roomId, metadata } }).catch(() => {});
}
function normalizeMediaUrl(input) {
const originalUrl = input.trim();
try {
const url = new URL(originalUrl);
const host = url.hostname.replace(/^www\./, "");
if (host === "youtu.be") {
return youtubeMedia(originalUrl, url.pathname.slice(1));
}
if (host.endsWith("youtube.com")) {
return youtubeMedia(originalUrl, url.searchParams.get("v") || url.pathname.split("/").filter(Boolean).pop());
}
if (host.endsWith("twitch.tv")) {
const parts = url.pathname.split("/").filter(Boolean);
if (parts[0] === "videos" && parts[1]) {
return {
provider: "TWITCH",
originalUrl,
playbackUrl: `https://player.twitch.tv/?video=${parts[1]}&parent=localhost`,
thumbnailUrl: "/icon.svg",
title: originalUrl
};
}
if (parts[0]) {
return {
provider: "TWITCH",
originalUrl,
playbackUrl: `https://player.twitch.tv/?channel=${parts[0]}&parent=localhost`,
thumbnailUrl: "/icon.svg",
title: originalUrl
};
}
}
if (/\.(mp4|webm|ogg|mov|m4v)(\?.*)?$/i.test(url.pathname)) {
return { provider: "DIRECT", originalUrl, playbackUrl: originalUrl, thumbnailUrl: "/icon.svg", title: originalUrl };
}
} catch {}
return { provider: "UNKNOWN", originalUrl, playbackUrl: originalUrl, thumbnailUrl: "/icon.svg", title: originalUrl };
}
function youtubeMedia(originalUrl, id) {
const videoId = String(id || "").replace(/[^a-zA-Z0-9_-]/g, "");
return {
provider: videoId ? "YOUTUBE" : "UNKNOWN",
originalUrl,
playbackUrl: videoId ? `https://www.youtube.com/embed/${videoId}?enablejsapi=1&playsinline=1` : originalUrl,
thumbnailUrl: videoId ? `https://img.youtube.com/vi/${videoId}/hqdefault.jpg` : "/icon.svg",
title: originalUrl
};
}