GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.cc Lines: 185 196 94.4 %
Date: 2019-03-02 22:23:06 Branches: 161 246 65.4 %

Line Branch Exec Source
1
#include "stream_base-inl.h"
2
#include "stream_wrap.h"
3
4
#include "node.h"
5
#include "node_buffer.h"
6
#include "node_errors.h"
7
#include "env-inl.h"
8
#include "js_stream.h"
9
#include "string_bytes.h"
10
#include "util-inl.h"
11
#include "v8.h"
12
13
#include <climits>  // INT_MAX
14
15
namespace node {
16
17
using v8::Array;
18
using v8::ArrayBuffer;
19
using v8::Context;
20
using v8::FunctionCallbackInfo;
21
using v8::HandleScope;
22
using v8::Integer;
23
using v8::Local;
24
using v8::Object;
25
using v8::String;
26
using v8::Value;
27
28
template int StreamBase::WriteString<ASCII>(
29
    const FunctionCallbackInfo<Value>& args);
30
template int StreamBase::WriteString<UTF8>(
31
    const FunctionCallbackInfo<Value>& args);
32
template int StreamBase::WriteString<UCS2>(
33
    const FunctionCallbackInfo<Value>& args);
34
template int StreamBase::WriteString<LATIN1>(
35
    const FunctionCallbackInfo<Value>& args);
36
37
38
61087
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
39
61087
  return ReadStart();
40
}
41
42
43
565
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
44
565
  return ReadStop();
45
}
46
47
48
42379
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
49
84758
  CHECK(args[0]->IsObject());
50
84758
  Local<Object> req_wrap_obj = args[0].As<Object>();
51
52
42379
  return Shutdown(req_wrap_obj);
53
}
54
55
755032
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
56
755032
  env_->stream_base_state()[kBytesWritten] = res.bytes;
57
755032
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
58
755032
}
59
60
22472
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
61
22472
  Environment* env = Environment::GetCurrent(args);
62
63
44944
  CHECK(args[0]->IsObject());
64
44944
  CHECK(args[1]->IsArray());
65
66
44944
  Local<Object> req_wrap_obj = args[0].As<Object>();
67
44944
  Local<Array> chunks = args[1].As<Array>();
68
44944
  bool all_buffers = args[2]->IsTrue();
69
70
  size_t count;
71
22472
  if (all_buffers)
72
280
    count = chunks->Length();
73
  else
74
22192
    count = chunks->Length() >> 1;
75
76
22472
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
77
78
22472
  size_t storage_size = 0;
79
  size_t offset;
80
81
22472
  if (!all_buffers) {
82
    // Determine storage size first
83
311179
    for (size_t i = 0; i < count; i++) {
84
866961
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
85
86
288987
      if (Buffer::HasInstance(chunk))
87
43399
        continue;
88
        // Buffer chunk, no additional storage required
89
90
      // String chunk
91
736764
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
92
      enum encoding encoding = ParseEncoding(env->isolate(),
93
736764
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
94
      size_t chunk_size;
95


925371
      if (encoding == UTF8 && string->Length() > 65535 &&
96

245600
          !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
97
        return 0;
98
736764
      else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
99
736764
                    .To(&chunk_size))
100
        return 0;
101
245588
      storage_size += chunk_size;
102
    }
103
104
22192
    if (storage_size > INT_MAX)
105
      return UV_ENOBUFS;
106
  } else {
107
28564
    for (size_t i = 0; i < count; i++) {
108
84852
      Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
109
28284
      bufs[i].base = Buffer::Data(chunk);
110
28284
      bufs[i].len = Buffer::Length(chunk);
111
    }
112
  }
113
114
44944
  AllocatedBuffer storage;
115
22472
  if (storage_size > 0)
116
22146
    storage = env->AllocateManaged(storage_size);
117
118
22472
  offset = 0;
119
22472
  if (!all_buffers) {
120
311179
    for (size_t i = 0; i < count; i++) {
121
866961
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
122
123
      // Write buffer
124
288987
      if (Buffer::HasInstance(chunk)) {
125
43399
        bufs[i].base = Buffer::Data(chunk);
126
43399
        bufs[i].len = Buffer::Length(chunk);
127
43399
        continue;
128
      }
129
130
      // Write string
131
245588
      CHECK_LE(offset, storage_size);
132
245588
      char* str_storage = storage.data() + offset;
133
245588
      size_t str_size = storage.size() - offset;
134
135
736764
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
136
      enum encoding encoding = ParseEncoding(env->isolate(),
137
736764
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
138
      str_size = StringBytes::Write(env->isolate(),
139
                                    str_storage,
140
                                    str_size,
141
                                    string,
142
245588
                                    encoding);
143
245588
      bufs[i].base = str_storage;
144
245588
      bufs[i].len = str_size;
145
245588
      offset += str_size;
146
    }
147
  }
148
149
22472
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
150
22472
  SetWriteResult(res);
151

22472
  if (res.wrap != nullptr && storage_size > 0) {
152
262
    res.wrap->SetAllocatedStorage(std::move(storage));
153
  }
154
44944
  return res.err;
155
}
156
157
158
67908
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
159
135816
  CHECK(args[0]->IsObject());
160
161
67908
  Environment* env = Environment::GetCurrent(args);
162
163
135816
  if (!args[1]->IsUint8Array()) {
164
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
165
1
    return 0;
166
  }
167
168
135814
  Local<Object> req_wrap_obj = args[0].As<Object>();
169
170
  uv_buf_t buf;
171
67907
  buf.base = Buffer::Data(args[1]);
172
67907
  buf.len = Buffer::Length(args[1]);
173
174
67907
  StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
175
67907
  SetWriteResult(res);
176
177
67907
  return res.err;
178
}
179
180
181
template <enum encoding enc>
182
664653
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
183
664653
  Environment* env = Environment::GetCurrent(args);
