fear_tracker/fear_tracker/views.py

396 lines
15 KiB
Python

import functools
import hashlib
import json
from collections import OrderedDict
from http import HTTPStatus
from io import BytesIO

import qrcode
from asgiref.sync import async_to_sync
from channels.generic.http import AsyncHttpConsumer
from channels.layers import get_channel_layer
from django.core.exceptions import SuspiciousOperation
from django.db import transaction
from django.http import HttpResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.views.decorators.http import require_safe, require_http_methods,\
    require_POST
from django.urls import reverse

from .forms import NewGameForm, JoinGameForm, PlayerFormSet
from .models import Game, Player, Fear
def lookup_access_code(func):
    """Decorate a view so it receives a ``Game`` instead of an access code.

    The returned view accepts ``(request, access_code, *args, **kwargs)``,
    looks the game up by the lower-cased access code and calls
    ``func(request, game, *args, **kwargs)``.  ``get_object_or_404`` raises
    ``Http404`` when no game matches.

    ``functools.wraps`` copies ``func``'s name, qualified name, module and
    docstring onto the wrapper, so decorated views stay identifiable in
    tracebacks and introspection instead of all being called ``with_game``.
    """
    @functools.wraps(func)
    def with_game(request, access_code, *args, **kwargs):
        game = get_object_or_404(Game, access_code=access_code.lower())
        return func(request, game, *args, **kwargs)
    return with_game
@require_safe
def index(request):
    """Render the landing page template."""
    template_name = 'index.html'
    return render(request, template_name)
@require_http_methods(["HEAD", "GET", "POST"])
def enter_code(request):
    """Show the join-game form and redirect to the game once it validates.

    A POST binds the form to the submitted data; a valid form exposes the
    matching game under ``cleaned_data['game']`` and the user is redirected
    to that game's page.  Any other request, or an invalid submission,
    renders the form (bound forms carry their validation errors).
    """
    submitted = request.method == 'POST'
    form = JoinGameForm(request.POST) if submitted else JoinGameForm()
    if submitted and form.is_valid():
        target = form.cleaned_data['game']
        return redirect('game', access_code=target.access_code)
    return render(request, 'enter_code.html', {'form': form})
@require_http_methods(["HEAD", "GET", "POST"])
def new_game(request):
    """Show the new-game form and create the game when it validates.

    On a valid POST a ``Game`` is created from the form settings, one
    ``Player`` is created for every player form that has a name, the game's
    phase is advanced once, and the user is redirected to the game page.
    The whole creation runs inside ``transaction.atomic()`` so that a
    failure part-way through cannot leave a game without its players.

    Otherwise the form and player formset are rendered; on a non-POST
    request each player form is pre-seeded with a different spirit.
    """
    if request.method == 'POST':
        form = NewGameForm(request.POST)
        formset = PlayerFormSet(request.POST)
        if form.is_valid() and formset.is_valid():
            settings = form.cleaned_data
            # Player forms submitted without a name are ignored.
            player_forms = [player_form for player_form in formset.forms
                            if player_form.cleaned_data.get('name')]
            num_players = len(player_forms)
            with transaction.atomic():
                game = Game()
                game.combined_growth_spirit =\
                    settings.get('combined_growth_spirit')
                game.enable_events = settings.get('enable_events')
                game.combined_event = settings.get('combined_event')
                game.england_build = settings.get('england_build')
                game.fear_per_card =\
                    settings.get('fear_per_player') * num_players
                game.save()
                for player_form in player_forms:
                    player = Player()
                    # Assumes prefixes look like "form-<index>", so
                    # [5:] is the zero-based index; order is one-based.
                    player.order = int(player_form.prefix[5:]) + 1
                    player.spirit = player_form.cleaned_data.get('spirit')
                    player.name = player_form.cleaned_data.get('name')
                    player.game = game
                    player.save()
                # NOTE(review): presumably opens the first phase of the
                # new game -- confirm against Game.advance_phase.
                game.advance_phase()
            return redirect('game', access_code=game.access_code)
    else:
        form = NewGameForm()
        initial_player_data = [{'spirit': i}
                               for (i, _) in Player.enumerate_spirit_names()]
        formset = PlayerFormSet(initial=initial_player_data)
    # Give every rendered player form a one-based id for the template.
    for player_form in formset:
        player_form.player_id = int(player_form.prefix[5:]) + 1
    return render(request, 'new_game.html',
                  {'form': form, 'formset': formset})
