-
Notifications
You must be signed in to change notification settings - Fork 45
/
watcher.js
178 lines (159 loc) · 4.66 KB
/
watcher.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/**
* This is the {@link module:core/local/watcher|local watcher} implementation
* based on the {@link https://github.com/atom/watcher|@atom/watcher} library.
*
* The watcher is built as a chain of steps. Each step can be seen as an actor
* that communicates with the next one via a Channel. The first step is called
* the producer: even if the chain is ready at the end of this constructor,
* the producer won't start pushing batches of events until it is started.
*
* ## Windows
*
* [![Windows watcher workflow](../../doc/developer/win_watcher.png)](../../doc/developer/win_watcher.png)
*
* ## GNU/Linux
*
* [![GNU/Linux watcher workflow](../../doc/developer/linux_watcher.png)](../../doc/developer/linux_watcher.png)
*
* @module core/local/atom/watcher
* @flow
*/
const Promise = require('bluebird')
const _ = require('lodash')
const checksumer = require('./../checksumer')
const Producer = require('./producer')
const addInfos = require('./add_infos')
const filterIgnored = require('./filter_ignored')
const fireLocatStartEvent = require('./fire_local_start_event')
const winDetectMove = require('./win_detect_move')
const winIdenticalRenaming = require('./win_identical_renaming')
const scanFolder = require('./scan_folder')
const awaitWriteFinish = require('./await_write_finish')
const initialDiff = require('./initial_diff')
const addChecksum = require('./add_checksum')
const incompleteFixer = require('./incomplete_fixer')
const overwrite = require('./overwrite')
const dispatch = require('./dispatch')
const logger = require('../../utils/logger')
/*::
import type { Config } from '../../config'
import type { Pouch } from '../../pouch'
import type Prep from '../../prep'
import type EventEmitter from 'events'
import type { Ignore } from '../../ignore'
import type { Checksumer } from '../checksumer'
import type { AtomEventsDispatcher } from './dispatch'
import type { Scanner } from './producer'
type AtomWatcherOptions = {
config: Config,
onAtomEvents?: AtomEventsDispatcher,
prep: Prep,
pouch: Pouch,
events: EventEmitter,
ignore: Ignore
}
*/
const log = logger({
component: 'AtomWatcher'
})
/** Returns the step only when the given platform matches the current one.
*
* Makes it easy to include a step only for some platform.
*/
const only = (platform, step) => platform === process.platform && step
/** The steps for the current platform. */
const steps = _.compact([
addInfos,
filterIgnored,
fireLocatStartEvent,
only('win32', winIdenticalRenaming),
only('win32', winDetectMove),
scanFolder,
awaitWriteFinish,
initialDiff,
addChecksum,
incompleteFixer,
overwrite,
dispatch
])
/** The producer for the current platform. */
const producer = opts => {
if (['linux', 'win32'].includes(process.platform)) {
return new Producer(opts)
} else {
throw new Error('The atom watcher is not available on this platform')
}
}
const stepsInitialState = (
state /*: Object */,
opts /*: * */
) /*: Promise<Object> */ =>
Promise.reduce(
steps,
async (prevState, step) =>
step.initialState
? _.assign(prevState, await step.initialState(opts))
: prevState,
state
)
class AtomWatcher {
/*::
config: Config
pouch: Pouch
events: EventEmitter
checksumer: Checksumer
producer: Producer
state: Object
running: Promise<void>
_runningResolve: ?Function
_runningReject: ?Function
*/
constructor(opts /*: AtomWatcherOptions */) {
this.config = opts.config
this.pouch = opts.pouch
this.events = opts.events
this.checksumer = checksumer.init()
this.producer = producer(opts)
this.state = {}
const stepOptions = Object.assign(
({
checksumer: this.checksumer,
scan: this.producer.scan,
state: this.state
} /*: Object */),
opts
)
// Here, we build the chain of steps.
steps.reduce(
(chan, step) => step.loop(chan, stepOptions),
this.producer.channel
)
}
async start() {
log.debug('starting...')
await stepsInitialState(this.state, this)
const scanDone = new Promise(resolve => {
this.events.on('initial-scan-done', resolve)
})
await this.producer.start()
await scanDone
this.running = new Promise((resolve, reject) => {
this._runningResolve = resolve
// XXX: This rejecter is never used. How can the watcher fail? How to
// catch those errors and feed them to this rejecter?
this._runningReject = reject
})
}
async stop() /*: Promise<*> */ {
log.debug('stopping...')
this.producer.stop()
if (this._runningResolve) {
this._runningResolve()
this._runningResolve = null
}
}
}
module.exports = {
AtomWatcher,
stepsInitialState
}