src/main/scala/CouchStream.scala
author Tomas Zeman <tzeman@volny.cz>
Tue, 21 Jul 2015 09:24:52 +0200
changeset 0 a279a342bc78
child 4 b25c02bd6b11
permissions -rw-r--r--
Initial import
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
0
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     1
package couchdb.changes
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     2
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     3
import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     4
import spray.http._
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     5
import spray.json._
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     6
import spray.client.pipelining._
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     7
import akka.actor.{ActorRef, Actor, ActorLogging}
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     8
import scala.util.Try
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     9
import spray.can.Http
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    10
import akka.io.IO
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    11
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    12
case class ResultRow(seq: Int, id: String, doc: JsObject)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    13
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    14
trait ChangesMarshaller {
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    15
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    16
  implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    17
    import DefaultJsonProtocol._
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    18
    implicit val changesFormat = jsonFormat3(ResultRow)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    19
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    20
    def apply(entity: HttpEntity): Deserialized[ResultRow] = (Try {
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    21
      Right(JsonParser(entity.asString).asJsObject.convertTo[ResultRow])
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    22
    }).getOrElse(Left(MalformedContent("bad json")))
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    23
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    24
  }
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    25
}
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    26
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    27
class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    28
  with ChangesMarshaller {
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    29
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    30
  val io = IO(Http)(context.system)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    31
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    32
  def receive: Receive = {
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    33
    case uri: Uri =>
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    34
      val rq = HttpRequest(HttpMethods.GET, uri = uri)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    35
      log.debug("Sending request {}", rq)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    36
      sendTo(io).withResponsesReceivedBy(self)(rq)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    37
    case x@ChunkedResponseStart(_) =>
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    38
      log.debug("Received {}", x)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    39
    case MessageChunk(data, _) =>
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    40
      log.debug("Received data chunk {}", data)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    41
      if (data.length > 2) // skip heartbeat messages
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    42
        ChangesMarshaller(data).fold(
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    43
          _ => log.warning("Failed to deserialize entity {}", data),
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    44
          processor !
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    45
        )
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    46
    case x =>
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    47
      log.debug("Received {}", x)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    48
  }
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    49
}
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    50
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    51
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    52
// vim: set ts=2 sw=2 et: