GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_pipe.cc Lines: 131 162 80.9 %
Date: 2019-09-16 22:31:04 Branches: 61 118 51.7 %

Line Branch Exec Source
1
#include "stream_pipe.h"
2
#include "stream_base-inl.h"
3
#include "node_buffer.h"
4
#include "util-inl.h"
5
6
using v8::Context;
7
using v8::Function;
8
using v8::FunctionCallbackInfo;
9
using v8::FunctionTemplate;
10
using v8::Local;
11
using v8::Object;
12
using v8::Value;
13
14
namespace node {
15
16
15
StreamPipe::StreamPipe(StreamBase* source,
17
                       StreamBase* sink,
18
                       Local<Object> obj)
19
15
    : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
20
15
  MakeWeak();
21
22
15
  CHECK_NOT_NULL(sink);
23
15
  CHECK_NOT_NULL(source);
24
25
15
  source->PushStreamListener(&readable_listener_);
26
15
  sink->PushStreamListener(&writable_listener_);
27
28
15
  CHECK(sink->HasWantsWrite());
29
30
  // Set up links between this object and the source/sink objects.
31
  // In particular, this makes sure that they are garbage collected as a group,
32
  // if that applies to the given streams (for example, Http2Streams use
33
  // weak references).
34
75
  obj->Set(env()->context(), env()->source_string(), source->GetObject())
35
30
      .Check();
36
60
  source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
37
30
      .Check();
38
75
  obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
39
30
      .Check();
40
60
  sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
41
30
      .Check();
42
15
}
43
44
45
StreamPipe::~StreamPipe() {
45
15
  Unpipe();
46
30
}
47
48
685
StreamBase* StreamPipe::source() {
49
685
  return static_cast<StreamBase*>(readable_listener_.stream());
50
}
51
52
242
StreamBase* StreamPipe::sink() {
53
242
  return static_cast<StreamBase*>(writable_listener_.stream());
54
}
55
56
29
void StreamPipe::Unpipe() {
57
29
  if (is_closed_)
58
43
    return;
59
60
  // Note that we possibly cannot use virtual methods on `source` and `sink`
61
  // here, because this function can be called from their destructors via
62
  // `OnStreamDestroy()`.
63
15
  if (!source_destroyed_)
64
15
    source()->ReadStop();
65
66
15
  is_closed_ = true;
67
15
  is_reading_ = false;
68
15
  source()->RemoveStreamListener(&readable_listener_);
69
15
  sink()->RemoveStreamListener(&writable_listener_);
70
71
  // Delay the JS-facing part with SetImmediate, because this might be from
72
  // inside the garbage collector, so we can’t run JS here.
73
15
  HandleScope handle_scope(env()->isolate());
74
14
  env()->SetImmediate([this](Environment* env) {
75
14
    HandleScope handle_scope(env->isolate());
76
28
    Context::Scope context_scope(env->context());
77
14
    Local<Object> object = this->object();
78
79
    Local<Value> onunpipe;
80
56
    if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
81
      return;
82

70
    if (onunpipe->IsFunction() &&
83
56
        MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
84
      return;
85
    }
86
87
    // Set all the links established in the constructor to `null`.
88
14
    Local<Value> null = Null(env->isolate());
89
90
    Local<Value> source_v;
91
    Local<Value> sink_v;
92


112
    if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
93

84
        !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
94

70
        !source_v->IsObject() || !sink_v->IsObject()) {
95
      return;
96
    }
97
98


112
    if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
99

98
        object->Set(env->context(), env->sink_string(), null).IsNothing() ||
100
        source_v.As<Object>()
101

70
            ->Set(env->context(), env->pipe_target_string(), null)
102

140
            .IsNothing() ||
103
        sink_v.As<Object>()
104

70
            ->Set(env->context(), env->pipe_source_string(), null)
105
70
            .IsNothing()) {
106
      return;
107
    }
108
29
  }, object());
109
}
110
111
221
uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
112
221
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
113
221
  size_t size = std::min(suggested_size, pipe->wanted_data_);
114
221
  CHECK_GT(size, 0);
115
221
  return pipe->env()->AllocateManaged(size).release();
116
}
117
118
227
void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
119
                                                const uv_buf_t& buf_) {
120
227
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
121
227
  AllocatedBuffer buf(pipe->env(), buf_);
122
441
  AsyncScope async_scope(pipe);
123
227
  if (nread < 0) {
124
    // EOF or error; stop reading and pass the error to the previous listener
125
    // (which might end up in JS).
126
13
    pipe->is_eof_ = true;
127
13
    stream()->ReadStop();
128
13
    CHECK_NOT_NULL(previous_listener_);
129
13
    previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
130
    // If we’re not writing, close now. Otherwise, we’ll do that in
131
    // `OnStreamAfterWrite()`.
132
13
    if (!pipe->is_writing_) {
133
7
      pipe->ShutdownWritable();
134
7
      pipe->Unpipe();
135
    }
136
240
    return;
137
  }
138
139
428
  pipe->ProcessData(nread, std::move(buf));
140
}
141
142
214
void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
143
214
  uv_buf_t buffer = uv_buf_init(buf.data(), nread);
