GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.cc Lines: 296 314 94.3 %
Date: 2020-05-27 22:15:15 Branches: 206 332 62.0 %

Line Branch Exec Source
1
#include "stream_base.h"  // NOLINT(build/include_inline)
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
5
#include "node.h"
6
#include "node_buffer.h"
7
#include "node_errors.h"
8
#include "env-inl.h"
9
#include "js_stream.h"
10
#include "string_bytes.h"
11
#include "util-inl.h"
12
#include "v8.h"
13
14
#include <climits>  // INT_MAX
15
16
namespace node {
17
18
using v8::Array;
19
using v8::ArrayBuffer;
20
using v8::ConstructorBehavior;
21
using v8::Context;
22
using v8::DontDelete;
23
using v8::DontEnum;
24
using v8::External;
25
using v8::Function;
26
using v8::FunctionCallbackInfo;
27
using v8::FunctionTemplate;
28
using v8::HandleScope;
29
using v8::Integer;
30
using v8::Local;
31
using v8::MaybeLocal;
32
using v8::Object;
33
using v8::PropertyAttribute;
34
using v8::ReadOnly;
35
using v8::SideEffectType;
36
using v8::Signature;
37
using v8::String;
38
using v8::Value;
39
40
template int StreamBase::WriteString<ASCII>(
41
    const FunctionCallbackInfo<Value>& args);
42
template int StreamBase::WriteString<UTF8>(
43
    const FunctionCallbackInfo<Value>& args);
44
template int StreamBase::WriteString<UCS2>(
45
    const FunctionCallbackInfo<Value>& args);
46
template int StreamBase::WriteString<LATIN1>(
47
    const FunctionCallbackInfo<Value>& args);
48
49
50
46355
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
51
46355
  return ReadStart();
52
}
53
54
55
20101
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
56
20101
  return ReadStop();
57
}
58
59
6
int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
60
6
  CHECK(Buffer::HasInstance(args[0]));
61
62
12
  uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
63
6
  PushStreamListener(new CustomBufferJSListener(buf));
64
6
  return 0;
65
}
66
67
30004
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
68
60008
  CHECK(args[0]->IsObject());
69
60008
  Local<Object> req_wrap_obj = args[0].As<Object>();
70
71
30004
  return Shutdown(req_wrap_obj);
72
}
73
74
330374
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
75
330374
  env_->stream_base_state()[kBytesWritten] = res.bytes;
76
330374
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
77
330374
}
78
79
12674
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
80
12674
  Environment* env = Environment::GetCurrent(args);
81
82
25348
  CHECK(args[0]->IsObject());
83
25348
  CHECK(args[1]->IsArray());
84
85
25348
  Local<Object> req_wrap_obj = args[0].As<Object>();
86
25348
  Local<Array> chunks = args[1].As<Array>();
87
25348
  bool all_buffers = args[2]->IsTrue();
88
89
  size_t count;
90
12674
  if (all_buffers)
91
286
    count = chunks->Length();
92
  else
93
12388
    count = chunks->Length() >> 1;
94
95
25348
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
96
97
12674
  size_t storage_size = 0;
98
  size_t offset;
99
100
12674
  if (!all_buffers) {
101
    // Determine storage size first
102
81145
    for (size_t i = 0; i < count; i++) {
103
206271
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
104
105
68757
      if (Buffer::HasInstance(chunk))
106
29366
        continue;
107
        // Buffer chunk, no additional storage required
108
109
      // String chunk
110
118173
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
111
39391
      enum encoding encoding = ParseEncoding(env->isolate(),
112
118173
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
113
      size_t chunk_size;
114


86178
      if (encoding == UTF8 && string->Length() > 65535 &&
115
39417
          !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
116
        return 0;
117
78782
      else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
118
39391
                    .To(&chunk_size))
119
        return 0;
120
39391
      storage_size += chunk_size;
121
    }
122
123
12388
    if (storage_size > INT_MAX)
124
      return UV_ENOBUFS;
125
  } else {
126
26834
    for (size_t i = 0; i < count; i++) {
127
79644
      Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
128
26548
      bufs[i].base = Buffer::Data(chunk);
129
26548
      bufs[i].len = Buffer::Length(chunk);
130
    }
131
  }
132
133
25348
  AllocatedBuffer storage;
134
12674
  if (storage_size > 0)
135
12342
    storage = env->AllocateManaged(storage_size);
136
137
12674
  offset = 0;
138
12674
  if (!all_buffers) {
139
81145
    for (size_t i = 0; i < count; i++) {
140
206271
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
141
142
      // Write buffer
143
68757
      if (Buffer::HasInstance(chunk)) {
144
29366
        bufs[i].base = Buffer::Data(chunk);
145
29366
        bufs[i].len = Buffer::Length(chunk);
146
29366
        continue;
147
      }
148
149
      // Write string
150
39391
      CHECK_LE(offset, storage_size);
151
39391
      char* str_storage = storage.data() + offset;
152
39391
      size_t str_size = storage.size() - offset;
153
154
118173
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
155
39391
      enum encoding encoding = ParseEncoding(env->isolate(),
156
118173
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
157
78782
      str_size = StringBytes::Write(env->isolate(),
158
                                    str_storage,
159
                                    str_size,
160
                                    string,
161
39391
                                    encoding);
162
39391
      bufs[i].base = str_storage;
163
39391
      bufs[i].len = str_size;
164
39391
      offset += str_size;
165
    }
166
  }
167
168
12674
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
169
12674
  SetWriteResult(res);
170

12674
  if (res.wrap != nullptr && storage_size > 0) {
171
331
    res.wrap->SetAllocatedStorage(std::move(storage));
172
  }
173
12674
  return res.err;
174
}
175
176
177
44750
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
178
89500
  CHECK(args[0]->IsObject());
179
180
44750
  Environment* env = Environment::GetCurrent(args);
181
182
89500
  if (!args[1]->IsUint8Array()) {
183
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
184
1
    return 0;
185
  }
186
187
89498
  Local<Object> req_wrap_obj = args[0].As<Object>();
188
  uv_buf_t buf;
189
44749
  buf.base = Buffer::Data(args[1]);
190
44749
  buf.len = Buffer::Length(args[1]);
191
192
44749
  uv_stream_t* send_handle = nullptr;
193
194

89498
  if (args[2]->IsObject() && IsIPCPipe()) {
195
    Local<Object> send_handle_obj = args[2].As<Object>();
196
197
    HandleWrap* wrap;
198
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
199
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
200
    // Reference LibuvStreamWrap instance to prevent it from being garbage
201
    // collected before `AfterWrite` is called.
202
    req_wrap_obj->Set(env->context(),
203
                      env->handle_string(),
204
                      send_handle_obj).Check();
205
  }
206
207
44749
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
208
44749
  SetWriteResult(res);
209
210
44749
  return res.err;
211
}
212
213
214
template <enum encoding enc>
215
272951
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
216
272951
  Environment* env = Environment::GetCurrent(args);
217


545902
  CHECK(args[0]->IsObject());
218


818853
  CHECK(args[1]->IsString());
219
220
545902
  Local<Object> req_wrap_obj = args[0].As<Object>();
221
545902
  Local<String> string = args[1].As<String>();
222
  Local<Object> send_handle_obj;
223


545902
  if (args[2]->IsObject())
224
202
    send_handle_obj = args[2].As<Object>();
225
226
  // Compute the size of the storage that the string will be flattened into.
227
  // For UTF8 strings that are very long, go ahead and take the hit for
228
  // computing their actual size, rather than tripling the storage.
229
  size_t storage_size;
230



543340
  if (enc == UTF8 && string->Length() > 65535 &&
231
270460
      !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
232
    return 0;
233


545902
  else if (!StringBytes::StorageSize(env->isolate(), string, enc)
234
272951
                .To(&storage_size))
235
    return 0;
236
237


272951
  if (storage_size > INT_MAX)
238
    return UV_ENOBUFS;
239
240
  // Try writing immediately if write size isn't too big
241
  char stack_storage[16384];  // 16kb
242
  size_t data_size;
243
272951
  size_t synchronously_written = 0;
244
  uv_buf_t buf;
245
246




816554
  bool try_write = storage_size <= sizeof(stack_storage) &&
247


546017
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
248


272951
  if (try_write) {
249
271752
    data_size = StringBytes::Write(env->isolate(),
250
                                   stack_storage,
251
                                   storage_size,
252
                                   string,
253
                                   enc);
254
271752
    buf = uv_buf_init(stack_storage, data_size);
255
256
271752
    uv_buf_t* bufs = &buf;
257
271752
    size_t count = 1;
258
271752
    const int err = DoTryWrite(&bufs, &count);
259
    // Keep track of the bytes written here, because we're taking a shortcut
260
    // by using `DoTryWrite()` directly instead of using the utilities
261
    // provided by `Write()`.
262


271752
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
263
271752
    bytes_written_ += synchronously_written;
264
265
    // Immediate failure or success
266




271752
    if (err != 0 || count == 0) {
267
270038
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
268
270038
      return err;
269
    }
270
271
    // Partial write
272


1714
    CHECK_EQ(count, 1);
273
  }
274
275
5826
  AllocatedBuffer data;
276
277


2913
  if (try_write) {
278
    // Copy partial data
279
1714
    data = env->AllocateManaged(buf.len);
280
1714
    memcpy(data.data(), buf.base, buf.len);
281
1714
    data_size = buf.len;
282
  } else {
283
    // Write it
284
1199
    data = env->AllocateManaged(storage_size);
285
1199
    data_size = StringBytes::Write(env->isolate(),
286
                                   data.data(),
287
                                   storage_size,
288
                                   string,
289
                                   enc);
290
  }
291
292


2913
  CHECK_LE(data_size, storage_size);
293
294
2913
  buf = uv_buf_init(data.data(), data_size);
295
296
2913
  uv_stream_t* send_handle = nullptr;
297
298






3016
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
299
    HandleWrap* wrap;
300


101
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
301
101
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
302
    // Reference LibuvStreamWrap instance to prevent it from being garbage
303
    // collected before `AfterWrite` is called.
304
404
    req_wrap_obj->Set(env->context(),
305
                      env->handle_string(),
306
                      send_handle_obj).Check();
307
  }
308
309
2913
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
310
2913
  res.bytes += synchronously_written;
311
312
2913
  SetWriteResult(res);
313


2913
  if (res.wrap != nullptr) {
314
2891
    res.wrap->SetAllocatedStorage(std::move(data));
315
  }
316
317
2913
  return res.err;
318
}
319
320
321
317047
MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
322
                                                 Local<ArrayBuffer> ab,
323
                                                 size_t offset,
324
                                                 StreamBaseJSChecks checks) {
325
317047
  Environment* env = env_;
326
327
  DCHECK_EQ(static_cast<int32_t>(nread), nread);
328
  DCHECK_LE(offset, INT32_MAX);
329
330
317047
  if (checks == DONT_SKIP_NREAD_CHECKS) {
331
317021
    if (ab.IsEmpty()) {
332
      DCHECK_EQ(offset, 0);
333
      DCHECK_LE(nread, 0);
334
    } else {
335
      DCHECK_GE(nread, 0);
336
    }
337
  }
338
339
317047
  env->stream_base_state()[kReadBytesOrError] = nread;
340
317047
  env->stream_base_state()[kArrayBufferOffset] = offset;
341
342
  Local<Value> argv[] = {
343
672588
    ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
344
1268188
  };
345
346
317047
  AsyncWrap* wrap = GetAsyncWrap();
347
317047
  CHECK_NOT_NULL(wrap);
348
634094
  Local<Value> onread = wrap->object()->GetInternalField(
349
634094
      StreamBase::kOnReadFunctionField);
350
317047
  CHECK(onread->IsFunction());
351
634094
  return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
352
}
353
354
355
3379
bool StreamBase::IsIPCPipe() {
356
3379
  return false;
357
}
358
359
360
int StreamBase::GetFD() {
361
  return -1;
362
}
363
364
365
48104
Local<Object> StreamBase::GetObject() {
366
48104
  return GetAsyncWrap()->object();
367
}
368
369
39544
void StreamBase::AddMethod(Environment* env,
370
                           Local<Signature> signature,
371
                           enum PropertyAttribute attributes,
372
                           Local<FunctionTemplate> t,
373
                           JSMethodFunction* stream_method,
374
                           Local<String> string) {
375
  Local<FunctionTemplate> templ =
376
      env->NewFunctionTemplate(stream_method,
377
                               signature,
378
                               ConstructorBehavior::kThrow,
379
39544
                               SideEffectType::kHasNoSideEffect);
380
118632
  t->PrototypeTemplate()->SetAccessorProperty(
381
39544
      string, templ, Local<FunctionTemplate>(), attributes);
382
39544
}
383
384
9886
void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
385
19772
  HandleScope scope(env->isolate());
386
387
  enum PropertyAttribute attributes =
388
9886
      static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
389
9886
  Local<Signature> sig = Signature::New(env->isolate(), t);
390
391
9886
  AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
392
9886
  AddMethod(
393
9886
      env, sig, attributes, t, GetExternal, env->external_stream_string());
394
9886
  AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
395
9886
  AddMethod(
396
9886
      env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
397
9886
  env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
398
9886
  env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
399
9886
  env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
400
  env->SetProtoMethod(t,
401
                      "useUserBuffer",
402
9886
                      JSMethod<&StreamBase::UseUserBuffer>);
403
9886
  env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
404
9886
  env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
405
  env->SetProtoMethod(
406
9886
      t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>);
407
  env->SetProtoMethod(
408
9886
      t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
409
  env->SetProtoMethod(
410
9886
      t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
411
  env->SetProtoMethod(
412
9886
      t, "writeLatin1String", JSMethod<&StreamBase::WriteString<LATIN1>>);
413
49430
  t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(),
414
                                                    "isStreamBase"),
415
9886
                              True(env->isolate()));
416
39544
  t->PrototypeTemplate()->SetAccessor(
417
      FIXED_ONE_BYTE_STRING(env->isolate(), "onread"),
418
      BaseObject::InternalFieldGet<
419
          StreamBase::kOnReadFunctionField>,
420
      BaseObject::InternalFieldSet<
421
          StreamBase::kOnReadFunctionField,
422
9886
          &Value::IsFunction>);
423
9886
}
424
425
400
void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
426
  // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
427
800
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
428
400
  if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
429
430
400
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
431
432
1200
  args.GetReturnValue().Set(wrap->GetFD());
433
}
434
435
34403
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
436
68806
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
437
34405
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
438
439
  // uint64_t -> double. 53bits is enough for all real cases.
440
103206
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
441
}
442
443
34404
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
444
68808
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
445
34404
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
446
447
  // uint64_t -> double. 53bits is enough for all real cases.
448
103212
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
449
}
450
451
2
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
452
4
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
453
2
  if (wrap == nullptr) return;
454
455
2
  Local<External> ext = External::New(args.GetIsolate(), wrap);
456
4
  args.GetReturnValue().Set(ext);
457
}
458
459
template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
460
426842
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
461
853684
  StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
462





426843
  if (wrap == nullptr) return;
463
464





426844
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
465
466
853682
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
467
1280523
  args.GetReturnValue().Set((wrap->*Method)(args));
468
}
469
470
8565
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
471
  // No TryWrite by default
472
8565
  return 0;
473
}
474
475
476
44577
const char* StreamResource::Error() const {
477
44577
  return nullptr;
478
}
479
480
481
void StreamResource::ClearError() {
482
  // No-op
483
}
484
485
486
288962
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
487
288962
  CHECK_NOT_NULL(stream_);
488
288962
  Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
489
288962
  return env->AllocateManaged(suggested_size).release();
490
}
491
492
303332
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
493
303332
  CHECK_NOT_NULL(stream_);
494
303332
  StreamBase* stream = static_cast<StreamBase*>(stream_);
495
303332
  Environment* env = stream->stream_env();
496
587338
  HandleScope handle_scope(env->isolate());
497
587338
  Context::Scope context_scope(env->context());
498
587338
  AllocatedBuffer buf(env, buf_);
499
500
303332
  if (nread <= 0)  {
501
19272
    if (nread < 0)
502
19221
      stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
503
19245
    return;
504
  }
505
506
284060
  CHECK_LE(static_cast<size_t>(nread), buf.size());
507
284060
  buf.Resize(nread);
508
509
284060
  stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
510
}
511
512
513
26
uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
514
26
  return buffer_;
515
}
516
517
518
26
void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
519
26
  CHECK_NOT_NULL(stream_);
520
521
26
  StreamBase* stream = static_cast<StreamBase*>(stream_);
522
26
  Environment* env = stream->stream_env();
523
52
  HandleScope handle_scope(env->isolate());
524
52
  Context::Scope context_scope(env->context());
525
526
  // To deal with the case where POLLHUP is received and UV_EOF is returned, as
527
  // libuv returns an empty buffer (on unices only).
528

26
  if (nread == UV_EOF && buf.base == nullptr) {
529
    stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
530
    return;
531
  }
532
533
26
  CHECK_EQ(buf.base, buffer_.base);
534
535
  MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
536
                             Local<ArrayBuffer>(),
537
                             0,
538
26
                             StreamBase::SKIP_NREAD_CHECKS);
539
  Local<Value> next_buf_v;
540

78
  if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
541
12
    buffer_.base = Buffer::Data(next_buf_v);
542
12
    buffer_.len = Buffer::Length(next_buf_v);
543
  }
544
}
545
546
547
11747
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
548
    StreamReq* req_wrap, int status) {
549
11747
  StreamBase* stream = static_cast<StreamBase*>(stream_);
550
11747
  Environment* env = stream->stream_env();
551
11747
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
552
23493
  HandleScope handle_scope(env->isolate());
553
11747
  Context::Scope context_scope(env->context());
554
23494
  CHECK(!async_wrap->persistent().IsEmpty());
555
11747
  Local<Object> req_wrap_obj = async_wrap->object();
556
557
  Local<Value> argv[] = {
558
    Integer::New(env->isolate(), status),
559
11747
    stream->GetObject(),
560
    Undefined(env->isolate())
561
58735
  };
562
563
11747
  const char* msg = stream->Error();
564
11747
  if (msg != nullptr) {
565
    argv[2] = OneByteString(env->isolate(), msg);
566
    stream->ClearError();
567
  }
568
569
46988
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
570
11746
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
571
11746
}
572
573
5259
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
574
    WriteWrap* req_wrap, int status) {
575
5259
  OnStreamAfterReqFinished(req_wrap, status);
576
5258
}
577
578
6488
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
579
    ShutdownWrap* req_wrap, int status) {
580
6488
  OnStreamAfterReqFinished(req_wrap, status);
581
6488
}
582
583
6488
void ShutdownWrap::OnDone(int status) {
584
6488
  stream()->EmitAfterShutdown(this, status);
585
6488
  Dispose();
586
6488
}
587
588
7332
void WriteWrap::OnDone(int status) {
589
7332
  stream()->EmitAfterWrite(this, status);
590
7331
  Dispose();
591
7331
}
592
593
204794
StreamListener::~StreamListener() {
594
102397
  if (stream_ != nullptr)
595
86736
    stream_->RemoveStreamListener(this);
596
102397
}
597
598
3050
void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
599
3050
  CHECK_NOT_NULL(previous_listener_);
600
3050
  previous_listener_->OnStreamAfterShutdown(w, status);
601
3050
}
602
603
3754
void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
604
3754
  CHECK_NOT_NULL(previous_listener_);
605
3754
  previous_listener_->OnStreamAfterWrite(w, status);
606
3754
}
607
608
125674
StreamResource::~StreamResource() {
609
63955
  while (listener_ != nullptr) {
610
559
    StreamListener* listener = listener_;
611
559
    listener->OnStreamDestroy();
612
    // Remove the listener if it didn’t remove itself. This makes the logic
613
    // in `OnStreamDestroy()` implementations easier, because they
614
    // may call generic cleanup functions which can just remove the
615
    // listener unconditionally.
616
559
    if (listener == listener_)
617
553
      RemoveStreamListener(listener_);
618
  }
619
62837
}
620
621
12
ShutdownWrap* StreamBase::CreateShutdownWrap(
622
    Local<Object> object) {
623
12
  return new SimpleShutdownWrap<AsyncWrap>(this, object);
624
}
625
626
6851
WriteWrap* StreamBase::CreateWriteWrap(
627
    Local<Object> object) {
628
6851
  return new SimpleWriteWrap<AsyncWrap>(this, object);
629
}
630
631
}  // namespace node