diff --git a/native/cpp/CommonCpp/NativeModules/InternalModules/NetworkModule.cpp b/native/cpp/CommonCpp/NativeModules/InternalModules/NetworkModule.cpp index dc09552c3..df60ec342 100644 --- a/native/cpp/CommonCpp/NativeModules/InternalModules/NetworkModule.cpp +++ b/native/cpp/CommonCpp/NativeModules/InternalModules/NetworkModule.cpp @@ -1,71 +1,78 @@ #include "NetworkModule.h" #include "Logger.h" namespace comm { void NetworkModule::initializeNetworkModule( const std::string &userId, const std::string &deviceToken, const std::string &hostname) { std::string host = (hostname.size() == 0) ? "localhost" : hostname; // initialize network module // this is going to differ depending on a device // 10.0.2.2 for android emulator // 192.168.x.x for a physical device etc const std::shared_ptr credentials = (host.substr(0, 5) == "https") ? grpc::SslCredentials(grpc::SslCredentialsOptions()) : grpc::InsecureChannelCredentials(); this->networkClient.reset( new network::Client(host, "50051", credentials, userId, deviceToken)); } void NetworkModule::sendPong() { this->networkClient->sendPong(); } void NetworkModule::get(std::string sessionID) { if (!this->networkClient) { return; } this->networkClient->get(sessionID); } void NetworkModule::close() { this->networkClient.reset(); } grpc::Status NetworkModule::send( std::string sessionID, std::string toDeviceID, std::string payload, std::vector blobHashes) { if (!this->networkClient) { return grpc::Status::CANCELLED; } return this->networkClient->send(sessionID, toDeviceID, payload, blobHashes); } void NetworkModule::setOnReadDoneCallback( std::function callback) { if (!this->networkClient) { return; } this->networkClient->setOnReadDoneCallback(callback); } void NetworkModule::setOnOpenCallback(std::function callback) { if (!this->networkClient) { return; } this->networkClient->setOnOpenCallback(callback); } void NetworkModule::setOnCloseCallback(std::function callback) { if (!this->networkClient) { return; } this->networkClient->setOnCloseCallback(callback); } +void NetworkModule::closeGetStream() { + if (!this->networkClient) { + return; + } + this->networkClient->closeGetStream(); +} + } // namespace comm diff --git a/native/cpp/CommonCpp/NativeModules/InternalModules/NetworkModule.h b/native/cpp/CommonCpp/NativeModules/InternalModules/NetworkModule.h index ec33258e4..7215939cf 100644 --- a/native/cpp/CommonCpp/NativeModules/InternalModules/NetworkModule.h +++ b/native/cpp/CommonCpp/NativeModules/InternalModules/NetworkModule.h @@ -1,28 +1,29 @@ #pragma once #include "../../grpc/Client.h" #include #include namespace comm { class NetworkModule { std::unique_ptr networkClient; public: void initializeNetworkModule( const std::string &userId, const std::string &deviceToken, const std::string &hostname = ""); void sendPong(); grpc::Status send( std::string sessionID, std::string toDeviceID, std::string payload, std::vector blobHashes); void close(); void get(std::string sessionID); + void closeGetStream(); void setOnReadDoneCallback(std::function callback); void setOnOpenCallback(std::function callback); void setOnCloseCallback(std::function callback); }; } // namespace comm diff --git a/native/cpp/CommonCpp/grpc/Client.cpp b/native/cpp/CommonCpp/grpc/Client.cpp index eb45c9e24..f82507c35 100644 --- a/native/cpp/CommonCpp/grpc/Client.cpp +++ b/native/cpp/CommonCpp/grpc/Client.cpp @@ -1,116 +1,123 @@ #include "Client.h" #include "Logger.h" #include namespace comm { namespace network { Client::Client( std::string hostname, std::string port, std::shared_ptr credentials, const std::string id, const std::string deviceToken) : id(id), deviceToken(deviceToken) { std::shared_ptr channel = grpc::CreateChannel(hostname + ":" + port, credentials); this->stub_ = TunnelbrokerService::NewStub(channel); } tunnelbroker::CheckResponseType Client::checkIfPrimaryDeviceOnline() { grpc::ClientContext context; tunnelbroker::CheckRequest request; tunnelbroker::CheckResponse response; request.set_userid(this->id); request.set_devicetoken(this->deviceToken); grpc::Status status = stub_->CheckIfPrimaryDeviceOnline(&context, request, &response); if (!status.ok()) { throw std::runtime_error(status.error_message()); } return response.checkresponsetype(); } bool Client::becomeNewPrimaryDevice() { grpc::ClientContext context; tunnelbroker::NewPrimaryRequest request; tunnelbroker::NewPrimaryResponse response; request.set_userid(this->id); request.set_devicetoken(this->deviceToken); grpc::Status status = stub_->BecomeNewPrimaryDevice(&context, request, &response); if (!status.ok()) { throw std::runtime_error(status.error_message()); } return response.success(); } void Client::sendPong() { grpc::ClientContext context; tunnelbroker::PongRequest request; google::protobuf::Empty response; request.set_userid(this->id); request.set_devicetoken(this->deviceToken); Logger::log("Sending PONG"); grpc::Status status = this->stub_->SendPong(&context, request, &response); if (!status.ok()) { std::ostringstream errorMsg; errorMsg << "Sending PONG failed: " << status.error_message() << std::endl; Logger::log(errorMsg.str()); } } grpc::Status Client::send( std::string sessionID, std::string toDeviceID, std::string payload, std::vector blobHashes) { grpc::ClientContext context; tunnelbroker::SendRequest request; google::protobuf::Empty response; request.set_sessionid(sessionID); request.set_todeviceid(toDeviceID); request.set_payload(payload); for (const auto &blob : blobHashes) { request.add_blobhashes(blob); } return this->stub_->Send(&context, request, &response); } void Client::get(std::string sessionID) { this->clientGetReadReactor = std::make_unique(this->stub_.get(), sessionID); } void Client::setOnReadDoneCallback(std::function callback) { if (!this->clientGetReadReactor) { return; } this->clientGetReadReactor->setOnReadDoneCallback(callback); } void Client::setOnOpenCallback(std::function callback) { if (!this->clientGetReadReactor) { return; } this->clientGetReadReactor->setOnOpenCallback(callback); } void Client::setOnCloseCallback(std::function callback) { if (!this->clientGetReadReactor) { return; } this->clientGetReadReactor->setOnCloseCallback(callback); } +void Client::closeGetStream() { + if (!this->clientGetReadReactor) { + return; + } + this->clientGetReadReactor->close(); +} + } // namespace network } // namespace comm diff --git a/native/cpp/CommonCpp/grpc/Client.h b/native/cpp/CommonCpp/grpc/Client.h index 4b020df41..ba13282ad 100644 --- a/native/cpp/CommonCpp/grpc/Client.h +++ b/native/cpp/CommonCpp/grpc/Client.h @@ -1,50 +1,51 @@ #pragma once #include #include #include #include "ClientGetReadReactor.h" #include "_generated/tunnelbroker.grpc.pb.h" #include "_generated/tunnelbroker.pb.h" namespace comm { namespace network { using grpc::Channel; using tunnelbroker::CheckResponseType; using tunnelbroker::TunnelbrokerService; class Client { std::unique_ptr stub_; const std::string id; const std::string deviceToken; std::unique_ptr clientGetReadReactor; public: Client( std::string hostname, std::string port, std::shared_ptr credentials, const std::string id, const std::string deviceToken); CheckResponseType checkIfPrimaryDeviceOnline(); bool becomeNewPrimaryDevice(); void sendPong(); grpc::Status send( std::string sessionID, std::string toDeviceID, std::string payload, std::vector blobHashes); void get(std::string sessionID); void setOnReadDoneCallback(std::function callback); void setOnOpenCallback(std::function callback); void setOnCloseCallback(std::function callback); + void closeGetStream(); }; } // namespace network } // namespace comm diff --git a/native/cpp/CommonCpp/grpc/ClientGetReadReactor.h b/native/cpp/CommonCpp/grpc/ClientGetReadReactor.h index 3d93a9658..23337d669 100644 --- a/native/cpp/CommonCpp/grpc/ClientGetReadReactor.h +++ b/native/cpp/CommonCpp/grpc/ClientGetReadReactor.h @@ -1,62 +1,62 @@ #pragma once #include #include "_generated/tunnelbroker.grpc.pb.h" #include "_generated/tunnelbroker.pb.h" class ClientGetReadReactor : public grpc::ClientReadReactor { std::string sessionID; grpc::ClientContext context; tunnelbroker::GetRequest request; tunnelbroker::GetResponse response; std::mutex onReadDoneCallbackMutex; std::mutex onOpenCallbackMutex; std::mutex onCloseCallbackMutex; std::function onReadDoneCallback; std::function onOpenCallback; std::function onCloseCallback; public: ClientGetReadReactor( tunnelbroker::TunnelbrokerService::Stub *stub, std::string sessionID) : sessionID{sessionID}, request{} { request.set_sessionid(sessionID); stub->async()->Get(&(this->context), &(this->request), this); StartRead(&(this->response)); StartCall(); } void OnReadDone(bool ok) override { if (!ok) { return; } std::lock_guard guard{this->onReadDoneCallbackMutex}; if (this->onReadDoneCallback) { this->onReadDoneCallback(this->response.payload()); } StartRead(&(this->response)); } void setOnReadDoneCallback(std::function onReadDoneCallback) { std::lock_guard guard{this->onReadDoneCallbackMutex}; this->onReadDoneCallback = onReadDoneCallback; } void setOnOpenCallback(std::function onOpenCallback) { std::lock_guard guard{this->onOpenCallbackMutex}; this->onOpenCallback = onOpenCallback; } void setOnCloseCallback(std::function onCloseCallback) { std::lock_guard guard{this->onCloseCallbackMutex}; this->onCloseCallback = onCloseCallback; } - void tryCancel() { + void close() { this->context.TryCancel(); } }; diff --git a/native/cpp/CommonCpp/grpc/GRPCStreamHostObject.cpp b/native/cpp/CommonCpp/grpc/GRPCStreamHostObject.cpp index 59f269250..92afcd15f 100644 --- a/native/cpp/CommonCpp/grpc/GRPCStreamHostObject.cpp +++ b/native/cpp/CommonCpp/grpc/GRPCStreamHostObject.cpp @@ -1,160 +1,164 @@ #include "GRPCStreamHostObject.h" #include "../NativeModules/InternalModules/GlobalNetworkSingleton.h" using namespace facebook; GRPCStreamHostObject::GRPCStreamHostObject( jsi::Runtime &rt, std::shared_ptr jsInvoker) : readyState{0}, onopen{}, onmessage{}, onclose{}, send{jsi::Function::createFromHostFunction( rt, jsi::PropNameID::forUtf8(rt, "send"), 1, [](jsi::Runtime &rt, const jsi::Value &thisVal, const jsi::Value *args, size_t count) { auto payload{args->asString(rt).utf8(rt)}; comm::GlobalNetworkSingleton::instance.scheduleOrRun( [=](comm::NetworkModule &networkModule) { std::vector blobHashes{}; networkModule.send( "sessionID-placeholder", "toDeviceID-placeholder", payload, blobHashes); }); return jsi::Value::undefined(); })}, close{jsi::Function::createFromHostFunction( rt, jsi::PropNameID::forUtf8(rt, "close"), 0, [](jsi::Runtime &rt, const jsi::Value &thisVal, const jsi::Value *args, size_t count) { - return jsi::String::createFromUtf8( - rt, std::string{"GRPCStream.close: unimplemented"}); + comm::GlobalNetworkSingleton::instance.scheduleOrRun( + [=](comm::NetworkModule &networkModule) { + networkModule.closeGetStream(); + }); + + return jsi::Value::undefined(); })}, jsInvoker{jsInvoker} { comm::GlobalNetworkSingleton::instance.scheduleOrRun( [](comm::NetworkModule &networkModule) { networkModule.initializeNetworkModule( "userId-placeholder", "deviceToken-placeholder", "localhost"); networkModule.get("sessionID-placeholder"); }); auto onReadDoneCallback = [this, &rt](std::string data) { this->jsInvoker->invokeAsync([this, &rt, data]() { if (this->onmessage.isNull()) { return; } auto msgObject = jsi::Object(rt); msgObject.setProperty(rt, "data", jsi::String::createFromUtf8(rt, data)); this->onmessage.asObject(rt).asFunction(rt).call(rt, msgObject, 1); }); }; auto onOpenCallback = [this, &rt]() { this->jsInvoker->invokeAsync([this, &rt]() { if (this->onopen.isNull()) { return; } this->onopen.asObject(rt).asFunction(rt).call( rt, jsi::Value::undefined(), 0); }); }; auto onCloseCallback = [this, &rt]() { this->jsInvoker->invokeAsync([this, &rt]() { if (this->onclose.isNull()) { return; } this->onclose.asObject(rt).asFunction(rt).call( rt, jsi::Value::undefined(), 0); }); }; comm::GlobalNetworkSingleton::instance.scheduleOrRun( [onReadDoneCallback = std::move(onReadDoneCallback), onOpenCallback = std::move(onOpenCallback), onCloseCallback = std::move(onCloseCallback)](comm::NetworkModule &networkModule) { networkModule.setOnReadDoneCallback(onReadDoneCallback); networkModule.setOnOpenCallback(onOpenCallback); networkModule.setOnCloseCallback(onCloseCallback); }); } std::vector GRPCStreamHostObject::getPropertyNames(jsi::Runtime &rt) { std::vector names; names.reserve(6); names.push_back(jsi::PropNameID::forUtf8(rt, std::string{"readyState"})); names.push_back(jsi::PropNameID::forUtf8(rt, std::string{"onopen"})); names.push_back(jsi::PropNameID::forUtf8(rt, std::string{"onmessage"})); names.push_back(jsi::PropNameID::forUtf8(rt, std::string{"onclose"})); names.push_back(jsi::PropNameID::forUtf8(rt, std::string{"close"})); names.push_back(jsi::PropNameID::forUtf8(rt, std::string{"send"})); return names; } jsi::Value GRPCStreamHostObject::get(jsi::Runtime &runtime, const jsi::PropNameID &name) { auto propName = name.utf8(runtime); if (propName == "readyState") { return jsi::Value(this->readyState); } if (propName == "send") { return this->send.asObject(runtime).asFunction(runtime); } if (propName == "close") { return this->close.asObject(runtime).asFunction(runtime); } if (propName == "onopen") { return this->onopen.isNull() ? jsi::Value::null() : this->onopen.asObject(runtime).asFunction(runtime); } if (propName == "onmessage") { return this->onmessage.isNull() ? jsi::Value::null() : this->onmessage.asObject(runtime).asFunction(runtime); } if (propName == "onclose") { return this->onclose.isNull() ? jsi::Value::null() : this->onclose.asObject(runtime).asFunction(runtime); } return jsi::String::createFromUtf8(runtime, std::string{"unimplemented"}); } void GRPCStreamHostObject::set( jsi::Runtime &runtime, const jsi::PropNameID &name, const jsi::Value &value) { auto propName = name.utf8(runtime); if (propName == "readyState" && value.isNumber()) { this->readyState = static_cast(value.asNumber()); } else if ( propName == "onopen" && value.isObject() && value.asObject(runtime).isFunction(runtime)) { this->onopen = value.asObject(runtime).asFunction(runtime); } else if ( propName == "onmessage" && value.isObject() && value.asObject(runtime).isFunction(runtime)) { this->onmessage = value.asObject(runtime).asFunction(runtime); } else if ( propName == "onclose" && value.isObject() && value.asObject(runtime).isFunction(runtime)) { this->onclose = value.asObject(runtime).asFunction(runtime); } }