184


1329306
  CHECK(args[0]->IsObject());
185


1993959
  CHECK(args[1]->IsString());
186
187
1329306
  Local<Object> req_wrap_obj = args[0].As<Object>();
188
1329306
  Local<String> string = args[1].As<String>();
189
  Local<Object> send_handle_obj;
190


1329306
  if (args[2]->IsObject())
191
206
    send_handle_obj = args[2].As<Object>();
192
193
  // Compute the size of the storage that the string will be flattened into.
194
  // For UTF8 strings that are very long, go ahead and take the hit for
195
  // computing their actual size, rather than tripling the storage.
196
  size_t storage_size;
197

2608082
  if (enc == UTF8 && string->Length() > 65535 &&
198

652070
      !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
199
    return 0;
200


1993959
  else if (!StringBytes::StorageSize(env->isolate(), string, enc)
201
1993959
                .To(&storage_size))
202
    return 0;
203
204


664653
  if (storage_size > INT_MAX)
205
    return UV_ENOBUFS;
206
207
  // Try writing immediately if write size isn't too big
208
  char stack_storage[16384];  // 16kb
209
  size_t data_size;
210
664653
  size_t synchronously_written = 0;
211
  uv_buf_t buf;
212
213
  bool try_write = storage_size <= sizeof(stack_storage) &&
214






666474
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
215


664653
  if (try_write) {
216
664366
    data_size = StringBytes::Write(env->isolate(),
217
                                   stack_storage,
218
                                   storage_size,
219
                                   string,
220
1328732
                                   enc);
221
664366
    buf = uv_buf_init(stack_storage, data_size);
222
223
664366
    uv_buf_t* bufs = &buf;
224
664366
    size_t count = 1;
225
664366
    const int err = DoTryWrite(&bufs, &count);
226
    // Keep track of the bytes written here, because we're taking a shortcut
227
    // by using `DoTryWrite()` directly instead of using the utilities
228
    // provided by `Write()`.
229


664366
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
230
664366
    bytes_written_ += synchronously_written;
231
232
    // Immediate failure or success
233




664366
    if (err != 0 || count == 0) {
234
662694
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
235
662694
      return err;
236
    }
237
238
    // Partial write
239


1672
    CHECK_EQ(count, 1);
240
  }
241
242
1959
  AllocatedBuffer data;
243
244


1959
  if (try_write) {
245
    // Copy partial data
246
1672
    data = env->AllocateManaged(buf.len);
247
1672
    memcpy(data.data(), buf.base, buf.len);
248
1672
    data_size = buf.len;
249
  } else {
250
    // Write it
251
287
    data = env->AllocateManaged(storage_size);
252
287
    data_size = StringBytes::Write(env->isolate(),
253
                                   data.data(),
254
                                   storage_size,
255
                                   string,
256
574
                                   enc);
257
  }
258
259


1959
  CHECK_LE(data_size, storage_size);
260
261
1959
  buf = uv_buf_init(data.data(), data_size);
262
263
1959
  uv_stream_t* send_handle = nullptr;
264
265






2065
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
266
    HandleWrap* wrap;
267


103
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
268
103
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
269
    // Reference LibuvStreamWrap instance to prevent it from being garbage
270
    // collected before `AfterWrite` is called.
271
    req_wrap_obj->Set(env->context(),
272
                      env->handle_string(),
273
412
                      send_handle_obj).FromJust();
274
  }