144
214
  StreamWriteResult res = sink()->Write(&buffer, 1);
145
214
  if (!res.async) {
146
    writable_listener_.OnStreamAfterWrite(nullptr, res.err);
147
  } else {
148
214
    is_writing_ = true;
149
214
    is_reading_ = false;
150
214
    res.wrap->SetAllocatedStorage(std::move(buf));
151
214
    if (source() != nullptr)
152
214
      source()->ReadStop();
153
  }
154
214
}
155
156
13
void StreamPipe::ShutdownWritable() {
157
13
  sink()->Shutdown();
158
13
}
159
160
211
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
161
                                                      int status) {
162
211
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
163
211
  pipe->is_writing_ = false;
164
211
  if (pipe->is_eof_) {
165
6
    AsyncScope async_scope(pipe);
166
6
    pipe->ShutdownWritable();
167
6
    pipe->Unpipe();
168
6
    return;
169
  }
170
171
205
  if (status != 0) {
172
1
    CHECK_NOT_NULL(previous_listener_);
173
1
    StreamListener* prev = previous_listener_;
174
1
    pipe->Unpipe();
175
1
    prev->OnStreamAfterWrite(w, status);
176
1
    return;
177
  }
178
}
179
180
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
181
                                                         int status) {
182
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
183
  CHECK_NOT_NULL(previous_listener_);
184
  StreamListener* prev = previous_listener_;
185
  pipe->Unpipe();
186
  prev->OnStreamAfterShutdown(w, status);
187
}
188
189
void StreamPipe::ReadableListener::OnStreamDestroy() {
190
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
191
  pipe->source_destroyed_ = true;
192
  if (!pipe->is_eof_) {
193
    OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
194
  }
195
}
196
197
void StreamPipe::WritableListener::OnStreamDestroy() {
198
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
199
  pipe->sink_destroyed_ = true;
200
  pipe->is_eof_ = true;
201
  pipe->Unpipe();
202
}
203
204
227
void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
205
227
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
206
227
  pipe->wanted_data_ = suggested_size;
207

227
  if (pipe->is_reading_ || pipe->is_closed_)
208
227
    return;
209
227
  AsyncScope async_scope(pipe);
210
227
  pipe->is_reading_ = true;
211
227
  pipe->source()->ReadStart();
212
}
213
214
uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
215
  CHECK_NOT_NULL(previous_listener_);
216
  return previous_listener_->OnStreamAlloc(suggested_size);
217
}
218
219
void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
220
                                                const uv_buf_t& buf) {
221
  CHECK_NOT_NULL(previous_listener_);
222
  return previous_listener_->OnStreamRead(nread, buf);
223
}
224
225
15
void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
226
15
  CHECK(args.IsConstructCall());
227
30
  CHECK(args[0]->IsObject());
228
30
  CHECK(args[1]->IsObject());
229
30
  StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
230
30
  StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
231
232
15
  new StreamPipe(source, sink, args.This());
233
15
}
234
235
15
void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
236
  StreamPipe* pipe;
237
30
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
238
15
  pipe->is_closed_ = false;
239
15
  if (pipe->wanted_data_ > 0)
240
    pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_);
241
}
242
243
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
244
  StreamPipe* pipe;
245
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
246
  pipe->Unpipe();
247
}
248
249
namespace {
250
251
226
void InitializeStreamPipe(Local<Object> target,
252
                          Local<Value> unused,
253
                          Local<Context> context,
254
                          void* priv) {
255
226
  Environment* env = Environment::GetCurrent(context);
256
257
  // Create FunctionTemplate for FileHandle::CloseReq
258
226
  Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
259
  Local<String> stream_pipe_string =
260
226
      FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
261
226
  env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
262
226
  env->SetProtoMethod(pipe, "start", StreamPipe::Start);
263
452
  pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
264
226
  pipe->SetClassName(stream_pipe_string);
265
452
  pipe->InstanceTemplate()->SetInternalFieldCount(1);
266
  target
267
      ->Set(context, stream_pipe_string,
268
678
            pipe->GetFunction(context).ToLocalChecked())
269
452
      .Check();
270
226
}
271
272
}  // anonymous namespace
273
274
}  // namespace node
275
276
4952
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
277
                                   node::InitializeStreamPipe)