diff --git a/native/cpp/CommonCpp/DatabaseManagers/CMakeLists.txt b/native/cpp/CommonCpp/DatabaseManagers/CMakeLists.txt --- a/native/cpp/CommonCpp/DatabaseManagers/CMakeLists.txt +++ b/native/cpp/CommonCpp/DatabaseManagers/CMakeLists.txt @@ -17,6 +17,7 @@ "entities/SQLiteDataConverters.h" "entities/Draft.h" "entities/Holder.h" + "entities/QueuedDMOperation.h" "entities/Media.h" "entities/Message.h" "entities/Metadata.h" diff --git a/native/cpp/CommonCpp/DatabaseManagers/DatabaseQueryExecutor.h b/native/cpp/CommonCpp/DatabaseManagers/DatabaseQueryExecutor.h --- a/native/cpp/CommonCpp/DatabaseManagers/DatabaseQueryExecutor.h +++ b/native/cpp/CommonCpp/DatabaseManagers/DatabaseQueryExecutor.h @@ -18,6 +18,7 @@ #include "entities/OlmPersistSession.h" #include "entities/OutboundP2PMessage.h" #include "entities/PersistItem.h" +#include "entities/QueuedDMOperation.h" #include "entities/Report.h" #include "entities/SyncedMetadataEntry.h" #include "entities/Thread.h" @@ -213,6 +214,14 @@ virtual void replaceHolder(const Holder &holder) const = 0; virtual void removeHolders(const std::vector &hashes) const = 0; virtual std::vector getHolders() const = 0; + virtual void + addQueuedDMOperation(const QueuedDMOperation &operation) const = 0; + virtual void + removeQueuedDMOperationsOlderThan(const std::string ×tamp) const = 0; + virtual void clearQueuedDMOperations( + const std::string &queueType, + const std::string &queueKey) const = 0; + virtual std::vector getQueuedDMOperations() const = 0; virtual void removeLocalMessageInfos(bool includeNonLocalMessages) const = 0; virtual ~DatabaseQueryExecutor() = default; diff --git a/native/cpp/CommonCpp/DatabaseManagers/SQLiteQueryExecutor.h b/native/cpp/CommonCpp/DatabaseManagers/SQLiteQueryExecutor.h --- a/native/cpp/CommonCpp/DatabaseManagers/SQLiteQueryExecutor.h +++ b/native/cpp/CommonCpp/DatabaseManagers/SQLiteQueryExecutor.h @@ -14,6 +14,7 @@ #include "entities/Media.h" #include "entities/Message.h" #include "entities/MessageSearchResult.h" +#include "entities/QueuedDMOperation.h" #include "entities/SQLiteStatementWrapper.h" #include "entities/ThreadActivityEntry.h" #include "entities/UserInfo.h" @@ -204,6 +205,13 @@ void replaceHolder(const Holder &holder) const override; void removeHolders(const std::vector &hashes) const override; std::vector getHolders() const override; + void addQueuedDMOperation(const QueuedDMOperation &operation) const override; + void removeQueuedDMOperationsOlderThan( + const std::string ×tamp) const override; + void clearQueuedDMOperations( + const std::string &queueType, + const std::string &queueKey) const override; + std::vector getQueuedDMOperations() const override; void removeLocalMessageInfos(bool includeNonLocalMessages) const override; }; diff --git a/native/cpp/CommonCpp/DatabaseManagers/SQLiteQueryExecutor.cpp b/native/cpp/CommonCpp/DatabaseManagers/SQLiteQueryExecutor.cpp --- a/native/cpp/CommonCpp/DatabaseManagers/SQLiteQueryExecutor.cpp +++ b/native/cpp/CommonCpp/DatabaseManagers/SQLiteQueryExecutor.cpp @@ -1870,4 +1870,69 @@ } } +void SQLiteQueryExecutor::addQueuedDMOperation( + const QueuedDMOperation &operation) const { + static std::string query = + "INSERT INTO queued_dm_operations( " + " queue_type, queue_key, operation_data, timestamp) " + "VALUES (:queue_type, :queue_key, :operation_data, :timestamp);"; + SQLiteQueuedDMOperation sqliteOperation = + operation.toSQLiteQueuedDMOperation(); + replaceEntity( + this->getConnection(), query, sqliteOperation); +} + +void SQLiteQueryExecutor::removeQueuedDMOperationsOlderThan( + const std::string ×tamp) const { + static std::string query = + "DELETE FROM queued_dm_operations " + "WHERE timestamp < :timestamp;"; + + SQLiteStatementWrapper preparedSQL( + getConnection(), query, "removeQueuedDMOperationsOlderThan"); + int timestamp_index = sqlite3_bind_parameter_index(preparedSQL, ":timestamp"); + bindInt64ToSQL(std::stoll(timestamp), preparedSQL, timestamp_index); + sqlite3_step(preparedSQL); +} + +void SQLiteQueryExecutor::clearQueuedDMOperations( + const std::string &queueType, + const std::string &queueKey) const { + static std::string query = + "DELETE FROM queued_dm_operations " + "WHERE queue_type = :queue_type AND queue_key = :queue_key;"; + + SQLiteStatementWrapper preparedSQL( + getConnection(), query, "clearQueuedDMOperations"); + + int queue_type_index = + sqlite3_bind_parameter_index(preparedSQL, ":queue_type"); + bindStringToSQL(queueType, preparedSQL, queue_type_index); + + int queue_key_index = sqlite3_bind_parameter_index(preparedSQL, ":queue_key"); + bindStringToSQL(queueKey, preparedSQL, queue_key_index); + + sqlite3_step(preparedSQL); +} + +std::vector +SQLiteQueryExecutor::getQueuedDMOperations() const { + static std::string query = + "SELECT queue_type, queue_key, operation_data, timestamp " + "FROM queued_dm_operations " + "ORDER BY timestamp ASC;"; + + std::vector sqliteOperations = + getAllEntities(this->getConnection(), query); + + std::vector operations; + operations.reserve(sqliteOperations.size()); + + for (const auto &sqliteOp : sqliteOperations) { + operations.emplace_back(sqliteOp); + } + + return operations; +} + } // namespace comm diff --git a/native/cpp/CommonCpp/DatabaseManagers/entities/QueuedDMOperation.h b/native/cpp/CommonCpp/DatabaseManagers/entities/QueuedDMOperation.h new file mode 100644 --- /dev/null +++ b/native/cpp/CommonCpp/DatabaseManagers/entities/QueuedDMOperation.h @@ -0,0 +1,92 @@ +#pragma once + +#include "SQLiteDataConverters.h" +#include +#include + +namespace comm { + +struct SQLiteQueuedDMOperation { + std::string queue_type; + std::string queue_key; + std::string operation_data; + int64_t timestamp; + + static SQLiteQueuedDMOperation fromSQLResult(sqlite3_stmt *sqlRow, int idx) { + return SQLiteQueuedDMOperation{ + getStringFromSQLRow(sqlRow, idx), + getStringFromSQLRow(sqlRow, idx + 1), + getStringFromSQLRow(sqlRow, idx + 2), + getInt64FromSQLRow(sqlRow, idx + 3), + }; + } + + int bindToSQL(sqlite3_stmt *sql, int idx) const { + int res; + + int queue_type_index = sqlite3_bind_parameter_index(sql, ":queue_type"); + res = bindStringToSQL(queue_type, sql, queue_type_index); + if (res != SQLITE_OK) { + return res; + } + + int queue_key_index = sqlite3_bind_parameter_index(sql, ":queue_key"); + res = bindStringToSQL(queue_key, sql, queue_key_index); + if (res != SQLITE_OK) { + return res; + } + + int operation_data_index = + sqlite3_bind_parameter_index(sql, ":operation_data"); + res = bindStringToSQL(operation_data, sql, operation_data_index); + if (res != SQLITE_OK) { + return res; + } + + int timestamp_index = sqlite3_bind_parameter_index(sql, ":timestamp"); + res = bindInt64ToSQL(timestamp, sql, timestamp_index); + if (res != SQLITE_OK) { + return res; + } + + return res; + } +}; + +struct QueuedDMOperation { + std::string queue_type; + std::string queue_key; + std::string operation_data; + std::string timestamp; + + QueuedDMOperation() = default; + + QueuedDMOperation( + const std::string &queue_type, + const std::string &queue_key, + const std::string &operation_data, + const std::string ×tamp) + : queue_type(queue_type), + queue_key(queue_key), + operation_data(operation_data), + timestamp(timestamp) { + } + + QueuedDMOperation(const SQLiteQueuedDMOperation &op) { + queue_type = op.queue_type; + queue_key = op.queue_key; + operation_data = op.operation_data; + timestamp = std::to_string(op.timestamp); + } + + SQLiteQueuedDMOperation toSQLiteQueuedDMOperation() const { + SQLiteQueuedDMOperation op; + op.queue_type = queue_type; + op.queue_key = queue_key; + op.operation_data = operation_data; + op.timestamp = std::stoll(timestamp); + return op; + } +}; + +} // namespace comm diff --git a/native/ios/Comm.xcodeproj/project.pbxproj b/native/ios/Comm.xcodeproj/project.pbxproj --- a/native/ios/Comm.xcodeproj/project.pbxproj +++ b/native/ios/Comm.xcodeproj/project.pbxproj @@ -280,6 +280,7 @@ 8EA59BD82A73DAB000EB4F53 /* rustJSI.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = rustJSI.h; sourceTree = ""; }; 8ECF4E162DDDD9EF0079D3D6 /* WebSQLiteConnectionManager.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = WebSQLiteConnectionManager.h; sourceTree = ""; }; 8ECF4E172DDDD9EF0079D3D6 /* WebSQLiteConnectionManager.cpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = WebSQLiteConnectionManager.cpp; sourceTree = ""; }; + 8EDBEFE52E310EE5002FFFB7 /* QueuedDMOperation.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = QueuedDMOperation.h; sourceTree = ""; }; 8EE6E49F2A39CCAB00AE6BCD /* ReportStoreOperations.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = ReportStoreOperations.h; sourceTree = ""; }; 8EE6E4A02A39CCAB00AE6BCD /* DraftStoreOperations.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = DraftStoreOperations.h; sourceTree = ""; }; 8EF0F6012DCA43CD00F2B171 /* SQLiteBackup.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = SQLiteBackup.h; sourceTree = ""; }; @@ -594,6 +595,7 @@ 71BE84442636A944002849D2 /* entities */ = { isa = PBXGroup; children = ( + 8EDBEFE52E310EE5002FFFB7 /* QueuedDMOperation.h */, 8E189A282E16D1B500E11E87 /* Holder.h */, 0E02676D2D81EAD800788249 /* DMOperation.h */, 816D2D5B2C480E9E001C0B67 /* MessageSearchResult.h */, diff --git a/web/cpp/SQLiteQueryExecutorBindings.cpp b/web/cpp/SQLiteQueryExecutorBindings.cpp --- a/web/cpp/SQLiteQueryExecutorBindings.cpp +++ b/web/cpp/SQLiteQueryExecutorBindings.cpp @@ -145,6 +145,12 @@ .field("holder", &Holder::holder) .field("status", &Holder::status); + value_object("QueuedDMOperation") + .field("queueType", &QueuedDMOperation::queue_type) + .field("queueKey", &QueuedDMOperation::queue_key) + .field("operationData", &QueuedDMOperation::operation_data) + .field("timestamp", &QueuedDMOperation::timestamp); + class_("SQLiteQueryExecutor") .constructor() .function("migrate", &SQLiteQueryExecutor::migrate) @@ -353,6 +359,16 @@ .function("replaceHolder", &SQLiteQueryExecutor::replaceHolder) .function("removeHolders", &SQLiteQueryExecutor::removeHolders) .function("getHolders", &SQLiteQueryExecutor::getHolders) + .function( + "addQueuedDMOperation", &SQLiteQueryExecutor::addQueuedDMOperation) + .function( + "removeQueuedDMOperationsOlderThan", + &SQLiteQueryExecutor::removeQueuedDMOperationsOlderThan) + .function( + "clearQueuedDMOperations", + &SQLiteQueryExecutor::clearQueuedDMOperations) + .function( + "getQueuedDMOperations", &SQLiteQueryExecutor::getQueuedDMOperations) .function( "removeLocalMessageInfos", &SQLiteQueryExecutor::removeLocalMessageInfos); diff --git a/web/shared-worker/_generated/comm_query_executor.wasm b/web/shared-worker/_generated/comm_query_executor.wasm index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 GIT binary patch literal 0 Hc$@ { + let queryExecutor: ?SQLiteQueryExecutor = null; + let dbModule: ?EmscriptenModule = null; + + beforeAll(async () => { + dbModule = getDatabaseModule(); + }); + + beforeEach(() => { + if (!dbModule) { + throw new Error('Database module is missing'); + } + queryExecutor = new dbModule.SQLiteQueryExecutor(FILE_PATH, false); + if (!queryExecutor) { + throw new Error('SQLiteQueryExecutor is missing'); + } + queryExecutor?.addQueuedDMOperation(TEST_OPERATION_1); + queryExecutor?.addQueuedDMOperation(TEST_OPERATION_2); + queryExecutor?.addQueuedDMOperation(TEST_OPERATION_3); + }); + + afterEach(() => { + if (!dbModule || !queryExecutor) { + return; + } + clearSensitiveData(dbModule, FILE_PATH, queryExecutor); + }); + + it('should return all queued operations', () => { + const operations = queryExecutor?.getQueuedDMOperations() ?? []; + expect(operations?.length).toBe(3); + + // Operations should be ordered by timestamp + const timestamps = operations.map(op => op.timestamp); + expect(timestamps).toEqual([ + '1642500000000', + '1642500001000', + '1642500002000', + ]); + }); + + it('should add new queued operation', () => { + const newOperation: ClientDBQueuedDMOperation = { + queueType: 'entry', + queueKey: 'entry123', + operationData: + '{"type":"create_entry","entryID":"entry123","text":"New entry"}', + timestamp: '1642500003000', + }; + + queryExecutor?.addQueuedDMOperation(newOperation); + const operations = queryExecutor?.getQueuedDMOperations() ?? []; + expect(operations?.length).toBe(4); + + const addedOperation = operations.find(op => op.queueKey === 'entry123'); + expect(addedOperation).toBeDefined(); + expect(addedOperation?.operationData).toContain('New entry'); + }); + + it('should remove operations older than timestamp', () => { + // Remove operations older than 1642500001500 (between operation 2 and 3) + queryExecutor?.removeQueuedDMOperationsOlderThan('1642500001500'); + + const remainingOperations = queryExecutor?.getQueuedDMOperations() ?? []; + expect(remainingOperations?.length).toBe(1); + + // Only the newest operation should remain + expect(remainingOperations[0].timestamp).toBe('1642500002000'); + expect(remainingOperations[0].queueKey).toBe('msg101'); + }); + + it('should clear operations for specific queue', () => { + // Clear all operations for the thread queue + queryExecutor?.clearQueuedDMOperations('thread', 'thread123'); + + const operations = queryExecutor?.getQueuedDMOperations() ?? []; + expect(operations?.length).toBe(2); + + // Thread operation should be gone + const threadOperations = operations.filter(op => op.queueType === 'thread'); + expect(threadOperations?.length).toBe(0); + + // Other operations should remain + const queueKeys = operations.map(op => op.queueKey); + expect(queueKeys).toContain('thread456#user789'); + expect(queueKeys).toContain('msg101'); + expect(queueKeys).not.toContain('thread123'); + }); + + it('should clear membership operations correctly', () => { + // Clear membership operation for specific thread and user + queryExecutor?.clearQueuedDMOperations('membership', 'thread456#user789'); + + const operations = queryExecutor?.getQueuedDMOperations() ?? []; + expect(operations?.length).toBe(2); + + // Membership operation should be gone + const membershipOperations = operations.filter( + op => op.queueType === 'membership', + ); + expect(membershipOperations?.length).toBe(0); + + // Other operations should remain + const queueKeys = operations.map(op => op.queueKey); + expect(queueKeys).toContain('thread123'); + expect(queueKeys).toContain('msg101'); + expect(queueKeys).not.toContain('thread456#user789'); + }); + + it('should handle empty results gracefully', () => { + // Clear all operations by removing everything older than current timestamp + const currentTimestamp = Date.now().toString(); + queryExecutor?.removeQueuedDMOperationsOlderThan(currentTimestamp); + + const operations = queryExecutor?.getQueuedDMOperations() ?? []; + expect(operations?.length).toBe(0); + }); +}); diff --git a/web/shared-worker/types/sqlite-query-executor.js b/web/shared-worker/types/sqlite-query-executor.js --- a/web/shared-worker/types/sqlite-query-executor.js +++ b/web/shared-worker/types/sqlite-query-executor.js @@ -51,6 +51,13 @@ +medias: $ReadOnlyArray, }; +export type ClientDBQueuedDMOperation = { + +queueType: string, + +queueKey: string, + +operationData: string, + +timestamp: string, +}; + declare export class SQLiteQueryExecutor { constructor(sqliteFilePath: string, skipMigration: boolean): void; @@ -232,6 +239,11 @@ removeHolders(hashes: $ReadOnlyArray): void; getHolders(): $ReadOnlyArray; + addQueuedDMOperation(operation: ClientDBQueuedDMOperation): void; + removeQueuedDMOperationsOlderThan(timestamp: string): void; + clearQueuedDMOperations(queueType: string, queueKey: string): void; + getQueuedDMOperations(): $ReadOnlyArray; + removeLocalMessageInfos(includeNonLocalMessages: boolean): void; // method is provided to manually signal that a C++ object