@require_safe
def qr_code(request, access_code):
    """Return a PNG QR code encoding the absolute URL of the game page.

    The access code is lower-cased before the URL is built; it is not
    checked against existing games here.
    """
    game_path = reverse('game', kwargs={'access_code': access_code.lower()})
    absolute_url = request.build_absolute_uri(game_path)
    buffer = BytesIO()
    qrcode.make(absolute_url).save(buffer, "PNG")
    return HttpResponse(buffer.getvalue(), content_type='image/png')
def load_players(game):
    """Return the game's players keyed by ``order``, in ascending order.

    Each player object gets two scratch attributes for the current request:
    ``total_fear`` (starting at 0) and ``fear`` (an empty ``OrderedDict``).
    """
    roster = OrderedDict()
    ordered_players = game.player_set.order_by('order').all()
    for member in ordered_players:
        member.total_fear = 0
        member.fear = OrderedDict()
        roster[member.order] = member
    return roster
def get_players_with_fear(game, current_phase=None, players=None):
    """Fill each player's per-effect fear breakdown for one phase.

    ``players`` defaults to ``load_players(game)`` and ``current_phase`` to
    ``game.get_current_phase()``.  For every fear row of the phase (ordered
    by effect number) the owning player's ``fear`` mapping gains an entry
    and ``total_fear`` grows by pure fear + towns + 2 * cities.

    Returns the same ``players`` mapping, mutated in place.
    """
    if players is None:
        players = load_players(game)
    if current_phase is None:
        current_phase = game.get_current_phase()
    for fear_row in current_phase.fear_set.order_by('effect').all():
        owner = players[fear_row.player.order]
        owner.fear[fear_row.effect] = {
            'pure_fear': fear_row.pure_fear,
            'towns': fear_row.towns_destroyed,
            'cities': fear_row.cities_destroyed,
        }
        # A destroyed city counts double.
        owner.total_fear += (fear_row.pure_fear
                             + fear_row.towns_destroyed
                             + 2 * fear_row.cities_destroyed)
    return players
def game_status_object(game, current_phase=None, players_with_fear=None):
    """Build the JSON-serialisable status dict for a game.

    ``current_phase`` defaults to ``game.get_current_phase()`` and
    ``players_with_fear`` to ``get_players_with_fear(game, current_phase)``.

    The returned dict carries a ``'hash'`` entry: the SHA-256 hex digest of
    the JSON dump of every other entry.  The digest depends on key order,
    so the order of the keys below must not change.
    """
    if current_phase is None:
        current_phase = game.get_current_phase()
    if players_with_fear is None:
        players_with_fear = get_players_with_fear(game, current_phase)
    players = OrderedDict(
        (order, {
            'ready': entry.ready,
            'fear': entry.fear,
            'total_fear': entry.total_fear
        })
        for order, entry in players_with_fear.items())
    status_obj = {
        'turn': game.game_turn,
        'phase': game.get_current_phase_name(),
        'phase_id': game.game_phase,
        'available_fear_cards': game.num_available_fear_cards(),
        'fear_to_next_card': game.get_fear_to_next_card(),
        'fear_this_phase': current_phase.fear_this_phase(),
        'total_fear': game.get_current_total_fear(),
        'players': players,
        'all_ready': all(info['ready'] for info in players.values()),
    }
    # Hash first, then attach: the digest never covers itself.
    serialized = json.dumps(status_obj).encode('utf-8')
    status_obj['hash'] = hashlib.sha256(serialized).hexdigest()
    return status_obj
@transaction.atomic
@lookup_access_code
@require_POST
def update_game(request, game):
    """Apply a POSTed change to the game and answer with a JSON result.

    Delegates to ``handle_game_request`` in update mode; the decorators run
    it inside a transaction with ``game`` already looked up.
    """
    response = handle_game_request(request, game, update=True)
    return response
