diff -r 000000000000 -r a279a342bc78 src/main/scala/CouchStream.scala --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/scala/CouchStream.scala Tue Jul 21 09:24:52 2015 +0200 @@ -0,0 +1,52 @@ +package couchdb.changes + +import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized} +import spray.http._ +import spray.json._ +import spray.client.pipelining._ +import akka.actor.{ActorRef, Actor, ActorLogging} +import scala.util.Try +import spray.can.Http +import akka.io.IO + +case class ResultRow(seq: Int, id: String, doc: JsObject) + +trait ChangesMarshaller { + + implicit object ChangesMarshaller extends Unmarshaller[ResultRow] { + import DefaultJsonProtocol._ + implicit val changesFormat = jsonFormat3(ResultRow) + + def apply(entity: HttpEntity): Deserialized[ResultRow] = (Try { + Right(JsonParser(entity.asString).asJsObject.convertTo[ResultRow]) + }).getOrElse(Left(MalformedContent("bad json"))) + + } +} + +class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging + with ChangesMarshaller { + + val io = IO(Http)(context.system) + + def receive: Receive = { + case uri: Uri => + val rq = HttpRequest(HttpMethods.GET, uri = uri) + log.debug("Sending request {}", rq) + sendTo(io).withResponsesReceivedBy(self)(rq) + case x@ChunkedResponseStart(_) => + log.debug("Received {}", x) + case MessageChunk(data, _) => + log.debug("Received data chunk {}", data) + if (data.length > 2) // skip heartbeat messages + ChangesMarshaller(data).fold( + _ => log.warning("Failed to deserialize entity {}", data), + processor ! + ) + case x => + log.debug("Received {}", x) + } +} + + +// vim: set ts=2 sw=2 et: