GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_pipe.cc Lines: 132 164 80.5 %
Date: 2019-02-13 22:28:58 Branches: 59 118 50.0 %

Line Branch Exec Source
1
#include "stream_pipe.h"
2
#include "stream_base-inl.h"
3
#include "node_buffer.h"
4
5
using v8::Context;
6
using v8::External;
7
using v8::Function;
8
using v8::FunctionCallbackInfo;
9
using v8::FunctionTemplate;
10
using v8::Local;
11
using v8::Object;
12
using v8::Value;
13
14
namespace node {
15
16
14
// Constructs a pipe between `source` and `sink` and ties it to the JS object
// `obj`.
//
// Installs `readable_listener_` on `source` and `writable_listener_` on `sink`,
// requires that `sink` reports HasWantsWrite(), and stores cross references
// between `obj` and the two stream objects.
//
// NOTE(review): `source` is dereferenced in the initializer list
// (`source->stream_env()`) before `CHECK_NOT_NULL(source)` runs, so that check
// cannot catch a null `source` -- confirm callers never pass null.
StreamPipe::StreamPipe(StreamBase* source,
17
                       StreamBase* sink,
18
                       Local<Object> obj)
19
14
    : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
20
14
  MakeWeak();
21
22
14
  CHECK_NOT_NULL(sink);
23
14
  CHECK_NOT_NULL(source);
24
25
14
  source->PushStreamListener(&readable_listener_);
26
14
  sink->PushStreamListener(&writable_listener_);
27
28
  // Reading is started from WritableListener::OnStreamWantsWrite(); presumably
  // that is why the sink must support it -- confirm against StreamBase.
14
  CHECK(sink->HasWantsWrite());
29
30
  // Set up links between this object and the source/sink objects.
31
  // In particular, this makes sure that they are garbage collected as a group,
32
  // if that applies to the given streams (for example, Http2Streams use
33
  // weak references).
34
70
  obj->Set(env()->context(), env()->source_string(), source->GetObject())
35
28
      .FromJust();
36
56
  source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
37
28
      .FromJust();
38
70
  obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
39
28
      .FromJust();
40
56
  sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
41
28
      .FromJust();
42
14
}
43
44
42
// Destructor. Asserts via CHECK that the pipe has already been closed
// (`is_closed_` is set by Unpipe()) before the object is destroyed.
StreamPipe::~StreamPipe() {
45
14
  CHECK(is_closed_);
46
28
}
47
48
677
// Returns the stream that `readable_listener_` reports via stream(), cast to
// StreamBase*. This is the stream the pipe reads from.
StreamBase* StreamPipe::source() {
49
677
  return static_cast<StreamBase*>(readable_listener_.stream());
50
}
51
52
239
// Returns the stream that `writable_listener_` reports via stream(), cast to
// StreamBase*. This is the stream the pipe writes to.
StreamBase* StreamPipe::sink() {
53
239
  return static_cast<StreamBase*>(writable_listener_.stream());
54
}
55
56
14
// Tears the pipe down. Does nothing if the pipe is already closed.
//
// Synchronously: stops reading from the source (unless the source has been
// flagged as destroyed), marks the pipe closed and not reading, and removes
// both stream listeners.
//
// Deferred via SetImmediate: looks up the property keyed by
// `env->onunpipe_string()` on the wrapper object, calls it if it is a
// function, and then resets the source/sink/pipe_target/pipe_source links
// created in the constructor to `null`. Any failing step in the deferred part
// returns early and leaves the remaining links untouched.
void StreamPipe::Unpipe() {
57
14
  if (is_closed_)
58
14
    return;
59
60
  // Note that we possibly cannot use virtual methods on `source` and `sink`
61
  // here, because this function can be called from their destructors via
62
  // `OnStreamDestroy()`.
63
14
  if (!source_destroyed_)
64
14
    source()->ReadStop();
65
66
14
  is_closed_ = true;
67
14
  is_reading_ = false;
68
14
  source()->RemoveStreamListener(&readable_listener_);
69
14
  sink()->RemoveStreamListener(&writable_listener_);
70
71
  // Delay the JS-facing part with SetImmediate, because this might be from
72
  // inside the garbage collector, so we can’t run JS here.
73
14
  HandleScope handle_scope(env()->isolate());
74
42
  env()->SetImmediate([](Environment* env, void* data) {
75
    // `data` is the `this` pointer passed as the second SetImmediate argument.
14
    StreamPipe* pipe = static_cast<StreamPipe*>(data);
76
77
14
    HandleScope handle_scope(env->isolate());
78
28
    Context::Scope context_scope(env->context());
79
14
    Local<Object> object = pipe->object();
80
81
    Local<Value> onunpipe;
82
    // Give up if the property lookup did not produce a value.
56
    if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
83
      return;
84

    // Only call the handler when it is a function; an empty result from
    // MakeCallback() aborts the rest of the cleanup.
70
    if (onunpipe->IsFunction() &&
85
56
        pipe->MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
86
      return;
87
    }
88
89
    // Set all the links established in the constructor to `null`.
90
14
    Local<Value> null = Null(env->isolate());
91
92
    Local<Value> source_v;
93
    Local<Value> sink_v;
94


    // Read the current source/sink links first; both must still be objects,
    // otherwise there is nothing to unlink on the stream side.
112
    if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
95

84
        !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
96

70
        !source_v->IsObject() || !sink_v->IsObject()) {
97
      return;
98
    }
99
100


    // Clear all four links; the chain short-circuits on the first Set() that
    // reports failure (IsNothing()).
112
    if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
101

98
        object->Set(env->context(), env->sink_string(), null).IsNothing() ||
102
        source_v.As<Object>()
103

70
            ->Set(env->context(), env->pipe_target_string(), null)
104

140
            .IsNothing() ||
105
        sink_v.As<Object>()
106

70
            ->Set(env->context(), env->pipe_source_string(), null)
107
70
            .IsNothing()) {
108
      return;
109
    }
110
  // NOTE(review): the third argument (`object()`) presumably keeps the wrapper
  // object alive until the immediate runs -- confirm against
  // Environment::SetImmediate.
56
  }, static_cast<void*>(this), object());
