"""Django views and a Channels long-poll consumer for the fear tracker."""

from collections import OrderedDict
from functools import wraps
from http import HTTPStatus
from io import BytesIO
import hashlib
import json

import qrcode
from asgiref.sync import async_to_sync
from channels.generic.http import AsyncHttpConsumer
from channels.layers import get_channel_layer
from django.db import transaction
from django.http import HttpResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.views.decorators.http import (
    require_safe,
    require_http_methods,
    require_POST,
)
from django.urls import reverse

from .forms import NewGameForm, JoinGameForm, PlayerFormSet
from .models import Game, Player, Fear


def _status_group(access_code):
    """Return the channel-layer group name used for a game's status."""
    return "%s_status" % access_code


def _player_number(player_form):
    """Return the 1-based player number encoded in a formset form prefix.

    The first five characters of the prefix are skipped and the rest is
    parsed as a 0-based index.
    """
    # presumably the prefix is Django's default "form-<index>" -- confirm
    return int(player_form.prefix[5:]) + 1


def lookup_access_code(func):
    """Decorate a view so it receives a ``Game`` instead of an access code.

    The decorated view is invoked as ``func(request, game, *args,
    **kwargs)``.  The access code is lower-cased before the lookup and a
    404 is raised (via ``get_object_or_404``) when no game matches.
    """
    @wraps(func)  # keep the wrapped view's __name__/__doc__ intact
    def with_game(request, access_code, *args, **kwargs):
        game = get_object_or_404(Game, access_code=access_code.lower())
        return func(request, game, *args, **kwargs)
    return with_game


@require_safe
def index(request):
    """Render the landing page."""
    return render(request, 'index.html')


@require_http_methods(["HEAD", "GET", "POST"])
def enter_code(request):
    """Show the join-game form; on a valid POST redirect to that game."""
    if request.method == 'POST':
        form = JoinGameForm(request.POST)
        if form.is_valid():
            game = form.cleaned_data['game']
            return redirect('game', access_code=game.access_code)
    else:
        form = JoinGameForm()
    # An invalid POST falls through and re-renders the bound form.
    return render(request, 'enter_code.html', {'form': form})


@require_http_methods(["HEAD", "GET", "POST"])
def new_game(request):
    """Show the new-game form; on a valid POST create the game and players.

    Only player forms with a non-empty ``name`` become players.  The
    game's ``fear_per_card`` is ``fear_per_player`` times the number of
    such players.
    """
    if request.method == 'POST':
        form = NewGameForm(request.POST)
        formset = PlayerFormSet(request.POST)
        if form.is_valid() and formset.is_valid():
            player_forms = [
                player_form for player_form in formset.forms
                if player_form.cleaned_data.get('name')
            ]
            num_players = len(player_forms)
            settings = form.cleaned_data

            game = Game()
            game.combined_growth_spirit = settings.get(
                'combined_growth_spirit')
            game.enable_events = settings.get('enable_events')
            game.combined_event = settings.get('combined_event')
            game.england_build = settings.get('england_build')
            game.fear_per_card = (
                settings.get('fear_per_player') * num_players)
            game.save()

            for player_form in player_forms:
                player = Player()
                player.order = _player_number(player_form)
                player.spirit = player_form.cleaned_data.get('spirit')
                player.name = player_form.cleaned_data.get('name')
                player.game = game
                player.save()

            game.advance_phase()
            return redirect('game', access_code=game.access_code)
    else:
        form = NewGameForm()
        initial_player_data = [
            {'spirit': i} for (i, _) in Player.enumerate_spirit_names()
        ]
        formset = PlayerFormSet(initial=initial_player_data)

    # NOTE(review): the original indentation was lost; this loop is
    # assumed to run for both the fresh form and an invalid POST so the
    # template always has player_id available -- confirm.
    for player_form in formset:
        player_form.player_id = _player_number(player_form)
    return render(request, 'new_game.html',
                  {'form': form, 'formset': formset})


@require_safe
def qr_code(request, access_code):
    """Return a PNG QR code encoding the absolute URL of the game page."""
    relative_url = reverse(
        'game', kwargs={'access_code': access_code.lower()})
    join_url = request.build_absolute_uri(relative_url)
    image = qrcode.make(join_url)
    buffer = BytesIO()
    image.save(buffer, "PNG")
    return HttpResponse(buffer.getvalue(), content_type='image/png')


def load_players(game):
    """Return the game's players keyed by ``order``, in ``order`` order.

    Each player object gets ``total_fear = 0`` and an empty ``fear``
    mapping attached for later population.
    """
    players = OrderedDict()
    for player in game.player_set.order_by('order').all():
        player.total_fear = 0
        player.fear = OrderedDict()
        players[player.order] = player
    return players


def get_players_with_fear(game, current_phase=None, players=None):
    """Fill each player's ``fear`` and ``total_fear`` from a phase's rows.

    ``players`` defaults to ``load_players(game)`` and ``current_phase``
    to ``game.get_current_phase()``.  A destroyed city counts double
    towards ``total_fear``.  The (mutated) players mapping is returned.
    """
    if players is None:
        players = load_players(game)
    if current_phase is None:
        current_phase = game.get_current_phase()

    for fear in current_phase.fear_set.order_by('effect').all():
        owner = players[fear.player.order]
        owner.fear[fear.effect] = {
            'pure_fear': fear.pure_fear,
            'towns': fear.towns_destroyed,
            'cities': fear.cities_destroyed,
        }
        owner.total_fear += (
            fear.pure_fear + fear.towns_destroyed + 2*fear.cities_destroyed)
    return players


def game_status_object(game, current_phase=None, players_with_fear=None):
    """Build the JSON-serialisable status of a game, including its hash.

    The ``hash`` entry is the SHA-256 hex digest of the JSON dump of all
    the other entries, computed before ``hash`` itself is added.
    """
    if current_phase is None:
        current_phase = game.get_current_phase()
    if players_with_fear is None:
        players_with_fear = get_players_with_fear(game, current_phase)

    players = OrderedDict(
        (order, {
            'ready': player.ready,
            'fear': player.fear,
            'total_fear': player.total_fear
        })
        for order, player in players_with_fear.items()
    )
    status_obj = {
        'turn': game.game_turn,
        'phase': game.get_current_phase_name(),
        'phase_id': game.game_phase,
        'available_fear_cards': game.num_available_fear_cards(),
        'fear_to_next_card': game.get_fear_to_next_card(),
        'fear_this_phase': current_phase.fear_this_phase(),
        'total_fear': game.get_current_total_fear(),
        'players': players,
        'all_ready': all(entry['ready'] for entry in players.values()),
    }
    serialized = json.dumps(status_obj).encode('utf-8')
    status_obj['hash'] = hashlib.sha256(serialized).hexdigest()
    return status_obj


@transaction.atomic
@lookup_access_code
@require_POST
def update_game(request, game):
    """Apply a POSTed change and answer with a JSON success/error body."""
    return handle_game_request(request, game, update=True)


@transaction.atomic
@lookup_access_code
@require_http_methods(["HEAD", "GET", "POST"])
def game(request, game):
    """Render the game page, applying POSTed changes first if any."""
    return handle_game_request(request, game, update=False)


