| File: | out/../src/node_messaging.cc |
| Warning: | line 668, column 12 Potential leak of memory pointed to by 'port' |
Press '?' to see keyboard shortcuts
Keyboard shortcuts:
| 1 | #include "node_messaging.h" | |||
| 2 | ||||
| 3 | #include "async_wrap-inl.h" | |||
| 4 | #include "debug_utils-inl.h" | |||
| 5 | #include "memory_tracker-inl.h" | |||
| 6 | #include "node_buffer.h" | |||
| 7 | #include "node_contextify.h" | |||
| 8 | #include "node_errors.h" | |||
| 9 | #include "node_external_reference.h" | |||
| 10 | #include "node_process-inl.h" | |||
| 11 | #include "util-inl.h" | |||
| 12 | ||||
| 13 | using node::contextify::ContextifyContext; | |||
| 14 | using node::errors::TryCatchScope; | |||
| 15 | using v8::Array; | |||
| 16 | using v8::ArrayBuffer; | |||
| 17 | using v8::BackingStore; | |||
| 18 | using v8::CompiledWasmModule; | |||
| 19 | using v8::Context; | |||
| 20 | using v8::EscapableHandleScope; | |||
| 21 | using v8::Function; | |||
| 22 | using v8::FunctionCallbackInfo; | |||
| 23 | using v8::FunctionTemplate; | |||
| 24 | using v8::Global; | |||
| 25 | using v8::HandleScope; | |||
| 26 | using v8::Isolate; | |||
| 27 | using v8::Just; | |||
| 28 | using v8::Local; | |||
| 29 | using v8::Maybe; | |||
| 30 | using v8::MaybeLocal; | |||
| 31 | using v8::Nothing; | |||
| 32 | using v8::Object; | |||
| 33 | using v8::SharedArrayBuffer; | |||
| 34 | using v8::String; | |||
| 35 | using v8::Symbol; | |||
| 36 | using v8::Value; | |||
| 37 | using v8::ValueDeserializer; | |||
| 38 | using v8::ValueSerializer; | |||
| 39 | using v8::WasmModuleObject; | |||
| 40 | ||||
| 41 | namespace node { | |||
| 42 | ||||
| 43 | using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>; | |||
| 44 | ||||
| 45 | BaseObject::TransferMode BaseObject::GetTransferMode() const { | |||
| 46 | return BaseObject::TransferMode::kUntransferable; | |||
| 47 | } | |||
| 48 | ||||
| 49 | std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() { | |||
| 50 | return CloneForMessaging(); | |||
| 51 | } | |||
| 52 | ||||
| 53 | std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const { | |||
| 54 | return {}; | |||
| 55 | } | |||
| 56 | ||||
| 57 | Maybe<BaseObjectList> BaseObject::NestedTransferables() const { | |||
| 58 | return Just(BaseObjectList {}); | |||
| 59 | } | |||
| 60 | ||||
| 61 | Maybe<bool> BaseObject::FinalizeTransferRead( | |||
| 62 | Local<Context> context, ValueDeserializer* deserializer) { | |||
| 63 | return Just(true); | |||
| 64 | } | |||
| 65 | ||||
| 66 | namespace worker { | |||
| 67 | ||||
| 68 | Maybe<bool> TransferData::FinalizeTransferWrite( | |||
| 69 | Local<Context> context, ValueSerializer* serializer) { | |||
| 70 | return Just(true); | |||
| 71 | } | |||
| 72 | ||||
| 73 | Message::Message(MallocedBuffer<char>&& buffer) | |||
| 74 | : main_message_buf_(std::move(buffer)) {} | |||
| 75 | ||||
| 76 | bool Message::IsCloseMessage() const { | |||
| 77 | return main_message_buf_.data == nullptr; | |||
| 78 | } | |||
| 79 | ||||
| 80 | namespace { | |||
| 81 | ||||
| 82 | // This is used to tell V8 how to read transferred host objects, like other | |||
| 83 | // `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them. | |||
| 84 | class DeserializerDelegate : public ValueDeserializer::Delegate { | |||
| 85 | public: | |||
| 86 | DeserializerDelegate( | |||
| 87 | Message* m, | |||
| 88 | Environment* env, | |||
| 89 | const std::vector<BaseObjectPtr<BaseObject>>& host_objects, | |||
| 90 | const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers, | |||
| 91 | const std::vector<CompiledWasmModule>& wasm_modules) | |||
| 92 | : host_objects_(host_objects), | |||
| 93 | shared_array_buffers_(shared_array_buffers), | |||
| 94 | wasm_modules_(wasm_modules) {} | |||
| 95 | ||||
| 96 | MaybeLocal<Object> ReadHostObject(Isolate* isolate) override { | |||
| 97 | // Identifying the index in the message's BaseObject array is sufficient. | |||
| 98 | uint32_t id; | |||
| 99 | if (!deserializer->ReadUint32(&id)) | |||
| 100 | return MaybeLocal<Object>(); | |||
| 101 | CHECK_LT(id, host_objects_.size())do { if (__builtin_expect(!!(!((id) < (host_objects_.size( )))), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "101", "(id) < (host_objects_.size())", __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 102 | return host_objects_[id]->object(isolate); | |||
| 103 | } | |||
| 104 | ||||
| 105 | MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId( | |||
| 106 | Isolate* isolate, uint32_t clone_id) override { | |||
| 107 | CHECK_LT(clone_id, shared_array_buffers_.size())do { if (__builtin_expect(!!(!((clone_id) < (shared_array_buffers_ .size()))), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "107", "(clone_id) < (shared_array_buffers_.size())" , __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 108 | return shared_array_buffers_[clone_id]; | |||
| 109 | } | |||
| 110 | ||||
| 111 | MaybeLocal<WasmModuleObject> GetWasmModuleFromId( | |||
| 112 | Isolate* isolate, uint32_t transfer_id) override { | |||
| 113 | CHECK_LT(transfer_id, wasm_modules_.size())do { if (__builtin_expect(!!(!((transfer_id) < (wasm_modules_ .size()))), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "113", "(transfer_id) < (wasm_modules_.size())" , __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 114 | return WasmModuleObject::FromCompiledModule( | |||
| 115 | isolate, wasm_modules_[transfer_id]); | |||
| 116 | } | |||
| 117 | ||||
| 118 | ValueDeserializer* deserializer = nullptr; | |||
| 119 | ||||
| 120 | private: | |||
| 121 | const std::vector<BaseObjectPtr<BaseObject>>& host_objects_; | |||
| 122 | const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_; | |||
| 123 | const std::vector<CompiledWasmModule>& wasm_modules_; | |||
| 124 | }; | |||
| 125 | ||||
| 126 | } // anonymous namespace | |||
| 127 | ||||
| 128 | MaybeLocal<Value> Message::Deserialize(Environment* env, | |||
| 129 | Local<Context> context, | |||
| 130 | Local<Value>* port_list) { | |||
| 131 | Context::Scope context_scope(context); | |||
| 132 | ||||
| 133 | CHECK(!IsCloseMessage())do { if (__builtin_expect(!!(!(!IsCloseMessage())), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "133", "!IsCloseMessage()", __PRETTY_FUNCTION__ }; node:: Assert(args); } while (0); } } while (0); | |||
| 134 | if (port_list != nullptr && !transferables_.empty()) { | |||
| 135 | // Need to create this outside of the EscapableHandleScope, but inside | |||
| 136 | // the Context::Scope. | |||
| 137 | *port_list = Array::New(env->isolate()); | |||
| 138 | } | |||
| 139 | ||||
| 140 | EscapableHandleScope handle_scope(env->isolate()); | |||
| 141 | ||||
| 142 | // Create all necessary objects for transferables, e.g. MessagePort handles. | |||
| 143 | std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size()); | |||
| 144 | auto cleanup = OnScopeLeave([&]() { | |||
| 145 | for (BaseObjectPtr<BaseObject> object : host_objects) { | |||
| 146 | if (!object) continue; | |||
| 147 | ||||
| 148 | // If the function did not finish successfully, host_objects will contain | |||
| 149 | // a list of objects that will never be passed to JS. Therefore, we | |||
| 150 | // destroy them here. | |||
| 151 | object->Detach(); | |||
| 152 | } | |||
| 153 | }); | |||
| 154 | ||||
| 155 | for (uint32_t i = 0; i < transferables_.size(); ++i) { | |||
| 156 | HandleScope handle_scope(env->isolate()); | |||
| 157 | TransferData* data = transferables_[i].get(); | |||
| 158 | host_objects[i] = data->Deserialize( | |||
| 159 | env, context, std::move(transferables_[i])); | |||
| 160 | if (!host_objects[i]) return {}; | |||
| 161 | if (port_list != nullptr) { | |||
| 162 | // If we gather a list of all message ports, and this transferred object | |||
| 163 | // is a message port, add it to that list. This is a bit of an odd case | |||
| 164 | // of special handling for MessagePorts (as opposed to applying to all | |||
| 165 | // transferables), but it's required for spec compliance. | |||
| 166 | DCHECK((*port_list)->IsArray()); | |||
| 167 | Local<Array> port_list_array = port_list->As<Array>(); | |||
| 168 | Local<Object> obj = host_objects[i]->object(); | |||
| 169 | if (env->message_port_constructor_template()->HasInstance(obj)) { | |||
| 170 | if (port_list_array->Set(context, | |||
| 171 | port_list_array->Length(), | |||
| 172 | obj).IsNothing()) { | |||
| 173 | return {}; | |||
| 174 | } | |||
| 175 | } | |||
| 176 | } | |||
| 177 | } | |||
| 178 | transferables_.clear(); | |||
| 179 | ||||
| 180 | std::vector<Local<SharedArrayBuffer>> shared_array_buffers; | |||
| 181 | // Attach all transferred SharedArrayBuffers to their new Isolate. | |||
| 182 | for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) { | |||
| 183 | Local<SharedArrayBuffer> sab = | |||
| 184 | SharedArrayBuffer::New(env->isolate(), shared_array_buffers_[i]); | |||
| 185 | shared_array_buffers.push_back(sab); | |||
| 186 | } | |||
| 187 | ||||
| 188 | DeserializerDelegate delegate( | |||
| 189 | this, env, host_objects, shared_array_buffers, wasm_modules_); | |||
| 190 | ValueDeserializer deserializer( | |||
| 191 | env->isolate(), | |||
| 192 | reinterpret_cast<const uint8_t*>(main_message_buf_.data), | |||
| 193 | main_message_buf_.size, | |||
| 194 | &delegate); | |||
| 195 | delegate.deserializer = &deserializer; | |||
| 196 | ||||
| 197 | // Attach all transferred ArrayBuffers to their new Isolate. | |||
| 198 | for (uint32_t i = 0; i < array_buffers_.size(); ++i) { | |||
| 199 | Local<ArrayBuffer> ab = | |||
| 200 | ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i])); | |||
| 201 | deserializer.TransferArrayBuffer(i, ab); | |||
| 202 | } | |||
| 203 | ||||
| 204 | if (deserializer.ReadHeader(context).IsNothing()) | |||
| 205 | return {}; | |||
| 206 | Local<Value> return_value; | |||
| 207 | if (!deserializer.ReadValue(context).ToLocal(&return_value)) | |||
| 208 | return {}; | |||
| 209 | ||||
| 210 | for (BaseObjectPtr<BaseObject> base_object : host_objects) { | |||
| 211 | if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing()) | |||
| 212 | return {}; | |||
| 213 | } | |||
| 214 | ||||
| 215 | host_objects.clear(); | |||
| 216 | return handle_scope.Escape(return_value); | |||
| 217 | } | |||
| 218 | ||||
| 219 | void Message::AddSharedArrayBuffer( | |||
| 220 | std::shared_ptr<BackingStore> backing_store) { | |||
| 221 | shared_array_buffers_.emplace_back(std::move(backing_store)); | |||
| 222 | } | |||
| 223 | ||||
| 224 | void Message::AddTransferable(std::unique_ptr<TransferData>&& data) { | |||
| 225 | transferables_.emplace_back(std::move(data)); | |||
| 226 | } | |||
| 227 | ||||
| 228 | uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) { | |||
| 229 | wasm_modules_.emplace_back(std::move(mod)); | |||
| 230 | return wasm_modules_.size() - 1; | |||
| 231 | } | |||
| 232 | ||||
| 233 | namespace { | |||
| 234 | ||||
| 235 | MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) { | |||
| 236 | Isolate* isolate = context->GetIsolate(); | |||
| 237 | Local<Object> per_context_bindings; | |||
| 238 | Local<Value> emit_message_val; | |||
| 239 | if (!GetPerContextExports(context).ToLocal(&per_context_bindings) || | |||
| 240 | !per_context_bindings->Get(context, | |||
| 241 | FIXED_ONE_BYTE_STRING(isolate, "emitMessage")) | |||
| 242 | .ToLocal(&emit_message_val)) { | |||
| 243 | return MaybeLocal<Function>(); | |||
| 244 | } | |||
| 245 | CHECK(emit_message_val->IsFunction())do { if (__builtin_expect(!!(!(emit_message_val->IsFunction ())), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "245", "emit_message_val->IsFunction()", __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 246 | return emit_message_val.As<Function>(); | |||
| 247 | } | |||
| 248 | ||||
| 249 | MaybeLocal<Function> GetDOMException(Local<Context> context) { | |||
| 250 | Isolate* isolate = context->GetIsolate(); | |||
| 251 | Local<Object> per_context_bindings; | |||
| 252 | Local<Value> domexception_ctor_val; | |||
| 253 | if (!GetPerContextExports(context).ToLocal(&per_context_bindings) || | |||
| 254 | !per_context_bindings->Get(context, | |||
| 255 | FIXED_ONE_BYTE_STRING(isolate, "DOMException")) | |||
| 256 | .ToLocal(&domexception_ctor_val)) { | |||
| 257 | return MaybeLocal<Function>(); | |||
| 258 | } | |||
| 259 | CHECK(domexception_ctor_val->IsFunction())do { if (__builtin_expect(!!(!(domexception_ctor_val->IsFunction ())), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "259", "domexception_ctor_val->IsFunction()", __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 260 | Local<Function> domexception_ctor = domexception_ctor_val.As<Function>(); | |||
| 261 | return domexception_ctor; | |||
| 262 | } | |||
| 263 | ||||
| 264 | void ThrowDataCloneException(Local<Context> context, Local<String> message) { | |||
| 265 | Isolate* isolate = context->GetIsolate(); | |||
| 266 | Local<Value> argv[] = {message, | |||
| 267 | FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")}; | |||
| 268 | Local<Value> exception; | |||
| 269 | Local<Function> domexception_ctor; | |||
| 270 | if (!GetDOMException(context).ToLocal(&domexception_ctor) || | |||
| 271 | !domexception_ctor->NewInstance(context, arraysize(argv), argv) | |||
| 272 | .ToLocal(&exception)) { | |||
| 273 | return; | |||
| 274 | } | |||
| 275 | isolate->ThrowException(exception); | |||
| 276 | } | |||
| 277 | ||||
| 278 | // This tells V8 how to serialize objects that it does not understand | |||
| 279 | // (e.g. C++ objects) into the output buffer, in a way that our own | |||
| 280 | // DeserializerDelegate understands how to unpack. | |||
| 281 | class SerializerDelegate : public ValueSerializer::Delegate { | |||
| 282 | public: | |||
| 283 | SerializerDelegate(Environment* env, Local<Context> context, Message* m) | |||
| 284 | : env_(env), context_(context), msg_(m) {} | |||
| 285 | ||||
| 286 | void ThrowDataCloneError(Local<String> message) override { | |||
| 287 | ThrowDataCloneException(context_, message); | |||
| 288 | } | |||
| 289 | ||||
| 290 | Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override { | |||
| 291 | if (env_->base_object_ctor_template()->HasInstance(object)) { | |||
| 292 | return WriteHostObject( | |||
| 293 | BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) }); | |||
| 294 | } | |||
| 295 | ||||
| 296 | ThrowDataCloneError(env_->clone_unsupported_type_str()); | |||
| 297 | return Nothing<bool>(); | |||
| 298 | } | |||
| 299 | ||||
| 300 | Maybe<uint32_t> GetSharedArrayBufferId( | |||
| 301 | Isolate* isolate, | |||
| 302 | Local<SharedArrayBuffer> shared_array_buffer) override { | |||
| 303 | uint32_t i; | |||
| 304 | for (i = 0; i < seen_shared_array_buffers_.size(); ++i) { | |||
| 305 | if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) == | |||
| 306 | shared_array_buffer) { | |||
| 307 | return Just(i); | |||
| 308 | } | |||
| 309 | } | |||
| 310 | ||||
| 311 | seen_shared_array_buffers_.emplace_back( | |||
| 312 | Global<SharedArrayBuffer> { isolate, shared_array_buffer }); | |||
| 313 | msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore()); | |||
| 314 | return Just(i); | |||
| 315 | } | |||
| 316 | ||||
| 317 | Maybe<uint32_t> GetWasmModuleTransferId( | |||
| 318 | Isolate* isolate, Local<WasmModuleObject> module) override { | |||
| 319 | return Just(msg_->AddWASMModule(module->GetCompiledModule())); | |||
| 320 | } | |||
| 321 | ||||
| 322 | Maybe<bool> Finish(Local<Context> context) { | |||
| 323 | for (uint32_t i = 0; i < host_objects_.size(); i++) { | |||
| 324 | BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]); | |||
| 325 | std::unique_ptr<TransferData> data; | |||
| 326 | if (i < first_cloned_object_index_) | |||
| 327 | data = host_object->TransferForMessaging(); | |||
| 328 | if (!data) | |||
| 329 | data = host_object->CloneForMessaging(); | |||
| 330 | if (!data) return Nothing<bool>(); | |||
| 331 | if (data->FinalizeTransferWrite(context, serializer).IsNothing()) | |||
| 332 | return Nothing<bool>(); | |||
| 333 | msg_->AddTransferable(std::move(data)); | |||
| 334 | } | |||
| 335 | return Just(true); | |||
| 336 | } | |||
| 337 | ||||
| 338 | inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) { | |||
| 339 | // Make sure we have not started serializing the value itself yet. | |||
| 340 | CHECK_EQ(first_cloned_object_index_, SIZE_MAX)do { if (__builtin_expect(!!(!((first_cloned_object_index_) == ((18446744073709551615UL)))), 0)) { do { static const node:: AssertionInfo args = { "../src/node_messaging.cc" ":" "340", "(first_cloned_object_index_) == ((18446744073709551615UL))" , __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 341 | host_objects_.emplace_back(std::move(host_object)); | |||
| 342 | } | |||
| 343 | ||||
| 344 | // Some objects in the transfer list may register sub-objects that can be | |||
| 345 | // transferred. This could e.g. be a public JS wrapper object, such as a | |||
| 346 | // FileHandle, that is registering its C++ handle for transfer. | |||
| 347 | inline Maybe<bool> AddNestedHostObjects() { | |||
| 348 | for (size_t i = 0; i < host_objects_.size(); i++) { | |||
| 349 | std::vector<BaseObjectPtr<BaseObject>> nested_transferables; | |||
| 350 | if (!host_objects_[i]->NestedTransferables().To(&nested_transferables)) | |||
| 351 | return Nothing<bool>(); | |||
| 352 | for (auto nested_transferable : nested_transferables) { | |||
| 353 | if (std::find(host_objects_.begin(), | |||
| 354 | host_objects_.end(), | |||
| 355 | nested_transferable) == host_objects_.end()) { | |||
| 356 | AddHostObject(nested_transferable); | |||
| 357 | } | |||
| 358 | } | |||
| 359 | } | |||
| 360 | return Just(true); | |||
| 361 | } | |||
| 362 | ||||
| 363 | ValueSerializer* serializer = nullptr; | |||
| 364 | ||||
| 365 | private: | |||
| 366 | Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) { | |||
| 367 | BaseObject::TransferMode mode = host_object->GetTransferMode(); | |||
| 368 | if (mode == BaseObject::TransferMode::kUntransferable) { | |||
| 369 | ThrowDataCloneError(env_->clone_unsupported_type_str()); | |||
| 370 | return Nothing<bool>(); | |||
| 371 | } | |||
| 372 | ||||
| 373 | for (uint32_t i = 0; i < host_objects_.size(); i++) { | |||
| 374 | if (host_objects_[i] == host_object) { | |||
| 375 | serializer->WriteUint32(i); | |||
| 376 | return Just(true); | |||
| 377 | } | |||
| 378 | } | |||
| 379 | ||||
| 380 | if (mode == BaseObject::TransferMode::kTransferable) { | |||
| 381 | THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_); | |||
| 382 | return Nothing<bool>(); | |||
| 383 | } | |||
| 384 | ||||
| 385 | CHECK_EQ(mode, BaseObject::TransferMode::kCloneable)do { if (__builtin_expect(!!(!((mode) == (BaseObject::TransferMode ::kCloneable))), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "385", "(mode) == (BaseObject::TransferMode::kCloneable)" , __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 386 | uint32_t index = host_objects_.size(); | |||
| 387 | if (first_cloned_object_index_ == SIZE_MAX(18446744073709551615UL)) | |||
| 388 | first_cloned_object_index_ = index; | |||
| 389 | serializer->WriteUint32(index); | |||
| 390 | host_objects_.push_back(host_object); | |||
| 391 | return Just(true); | |||
| 392 | } | |||
| 393 | ||||
| 394 | Environment* env_; | |||
| 395 | Local<Context> context_; | |||
| 396 | Message* msg_; | |||
| 397 | std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_; | |||
| 398 | std::vector<BaseObjectPtr<BaseObject>> host_objects_; | |||
| 399 | size_t first_cloned_object_index_ = SIZE_MAX(18446744073709551615UL); | |||
| 400 | ||||
| 401 | friend class worker::Message; | |||
| 402 | }; | |||
| 403 | ||||
| 404 | } // anonymous namespace | |||
| 405 | ||||
| 406 | Maybe<bool> Message::Serialize(Environment* env, | |||
| 407 | Local<Context> context, | |||
| 408 | Local<Value> input, | |||
| 409 | const TransferList& transfer_list_v, | |||
| 410 | Local<Object> source_port) { | |||
| 411 | HandleScope handle_scope(env->isolate()); | |||
| 412 | Context::Scope context_scope(context); | |||
| 413 | ||||
| 414 | // Verify that we're not silently overwriting an existing message. | |||
| 415 | CHECK(main_message_buf_.is_empty())do { if (__builtin_expect(!!(!(main_message_buf_.is_empty())) , 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "415", "main_message_buf_.is_empty()", __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 416 | ||||
| 417 | SerializerDelegate delegate(env, context, this); | |||
| 418 | ValueSerializer serializer(env->isolate(), &delegate); | |||
| 419 | delegate.serializer = &serializer; | |||
| 420 | ||||
| 421 | std::vector<Local<ArrayBuffer>> array_buffers; | |||
| 422 | for (uint32_t i = 0; i < transfer_list_v.length(); ++i) { | |||
| 423 | Local<Value> entry = transfer_list_v[i]; | |||
| 424 | if (entry->IsObject()) { | |||
| 425 | // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353 | |||
| 426 | // for details. | |||
| 427 | bool untransferable; | |||
| 428 | if (!entry.As<Object>()->HasPrivate( | |||
| 429 | context, | |||
| 430 | env->untransferable_object_private_symbol()) | |||
| 431 | .To(&untransferable)) { | |||
| 432 | return Nothing<bool>(); | |||
| 433 | } | |||
| 434 | if (untransferable) continue; | |||
| 435 | } | |||
| 436 | ||||
| 437 | // Currently, we support ArrayBuffers and BaseObjects for which | |||
| 438 | // GetTransferMode() does not return kUntransferable. | |||
| 439 | if (entry->IsArrayBuffer()) { | |||
| 440 | Local<ArrayBuffer> ab = entry.As<ArrayBuffer>(); | |||
| 441 | // If we cannot render the ArrayBuffer unusable in this Isolate, | |||
| 442 | // copying the buffer will have to do. | |||
| 443 | // Note that we can currently transfer ArrayBuffers even if they were | |||
| 444 | // not allocated by Node’s ArrayBufferAllocator in the first place, | |||
| 445 | // because we pass the underlying v8::BackingStore around rather than | |||
| 446 | // raw data *and* an Isolate with a non-default ArrayBuffer allocator | |||
| 447 | // is always going to outlive any Workers it creates, and so will its | |||
| 448 | // allocator along with it. | |||
| 449 | if (!ab->IsDetachable()) continue; | |||
| 450 | if (std::find(array_buffers.begin(), array_buffers.end(), ab) != | |||
| 451 | array_buffers.end()) { | |||
| 452 | ThrowDataCloneException( | |||
| 453 | context, | |||
| 454 | FIXED_ONE_BYTE_STRING( | |||
| 455 | env->isolate(), | |||
| 456 | "Transfer list contains duplicate ArrayBuffer")); | |||
| 457 | return Nothing<bool>(); | |||
| 458 | } | |||
| 459 | // We simply use the array index in the `array_buffers` list as the | |||
| 460 | // ID that we write into the serialized buffer. | |||
| 461 | uint32_t id = array_buffers.size(); | |||
| 462 | array_buffers.push_back(ab); | |||
| 463 | serializer.TransferArrayBuffer(id, ab); | |||
| 464 | continue; | |||
| 465 | } else if (env->base_object_ctor_template()->HasInstance(entry)) { | |||
| 466 | // Check if the source MessagePort is being transferred. | |||
| 467 | if (!source_port.IsEmpty() && entry == source_port) { | |||
| 468 | ThrowDataCloneException( | |||
| 469 | context, | |||
| 470 | FIXED_ONE_BYTE_STRING(env->isolate(), | |||
| 471 | "Transfer list contains source port")); | |||
| 472 | return Nothing<bool>(); | |||
| 473 | } | |||
| 474 | BaseObjectPtr<BaseObject> host_object { | |||
| 475 | Unwrap<BaseObject>(entry.As<Object>()) }; | |||
| 476 | if (env->message_port_constructor_template()->HasInstance(entry) && | |||
| 477 | (!host_object || | |||
| 478 | static_cast<MessagePort*>(host_object.get())->IsDetached())) { | |||
| 479 | ThrowDataCloneException( | |||
| 480 | context, | |||
| 481 | FIXED_ONE_BYTE_STRING( | |||
| 482 | env->isolate(), | |||
| 483 | "MessagePort in transfer list is already detached")); | |||
| 484 | return Nothing<bool>(); | |||
| 485 | } | |||
| 486 | if (std::find(delegate.host_objects_.begin(), | |||
| 487 | delegate.host_objects_.end(), | |||
| 488 | host_object) != delegate.host_objects_.end()) { | |||
| 489 | ThrowDataCloneException( | |||
| 490 | context, | |||
| 491 | String::Concat(env->isolate(), | |||
| 492 | FIXED_ONE_BYTE_STRING( | |||
| 493 | env->isolate(), | |||
| 494 | "Transfer list contains duplicate "), | |||
| 495 | entry.As<Object>()->GetConstructorName())); | |||
| 496 | return Nothing<bool>(); | |||
| 497 | } | |||
| 498 | if (host_object && host_object->GetTransferMode() != | |||
| 499 | BaseObject::TransferMode::kUntransferable) { | |||
| 500 | delegate.AddHostObject(host_object); | |||
| 501 | continue; | |||
| 502 | } | |||
| 503 | } | |||
| 504 | ||||
| 505 | THROW_ERR_INVALID_TRANSFER_OBJECT(env); | |||
| 506 | return Nothing<bool>(); | |||
| 507 | } | |||
| 508 | if (delegate.AddNestedHostObjects().IsNothing()) | |||
| 509 | return Nothing<bool>(); | |||
| 510 | ||||
| 511 | serializer.WriteHeader(); | |||
| 512 | if (serializer.WriteValue(context, input).IsNothing()) { | |||
| 513 | return Nothing<bool>(); | |||
| 514 | } | |||
| 515 | ||||
| 516 | for (Local<ArrayBuffer> ab : array_buffers) { | |||
| 517 | // If serialization succeeded, we render it inaccessible in this Isolate. | |||
| 518 | std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore(); | |||
| 519 | ab->Detach(); | |||
| 520 | ||||
| 521 | array_buffers_.emplace_back(std::move(backing_store)); | |||
| 522 | } | |||
| 523 | ||||
| 524 | if (delegate.Finish(context).IsNothing()) | |||
| 525 | return Nothing<bool>(); | |||
| 526 | ||||
| 527 | // The serializer gave us a buffer allocated using `malloc()`. | |||
| 528 | std::pair<uint8_t*, size_t> data = serializer.Release(); | |||
| 529 | CHECK_NOT_NULL(data.first)do { if (__builtin_expect(!!(!((data.first) != nullptr)), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "529", "(data.first) != nullptr", __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 530 | main_message_buf_ = | |||
| 531 | MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second); | |||
| 532 | return Just(true); | |||
| 533 | } | |||
| 534 | ||||
| 535 | void Message::MemoryInfo(MemoryTracker* tracker) const { | |||
| 536 | tracker->TrackField("array_buffers_", array_buffers_); | |||
| 537 | tracker->TrackField("shared_array_buffers", shared_array_buffers_); | |||
| 538 | tracker->TrackField("transferables", transferables_); | |||
| 539 | } | |||
| 540 | ||||
| 541 | MessagePortData::MessagePortData(MessagePort* owner) | |||
| 542 | : owner_(owner) { | |||
| 543 | } | |||
| 544 | ||||
| 545 | MessagePortData::~MessagePortData() { | |||
| 546 | CHECK_NULL(owner_)do { if (__builtin_expect(!!(!((owner_) == nullptr)), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "546", "(owner_) == nullptr", __PRETTY_FUNCTION__ }; node ::Assert(args); } while (0); } } while (0); | |||
| 547 | Disentangle(); | |||
| 548 | } | |||
| 549 | ||||
| 550 | void MessagePortData::MemoryInfo(MemoryTracker* tracker) const { | |||
| 551 | Mutex::ScopedLock lock(mutex_); | |||
| 552 | tracker->TrackField("incoming_messages", incoming_messages_); | |||
| 553 | } | |||
| 554 | ||||
| 555 | void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) { | |||
| 556 | // This function will be called by other threads. | |||
| 557 | Mutex::ScopedLock lock(mutex_); | |||
| 558 | incoming_messages_.emplace_back(std::move(message)); | |||
| 559 | ||||
| 560 | if (owner_ != nullptr) { | |||
| 561 | Debug(owner_, "Adding message to incoming queue"); | |||
| 562 | owner_->TriggerAsync(); | |||
| 563 | } | |||
| 564 | } | |||
| 565 | ||||
| 566 | void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { | |||
| 567 | auto group = std::make_shared<SiblingGroup>(); | |||
| 568 | group->Entangle({a, b}); | |||
| 569 | } | |||
| 570 | ||||
| 571 | void MessagePortData::Disentangle() { | |||
| 572 | if (group_) { | |||
| 573 | group_->Disentangle(this); | |||
| 574 | } | |||
| 575 | } | |||
| 576 | ||||
| 577 | MessagePort::~MessagePort() { | |||
| 578 | if (data_) Detach(); | |||
| 579 | } | |||
| 580 | ||||
| 581 | MessagePort::MessagePort(Environment* env, | |||
| 582 | Local<Context> context, | |||
| 583 | Local<Object> wrap) | |||
| 584 | : HandleWrap(env, | |||
| 585 | wrap, | |||
| 586 | reinterpret_cast<uv_handle_t*>(&async_), | |||
| 587 | AsyncWrap::PROVIDER_MESSAGEPORT), | |||
| 588 | data_(new MessagePortData(this)) { | |||
| 589 | auto onmessage = [](uv_async_t* handle) { | |||
| 590 | // Called when data has been put into the queue. | |||
| 591 | MessagePort* channel = ContainerOf(&MessagePort::async_, handle); | |||
| 592 | channel->OnMessage(MessageProcessingMode::kNormalOperation); | |||
| 593 | }; | |||
| 594 | ||||
| 595 | CHECK_EQ(uv_async_init(env->event_loop(),do { if (__builtin_expect(!!(!((uv_async_init(env->event_loop (), &async_, onmessage)) == (0))), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "597" , "(uv_async_init(env->event_loop(), &async_, onmessage)) == (0)" , __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0) | |||
| 596 | &async_,do { if (__builtin_expect(!!(!((uv_async_init(env->event_loop (), &async_, onmessage)) == (0))), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "597" , "(uv_async_init(env->event_loop(), &async_, onmessage)) == (0)" , __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0) | |||
| 597 | onmessage), 0)do { if (__builtin_expect(!!(!((uv_async_init(env->event_loop (), &async_, onmessage)) == (0))), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "597" , "(uv_async_init(env->event_loop(), &async_, onmessage)) == (0)" , __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 598 | // Reset later to indicate success of the constructor. | |||
| 599 | bool succeeded = false; | |||
| 600 | auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); }); | |||
| 601 | ||||
| 602 | Local<Value> fn; | |||
| 603 | if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn)) | |||
| 604 | return; | |||
| 605 | ||||
| 606 | if (fn->IsFunction()) { | |||
| 607 | Local<Function> init = fn.As<Function>(); | |||
| 608 | if (init->Call(context, wrap, 0, nullptr).IsEmpty()) | |||
| 609 | return; | |||
| 610 | } | |||
| 611 | ||||
| 612 | Local<Function> emit_message_fn; | |||
| 613 | if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn)) | |||
| 614 | return; | |||
| 615 | emit_message_fn_.Reset(env->isolate(), emit_message_fn); | |||
| 616 | ||||
| 617 | succeeded = true; | |||
| 618 | Debug(this, "Created message port"); | |||
| 619 | } | |||
| 620 | ||||
| 621 | bool MessagePort::IsDetached() const { | |||
| 622 | return data_ == nullptr || IsHandleClosing(); | |||
| 623 | } | |||
| 624 | ||||
| 625 | void MessagePort::TriggerAsync() { | |||
| 626 | if (IsHandleClosing()) return; | |||
| 627 | CHECK_EQ(uv_async_send(&async_), 0)do { if (__builtin_expect(!!(!((uv_async_send(&async_)) == (0))), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "627", "(uv_async_send(&async_)) == (0)", __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 628 | } | |||
| 629 | ||||
| 630 | void MessagePort::Close(v8::Local<v8::Value> close_callback) { | |||
| 631 | Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_)); | |||
| 632 | ||||
| 633 | if (data_) { | |||
| 634 | // Wrap this call with accessing the mutex, so that TriggerAsync() | |||
| 635 | // can check IsHandleClosing() without race conditions. | |||
| 636 | Mutex::ScopedLock sibling_lock(data_->mutex_); | |||
| 637 | HandleWrap::Close(close_callback); | |||
| 638 | } else { | |||
| 639 | HandleWrap::Close(close_callback); | |||
| 640 | } | |||
| 641 | } | |||
| 642 | ||||
| 643 | void MessagePort::New(const FunctionCallbackInfo<Value>& args) { | |||
| 644 | // This constructor just throws an error. Unfortunately, we can’t use V8’s | |||
| 645 | // ConstructorBehavior::kThrow, as that also removes the prototype from the | |||
| 646 | // class (i.e. makes it behave like an arrow function). | |||
| 647 | Environment* env = Environment::GetCurrent(args); | |||
| 648 | THROW_ERR_CONSTRUCT_CALL_INVALID(env); | |||
| 649 | } | |||
| 650 | ||||
| 651 | MessagePort* MessagePort::New( | |||
| 652 | Environment* env, | |||
| 653 | Local<Context> context, | |||
| 654 | std::unique_ptr<MessagePortData> data, | |||
| 655 | std::shared_ptr<SiblingGroup> sibling_group) { | |||
| 656 | Context::Scope context_scope(context); | |||
| 657 | Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env); | |||
| 658 | ||||
| 659 | // Construct a new instance, then assign the listener instance and possibly | |||
| 660 | // the MessagePortData to it. | |||
| 661 | Local<Object> instance; | |||
| 662 | if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance)) | |||
| 663 | return nullptr; | |||
| 664 | MessagePort* port = new MessagePort(env, context, instance); | |||
| 665 | CHECK_NOT_NULL(port)do { if (__builtin_expect(!!(!((port) != nullptr)), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "665", "(port) != nullptr", __PRETTY_FUNCTION__ }; node:: Assert(args); } while (0); } } while (0); | |||
| 666 | if (port->IsHandleClosing()) { | |||
| 667 | // Construction failed with an exception. | |||
| 668 | return nullptr; | |||
| ||||
| 669 | } | |||
| 670 | ||||
| 671 | if (data) { | |||
| 672 | CHECK(!sibling_group)do { if (__builtin_expect(!!(!(!sibling_group)), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "672", "!sibling_group", __PRETTY_FUNCTION__ }; node::Assert (args); } while (0); } } while (0); | |||
| 673 | port->Detach(); | |||
| 674 | port->data_ = std::move(data); | |||
| 675 | ||||
| 676 | // This lock is here to avoid race conditions with the `owner_` read | |||
| 677 | // in AddToIncomingQueue(). (This would likely be unproblematic without it, | |||
| 678 | // but it's better to be safe than sorry.) | |||
| 679 | Mutex::ScopedLock lock(port->data_->mutex_); | |||
| 680 | port->data_->owner_ = port; | |||
| 681 | // If the existing MessagePortData object had pending messages, this is | |||
| 682 | // the easiest way to run that queue. | |||
| 683 | port->TriggerAsync(); | |||
| 684 | } else if (sibling_group) { | |||
| 685 | sibling_group->Entangle(port->data_.get()); | |||
| 686 | } | |||
| 687 | return port; | |||
| 688 | } | |||
| 689 | ||||
| 690 | MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context, | |||
| 691 | MessageProcessingMode mode, | |||
| 692 | Local<Value>* port_list) { | |||
| 693 | std::shared_ptr<Message> received; | |||
| 694 | { | |||
| 695 | // Get the head of the message queue. | |||
| 696 | Mutex::ScopedLock lock(data_->mutex_); | |||
| 697 | ||||
| 698 | Debug(this, "MessagePort has message"); | |||
| 699 | ||||
| 700 | bool wants_message = | |||
| 701 | receiving_messages_ || | |||
| 702 | mode == MessageProcessingMode::kForceReadMessages; | |||
| 703 | // We have nothing to do if: | |||
| 704 | // - There are no pending messages | |||
| 705 | // - We are not intending to receive messages, and the message we would | |||
| 706 | // receive is not the final "close" message. | |||
| 707 | if (data_->incoming_messages_.empty() || | |||
| 708 | (!wants_message && | |||
| 709 | !data_->incoming_messages_.front()->IsCloseMessage())) { | |||
| 710 | return env()->no_message_symbol(); | |||
| 711 | } | |||
| 712 | ||||
| 713 | received = data_->incoming_messages_.front(); | |||
| 714 | data_->incoming_messages_.pop_front(); | |||
| 715 | } | |||
| 716 | ||||
| 717 | if (received->IsCloseMessage()) { | |||
| 718 | Close(); | |||
| 719 | return env()->no_message_symbol(); | |||
| 720 | } | |||
| 721 | ||||
| 722 | if (!env()->can_call_into_js()) return MaybeLocal<Value>(); | |||
| 723 | ||||
| 724 | return received->Deserialize(env(), context, port_list); | |||
| 725 | } | |||
| 726 | ||||
| 727 | void MessagePort::OnMessage(MessageProcessingMode mode) { | |||
| 728 | Debug(this, "Running MessagePort::OnMessage()"); | |||
| 729 | HandleScope handle_scope(env()->isolate()); | |||
| 730 | Local<Context> context = | |||
| 731 | object(env()->isolate())->GetCreationContext().ToLocalChecked(); | |||
| 732 | ||||
| 733 | size_t processing_limit; | |||
| 734 | if (mode == MessageProcessingMode::kNormalOperation) { | |||
| 735 | Mutex::ScopedLock(data_->mutex_); | |||
| 736 | processing_limit = std::max(data_->incoming_messages_.size(), | |||
| 737 | static_cast<size_t>(1000)); | |||
| 738 | } else { | |||
| 739 | processing_limit = std::numeric_limits<size_t>::max(); | |||
| 740 | } | |||
| 741 | ||||
| 742 | // data_ can only ever be modified by the owner thread, so no need to lock. | |||
| 743 | // However, the message port may be transferred while it is processing | |||
| 744 | // messages, so we need to check that this handle still owns its `data_` field | |||
| 745 | // on every iteration. | |||
| 746 | while (data_) { | |||
| 747 | if (processing_limit-- == 0) { | |||
| 748 | // Prevent event loop starvation by only processing those messages without | |||
| 749 | // interruption that were already present when the OnMessage() call was | |||
| 750 | // first triggered, but at least 1000 messages because otherwise the | |||
| 751 | // overhead of repeatedly triggering the uv_async_t instance becomes | |||
| 752 | // noticeable, at least on Windows. | |||
| 753 | // (That might require more investigation by somebody more familiar with | |||
| 754 | // Windows.) | |||
| 755 | TriggerAsync(); | |||
| 756 | return; | |||
| 757 | } | |||
| 758 | ||||
| 759 | HandleScope handle_scope(env()->isolate()); | |||
| 760 | Context::Scope context_scope(context); | |||
| 761 | Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_); | |||
| 762 | ||||
| 763 | Local<Value> payload; | |||
| 764 | Local<Value> port_list = Undefined(env()->isolate()); | |||
| 765 | Local<Value> message_error; | |||
| 766 | Local<Value> argv[3]; | |||
| 767 | ||||
| 768 | { | |||
| 769 | // Catch any exceptions from parsing the message itself (not from | |||
| 770 | // emitting it) as 'messageeror' events. | |||
| 771 | TryCatchScope try_catch(env()); | |||
| 772 | if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) { | |||
| 773 | if (try_catch.HasCaught() && !try_catch.HasTerminated()) | |||
| 774 | message_error = try_catch.Exception(); | |||
| 775 | goto reschedule; | |||
| 776 | } | |||
| 777 | } | |||
| 778 | if (payload == env()->no_message_symbol()) break; | |||
| 779 | ||||
| 780 | if (!env()->can_call_into_js()) { | |||
| 781 | Debug(this, "MessagePort drains queue because !can_call_into_js()"); | |||
| 782 | // In this case there is nothing to do but to drain the current queue. | |||
| 783 | continue; | |||
| 784 | } | |||
| 785 | ||||
| 786 | argv[0] = payload; | |||
| 787 | argv[1] = port_list; | |||
| 788 | argv[2] = env()->message_string(); | |||
| 789 | ||||
| 790 | if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) { | |||
| 791 | reschedule: | |||
| 792 | if (!message_error.IsEmpty()) { | |||
| 793 | argv[0] = message_error; | |||
| 794 | argv[1] = Undefined(env()->isolate()); | |||
| 795 | argv[2] = env()->messageerror_string(); | |||
| 796 | USE(MakeCallback(emit_message, arraysize(argv), argv)); | |||
| 797 | } | |||
| 798 | ||||
| 799 | // Re-schedule OnMessage() execution in case of failure. | |||
| 800 | if (data_) | |||
| 801 | TriggerAsync(); | |||
| 802 | return; | |||
| 803 | } | |||
| 804 | } | |||
| 805 | } | |||
| 806 | ||||
| 807 | void MessagePort::OnClose() { | |||
| 808 | Debug(this, "MessagePort::OnClose()"); | |||
| 809 | if (data_) { | |||
| 810 | // Detach() returns move(data_). | |||
| 811 | Detach()->Disentangle(); | |||
| 812 | } | |||
| 813 | } | |||
| 814 | ||||
| 815 | std::unique_ptr<MessagePortData> MessagePort::Detach() { | |||
| 816 | CHECK(data_)do { if (__builtin_expect(!!(!(data_)), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "816" , "data_", __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 817 | Mutex::ScopedLock lock(data_->mutex_); | |||
| 818 | data_->owner_ = nullptr; | |||
| 819 | return std::move(data_); | |||
| 820 | } | |||
| 821 | ||||
| 822 | BaseObject::TransferMode MessagePort::GetTransferMode() const { | |||
| 823 | if (IsDetached()) | |||
| 824 | return BaseObject::TransferMode::kUntransferable; | |||
| 825 | return BaseObject::TransferMode::kTransferable; | |||
| 826 | } | |||
| 827 | ||||
| 828 | std::unique_ptr<TransferData> MessagePort::TransferForMessaging() { | |||
| 829 | Close(); | |||
| 830 | return Detach(); | |||
| 831 | } | |||
| 832 | ||||
| 833 | BaseObjectPtr<BaseObject> MessagePortData::Deserialize( | |||
| 834 | Environment* env, | |||
| 835 | Local<Context> context, | |||
| 836 | std::unique_ptr<TransferData> self) { | |||
| 837 | return BaseObjectPtr<MessagePort> { MessagePort::New( | |||
| 838 | env, context, | |||
| 839 | static_unique_pointer_cast<MessagePortData>(std::move(self))) }; | |||
| 840 | } | |||
| 841 | ||||
| 842 | Maybe<bool> MessagePort::PostMessage(Environment* env, | |||
| 843 | Local<Context> context, | |||
| 844 | Local<Value> message_v, | |||
| 845 | const TransferList& transfer_v) { | |||
| 846 | Isolate* isolate = env->isolate(); | |||
| 847 | Local<Object> obj = object(isolate); | |||
| 848 | ||||
| 849 | std::shared_ptr<Message> msg = std::make_shared<Message>(); | |||
| 850 | ||||
| 851 | // Per spec, we need to both check if transfer list has the source port, and | |||
| 852 | // serialize the input message, even if the MessagePort is closed or detached. | |||
| 853 | ||||
| 854 | Maybe<bool> serialization_maybe = | |||
| 855 | msg->Serialize(env, context, message_v, transfer_v, obj); | |||
| 856 | if (data_ == nullptr) { | |||
| 857 | return serialization_maybe; | |||
| 858 | } | |||
| 859 | if (serialization_maybe.IsNothing()) { | |||
| 860 | return Nothing<bool>(); | |||
| 861 | } | |||
| 862 | ||||
| 863 | std::string error; | |||
| 864 | Maybe<bool> res = data_->Dispatch(msg, &error); | |||
| 865 | if (res.IsNothing()) | |||
| 866 | return res; | |||
| 867 | ||||
| 868 | if (!error.empty()) | |||
| 869 | ProcessEmitWarning(env, error.c_str()); | |||
| 870 | ||||
| 871 | return res; | |||
| 872 | } | |||
| 873 | ||||
| 874 | Maybe<bool> MessagePortData::Dispatch( | |||
| 875 | std::shared_ptr<Message> message, | |||
| 876 | std::string* error) { | |||
| 877 | if (!group_) { | |||
| 878 | if (error != nullptr) | |||
| 879 | *error = "MessagePortData is not entangled."; | |||
| 880 | return Nothing<bool>(); | |||
| 881 | } | |||
| 882 | return group_->Dispatch(this, message, error); | |||
| 883 | } | |||
| 884 | ||||
| 885 | static Maybe<bool> ReadIterable(Environment* env, | |||
| 886 | Local<Context> context, | |||
| 887 | // NOLINTNEXTLINE(runtime/references) | |||
| 888 | TransferList& transfer_list, | |||
| 889 | Local<Value> object) { | |||
| 890 | if (!object->IsObject()) return Just(false); | |||
| 891 | ||||
| 892 | if (object->IsArray()) { | |||
| 893 | Local<Array> arr = object.As<Array>(); | |||
| 894 | size_t length = arr->Length(); | |||
| 895 | transfer_list.AllocateSufficientStorage(length); | |||
| 896 | for (size_t i = 0; i < length; i++) { | |||
| 897 | if (!arr->Get(context, i).ToLocal(&transfer_list[i])) | |||
| 898 | return Nothing<bool>(); | |||
| 899 | } | |||
| 900 | return Just(true); | |||
| 901 | } | |||
| 902 | ||||
| 903 | Isolate* isolate = env->isolate(); | |||
| 904 | Local<Value> iterator_method; | |||
| 905 | if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate)) | |||
| 906 | .ToLocal(&iterator_method)) return Nothing<bool>(); | |||
| 907 | if (!iterator_method->IsFunction()) return Just(false); | |||
| 908 | ||||
| 909 | Local<Value> iterator; | |||
| 910 | if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr) | |||
| 911 | .ToLocal(&iterator)) return Nothing<bool>(); | |||
| 912 | if (!iterator->IsObject()) return Just(false); | |||
| 913 | ||||
| 914 | Local<Value> next; | |||
| 915 | if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next)) | |||
| 916 | return Nothing<bool>(); | |||
| 917 | if (!next->IsFunction()) return Just(false); | |||
| 918 | ||||
| 919 | std::vector<Local<Value>> entries; | |||
| 920 | while (env->can_call_into_js()) { | |||
| 921 | Local<Value> result; | |||
| 922 | if (!next.As<Function>()->Call(context, iterator, 0, nullptr) | |||
| 923 | .ToLocal(&result)) return Nothing<bool>(); | |||
| 924 | if (!result->IsObject()) return Just(false); | |||
| 925 | ||||
| 926 | Local<Value> done; | |||
| 927 | if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done)) | |||
| 928 | return Nothing<bool>(); | |||
| 929 | if (done->BooleanValue(isolate)) break; | |||
| 930 | ||||
| 931 | Local<Value> val; | |||
| 932 | if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val)) | |||
| 933 | return Nothing<bool>(); | |||
| 934 | entries.push_back(val); | |||
| 935 | } | |||
| 936 | ||||
| 937 | transfer_list.AllocateSufficientStorage(entries.size()); | |||
| 938 | std::copy(entries.begin(), entries.end(), &transfer_list[0]); | |||
| 939 | return Just(true); | |||
| 940 | } | |||
| 941 | ||||
| 942 | void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) { | |||
| 943 | Environment* env = Environment::GetCurrent(args); | |||
| 944 | Local<Object> obj = args.This(); | |||
| 945 | Local<Context> context = obj->GetCreationContext().ToLocalChecked(); | |||
| 946 | ||||
| 947 | if (args.Length() == 0) { | |||
| 948 | return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to " | |||
| 949 | "MessagePort.postMessage"); | |||
| 950 | } | |||
| 951 | ||||
| 952 | if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) { | |||
| 953 | // Browsers ignore null or undefined, and otherwise accept an array or an | |||
| 954 | // options object. | |||
| 955 | return THROW_ERR_INVALID_ARG_TYPE(env, | |||
| 956 | "Optional transferList argument must be an iterable"); | |||
| 957 | } | |||
| 958 | ||||
| 959 | TransferList transfer_list; | |||
| 960 | if (args[1]->IsObject()) { | |||
| 961 | bool was_iterable; | |||
| 962 | if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable)) | |||
| 963 | return; | |||
| 964 | if (!was_iterable) { | |||
| 965 | Local<Value> transfer_option; | |||
| 966 | if (!args[1].As<Object>()->Get(context, env->transfer_string()) | |||
| 967 | .ToLocal(&transfer_option)) return; | |||
| 968 | if (!transfer_option->IsUndefined()) { | |||
| 969 | if (!ReadIterable(env, context, transfer_list, transfer_option) | |||
| 970 | .To(&was_iterable)) return; | |||
| 971 | if (!was_iterable) { | |||
| 972 | return THROW_ERR_INVALID_ARG_TYPE(env, | |||
| 973 | "Optional options.transfer argument must be an iterable"); | |||
| 974 | } | |||
| 975 | } | |||
| 976 | } | |||
| 977 | } | |||
| 978 | ||||
| 979 | MessagePort* port = Unwrap<MessagePort>(args.This()); | |||
| 980 | // Even if the backing MessagePort object has already been deleted, we still | |||
| 981 | // want to serialize the message to ensure spec-compliant behavior w.r.t. | |||
| 982 | // transfers. | |||
| 983 | if (port == nullptr || port->IsHandleClosing()) { | |||
| 984 | Message msg; | |||
| 985 | USE(msg.Serialize(env, context, args[0], transfer_list, obj)); | |||
| 986 | return; | |||
| 987 | } | |||
| 988 | ||||
| 989 | Maybe<bool> res = port->PostMessage(env, context, args[0], transfer_list); | |||
| 990 | if (res.IsJust()) | |||
| 991 | args.GetReturnValue().Set(res.FromJust()); | |||
| 992 | } | |||
| 993 | ||||
| 994 | void MessagePort::Start() { | |||
| 995 | Debug(this, "Start receiving messages"); | |||
| 996 | receiving_messages_ = true; | |||
| 997 | Mutex::ScopedLock lock(data_->mutex_); | |||
| 998 | if (!data_->incoming_messages_.empty()) | |||
| 999 | TriggerAsync(); | |||
| 1000 | } | |||
| 1001 | ||||
| 1002 | void MessagePort::Stop() { | |||
| 1003 | Debug(this, "Stop receiving messages"); | |||
| 1004 | receiving_messages_ = false; | |||
| 1005 | } | |||
| 1006 | ||||
| 1007 | void MessagePort::Start(const FunctionCallbackInfo<Value>& args) { | |||
| 1008 | MessagePort* port; | |||
| 1009 | ASSIGN_OR_RETURN_UNWRAP(&port, args.This())do { *&port = static_cast<typename std::remove_reference <decltype(*&port)>::type>( BaseObject::FromJSObject (args.This())); if (*&port == nullptr) return ; } while ( 0); | |||
| 1010 | if (!port->data_) { | |||
| 1011 | return; | |||
| 1012 | } | |||
| 1013 | port->Start(); | |||
| 1014 | } | |||
| 1015 | ||||
| 1016 | void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) { | |||
| 1017 | MessagePort* port; | |||
| 1018 | CHECK(args[0]->IsObject())do { if (__builtin_expect(!!(!(args[0]->IsObject())), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "1018", "args[0]->IsObject()", __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 1019 | ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>())do { *&port = static_cast<typename std::remove_reference <decltype(*&port)>::type>( BaseObject::FromJSObject (args[0].As<Object>())); if (*&port == nullptr) return ; } while (0); | |||
| 1020 | if (!port->data_) { | |||
| 1021 | return; | |||
| 1022 | } | |||
| 1023 | port->Stop(); | |||
| 1024 | } | |||
| 1025 | ||||
| 1026 | void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) { | |||
| 1027 | Environment* env = Environment::GetCurrent(args); | |||
| 1028 | args.GetReturnValue().Set( | |||
| 1029 | GetMessagePortConstructorTemplate(env)->HasInstance(args[0])); | |||
| 1030 | } | |||
| 1031 | ||||
| 1032 | void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) { | |||
| 1033 | MessagePort* port; | |||
| 1034 | ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>())do { *&port = static_cast<typename std::remove_reference <decltype(*&port)>::type>( BaseObject::FromJSObject (args[0].As<Object>())); if (*&port == nullptr) return ; } while (0); | |||
| 1035 | port->OnMessage(MessageProcessingMode::kForceReadMessages); | |||
| 1036 | } | |||
| 1037 | ||||
| 1038 | void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) { | |||
| 1039 | Environment* env = Environment::GetCurrent(args); | |||
| 1040 | if (!args[0]->IsObject() || | |||
| 1041 | !env->message_port_constructor_template()->HasInstance(args[0])) { | |||
| 1042 | return THROW_ERR_INVALID_ARG_TYPE(env, | |||
| 1043 | "The \"port\" argument must be a MessagePort instance"); | |||
| 1044 | } | |||
| 1045 | MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>()); | |||
| 1046 | if (port == nullptr) { | |||
| 1047 | // Return 'no messages' for a closed port. | |||
| 1048 | args.GetReturnValue().Set( | |||
| 1049 | Environment::GetCurrent(args)->no_message_symbol()); | |||
| 1050 | return; | |||
| 1051 | } | |||
| 1052 | ||||
| 1053 | MaybeLocal<Value> payload = port->ReceiveMessage( | |||
| 1054 | port->object()->GetCreationContext().ToLocalChecked(), | |||
| 1055 | MessageProcessingMode::kForceReadMessages); | |||
| 1056 | if (!payload.IsEmpty()) | |||
| 1057 | args.GetReturnValue().Set(payload.ToLocalChecked()); | |||
| 1058 | } | |||
| 1059 | ||||
| 1060 | void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) { | |||
| 1061 | Environment* env = Environment::GetCurrent(args); | |||
| 1062 | if (!args[0]->IsObject() || | |||
| 1063 | !env->message_port_constructor_template()->HasInstance(args[0])) { | |||
| 1064 | return THROW_ERR_INVALID_ARG_TYPE(env, | |||
| 1065 | "The \"port\" argument must be a MessagePort instance"); | |||
| 1066 | } | |||
| 1067 | MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>()); | |||
| 1068 | if (port == nullptr || port->IsHandleClosing()) { | |||
| 1069 | Isolate* isolate = env->isolate(); | |||
| 1070 | THROW_ERR_CLOSED_MESSAGE_PORT(isolate); | |||
| 1071 | return; | |||
| 1072 | } | |||
| 1073 | ||||
| 1074 | Local<Value> context_arg = args[1]; | |||
| 1075 | ContextifyContext* context_wrapper; | |||
| 1076 | if (!context_arg->IsObject() || | |||
| 1077 | (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox( | |||
| 1078 | env, context_arg.As<Object>())) == nullptr) { | |||
| 1079 | return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument"); | |||
| 1080 | } | |||
| 1081 | ||||
| 1082 | std::unique_ptr<MessagePortData> data; | |||
| 1083 | if (!port->IsDetached()) | |||
| 1084 | data = port->Detach(); | |||
| 1085 | ||||
| 1086 | Context::Scope context_scope(context_wrapper->context()); | |||
| 1087 | MessagePort* target = | |||
| 1088 | MessagePort::New(env, context_wrapper->context(), std::move(data)); | |||
| 1089 | if (target != nullptr) | |||
| 1090 | args.GetReturnValue().Set(target->object()); | |||
| 1091 | } | |||
| 1092 | ||||
| 1093 | void MessagePort::Entangle(MessagePort* a, MessagePort* b) { | |||
| 1094 | MessagePortData::Entangle(a->data_.get(), b->data_.get()); | |||
| 1095 | } | |||
| 1096 | ||||
| 1097 | void MessagePort::Entangle(MessagePort* a, MessagePortData* b) { | |||
| 1098 | MessagePortData::Entangle(a->data_.get(), b); | |||
| 1099 | } | |||
| 1100 | ||||
| 1101 | void MessagePort::MemoryInfo(MemoryTracker* tracker) const { | |||
| 1102 | tracker->TrackField("data", data_); | |||
| 1103 | tracker->TrackField("emit_message_fn", emit_message_fn_); | |||
| 1104 | } | |||
| 1105 | ||||
| 1106 | Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) { | |||
| 1107 | // Factor generating the MessagePort JS constructor into its own piece | |||
| 1108 | // of code, because it is needed early on in the child environment setup. | |||
| 1109 | Local<FunctionTemplate> templ = env->message_port_constructor_template(); | |||
| 1110 | if (!templ.IsEmpty()) | |||
| 1111 | return templ; | |||
| 1112 | ||||
| 1113 | { | |||
| 1114 | Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New); | |||
| 1115 | m->SetClassName(env->message_port_constructor_string()); | |||
| 1116 | m->InstanceTemplate()->SetInternalFieldCount( | |||
| 1117 | MessagePort::kInternalFieldCount); | |||
| 1118 | m->Inherit(HandleWrap::GetConstructorTemplate(env)); | |||
| 1119 | ||||
| 1120 | env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage); | |||
| 1121 | env->SetProtoMethod(m, "start", MessagePort::Start); | |||
| 1122 | ||||
| 1123 | env->set_message_port_constructor_template(m); | |||
| 1124 | } | |||
| 1125 | ||||
| 1126 | return GetMessagePortConstructorTemplate(env); | |||
| 1127 | } | |||
| 1128 | ||||
| 1129 | JSTransferable::JSTransferable(Environment* env, Local<Object> obj) | |||
| 1130 | : BaseObject(env, obj) { | |||
| 1131 | MakeWeak(); | |||
| 1132 | } | |||
| 1133 | ||||
| 1134 | void JSTransferable::New(const FunctionCallbackInfo<Value>& args) { | |||
| 1135 | CHECK(args.IsConstructCall())do { if (__builtin_expect(!!(!(args.IsConstructCall())), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "1135", "args.IsConstructCall()", __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 1136 | new JSTransferable(Environment::GetCurrent(args), args.This()); | |||
| 1137 | } | |||
| 1138 | ||||
| 1139 | JSTransferable::TransferMode JSTransferable::GetTransferMode() const { | |||
| 1140 | // Implement `kClone in this ? kCloneable : kTransferable`. | |||
| 1141 | HandleScope handle_scope(env()->isolate()); | |||
| 1142 | errors::TryCatchScope ignore_exceptions(env()); | |||
| 1143 | ||||
| 1144 | bool has_clone; | |||
| 1145 | if (!object()->Has(env()->context(), | |||
| 1146 | env()->messaging_clone_symbol()).To(&has_clone)) { | |||
| 1147 | return TransferMode::kUntransferable; | |||
| 1148 | } | |||
| 1149 | ||||
| 1150 | return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable; | |||
| 1151 | } | |||
| 1152 | ||||
| 1153 | std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() { | |||
| 1154 | return TransferOrClone(TransferMode::kTransferable); | |||
| 1155 | } | |||
| 1156 | ||||
| 1157 | std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const { | |||
| 1158 | return TransferOrClone(TransferMode::kCloneable); | |||
| 1159 | } | |||
| 1160 | ||||
| 1161 | std::unique_ptr<TransferData> JSTransferable::TransferOrClone( | |||
| 1162 | TransferMode mode) const { | |||
| 1163 | // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`, | |||
| 1164 | // which should return an object with `data` and `deserializeInfo` properties; | |||
| 1165 | // `data` is written to the serializer later, and `deserializeInfo` is stored | |||
| 1166 | // on the `TransferData` instance as a string. | |||
| 1167 | HandleScope handle_scope(env()->isolate()); | |||
| 1168 | Local<Context> context = env()->isolate()->GetCurrentContext(); | |||
| 1169 | Local<Symbol> method_name = mode == TransferMode::kCloneable ? | |||
| 1170 | env()->messaging_clone_symbol() : env()->messaging_transfer_symbol(); | |||
| 1171 | ||||
| 1172 | Local<Value> method; | |||
| 1173 | if (!object()->Get(context, method_name).ToLocal(&method)) { | |||
| 1174 | return {}; | |||
| 1175 | } | |||
| 1176 | if (method->IsFunction()) { | |||
| 1177 | Local<Value> result_v; | |||
| 1178 | if (!method.As<Function>()->Call( | |||
| 1179 | context, object(), 0, nullptr).ToLocal(&result_v)) { | |||
| 1180 | return {}; | |||
| 1181 | } | |||
| 1182 | ||||
| 1183 | if (result_v->IsObject()) { | |||
| 1184 | Local<Object> result = result_v.As<Object>(); | |||
| 1185 | Local<Value> data; | |||
| 1186 | Local<Value> deserialize_info; | |||
| 1187 | if (!result->Get(context, env()->data_string()).ToLocal(&data) || | |||
| 1188 | !result->Get(context, env()->deserialize_info_string()) | |||
| 1189 | .ToLocal(&deserialize_info)) { | |||
| 1190 | return {}; | |||
| 1191 | } | |||
| 1192 | Utf8Value deserialize_info_str(env()->isolate(), deserialize_info); | |||
| 1193 | if (*deserialize_info_str == nullptr) return {}; | |||
| 1194 | return std::make_unique<Data>( | |||
| 1195 | *deserialize_info_str, Global<Value>(env()->isolate(), data)); | |||
| 1196 | } | |||
| 1197 | } | |||
| 1198 | ||||
| 1199 | if (mode == TransferMode::kTransferable) | |||
| 1200 | return TransferOrClone(TransferMode::kCloneable); | |||
| 1201 | else | |||
| 1202 | return {}; | |||
| 1203 | } | |||
| 1204 | ||||
| 1205 | Maybe<BaseObjectList> | |||
| 1206 | JSTransferable::NestedTransferables() const { | |||
| 1207 | // Call `this[kTransferList]()` and return the resulting list of BaseObjects. | |||
| 1208 | HandleScope handle_scope(env()->isolate()); | |||
| 1209 | Local<Context> context = env()->isolate()->GetCurrentContext(); | |||
| 1210 | Local<Symbol> method_name = env()->messaging_transfer_list_symbol(); | |||
| 1211 | ||||
| 1212 | Local<Value> method; | |||
| 1213 | if (!object()->Get(context, method_name).ToLocal(&method)) { | |||
| 1214 | return Nothing<BaseObjectList>(); | |||
| 1215 | } | |||
| 1216 | if (!method->IsFunction()) return Just(BaseObjectList {}); | |||
| 1217 | ||||
| 1218 | Local<Value> list_v; | |||
| 1219 | if (!method.As<Function>()->Call( | |||
| 1220 | context, object(), 0, nullptr).ToLocal(&list_v)) { | |||
| 1221 | return Nothing<BaseObjectList>(); | |||
| 1222 | } | |||
| 1223 | if (!list_v->IsArray()) return Just(BaseObjectList {}); | |||
| 1224 | Local<Array> list = list_v.As<Array>(); | |||
| 1225 | ||||
| 1226 | BaseObjectList ret; | |||
| 1227 | for (size_t i = 0; i < list->Length(); i++) { | |||
| 1228 | Local<Value> value; | |||
| 1229 | if (!list->Get(context, i).ToLocal(&value)) | |||
| 1230 | return Nothing<BaseObjectList>(); | |||
| 1231 | if (env()->base_object_ctor_template()->HasInstance(value)) | |||
| 1232 | ret.emplace_back(Unwrap<BaseObject>(value)); | |||
| 1233 | } | |||
| 1234 | return Just(ret); | |||
| 1235 | } | |||
| 1236 | ||||
| 1237 | Maybe<bool> JSTransferable::FinalizeTransferRead( | |||
| 1238 | Local<Context> context, ValueDeserializer* deserializer) { | |||
| 1239 | // Call `this[kDeserialize](data)` where `data` comes from the return value | |||
| 1240 | // of `this[kTransfer]()` or `this[kClone]()`. | |||
| 1241 | HandleScope handle_scope(env()->isolate()); | |||
| 1242 | Local<Value> data; | |||
| 1243 | if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>(); | |||
| 1244 | ||||
| 1245 | Local<Symbol> method_name = env()->messaging_deserialize_symbol(); | |||
| 1246 | Local<Value> method; | |||
| 1247 | if (!object()->Get(context, method_name).ToLocal(&method)) { | |||
| 1248 | return Nothing<bool>(); | |||
| 1249 | } | |||
| 1250 | if (!method->IsFunction()) return Just(true); | |||
| 1251 | ||||
| 1252 | if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) { | |||
| 1253 | return Nothing<bool>(); | |||
| 1254 | } | |||
| 1255 | return Just(true); | |||
| 1256 | } | |||
| 1257 | ||||
| 1258 | JSTransferable::Data::Data(std::string&& deserialize_info, | |||
| 1259 | v8::Global<v8::Value>&& data) | |||
| 1260 | : deserialize_info_(std::move(deserialize_info)), | |||
| 1261 | data_(std::move(data)) {} | |||
| 1262 | ||||
| 1263 | BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize( | |||
| 1264 | Environment* env, | |||
| 1265 | Local<Context> context, | |||
| 1266 | std::unique_ptr<TransferData> self) { | |||
| 1267 | // Create the JS wrapper object that will later be filled with data passed to | |||
| 1268 | // the `[kDeserialize]()` method on it. This split is necessary, because here | |||
| 1269 | // we need to create an object with the right prototype and internal fields, | |||
| 1270 | // but the actual JS data stored in the serialized data can only be read at | |||
| 1271 | // the end of the stream, after the main message has been read. | |||
| 1272 | ||||
| 1273 | if (context != env->context()) { | |||
| 1274 | THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env); | |||
| 1275 | return {}; | |||
| 1276 | } | |||
| 1277 | HandleScope handle_scope(env->isolate()); | |||
| 1278 | Local<Value> info; | |||
| 1279 | if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {}; | |||
| 1280 | ||||
| 1281 | Local<Value> ret; | |||
| 1282 | CHECK(!env->messaging_deserialize_create_object().IsEmpty())do { if (__builtin_expect(!!(!(!env->messaging_deserialize_create_object ().IsEmpty())), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "1282", "!env->messaging_deserialize_create_object().IsEmpty()" , __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 1283 | if (!env->messaging_deserialize_create_object()->Call( | |||
| 1284 | context, Null(env->isolate()), 1, &info).ToLocal(&ret) || | |||
| 1285 | !env->base_object_ctor_template()->HasInstance(ret)) { | |||
| 1286 | return {}; | |||
| 1287 | } | |||
| 1288 | ||||
| 1289 | return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) }; | |||
| 1290 | } | |||
| 1291 | ||||
| 1292 | Maybe<bool> JSTransferable::Data::FinalizeTransferWrite( | |||
| 1293 | Local<Context> context, ValueSerializer* serializer) { | |||
| 1294 | HandleScope handle_scope(context->GetIsolate()); | |||
| 1295 | auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_)); | |||
| 1296 | data_.Reset(); | |||
| 1297 | return ret; | |||
| 1298 | } | |||
| 1299 | ||||
| 1300 | std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) { | |||
| 1301 | Mutex::ScopedLock lock(SiblingGroup::groups_mutex_); | |||
| 1302 | std::shared_ptr<SiblingGroup> group; | |||
| 1303 | auto i = groups_.find(name); | |||
| 1304 | if (i == groups_.end() || i->second.expired()) { | |||
| 1305 | group = std::make_shared<SiblingGroup>(name); | |||
| 1306 | groups_[name] = group; | |||
| 1307 | } else { | |||
| 1308 | group = i->second.lock(); | |||
| 1309 | } | |||
| 1310 | return group; | |||
| 1311 | } | |||
| 1312 | ||||
| 1313 | void SiblingGroup::CheckSiblingGroup(const std::string& name) { | |||
| 1314 | Mutex::ScopedLock lock(SiblingGroup::groups_mutex_); | |||
| 1315 | auto i = groups_.find(name); | |||
| 1316 | if (i != groups_.end() && i->second.expired()) | |||
| 1317 | groups_.erase(name); | |||
| 1318 | } | |||
| 1319 | ||||
| 1320 | SiblingGroup::SiblingGroup(const std::string& name) | |||
| 1321 | : name_(name) { } | |||
| 1322 | ||||
| 1323 | SiblingGroup::~SiblingGroup() { | |||
| 1324 | // If this is a named group, check to see if we can remove the group | |||
| 1325 | if (!name_.empty()) | |||
| 1326 | CheckSiblingGroup(name_); | |||
| 1327 | } | |||
| 1328 | ||||
| 1329 | Maybe<bool> SiblingGroup::Dispatch( | |||
| 1330 | MessagePortData* source, | |||
| 1331 | std::shared_ptr<Message> message, | |||
| 1332 | std::string* error) { | |||
| 1333 | ||||
| 1334 | RwLock::ScopedReadLock lock(group_mutex_); | |||
| 1335 | ||||
| 1336 | // The source MessagePortData is not part of this group. | |||
| 1337 | if (ports_.find(source) == ports_.end()) { | |||
| 1338 | if (error != nullptr) | |||
| 1339 | *error = "Source MessagePort is not entangled with this group."; | |||
| 1340 | return Nothing<bool>(); | |||
| 1341 | } | |||
| 1342 | ||||
| 1343 | // There are no destination ports. | |||
| 1344 | if (size() <= 1) | |||
| 1345 | return Just(false); | |||
| 1346 | ||||
| 1347 | // Transferables cannot be used when there is more | |||
| 1348 | // than a single destination. | |||
| 1349 | if (size() > 2 && message->has_transferables()) { | |||
| 1350 | if (error != nullptr) | |||
| 1351 | *error = "Transferables cannot be used with multiple destinations."; | |||
| 1352 | return Nothing<bool>(); | |||
| 1353 | } | |||
| 1354 | ||||
| 1355 | for (MessagePortData* port : ports_) { | |||
| 1356 | if (port == source) | |||
| 1357 | continue; | |||
| 1358 | // This loop should only be entered if there's only a single destination | |||
| 1359 | for (const auto& transferable : message->transferables()) { | |||
| 1360 | if (port == transferable.get()) { | |||
| 1361 | if (error != nullptr) { | |||
| 1362 | *error = "The target port was posted to itself, and the " | |||
| 1363 | "communication channel was lost"; | |||
| 1364 | } | |||
| 1365 | return Just(true); | |||
| 1366 | } | |||
| 1367 | } | |||
| 1368 | port->AddToIncomingQueue(message); | |||
| 1369 | } | |||
| 1370 | ||||
| 1371 | return Just(true); | |||
| 1372 | } | |||
| 1373 | ||||
| 1374 | void SiblingGroup::Entangle(MessagePortData* port) { | |||
| 1375 | Entangle({ port }); | |||
| 1376 | } | |||
| 1377 | ||||
| 1378 | void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) { | |||
| 1379 | RwLock::ScopedWriteLock lock(group_mutex_); | |||
| 1380 | for (MessagePortData* data : ports) { | |||
| 1381 | ports_.insert(data); | |||
| 1382 | CHECK(!data->group_)do { if (__builtin_expect(!!(!(!data->group_)), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "1382", "!data->group_", __PRETTY_FUNCTION__ }; node:: Assert(args); } while (0); } } while (0); | |||
| 1383 | data->group_ = shared_from_this(); | |||
| 1384 | } | |||
| 1385 | } | |||
| 1386 | ||||
| 1387 | void SiblingGroup::Disentangle(MessagePortData* data) { | |||
| 1388 | auto self = shared_from_this(); // Keep alive until end of function. | |||
| 1389 | RwLock::ScopedWriteLock lock(group_mutex_); | |||
| 1390 | ports_.erase(data); | |||
| 1391 | data->group_.reset(); | |||
| 1392 | ||||
| 1393 | data->AddToIncomingQueue(std::make_shared<Message>()); | |||
| 1394 | // If this is an anonymous group and there's another port, close it. | |||
| 1395 | if (size() == 1 && name_.empty()) | |||
| 1396 | (*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>()); | |||
| 1397 | } | |||
| 1398 | ||||
| 1399 | SiblingGroup::Map SiblingGroup::groups_; | |||
| 1400 | Mutex SiblingGroup::groups_mutex_; | |||
| 1401 | ||||
| 1402 | namespace { | |||
| 1403 | ||||
| 1404 | static void SetDeserializerCreateObjectFunction( | |||
| 1405 | const FunctionCallbackInfo<Value>& args) { | |||
| 1406 | Environment* env = Environment::GetCurrent(args); | |||
| 1407 | CHECK(args[0]->IsFunction())do { if (__builtin_expect(!!(!(args[0]->IsFunction())), 0) ) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "1407", "args[0]->IsFunction()", __PRETTY_FUNCTION__ } ; node::Assert(args); } while (0); } } while (0); | |||
| 1408 | env->set_messaging_deserialize_create_object(args[0].As<Function>()); | |||
| 1409 | } | |||
| 1410 | ||||
| 1411 | static void MessageChannel(const FunctionCallbackInfo<Value>& args) { | |||
| 1412 | Environment* env = Environment::GetCurrent(args); | |||
| 1413 | if (!args.IsConstructCall()) { | |||
| ||||
| 1414 | THROW_ERR_CONSTRUCT_CALL_REQUIRED(env); | |||
| 1415 | return; | |||
| 1416 | } | |||
| 1417 | ||||
| 1418 | Local<Context> context = args.This()->GetCreationContext().ToLocalChecked(); | |||
| 1419 | Context::Scope context_scope(context); | |||
| 1420 | ||||
| 1421 | MessagePort* port1 = MessagePort::New(env, context); | |||
| 1422 | if (port1 == nullptr) return; | |||
| 1423 | MessagePort* port2 = MessagePort::New(env, context); | |||
| 1424 | if (port2 == nullptr) { | |||
| 1425 | port1->Close(); | |||
| 1426 | return; | |||
| 1427 | } | |||
| 1428 | ||||
| 1429 | MessagePort::Entangle(port1, port2); | |||
| 1430 | ||||
| 1431 | args.This()->Set(context, env->port1_string(), port1->object()) | |||
| 1432 | .Check(); | |||
| 1433 | args.This()->Set(context, env->port2_string(), port2->object()) | |||
| 1434 | .Check(); | |||
| 1435 | } | |||
| 1436 | ||||
| 1437 | static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) { | |||
| 1438 | CHECK(args[0]->IsString())do { if (__builtin_expect(!!(!(args[0]->IsString())), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc" ":" "1438", "args[0]->IsString()", __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } } while (0); | |||
| 1439 | Environment* env = Environment::GetCurrent(args); | |||
| 1440 | Context::Scope context_scope(env->context()); | |||
| 1441 | Utf8Value name(env->isolate(), args[0]); | |||
| 1442 | MessagePort* port = | |||
| 1443 | MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name)); | |||
| 1444 | if (port != nullptr) { | |||
| 1445 | args.GetReturnValue().Set(port->object()); | |||
| 1446 | } | |||
| 1447 | } | |||
| 1448 | ||||
| 1449 | static void InitMessaging(Local<Object> target, | |||
| 1450 | Local<Value> unused, | |||
| 1451 | Local<Context> context, | |||
| 1452 | void* priv) { | |||
| 1453 | Environment* env = Environment::GetCurrent(context); | |||
| 1454 | ||||
| 1455 | { | |||
| 1456 | env->SetConstructorFunction( | |||
| 1457 | target, | |||
| 1458 | "MessageChannel", | |||
| 1459 | env->NewFunctionTemplate(MessageChannel)); | |||
| 1460 | } | |||
| 1461 | ||||
| 1462 | { | |||
| 1463 | Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New); | |||
| 1464 | t->Inherit(BaseObject::GetConstructorTemplate(env)); | |||
| 1465 | t->InstanceTemplate()->SetInternalFieldCount( | |||
| 1466 | JSTransferable::kInternalFieldCount); | |||
| 1467 | env->SetConstructorFunction(target, "JSTransferable", t); | |||
| 1468 | } | |||
| 1469 | ||||
| 1470 | env->SetConstructorFunction( | |||
| 1471 | target, | |||
| 1472 | env->message_port_constructor_string(), | |||
| 1473 | GetMessagePortConstructorTemplate(env)); | |||
| 1474 | ||||
| 1475 | // These are not methods on the MessagePort prototype, because | |||
| 1476 | // the browser equivalents do not provide them. | |||
| 1477 | env->SetMethod(target, "stopMessagePort", MessagePort::Stop); | |||
| 1478 | env->SetMethod(target, "checkMessagePort", MessagePort::CheckType); | |||
| 1479 | env->SetMethod(target, "drainMessagePort", MessagePort::Drain); | |||
| 1480 | env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage); | |||
| 1481 | env->SetMethod(target, "moveMessagePortToContext", | |||
| 1482 | MessagePort::MoveToContext); | |||
| 1483 | env->SetMethod(target, "setDeserializerCreateObjectFunction", | |||
| 1484 | SetDeserializerCreateObjectFunction); | |||
| 1485 | env->SetMethod(target, "broadcastChannel", BroadcastChannel); | |||
| 1486 | ||||
| 1487 | { | |||
| 1488 | Local<Function> domexception = GetDOMException(context).ToLocalChecked(); | |||
| 1489 | target | |||
| 1490 | ->Set(context, | |||
| 1491 | FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"), | |||
| 1492 | domexception) | |||
| 1493 | .Check(); | |||
| 1494 | } | |||
| 1495 | } | |||
| 1496 | ||||
| 1497 | static void RegisterExternalReferences(ExternalReferenceRegistry* registry) { | |||
| 1498 | registry->Register(MessageChannel); | |||
| 1499 | registry->Register(BroadcastChannel); | |||
| 1500 | registry->Register(JSTransferable::New); | |||
| 1501 | registry->Register(MessagePort::New); | |||
| 1502 | registry->Register(MessagePort::PostMessage); | |||
| 1503 | registry->Register(MessagePort::Start); | |||
| 1504 | registry->Register(MessagePort::Stop); | |||
| 1505 | registry->Register(MessagePort::CheckType); | |||
| 1506 | registry->Register(MessagePort::Drain); | |||
| 1507 | registry->Register(MessagePort::ReceiveMessage); | |||
| 1508 | registry->Register(MessagePort::MoveToContext); | |||
| 1509 | registry->Register(SetDeserializerCreateObjectFunction); | |||
| 1510 | } | |||
| 1511 | ||||
| 1512 | } // anonymous namespace | |||
| 1513 | ||||
| 1514 | } // namespace worker | |||
| 1515 | } // namespace node | |||
| 1516 | ||||
| 1517 | NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)static node::node_module _module = { 108, NM_F_INTERNAL, nullptr , "../src/node_messaging.cc", nullptr, (node::addon_context_register_func )(node::worker::InitMessaging), "messaging", nullptr, nullptr }; void _register_messaging() { node_module_register(&_module ); } | |||
| 1518 | NODE_MODULE_EXTERNAL_REFERENCE(messaging,void _register_external_reference_messaging( node::ExternalReferenceRegistry * registry) { node::worker::RegisterExternalReferences(registry ); } | |||
| 1519 | node::worker::RegisterExternalReferences)void _register_external_reference_messaging( node::ExternalReferenceRegistry * registry) { node::worker::RegisterExternalReferences(registry ); } |
| 1 | #ifndef SRC_NODE_MESSAGING_H_ |
| 2 | #define SRC_NODE_MESSAGING_H_ |
| 3 | |
| 4 | #if defined(NODE_WANT_INTERNALS1) && NODE_WANT_INTERNALS1 |
| 5 | |
| 6 | #include "env.h" |
| 7 | #include "node_mutex.h" |
| 8 | #include "v8.h" |
| 9 | #include <deque> |
| 10 | #include <string> |
| 11 | #include <unordered_map> |
| 12 | #include <set> |
| 13 | |
| 14 | namespace node { |
| 15 | namespace worker { |
| 16 | |
| 17 | class MessagePortData; |
| 18 | class MessagePort; |
| 19 | |
| 20 | typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList; |
| 21 | |
| 22 | // Used to represent the in-flight structure of an object that is being |
| 23 | // transferred or cloned using postMessage(). |
| 24 | class TransferData : public MemoryRetainer { |
| 25 | public: |
| 26 | // Deserialize this object on the receiving end after a .postMessage() call. |
| 27 | // - `context` may not be the same as `env->context()`. This method should |
| 28 | // not produce JS objects coming from Contexts other than `context`. |
| 29 | // - `self` is a unique_ptr for the object that this is being called on. |
| 30 | // - The return value is treated like a `Maybe`, i.e. if `nullptr` is |
| 31 | // returned, any further deserialization of the message is stopped and |
| 32 | // control is returned to the event loop or JS as soon as possible. |
| 33 | virtual BaseObjectPtr<BaseObject> Deserialize( |
| 34 | Environment* env, |
| 35 | v8::Local<v8::Context> context, |
| 36 | std::unique_ptr<TransferData> self) = 0; |
| 37 | // FinalizeTransferWrite() is the counterpart to |
| 38 | // BaseObject::FinalizeTransferRead(). It is called right after the transfer |
| 39 | // data was created, and defaults to doing nothing. After this function, |
| 40 | // this object should not hold any more Isolate-specific data. |
| 41 | virtual v8::Maybe<bool> FinalizeTransferWrite( |
| 42 | v8::Local<v8::Context> context, v8::ValueSerializer* serializer); |
| 43 | }; |
| 44 | |
| 45 | // Represents a single communication message. |
| 46 | class Message : public MemoryRetainer { |
| 47 | public: |
| 48 | // Create a Message with a specific underlying payload, in the format of the |
| 49 | // V8 ValueSerializer API. If `payload` is empty, this message indicates |
| 50 | // that the receiving message port should close itself. |
| 51 | explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>()); |
| 52 | ~Message() = default; |
| 53 | |
| 54 | Message(Message&& other) = default; |
| 55 | Message& operator=(Message&& other) = default; |
| 56 | Message& operator=(const Message&) = delete; |
| 57 | Message(const Message&) = delete; |
| 58 | |
| 59 | // Whether this is a message indicating that the port is to be closed. |
| 60 | // This is the last message to be received by a MessagePort. |
| 61 | bool IsCloseMessage() const; |
| 62 | |
| 63 | // Deserialize the contained JS value. May only be called once, and only |
| 64 | // after Serialize() has been called (e.g. by another thread). |
| 65 | v8::MaybeLocal<v8::Value> Deserialize( |
| 66 | Environment* env, |
| 67 | v8::Local<v8::Context> context, |
| 68 | v8::Local<v8::Value>* port_list = nullptr); |
| 69 | |
| 70 | // Serialize a JS value, and optionally transfer objects, into this message. |
| 71 | // The Message object retains ownership of all transferred objects until |
| 72 | // deserialization. |
| 73 | // The source_port parameter, if provided, will make Serialize() throw a |
| 74 | // "DataCloneError" DOMException if source_port is found in transfer_list. |
| 75 | v8::Maybe<bool> Serialize(Environment* env, |
| 76 | v8::Local<v8::Context> context, |
| 77 | v8::Local<v8::Value> input, |
| 78 | const TransferList& transfer_list, |
| 79 | v8::Local<v8::Object> source_port = |
| 80 | v8::Local<v8::Object>()); |
| 81 | |
| 82 | // Internal method of Message that is called when a new SharedArrayBuffer |
| 83 | // object is encountered in the incoming value's structure. |
| 84 | void AddSharedArrayBuffer(std::shared_ptr<v8::BackingStore> backing_store); |
| 85 | // Internal method of Message that is called once serialization finishes |
| 86 | // and that transfers ownership of `data` to this message. |
| 87 | void AddTransferable(std::unique_ptr<TransferData>&& data); |
| 88 | // Internal method of Message that is called when a new WebAssembly.Module |
| 89 | // object is encountered in the incoming value's structure. |
| 90 | uint32_t AddWASMModule(v8::CompiledWasmModule&& mod); |
| 91 | |
| 92 | // The host objects that will be transferred, as recorded by Serialize() |
| 93 | // (e.g. MessagePorts). |
| 94 | // Used for warning user about posting the target MessagePort to itself, |
| 95 | // which will as a side effect destroy the communication channel. |
| 96 | const std::vector<std::unique_ptr<TransferData>>& transferables() const { |
| 97 | return transferables_; |
| 98 | } |
| 99 | bool has_transferables() const { |
| 100 | return !transferables_.empty() || !array_buffers_.empty(); |
| 101 | } |
| 102 | |
| 103 | void MemoryInfo(MemoryTracker* tracker) const override; |
| 104 | |
| 105 | SET_MEMORY_INFO_NAME(Message)inline std::string MemoryInfoName() const override { return "Message" ; } |
| 106 | SET_SELF_SIZE(Message)inline size_t SelfSize() const override { return sizeof(Message ); } |
| 107 | |
| 108 | private: |
| 109 | MallocedBuffer<char> main_message_buf_; |
| 110 | // TODO(addaleax): Make this a std::variant to save storage size in the common |
| 111 | // case (which is that all of these vectors are empty) once that is available |
| 112 | // with C++17. |
| 113 | std::vector<std::shared_ptr<v8::BackingStore>> array_buffers_; |
| 114 | std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_; |
| 115 | std::vector<std::unique_ptr<TransferData>> transferables_; |
| 116 | std::vector<v8::CompiledWasmModule> wasm_modules_; |
| 117 | |
| 118 | friend class MessagePort; |
| 119 | }; |
| 120 | |
| 121 | class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> { |
| 122 | public: |
| 123 | // Named SiblingGroup, Used for one-to-many BroadcastChannels. |
| 124 | static std::shared_ptr<SiblingGroup> Get(const std::string& name); |
| 125 | |
| 126 | // Anonymous SiblingGroup, Used for one-to-one MessagePort pairs. |
| 127 | SiblingGroup() = default; |
| 128 | explicit SiblingGroup(const std::string& name); |
| 129 | ~SiblingGroup(); |
| 130 | |
| 131 | // Dispatches the Message to the collection of associated |
| 132 | // ports. If there is more than one destination port and |
| 133 | // the Message contains transferables, Dispatch will fail. |
| 134 | // Returns Just(true) if successful and the message was |
| 135 | // dispatched to at least one destination. Returns Just(false) |
| 136 | // if there were no destinations. Returns Nothing<bool>() |
| 137 | // if there was an error. If error is not nullptr, it will |
| 138 | // be set to an error message or warning message as appropriate. |
| 139 | v8::Maybe<bool> Dispatch( |
| 140 | MessagePortData* source, |
| 141 | std::shared_ptr<Message> message, |
| 142 | std::string* error = nullptr); |
| 143 | |
| 144 | void Entangle(MessagePortData* data); |
| 145 | void Entangle(std::initializer_list<MessagePortData*> data); |
| 146 | void Disentangle(MessagePortData* data); |
| 147 | |
| 148 | const std::string& name() const { return name_; } |
| 149 | |
| 150 | size_t size() const { return ports_.size(); } |
| 151 | |
| 152 | private: |
| 153 | const std::string name_; |
| 154 | RwLock group_mutex_; // Protects ports_. |
| 155 | std::set<MessagePortData*> ports_; |
| 156 | |
| 157 | static void CheckSiblingGroup(const std::string& name); |
| 158 | |
| 159 | using Map = |
| 160 | std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>; |
| 161 | |
| 162 | static Mutex groups_mutex_; |
| 163 | static Map groups_; |
| 164 | }; |
| 165 | |
| 166 | // This contains all data for a `MessagePort` instance that is not tied to |
| 167 | // a specific Environment/Isolate/event loop, for easier transfer between those. |
| 168 | class MessagePortData : public TransferData { |
| 169 | public: |
| 170 | explicit MessagePortData(MessagePort* owner); |
| 171 | ~MessagePortData() override; |
| 172 | |
| 173 | MessagePortData(MessagePortData&& other) = delete; |
| 174 | MessagePortData& operator=(MessagePortData&& other) = delete; |
| 175 | MessagePortData(const MessagePortData& other) = delete; |
| 176 | MessagePortData& operator=(const MessagePortData& other) = delete; |
| 177 | |
| 178 | // Add a message to the incoming queue and notify the receiver. |
| 179 | // This may be called from any thread. |
| 180 | void AddToIncomingQueue(std::shared_ptr<Message> message); |
| 181 | v8::Maybe<bool> Dispatch( |
| 182 | std::shared_ptr<Message> message, |
| 183 | std::string* error = nullptr); |
| 184 | |
| 185 | // Turns `a` and `b` into siblings, i.e. connects the sending side of one |
| 186 | // to the receiving side of the other. This is not thread-safe. |
| 187 | static void Entangle(MessagePortData* a, MessagePortData* b); |
| 188 | |
| 189 | // Removes any possible sibling. This is thread-safe (it acquires both |
| 190 | // `sibling_mutex_` and `mutex_`), and has to be because it is called once |
| 191 | // the corresponding JS handle handle wants to close |
| 192 | // which can happen on either side of a worker. |
| 193 | void Disentangle(); |
| 194 | |
| 195 | void MemoryInfo(MemoryTracker* tracker) const override; |
| 196 | BaseObjectPtr<BaseObject> Deserialize( |
| 197 | Environment* env, |
| 198 | v8::Local<v8::Context> context, |
| 199 | std::unique_ptr<TransferData> self) override; |
| 200 | |
| 201 | SET_MEMORY_INFO_NAME(MessagePortData)inline std::string MemoryInfoName() const override { return "MessagePortData" ; } |
| 202 | SET_SELF_SIZE(MessagePortData)inline size_t SelfSize() const override { return sizeof(MessagePortData ); } |
| 203 | |
| 204 | private: |
| 205 | // This mutex protects all fields below it, with the exception of |
| 206 | // sibling_. |
| 207 | mutable Mutex mutex_; |
| 208 | // TODO(addaleax): Make this a std::variant<std::shared_ptr, std::unique_ptr> |
| 209 | // once that is available with C++17, because std::shared_ptr comes with |
| 210 | // overhead that is only necessary for BroadcastChannel. |
| 211 | std::deque<std::shared_ptr<Message>> incoming_messages_; |
| 212 | MessagePort* owner_ = nullptr; |
| 213 | std::shared_ptr<SiblingGroup> group_; |
| 214 | friend class MessagePort; |
| 215 | friend class SiblingGroup; |
| 216 | }; |
| 217 | |
| 218 | // A message port that receives messages from other threads, including |
| 219 | // the uv_async_t handle that is used to notify the current event loop of |
| 220 | // new incoming messages. |
| 221 | class MessagePort : public HandleWrap { |
| 222 | private: |
| 223 | // Create a new MessagePort. The `context` argument specifies the Context |
| 224 | // instance that is used for creating the values emitted from this port. |
| 225 | // This is called by MessagePort::New(), which is the public API used for |
| 226 | // creating MessagePort instances. |
| 227 | MessagePort(Environment* env, |
| 228 | v8::Local<v8::Context> context, |
| 229 | v8::Local<v8::Object> wrap); |
| 230 | |
| 231 | public: |
| 232 | ~MessagePort() override; |
| 233 | |
| 234 | // Create a new message port instance, optionally over an existing |
| 235 | // `MessagePortData` object. |
| 236 | static MessagePort* New(Environment* env, |
| 237 | v8::Local<v8::Context> context, |
| 238 | std::unique_ptr<MessagePortData> data = {}, |
| 239 | std::shared_ptr<SiblingGroup> sibling_group = {}); |
| 240 | |
| 241 | // Send a message, i.e. deliver it into the sibling's incoming queue. |
| 242 | // If this port is closed, or if there is no sibling, this message is |
| 243 | // serialized with transfers, then silently discarded. |
| 244 | v8::Maybe<bool> PostMessage(Environment* env, |
| 245 | v8::Local<v8::Context> context, |
| 246 | v8::Local<v8::Value> message, |
| 247 | const TransferList& transfer); |
| 248 | |
| 249 | // Start processing messages on this port as a receiving end. |
| 250 | void Start(); |
| 251 | // Stop processing messages on this port as a receiving end. |
| 252 | void Stop(); |
| 253 | |
| 254 | /* constructor */ |
| 255 | static void New(const v8::FunctionCallbackInfo<v8::Value>& args); |
| 256 | /* prototype methods */ |
| 257 | static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args); |
| 258 | static void Start(const v8::FunctionCallbackInfo<v8::Value>& args); |
| 259 | static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args); |
| 260 | static void CheckType(const v8::FunctionCallbackInfo<v8::Value>& args); |
| 261 | static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args); |
| 262 | static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args); |
| 263 | |
| 264 | /* static */ |
| 265 | static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args); |
| 266 | |
| 267 | // Turns `a` and `b` into siblings, i.e. connects the sending side of one |
| 268 | // to the receiving side of the other. This is not thread-safe. |
| 269 | static void Entangle(MessagePort* a, MessagePort* b); |
| 270 | static void Entangle(MessagePort* a, MessagePortData* b); |
| 271 | |
| 272 | // Detach this port's data for transferring. After this, the MessagePortData |
| 273 | // is no longer associated with this handle, although it can still receive |
| 274 | // messages. |
| 275 | std::unique_ptr<MessagePortData> Detach(); |
| 276 | |
| 277 | void Close( |
| 278 | v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override; |
| 279 | |
| 280 | // Returns true if either data_ has been freed, or if the handle is being |
| 281 | // closed. Equivalent to the [[Detached]] internal slot in the HTML Standard. |
| 282 | // |
| 283 | // If checking if a JavaScript MessagePort object is detached, this method |
| 284 | // alone is often not enough, since the backing C++ MessagePort object may |
| 285 | // have been deleted already. For all intents and purposes, an object with a |
| 286 | // NULL pointer to the C++ MessagePort object is also detached. |
| 287 | inline bool IsDetached() const; |
| 288 | |
| 289 | TransferMode GetTransferMode() const override; |
| 290 | std::unique_ptr<TransferData> TransferForMessaging() override; |
| 291 | |
| 292 | void MemoryInfo(MemoryTracker* tracker) const override; |
| 293 | SET_MEMORY_INFO_NAME(MessagePort)inline std::string MemoryInfoName() const override { return "MessagePort" ; } |
| 294 | SET_SELF_SIZE(MessagePort)inline size_t SelfSize() const override { return sizeof(MessagePort ); } |
| 295 | |
| 296 | private: |
| 297 | enum class MessageProcessingMode { |
| 298 | kNormalOperation, |
| 299 | kForceReadMessages |
| 300 | }; |
| 301 | |
| 302 | void OnClose() override; |
| 303 | void OnMessage(MessageProcessingMode mode); |
| 304 | void TriggerAsync(); |
| 305 | v8::MaybeLocal<v8::Value> ReceiveMessage( |
| 306 | v8::Local<v8::Context> context, |
| 307 | MessageProcessingMode mode, |
| 308 | v8::Local<v8::Value>* port_list = nullptr); |
| 309 | |
| 310 | std::unique_ptr<MessagePortData> data_ = nullptr; |
| 311 | bool receiving_messages_ = false; |
| 312 | uv_async_t async_; |
| 313 | v8::Global<v8::Function> emit_message_fn_; |
| 314 | |
| 315 | friend class MessagePortData; |
| 316 | }; |
| 317 | |
| 318 | // Provide a base class from which JS classes that should be transferable or |
| 319 | // cloneable by postMesssage() can inherit. |
| 320 | // See e.g. FileHandle in internal/fs/promises.js for an example. |
| 321 | class JSTransferable : public BaseObject { |
| 322 | public: |
| 323 | JSTransferable(Environment* env, v8::Local<v8::Object> obj); |
| 324 | static void New(const v8::FunctionCallbackInfo<v8::Value>& args); |
| 325 | |
| 326 | TransferMode GetTransferMode() const override; |
| 327 | std::unique_ptr<TransferData> TransferForMessaging() override; |
| 328 | std::unique_ptr<TransferData> CloneForMessaging() const override; |
| 329 | v8::Maybe<std::vector<BaseObjectPtr<BaseObject>>> |
| 330 | NestedTransferables() const override; |
| 331 | v8::Maybe<bool> FinalizeTransferRead( |
| 332 | v8::Local<v8::Context> context, |
| 333 | v8::ValueDeserializer* deserializer) override; |
| 334 | |
| 335 | SET_NO_MEMORY_INFO()inline void MemoryInfo(node::MemoryTracker* tracker) const override {} |
| 336 | SET_MEMORY_INFO_NAME(JSTransferable)inline std::string MemoryInfoName() const override { return "JSTransferable" ; } |
| 337 | SET_SELF_SIZE(JSTransferable)inline size_t SelfSize() const override { return sizeof(JSTransferable ); } |
| 338 | |
| 339 | private: |
| 340 | std::unique_ptr<TransferData> TransferOrClone(TransferMode mode) const; |
| 341 | |
| 342 | class Data : public TransferData { |
| 343 | public: |
| 344 | Data(std::string&& deserialize_info, v8::Global<v8::Value>&& data); |
| 345 | |
| 346 | BaseObjectPtr<BaseObject> Deserialize( |
| 347 | Environment* env, |
| 348 | v8::Local<v8::Context> context, |
| 349 | std::unique_ptr<TransferData> self) override; |
| 350 | v8::Maybe<bool> FinalizeTransferWrite( |
| 351 | v8::Local<v8::Context> context, |
| 352 | v8::ValueSerializer* serializer) override; |
| 353 | |
| 354 | SET_NO_MEMORY_INFO()inline void MemoryInfo(node::MemoryTracker* tracker) const override {} |
| 355 | SET_MEMORY_INFO_NAME(JSTransferableTransferData)inline std::string MemoryInfoName() const override { return "JSTransferableTransferData" ; } |
| 356 | SET_SELF_SIZE(Data)inline size_t SelfSize() const override { return sizeof(Data) ; } |
| 357 | |
| 358 | private: |
| 359 | std::string deserialize_info_; |
| 360 | v8::Global<v8::Value> data_; |
| 361 | }; |
| 362 | }; |
| 363 | |
| 364 | v8::Local<v8::FunctionTemplate> GetMessagePortConstructorTemplate( |
| 365 | Environment* env); |
| 366 | |
| 367 | } // namespace worker |
| 368 | } // namespace node |
| 369 | |
| 370 | #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS |
| 371 | |
| 372 | |
| 373 | #endif // SRC_NODE_MESSAGING_H_ |