Bug Summary

File:out/../src/node_messaging.cc
Warning:line 668, column 12
Potential leak of memory pointed to by 'port'

Annotated Source Code

Press '?' to see keyboard shortcuts

clang -cc1 -cc1 -triple x86_64-unknown-linux-gnu -analyze -disable-free -clear-ast-before-backend -disable-llvm-verifier -discard-value-names -main-file-name node_messaging.cc -analyzer-checker=core -analyzer-checker=apiModeling -analyzer-checker=unix -analyzer-checker=deadcode -analyzer-checker=cplusplus -analyzer-checker=security.insecureAPI.UncheckedReturn -analyzer-checker=security.insecureAPI.getpw -analyzer-checker=security.insecureAPI.gets -analyzer-checker=security.insecureAPI.mktemp -analyzer-checker=security.insecureAPI.mkstemp -analyzer-checker=security.insecureAPI.vfork -analyzer-checker=nullability.NullPassedToNonnull -analyzer-checker=nullability.NullReturnedFromNonnull -analyzer-output plist -w -setup-static-analyzer -mrelocation-model pic -pic-level 2 -pic-is-pie -mframe-pointer=all -fmath-errno -ffp-contract=on -fno-rounding-math -mconstructor-aliases -funwind-tables=2 -target-cpu x86-64 -tune-cpu generic -debugger-tuning=gdb -fcoverage-compilation-dir=/home/maurizio/node-v18.6.0/out -resource-dir /usr/local/lib/clang/16.0.0 -D V8_DEPRECATION_WARNINGS -D V8_IMMINENT_DEPRECATION_WARNINGS -D _GLIBCXX_USE_CXX11_ABI=1 -D NODE_OPENSSL_CONF_NAME=nodejs_conf -D NODE_OPENSSL_HAS_QUIC -D __STDC_FORMAT_MACROS -D OPENSSL_NO_PINSHARED -D OPENSSL_THREADS -D NODE_ARCH="x64" -D NODE_PLATFORM="linux" -D NODE_WANT_INTERNALS=1 -D V8_DEPRECATION_WARNINGS=1 -D NODE_OPENSSL_SYSTEM_CERT_PATH="" -D NODE_USE_NODE_CODE_CACHE=1 -D HAVE_INSPECTOR=1 -D NODE_ENABLE_LARGE_CODE_PAGES=1 -D __POSIX__ -D NODE_USE_V8_PLATFORM=1 -D NODE_HAVE_I18N_SUPPORT=1 -D HAVE_OPENSSL=1 -D OPENSSL_API_COMPAT=0x10100000L -D UCONFIG_NO_SERVICE=1 -D U_ENABLE_DYLOAD=0 -D U_STATIC_IMPLEMENTATION=1 -D U_HAVE_STD_STRING=1 -D UCONFIG_NO_BREAK_ITERATION=0 -D _LARGEFILE_SOURCE -D _FILE_OFFSET_BITS=64 -D _POSIX_C_SOURCE=200112 -D NGHTTP2_STATICLIB -D NDEBUG -D OPENSSL_USE_NODELETE -D L_ENDIAN -D OPENSSL_BUILDING_OPENSSL -D AES_ASM -D BSAES_ASM -D CMLL_ASM -D ECP_NISTZ256_ASM -D GHASH_ASM -D KECCAK1600_ASM -D MD5_ASM -D OPENSSL_BN_ASM_GF2m -D OPENSSL_BN_ASM_MONT -D OPENSSL_BN_ASM_MONT5 -D OPENSSL_CPUID_OBJ -D OPENSSL_IA32_SSE2 -D PADLOCK_ASM -D POLY1305_ASM -D SHA1_ASM -D SHA256_ASM -D SHA512_ASM -D VPAES_ASM -D WHIRLPOOL_ASM -D X25519_ASM -D OPENSSL_PIC -D NGTCP2_STATICLIB -D NGHTTP3_STATICLIB -I ../src -I /home/maurizio/node-v18.6.0/out/Release/obj/gen -I /home/maurizio/node-v18.6.0/out/Release/obj/gen/include -I /home/maurizio/node-v18.6.0/out/Release/obj/gen/src -I ../deps/googletest/include -I ../deps/histogram/src -I ../deps/uvwasi/include -I ../deps/v8/include -I ../deps/icu-small/source/i18n -I ../deps/icu-small/source/common -I ../deps/zlib -I ../deps/llhttp/include -I ../deps/cares/include -I ../deps/uv/include -I ../deps/nghttp2/lib/includes -I ../deps/brotli/c/include -I ../deps/openssl/openssl/include -I ../deps/openssl/openssl/crypto/include -I ../deps/openssl/config/archs/linux-x86_64/asm/include -I ../deps/openssl/config/archs/linux-x86_64/asm -I ../deps/ngtcp2 -I ../deps/ngtcp2/ngtcp2/lib/includes -I ../deps/ngtcp2/ngtcp2/crypto/includes -I ../deps/ngtcp2/nghttp3/lib/includes -internal-isystem /usr/lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8 -internal-isystem /usr/lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/x86_64-redhat-linux -internal-isystem /usr/lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/backward -internal-isystem /usr/local/lib/clang/16.0.0/include -internal-isystem /usr/local/include -internal-isystem /usr/lib/gcc/x86_64-redhat-linux/8/../../../../x86_64-redhat-linux/include -internal-externc-isystem /include -internal-externc-isystem /usr/include -O3 -Wno-unused-parameter -Wno-unused-parameter -std=gnu++17 -fdeprecated-macro -fdebug-compilation-dir=/home/maurizio/node-v18.6.0/out -ferror-limit 19 -fno-rtti -fgnuc-version=4.2.1 -vectorize-loops -vectorize-slp -analyzer-output=html -faddrsig -D__GCC_HAVE_DWARF2_CFI_ASM=1 -o /tmp/scan-build-2022-08-22-142216-507842-1 -x c++ ../src/node_messaging.cc

../src/node_messaging.cc