@transaction.atomic
@lookup_access_code
@require_http_methods(["HEAD", "GET", "POST"])
def game(request, game):
    """Render the game page, first applying any changes POSTed with it.

    Delegates to ``handle_game_request`` in page (non-update) mode; the
    decorators run it inside a transaction with ``game`` already looked up.
    """
    response = handle_game_request(request, game, update=False)
    return response
def handle_game_request(request, game, update):
    """Apply submitted changes to a game and build the response.

    Shared implementation of the ``game`` page view (``update=False``) and
    the ``update_game`` endpoint (``update=True``).

    On POST every form field is examined:

    * ``csrfmiddlewaretoken``, ``update``, ``advance``, ``revert``,
      ``game_turn``, ``game_phase``, ``phase_name`` and every ``*-orig``
      field are control/companion fields and are skipped.
    * ``player-<order>-visible`` sets ``visible`` on the in-memory player
      (it is not saved here).
    * ``player-<order>-ready`` saves a changed ready flag, but only when the
      submitted turn/phase match the game's current phase.
    * ``player-<order>-effect-<n>-<kind>`` changes one fear value.  The
      change is applied only if the player was not marked ready by someone
      else, the submitted phase is still current and is not being advanced
      by this request, and the stored value still equals the submitted
      ``-orig`` value.  Otherwise a message is added to ``errors``.

    After the fields, ``advance`` moves to the next phase if every player
    is ready, and ``revert`` goes back one phase.

    Args:
        request: the Django request.
        game: the ``Game`` being played.
        update: when true, a POST is answered with a small JSON document
            (``success``, plus ``errors`` and possibly ``value`` on
            failure) and the status group is told to invalidate; when
            false, the full game page is rendered.

    Returns:
        An ``HttpResponse``.

    Raises:
        SuspiciousOperation: if the POST data is structurally malformed
            (missing or non-numeric turn/phase, unknown field, unknown
            player, bad effect field, missing ``-orig`` or ``phase_name``
            value).  Django answers such requests with HTTP 400 instead of
            a 500; because it is still an exception, the
            ``transaction.atomic`` wrapper on the calling views rolls back
            anything already written for the request.
    """
    # TODO Should check if game is over and redirect to summary table view.
    players = load_players(game)
    for player in players.values():
        player.changed_to_ready = False
    errors = []
    fear_kind_dict = {
        'fear': 'pure_fear',
        'towns': 'towns_destroyed',
        'cities': 'cities_destroyed',
    }
    control_fields = (
        'csrfmiddlewaretoken',
        'update', 'advance', 'revert',
        'game_turn', 'game_phase', 'phase_name',
    )
    current_value = None
    current_phase = game.get_current_phase()
    if request.method == 'POST':
        post_data = request.POST
        try:
            # True when the client was looking at the phase that is still
            # current; stale submissions must not change anything.
            correct_phase = (
                current_phase.game_turn == int(post_data['game_turn'])
                and current_phase.game_phase == int(post_data['game_phase']))
        except (KeyError, TypeError, ValueError) as err:
            raise SuspiciousOperation(
                "Missing or non-numeric game_turn/game_phase") from err
        advance_phase = 'advance' in post_data
        revert_phase = 'revert' in post_data
        for key, value in post_data.items():
            sections = key.split('-')
            if key in control_fields or sections[-1] == 'orig':
                continue
            if sections[0] != 'player':
                raise SuspiciousOperation(
                    f"Unexpected form field {key}: {value}")
            # Value the client saw when the page was rendered, if sent.
            orig_value = post_data.get(key + '-orig', None)
            try:
                player = players[int(sections[1])]
                field = sections[2]
            except (IndexError, KeyError, ValueError) as err:
                raise SuspiciousOperation(
                    f"Malformed player field {key}: {value}") from err
            if field == 'visible':
                player.visible = True
            elif field == 'ready':
                if correct_phase and value != orig_value:
                    if value == 'true':
                        player.ready = True
                        # Remember that *this* request set the flag, so the
                        # player's own effect changes below are accepted.
                        player.changed_to_ready = True
                    else:
                        player.ready = False
                    player.save()
            elif field == 'effect':
                try:
                    effect_num = int(sections[3])
                    effect_kind = sections[4]
                except (IndexError, ValueError) as err:
                    raise SuspiciousOperation(
                        f"Malformed effect field {key}: {value}") from err
                if orig_value == value:
                    continue
                try:
                    amount = int(value)
                    orig = int(orig_value)
                except (TypeError, ValueError) as err:
                    # TypeError: the "-orig" companion field is missing.
                    raise SuspiciousOperation(
                        f"Non-numeric effect value {key}: {value}") from err
                if amount == orig:
                    continue
                if player.ready and not player.changed_to_ready:
                    errors.append(
                        f"{player.name} "
                        f"({player.get_spirit_name()}) was "
                        "marked as ready by another user, so "
                        "your attempt to change their effect "
                        f"#{effect_num} {effect_kind} "
                        f"from {orig} to {amount} "
                        "was not processed.")
                elif not correct_phase or advance_phase:
                    try:
                        old_phase = post_data['phase_name']
                    except KeyError as err:
                        raise SuspiciousOperation(
                            "Missing phase_name") from err
                    errors.append(
                        f"{player.name} "
                        f"({player.get_spirit_name()})'s "
                        f"#{effect_num} {effect_kind} "
                        "could not be changed from "
                        f"{orig} to {amount} "
                        f"because {old_phase} has already ended.")
                else:
                    # Check the kind before touching the database.
                    if effect_kind not in fear_kind_dict:
                        raise SuspiciousOperation(
                            f"Unknown effect kind {key}: {value}")
                    prop_name = fear_kind_dict[effect_kind]
                    fear_row, _created = Fear.objects.get_or_create(
                        phase=current_phase,
                        player=player,
                        effect=effect_num)
                    current_value = getattr(fear_row, prop_name)
                    # Only write if nobody changed the value since the
                    # client loaded it.
                    if current_value == orig:
                        setattr(fear_row, prop_name, amount)
                        fear_row.save()
                    else:
                        errors.append(
                            f"{player.name} "
                            f"({player.get_spirit_name()})'s "
                            "effect "
                            f"#{effect_num} {effect_kind} "
                            "was changed by another user to "
                            f"{current_value}, so "
                            "your attempt to change it from "
                            f"{orig} to {amount} "
                            "was not processed.")
            else:
                raise SuspiciousOperation(
                    f"Unexpected form field {key}: {value}")
        if advance_phase:
            players_ready = all(player.ready for player in players.values())
            if not players_ready:
                errors.append("Cannot advance to next phase until " +
                              "all players are done with the current phase.")
            else:
                current_phase = game.advance_phase()
                # Mirror the new phase on the in-memory players used for
                # rendering; nothing is saved here.
                for player in players.values():
                    player.ready = False
        elif revert_phase:
            current_phase = game.revert_phase()
            for player in players.values():
                player.ready = True
        if update:
            if errors:
                res = {
                    'success': False,
                    'errors': '\n'.join(errors)
                }
                if current_value is not None:
                    res['value'] = current_value
            else:
                res = {'success': True}
            # NOTE(review): both calling views are wrapped in
            # transaction.atomic, so this is sent before the commit --
            # confirm pollers cannot observe the pre-commit state.
            async_to_sync(get_channel_layer().group_send)(
                "%s_status" % game.access_code,
                {"type": "fear_tracker.invalidate_status"})
            return HttpResponse(json.dumps(res))
    players = get_players_with_fear(game, current_phase, players)
    status_obj = game_status_object(game, current_phase, players)
    # Serialise before the blank rows below are added: status_obj shares
    # the players' ``fear`` mappings, so later additions show up in it.
    status_string = json.dumps(status_obj)
    async_to_sync(get_channel_layer().group_send)(
        "%s_status" % game.access_code, {
            "type": "fear_tracker.hashcode_seen",
            "hashcode": status_obj['hash'],
            "status_string": status_string,
        })
    # Give every player one extra, empty effect row to fill in.
    for player in players.values():
        info = player.fear
        if not info:
            new_effect = 0
        else:
            new_effect = max(info.keys()) + 1
        info[new_effect] = {
            'pure_fear': 0,
            'towns': 0,
            'cities': 0,
        }
    return render(request, 'game.html', {
        'access_code': game.access_code,
        'status': status_obj,
        'status_json': status_string,
        'players': players,
        'range': range(21),
        'errors': errors,
    })
