GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: stream_base.cc Lines: 325 348 93.4 %
Date: 2022-10-31 03:21:21 Branches: 149 212 70.3 %

Line Branch Exec Source
1
#include "stream_base.h"  // NOLINT(build/include_inline)
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
5
#include "env-inl.h"
6
#include "js_stream.h"
7
#include "node.h"
8
#include "node_buffer.h"
9
#include "node_errors.h"
10
#include "node_external_reference.h"
11
#include "string_bytes.h"
12
#include "util-inl.h"
13
#include "v8.h"
14
15
#include <climits>  // INT_MAX
16
17
namespace node {
18
19
using v8::Array;
20
using v8::ArrayBuffer;
21
using v8::BackingStore;
22
using v8::ConstructorBehavior;
23
using v8::Context;
24
using v8::DontDelete;
25
using v8::DontEnum;
26
using v8::External;
27
using v8::Function;
28
using v8::FunctionCallbackInfo;
29
using v8::FunctionTemplate;
30
using v8::HandleScope;
31
using v8::Integer;
32
using v8::Isolate;
33
using v8::Local;
34
using v8::MaybeLocal;
35
using v8::Object;
36
using v8::PropertyAttribute;
37
using v8::ReadOnly;
38
using v8::SideEffectType;
39
using v8::Signature;
40
using v8::String;
41
using v8::Value;
42
43
// Explicit instantiations of StreamBase::WriteString for each encoding that
// is exposed as a JS method (ASCII, UTF8, UCS2 and LATIN1).
template int StreamBase::WriteString<ASCII>(
44
    const FunctionCallbackInfo<Value>& args);
45
template int StreamBase::WriteString<UTF8>(
46
    const FunctionCallbackInfo<Value>& args);
47
template int StreamBase::WriteString<UCS2>(
48
    const FunctionCallbackInfo<Value>& args);
49
template int StreamBase::WriteString<LATIN1>(
50
    const FunctionCallbackInfo<Value>& args);
51
52
53
54938
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
54
54938
  return ReadStart();
55
}
56
57
58
13384
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
59
13384
  return ReadStop();
60
}
61
62
12
int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
63
12
  CHECK(Buffer::HasInstance(args[0]));
64
65
24
  uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
66
12
  PushStreamListener(new CustomBufferJSListener(buf));
67
12
  return 0;
68
}
69
70
31438
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
71
31438
  CHECK(args[0]->IsObject());
72
31438
  Local<Object> req_wrap_obj = args[0].As<Object>();
73
74
31438
  return Shutdown(req_wrap_obj);
75
}
76
77
88707
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
78
88707
  env_->stream_base_state()[kBytesWritten] = res.bytes;
79
88707
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
80
88707
}
81
82
13448
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
83
13448
  Environment* env = Environment::GetCurrent(args);
84
13448
  Isolate* isolate = env->isolate();
85
13448
  Local<Context> context = env->context();
86
87
13448
  CHECK(args[0]->IsObject());
88
13448
  CHECK(args[1]->IsArray());
89
90
26896
  Local<Object> req_wrap_obj = args[0].As<Object>();
91
26896
  Local<Array> chunks = args[1].As<Array>();
92
13448
  bool all_buffers = args[2]->IsTrue();
93
94
  size_t count;
95
13448
  if (all_buffers)
96
278
    count = chunks->Length();
97
  else
98
13170
    count = chunks->Length() >> 1;
99
100
26896
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
101
102
13448
  size_t storage_size = 0;
103
  size_t offset;
