GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_pipe.cc Lines: 137 182 75.3 %
Date: 2020-02-27 22:14:15 Branches: 49 98 50.0 %

Line Branch Exec Source
1
#include "stream_pipe.h"
2
#include "stream_base-inl.h"
3
#include "node_buffer.h"
4
#include "util-inl.h"
5
6
using v8::Context;
7
using v8::Function;
8
using v8::FunctionCallbackInfo;
9
using v8::FunctionTemplate;
10
using v8::Local;
11
using v8::Object;
12
using v8::Value;
13
14
namespace node {
15
16
14
StreamPipe::StreamPipe(StreamBase* source,
17
                       StreamBase* sink,
18
14
                       Local<Object> obj)
19
14
    : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
20
14
  MakeWeak();
21
22
14
  CHECK_NOT_NULL(sink);
23
14
  CHECK_NOT_NULL(source);
24
25
14
  source->PushStreamListener(&readable_listener_);
26
14
  sink->PushStreamListener(&writable_listener_);
27
28
14
  uses_wants_write_ = sink->HasWantsWrite();
29
30
  // Set up links between this object and the source/sink objects.
31
  // In particular, this makes sure that they are garbage collected as a group,
32
  // if that applies to the given streams (for example, Http2Streams use
33
  // weak references).
34
70
  obj->Set(env()->context(), env()->source_string(), source->GetObject())
35
      .Check();
36
56
  source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
37
      .Check();
38
70
  obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
39
      .Check();
40
56
  sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
41
      .Check();
42
14
}
43
44
42
StreamPipe::~StreamPipe() {
45
14
  Unpipe(true);
46
28
}
47
48
619
StreamBase* StreamPipe::source() {
49
619
  return static_cast<StreamBase*>(readable_listener_.stream());
50
}
51
52
223
StreamBase* StreamPipe::sink() {
53
223
  return static_cast<StreamBase*>(writable_listener_.stream());
54
}
55
56
27
void StreamPipe::Unpipe(bool is_in_deletion) {
57
27
  if (is_closed_)
58
27
    return;
59
60
  // Note that we possibly cannot use virtual methods on `source` and `sink`
61
  // here, because this function can be called from their destructors via
62
  // `OnStreamDestroy()`.
63
14
  if (!source_destroyed_)
64
14
    source()->ReadStop();
65
66
14
  is_closed_ = true;
67
14
  is_reading_ = false;
68
14
  source()->RemoveStreamListener(&readable_listener_);
69
14
  if (pending_writes_ == 0)
70
13
    sink()->RemoveStreamListener(&writable_listener_);
71
72
14
  if (is_in_deletion) return;
73
74
  // Delay the JS-facing part with SetImmediate, because this might be from
75
  // inside the garbage collector, so we can’t run JS here.
76
26
  HandleScope handle_scope(env()->isolate());
77
26
  BaseObjectPtr<StreamPipe> strong_ref{this};
78
104
  env()->SetImmediate([this, strong_ref](Environment* env) {
79
26
    HandleScope handle_scope(env->isolate());
80
26
    Context::Scope context_scope(env->context());
81
13
    Local<Object> object = this->object();
82
83
    Local<Value> onunpipe;
84
52
    if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
85
      return;
86

39
    if (onunpipe->IsFunction() &&
87
39
        MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
88
      return;
89
    }
90
91
    // Set all the links established in the constructor to `null`.
92
13
    Local<Value> null = Null(env->isolate());
93
94
    Local<Value> source_v;
95
    Local<Value> sink_v;
96

91
    if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
97
78
        !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
98

39
        !source_v->IsObject() || !sink_v->IsObject()) {
99
      return;
100
    }
101
102

91
    if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
103
78
        object->Set(env->context(), env->sink_string(), null).IsNothing() ||
104
26
        source_v.As<Object>()
105
65
            ->Set(env->context(), env->pipe_target_string(), null)
106

39
            .IsNothing() ||
107
26
        sink_v.As<Object>()
108
65
            ->Set(env->context(), env->pipe_source_string(), null)
109
13
            .IsNothing()) {
110
      return;
111
    }
112
13
  });
113
}
114
115
200
uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
116
200
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
117
200
  size_t size = std::min(suggested_size, pipe->wanted_data_);
118
200
  CHECK_GT(size, 0);
119
200
  return pipe->env()->AllocateManaged(size).release();
120
}
121
122
205
void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
123
                                                const uv_buf_t& buf_) {
124
205
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
125
398
  AllocatedBuffer buf(pipe->env(), buf_);
126
205
  if (nread < 0) {
127
    // EOF or error; stop reading and pass the error to the previous listener
128
    // (which might end up in JS).
129
12
    pipe->is_eof_ = true;
130
    // Cache `sink()` here because the previous listener might do things
131
    // that eventually lead to an `Unpipe()` call.
132
12
    StreamBase* sink = pipe->sink();
133
12
    stream()->ReadStop();
134
12
    CHECK_NOT_NULL(previous_listener_);
135
12
    previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
136
    // If we’re not writing, close now. Otherwise, we’ll do that in
137
    // `OnStreamAfterWrite()`.
138
12
    if (pipe->pending_writes_ == 0) {
139
7
      sink->Shutdown();
140
7
      pipe->Unpipe();
141
    }
142
12
    return;
143
  }
144
145
193
  pipe->ProcessData(nread, std::move(buf));
146
}
147
148
193
void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
149

193
  CHECK(uses_wants_write_ || pending_writes_ == 0);
150
193
  uv_buf_t buffer = uv_buf_init(buf.data(), nread);
151
193
  StreamWriteResult res = sink()->Write(&buffer, 1);
152
193
  pending_writes_++;
153
193
  if (!res.async) {
154
    writable_listener_.OnStreamAfterWrite(nullptr, res.err);
155
  } else {
156
193
    is_reading_ = false;
157
193
    res.wrap->SetAllocatedStorage(std::move(buf));
158
193
    if (source() != nullptr)
159
193
      source()->ReadStop();
160
  }
161
193
}
162
163
192
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
164
                                                      int status) {
165
192
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
166
192
  pipe->pending_writes_--;
167
192
  if (pipe->is_closed_) {
168
    if (pipe->pending_writes_ == 0) {
169
      Environment* env = pipe->env();
170
      HandleScope handle_scope(env->isolate());
171
      Context::Scope context_scope(env->context());
172
      pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
173
      stream()->RemoveStreamListener(this);
174
    }
175
    return;
176
  }
177
178
192
  if (pipe->is_eof_) {
179
10
    HandleScope handle_scope(pipe->env()->isolate());
180
    InternalCallbackScope callback_scope(pipe,
181
10
        InternalCallbackScope::kSkipTaskQueues);
182
5
    pipe->sink()->Shutdown();
183
5
    pipe->Unpipe();
184
5
    return;
185
  }
186
187
187
  if (status != 0) {
188
1
    CHECK_NOT_NULL(previous_listener_);
189
1
    StreamListener* prev = previous_listener_;
190
1
    pipe->Unpipe();
191
1
    prev->OnStreamAfterWrite(w, status);
192
1
    return;
193
  }
194
195
186
  if (!pipe->uses_wants_write_) {
196
    OnStreamWantsWrite(65536);
197
  }
198
}
199
200
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
201
                                                         int status) {
202
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
203
  CHECK_NOT_NULL(previous_listener_);
204
  StreamListener* prev = previous_listener_;
205
  pipe->Unpipe();
206
  prev->OnStreamAfterShutdown(w, status);
207
}
208
209
void StreamPipe::ReadableListener::OnStreamDestroy() {
210
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
211
  pipe->source_destroyed_ = true;
212
  if (!pipe->is_eof_) {
213
    OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
214
  }
215
}
216
217
void StreamPipe::WritableListener::OnStreamDestroy() {
218
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
219
  pipe->sink_destroyed_ = true;
220
  pipe->is_eof_ = true;
221
  pipe->pending_writes_ = 0;
222
  pipe->Unpipe();
223
}
224
225
219
void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
226
219
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
227
219
  pipe->wanted_data_ = suggested_size;
228

219
  if (pipe->is_reading_ || pipe->is_closed_)
229
14
    return;
230
410
  HandleScope handle_scope(pipe->env()->isolate());
231
  InternalCallbackScope callback_scope(pipe,
232
410
      InternalCallbackScope::kSkipTaskQueues);
233
205
  pipe->is_reading_ = true;
234
205
  pipe->source()->ReadStart();
235
}
236
237
uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
238
  CHECK_NOT_NULL(previous_listener_);
239
  return previous_listener_->OnStreamAlloc(suggested_size);
240
}
241
242
void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
243
                                                const uv_buf_t& buf) {
244
  CHECK_NOT_NULL(previous_listener_);
245
  return previous_listener_->OnStreamRead(nread, buf);
246
}
247
248
14
void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
249
14
  CHECK(args.IsConstructCall());
250
28
  CHECK(args[0]->IsObject());
251
28
  CHECK(args[1]->IsObject());
252
28
  StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
253
28
  StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
254
255
14
  new StreamPipe(source, sink, args.This());
256
14
}
257
258
14
void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
259
  StreamPipe* pipe;
260
14
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
261
14
  pipe->is_closed_ = false;
262
14
  pipe->writable_listener_.OnStreamWantsWrite(65536);
263
}
264
265
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
266
  StreamPipe* pipe;
267
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
268
  pipe->Unpipe();
269
}
270
271
void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
272
  StreamPipe* pipe;
273
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
274
  args.GetReturnValue().Set(pipe->is_closed_);
275
}
276
277
void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
278
  StreamPipe* pipe;
279
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
280
  args.GetReturnValue().Set(pipe->pending_writes_);
281
}
282
283
namespace {
284
285
221
void InitializeStreamPipe(Local<Object> target,
286
                          Local<Value> unused,
287
                          Local<Context> context,
288
                          void* priv) {
289
221
  Environment* env = Environment::GetCurrent(context);
290
291
  // Create FunctionTemplate for FileHandle::CloseReq
292
221
  Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
293
  Local<String> stream_pipe_string =
294
221
      FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
295
221
  env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
296
221
  env->SetProtoMethod(pipe, "start", StreamPipe::Start);
297
221
  env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed);
298
221
  env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites);
299
442
  pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
300
221
  pipe->SetClassName(stream_pipe_string);
301
442
  pipe->InstanceTemplate()->SetInternalFieldCount(1);
302
  target
303
442
      ->Set(context, stream_pipe_string,
304
663
            pipe->GetFunction(context).ToLocalChecked())
305
      .Check();
306
221
}
307
308
}  // anonymous namespace
309
310
}  // namespace node
311
312
4201
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
313
                                   node::InitializeStreamPipe)