GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.h Lines: 24 32 75.0 %
Date: 2020-11-21 04:10:54 Branches: 1 2 50.0 %

Line Branch Exec Source
1
#ifndef SRC_STREAM_BASE_H_
2
#define SRC_STREAM_BASE_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "allocated_buffer.h"
8
#include "async_wrap.h"
9
#include "node.h"
10
#include "util.h"
11
12
#include "v8.h"
13
14
namespace node {
15
16
// Forward declarations
17
class Environment;
18
class ShutdownWrap;
19
class WriteWrap;
20
class StreamBase;
21
class StreamResource;
22
23
struct StreamWriteResult {
24
  bool async;
25
  int err;
26
  WriteWrap* wrap;
27
  size_t bytes;
28
};
29
30
using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args);
31
32
class StreamReq {
33
 public:
34
  // The kSlot internal field here mirrors BaseObject::InternalFields::kSlot
35
  // here because instances derived from StreamReq will also derive from
36
  // BaseObject, and the slots are used for the identical purpose.
37
  enum InternalFields {
38
    kSlot = BaseObject::kSlot,
39
    kStreamReqField = BaseObject::kInternalFieldCount,
40
    kInternalFieldCount
41
  };
42
43
  inline explicit StreamReq(
44
      StreamBase* stream,
45
      v8::Local<v8::Object> req_wrap_obj);
46
47
14733
  virtual ~StreamReq() = default;
48
  virtual AsyncWrap* GetAsyncWrap() = 0;
49
  inline v8::Local<v8::Object> object();
50
51
  inline void Done(int status, const char* error_str = nullptr);
52
  inline void Dispose();
53
54
14794
  StreamBase* stream() const { return stream_; }
55
56
  static inline StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
57
58
  // Sets all internal fields of `req_wrap_obj` to `nullptr`.
59
  // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do,
60
  // and what we use in C++ after creating these objects from their
61
  // v8::ObjectTemplates, to avoid the overhead of calling the
62
  // constructor explicitly.
63
  static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj);
64
65
 protected:
66
  virtual void OnDone(int status) = 0;
67
68
  inline void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
69
70
 private:
71
  StreamBase* const stream_;