111
}
112
113
219
// Allocates the buffer for the next read from the source.
//
// The size is the smaller of `suggested_size` and `pipe->wanted_data_` (the
// amount last recorded by WritableListener::OnStreamWantsWrite()) and must be
// greater than zero. The returned uv_buf_t wraps memory obtained from
// Malloc().
uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
114
219
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
115
219
  size_t size = std::min(suggested_size, pipe->wanted_data_);
116
219
  CHECK_GT(size, 0);
117
219
  return uv_buf_init(Malloc(size), size);
118
}
119
120
225
// Handles the result of a read on the source stream.
//
// nread < 0 (EOF or error): frees `buf.base`, marks the pipe as EOF, stops
// reading, forwards `nread` with an empty buffer to the previous listener and,
// if no write is in flight, shuts down the sink and unpipes.
//
// nread >= 0: passes the data on to StreamPipe::ProcessData(). Note that
// nread == 0 also takes this path.
//
// NOTE(review): ReadableListener::OnStreamDestroy() calls this function after
// setting `source_destroyed_`, yet `stream()->ReadStop()` below is not guarded
// by that flag the way it is in Unpipe() -- confirm this is safe during stream
// destruction.
void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
121
                                                const uv_buf_t& buf) {
122
225
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
123
225
  AsyncScope async_scope(pipe);
124
225
  if (nread < 0) {
125
    // EOF or error; stop reading and pass the error to the previous listener
126
    // (which might end up in JS).
127
13
    free(buf.base);
128
13
    pipe->is_eof_ = true;
129
13
    stream()->ReadStop();
130
13
    CHECK_NOT_NULL(previous_listener_);
131
13
    previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
132
    // If we’re not writing, close now. Otherwise, we’ll do that in
133
    // `OnStreamAfterWrite()`.
134
13
    if (!pipe->is_writing_) {
135
7
      pipe->ShutdownWritable();
136
7
      pipe->Unpipe();
137
    }
138
238
    return;
139
  }
140
141
212
  pipe->ProcessData(nread, buf);
142
}
143
144
212
// Writes the first `nread` bytes of `buf` to the sink.
//
// Synchronous completion (`!res.async`): the buffer is freed here and
// OnStreamAfterWrite() is invoked directly with `res.err`.
//
// Asynchronous completion: the pipe is flagged as writing and not reading,
// the buffer is handed to the write wrap via SetAllocatedStorage(), and
// reading from the source is stopped. Reading is started again from
// WritableListener::OnStreamWantsWrite().
void StreamPipe::ProcessData(size_t nread, const uv_buf_t& buf) {
145
  // Same memory as `buf`, but with the length narrowed to the bytes read.
212
  uv_buf_t buffer = uv_buf_init(buf.base, nread);
146
212
  StreamWriteResult res = sink()->Write(&buffer, 1);
147
212
  if (!res.async) {
148
    free(buf.base);
149
    writable_listener_.OnStreamAfterWrite(nullptr, res.err);
150
  } else {
151
212
    is_writing_ = true;
152
212
    is_reading_ = false;
153
    // The write wrap receives the full original allocation (`buf.len`), not
    // just the `nread` bytes being written.
212
    res.wrap->SetAllocatedStorage(buf.base, buf.len);
154
212
    if (source() != nullptr)
155
212
      source()->ReadStop();
156
  }
157
212
}
158
159
13
// Calls Shutdown() on the sink stream. Used once the source has reached EOF
// or reported an error.
void StreamPipe::ShutdownWritable() {
160
13
  sink()->Shutdown();
161
13
}
162
163
211
// Called when a write to the sink has finished (`w` may be nullptr when
// invoked from ProcessData() for a synchronous write).
//
// Clears `is_writing_`. If the source already hit EOF, shuts down the sink and
// unpipes. Otherwise, a non-zero `status` unpipes and forwards the result to
// the listener that was installed before this one. A zero `status` needs no
// further action here.
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
164
                                                      int status) {
165
211
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
166
211
  pipe->is_writing_ = false;
167
211
  if (pipe->is_eof_) {
168
6
    AsyncScope async_scope(pipe);
169
6
    pipe->ShutdownWritable();
170
6
    pipe->Unpipe();
171
6
    return;
172
  }
173
174
205
  if (status != 0) {
175
1
    CHECK_NOT_NULL(previous_listener_);
176
    // Saved before Unpipe(), which removes this listener from the sink;
    // presumably `previous_listener_` is not reliable afterwards -- confirm
    // against StreamResource::RemoveStreamListener.
1
    StreamListener* prev = previous_listener_;
177
1
    pipe->Unpipe();
178
1
    prev->OnStreamAfterWrite(w, status);
179
1
    return;
180
  }
181
}
182
183
// Called when a shutdown of the sink has finished. Unpipes and then forwards
// the notification (`w`, `status`) to the listener that was installed before
// this one.
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
184
                                                         int status) {
185
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
186
  CHECK_NOT_NULL(previous_listener_);
187
  // Saved before Unpipe(), which removes this listener from the sink.
  StreamListener* prev = previous_listener_;
188
  pipe->Unpipe();
189
  prev->OnStreamAfterShutdown(w, status);
190
}
191
192
// Called when the source stream is being destroyed. Flags the source as
// destroyed and, if EOF has not been seen yet, reports a synthetic UV_EPIPE
// read error through OnStreamRead() so the usual EOF/error teardown runs.
void StreamPipe::ReadableListener::OnStreamDestroy() {
193
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
194
  pipe->source_destroyed_ = true;
195
  if (!pipe->is_eof_) {
196
    OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
197
  }
198
}
199
200
// Called when the sink stream is being destroyed. Flags the sink as destroyed,
// marks the pipe as EOF, and unpipes.
void StreamPipe::WritableListener::OnStreamDestroy() {
201
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
202
  pipe->sink_destroyed_ = true;
203
  pipe->is_eof_ = true;
204
  pipe->Unpipe();
205
}
206
207
225
// Called when the sink signals that it wants `suggested_size` bytes of data.
//
// Always records the amount in `wanted_data_` (later used to cap the read
// buffer size in ReadableListener::OnStreamAlloc()). Starts reading from the
// source unless the pipe is already reading or has been closed.
void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
208
225
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
209
225
  pipe->wanted_data_ = suggested_size;
