GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.cc Lines: 303 319 95.0 %
Date: 2021-01-16 04:10:54 Branches: 210 336 62.5 %

Line Branch Exec Source
1
#include "stream_base.h"  // NOLINT(build/include_inline)
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
#include "allocated_buffer-inl.h"
5
6
#include "node.h"
7
#include "node_buffer.h"
8
#include "node_errors.h"
9
#include "env-inl.h"
10
#include "js_stream.h"
11
#include "string_bytes.h"
12
#include "util-inl.h"
13
#include "v8.h"
14
15
#include <climits>  // INT_MAX
16
17
namespace node {
18
19
using v8::Array;
20
using v8::ArrayBuffer;
21
using v8::ConstructorBehavior;
22
using v8::Context;
23
using v8::DontDelete;
24
using v8::DontEnum;
25
using v8::External;
26
using v8::Function;
27
using v8::FunctionCallbackInfo;
28
using v8::FunctionTemplate;
29
using v8::HandleScope;
30
using v8::Integer;
31
using v8::Local;
32
using v8::MaybeLocal;
33
using v8::Object;
34
using v8::PropertyAttribute;
35
using v8::ReadOnly;
36
using v8::SideEffectType;
37
using v8::Signature;
38
using v8::String;
39
using v8::Value;
40
41
// Explicit instantiations of the StreamBase::WriteString<enc> member template
// for the ASCII, UTF8, UCS2 and LATIN1 encodings.
template int StreamBase::WriteString<ASCII>(
42
    const FunctionCallbackInfo<Value>& args);
43
template int StreamBase::WriteString<UTF8>(
44
    const FunctionCallbackInfo<Value>& args);
45
template int StreamBase::WriteString<UCS2>(
46
    const FunctionCallbackInfo<Value>& args);
47
template int StreamBase::WriteString<LATIN1>(
48
    const FunctionCallbackInfo<Value>& args);
49
50
51
54014
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
52
54014
  return ReadStart();
53
}
54
55
56
31918
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
57
31918
  return ReadStop();
58
}
59
60
12
int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
61
12
  CHECK(Buffer::HasInstance(args[0]));
62
63
24
  uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
64
12
  PushStreamListener(new CustomBufferJSListener(buf));
65
12
  return 0;
66
}
67
68
30700
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
69
61400
  CHECK(args[0]->IsObject());
70
61400
  Local<Object> req_wrap_obj = args[0].As<Object>();
71
72
30700
  return Shutdown(req_wrap_obj);
73
}
74
75
334766
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
76
334766
  env_->stream_base_state()[kBytesWritten] = res.bytes;
77
334766
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
78
334766
}
79
80
13095
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
81
13095
  Environment* env = Environment::GetCurrent(args);
82
83
26190
  CHECK(args[0]->IsObject());
84
26190
  CHECK(args[1]->IsArray());
85
86
26190
  Local<Object> req_wrap_obj = args[0].As<Object>();
87
26190
  Local<Array> chunks = args[1].As<Array>();
88
26190
  bool all_buffers = args[2]->IsTrue();
89
90
  size_t count;
91
13095
  if (all_buffers)
92
317
    count = chunks->Length();
93
  else
94
12778
    count = chunks->Length() >> 1;
95
96
26190
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
97
98
13095
  size_t storage_size = 0;
99
  size_t offset;
100
101
13095
  if (!all_buffers) {
102
    // Determine storage size first
103
83008
    for (size_t i = 0; i < count; i++) {
104
210690
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
105
106
70230
      if (Buffer::HasInstance(chunk))
107
30053
        continue;
108
        // Buffer chunk, no additional storage required
109
110
      // String chunk
111
120531
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
112
40177
      enum encoding encoding = ParseEncoding(env->isolate(),
113
120531
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
114
      size_t chunk_size;
115


87782
      if (encoding == UTF8 && string->Length() > 65535 &&
116
40203
          !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
117
        return 0;
118
80354
      else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
119
40177
                    .To(&chunk_size))
120
        return 0;
121
40177
      storage_size += chunk_size;
122
    }
123
124
12778
    if (storage_size > INT_MAX)
125
      return UV_ENOBUFS;
126
  } else {
127
27288
    for (size_t i = 0; i < count; i++) {
128
80913
      Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
129
26971
      bufs[i].base = Buffer::Data(chunk);
130
26971
      bufs[i].len = Buffer::Length(chunk);
131
    }
132
  }
133
134
26190
  AllocatedBuffer storage;
135
13095
  if (storage_size > 0)
136
12732
    storage = AllocatedBuffer::AllocateManaged(env, storage_size);
137
138
13095
  offset = 0;
139
13095
  if (!all_buffers) {
140
83008
    for (size_t i = 0; i < count; i++) {
141
210690
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
142
143
      // Write buffer
144
70230
      if (Buffer::HasInstance(chunk)) {
145
30053
        bufs[i].base = Buffer::Data(chunk);
146
30053
        bufs[i].len = Buffer::Length(chunk);
147
30053
        continue;
148
      }
149
150
      // Write string
151
40177
      CHECK_LE(offset, storage_size);
152
40177
      char* str_storage = storage.data() + offset;
153
40177
      size_t str_size = storage.size() - offset;
154
155
120531
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
156
40177
      enum encoding encoding = ParseEncoding(env->isolate(),
157
120531
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
158
80354
      str_size = StringBytes::Write(env->isolate(),
159
                                    str_storage,
160
                                    str_size,
161
                                    string,
162
40177
                                    encoding);
163
40177
      bufs[i].base = str_storage;
164
40177
      bufs[i].len = str_size;
165
40177
      offset += str_size;
166
    }
167
  }
168
169
26190
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
170
13095
  SetWriteResult(res);
171

13095
  if (res.wrap != nullptr && storage_size > 0) {
172
351
    res.wrap->SetAllocatedStorage(std::move(storage));
173
  }
174
13095
  return res.err;
175
}
176
177
178
45337
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
179
90674
  CHECK(args[0]->IsObject());
180
181
45337
  Environment* env = Environment::GetCurrent(args);
182
183
90674
  if (!args[1]->IsUint8Array()) {
184
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
185
1
    return 0;
186
  }
187
188
90672
  Local<Object> req_wrap_obj = args[0].As<Object>();
189
  uv_buf_t buf;
190
45336
  buf.base = Buffer::Data(args[1]);
191
45336
  buf.len = Buffer::Length(args[1]);
192
193
45336
  uv_stream_t* send_handle = nullptr;
194
195

90672
  if (args[2]->IsObject() && IsIPCPipe()) {
196
    Local<Object> send_handle_obj = args[2].As<Object>();
197
198
    HandleWrap* wrap;
199
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
200
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
201
    // Reference LibuvStreamWrap instance to prevent it from being garbage
202
    // collected before `AfterWrite` is called.
203
    req_wrap_obj->Set(env->context(),
204
                      env->handle_string(),
205
                      send_handle_obj).Check();
206
  }
207
208
90672
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
209
45336
  SetWriteResult(res);
210
211
45336
  return res.err;
212
}
213
214
215
template <enum encoding enc>
216
276335
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
217
276335
  Environment* env = Environment::GetCurrent(args);
218


552670
  CHECK(args[0]->IsObject());
219


829005
  CHECK(args[1]->IsString());
220
221
552670
  Local<Object> req_wrap_obj = args[0].As<Object>();
222
552670
  Local<String> string = args[1].As<String>();
223
  Local<Object> send_handle_obj;
224


552670
  if (args[2]->IsObject())
225
210
    send_handle_obj = args[2].As<Object>();
226
227
  // Compute the size of the storage that the string will be flattened into.
228
  // For UTF8 strings that are very long, go ahead and take the hit for
229
  // computing their actual size, rather than tripling the storage.
230
  size_t storage_size;
231



549707
  if (enc == UTF8 && string->Length() > 65535 &&
232
273443
      !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
233
    return 0;
234


552670
  else if (!StringBytes::StorageSize(env->isolate(), string, enc)
235
276335
                .To(&storage_size))
236
    return 0;
237
238


276335
  if (storage_size > INT_MAX)
239
    return UV_ENOBUFS;
240
241
  // Try writing immediately if write size isn't too big
242
  char stack_storage[16384];  // 16kb
243
  size_t data_size;
244
276335
  size_t synchronously_written = 0;
245
  uv_buf_t buf;
246
247




826700
  bool try_write = storage_size <= sizeof(stack_storage) &&
248


556943
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
249


276335
  if (try_write) {
250
275131
    data_size = StringBytes::Write(env->isolate(),
251
                                   stack_storage,
252
                                   storage_size,
253
                                   string,
254
                                   enc);
255
275131
    buf = uv_buf_init(stack_storage, data_size);
256
257
275131
    uv_buf_t* bufs = &buf;
258
275131
    size_t count = 1;
259
275131
    const int err = DoTryWrite(&bufs, &count);
260
    // Keep track of the bytes written here, because we're taking a shortcut
261
    // by using `DoTryWrite()` directly instead of using the utilities
262
    // provided by `Write()`.
263


275131
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
264
275131
    bytes_written_ += synchronously_written;
265
266
    // Immediate failure or success
267




275131
    if (err != 0 || count == 0) {
268
273373
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
269
273373
      return err;
270
    }
271
272
    // Partial write
273


1758
    CHECK_EQ(count, 1);
274
  }
275
276
5924
  AllocatedBuffer data;
277
278


2962
  if (try_write) {
279
    // Copy partial data
280
1758
    data = AllocatedBuffer::AllocateManaged(env, buf.len);
281
1758
    memcpy(data.data(), buf.base, buf.len);
282
1758
    data_size = buf.len;
283
  } else {
284
    // Write it
285
1204
    data = AllocatedBuffer::AllocateManaged(env, storage_size);
286
1204
    data_size = StringBytes::Write(env->isolate(),
287
                                   data.data(),
288
                                   storage_size,
289
                                   string,
290
                                   enc);
291
  }
292
293


2962
  CHECK_LE(data_size, storage_size);
294
295
2962
  buf = uv_buf_init(data.data(), data_size);
296
297
2962
  uv_stream_t* send_handle = nullptr;
298
299






3069
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
300
    HandleWrap* wrap;
301


105
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
302
105
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
303
    // Reference LibuvStreamWrap instance to prevent it from being garbage
304
    // collected before `AfterWrite` is called.
305
420
    req_wrap_obj->Set(env->context(),
306
                      env->handle_string(),
307
                      send_handle_obj).Check();
308
  }
309
310
5924
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
311
2962
  res.bytes += synchronously_written;
312
313
2962
  SetWriteResult(res);
314


2962
  if (res.wrap != nullptr) {
315
2937
    res.wrap->SetAllocatedStorage(std::move(data));
316
  }
317
318
2962
  return res.err;