104
105
13448
  if (!all_buffers) {
106
    // Determine storage size first
107
88596
    for (size_t i = 0; i < count; i++) {
108
      Local<Value> chunk;
109
150852
      if (!chunks->Get(context, i * 2).ToLocal(&chunk))
110
        return -1;
111
112
75426
      if (Buffer::HasInstance(chunk))
113
33028
        continue;
114
        // Buffer chunk, no additional storage required
115
116
      // String chunk
117
      Local<String> string;
118
84796
      if (!chunk->ToString(context).ToLocal(&string))
119
        return -1;
120
      Local<Value> next_chunk;
121
84796
      if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
122
        return -1;
123
42398
      enum encoding encoding = ParseEncoding(isolate, next_chunk);
124
      size_t chunk_size;
125
8471
      if ((encoding == UTF8 &&
126
8471
             string->Length() > 65535 &&
127

84824
             !StringBytes::Size(isolate, string, encoding).To(&chunk_size)) ||
128
84796
              !StringBytes::StorageSize(isolate, string, encoding)
129
42398
                  .To(&chunk_size)) {
130
        return -1;
131
      }
132
42398
      storage_size += chunk_size;
133
    }
134
135
13170
    if (storage_size > INT_MAX)
136
      return UV_ENOBUFS;
137
  } else {
138
26673
    for (size_t i = 0; i < count; i++) {
139
      Local<Value> chunk;
140
52790
      if (!chunks->Get(context, i).ToLocal(&chunk))
141
        return -1;
142
26395
      bufs[i].base = Buffer::Data(chunk);
143
26395
      bufs[i].len = Buffer::Length(chunk);
144
    }
145
  }
146
147
13448
  std::unique_ptr<BackingStore> bs;
148
13448
  if (storage_size > 0) {
149
13124
    NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
150
13124
    bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
151
  }
152
153
13448
  offset = 0;
154
13448
  if (!all_buffers) {
155
88596
    for (size_t i = 0; i < count; i++) {
156
      Local<Value> chunk;
157
150852
      if (!chunks->Get(context, i * 2).ToLocal(&chunk))
158
        return -1;
159
160
      // Write buffer
161
75426
      if (Buffer::HasInstance(chunk)) {
162
33028
        bufs[i].base = Buffer::Data(chunk);
163
33028
        bufs[i].len = Buffer::Length(chunk);
164
33028
        continue;
165
      }
166
167
      // Write string
168
42398
      CHECK_LE(offset, storage_size);
169
      char* str_storage =
170
42398
          static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
171
42398
      size_t str_size = (bs ? bs->ByteLength() : 0) - offset;
172
173
      Local<String> string;
174
84796
      if (!chunk->ToString(context).ToLocal(&string))
175
        return -1;
176
      Local<Value> next_chunk;
177
84796
      if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
178
        return -1;
179
42398
      enum encoding encoding = ParseEncoding(isolate, next_chunk);
180
42398
      str_size = StringBytes::Write(isolate,
181
                                    str_storage,
182
                                    str_size,
183
                                    string,
184
                                    encoding);
185
42398
      bufs[i].base = str_storage;
186
42398
      bufs[i].len = str_size;
187
42398
      offset += str_size;
188
    }
189
  }
190
191
13448
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
192
13448
  SetWriteResult(res);
193

13448
  if (res.wrap != nullptr && storage_size > 0)
194
340
    res.wrap->SetBackingStore(std::move(bs));
195
13448
  return res.err;
196
}
197
198
199
46172
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
200
46172
  CHECK(args[0]->IsObject());
201
202
46172
  Environment* env = Environment::GetCurrent(args);
203
204
46172
  if (!args[1]->IsUint8Array()) {
205
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
206
1
    return 0;
207
  }
208
209
92342
  Local<Object> req_wrap_obj = args[0].As<Object>();
210
  uv_buf_t buf;
211
46171
  buf.base = Buffer::Data(args[1]);
212
46171
  buf.len = Buffer::Length(args[1]);
213
214
46171
  uv_stream_t* send_handle = nullptr;
215
216

46171
  if (args[2]->IsObject() && IsIPCPipe()) {
217
    Local<Object> send_handle_obj = args[2].As<Object>();
218
219
    HandleWrap* wrap;
220
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
221
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
222
    // Reference LibuvStreamWrap instance to prevent it from being garbage
223
    // collected before `AfterWrite` is called.
224
    if (req_wrap_obj->Set(env->context(),
225
                          env->handle_string(),
226
                          send_handle_obj).IsNothing()) {
227
      return -1;
228
    }
229
  }