72
};
73
74
7256
class ShutdownWrap : public StreamReq {
75
 public:
76
  inline ShutdownWrap(
77
      StreamBase* stream,
78
      v8::Local<v8::Object> req_wrap_obj);
79
80
  static inline ShutdownWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
81
  template <typename T, bool kIsWeak>
82
  static inline ShutdownWrap* FromObject(
83
      const BaseObjectPtrImpl<T, kIsWeak>& base_obj);
84
85
  // Call stream()->EmitAfterShutdown() and dispose of this request wrap.
86
  void OnDone(int status) override;
87
};
88
89
7477
class WriteWrap : public StreamReq {
90
 public:
91
  inline void SetAllocatedStorage(AllocatedBuffer&& storage);
92
93
  inline WriteWrap(
94
      StreamBase* stream,
95
      v8::Local<v8::Object> req_wrap_obj);
96
97
  static inline WriteWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
98
  template <typename T, bool kIsWeak>
99
  static inline WriteWrap* FromObject(
100
      const BaseObjectPtrImpl<T, kIsWeak>& base_obj);
101
102
  // Call stream()->EmitAfterWrite() and dispose of this request wrap.
103
  void OnDone(int status) override;
104
105
 private:
106
  AllocatedBuffer storage_;
107
};
108
109
110
// This is the generic interface for objects that control Node.js' C++ streams.
111
// For example, the default `EmitToJSStreamListener` emits a stream's data
112
// as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
113
104866
class StreamListener {
114
 public:
115
  virtual ~StreamListener();
116
117
  // This is called when a stream wants to allocate memory before
118
  // reading data into the freshly allocated buffer (i.e. it is always followed
119
  // by a `OnStreamRead()` call).
120
  // This memory may be statically or dynamically allocated; for example,
121
  // a protocol parser may want to read data into a static buffer if it knows
122
  // that all data is going to be fully handled during the next
123
  // `OnStreamRead()` call.
124
  // The returned buffer does not need to contain `suggested_size` bytes.
125
  // The default implementation of this method returns a buffer that has exactly
126
  // the suggested size and is allocated using malloc().
127
  // It is not valid to return a zero-length buffer from this method.
128
  // It is not guaranteed that the corresponding `OnStreamRead()` call
129
  // happens in the same event loop turn as this call.
130
  virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;
131
132
  // `OnStreamRead()` is called when data is available on the socket and has
133
  // been read into the buffer provided by `OnStreamAlloc()`.
134
  // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer
135
  // with base nullptr in case of an error.
136
  // `nread` is the number of read bytes (which is at most the buffer length),
137
  // or, if negative, a libuv error code.
138
  virtual void OnStreamRead(ssize_t nread,
139
                            const uv_buf_t& buf) = 0;
140
141
  // This is called once a write has finished. `status` may be 0 or,
142
  // if negative, a libuv error code.
143
  // By default, this is simply passed on to the previous listener
144
  // (and raises an assertion if there is none).
145
  virtual void OnStreamAfterWrite(WriteWrap* w, int status);
146
147
  // This is called once a shutdown has finished. `status` may be 0 or,
148
  // if negative, a libuv error code.
149
  // By default, this is simply passed on to the previous listener
150
  // (and raises an assertion if there is none).
151
  virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
152
153
  // This is called by the stream if it determines that it wants more data
154
  // to be written to it. Not all streams support this.
155
  // This callback will not be called as long as there are active writes.
156
  // It is not supported by all streams; `stream->HasWantsWrite()` returns
157
  // true if it is supported by a stream.
158
2357
  virtual void OnStreamWantsWrite(size_t suggested_size) {}
159
160
  // This is called immediately before the stream is destroyed.
161
596
  virtual void OnStreamDestroy() {}
162
163
  // The stream this is currently associated with, or nullptr if there is none.
164
57345
  StreamResource* stream() const { return stream_; }
165
166
 protected:
167
  // Pass along a read error to the `StreamListener` instance that was active
168
  // before this one. For example, a protocol parser does not care about read
169
  // errors and may instead want to let the original handler
170
  // (e.g. the JS handler) take care of the situation.
171
  inline void PassReadErrorToPreviousListener(ssize_t nread);
172
173
  StreamResource* stream_ = nullptr;
174
  StreamListener* previous_listener_ = nullptr;
175
176
  friend class StreamResource;
177
};
178
179
180
// An (incomplete) stream listener class that calls the `.oncomplete()`
181
// method of the JS objects associated with the wrap objects.
182
128873
class ReportWritesToJSStreamListener : public StreamListener {
183
 public:
184
  void OnStreamAfterWrite(WriteWrap* w, int status) override;
185
  void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
186
187
 private:
188
  void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
189
};
190
191
192
// A default emitter that just pushes data chunks as Buffer instances to
193
// JS land via the handle’s .ondata method.
194
128849
class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
195
 public:
196
  uv_buf_t OnStreamAlloc(size_t suggested_size) override;