1#include "node_messaging.h"
2
3#include "async_wrap-inl.h"
4#include "debug_utils-inl.h"
5#include "memory_tracker-inl.h"
6#include "node_buffer.h"
7#include "node_contextify.h"
8#include "node_errors.h"
9#include "node_external_reference.h"
10#include "node_process-inl.h"
11#include "util-inl.h"
12
13using node::contextify::ContextifyContext;
14using node::errors::TryCatchScope;
15using v8::Array;
16using v8::ArrayBuffer;
17using v8::BackingStore;
18using v8::CompiledWasmModule;
19using v8::Context;
20using v8::EscapableHandleScope;
21using v8::Function;
22using v8::FunctionCallbackInfo;
23using v8::FunctionTemplate;
24using v8::Global;
25using v8::HandleScope;
26using v8::Isolate;
27using v8::Just;
28using v8::Local;
29using v8::Maybe;
30using v8::MaybeLocal;
31using v8::Nothing;
32using v8::Object;
33using v8::SharedArrayBuffer;
34using v8::String;
35using v8::Symbol;
36using v8::Value;
37using v8::ValueDeserializer;
38using v8::ValueSerializer;
39using v8::WasmModuleObject;
40
41namespace node {
42
43using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44
45BaseObject::TransferMode BaseObject::GetTransferMode() const {
46 return BaseObject::TransferMode::kUntransferable;
47}
48
49std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
50 return CloneForMessaging();
51}
52
53std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
54 return {};
55}
56
57Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
58 return Just(BaseObjectList {});
59}
60
61Maybe<bool> BaseObject::FinalizeTransferRead(
62 Local<Context> context, ValueDeserializer* deserializer) {
63 return Just(true);
64}
65
66namespace worker {
67
68Maybe<bool> TransferData::FinalizeTransferWrite(
69 Local<Context> context, ValueSerializer* serializer) {
70 return Just(true);
71}
72
73Message::Message(MallocedBuffer<char>&& buffer)
74 : main_message_buf_(std::move(buffer)) {}
75
76bool Message::IsCloseMessage() const {
77 return main_message_buf_.data == nullptr;
78}
79
80namespace {
81
82// This is used to tell V8 how to read transferred host objects, like other
83// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
84class DeserializerDelegate : public ValueDeserializer::Delegate {
85 public:
86 DeserializerDelegate(
87 Message* m,
88 Environment* env,
89 const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
90 const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
91 const std::vector<CompiledWasmModule>& wasm_modules)
92 : host_objects_(host_objects),
93 shared_array_buffers_(shared_array_buffers),
94 wasm_modules_(wasm_modules) {}
95
96 MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
97 // Identifying the index in the message's BaseObject array is sufficient.
98 uint32_t id;
99 if (!deserializer->ReadUint32(&id))
100 return MaybeLocal<Object>();
101 CHECK_LT(id, host_objects_.size())do { if (__builtin_expect(!!(!((id) < (host_objects_.size(
)))), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "101", "(id) < (host_objects_.size())", __PRETTY_FUNCTION__
}; node::Assert(args); } while (0); } } while (0)
;
102 return host_objects_[id]->object(isolate);
103 }
104
105 MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
106 Isolate* isolate, uint32_t clone_id) override {
107 CHECK_LT(clone_id, shared_array_buffers_.size())do { if (__builtin_expect(!!(!((clone_id) < (shared_array_buffers_
.size()))), 0)) { do { static const node::AssertionInfo args =
{ "../src/node_messaging.cc" ":" "107", "(clone_id) < (shared_array_buffers_.size())"
, __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } }
while (0)
;
108 return shared_array_buffers_[clone_id];
109 }
110
111 MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
112 Isolate* isolate, uint32_t transfer_id) override {
113 CHECK_LT(transfer_id, wasm_modules_.size())do { if (__builtin_expect(!!(!((transfer_id) < (wasm_modules_
.size()))), 0)) { do { static const node::AssertionInfo args =
{ "../src/node_messaging.cc" ":" "113", "(transfer_id) < (wasm_modules_.size())"
, __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } }
while (0)
;
114 return WasmModuleObject::FromCompiledModule(
115 isolate, wasm_modules_[transfer_id]);
116 }
117
118 ValueDeserializer* deserializer = nullptr;
119
120 private:
121 const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
122 const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
123 const std::vector<CompiledWasmModule>& wasm_modules_;
124};
125
126} // anonymous namespace
127
128MaybeLocal<Value> Message::Deserialize(Environment* env,
129 Local<Context> context,
130 Local<Value>* port_list) {
131 Context::Scope context_scope(context);
132
133 CHECK(!IsCloseMessage())do { if (__builtin_expect(!!(!(!IsCloseMessage())), 0)) { do {
static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "133", "!IsCloseMessage()", __PRETTY_FUNCTION__ }; node::
Assert(args); } while (0); } } while (0)
;
134 if (port_list != nullptr && !transferables_.empty()) {
135 // Need to create this outside of the EscapableHandleScope, but inside
136 // the Context::Scope.
137 *port_list = Array::New(env->isolate());
138 }
139
140 EscapableHandleScope handle_scope(env->isolate());
141
142 // Create all necessary objects for transferables, e.g. MessagePort handles.
143 std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
144 auto cleanup = OnScopeLeave([&]() {
145 for (BaseObjectPtr<BaseObject> object : host_objects) {
146 if (!object) continue;
147
148 // If the function did not finish successfully, host_objects will contain
149 // a list of objects that will never be passed to JS. Therefore, we
150 // destroy them here.
151 object->Detach();
152 }
153 });
154
155 for (uint32_t i = 0; i < transferables_.size(); ++i) {
156 HandleScope handle_scope(env->isolate());
157 TransferData* data = transferables_[i].get();
158 host_objects[i] = data->Deserialize(
159 env, context, std::move(transferables_[i]));
160 if (!host_objects[i]) return {};
161 if (port_list != nullptr) {
162 // If we gather a list of all message ports, and this transferred object
163 // is a message port, add it to that list. This is a bit of an odd case
164 // of special handling for MessagePorts (as opposed to applying to all
165 // transferables), but it's required for spec compliance.
166 DCHECK((*port_list)->IsArray());
167 Local<Array> port_list_array = port_list->As<Array>();
168 Local<Object> obj = host_objects[i]->object();
169 if (env->message_port_constructor_template()->HasInstance(obj)) {
170 if (port_list_array->Set(context,
171 port_list_array->Length(),
172 obj).IsNothing()) {
173 return {};
174 }
175 }
176 }
177 }
178 transferables_.clear();
179
180 std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
181 // Attach all transferred SharedArrayBuffers to their new Isolate.
182 for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
183 Local<SharedArrayBuffer> sab =
184 SharedArrayBuffer::New(env->isolate(), shared_array_buffers_[i]);
185 shared_array_buffers.push_back(sab);
186 }
187
188 DeserializerDelegate delegate(
189 this, env, host_objects, shared_array_buffers, wasm_modules_);
190 ValueDeserializer deserializer(
191 env->isolate(),
192 reinterpret_cast<const uint8_t*>(main_message_buf_.data),
193 main_message_buf_.size,
194 &delegate);
195 delegate.deserializer = &deserializer;
196
197 // Attach all transferred ArrayBuffers to their new Isolate.
198 for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
199 Local<ArrayBuffer> ab =
200 ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
201 deserializer.TransferArrayBuffer(i, ab);
202 }
203
204 if (deserializer.ReadHeader(context).IsNothing())
205 return {};
206 Local<Value> return_value;
207 if (!deserializer.ReadValue(context).ToLocal(&return_value))
208 return {};
209
210 for (BaseObjectPtr<BaseObject> base_object : host_objects) {
211 if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
212 return {};
213 }
214
215 host_objects.clear();
216 return handle_scope.Escape(return_value);
217}
218
219void Message::AddSharedArrayBuffer(
220 std::shared_ptr<BackingStore> backing_store) {
221 shared_array_buffers_.emplace_back(std::move(backing_store));
222}
223
224void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
225 transferables_.emplace_back(std::move(data));
226}
227
228uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
229 wasm_modules_.emplace_back(std::move(mod));
230 return wasm_modules_.size() - 1;
231}
232
233namespace {
234
235MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
236 Isolate* isolate = context->GetIsolate();
237 Local<Object> per_context_bindings;
238 Local<Value> emit_message_val;
239 if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
240 !per_context_bindings->Get(context,
241 FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
242 .ToLocal(&emit_message_val)) {
243 return MaybeLocal<Function>();
244 }
245 CHECK(emit_message_val->IsFunction())do { if (__builtin_expect(!!(!(emit_message_val->IsFunction
())), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "245", "emit_message_val->IsFunction()", __PRETTY_FUNCTION__
}; node::Assert(args); } while (0); } } while (0)
;
246 return emit_message_val.As<Function>();
247}
248
249MaybeLocal<Function> GetDOMException(Local<Context> context) {
250 Isolate* isolate = context->GetIsolate();
251 Local<Object> per_context_bindings;
252 Local<Value> domexception_ctor_val;
253 if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
254 !per_context_bindings->Get(context,
255 FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
256 .ToLocal(&domexception_ctor_val)) {
257 return MaybeLocal<Function>();
258 }
259 CHECK(domexception_ctor_val->IsFunction())do { if (__builtin_expect(!!(!(domexception_ctor_val->IsFunction
())), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "259", "domexception_ctor_val->IsFunction()", __PRETTY_FUNCTION__
}; node::Assert(args); } while (0); } } while (0)
;
260 Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
261 return domexception_ctor;
262}
263
264void ThrowDataCloneException(Local<Context> context, Local<String> message) {
265 Isolate* isolate = context->GetIsolate();
266 Local<Value> argv[] = {message,
267 FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
268 Local<Value> exception;
269 Local<Function> domexception_ctor;
270 if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
271 !domexception_ctor->NewInstance(context, arraysize(argv), argv)
272 .ToLocal(&exception)) {
273 return;
274 }
275 isolate->ThrowException(exception);
276}
277
278// This tells V8 how to serialize objects that it does not understand
279// (e.g. C++ objects) into the output buffer, in a way that our own
280// DeserializerDelegate understands how to unpack.
281class SerializerDelegate : public ValueSerializer::Delegate {
282 public:
283 SerializerDelegate(Environment* env, Local<Context> context, Message* m)
284 : env_(env), context_(context), msg_(m) {}
285
286 void ThrowDataCloneError(Local<String> message) override {
287 ThrowDataCloneException(context_, message);
288 }
289
290 Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
291 if (env_->base_object_ctor_template()->HasInstance(object)) {
292 return WriteHostObject(
293 BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
294 }
295
296 ThrowDataCloneError(env_->clone_unsupported_type_str());
297 return Nothing<bool>();
298 }
299
300 Maybe<uint32_t> GetSharedArrayBufferId(
301 Isolate* isolate,
302 Local<SharedArrayBuffer> shared_array_buffer) override {
303 uint32_t i;
304 for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
305 if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
306 shared_array_buffer) {
307 return Just(i);
308 }
309 }
310
311 seen_shared_array_buffers_.emplace_back(
312 Global<SharedArrayBuffer> { isolate, shared_array_buffer });
313 msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
314 return Just(i);
315 }
316
317 Maybe<uint32_t> GetWasmModuleTransferId(
318 Isolate* isolate, Local<WasmModuleObject> module) override {
319 return Just(msg_->AddWASMModule(module->GetCompiledModule()));
320 }
321
322 Maybe<bool> Finish(Local<Context> context) {
323 for (uint32_t i = 0; i < host_objects_.size(); i++) {
324 BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
325 std::unique_ptr<TransferData> data;
326 if (i < first_cloned_object_index_)
327 data = host_object->TransferForMessaging();
328 if (!data)
329 data = host_object->CloneForMessaging();
330 if (!data) return Nothing<bool>();
331 if (data->FinalizeTransferWrite(context, serializer).IsNothing())
332 return Nothing<bool>();
333 msg_->AddTransferable(std::move(data));
334 }
335 return Just(true);
336 }
337
338 inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
339 // Make sure we have not started serializing the value itself yet.
340 CHECK_EQ(first_cloned_object_index_, SIZE_MAX)do { if (__builtin_expect(!!(!((first_cloned_object_index_) ==
((18446744073709551615UL)))), 0)) { do { static const node::
AssertionInfo args = { "../src/node_messaging.cc" ":" "340", "(first_cloned_object_index_) == ((18446744073709551615UL))"
, __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } }
while (0)
;
341 host_objects_.emplace_back(std::move(host_object));
342 }
343
344 // Some objects in the transfer list may register sub-objects that can be
345 // transferred. This could e.g. be a public JS wrapper object, such as a
346 // FileHandle, that is registering its C++ handle for transfer.
347 inline Maybe<bool> AddNestedHostObjects() {
348 for (size_t i = 0; i < host_objects_.size(); i++) {
349 std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
350 if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
351 return Nothing<bool>();
352 for (auto nested_transferable : nested_transferables) {
353 if (std::find(host_objects_.begin(),
354 host_objects_.end(),
355 nested_transferable) == host_objects_.end()) {
356 AddHostObject(nested_transferable);
357 }
358 }
359 }
360 return Just(true);
361 }
362
363 ValueSerializer* serializer = nullptr;
364
365 private:
366 Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
367 BaseObject::TransferMode mode = host_object->GetTransferMode();
368 if (mode == BaseObject::TransferMode::kUntransferable) {
369 ThrowDataCloneError(env_->clone_unsupported_type_str());
370 return Nothing<bool>();
371 }
372
373 for (uint32_t i = 0; i < host_objects_.size(); i++) {
374 if (host_objects_[i] == host_object) {
375 serializer->WriteUint32(i);
376 return Just(true);
377 }
378 }
379
380 if (mode == BaseObject::TransferMode::kTransferable) {
381 THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
382 return Nothing<bool>();
383 }
384
385 CHECK_EQ(mode, BaseObject::TransferMode::kCloneable)do { if (__builtin_expect(!!(!((mode) == (BaseObject::TransferMode
::kCloneable))), 0)) { do { static const node::AssertionInfo args
= { "../src/node_messaging.cc" ":" "385", "(mode) == (BaseObject::TransferMode::kCloneable)"
, __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } }
while (0)
;
386 uint32_t index = host_objects_.size();
387 if (first_cloned_object_index_ == SIZE_MAX(18446744073709551615UL))
388 first_cloned_object_index_ = index;
389 serializer->WriteUint32(index);
390 host_objects_.push_back(host_object);
391 return Just(true);
392 }
393
394 Environment* env_;
395 Local<Context> context_;
396 Message* msg_;
397 std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
398 std::vector<BaseObjectPtr<BaseObject>> host_objects_;
399 size_t first_cloned_object_index_ = SIZE_MAX(18446744073709551615UL);
400
401 friend class worker::Message;
402};
403
404} // anonymous namespace
405
406Maybe<bool> Message::Serialize(Environment* env,
407 Local<Context> context,
408 Local<Value> input,
409 const TransferList& transfer_list_v,
410 Local<Object> source_port) {
411 HandleScope handle_scope(env->isolate());
412 Context::Scope context_scope(context);
413
414 // Verify that we're not silently overwriting an existing message.
415 CHECK(main_message_buf_.is_empty())do { if (__builtin_expect(!!(!(main_message_buf_.is_empty()))
, 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "415", "main_message_buf_.is_empty()", __PRETTY_FUNCTION__
}; node::Assert(args); } while (0); } } while (0)
;
416
417 SerializerDelegate delegate(env, context, this);
418 ValueSerializer serializer(env->isolate(), &delegate);
419 delegate.serializer = &serializer;
420
421 std::vector<Local<ArrayBuffer>> array_buffers;
422 for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
423 Local<Value> entry = transfer_list_v[i];
424 if (entry->IsObject()) {
425 // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
426 // for details.
427 bool untransferable;
428 if (!entry.As<Object>()->HasPrivate(
429 context,
430 env->untransferable_object_private_symbol())
431 .To(&untransferable)) {
432 return Nothing<bool>();
433 }
434 if (untransferable) continue;
435 }
436
437 // Currently, we support ArrayBuffers and BaseObjects for which
438 // GetTransferMode() does not return kUntransferable.
439 if (entry->IsArrayBuffer()) {
440 Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
441 // If we cannot render the ArrayBuffer unusable in this Isolate,
442 // copying the buffer will have to do.
443 // Note that we can currently transfer ArrayBuffers even if they were
444 // not allocated by Node’s ArrayBufferAllocator in the first place,
445 // because we pass the underlying v8::BackingStore around rather than
446 // raw data *and* an Isolate with a non-default ArrayBuffer allocator
447 // is always going to outlive any Workers it creates, and so will its
448 // allocator along with it.
449 if (!ab->IsDetachable()) continue;
450 if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
451 array_buffers.end()) {
452 ThrowDataCloneException(
453 context,
454 FIXED_ONE_BYTE_STRING(
455 env->isolate(),
456 "Transfer list contains duplicate ArrayBuffer"));
457 return Nothing<bool>();
458 }
459 // We simply use the array index in the `array_buffers` list as the
460 // ID that we write into the serialized buffer.
461 uint32_t id = array_buffers.size();
462 array_buffers.push_back(ab);
463 serializer.TransferArrayBuffer(id, ab);
464 continue;
465 } else if (env->base_object_ctor_template()->HasInstance(entry)) {
466 // Check if the source MessagePort is being transferred.
467 if (!source_port.IsEmpty() && entry == source_port) {
468 ThrowDataCloneException(
469 context,
470 FIXED_ONE_BYTE_STRING(env->isolate(),
471 "Transfer list contains source port"));
472 return Nothing<bool>();
473 }
474 BaseObjectPtr<BaseObject> host_object {
475 Unwrap<BaseObject>(entry.As<Object>()) };
476 if (env->message_port_constructor_template()->HasInstance(entry) &&
477 (!host_object ||
478 static_cast<MessagePort*>(host_object.get())->IsDetached())) {
479 ThrowDataCloneException(
480 context,
481 FIXED_ONE_BYTE_STRING(
482 env->isolate(),
483 "MessagePort in transfer list is already detached"));
484 return Nothing<bool>();
485 }
486 if (std::find(delegate.host_objects_.begin(),
487 delegate.host_objects_.end(),
488 host_object) != delegate.host_objects_.end()) {
489 ThrowDataCloneException(
490 context,
491 String::Concat(env->isolate(),
492 FIXED_ONE_BYTE_STRING(
493 env->isolate(),
494 "Transfer list contains duplicate "),
495 entry.As<Object>()->GetConstructorName()));
496 return Nothing<bool>();
497 }
498 if (host_object && host_object->GetTransferMode() !=
499 BaseObject::TransferMode::kUntransferable) {
500 delegate.AddHostObject(host_object);
501 continue;
502 }
503 }
504
505 THROW_ERR_INVALID_TRANSFER_OBJECT(env);
506 return Nothing<bool>();
507 }
508 if (delegate.AddNestedHostObjects().IsNothing())
509 return Nothing<bool>();
510
511 serializer.WriteHeader();
512 if (serializer.WriteValue(context, input).IsNothing()) {
513 return Nothing<bool>();
514 }
515
516 for (Local<ArrayBuffer> ab : array_buffers) {
517 // If serialization succeeded, we render it inaccessible in this Isolate.
518 std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
519 ab->Detach();
520
521 array_buffers_.emplace_back(std::move(backing_store));
522 }
523
524 if (delegate.Finish(context).IsNothing())
525 return Nothing<bool>();
526
527 // The serializer gave us a buffer allocated using `malloc()`.
528 std::pair<uint8_t*, size_t> data = serializer.Release();
529 CHECK_NOT_NULL(data.first)do { if (__builtin_expect(!!(!((data.first) != nullptr)), 0))
{ do { static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "529", "(data.first) != nullptr", __PRETTY_FUNCTION__ };
node::Assert(args); } while (0); } } while (0)
;
530 main_message_buf_ =
531 MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
532 return Just(true);
533}
534
535void Message::MemoryInfo(MemoryTracker* tracker) const {
536 tracker->TrackField("array_buffers_", array_buffers_);
537 tracker->TrackField("shared_array_buffers", shared_array_buffers_);
538 tracker->TrackField("transferables", transferables_);
539}
540
541MessagePortData::MessagePortData(MessagePort* owner)
542 : owner_(owner) {
543}
544
545MessagePortData::~MessagePortData() {
546 CHECK_NULL(owner_)do { if (__builtin_expect(!!(!((owner_) == nullptr)), 0)) { do
{ static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "546", "(owner_) == nullptr", __PRETTY_FUNCTION__ }; node
::Assert(args); } while (0); } } while (0)
;
547 Disentangle();
548}
549
550void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
551 Mutex::ScopedLock lock(mutex_);
552 tracker->TrackField("incoming_messages", incoming_messages_);
553}
554
555void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
556 // This function will be called by other threads.
557 Mutex::ScopedLock lock(mutex_);
558 incoming_messages_.emplace_back(std::move(message));
559
560 if (owner_ != nullptr) {
561 Debug(owner_, "Adding message to incoming queue");
562 owner_->TriggerAsync();
563 }
564}
565
566void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
567 auto group = std::make_shared<SiblingGroup>();
568 group->Entangle({a, b});
569}
570
571void MessagePortData::Disentangle() {
572 if (group_) {
573 group_->Disentangle(this);
574 }
575}
576
577MessagePort::~MessagePort() {
578 if (data_) Detach();
579}
580
581MessagePort::MessagePort(Environment* env,
582 Local<Context> context,
583 Local<Object> wrap)
584 : HandleWrap(env,
585 wrap,
586 reinterpret_cast<uv_handle_t*>(&async_),
587 AsyncWrap::PROVIDER_MESSAGEPORT),
588 data_(new MessagePortData(this)) {
589 auto onmessage = [](uv_async_t* handle) {
590 // Called when data has been put into the queue.
591 MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
592 channel->OnMessage(MessageProcessingMode::kNormalOperation);
593 };
594
595 CHECK_EQ(uv_async_init(env->event_loop(),do { if (__builtin_expect(!!(!((uv_async_init(env->event_loop
(), &async_, onmessage)) == (0))), 0)) { do { static const
node::AssertionInfo args = { "../src/node_messaging.cc" ":" "597"
, "(uv_async_init(env->event_loop(), &async_, onmessage)) == (0)"
, __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } }
while (0)
596 &async_,do { if (__builtin_expect(!!(!((uv_async_init(env->event_loop
(), &async_, onmessage)) == (0))), 0)) { do { static const
node::AssertionInfo args = { "../src/node_messaging.cc" ":" "597"
, "(uv_async_init(env->event_loop(), &async_, onmessage)) == (0)"
, __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } }
while (0)
597 onmessage), 0)do { if (__builtin_expect(!!(!((uv_async_init(env->event_loop
(), &async_, onmessage)) == (0))), 0)) { do { static const
node::AssertionInfo args = { "../src/node_messaging.cc" ":" "597"
, "(uv_async_init(env->event_loop(), &async_, onmessage)) == (0)"
, __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } }
while (0)
;
598 // Reset later to indicate success of the constructor.
599 bool succeeded = false;
600 auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
601
602 Local<Value> fn;
603 if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
604 return;
605
606 if (fn->IsFunction()) {
607 Local<Function> init = fn.As<Function>();
608 if (init->Call(context, wrap, 0, nullptr).IsEmpty())
609 return;
610 }
611
612 Local<Function> emit_message_fn;
613 if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
614 return;
615 emit_message_fn_.Reset(env->isolate(), emit_message_fn);
616
617 succeeded = true;
618 Debug(this, "Created message port");
619}
620
621bool MessagePort::IsDetached() const {
622 return data_ == nullptr || IsHandleClosing();
623}
624
625void MessagePort::TriggerAsync() {
626 if (IsHandleClosing()) return;
627 CHECK_EQ(uv_async_send(&async_), 0)do { if (__builtin_expect(!!(!((uv_async_send(&async_)) ==
(0))), 0)) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "627", "(uv_async_send(&async_)) == (0)", __PRETTY_FUNCTION__
}; node::Assert(args); } while (0); } } while (0)
;
628}
629
630void MessagePort::Close(v8::Local<v8::Value> close_callback) {
631 Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
632
633 if (data_) {
634 // Wrap this call with accessing the mutex, so that TriggerAsync()
635 // can check IsHandleClosing() without race conditions.
636 Mutex::ScopedLock sibling_lock(data_->mutex_);
637 HandleWrap::Close(close_callback);
638 } else {
639 HandleWrap::Close(close_callback);
640 }
641}
642
643void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
644 // This constructor just throws an error. Unfortunately, we can’t use V8’s
645 // ConstructorBehavior::kThrow, as that also removes the prototype from the
646 // class (i.e. makes it behave like an arrow function).
647 Environment* env = Environment::GetCurrent(args);
648 THROW_ERR_CONSTRUCT_CALL_INVALID(env);
649}
650
651MessagePort* MessagePort::New(
652 Environment* env,
653 Local<Context> context,
654 std::unique_ptr<MessagePortData> data,
655 std::shared_ptr<SiblingGroup> sibling_group) {
656 Context::Scope context_scope(context);
657 Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
658
659 // Construct a new instance, then assign the listener instance and possibly
660 // the MessagePortData to it.
661 Local<Object> instance;
662 if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
4
Taking false branch
663 return nullptr;
664 MessagePort* port = new MessagePort(env, context, instance);
5
Memory is allocated
665 CHECK_NOT_NULL(port)do { if (__builtin_expect(!!(!((port) != nullptr)), 0)) { do {
static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "665", "(port) != nullptr", __PRETTY_FUNCTION__ }; node::
Assert(args); } while (0); } } while (0)
;
6
Taking false branch
7
Loop condition is false. Exiting loop
666 if (port->IsHandleClosing()) {
8
Taking true branch
667 // Construction failed with an exception.
668 return nullptr;
9
Potential leak of memory pointed to by 'port'
669 }
670
671 if (data) {
672 CHECK(!sibling_group)do { if (__builtin_expect(!!(!(!sibling_group)), 0)) { do { static
const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "672", "!sibling_group", __PRETTY_FUNCTION__ }; node::Assert
(args); } while (0); } } while (0)
;
673 port->Detach();
674 port->data_ = std::move(data);
675
676 // This lock is here to avoid race conditions with the `owner_` read
677 // in AddToIncomingQueue(). (This would likely be unproblematic without it,
678 // but it's better to be safe than sorry.)
679 Mutex::ScopedLock lock(port->data_->mutex_);
680 port->data_->owner_ = port;
681 // If the existing MessagePortData object had pending messages, this is
682 // the easiest way to run that queue.
683 port->TriggerAsync();
684 } else if (sibling_group) {
685 sibling_group->Entangle(port->data_.get());
686 }
687 return port;
688}
689
690MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
691 MessageProcessingMode mode,
692 Local<Value>* port_list) {
693 std::shared_ptr<Message> received;
694 {
695 // Get the head of the message queue.
696 Mutex::ScopedLock lock(data_->mutex_);
697
698 Debug(this, "MessagePort has message");
699
700 bool wants_message =
701 receiving_messages_ ||
702 mode == MessageProcessingMode::kForceReadMessages;
703 // We have nothing to do if:
704 // - There are no pending messages
705 // - We are not intending to receive messages, and the message we would
706 // receive is not the final "close" message.
707 if (data_->incoming_messages_.empty() ||
708 (!wants_message &&
709 !data_->incoming_messages_.front()->IsCloseMessage())) {
710 return env()->no_message_symbol();
711 }
712
713 received = data_->incoming_messages_.front();
714 data_->incoming_messages_.pop_front();
715 }
716
717 if (received->IsCloseMessage()) {
718 Close();
719 return env()->no_message_symbol();
720 }
721
722 if (!env()->can_call_into_js()) return MaybeLocal<Value>();
723
724 return received->Deserialize(env(), context, port_list);
725}
726
727void MessagePort::OnMessage(MessageProcessingMode mode) {
728 Debug(this, "Running MessagePort::OnMessage()");
729 HandleScope handle_scope(env()->isolate());
730 Local<Context> context =
731 object(env()->isolate())->GetCreationContext().ToLocalChecked();
732
733 size_t processing_limit;
734 if (mode == MessageProcessingMode::kNormalOperation) {
735 Mutex::ScopedLock(data_->mutex_);
736 processing_limit = std::max(data_->incoming_messages_.size(),
737 static_cast<size_t>(1000));
738 } else {
739 processing_limit = std::numeric_limits<size_t>::max();
740 }
741
742 // data_ can only ever be modified by the owner thread, so no need to lock.
743 // However, the message port may be transferred while it is processing
744 // messages, so we need to check that this handle still owns its `data_` field
745 // on every iteration.
746 while (data_) {
747 if (processing_limit-- == 0) {
748 // Prevent event loop starvation by only processing those messages without
749 // interruption that were already present when the OnMessage() call was
750 // first triggered, but at least 1000 messages because otherwise the
751 // overhead of repeatedly triggering the uv_async_t instance becomes
752 // noticeable, at least on Windows.
753 // (That might require more investigation by somebody more familiar with
754 // Windows.)
755 TriggerAsync();
756 return;
757 }
758
759 HandleScope handle_scope(env()->isolate());
760 Context::Scope context_scope(context);
761 Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
762
763 Local<Value> payload;
764 Local<Value> port_list = Undefined(env()->isolate());
765 Local<Value> message_error;
766 Local<Value> argv[3];
767
768 {
769 // Catch any exceptions from parsing the message itself (not from
770 // emitting it) as 'messageeror' events.
771 TryCatchScope try_catch(env());
772 if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
773 if (try_catch.HasCaught() && !try_catch.HasTerminated())
774 message_error = try_catch.Exception();
775 goto reschedule;
776 }
777 }
778 if (payload == env()->no_message_symbol()) break;
779
780 if (!env()->can_call_into_js()) {
781 Debug(this, "MessagePort drains queue because !can_call_into_js()");
782 // In this case there is nothing to do but to drain the current queue.
783 continue;
784 }
785
786 argv[0] = payload;
787 argv[1] = port_list;
788 argv[2] = env()->message_string();
789
790 if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
791 reschedule:
792 if (!message_error.IsEmpty()) {
793 argv[0] = message_error;
794 argv[1] = Undefined(env()->isolate());
795 argv[2] = env()->messageerror_string();
796 USE(MakeCallback(emit_message, arraysize(argv), argv));
797 }
798
799 // Re-schedule OnMessage() execution in case of failure.
800 if (data_)
801 TriggerAsync();
802 return;
803 }
804 }
805}
806
807void MessagePort::OnClose() {
808 Debug(this, "MessagePort::OnClose()");
809 if (data_) {
810 // Detach() returns move(data_).
811 Detach()->Disentangle();
812 }
813}
814
815std::unique_ptr<MessagePortData> MessagePort::Detach() {
816 CHECK(data_)do { if (__builtin_expect(!!(!(data_)), 0)) { do { static const
node::AssertionInfo args = { "../src/node_messaging.cc" ":" "816"
, "data_", __PRETTY_FUNCTION__ }; node::Assert(args); } while
(0); } } while (0)
;
817 Mutex::ScopedLock lock(data_->mutex_);
818 data_->owner_ = nullptr;
819 return std::move(data_);
820}
821
822BaseObject::TransferMode MessagePort::GetTransferMode() const {
823 if (IsDetached())
824 return BaseObject::TransferMode::kUntransferable;
825 return BaseObject::TransferMode::kTransferable;
826}
827
828std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
829 Close();
830 return Detach();
831}
832
833BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
834 Environment* env,
835 Local<Context> context,
836 std::unique_ptr<TransferData> self) {
837 return BaseObjectPtr<MessagePort> { MessagePort::New(
838 env, context,
839 static_unique_pointer_cast<MessagePortData>(std::move(self))) };
840}
841
842Maybe<bool> MessagePort::PostMessage(Environment* env,
843 Local<Context> context,
844 Local<Value> message_v,
845 const TransferList& transfer_v) {
846 Isolate* isolate = env->isolate();
847 Local<Object> obj = object(isolate);
848
849 std::shared_ptr<Message> msg = std::make_shared<Message>();
850
851 // Per spec, we need to both check if transfer list has the source port, and
852 // serialize the input message, even if the MessagePort is closed or detached.
853
854 Maybe<bool> serialization_maybe =
855 msg->Serialize(env, context, message_v, transfer_v, obj);
856 if (data_ == nullptr) {
857 return serialization_maybe;
858 }
859 if (serialization_maybe.IsNothing()) {
860 return Nothing<bool>();
861 }
862
863 std::string error;
864 Maybe<bool> res = data_->Dispatch(msg, &error);
865 if (res.IsNothing())
866 return res;
867
868 if (!error.empty())
869 ProcessEmitWarning(env, error.c_str());
870
871 return res;
872}
873
874Maybe<bool> MessagePortData::Dispatch(
875 std::shared_ptr<Message> message,
876 std::string* error) {
877 if (!group_) {
878 if (error != nullptr)
879 *error = "MessagePortData is not entangled.";
880 return Nothing<bool>();
881 }
882 return group_->Dispatch(this, message, error);
883}
884
885static Maybe<bool> ReadIterable(Environment* env,
886 Local<Context> context,
887 // NOLINTNEXTLINE(runtime/references)
888 TransferList& transfer_list,
889 Local<Value> object) {
890 if (!object->IsObject()) return Just(false);
891
892 if (object->IsArray()) {
893 Local<Array> arr = object.As<Array>();
894 size_t length = arr->Length();
895 transfer_list.AllocateSufficientStorage(length);
896 for (size_t i = 0; i < length; i++) {
897 if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
898 return Nothing<bool>();
899 }
900 return Just(true);
901 }
902
903 Isolate* isolate = env->isolate();
904 Local<Value> iterator_method;
905 if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
906 .ToLocal(&iterator_method)) return Nothing<bool>();
907 if (!iterator_method->IsFunction()) return Just(false);
908
909 Local<Value> iterator;
910 if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
911 .ToLocal(&iterator)) return Nothing<bool>();
912 if (!iterator->IsObject()) return Just(false);
913
914 Local<Value> next;
915 if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
916 return Nothing<bool>();
917 if (!next->IsFunction()) return Just(false);
918
919 std::vector<Local<Value>> entries;
920 while (env->can_call_into_js()) {
921 Local<Value> result;
922 if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
923 .ToLocal(&result)) return Nothing<bool>();
924 if (!result->IsObject()) return Just(false);
925
926 Local<Value> done;
927 if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
928 return Nothing<bool>();
929 if (done->BooleanValue(isolate)) break;
930
931 Local<Value> val;
932 if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
933 return Nothing<bool>();
934 entries.push_back(val);
935 }
936
937 transfer_list.AllocateSufficientStorage(entries.size());
938 std::copy(entries.begin(), entries.end(), &transfer_list[0]);
939 return Just(true);
940}
941
942void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
943 Environment* env = Environment::GetCurrent(args);
944 Local<Object> obj = args.This();
945 Local<Context> context = obj->GetCreationContext().ToLocalChecked();
946
947 if (args.Length() == 0) {
948 return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
949 "MessagePort.postMessage");
950 }
951
952 if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
953 // Browsers ignore null or undefined, and otherwise accept an array or an
954 // options object.
955 return THROW_ERR_INVALID_ARG_TYPE(env,
956 "Optional transferList argument must be an iterable");
957 }
958
959 TransferList transfer_list;
960 if (args[1]->IsObject()) {
961 bool was_iterable;
962 if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
963 return;
964 if (!was_iterable) {
965 Local<Value> transfer_option;
966 if (!args[1].As<Object>()->Get(context, env->transfer_string())
967 .ToLocal(&transfer_option)) return;
968 if (!transfer_option->IsUndefined()) {
969 if (!ReadIterable(env, context, transfer_list, transfer_option)
970 .To(&was_iterable)) return;
971 if (!was_iterable) {
972 return THROW_ERR_INVALID_ARG_TYPE(env,
973 "Optional options.transfer argument must be an iterable");
974 }
975 }
976 }
977 }
978
979 MessagePort* port = Unwrap<MessagePort>(args.This());
980 // Even if the backing MessagePort object has already been deleted, we still
981 // want to serialize the message to ensure spec-compliant behavior w.r.t.
982 // transfers.
983 if (port == nullptr || port->IsHandleClosing()) {
984 Message msg;
985 USE(msg.Serialize(env, context, args[0], transfer_list, obj));
986 return;
987 }
988
989 Maybe<bool> res = port->PostMessage(env, context, args[0], transfer_list);
990 if (res.IsJust())
991 args.GetReturnValue().Set(res.FromJust());
992}
993
994void MessagePort::Start() {
995 Debug(this, "Start receiving messages");
996 receiving_messages_ = true;
997 Mutex::ScopedLock lock(data_->mutex_);
998 if (!data_->incoming_messages_.empty())
999 TriggerAsync();
1000}
1001
1002void MessagePort::Stop() {
1003 Debug(this, "Stop receiving messages");
1004 receiving_messages_ = false;
1005}
1006
1007void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
1008 MessagePort* port;
1009 ASSIGN_OR_RETURN_UNWRAP(&port, args.This())do { *&port = static_cast<typename std::remove_reference
<decltype(*&port)>::type>( BaseObject::FromJSObject
(args.This())); if (*&port == nullptr) return ; } while (
0)
;
1010 if (!port->data_) {
1011 return;
1012 }
1013 port->Start();
1014}
1015
1016void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1017 MessagePort* port;
1018 CHECK(args[0]->IsObject())do { if (__builtin_expect(!!(!(args[0]->IsObject())), 0)) {
do { static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "1018", "args[0]->IsObject()", __PRETTY_FUNCTION__ };
node::Assert(args); } while (0); } } while (0)
;
1019 ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>())do { *&port = static_cast<typename std::remove_reference
<decltype(*&port)>::type>( BaseObject::FromJSObject
(args[0].As<Object>())); if (*&port == nullptr) return
; } while (0)
;
1020 if (!port->data_) {
1021 return;
1022 }
1023 port->Stop();
1024}
1025
1026void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
1027 Environment* env = Environment::GetCurrent(args);
1028 args.GetReturnValue().Set(
1029 GetMessagePortConstructorTemplate(env)->HasInstance(args[0]));
1030}
1031
1032void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1033 MessagePort* port;
1034 ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>())do { *&port = static_cast<typename std::remove_reference
<decltype(*&port)>::type>( BaseObject::FromJSObject
(args[0].As<Object>())); if (*&port == nullptr) return
; } while (0)
;
1035 port->OnMessage(MessageProcessingMode::kForceReadMessages);
1036}
1037
1038void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1039 Environment* env = Environment::GetCurrent(args);
1040 if (!args[0]->IsObject() ||
1041 !env->message_port_constructor_template()->HasInstance(args[0])) {
1042 return THROW_ERR_INVALID_ARG_TYPE(env,
1043 "The \"port\" argument must be a MessagePort instance");
1044 }
1045 MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1046 if (port == nullptr) {
1047 // Return 'no messages' for a closed port.
1048 args.GetReturnValue().Set(
1049 Environment::GetCurrent(args)->no_message_symbol());
1050 return;
1051 }
1052
1053 MaybeLocal<Value> payload = port->ReceiveMessage(
1054 port->object()->GetCreationContext().ToLocalChecked(),
1055 MessageProcessingMode::kForceReadMessages);
1056 if (!payload.IsEmpty())
1057 args.GetReturnValue().Set(payload.ToLocalChecked());
1058}
1059
1060void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1061 Environment* env = Environment::GetCurrent(args);
1062 if (!args[0]->IsObject() ||
1063 !env->message_port_constructor_template()->HasInstance(args[0])) {
1064 return THROW_ERR_INVALID_ARG_TYPE(env,
1065 "The \"port\" argument must be a MessagePort instance");
1066 }
1067 MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1068 if (port == nullptr || port->IsHandleClosing()) {
1069 Isolate* isolate = env->isolate();
1070 THROW_ERR_CLOSED_MESSAGE_PORT(isolate);
1071 return;
1072 }
1073
1074 Local<Value> context_arg = args[1];
1075 ContextifyContext* context_wrapper;
1076 if (!context_arg->IsObject() ||
1077 (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1078 env, context_arg.As<Object>())) == nullptr) {
1079 return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1080 }
1081
1082 std::unique_ptr<MessagePortData> data;
1083 if (!port->IsDetached())
1084 data = port->Detach();
1085
1086 Context::Scope context_scope(context_wrapper->context());
1087 MessagePort* target =
1088 MessagePort::New(env, context_wrapper->context(), std::move(data));
1089 if (target != nullptr)
1090 args.GetReturnValue().Set(target->object());
1091}
1092
1093void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1094 MessagePortData::Entangle(a->data_.get(), b->data_.get());
1095}
1096
1097void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1098 MessagePortData::Entangle(a->data_.get(), b);
1099}
1100
1101void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1102 tracker->TrackField("data", data_);
1103 tracker->TrackField("emit_message_fn", emit_message_fn_);
1104}
1105
1106Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1107 // Factor generating the MessagePort JS constructor into its own piece
1108 // of code, because it is needed early on in the child environment setup.
1109 Local<FunctionTemplate> templ = env->message_port_constructor_template();
1110 if (!templ.IsEmpty())
1111 return templ;
1112
1113 {
1114 Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
1115 m->SetClassName(env->message_port_constructor_string());
1116 m->InstanceTemplate()->SetInternalFieldCount(
1117 MessagePort::kInternalFieldCount);
1118 m->Inherit(HandleWrap::GetConstructorTemplate(env));
1119
1120 env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
1121 env->SetProtoMethod(m, "start", MessagePort::Start);
1122
1123 env->set_message_port_constructor_template(m);
1124 }
1125
1126 return GetMessagePortConstructorTemplate(env);
1127}
1128
1129JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1130 : BaseObject(env, obj) {
1131 MakeWeak();
1132}
1133
1134void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1135 CHECK(args.IsConstructCall())do { if (__builtin_expect(!!(!(args.IsConstructCall())), 0)) {
do { static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "1135", "args.IsConstructCall()", __PRETTY_FUNCTION__ };
node::Assert(args); } while (0); } } while (0)
;
1136 new JSTransferable(Environment::GetCurrent(args), args.This());
1137}
1138
1139JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1140 // Implement `kClone in this ? kCloneable : kTransferable`.
1141 HandleScope handle_scope(env()->isolate());
1142 errors::TryCatchScope ignore_exceptions(env());
1143
1144 bool has_clone;
1145 if (!object()->Has(env()->context(),
1146 env()->messaging_clone_symbol()).To(&has_clone)) {
1147 return TransferMode::kUntransferable;
1148 }
1149
1150 return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1151}
1152
1153std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1154 return TransferOrClone(TransferMode::kTransferable);
1155}
1156
1157std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1158 return TransferOrClone(TransferMode::kCloneable);
1159}
1160
1161std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1162 TransferMode mode) const {
1163 // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1164 // which should return an object with `data` and `deserializeInfo` properties;
1165 // `data` is written to the serializer later, and `deserializeInfo` is stored
1166 // on the `TransferData` instance as a string.
1167 HandleScope handle_scope(env()->isolate());
1168 Local<Context> context = env()->isolate()->GetCurrentContext();
1169 Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1170 env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1171
1172 Local<Value> method;
1173 if (!object()->Get(context, method_name).ToLocal(&method)) {
1174 return {};
1175 }
1176 if (method->IsFunction()) {
1177 Local<Value> result_v;
1178 if (!method.As<Function>()->Call(
1179 context, object(), 0, nullptr).ToLocal(&result_v)) {
1180 return {};
1181 }
1182
1183 if (result_v->IsObject()) {
1184 Local<Object> result = result_v.As<Object>();
1185 Local<Value> data;
1186 Local<Value> deserialize_info;
1187 if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1188 !result->Get(context, env()->deserialize_info_string())
1189 .ToLocal(&deserialize_info)) {
1190 return {};
1191 }
1192 Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1193 if (*deserialize_info_str == nullptr) return {};
1194 return std::make_unique<Data>(
1195 *deserialize_info_str, Global<Value>(env()->isolate(), data));
1196 }
1197 }
1198
1199 if (mode == TransferMode::kTransferable)
1200 return TransferOrClone(TransferMode::kCloneable);
1201 else
1202 return {};
1203}
1204
1205Maybe<BaseObjectList>
1206JSTransferable::NestedTransferables() const {
1207 // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1208 HandleScope handle_scope(env()->isolate());
1209 Local<Context> context = env()->isolate()->GetCurrentContext();
1210 Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1211
1212 Local<Value> method;
1213 if (!object()->Get(context, method_name).ToLocal(&method)) {
1214 return Nothing<BaseObjectList>();
1215 }
1216 if (!method->IsFunction()) return Just(BaseObjectList {});
1217
1218 Local<Value> list_v;
1219 if (!method.As<Function>()->Call(
1220 context, object(), 0, nullptr).ToLocal(&list_v)) {
1221 return Nothing<BaseObjectList>();
1222 }
1223 if (!list_v->IsArray()) return Just(BaseObjectList {});
1224 Local<Array> list = list_v.As<Array>();
1225
1226 BaseObjectList ret;
1227 for (size_t i = 0; i < list->Length(); i++) {
1228 Local<Value> value;
1229 if (!list->Get(context, i).ToLocal(&value))
1230 return Nothing<BaseObjectList>();
1231 if (env()->base_object_ctor_template()->HasInstance(value))
1232 ret.emplace_back(Unwrap<BaseObject>(value));
1233 }
1234 return Just(ret);
1235}
1236
1237Maybe<bool> JSTransferable::FinalizeTransferRead(
1238 Local<Context> context, ValueDeserializer* deserializer) {
1239 // Call `this[kDeserialize](data)` where `data` comes from the return value
1240 // of `this[kTransfer]()` or `this[kClone]()`.
1241 HandleScope handle_scope(env()->isolate());
1242 Local<Value> data;
1243 if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1244
1245 Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1246 Local<Value> method;
1247 if (!object()->Get(context, method_name).ToLocal(&method)) {
1248 return Nothing<bool>();
1249 }
1250 if (!method->IsFunction()) return Just(true);
1251
1252 if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1253 return Nothing<bool>();
1254 }
1255 return Just(true);
1256}
1257
1258JSTransferable::Data::Data(std::string&& deserialize_info,
1259 v8::Global<v8::Value>&& data)
1260 : deserialize_info_(std::move(deserialize_info)),
1261 data_(std::move(data)) {}
1262
1263BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1264 Environment* env,
1265 Local<Context> context,
1266 std::unique_ptr<TransferData> self) {
1267 // Create the JS wrapper object that will later be filled with data passed to
1268 // the `[kDeserialize]()` method on it. This split is necessary, because here
1269 // we need to create an object with the right prototype and internal fields,
1270 // but the actual JS data stored in the serialized data can only be read at
1271 // the end of the stream, after the main message has been read.
1272
1273 if (context != env->context()) {
1274 THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1275 return {};
1276 }
1277 HandleScope handle_scope(env->isolate());
1278 Local<Value> info;
1279 if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1280
1281 Local<Value> ret;
1282 CHECK(!env->messaging_deserialize_create_object().IsEmpty())do { if (__builtin_expect(!!(!(!env->messaging_deserialize_create_object
().IsEmpty())), 0)) { do { static const node::AssertionInfo args
= { "../src/node_messaging.cc" ":" "1282", "!env->messaging_deserialize_create_object().IsEmpty()"
, __PRETTY_FUNCTION__ }; node::Assert(args); } while (0); } }
while (0)
;
1283 if (!env->messaging_deserialize_create_object()->Call(
1284 context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1285 !env->base_object_ctor_template()->HasInstance(ret)) {
1286 return {};
1287 }
1288
1289 return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) };
1290}
1291
1292Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1293 Local<Context> context, ValueSerializer* serializer) {
1294 HandleScope handle_scope(context->GetIsolate());
1295 auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1296 data_.Reset();
1297 return ret;
1298}
1299
1300std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
1301 Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1302 std::shared_ptr<SiblingGroup> group;
1303 auto i = groups_.find(name);
1304 if (i == groups_.end() || i->second.expired()) {
1305 group = std::make_shared<SiblingGroup>(name);
1306 groups_[name] = group;
1307 } else {
1308 group = i->second.lock();
1309 }
1310 return group;
1311}
1312
1313void SiblingGroup::CheckSiblingGroup(const std::string& name) {
1314 Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1315 auto i = groups_.find(name);
1316 if (i != groups_.end() && i->second.expired())
1317 groups_.erase(name);
1318}
1319
1320SiblingGroup::SiblingGroup(const std::string& name)
1321 : name_(name) { }
1322
1323SiblingGroup::~SiblingGroup() {
1324 // If this is a named group, check to see if we can remove the group
1325 if (!name_.empty())
1326 CheckSiblingGroup(name_);
1327}
1328
1329Maybe<bool> SiblingGroup::Dispatch(
1330 MessagePortData* source,
1331 std::shared_ptr<Message> message,
1332 std::string* error) {
1333
1334 RwLock::ScopedReadLock lock(group_mutex_);
1335
1336 // The source MessagePortData is not part of this group.
1337 if (ports_.find(source) == ports_.end()) {
1338 if (error != nullptr)
1339 *error = "Source MessagePort is not entangled with this group.";
1340 return Nothing<bool>();
1341 }
1342
1343 // There are no destination ports.
1344 if (size() <= 1)
1345 return Just(false);
1346
1347 // Transferables cannot be used when there is more
1348 // than a single destination.
1349 if (size() > 2 && message->has_transferables()) {
1350 if (error != nullptr)
1351 *error = "Transferables cannot be used with multiple destinations.";
1352 return Nothing<bool>();
1353 }
1354
1355 for (MessagePortData* port : ports_) {
1356 if (port == source)
1357 continue;
1358 // This loop should only be entered if there's only a single destination
1359 for (const auto& transferable : message->transferables()) {
1360 if (port == transferable.get()) {
1361 if (error != nullptr) {
1362 *error = "The target port was posted to itself, and the "
1363 "communication channel was lost";
1364 }
1365 return Just(true);
1366 }
1367 }
1368 port->AddToIncomingQueue(message);
1369 }
1370
1371 return Just(true);
1372}
1373
1374void SiblingGroup::Entangle(MessagePortData* port) {
1375 Entangle({ port });
1376}
1377
1378void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
1379 RwLock::ScopedWriteLock lock(group_mutex_);
1380 for (MessagePortData* data : ports) {
1381 ports_.insert(data);
1382 CHECK(!data->group_)do { if (__builtin_expect(!!(!(!data->group_)), 0)) { do {
static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "1382", "!data->group_", __PRETTY_FUNCTION__ }; node::
Assert(args); } while (0); } } while (0)
;
1383 data->group_ = shared_from_this();
1384 }
1385}
1386
1387void SiblingGroup::Disentangle(MessagePortData* data) {
1388 auto self = shared_from_this(); // Keep alive until end of function.
1389 RwLock::ScopedWriteLock lock(group_mutex_);
1390 ports_.erase(data);
1391 data->group_.reset();
1392
1393 data->AddToIncomingQueue(std::make_shared<Message>());
1394 // If this is an anonymous group and there's another port, close it.
1395 if (size() == 1 && name_.empty())
1396 (*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>());
1397}
1398
1399SiblingGroup::Map SiblingGroup::groups_;
1400Mutex SiblingGroup::groups_mutex_;
1401
1402namespace {
1403
1404static void SetDeserializerCreateObjectFunction(
1405 const FunctionCallbackInfo<Value>& args) {
1406 Environment* env = Environment::GetCurrent(args);
1407 CHECK(args[0]->IsFunction())do { if (__builtin_expect(!!(!(args[0]->IsFunction())), 0)
) { do { static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "1407", "args[0]->IsFunction()", __PRETTY_FUNCTION__ }
; node::Assert(args); } while (0); } } while (0)
;
1408 env->set_messaging_deserialize_create_object(args[0].As<Function>());
1409}
1410
1411static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1412 Environment* env = Environment::GetCurrent(args);
1413 if (!args.IsConstructCall()) {
1
Taking false branch
1414 THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1415 return;
1416 }
1417
1418 Local<Context> context = args.This()->GetCreationContext().ToLocalChecked();
1419 Context::Scope context_scope(context);
1420
1421 MessagePort* port1 = MessagePort::New(env, context);
1422 if (port1 == nullptr) return;
2
Taking false branch
1423 MessagePort* port2 = MessagePort::New(env, context);
3
Calling 'MessagePort::New'
1424 if (port2 == nullptr) {
1425 port1->Close();
1426 return;
1427 }
1428
1429 MessagePort::Entangle(port1, port2);
1430
1431 args.This()->Set(context, env->port1_string(), port1->object())
1432 .Check();
1433 args.This()->Set(context, env->port2_string(), port2->object())
1434 .Check();
1435}
1436
1437static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
1438 CHECK(args[0]->IsString())do { if (__builtin_expect(!!(!(args[0]->IsString())), 0)) {
do { static const node::AssertionInfo args = { "../src/node_messaging.cc"
":" "1438", "args[0]->IsString()", __PRETTY_FUNCTION__ };
node::Assert(args); } while (0); } } while (0)
;
1439 Environment* env = Environment::GetCurrent(args);
1440 Context::Scope context_scope(env->context());
1441 Utf8Value name(env->isolate(), args[0]);
1442 MessagePort* port =
1443 MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
1444 if (port != nullptr) {
1445 args.GetReturnValue().Set(port->object());
1446 }
1447}
1448
1449static void InitMessaging(Local<Object> target,
1450 Local<Value> unused,
1451 Local<Context> context,
1452 void* priv) {
1453 Environment* env = Environment::GetCurrent(context);
1454
1455 {
1456 env->SetConstructorFunction(
1457 target,
1458 "MessageChannel",
1459 env->NewFunctionTemplate(MessageChannel));
1460 }
1461
1462 {
1463 Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1464 t->Inherit(BaseObject::GetConstructorTemplate(env));
1465 t->InstanceTemplate()->SetInternalFieldCount(
1466 JSTransferable::kInternalFieldCount);
1467 env->SetConstructorFunction(target, "JSTransferable", t);
1468 }
1469
1470 env->SetConstructorFunction(
1471 target,
1472 env->message_port_constructor_string(),
1473 GetMessagePortConstructorTemplate(env));
1474
1475 // These are not methods on the MessagePort prototype, because
1476 // the browser equivalents do not provide them.
1477 env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1478 env->SetMethod(target, "checkMessagePort", MessagePort::CheckType);
1479 env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1480 env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1481 env->SetMethod(target, "moveMessagePortToContext",
1482 MessagePort::MoveToContext);
1483 env->SetMethod(target, "setDeserializerCreateObjectFunction",
1484 SetDeserializerCreateObjectFunction);
1485 env->SetMethod(target, "broadcastChannel", BroadcastChannel);
1486
1487 {
1488 Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1489 target
1490 ->Set(context,
1491 FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1492 domexception)
1493 .Check();
1494 }
1495}
1496
1497static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
1498 registry->Register(MessageChannel);
1499 registry->Register(BroadcastChannel);
1500 registry->Register(JSTransferable::New);
1501 registry->Register(MessagePort::New);
1502 registry->Register(MessagePort::PostMessage);
1503 registry->Register(MessagePort::Start);
1504 registry->Register(MessagePort::Stop);
1505 registry->Register(MessagePort::CheckType);
1506 registry->Register(MessagePort::Drain);
1507 registry->Register(MessagePort::ReceiveMessage);
1508 registry->Register(MessagePort::MoveToContext);
1509 registry->Register(SetDeserializerCreateObjectFunction);
1510}
1511
1512} // anonymous namespace
1513
1514} // namespace worker
1515} // namespace node
1516
1517NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)static node::node_module _module = { 108, NM_F_INTERNAL, nullptr
, "../src/node_messaging.cc", nullptr, (node::addon_context_register_func
)(node::worker::InitMessaging), "messaging", nullptr, nullptr
}; void _register_messaging() { node_module_register(&_module
); }
1518NODE_MODULE_EXTERNAL_REFERENCE(messaging,void _register_external_reference_messaging( node::ExternalReferenceRegistry
* registry) { node::worker::RegisterExternalReferences(registry
); }
1519 node::worker::RegisterExternalReferences)void _register_external_reference_messaging( node::ExternalReferenceRegistry
* registry) { node::worker::RegisterExternalReferences(registry
); }