230
231
46171
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
232
46171
  SetWriteResult(res);
233
234
46171
  return res.err;
235
}
236
237
238
template <enum encoding enc>
239
58176
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
240
58176
  Environment* env = Environment::GetCurrent(args);
241
58176
  Isolate* isolate = env->isolate();
242
58176
  CHECK(args[0]->IsObject());
243
116352
  CHECK(args[1]->IsString());
244
245
116352
  Local<Object> req_wrap_obj = args[0].As<Object>();
246
116352
  Local<String> string = args[1].As<String>();
247
  Local<Object> send_handle_obj;
248
58176
  if (args[2]->IsObject())
249
512
    send_handle_obj = args[2].As<Object>();
250
251
  // Compute the size of the storage that the string will be flattened into.
252
  // For UTF8 strings that are very long, go ahead and take the hit for
253
  // computing their actual size, rather than tripling the storage.
254
  size_t storage_size;
255
58176
  if ((enc == UTF8 &&
256
51420
         string->Length() > 65535 &&
257

109880
         !StringBytes::Size(isolate, string, enc).To(&storage_size)) ||
258

167772
          !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) {
259
    return -1;
260
  }
261
262
58176
  if (storage_size > INT_MAX)
263
    return UV_ENOBUFS;
264
265
  // Try writing immediately if write size isn't too big
266
  char stack_storage[16384];  // 16kb
267
  size_t data_size;
268
58176
  size_t synchronously_written = 0;
269
  uv_buf_t buf;
270
271
114146
  bool try_write = storage_size <= sizeof(stack_storage) &&
272

67030
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
273
58176
  if (try_write) {
274
55718
    data_size = StringBytes::Write(isolate,
275
                                   stack_storage,
276
                                   storage_size,
277
                                   string,
278
                                   enc);
279
55718
    buf = uv_buf_init(stack_storage, data_size);
280
281
55718
    uv_buf_t* bufs = &buf;
282
55718
    size_t count = 1;
283
55718
    const int err = DoTryWrite(&bufs, &count);
284
    // Keep track of the bytes written here, because we're taking a shortcut
285
    // by using `DoTryWrite()` directly instead of using the utilities
286
    // provided by `Write()`.
287
55718
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
288
55718
    bytes_written_ += synchronously_written;
289
290
    // Immediate failure or success
291

55718
    if (err != 0 || count == 0) {
292
51954
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
293
51954
      return err;
294
    }
295
296
    // Partial write
297
3764
    CHECK_EQ(count, 1);
298
  }
299
300
6222
  std::unique_ptr<BackingStore> bs;
301
302
6222
  if (try_write) {
303
    // Copy partial data
304
3764
    NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
305
3764
    bs = ArrayBuffer::NewBackingStore(isolate, buf.len);
306
3764
    memcpy(static_cast<char*>(bs->Data()), buf.base, buf.len);
307
3764
    data_size = buf.len;
308
  } else {
309
    // Write it
310
2458
    NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
311
2458
    bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
312
2458
    data_size = StringBytes::Write(isolate,
313
2458
                                   static_cast<char*>(bs->Data()),
314
                                   storage_size,
315
                                   string,
316
                                   enc);
317
  }
318
319
6222
  CHECK_LE(data_size, storage_size);
320
321
6222
  buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size);
322
323
6222
  uv_stream_t* send_handle = nullptr;
324
325

6482
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
326
    HandleWrap* wrap;
327
256
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
328
256
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
329
    // Reference LibuvStreamWrap instance to prevent it from being garbage
330
    // collected before `AfterWrite` is called.
331
512
    if (req_wrap_obj->Set(env->context(),
332
                          env->handle_string(),
333
256
                          send_handle_obj).IsNothing()) {
334
      return -1;
335
    }
336
  }
337
338
6222
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj, try_write);
339
6222
  res.bytes += synchronously_written;
340
341
6222
  SetWriteResult(res);
342
6222
  if (res.wrap != nullptr)
343
6178
    res.wrap->SetBackingStore(std::move(bs));
344
345
6222
  return res.err;
346
}
347
348
349
65317
// Invokes the JS `onread` function stored in the wrap object's
// kOnReadFunctionField internal field.
//
//   nread  - byte count or error code; stored (as int32) in the
//            kReadBytesOrError slot of stream_base_state.
//   ab     - the data, or an empty handle, in which case `undefined` is
//            passed to JS instead.
//   offset - stored in the kArrayBufferOffset slot of stream_base_state.
//   checks - DONT_SKIP_NREAD_CHECKS enables the debug-only consistency
//            checks between `nread`, `ab` and `offset`.
//
// Returns whatever MakeCallback() returns for the `onread` call.
MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
                                                 Local<ArrayBuffer> ab,
                                                 size_t offset,
                                                 StreamBaseJSChecks checks) {
  Environment* env = env_;

  DCHECK_EQ(static_cast<int32_t>(nread), nread);
  DCHECK_LE(offset, INT32_MAX);

  if (checks == DONT_SKIP_NREAD_CHECKS) {
    if (!ab.IsEmpty()) {
      DCHECK_GE(nread, 0);
    } else {
      DCHECK_EQ(offset, 0);
      DCHECK_LE(nread, 0);
    }
  }

  auto&& state = env->stream_base_state();
  state[kReadBytesOrError] = static_cast<int32_t>(nread);
  state[kArrayBufferOffset] = offset;

  // The single callback argument: the ArrayBuffer, or undefined without one.
  Local<Value> payload;
  if (ab.IsEmpty())
    payload = Undefined(env->isolate());
  else
    payload = ab;
  Local<Value> argv[] = {payload};

  AsyncWrap* wrap = GetAsyncWrap();
  CHECK_NOT_NULL(wrap);
  Local<Object> self = wrap->object();
  Local<Value> onread =
      self->GetInternalField(StreamBase::kOnReadFunctionField);
  CHECK(onread->IsFunction());
  return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
}
381
382
383
3707
// Base implementation: reports that this stream is not an IPC pipe.
bool StreamBase::IsIPCPipe() {
  constexpr bool kIsIpcPipe = false;
  return kIsIpcPipe;
}
386
387
388
// Base implementation: there is no file descriptor to report, so -1 is
// returned.
int StreamBase::GetFD() {
  constexpr int kNoFileDescriptor = -1;
  return kNoFileDescriptor;
}
391
392
393
51049
// Returns the JS object of the AsyncWrap associated with this stream.
Local<Object> StreamBase::GetObject() {
  AsyncWrap* const async_wrap = GetAsyncWrap();
  return async_wrap->object();
}
396
397
34368
void StreamBase::AddMethod(Environment* env,
398
                           Local<Signature> signature,
399
                           enum PropertyAttribute attributes,
400
                           Local<FunctionTemplate> t,
401
                           JSMethodFunction* stream_method,
402
                           Local<String> string) {
403
34368
  Isolate* isolate = env->isolate();
404
  Local<FunctionTemplate> templ =
405
      NewFunctionTemplate(isolate,
406
                          stream_method,
407
                          signature,
408
                          ConstructorBehavior::kThrow,
409
34368
                          SideEffectType::kHasNoSideEffect);
410
68736
  t->PrototypeTemplate()->SetAccessorProperty(
411
      string, templ, Local<FunctionTemplate>(), attributes);
412
34368
}
413
414
8592
// Populates the prototype template of `t` with the StreamBase JS API:
//   - read-only, non-deletable, non-enumerable accessors for the fd, the
//     external stream pointer and the read/written byte counters,
//   - the read/shutdown/write methods,
//   - an `isStreamBase` property set to true,
//   - an `onread` accessor backed by the kOnReadFunctionField internal field
//     (the setter is parameterized with &Value::IsFunction).
void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
  Isolate* isolate = env->isolate();
  HandleScope scope(isolate);

  const PropertyAttribute attributes =
      static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
  Local<Signature> sig = Signature::New(isolate, t);

  // Accessor properties backed by native getters.
  const auto add_getter = [&](JSMethodFunction* getter, Local<String> name) {
    AddMethod(env, sig, attributes, t, getter, name);
  };
  add_getter(GetFD, env->fd_string());
  add_getter(GetExternal, env->external_stream_string());
  add_getter(GetBytesRead, env->bytes_read_string());
  add_getter(GetBytesWritten, env->bytes_written_string());

  // Prototype methods, all dispatched through JSMethod<>.
  SetProtoMethod(isolate, t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
  SetProtoMethod(isolate, t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
  SetProtoMethod(isolate, t, "shutdown", JSMethod<&StreamBase::Shutdown>);
  SetProtoMethod(
      isolate, t, "useUserBuffer", JSMethod<&StreamBase::UseUserBuffer>);
  SetProtoMethod(isolate, t, "writev", JSMethod<&StreamBase::Writev>);
  SetProtoMethod(isolate, t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
  SetProtoMethod(isolate,
                 t,
                 "writeAsciiString",
                 JSMethod<&StreamBase::WriteString<ASCII>>);
  SetProtoMethod(
      isolate, t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
  SetProtoMethod(
      isolate, t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
  SetProtoMethod(isolate,
                 t,
                 "writeLatin1String",
                 JSMethod<&StreamBase::WriteString<LATIN1>>);

  auto proto = t->PrototypeTemplate();
  proto->Set(FIXED_ONE_BYTE_STRING(isolate, "isStreamBase"), True(isolate));
  proto->SetAccessor(
      FIXED_ONE_BYTE_STRING(isolate, "onread"),
      BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>,
      BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
                                   &Value::IsFunction>);
}
455
456
11162
void StreamBase::RegisterExternalReferences(
457
    ExternalReferenceRegistry* registry) {
458
  // This function is called by a single thread during start up, so it is safe
459
  // to use a local static variable here.
460
  static bool is_registered = false;
461
11162
  if (is_registered) return;
462
5581
  registry->Register(GetFD);
463
5581
  registry->Register(GetExternal);
464
5581
  registry->Register(GetBytesRead);
465
5581
  registry->Register(GetBytesWritten);
466
5581
  registry->Register(JSMethod<&StreamBase::ReadStartJS>);
467
5581
  registry->Register(JSMethod<&StreamBase::ReadStopJS>);
468
5581
  registry->Register(JSMethod<&StreamBase::Shutdown>);
469
5581
  registry->Register(JSMethod<&StreamBase::UseUserBuffer>);
470
5581
  registry->Register(JSMethod<&StreamBase::Writev>);
471
5581
  registry->Register(JSMethod<&StreamBase::WriteBuffer>);
472
5581
  registry->Register(JSMethod<&StreamBase::WriteString<ASCII>>);
473
5581
  registry->Register(JSMethod<&StreamBase::WriteString<UTF8>>);
474
5581
  registry->Register(JSMethod<&StreamBase::WriteString<UCS2>>);
475
5581
  registry->Register(JSMethod<&StreamBase::WriteString<LATIN1>>);
476
5581
  registry->Register(
477
      BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>);
478
5581
  registry->Register(
479
      BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
480
                                   &Value::IsFunction>);
481
5581
  is_registered = true;
482
}
483
484
1523
void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
485
  // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
486
3046
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
487
1523
  if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
488
489
1523
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
490
491
3046
  args.GetReturnValue().Set(wrap->GetFD());
492
}
493
494
35609
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
495
71218
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
496
35611
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
497
498
  // uint64_t -> double. 53bits is enough for all real cases.
499
71216
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
500
}
501
502
35610
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
503
71220
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
504
35610
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
505
506
  // uint64_t -> double. 53bits is enough for all real cases.
507
71220
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
508
}
509
510
2
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
511
4
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
512
2
  if (wrap == nullptr) return;
513
514
2
  Local<External> ext = External::New(args.GetIsolate(), wrap);
515
4
  args.GetReturnValue().Set(ext);
516
}
517
518
// Generic trampoline from a JS call to the StreamBase member function
// `Method`.
//
// The StreamBase is looked up from args.Holder(). If none is attached the
// call does nothing; if the stream is not alive the return value is set to
// UV_EINVAL. Otherwise `Method` runs inside a DefaultTriggerAsyncIdScope for
// the stream's AsyncWrap and its int result becomes the JS return value.
template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
  StreamBase* const stream =
      StreamBase::FromObject(args.Holder().As<Object>());
  if (stream == nullptr) return;

  if (!stream->IsAlive()) {
    args.GetReturnValue().Set(UV_EINVAL);
    return;
  }

  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(stream->GetAsyncWrap());
  const int result = (stream->*Method)(args);
  args.GetReturnValue().Set(result);
}
528
529
7366
// Default implementation: no synchronous write is attempted. `bufs` and
// `count` are left untouched and 0 is returned.
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
  // No TryWrite by default
  constexpr int kNoError = 0;
  return kNoError;
}
533
534
535
47516
// Default implementation: there is no error message to report, so a null
// pointer is returned.
const char* StreamResource::Error() const {
  const char* const no_message = nullptr;
  return no_message;
}
538
539
540
// Default implementation: intentionally does nothing.
void StreamResource::ClearError() {}
543
544
545
33976
// Allocates the read buffer through the environment of the stream this
// listener is attached to (which must be set), using
// Environment::allocate_managed_buffer() with the suggested size.
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
  CHECK_NOT_NULL(stream_);
  StreamBase* const owner = static_cast<StreamBase*>(stream_);
  return owner->stream_env()->allocate_managed_buffer(suggested_size);
}
550
551
49387
// Forwards the result of a read to the JS `onread` callback.
//
// The backing store for `buf_` is always reclaimed from the environment
// first. A negative `nread` is reported to JS without data, a zero `nread`
// is dropped silently, and a positive `nread` shrinks the store to `nread`
// bytes and passes it to JS as an ArrayBuffer.
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
  CHECK_NOT_NULL(stream_);
  StreamBase* const owner = static_cast<StreamBase*>(stream_);
  Environment* const env = owner->stream_env();
  Isolate* const isolate = env->isolate();
  HandleScope handle_scope(isolate);
  Context::Scope context_scope(env->context());
  std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);

  if (nread < 0) {
    owner->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
    return;
  }
  if (nread == 0) return;

  CHECK_LE(static_cast<size_t>(nread), bs->ByteLength());
  bs = BackingStore::Reallocate(isolate, std::move(bs), nread);

  Local<ArrayBuffer> data = ArrayBuffer::New(isolate, std::move(bs));
  owner->CallJSOnreadMethod(nread, data);
}
571
572
573
2090
// Hands out a copy of the stored buffer_ descriptor; the suggested size is
// ignored.
uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
  const uv_buf_t user_buffer = buffer_;
  return user_buffer;
}
576
577
578
2096
// Reports a read into the stored buffer to the JS `onread` callback.
//
// No ArrayBuffer is passed to JS. If the callback returns a value other than
// undefined, that value is treated as a Buffer and becomes the buffer used
// for subsequent reads.
void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
  CHECK_NOT_NULL(stream_);

  StreamBase* const owner = static_cast<StreamBase*>(stream_);
  Environment* const env = owner->stream_env();
  HandleScope handle_scope(env->isolate());
  Context::Scope context_scope(env->context());

  // To deal with the case where POLLHUP is received and UV_EOF is returned, as
  // libuv returns an empty buffer (on unices only).
  const bool eof_without_buffer = nread == UV_EOF && buf.base == nullptr;
  if (eof_without_buffer) {
    owner->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
    return;
  }

  CHECK_EQ(buf.base, buffer_.base);

  MaybeLocal<Value> outcome = owner->CallJSOnreadMethod(
      nread, Local<ArrayBuffer>(), 0, StreamBase::SKIP_NREAD_CHECKS);

  Local<Value> replacement;
  if (!outcome.ToLocal(&replacement)) return;
  if (replacement->IsUndefined()) return;

  // JS supplied a new buffer to read into from now on.
  buffer_.base = Buffer::Data(replacement);
  buffer_.len = Buffer::Length(replacement);
}
605
606
607
12875
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
608
    StreamReq* req_wrap, int status) {
609
12875
  StreamBase* stream = static_cast<StreamBase*>(stream_);
610
12875
  Environment* env = stream->stream_env();
611
12875
  if (env->is_stopping()) return;
612
12874
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
613
25747
  HandleScope handle_scope(env->isolate());
614
12874
  Context::Scope context_scope(env->context());
615
12874
  CHECK(!async_wrap->persistent().IsEmpty());
616
12874
  Local<Object> req_wrap_obj = async_wrap->object();
617
618
  Local<Value> argv[] = {
619
    Integer::New(env->isolate(), status),
620
12874
    stream->GetObject(),
621
    Undefined(env->isolate())
622
38622
  };
623
624
12874
  const char* msg = stream->Error();
625
12874
  if (msg != nullptr) {
626
    argv[2] = OneByteString(env->isolate(), msg);
627
    stream->ClearError();
628
  }
629
630
38622
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
631
12873
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
632
}
633
634
5232
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
635
    WriteWrap* req_wrap, int status) {
636
5232
  OnStreamAfterReqFinished(req_wrap, status);
637
5231
}
638
639
7643
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
640
    ShutdownWrap* req_wrap, int status) {
641
7643
  OnStreamAfterReqFinished(req_wrap, status);
642
7643
}
643
644
7643
void ShutdownWrap::OnDone(int status) {
645
7643
  stream()->EmitAfterShutdown(this, status);
646
7643
  Dispose();
647
7643
}
648
649
7521
void WriteWrap::OnDone(int status) {
650
7521
  stream()->EmitAfterWrite(this, status);
651
7520
  Dispose();
652
7520
}
653
654
441408
// A listener that is still attached to a stream removes itself from that
// stream on destruction.
StreamListener::~StreamListener() {
  if (stream_ == nullptr) return;
  stream_->RemoveStreamListener(this);
}
658
659
2635
// Default behavior: pass the after-shutdown notification on to the previous
// listener, which must exist.
void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
  CHECK_NOT_NULL(previous_listener_);
  StreamListener* const next = previous_listener_;
  next->OnStreamAfterShutdown(w, status);
}
663
664
3979
// Default behavior: pass the after-write notification on to the previous
// listener, which must exist.
void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
  CHECK_NOT_NULL(previous_listener_);
  StreamListener* const next = previous_listener_;
  next->OnStreamAfterWrite(w, status);
}
668
669
135228
// Notifies every remaining listener that the stream is being destroyed and
// makes sure each one ends up detached.
StreamResource::~StreamResource() {
  for (StreamListener* head = listener_; head != nullptr; head = listener_) {
    head->OnStreamDestroy();
    // Listeners are allowed to remove themselves from within
    // `OnStreamDestroy()` (for example through generic cleanup code that
    // removes the listener unconditionally). If this one is still the
    // current listener afterwards, remove it here.
    if (listener_ == head)
      RemoveStreamListener(head);
  }
}
681
682
13
// Creates the shutdown request for `object`: a
// SimpleShutdownWrap<AsyncWrap> bound to this stream, on which MakeWeak() is
// called before it is returned.
ShutdownWrap* StreamBase::CreateShutdownWrap(
    Local<Object> object) {
  SimpleShutdownWrap<AsyncWrap>* const request =
      new SimpleShutdownWrap<AsyncWrap>(this, object);
  request->MakeWeak();
  return request;
}
688
689
7366
// Creates the write request for `object`: a SimpleWriteWrap<AsyncWrap> bound
// to this stream, on which MakeWeak() is called before it is returned.
WriteWrap* StreamBase::CreateWriteWrap(
    Local<Object> object) {
  SimpleWriteWrap<AsyncWrap>* const request =
      new SimpleWriteWrap<AsyncWrap>(this, object);
  request->MakeWeak();
  return request;
}
695
696
}  // namespace node