Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify Snapshot Event to publish updated local changes #923

Merged
merged 10 commits into from
Nov 7, 2024
51 changes: 44 additions & 7 deletions examples/vanilla-codemirror6/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
/* eslint-disable jsdoc/require-jsdoc */
import yorkie, { DocEventType } from 'yorkie-js-sdk';
import type { TextOperationInfo, EditOpInfo } from 'yorkie-js-sdk';
import type { EditOpInfo, OperationInfo } from 'yorkie-js-sdk';
import { basicSetup, EditorView } from 'codemirror';
import { keymap } from '@codemirror/view';
import {
markdown,
markdownKeymap,
markdownLanguage,
} from '@codemirror/lang-markdown';
import { Transaction } from '@codemirror/state';
import { Transaction, TransactionSpec } from '@codemirror/state';
import { network } from './network';
import { displayLog, displayPeers } from './utils';
import { YorkieDoc } from './type';
import { YorkieDoc, YorkiePresence } from './type';
import './style.css';

const editorParentElem = document.getElementById('editor')!;
Expand All @@ -28,7 +28,7 @@ async function main() {
await client.activate();

// 02-1. create a document then attach it into the client.
const doc = new yorkie.Document<YorkieDoc>(
const doc = new yorkie.Document<YorkieDoc, YorkiePresence>(
`codemirror6-${new Date()
.toISOString()
.substring(0, 10)
Expand All @@ -55,10 +55,21 @@ async function main() {
// 02-2. subscribe document event.
const syncText = () => {
const text = doc.getRoot().content;
view.dispatch({
const selection = doc.getMyPresence().selection;
const transactionSpec: TransactionSpec = {
changes: { from: 0, to: view.state.doc.length, insert: text.toString() },
annotations: [Transaction.remote.of(true)],
});
};

if (selection) {
// Restore the cursor position when the text is replaced.
const cursor = text.posRangeToIndexRange(selection);
transactionSpec['selection'] = {
anchor: cursor[0],
head: cursor[1],
};
}
view.dispatch(transactionSpec);
};
doc.subscribe((event) => {
if (event.type === 'snapshot') {
Expand Down Expand Up @@ -98,6 +109,32 @@ async function main() {
});
}
}

const hasFocus =
viewUpdate.view.hasFocus && viewUpdate.view.dom.ownerDocument.hasFocus();
const sel = hasFocus ? viewUpdate.state.selection.main : null;

doc.update((root, presence) => {
if (sel && root.content) {
const selection = root.content.indexRangeToPosRange([
sel.anchor,
sel.head,
]);

if (
JSON.stringify(selection) !==
JSON.stringify(presence.get('selection'))
) {
presence.set({
selection,
});
}
} else if (presence.get('selection')) {
presence.set({
selection: undefined,
});
}
});
});

// 03-2. create codemirror instance
Expand All @@ -113,7 +150,7 @@ async function main() {
});

// 03-3. define event handler that apply remote changes to local
function handleOperations(operations: Array<TextOperationInfo>) {
function handleOperations(operations: Array<OperationInfo>) {
for (const op of operations) {
if (op.type === 'edit') {
handleEditOp(op);
Expand Down
6 changes: 5 additions & 1 deletion examples/vanilla-codemirror6/src/type.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { type Text } from 'yorkie-js-sdk';
import { TextPosStructRange, type Text } from 'yorkie-js-sdk';

export type YorkieDoc = {
content: Text;
};

export type YorkiePresence = {
selection?: TextPosStructRange;
};
40 changes: 25 additions & 15 deletions packages/sdk/src/document/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,21 @@ export class Document<T, P extends Indexable = Indexable> {
return targetPath.every((path, index) => path === nodePath[index]);
}

/**
* `removeAppliedLocalChanges` removes local changes applied to the server.
*
* @param clientSeq - client sequence number to remove local changes before it
*/
private removeAppliedLocalChanges(clientSeq: number) {
while (this.localChanges.length) {
const change = this.localChanges[0];
if (change.getID().getClientSeq() > clientSeq) {
break;
}
this.localChanges.shift();
}
}

/**
* `applyChangePack` applies the given change pack into this document.
* 1. Remove local changes applied to server.
Expand All @@ -1165,27 +1180,14 @@ export class Document<T, P extends Indexable = Indexable> {
pack.getCheckpoint().getServerSeq(),
pack.getVersionVector()!,
pack.getSnapshot()!,
pack.getCheckpoint().getClientSeq(),
);
} else if (pack.hasChanges()) {
this.applyChanges(pack.getChanges(), OpSource.Remote);
}

// 02. Remove local changes applied to server.
while (this.localChanges.length) {
const change = this.localChanges[0];
if (change.getID().getClientSeq() > pack.getCheckpoint().getClientSeq()) {
break;
}
this.localChanges.shift();
}

// NOTE(hackerwins): If the document has local changes, we need to apply
// them after applying the snapshot. We need to treat the local changes
// as remote changes because the application should apply the local
// changes to their own document.
if (hasSnapshot) {
this.applyChanges(this.localChanges, OpSource.Remote);
}
this.removeAppliedLocalChanges(pack.getCheckpoint().getClientSeq());

// 03. Update the checkpoint.
this.checkpoint = this.checkpoint.forward(pack.getCheckpoint());
Expand Down Expand Up @@ -1412,6 +1414,7 @@ export class Document<T, P extends Indexable = Indexable> {
serverSeq: bigint,
snapshotVector: VersionVector,
snapshot?: Uint8Array,
clientSeq: number = -1,
) {
const { root, presences } = converter.bytesToSnapshot<P>(snapshot);
this.root = new CRDTRoot(root);
Expand All @@ -1421,6 +1424,13 @@ export class Document<T, P extends Indexable = Indexable> {
// drop clone because it is contaminated.
this.clone = undefined;

this.removeAppliedLocalChanges(clientSeq);

// NOTE(hackerwins): If the document has local changes, we need to apply
// them after applying the snapshot, as local changes are not included in the snapshot data.
// Afterward, we should publish a snapshot event with the latest
// version of the document to ensure the user receives the most up-to-date snapshot.
this.applyChanges(this.localChanges, OpSource.Local);
this.publish([
{
type: DocEventType.Snapshot,
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/integration/client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ describe.sequential('Client', function () {
await c2.sync();

// 01. c1 increases the counter for creating snapshot.
for (let i = 0; i < 500; i++) {
for (let i = 0; i < 1000; i++) {
d1.update((r) => r.counter.increase(1));
}
await c1.sync();
Expand Down
38 changes: 37 additions & 1 deletion packages/sdk/test/unit/document/document_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ import {

import { Document, DocEventType } from '@yorkie-js-sdk/src/document/document';
import { OperationInfo } from '@yorkie-js-sdk/src/document/operation/operation';
import { JSONArray, Text, Counter, Tree } from '@yorkie-js-sdk/src/yorkie';
import yorkie, {
JSONArray,
Text,
Counter,
Tree,
} from '@yorkie-js-sdk/src/yorkie';
import { CounterType } from '@yorkie-js-sdk/src/document/crdt/counter';
import { withTwoClientsAndDocuments } from '@yorkie-js-sdk/test/integration/integration_helper';

describe.sequential('Document', function () {
afterEach(() => {
Expand Down Expand Up @@ -1490,4 +1496,34 @@ describe.sequential('Document', function () {
});
});
});

it('should publish snapshot event with up-to-date document', async function ({
task,
}) {
type TestDoc = { counter: Counter };
await withTwoClientsAndDocuments<TestDoc>(async (c1, d1, c2, d2) => {
const eventCollector = new EventCollector<number>();
d2.subscribe((event) => {
if (event.type === DocEventType.Snapshot) {
eventCollector.add(d2.getRoot().counter.getValue() as number);
}
});

d1.update((r) => (r.counter = new Counter(yorkie.IntType, 0)));
await c1.sync();
await c2.sync();

// 01. c1 increases the counter for creating snapshot.
for (let i = 0; i < 1000; i++) {
d1.update((r) => r.counter.increase(1));
}
await c1.sync();

// 02. c2 receives the snapshot and increases the counter simultaneously.
c2.sync();
d2.update((r) => r.counter.increase(1));

await eventCollector.waitAndVerifyNthEvent(1, 1001);
}, task.name);
});
});
Loading