210

225
  if (pipe->is_reading_ || pipe->is_closed_)
211
225
    return;
212
225
  AsyncScope async_scope(pipe);
213
225
  pipe->is_reading_ = true;
214
225
  pipe->source()->ReadStart();
215
}
216
217
// Read-side allocation callback on the sink's listener chain. The pipe does
// not handle reads from the sink, so the request is passed through unchanged
// to the previous listener, which must exist.
uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
218
  CHECK_NOT_NULL(previous_listener_);
219
  return previous_listener_->OnStreamAlloc(suggested_size);
220
}
221
222
// Read-result callback on the sink's listener chain. Passed through unchanged
// (`nread`, `buf`) to the previous listener, which must exist.
void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
223
                                                const uv_buf_t& buf) {
224
  CHECK_NOT_NULL(previous_listener_);
225
  return previous_listener_->OnStreamRead(nread, buf);
226
}
227
228
14
// JS constructor callback for StreamPipe.
//
// Must be invoked as a construct call. `args[0]` and `args[1]` must be
// v8::External values whose payloads are cast to StreamBase* and used as the
// source and sink respectively. The new StreamPipe is bound to `args.This()`.
void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
229
14
  CHECK(args.IsConstructCall());
230
28
  CHECK(args[0]->IsExternal());