197
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
198
};
199
200
201
// An alternative listener that uses a custom, user-provided buffer
202
// for reading data.
203
24
class CustomBufferJSListener : public ReportWritesToJSStreamListener {
204
 public:
205
  uv_buf_t OnStreamAlloc(size_t suggested_size) override;
206
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
207
12
  void OnStreamDestroy() override { delete this; }
208
209
12
  explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {}
210
211
 private:
212
  uv_buf_t buffer_;
213
};
214
215
216
// A generic stream, comparable to JS land’s `Duplex` streams.
217
// A stream is always controlled through one `StreamListener` instance.
218
64603
class StreamResource {
219
 public:
220
  virtual ~StreamResource();
221
222
  // These need to be implemented on the readable side of this stream:
223
224
  // Start reading from the underlying resource. This is called by the consumer
225
  // when more data is desired. Use `EmitAlloc()` and `EmitData()` to
226
  // pass data along to the consumer.
227
  virtual int ReadStart() = 0;
228
  // Stop reading from the underlying resource. This is called by the
229
  // consumer when its buffers are full and no more data can be handled.
230
  virtual int ReadStop() = 0;
231
232
  // These need to be implemented on the writable side of this stream:
233
  // All of these methods may return an error code synchronously.
234
  // In that case, the finish callback should *not* be called.
235
236
  // Perform a shutdown operation, and either call req_wrap->Done() when
237
  // finished and return 0, return 1 for synchronous success, or
238
  // a libuv error code for synchronous failures.
239
  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
240
  // Try to write as much data as possible synchronously, and modify
241
  // `*bufs` and `*count` accordingly. This is a no-op by default.
242
  // Return 0 for success and a libuv error code for failures.
243
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
244
  // Initiate a write of data. If the write completes synchronously, return 0 on
245
  // success (with bufs modified to indicate how much data was consumed) or a
246
  // libuv error code on failure. If the write will complete asynchronously,
247
  // return 0. When the write completes asynchronously, call req_wrap->Done()
248
  // with 0 on success (with bufs modified to indicate how much data was
249
  // consumed) or a libuv error code on failure. Do not call req_wrap->Done() if
250
  // the write completes synchronously, that is, it should never be called
251
  // before DoWrite() has returned.
252
  virtual int DoWrite(WriteWrap* w,
253
                      uv_buf_t* bufs,
254
                      size_t count,
255
                      uv_stream_t* send_handle) = 0;
256
257
  // Returns true if the stream supports the `OnStreamWantsWrite()` interface.
258
  virtual bool HasWantsWrite() const { return false; }
259
260
  // Optionally, this may provide an error message to be used for
261
  // failing writes.
262
  virtual const char* Error() const;
263
  // Clear the current error (i.e. that would be returned by Error()).
264
  virtual void ClearError();
265
266
  // Transfer ownership of this stream to `listener`. The previous listener
267
  // will not receive any more callbacks while the new listener was active.
268
  inline void PushStreamListener(StreamListener* listener);
269
  // Remove a listener, and, if this was the currently active one,
270
  // transfer ownership back to the previous listener.
271
  inline void RemoveStreamListener(StreamListener* listener);
272
273
 protected:
274
  // Call the current listener's OnStreamAlloc() method.
275
  inline uv_buf_t EmitAlloc(size_t suggested_size);
276
  // Call the current listener's OnStreamRead() method and update the
277
  // stream's read byte counter.
278
  inline void EmitRead(
279
      ssize_t nread,
280
28042
      const uv_buf_t& buf = uv_buf_init(nullptr, 0));
281
  // Call the current listener's OnStreamAfterWrite() method.
282
  inline void EmitAfterWrite(WriteWrap* w, int status);
283
  // Call the current listener's OnStreamAfterShutdown() method.
284
  inline void EmitAfterShutdown(ShutdownWrap* w, int status);
285
  // Call the current listener's OnStreamWantsWrite() method.
286
  inline void EmitWantsWrite(size_t suggested_size);
287
288
  StreamListener* listener_ = nullptr;
289
  uint64_t bytes_read_ = 0;
290
  uint64_t bytes_written_ = 0;
291
292
  friend class StreamListener;
293
};
294
295
296
64246
class StreamBase : public StreamResource {
297
 public:
298
  // The kSlot field here mirrors that of BaseObject::InternalFields::kSlot
299
  // because instances deriving from StreamBase generally also derived from
300
  // BaseObject (it's possible for it not to, however).
301
  enum InternalFields {
302
    kSlot = BaseObject::kSlot,
303
    kStreamBaseField = BaseObject::kInternalFieldCount,
304
    kOnReadFunctionField,
305
    kInternalFieldCount
306
  };
307
308
  static void AddMethods(Environment* env,
309
                         v8::Local<v8::FunctionTemplate> target);
310
311
  virtual bool IsAlive() = 0;
312
  virtual bool IsClosing() = 0;
313
  virtual bool IsIPCPipe();
314
  virtual int GetFD();
315
316
  enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS };
317
318
  v8::MaybeLocal<v8::Value> CallJSOnreadMethod(
319
      ssize_t nread,
320
      v8::Local<v8::ArrayBuffer> ab,
321
      size_t offset = 0,
322
      StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS);
323
324
  // This is named `stream_env` to avoid name clashes, because a lot of
325
  // subclasses are also `BaseObject`s.
326
736744
  Environment* stream_env() const { return env_; }
327
328
  // Shut down the current stream. This request can use an existing
329
  // ShutdownWrap object (that was created in JS), or a new one will be created.
330
  // Returns 1 in case of a synchronous completion, 0 in case of asynchronous
331
  // completion, and a libuv error case in case of synchronous failure.