319
}
320
321
322
310104
// Invokes the function stored in the wrap object's kOnReadFunctionField
// internal field.
//
// `nread` and `offset` are published through the environment's
// stream_base_state array (kReadBytesOrError / kArrayBufferOffset slots);
// the single call argument is `ab`, or undefined when `ab` is empty.
// `checks` selects whether the debug-only nread/offset consistency checks
// run. Returns the result of MakeCallback().
MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
                                                 Local<ArrayBuffer> ab,
                                                 size_t offset,
                                                 StreamBaseJSChecks checks) {
  Environment* env = env_;

  DCHECK_EQ(static_cast<int32_t>(nread), nread);
  DCHECK_LE(offset, INT32_MAX);

  if (checks == DONT_SKIP_NREAD_CHECKS) {
    if (ab.IsEmpty()) {
      DCHECK_EQ(offset, 0);
      DCHECK_LE(nread, 0);
    } else {
      DCHECK_GE(nread, 0);
    }
  }

  auto&& state = env->stream_base_state();
  state[kReadBytesOrError] = nread;
  state[kArrayBufferOffset] = offset;

  // Pass `undefined` in place of a missing ArrayBuffer.
  Local<Value> buffer_arg;
  if (ab.IsEmpty())
    buffer_arg = Undefined(env->isolate());
  else
    buffer_arg = ab;
  Local<Value> argv[] = { buffer_arg };

  AsyncWrap* wrap = GetAsyncWrap();
  CHECK_NOT_NULL(wrap);

  Local<Object> wrap_object = wrap->object();
  Local<Value> onread =
      wrap_object->GetInternalField(StreamBase::kOnReadFunctionField);
  CHECK(onread->IsFunction());

  return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
}
354
355
356
3467
// Base implementation: always reports that the stream is not an IPC pipe.
bool StreamBase::IsIPCPipe() {
  const bool is_ipc_pipe = false;
  return is_ipc_pipe;
}
359
360
361
// Base implementation: always returns -1.
int StreamBase::GetFD() {
  const int fd = -1;
  return fd;
}
364
365
366
49261
// Returns the object() of the AsyncWrap obtained from GetAsyncWrap().
Local<Object> StreamBase::GetObject() {
  AsyncWrap* async_wrap = GetAsyncWrap();
  return async_wrap->object();
}
369
370
43004
void StreamBase::AddMethod(Environment* env,
371
                           Local<Signature> signature,
372
                           enum PropertyAttribute attributes,
373
                           Local<FunctionTemplate> t,
374
                           JSMethodFunction* stream_method,
375
                           Local<String> string) {
376
  Local<FunctionTemplate> templ =
377
      env->NewFunctionTemplate(stream_method,
378
                               signature,
379
                               ConstructorBehavior::kThrow,
380
43004
                               SideEffectType::kHasNoSideEffect);
381
129012
  t->PrototypeTemplate()->SetAccessorProperty(
382
43004
      string, templ, Local<FunctionTemplate>(), attributes);
383
43004
}
384
385
10751
// Populates the function template `t` with the StreamBase surface:
//  - read-only, non-deletable, non-enumerable accessors for the fd,
//    external stream, bytes-read and bytes-written values;
//  - the prototype methods readStart, readStop, shutdown, useUserBuffer,
//    writev, writeBuffer and the four write*String variants;
//  - an `isStreamBase` prototype property set to true;
//  - an `onread` accessor backed by the kOnReadFunctionField internal field.
void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
  auto* isolate = env->isolate();
  HandleScope scope(isolate);

  const PropertyAttribute accessor_attrs =
      static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
  Local<Signature> signature = Signature::New(isolate, t);

  // Accessor properties.
  AddMethod(env, signature, accessor_attrs, t, GetFD, env->fd_string());
  AddMethod(env,
            signature,
            accessor_attrs,
            t,
            GetExternal,
            env->external_stream_string());
  AddMethod(env,
            signature,
            accessor_attrs,
            t,
            GetBytesRead,
            env->bytes_read_string());
  AddMethod(env,
            signature,
            accessor_attrs,
            t,
            GetBytesWritten,
            env->bytes_written_string());

  // Prototype methods, all dispatched through JSMethod<>.
  env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
  env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
  env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
  env->SetProtoMethod(
      t, "useUserBuffer", JSMethod<&StreamBase::UseUserBuffer>);
  env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
  env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
  env->SetProtoMethod(
      t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>);
  env->SetProtoMethod(
      t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
  env->SetProtoMethod(
      t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
  env->SetProtoMethod(
      t, "writeLatin1String", JSMethod<&StreamBase::WriteString<LATIN1>>);

  // Marker property and the `onread` accessor.
  auto proto = t->PrototypeTemplate();
  proto->Set(FIXED_ONE_BYTE_STRING(isolate, "isStreamBase"), True(isolate));
  proto->SetAccessor(
      FIXED_ONE_BYTE_STRING(isolate, "onread"),
      BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>,
      BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
                                   &Value::IsFunction>);
}
425
426
756
void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
427
  // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
428
1512
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
429
756
  if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
430
431
756
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
432
433
2268
  args.GetReturnValue().Set(wrap->GetFD());
434
}
435
436
35160
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
437
70320
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
438
35162
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
439
440
  // uint64_t -> double. 53bits is enough for all real cases.
441
105477
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
442
}
443
444
35186
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
445
70372
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
446
35186
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
447
448
  // uint64_t -> double. 53bits is enough for all real cases.