231
28
  CHECK(args[1]->IsExternal());
232
42
  auto source = static_cast<StreamBase*>(args[0].As<External>()->Value());
233
42
  auto sink = static_cast<StreamBase*>(args[1].As<External>()->Value());
234
235
  // The pointer is intentionally not stored here; presumably the object's
  // lifetime is tied to the JS wrapper (the constructor calls MakeWeak()) --
  // confirm against BaseObject.
14
  new StreamPipe(source, sink, args.This());
236
14
}
237
238
14
// JS method `start`. Unwraps the StreamPipe from `args.Holder()`, marks it as
// open (`is_closed_ = false`) and, if the sink has already asked for data
// (`wanted_data_ > 0`), replays that request through OnStreamWantsWrite() so
// that reading begins.
void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
239
  StreamPipe* pipe;
240
28
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
241
14
  pipe->is_closed_ = false;
242
14
  if (pipe->wanted_data_ > 0)
243
    pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_);
244
}
245
246
// JS method `unpipe`. Unwraps the StreamPipe from `args.Holder()` and calls
// the native Unpipe() on it.
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
247
  StreamPipe* pipe;
248
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
249
  pipe->Unpipe();
250
}
251
252
namespace {
253
254
212
// Binding initializer: exposes the StreamPipe constructor on `target`.
//
// Builds a function template around StreamPipe::New, adds the prototype
// methods `unpipe` and `start`, makes it inherit from the AsyncWrap
// constructor template, reserves one internal field per instance, and stores
// the resulting function on `target` under the name "StreamPipe".
// `unused` and `priv` are not used.
void InitializeStreamPipe(Local<Object> target,
255
                          Local<Value> unused,
256
                          Local<Context> context,
257
                          void* priv) {
258
212
  Environment* env = Environment::GetCurrent(context);
259
260
  // Create FunctionTemplate for StreamPipe
261
212
  Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
262
  Local<String> stream_pipe_string =
263
212
      FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
264
212
  env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
265
212
  env->SetProtoMethod(pipe, "start", StreamPipe::Start);
266
424
  pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
267
212
  pipe->SetClassName(stream_pipe_string);
268
424
  pipe->InstanceTemplate()->SetInternalFieldCount(1);
269
  target
270
      ->Set(context, stream_pipe_string,
271
636
            pipe->GetFunction(context).ToLocalChecked())
272
424
      .FromJust();
273
212
}
274
275
}  // anonymous namespace
276
277
}  // namespace node
278
279
4314
// Presumably registers node::InitializeStreamPipe as the initializer of the
// internal binding named `stream_pipe` -- confirm against the macro
// definition.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
280
                                   node::InitializeStreamPipe)