@lookup_access_code
@require_safe
def status(request, game, hashcode=None):
    """Return the game's status as JSON, or 304 if the client is current.

    When ``hashcode`` equals the hash of the freshly built status, an empty
    ``NOT_MODIFIED`` response is returned.  Otherwise the serialised status
    is announced to the game's status group and returned to the caller.
    """
    status_obj = game_status_object(game)
    current_hash = status_obj['hash']
    if hashcode == current_hash:
        return HttpResponse(status=HTTPStatus.NOT_MODIFIED)
    status_string = json.dumps(status_obj)
    group_send = async_to_sync(get_channel_layer().group_send)
    group_send(
        "%s_status" % game.access_code, {
            "type": "fear_tracker.hashcode_seen",
            "hashcode": current_hash,
            "status_string": status_string,
        })
    return HttpResponse(status_string)
class StatusLongPollConsumer(AsyncHttpConsumer):
    """Long-poll HTTP consumer that answers when the game status changes.

    The consumer joins the ``<access_code>_status`` channel group in
    ``handle`` and sends its HTTP response from one of the group event
    handlers (``fear_tracker_hashcode_seen`` or
    ``fear_tracker_invalidate_status``), not from ``handle`` itself.
    """
    async def handle(self, body):
        """Join the game's status group and announce the client's hash.

        ``access_code`` and ``hashcode`` are read from the URL route
        kwargs.  ``body`` is not used.
        """
        self.access_code = self.scope["url_route"]["kwargs"]["access_code"]
        self.hashcode = self.scope["url_route"]["kwargs"]["hashcode"]
        await self.channel_layer.group_add("%s_status" % self.access_code,
                                           self.channel_name)
        # Announced without a "status_string": group members holding a
        # different hash react in fear_tracker_hashcode_seen.
        await self.channel_layer.group_send(
            "%s_status" % self.access_code, {
                "type": "fear_tracker.hashcode_seen",
                "hashcode": self.hashcode,
            })
    async def http_request(self, message):
        """
        Async entrypoint - concatenates body fragments and hands off control
        to ``self.handle`` when the body has been completely received.
        """
        # NOTE(review): presumably overrides the base-class entry point so
        # that the connection stays open after ``handle`` returns and the
        # response can be sent later by a group event -- confirm against
        # AsyncHttpConsumer.  ``self.body`` is not initialised in this
        # class; assumed to be set up by the base class.
        if "body" in message:
            self.body.append(message["body"])
        if not message.get("more_body"):
            await self.handle(b"".join(self.body))
    async def disconnect(self):
        """Leave the game's status group."""
        # Relies on ``self.access_code`` having been set by ``handle``.
        await self.channel_layer.group_discard("%s_status" % self.access_code,
                                               self.channel_name)
    async def fear_tracker_hashcode_seen(self, event):
        """React to a status hash announced on the group.

        Nothing happens when the announced hash equals this client's hash.
        On a mismatch, a non-empty ``status_string`` in the event is sent
        to the client as a 200 response and the consumer leaves the group.
        """
        if self.hashcode != event["hashcode"]:
            if "status_string" in event and event["status_string"]:
                body = event["status_string"].encode('utf-8')
                await self.send_response(200, body)
                await self.disconnect()
            # NOTE(review): the group is invalidated on every hash
            # mismatch, whether or not a status string was delivered
            # above -- confirm this is intended.
            await self.channel_layer.group_send(
                "%s_status" % self.access_code, {
                    "type": "fear_tracker.invalidate_status",
                })
    async def fear_tracker_invalidate_status(self, event):
        """Redirect the client to the hash-less status URL and disconnect."""
        no_hash_status = reverse('status',
                                 kwargs={'access_code': self.access_code})
        await self.send_response(302, b'', headers=[
            (b"Location", no_hash_status.encode('utf-8'))
        ])
        # NOTE(review): presumably the base-class teardown, which also
        # invokes ``disconnect`` above -- confirm.
        await self.http_disconnect(None)