def handle_game_request(request, game, update):
    """Process a game request and produce either JSON or the game page.

    POST keys are split on ``-``.  Keys of the form
    ``player-<order>-visible``, ``player-<order>-ready`` and
    ``player-<order>-effect-<num>-<kind>`` are acted upon; each may have
    a ``<key>-orig`` companion that is compared with the submitted value
    (presumably the value the client started from -- confirm against the
    template).  Any other unrecognised key raises ``Exception``.

    With ``update`` true the result is a JSON ``HttpResponse`` holding
    ``success`` and, on failure, ``errors`` (plus ``value`` when a
    conflicting stored value was seen).  Otherwise ``game.html`` is
    rendered and the current status is announced on the status group.
    """
    # TODO Should check if game is over and redirect to summary table view.
    players = load_players(game)
    for player in players.values():
        player.changed_to_ready = False
    errors = []
    # Maps the <kind> part of a POST key to the Fear attribute it edits.
    fear_kind_dict = {
        'fear': 'pure_fear',
        'towns': 'towns_destroyed',
        'cities': 'cities_destroyed',
    }
    control_keys = [
        'csrfmiddlewaretoken', 'update', 'advance', 'revert',
        'game_turn', 'game_phase', 'phase_name',
    ]
    current_value = None
    current_phase = game.get_current_phase()

    if request.method == 'POST':
        post_data = request.POST
        # The form carries the turn/phase it was rendered for; edits made
        # against a different turn/phase are rejected below.
        correct_phase = (
            current_phase.game_turn == int(post_data['game_turn'])
            and current_phase.game_phase == int(post_data['game_phase']))
        advance_phase = 'advance' in post_data
        revert_phase = 'revert' in post_data

        for key, value in post_data.items():
            sections = key.split('-')
            if key in control_keys or sections[-1] == 'orig':
                pass
            elif sections[0] == 'player':
                orig_value = post_data.get(key + '-orig', None)
                player_ord = int(sections[1])
                player = players[player_ord]
                if sections[2] == 'visible':
                    # Set on the in-memory object only; it is not saved.
                    player.visible = True
                elif sections[2] == 'ready':
                    if correct_phase and value != orig_value:
                        if value == 'true':
                            player.ready = True
                            player.changed_to_ready = True
                        else:
                            player.ready = False
                        player.save()
                elif sections[2] == 'effect':
                    effect_num = int(sections[3])
                    effect_kind = sections[4]
                    if orig_value != value:
                        amount = int(value)
                        orig = int(orig_value)
                        if amount != orig:
                            if player.ready and not player.changed_to_ready:
                                errors.append(
                                    f"{player.name} "
                                    + f"({player.get_spirit_name()}) was "
                                    + "marked as ready by another user, so "
                                    + "your attempt to change their effect "
                                    + f"#{effect_num} {effect_kind} "
                                    + f"from {orig} to {amount} "
                                    + "was not processed.")
                            elif not correct_phase or advance_phase:
                                old_phase = post_data['phase_name']
                                errors.append(
                                    f"{player.name} "
                                    + f"({player.get_spirit_name()})'s "
                                    + f"#{effect_num} {effect_kind} "
                                    + "could not be changed from "
                                    + f"{orig} to {amount} "
                                    + f"because {old_phase} has already "
                                    + "ended.")
                            else:
                                fear_row, _created = (
                                    Fear.objects.get_or_create(
                                        phase=current_phase,
                                        player=player,
                                        effect=effect_num))
                                prop_name = fear_kind_dict[effect_kind]
                                current_value = getattr(fear_row, prop_name)
                                # Only write when the stored value is still
                                # the one this request was based on.
                                if current_value == orig:
                                    setattr(fear_row, prop_name, amount)
                                    fear_row.save()
                                else:
                                    errors.append(
                                        f"{player.name} "
                                        + f"({player.get_spirit_name()})'s "
                                        + "effect "
                                        + f"#{effect_num} {effect_kind} "
                                        + "was changed by another user to "
                                        + f"{current_value}, so "
                                        + "your attempt to change it from "
                                        + f"{orig} to {amount} "
                                        + "was not processed.")
                else:
                    raise Exception(key + ": " + value)
            else:
                raise Exception(key + ": " + value)

        if advance_phase:
            players_ready = all(
                player.ready for player in players.values())
            if not players_ready:
                errors.append("Cannot advance to next phase until "
                              + "all players are done with the current "
                              + "phase.")
            else:
                current_phase = game.advance_phase()
                for player in players.values():
                    player.ready = False
        elif revert_phase:
            current_phase = game.revert_phase()
            for player in players.values():
                player.ready = True

    if update:
        if errors:
            res = {
                'success': False,
                'errors': '\n'.join(errors)
            }
            if current_value is not None:
                res['value'] = current_value
        else:
            res = {'success': True}
        # NOTE(review): the original indentation was lost; the
        # invalidation is assumed to be sent for both outcomes -- confirm
        # it was not meant to be limited to the success branch.
        async_to_sync(get_channel_layer().group_send)(
            _status_group(game.access_code),
            {"type": "fear_tracker.invalidate_status"})
        return HttpResponse(json.dumps(res))

    players = get_players_with_fear(game, current_phase, players)
    status_obj = game_status_object(game, current_phase, players)
    status_string = json.dumps(status_obj)
    async_to_sync(get_channel_layer().group_send)(
        _status_group(game.access_code), {
            "type": "fear_tracker.hashcode_seen",
            "hashcode": status_obj['hash'],
            "status_string": status_string,
        })

    # Append one blank effect per player, numbered one past the highest
    # existing effect (or 0 when there is none).  This happens after the
    # status JSON was produced, so the blank rows are not part of it.
    for player in players.values():
        info = player.fear
        new_effect = max(info) + 1 if info else 0
        info[new_effect] = {
            'pure_fear': 0,
            'towns': 0,
            'cities': 0,
        }

    return render(request, 'game.html', {
        'access_code': game.access_code,
        'status': status_obj,
        'status_json': status_string,
        'players': players,
        'range': range(21),
        'errors': errors,
    })


