GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_pipe.cc Lines: 1 175 0.6 %
Date: 2021-02-19 04:08:54 Branches: 2 102 2.0 %

Line Branch Exec Source
1
#include "stream_pipe.h"
2
#include "allocated_buffer-inl.h"
3
#include "stream_base-inl.h"
4
#include "node_buffer.h"
5
#include "util-inl.h"
6
7
namespace node {
8
9
using v8::Context;
10
using v8::Function;
11
using v8::FunctionCallbackInfo;
12
using v8::FunctionTemplate;
13
using v8::HandleScope;
14
using v8::Local;
15
using v8::Object;
16
using v8::Value;
17
18
// Builds a pipe that forwards data read from `source` into `sink`, with `obj`
// as the JS wrapper object.
//
// Both stream pointers must be non-null. Note that `source` is dereferenced
// in the base-class initializer, i.e. before the CHECK on it below can fire.
StreamPipe::StreamPipe(StreamBase* source,
                       StreamBase* sink,
                       Local<Object> obj)
    : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
  MakeWeak();

  CHECK_NOT_NULL(sink);
  CHECK_NOT_NULL(source);

  // Attach one listener to each end of the pipe.
  source->PushStreamListener(&readable_listener_);
  sink->PushStreamListener(&writable_listener_);

  uses_wants_write_ = sink->HasWantsWrite();

  // Cross-link the pipe object with the source and sink objects. Among other
  // things this lets them be garbage collected as a group when the streams
  // involved allow it (Http2Streams, for example, use weak references).
  Local<Context> context = env()->context();

  Local<Object> source_obj = source->GetObject();
  obj->Set(context, env()->source_string(), source_obj).Check();
  source_obj->Set(context, env()->pipe_target_string(), obj).Check();

  Local<Object> sink_obj = sink->GetObject();
  obj->Set(context, env()->sink_string(), sink_obj).Check();
  sink_obj->Set(context, env()->pipe_source_string(), obj).Check();
}
45
46
// Detaches the pipe from its streams. Passing `true` makes Unpipe() return
// before it schedules any JS-facing work.
StreamPipe::~StreamPipe() {
  constexpr bool kIsInDeletion = true;
  Unpipe(kIsInDeletion);
}
49
50
// Returns the stream that `readable_listener_` is attached to, as a
// StreamBase.
StreamBase* StreamPipe::source() {
  auto* attached = readable_listener_.stream();
  return static_cast<StreamBase*>(attached);
}
53
54
// Returns the stream that `writable_listener_` is attached to, as a
// StreamBase.
StreamBase* StreamPipe::sink() {
  auto* attached = writable_listener_.stream();
  return static_cast<StreamBase*>(attached);
}
57
58
void StreamPipe::Unpipe(bool is_in_deletion) {
59
  if (is_closed_)
60
    return;
61
62
  // Note that we possibly cannot use virtual methods on `source` and `sink`
63
  // here, because this function can be called from their destructors via
64
  // `OnStreamDestroy()`.
65
  if (!source_destroyed_)
66
    source()->ReadStop();
67
68
  is_closed_ = true;
69
  is_reading_ = false;
70
  source()->RemoveStreamListener(&readable_listener_);
71
  if (pending_writes_ == 0)
72
    sink()->RemoveStreamListener(&writable_listener_);
73
74
  if (is_in_deletion) return;
75
76
  // Delay the JS-facing part with SetImmediate, because this might be from
77
  // inside the garbage collector, so we can’t run JS here.
78
  HandleScope handle_scope(env()->isolate());
79
  BaseObjectPtr<StreamPipe> strong_ref{this};
80
  env()->SetImmediate([this, strong_ref](Environment* env) {
81
    HandleScope handle_scope(env->isolate());
82
    Context::Scope context_scope(env->context());
83
    Local<Object> object = this->object();
84
85
    Local<Value> onunpipe;
86
    if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
87
      return;
88
    if (onunpipe->IsFunction() &&
89
        MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
90
      return;
91
    }
92
93
    // Set all the links established in the constructor to `null`.
94
    Local<Value> null = Null(env->isolate());
95
96
    Local<Value> source_v;
97
    Local<Value> sink_v;
98
    if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
99
        !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
100
        !source_v->IsObject() || !sink_v->IsObject()) {
101
      return;
102
    }
103
104
    if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
105
        object->Set(env->context(), env->sink_string(), null).IsNothing() ||
106
        source_v.As<Object>()
107
            ->Set(env->context(), env->pipe_target_string(), null)
108
            .IsNothing() ||
109
        sink_v.As<Object>()
110
            ->Set(env->context(), env->pipe_source_string(), null)
111
            .IsNothing()) {
112
      return;
113
    }
114
  });
115
}
116
117
// Provides the buffer for the next read from the source. The size is the
// smaller of `suggested_size` and the pipe's `wanted_data_`, and must be
// non-zero.
uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
  StreamPipe* owner = ContainerOf(&StreamPipe::readable_listener_, this);
  size_t alloc_size = std::min(suggested_size, owner->wanted_data_);
  CHECK_GT(alloc_size, 0);
  AllocatedBuffer managed =
      AllocatedBuffer::AllocateManaged(owner->env(), alloc_size);
  return managed.release();
}
123
124
void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
125
                                                const uv_buf_t& buf_) {
126
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
127
  AllocatedBuffer buf(pipe->env(), buf_);
128
  if (nread < 0) {
129
    // EOF or error; stop reading and pass the error to the previous listener
130
    // (which might end up in JS).
131
    pipe->is_eof_ = true;
132
    // Cache `sink()` here because the previous listener might do things
133
    // that eventually lead to an `Unpipe()` call.
134
    StreamBase* sink = pipe->sink();
135
    stream()->ReadStop();
136
    CHECK_NOT_NULL(previous_listener_);
137
    previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
138
    // If we’re not writing, close now. Otherwise, we’ll do that in
139
    // `OnStreamAfterWrite()`.
140
    if (pipe->pending_writes_ == 0) {
141
      sink->Shutdown();
142
      pipe->Unpipe();
143
    }
144
    return;
145
  }
146
147
  pipe->ProcessData(nread, std::move(buf));
148
}
149
150
void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
151
  CHECK(uses_wants_write_ || pending_writes_ == 0);
152
  uv_buf_t buffer = uv_buf_init(buf.data(), nread);
153
  StreamWriteResult res = sink()->Write(&buffer, 1);
154
  pending_writes_++;
155
  if (!res.async) {
156
    writable_listener_.OnStreamAfterWrite(nullptr, res.err);
157
  } else {
158
    is_reading_ = false;
159
    res.wrap->SetAllocatedStorage(std::move(buf));
160
    if (source() != nullptr)
161
      source()->ReadStop();
162
  }
163
}
164
165
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
166
                                                      int status) {
167
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
168
  pipe->pending_writes_--;
169
  if (pipe->is_closed_) {
170
    if (pipe->pending_writes_ == 0) {
171
      Environment* env = pipe->env();
172
      HandleScope handle_scope(env->isolate());
173
      Context::Scope context_scope(env->context());
174
      pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
175
      stream()->RemoveStreamListener(this);
176
    }
177
    return;
178
  }
179
180
  if (pipe->is_eof_) {
181
    HandleScope handle_scope(pipe->env()->isolate());
182
    InternalCallbackScope callback_scope(pipe,
183
        InternalCallbackScope::kSkipTaskQueues);
184
    pipe->sink()->Shutdown();
185
    pipe->Unpipe();
186
    return;
187
  }
188
189
  if (status != 0) {
190
    CHECK_NOT_NULL(previous_listener_);
191
    StreamListener* prev = previous_listener_;
192
    pipe->Unpipe();
193
    prev->OnStreamAfterWrite(w, status);
194
    return;
195
  }
196
197
  if (!pipe->uses_wants_write_) {
198
    OnStreamWantsWrite(65536);
199
  }
200
}
201
202
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
203
                                                         int status) {
204
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
205
  CHECK_NOT_NULL(previous_listener_);
206
  StreamListener* prev = previous_listener_;
207
  pipe->Unpipe();
208
  prev->OnStreamAfterShutdown(w, status);
209
}
210
211
void StreamPipe::ReadableListener::OnStreamDestroy() {
212
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
213
  pipe->source_destroyed_ = true;
214
  if (!pipe->is_eof_) {
215
    OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
216
  }
217
}
218
219
void StreamPipe::WritableListener::OnStreamDestroy() {
220
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
221
  pipe->sink_destroyed_ = true;
222
  pipe->is_eof_ = true;
223
  pipe->pending_writes_ = 0;
224
  pipe->Unpipe();
225
}
226
227
void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
228
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
229
  pipe->wanted_data_ = suggested_size;
230
  if (pipe->is_reading_ || pipe->is_closed_)
231
    return;
232
  HandleScope handle_scope(pipe->env()->isolate());
233
  InternalCallbackScope callback_scope(pipe,
234
      InternalCallbackScope::kSkipTaskQueues);
235
  pipe->is_reading_ = true;
236
  pipe->source()->ReadStart();
237
}
238
239
// The writable listener does not allocate read buffers itself; the request
// is delegated to the previous listener, which must exist.
uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
  CHECK_NOT_NULL(previous_listener_);
  StreamListener* delegate = previous_listener_;
  return delegate->OnStreamAlloc(suggested_size);
}
243
244
void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
245
                                                const uv_buf_t& buf) {
246
  CHECK_NOT_NULL(previous_listener_);
247
  return previous_listener_->OnStreamRead(nread, buf);
248
}
249
250
void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
251
  CHECK(args.IsConstructCall());
252
  CHECK(args[0]->IsObject());
253
  CHECK(args[1]->IsObject());
254
  StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
255
  StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
256
257
  new StreamPipe(source, sink, args.This());
258
}
259
260
void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
261
  StreamPipe* pipe;
262
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
263
  pipe->is_closed_ = false;
264
  pipe->writable_listener_.OnStreamWantsWrite(65536);
265
}
266
267
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
268
  StreamPipe* pipe;
269
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
270
  pipe->Unpipe();
271
}
272
273
void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
274
  StreamPipe* pipe;
275
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
276
  args.GetReturnValue().Set(pipe->is_closed_);
277
}
278
279
void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
280
  StreamPipe* pipe;
281
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
282
  args.GetReturnValue().Set(pipe->pending_writes_);
283
}
284
285
namespace {
286
287
void InitializeStreamPipe(Local<Object> target,
288
                          Local<Value> unused,
289
                          Local<Context> context,
290
                          void* priv) {
291
  Environment* env = Environment::GetCurrent(context);
292
293
  // Create FunctionTemplate for FileHandle::CloseReq
294
  Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
295
  env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
296
  env->SetProtoMethod(pipe, "start", StreamPipe::Start);
297
  env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed);
298
  env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites);
299
  pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
300
  pipe->InstanceTemplate()->SetInternalFieldCount(
301
      StreamPipe::kInternalFieldCount);
302
  env->SetConstructorFunction(target, "StreamPipe", pipe);
303
}
304
305
}  // anonymous namespace
306
307
}  // namespace node
308
309

481
// Registers node::InitializeStreamPipe under the module name `stream_pipe`.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe, node::InitializeStreamPipe)