GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.h Lines: 27 34 79.4 %
Date: 2019-10-07 22:40:39 Branches: 12 24 50.0 %

Line Branch Exec Source
1
#ifndef SRC_STREAM_BASE_H_
2
#define SRC_STREAM_BASE_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "async_wrap-inl.h"
8
#include "node.h"
9
#include "util.h"
10
11
#include "v8.h"
12
13
namespace node {
14
15
// Forward declarations
16
class ShutdownWrap;
17
class WriteWrap;
18
class StreamBase;
19
class StreamResource;
20
21
struct StreamWriteResult {
22
  bool async;
23
  int err;
24
  WriteWrap* wrap;
25
  size_t bytes;
26
};
27
28
using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args);
29
30
class StreamReq {
31
 public:
32
  static constexpr int kStreamReqField = 1;
33
34
62629
  explicit StreamReq(StreamBase* stream,
35
62629
                     v8::Local<v8::Object> req_wrap_obj) : stream_(stream) {
36
62629
    AttachToObject(req_wrap_obj);
37
62629
  }
38
39
62614
  virtual ~StreamReq() = default;
40
  virtual AsyncWrap* GetAsyncWrap() = 0;
41
  v8::Local<v8::Object> object();
42
43
  void Done(int status, const char* error_str = nullptr);
44
  void Dispose();
45
46
38913
  inline StreamBase* stream() const { return stream_; }
47
48
  static StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
49
50
  // Sets all internal fields of `req_wrap_obj` to `nullptr`.
51
  // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do,
52
  // and what we use in C++ after creating these objects from their
53
  // v8::ObjectTemplates, to avoid the overhead of calling the
54
  // constructor explicitly.
55
  static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj);
56
57
 protected:
58
  virtual void OnDone(int status) = 0;
59
60
  void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
61
62
 private:
63
  StreamBase* const stream_;
