diff --git a/.gitignore b/.gitignore --- a/.gitignore +++ b/.gitignore @@ -33,7 +33,6 @@ # Nix result* -services/backup/old/blob_client/target/ .direnv # Electron diff --git a/services/backup/old/CMakeLists.txt b/services/backup/old/CMakeLists.txt deleted file mode 100644 --- a/services/backup/old/CMakeLists.txt +++ /dev/null @@ -1,136 +0,0 @@ -PROJECT(backup CXX) - -cmake_minimum_required(VERSION 3.16) - -set(CMAKE_RUNTIME_OUTPUT_DIRECTORY bin) - -if(COMMAND cmake_policy) - cmake_policy(SET CMP0003 NEW) -endif(COMMAND cmake_policy) - -set(CMAKE_CXX_STANDARD 17) -# For C++17 on MacOS, we must set minimum target to 10.14+ -set(CMAKE_OSX_DEPLOYMENT_TARGET 10.14) - -find_package(glog REQUIRED) -find_package(Protobuf REQUIRED) -find_package(gRPC REQUIRED) -find_package(Folly REQUIRED) -find_package(AWSSDK REQUIRED COMPONENTS core dynamodb) -find_package(Boost 1.40 - COMPONENTS program_options context filesystem regex system thread - REQUIRED -) - -# Rust integration -find_package(Corrosion REQUIRED) - -set(_shared_path "../../shared") -set(_proto_path "${_shared_path}/protos") -set(_shared_cmake "${_shared_path}/cmake") - -include(${_shared_cmake}/corrosion-cxx.cmake) - -# Shared Comm protos -add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/${_proto_path} - ${CMAKE_CURRENT_BINARY_DIR}/protos - EXCLUDE_FROM_ALL -) - -add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/../lib/src - ${CMAKE_CURRENT_BINARY_DIR}/lib/src - EXCLUDE_FROM_ALL -) - -file(GLOB_RECURSE SOURCE_CODE "./src/*.cpp") - -# SERVER -add_executable( - backup - - ${SOURCE_CODE} -) - -add_library_rust(PATH blob_client NAMESPACE backup) - -set(INCLUDE_DIRS - ${CMAKE_CURRENT_SOURCE_DIR}/src - ${CMAKE_CURRENT_SOURCE_DIR}/src/DatabaseEntities - ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors - ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server - ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server/base-reactors - - ${Boost_INCLUDE_DIR} -) - -target_include_directories( - backup - PUBLIC - ${INCLUDE_DIRS} -) - -set( - LIBS - - ${GRPC_LIBS} - ${AWSSDK_LINK_LIBRARIES} - ${Boost_LIBRARIES} - glog::glog - Folly::folly - gRPC::grpc++ - - comm-blob-grpc - comm-backup-grpc - comm-services-common - comm-server-base-reactors - backup::blob_client -) - -target_link_libraries( - backup - - ${LIBS} -) - -install( - TARGETS backup - RUNTIME DESTINATION bin/ -) - -# TEST -option(BUILD_TESTING "Turn off tests" ON) - -if (BUILD_TESTING) - file(GLOB TEST_CODE "./test/*.cpp") - list(FILTER SOURCE_CODE EXCLUDE REGEX "./src/server.cpp") - enable_testing() - - find_package(GTest CONFIG REQUIRED) - include_directories( - ${GTEST_INCLUDE_DIR} - ./test - ) - - add_executable( - runTests - - ${SOURCE_CODE} - ${TEST_CODE} - ) - - target_include_directories( - runTests - PUBLIC - ${INCLUDE_DIRS} - ) - - target_link_libraries( - runTests - - ${LIBS} - GTest::gtest_main - ) - - include(GoogleTest) - gtest_discover_tests(runTests) -endif() diff --git a/services/backup/old/blob_client/Cargo.lock b/services/backup/old/blob_client/Cargo.lock deleted file mode 100644 --- a/services/backup/old/blob_client/Cargo.lock +++ /dev/null @@ -1,1195 +0,0 @@ -# This file is automatically @generated by Cargo. -# It is not intended for manual editing. -version = 3 - -[[package]] -name = "aho-corasick" -version = "0.7.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" -dependencies = [ - "memchr", -] - -[[package]] -name = "anyhow" -version = "1.0.64" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9a8f622bcf6ff3df478e9deba3e03e4e04b300f8e6a139e192c05fa3490afc7" - -[[package]] -name = "async-stream" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" -dependencies = [ - "async-stream-impl", - "futures-core", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "async-trait" -version = "0.1.57" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi", - "libc", - "winapi", -] - -[[package]] -name = "autocfg" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" - -[[package]] -name = "axum" -version = "0.5.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9de18bc5f2e9df8f52da03856bf40e29b747de5a84e43aefff90e3dc4a21529b" -dependencies = [ - "async-trait", - "axum-core", - "bitflags", - "bytes", - "futures-util", - "http", - "http-body", - "hyper", - "itoa", - "matchit", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "serde", - "sync_wrapper", - "tokio", - "tower", - "tower-http", - "tower-layer", - "tower-service", -] - -[[package]] -name = "axum-core" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9f0c0a60006f2a293d82d571f635042a72edf927539b7685bd62d361963839b" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http", - "http-body", - "mime", - "tower-layer", - "tower-service", -] - -[[package]] -name = "base64" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" - -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - -[[package]] -name = "blob_client" -version = "0.1.0" -dependencies = [ - "anyhow", - "async-stream", - "cxx", - "cxx-build", - "env_logger", - "lazy_static", - "libc", - "log", - "prost", - "regex", - "tokio", - "tokio-stream", - "tonic", - "tonic-build", - "tracing", -] - -[[package]] -name = "bytes" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" - -[[package]] -name = "cc" -version = "1.0.73" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" - -[[package]] -name = "cfg-if" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" - -[[package]] -name = "codespan-reporting" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" -dependencies = [ - "termcolor", - "unicode-width", -] - -[[package]] -name = "cxx" -version = "1.0.73" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "873c2e83af70859af2aaecd1f5d862f3790b747b1f4f50fb45a931d000ac0422" -dependencies = [ - "cc", - "cxxbridge-flags", - "cxxbridge-macro", - "link-cplusplus", -] - -[[package]] -name = "cxx-build" -version = "1.0.73" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b49edea7163bbc7a39e3d829b4b0b66a9d30486973152842b7413f2c7b5632bf" -dependencies = [ - "cc", - "codespan-reporting", - "once_cell", - "proc-macro2", - "quote", - "scratch", - "syn", -] - -[[package]] -name = "cxxbridge-flags" -version = "1.0.73" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f46b787c15af80277db5c88c6ac6c502ae545e622f010e06f95e540d34931acf" - -[[package]] -name = "cxxbridge-macro" -version = "1.0.73" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ba3f3a7efa46626878fb5d324fabca4d19d2956b6ae97ce43044ef4515f5abc" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "either" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" - -[[package]] -name = "env_logger" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3" -dependencies = [ - "atty", - "humantime", - "log", - "regex", - "termcolor", -] - -[[package]] -name = "fastrand" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7a407cfaa3385c4ae6b23e84623d48c2798d06e3e6a1878f7f59f17b3f86499" -dependencies = [ - "instant", -] - -[[package]] -name = "fixedbitset" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" - -[[package]] -name = "fnv" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" - -[[package]] -name = "futures-channel" -version = "0.3.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" -dependencies = [ - "futures-core", -] - -[[package]] -name = "futures-core" -version = "0.3.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" - -[[package]] -name = "futures-sink" -version = "0.3.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56" - -[[package]] -name = "futures-task" -version = "0.3.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1" - -[[package]] -name = "futures-util" -version = "0.3.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" -dependencies = [ - "futures-core", - "futures-task", - "pin-project-lite", - "pin-utils", -] - -[[package]] -name = "getrandom" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" -dependencies = [ - "cfg-if", - "libc", - "wasi", -] - -[[package]] -name = "h2" -version = "0.3.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca32592cf21ac7ccab1825cd87f6c9b3d9022c44d086172ed0966bec8af30be" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - -[[package]] -name = "hashbrown" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" - -[[package]] -name = "heck" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" - -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - -[[package]] -name = "http" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - -[[package]] -name = "http-body" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" -dependencies = [ - "bytes", - "http", - "pin-project-lite", -] - -[[package]] -name = "http-range-header" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" - -[[package]] -name = "httparse" -version = "1.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" - -[[package]] -name = "httpdate" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" - -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - -[[package]] -name = "hyper" -version = "0.14.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" -dependencies = [ - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "h2", - "http", - "http-body", - "httparse", - "httpdate", - "itoa", - "pin-project-lite", - "socket2", - "tokio", - "tower-service", - "tracing", - "want", -] - -[[package]] -name = "hyper-timeout" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" -dependencies = [ - "hyper", - "pin-project-lite", - "tokio", - "tokio-io-timeout", -] - -[[package]] -name = "indexmap" -version = "1.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" -dependencies = [ - "autocfg", - "hashbrown", -] - -[[package]] -name = "instant" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" -dependencies = [ - "cfg-if", -] - -[[package]] -name = "itertools" -version = "0.10.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3" -dependencies = [ - "either", -] - -[[package]] -name = "itoa" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" - -[[package]] -name = "lazy_static" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" - -[[package]] -name = "libc" -version = "0.2.132" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5" - -[[package]] -name = "link-cplusplus" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9272ab7b96c9046fbc5bc56c06c117cb639fe2d509df0c421cad82d2915cf369" -dependencies = [ - "cc", -] - -[[package]] -name = "log" -version = "0.4.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" -dependencies = [ - "cfg-if", -] - -[[package]] -name = "matchit" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" - -[[package]] -name = "memchr" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" - -[[package]] -name = "mime" -version = "0.3.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" - -[[package]] -name = "mio" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" -dependencies = [ - "libc", - "log", - "wasi", - "windows-sys 0.36.1", -] - -[[package]] -name = "multimap" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" - -[[package]] -name = "num_cpus" -version = "1.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" -dependencies = [ - "hermit-abi", - "libc", -] - -[[package]] -name = "once_cell" -version = "1.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "074864da206b4973b84eb91683020dbefd6a8c3f0f38e054d93954e891935e4e" - -[[package]] -name = "percent-encoding" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" - -[[package]] -name = "petgraph" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143" -dependencies = [ - "fixedbitset", - "indexmap", -] - -[[package]] -name = "pin-project" -version = "1.0.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.0.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "pin-project-lite" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" - -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - -[[package]] -name = "ppv-lite86" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" - -[[package]] -name = "prettyplease" -version = "0.1.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697ae720ee02011f439e0701db107ffe2916d83f718342d65d7f8bf7b8a5fee9" -dependencies = [ - "proc-macro2", - "syn", -] - -[[package]] -name = "proc-macro2" -version = "1.0.43" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" -dependencies = [ - "unicode-ident", -] - -[[package]] -name = "prost" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7" -dependencies = [ - "bytes", - "prost-derive", -] - -[[package]] -name = "prost-build" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f835c582e6bd972ba8347313300219fed5bfa52caf175298d860b61ff6069bb" -dependencies = [ - "bytes", - "heck", - "itertools", - "lazy_static", - "log", - "multimap", - "petgraph", - "prost", - "prost-types", - "regex", - "tempfile", - "which", -] - -[[package]] -name = "prost-derive" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7345d5f0e08c0536d7ac7229952590239e77abf0a0100a1b1d890add6ea96364" -dependencies = [ - "anyhow", - "itertools", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "prost-types" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e" -dependencies = [ - "bytes", - "prost", -] - -[[package]] -name = "quote" -version = "1.0.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179" -dependencies = [ - "proc-macro2", -] - -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha", - "rand_core", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" -dependencies = [ - "getrandom", -] - -[[package]] -name = "redox_syscall" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" -dependencies = [ - "bitflags", -] - -[[package]] -name = "regex" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" -dependencies = [ - "aho-corasick", - "memchr", - "regex-syntax", -] - -[[package]] -name = "regex-syntax" -version = "0.6.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" - -[[package]] -name = "remove_dir_all" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" -dependencies = [ - "winapi", -] - -[[package]] -name = "scratch" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898" - -[[package]] -name = "serde" -version = "1.0.144" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860" - -[[package]] -name = "slab" -version = "0.4.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" -dependencies = [ - "autocfg", -] - -[[package]] -name = "socket2" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10c98bba371b9b22a71a9414e420f92ddeb2369239af08200816169d5e2dd7aa" -dependencies = [ - "libc", - "winapi", -] - -[[package]] -name = "syn" -version = "1.0.99" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - -[[package]] -name = "sync_wrapper" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" - -[[package]] -name = "tempfile" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" -dependencies = [ - "cfg-if", - "fastrand", - "libc", - "redox_syscall", - "remove_dir_all", - "winapi", -] - -[[package]] -name = "termcolor" -version = "1.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" -dependencies = [ - "winapi-util", -] - -[[package]] -name = "tokio" -version = "1.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d9f76183f91ecfb55e1d7d5602bd1d979e38a3a522fe900241cf195624d67ae" -dependencies = [ - "autocfg", - "bytes", - "libc", - "memchr", - "mio", - "num_cpus", - "pin-project-lite", - "socket2", - "tokio-macros", - "windows-sys 0.42.0", -] - -[[package]] -name = "tokio-io-timeout" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" -dependencies = [ - "pin-project-lite", - "tokio", -] - -[[package]] -name = "tokio-macros" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "tokio-stream" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - -[[package]] -name = "tokio-util" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "pin-project-lite", - "tokio", - "tracing", -] - -[[package]] -name = "tonic" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "498f271adc46acce75d66f639e4d35b31b2394c295c82496727dafa16d465dd2" -dependencies = [ - "async-stream", - "async-trait", - "axum", - "base64", - "bytes", - "futures-core", - "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-timeout", - "percent-encoding", - "pin-project", - "prost", - "prost-derive", - "tokio", - "tokio-stream", - "tokio-util", - "tower", - "tower-layer", - "tower-service", - "tracing", - "tracing-futures", -] - -[[package]] -name = "tonic-build" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fbcd2800e34e743b9ae795867d5f77b535d3a3be69fd731e39145719752df8c" -dependencies = [ - "prettyplease", - "proc-macro2", - "prost-build", - "quote", - "syn", -] - -[[package]] -name = "tower" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" -dependencies = [ - "futures-core", - "futures-util", - "indexmap", - "pin-project", - "pin-project-lite", - "rand", - "slab", - "tokio", - "tokio-util", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "tower-http" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" -dependencies = [ - "bitflags", - "bytes", - "futures-core", - "futures-util", - "http", - "http-body", - "http-range-header", - "pin-project-lite", - "tower", - "tower-layer", - "tower-service", -] - -[[package]] -name = "tower-layer" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" - -[[package]] -name = "tower-service" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" - -[[package]] -name = "tracing" -version = "0.1.36" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" -dependencies = [ - "cfg-if", - "log", - "pin-project-lite", - "tracing-attributes", - "tracing-core", -] - -[[package]] -name = "tracing-attributes" -version = "0.1.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "tracing-core" -version = "0.1.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7" -dependencies = [ - "once_cell", -] - -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - -[[package]] -name = "try-lock" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" - -[[package]] -name = "unicode-ident" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf" - -[[package]] -name = "unicode-width" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" - -[[package]] -name = "want" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" -dependencies = [ - "log", - "try-lock", -] - -[[package]] -name = "wasi" -version = "0.11.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" - -[[package]] -name = "which" -version = "4.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c4fb54e6113b6a8772ee41c3404fb0301ac79604489467e0a9ce1f3e97c24ae" -dependencies = [ - "either", - "lazy_static", - "libc", -] - -[[package]] -name = "winapi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" -dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", -] - -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - -[[package]] -name = "winapi-util" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" -dependencies = [ - "winapi", -] - -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - -[[package]] -name = "windows-sys" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" -dependencies = [ - "windows_aarch64_msvc 0.36.1", - "windows_i686_gnu 0.36.1", - "windows_i686_msvc 0.36.1", - "windows_x86_64_gnu 0.36.1", - "windows_x86_64_msvc 0.36.1", -] - -[[package]] -name = "windows-sys" -version = "0.42.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" -dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc 0.42.1", - "windows_i686_gnu 0.42.1", - "windows_i686_msvc 0.42.1", - "windows_x86_64_gnu 0.42.1", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc 0.42.1", -] - -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" - -[[package]] -name = "windows_aarch64_msvc" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" - -[[package]] -name = "windows_aarch64_msvc" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" - -[[package]] -name = "windows_i686_gnu" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" - -[[package]] -name = "windows_i686_gnu" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" - -[[package]] -name = "windows_i686_msvc" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" - -[[package]] -name = "windows_i686_msvc" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" - -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" diff --git a/services/backup/old/blob_client/Cargo.toml b/services/backup/old/blob_client/Cargo.toml deleted file mode 100644 --- a/services/backup/old/blob_client/Cargo.toml +++ /dev/null @@ -1,33 +0,0 @@ -[package] -name = "blob_client" -version = "0.1.0" -edition = "2021" - -[dependencies] -cxx = "1.0" -env_logger = "0.9" -tokio = { version = "1.23", features = ["macros", "rt-multi-thread"] } -tokio-stream = "0.1" -lazy_static = "1.4" -libc = "0.2" -log = "0.4" -tonic = "0.8" -prost = "0.11" -tracing = "0.1" -async-stream = "0.3" -anyhow = "1.0" - -[build-dependencies] -cxx-build = "1.0" -tonic-build = "0.8" -regex = "1.6" - -[lib] -crate-type = ["staticlib"] - -[profile.release] -panic = "abort" - -[profile.dev] -debug = true -panic = "abort" diff --git a/services/backup/old/blob_client/build.rs b/services/backup/old/blob_client/build.rs deleted file mode 100644 --- a/services/backup/old/blob_client/build.rs +++ /dev/null @@ -1,10 +0,0 @@ -const PROTO_DIR: &'static str = "../../../shared/protos"; - -fn main() -> Result<(), std::io::Error> { - let _build = cxx_build::bridge("src/lib.rs").flag_if_supported("-std=c++17"); - println!("cargo:rerun-if-changed=src/lib.rs"); - - tonic_build::compile_protos(format!("{}/blob.proto", PROTO_DIR))?; - - Ok(()) -} diff --git a/services/backup/old/blob_client/src/constants.rs b/services/backup/old/blob_client/src/constants.rs deleted file mode 100644 --- a/services/backup/old/blob_client/src/constants.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; -pub const BLOB_ADDRESS: &str = "http://blob-server:50051"; diff --git a/services/backup/old/blob_client/src/get_client.rs b/services/backup/old/blob_client/src/get_client.rs deleted file mode 100644 --- a/services/backup/old/blob_client/src/get_client.rs +++ /dev/null @@ -1,135 +0,0 @@ -mod proto { - tonic::include_proto!("blob"); -} - -use proto::blob_service_client::BlobServiceClient; -use proto::GetRequest; - -use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; -use anyhow::{bail, ensure}; -use crate::RUNTIME; -use lazy_static::lazy_static; -use std::collections::HashMap; -use std::sync::Mutex; -use tokio::sync::mpsc; -use tokio::task::JoinHandle; - -struct ReadClient { - rx: mpsc::Receiver>, - rx_handle: JoinHandle>, -} - -lazy_static! { - // todo: we should consider limiting the clients size, - // if every client is able to allocate up to 4MB data at a time - static ref CLIENTS: Mutex> = - Mutex::new(HashMap::new()); - static ref ERROR_MESSAGES: Mutex> = - Mutex::new(Vec::new()); -} - -fn is_initialized(holder: &str) -> anyhow::Result { - if let Ok(clients) = CLIENTS.lock() { - return Ok(clients.contains_key(holder)); - } - bail!("couldn't access client"); -} - -pub fn get_client_initialize_cxx( - holder: &str, -) -> anyhow::Result<()> { - if is_initialized(&holder)? { - get_client_terminate_cxx(holder.clone())?; - } - - // grpc - if let Ok(mut grpc_client) = - RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await }) - { - // spawn receiver thread - let (response_thread_tx, response_thread_rx) = - mpsc::channel::>(MPSC_CHANNEL_BUFFER_CAPACITY); - let holder_string = holder.to_string(); - let rx_handle = RUNTIME.spawn(async move { - let response = grpc_client - .get(GetRequest { - holder: holder_string, - }) - .await?; - let mut inner_response = response.into_inner(); - loop { - let maybe_data = inner_response.message().await?; - let mut result = false; - if let Some(data) = maybe_data { - let data: Vec = data.data_chunk; - result = match response_thread_tx.send(data).await { - Ok(_) => true, - Err(err) => { - bail!(err); - } - } - } - if !result { - break; - } - } - Ok(()) - }); - - if let Ok(mut clients) = CLIENTS.lock() { - let client = ReadClient { - rx_handle, - rx: response_thread_rx, - }; - (*clients).insert(holder.to_string(), client); - return Ok(()); - } - bail!("could not access client"); - } - bail!("could not successfully connect to the blob server") -} - -pub fn get_client_blocking_read_cxx( - holder: &str, -) -> anyhow::Result> { - Ok(RUNTIME.block_on(async { - if let Ok(mut clients) = CLIENTS.lock() { - if let Some(client) = clients.get_mut(&holder.to_string()) { - let maybe_data = client.rx.recv().await; - return Ok(maybe_data.unwrap_or_else(|| vec![])); - } else { - bail!(format!("no client present for {}", holder)); - } - } else { - bail!("couldn't access client"); - } - })?) -} - -pub fn get_client_terminate_cxx( - holder: &str, -) -> anyhow::Result<()> { - if !is_initialized(&holder)? { - return Ok(()); - } - if let Ok(mut clients) = CLIENTS.lock() { - match clients.remove(&holder.to_string()) { - Some(client) => { - RUNTIME.block_on(async { - if client.rx_handle.await.is_err() { - bail!(format!("awaiting for the client {} failed", holder)); - } - Ok(()) - })?; - } - None => { - bail!(format!("no client foudn for {}", holder)); - } - } - } else { - bail!("couldn't access client"); - } - - ensure!(!is_initialized(&holder)?, "client transmitter handler released properly"); - Ok(()) -} diff --git a/services/backup/old/blob_client/src/lib.rs b/services/backup/old/blob_client/src/lib.rs deleted file mode 100644 --- a/services/backup/old/blob_client/src/lib.rs +++ /dev/null @@ -1,55 +0,0 @@ -mod constants; -mod get_client; -mod put_client; - -use env_logger; -use lazy_static::lazy_static; -use log::info; -use tokio::runtime; - -use put_client::{ - put_client_blocking_read_cxx, put_client_initialize_cxx, - put_client_terminate_cxx, put_client_write_cxx, -}; - -use get_client::{ - get_client_blocking_read_cxx, get_client_initialize_cxx, - get_client_terminate_cxx, -}; - -lazy_static! { - static ref RUNTIME: runtime::Runtime = { - env_logger::init(); - info!("Creating tokio runtime"); - runtime::Runtime::new().expect("Unable to create tokio runtime") - }; -} - -#[cxx::bridge] -mod ffi { - extern "Rust" { - fn put_client_initialize_cxx( - holder_char: &str, - ) -> Result<()>; - unsafe fn put_client_write_cxx( - holder_char: &str, - field_index: usize, - data: *const c_char, - ) -> Result<()>; - fn put_client_blocking_read_cxx( - holder_char: &str, - ) -> Result; - fn put_client_terminate_cxx( - holder_char: &str, - ) -> Result<()>; - fn get_client_initialize_cxx( - holder_char: &str, - ) -> Result<()>; - fn get_client_blocking_read_cxx( - holder_char: &str, - ) -> Result>; - fn get_client_terminate_cxx( - holder_char: &str, - ) -> Result<()>; - } -} diff --git a/services/backup/old/blob_client/src/put_client.rs b/services/backup/old/blob_client/src/put_client.rs deleted file mode 100644 --- a/services/backup/old/blob_client/src/put_client.rs +++ /dev/null @@ -1,246 +0,0 @@ -mod proto { - tonic::include_proto!("blob"); -} - -use proto::blob_service_client::BlobServiceClient; -use proto::put_request; -use proto::put_request::Data::*; -use proto::PutRequest; - -use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; -use anyhow::{bail, ensure}; -use crate::RUNTIME; -use lazy_static::lazy_static; -use libc; -use libc::c_char; -use std::collections::HashMap; -use std::ffi::CStr; -use std::sync::Mutex; -use tokio::sync::mpsc; -use tokio::task::JoinHandle; - -#[derive(Debug)] -struct PutRequestData { - field_index: usize, - data: Vec, -} - -struct BidiClient { - tx: mpsc::Sender, - - rx: mpsc::Receiver, - rx_handle: JoinHandle>, -} - -lazy_static! { - // todo: we should consider limiting the clients size, - // if every client is able to allocate up to 4MB data at a time - static ref CLIENTS: Mutex> = - Mutex::new(HashMap::new()); - static ref ERROR_MESSAGES: Mutex> = - Mutex::new(Vec::new()); -} - -fn is_initialized(holder: &str) -> anyhow::Result { - match CLIENTS.lock() { - Ok(clients) => Ok(clients.contains_key(holder)), - _ => bail!("couldn't access client") - } -} - -pub fn put_client_initialize_cxx( - holder: &str, -) -> anyhow::Result<()> { - if is_initialized(&holder)? { - put_client_terminate_cxx(&holder.to_string())?; - } - ensure!(!is_initialized(&holder)?, "client cannot be initialized twice"); - - // grpc - if let Ok(mut grpc_client) = - RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await }) - { - let (request_thread_tx, mut request_thread_rx) = - mpsc::channel::(MPSC_CHANNEL_BUFFER_CAPACITY); - - let outbound = async_stream::stream! { - let mut maybe_error: Option = None; - while let Some(data) = request_thread_rx.recv().await { - let request_data: Option = match data.field_index { - 1 => { - match String::from_utf8(data.data) { - Ok(utf8_data) => Some(Holder(utf8_data)), - _ => { - maybe_error = Some("invalid utf-8".to_string()); - break; - }, - } - } - 2 => { - match String::from_utf8(data.data).ok() { - Some(utf8_data) => Some(BlobHash(utf8_data)), - None => { - maybe_error = Some("invalid utf-8".to_string()); - break; - }, - } - } - 3 => { - Some(DataChunk(data.data)) - } - _ => { - maybe_error = Some(format!("invalid field index value {}", data.field_index)); - break; - } - }; - if let Some (unpacked_data) = request_data { - let request = PutRequest { - data: Some(unpacked_data), - }; - yield request; - } else { - maybe_error = Some("an error occured, aborting connection".to_string()); - break; - } - } - if let Some(error) = maybe_error { - // todo consider handling this differently - println!("an error occured in the stream: {}", error); - } - }; - - // spawn receiver thread - let (response_thread_tx, response_thread_rx) = - mpsc::channel::(MPSC_CHANNEL_BUFFER_CAPACITY); - let rx_handle = RUNTIME.spawn(async move { - match grpc_client.put(tonic::Request::new(outbound)).await { - Ok(response) => { - let mut inner_response = response.into_inner(); - loop { - let maybe_response_message = inner_response.message().await?; - let mut result = false; - if let Some(response_message) = maybe_response_message { - // warning: this will produce an error if there's more unread - // responses than MPSC_CHANNEL_BUFFER_CAPACITY - // you should then use put_client_blocking_read_cxx in order - // to dequeue the responses in c++ and make room for more - if let Ok(_) = response_thread_tx - .try_send((response_message.data_exists as i32).to_string()) - { - result = true; - } else { - bail!("response queue full"); - } - } - if !result { - break; - } - } - } - Err(err) => { - bail!(err.to_string()); - } - }; - Ok(()) - }); - - ensure!(!is_initialized(&holder)?, format!( - "client initialization overlapped for holder {}", - holder - )); - - if let Ok(mut clients) = CLIENTS.lock() { - let client = BidiClient { - tx: request_thread_tx, - rx: response_thread_rx, - rx_handle, - }; - (*clients).insert(holder.to_string(), client); - return Ok(()); - } - bail!(format!("could not access client for holder {}", holder)); - } - bail!("could not successfully connect to the blob server"); -} - -pub fn put_client_blocking_read_cxx( - holder: &str, -) -> anyhow::Result { - Ok(RUNTIME.block_on(async { - if let Ok(mut clients) = CLIENTS.lock() { - let maybe_client = clients.get_mut(holder); - if let Some(client) = maybe_client { - if let Some(data) = client.rx.recv().await { - return Ok(data); - } else { - bail!("couldn't receive data via client's receiver"); - } - } else { - bail!(format!( - "no client detected for {} in blocking read", - holder - )); - } - } else { - bail!("couldn't access clients"); - } - })?) -} - -/** - * field index: - * 1 - holder (utf8 string) - * 2 - blob hash (utf8 string) - * 3 - data chunk (bytes) - */ -pub fn put_client_write_cxx( - holder: &str, - field_index: usize, - data: *const c_char, -) -> anyhow::Result<()> { - let data_c_str: &CStr = unsafe { CStr::from_ptr(data) }; - let data_bytes: Vec = data_c_str.to_bytes().to_vec(); - - RUNTIME.block_on(async { - if let Ok(clients) = CLIENTS.lock() { - let maybe_client = clients.get(&holder.to_string()); - if let Some(client) = maybe_client { - client - .tx - .send(PutRequestData { - field_index, - data: data_bytes, - }) - .await?; - return Ok(()); - } - bail!(format!("no client detected for {} in write", holder)); - } else { - bail!("couldn't access clients"); - } - })?; - Ok(()) -} - -pub fn put_client_terminate_cxx( - holder: &str, -) -> anyhow::Result<()> { - if !is_initialized(&holder)? { - return Ok(()); - } - - if let Ok(mut clients) = CLIENTS.lock() { - let maybe_client = clients.remove(&holder.to_string()); - if let Some(client) = maybe_client { - drop(client.tx); - RUNTIME.block_on(async { client.rx_handle.await? })?; - } else { - bail!("no client detected in terminate"); - } - } else { - bail!("couldn't access client"); - } - - ensure!(!is_initialized(&holder)?, "client transmitter handler released properly"); - Ok(()) -} diff --git a/services/backup/old/src/BackupServiceImpl.h b/services/backup/old/src/BackupServiceImpl.h deleted file mode 100644 --- a/services/backup/old/src/BackupServiceImpl.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include -#include - -#include - -namespace comm { -namespace network { - -class BackupServiceImpl final : public backup::BackupService::CallbackService { - -public: - BackupServiceImpl(); - virtual ~BackupServiceImpl(); - - grpc::ServerBidiReactor< - backup::CreateNewBackupRequest, - backup::CreateNewBackupResponse> * - CreateNewBackup(grpc::CallbackServerContext *context) override; - - grpc::ServerReadReactor *SendLog( - grpc::CallbackServerContext *context, - backup::SendLogResponse *response) override; - - grpc::ServerBidiReactor< - backup::RecoverBackupKeyRequest, - backup::RecoverBackupKeyResponse> * - RecoverBackupKey(grpc::CallbackServerContext *context) override; - - grpc::ServerWriteReactor *PullBackup( - grpc::CallbackServerContext *context, - const backup::PullBackupRequest *request) override; - - grpc::ServerUnaryReactor *AddAttachments( - grpc::CallbackServerContext *context, - const backup::AddAttachmentsRequest *request, - google::protobuf::Empty *response) override; -}; - -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/BackupServiceImpl.cpp b/services/backup/old/src/BackupServiceImpl.cpp deleted file mode 100644 --- a/services/backup/old/src/BackupServiceImpl.cpp +++ /dev/null @@ -1,63 +0,0 @@ -#include "BackupServiceImpl.h" - -#include "AddAttachmentsUtility.h" -#include "CreateNewBackupReactor.h" -#include "PullBackupReactor.h" -#include "RecoverBackupKeyReactor.h" -#include "SendLogReactor.h" - -#include - -namespace comm { -namespace network { - -BackupServiceImpl::BackupServiceImpl() { - Aws::InitAPI({}); -} - -BackupServiceImpl::~BackupServiceImpl() { - Aws::ShutdownAPI({}); -} - -grpc::ServerBidiReactor< - backup::CreateNewBackupRequest, - backup::CreateNewBackupResponse> * -BackupServiceImpl::CreateNewBackup(grpc::CallbackServerContext *context) { - return new reactor::CreateNewBackupReactor(); -} - -grpc::ServerReadReactor *BackupServiceImpl::SendLog( - grpc::CallbackServerContext *context, - backup::SendLogResponse *response) { - return new reactor::SendLogReactor(response); -} - -grpc::ServerBidiReactor< - backup::RecoverBackupKeyRequest, - backup::RecoverBackupKeyResponse> * -BackupServiceImpl::RecoverBackupKey(grpc::CallbackServerContext *context) { - return new reactor::RecoverBackupKeyReactor(); -} - -grpc::ServerWriteReactor * -BackupServiceImpl::PullBackup( - grpc::CallbackServerContext *context, - const backup::PullBackupRequest *request) { - reactor::PullBackupReactor *reactor = new reactor::PullBackupReactor(request); - reactor->start(); - return reactor; -} - -grpc::ServerUnaryReactor *BackupServiceImpl::AddAttachments( - grpc::CallbackServerContext *context, - const backup::AddAttachmentsRequest *request, - google::protobuf::Empty *response) { - grpc::Status status = - reactor::AddAttachmentsUtility().processRequest(request); - auto *reactor = context->DefaultReactor(); - reactor->Finish(status); - return reactor; -} - -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/Constants.h b/services/backup/old/src/Constants.h deleted file mode 100644 --- a/services/backup/old/src/Constants.h +++ /dev/null @@ -1,26 +0,0 @@ -#pragma once - -#include "GlobalTools.h" -#include "Tools.h" - -#include - -namespace comm { -namespace network { - -const std::string LOG_TABLE_NAME = - tools::decorateTableName("backup-service-log"); -const std::string BACKUP_TABLE_NAME = - tools::decorateTableName("backup-service-backup"); - -// This has to be smaller than GRPC_CHUNK_SIZE_LIMIT because we need to -// recognize if we may receive multiple chunks or just one. If it was larger -// than the chunk limit, once we get the amount of data of size equal to the -// limit, we wouldn't know if we should put this in the database right away or -// wait for more data. -// 400KiB limit (in docs there is KB but they mean KiB) - -// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html -const size_t LOG_DATA_SIZE_DATABASE_LIMIT = 1024 * 400; - -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/DatabaseEntities/BackupItem.h b/services/backup/old/src/DatabaseEntities/BackupItem.h deleted file mode 100644 --- a/services/backup/old/src/DatabaseEntities/BackupItem.h +++ /dev/null @@ -1,81 +0,0 @@ -#pragma once - -#include "Item.h" - -#include - -namespace comm { -namespace network { -namespace database { - -/** - * backup - backups assigned to users along with the data necessary to - * decrypt - * `created` - when the backup was created. This is a search key because - * we want to be able to perform effective queries based on this info - * (for example get me the latest backup, get me backup from some day) - * `attachmentHolders` - this is a list of attachment references - * `recoveryData` - data serialized with protobuf which is described by - * one of the following structures: - * { authType: 'password', pakePasswordCiphertext: string, nonce: string } - * { authType: 'wallet', walletAddress: string, rawMessage: string } - * - * this class is used for representing two things: the rows in the main table, - * and also the rows in the secondary index - * - * Needs userID(pk)-created(sk)-index that projects: - * userID - * backupID - * created - * recoveryData - */ -class BackupItem : public Item { - - std::string userID; - std::string backupID; - uint64_t created; - std::string recoveryData; - std::string compactionHolder; - std::string attachmentHolders; - - void validate() const override; - -public: - static const std::string TABLE_NAME; - static const std::string FIELD_USER_ID; - static const std::string FIELD_BACKUP_ID; - static const std::string FIELD_CREATED; - static const std::string FIELD_RECOVERY_DATA; - static const std::string FIELD_COMPACTION_HOLDER; - static const std::string FIELD_ATTACHMENT_HOLDERS; - - BackupItem() { - } - BackupItem( - std::string userID, - std::string backupID, - uint64_t created, - std::string recoveryData, - std::string compactionHolder, - std::string attachmentHolders); - BackupItem(const AttributeValues &itemFromDB); - - void assignItemFromDatabase(const AttributeValues &itemFromDB) override; - - std::string getTableName() const override; - PrimaryKeyDescriptor getPrimaryKeyDescriptor() const override; - PrimaryKeyValue getPrimaryKeyValue() const override; - - std::string getUserID() const; - std::string getBackupID() const; - uint64_t getCreated() const; - std::string getRecoveryData() const; - std::string getCompactionHolder() const; - std::string getAttachmentHolders() const; - - void addAttachmentHolders(const std::string &attachmentHolders); -}; - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/DatabaseEntities/BackupItem.cpp b/services/backup/old/src/DatabaseEntities/BackupItem.cpp deleted file mode 100644 --- a/services/backup/old/src/DatabaseEntities/BackupItem.cpp +++ /dev/null @@ -1,122 +0,0 @@ -#include "BackupItem.h" - -#include "Constants.h" -#include "Tools.h" - -namespace comm { -namespace network { -namespace database { - -const std::string BackupItem::FIELD_USER_ID = "userID"; -const std::string BackupItem::FIELD_BACKUP_ID = "backupID"; -const std::string BackupItem::FIELD_CREATED = "created"; -const std::string BackupItem::FIELD_RECOVERY_DATA = "recoveryData"; -const std::string BackupItem::FIELD_COMPACTION_HOLDER = "compactionHolder"; -const std::string BackupItem::FIELD_ATTACHMENT_HOLDERS = "attachmentHolders"; - -const std::string BackupItem::TABLE_NAME = BACKUP_TABLE_NAME; - -BackupItem::BackupItem( - std::string userID, - std::string backupID, - uint64_t created, - std::string recoveryData, - std::string compactionHolder, - std::string attachmentHolders) - : userID(userID), - backupID(backupID), - created(created), - recoveryData(recoveryData), - compactionHolder(compactionHolder), - attachmentHolders(attachmentHolders) { - this->validate(); -} - -BackupItem::BackupItem(const AttributeValues &itemFromDB) { - this->assignItemFromDatabase(itemFromDB); -} - -void BackupItem::validate() const { - if (!this->userID.size()) { - throw std::runtime_error("userID empty"); - } - if (!this->backupID.size()) { - throw std::runtime_error("backupID empty"); - } - if (!this->created) { - throw std::runtime_error("created not provided"); - } - if (!this->recoveryData.size()) { - throw std::runtime_error("recoveryData empty"); - } -} - -void BackupItem::assignItemFromDatabase(const AttributeValues &itemFromDB) { - try { - this->userID = itemFromDB.at(BackupItem::FIELD_USER_ID).GetS(); - this->backupID = itemFromDB.at(BackupItem::FIELD_BACKUP_ID).GetS(); - this->created = std::stoll( - std::string(itemFromDB.at(BackupItem::FIELD_CREATED).GetS()).c_str()); - this->recoveryData = itemFromDB.at(BackupItem::FIELD_RECOVERY_DATA).GetS(); - auto compactionHolder = - itemFromDB.find(BackupItem::FIELD_COMPACTION_HOLDER); - if (compactionHolder != itemFromDB.end()) { - this->compactionHolder = compactionHolder->second.GetS(); - } - auto attachmentsHolders = - itemFromDB.find(BackupItem::FIELD_ATTACHMENT_HOLDERS); - if (attachmentsHolders != itemFromDB.end()) { - this->attachmentHolders = attachmentsHolders->second.GetS(); - } - } catch (std::logic_error &e) { - throw std::runtime_error( - "invalid backup item provided, " + std::string(e.what())); - } - this->validate(); -} - -std::string BackupItem::getTableName() const { - return BackupItem::TABLE_NAME; -} - -PrimaryKeyDescriptor BackupItem::getPrimaryKeyDescriptor() const { - return PrimaryKeyDescriptor( - BackupItem::FIELD_USER_ID, BackupItem::FIELD_BACKUP_ID); -} - -PrimaryKeyValue BackupItem::getPrimaryKeyValue() const { - return PrimaryKeyValue(this->userID, this->backupID); -} - -std::string BackupItem::getUserID() const { - return this->userID; -} - -std::string BackupItem::getBackupID() const { - return this->backupID; -} - -uint64_t BackupItem::getCreated() const { - return this->created; -} - -std::string BackupItem::getRecoveryData() const { - return this->recoveryData; -} - -std::string BackupItem::getCompactionHolder() const { - return this->compactionHolder; -} - -std::string BackupItem::getAttachmentHolders() const { - return this->attachmentHolders; -} - -void BackupItem::addAttachmentHolders(const std::string &attachmentHolders) { - this->attachmentHolders += - tools::validateAttachmentHolders(attachmentHolders); -} - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/DatabaseEntities/LogItem.h b/services/backup/old/src/DatabaseEntities/LogItem.h deleted file mode 100644 --- a/services/backup/old/src/DatabaseEntities/LogItem.h +++ /dev/null @@ -1,70 +0,0 @@ -#pragma once - -#include "Item.h" - -#include - -namespace comm { -namespace network { -namespace database { - -/* - * log - a single log record - * `backupID` - id of the backup that this log is assigned to - * `value` - either the value itself which is a dump of a single operation (if - * `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is - * true) - * `attachmentHolders` - this is a list of attachment references - */ -class LogItem : public Item { - - std::string backupID; - std::string logID; - bool persistedInBlob; - std::string value; - std::string attachmentHolders; - std::string dataHash; - - void validate() const override; - -public: - static const std::string TABLE_NAME; - static const std::string FIELD_BACKUP_ID; - static const std::string FIELD_LOG_ID; - static const std::string FIELD_PERSISTED_IN_BLOB; - static const std::string FIELD_VALUE; - static const std::string FIELD_ATTACHMENT_HOLDERS; - static const std::string FIELD_DATA_HASH; - - LogItem() { - } - LogItem( - const std::string backupID, - const std::string logID, - const bool persistedInBlob, - const std::string value, - std::string attachmentHolders, - const std::string dataHash); - LogItem(const AttributeValues &itemFromDB); - - void assignItemFromDatabase(const AttributeValues &itemFromDB) override; - - std::string getTableName() const override; - PrimaryKeyDescriptor getPrimaryKeyDescriptor() const override; - PrimaryKeyValue getPrimaryKeyValue() const override; - - std::string getBackupID() const; - std::string getLogID() const; - bool getPersistedInBlob() const; - std::string getValue() const; - std::string getAttachmentHolders() const; - std::string getDataHash() const; - - void addAttachmentHolders(const std::string &attachmentHolders); - - static size_t getItemSize(const LogItem *item); -}; - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/DatabaseEntities/LogItem.cpp b/services/backup/old/src/DatabaseEntities/LogItem.cpp deleted file mode 100644 --- a/services/backup/old/src/DatabaseEntities/LogItem.cpp +++ /dev/null @@ -1,148 +0,0 @@ -#include "LogItem.h" - -#include "Constants.h" -#include "Tools.h" - -#include - -namespace comm { -namespace network { -namespace database { - -const std::string LogItem::FIELD_BACKUP_ID = "backupID"; -const std::string LogItem::FIELD_LOG_ID = "logID"; -const std::string LogItem::FIELD_PERSISTED_IN_BLOB = "persistedInBlob"; -const std::string LogItem::FIELD_VALUE = "value"; -const std::string LogItem::FIELD_ATTACHMENT_HOLDERS = "attachmentHolders"; -const std::string LogItem::FIELD_DATA_HASH = "dataHash"; - -const std::string LogItem::TABLE_NAME = LOG_TABLE_NAME; - -LogItem::LogItem( - const std::string backupID, - const std::string logID, - const bool persistedInBlob, - const std::string value, - std::string attachmentHolders, - const std::string dataHash) - : backupID(backupID), - logID(logID), - persistedInBlob(persistedInBlob), - value(value), - attachmentHolders(attachmentHolders), - dataHash(dataHash) { - this->validate(); -} - -LogItem::LogItem(const AttributeValues &itemFromDB) { - this->assignItemFromDatabase(itemFromDB); -} - -void LogItem::validate() const { - if (!this->backupID.size()) { - throw std::runtime_error("backupID empty"); - } - if (!this->logID.size()) { - throw std::runtime_error("logID empty"); - } - if (!this->value.size()) { - throw std::runtime_error("value empty"); - } - const size_t itemSize = LogItem::getItemSize(this); - if (!this->persistedInBlob && itemSize > LOG_DATA_SIZE_DATABASE_LIMIT) { - throw std::runtime_error( - "the value of this log is too big to be stored in the database, it " - "should be stored in the blob instead (" + - std::to_string(itemSize) + "/" + - std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT) + ")"); - } - if (!this->dataHash.size()) { - throw std::runtime_error("data hash empty"); - } -} - -void LogItem::assignItemFromDatabase(const AttributeValues &itemFromDB) { - try { - this->backupID = itemFromDB.at(LogItem::FIELD_BACKUP_ID).GetS(); - this->logID = itemFromDB.at(LogItem::FIELD_LOG_ID).GetS(); - this->persistedInBlob = std::stoi( - std::string(itemFromDB.at(LogItem::FIELD_PERSISTED_IN_BLOB).GetS()) - .c_str()); - this->value = itemFromDB.at(LogItem::FIELD_VALUE).GetS(); - auto attachmentsHolders = - itemFromDB.find(LogItem::FIELD_ATTACHMENT_HOLDERS); - if (attachmentsHolders != itemFromDB.end()) { - this->attachmentHolders = attachmentsHolders->second.GetS(); - } - this->dataHash = itemFromDB.at(LogItem::FIELD_DATA_HASH).GetS(); - } catch (std::logic_error &e) { - throw std::runtime_error( - "invalid log item provided, " + std::string(e.what())); - } - this->validate(); -} - -std::string LogItem::getTableName() const { - return LogItem::TABLE_NAME; -} - -PrimaryKeyDescriptor LogItem::getPrimaryKeyDescriptor() const { - return PrimaryKeyDescriptor(LogItem::FIELD_BACKUP_ID, LogItem::FIELD_LOG_ID); -} - -PrimaryKeyValue LogItem::getPrimaryKeyValue() const { - return PrimaryKeyValue(this->backupID, this->logID); -} - -std::string LogItem::getBackupID() const { - return this->backupID; -} - -std::string LogItem::getLogID() const { - return this->logID; -} - -bool LogItem::getPersistedInBlob() const { - return this->persistedInBlob; -} - -std::string LogItem::getValue() const { - return this->value; -} - -std::string LogItem::getAttachmentHolders() const { - return this->attachmentHolders; -} - -std::string LogItem::getDataHash() const { - return this->dataHash; -} - -void LogItem::addAttachmentHolders(const std::string &attachmentHolders) { - this->attachmentHolders += - tools::validateAttachmentHolders(attachmentHolders); -} - -size_t LogItem::getItemSize(const LogItem *item) { - size_t size = 0; - - size += LogItem::FIELD_BACKUP_ID.size(); - size += LogItem::FIELD_LOG_ID.size(); - size += LogItem::FIELD_PERSISTED_IN_BLOB.size(); - size += LogItem::FIELD_VALUE.size(); - size += LogItem::FIELD_ATTACHMENT_HOLDERS.size(); - size += LogItem::FIELD_DATA_HASH.size(); - - size += item->getBackupID().size(); - size += item->getLogID().size(); - size += std::to_string(item->getPersistedInBlob()).size(); - size += item->getValue().size(); - size += item->getAttachmentHolders().size(); - size += item->getDataHash().size(); - - return size; -} - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/DatabaseManager.h b/services/backup/old/src/DatabaseManager.h deleted file mode 100644 --- a/services/backup/old/src/DatabaseManager.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include "BackupItem.h" -#include "DatabaseEntitiesTools.h" -#include "DatabaseManagerBase.h" -#include "DynamoDBTools.h" -#include "LogItem.h" - -#include -#include -#include -#include - -#include -#include - -namespace comm { -namespace network { -namespace database { - -// this class should be thread-safe in case any shared resources appear -class DatabaseManager : public DatabaseManagerBase { -public: - static DatabaseManager &getInstance(); - - void putBackupItem(const BackupItem &item); - std::shared_ptr - findBackupItem(const std::string &userID, const std::string &backupID); - std::shared_ptr findLastBackupItem(const std::string &userID); - void removeBackupItem(std::shared_ptr item); - - void putLogItem(const LogItem &item); - std::shared_ptr - findLogItem(const std::string &backupID, const std::string &logID); - std::vector> - findLogItemsForBackup(const std::string &backupID); - void removeLogItem(std::shared_ptr item); -}; - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/DatabaseManager.cpp b/services/backup/old/src/DatabaseManager.cpp deleted file mode 100644 --- a/services/backup/old/src/DatabaseManager.cpp +++ /dev/null @@ -1,173 +0,0 @@ -#include "DatabaseManager.h" -#include "Constants.h" -#include "GlobalTools.h" -#include "Tools.h" - -#include -#include -#include -#include -#include - -namespace comm { -namespace network { -namespace database { - -DatabaseManager &DatabaseManager::getInstance() { - static DatabaseManager instance; - return instance; -} - -void DatabaseManager::putBackupItem(const BackupItem &item) { - Aws::DynamoDB::Model::PutItemRequest request; - request.SetTableName(BackupItem::TABLE_NAME); - request.AddItem( - BackupItem::FIELD_USER_ID, - Aws::DynamoDB::Model::AttributeValue(item.getUserID())); - request.AddItem( - BackupItem::FIELD_CREATED, - Aws::DynamoDB::Model::AttributeValue( - std::to_string(tools::getCurrentTimestamp()))); - request.AddItem( - BackupItem::FIELD_BACKUP_ID, - Aws::DynamoDB::Model::AttributeValue(item.getBackupID())); - request.AddItem( - BackupItem::FIELD_RECOVERY_DATA, - Aws::DynamoDB::Model::AttributeValue(item.getRecoveryData())); - request.AddItem( - BackupItem::FIELD_COMPACTION_HOLDER, - Aws::DynamoDB::Model::AttributeValue(item.getCompactionHolder())); - if (!item.getAttachmentHolders().empty()) { - request.AddItem( - BackupItem::FIELD_ATTACHMENT_HOLDERS, - Aws::DynamoDB::Model::AttributeValue(item.getAttachmentHolders())); - } - - this->innerPutItem(std::make_shared(item), request); -} - -std::shared_ptr DatabaseManager::findBackupItem( - const std::string &userID, - const std::string &backupID) { - Aws::DynamoDB::Model::GetItemRequest request; - request.AddKey( - BackupItem::FIELD_USER_ID, Aws::DynamoDB::Model::AttributeValue(userID)); - request.AddKey( - BackupItem::FIELD_BACKUP_ID, - Aws::DynamoDB::Model::AttributeValue(backupID)); - - return this->innerFindItem(request); -} - -std::shared_ptr -DatabaseManager::findLastBackupItem(const std::string &userID) { - std::shared_ptr item = createItemByType(); - - Aws::DynamoDB::Model::QueryRequest req; - req.SetTableName(BackupItem::TABLE_NAME); - req.SetKeyConditionExpression(BackupItem::FIELD_USER_ID + " = :valueToMatch"); - - AttributeValues attributeValues; - attributeValues.emplace(":valueToMatch", userID); - - req.SetExpressionAttributeValues(attributeValues); - req.SetIndexName("userID-created-index"); - - req.SetLimit(1); - req.SetScanIndexForward(false); - - const Aws::DynamoDB::Model::QueryOutcome &outcome = - getDynamoDBClient()->Query(req); - if (!outcome.IsSuccess()) { - throw std::runtime_error(outcome.GetError().GetMessage()); - } - const Aws::Vector &items = outcome.GetResult().GetItems(); - if (items.empty()) { - return nullptr; - } - return std::make_shared(items[0]); -} - -void DatabaseManager::removeBackupItem(std::shared_ptr item) { - if (item == nullptr) { - return; - } - this->innerRemoveItem(*item); -} - -void DatabaseManager::putLogItem(const LogItem &item) { - Aws::DynamoDB::Model::PutItemRequest request; - request.SetTableName(LogItem::TABLE_NAME); - request.AddItem( - LogItem::FIELD_BACKUP_ID, - Aws::DynamoDB::Model::AttributeValue(item.getBackupID())); - request.AddItem( - LogItem::FIELD_LOG_ID, - Aws::DynamoDB::Model::AttributeValue(item.getLogID())); - request.AddItem( - LogItem::FIELD_PERSISTED_IN_BLOB, - Aws::DynamoDB::Model::AttributeValue( - std::to_string(item.getPersistedInBlob()))); - request.AddItem( - LogItem::FIELD_VALUE, - Aws::DynamoDB::Model::AttributeValue(item.getValue())); - if (!item.getAttachmentHolders().empty()) { - request.AddItem( - LogItem::FIELD_ATTACHMENT_HOLDERS, - Aws::DynamoDB::Model::AttributeValue(item.getAttachmentHolders())); - } - request.AddItem( - LogItem::FIELD_DATA_HASH, - Aws::DynamoDB::Model::AttributeValue(item.getDataHash())); - this->innerPutItem(std::make_shared(item), request); -} - -std::shared_ptr DatabaseManager::findLogItem( - const std::string &backupID, - const std::string &logID) { - Aws::DynamoDB::Model::GetItemRequest request; - request.AddKey( - LogItem::FIELD_BACKUP_ID, Aws::DynamoDB::Model::AttributeValue(backupID)); - request.AddKey( - LogItem::FIELD_LOG_ID, Aws::DynamoDB::Model::AttributeValue(logID)); - - return this->innerFindItem(request); -} - -std::vector> -DatabaseManager::findLogItemsForBackup(const std::string &backupID) { - std::vector> result; - std::shared_ptr item = createItemByType(); - - Aws::DynamoDB::Model::QueryRequest req; - req.SetTableName(LogItem::TABLE_NAME); - req.SetKeyConditionExpression(LogItem::FIELD_BACKUP_ID + " = :valueToMatch"); - - AttributeValues attributeValues; - attributeValues.emplace(":valueToMatch", backupID); - - req.SetExpressionAttributeValues(attributeValues); - - const Aws::DynamoDB::Model::QueryOutcome &outcome = - getDynamoDBClient()->Query(req); - if (!outcome.IsSuccess()) { - throw std::runtime_error(outcome.GetError().GetMessage()); - } - const Aws::Vector &items = outcome.GetResult().GetItems(); - for (auto &item : items) { - result.push_back(std::make_shared(item)); - } - - return result; -} - -void DatabaseManager::removeLogItem(std::shared_ptr item) { - if (item == nullptr) { - return; - } - this->innerRemoveItem(*item); -} - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/Reactors/server/AddAttachmentsUtility.h b/services/backup/old/src/Reactors/server/AddAttachmentsUtility.h deleted file mode 100644 --- a/services/backup/old/src/Reactors/server/AddAttachmentsUtility.h +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once - -#include "LogItem.h" - -#include "backup.grpc.pb.h" -#include "backup.pb.h" - -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -class AddAttachmentsUtility { - std::shared_ptr - moveToS3(std::shared_ptr logItem); - -public: - grpc::Status processRequest(const backup::AddAttachmentsRequest *request); -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/old/src/Reactors/server/AddAttachmentsUtility.cpp deleted file mode 100644 --- a/services/backup/old/src/Reactors/server/AddAttachmentsUtility.cpp +++ /dev/null @@ -1,102 +0,0 @@ -#include "AddAttachmentsUtility.h" - -#include "blob_client/src/lib.rs.h" - -#include - -#include "BackupItem.h" -#include "Constants.h" -#include "DatabaseManager.h" -#include "Tools.h" - -namespace comm { -namespace network { -namespace reactor { - -grpc::Status AddAttachmentsUtility::processRequest( - const backup::AddAttachmentsRequest *request) { - grpc::Status status = grpc::Status::OK; - std::string userID = request->userid(); - std::string backupID = request->backupid(); - std::string logID = request->logid(); - const std::string holders = request->holders(); - try { - if (userID.empty()) { - throw std::runtime_error("user id required but not provided"); - } - if (backupID.empty()) { - throw std::runtime_error("backup id required but not provided"); - } - if (holders.empty()) { - throw std::runtime_error("holders required but not provided"); - } - - if (logID.empty()) { - // add these attachments to backup - std::shared_ptr backupItem = - database::DatabaseManager::getInstance().findBackupItem( - userID, backupID); - backupItem->addAttachmentHolders(holders); - database::DatabaseManager::getInstance().putBackupItem(*backupItem); - } else { - // add these attachments to log - std::shared_ptr logItem = - database::DatabaseManager::getInstance().findLogItem(backupID, logID); - logItem->addAttachmentHolders(holders); - if (!logItem->getPersistedInBlob() && - database::LogItem::getItemSize(logItem.get()) > - LOG_DATA_SIZE_DATABASE_LIMIT) { - bool old = logItem->getPersistedInBlob(); - logItem = this->moveToS3(logItem); - } - database::DatabaseManager::getInstance().putLogItem(*logItem); - } - } catch (std::exception &e) { - LOG(ERROR) << e.what(); - status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - return status; -} - -std::shared_ptr -AddAttachmentsUtility::moveToS3(std::shared_ptr logItem) { - std::string holder = tools::generateHolder( - logItem->getDataHash(), logItem->getBackupID(), logItem->getLogID()); - std::string data = std::move(logItem->getValue()); - std::shared_ptr newLogItem = - std::make_shared( - logItem->getBackupID(), - logItem->getLogID(), - true, - holder, - logItem->getAttachmentHolders(), - logItem->getDataHash()); - // put into S3 - std::condition_variable blobPutDoneCV; - std::mutex blobPutDoneCVMutex; - put_client_initialize_cxx(rust::String(holder)); - put_client_write_cxx( - rust::String(holder), - tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), - holder.c_str()); - put_client_blocking_read_cxx(rust::String(holder)); - put_client_write_cxx( - rust::String(holder), - tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), - newLogItem->getDataHash().c_str()); - rust::String responseStr = put_client_blocking_read_cxx(rust::String(holder)); - // data exists? - if (!(bool)tools::charPtrToInt(responseStr.c_str())) { - put_client_write_cxx( - rust::String(holder), - tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), - std::move(data).c_str()); - put_client_blocking_read_cxx(rust::String(holder)); - } - put_client_terminate_cxx(rust::String(holder)); - return newLogItem; -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/old/src/Reactors/server/CreateNewBackupReactor.h deleted file mode 100644 --- a/services/backup/old/src/Reactors/server/CreateNewBackupReactor.h +++ /dev/null @@ -1,52 +0,0 @@ -#pragma once - -#include "backup.grpc.pb.h" -#include "backup.pb.h" - -#include "ServerBidiReactorBase.h" - -#include -#include -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -class CreateNewBackupReactor : public ServerBidiReactorBase< - backup::CreateNewBackupRequest, - backup::CreateNewBackupResponse> { - enum class State { - USER_ID = 1, - DEVICE_ID = 2, - KEY_ENTROPY = 3, - DATA_HASH = 4, - DATA_CHUNKS = 5, - }; - - State state = State::USER_ID; - std::string userID; - std::string deviceID; - std::string keyEntropy; - std::string dataHash; - std::string holder; - std::string backupID; - - std::mutex reactorStateMutex; - - std::condition_variable blobPutDoneCV; - std::mutex blobPutDoneCVMutex; - - std::string generateBackupID(); - -public: - std::unique_ptr handleRequest( - backup::CreateNewBackupRequest request, - backup::CreateNewBackupResponse *response) override; - void terminateCallback() override; -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/old/src/Reactors/server/CreateNewBackupReactor.cpp deleted file mode 100644 --- a/services/backup/old/src/Reactors/server/CreateNewBackupReactor.cpp +++ /dev/null @@ -1,124 +0,0 @@ -#include "CreateNewBackupReactor.h" - -#include "DatabaseManager.h" -#include "GlobalTools.h" -#include "Tools.h" - -#include "blob_client/src/lib.rs.h" - -namespace comm { -namespace network { -namespace reactor { - -std::string CreateNewBackupReactor::generateBackupID() { - if (this->deviceID.empty()) { - throw std::runtime_error( - "trying to generate a backup ID with an empty device ID"); - } - return this->deviceID + std::to_string(tools::getCurrentTimestamp()); -} - -std::unique_ptr CreateNewBackupReactor::handleRequest( - backup::CreateNewBackupRequest request, - backup::CreateNewBackupResponse *response) { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - switch (this->state) { - case State::USER_ID: { - if (!request.has_userid()) { - throw std::runtime_error("user id expected but not received"); - } - this->userID = request.userid(); - this->state = State::DEVICE_ID; - return nullptr; - } - case State::DEVICE_ID: { - if (!request.has_deviceid()) { - throw std::runtime_error("device id expected but not received"); - } - this->deviceID = request.deviceid(); - this->state = State::KEY_ENTROPY; - return nullptr; - } - case State::KEY_ENTROPY: { - if (!request.has_keyentropy()) { - throw std::runtime_error( - "backup key entropy expected but not received"); - } - this->keyEntropy = request.keyentropy(); - this->state = State::DATA_HASH; - return nullptr; - } - case State::DATA_HASH: { - if (!request.has_newcompactionhash()) { - throw std::runtime_error("data hash expected but not received"); - } - this->dataHash = request.newcompactionhash(); - this->state = State::DATA_CHUNKS; - - this->backupID = this->generateBackupID(); - if (database::DatabaseManager::getInstance().findBackupItem( - this->userID, this->backupID) != nullptr) { - throw std::runtime_error( - "Backup with id [" + this->backupID + "] for user [" + - this->userID + "] already exists, creation aborted"); - } - response->set_backupid(this->backupID); - this->holder = tools::generateHolder(this->dataHash, this->backupID); - put_client_initialize_cxx(rust::String(this->holder)); - put_client_write_cxx( - rust::String(this->holder), - tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), - this->holder.c_str()); - put_client_blocking_read_cxx(rust::String(this->holder)); - put_client_write_cxx( - rust::String(this->holder), - tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), - this->dataHash.c_str()); - - rust::String responseStr = - put_client_blocking_read_cxx(rust::String(this->holder)); - // data exists? - if ((bool)tools::charPtrToInt(responseStr.c_str())) { - return std::make_unique( - grpc::Status::OK, true); - } - return nullptr; - } - case State::DATA_CHUNKS: { - if (request.mutable_newcompactionchunk()->empty()) { - return std::make_unique(grpc::Status::OK); - } - put_client_write_cxx( - rust::String(this->holder), - tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), - std::string(std::move(*request.mutable_newcompactionchunk())) - .c_str()); - put_client_blocking_read_cxx(rust::String(this->holder)); - - return nullptr; - } - } - throw std::runtime_error("new backup - invalid state"); -} - -void CreateNewBackupReactor::terminateCallback() { - const std::lock_guard lock(this->reactorStateMutex); - put_client_terminate_cxx(rust::String(this->holder)); - - // TODO add recovery data - // TODO handle attachments holders - database::BackupItem backupItem( - this->userID, - this->backupID, - tools::getCurrentTimestamp(), - tools::generateRandomString(), - this->holder, - {}); - database::DatabaseManager::getInstance().putBackupItem(backupItem); -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/Reactors/server/PullBackupReactor.h b/services/backup/old/src/Reactors/server/PullBackupReactor.h deleted file mode 100644 --- a/services/backup/old/src/Reactors/server/PullBackupReactor.h +++ /dev/null @@ -1,66 +0,0 @@ -#pragma once - -#include "BackupItem.h" -#include "DatabaseEntitiesTools.h" -#include "GlobalConstants.h" -#include "LogItem.h" - -#include -#include - -#include "ServerWriteReactorBase.h" - -#include - -#include -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -class PullBackupReactor : public ServerWriteReactorBase< - backup::PullBackupRequest, - backup::PullBackupResponse> { - - enum class State { - COMPACTION = 1, - COMPACTION_ATTACHMENTS = 2, - LOGS = 3, - LOG_ATTACHMENTS = 4, - }; - - std::shared_ptr backupItem; - std::mutex reactorStateMutex; - State state = State::COMPACTION; - std::vector> logs; - size_t currentLogIndex = 0; - std::shared_ptr currentLog; - std::string internalBuffer; - std::string previousLogID; - bool endOfQueue = false; - bool clientInitialized = false; - std::unique_ptr currentLogHolder; - - const size_t chunkLimit = - GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; - - void initializeGetReactor(const std::string &holder); - void nextLog(); - std::string - prepareDataChunkWithPadding(const std::string &dataChunk, size_t padding); - -public: - PullBackupReactor(const backup::PullBackupRequest *request); - - void initialize() override; - - std::unique_ptr - writeResponse(backup::PullBackupResponse *response) override; - void terminateCallback() override; -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/Reactors/server/PullBackupReactor.cpp b/services/backup/old/src/Reactors/server/PullBackupReactor.cpp deleted file mode 100644 --- a/services/backup/old/src/Reactors/server/PullBackupReactor.cpp +++ /dev/null @@ -1,216 +0,0 @@ -#include "PullBackupReactor.h" - -#include "blob_client/src/lib.rs.h" - -#include "DatabaseManager.h" - -namespace comm { -namespace network { -namespace reactor { - -PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) - : ServerWriteReactorBase< - backup::PullBackupRequest, - backup::PullBackupResponse>(request) { -} - -void PullBackupReactor::initializeGetReactor(const std::string &holder) { - if (this->backupItem == nullptr) { - throw std::runtime_error( - "get reactor cannot be initialized when backup item is missing"); - } - get_client_initialize_cxx(rust::String(holder)); - this->clientInitialized = true; -} - -void PullBackupReactor::initialize() { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - if (this->request.userid().empty()) { - throw std::runtime_error("no user id provided"); - } - if (this->request.backupid().empty()) { - throw std::runtime_error("no backup id provided"); - } - this->backupItem = database::DatabaseManager::getInstance().findBackupItem( - this->request.userid(), this->request.backupid()); - if (this->backupItem == nullptr) { - throw std::runtime_error( - "no backup found for provided parameters: user id [" + - this->request.userid() + "], backup id [" + this->request.backupid() + - "]"); - } - this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( - this->request.backupid()); -} - -std::unique_ptr -PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - response->set_attachmentholders(""); - response->set_backupid(""); - size_t extraBytesNeeded = 0; - if (this->state == State::COMPACTION) { - response->set_backupid(this->backupItem->getBackupID()); - extraBytesNeeded += database::BackupItem::FIELD_BACKUP_ID.size(); - extraBytesNeeded += this->backupItem->getBackupID().size(); - - if (!this->clientInitialized) { - extraBytesNeeded += database::BackupItem::FIELD_ATTACHMENT_HOLDERS.size(); - extraBytesNeeded += this->backupItem->getAttachmentHolders().size(); - response->set_attachmentholders(this->backupItem->getAttachmentHolders()); - this->initializeGetReactor(this->backupItem->getCompactionHolder()); - } - std::string dataChunk; - if (this->internalBuffer.size() < this->chunkLimit) { - rust::Vec responseVec = get_client_blocking_read_cxx( - this->backupItem->getCompactionHolder().c_str()); - dataChunk = (responseVec.empty()) - ? "" - : std::string(reinterpret_cast(responseVec.data())); - dataChunk.resize(responseVec.size()); - } - if (!dataChunk.empty() || - this->internalBuffer.size() + extraBytesNeeded >= this->chunkLimit) { - dataChunk = - this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded); - response->set_compactionchunk(dataChunk); - return nullptr; - } - this->state = State::LOGS; - if (!this->internalBuffer.empty()) { - response->set_compactionchunk(std::move(this->internalBuffer)); - return nullptr; - } - } - if (this->state == State::LOGS) { - // TODO make sure logs are received in correct order regardless their size - if (this->logs.empty()) { - // this means that there are no logs at all so we just terminate with - // the compaction - return std::make_unique(grpc::Status::OK); - } - if (this->currentLogIndex == this->logs.size()) { - if (!this->internalBuffer.empty()) { - response->set_logid(this->previousLogID); - response->set_logchunk(std::move(this->internalBuffer)); - return nullptr; - } - return std::make_unique(grpc::Status::OK); - } - if (this->currentLogIndex > this->logs.size()) { - // we went out of the scope of the logs collection, this should never - // happen and should be perceived as an error - throw std::runtime_error("log index out of bound"); - } - // this means that we're not reading anything between invocations of - // writeResponse - // it is only not null when we read data in chunks - if (this->currentLog == nullptr) { - this->currentLog = this->logs.at(this->currentLogIndex); - extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size(); - extraBytesNeeded += this->currentLog->getLogID().size(); - - response->set_attachmentholders(this->currentLog->getAttachmentHolders()); - extraBytesNeeded += database::LogItem::FIELD_ATTACHMENT_HOLDERS.size(); - extraBytesNeeded += this->currentLog->getAttachmentHolders().size(); - - if (this->currentLog->getPersistedInBlob()) { - // if the item is stored in the blob, we initialize the get reactor - // and proceed - this->initializeGetReactor(this->currentLog->getValue()); - this->currentLogHolder = - std::make_unique(this->currentLog->getValue()); - } else { - // if the item is persisted in the database, we just take it, send the - // data to the client and reset currentLog so the next invocation of - // writeResponse will take another one from the collection - response->set_logid(this->currentLog->getLogID()); - response->set_logchunk(this->currentLog->getValue()); - this->nextLog(); - return nullptr; - } - } else { - extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size(); - extraBytesNeeded += this->currentLog->getLogID().size(); - } - response->set_backupid(this->currentLog->getBackupID()); - response->set_logid(this->currentLog->getLogID()); - // we want to read the chunks from the blob through the get client until - // we get an empty chunk - a sign of "end of chunks" - std::string dataChunk; - if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) { - rust::Vec responseVec = - get_client_blocking_read_cxx(this->currentLogHolder->c_str()); - dataChunk = (responseVec.empty()) - ? "" - : std::string(reinterpret_cast(responseVec.data())); - dataChunk.resize(responseVec.size()); - } - this->endOfQueue = this->endOfQueue || (dataChunk.size() == 0); - dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded); - // if we get an empty chunk, we reset the currentLog so we can read the - // next one from the logs collection. - // If there's data inside, we write it to the client and proceed. - if (dataChunk.empty()) { - this->nextLog(); - } else { - response->set_logchunk(dataChunk); - } - return nullptr; - } - - throw std::runtime_error("unhandled state"); -} - -void PullBackupReactor::nextLog() { - ++this->currentLogIndex; - this->previousLogID = this->currentLog->getLogID(); - this->currentLog = nullptr; - this->endOfQueue = false; - if (this->currentLogHolder != nullptr) { - get_client_terminate_cxx(this->currentLogHolder->c_str()); - this->currentLogHolder = nullptr; - } -} - -std::string PullBackupReactor::prepareDataChunkWithPadding( - const std::string &dataChunk, - size_t padding) { - if (dataChunk.size() > this->chunkLimit) { - throw std::runtime_error(std::string( - "received data chunk bigger than the chunk limit: " + - std::to_string(dataChunk.size()) + "/" + - std::to_string(this->chunkLimit))); - } - - std::string chunk = std::move(this->internalBuffer) + dataChunk; - const size_t realSize = chunk.size() + padding; - if (realSize <= this->chunkLimit) { - return chunk; - } - const size_t bytesToStash = realSize - this->chunkLimit; - this->internalBuffer = std::string(chunk.end() - bytesToStash, chunk.end()); - chunk.resize(chunk.size() - bytesToStash); - if (chunk.size() > this->chunkLimit) { - throw std::runtime_error("new data chunk incorrectly calculated"); - } - - return chunk; -} - -void PullBackupReactor::terminateCallback() { - const std::lock_guard lock(this->reactorStateMutex); - get_client_terminate_cxx(this->backupItem->getCompactionHolder().c_str()); - if (!this->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - this->getStatusHolder()->getStatus().error_message()); - } -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/Reactors/server/RecoverBackupKeyReactor.h b/services/backup/old/src/Reactors/server/RecoverBackupKeyReactor.h deleted file mode 100644 --- a/services/backup/old/src/Reactors/server/RecoverBackupKeyReactor.h +++ /dev/null @@ -1,33 +0,0 @@ -#pragma once - -#include "ServerBidiReactorBase.h" - -#include "backup.grpc.pb.h" -#include "backup.pb.h" - -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -class RecoverBackupKeyReactor : public ServerBidiReactorBase< - backup::RecoverBackupKeyRequest, - backup::RecoverBackupKeyResponse> { -public: - std::unique_ptr handleRequest( - backup::RecoverBackupKeyRequest request, - backup::RecoverBackupKeyResponse *response); -}; - -std::unique_ptr RecoverBackupKeyReactor::handleRequest( - backup::RecoverBackupKeyRequest request, - backup::RecoverBackupKeyResponse *response) { // TODO handle request - return std::make_unique( - grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "unimplemented")); -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/Reactors/server/SendLogReactor.h b/services/backup/old/src/Reactors/server/SendLogReactor.h deleted file mode 100644 --- a/services/backup/old/src/Reactors/server/SendLogReactor.h +++ /dev/null @@ -1,61 +0,0 @@ -#pragma once - -#include "LogItem.h" -#include "ServerReadReactorBase.h" - -#include "backup.grpc.pb.h" -#include "backup.pb.h" - -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -class SendLogReactor : public ServerReadReactorBase< - backup::SendLogRequest, - backup::SendLogResponse> { - enum class State { - USER_ID = 1, - BACKUP_ID = 2, - LOG_HASH = 3, - LOG_CHUNK = 4, - }; - - enum class PersistenceMethod { - UNKNOWN = 0, - DB = 1, - BLOB = 2, - }; - - State state = State::USER_ID; - PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN; - std::string userID; - std::string logID; - std::string backupID; - std::string hash; - std::string blobHolder; - std::string value; - std::mutex reactorStateMutex; - - std::condition_variable blobPutDoneCV; - std::mutex blobPutDoneCVMutex; - - void storeInDatabase(); - std::string generateLogID(const std::string &backupID); - void initializePutClient(); - -public: - using ServerReadReactorBase:: - ServerReadReactorBase; - - std::unique_ptr - readRequest(backup::SendLogRequest request) override; - void doneCallback() override; - void terminateCallback() override; -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/Reactors/server/SendLogReactor.cpp b/services/backup/old/src/Reactors/server/SendLogReactor.cpp deleted file mode 100644 --- a/services/backup/old/src/Reactors/server/SendLogReactor.cpp +++ /dev/null @@ -1,178 +0,0 @@ -#include "SendLogReactor.h" - -#include "blob_client/src/lib.rs.h" - -#include "Constants.h" -#include "DatabaseManager.h" -#include "GlobalTools.h" -#include "Tools.h" - -namespace comm { -namespace network { -namespace reactor { - -void SendLogReactor::storeInDatabase() { - bool storedInBlob = this->persistenceMethod == PersistenceMethod::BLOB; - database::LogItem logItem( - this->backupID, - this->logID, - storedInBlob, - storedInBlob ? this->blobHolder : this->value, - {}, - this->hash); - if (database::LogItem::getItemSize(&logItem) > LOG_DATA_SIZE_DATABASE_LIMIT) { - throw std::runtime_error( - "trying to put into the database an item with size " + - std::to_string(database::LogItem::getItemSize(&logItem)) + - " that exceeds the limit " + - std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT)); - } - database::DatabaseManager::getInstance().putLogItem(logItem); -} - -std::string SendLogReactor::generateLogID(const std::string &backupID) { - return backupID + tools::ID_SEPARATOR + tools::generateUUID(); -} - -void SendLogReactor::initializePutClient() { - if (this->blobHolder.empty()) { - throw std::runtime_error( - "put reactor cannot be initialized with empty blob holder"); - } - if (this->hash.empty()) { - throw std::runtime_error( - "put reactor cannot be initialized with empty hash"); - } - put_client_initialize_cxx(rust::String(this->blobHolder)); -} - -std::unique_ptr -SendLogReactor::readRequest(backup::SendLogRequest request) { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - switch (this->state) { - case State::USER_ID: { - if (!request.has_userid()) { - throw std::runtime_error("user id expected but not received"); - } - this->userID = request.userid(); - this->state = State::BACKUP_ID; - return nullptr; - }; - case State::BACKUP_ID: { - if (!request.has_backupid()) { - throw std::runtime_error("backup id expected but not received"); - } - this->backupID = request.backupid(); - if (database::DatabaseManager::getInstance().findBackupItem( - this->userID, this->backupID) == nullptr) { - throw std::runtime_error( - "trying to send log for a non-existent backup"); - } - this->logID = this->generateLogID(this->backupID); - this->response->set_logcheckpoint(this->logID); - this->state = State::LOG_HASH; - return nullptr; - }; - case State::LOG_HASH: { - if (!request.has_loghash()) { - throw std::runtime_error("log hash expected but not received"); - } - this->hash = request.loghash(); - this->state = State::LOG_CHUNK; - return nullptr; - }; - case State::LOG_CHUNK: { - if (!request.has_logdata()) { - throw std::runtime_error("log data expected but not received"); - } - if (request.mutable_logdata()->size() == 0) { - return std::make_unique(grpc::Status::OK); - } - if (this->persistenceMethod == PersistenceMethod::DB) { - throw std::runtime_error( - "please do not send multiple tiny chunks (less than " + - std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT) + - "), merge them into bigger parts instead"); - } - if (this->persistenceMethod == PersistenceMethod::BLOB) { - put_client_write_cxx( - rust::String(this->blobHolder), - tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), - request.mutable_logdata()->c_str()); - put_client_blocking_read_cxx(this->blobHolder.c_str()); - - return nullptr; - } - this->value += std::move(*request.mutable_logdata()); - database::LogItem logItem = database::LogItem( - this->backupID, this->logID, true, this->value, "", this->hash); - if (database::LogItem::getItemSize(&logItem) > - LOG_DATA_SIZE_DATABASE_LIMIT) { - this->persistenceMethod = PersistenceMethod::BLOB; - this->blobHolder = - tools::generateHolder(this->hash, this->backupID, this->logID); - this->initializePutClient(); - put_client_write_cxx( - rust::String(this->blobHolder), - tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), - this->blobHolder.c_str()); - put_client_blocking_read_cxx(this->blobHolder.c_str()); - put_client_write_cxx( - rust::String(this->blobHolder), - tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), - this->hash.c_str()); - rust::String responseStr = - put_client_blocking_read_cxx(this->blobHolder.c_str()); - // data exists? - if ((bool)tools::charPtrToInt(responseStr.c_str())) { - return std::make_unique(grpc::Status::OK); - } - put_client_write_cxx( - rust::String(this->blobHolder), - tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), - std::move(this->value).c_str()); - put_client_blocking_read_cxx(this->blobHolder.c_str()); - this->value = ""; - } else { - this->persistenceMethod = PersistenceMethod::DB; - } - return nullptr; - }; - } - throw std::runtime_error("send log - invalid state"); -} - -void SendLogReactor::terminateCallback() { - const std::lock_guard lock(this->reactorStateMutex); - put_client_terminate_cxx(rust::String(this->blobHolder)); - - if (!this->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - this->getStatusHolder()->getStatus().error_message()); - } - - if (this->persistenceMethod != PersistenceMethod::BLOB && - this->persistenceMethod != PersistenceMethod::DB) { - throw std::runtime_error("Invalid persistence method detected"); - } - - if (this->persistenceMethod == PersistenceMethod::DB) { - this->storeInDatabase(); - return; - } - // store in db only when we successfully upload chunks - this->storeInDatabase(); -} - -void SendLogReactor::doneCallback() { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - // TODO implement -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/Tools.h b/services/backup/old/src/Tools.h deleted file mode 100644 --- a/services/backup/old/src/Tools.h +++ /dev/null @@ -1,26 +0,0 @@ -#pragma once - -#include - -#include - -namespace comm { -namespace network { -namespace tools { - -std::string generateRandomString(std::size_t length = 20); - -std::string generateHolder( - const std::string &blobHash, - const std::string &backupID, - const std::string &resourceID = ""); - -std::string validateAttachmentHolders(const std::string &holders); - -int charPtrToInt(const char *str); - -size_t getBlobPutField(blob::PutRequest::DataCase field); - -} // namespace tools -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/Tools.cpp b/services/backup/old/src/Tools.cpp deleted file mode 100644 --- a/services/backup/old/src/Tools.cpp +++ /dev/null @@ -1,71 +0,0 @@ -#include "Tools.h" - -#include "GlobalConstants.h" -#include "GlobalTools.h" - -#include -#include -#include -#include -#include - -namespace comm { -namespace network { -namespace tools { - -std::string generateRandomString(std::size_t length) { - const std::string CHARACTERS = - "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; - thread_local std::random_device generator; - std::uniform_int_distribution<> distribution(0, CHARACTERS.size() - 1); - std::string random_string; - for (std::size_t i = 0; i < length; ++i) { - random_string += CHARACTERS[distribution(generator)]; - } - return random_string; -} - -std::string generateHolder( - const std::string &blobHash, - const std::string &backupID, - const std::string &resourceID) { - return backupID + ID_SEPARATOR + resourceID + ID_SEPARATOR + blobHash + - ID_SEPARATOR + tools::generateUUID(); -} - -std::string validateAttachmentHolders(const std::string &holders) { - std::stringstream stream(holders); - std::string item; - std::string result; - - while (std::getline(stream, item, ATTACHMENT_DELIMITER)) { - if (item.empty()) { - throw std::runtime_error("empty holder detected"); - } - result += item; - result += ATTACHMENT_DELIMITER; - } - if (result.empty()) { - throw std::runtime_error("parse attachment holders failed"); - } - - return result; -} - -int charPtrToInt(const char *str) { - unsigned int intValue; - std::stringstream strValue; - - strValue << str; - strValue >> intValue; - - return intValue; -} - -size_t getBlobPutField(blob::PutRequest::DataCase field) { - return static_cast(field); -} - -} // namespace tools -} // namespace network -} // namespace comm diff --git a/services/backup/old/src/server.cpp b/services/backup/old/src/server.cpp deleted file mode 100644 --- a/services/backup/old/src/server.cpp +++ /dev/null @@ -1,42 +0,0 @@ -#include "BackupServiceImpl.h" - -#include "GlobalConstants.h" -#include "GlobalTools.h" - -#include -#include - -#include - -namespace comm { -namespace network { - -void RunServer() { - BackupServiceImpl backupService; - - grpc::EnableDefaultHealthCheckService(true); - grpc::ServerBuilder builder; - // Listen on the given address without any authentication mechanism. - builder.AddListeningPort( - SERVER_LISTEN_ADDRESS, grpc::InsecureServerCredentials()); - // Register "service" as the instance through which we'll communicate with - // clients. In this case it corresponds to an *synchronous* service. - builder.RegisterService(&backupService); - // Finally assemble the server. - std::unique_ptr server(builder.BuildAndStart()); - LOG(INFO) << "server listening at :" << SERVER_LISTEN_ADDRESS; - - // Wait for the server to shutdown. Note that some other thread must be - // responsible for shutting down the server for this call to ever return. - server->Wait(); -} - -} // namespace network -} // namespace comm - -int main(int argc, char **argv) { - comm::network::tools::InitLogging("backup"); - comm::network::RunServer(); - - return 0; -} diff --git a/services/backup/old/test/BackupTest.cpp b/services/backup/old/test/BackupTest.cpp deleted file mode 100644 --- a/services/backup/old/test/BackupTest.cpp +++ /dev/null @@ -1,20 +0,0 @@ -#include - -class BackupTest : public testing::Test { -protected: - virtual void SetUp() { - //... - } - - virtual void TearDown() { - //... - } -}; - -TEST_F(BackupTest, passingTest) { - EXPECT_TRUE(true); -} - -TEST_F(BackupTest, failingTest) { - // EXPECT_TRUE(false); -} diff --git a/services/backup/old/test/DatabaseManagerTest.cpp b/services/backup/old/test/DatabaseManagerTest.cpp deleted file mode 100644 --- a/services/backup/old/test/DatabaseManagerTest.cpp +++ /dev/null @@ -1,132 +0,0 @@ -#include - -#include "DatabaseManager.h" -#include "GlobalTools.h" -#include "Tools.h" - -#include -#include -#include - -using namespace comm::network::database; - -class DatabaseManagerTest : public testing::Test { -protected: - virtual void SetUp() { - Aws::InitAPI({}); - } - - virtual void TearDown() { - Aws::ShutdownAPI({}); - } -}; - -std::string generateName(const std::string prefix = "") { - return prefix + "-" + - std::to_string(comm::network::tools::getCurrentTimestamp()); -} - -BackupItem -generateBackupItem(const std::string &userID, const std::string &backupID) { - return BackupItem( - userID, - backupID, - comm::network::tools::getCurrentTimestamp(), - "xxx", - "xxx", - ""); -} - -LogItem generateLogItem(const std::string &backupID, const std::string &logID) { - return LogItem( - backupID, logID, false, "xxx", "", "1655e920c4eda97e0be1acae74a5ab51"); -} - -TEST_F(DatabaseManagerTest, TestOperationsOnBackupItems) { - const std::string userID = generateName("user001"); - - std::vector backupIDs = {"backup001", "backup002", "backup003"}; - for (const std::string &backupID : backupIDs) { - DatabaseManager::getInstance().putBackupItem( - generateBackupItem(userID, backupID)); - } - - std::shared_ptr item; - while (!backupIDs.empty()) { - item = DatabaseManager::getInstance().findLastBackupItem(userID); - EXPECT_NE(item, nullptr); - EXPECT_EQ(item->getBackupID(), backupIDs.back()); - backupIDs.pop_back(); - DatabaseManager::getInstance().removeBackupItem(item); - }; - EXPECT_EQ(DatabaseManager::getInstance().findLastBackupItem(userID), nullptr); -} - -TEST_F(DatabaseManagerTest, TestOperationsOnLogItems) { - const std::string backupID1 = generateName("backup001"); - const std::string backupID2 = generateName("backup002"); - - std::vector logIDs1 = {"log001", "log002", "log003"}; - for (const std::string &logID : logIDs1) { - LogItem generatedItem = generateLogItem(backupID1, logID); - DatabaseManager::getInstance().putLogItem(generatedItem); - std::shared_ptr foundItem = - DatabaseManager::getInstance().findLogItem(backupID1, logID); - EXPECT_NE(foundItem, nullptr); - EXPECT_EQ(foundItem->getBackupID(), generatedItem.getBackupID()); - EXPECT_EQ(foundItem->getLogID(), generatedItem.getLogID()); - EXPECT_EQ( - foundItem->getPersistedInBlob(), generatedItem.getPersistedInBlob()); - EXPECT_EQ(foundItem->getValue(), generatedItem.getValue()); - EXPECT_EQ( - foundItem->getAttachmentHolders(), - generatedItem.getAttachmentHolders()); - } - std::vector logIDs2 = {"log021", "log022"}; - for (const std::string &logID : logIDs2) { - LogItem generatedItem = generateLogItem(backupID2, logID); - DatabaseManager::getInstance().putLogItem(generatedItem); - std::shared_ptr foundItem = - DatabaseManager::getInstance().findLogItem(backupID2, logID); - EXPECT_NE(foundItem, nullptr); - EXPECT_EQ(foundItem->getBackupID(), generatedItem.getBackupID()); - EXPECT_EQ(foundItem->getLogID(), generatedItem.getLogID()); - EXPECT_EQ( - foundItem->getPersistedInBlob(), generatedItem.getPersistedInBlob()); - EXPECT_EQ(foundItem->getValue(), generatedItem.getValue()); - EXPECT_EQ( - foundItem->getAttachmentHolders(), - generatedItem.getAttachmentHolders()); - } - - std::vector> items1 = - DatabaseManager::getInstance().findLogItemsForBackup(backupID1); - - std::vector> items2 = - DatabaseManager::getInstance().findLogItemsForBackup(backupID2); - - EXPECT_EQ(items1.size(), 3); - EXPECT_EQ(items2.size(), 2); - - for (size_t i = 0; i < items1.size(); ++i) { - EXPECT_EQ(logIDs1.at(i), items1.at(i)->getLogID()); - DatabaseManager::getInstance().removeLogItem(items1.at(i)); - EXPECT_EQ( - DatabaseManager::getInstance().findLogItem(backupID1, logIDs1.at(i)), - nullptr); - } - EXPECT_EQ( - DatabaseManager::getInstance().findLogItemsForBackup(backupID1).size(), - 0); - - for (size_t i = 0; i < items2.size(); ++i) { - EXPECT_EQ(logIDs2.at(i), items2.at(i)->getLogID()); - DatabaseManager::getInstance().removeLogItem(items2.at(i)); - EXPECT_EQ( - DatabaseManager::getInstance().findLogItem(backupID2, logIDs2.at(i)), - nullptr); - } - EXPECT_EQ( - DatabaseManager::getInstance().findLogItemsForBackup(backupID2).size(), - 0); -}