Skip to content

Commit

Permalink
feat: 변경 사항에 대한 병합처리를 위한 queue 기능 구현
Browse files Browse the repository at this point in the history
  • Loading branch information
yangchef1 committed Nov 19, 2024
1 parent f7da3c2 commit b6281a7
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 57 deletions.
1 change: 1 addition & 0 deletions apps/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"passport-jwt": "^4.0.1",
"reflect-metadata": "^0.2.0",
"rxjs": "^7.8.1",
"sharedb": "^5.1.1",
"typeorm": "^0.3.20"
},
"devDependencies": {
Expand Down
1 change: 1 addition & 0 deletions apps/server/sharedb.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
declare module 'sharedb';
12 changes: 10 additions & 2 deletions apps/server/src/project/controller/project.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ export class ProjectController {
await this.projectService.getProject(user.id, projectId)
);
}
constructor(
private projectService: ProjectService,
private taskService: TaskService
) {}

@Get(':id/members')
async getMembers(@AuthUser() user: Account, @Param('id') projectId: number) {
Expand Down Expand Up @@ -91,7 +95,11 @@ export class ProjectController {
}

@Post(':id/update')
async handleEvent(@AuthUser() user: Account, @Body() taskEvent: TaskEvent) {
async handleEvent(
@AuthUser() user: Account,
@Param('id') projectId: number,
@Body() taskEvent: TaskEvent
) {
const event = taskEvent.event;
let response;
switch (event) {
Expand All @@ -106,7 +114,7 @@ export class ProjectController {
break;
case EventType.INSERT_TITLE:
case EventType.DELETE_TITLE:
// response = await this.taskService.update(user.id, taskEvent);
response = await this.taskService.enqueue(user.id, projectId, taskEvent);
break;
default:
throw new BadRequestException('올바르지 않은 이벤트 타입입니다.');
Expand Down
114 changes: 59 additions & 55 deletions apps/server/src/task/service/task.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import { Repository } from 'typeorm';
import { LexoRank } from 'lexorank';
import { Task } from '@/task/domain/task.entity';
import { Section } from '@/task/domain/section.entity';
import { UpdateTaskRequest } from '@/task/dto/update-task-request.dto';
import { UpdateTaskResponse } from '@/task/dto/update-task-response.dto';
import { MoveTaskRequest } from '@/task/dto/move-task-request.dto';
import { MoveTaskResponse } from '@/task/dto/move-task-response.dto';
import { TaskResponse } from '@/task/dto/task-response.dto';
Expand All @@ -14,13 +12,16 @@ import { CreateTaskResponse } from '@/task/dto/create-task-response.dto';
import { Project } from '@/project/entity/project.entity';
import { CreateTaskRequest } from '@/task/dto/create-task-request.dto';
import { CustomResponse } from '@/task/domain/custom-response.interface';
import { Snapshot } from '../domain/snapshot';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { TaskEvent } from '../dto/task-event.dto';
import { EventType } from '../domain/eventType.enum';
import ShareDB from 'sharedb';

const json0 = ShareDB.types.json0;

@Injectable()
export class TaskService {
private snapshots: Map<string, Snapshot> = new Map();
private operations: Map<string, []> = new Map();
private operations: Map<string, TaskEvent[]> = new Map();
private connections: Map<string, CustomResponse[]> = new Map();

constructor(
Expand All @@ -31,7 +32,11 @@ export class TaskService {
@InjectRepository(Project)
private projectRepository: Repository<Project>,
private eventEmitter: EventEmitter2
) {}
) {
this.eventEmitter.on('operationAdded', async (userId: number, projectId: number) => {
await this.dequeue(userId, projectId);
});
}

addConnection(projectId: number, res: CustomResponse) {
if (!this.connections.has(projectId.toString())) {
Expand All @@ -53,37 +58,59 @@ export class TaskService {
if (!connections) {
return;
}
}

connections.forEach((res) => {
if (res.userId !== userId) {
const snapshot = this.snapshots.get(projectId.toString());
if (res.headersSent) {
return;
}
res.json({
status: 200,
message: '스냅샷에 변경 사항이 발생했습니다.',
result: {
version: snapshot.version,
project: snapshot.project,
},
});
}
});
async enqueue(userId: number, projectId: number, taskEvent: TaskEvent) {
const key = projectId.toString();
const currentEvents = this.operations.get(key) || [];
this.operations.set(key, [...currentEvents, taskEvent]);
this.eventEmitter.emit('operationAdded', userId, projectId);
}

private async dequeue(userId: number, projectId: number) {
const key = projectId.toString();
const changes = this.operations.get(key);
if (!changes) {
return;
}

while (changes) {
const change = changes.shift();
const existing = await this.findTaskOrThrow(change.taskId);
const result = this.merge(change, existing);
this.taskRepository.save(result);
this.sendConnection(projectId, userId);
}
}

private updateSnapshot(
projectId: number,
prevSectionId: number,
userId: number,
savedTask: Task
private merge(change: TaskEvent, existing: Task) {
const updateTitle = change.title;
const existingTitle = existing.title;
const event = change.event;
const op = this.convertToShareDbOp(event, updateTitle, existingTitle);
const newTitle = json0.type.apply(existingTitle, op);

return { ...existing, title: newTitle };
}

private convertToShareDbOp(
event: EventType,
updateTitle: UpdateInformation,
existingTitle: string
) {
const snapshot = this.snapshots.get(projectId.toString());
if (!snapshot) {
throw new NotFoundException('Snapshot not found');
const { content, position, length } = updateTitle;

switch (event) {
case EventType.INSERT_TITLE:
return [{ p: [position], si: content }];
case EventType.DELETE_TITLE:
return [
{
p: [position],
sd: existingTitle.slice(position, position + length),
},
];
}
snapshot.update(prevSectionId, savedTask);
this.sendConnection(projectId, userId);
}

async create(createTaskRequest: CreateTaskRequest) {
Expand Down Expand Up @@ -127,32 +154,9 @@ export class TaskService {
});
});

this.snapshots.set(projectId.toString(), new Snapshot(taskBySection));

return taskBySection;
}

async update(id: number, userId: number, updateTaskRequest: UpdateTaskRequest) {
const prevTask = await this.findTaskOrThrow(id);
const prevSectionId = prevTask.section.id;
const projectId = prevTask.section.project.id;

const newTask = new Task();
newTask.title = updateTaskRequest.title ?? prevTask.title;
newTask.description = updateTaskRequest.description ?? prevTask.description;

if (updateTaskRequest.sectionId) {
const section = await this.findSectionOrThrow(updateTaskRequest.sectionId);
newTask.section = section;
}

const savedTask = await this.taskRepository.save(newTask);

this.updateSnapshot(projectId, prevSectionId, userId, savedTask);

return new UpdateTaskResponse(savedTask);
}

async move(id: number, moveTaskRequest: MoveTaskRequest) {
const task = await this.findTaskOrThrow(id);

Expand Down

0 comments on commit b6281a7

Please sign in to comment.