64
};
65
66
56412
class ShutdownWrap : public StreamReq {
67
 public:
68
56414
  ShutdownWrap(StreamBase* stream,
69
               v8::Local<v8::Object> req_wrap_obj)
70
56414
    : StreamReq(stream, req_wrap_obj) { }
71
72
  // Call stream()->EmitAfterShutdown() and dispose of this request wrap.
73
  void OnDone(int status) override;
74
};
75
76
6202
class WriteWrap : public StreamReq {
77
 public:
78
  void SetAllocatedStorage(AllocatedBuffer&& storage);
79
80
6215
  WriteWrap(StreamBase* stream,
81
            v8::Local<v8::Object> req_wrap_obj)
82
6215
    : StreamReq(stream, req_wrap_obj) { }
83
84
  // Call stream()->EmitAfterWrite() and dispose of this request wrap.
85
  void OnDone(int status) override;
86
87
 private:
88
  AllocatedBuffer storage_;
89
};
90
91
92
// This is the generic interface for objects that control Node.js' C++ streams.
93
// For example, the default `EmitToJSStreamListener` emits a stream's data
94
// as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
95
133933
class StreamListener {
96
 public:
97
  virtual ~StreamListener();
98
99
  // This is called when a stream wants to allocate memory before
100
  // reading data into the freshly allocated buffer (i.e. it is always followed
101
  // by a `OnStreamRead()` call).
102
  // This memory may be statically or dynamically allocated; for example,
103
  // a protocol parser may want to read data into a static buffer if it knows
104
  // that all data is going to be fully handled during the next
105
  // `OnStreamRead()` call.
106
  // The returned buffer does not need to contain `suggested_size` bytes.
107
  // The default implementation of this method returns a buffer that has exactly
108
  // the suggested size and is allocated using malloc().
109
  // It is not valid to return a zero-length buffer from this method.
110
  // It is not guaranteed that the corresponding `OnStreamRead()` call
111
  // happens in the same event loop turn as this call.
112
  virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;
113
114
  // `OnStreamRead()` is called when data is available on the socket and has
115
  // been read into the buffer provided by `OnStreamAlloc()`.
116
  // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer
117
  // with base nullptr in case of an error.
118
  // `nread` is the number of read bytes (which is at most the buffer length),
119
  // or, if negative, a libuv error code.
120
  virtual void OnStreamRead(ssize_t nread,
121
                            const uv_buf_t& buf) = 0;
122
123
  // This is called once a write has finished. `status` may be 0 or,
124
  // if negative, a libuv error code.
125
  // By default, this is simply passed on to the previous listener
126
  // (and raises an assertion if there is none).
127
  virtual void OnStreamAfterWrite(WriteWrap* w, int status);
128
129
  // This is called once a shutdown has finished. `status` may be 0 or,
130
  // if negative, a libuv error code.
131
  // By default, this is simply passed on to the previous listener
132
  // (and raises an assertion if there is none).
133
  virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
134
135
  // This is called by the stream if it determines that it wants more data
136
  // to be written to it. Not all streams support this.
137
  // This callback will not be called as long as there are active writes.
138
  // It is not supported by all streams; `stream->HasWantsWrite()` returns
139
  // true if it is supported by a stream.
140
3364
  virtual void OnStreamWantsWrite(size_t suggested_size) {}
141
142
  // This is called immediately before the stream is destroyed.
143
553
  virtual void OnStreamDestroy() {}
144
145
  // The stream this is currently associated with, or nullptr if there is none.
146
940
  inline StreamResource* stream() { return stream_; }
147
148
 protected:
149
  // Pass along a read error to the `StreamListener` instance that was active
150
  // before this one. For example, a protocol parser does not care about read
151
  // errors and may instead want to let the original handler
152
  // (e.g. the JS handler) take care of the situation.
153
  void PassReadErrorToPreviousListener(ssize_t nread);
154
155
  StreamResource* stream_ = nullptr;
156
  StreamListener* previous_listener_ = nullptr;
157
158
  friend class StreamResource;
159
};
160
161
162
// An (incomplete) stream listener class that calls the `.oncomplete()`
163
// method of the JS objects associated with the wrap objects.
164
186840
class ReportWritesToJSStreamListener : public StreamListener {
165
 public:
166
  void OnStreamAfterWrite(WriteWrap* w, int status) override;
167
  void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
168
169
 private:
170
  void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
171
};
172
173
174
// A default emitter that just pushes data chunks as Buffer instances to
175
// JS land via the handle’s .ondata method.
176
186828
class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
177
 public:
178
  uv_buf_t OnStreamAlloc(size_t suggested_size) override;