275
276
1959
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
277
1959
  res.bytes += synchronously_written;
278
279
1959
  SetWriteResult(res);
280


1959
  if (res.wrap != nullptr) {
281
1786
    res.wrap->SetAllocatedStorage(std::move(data));
282
  }
283
284
1959
  return res.err;
285
}
286
287
288
400452
void StreamBase::CallJSOnreadMethod(ssize_t nread,
289
                                    Local<ArrayBuffer> ab,
290
                                    size_t offset) {
291
400452
  Environment* env = env_;
292
293
  DCHECK_EQ(static_cast<int32_t>(nread), nread);
294
  DCHECK_LE(offset, INT32_MAX);
295
296
400452
  if (ab.IsEmpty()) {
297
    DCHECK_EQ(offset, 0);
298
    DCHECK_LE(nread, 0);
299
  } else {
300
    DCHECK_GE(nread, 0);
301
  }
302
303
400452
  env->stream_base_state()[kReadBytesOrError] = nread;
304
400452
  env->stream_base_state()[kArrayBufferOffset] = offset;
305
306
  Local<Value> argv[] = {
307
853005
    ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
308
2002260
  };
309
310
400452
  AsyncWrap* wrap = GetAsyncWrap();
311
400452
  CHECK_NOT_NULL(wrap);
312
400452
  wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
313
400386
}
314
315
316
2385
bool StreamBase::IsIPCPipe() {
317
2385
  return false;
318
}
319
320
321
int StreamBase::GetFD() {
322
  return -1;
323
}
324
325
326
24682
Local<Object> StreamBase::GetObject() {
327
24682
  return GetAsyncWrap()->object();
328
}
329
330
331
6206
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
332
  // No TryWrite by default
333
6206
  return 0;
334
}
335
336
337
69589
const char* StreamResource::Error() const {
338
69589
  return nullptr;
339
}
340
341
342
void StreamResource::ClearError() {
343
  // No-op
344
}
345
346
378420
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
347
378420
  CHECK_NOT_NULL(stream_);
348
378420
  Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
349
378420
  return env->AllocateManaged(suggested_size).release();
350
}
351
352
390218
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
353
390218
  CHECK_NOT_NULL(stream_);
354
390218
  StreamBase* stream = static_cast<StreamBase*>(stream_);
355
390218
  Environment* env = stream->stream_env();
356
390218
  HandleScope handle_scope(env->isolate());
357
763016
  Context::Scope context_scope(env->context());
358
763016
  AllocatedBuffer buf(env, buf_);
359
360
390218
  if (nread <= 0)  {
361
17378
    if (nread < 0)
362
17367
      stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
363
407506
    return;
364
  }
365
366
372840
  CHECK_LE(static_cast<size_t>(nread), buf.size());
367
372840
  buf.Resize(nread);
368
369
745638
  stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
370
}
371
372
373
24622
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
374
    StreamReq* req_wrap, int status) {
375
24622
  StreamBase* stream = static_cast<StreamBase*>(stream_);
376
24622
  Environment* env = stream->stream_env();
377
24622
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
378
24622
  HandleScope handle_scope(env->isolate());
379
24622
  Context::Scope context_scope(env->context());
380
49244
  CHECK(!async_wrap->persistent().IsEmpty());
381
24622
  Local<Object> req_wrap_obj = async_wrap->object();
382
383
  Local<Value> argv[] = {
384
    Integer::New(env->isolate(), status),
385
24622
    stream->GetObject(),
386
    Undefined(env->isolate())
387
123110
  };
388
389
24622
  const char* msg = stream->Error();
390
24622
  if (msg != nullptr) {
391
    argv[2] = OneByteString(env->isolate(), msg);
392
    stream->ClearError();
393
  }
394
395
98488
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
396
49239
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
397
24619
}
398
399
3395
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
400
    WriteWrap* req_wrap, int status) {
401
3395
  OnStreamAfterReqFinished(req_wrap, status);
402
3392
}
403
404
21227
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
405
    ShutdownWrap* req_wrap, int status) {
406
21227
  OnStreamAfterReqFinished(req_wrap, status);
407
21227
}
408
409
410
}  // namespace node