@lookup_access_code
@require_safe
def status(request, game, hashcode=None):
    """Return the game status as JSON, or 304 when ``hashcode`` matches.

    When a body is returned, the fresh status is also announced on the
    game's status group.
    """
    status_obj = game_status_object(game)
    if hashcode == status_obj['hash']:
        return HttpResponse(status=HTTPStatus.NOT_MODIFIED)

    status_string = json.dumps(status_obj)
    async_to_sync(get_channel_layer().group_send)(
        _status_group(game.access_code), {
            "type": "fear_tracker.hashcode_seen",
            "hashcode": status_obj['hash'],
            "status_string": status_string,
        })
    return HttpResponse(status_string)


class StatusLongPollConsumer(AsyncHttpConsumer):
    """Long-poll endpoint that answers once the status hash changes.

    The request is left open after ``handle``; a response is only sent
    from the ``fear_tracker.*`` group event handlers.
    """

    async def handle(self, body):
        """Join the game's status group and announce the client's hash."""
        url_kwargs = self.scope["url_route"]["kwargs"]
        self.access_code = url_kwargs["access_code"]
        self.hashcode = url_kwargs["hashcode"]
        group = _status_group(self.access_code)
        await self.channel_layer.group_add(group, self.channel_name)
        await self.channel_layer.group_send(
            group, {
                "type": "fear_tracker.hashcode_seen",
                "hashcode": self.hashcode,
            })

    async def http_request(self, message):
        """
        Async entrypoint - concatenates body fragments and hands off control
        to ``self.handle`` when the body has been completely received.
        """
        if "body" in message:
            self.body.append(message["body"])
        if not message.get("more_body"):
            await self.handle(b"".join(self.body))

    async def disconnect(self):
        """Leave the status group, if this consumer ever joined it."""
        # access_code is only set in handle(); a disconnect that arrives
        # before the request body is complete must not raise.
        access_code = getattr(self, "access_code", None)
        if access_code is None:
            return
        await self.channel_layer.group_discard(
            _status_group(access_code), self.channel_name)

    async def fear_tracker_hashcode_seen(self, event):
        """React to a status hash announced on the group.

        Nothing happens when the announced hash equals this client's.
        Otherwise, if the event carries a status string, it is sent as a
        200 response and the consumer leaves the group.
        """
        if self.hashcode != event["hashcode"]:
            if "status_string" in event and event["status_string"]:
                body = event["status_string"].encode('utf-8')
                await self.send_response(200, body)
                await self.disconnect()
            # NOTE(review): the original indentation was lost; the
            # invalidation is assumed to fire on every hash mismatch
            # (with or without a status string) -- confirm.
            await self.channel_layer.group_send(
                _status_group(self.access_code), {
                    "type": "fear_tracker.invalidate_status",
                })

    async def fear_tracker_invalidate_status(self, event):
        """Redirect the client to the hash-less status URL and close."""
        no_hash_status = reverse(
            'status', kwargs={'access_code': self.access_code})
        await self.send_response(302, b'', headers=[
            (b"Location", no_hash_status.encode('utf-8'))
        ])
        await self.http_disconnect(None)