179
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
180
};
181
182
183
// An alternative listener that uses a custom, user-provided buffer
184
// for reading data.
185
12
class CustomBufferJSListener : public ReportWritesToJSStreamListener {
186
 public:
187
  uv_buf_t OnStreamAlloc(size_t suggested_size) override;
188
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
189
6
  void OnStreamDestroy() override { delete this; }
190
191
6
  explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {}
192
193
 private:
194
  uv_buf_t buffer_;
195
};
196
197
198
// A generic stream, comparable to JS land’s `Duplex` streams.
199
// A stream is always controlled through one `StreamListener` instance.
200
93741
class StreamResource {
201
 public:
202
  virtual ~StreamResource();
203
204
  // These need to be implemented on the readable side of this stream:
205
206
  // Start reading from the underlying resource. This is called by the consumer
207
  // when more data is desired. Use `EmitAlloc()` and `EmitData()` to
208
  // pass data along to the consumer.
209
  virtual int ReadStart() = 0;
210
  // Stop reading from the underlying resource. This is called by the
211
  // consumer when its buffers are full and no more data can be handled.
212
  virtual int ReadStop() = 0;
213
214
  // These need to be implemented on the writable side of this stream:
215
  // All of these methods may return an error code synchronously.
216
  // In that case, the finish callback should *not* be called.
217
218
  // Perform a shutdown operation, and either call req_wrap->Done() when
219
  // finished and return 0, return 1 for synchronous success, or
220
  // a libuv error code for synchronous failures.
221
  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
222
  // Try to write as much data as possible synchronously, and modify
223
  // `*bufs` and `*count` accordingly. This is a no-op by default.
224
  // Return 0 for success and a libuv error code for failures.
225
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
226
  // Initiate a write of data. If the write completes synchronously, return 0 on
227
  // success (with bufs modified to indicate how much data was consumed) or a
228
  // libuv error code on failure. If the write will complete asynchronously,
229
  // return 0. When the write completes asynchronously, call req_wrap->Done()
230
  // with 0 on success (with bufs modified to indicate how much data was
231
  // consumed) or a libuv error code on failure. Do not call req_wrap->Done() if
232
  // the write completes synchronously, that is, it should never be called
233
  // before DoWrite() has returned.
234
  virtual int DoWrite(WriteWrap* w,
235
                      uv_buf_t* bufs,
236
                      size_t count,
237
                      uv_stream_t* send_handle) = 0;
238
239
  // Returns true if the stream supports the `OnStreamWantsWrite()` interface.
240
  virtual bool HasWantsWrite() const { return false; }
241
242
  // Optionally, this may provide an error message to be used for
243
  // failing writes.
244
  virtual const char* Error() const;
245
  // Clear the current error (i.e. that would be returned by Error()).
246
  virtual void ClearError();
247
248
  // Transfer ownership of this stream to `listener`. The previous listener
249
  // will not receive any more callbacks while the new listener was active.
250
  void PushStreamListener(StreamListener* listener);
251
  // Remove a listener, and, if this was the currently active one,
252
  // transfer ownership back to the previous listener.
253
  void RemoveStreamListener(StreamListener* listener);
254
255
 protected:
256
  // Call the current listener's OnStreamAlloc() method.
257
  uv_buf_t EmitAlloc(size_t suggested_size);
258
  // Call the current listener's OnStreamRead() method and update the
259
  // stream's read byte counter.
260
  void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
261
  // Call the current listener's OnStreamAfterWrite() method.
262
  void EmitAfterWrite(WriteWrap* w, int status);
263
  // Call the current listener's OnStreamAfterShutdown() method.
264
  void EmitAfterShutdown(ShutdownWrap* w, int status);
265
  // Call the current listener's OnStreamWantsWrite() method.
266
  void EmitWantsWrite(size_t suggested_size);
267
268
  StreamListener* listener_ = nullptr;
269
  uint64_t bytes_read_ = 0;
270
  uint64_t bytes_written_ = 0;
271
272
  friend class StreamListener;
273
};
274
275
276
93087
class StreamBase : public StreamResource {
277
 public:
278
  // 0 is reserved for the BaseObject pointer.
279
  static constexpr int kStreamBaseField = 1;
280
  static constexpr int kOnReadFunctionField = 2;
281
  static constexpr int kStreamBaseFieldCount = 3;
282
283
  static void AddMethods(Environment* env,
284
                         v8::Local<v8::FunctionTemplate> target);
285
286
  virtual bool IsAlive() = 0;
287
  virtual bool IsClosing() = 0;
288
  virtual bool IsIPCPipe();
289
  virtual int GetFD();
290
291
  enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS };
292
293
  v8::MaybeLocal<v8::Value> CallJSOnreadMethod(
294
      ssize_t nread,
295
      v8::Local<v8::ArrayBuffer> ab,
296
      size_t offset = 0,
297
      StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS);
298
299
  // This is named `stream_env` to avoid name clashes, because a lot of
300
  // subclasses are also `BaseObject`s.
301
  Environment* stream_env() const;
302
303
  // Shut down the current stream. This request can use an existing
304
  // ShutdownWrap object (that was created in JS), or a new one will be created.
305
  // Returns 1 in case of a synchronous completion, 0 in case of asynchronous
