GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage/nodes/benchmark/out/../src/stream_base.cc Lines: 184 195 94.4 %
Date: 2019-01-07 12:15:22 Branches: 158 242 65.3 %

Line Branch Exec Source
1
#include "stream_base-inl.h"
2
#include "stream_wrap.h"
3
4
#include "node.h"
5
#include "node_buffer.h"
6
#include "node_errors.h"
7
#include "node_internals.h"
8
#include "env-inl.h"
9
#include "js_stream.h"
10
#include "string_bytes.h"
11
#include "util.h"
12
#include "util-inl.h"
13
#include "v8.h"
14
15
#include <limits.h>  // INT_MAX
16
17
namespace node {
18
19
using v8::Array;
20
using v8::ArrayBuffer;
21
using v8::Context;
22
using v8::FunctionCallbackInfo;
23
using v8::HandleScope;
24
using v8::Integer;
25
using v8::Local;
26
using v8::Object;
27
using v8::String;
28
using v8::Value;
29
30
template int StreamBase::WriteString<ASCII>(
31
    const FunctionCallbackInfo<Value>& args);
32
template int StreamBase::WriteString<UTF8>(
33
    const FunctionCallbackInfo<Value>& args);
34
template int StreamBase::WriteString<UCS2>(
35
    const FunctionCallbackInfo<Value>& args);
36
template int StreamBase::WriteString<LATIN1>(
37
    const FunctionCallbackInfo<Value>& args);
38
39
40
42010
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
41
42010
  return ReadStart();
42
}
43
44
45
547
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
46
547
  return ReadStop();
47
}
48
49
50
24977
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
51
49954
  CHECK(args[0]->IsObject());
52
49954
  Local<Object> req_wrap_obj = args[0].As<Object>();
53
54
24977
  return Shutdown(req_wrap_obj);
55
}
56
57
349807
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
58
349807
  env_->stream_base_state()[kBytesWritten] = res.bytes;
59
349807
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
60
349807
}
61
62
11983
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
63
11983
  Environment* env = Environment::GetCurrent(args);
64
65
23966
  CHECK(args[0]->IsObject());
66
23966
  CHECK(args[1]->IsArray());
67
68
23966
  Local<Object> req_wrap_obj = args[0].As<Object>();
69
23966
  Local<Array> chunks = args[1].As<Array>();
70
23966
  bool all_buffers = args[2]->IsTrue();
71
72
  size_t count;
73
11983
  if (all_buffers)
74
114
    count = chunks->Length();
75
  else
76
11869
    count = chunks->Length() >> 1;
77
78
11983
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
79
80
11983
  size_t storage_size = 0;
81
  size_t offset;
82
83
11983
  if (!all_buffers) {
84
    // Determine storage size first
85
68109
    for (size_t i = 0; i < count; i++) {
86
168720
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
87
88
56240
      if (Buffer::HasInstance(chunk))
89
22825
        continue;
90
        // Buffer chunk, no additional storage required
91
92
      // String chunk
93
100245
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
94
      enum encoding encoding = ParseEncoding(env->isolate(),
95
100245
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
96
      size_t chunk_size;
97


105298
      if (encoding == UTF8 && string->Length() > 65535 &&
98

33424
          !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
99
        return 0;
100
100245
      else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
101
100245
                    .To(&chunk_size))
102
        return 0;
103
33415
      storage_size += chunk_size;
104
    }
105
106
11869
    if (storage_size > INT_MAX)
107
      return UV_ENOBUFS;
108
  } else {
109
17567
    for (size_t i = 0; i < count; i++) {
110
52359
      Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
111
17453
      bufs[i].base = Buffer::Data(chunk);
112
17453
      bufs[i].len = Buffer::Length(chunk);
113
    }
114
  }
115
116
23966
  MallocedBuffer<char> storage;
117
11983
  if (storage_size > 0)
118
11868
    storage = MallocedBuffer<char>(storage_size);
119
120
11983
  offset = 0;
121
11983
  if (!all_buffers) {
122
68109
    for (size_t i = 0; i < count; i++) {
123
168720
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
124
125
      // Write buffer
126
56240
      if (Buffer::HasInstance(chunk)) {
127
22825
        bufs[i].base = Buffer::Data(chunk);
128
22825
        bufs[i].len = Buffer::Length(chunk);
129
22825
        continue;
130
      }
131
132
      // Write string
133
33415
      CHECK_LE(offset, storage_size);
134
33415
      char* str_storage = storage.data + offset;
135
33415
      size_t str_size = storage_size - offset;
136
137
100245
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
138
      enum encoding encoding = ParseEncoding(env->isolate(),
139
100245
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
140
      str_size = StringBytes::Write(env->isolate(),
141
                                    str_storage,
142
                                    str_size,
143
                                    string,
144
33415
                                    encoding);
145
33415
      bufs[i].base = str_storage;
146
33415
      bufs[i].len = str_size;
147
33415
      offset += str_size;
148
    }
149
  }
150
151
11983
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
152
11983
  SetWriteResult(res);
153

11983
  if (res.wrap != nullptr && storage_size > 0) {
154
289
    res.wrap->SetAllocatedStorage(storage.release(), storage_size);
155
  }
156
23966
  return res.err;
157
}
158
159
160
52337
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
161
104674
  CHECK(args[0]->IsObject());
162
163
52337
  Environment* env = Environment::GetCurrent(args);
164
165
104674
  if (!args[1]->IsUint8Array()) {
166
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
167
1
    return 0;
168
  }
169
170
104672
  Local<Object> req_wrap_obj = args[0].As<Object>();
171
172
  uv_buf_t buf;
173
52336
  buf.base = Buffer::Data(args[1]);
174
52336
  buf.len = Buffer::Length(args[1]);
175
176
52336
  StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
177
52336
  SetWriteResult(res);
178
179
52336
  return res.err;
180
}
181
182
183
template <enum encoding enc>
184
285488
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
185
285488
  Environment* env = Environment::GetCurrent(args);