332
  inline int Shutdown(
333
      v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
334
335
  // Write data to the current stream. This request can use an existing
336
  // WriteWrap object (that was created in JS), or a new one will be created.
337
  // This will first try to write synchronously using `DoTryWrite()`, then
338
  // asynchronously using `DoWrite()`.
339
  // If the return value indicates a synchronous completion, no callback will
340
  // be invoked.
341
  inline StreamWriteResult Write(
342
      uv_buf_t* bufs,
343
      size_t count,
344
      uv_stream_t* send_handle = nullptr,
345
      v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
346
347
  // These can be overridden by subclasses to get more specific wrap instances.
348
  // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
349
  // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
350
  // an associated libuv request.
351
  virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
352
  virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
353
354
  // One of these must be implemented
355
  virtual AsyncWrap* GetAsyncWrap() = 0;
356
  virtual v8::Local<v8::Object> GetObject();
357
358
  static inline StreamBase* FromObject(v8::Local<v8::Object> obj);
359
360
 protected:
361
  inline explicit StreamBase(Environment* env);
362
363
  // JS Methods
364
  int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
365
  int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
366
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
367
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
368
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
369
  template <enum encoding enc>
370
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
371
  int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
372
373
  static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
374
  static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);
375
  static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);
376
  static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);
377
  inline void AttachToObject(v8::Local<v8::Object> obj);
378
379
  template <int (StreamBase::*Method)(
380
      const v8::FunctionCallbackInfo<v8::Value>& args)>
381
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
382
383
  // Internal, used only in StreamBase methods + env.cc.
384
  enum StreamBaseStateFields {
385
    kReadBytesOrError,
386
    kArrayBufferOffset,
387
    kBytesWritten,
388
    kLastWriteWasAsync,
389
    kNumStreamBaseStateFields
390
  };
391
392
 private:
393
  Environment* env_;
394
  EmitToJSStreamListener default_listener_;
395
396
  void SetWriteResult(const StreamWriteResult& res);
397
  static void AddMethod(Environment* env,
398
                        v8::Local<v8::Signature> sig,
399
                        enum v8::PropertyAttribute attributes,
400
                        v8::Local<v8::FunctionTemplate> t,
401
                        JSMethodFunction* stream_method,
402
                        v8::Local<v8::String> str);
403
404
  friend class WriteWrap;
405
  friend class ShutdownWrap;
406
  friend class Environment;  // For kNumStreamBaseStateFields.
407
};
408
409
410
// These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
411
// `OtherBase` must have a constructor that matches the `AsyncWrap`
412
// constructors’s (Environment*, Local<Object>, AsyncWrap::Provider) signature
413
// and be a subclass of `AsyncWrap`.
414
template <typename OtherBase>
415
14512
class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
416
 public:
417
  SimpleShutdownWrap(StreamBase* stream,
418
                     v8::Local<v8::Object> req_wrap_obj);
419
420
29024
  AsyncWrap* GetAsyncWrap() override { return this; }
421
422
3
  SET_NO_MEMORY_INFO()
423
3
  SET_MEMORY_INFO_NAME(SimpleShutdownWrap)
424
3
  SET_SELF_SIZE(SimpleShutdownWrap)
425
426
  bool IsNotIndicativeOfMemoryLeakAtExit() const override {
427
    return OtherBase::IsNotIndicativeOfMemoryLeakAtExit();
428
  }
429
};
430
431
template <typename OtherBase>
432
14954
class SimpleWriteWrap : public WriteWrap, public OtherBase {
433
 public:
434
  SimpleWriteWrap(StreamBase* stream,
435
                  v8::Local<v8::Object> req_wrap_obj);
436
437
34904
  AsyncWrap* GetAsyncWrap() override { return this; }
438
439
  SET_NO_MEMORY_INFO()
440
  SET_MEMORY_INFO_NAME(SimpleWriteWrap)
441
  SET_SELF_SIZE(SimpleWriteWrap)
442
443
  bool IsNotIndicativeOfMemoryLeakAtExit() const override {
444
    return OtherBase::IsNotIndicativeOfMemoryLeakAtExit();
445
  }
446
};
447
448
}  // namespace node
449
450
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
451
452
#endif  // SRC_STREAM_BASE_H_