GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_pipe.cc Lines: 138 183 75.4 %
Date: 2020-08-22 22:13:06 Branches: 51 102 50.0 %

Line Branch Exec Source
1
#include "stream_pipe.h"
2
#include "allocated_buffer-inl.h"
3
#include "stream_base-inl.h"
4
#include "node_buffer.h"
5
#include "util-inl.h"
6
7
namespace node {
8
9
using v8::Context;
10
using v8::Function;
11
using v8::FunctionCallbackInfo;
12
using v8::FunctionTemplate;
13
using v8::HandleScope;
14
using v8::Local;
15
using v8::Object;
16
using v8::String;
17
using v8::Value;
18
19
15
// Creates a pipe that connects `source` to `sink`.
//
// The pipe registers its readable listener on `source` and its writable
// listener on `sink`, remembers whether the sink reports `HasWantsWrite()`,
// and cross-links the three JS objects (`obj`, the source object and the sink
// object) through properties on each other.
//
// NOTE(review): `source` is already dereferenced in the initializer list
// (`source->stream_env()`), so the CHECK_NOT_NULL(source) in the body cannot
// guard that first access.
StreamPipe::StreamPipe(StreamBase* source,
                       StreamBase* sink,
                       Local<Object> obj)
    : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
  MakeWeak();

  CHECK_NOT_NULL(sink);
  CHECK_NOT_NULL(source);

  source->PushStreamListener(&readable_listener_);
  sink->PushStreamListener(&writable_listener_);

  uses_wants_write_ = sink->HasWantsWrite();

  // Set up links between this object and the source/sink objects.
  // In particular, this makes sure that they are garbage collected as a group,
  // if that applies to the given streams (for example, Http2Streams use
  // weak references).
  Local<Context> context = env()->context();
  Local<Object> source_obj = source->GetObject();
  Local<Object> sink_obj = sink->GetObject();

  obj->Set(context, env()->source_string(), source_obj).Check();
  source_obj->Set(context, env()->pipe_target_string(), obj).Check();
  obj->Set(context, env()->sink_string(), sink_obj).Check();
  sink_obj->Set(context, env()->pipe_source_string(), obj).Check();
}
46
47
45
// Detaches the pipe from its streams. Passing `true` makes Unpipe() return
// before it schedules the JS-facing cleanup.
StreamPipe::~StreamPipe() {
  Unpipe(/* is_in_deletion */ true);
}
50
51
640
// Returns the stream that `readable_listener_` is attached to, viewed as a
// StreamBase.
StreamBase* StreamPipe::source() {
  auto* attached = readable_listener_.stream();
  return static_cast<StreamBase*>(attached);
}
54
55
231
// Returns the stream that `writable_listener_` is attached to, viewed as a
// StreamBase.
StreamBase* StreamPipe::sink() {
  auto* attached = writable_listener_.stream();
  return static_cast<StreamBase*>(attached);
}
58
59
29
void StreamPipe::Unpipe(bool is_in_deletion) {
60
29
  if (is_closed_)
61
29
    return;
62
63
  // Note that we possibly cannot use virtual methods on `source` and `sink`
64
  // here, because this function can be called from their destructors via
65
  // `OnStreamDestroy()`.
66
15
  if (!source_destroyed_)
67
15
    source()->ReadStop();
68
69
15
  is_closed_ = true;
70
15
  is_reading_ = false;
71
15
  source()->RemoveStreamListener(&readable_listener_);
72
15
  if (pending_writes_ == 0)
73
14
    sink()->RemoveStreamListener(&writable_listener_);
74
75
15
  if (is_in_deletion) return;
76
77
  // Delay the JS-facing part with SetImmediate, because this might be from
78
  // inside the garbage collector, so we can’t run JS here.
79
28
  HandleScope handle_scope(env()->isolate());
80
28
  BaseObjectPtr<StreamPipe> strong_ref{this};
81
112
  env()->SetImmediate([this, strong_ref](Environment* env) {
82
28
    HandleScope handle_scope(env->isolate());
83
28
    Context::Scope context_scope(env->context());
84
14
    Local<Object> object = this->object();
85
86
    Local<Value> onunpipe;
87
56
    if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
88
      return;
89

42
    if (onunpipe->IsFunction() &&
90
42
        MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
91
      return;
92
    }
93
94
    // Set all the links established in the constructor to `null`.
95
14
    Local<Value> null = Null(env->isolate());
96
97
    Local<Value> source_v;
98
    Local<Value> sink_v;
99

98
    if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
100
84
        !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
101

42
        !source_v->IsObject() || !sink_v->IsObject()) {
102
      return;
103
    }
104
105

98
    if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
106
84
        object->Set(env->context(), env->sink_string(), null).IsNothing() ||
107
28
        source_v.As<Object>()
108
70
            ->Set(env->context(), env->pipe_target_string(), null)
109

42
            .IsNothing() ||
110
28
        sink_v.As<Object>()
111
70
            ->Set(env->context(), env->pipe_source_string(), null)
112
14
            .IsNothing()) {
113
      return;
114
    }
115
14
  });
116
}
117
118
207
// Allocates the buffer for the next read from the source. The size is the
// smaller of `suggested_size` and the amount last stored in `wanted_data_`,
// and must be non-zero.
uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
  StreamPipe* owner = ContainerOf(&StreamPipe::readable_listener_, this);
  const size_t wanted = owner->wanted_data_;
  const size_t size = wanted < suggested_size ? wanted : suggested_size;
  CHECK_GT(size, 0);
  return AllocatedBuffer::AllocateManaged(owner->env(), size).release();
}
124
125
212
void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
126
                                                const uv_buf_t& buf_) {
127
212
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
128
411
  AllocatedBuffer buf(pipe->env(), buf_);
129
212
  if (nread < 0) {
130
    // EOF or error; stop reading and pass the error to the previous listener
131
    // (which might end up in JS).
132
13
    pipe->is_eof_ = true;
133
    // Cache `sink()` here because the previous listener might do things
134
    // that eventually lead to an `Unpipe()` call.
135
13
    StreamBase* sink = pipe->sink();
136
13
    stream()->ReadStop();
137
13
    CHECK_NOT_NULL(previous_listener_);
138
13
    previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
139
    // If we’re not writing, close now. Otherwise, we’ll do that in
140
    // `OnStreamAfterWrite()`.
141
13
    if (pipe->pending_writes_ == 0) {
142
8
      sink->Shutdown();
143
8
      pipe->Unpipe();
144
    }
145
13
    return;
146
  }
147
148
199
  pipe->ProcessData(nread, std::move(buf));
149
}
150
151
199
void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
152

199
  CHECK(uses_wants_write_ || pending_writes_ == 0);
153
199
  uv_buf_t buffer = uv_buf_init(buf.data(), nread);
154
199
  StreamWriteResult res = sink()->Write(&buffer, 1);
155
199
  pending_writes_++;
156
199
  if (!res.async) {
157
    writable_listener_.OnStreamAfterWrite(nullptr, res.err);
158
  } else {
159
199
    is_reading_ = false;
160
199
    res.wrap->SetAllocatedStorage(std::move(buf));
161
199
    if (source() != nullptr)
162
199
      source()->ReadStop();
163
  }
164
199
}
165
166
198
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
167
                                                      int status) {
168
198
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
169
198
  pipe->pending_writes_--;
170
198
  if (pipe->is_closed_) {
171
    if (pipe->pending_writes_ == 0) {
172
      Environment* env = pipe->env();
173
      HandleScope handle_scope(env->isolate());
174
      Context::Scope context_scope(env->context());
175
      pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
176
      stream()->RemoveStreamListener(this);
177
    }
178
    return;
179
  }
180
181
198
  if (pipe->is_eof_) {
182
10
    HandleScope handle_scope(pipe->env()->isolate());
183
    InternalCallbackScope callback_scope(pipe,
184
10
        InternalCallbackScope::kSkipTaskQueues);
185
5
    pipe->sink()->Shutdown();
186
5
    pipe->Unpipe();
187
5
    return;
188
  }
189
190
193
  if (status != 0) {
191
1
    CHECK_NOT_NULL(previous_listener_);
192
1
    StreamListener* prev = previous_listener_;
193
1
    pipe->Unpipe();
194
1
    prev->OnStreamAfterWrite(w, status);
195
1
    return;
196
  }
197
198
192
  if (!pipe->uses_wants_write_) {
199
    OnStreamWantsWrite(65536);
200
  }
201
}
202
203
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
204
                                                         int status) {
205
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
206
  CHECK_NOT_NULL(previous_listener_);
207
  StreamListener* prev = previous_listener_;
208
  pipe->Unpipe();
209
  prev->OnStreamAfterShutdown(w, status);
210
}
211
212
void StreamPipe::ReadableListener::OnStreamDestroy() {
213
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
214
  pipe->source_destroyed_ = true;
215
  if (!pipe->is_eof_) {
216
    OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
217
  }
218
}
219
220
void StreamPipe::WritableListener::OnStreamDestroy() {
221
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
222
  pipe->sink_destroyed_ = true;
223
  pipe->is_eof_ = true;
224
  pipe->pending_writes_ = 0;
225
  pipe->Unpipe();
226
}
227
228
227
void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
229
227
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
230
227
  pipe->wanted_data_ = suggested_size;
231

227
  if (pipe->is_reading_ || pipe->is_closed_)
232
15
    return;
233
424
  HandleScope handle_scope(pipe->env()->isolate());
234
  InternalCallbackScope callback_scope(pipe,
235
424
      InternalCallbackScope::kSkipTaskQueues);
236
212
  pipe->is_reading_ = true;
237
212
  pipe->source()->ReadStart();
238
}
239
240
// Forwards the allocation request unchanged to the previous listener, which
// must exist.
uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
  CHECK_NOT_NULL(previous_listener_);
  StreamListener* upstream = previous_listener_;
  return upstream->OnStreamAlloc(suggested_size);
}
244
245
void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
246
                                                const uv_buf_t& buf) {
247
  CHECK_NOT_NULL(previous_listener_);
248
  return previous_listener_->OnStreamRead(nread, buf);
249
}
250
251
15
void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
252
15
  CHECK(args.IsConstructCall());
253
30
  CHECK(args[0]->IsObject());
254
30
  CHECK(args[1]->IsObject());
255
30
  StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
256
30
  StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
257
258
15
  new StreamPipe(source, sink, args.This());
259
15
}
260
261
15
void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
262
  StreamPipe* pipe;
263
15
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
264
15
  pipe->is_closed_ = false;
265
15
  pipe->writable_listener_.OnStreamWantsWrite(65536);
266
}
267
268
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
269
  StreamPipe* pipe;
270
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
271
  pipe->Unpipe();
272
}
273
274
void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
275
  StreamPipe* pipe;
276
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
277
  args.GetReturnValue().Set(pipe->is_closed_);
278
}
279
280
void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
281
  StreamPipe* pipe;
282
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
283
  args.GetReturnValue().Set(pipe->pending_writes_);
284
}
285
286
namespace {
287
288
234
void InitializeStreamPipe(Local<Object> target,
289
                          Local<Value> unused,
290
                          Local<Context> context,
291
                          void* priv) {
292
234
  Environment* env = Environment::GetCurrent(context);
293
294
  // Create FunctionTemplate for FileHandle::CloseReq
295
234
  Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
296
  Local<String> stream_pipe_string =
297
234
      FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
298
234
  env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
299
234
  env->SetProtoMethod(pipe, "start", StreamPipe::Start);
300
234
  env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed);
301
234
  env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites);
302
468
  pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
303
234
  pipe->SetClassName(stream_pipe_string);
304
702
  pipe->InstanceTemplate()->SetInternalFieldCount(
305
234
      StreamPipe::kInternalFieldCount);
306
  target
307
468
      ->Set(context, stream_pipe_string,
308
702
            pipe->GetFunction(context).ToLocalChecked())
309
      .Check();
310
234
}
311
312
}  // anonymous namespace
313
314
}  // namespace node
315
316

17859
// Passes `node::InitializeStreamPipe` to the internal-module registration
// macro under the name `stream_pipe`.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe, node::InitializeStreamPipe)