../src/node_messaging.h

1#ifndef SRC_NODE_MESSAGING_H_
2#define SRC_NODE_MESSAGING_H_
3
4#if defined(NODE_WANT_INTERNALS1) && NODE_WANT_INTERNALS1
5
6#include "env.h"
7#include "node_mutex.h"
8#include "v8.h"
9#include <deque>
10#include <string>
11#include <unordered_map>
12#include <set>
13
14namespace node {
15namespace worker {
16
17class MessagePortData;
18class MessagePort;
19
20typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList;
21
22// Used to represent the in-flight structure of an object that is being
23// transferred or cloned using postMessage().
24class TransferData : public MemoryRetainer {
25 public:
26 // Deserialize this object on the receiving end after a .postMessage() call.
27 // - `context` may not be the same as `env->context()`. This method should
28 // not produce JS objects coming from Contexts other than `context`.
29 // - `self` is a unique_ptr for the object that this is being called on.
30 // - The return value is treated like a `Maybe`, i.e. if `nullptr` is
31 // returned, any further deserialization of the message is stopped and
32 // control is returned to the event loop or JS as soon as possible.
33 virtual BaseObjectPtr<BaseObject> Deserialize(
34 Environment* env,
35 v8::Local<v8::Context> context,
36 std::unique_ptr<TransferData> self) = 0;
37 // FinalizeTransferWrite() is the counterpart to
38 // BaseObject::FinalizeTransferRead(). It is called right after the transfer
39 // data was created, and defaults to doing nothing. After this function,
40 // this object should not hold any more Isolate-specific data.
41 virtual v8::Maybe<bool> FinalizeTransferWrite(
42 v8::Local<v8::Context> context, v8::ValueSerializer* serializer);
43};
44
45// Represents a single communication message.
46class Message : public MemoryRetainer {
47 public:
48 // Create a Message with a specific underlying payload, in the format of the
49 // V8 ValueSerializer API. If `payload` is empty, this message indicates
50 // that the receiving message port should close itself.
51 explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
52 ~Message() = default;
53
54 Message(Message&& other) = default;
55 Message& operator=(Message&& other) = default;
56 Message& operator=(const Message&) = delete;
57 Message(const Message&) = delete;
58
59 // Whether this is a message indicating that the port is to be closed.
60 // This is the last message to be received by a MessagePort.
61 bool IsCloseMessage() const;
62
63 // Deserialize the contained JS value. May only be called once, and only
64 // after Serialize() has been called (e.g. by another thread).
65 v8::MaybeLocal<v8::Value> Deserialize(
66 Environment* env,
67 v8::Local<v8::Context> context,
68 v8::Local<v8::Value>* port_list = nullptr);
69
70 // Serialize a JS value, and optionally transfer objects, into this message.
71 // The Message object retains ownership of all transferred objects until
72 // deserialization.
73 // The source_port parameter, if provided, will make Serialize() throw a
74 // "DataCloneError" DOMException if source_port is found in transfer_list.
75 v8::Maybe<bool> Serialize(Environment* env,
76 v8::Local<v8::Context> context,
77 v8::Local<v8::Value> input,
78 const TransferList& transfer_list,
79 v8::Local<v8::Object> source_port =
80 v8::Local<v8::Object>());
81
82 // Internal method of Message that is called when a new SharedArrayBuffer
83 // object is encountered in the incoming value's structure.
84 void AddSharedArrayBuffer(std::shared_ptr<v8::BackingStore> backing_store);
85 // Internal method of Message that is called once serialization finishes
86 // and that transfers ownership of `data` to this message.
87 void AddTransferable(std::unique_ptr<TransferData>&& data);
88 // Internal method of Message that is called when a new WebAssembly.Module
89 // object is encountered in the incoming value's structure.
90 uint32_t AddWASMModule(v8::CompiledWasmModule&& mod);
91
92 // The host objects that will be transferred, as recorded by Serialize()
93 // (e.g. MessagePorts).
94 // Used for warning user about posting the target MessagePort to itself,
95 // which will as a side effect destroy the communication channel.
96 const std::vector<std::unique_ptr<TransferData>>& transferables() const {
97 return transferables_;
98 }
99 bool has_transferables() const {
100 return !transferables_.empty() || !array_buffers_.empty();
101 }
102
103 void MemoryInfo(MemoryTracker* tracker) const override;
104
105 SET_MEMORY_INFO_NAME(Message)inline std::string MemoryInfoName() const override { return "Message"
; }
106 SET_SELF_SIZE(Message)inline size_t SelfSize() const override { return sizeof(Message
); }
107
108 private:
109 MallocedBuffer<char> main_message_buf_;
110 // TODO(addaleax): Make this a std::variant to save storage size in the common
111 // case (which is that all of these vectors are empty) once that is available
112 // with C++17.
113 std::vector<std::shared_ptr<v8::BackingStore>> array_buffers_;
114 std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_;
115 std::vector<std::unique_ptr<TransferData>> transferables_;
116 std::vector<v8::CompiledWasmModule> wasm_modules_;
117
118 friend class MessagePort;
119};
120
121class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> {
122 public:
123 // Named SiblingGroup, Used for one-to-many BroadcastChannels.
124 static std::shared_ptr<SiblingGroup> Get(const std::string& name);
125
126 // Anonymous SiblingGroup, Used for one-to-one MessagePort pairs.
127 SiblingGroup() = default;
128 explicit SiblingGroup(const std::string& name);
129 ~SiblingGroup();
130
131 // Dispatches the Message to the collection of associated
132 // ports. If there is more than one destination port and
133 // the Message contains transferables, Dispatch will fail.
134 // Returns Just(true) if successful and the message was
135 // dispatched to at least one destination. Returns Just(false)
136 // if there were no destinations. Returns Nothing<bool>()
137 // if there was an error. If error is not nullptr, it will
138 // be set to an error message or warning message as appropriate.
139 v8::Maybe<bool> Dispatch(
140 MessagePortData* source,
141 std::shared_ptr<Message> message,
142 std::string* error = nullptr);
143
144 void Entangle(MessagePortData* data);
145 void Entangle(std::initializer_list<MessagePortData*> data);
146 void Disentangle(MessagePortData* data);
147
148 const std::string& name() const { return name_; }
149
150 size_t size() const { return ports_.size(); }
151
152 private:
153 const std::string name_;
154 RwLock group_mutex_; // Protects ports_.
155 std::set<MessagePortData*> ports_;
156
157 static void CheckSiblingGroup(const std::string& name);
158
159 using Map =
160 std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>;
161
162 static Mutex groups_mutex_;
163 static Map groups_;
164};
165
166// This contains all data for a `MessagePort` instance that is not tied to
167// a specific Environment/Isolate/event loop, for easier transfer between those.
168class MessagePortData : public TransferData {
169 public:
170 explicit MessagePortData(MessagePort* owner);
171 ~MessagePortData() override;
172
173 MessagePortData(MessagePortData&& other) = delete;
174 MessagePortData& operator=(MessagePortData&& other) = delete;
175 MessagePortData(const MessagePortData& other) = delete;
176 MessagePortData& operator=(const MessagePortData& other) = delete;
177
178 // Add a message to the incoming queue and notify the receiver.
179 // This may be called from any thread.
180 void AddToIncomingQueue(std::shared_ptr<Message> message);
181 v8::Maybe<bool> Dispatch(
182 std::shared_ptr<Message> message,
183 std::string* error = nullptr);
184
185 // Turns `a` and `b` into siblings, i.e. connects the sending side of one
186 // to the receiving side of the other. This is not thread-safe.
187 static void Entangle(MessagePortData* a, MessagePortData* b);
188
189 // Removes any possible sibling. This is thread-safe (it acquires both
190 // `sibling_mutex_` and `mutex_`), and has to be because it is called once
191 // the corresponding JS handle handle wants to close
192 // which can happen on either side of a worker.
193 void Disentangle();
194
195 void MemoryInfo(MemoryTracker* tracker) const override;
196 BaseObjectPtr<BaseObject> Deserialize(
197 Environment* env,
198 v8::Local<v8::Context> context,
199 std::unique_ptr<TransferData> self) override;
200
201 SET_MEMORY_INFO_NAME(MessagePortData)inline std::string MemoryInfoName() const override { return "MessagePortData"
; }
202 SET_SELF_SIZE(MessagePortData)inline size_t SelfSize() const override { return sizeof(MessagePortData
); }
203
204 private:
205 // This mutex protects all fields below it, with the exception of
206 // sibling_.
207 mutable Mutex mutex_;
208 // TODO(addaleax): Make this a std::variant<std::shared_ptr, std::unique_ptr>
209 // once that is available with C++17, because std::shared_ptr comes with
210 // overhead that is only necessary for BroadcastChannel.
211 std::deque<std::shared_ptr<Message>> incoming_messages_;
212 MessagePort* owner_ = nullptr;
213 std::shared_ptr<SiblingGroup> group_;
214 friend class MessagePort;
215 friend class SiblingGroup;
216};
217
218// A message port that receives messages from other threads, including
219// the uv_async_t handle that is used to notify the current event loop of
220// new incoming messages.
221class MessagePort : public HandleWrap {
222 private:
223 // Create a new MessagePort. The `context` argument specifies the Context
224 // instance that is used for creating the values emitted from this port.
225 // This is called by MessagePort::New(), which is the public API used for
226 // creating MessagePort instances.
227 MessagePort(Environment* env,
228 v8::Local<v8::Context> context,
229 v8::Local<v8::Object> wrap);
230
231 public:
232 ~MessagePort() override;
233
234 // Create a new message port instance, optionally over an existing
235 // `MessagePortData` object.
236 static MessagePort* New(Environment* env,
237 v8::Local<v8::Context> context,
238 std::unique_ptr<MessagePortData> data = {},
239 std::shared_ptr<SiblingGroup> sibling_group = {});
240
241 // Send a message, i.e. deliver it into the sibling's incoming queue.
242 // If this port is closed, or if there is no sibling, this message is
243 // serialized with transfers, then silently discarded.
244 v8::Maybe<bool> PostMessage(Environment* env,
245 v8::Local<v8::Context> context,
246 v8::Local<v8::Value> message,
247 const TransferList& transfer);
248
249 // Start processing messages on this port as a receiving end.
250 void Start();
251 // Stop processing messages on this port as a receiving end.
252 void Stop();
253
254 /* constructor */
255 static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
256 /* prototype methods */
257 static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
258 static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
259 static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
260 static void CheckType(const v8::FunctionCallbackInfo<v8::Value>& args);
261 static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
262 static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
263
264 /* static */
265 static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);
266
267 // Turns `a` and `b` into siblings, i.e. connects the sending side of one
268 // to the receiving side of the other. This is not thread-safe.
269 static void Entangle(MessagePort* a, MessagePort* b);
270 static void Entangle(MessagePort* a, MessagePortData* b);
271
272 // Detach this port's data for transferring. After this, the MessagePortData
273 // is no longer associated with this handle, although it can still receive
274 // messages.
275 std::unique_ptr<MessagePortData> Detach();
276
277 void Close(
278 v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;
279
280 // Returns true if either data_ has been freed, or if the handle is being
281 // closed. Equivalent to the [[Detached]] internal slot in the HTML Standard.
282 //
283 // If checking if a JavaScript MessagePort object is detached, this method
284 // alone is often not enough, since the backing C++ MessagePort object may
285 // have been deleted already. For all intents and purposes, an object with a
286 // NULL pointer to the C++ MessagePort object is also detached.
287 inline bool IsDetached() const;
288
289 TransferMode GetTransferMode() const override;
290 std::unique_ptr<TransferData> TransferForMessaging() override;
291
292 void MemoryInfo(MemoryTracker* tracker) const override;
293 SET_MEMORY_INFO_NAME(MessagePort)inline std::string MemoryInfoName() const override { return "MessagePort"
; }
294 SET_SELF_SIZE(MessagePort)inline size_t SelfSize() const override { return sizeof(MessagePort
); }
295
296 private:
297 enum class MessageProcessingMode {
298 kNormalOperation,
299 kForceReadMessages
300 };
301
302 void OnClose() override;
303 void OnMessage(MessageProcessingMode mode);
304 void TriggerAsync();
305 v8::MaybeLocal<v8::Value> ReceiveMessage(
306 v8::Local<v8::Context> context,
307 MessageProcessingMode mode,
308 v8::Local<v8::Value>* port_list = nullptr);
309
310 std::unique_ptr<MessagePortData> data_ = nullptr;
311 bool receiving_messages_ = false;
312 uv_async_t async_;
313 v8::Global<v8::Function> emit_message_fn_;
314
315 friend class MessagePortData;
316};
317
318// Provide a base class from which JS classes that should be transferable or
319// cloneable by postMesssage() can inherit.
320// See e.g. FileHandle in internal/fs/promises.js for an example.
321class JSTransferable : public BaseObject {
322 public:
323 JSTransferable(Environment* env, v8::Local<v8::Object> obj);
324 static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
325
326 TransferMode GetTransferMode() const override;
327 std::unique_ptr<TransferData> TransferForMessaging() override;
328 std::unique_ptr<TransferData> CloneForMessaging() const override;
329 v8::Maybe<std::vector<BaseObjectPtr<BaseObject>>>
330 NestedTransferables() const override;
331 v8::Maybe<bool> FinalizeTransferRead(
332 v8::Local<v8::Context> context,
333 v8::ValueDeserializer* deserializer) override;
334
335 SET_NO_MEMORY_INFO()inline void MemoryInfo(node::MemoryTracker* tracker) const override
{}
336 SET_MEMORY_INFO_NAME(JSTransferable)inline std::string MemoryInfoName() const override { return "JSTransferable"
; }
337 SET_SELF_SIZE(JSTransferable)inline size_t SelfSize() const override { return sizeof(JSTransferable
); }
338
339 private:
340 std::unique_ptr<TransferData> TransferOrClone(TransferMode mode) const;
341
342 class Data : public TransferData {
343 public:
344 Data(std::string&& deserialize_info, v8::Global<v8::Value>&& data);
345
346 BaseObjectPtr<BaseObject> Deserialize(
347 Environment* env,
348 v8::Local<v8::Context> context,
349 std::unique_ptr<TransferData> self) override;
350 v8::Maybe<bool> FinalizeTransferWrite(
351 v8::Local<v8::Context> context,
352 v8::ValueSerializer* serializer) override;
353
354 SET_NO_MEMORY_INFO()inline void MemoryInfo(node::MemoryTracker* tracker) const override
{}
355 SET_MEMORY_INFO_NAME(JSTransferableTransferData)inline std::string MemoryInfoName() const override { return "JSTransferableTransferData"
; }
356 SET_SELF_SIZE(Data)inline size_t SelfSize() const override { return sizeof(Data)
; }
357
358 private:
359 std::string deserialize_info_;
360 v8::Global<v8::Value> data_;
361 };
362};
363
364v8::Local<v8::FunctionTemplate> GetMessagePortConstructorTemplate(
365 Environment* env);
366
367} // namespace worker
368} // namespace node
369
370#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
371
372
373#endif // SRC_NODE_MESSAGING_H_