Complete WatchLink V1 realtime features
This commit is contained in:
501
server.js
501
server.js
@@ -1,70 +1,495 @@
|
||||
const { createHmac, timingSafeEqual, randomBytes } = require("node:crypto");
|
||||
const { createServer } = require("node:http");
|
||||
const { PrismaClient } = require("@prisma/client");
|
||||
const next = require("next");
|
||||
const { Server } = require("socket.io");
|
||||
|
||||
const dev = process.env.NODE_ENV !== "production";
|
||||
const hostname = process.env.HOSTNAME || "0.0.0.0";
|
||||
const port = Number(process.env.PORT || 3000);
|
||||
const sessionCookie = "watchlink_session";
|
||||
const prisma = new PrismaClient();
|
||||
|
||||
const app = next({ dev, hostname, port });
|
||||
const handle = app.getRequestHandler();
|
||||
|
||||
const roomStates = new Map();
|
||||
const presence = new Map();
|
||||
|
||||
app.prepare().then(() => {
|
||||
const httpServer = createServer((req, res) => handle(req, res));
|
||||
const io = new Server(httpServer, {
|
||||
path: "/api/socket",
|
||||
cors: {
|
||||
origin: process.env.NEXTAUTH_URL || "http://localhost:3000"
|
||||
origin: process.env.NEXTAUTH_URL || "http://localhost:3000",
|
||||
credentials: true
|
||||
}
|
||||
});
|
||||
|
||||
io.on("connection", (socket) => {
|
||||
socket.on("room:join", ({ roomSlug, user }) => {
|
||||
if (!roomSlug) return;
|
||||
socket.join(roomSlug);
|
||||
socket.data.roomSlug = roomSlug;
|
||||
socket.data.user = user || "Guest";
|
||||
socket.emit("room:state", roomStates.get(roomSlug) || null);
|
||||
socket.to(roomSlug).emit("presence:join", { user: socket.data.user });
|
||||
socket.on("room:join", async ({ roomSlug } = {}) => {
|
||||
await safeSocket(socket, async () => {
|
||||
const session = await getSocketSession(socket);
|
||||
if (!session || !roomSlug) return reject(socket, "Sign in to join this room.");
|
||||
const context = await getRoomContext(roomSlug, session.user.id);
|
||||
if (!context.allowed) return reject(socket, "You do not have access to this room.");
|
||||
|
||||
socket.data.user = session.user;
|
||||
socket.data.roomSlug = context.room.slug;
|
||||
socket.data.roomId = context.room.id;
|
||||
socket.join(context.room.slug);
|
||||
addPresence(context.room.slug, session.user, socket.id);
|
||||
|
||||
socket.emit("room:state", await buildRoomSnapshot(context.room.id));
|
||||
io.to(context.room.slug).emit("presence:list", getPresence(context.room.slug));
|
||||
});
|
||||
});
|
||||
|
||||
socket.on("media:set", (payload) => updateRoom(socket, "media:set", payload));
|
||||
socket.on("playback:play", (payload) => updateRoom(socket, "playback:play", payload));
|
||||
socket.on("playback:pause", (payload) => updateRoom(socket, "playback:pause", payload));
|
||||
socket.on("playback:seek", (payload) => updateRoom(socket, "playback:seek", payload));
|
||||
socket.on("chat:message", (payload) => relayRoom(socket, "chat:message", payload));
|
||||
socket.on("queue:add", (payload) => safeRoomAction(socket, async ({ room, user }) => {
|
||||
const sourceUrl = String(payload?.sourceUrl || "").trim();
|
||||
if (!sourceUrl) return;
|
||||
const settings = await getAppSettings();
|
||||
const media = normalizeMediaUrl(sourceUrl);
|
||||
if (!settings.allowedProviders.includes(media.provider)) return reject(socket, "This media provider is disabled.");
|
||||
|
||||
const nextPosition = await prisma.mediaSource.count({ where: { roomId: room.id } });
|
||||
const created = await prisma.mediaSource.create({
|
||||
data: {
|
||||
roomId: room.id,
|
||||
submitterId: user.id,
|
||||
provider: media.provider,
|
||||
originalUrl: media.originalUrl,
|
||||
playbackUrl: media.playbackUrl,
|
||||
thumbnailUrl: media.thumbnailUrl,
|
||||
queuePosition: nextPosition + 1,
|
||||
title: media.title || media.originalUrl
|
||||
}
|
||||
});
|
||||
|
||||
if (!room.currentState) {
|
||||
await persistPlaybackState(room.id, user, {
|
||||
mediaSourceId: created.id,
|
||||
status: "PAUSED",
|
||||
position: 0,
|
||||
rate: 1
|
||||
});
|
||||
}
|
||||
|
||||
await audit("room.queue.add", user.id, room.id, { mediaSourceId: created.id, provider: media.provider });
|
||||
await broadcastRoom(io, room.slug, room.id);
|
||||
}));
|
||||
|
||||
socket.on("queue:play", (payload) => safeRoomAction(socket, async ({ room, user }) => {
|
||||
const mediaSourceId = String(payload?.mediaSourceId || "");
|
||||
const media = await prisma.mediaSource.findFirst({ where: { id: mediaSourceId, roomId: room.id } });
|
||||
if (!media) return;
|
||||
await persistPlaybackState(room.id, user, { mediaSourceId: media.id, status: "PLAYING", position: 0, rate: 1 });
|
||||
await audit("room.queue.play", user.id, room.id, { mediaSourceId: media.id });
|
||||
await broadcastRoom(io, room.slug, room.id);
|
||||
}));
|
||||
|
||||
socket.on("queue:remove", (payload) => safeRoomAction(socket, async ({ room, user }) => {
|
||||
const mediaSourceId = String(payload?.mediaSourceId || "");
|
||||
const media = await prisma.mediaSource.findFirst({ where: { id: mediaSourceId, roomId: room.id } });
|
||||
if (!media) return;
|
||||
await prisma.mediaSource.delete({ where: { id: media.id } });
|
||||
await normalizeQueue(room.id);
|
||||
|
||||
const current = parseState(room.currentState);
|
||||
if (current?.mediaSourceId === media.id) {
|
||||
const nextMedia = await prisma.mediaSource.findFirst({
|
||||
where: { roomId: room.id },
|
||||
orderBy: [{ queuePosition: "asc" }, { createdAt: "asc" }]
|
||||
});
|
||||
await prisma.room.update({
|
||||
where: { id: room.id },
|
||||
data: {
|
||||
currentState: nextMedia
|
||||
? playbackState(user, { mediaSourceId: nextMedia.id, status: "PAUSED", position: 0, rate: 1 })
|
||||
: null
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
await audit("room.queue.remove", user.id, room.id, { mediaSourceId: media.id });
|
||||
await broadcastRoom(io, room.slug, room.id);
|
||||
}));
|
||||
|
||||
socket.on("queue:move", (payload) => safeRoomAction(socket, async ({ room, user }) => {
|
||||
const mediaSourceId = String(payload?.mediaSourceId || "");
|
||||
const direction = payload?.direction === "down" ? 1 : -1;
|
||||
await moveMedia(room.id, mediaSourceId, direction);
|
||||
await audit("room.queue.move", user.id, room.id, { mediaSourceId, direction });
|
||||
await broadcastRoom(io, room.slug, room.id);
|
||||
}));
|
||||
|
||||
socket.on("playback:play", (payload) => safeRoomAction(socket, async ({ room, user }) => {
|
||||
await persistPlaybackState(room.id, user, {
|
||||
mediaSourceId: String(payload?.mediaSourceId || parseState(room.currentState)?.mediaSourceId || ""),
|
||||
status: "PLAYING",
|
||||
position: Number(payload?.position || 0),
|
||||
rate: 1
|
||||
});
|
||||
await broadcastRoom(io, room.slug, room.id);
|
||||
}));
|
||||
|
||||
socket.on("playback:pause", (payload) => safeRoomAction(socket, async ({ room, user }) => {
|
||||
await persistPlaybackState(room.id, user, {
|
||||
mediaSourceId: String(payload?.mediaSourceId || parseState(room.currentState)?.mediaSourceId || ""),
|
||||
status: "PAUSED",
|
||||
position: Number(payload?.position || 0),
|
||||
rate: 1
|
||||
});
|
||||
await broadcastRoom(io, room.slug, room.id);
|
||||
}));
|
||||
|
||||
socket.on("playback:seek", (payload) => safeRoomAction(socket, async ({ room, user }) => {
|
||||
const previous = parseState(room.currentState);
|
||||
await persistPlaybackState(room.id, user, {
|
||||
mediaSourceId: String(payload?.mediaSourceId || previous?.mediaSourceId || ""),
|
||||
status: previous?.status || "PAUSED",
|
||||
position: Number(payload?.position || 0),
|
||||
rate: 1
|
||||
});
|
||||
await broadcastRoom(io, room.slug, room.id);
|
||||
}));
|
||||
|
||||
socket.on("chat:message", (payload) => safeRoomAction(socket, async ({ room, user }) => {
|
||||
const body = String(payload?.body || "").trim().slice(0, 1000);
|
||||
if (!body) return;
|
||||
await prisma.roomMessage.create({ data: { roomId: room.id, userId: user.id, body } });
|
||||
await audit("room.chat.message", user.id, room.id, {});
|
||||
await broadcastRoom(io, room.slug, room.id);
|
||||
}));
|
||||
|
||||
socket.on("disconnect", () => {
|
||||
if (socket.data.roomSlug) {
|
||||
socket.to(socket.data.roomSlug).emit("presence:leave", { user: socket.data.user || "Guest" });
|
||||
const roomSlug = socket.data.roomSlug;
|
||||
const user = socket.data.user;
|
||||
if (roomSlug && user) {
|
||||
removePresence(roomSlug, socket.id);
|
||||
io.to(roomSlug).emit("presence:list", getPresence(roomSlug));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
function relayRoom(socket, event, payload) {
|
||||
const roomSlug = socket.data.roomSlug;
|
||||
if (!roomSlug) return;
|
||||
io.to(roomSlug).emit(event, { ...payload, user: socket.data.user || "Guest", at: Date.now() });
|
||||
}
|
||||
|
||||
function updateRoom(socket, event, payload) {
|
||||
const roomSlug = socket.data.roomSlug;
|
||||
if (!roomSlug) return;
|
||||
const previous = roomStates.get(roomSlug) || {};
|
||||
const nextState = {
|
||||
...previous,
|
||||
...payload,
|
||||
lastEvent: event,
|
||||
updatedBy: socket.data.user || "Guest",
|
||||
updatedAt: Date.now()
|
||||
};
|
||||
roomStates.set(roomSlug, nextState);
|
||||
io.to(roomSlug).emit(event, nextState);
|
||||
}
|
||||
|
||||
httpServer.listen(port, hostname, () => {
|
||||
console.log(`WatchLink ready on http://${hostname}:${port}`);
|
||||
});
|
||||
});
|
||||
|
||||
async function safeSocket(socket, action) {
|
||||
try {
|
||||
await action();
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
reject(socket, "Realtime action failed.");
|
||||
}
|
||||
}
|
||||
|
||||
async function safeRoomAction(socket, action) {
|
||||
await safeSocket(socket, async () => {
|
||||
const user = socket.data.user || (await getSocketSession(socket))?.user;
|
||||
const roomSlug = socket.data.roomSlug;
|
||||
if (!user || !roomSlug) return reject(socket, "Join a room before sending actions.");
|
||||
const context = await getRoomContext(roomSlug, user.id);
|
||||
if (!context.allowed) return reject(socket, "You do not have access to this room.");
|
||||
await action({ room: context.room, user });
|
||||
});
|
||||
}
|
||||
|
||||
function reject(socket, message) {
|
||||
socket.emit("room:error", { message });
|
||||
}
|
||||
|
||||
async function broadcastRoom(io, roomSlug, roomId) {
|
||||
io.to(roomSlug).emit("room:state", await buildRoomSnapshot(roomId));
|
||||
}
|
||||
|
||||
async function getSocketSession(socket) {
|
||||
const cookies = parseCookies(socket.handshake.headers.cookie || "");
|
||||
const raw = cookies[sessionCookie];
|
||||
if (!raw) return null;
|
||||
const [userId, signature] = raw.split(".");
|
||||
if (!userId || !signature || !verifySignature(userId, signature)) return null;
|
||||
const user = await prisma.user.findUnique({
|
||||
where: { id: userId },
|
||||
include: { roles: { include: { role: true } } }
|
||||
});
|
||||
if (!user || user.disabledAt) return null;
|
||||
return { user };
|
||||
}
|
||||
|
||||
function parseCookies(header) {
|
||||
return Object.fromEntries(
|
||||
header
|
||||
.split(";")
|
||||
.map((part) => part.trim())
|
||||
.filter(Boolean)
|
||||
.map((part) => {
|
||||
const index = part.indexOf("=");
|
||||
return [decodeURIComponent(part.slice(0, index)), decodeURIComponent(part.slice(index + 1))];
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
function verifySignature(value, signature) {
|
||||
const expected = createHmac("sha256", process.env.NEXTAUTH_SECRET || "development-only-change-me")
|
||||
.update(value)
|
||||
.digest("base64url");
|
||||
const expectedBuffer = Buffer.from(expected);
|
||||
const actualBuffer = Buffer.from(signature);
|
||||
return expectedBuffer.length === actualBuffer.length && timingSafeEqual(expectedBuffer, actualBuffer);
|
||||
}
|
||||
|
||||
async function getRoomContext(slug, userId) {
|
||||
const room = await prisma.room.findUnique({
|
||||
where: { slug },
|
||||
include: {
|
||||
owner: true,
|
||||
members: { where: { userId }, select: { userId: true, canManage: true } }
|
||||
}
|
||||
});
|
||||
if (!room) return { allowed: false, room: null };
|
||||
const user = await prisma.user.findUnique({ where: { id: userId }, include: { roles: { include: { role: true } } } });
|
||||
const isAdmin = Boolean(user?.roles.some((userRole) => userRole.role.name === "admin"));
|
||||
const isOwner = room.ownerId === userId;
|
||||
const explicitMember = room.members.length > 0;
|
||||
const isFriend = room.ownerId
|
||||
? Boolean(
|
||||
await prisma.friendship.findFirst({
|
||||
where: {
|
||||
status: "ACCEPTED",
|
||||
OR: [
|
||||
{ requesterId: userId, receiverId: room.ownerId },
|
||||
{ requesterId: room.ownerId, receiverId: userId }
|
||||
]
|
||||
},
|
||||
select: { id: true }
|
||||
})
|
||||
)
|
||||
: false;
|
||||
const allowed =
|
||||
isAdmin ||
|
||||
isOwner ||
|
||||
room.visibility === "PUBLIC" ||
|
||||
(room.visibility === "FRIENDS" && (isFriend || explicitMember)) ||
|
||||
(room.visibility === "EXPLICIT" && explicitMember) ||
|
||||
(room.visibility === "ROLE_RESTRICTED" && explicitMember);
|
||||
return { allowed, room };
|
||||
}
|
||||
|
||||
async function buildRoomSnapshot(roomId) {
|
||||
const room = await prisma.room.findUnique({
|
||||
where: { id: roomId },
|
||||
include: {
|
||||
mediaSources: {
|
||||
include: { submitter: true },
|
||||
orderBy: [{ queuePosition: "asc" }, { createdAt: "asc" }, { id: "asc" }],
|
||||
take: 60
|
||||
},
|
||||
messages: {
|
||||
include: { user: true },
|
||||
orderBy: { createdAt: "desc" },
|
||||
take: 50
|
||||
}
|
||||
}
|
||||
});
|
||||
if (!room) return null;
|
||||
const queue = room.mediaSources.map((item) => serializeMedia(item));
|
||||
const state = hydrateState(parseState(room.currentState), queue);
|
||||
return {
|
||||
roomId: room.id,
|
||||
roomSlug: room.slug,
|
||||
queue,
|
||||
playback: state,
|
||||
messages: room.messages
|
||||
.slice()
|
||||
.reverse()
|
||||
.map((message) => ({
|
||||
id: message.id,
|
||||
body: message.body,
|
||||
createdAt: message.createdAt.toISOString(),
|
||||
user: message.user?.displayName || message.user?.username || "Deleted user",
|
||||
avatarUrl: message.user?.avatarUrl || null
|
||||
}))
|
||||
};
|
||||
}
|
||||
|
||||
function hydrateState(state, queue) {
|
||||
const current = queue.find((item) => item.id === state?.mediaSourceId) || queue[0] || null;
|
||||
if (!current) return null;
|
||||
const updatedAt = Number(state?.updatedAt || Date.now());
|
||||
const basePosition = Number(state?.position || 0);
|
||||
const status = state?.status === "PLAYING" ? "PLAYING" : "PAUSED";
|
||||
const livePosition = status === "PLAYING" ? basePosition + Math.max(0, Date.now() - updatedAt) / 1000 : basePosition;
|
||||
return {
|
||||
...state,
|
||||
mediaSourceId: current.id,
|
||||
media: current,
|
||||
status,
|
||||
position: livePosition,
|
||||
rate: Number(state?.rate || 1),
|
||||
updatedAt
|
||||
};
|
||||
}
|
||||
|
||||
function serializeMedia(item) {
|
||||
return {
|
||||
id: item.id,
|
||||
title: item.title || item.originalUrl,
|
||||
provider: item.provider,
|
||||
originalUrl: item.originalUrl,
|
||||
playbackUrl: item.playbackUrl,
|
||||
thumbnailUrl: item.thumbnailUrl,
|
||||
by: item.submitter?.displayName || item.submitter?.username || "Unknown",
|
||||
createdAt: new Intl.DateTimeFormat("en", { month: "short", day: "numeric" }).format(item.createdAt)
|
||||
};
|
||||
}
|
||||
|
||||
async function persistPlaybackState(roomId, user, input) {
|
||||
if (!input.mediaSourceId) return;
|
||||
const media = await prisma.mediaSource.findFirst({ where: { id: input.mediaSourceId, roomId } });
|
||||
if (!media) return;
|
||||
await prisma.room.update({
|
||||
where: { id: roomId },
|
||||
data: {
|
||||
currentState: playbackState(user, input)
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function playbackState(user, input) {
|
||||
return {
|
||||
mediaSourceId: input.mediaSourceId,
|
||||
status: input.status === "PLAYING" ? "PLAYING" : "PAUSED",
|
||||
position: Math.max(0, Number(input.position || 0)),
|
||||
rate: Number(input.rate || 1),
|
||||
updatedBy: user.username,
|
||||
updatedById: user.id,
|
||||
updatedAt: Date.now()
|
||||
};
|
||||
}
|
||||
|
||||
function parseState(value) {
|
||||
return value && typeof value === "object" ? value : null;
|
||||
}
|
||||
|
||||
async function moveMedia(roomId, mediaSourceId, direction) {
|
||||
const queue = await prisma.mediaSource.findMany({
|
||||
where: { roomId },
|
||||
orderBy: [{ queuePosition: "asc" }, { createdAt: "asc" }, { id: "asc" }],
|
||||
select: { id: true }
|
||||
});
|
||||
const index = queue.findIndex((item) => item.id === mediaSourceId);
|
||||
const target = index + direction;
|
||||
if (index < 0 || target < 0 || target >= queue.length) return;
|
||||
const reordered = [...queue];
|
||||
[reordered[index], reordered[target]] = [reordered[target], reordered[index]];
|
||||
await prisma.$transaction(
|
||||
reordered.map((item, itemIndex) =>
|
||||
prisma.mediaSource.update({ where: { id: item.id }, data: { queuePosition: itemIndex + 1 } })
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
async function normalizeQueue(roomId) {
|
||||
const queue = await prisma.mediaSource.findMany({
|
||||
where: { roomId },
|
||||
orderBy: [{ queuePosition: "asc" }, { createdAt: "asc" }, { id: "asc" }],
|
||||
select: { id: true }
|
||||
});
|
||||
await prisma.$transaction(
|
||||
queue.map((item, index) =>
|
||||
prisma.mediaSource.update({ where: { id: item.id }, data: { queuePosition: index + 1 } })
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
function addPresence(roomSlug, user, socketId) {
|
||||
const rows = presence.get(roomSlug) || new Map();
|
||||
const existing = rows.get(user.id);
|
||||
rows.set(user.id, {
|
||||
id: user.id,
|
||||
name: user.displayName || user.username,
|
||||
avatarUrl: user.avatarUrl || null,
|
||||
status: "Online",
|
||||
sockets: new Set([...(existing?.sockets || []), socketId])
|
||||
});
|
||||
presence.set(roomSlug, rows);
|
||||
}
|
||||
|
||||
function removePresence(roomSlug, socketId) {
|
||||
const rows = presence.get(roomSlug);
|
||||
if (!rows) return;
|
||||
for (const user of rows.values()) {
|
||||
user.sockets.delete(socketId);
|
||||
if (user.sockets.size === 0) rows.delete(user.id);
|
||||
}
|
||||
}
|
||||
|
||||
function getPresence(roomSlug) {
|
||||
return [...(presence.get(roomSlug)?.values() || [])].map(({ sockets, ...user }) => user);
|
||||
}
|
||||
|
||||
async function getAppSettings() {
|
||||
const rows = await prisma.appSetting.findMany();
|
||||
const values = new Map(rows.map((row) => [row.key, row.value]));
|
||||
return {
|
||||
allowedProviders: (values.get("allowedProviders") || "YOUTUBE,TWITCH,DIRECT")
|
||||
.split(",")
|
||||
.map((item) => item.trim())
|
||||
.filter(Boolean)
|
||||
};
|
||||
}
|
||||
|
||||
async function audit(action, actorId, roomId, metadata) {
|
||||
await prisma.auditEvent.create({ data: { action, actorId, roomId, metadata } }).catch(() => {});
|
||||
}
|
||||
|
||||
function normalizeMediaUrl(input) {
|
||||
const originalUrl = input.trim();
|
||||
try {
|
||||
const url = new URL(originalUrl);
|
||||
const host = url.hostname.replace(/^www\./, "");
|
||||
if (host === "youtu.be") {
|
||||
return youtubeMedia(originalUrl, url.pathname.slice(1));
|
||||
}
|
||||
if (host.endsWith("youtube.com")) {
|
||||
return youtubeMedia(originalUrl, url.searchParams.get("v") || url.pathname.split("/").filter(Boolean).pop());
|
||||
}
|
||||
if (host.endsWith("twitch.tv")) {
|
||||
const parts = url.pathname.split("/").filter(Boolean);
|
||||
if (parts[0] === "videos" && parts[1]) {
|
||||
return {
|
||||
provider: "TWITCH",
|
||||
originalUrl,
|
||||
playbackUrl: `https://player.twitch.tv/?video=${parts[1]}&parent=localhost`,
|
||||
thumbnailUrl: "/icon.svg",
|
||||
title: originalUrl
|
||||
};
|
||||
}
|
||||
if (parts[0]) {
|
||||
return {
|
||||
provider: "TWITCH",
|
||||
originalUrl,
|
||||
playbackUrl: `https://player.twitch.tv/?channel=${parts[0]}&parent=localhost`,
|
||||
thumbnailUrl: "/icon.svg",
|
||||
title: originalUrl
|
||||
};
|
||||
}
|
||||
}
|
||||
if (/\.(mp4|webm|ogg|mov|m4v)(\?.*)?$/i.test(url.pathname)) {
|
||||
return { provider: "DIRECT", originalUrl, playbackUrl: originalUrl, thumbnailUrl: "/icon.svg", title: originalUrl };
|
||||
}
|
||||
} catch {}
|
||||
return { provider: "UNKNOWN", originalUrl, playbackUrl: originalUrl, thumbnailUrl: "/icon.svg", title: originalUrl };
|
||||
}
|
||||
|
||||
function youtubeMedia(originalUrl, id) {
|
||||
const videoId = String(id || "").replace(/[^a-zA-Z0-9_-]/g, "");
|
||||
return {
|
||||
provider: videoId ? "YOUTUBE" : "UNKNOWN",
|
||||
originalUrl,
|
||||
playbackUrl: videoId ? `https://www.youtube.com/embed/${videoId}?enablejsapi=1&playsinline=1` : originalUrl,
|
||||
thumbnailUrl: videoId ? `https://img.youtube.com/vi/${videoId}/hqdefault.jpg` : "/icon.svg",
|
||||
title: originalUrl
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user