449
105558
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
450
}
451
452
2
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
453
4
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
454
2
  if (wrap == nullptr) return;
455
456
2
  Local<External> ext = External::New(args.GetIsolate(), wrap);
457
4
  args.GetReturnValue().Set(ext);
458
}
459
460
// Generic trampoline that adapts a StreamBase member function returning int
// into a V8 function callback.
//
// Resolves the StreamBase from the holder object; does nothing when there is
// none, and returns UV_EINVAL when the stream is not alive. Otherwise runs
// `Method` inside a DefaultTriggerAsyncIdScope for the stream's AsyncWrap
// and sets the method's int result as the return value.
template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
  StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
  if (wrap == nullptr)
    return;

  if (!wrap->IsAlive()) {
    args.GetReturnValue().Set(UV_EINVAL);
    return;
  }

  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
  const int result = (wrap->*Method)(args);
  args.GetReturnValue().Set(result);
}
470
471
9037
// Default implementation: performs no synchronous write. `bufs` and `count`
// are left untouched and 0 is returned.
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
  // No TryWrite by default
  const int status = 0;
  return status;
}
475
476
477
45953
// Default implementation: there is never an error message, so nullptr is
// returned.
const char* StreamResource::Error() const {
  const char* no_message = nullptr;
  return no_message;
}
480
481
482
// Default implementation: intentionally does nothing.
void StreamResource::ClearError() {
  // No-op
  return;
}
485
486
487
279700
// Allocates a managed buffer of `suggested_size` bytes in the stream's
// environment and returns it released as a raw uv_buf_t. Aborts (CHECK) when
// the listener is not attached to a stream.
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
  CHECK_NOT_NULL(stream_);

  StreamBase* stream = static_cast<StreamBase*>(stream_);
  Environment* env = stream->stream_env();

  AllocatedBuffer storage =
      AllocatedBuffer::AllocateManaged(env, suggested_size);
  return storage.release();
}
492
493
294219
// Forwards a read result to the stream's onread callback.
//
// `buf_` is always wrapped in an AllocatedBuffer first. A negative `nread`
// is reported with an empty ArrayBuffer, a zero `nread` is dropped silently,
// and a positive `nread` is reported with the buffer resized to `nread`
// bytes and converted to an ArrayBuffer.
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
  CHECK_NOT_NULL(stream_);
  StreamBase* stream = static_cast<StreamBase*>(stream_);
  Environment* env = stream->stream_env();
  HandleScope handle_scope(env->isolate());
  Context::Scope context_scope(env->context());
  AllocatedBuffer buf(env, buf_);

  if (nread < 0) {
    stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
    return;
  }
  if (nread == 0)
    return;

  const size_t bytes_read = static_cast<size_t>(nread);
  CHECK_LE(bytes_read, buf.size());
  buf.Resize(nread);

  stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
}
512
513
514
2090
// Returns a copy of the listener's stored buffer_ descriptor;
// `suggested_size` is ignored.
uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
  uv_buf_t user_buffer = buffer_;
  return user_buffer;
}
517
518
519
2096
// Reports a read into the stored buffer to the stream's onread callback.
//
// No ArrayBuffer is passed to the callback. If the callback returns a value
// other than undefined, that value becomes the buffer used for subsequent
// reads.
void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
  CHECK_NOT_NULL(stream_);

  StreamBase* stream = static_cast<StreamBase*>(stream_);
  Environment* env = stream->stream_env();
  HandleScope handle_scope(env->isolate());
  Context::Scope context_scope(env->context());

  // To deal with the case where POLLHUP is received and UV_EOF is returned, as
  // libuv returns an empty buffer (on unices only).
  const bool eof_without_buffer = nread == UV_EOF && buf.base == nullptr;
  if (eof_without_buffer) {
    stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
    return;
  }

  CHECK_EQ(buf.base, buffer_.base);

  MaybeLocal<Value> ret = stream->CallJSOnreadMethod(
      nread, Local<ArrayBuffer>(), 0, StreamBase::SKIP_NREAD_CHECKS);

  Local<Value> next_buf_v;
  if (!ret.ToLocal(&next_buf_v))
    return;
  if (next_buf_v->IsUndefined())
    return;

  // The callback handed back a replacement buffer: read into it from now on.
  buffer_.base = Buffer::Data(next_buf_v);
  buffer_.len = Buffer::Length(next_buf_v);
}
546
547
548
12546
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
549
    StreamReq* req_wrap, int status) {
550
12546
  StreamBase* stream = static_cast<StreamBase*>(stream_);
551
12546
  Environment* env = stream->stream_env();
552
12546
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
553
25091
  HandleScope handle_scope(env->isolate());
554
12546
  Context::Scope context_scope(env->context());
555
25092
  CHECK(!async_wrap->persistent().IsEmpty());
556
12546
  Local<Object> req_wrap_obj = async_wrap->object();
557
558
  Local<Value> argv[] = {
559
    Integer::New(env->isolate(), status),
560
12546
    stream->GetObject(),
561
    Undefined(env->isolate())
562
62730
  };
563
564
12546
  const char* msg = stream->Error();
565
12546
  if (msg != nullptr) {
566
    argv[2] = OneByteString(env->isolate(), msg);
567
    stream->ClearError();
568
  }
569
570
50184
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
571
12545
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
572
12545
}
573
574
5317
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
575
    WriteWrap* req_wrap, int status) {
576
5317
  OnStreamAfterReqFinished(req_wrap, status);
577
5316
}
578
579
7229
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
580
    ShutdownWrap* req_wrap, int status) {
581
7229
  OnStreamAfterReqFinished(req_wrap, status);
582
7229
}
583
584
7229
void ShutdownWrap::OnDone(int status) {
585
7229
  stream()->EmitAfterShutdown(this, status);
586
7229
  Dispose();
587
7229
}
588
589
7531
void WriteWrap::OnDone(int status) {
590
7531
  stream()->EmitAfterWrite(this, status);
591
7530
  Dispose();
592
7530
}
593
594
208528
// Detaches the listener from its stream, if it is still attached to one.
StreamListener::~StreamListener() {
  if (stream_ == nullptr)
    return;

  stream_->RemoveStreamListener(this);
}
598
599
3428
// Default behaviour: pass the notification on to the previous listener,
// which must exist (CHECKed).
void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
  StreamListener* const next = previous_listener_;
  CHECK_NOT_NULL(next);
  next->OnStreamAfterShutdown(w, status);
}
603
604
4007
// Default behaviour: pass the notification on to the previous listener,
// which must exist (CHECKed).
void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
  StreamListener* const next = previous_listener_;
  CHECK_NOT_NULL(next);
  next->OnStreamAfterWrite(w, status);
}
608
609
128072
// Notifies every remaining listener that the stream is going away and makes
// sure each one ends up detached.
StreamResource::~StreamResource() {
  for (StreamListener* current = listener_;
       current != nullptr;
       current = listener_) {
    current->OnStreamDestroy();
    // OnStreamDestroy() implementations may or may not detach themselves.
    // If the head listener is unchanged it did not, so detach it here; this
    // lets implementations call generic cleanup code that removes the
    // listener unconditionally.
    if (listener_ == current)
      RemoveStreamListener(current);
  }
}
621
622
12
// Creates a SimpleShutdownWrap<AsyncWrap> for this stream on top of
// `object`, marks it weak and returns it.
ShutdownWrap* StreamBase::CreateShutdownWrap(Local<Object> object) {
  SimpleShutdownWrap<AsyncWrap>* shutdown_wrap =
      new SimpleShutdownWrap<AsyncWrap>(this, object);
  shutdown_wrap->MakeWeak();
  return shutdown_wrap;
}
628
629
7279
// Creates a SimpleWriteWrap<AsyncWrap> for this stream on top of `object`,
// marks it weak and returns it.
WriteWrap* StreamBase::CreateWriteWrap(Local<Object> object) {
  SimpleWriteWrap<AsyncWrap>* write_wrap =
      new SimpleWriteWrap<AsyncWrap>(this, object);
  write_wrap->MakeWeak();
  return write_wrap;
}
635
636

13995
}  // namespace node