c# - misunderstanding of concurrentQueue, a single consumer working from the queue on it's own thread -
i'm having trouble creating functioning systemfilewatcher takes created event , stores in queue separate thread work from. i've read countless threads here regarding issue can't head around particular problem.
using system; using system.io; using system.collections.generic; using system.collections.concurrent; using system.collections; using system.threading; namespace filesystemwatchertest { class program { public static blockingcollection<string> processcollection = new blockingcollection<string>(new concurrentqueue<string>()); static void main(string[] args) { string path = @"c:\test\"; filesystemwatcher watcher = new filesystemwatcher(); watcher.path = path; watcher.enableraisingevents = true; watcher.filter = "*.*"; watcher.created += new filesystemeventhandler(oncreated); thread consumer = new thread(new threadstart(mover)); consumer.start(); while (true) ;//run infinite loop program doesn't terminate untill force it. } static void oncreated(object sender, filesystemeventargs e) { processcollection.add(e.fullpath); } static void mover() { string current; string processed = @"c:\test\processed\"; while (true) { while (processcollection.iscompleted) { thread.sleep(1000); } while (processcollection.trytake(out current)) { system.io.file.move(current, processed); } } } }
}
this i'd test. i'm aware not work. i've verified fsw works when i'm writing console when file placed inside queue. problem begins around when try start mover function in it's own thread. mover function , oncreated not appear communicate once start working off queue.
my expectation of code start mover function in it's own thread , run alongside sfw. expectation concurrentqueue attached blockingcollection auto updates (i enqueue item through oncreated, mover sees has +1 queue. mover takes 1 queue, oncreated sees this.) i'm using thread.sleep incorrectly. no longer have supporting reason using blockingcollection (which chose @ first handle waiting queue fill and, basically, check queue item process) , open changing whatever might work. i've seen use of locks, understand not necessary due how concurrentqueue synchronizes.
the ultimate goal process large quantities of small files come in @ random times , can range 1 several hundred @ given time. these files .emls.
if @ possible, appreciate explanation of happening , suggestion around problem. come humbly , expect told understand incorrect!
edit: i'm testing console application used service afterwards. added while (true) ; before oncreated() keep fsw running.
you have several different problems in code example:
- you misusing
file.move()
method. requires both parameters full file name. passing directory name second parameter, incorrect. - you inspecting
iscompleted
property of collection, if useful.false
, , block of code nothing. leads next problem… - your thread running in tight loop, consuming massive amounts of cpu time. may or may not cause errors, could…
filesystemwatcher
not guaranteed report changes, , 1 of reasons might not if can't enough cpu time monitor file system. if starve using cpu time, might find doesn't report change. note problem exists in primary thread too; running in tight loop, consuming massive amounts of cpu time doing nothing. occupying 2 cores of system. - you failing take advantage of producer/consumer model of execution
blockingcollection
designed for. should having worker thread enumerate enumeration returnedgetconsumingenumerable()
, usingcompleteadding()
method signal thread there no more work.
here version of code example corrects above mistakes, cleans example bit it's more self-contained:
// default backing collection blockingcollection<t> // concurrentqueue<t>. there's no need specify // explicitly. public static blockingcollection<string> processcollection = new blockingcollection<string>(); static void main(string[] args) { string testdirectory = path.combine(environment.currentdirectory, "test"); console.writeline("creating directory: \"{0}\"", testdirectory); directory.createdirectory(testdirectory); filesystemwatcher watcher = new filesystemwatcher(); watcher.path = testdirectory; watcher.enableraisingevents = true; watcher.filter = "*.*"; watcher.created += new filesystemeventhandler(oncreated); thread consumer = new thread(new parameterizedthreadstart(mover)); consumer.start(testdirectory); string text; while ((text = console.readline()) != "") { string newfile = path.combine(testdirectory, text + ".txt"); file.writealltext(newfile, "test file"); } processcollection.completeadding(); } static void oncreated(object sender, filesystemeventargs e) { if (e.changetype == watcherchangetypes.created) { processcollection.add(e.fullpath); } } static void mover(object testdirectory) { string processed = path.combine((string)testdirectory, "processed"); console.writeline("creating directory: \"{0}\"", processed); directory.createdirectory(processed); foreach (string current in processcollection.getconsumingenumerable()) { // ensure file in fact file , not else. if (file.exists(current)) { system.io.file.move(current, path.combine(processed, path.getfilename(current))); } } }
Comments
Post a Comment