186


570976
  CHECK(args[0]->IsObject());
187


856464
  CHECK(args[1]->IsString());
188
189
570976
  Local<Object> req_wrap_obj = args[0].As<Object>();
190
570976
  Local<String> string = args[1].As<String>();
191
  Local<Object> send_handle_obj;
192


570976
  if (args[2]->IsObject())
193
200
    send_handle_obj = args[2].As<Object>();
194
195
  // Compute the size of the storage that the string will be flattened into.
196
  // For UTF8 strings that are very long, go ahead and take the hit for
197
  // computing their actual size, rather than tripling the storage.
198
  size_t storage_size;
199

1132336
  if (enc == UTF8 && string->Length() > 65535 &&
200

283117
      !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
201
    return 0;
202


856464
  else if (!StringBytes::StorageSize(env->isolate(), string, enc)
203
856464
                .To(&storage_size))
204
    return 0;
205
206


285488
  if (storage_size > INT_MAX)
207
    return UV_ENOBUFS;
208
209
  // Try writing immediately if write size isn't too big
210
  char stack_storage[16384];  // 16kb
211
  size_t data_size;
212
285488
  size_t synchronously_written = 0;
213
  uv_buf_t buf;
214
215
  bool try_write = storage_size <= sizeof(stack_storage) &&
216






286855
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
217


285488
  if (try_write) {
218
285363
    data_size = StringBytes::Write(env->isolate(),
219
                                   stack_storage,
220
                                   storage_size,
221
                                   string,
222
570726
                                   enc);
223
285363
    buf = uv_buf_init(stack_storage, data_size);
224
225
285363
    uv_buf_t* bufs = &buf;
226
285363
    size_t count = 1;
227
285363
    const int err = DoTryWrite(&bufs, &count);
228
    // Keep track of the bytes written here, because we're taking a shortcut
229
    // by using `DoTryWrite()` directly instead of using the utilities
230
    // provided by `Write()`.
231


285363
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
232
285363
    bytes_written_ += synchronously_written;
233
234
    // Immediate failure or success
235




285363
    if (err != 0 || count == 0) {
236
283910
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
237
283910
      return err;
238
    }
239
240
    // Partial write
241


1453
    CHECK_EQ(count, 1);
242
  }
243
244
1578
  MallocedBuffer<char> data;
245
246


1578
  if (try_write) {
247
    // Copy partial data
248
1453
    data = MallocedBuffer<char>(buf.len);
249
1453
    memcpy(data.data, buf.base, buf.len);
250
1453
    data_size = buf.len;
251
  } else {
252
    // Write it
253
125
    data = MallocedBuffer<char>(storage_size);
254
125
    data_size = StringBytes::Write(env->isolate(),
255
                                   data.data,
256
                                   storage_size,
257
                                   string,
258
250
                                   enc);
259
  }
260
261


1578
  CHECK_LE(data_size, storage_size);
262
263
1578
  buf = uv_buf_init(data.data, data_size);
264
265
1578
  uv_stream_t* send_handle = nullptr;
266
267






1682
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
268
    HandleWrap* wrap;
269


100
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
270
100
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
271
    // Reference LibuvStreamWrap instance to prevent it from being garbage
272
    // collected before `AfterWrite` is called.
273
    req_wrap_obj->Set(env->context(),
274
                      env->handle_string(),
275
400
                      send_handle_obj).FromJust();
276
  }