306
  // completion, and a libuv error case in case of synchronous failure.
307
  int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
308
309
  // Write data to the current stream. This request can use an existing
310
  // WriteWrap object (that was created in JS), or a new one will be created.
311
  // This will first try to write synchronously using `DoTryWrite()`, then
312
  // asynchronously using `DoWrite()`.
313
  // If the return value indicates a synchronous completion, no callback will
314
  // be invoked.
315
  StreamWriteResult Write(
316
      uv_buf_t* bufs,
317
      size_t count,
318
      uv_stream_t* send_handle = nullptr,
319
      v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
320
321
  // These can be overridden by subclasses to get more specific wrap instances.
322
  // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
323
  // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
324
  // an associated libuv request.
325
  virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
326
  virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
327
328
  // One of these must be implemented
329
  virtual AsyncWrap* GetAsyncWrap() = 0;
330
  virtual v8::Local<v8::Object> GetObject();
331
332
  static StreamBase* FromObject(v8::Local<v8::Object> obj);
333
334
 protected:
335
  explicit StreamBase(Environment* env);
336
337
  // JS Methods
338
  int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
339
  int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
340
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
341
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
342
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
343
  template <enum encoding enc>
344
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
345
  int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
346
347
  static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
348
  static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);
349
  static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);
350
  static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);
351
  void AttachToObject(v8::Local<v8::Object> obj);
352
353
  template <int (StreamBase::*Method)(
354
      const v8::FunctionCallbackInfo<v8::Value>& args)>
355
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
356
357
  // Internal, used only in StreamBase methods + env.cc.
358
  enum StreamBaseStateFields {
359
    kReadBytesOrError,
360
    kArrayBufferOffset,
361
    kBytesWritten,
362
    kLastWriteWasAsync,
363
    kNumStreamBaseStateFields
364
  };
365
366
 private:
367
  Environment* env_;
368
  EmitToJSStreamListener default_listener_;
369
370
  void SetWriteResult(const StreamWriteResult& res);
371
  static void AddMethod(Environment* env,
372
                        v8::Local<v8::Signature> sig,
373
                        enum v8::PropertyAttribute attributes,
374
                        v8::Local<v8::FunctionTemplate> t,
375
                        JSMethodFunction* stream_method,
376
                        v8::Local<v8::String> str);
377
378
  friend class WriteWrap;
379
  friend class ShutdownWrap;
380
  friend class Environment;  // For kNumStreamBaseStateFields.
381
};
382
383
384
// These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
385
// `OtherBase` must have a constructor that matches the `AsyncWrap`
386
// constructors’s (Environment*, Local<Object>, AsyncWrap::Provider) signature
387
// and be a subclass of `AsyncWrap`.
388
template <typename OtherBase>
389

112824
class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
390
 public:
391
  SimpleShutdownWrap(StreamBase* stream,
392
                     v8::Local<v8::Object> req_wrap_obj);
393
394
121858
  AsyncWrap* GetAsyncWrap() override { return this; }
395
396
  SET_NO_MEMORY_INFO()
397
  SET_MEMORY_INFO_NAME(SimpleShutdownWrap)
398
  SET_SELF_SIZE(SimpleShutdownWrap)
399
};
400
401
template <typename OtherBase>
402

12404
class SimpleWriteWrap : public WriteWrap, public OtherBase {
403
 public:
404
  SimpleWriteWrap(StreamBase* stream,
405
                  v8::Local<v8::Object> req_wrap_obj);
406
407
17781
  AsyncWrap* GetAsyncWrap() override { return this; }
408
409
  SET_NO_MEMORY_INFO()
410
  SET_MEMORY_INFO_NAME(SimpleWriteWrap)
411
  SET_SELF_SIZE(SimpleWriteWrap)
412
};
413
414
}  // namespace node
415
416
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
417
418
#endif  // SRC_STREAM_BASE_H_