diff --git a/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp b/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp --- a/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp +++ b/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp @@ -81,6 +81,7 @@ std::vector messageStoreLocalMessageInfosVector; std::vector dmOperationsVector; std::vector holdersVector; + std::vector queuedDMOperationsVector; try { draftsVector = DatabaseManager::getQueryExecutor(identifier).getAllDrafts(); @@ -118,6 +119,9 @@ DatabaseManager::getQueryExecutor(identifier).getDMOperations(); holdersVector = DatabaseManager::getQueryExecutor(identifier).getHolders(); + queuedDMOperationsVector = + DatabaseManager::getQueryExecutor(identifier) + .getQueuedDMOperations(); } catch (std::system_error &e) { error = e.what(); } @@ -162,6 +166,9 @@ std::move(dmOperationsVector)); auto holdersVectorPtr = std::make_shared>(std::move(holdersVector)); + auto queuedDMOperationsVectorPtr = + std::make_shared>( + std::move(queuedDMOperationsVector)); this->jsInvoker_->invokeAsync( [&innerRt, draftsVectorPtr, @@ -180,6 +187,7 @@ messageStoreLocalMessageInfosVectorPtr, dmOperationsVectorPtr, holdersVectorPtr, + queuedDMOperationsVectorPtr, error, promise, draftStore = this->draftStore, @@ -236,6 +244,9 @@ innerRt, dmOperationsVectorPtr); jsi::Array jsiHolders = holderStore.parseDBDataStore(innerRt, holdersVectorPtr); + jsi::Array jsiQueuedDMOperations = + dmOperationStore.parseDBQueuedDMOperations( + innerRt, queuedDMOperationsVectorPtr); auto jsiClientDBStore = jsi::Object(innerRt); jsiClientDBStore.setProperty(innerRt, "messages", jsiMessages); @@ -266,6 +277,8 @@ jsiClientDBStore.setProperty( innerRt, "dmOperations", jsiDMOperations); jsiClientDBStore.setProperty(innerRt, "holders", jsiHolders); + jsiClientDBStore.setProperty( + innerRt, "queuedDMOperations", jsiQueuedDMOperations); promise->resolve(std::move(jsiClientDBStore)); }); diff --git a/native/cpp/CommonCpp/NativeModules/DMOperationStoreOperations.h b/native/cpp/CommonCpp/NativeModules/DMOperationStoreOperations.h --- a/native/cpp/CommonCpp/NativeModules/DMOperationStoreOperations.h +++ b/native/cpp/CommonCpp/NativeModules/DMOperationStoreOperations.h @@ -1,6 +1,7 @@ #pragma once #include "../DatabaseManagers/entities/DMOperation.h" +#include "../DatabaseManagers/entities/QueuedDMOperation.h" #include "DBOperationBase.h" #include "DatabaseManager.h" #include @@ -39,5 +40,49 @@ DatabaseManager::getQueryExecutor(id).removeAllDMOperations(); } }; +class AddQueuedDMOperationOperation : public DBOperationBase { +public: + AddQueuedDMOperationOperation(QueuedDMOperation &&operation) + : operation{std::move(operation)} { + } + + virtual void execute(DatabaseIdentifier id) override { + DatabaseManager::getQueryExecutor(id).addQueuedDMOperation(this->operation); + } + +private: + QueuedDMOperation operation; +}; + +class ClearQueuedDMOperationsOperation : public DBOperationBase { +public: + ClearQueuedDMOperationsOperation(std::string queueType, std::string queueKey) + : queueType{queueType}, queueKey{queueKey} { + } + + virtual void execute(DatabaseIdentifier id) override { + DatabaseManager::getQueryExecutor(id).clearQueuedDMOperations( + this->queueType, this->queueKey); + } + +private: + std::string queueType; + std::string queueKey; +}; + +class PruneQueuedDMOperationsOperation : public DBOperationBase { +public: + PruneQueuedDMOperationsOperation(std::string timestamp) + : timestamp{timestamp} { + } + + virtual void execute(DatabaseIdentifier id) override { + DatabaseManager::getQueryExecutor(id).removeQueuedDMOperationsOlderThan( + this->timestamp); + } + +private: + std::string timestamp; +}; } // namespace comm diff --git a/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/DataStores/DMOperationStore.h b/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/DataStores/DMOperationStore.h --- a/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/DataStores/DMOperationStore.h +++ b/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/DataStores/DMOperationStore.h @@ -1,6 +1,7 @@ #pragma once #include "../../../DatabaseManagers/entities/DMOperation.h" +#include "../../../DatabaseManagers/entities/QueuedDMOperation.h" #include "../../DBOperationBase.h" #include "BaseDataStore.h" #include "DMOperationStoreOperations.h" @@ -14,6 +15,9 @@ static OperationType REMOVE_OPERATION; static OperationType REMOVE_ALL_OPERATION; static OperationType REPLACE_OPERATION; + static OperationType ADD_QUEUED_OPERATION; + static OperationType CLEAR_QUEUED_OPERATION; + static OperationType PRUNE_QUEUED_OPERATION; public: DMOperationStore(std::shared_ptr jsInvoker); @@ -25,6 +29,10 @@ jsi::Array parseDBDataStore( jsi::Runtime &rt, std::shared_ptr> dataVectorPtr) const override; + + jsi::Array parseDBQueuedDMOperations( + jsi::Runtime &rt, + std::shared_ptr> dataVectorPtr) const; }; } // namespace comm diff --git a/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/DataStores/DMOperationStore.cpp b/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/DataStores/DMOperationStore.cpp --- a/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/DataStores/DMOperationStore.cpp +++ b/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/DataStores/DMOperationStore.cpp @@ -9,6 +9,12 @@ OperationType DMOperationStore::REMOVE_ALL_OPERATION = "remove_all_dm_operations"; OperationType DMOperationStore::REPLACE_OPERATION = "replace_dm_operation"; +OperationType DMOperationStore::ADD_QUEUED_OPERATION = + "add_queued_dm_operation"; +OperationType DMOperationStore::CLEAR_QUEUED_OPERATION = + "clear_dm_operations_queue"; +OperationType DMOperationStore::PRUNE_QUEUED_OPERATION = + "prune_queued_dm_operations"; DMOperationStore::DMOperationStore( std::shared_ptr jsInvoker) @@ -31,6 +37,23 @@ return jsiOperations; } +jsi::Array DMOperationStore::parseDBQueuedDMOperations( + jsi::Runtime &rt, + std::shared_ptr> dataVectorPtr) const { + size_t numOperations = dataVectorPtr->size(); + jsi::Array jsiOperations = jsi::Array(rt, numOperations); + size_t writeIdx = 0; + for (const QueuedDMOperation &operation : *dataVectorPtr) { + jsi::Object jsiOperation = jsi::Object(rt); + jsiOperation.setProperty(rt, "queueType", operation.queue_type); + jsiOperation.setProperty(rt, "queueKey", operation.queue_key); + jsiOperation.setProperty(rt, "operationData", operation.operation_data); + jsiOperation.setProperty(rt, "timestamp", operation.timestamp); + jsiOperations.setValueAtIndex(rt, writeIdx++, jsiOperation); + } + return jsiOperations; +} + std::vector> DMOperationStore::createOperations( jsi::Runtime &rt, @@ -67,6 +90,39 @@ dmOperationStoreOps.push_back( std::make_unique( std::move(dmOperation))); + } else if (opType == ADD_QUEUED_OPERATION) { + jsi::Object payloadObj = op.getProperty(rt, "payload").asObject(rt); + std::string queueType = + payloadObj.getProperty(rt, "queueType").asString(rt).utf8(rt); + std::string queueKey = + payloadObj.getProperty(rt, "queueKey").asString(rt).utf8(rt); + std::string operationData = + payloadObj.getProperty(rt, "operationData").asString(rt).utf8(rt); + std::string timestamp = + payloadObj.getProperty(rt, "timestamp").asString(rt).utf8(rt); + + QueuedDMOperation operation{ + queueType, queueKey, operationData, timestamp}; + dmOperationStoreOps.push_back( + std::make_unique( + std::move(operation))); + } else if (opType == CLEAR_QUEUED_OPERATION) { + jsi::Object payloadObj = op.getProperty(rt, "payload").asObject(rt); + std::string queueType = + payloadObj.getProperty(rt, "queueType").asString(rt).utf8(rt); + std::string queueKey = + payloadObj.getProperty(rt, "queueKey").asString(rt).utf8(rt); + + dmOperationStoreOps.push_back( + std::make_unique( + queueType, queueKey)); + } else if (opType == PRUNE_QUEUED_OPERATION) { + jsi::Object payloadObj = op.getProperty(rt, "payload").asObject(rt); + std::string timestamp = + payloadObj.getProperty(rt, "timestamp").asString(rt).utf8(rt); + + dmOperationStoreOps.push_back( + std::make_unique(timestamp)); } else { throw std::runtime_error("unsupported operation: " + opType); }