277
278
1578
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
279
1578
  res.bytes += synchronously_written;
280
281
1578
  SetWriteResult(res);
282


1578
  if (res.wrap != nullptr) {
283
1561
    res.wrap->SetAllocatedStorage(data.release(), data_size);
284
  }
285
286
1578
  return res.err;
287
}
288
289
290
307671
void StreamBase::CallJSOnreadMethod(ssize_t nread,
291
                                    Local<ArrayBuffer> ab,
292
                                    size_t offset) {
293
307671
  Environment* env = env_;
294
295
  DCHECK_EQ(static_cast<int32_t>(nread), nread);
296
  DCHECK_LE(offset, INT32_MAX);
297
298
307671
  if (ab.IsEmpty()) {
299
    DCHECK_EQ(offset, 0);
300
    DCHECK_LE(nread, 0);
301
  } else {
302
    DCHECK_GE(nread, 0);
303
  }
304
305
307671
  env->stream_base_state()[kReadBytesOrError] = nread;
306
307671
  env->stream_base_state()[kArrayBufferOffset] = offset;
307
308
  Local<Value> argv[] = {
309
662961
    ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
310
1538355
  };
311
312
307671
  AsyncWrap* wrap = GetAsyncWrap();
313
307671
  CHECK_NOT_NULL(wrap);
314
307671
  wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
315
307599
}
316
317
318
2256
bool StreamBase::IsIPCPipe() {
319
2256
  return false;
320
}
321
322
323
int StreamBase::GetFD() {
324
  return -1;
325
}
326
327
328
28615
Local<Object> StreamBase::GetObject() {
329
28615
  return GetAsyncWrap()->object();
330
}
331
332
333
5668
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
334
  // No TryWrite by default
335
5668
  return 0;
336
}
337
338
339
56340
const char* StreamResource::Error() const {
340
56340
  return nullptr;
341
}
342
343
344
void StreamResource::ClearError() {
345
  // No-op
346
}
347
348
349
313603
uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
350
313603
  return uv_buf_init(Malloc(suggested_size), suggested_size);
351
}
352
353
354
297548
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
355
297548
  CHECK_NOT_NULL(stream_);
356
297548
  StreamBase* stream = static_cast<StreamBase*>(stream_);
357
297548
  Environment* env = stream->stream_env();
358
297548
  HandleScope handle_scope(env->isolate());
359
579167
  Context::Scope context_scope(env->context());
360
361
297548
  if (nread <= 0)  {
362
15881
    free(buf.base);
363
15881
    if (nread < 0)
364
15873
      stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
365
313333
    return;
366
  }
367
368
281667
  CHECK_LE(static_cast<size_t>(nread), buf.len);
369
281667
  char* base = Realloc(buf.base, nread);
370
371
  Local<ArrayBuffer> obj = ArrayBuffer::New(
372
      env->isolate(),
373
      base,
374
      nread,
375
281667
      v8::ArrayBufferCreationMode::kInternalized);  // Transfer ownership to V8.
376
563286
  stream->CallJSOnreadMethod(nread, obj);
377
}
378
379
380
28563
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
381
    StreamReq* req_wrap, int status) {
382
28563
  StreamBase* stream = static_cast<StreamBase*>(stream_);
383
28563
  Environment* env = stream->stream_env();
384
28563
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
385
28563
  HandleScope handle_scope(env->isolate());
386
28563
  Context::Scope context_scope(env->context());
387
57126
  CHECK(!async_wrap->persistent().IsEmpty());
388
28563
  Local<Object> req_wrap_obj = async_wrap->object();
389
390
  Local<Value> argv[] = {
391
    Integer::New(env->isolate(), status),
392
28563
    stream->GetObject(),
393
    Undefined(env->isolate())
394
142815
  };
395
396
28563
  const char* msg = stream->Error();
397
28563
  if (msg != nullptr) {
398
    argv[2] = OneByteString(env->isolate(), msg);
399
    stream->ClearError();
400
  }
401
402
114252
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
403
56695
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
404
28560
}
405
406
3162
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
407
    WriteWrap* req_wrap, int status) {
408
3162
  OnStreamAfterReqFinished(req_wrap, status);
409
3159
}
410
411
25401
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
412
    ShutdownWrap* req_wrap, int status) {
413
25401
  OnStreamAfterReqFinished(req_wrap, status);
414
25401
}
415
416
417
}  // namespace node