--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/scala/CouchStream.scala Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,52 @@
+package couchdb.changes
+
+import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
+import spray.http._
+import spray.json._
+import spray.client.pipelining._
+import akka.actor.{ActorRef, Actor, ActorLogging}
+import scala.util.Try
+import spray.can.Http
+import akka.io.IO
+
+case class ResultRow(seq: Int, id: String, doc: JsObject)
+
+trait ChangesMarshaller {
+
+ implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
+ import DefaultJsonProtocol._
+ implicit val changesFormat = jsonFormat3(ResultRow)
+
+ def apply(entity: HttpEntity): Deserialized[ResultRow] = (Try {
+ Right(JsonParser(entity.asString).asJsObject.convertTo[ResultRow])
+ }).getOrElse(Left(MalformedContent("bad json")))
+
+ }
+}
+
+class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
+ with ChangesMarshaller {
+
+ val io = IO(Http)(context.system)
+
+ def receive: Receive = {
+ case uri: Uri =>
+ val rq = HttpRequest(HttpMethods.GET, uri = uri)
+ log.debug("Sending request {}", rq)
+ sendTo(io).withResponsesReceivedBy(self)(rq)
+ case x@ChunkedResponseStart(_) =>
+ log.debug("Received {}", x)
+ case MessageChunk(data, _) =>
+ log.debug("Received data chunk {}", data)
+ if (data.length > 2) // skip heartbeat messages
+ ChangesMarshaller(data).fold(
+ _ => log.warning("Failed to deserialize entity {}", data),
+ processor !
+ )
+ case x =>
+ log.debug("Received {}", x)
+ }
+}
+
+
+// vim: set ts=2 sw=2 et: